apollo 配置中心源码解读

apollo配置中心事件通知:ApplicationEventPublisher

点击发布的时候触发一下操作:

1
portal: '/apps/:appId/envs/:env/clusters/:clusterName/namespaces/:namespaceName/releases’
1
portal:http://localhost:8070/apps/demo-apollo-test/envs/DEV/clusters/default/namespaces/application/releases

admin-service:apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases

Apollo配置中心源码解读,启动过程,配置拉取……

Apollo配置中心监听器

1. apollo的核心代码分享

  • SpringApplication启动的关键步骤

  • 在SpringApplication中,会加载所有实现了Init方法的类

1
2
3
4
5
6
7
8
protected void applyInitializers(ConfigurableApplicationContext context) {
for (ApplicationContextInitializer initializer : getInitializers()) {
Class<?> requiredType = GenericTypeResolver.resolveTypeArgument(
initializer.getClass(), ApplicationContextInitializer.class);
Assert.isInstanceOf(requiredType, context, "Unable to call initializer.");
initializer.initialize(context);
}
}
  • 通过上述步骤,Apollo自己实现的ApplicationContextInitializer也就 被加载到容器中了。具体的加载流程如下:

    1
    2
    3
    1.initialize->
    2.initializeSystemProperty(environment) 读取项目中Apollo相关的配置文件,在首次读取的时候都是为空的,配置文件还没有加载进来;如果读到了相关配置,就会将配置信息放到容器的环境变量中。
    3.
1
2
3
4
5
6
7
8
9
10
11
12
CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
//循环遍历项目配置的namespace,
for (String namespace : namespaceList) {
//1.调用trySync(),来同步apollo的和本地缓存的配置信息
//2.将这些配置信息转换为应用的全局property
Config config = ConfigService.getConfig(namespace);

composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
}

environment.getPropertySources().addFirst(composite);
}

2.Apollo启动一览

2.1 ApolloApplicationContextInitializer的作用

定义apollo的容器启动的时候具体的工作事项

1
2
ApolloApplicationContextInitializer implements
ApplicationContextInitializer<ConfigurableApplicationContext>

容器启动的时候调用init方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

@Override
public void initialize(ConfigurableApplicationContext context) {
ConfigurableEnvironment environment = context.getEnvironment();

------
//关键步骤
for (String namespace : namespaceList) {
//关键步骤:

Config config = ConfigService.getConfig(namespace);
/*
1.调用ConfigService.getService
public static Config getConfig(String namespace) {
return s_instance.getManager().getConfig(namespace);
}
2.DefaultConfigManager.getConfig
if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);
config = factory.create(namespace);
m_configs.put(namespace, config);
}
3.DefaultConfigFactory.create(String namespace)
DefaultConfig defaultConfig =
new DefaultConfig(namespace, createLocalConfigRepository(namespace));
4.createLocalConfigRepository-->new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
5.调用 LocalFileConfigRepository的构造方法 --> RemoteConfigRepository
6.调用RemoteConfigRepository构造方法
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
gson = new Gson();
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
}

*/
composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
}

environment.getPropertySources().addFirst(composite);
}

终上,在容器启动的时候,会调用RemoteConfigRepository的构造方法,而实现配置中心的同步主要是调用trySync,schedulePeriodicRefresh,scheduleLongPollingRefresh这个三个方法来实现配置的实时同步

2.2trySync()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
protected boolean trySync() {
try {
sync();
return true;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
logger
.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
return false;
}
//实际调用
@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");

try {
//从缓存中获取,如果有的话,启动的时候previos唯恐
ApolloConfig previous = m_configCache.get();
//获取当前的配置文件
ApolloConfig current = loadApolloConfig();
//比较两者是否有差异,
if (previous != current) {
logger.debug("Remote Config refreshed!");
//如果缓存的配置信息与当前查数据库获取到的信息不同,那么就将从数据库中获取到的配置信息放到缓存中。这样在程序启动的时候,configCache就完成了初始化
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig());
}
----
}

//如果两者有差异,就触发此操作
protected void fireRepositoryChange(String namespace, Properties newProperties) {
for (RepositoryChangeListener listener : m_listeners) {
try {
//如果两者有差异,那么刷新缓存配置,并且将重写本地的缓存文件
listener.onRepositoryChange(namespace, newProperties);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
}
}
}
2.3 schedulePeriodicRefresh

开启多线程,调用 trySync();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
logger.debug("refresh config for namespace: {}", m_namespace);
trySync();
Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshIntervalTimeUnit());
}
2.4 scheduleLongPollingRefresh
1
2
3
4
5
6
7
8
9
10
11
12

private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get()) {
startLongPolling();
}
return added;
}

整个apollo配置中心的逻辑就是这样,简单的说就是无线循环的去获取配置信息,当获取到的配置信息与上次获取到的不同那么就刷新容器缓存的配置项并且更新客户端缓存的配置信息。

3. 注解ApolloConfigChangeListener分析

3.1@ApolloConfigChangeListener实现原理

Apollo配置中心有声明一个后置处理器,所以在程序启动的时候,spring容器会自动加载这个PostProcessor。

类图如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
*
*/
public abstract class ApolloProcessor implements BeanPostProcessor, PriorityOrdered {

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
Class clazz = bean.getClass();
for (Field field : findAllField(clazz)) {
processField(bean, beanName, field);
}
for (Method method : findAllMethod(clazz)) {
processMethod(bean, beanName, method);
}
return bean;
}

