在分散式服務中,原來的單體服務會被拆分成一個個微服務,服務註冊範例到註冊中心,服務消費者通過註冊中心獲取範例列表,直接請求呼叫服務。
服務是如何註冊到註冊中心,服務如果掛了,服務是如何檢測?帶著這些問題,我們從原始碼上對服務註冊進行簡單的原始碼分析。
Nacos Server:2.1.1
spring-cloud-starter-alibaba:2.1.1.RELEASE
spring-boot:2.1.1.RELEASE
方便統一版本,使用者端和伺服器端版本號都為
2.1.1
。
啟動nacos
服務註冊和發現需要新增maven
依賴:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>${latest.version}</version>
</dependency>
根據maven
依賴找到對應的spring.factories
檔案:
在spring.factories
檔案裡找到啟動設定類資訊,SpringBoot
服務啟動時會將這些設定類資訊注入到bean
容器中。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
服務註冊的核心設定類為:NacosDiscoveryAutoConfiguration
,該類設定三個bean
物件:
NacosServiceRegistry
NacosRegistration
NacosAutoServiceRegistration
NacosAutoServiceRegistration
繼承了抽象類AbstractAutoServiceRegistration
。AbstractAutoServiceRegistration
抽象類又實現了ApplicationListener
介面。
實現ApplicationListener
介面的方法,會在Spring
容器初始化完成之後呼叫onApplicationEvent
方法:
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
呼叫bind
方法:
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
// 呼叫 start 方法
this.start();
}
呼叫了start
方法:
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
呼叫了register
方法,最終呼叫的是NacosServiceRegistry
類的register
方法。
根據上文可知,伺服器啟動後呼叫NacosServiceRegistry
類的register
方法,該方法實現將範例註冊到伺服器端:
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
// 建立範例
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// 註冊範例
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
}
}
建立範例,然後通過namingService.registerInstance
方法註冊範例,然後檢視registerInstance
方法:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
// 封裝心跳包
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
// 傳送心跳包
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// 傳送範例
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
registerInstance
主要做兩件事:
beatReactor.addBeatInfo
使用定時服務,每隔5s
向伺服器端傳送一次心跳請求,通過http
請求傳送心跳資訊,路徑為/v1/ns/instance/beat
。
心跳請求定時任務使用執行緒池ScheduledThreadPoolExecutor.schedule()
,而該方法只會呼叫一次,定時任務的實現是在每次請求任務只會再呼叫一次ScheduledThreadPoolExecutor.schedule()
,
簡單說就是nacos
在傳送心跳的時候,會呼叫schedule
方法,在schedule
要執行的任務中,如果正常傳送完心跳,會再次呼叫schedule
方法。
那為什麼不直接呼叫週期執行的執行緒池ScheduledThreadPoolExecutor.scheduleAtFixedRate()
?可能是由於傳送心跳服務發生異常後,定時任務還會繼續執行,但是週期執行的執行緒池遇到報錯後也不會重複呼叫執行的任務。
執行緒任務
BeatTask
的run
方法,,每次執行會先判斷isStopped
,如果是false
,說明心跳停止,就不會觸發下次執行任務。如果使用定時任務scheduleAtFixedRate
,即使心跳停止還會繼續執行任務,造成資源不必要浪費。
registerService
主要封裝範例資訊,比如ip
、port
、servicename
,將這些資訊通過http
請求傳送給伺服器端。路徑為/v1/ns/instance
。
根據上面流程,檢視以下的流程圖:
伺服器端就是註冊中心,服務註冊到註冊中心,在https://github.com/alibaba/nacos/releases/tag/2.1.1
下載原始碼部署到本地,方便調式和檢視,部署方式詳見我的另外一篇文章Nacos 原始碼環境搭建。
伺服器端主要接收兩個資訊:心跳包和範例資訊。
使用者端向服務請求的路徑為/v1/ns/instance/beat
,對應的伺服器端為InstanceController
類的beat
方法:
@PutMapping("/beat")
@Secured(action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
// 判斷是否有心跳,存在心跳就轉成RsInfo
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
serviceName, namespaceId);
// 獲取範例資訊
BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
builder.setRequest(request);
int resultCode = getInstanceOperator()
.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
result.put(CommonParams.CODE, resultCode);
// 下次傳送心跳包間隔
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
在handleBeat
方法中執行執行緒任務ClientBeatProcessorV2
的run
方法,延長lastHeartBeatTime
時間。註冊中心會定時查詢範例,當前時間 - lastHeartBeatTime
> 設定時間(預設15秒),就標記範例為不健康範例。如果心跳範例不健康,傳送通知給訂閱方,變更範例。
伺服器端在
15
秒沒有收到心跳包會將範例設定為不健康,在30
秒沒有收到心跳包會將臨時範例移除掉。
使用者端請求的地址是/nacos/v1/ns/instance
, 對應的是伺服器端是在InstanceController
類。找到類上對應的post
請求方法上。
註冊流程:
InstanceController#register ——>InstanceOperatorClientImpl#registerInstance ——>ClientOperationServiceProxy#registerInstance ——>EphemeralClientOperationServiceImpl#registerInstance
服務註冊後,將服務儲存在一個雙層map
集合中:
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
通過是否存在ephemeral
,true
,走AP
模式,否則走CP
模式。
Nacos
預設就是採用的AP
模式使用Distro
協定實現。實現的介面是EphemeralConsistencyService
對節點資訊的持久化主要是呼叫put
方法,
會先寫入到DataStore
中:
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 資料持久化到快取中
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, DataOperation.CHANGE);
}
NacosDiscoveryAutoConfiguration
,主要設定三個物件
NacosServiceRegistry
NacosRegistration
NacosAutoServiceRegistration
NacosServiceRegistry
類的register
方法,封裝範例和心跳資訊
http
請求,定時傳送傳送心跳包,預設時間間隔是5
秒。http
請求,傳送範例資訊。15
秒沒有收到心跳包會將範例設為不健康,在30
秒沒有收到心跳包會將臨時範例移除掉。ephemeral
判斷是否走AP
還是走CP
,AP
模式使用Distro
協定。通過呼叫EphemeralConsistencyService
介面實現,持久化範例資訊。