微服務11:熔斷、降級的Hystrix實現(附原始碼)

2022-12-20 18:07:54

微服務1:微服務及其演進史
微服務2:微服務全景架構
微服務3:微服務拆分策略
微服務4:服務註冊與發現
微服務5:服務註冊與發現(實踐篇)
微服務6:通訊之閘道器
微服務7:通訊之RPC
微服務8:通訊之RPC實踐篇(附原始碼)
微服務9:服務治理來保證高可用
微服務10:系統服務熔斷、限流

1 介紹

前面的章節,我們學習了微服務中對熔斷降級的原理,參考這篇《服務治理:熔斷、降級、限流》。瞭解了固定視窗演演算法、滑動視窗演演算法、 漏桶原理和令牌桶原理,本文對Hystrix做進一步的分析。
Hystrix是Netflix開源的一款具備熔斷、限流、降級能力的容錯系統,設計目的是將應用中的系統存取、多鏈路服務呼叫、第三方依賴服務的呼叫,通過流量資源控制的方式隔離開。
避免了在分散式系中的某個服務故障沿著呼叫鏈向上傳遞,出現整體的服務雪崩,並以此提升系統的穩定性和健壯性。

1.1 Hystrix是用來解決哪些問題的?

  • 對所依賴的服務的延遲和故障進行容錯(防護+控制)
  • 對服務的故障進行妥善的處理
  • 對延遲過久的請求進行快速失敗並迅速恢復,避免佇列阻塞
  • 返回預設值或者預設的處理(fallback),實現優雅的降級(如給使用者一個友好的提示)
  • 近實時的資料監控與異常告警,及時發現問題並快速止損

1.2 Hystrix如何解決這些問題

  • HystrixCommand、HystrixObservableCommand在單獨執行緒中執行,防止單線依賴,消耗整個服務的資源
  • 服務過載的時候立即斷開並快速失敗,防止佇列阻塞,即執行緒池或號誌滿的時候直接拒絕請求
  • 當超時或者失敗時,提供fallback能力避免使用者直接面對故障,提供優雅的反饋。
  • 採用隔離技術(流量泳道和斷路器模式)來避免單個依賴導致這個鏈路的雪崩
  • 近實時的資料監控與異常告警,及時發現問題並快速止損(提供監控和預警能力)

2 Hystrix 基礎模型

2.1 設計模式:命令模式(Command Pattern)

以往的存取模式,是A鏈路 與 B鏈路(A -> B)的直接存取。而命令模式(Command Pattern)的作用則是通過建立命令物件來解耦A、B鏈路。
在執行過程中,命令物件可以對請求進行排隊、記錄請求紀錄檔、執行故障注入、超時/故障 快速返回等操作,如 A -> Command Work -> B。

2.2 隔離模式:執行緒池和號誌隔離

在計算機中,執行緒是系統執行的基本單位,我們可以通過對執行緒池資源的管理,如非同步請求,請求超時斷開,請求熔斷,來對系統資源進行隔離,當部分型別的資源有限,請求過載時,進行系統保護。
Java程式中,Semaphore(號誌)是用來控制同時存取特定資源的執行緒數量,通過協調各個執行緒以保證合理地使用公共資源。也保證了資源競爭的隔離性。

3 Hystrix 工作原理

如下圖所示(圖片源自官網),Hystrix的工作流程上大概會有如下9個步驟,下文將詳細介紹每個流程:

3.1 建立命令

建立HystrixCommand 或者 HystrixObservableCommand 命令

3.2 執行命令

執行命令,如圖中的,一共有四種方式來執行run()/construct()

  • execute
  • queue
  • observer
  • toObserver

單個範例只能執行一次這4個方法。HystrixObservableCommand沒有execute()和queue()。