由ApolloProcessor的具体实现可以看到,在postProcessBeforeInitialization(后置处理器生成之前,会调用子类的processField、processMethod方法)。就是说在ApolloProcessor构造后置处理器之前,会调用ApolloAnnotationProcessor的processMethod

ApolloAnnotationProcessor的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

@Override
protected void processMethod(final Object bean, String beanName, final Method method) {
//判断方法上是否加上ApolloConfigChangeListener注解
ApolloConfigChangeListener annotation = AnnotationUtils
.findAnnotation(method, ApolloConfigChangeListener.class);
if (annotation == null) {
return;
}
Class<?>[] parameterTypes = method.getParameterTypes();
Preconditions.checkArgument(parameterTypes.length == 1,
"Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length,
method);
Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]),
"Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0],
method);
//将 标有注解ApolloConfigChangeListener的方法设为公有的
ReflectionUtils.makeAccessible(method);
//ApolloConfigChangeListener注解上是否加上指定的namespace,如果没有的话,默认使用的namespace为application
String[] namespaces = annotation.value();
String[] annotatedInterestedKeys = annotation.interestedKeys();
Set<String> interestedKeys = annotatedInterestedKeys.length > 0 ? Sets.newHashSet(annotatedInterestedKeys) : null;
ConfigChangeListener configChangeListener = new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
ReflectionUtils.invokeMethod(method, bean, changeEvent);
}
};

for (String namespace : namespaces) {
Config config = ConfigService.getConfig(namespace);

if (interestedKeys == null) {
config.addChangeListener(configChangeListener);
} else {
config.addChangeListener(configChangeListener, interestedKeys);
}
}
}
1
2
3
4
RemoteConfigLongPollService
doLongPollingRefresh
notify(lastServiceDto, response.getBody()); //通知同步更新
调用sync()比较配置文件是否发生改变,变化就同步更新

在配置文件发生变动的时候,调用顺序就跟第一大节说的顺序一致。

4 实际使用

4.1配置多个环境列表(一个portal管理多个环境的配置)

在启动portal的时候需要添加参数来指定某个环境对应的注册中心是什么。如下:

在启动Portal的时候,当点击的是dev也签,调用的注册中心是dev_meta;

1
2
3
4
5
6
7
-Dapollo_profile=github,auth
-Dspring.datasource.url=jdbc:mysql://yun1:3306/ApolloPortalDB?characterEncoding=utf8
-Dspring.datasource.username=root
-Dspring.datasource.password=Blue123!
-Ddev_meta=http://localhost:8080
-Dfat_meta=http://yun2:8080
-Dserver.port=8070

**在apollo中,可以支持多个环境列表的,通过阅读源码可以知道;在portal模块启动的时候,Apollo会将PortalDB库中的ServerConfig表中的数据添加到运行变量中去,其中就有环境列表的信息,这里需要手动加上去,并且用逗号隔开,添加的值也只能是它规定的那几个值。代码如下:

  • 获取表中的数据并将它们设置到环境变量中
1
2
3
4
5
6
7
8
9
10
public List<Env> portalSupportedEnvs() {
String[] configurations = getArrayProperty("apollo.portal.envs", new String[]{"FAT", "UAT", "PRO"});
List<Env> envs = Lists.newLinkedList();

for (String env : configurations) {
envs.add(Env.fromString(env));
}

return envs;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public PortalDBPropertySource() {
super("DBConfig", Maps.newConcurrentMap());
}

//将PortalDB.ServerConfig中的表数据全部放入到运行变量中
@Override
protected void refresh() {
Iterable<ServerConfig> dbConfigs = serverConfigRepository.findAll();

for (ServerConfig config: dbConfigs) {
String key = config.getKey();
Object value = config.getValue();

if (this.source.isEmpty()) {
logger.info("Load config from DB : {} = {}", key, value);
} else if (!Objects.equals(this.source.get(key), value)) {
logger.info("Load config from DB : {} = {}. Old value = {}", key,
value, this.source.get(key));
}

this.source.put(key, value);
}
}
4.2 指定运行环境
  • 1.在默认路径 /opt/settings/server.properties中指定代码的运行时环境。在项目启动的时候,会找到classpath路径下面的 apollo-env.properties,由它来指定具体的环境与注册中心的对应关系。这样,就不需要添加-Dapollo.mata这个变量了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
MetaDomainConsts
static {
Properties prop = new Properties();
prop = ResourceUtils.readConfigFile("apollo-env.properties", prop);
Properties env = System.getProperties();
domains.put(Env.LOCAL,
env.getProperty("local_meta", prop.getProperty("local.meta", DEFAULT_META_URL)));
domains.put(Env.DEV,
env.getProperty("dev_meta", prop.getProperty("dev.meta", DEFAULT_META_URL)));
domains.put(Env.FAT,
env.getProperty("fat_meta", prop.getProperty("fat.meta", DEFAULT_META_URL)));
domains.put(Env.UAT,
env.getProperty("uat_meta", prop.getProperty("uat.meta", DEFAULT_META_URL)));
domains.put(Env.LPT,
env.getProperty("lpt_meta", prop.getProperty("lpt.meta", DEFAULT_META_URL)));
domains.put(Env.PRO,
env.getProperty("pro_meta", prop.getProperty("pro.meta", DEFAULT_META_URL)));
}