Dubbo中@Service工作过程解析

Dubbo中@Service工作过程解析

Dubbo @Service注解的加载和启动过程

Spring中的BeanPostProcessor

首先我们应当了解到在spring体系中BeanPostProcessor是什么、加载流程

它是什么

BeanPostProcessor也也称为后置处理器。在spring容加载流程。

spring容器bean加载流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Prepare this context for refreshing.
prepareRefresh();

// 获取beanFactory并加载容器中定义的bean信息
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();

// Prepare the bean factory for use in this context.
prepareBeanFactory(beanFactory);

try {
// 归类bean工厂的后置处理器
postProcessBeanFactory(beanFactory);

// 处理这些bean工厂的后置处理器
invokeBeanFactoryPostProcessors(beanFactory);

// 调用所有实现了beanpostprocessor接口的类。(先加载实现了priority接口的,然后加载order的,最后加载剩余的)
registerBeanPostProcessors(beanFactory);

// Initialize message source for this context.
initMessageSource();

// Initialize event multicaster for this context.
initApplicationEventMulticaster();

// Initialize other special beans in specific context subclasses.
onRefresh();

// Check for listener beans and register them.
registerListeners();

// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);

// Last step: publish corresponding event.
finishRefresh();
}

后置处理器的加载和工作

所以由上可以看出,在registerBeanPostProcessors这一步的时候会划分容器中各种后置处理器,首先归类有@Priority注解,其次归类有@Order注解。最后划分其它的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1
sortPostProcessors(priorityOrderedPostProcessors, beanFactory);
registerBeanPostProcessors(beanFactory, priorityOrderedPostProcessors);

// 2
sortPostProcessors(orderedPostProcessors, beanFactory);
registerBeanPostProcessors(beanFactory, orderedPostProcessors);

// 3
registerBeanPostProcessors(beanFactory, nonOrderedPostProcessors);

// Finally, re-register all internal BeanPostProcessors.
sortPostProcessors(internalPostProcessors, beanFactory);
registerBeanPostProcessors(beanFactory, internalPostProcessors);

这样,就将所有的后置处理器注册到容器中。后续在容器启动的过程中,会通过反射的方式调用各个实现了BeanPostProcessor的实现类的beforexxxxafterxxx的方法来做进一步的处理。

有了以上基础,就可以去dubbo包中找到各自的xxxBeanPostProcessor了。
dubbo中提供的各个beanpostprocessor

dubbo中的ServiceAnnotationBeanPostProcessor

类继承关系

类图

作用

BeanDefinitionRegistryPostProcessor继承自BeanFactoryPostProcessor,是一种比较特殊的BeanFactoryPostProcessor。BeanDefinitionRegistryPostProcessor中定义的postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry)方法 可以让我们实现自定义的注册bean定义的逻辑。

从以上论述可以看出实现BeanDefinitionRegistryPostProcessor的作用就是向spring容器中注册响应的bean实例。

具体分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
// 读取配置中声明的dubbo扫描类
Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);

if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
// 将加有@Service注解的类注册到spring容器中
registerServiceBeans(resolvedPackagesToScan, registry);
} else {
if (logger.isWarnEnabled()) {
logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
}
}

}

下面就分析registerServiceBeans的处理过程。

  1. 扫描系统配置的 basePackages,将@Service注解的类放到一个set集合中。
  2. 找到所有标注@Service的类是否被扫面到。
  3. registerServiceBean循环遍历这个集合,并将它们注入到spring容器中。

ServiceBean的作用

类图继承关系

ServiceBean的类图继承关系
主要看它实现了InitializingBean。通过实现它来对bean初始化之后做一定操作(调用afterPropertiesSet())。

代码实现

  1. ServiceBean初始化过程。如下所示:

    1
    2
    3
    4
    5
    6
    7
    @SuppressWarnings({"unchecked", "deprecation"})
    public void afterPropertiesSet() throws Exception {
    // 进行大量的操作,来读取对应的@Service组成相应的对象信息.....
    .....
    // 最后一步,导出服务。
    export()
    }
  2. export()导出服务

    最后得到一系列的URL信息(形如注册在zk上的provider节点信息)

1
export() -> doExportUrlsFor1Protocol()
  1. 获取暴露的host和端口

    1
    2
    3
    4
    // 获取部署主机信息
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    // 获取配置的端口信息。(默认20880,)
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
  2. 导出有两步

    1. exportLocal
    2. exportJVM
  3. 根据第3步启动相关的服务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
    String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
    if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
    if (logger.isWarnEnabled()) {
    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
    "], has set stubproxy support event ,but no stub methods founded."));
    }

    } else {
    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
    }
    }
    // 根据url配置host、port信息启动服务(默认使用netty)
    openServer(url);
    optimizeSerialization(url);

    return exporter;
    }

    netty服务启动

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
     private ExchangeServer createServer(URL url) {
    url = URLBuilder.from(url)
    // send readonly event when server closes, it's enabled by default
    .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
    // enable heartbeat by default
    .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
    .addParameter(CODEC_KEY, DubboCodec.NAME)
    .build();
    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
    throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }

    ExchangeServer server;
    try {
    server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    str = url.getParameter(CLIENT_KEY);
    if (str != null && str.length() > 0) {
    Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
    if (!supportedTypes.contains(str)) {
    throw new RpcException("Unsupported client type: " + str);
    }
    }

    return server;
    }