上一篇部落格中MyRpc框架實現了基本的對等rpc通訊功能。而在這篇部落格中我們需要實現MyRpc的叢集間rpc通訊功能。
上篇部落格的對等rpc通訊實現中,使用者端和伺服器端的ip地址和埠都是固定寫死的。而實際上為了提升服務的總負載能力,使用者端和伺服器端通常都以叢集的方式部署(水平擴展),使用者端和伺服器端的節點都不止1個。
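叢集條件下出現了很多新的問題需要解決,本文依次處理以下幾個:
1. 服務註冊與發現:使用者端如何實時得知當前有哪些可用的伺服器端(provider)節點?
2. 負載均衡:使用者端發起請求時應該選擇哪一個provider?
3. 叢集呼叫容錯:呼叫失敗時是否重試、如何重試?
4. 請求超時:使用者端最多等待響應多長時間?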
MyRpc目前支援使用zookeeper作為註冊中心。
zookeeper作為一個高效能的分散式協調器,儲存的資料以ZNode節點樹的形式存在。ZNode節點有兩個維度的屬性:有序/無序、持久/臨時。其中臨時節點在建立它的使用者端會話失效後會被自動刪除,MyRpc正是利用這一點來感知provider節點的下線。
MyRpc的zookeeper節點結構圖
註冊中心介面
/**
 * Service registry abstraction.
 *
 * <p>Providers publish themselves through {@link #doRegistry(ServiceInfo)}, and consumers look
 * providers up through {@link #discovery(String)}.
 */
public interface Registry {

    /**
     * Registers (publishes) one provider instance of a service.
     *
     * @param serviceInfo description of the provider instance to publish
     */
    void doRegistry(ServiceInfo serviceInfo);

    /**
     * Looks up the providers known for a service.
     *
     * @param serviceName name of the service to look up
     * @return the provider descriptions found for {@code serviceName}
     */
    List<ServiceInfo> discovery(String serviceName);
}
zookeeper註冊中心實現(原始的zk使用者端)
/**
* 簡易的zk註冊中心(原始的zk使用者端很多地方都需要使用者去處理異常,但為了更簡單的展示zk註冊中心的使用,基本上沒有處理這些異常情況)
* */
public class ZookeeperRegistry implements Registry{
private static final Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
private final ZooKeeper zooKeeper;
private final ConcurrentHashMap<String,List<ServiceInfo>> serviceInfoCacheMap = new ConcurrentHashMap<>();
public ZookeeperRegistry(String zkServerAddress) {
try {
this.zooKeeper = new ZooKeeper(zkServerAddress,2000, event -> {});
// 確保root節點是一定存在的
createPersistentNode(MyRpcRegistryConstants.BASE_PATH);
} catch (Exception e) {
throw new MyRpcException("init zkClient error",e);
}
}
@Override
public void doRegistry(ServiceInfo serviceInfo) {
// 先建立永久的服務名節點
createServiceNameNode(serviceInfo.getServiceName());
// 再建立臨時的providerInfo節點
createProviderInfoNode(serviceInfo);
}
@Override
public List<ServiceInfo> discovery(String serviceName) {
return serviceInfoCacheMap.computeIfAbsent(serviceName,(key)-> findProviderInfoList(serviceName));
}
private String getServiceNameNodePath(String serviceName){
return MyRpcRegistryConstants.BASE_PATH + "/" + serviceName;
}
// ================================ zk工具方法 ==================================
private void createServiceNameNode(String serviceName){
try {
String serviceNameNodePath = getServiceNameNodePath(serviceName);
// 服務名節點是永久節點
createPersistentNode(serviceNameNodePath);
logger.info("createServiceNameNode success! serviceNameNodePath={}",serviceNameNodePath);
} catch (Exception e) {
throw new MyRpcException("createServiceNameNode error",e);
}
}
private void createProviderInfoNode(ServiceInfo serviceInfo){
try {
String serviceNameNodePath = getServiceNameNodePath(serviceInfo.getServiceName());
// 子節點用一個uuid做path防重複
String providerInfoNodePath = serviceNameNodePath + "/" + UUID.randomUUID();
String providerInfoJsonStr = JsonUtil.obj2Str(serviceInfo);
// providerInfo節點是臨時節點(如果節點宕機了,zk的連線斷開一段時間後,臨時節點會被自動刪除)
zooKeeper.create(providerInfoNodePath, providerInfoJsonStr.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
logger.info("createProviderInfoNode success! path={}",providerInfoNodePath);
} catch (Exception e) {
throw new MyRpcException("createProviderInfoNode error",e);
}
}
private void createPersistentNode(String path){
try {
if (zooKeeper.exists(path, false) == null) {
// 服務名節點是永久節點
zooKeeper.create(path, "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}catch (Exception e){
throw new MyRpcException("createPersistentNode error",e);
}
}
private List<ServiceInfo> findProviderInfoList(String serviceName){
String serviceNameNodePath = getServiceNameNodePath(serviceName);
List<ServiceInfo> serviceInfoList = new ArrayList<>();
try {
List<String> providerInfoPathList = zooKeeper.getChildren(serviceNameNodePath, new ZookeeperListener(serviceNameNodePath));
for(String providerInfoPath : providerInfoPathList){
try{
String fullProviderInfoPath = serviceNameNodePath + "/" + providerInfoPath;
byte[] data = zooKeeper.getData(fullProviderInfoPath,false,null);
String jsonStr = new String(data,StandardCharsets.UTF_8);
ServiceInfo serviceInfo = JsonUtil.json2Obj(jsonStr,ServiceInfo.class);
serviceInfoList.add(serviceInfo);
}catch (Exception e){
logger.error("findProviderInfoList getData error",e);
}
}
logger.info("findProviderInfoList={}",JsonUtil.obj2Str(serviceInfoList));
return serviceInfoList;
} catch (Exception e) {
throw new MyRpcException("findProviderInfoList error",e);
}
}
private class ZookeeperListener implements Watcher{
private final String path;
private final String serviceName;
public ZookeeperListener(String serviceName) {
this.path = getServiceNameNodePath(serviceName);
this.serviceName = serviceName;
}
@Override
public void process(WatchedEvent event) {
logger.info("ZookeeperListener process! path={}",path);
try {
// 重新整理快取
List<ServiceInfo> serviceInfoList = findProviderInfoList(path);
serviceInfoCacheMap.put(serviceName,serviceInfoList);
} catch (Exception e) {
logger.error("ZookeeperListener getChildren error! path={}",path,e);
}
}
}
}
zookeeper註冊中心實現(curator使用者端,通過自動重試等操作解決了原生使用者端的一些坑)
public class ZkCuratorRegistry implements Registry {
private static final Logger logger = LoggerFactory.getLogger(ZkCuratorRegistry.class);
private CuratorFramework curatorZkClient;
private final ConcurrentHashMap<String, List<ServiceInfo>> serviceInfoCacheMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, PathChildrenCache> nodeCacheMap = new ConcurrentHashMap<>();
public ZkCuratorRegistry(String zkServerAddress) {
try {
this.curatorZkClient = CuratorFrameworkFactory.newClient(zkServerAddress, new ExponentialBackoffRetry(3000, 1));
this.curatorZkClient.start();
} catch (Exception e) {
throw new MyRpcException("init zkClient error", e);
}
}
@Override
public void doRegistry(ServiceInfo serviceInfo) {
// 先建立永久的服務名節點
createServiceNameNode(serviceInfo.getServiceName());
// 再建立臨時的providerInfo節點
createProviderInfoNode(serviceInfo);
}
@Override
public List<ServiceInfo> discovery(String serviceName) {
return this.serviceInfoCacheMap.computeIfAbsent(serviceName,(key)->{
List<ServiceInfo> serviceInfoList = findProviderInfoList(serviceName);
// 建立對子節點的監聽
String serviceNodePath = getServiceNameNodePath(serviceName);
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorZkClient, serviceNodePath, true);
try {
pathChildrenCache.start();
nodeCacheMap.put(serviceName,pathChildrenCache);
pathChildrenCache.getListenable().addListener(new ZkCuratorListener(serviceName));
} catch (Exception e) {
throw new MyRpcException("PathChildrenCache start error!",e);
}
return serviceInfoList;
});
}
private void createServiceNameNode(String serviceName) {
try {
String serviceNameNodePath = getServiceNameNodePath(serviceName);
// 服務名節點是永久節點
if (curatorZkClient.checkExists().forPath(serviceNameNodePath) == null) {
curatorZkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(serviceNameNodePath);
}
logger.info("createServiceNameNode success! serviceNameNodePath={}", serviceNameNodePath);
} catch (Exception e) {
throw new MyRpcException("createServiceNameNode error", e);
}
}
private void createProviderInfoNode(ServiceInfo serviceInfo) {
try {
String serviceNameNodePath = getServiceNameNodePath(serviceInfo.getServiceName());
// 子節點用一個uuid做path防重複
String providerInfoNodePath = serviceNameNodePath + "/" + UUID.randomUUID();
String providerInfoJsonStr = JsonUtil.obj2Str(serviceInfo);
// providerInfo節點是臨時節點(如果節點宕機了,zk的連線斷開一段時間後,臨時節點會被自動刪除)
curatorZkClient.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(providerInfoNodePath, providerInfoJsonStr.getBytes(StandardCharsets.UTF_8));
logger.info("createProviderInfoNode success! path={}", providerInfoNodePath);
} catch (Exception e) {
throw new MyRpcException("createProviderInfoNode error", e);
}
}
private String getServiceNameNodePath(String serviceName) {
return MyRpcRegistryConstants.BASE_PATH + "/" + serviceName;
}
private List<ServiceInfo> findProviderInfoList(String serviceName) {
String serviceNameNodePath = getServiceNameNodePath(serviceName);
try {
List<String> providerInfoPathList = curatorZkClient.getChildren().forPath(serviceNameNodePath);
List<ServiceInfo> serviceInfoList = new ArrayList<>();
for(String providerInfoPath : providerInfoPathList){
try{
String fullProviderInfoPath = serviceNameNodePath + "/" + providerInfoPath;
byte[] data = curatorZkClient.getData().forPath(fullProviderInfoPath);
String jsonStr = new String(data,StandardCharsets.UTF_8);
ServiceInfo serviceInfo = JsonUtil.json2Obj(jsonStr,ServiceInfo.class);
serviceInfoList.add(serviceInfo);
}catch (Exception e){
logger.error("findProviderInfoList getData error",e);
}
}
logger.info("findProviderInfoList={}",JsonUtil.obj2Str(serviceInfoList));
return serviceInfoList;
} catch (Exception e) {
throw new MyRpcException("findProviderInfoList error",e);
}
}
private class ZkCuratorListener implements PathChildrenCacheListener {
private final String serviceName;
public ZkCuratorListener(String serviceName) {
this.serviceName = serviceName;
}
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
logger.info("ZookeeperListener process! serviceName={}",serviceName);
try {
// 重新整理快取
List<ServiceInfo> serviceInfoList = findProviderInfoList(serviceName);
serviceInfoCacheMap.put(serviceName,serviceInfoList);
} catch (Exception e) {
logger.error("ZookeeperListener getChildren error! serviceName={}",serviceName,e);
}
}
}
}
現在使用者端已經能通過服務發現得到實時的provider集合了,那麼使用者端發起請求時應該如何決定向哪個provider發起請求以實現provider側的負載均衡呢?
負載均衡介面
/**
 * Load-balancing strategy: decides which provider a single call is sent to.
 */
public interface LoadBalance {

    /**
     * Chooses one provider among the candidates.
     *
     * @param serviceInfoList candidate providers of the target service
     * @return the provider the call should go to
     */
    ServiceInfo select(List<ServiceInfo> serviceInfoList);
}
隨機負載均衡
/**
 * Unweighted load balancer that picks a provider uniformly at random.
 */
public class RandomLoadBalance implements LoadBalance {

    /**
     * Picks one provider uniformly at random.
     *
     * @param serviceInfoList candidate providers; must not be empty
     * @return a randomly chosen provider
     * @throws MyRpcException if {@code serviceInfoList} is empty
     */
    @Override
    public ServiceInfo select(List<ServiceInfo> serviceInfoList) {
        // Report "no providers" the same way SimpleRoundRobinBalance does, instead of the
        // IllegalArgumentException that nextInt(0) would throw.
        if (serviceInfoList.isEmpty()) {
            throw new MyRpcException("serviceInfoList is empty!");
        }
        int selectedIndex = ThreadLocalRandom.current().nextInt(serviceInfoList.size());
        return serviceInfoList.get(selectedIndex);
    }
}
/**
 * Unweighted round-robin load balancer (a weighted round-robin variant is planned).
 *
 * <p>A single counter is shared by all calls made through this instance.
 */
public class SimpleRoundRobinBalance implements LoadBalance {
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Picks the next provider in round-robin order.
     *
     * @param serviceInfoList candidate providers; must not be empty
     * @return the selected provider
     * @throws MyRpcException if {@code serviceInfoList} is empty
     */
    @Override
    public ServiceInfo select(List<ServiceInfo> serviceInfoList) {
        if (serviceInfoList.isEmpty()) {
            throw new MyRpcException("serviceInfoList is empty!");
        }
        // The counter overflows into negative values after Integer.MAX_VALUE calls. Math.abs does not
        // guard against that (Math.abs(Integer.MIN_VALUE) is still negative, giving a negative index).
        // floorMod always yields an index in [0, size).
        int selectedIndex = Math.floorMod(count.getAndIncrement(), serviceInfoList.size());
        return serviceInfoList.get(selectedIndex);
    }
}
由於需要通過網路發起rpc呼叫,比起本地呼叫很容易因為網路波動、遠端機器故障等原因而導致呼叫失敗。
使用者端有時希望能通過重試等方式遮蔽掉可能出現的偶發錯誤,儘可能的保證rpc請求的成功率,最好rpc框架能解決這個問題。但另一方面,能夠安全重試的基礎是下游服務能夠做到冪等,否則重複的請求會帶來意想不到的後果,而不冪等的下游服務只能至多呼叫一次。
因此rpc框架需要允許使用者為不同的服務設定不同的叢集呼叫方式:冪等的服務可以設定成失敗後自動重試N次的failover呼叫,不冪等的服務則設定成只呼叫1次的fast-fail呼叫,此外還有廣播呼叫等其它方式。
MyRpc的Invoker介面用於抽象上述的不同叢集呼叫方式,並簡單的實現了failover和fast-fail等多種呼叫方式(參考dubbo)。
/**
 * A single attempt of a remote call, run against the client chosen for that attempt.
 */
public interface InvokerCallable {

    /**
     * Performs the call through the given client.
     *
     * @param nettyClient client obtained for the provider picked by the load balancer
     * @return the response of this attempt
     */
    RpcResponse invoke(NettyClient nettyClient);
}
/**
 * Cluster invocation strategy (e.g. fast-fail or failover): decides how many attempts are made and
 * how each attempt's provider is chosen.
 */
public interface Invoker {

    /**
     * Executes {@code callable} against provider(s) of {@code serviceName}.
     *
     * @param callable    the per-attempt remote call
     * @param serviceName service whose providers are looked up in {@code registry}
     * @param registry    source of the provider list
     * @param loadBalance chooses the provider for each attempt
     * @return the response produced by the strategy
     */
    RpcResponse invoke(InvokerCallable callable, String serviceName,
                       Registry registry, LoadBalance loadBalance);
}
/**
 * Fast-fail invocation: exactly one attempt is made, successful or not. Any error propagates
 * directly to the caller.
 */
public class FastFailInvoker implements Invoker {
    private static final Logger logger = LoggerFactory.getLogger(FastFailInvoker.class);

    @Override
    public RpcResponse invoke(InvokerCallable callable, String serviceName,
                              Registry registry, LoadBalance loadBalance) {
        List<ServiceInfo> providers = registry.discovery(serviceName);
        logger.debug("serviceInfoList.size={},serviceInfoList={}", providers.size(), JsonUtil.obj2Str(providers));

        NettyClient target = InvokerUtil.getTargetClient(providers, loadBalance);
        logger.info("ClientDynamicProxy getTargetClient={}", target);

        // one shot, no retry: whatever the call throws reaches the caller unchanged
        return callable.invoke(target);
    }
}
/**
 * Failover invocation: when an attempt fails, discover and select a provider again and retry, up to
 * {@code retryCount} attempts in total.
 * <ol>
 *   <li>As soon as an attempt succeeds, its response is returned immediately.</li>
 *   <li>If every attempt fails, the failure of the last attempt is thrown.</li>
 * </ol>
 * Only suitable for idempotent services, since a request may reach providers more than once.
 */
public class FailoverInvoker implements Invoker {
    private static final Logger logger = LoggerFactory.getLogger(FailoverInvoker.class);
    /** Total number of attempts used by the no-arg constructor. */
    private static final int DEFAULT_RETRY_COUNT = 2;

    private final int retryCount;

    public FailoverInvoker() {
        this(DEFAULT_RETRY_COUNT);
    }

    /**
     * @param retryCount total number of attempts (including the first one); values below 1 become 1
     */
    public FailoverInvoker(int retryCount) {
        this.retryCount = Math.max(retryCount, 1);
    }

    /**
     * @throws MyRpcException the failure of the last attempt, if no attempt succeeded
     */
    @Override
    public RpcResponse invoke(InvokerCallable callable, String serviceName, Registry registry, LoadBalance loadBalance) {
        MyRpcException lastException = null;
        for (int i = 0; i < retryCount; i++) {
            try {
                // Discovery and provider selection are inside the try: a failure while obtaining the
                // client for the chosen provider also counts as a failed attempt and triggers failover.
                List<ServiceInfo> serviceInfoList = registry.discovery(serviceName);
                logger.debug("serviceInfoList.size={},serviceInfoList={}", serviceInfoList.size(), JsonUtil.obj2Str(serviceInfoList));
                NettyClient nettyClient = InvokerUtil.getTargetClient(serviceInfoList, loadBalance);
                logger.info("ClientDynamicProxy getTargetClient={}", nettyClient);

                RpcResponse rpcResponse = callable.invoke(nettyClient);
                if (lastException != null) {
                    // the call finally succeeded, but earlier attempts failed
                    logger.warn("FailRetryInvoker finally success, but there have been failed providers");
                }
                return rpcResponse;
            } catch (Exception e) {
                // don't wrap a MyRpcException in another MyRpcException
                lastException = (e instanceof MyRpcException) ? (MyRpcException) e : new MyRpcException(e);
                logger.warn("FailRetryInvoker callable.invoke error", e);
            }
        }
        // retryCount >= 1 and every attempt failed, so lastException is non-null here
        throw lastException;
    }
}
/**
* 使用者端動態代理
* */
public class ClientDynamicProxy implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(ClientDynamicProxy.class);
private final Registry registry;
private final LoadBalance loadBalance;
private final Invoker invoker;
public ClientDynamicProxy(Registry registry, LoadBalance loadBalance, Invoker invoker) {
this.registry = registry;
this.loadBalance = loadBalance;
this.invoker = invoker;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Tuple<Object,Boolean> localMethodResult = processLocalMethod(proxy,method,args);
if(localMethodResult.getRight()){
// right為true,代表是本地方法,返回toString等物件自帶方法的執行結果,不發起rpc呼叫
return localMethodResult.getLeft();
}
logger.debug("ClientDynamicProxy before: methodName=" + method.getName());
String serviceName = method.getDeclaringClass().getName();
// 構造請求和協定頭
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setInterfaceName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterClasses(method.getParameterTypes());
rpcRequest.setParams(args);
MessageHeader messageHeader = new MessageHeader();
messageHeader.setMessageFlag(MessageFlagEnums.REQUEST.getCode());
messageHeader.setTwoWayFlag(false);
messageHeader.setEventFlag(true);
messageHeader.setSerializeType(GlobalConfig.messageSerializeType.getCode());
messageHeader.setResponseStatus((byte)'a');
messageHeader.setMessageId(rpcRequest.getMessageId());
logger.debug("ClientDynamicProxy rpcRequest={}", JsonUtil.obj2Str(rpcRequest));
RpcResponse rpcResponse = this.invoker.invoke((nettyClient)->{
Channel channel = nettyClient.getChannel();
// 將netty的非同步轉為同步,參考dubbo DefaultFuture
DefaultFuture<RpcResponse> newDefaultFuture = DefaultFutureManager.createNewFuture(channel,rpcRequest);
try {
nettyClient.send(new MessageProtocol<>(messageHeader,rpcRequest));
// 呼叫方阻塞在這裡
return newDefaultFuture.get();
} catch (Exception e) {
throw new MyRpcException("InvokerCallable error!",e);
}
},serviceName,registry,loadBalance);
logger.debug("ClientDynamicProxy defaultFuture.get() rpcResponse={}",rpcResponse);
return processRpcResponse(rpcResponse);
}
/**
* 處理本地方法
* @return tuple.right 標識是否是本地方法, true是
* */
private Tuple<Object,Boolean> processLocalMethod(Object proxy, Method method, Object[] args) throws Exception {
// 處理toString等物件自帶方法,不發起rpc呼叫
if (method.getDeclaringClass() == Object.class) {
return new Tuple<>(method.invoke(proxy, args),true);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return new Tuple<>(proxy.toString(),true);
} else if ("hashCode".equals(methodName)) {
return new Tuple<>(proxy.hashCode(),true);
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return new Tuple<>(proxy.equals(args[0]),true);
}
// 返回null標識非本地方法,需要進行rpc呼叫
return new Tuple<>(null,false);
}
private Object processRpcResponse(RpcResponse rpcResponse){
if(rpcResponse.getExceptionValue() == null){
// 沒有異常,return正常的返回值
return rpcResponse.getReturnValue();
}else{
// 有異常,往外丟擲去
throw new MyRpcRemotingException(rpcResponse.getExceptionValue());
}
}
}
使用者端發起請求後,可能由於網路原因,可能由於伺服器端負載過大等原因而遲遲無法收到回覆。
出於效能或者自身業務的考慮,使用者端不能無限制的等待下去,因此rpc框架需要能允許使用者端設定請求的超時時間。在一定的時間內如果無法收到響應則需要丟擲超時異常,令呼叫者及時的感知到問題。
在使用者端側呼叫DefaultFuture.get方法時指定超時時間,就可以做到這一點。
但這種方式依賴底層作業系統的計時機制,雖然超時精度很高(納秒級別),在高並行場景下的效能卻不如時間輪。
具體原理可以參考我之前的部落格:時間輪TimeWheel工作原理解析
MyRpc參考dubbo,引入時間輪來實現使用者端設定請求超時時間的功能。
/**
 * Tracks in-flight requests by message id and completes the matching future when a response, or a
 * synthetic timeout response, arrives. Request timeouts are scheduled on a hashed timer wheel.
 *
 * <p>NOTE(review): nothing in this class puts futures into {@link #DEFAULT_FUTURE_CACHE};
 * presumably the DefaultFuture constructor registers itself. Confirm.
 */
public class DefaultFutureManager {
    private static final Logger logger = LoggerFactory.getLogger(DefaultFutureManager.class);

    /** messageId -> future still waiting for its response. */
    public static final Map<Long,DefaultFuture> DEFAULT_FUTURE_CACHE = new ConcurrentHashMap<>();

    /** Timer wheel on which the per-request timeout checks run. */
    public static final HashedWheelTimer TIMER = new HashedWheelTimer();

    /**
     * Completes the pending future for {@code rpcResponse}'s message id: exceptionally if the
     * response carries an exception, normally otherwise. It does nothing if no future is pending.
     * It is called both for real responses and for timeout responses.
     */
    public static void received(RpcResponse rpcResponse) {
        Long messageId = rpcResponse.getMessageId();
        logger.debug("received rpcResponse={},DEFAULT_FUTURE_CACHE={}", rpcResponse, DEFAULT_FUTURE_CACHE);

        // remove() is atomic, so only one of {real response, timeout} gets the future
        DefaultFuture pending = DEFAULT_FUTURE_CACHE.remove(messageId);
        if (pending == null) {
            // most likely already timed out (or already answered) and removed earlier
            logger.debug("remove defaultFuture fail");
            return;
        }
        logger.debug("remove defaultFuture success");

        Throwable failure = rpcResponse.getExceptionValue();
        if (failure == null) {
            // normal response
            pending.complete(rpcResponse);
        } else {
            // error (remote exception or timeout)
            pending.completeExceptionally(failure);
        }
    }

    /**
     * Creates the future for a request and schedules its timeout check.
     */
    public static DefaultFuture createNewFuture(Channel channel, RpcRequest rpcRequest) {
        DefaultFuture created = new DefaultFuture(channel, rpcRequest);
        newTimeoutCheck(created);
        return created;
    }

    /**
     * @return the pending future for {@code messageId}, or null if there is none
     */
    public static DefaultFuture getFuture(long messageId) {
        return DEFAULT_FUTURE_CACHE.get(messageId);
    }

    /**
     * Schedules a timeout check that fires after the future's timeout (milliseconds).
     */
    public static void newTimeoutCheck(DefaultFuture defaultFuture) {
        TIMER.newTimeout(new TimeoutCheckTask(defaultFuture.getMessageId()),
                defaultFuture.getTimeout(), TimeUnit.MILLISECONDS);
    }
}
/**
 * Timer-wheel task that fails a request which is still pending when its timeout expires. It feeds a
 * synthetic response carrying a {@link MyRpcTimeoutException} into
 * {@link DefaultFutureManager#received}.
 */
public class TimeoutCheckTask implements TimerTask {
    private final long messageId;

    public TimeoutCheckTask(long messageId) {
        this.messageId = messageId;
    }

    @Override
    public void run(Timeout timeout) {
        DefaultFuture pending = DefaultFutureManager.getFuture(messageId);
        boolean stillWaiting = pending != null && !pending.isDone();
        if (stillWaiting) {
            DefaultFutureManager.received(buildTimeoutResponse(pending));
        }
        // otherwise the response arrived in time and was already handled: nothing to do
    }

    /** Builds the synthetic response that reports the timeout for {@code pending}. */
    private RpcResponse buildTimeoutResponse(DefaultFuture pending) {
        RpcResponse response = new RpcResponse();
        response.setMessageId(messageId);
        response.setExceptionValue(new MyRpcTimeoutException(
                "request timeout:" + pending.getTimeout() + " channel=" + pending.getChannel()));
        return response;
    }
}