微服務1:微服務及其演進史
微服務2:微服務全景架構
微服務3:微服務拆分策略
微服務4:服務註冊與發現
微服務5:服務註冊與發現(實踐篇)
微服務6:通訊之閘道器
微服務7:通訊之RPC
微服務8:通訊之RPC實踐篇(附原始碼)
微服務9:服務治理來保證高可用
微服務10:系統服務熔斷、限流
前面的章節,我們學習了微服務中對熔斷降級的原理,參考這篇《服務治理:熔斷、降級、限流》。瞭解了固定視窗演演算法、滑動視窗演演算法、 漏桶原理和令牌桶原理,本文對Hystrix做進一步的分析。
Hystrix是Netflix開源的一款具備熔斷、限流、降級能力的容錯系統,設計目的是將應用中的系統存取、多鏈路服務呼叫、第三方依賴服務的呼叫,通過流量資源控制的方式隔離開。
避免了在分散式系中的某個服務故障沿著呼叫鏈向上傳遞,出現整體的服務雪崩,並以此提升系統的穩定性和健壯性。
以往的存取模式,是A鏈路 與 B鏈路(A -> B)的直接存取。而命令模式(Command Pattern)的作用則是通過建立命令物件來解耦A、B鏈路。
在執行過程中,命令物件可以對請求進行排隊、記錄請求紀錄檔、執行故障注入、超時/故障 快速返回等操作,如 A -> Command Work -> B。
在計算機中,執行緒是系統執行的基本單位,我們可以通過對執行緒池資源的管理,如非同步請求,請求超時斷開,請求熔斷,來對系統資源進行隔離,當部分型別的資源有限,請求過載時,進行系統保護。
Java程式中,Semaphore(號誌)是用來控制同時存取特定資源的執行緒數量,通過協調各個執行緒以保證合理地使用公共資源。也保證了資源競爭的隔離性。
如下圖所示(圖片源自官網),Hystrix的工作流程上大概會有如下9個步驟,下文將詳細介紹每個流程:
建立HystrixCommand 或者 HystrixObservableCommand 命令
執行命令,如圖中的,一共有四種方式來執行run()/construct()
單個範例只能執行一次這4個方法。HystrixObservableCommand沒有execute()和queue()。
執行方式 | 說明 | 可用物件 |
---|---|---|
execute() | 阻塞式同步執行,返回依賴服務的單一返回結果(或者丟擲異常) | HystrixCommand |
queue() | 基於Future的非同步方式執行,返回依賴服務的單一返回結果(或者丟擲異常) | HystrixCommand |
observe() | 基於Rxjava的Observable方式,返回通過Observable表示的依賴服務返回結果,代呼叫程式碼先執行(Hot Obserable) | HystrixObservableCommand |
toObvsevable() | 基於Rxjava的Observable方式,返回通過Observable表示的依賴服務返回結果,執行程式碼等到真正訂閱的時候才會執行(cold observable) | HystrixObservableCommand |
execute()
以同步堵塞方式執行run(),呼叫execute()後,hystrix會先建立一個新執行緒執行run(),執行excute時一直出於堵塞狀態,直到run()執行完成。
queue()
以非同步非堵塞方式執行run()。一呼叫queue()就直接返回一個Future物件,同時hystrix建立一個新執行緒執行run(),呼叫程式通過Future.get()拿到run()的返回結果,而Future.get()是堵塞執行的。
observe()
事件註冊前執行run()/construct()。
toObservable()
事件註冊後執行run()/construct()。
如果當前命令物件設定了允許從結果快取中取返回結果,並且在結果快取中已經快取了請求結果,則立即通過Observable返回。
判斷 circuit-breaker 是否開啟。如果3.3步驟沒有快取沒有命中,則判斷一下當前斷路器的斷路狀態是否開啟。如果斷路器狀態為開啟狀態,則Hystrix將不會執行此Command命令,直接執行步驟3.8 呼叫Fallback。
如果斷路器狀態是關閉,則執行 步驟3.5 檢查是否有足夠的資源執行 Command命令。
如果當前要執行的Command命令 先關連的執行緒池 和佇列(或者號誌)資源已經滿了,Hystrix將不會執行 Command命令,直接執行步驟8的Fallback降級處理;如果未滿,表示有剩餘的資源執行Command命令,則執行步驟 3.6。
執行 HystrixObservableCommand.construct() 或者 HystrixCommand.run()。
當經過步驟 3.5 判斷,有足夠的資源執行Command命令時,本步驟將呼叫Command命令執行方法。呼叫HystrixCommand的run方法。按照一下兩個條件去判斷:
對於熔斷器的資訊會做健康的判斷。Hystrix 統計Command命令執行執行過程中的 success count、fail count、reject count 和 timeout count, 並將這些資訊記錄到斷路器(Circuit Breaker)中。
斷路器會把上面的統計資訊按照時間窗統計下來。並判斷什麼時候可以將請求熔斷,在熔斷後和熔斷視窗期結束之前,請求都不會被Fallback。熔斷視窗期結束後會再次校驗,通過後熔斷開關會被關閉。
Hystrix會在以下場景出現後,觸發Fallback操作:
Hystrix命令物件執行成功,會直接返回結果或者以Observable形式返回結果。返回的Observable 會執行以下流程返回結果。
Hystrix獲取返回結果執行流程如下(圖片源自官網):
pom.xml加上以下依賴。我們使用原生hystrix來做案例介紹。
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.8</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>1.4.10</version>
</dependency>
fallBack是指當程式符合我們執行熔斷降級的條件時候,我們預設執行的路線,可以是一個方法或者一個物件。HystrixCommand中已有,我們只需重寫即可,類似
@Override
protected String getFallback() {
return "當熔斷、降級發生時,返回的預設資訊";
}
在3.8 節 我們介紹了Hystrix 觸發 fallBack的四種條件,下面我們一個個來測試。
除了HystrixBadRequestException,所有程式丟擲的異常,都會觸發getFallback(),呼叫程式將獲得getFallback()的執行並返回。
/**
* @author brand
* @Description: 模擬異常/超時的場景
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:35
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixException extends HystrixCommand<String> {
/**
* 實現getFallback()後,執行命令時遇到以上4種情況將被fallback接管,不會丟擲異常或其他
* 下面演示的是異常的情況
*/
private final String name;
public HystrixException(String name) {
super(HystrixCommandGroupKey.Factory.asKey("Command Group:fallbackGroup"));
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------以下三種情況觸發fallback-------------------*/
// 1.迴圈+等待,超時fallBack
// int i = 0;
// while (true) {
// i++;
// Thread.currentThread().sleep(1000);
// }
// 2.除零導致異常
// int i = 1/0;
// 3.主動丟擲異常
// throw new Exception("command trigger fallback");
/*---------------直接丟擲HystrixBadRequestException,不觸發fallback-----------------*/
// HystrixBadRequestException,這個是非法引數或非系統錯誤引起,不觸發fallback,也不被計入熔斷器
throw new HystrixBadRequestException("HystrixBadRequestException not trigger fallback");
// return "success";
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
}
編寫測試類:
/**
* @author brand
* @Description: 測試異常/超時 fallBack
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:35
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class ExceptionTimeOutFallBackTest {
@Test
public void testException() throws IOException {
try {
assertEquals("success", new HystrixException("Exception").execute());
} catch(Exception e) {
System.out.println("run()丟擲HystrixBadRequestException時,會被捕獲到這裡" + e.getCause());
}
}
}
測試類執行直接丟擲HystrixBadRequestException,測試類會走到catch函數段中。
測試類執行其他三種情況,會得到以下結果:
同上 4.2.1 中的 迴圈+等待,超時fallBack 的場景
圖片源自官網,這邊就不單獨畫了。
key值 | 說明 | 預設值 |
---|---|---|
circuitBreaker.enabled | 是否開啟斷路器 | true |
circuitBreaker.requestVolumeThreshold | 斷路器啟用請求數閾值 | 10 |
circuitBreaker.sleepWindowInMilliseconds | 斷路器啟用後的睡眠時間窗 | 5000(ms) |
circuitBreaker.errorThresholdPercentage | 斷路器啟用失敗率閾值 | 50(%) |
circuitBreaker.forceOpen | 是否強制將斷路器設定成開啟狀態 | false |
circuitBreaker.forceClosed | 是否強制將斷路器設定成關閉狀態 | false |
/**
* @author brand
* @Description: 熔斷
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午3:41
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixCircuitBreaker extends HystrixCommand<String> {
private final String name;
public HystrixCircuitBreaker(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Group:CircuitBreaker"))
.andCommandKey(HystrixCommandKey.Factory.asKey("Command:CircuitBreaker"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPool:CircuitBreakerTest"))
.andThreadPoolPropertiesDefaults( // 設定執行緒池
HystrixThreadPoolProperties.Setter()
.withCoreSize(200) // 設定執行緒池裡的執行緒數,設定足夠多執行緒,以防未熔斷卻打滿threadpool
)
.andCommandPropertiesDefaults( // 設定熔斷器
HystrixCommandProperties.Setter()
.withCircuitBreakerEnabled(true)
.withCircuitBreakerRequestVolumeThreshold(10)
.withCircuitBreakerErrorThresholdPercentage(50)
// .withCircuitBreakerForceOpen(true) // true時強制將斷路器設定成開啟狀態,所有請求都將被拒絕,直接到fallback
// .withCircuitBreakerForceClosed(true) // true時強制將斷路器設定成關閉狀態,將忽略所有錯誤
// .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE) // 號誌隔離
// .withExecutionTimeoutInMilliseconds(5000)
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
System.out.println("running num :" + name);
int num = Integer.valueOf(name);
if (num % 2 == 0 && num < 30) { // 符合條件,直接返回
return name;
} else { // 模擬異常
int j = 0;
j = num / j;
}
return name;
}
@Override
protected String getFallback() {
return "CircuitBreaker fallback: " + name;
}
}
執行結果如下,偶數正常返回,奇數進入熔斷資訊,並且超過30之後全部進入fallBack
執行緒池隔離:不同服務通過使用不同執行緒池,彼此間將不受影響,達到隔離效果。
我們通過andThreadPoolKey設定使用命名為ThreadPoolTest的執行緒池,實現與其他命名的執行緒池天然隔離,如果不設定andThreadPoolKey,也可以則使用withGroupKey設定來命名執行緒池。
/**
* @author brand
* @Description: 執行緒池隔離
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:58
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixThreadPool extends HystrixCommand<String> {
private final String name;
public HystrixThreadPool(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup")) // CommandGroup分組
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolTest")) // 執行緒池key
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 設定執行緒池裡的執行緒數為3。超過3次進行熔斷
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------如果執行緒數超配,會觸發fallback的case,否則休眠1s,進行正常返回-------------------*/
TimeUnit.MILLISECONDS.sleep(1000);
return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
}
測試一下,下面都是使用 HystrixThreadPoolKey 為 ThreadPoolTest的執行緒池命名,所以是公用,會返回fallBack的結果。
for(int i = 0; i < 3; i++) {
try {
Future<String> future = new HystrixThreadPool("thread pool"+i).queue(); // 以非同步非堵塞方式執行run(),所以消耗了3個執行緒
} catch(Exception e) {
System.out.println("run()丟擲HystrixBadRequestException時,被捕獲到這裡" + e.getCause());
}
}
for(int i = 0; i < 10; i++) {
try {
System.out.println("===========" + new HystrixThreadPool("thread pool").execute()); //上面消耗了所有執行緒,這邊會執行到fallBack中
} catch(Exception e) {
System.out.println("run()丟擲HystrixBadRequestException時,被捕獲到這裡" + e.getCause());
}
}
我們做一下調整,讓執行緒池的key(HystrixThreadPoolKey)不一致,再測試是否返回正常的執行結果。
/**
* @author brand
* @Description: 執行緒池隔離
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:58
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixThreadPool extends HystrixCommand<String> {
private final String name;
public HystrixThreadPool(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup")) // CommandGroup分組
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name)) // 執行緒池key,根據請求的入參來算
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 設定執行緒池裡的執行緒數為3。超過3次進行熔斷
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------如果執行緒數超配,會觸發fallback的case,否則休眠1s,進行正常返回-------------------*/
TimeUnit.MILLISECONDS.sleep(1000);
return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
}
測試一下,下面都是使用 HystrixThreadPoolKey 為 ThreadPoolTest的執行緒池命名,所以是公用,會返回fallBack的結果。
for(int i = 0; i < 3; i++) {
try {
Future<String> future = new HystrixThreadPool("thread pool"+i).queue(); // 會有三個執行緒池組 thread pool1、thread poo2、thread pool3,不互相影響,更不會影響下面excute()的執行
} catch(Exception e) {
System.out.println("run()丟擲HystrixBadRequestException時,被捕獲到這裡" + e.getCause());
}
}
for(int i = 0; i < 10; i++) {
try {
System.out.println("===========" + new HystrixThreadPool("thread pool").execute()); //與上面隔離,所以這邊執行始終不會走到fallBack中
} catch(Exception e) {
System.out.println("run()丟擲HystrixBadRequestException時,被捕獲到這裡" + e.getCause());
}
}
https://github.com/WengZhiHua/Helenlyn.Grocery/tree/master/parent/HystrixDemo