執行方式 說明 可用物件
execute() 阻塞式同步執行,返回依賴服務的單一返回結果(或者丟擲異常) HystrixCommand
queue() 基於Future的非同步方式執行,返回依賴服務的單一返回結果(或者丟擲異常) HystrixCommand
observe() 基於Rxjava的Observable方式,返回通過Observable表示的依賴服務返回結果,代呼叫程式碼先執行(Hot Obserable) HystrixObservableCommand
toObvsevable() 基於Rxjava的Observable方式,返回通過Observable表示的依賴服務返回結果,執行程式碼等到真正訂閱的時候才會執行(cold observable) HystrixObservableCommand
  • execute()
    以同步堵塞方式執行run(),呼叫execute()後,hystrix會先建立一個新執行緒執行run(),執行excute時一直出於堵塞狀態,直到run()執行完成。

  • queue()
    以非同步非堵塞方式執行run()。一呼叫queue()就直接返回一個Future物件,同時hystrix建立一個新執行緒執行run(),呼叫程式通過Future.get()拿到run()的返回結果,而Future.get()是堵塞執行的。

  • observe()
    事件註冊前執行run()/construct()。

    • 事件註冊前,先呼叫observe()自動觸發執行run()/construct()(如果繼承的是HystrixCommand,hystrix將建立新執行緒非堵塞執行run();如果繼承的是HystrixObservableCommand,將以呼叫程式執行緒堵塞執行construct())
    • observe()返回結果後,呼叫程式呼叫subscribe()完成事件註冊,如果run()/construct()執行成功則觸發onNext()和onCompleted() 方法,如果執行異常則觸發 onError() 方法
  • toObservable()
    事件註冊後執行run()/construct()。

    • 事件註冊前,呼叫toObservable()就立即返回 Observable物件
    • 呼叫subscribe()完成事件註冊後自動觸發執行run()/construct(),如果run()/construct()執行成功則觸發onNext()和onCompleted()方法,如果執行異常則觸發onError() 方法。

3.3 是否從快取獲取結果返回?

如果當前命令物件設定了允許從結果快取中取返回結果,並且在結果快取中已經快取了請求結果,則立即通過Observable返回。

3.4 是否啟用了熔斷器?

判斷 circuit-breaker 是否開啟。如果3.3步驟沒有快取沒有命中,則判斷一下當前斷路器的斷路狀態是否開啟。如果斷路器狀態為開啟狀態,則Hystrix將不會執行此Command命令,直接執行步驟3.8 呼叫Fallback。
如果斷路器狀態是關閉,則執行 步驟3.5 檢查是否有足夠的資源執行 Command命令。

3.5 判斷資源(執行緒池/佇列/號誌)是否已滿?

如果當前要執行的Command命令 先關連的執行緒池 和佇列(或者號誌)資源已經滿了,Hystrix將不會執行 Command命令,直接執行步驟8的Fallback降級處理;如果未滿,表示有剩餘的資源執行Command命令,則執行步驟 3.6。

3.6 執行 construct() 或者 run()

執行 HystrixObservableCommand.construct() 或者 HystrixCommand.run()。
當經過步驟 3.5 判斷,有足夠的資源執行Command命令時,本步驟將呼叫Command命令執行方法。呼叫HystrixCommand的run方法。按照一下兩個條件去判斷:

  • 判斷請求邏輯是否呼叫成功,如果呼叫成功返回撥用結果。呼叫出錯(5xx),進入步驟 3.8。
  • 判斷呼叫的依賴邏輯呼叫是否超時,未超時則直接返回成功結果。超時斷開,進入步驟3.8。

3.7 計算熔斷器健康情況

對於熔斷器的資訊會做健康的判斷。Hystrix 統計Command命令執行執行過程中的 success count、fail count、reject count 和 timeout count, 並將這些資訊記錄到斷路器(Circuit Breaker)中。
斷路器會把上面的統計資訊按照時間窗統計下來。並判斷什麼時候可以將請求熔斷,在熔斷後和熔斷視窗期結束之前,請求都不會被Fallback。熔斷視窗期結束後會再次校驗,通過後熔斷開關會被關閉。

3.8 熔斷後的請求執行Fallback

Hystrix會在以下場景出現後,觸發Fallback操作:

  • 異常場景:run()方法丟擲非HystrixBadRequestException異常。
  • 超時場景:run()方法呼叫超時。
  • 直接熔斷:熔斷器開啟攔截調,所有的請求都會被攔截。
  • 容量滿額:執行緒池/佇列/號誌滿量之後

3.9 返回成功結果

Hystrix命令物件執行成功,會直接返回結果或者以Observable形式返回結果。返回的Observable 會執行以下流程返回結果。
Hystrix獲取返回結果執行流程如下(圖片源自官網):

4 Hystrix 實現過程

4.1 引入依賴

pom.xml加上以下依賴。我們使用原生hystrix來做案例介紹。

 <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>1.5.8</version>
        </dependency>

        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-metrics-event-stream</artifactId>
            <version>1.4.10</version>
        </dependency>

4.2 fallBack

fallBack是指當程式符合我們執行熔斷降級的條件時候,我們預設執行的路線,可以是一個方法或者一個物件。HystrixCommand中已有,我們只需重寫即可,類似

@Override
protected String getFallback() {
    return "當熔斷、降級發生時,返回的預設資訊";
}

在3.8 節 我們介紹了Hystrix 觸發 fallBack的四種條件,下面我們一個個來測試。

4.2.1 程式異常fallBack

除了HystrixBadRequestException,所有程式丟擲的異常,都會觸發getFallback(),呼叫程式將獲得getFallback()的執行並返回。

/**
 * @author brand
 * @Description: 模擬異常/超時的場景 
 * @Copyright: Copyright (c) 2022
 * @Company: Helenlyn, Inc. All Rights Reserved.
 * @date 2022/1/8 下午5:35
 * @Update Time:
 * @Updater:
 * @Update Comments:
 */
public class HystrixException extends HystrixCommand<String> {
     /**
     * 實現getFallback()後,執行命令時遇到以上4種情況將被fallback接管,不會丟擲異常或其他
     * 下面演示的是異常的情況
     */
    private final String name;

    public HystrixException(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("Command Group:fallbackGroup"));
        this.name = name;
    }
	
    @Override
    protected String run() throws Exception {
        /*---------------以下三種情況觸發fallback-------------------*/
        // 1.迴圈+等待,超時fallBack
//    	int i = 0;
//    	while (true) {
//    		i++;
//            Thread.currentThread().sleep(1000);
//    	}

        // 2.除零導致異常
//    	 int i = 1/0;

        // 3.主動丟擲異常
        // throw new Exception("command trigger fallback");

        /*---------------直接丟擲HystrixBadRequestException,不觸發fallback-----------------*/
        // HystrixBadRequestException,這個是非法引數或非系統錯誤引起,不觸發fallback,也不被計入熔斷器
        throw new HystrixBadRequestException("HystrixBadRequestException not trigger fallback");

//    return "success";
    }

    @Override
    protected String getFallback() {
        return "fallback: " + name;
    }
}

編寫測試類:

/**
 * @author brand
 * @Description: 測試異常/超時 fallBack
 * @Copyright: Copyright (c) 2022
 * @Company: Helenlyn, Inc. All Rights Reserved.
 * @date 2022/1/8 下午5:35
 * @Update Time:
 * @Updater:
 * @Update Comments:
 */
public class ExceptionTimeOutFallBackTest {    
    @Test
    public void testException() throws IOException {
        try {
          assertEquals("success", new HystrixException("Exception").execute());
        } catch(Exception e) {
            System.out.println("run()丟擲HystrixBadRequestException時,會被捕獲到這裡" + e.getCause());
        }
    }
}

測試類執行直接丟擲HystrixBadRequestException,測試類會走到catch函數段中。
測試類執行其他三種情況,會得到以下結果:

4.2.2 呼叫超時fallBack

同上 4.2.1 中的 迴圈+等待,超時fallBack 的場景

4.3 熔斷策略

4.3.1 熔斷實現的基本原理


圖片源自官網,這邊就不單獨畫了。

  1. 斷路器設定了生效閾值,並且在時間窗內的請求數超過閾值:circuitBreaker.requestVolumeThreshold。
    超過之後觸發斷路器開啟,否則不開啟。比如熔斷閾值為100,哪怕你99個都fail,也不會觸發熔斷。
  2. 請求的錯誤率超過錯誤率閾值:errorThresholdPercentage,比如20%,10次有2次就達到要求。
  3. 1與2的條件都滿足的時候,原來關閉的斷路器將開啟。
  4. 斷路器開啟之後,後續請求過來的流量都會被斷開。
  5. 斷路的那段時間我們叫做休眠時間窗:sleepWindowInMilliseconds。 休眠時間窗過去之後,再發起請求,這時候斷路器半開。
    • 請求失敗:斷路器狀態繼續保持未開啟,並更新休眠時間窗。
    • 請求成功:則斷路器狀態改為關閉。

