18. 從零開始編寫一個類nginx工具, 主動式健康檢查原始碼實現

2023-10-26 12:02:35

wmproxy

wmproxy將用Rust實現http/https代理, socks5代理, 反向代理, 靜態檔案伺服器,後續將實現websocket代理, 內外網穿透等, 會將實現過程分享出來, 感興趣的可以一起造個輪子法

專案地址

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

為什麼我們需要主動

  主動可以讓我們掌握好系統的穩定性,假設我們有一條連線不可達,連線超時的判定是5秒,需要檢測失敗3次才認定為失敗,那麼此時從我們開始檢測,到判定失敗需要耗時15秒。
  如果此時我們是個高並行的系統,每秒的QPS是1000,我們有三個地址判定,那麼此時我們有1/3的失敗概率。那麼在15秒內,我們會收到15000個請求,會造成5000個請求失敗,如果是重要的資料,我們會丟失很多重要資料。
  如果此時使用者端擁有重試機制,那麼使用者端在失敗的時候會發起重試,而且系統可能會反覆的分配到那臺不可達的系統,將會造成短時間內請求數激增,可能引發系統的雪崩。
  所以此時我們主動知道目標端的系統穩定性極其重要。

網路存取示意圖

以下是沒有主動健康檢查

sequenceDiagram participant 使用者端 participant 代理伺服器 使用者端->>代理伺服器: 請求資料(0.5s) 代理伺服器->>後端1: 連線並請求資料(5s)失敗 Note right of 後端1: 機器宕機不可達 代理伺服器-->>使用者端: 返回失敗0.5s(總耗時6s) 使用者端->>代理伺服器: 重新請求資料(0.5s) 代理伺服器->>後端2: 請求資料成功(0.2s) 後端2-->>代理伺服器: 返回資料成功(0.2s) 代理伺服器-->> 使用者端: 返回資料成功0.5s(總耗時1.4s)

如果出錯的時候,一個請求的平均時長可能會達到(1.4s + 5s) / 2 = (3.2s),比正常存取多了(3.2 - 1.4) = 1.8s,節點的宕機會對系統的穩定性產生較大的影響

以下是主動健康檢查,它保證了存取後端伺服器組均是正常的狀態

sequenceDiagram 使用者端->>代理伺服器: 請求資料(0.5s) loop 健康檢查 代理伺服器->>伺服器組(只存取1): 定時請求,保證存活,1檢查成功,2檢查失敗 end Note right of 伺服器組(只存取1): 處理使用者端資料 代理伺服器 -->> 伺服器組(只存取1): 請求資料(0.2s) 伺服器組(只存取1) -->> 代理伺服器: 返回資料成功(0.2s) 代理伺服器-->>使用者端: 返回資料成功(0.5s)(總耗時1.4s)

伺服器2出錯的時候,主動檢查已經檢查出伺服器2不可用,負載均衡的時候選擇已經把伺服器2摘除,所以系統的平均耗時1.4s,系統依然保持穩定

健康檢查的種類

在目前的系統中有以下兩分類:

  • HTTP 請求特定的方法及路徑,判斷返回是否得到預期的status或者body
  • TCP 僅只能測試連通性,如果能連線表示正常,會出現能連線但無服務的情況

健康檢查的準備

我們需要從設定中讀出所有的需要健康檢查的型別,即需要去重,把同一個指向的地址過濾掉
設定有可能被重新載入,所以我們需要預留傳送設定的方式(或者後續類似nginx用新開程序的方式則不需要),此處做一個預留。

  • 如何去重
    像這種簡單級別的去重通常用HashSet複雜度為O(1)或者用簡單的Vec複雜度為O(n),以SocketAddr的為鍵值,判斷是否有重複的資料。

  • 如何保證不影響主執行緒
    把健康請求的方法移到非同步函數,用tokio::spawn中處理,在健康檢查的情況下保證不影響其它資料處理

  • 如果同時處理多個地址的健康檢查
    每一次健康檢查都會在一個非同步函數中執行,在我們呼叫完請求後,我們會對當前該非同步進行tokio::time::sleep以讓出當前CPU。

  • 如何按指定間隔時間請求
    因為每一次健康請求都是在非同步函數中,我們不確認之前的非同步是否完成,所以我們在每次請求前都記錄last_request,我們在請求前呼叫HealthCheck::check_can_request判斷當前是否可以傳送請求來保證間隔時間內不多次請求造成伺服器的壓力。

  • 超時連線判定處理
    利用tokio::time::timeoutfuture做組合,等超時的時候直接按錯誤處理

