wmproxy
將用Rust
實現http/https
代理, socks5
代理, 反向代理, 靜態檔案伺服器,後續將實現websocket
代理, 內外網穿透等, 會將實現過程分享出來, 感興趣的可以一起造個輪子法
gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
主動可以讓我們掌握好系統的穩定性,假設我們有一條連線不可達,連線超時的判定是5秒,需要檢測失敗3次才認定為失敗,那麼此時從我們開始檢測,到判定失敗需要耗時15秒。
如果此時我們是個高並行的系統,每秒的QPS是1000,我們有三個地址判定,那麼此時我們有1/3的失敗概率。那麼在15秒內,我們會收到15000個請求,會造成5000個請求失敗,如果是重要的資料,我們會丟失很多重要資料。
如果此時使用者端擁有重試機制,那麼使用者端在失敗的時候會發起重試,而且系統可能會反覆的分配到那臺不可達的系統,將會造成短時間內請求數激增,可能引發系統的雪崩。
所以此時我們主動知道目標端的系統穩定性極其重要。
以下是沒有主動健康檢查
如果出錯的時候,一個請求的平均時長可能會達到(1.4s + 5s) / 2 = (3.2s)
,比正常存取多了(3.2 - 1.4) = 1.8s
,節點的宕機會對系統的穩定性產生較大的影響
以下是主動健康檢查,它保證了存取後端伺服器組均是正常的狀態
伺服器2
出錯的時候,主動檢查已經檢查出伺服器2
不可用,負載均衡的時候選擇已經把伺服器2
摘除,所以系統的平均耗時1.4s,系統依然保持穩定
在目前的系統中有以下兩分類:
我們需要從設定中讀出所有的需要健康檢查的型別,即需要去重,把同一個指向的地址過濾掉
設定有可能被重新載入,所以我們需要預留傳送設定的方式(或者後續類似nginx用新開程序的方式則不需要),此處做一個預留。
如何去重
像這種簡單級別的去重通常用HashSet
複雜度為O(1)
或者用簡單的Vec
複雜度為O(n)
,以SocketAddr
的為鍵值,判斷是否有重複的資料。
如何保證不影響主執行緒
把健康請求的方法移到非同步函數,用tokio::spawn
中處理,在健康檢查的情況下保證不影響其它資料處理
如果同時處理多個地址的健康檢查
每一次健康檢查都會在一個非同步函數中執行,在我們呼叫完請求後,我們會對當前該非同步進行tokio::time::sleep
以讓出當前CPU。
如何按指定間隔時間請求
因為每一次健康請求都是在非同步函數中,我們不確認之前的非同步是否完成,所以我們在每次請求前都記錄last_request
,我們在請求前呼叫HealthCheck::check_can_request
判斷當前是否可以傳送請求來保證間隔時間內不多次請求造成伺服器的壓力。
超時連線判定處理
利用tokio::time::timeout
和future
做組合,等超時的時候直接按錯誤處理
主要原始碼定義在check/active.rs
中,主要的定義兩個類
/// 單項健康檢查
#[derive(Debug, Clone)]
pub struct OneHealth {
/// 主動檢查地址
pub addr: SocketAddr,
/// 主動檢查方法, 有http/https/tcp等
pub method: String,
/// 每次檢查間隔
pub interval: Duration,
/// 最後一次記錄時間
pub last_record: Instant,
}
/// 主動式健康檢查
pub struct ActiveHealth {
/// 所有的健康列表
pub healths: Vec<OneHealth>,
/// 接收健康列表,當設定變更時重新載入
pub receiver: Receiver<Vec<OneHealth>>,
}
我們在設定的時候獲取所有需要主動檢查的資料
/// 獲取所有待健康檢查的列表
pub fn get_health_check(&self) -> Vec<OneHealth> {
let mut result = vec![];
let mut already: HashSet<SocketAddr> = HashSet::new();
if let Some(proxy) = &self.proxy {
// ...
}
if let Some(http) = &self.http {
// ...
}
result
}
主要的檢查原始碼,所有的最終資訊都落在HealthCheck
中的靜態變數裡:
pub async fn do_check(&self) -> ProxyResult<()> {
// 防止短時間內健康檢查的連線過多, 做一定的超時處理, 或者等上一條訊息處理完畢
if !HealthCheck::check_can_request(&self.addr, self.interval) {
return Ok(())
}
if self.method.eq_ignore_ascii_case("http") {
match tokio::time::timeout(self.interval + Duration::from_secs(1), self.connect_http()).await {
Ok(r) => match r {
Ok(r) => {
if r.status().is_server_error() {
log::trace!("主動健康檢查:HTTP:{}, 返回失敗:{}", self.addr, r.status());
HealthCheck::add_fall_down(self.addr);
} else {
HealthCheck::add_rise_up(self.addr);
}
}
Err(e) => {
log::trace!("主動健康檢查:HTTP:{}, 發生錯誤:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
}
},
Err(e) => {
log::trace!("主動健康檢查:HTTP:{}, 發生超時:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
},
}
} else {
match tokio::time::timeout(Duration::from_secs(3), self.connect_http()).await {
Ok(r) => {
match r {
Ok(_) => {
HealthCheck::add_rise_up(self.addr);
}
Err(e) => {
log::trace!("主動健康檢查:TCP:{}, 發生錯誤:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
}
}
}
Err(e) => {
log::trace!("主動健康檢查:TCP:{}, 發生超時:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
}
}
}
Ok(())
}
主動檢查可以及時的更早的發現系統中不穩定的因素,是系統穩定性的基石,也可以通過更早的發現因素來通知運維介入,我們的目的是使系統更穩定,更健壯,處理延時更少。
點選 [關注],[在看],[點贊] 是對作者最大的支援