4.3.2 斷路器設定引數說明

key值 說明 預設值
circuitBreaker.enabled 是否開啟斷路器 true
circuitBreaker.requestVolumeThreshold 斷路器啟用請求數閾值 10
circuitBreaker.sleepWindowInMilliseconds 斷路器啟用後的睡眠時間窗 5000(ms)
circuitBreaker.errorThresholdPercentage 斷路器啟用失敗率閾值 50(%)
circuitBreaker.forceOpen 是否強制將斷路器設定成開啟狀態 false
circuitBreaker.forceClosed 是否強制將斷路器設定成關閉狀態 false

4.3.3 測試案例

  • 通過withCircuitBreakerRequestVolumeThreshold設定10s(預設時間窗)內請求數超過10個時熔斷器開始生效
  • 通過withCircuitBreakerErrorThresholdPercentage設定錯誤比例>50%時開始熔斷
  • 然後for迴圈執行execute()觸發run(),在run()裡,如果name是小於30的偶數則正常返回,否則異常
  • 通過多次迴圈後,異常請求佔所有請求的比例將大於50%,就會看到後續請求都不進入run()而是進入getFallback(),因為不再列印"running run():" + name了。
  • 除此之外,hystrix還支援多長時間從熔斷狀態自動恢復等功能,見下文附錄。
/**
 * @author brand
 * @Description: 熔斷
 * @Copyright: Copyright (c) 2022
 * @Company: Helenlyn, Inc. All Rights Reserved.
 * @date 2022/1/8 下午3:41
 * @Update Time:
 * @Updater:
 * @Update Comments:
 */
public class HystrixCircuitBreaker extends HystrixCommand<String> {

    private final String name;

    public HystrixCircuitBreaker(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Group:CircuitBreaker"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("Command:CircuitBreaker"))
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPool:CircuitBreakerTest"))
                        .andThreadPoolPropertiesDefaults(    // 設定執行緒池
                                HystrixThreadPoolProperties.Setter()
                                        .withCoreSize(200)    // 設定執行緒池裡的執行緒數,設定足夠多執行緒,以防未熔斷卻打滿threadpool
                        )
                        .andCommandPropertiesDefaults(    // 設定熔斷器
                                HystrixCommandProperties.Setter()
                                        .withCircuitBreakerEnabled(true)
                                        .withCircuitBreakerRequestVolumeThreshold(10)
                                        .withCircuitBreakerErrorThresholdPercentage(50)
//                		.withCircuitBreakerForceOpen(true)	// true時強制將斷路器設定成開啟狀態,所有請求都將被拒絕,直接到fallback
//                		.withCircuitBreakerForceClosed(true)	// true時強制將斷路器設定成關閉狀態,將忽略所有錯誤
//                		.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)	// 號誌隔離
//                		.withExecutionTimeoutInMilliseconds(5000)
                        )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        System.out.println("running num :" + name);
        int num = Integer.valueOf(name);
        if (num % 2 == 0 && num < 30) {    // 符合條件,直接返回
            return name;
        } else {    // 模擬異常
            int j = 0;
            j = num / j;
        }
        return name;
    }

    @Override
    protected String getFallback() {
        return "CircuitBreaker fallback: " + name;
    }
}

執行結果如下,偶數正常返回,奇數進入熔斷資訊,並且超過30之後全部進入fallBack

4.4 執行緒池/號誌隔離策略

4.4.1 執行緒池隔離策略

執行緒池隔離:不同服務通過使用不同執行緒池,彼此間將不受影響,達到隔離效果。
我們通過andThreadPoolKey設定使用命名為ThreadPoolTest的執行緒池,實現與其他命名的執行緒池天然隔離,如果不設定andThreadPoolKey,也可以則使用withGroupKey設定來命名執行緒池。