部分實現原始碼

主要原始碼定義在check/active.rs中,主要的定義兩個類

/// 單項健康檢查
#[derive(Debug, Clone)]
pub struct OneHealth {
    /// 主動檢查地址
    pub addr: SocketAddr,
    /// 主動檢查方法, 有http/https/tcp等
    pub method: String,
    /// 每次檢查間隔
    pub interval: Duration,
    /// 最後一次記錄時間
    pub last_record: Instant,
}
/// 主動式健康檢查
pub struct ActiveHealth {
    /// 所有的健康列表
    pub healths: Vec<OneHealth>,
    /// 接收健康列表,當設定變更時重新載入
    pub receiver: Receiver<Vec<OneHealth>>,
}

我們在設定的時候獲取所有需要主動檢查的資料

/// 獲取所有待健康檢查的列表
pub fn get_health_check(&self) -> Vec<OneHealth> {
    let mut result = vec![];
    let mut already: HashSet<SocketAddr> = HashSet::new();
    if let Some(proxy) = &self.proxy {
        // ...
    }

    if let Some(http) = &self.http {
        // ...
    }
    result
}

主要的檢查原始碼,所有的最終資訊都落在HealthCheck中的靜態變數裡:

pub async fn do_check(&self) -> ProxyResult<()> {
    // 防止短時間內健康檢查的連線過多, 做一定的超時處理, 或者等上一條訊息處理完畢
    if !HealthCheck::check_can_request(&self.addr, self.interval) {
        return Ok(())
    }
    if self.method.eq_ignore_ascii_case("http") {
        match tokio::time::timeout(self.interval + Duration::from_secs(1), self.connect_http()).await {
            Ok(r) => match r {
                Ok(r) => {
                    if r.status().is_server_error() {
                        log::trace!("主動健康檢查:HTTP:{}, 返回失敗:{}", self.addr, r.status());
                        HealthCheck::add_fall_down(self.addr);
                    } else {
                        HealthCheck::add_rise_up(self.addr);
                    }
                }
                Err(e) => {
                    log::trace!("主動健康檢查:HTTP:{}, 發生錯誤:{:?}", self.addr, e);
                    HealthCheck::add_fall_down(self.addr);
                }
            },
            Err(e) => {
                log::trace!("主動健康檢查:HTTP:{}, 發生超時:{:?}", self.addr, e);
                HealthCheck::add_fall_down(self.addr);
            },
        }
    } else {
        match tokio::time::timeout(Duration::from_secs(3), self.connect_http()).await {
            Ok(r) => {
                match r {
                    Ok(_) => {
                        HealthCheck::add_rise_up(self.addr);
                    }
                    Err(e) => {
                        log::trace!("主動健康檢查:TCP:{}, 發生錯誤:{:?}", self.addr, e);
                        HealthCheck::add_fall_down(self.addr);
                    }
                }
            }
            Err(e) => {
                log::trace!("主動健康檢查:TCP:{}, 發生超時:{:?}", self.addr, e);
                HealthCheck::add_fall_down(self.addr);
            }
        }
    }
    Ok(())
}

結語

主動檢查可以及時的更早的發現系統中不穩定的因素,是系統穩定性的基石,也可以通過更早的發現因素來通知運維介入,我們的目的是使系統更穩定,更健壯,處理延時更少。

點選 [關注][在看][點贊] 是對作者最大的支援