在上一篇文章中,完成了ZooKeeper註冊中心,新增了一個簡單的本地快取
但是,存在一些問題:
之前我們是將快取直接放在ZooKeeperClientUtils中的,維護一個Map集合。我們將快取部分移動到ZooKeeperClientCache中,快取資料從這裡獲取:
我們監聽樹上所有節點的變化情況,對於包含範例的變化,每次獲取對應的服務資訊,然後通過Clinet查詢現存的對應服務的範例,進行更新。
watchPathSet維護了Client呼叫過的服務集合,對於呼叫過的服務才開啟原生的快取,並且進行更新。
instances即為本地快取集合
@Slf4j
public class ZookeeperClientCache {
private static final Map<String, List<InetSocketAddress>> instances=new ConcurrentHashMap<>();
private static final Set<String> watchPathSet=new ConcurrentHashSet<>();
private static CuratorFramework zookeeperClient;
private static boolean isListening=false;
//將服務加入監聽set中
public static void addListenService(String service){
//開啟服務監聽
openListen();
//path路徑放入
watchPathSet.add(ZookeeperUtil.serviceName2Path(service));
}
//新增本地快取,同時開啟監聽服務
public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){
//直接替換原本的快取
instances.put(serviceName,addressList);
//將服務加入監聽set
addListenService(serviceName);
}
public static void cleanLocalCache(String serviceName){
log.info("服務呼叫失敗,清除本地快取,重新獲取範例===>{}",serviceName);
instances.remove(serviceName);
}
public static boolean containsKey(String serviceName){
return instances.containsKey(serviceName);
}
public static List<InetSocketAddress> getOrDefault(String serviceName){
return instances.getOrDefault(serviceName,null);
}
public static List<InetSocketAddress> getInstances(String serviceName){
try {
String path = ZookeeperUtil.serviceName2Path(serviceName);
//獲取路徑下所有的實現
List<String> instancePaths = zookeeperClient.getChildren().forPath(path);
List<InetSocketAddress> addressList = new ArrayList<>();
for (String instancePath : instancePaths) {
byte[] bytes = zookeeperClient.getData().forPath(path+"/"+instancePath);
String json = new String(bytes);
InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
addressList.add(instance);
}
return addressList;
} catch (Exception e) {
log.error("服務獲取失敗====>{}",e);
throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
}
}
private static synchronized void openListen(){
//已初始化過
if (isListening){
return;
}
//注入client
if (zookeeperClient==null) {
zookeeperClient=ZookeeperUtil.getZookeeperClient();
}
TreeCache cache = TreeCache.newBuilder(zookeeperClient, "/cn/zko0/myRpc/api").setCacheData(true).build();
cache.getListenable().addListener((c, event) -> {
if ( event.getData() != null )
{
System.out.println("type=" + event.getType() + " path=" + event.getData().getPath());
//可以通過event.type來進行節點的處理,我這裡直接多節點每次行為做reload
if (event.getData().getPath().contains("Service/")){
//是服務節點,做更新
String path = event.getData().getPath();
//去除尾部範例段
path=path.substring(0,path.lastIndexOf("/"));
String serviceName = ZookeeperUtil.path2ServiceName(path);
if (watchPathSet.contains(path)) {
log.info("更新本地快取");
List<InetSocketAddress> addressList = getInstances(serviceName);
addLocalCache(serviceName,addressList);
}
}
}
else
{
System.out.println("type=" + event.getType());
}
});
try {
cache.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
isListening=true;
}
}
建立完Cache類,只需要修改之前ZooKeeperClientUtils中,從當前類改為Cache類獲取即可:
完整程式碼:
@Slf4j
public class ZookeeperClientUtils {
private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();
public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {
InetSocketAddress address;
//本地快取查詢
if (ZookeeperClientCache.containsKey(serviceName)){
List<InetSocketAddress> addressList = ZookeeperClientCache.getOrDefault(serviceName);
if (!addressList.isEmpty()){
//使用lb進行負載均衡
return loadBalancer.select(addressList);
}
}
try {
String path = ZookeeperUtil.serviceName2Path(serviceName);
//獲取路徑下所有的實現
List<String> instancePaths = client.getChildren().forPath(path);
List<InetSocketAddress> addressList = new ArrayList<>();
for (String instancePath : instancePaths) {
byte[] bytes = client.getData().forPath(path+"/"+instancePath);
String json = new String(bytes);
InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
addressList.add(instance);
}
ZookeeperClientCache.addLocalCache(serviceName,addressList);
return loadBalancer.select(addressList);
} catch (Exception e) {
log.error("服務獲取失敗====>{}",e);
throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
}
}
}
實現上述程式碼,下面是服務監聽的簡單測試
開啟Server,Client:
關閉Server,Server自動進行服務的登出:
Client服務監控: