接上篇Sentinel叢集限流探索,上次簡單提到了叢集限流的原理,然後用官方給的 demo 簡單修改了一下,可以正常執行生效。
這一次需要更進一步,基於 Sentinel 實現內嵌式叢集限流的高可用方案,並且包裝成一箇中介軟體 starter 提供給三方使用。
對於高可用,我們主要需要解決兩個問題,這無論是使用內嵌或者獨立模式都需要解決的問題,相比而言,內嵌式模式更簡單一點。
首先,考慮到大部分的服務可能都不需要叢集限流這個功能,因此實現一個註解用於手動開啟叢集限流模式,只有開啟註解的情況下,才去範例化叢集限流的 Bean 和限流資料。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({EnableClusterImportSelector.class})
@Documented
public @interface SentinelCluster {
}
public class EnableClusterImportSelector implements DeferredImportSelector {
@Override
public String[] selectImports(AnnotationMetadata annotationMetadata) {
return new String[]{ClusterConfiguration.class.getName()};
}
}
這樣寫好之後,當掃描到有我們的 SentinelCluster
註解的時候,就會去範例化 ClusterConfiguration
。
@Slf4j
public class ClusterConfiguration implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
private Environment environment;
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(ClusterManager.class);
beanDefinitionBuilder.addConstructorArgValue(this.environment);
registry.registerBeanDefinition("clusterManager", beanDefinitionBuilder.getBeanDefinition());
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
}
在設定中去範例化用於管理叢集限流的ClusterManager
,這段邏輯和我們之前文章中使用到的一般無二,註冊到ApolloDataSource
之後自動監聽Apollo
的變化達到動態生效的效果。
@Slf4j
public class ClusterManager {
private Environment environment;
private String namespace;
private static final String CLUSTER_SERVER_KEY = "sentinel.cluster.server"; //服務叢集設定
private static final String DEFAULT_RULE_VALUE = "[]"; //叢集預設規則
private static final String FLOW_RULE_KEY = "sentinel.flow.rules"; //限流規則
private static final String DEGRADE_RULE_KEY = "sentinel.degrade.rules"; //降級規則
private static final String PARAM_FLOW_RULE_KEY = "sentinel.param.rules"; //熱點限流規則
private static final String CLUSTER_CLIENT_CONFIG_KEY = "sentinel.client.config"; //使用者端設定
public ClusterManager(Environment environment) {
this.environment = environment;
this.namespace = "YourNamespace";
init();
}
private void init() {
initClientConfig();
initClientServerAssign();
registerRuleSupplier();
initServerTransportConfig();
initState();
}
private void initClientConfig() {
ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new ApolloDataSource<>(
namespace,
CLUSTER_CLIENT_CONFIG_KEY,
DEFAULT_SERVER_VALUE,
source -> JacksonUtil.from(source, ClusterClientConfig.class)
);
ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
}
private void initClientServerAssign() {
ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new ApolloDataSource<>(
namespace,
CLUSTER_SERVER_KEY,
DEFAULT_SERVER_VALUE,
new ServerAssignConverter(environment)
);
ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
}
private void registerRuleSupplier() {
ClusterFlowRuleManager.setPropertySupplier(ns -> {
ReadableDataSource<String, List<FlowRule>> ds = new ApolloDataSource<>(
namespace,
FLOW_RULE_KEY,
DEFAULT_RULE_VALUE,
source -> JacksonUtil.fromList(source, FlowRule.class));
return ds.getProperty();
});
ClusterParamFlowRuleManager.setPropertySupplier(ns -> {
ReadableDataSource<String, List<ParamFlowRule>> ds = new ApolloDataSource<>(
namespace,
PARAM_FLOW_RULE_KEY,
DEFAULT_RULE_VALUE,
source -> JacksonUtil.fromList(source, ParamFlowRule.class)
);
return ds.getProperty();
});
}
private void initServerTransportConfig() {
ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new ApolloDataSource<>(
namespace,
CLUSTER_SERVER_KEY,
DEFAULT_SERVER_VALUE,
new ServerTransportConverter(environment)
);
ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
}
private void initState() {
ReadableDataSource<String, Integer> clusterModeDs = new ApolloDataSource<>(
namespace,
CLUSTER_SERVER_KEY,
DEFAULT_SERVER_VALUE,
new ServerStateConverter(environment)
);
ClusterStateManager.registerProperty(clusterModeDs.getProperty());
}
}
這樣的話,一個叢集限流的基本功能已經差不多是OK了,上述步驟都比較簡單,按照官方檔案基本都能跑起來,接下來要實現文章開頭提及到的核心的幾個功能了。
自動選舉怎麼實現?簡單點,不用考慮那麼多,每臺機器啟動成功之後直接寫入到 Apollo 當中,第一個寫入成功的就是 Server 節點。
這個過程為了保證並行帶來的問題,我們需要加鎖確保只有一臺機器成功寫入自己的本機資訊。
由於我使用 Eureka 作為註冊中心,Eureka 又有CacheRefreshedEvent
本地快取重新整理的事件,基於此每當本地快取重新整理,我們就去檢測當前 Server 節點是否存在,然後根據實際情況去實現選舉。
首先在 spring.factories 中新增我們的監聽器。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.test.config.SentinelEurekaEventListener
監聽器只有當開啟了叢集限流注解SentinelCluster
之後才會生效。
@Configuration
@Slf4j
@ConditionalOnBean(annotation = SentinelCluster.class)
public class SentinelEurekaEventListener implements ApplicationListener<CacheRefreshedEvent> {
@Resource
private DiscoveryClient discoveryClient;
@Resource
private Environment environment;
@Resource
private ApolloManager apolloManager;
@Override
public void onApplicationEvent(EurekaClientLocalCacheRefreshedEvent event) {
if (!leaderAlive(loadEureka(), loadApollo())) {
boolean tryLockResult = redis.lock; //redis或者其他加分散式鎖
if (tryLockResult) {
try {
flush();
} catch (Exception e) {
} finally {
unlock();
}
}
}
}
private boolean leaderAlive(List<ClusterGroup> eurekaList, ClusterGroup server) {
if (Objects.isNull(server)) {
return false;
}
for (ClusterGroup clusterGroup : eurekaList) {
if (clusterGroup.getMachineId().equals(server.getMachineId())) {
return true;
}
}
return false;
}
}
OK,其實看到程式碼已經知道我們把故障轉移的邏輯也實現了,其實道理是一樣的。
第一次啟動的時候 Apollo 中的 server 資訊是空的,所以第一臺加鎖寫入的機器就是 server 節點,後續如果 server 宕機下線,本地登入檔快取重新整理,對比 Eureka 的範例資訊和 Apollo 中的 server,如果 server 不存在,那麼就重新執行選舉的邏輯。
需要注意的是,本地快取重新整理的時間極端情況下可能會達到幾分鐘級別,那麼也就是說在服務下線的可能幾分鐘內沒有重新選舉出新的 server 節點整個叢集限流是不可用的狀態,對於業務要求非常嚴格的情況這個方案就不太適用了。
對於 Eureka 快取時間同步的問題,可以參考之前的文章Eureka服務下線太慢,電話被告警打爆了。
到這兒為止,我們已經把高可用方案實現好了,接下來最後一步,只要通過 Sentinel 自帶的控制檯能夠把設定寫入到 Apollo 中,那麼應用就自然會監聽到設定的變化,達到動態生效的效果。
根據官方的描述,官方已經實現了FlowControllerV2
用於叢集限流,同時在測試目錄下有簡單的案例幫助我們快速實現控制檯的持久化的邏輯。
我們只要實現DynamicRuleProvider
,同時注入到Controller
中使用即可,這裡我們實現flowRuleApolloProvider
用於提供從Apollo查詢資料,flowRuleApolloPublisher
用於寫入限流設定到Apollo。
@RestController
@RequestMapping(value = "/v2/flow")
public class FlowControllerV2 {
private final Logger logger = LoggerFactory.getLogger(FlowControllerV2.class);
@Autowired
private InMemoryRuleRepositoryAdapter<FlowRuleEntity> repository;
@Autowired
@Qualifier("flowRuleApolloProvider")
private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;
@Autowired
@Qualifier("flowRuleApolloPublisher")
private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;
}
實現方式很簡單,provider 通過 Apollo 的 open-api 從 namespace 中讀取設定,publisher 則是通過 open-api 寫入規則。
@Component("flowRuleApolloProvider")
public class FlowRuleApolloProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {
@Autowired
private ApolloManager apolloManager;
@Autowired
private Converter<String, List<FlowRuleEntity>> converter;
@Override
public List<FlowRuleEntity> getRules(String appName) {
String rules = apolloManager.loadNamespaceRuleList(appName, ApolloManager.FLOW_RULES_KEY);
if (StringUtil.isEmpty(rules)) {
return new ArrayList<>();
}
return converter.convert(rules);
}
}
@Component("flowRuleApolloPublisher")
public class FlowRuleApolloPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {
@Autowired
private ApolloManager apolloManager;
@Autowired
private Converter<List<FlowRuleEntity>, String> converter;
@Override
public void publish(String app, List<FlowRuleEntity> rules) {
AssertUtil.notEmpty(app, "app name cannot be empty");
if (rules == null) {
return;
}
apolloManager.writeAndPublish(app, ApolloManager.FLOW_RULES_KEY, converter.convert(rules));
}
}
ApolloManager
實現了通過open-api
查詢和寫入設定的能力,使用需要自行設定 Apollo Portal 地址和 token,這裡不贅述,可以自行檢視 Apollo 的官方檔案。
@Component
public class ApolloManager {
private static final String APOLLO_USERNAME = "apollo";
public static final String FLOW_RULES_KEY = "sentinel.flow.rules";
public static final String DEGRADE_RULES_KEY = "sentinel.degrade.rules";
public static final String PARAM_FLOW_RULES_KEY = "sentinel.param.rules";
public static final String APP_NAME = "YourAppName";
@Value("${apollo.portal.url}")
private String portalUrl;
@Value("${apollo.portal.token}")
private String portalToken;
private String apolloEnv;
private String apolloCluster = "default";
private ApolloOpenApiClient client;
@PostConstruct
public void init() {
this.client = ApolloOpenApiClient.newBuilder()
.withPortalUrl(portalUrl)
.withToken(portalToken)
.build();
this.apolloEnv = "default";
}
public String loadNamespaceRuleList(String appName, String ruleKey) {
OpenNamespaceDTO openNamespaceDTO = client.getNamespace(APP_NAME, apolloEnv, apolloCluster, "default");
return openNamespaceDTO
.getItems()
.stream()
.filter(p -> p.getKey().equals(ruleKey))
.map(OpenItemDTO::getValue)
.findFirst()
.orElse("");
}
public void writeAndPublish(String appName, String ruleKey, String value) {
OpenItemDTO openItemDTO = new OpenItemDTO();
openItemDTO.setKey(ruleKey);
openItemDTO.setValue(value);
openItemDTO.setComment("Add Sentinel Config");
openItemDTO.setDataChangeCreatedBy(APOLLO_USERNAME);
openItemDTO.setDataChangeLastModifiedBy(APOLLO_USERNAME);
client.createOrUpdateItem(APP_NAME, apolloEnv, apolloCluster, "default", openItemDTO);
NamespaceReleaseDTO namespaceReleaseDTO = new NamespaceReleaseDTO();
namespaceReleaseDTO.setEmergencyPublish(true);
namespaceReleaseDTO.setReleasedBy(APOLLO_USERNAME);
namespaceReleaseDTO.setReleaseTitle("Add Sentinel Config Release");
client.publishNamespace(APP_NAME, apolloEnv, apolloCluster, "default", namespaceReleaseDTO);
}
}
對於其他規則,比如降級、熱點限流都可以參考此方式去修改,當然控制檯要做的修改肯定不是這一點點,比如叢集的flowId
預設使用的單機自增,這個肯定需要修改,還有頁面的傳參、查詢路由的修改等等,比較繁瑣,就不在此贅述了,總歸也就是工作量的問題。
好了,本期內容就這些,我是艾小仙,我們下期見。