【1】Hystrix是springCloud的元件之一,Hystrix 可以讓我們在分散式系統中對服務間的呼叫進行控制加入一些呼叫延遲或者依賴故障的容錯機制。
【2】Hystrix 通過將依賴服務進行資源隔離進而阻止某個依賴服務出現故障時在整個系統所有的依賴服務呼叫中進行蔓延;【防止服務雪崩】
【3】其核心功能:
1)服務隔離(服務限流)
通過執行緒池或者號誌判斷是否已滿,超出容量的請求直接降級,以達到限流的作用。
2)服務熔斷
當失敗率達到閾值自動觸發降級,熔斷器觸發的快速失敗會有助於系統防止崩潰。【可以說熔斷是特定條件的降級】
3)服務降級
服務降級是當伺服器壓力劇增的情況下,根據當前業務情況及流量對一些服務和頁面有策略的降級,以此釋放伺服器資源以保證核心任務的正常執行。
【1】引入依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>
【2】啟動類開啟hystrix功能
@SpringBootApplication //註冊到eureka @EnableEurekaClient //開啟斷路器功能 @EnableCircuitBreaker public class WebApplication {
【3】註解@HystrixCommand引數分析
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface HystrixCommand { // HystrixCommand 命令所屬的組的名稱:預設註解方法類的名稱 String groupKey() default ""; // HystrixCommand 命令的key值,預設值為註解方法的名稱 String commandKey() default ""; // 執行緒池名稱,預設定義為groupKey String threadPoolKey() default ""; // 定義回退方法的名稱, 此方法必須和hystrix的執行方法在相同類中 String fallbackMethod() default ""; // 設定hystrix命令的引數 HystrixProperty[] commandProperties() default {}; // 設定hystrix依賴的執行緒池的引數 HystrixProperty[] threadPoolProperties() default {}; // 如果hystrix方法丟擲的異常包括RUNTIME_EXCEPTION,則會被封裝HystrixRuntimeException異常。我們也可以通過此方法定義哪些需要忽略的異常 Class<? extends Throwable>[] ignoreExceptions() default {}; // 定義執行hystrix observable的命令的模式,型別詳細見ObservableExecutionMode ObservableExecutionMode observableExecutionMode() default ObservableExecutionMode.EAGER; // 如果hystrix方法丟擲的異常包括RUNTIME_EXCEPTION,則會被封裝HystrixRuntimeException異常。此方法定義需要丟擲的異常 HystrixException[] raiseHystrixExceptions() default {}; // 定義回撥方法:但是defaultFallback不能傳入引數,返回引數和hystrix的命令相容 String defaultFallback() default ""; }
【4】使用範例
//執行緒池隔離的設定,執行緒池隔離與號誌隔離的最大區別在於傳送請求的執行緒,號誌是採用呼叫方法的執行緒,而執行緒池則是用池內的執行緒去傳送請求 @HystrixCommand( groupKey="test-provider", threadPoolKey="test-provider", threadPoolProperties = { @HystrixProperty(name = "coreSize", value = "20"),//執行緒池大小 @HystrixProperty(name = "maximumSize", value = "30"),//最大執行緒池大小 @HystrixProperty(name = "maxQueueSize", value = "20"),//最大佇列長度 @HystrixProperty(name = "keepAliveTimeMinutes", value = "2")//執行緒存活時間 },commandProperties = { @HystrixProperty(name = "execution.isolation.strategy",value = "THREAD") } //號誌隔離的設定 @HystrixCommand( //用來設定降級方法 fallbackMethod = "myTestFallbackMethod", commandProperties = { //進行熔斷設定 //條件1,設定在捲動時間視窗中,斷路器的最小請求數(沒有達到不會熔斷)。預設20。 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold" ,value = "10"), //條件2,設定斷路器開啟的錯誤百分比。在捲動時間內,在請求數量超過requestVolumeThreshold的值,且錯誤請求數的百分比超過這個比例,斷路器就為開啟狀態。 @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage" ,value = "30"), //條件3,設定捲動時間窗的長度,單位毫秒。這個時間視窗就是斷路器收集資訊的持續時間。斷路器在收集指標資訊的時會根據這個時間視窗把這個視窗拆分成多個桶,每個桶代表一段時間的指標,預設10000. @HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds" ,value = "10000"), //設定當斷路器開啟之後的休眠時間,休眠時間結束後斷路器為半開狀態,斷路器能接受請求,如果請求失敗又重新回到開啟狀態,如果請求成功又回到關閉狀態 //單位是毫秒 @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds" ,value = "3000"), //設定號誌隔離 //設定號誌的數值 @HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests",value = "100"), //選擇策略為號誌隔離 @HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"), //設定HystrixCommand執行的超時時間,單位毫秒 @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000000000") } ) public String Test(){ .... } public String myTestFallbackMethod() { log.info("========myTestFallbackMethod========="); return "myTestFallbackMethod"; }
1.採用了AOP的方式來對方法進行了增強,
2.採用了大量的RxJava響應式程式設計,利用了Future+執行緒池的方法進行了大量的非同步
3.涉及到了滑動視窗的設計,來進行統計失敗率
【1】分析註解@EnableCircuitBreaker是如何開啟斷路器功能
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableCircuitBreakerImportSelector.class) public @interface EnableCircuitBreaker {} //註解說明:註釋以啟用斷路器實現 //但實際上只是匯入了EnableCircuitBreakerImportSelector類
【2】深入分析EnableCircuitBreakerImportSelector類做了什麼
//會發現什麼都沒做,只是將環境變數中的某個值設定為true @Order(Ordered.LOWEST_PRECEDENCE - 100) public class EnableCircuitBreakerImportSelector extends SpringFactoryImportSelector<EnableCircuitBreaker> { @Override protected boolean isEnabled() { return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE); } }
【3】分析SpringBoot自動裝配會匯入什麼
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\ org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\ org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\ org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration //該類會比較重要
【4】分析HystrixCircuitBreakerConfiguration類做了什麼
@Configuration(proxyBeanMethods = false) public class HystrixCircuitBreakerConfiguration { //這個看名字就很重要,初始化AOP的攔截 @Bean public HystrixCommandAspect hystrixCommandAspect() { return new HystrixCommandAspect(); } @Bean public HystrixShutdownHook hystrixShutdownHook() { return new HystrixShutdownHook(); } @Bean public HasFeatures hystrixFeature() { return HasFeatures.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class)); } private class HystrixShutdownHook implements DisposableBean { @Override public void destroy() throws Exception { // Just call Hystrix to reset thread pool etc. Hystrix.reset(); } } }
【5】分析HystrixCommandAspect類在做了什麼
//先是定義了兩個切入點 @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() {} @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() {} //定義切面 @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { //通過切點獲取被攔截的方法 Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint); if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException(...); } MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); //metaholder中儲存了很多和切點相關的資訊,說白了就是解析註解獲得上面的資訊 MetaHolder metaHolder = metaHolderFactory.create(joinPoint); HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); Object result; try { if (!metaHolder.isObservable()) { result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) {...} catch (HystrixRuntimeException e) {...} return result; }
【5.1】模式分析---分析MetaHolder的構成
public MetaHolder create(final ProceedingJoinPoint joinPoint) { Method method = getMethodFromTarget(joinPoint); Object obj = joinPoint.getTarget(); Object[] args = joinPoint.getArgs(); Object proxy = joinPoint.getThis(); return create(proxy, method, obj, args, joinPoint); } private static class CommandMetaHolderFactory extends MetaHolderFactory { @Override public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) { HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class); ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType()); MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint); if (isCompileWeaving()) { builder.ajcMethod(getAjcMethodFromTarget(joinPoint)); } return builder.defaultCommandKey(method.getName()) .hystrixCommand(hystrixCommand) .observableExecutionMode(hystrixCommand.observableExecutionMode()) .executionType(executionType) .observable(ExecutionType.OBSERVABLE == executionType) .build(); } } public enum ExecutionType { ASYNCHRONOUS, SYNCHRONOUS, OBSERVABLE; //所以根據我們的基本使用可以判斷是SYNCHRONOUS,同步模式 public static ExecutionType getExecutionType(Class<?> type) { if (Future.class.isAssignableFrom(type)) { return ExecutionType.ASYNCHRONOUS; } else if (Observable.class.isAssignableFrom(type)) { return ExecutionType.OBSERVABLE; } else { return ExecutionType.SYNCHRONOUS; } } }
【6】分析HystrixInvokable的建立【層層追溯,其實發現是生成一個包裝過的HystrixCommand】
//分析HystrixInvokable的建立 public HystrixInvokable create(MetaHolder metaHolder) { HystrixInvokable executable;
//判斷是不是HystrixCollapser註解 if (metaHolder.isCollapserAnnotationPresent()) { executable = new CommandCollapser(metaHolder); } else if (metaHolder.isObservable()) { executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder)); } else { //主要是這裡 executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder)); } return executable; } public GenericCommand(HystrixCommandBuilder builder) { super(builder); } protected AbstractHystrixCommand(HystrixCommandBuilder builder) { super(builder.getSetterBuilder().build()); this.commandActions = builder.getCommandActions(); this.collapsedRequests = builder.getCollapsedRequests(); this.cacheResultInvocationContext = builder.getCacheResultInvocationContext(); this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext(); this.ignoreExceptions = builder.getIgnoreExceptions(); this.executionType = builder.getExecutionType(); } HystrixCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { super(group, key, threadPoolKey, circuitBreaker, threadPool, commandPropertiesDefaults, threadPoolPropertiesDefaults, metrics, fallbackSemaphore, executionSemaphore, propertiesStrategy, executionHook); } protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { this.commandGroup = initGroupKey(group); this.commandKey = initCommandKey(key, getClass()); this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults); this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get()); this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties); this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics); this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults); //Strategies from plugins this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties); this.executionHook = initExecutionHook(executionHook); this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy); this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy); /* fallback semaphore override if applicable */ this.fallbackSemaphoreOverride = fallbackSemaphore; /* execution semaphore override if applicable */ this.executionSemaphoreOverride = executionSemaphore; }
【7】那麼接下來分析CommandExecutor.execute做了什麼
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { Validate.notNull(invokable); Validate.notNull(metaHolder); switch (executionType) { //基於上面構成先分析同步方法 case SYNCHRONOUS: { //呼叫HystrixCommand return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); } } //HystrixCommand類#execute方法 public R execute() { try { //利用了Future模式 return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } }
【8】分析queue()方法怎麼使用Future模式的
//分析queue()方法怎麼使用Future模式的 //熟悉執行緒池的,應該知道執行緒池有個FutureTask的任務 //通過持有FutureTask控制程式碼可以非同步獲取返回結果 //本質上就是FutureTask持有 //一個結果存放地址 //執行緒執行的run方法(執行完後將結果放入固定的存放地址) //那麼現在看下面的邏輯就會十分清晰 public Future<R> queue() { final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { if (delegate.isCancelled()) { return false; } if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) { interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning); } final boolean res = delegate.cancel(interruptOnFutureCancel.get()); if (!isExecutionComplete() && interruptOnFutureCancel.get()) { final Thread t = executionThread.get(); if (t != null && !t.equals(Thread.currentThread())) { t.interrupt(); } } return res; } @Override public boolean isCancelled() { return delegate.isCancelled(); } @Override public boolean isDone() { return delegate.isDone(); } @Override public R get() throws InterruptedException, ExecutionException { return delegate.get(); } @Override public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.get(timeout, unit); } }; /* special handling of error states that throw immediately */ if (f.isDone()) { try { f.get(); return f; } catch (Exception e) { Throwable t = decomposeException(e); if (t instanceof HystrixBadRequestException) { return f; } else if (t instanceof HystrixRuntimeException) { HystrixRuntimeException hre = (HystrixRuntimeException) t; switch (hre.getFailureType()) { case COMMAND_EXCEPTION: case TIMEOUT: // we don't throw these types from queue() only from queue().get() as they are execution errors return f; default: // these are errors we throw from queue() as they as rejection type errors throw hre; } } else { throw Exceptions.sneakyThrow(t); } } } //也就是將產生的Future物件返回 return f; }
【9】分析結果的獲取是從delegate屬性中獲取,它被定義為一個觀察者
//定義觀察者 public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; //doOnCompleted handler already did all of the SUCCESS work //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work //第一個觀察者,命令執行結束後的清理者 final Action0 terminateCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { handleCommandEnd(false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { handleCommandEnd(true); //user code did run } } }; //mark the command as CANCELLED and store the latency (in addition to standard cleanup) //第二個觀察者,取消訂閱時處理者 final Action0 unsubscribeCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { if (!_cmd.executionResult.containsTerminalEvent()) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); try { executionHook.onUnsubscribe(_cmd); } catch (Throwable hookEx) {...} _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); } handleCommandEnd(false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { if (!_cmd.executionResult.containsTerminalEvent()) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); try { executionHook.onUnsubscribe(_cmd); } catch (Throwable hookEx) {...} _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); } handleCommandEnd(true); //user code did run } } }; //第三個觀察者,重點:Hystrix 核心邏輯: 斷路器、隔離 final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } return applyHystrixSemantics(_cmd); } }; //第四個觀察者,發射資料(OnNext表示發射資料)時的Hook final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() { @Override public R call(R r) { R afterFirstApplication = r; try { afterFirstApplication = executionHook.onComplete(_cmd, r); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx); } try { return executionHook.onEmit(_cmd, afterFirstApplication); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx); return afterFirstApplication; } } }; //第五個觀察者,命令執行完成的Hook final Action0 fireOnCompletedHook = new Action0() { @Override public void call() { try { executionHook.onSuccess(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx); } } }; //進行包裝 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { /* this is a stateful object so can only be used once */ if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { ..省略丟擲異常.. } commandStartTimestamp = System.currentTimeMillis(); if (properties.requestLogEnabled().get()) { // log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); /* try from cache first */ if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } // 使用上面的Func0:applyHystrixSemantics 來建立Observable Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; // 設定快取邏輯,不太重要 if (requestCacheEnabled && cacheKey != null) { // wrap it for caching HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } //鏈式 return afterCache .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once .doOnCompleted(fireOnCompletedHook); } }); }
【10】分析核心applyHystrixSemantics方法
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { //執行命令開始執行的勾點方法 可能有人會問 前面繫結了那麼多的勾點方法 這裡怎麼才開始 //start 因為前面繫結但是並沒有執行。 當有訂閱者訂閱 這裡才是開始執行的程式碼邏輯 executionHook.onStart(_cmd); //判斷斷路器是否開啟 if (circuitBreaker.allowRequest()) { //如果是號誌隔離 返回TryableSemaphoreActual 根據設定的並行量來判斷是否能執行,如果不能執行,進入fallback。 //如果是執行緒池隔離 返回TryableSemaphoreNoOp 直接返回true沒有任何操作 final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; //判斷能否正常執行 if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); //核心方法 return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { //號誌執行的時候並行太大直接回退 return handleSemaphoreRejectionViaFallback(); } } else { //執行降級 return handleShortCircuitViaFallback(); } } //TryableSemaphoreActual類#tryAcquire方法 @Override public boolean tryAcquire() { int currentCount = count.incrementAndGet(); if (currentCount > numberOfPermits.get()) { count.decrementAndGet(); return false; } else { return true; } } //TryableSemaphoreNoOp類#tryAcquire方法 @Override public boolean tryAcquire() { return true; }
【11】分析allowRequest方法是怎麼判斷是否允許通過的
@Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { // 屬性要求我們強制開啟電路,這樣我們將允許NO請求 return false; } if (properties.circuitBreakerForceClosed().get()) { // 我們仍然希望允許isOpen()執行它的計算,因此我們模擬正常的行為 isOpen(); // 屬性要求我們忽略錯誤,所以我們將忽略isOpen的結果,只允許所有的流量通過 return true; } return !isOpen() || allowSingleTest(); } @Override public boolean isOpen() { //如果斷路器開啟立即返回true if (circuitOpen.get()) { return true; } // we're closed, so let's see if errors have made us so we should trip the circuit open HealthCounts health = metrics.getHealthCounts(); // check if we are past the statisticalWindowVolumeThreshold if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything return false; } if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { return false; } else { // our failure rate is too high, trip the circuit if (circuitOpen.compareAndSet(false, true)) { // if the previousValue was false then we want to set the currentTime circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); return true; } else { // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have // caused another thread to set it to true already even though we were in the process of doing the same // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open return true; } } } public boolean allowSingleTest() { long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); // 1) 如果斷路器是開啟的 // 2) 且已經過了休眠時間,嘗試開啟 if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) { //已經過了休眠時間,允許一個請求嘗試。 //如果成功,斷路器被關閉。 if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { //如果這個返回true,意味著我們設定了時間,因此我們將返回true以允許單次嘗試 //如果它返回false,這意味著另一個執行緒在我們之前執行並允許單次嘗試 return true; } } return false; }
【12】分析降級的邏輯
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) { final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread(); long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); // record the executionResult // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144) executionResult = executionResult.addEvent((int) latency, eventType); if (shouldNotBeWrapped(originalException)){ /* executionHook for all errors */ Exception e = wrapWithOnErrorHook(failureType, originalException); return Observable.error(e); } else if (isUnrecoverable(originalException)) { logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException); /* executionHook for all errors */ Exception e = wrapWithOnErrorHook(failureType, originalException); return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null)); } else { if (isRecoverableError(originalException)) { logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException); } if (properties.fallbackEnabled().get()) { /* fallback behavior is permitted so attempt */ final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(requestContext); } }; final Action1<R> markFallbackEmit = new Action1<R>() { @Override public void call(R r) { if (shouldOutputOnNextEvents()) { executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT); eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey); } } }; final Action0 markFallbackCompleted = new Action0() { @Override public void call() { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS); } }; final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { Exception e = originalException; Exception fe = getExceptionFromThrowable(t); if (fe instanceof UnsupportedOperationException) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING); /* executionHook for all errors */ e = wrapWithOnErrorHook(failureType, e); return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe)); } else { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe); eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE); /* executionHook for all errors */ e = wrapWithOnErrorHook(failureType, e); return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe)); } } }; final TryableSemaphore fallbackSemaphore = getFallbackSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { fallbackSemaphore.release(); } } }; Observable<R> fallbackExecutionChain; //上面那些定義的其實都不會在這裡呼叫,主要是看下面的 // acquire a permit if (fallbackSemaphore.tryAcquire()) { try { if (isFallbackUserDefined()) { executionHook.onFallbackStart(this); //HystrixCommand類#getFallbackObservable fallbackExecutionChain = getFallbackObservable(); } else { //same logic as above without the hook invocation fallbackExecutionChain = getFallbackObservable(); } } catch (Throwable ex) { //If hook or user-fallback throws, then use that as the result of the fallback lookup fallbackExecutionChain = Observable.error(ex); } return fallbackExecutionChain .doOnEach(setRequestContext) .lift(new FallbackHookApplication(_cmd)) .lift(new DeprecatedOnFallbackHookApplication(_cmd)) .doOnNext(markFallbackEmit) .doOnCompleted(markFallbackCompleted) .onErrorResumeNext(handleFallbackError) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } else { return handleFallbackRejectionByEmittingError(); } } else { return handleFallbackDisabledByEmittingError(originalException, failureType, message); } } } //HystrixCommand類#getFallbackObservable @Override final protected Observable<R> getFallbackObservable() { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { try { //呼叫GenericCommand類的getFallback方法【子類重新寫父類別】 return Observable.just(getFallback()); } catch (Throwable ex) { return Observable.error(ex); } } }); } //GenericCommand類#getFallback方法 @Override protected Object getFallback() { final CommandAction commandAction = getFallbackAction(); if (commandAction != null) { try { return process(new Action() { @Override Object execute() { MetaHolder metaHolder = commandAction.getMetaHolder(); Object[] args = createArgsForFallback(metaHolder, getExecutionException()); return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args); } }); } catch (Throwable e) { LOGGER.error(FallbackErrorMessageBuilder.create() .append(commandAction, e).build()); throw new FallbackInvocationException(unwrapCause(e)); } } else { return super.getFallback(); } }
【13】分析核心的executeCommandAndObserve執行邏輯
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); //主要是來對HystrixCommand和HystrixObservableCommand記錄的事件是不同的 final Action1<R> markEmits = new Action1<R>() { @Override public void call(R r) { if (shouldOutputOnNextEvents()) { executionResult = executionResult.addEvent(HystrixEventType.EMIT); eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); } if (commandIsScalar()) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); } } }; final Action0 markOnCompleted = new Action0() { @Override public void call() { if (!commandIsScalar()) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); } } }; //執行失敗的邏輯定義 final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(currentRequestContext); } }; //上面定義的都是一些非同步呼叫事件,主體在這裡 Observable<R> execution; //如果超時開啟 使用HystrixObservableTimeoutOperator來對Observable做超時處理。 //所以不管是號誌隔離還是執行緒池隔離都會走該邏輯進行超時控制。 if (properties.executionTimeoutEnabled().get()) { //看名字就知道是特殊的隔離,也就是隔離邏輯所在 execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); } private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out in the wrapping thread so we will return immediately // and not increment any of the counters below or other such logic return Observable.error(new RuntimeException("timed out before executing run()")); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { //we have not been unsubscribed, so should proceed HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); /** * If any of these hooks throw an exception, then it appears as if the actual execution threw an error */ try { executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } }).doOnTerminate(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { //if it was never started and received terminal, then no need to clean up (I don't think this is possible) } //if it was unsubscribed, then other cleanup handled it } }).doOnUnsubscribe(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { //if it was never started and was cancelled, then no need to clean up } //if it was terminal, then other cleanup handled it } }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { //執行緒池隔離呼叫 @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); // semaphore isolated // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw } catch (Throwable ex) { //If the above hooks throw, then use that as the result of the run method return Observable.error(ex); } } }); } } private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) { Observable<R> userObservable; try { userObservable = getExecutionObservable(); } catch (Throwable ex) { // the run() method is a user provided implementation so can throw instead of using Observable.onError // so we catch it here and turn it into Observable.error userObservable = Observable.error(ex); } return userObservable .lift(new ExecutionHookApplication(_cmd)) .lift(new DeprecatedOnRunHookApplication(_cmd)); } @Override final protected Observable<R> getExecutionObservable() { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { try { //呼叫GenericCommand類的run方法【子類重新寫父類別】 return Observable.just(run()); } catch (Throwable ex) { return Observable.error(ex); } } }).doOnSubscribe(new Action0() { @Override public void call() { // Save thread on which we get subscribed so that we can interrupt it later if needed executionThread.set(Thread.currentThread()); } }); } //GenericCommand類#run方法 @Override protected Object run() throws Exception { LOGGER.debug("execute command: {}", getCommandKey().name()); return process(new Action() { @Override Object execute() { return getCommandAction().execute(getExecutionType()); } }); }
【14】分析隔離執行緒池的執行緒隔離threadPool.getScheduler的初始化【位於第六步裡面】
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) { if (fromConstructor == null) { // get the default implementation of HystrixThreadPool //通過塞入註解資訊threadPoolPropertiesDefaults進行初始化 return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults); } else { return fromConstructor; } } static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work String key = threadPoolKey.name(); // this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; } // if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); } public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.queueSize = properties.maxQueueSize().get(); this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties), properties); this.threadPool = this.metrics.getThreadPool(); this.queue = this.threadPool.getQueue(); /* strategy: HystrixMetricsPublisherThreadPool */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties); }
【15】斷路器初始化分析【位於第六步裡面】
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { // 如果啟用了熔斷器 if (enabled) { // 若commandKey沒有對應的CircuitBreaker,則建立 if (fromConstructor == null) { // get the default implementation of HystrixCircuitBreaker return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics); } else { // 如果有則返回現有的 return fromConstructor; } } else { return new NoOpCircuitBreaker(); } } //circuitBreaker以commandKey為維度,每個commandKey都會有對應的circuitBreaker public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { // 如果有則返回現有的, key.name()即command的name作為檢索條件 HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name()); if (previouslyCached != null) { return previouslyCached; } // 如果沒有則建立並cache HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics)); if (cbForCommand == null) { // this means the putIfAbsent step just created a new one so let's retrieve and return it return circuitBreakersByCommand.get(key.name()); } else { // this means a race occurred and while attempting to 'put' another one got there before // and we instead retrieved it and will now return it return cbForCommand; } } protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; }
【16】健康分析【涉及到滑動視窗的設計】
HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) { super(null); this.key = key; this.group = commandGroup; this.threadPoolKey = threadPoolKey; this.properties = properties; healthCountsStream = HealthCountsStream.getInstance(key, properties); rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties); cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties); rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties); rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties); rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties); } public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) { //預設是按500ms為一個段 final int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get(); if (healthCountBucketSizeInMs == 0) { throw new RuntimeException(...); } //獲得桶數:設定的時間/段的時間 final int numHealthCountBuckets = properties.metricsRollingStatisticalWindowInMilliseconds().get() / healthCountBucketSizeInMs; //初始化 return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs); } public static HealthCountsStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) { HealthCountsStream initialStream = streams.get(commandKey.name()); if (initialStream != null) { return initialStream; } else { final HealthCountsStream healthStream; synchronized (HealthCountsStream.class) { HealthCountsStream existingStream = streams.get(commandKey.name()); if (existingStream == null) { //新建邏輯 HealthCountsStream newStream = new HealthCountsStream(commandKey, numBuckets, bucketSizeInMs, HystrixCommandMetrics.appendEventToBucket); streams.putIfAbsent(commandKey.name(), newStream); healthStream = newStream; } else { healthStream = existingStream; } } healthStream.startCachingStreamValuesIfUnstarted(); return healthStream; } } private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs, Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) { super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator); } public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> { private Observable<Output> sourceStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket, final Func2<Output, Bucket, Output> reduceBucket) { //呼叫父級方法 super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket); Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() { @Override public Observable<Output> call(Observable<Bucket> window) { return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); } }; this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //emit overlapping windows of buckets .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer } @Override public Observable<Output> observe() { return sourceStream; } /* package-private */ boolean isSourceCurrentlySubscribed() { return isSourceCurrentlySubscribed.get(); } } public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> { protected final int numBuckets; protected final Observable<Bucket> bucketedStream; protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null); private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary; private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue()); protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket) { this.numBuckets = numBuckets; this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() { @Override public Observable<Bucket> call(Observable<Event> eventBucket) { return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); } }; final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>(); for (int i = 0; i < numBuckets; i++) { emptyEventCountsToStart.add(getEmptyBucketSummary()); } this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() { @Override public Observable<Bucket> call() { return inputEventStream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full) } }); } abstract Bucket getEmptyBucketSummary(); abstract Output getEmptyOutputValue(); /** * Return the stream of buckets * @return stream of buckets */ public abstract Observable<Output> observe(); public void startCachingStreamValuesIfUnstarted() { if (subscription.get() == null) { //the stream is not yet started Subscription candidateSubscription = observe().subscribe(counterSubject); if (subscription.compareAndSet(null, candidateSubscription)) { //won the race to set the subscription } else { //lost the race to set the subscription, so we need to cancel this one candidateSubscription.unsubscribe(); } } } /** * Synchronous call to retrieve the last calculated bucket without waiting for any emissions * @return last calculated bucket */ public Output getLatest() { startCachingStreamValuesIfUnstarted(); if (counterSubject.hasValue()) { return counterSubject.getValue(); } else { return getEmptyOutputValue(); } } public void unsubscribe() { Subscription s = subscription.get(); if (s != null) { s.unsubscribe(); subscription.compareAndSet(s, null); } } }
MetaHolder