4.4.1.1 執行緒池未隔離情況

/**
 * @author brand
 * @Description: 執行緒池隔離
 * @Copyright: Copyright (c) 2022
 * @Company: Helenlyn, Inc. All Rights Reserved.
 * @date 2022/1/8 下午5:58
 * @Update Time:
 * @Updater:
 * @Update Comments:
 */
public class HystrixThreadPool extends HystrixCommand<String> {
    private final String name;
    public HystrixThreadPool(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))  // CommandGroup分組
                .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolTest"))  // 執行緒池key
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(5000)
                )
                .andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter()
                                .withCoreSize(3)	// 設定執行緒池裡的執行緒數為3。超過3次進行熔斷
                )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        /*---------------如果執行緒數超配,會觸發fallback的case,否則休眠1s,進行正常返回-------------------*/
        TimeUnit.MILLISECONDS.sleep(1000); 
        return name;
    }

    @Override
    protected String getFallback() {
        return "fallback: " + name;
    }
}

測試一下,下面都是使用 HystrixThreadPoolKey 為 ThreadPoolTest的執行緒池命名,所以是公用,會返回fallBack的結果。

  for(int i = 0; i < 3; i++) {
            try {
                Future<String> future = new HystrixThreadPool("thread pool"+i).queue();  // 以非同步非堵塞方式執行run(),所以消耗了3個執行緒
            } catch(Exception e) {
                System.out.println("run()丟擲HystrixBadRequestException時,被捕獲到這裡" + e.getCause());
            }
        }
        for(int i = 0; i < 10; i++) {
            try {
                System.out.println("===========" + new HystrixThreadPool("thread pool").execute());  //上面消耗了所有執行緒,這邊會執行到fallBack中
            } catch(Exception e) {
                System.out.println("run()丟擲HystrixBadRequestException時,被捕獲到這裡" + e.getCause());
            }
        }

4.4.1.2 執行緒池隔離情況

我們做一下調整,讓執行緒池的key(HystrixThreadPoolKey)不一致,再測試是否返回正常的執行結果。

/**
 * @author brand
 * @Description: 執行緒池隔離
 * @Copyright: Copyright (c) 2022
 * @Company: Helenlyn, Inc. All Rights Reserved.
 * @date 2022/1/8 下午5:58
 * @Update Time:
 * @Updater:
 * @Update Comments:
 */
public class HystrixThreadPool extends HystrixCommand<String> {
    private final String name;
    public HystrixThreadPool(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))  // CommandGroup分組
                .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name))  // 執行緒池key,根據請求的入參來算
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(5000)
                )
                .andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter()
                                .withCoreSize(3)	// 設定執行緒池裡的執行緒數為3。超過3次進行熔斷
                )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        /*---------------如果執行緒數超配,會觸發fallback的case,否則休眠1s,進行正常返回-------------------*/
        TimeUnit.MILLISECONDS.sleep(1000); 
        return name;
    }

    @Override
    protected String getFallback() {
        return "fallback: " + name;
    }
}

測試一下,下面都是使用 HystrixThreadPoolKey 為 ThreadPoolTest的執行緒池命名,所以是公用,會返回fallBack的結果。

  for(int i = 0; i < 3; i++) {
            try {
                Future<String> future = new HystrixThreadPool("thread pool"+i).queue();  // 會有三個執行緒池組 thread pool1、thread poo2、thread pool3,不互相影響,更不會影響下面excute()的執行
            } catch(Exception e) {
                System.out.println("run()丟擲HystrixBadRequestException時,被捕獲到這裡" + e.getCause());
            }
        }
        for(int i = 0; i < 10; i++) {
            try {
                System.out.println("===========" + new HystrixThreadPool("thread pool").execute());  //與上面隔離,所以這邊執行始終不會走到fallBack中
            } catch(Exception e) {
                System.out.println("run()丟擲HystrixBadRequestException時,被捕獲到這裡" + e.getCause());
            }
        }

4.5 程式碼參考

https://github.com/WengZhiHua/Helenlyn.Grocery/tree/master/parent/HystrixDemo