闽公网安备 35020302035485号
Spring Cloud Config:Spring Cloud 生态组件,与 Spring Cloud 体系无缝整合。

3.Apollo客户端从配置中心拉取最新的配置、更新本地配置并通知到应用
// guava中的Multimap,一个key可以保持多个value
private final Multimap<String, DeferredResult<String>> deferredResultMap = HashMultimap.create();
// 超时时间 10s
private long DEFAULT_LONG_POLLING_TIMEOUT = 10 * 1000;
/**
* 堆代码 duidaima.com
* 模拟监听namespace配置
*/
@GetMapping("/listen")
public DeferredResult<String> pollNotification(@RequestParam("namespace") String namespace) {
// 创建DeferredResult对象,设置超时时间和超时返回对象
DeferredResult<String> result = new DeferredResult<>(DEFAULT_LONG_POLLING_TIMEOUT, "timeout");
result.onTimeout(() -> log.info("timeout"));
result.onCompletion(() -> {
log.info("completion");
deferredResultMap.remove(namespace, result);
});
deferredResultMap.put(namespace, result);
return result;
}
/**
* 模拟发布namespace配置
*/
@GetMapping(value = "/publish")
public Object publishConfig(@RequestParam("namespace") String namespace, @RequestParam("context") String context) {
if (deferredResultMap.containsKey(namespace)) {
Collection<DeferredResult<String>> deferredResults = deferredResultMap.get(namespace);
for (DeferredResult<String> deferredResult : deferredResults) {
deferredResult.setResult(context);
}
}
return "success";
}
首先,我们肯定会维护一个全局的 deferredResultMap 哈希表记录所有长轮询的请求,例子中我们是采用 namespace 做为key。请求 /listen 接口,相当于模拟应用长轮询 Apollo 配置中心的场景,监听配置变更的通知。然后再调用提供的一个模拟发布配置的接口,触发 DeferredResult 的 setResult() 方法,就相当于唤醒异步任务线程,返回结果。
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
// 堆代码 duidaima.com
// 省略
.......
try {
// 1.随机获取一个服务端的配置DTO
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}
// 2.封装请求apollo服务端的url和参数(包括 notificationId)
url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
m_notifications);
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
transaction.addData("Url", url);
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpClient.doGet(request, m_responseType);
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200 && response.getBody() != null) {
// 3.更新notificationId
updateNotifications(response.getBody());
updateRemoteNotifications(response.getBody());
transaction.addData("Result", response.getBody().toString());
notify(lastServiceDto, response.getBody());
}
// 省略
.......
}
}
在注释的步骤2 中,方法 assembleLongPollRefreshUrl 会把上一次获取到的 notificationId 作为长轮询接口的参数。