31. 乾貨系列從零用Rust編寫正反向代理,HTTP限流的實現(limit_req)

2023-11-30 12:06:27

wmproxy

wmproxy已用Rust實現http/https代理, socks5代理, 反向代理, 靜態檔案伺服器,四層TCP/UDP轉發,七層負載均衡,內網穿透,後續將實現websocket代理等,會將實現過程分享出來,感興趣的可以一起造個輪子

專案地址

國內: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

HTTP限流

HTTP限流是在HTTP請求處理過程中,對請求進行限制的一種技術手段。其目的是防止系統過載,保護系統的穩定性和可用性。HTTP限流可以基於不同的策略和方法,例如基於時間視窗、令牌桶、漏桶等。

常見的HTTP限流方法包括:

  • 基於時間視窗:這種方法將一段時間劃分為若干個時間視窗,每個時間視窗內只允許一定數量的請求通過。例如,每秒只允許20個請求通過。
  • 令牌桶:令牌桶演演算法允許突發流量,只要有令牌就可以處理請求,當沒有令牌時,請求就被拒絕。這種方法適用於處理突發流量的情況。和時間視窗結合時,如果當前時間段已經有20個請求,此時觸發令牌桶brust,將當前的流量進行延時處理。
  • 漏桶:漏桶演演算法不允許突發流量,無論何時都只能按照一定的速率處理請求。這種方法適用於處理穩定流量的情況。

在進行HTTP限流時,需要考慮系統的實際情況和需求,選擇合適的限流策略和方法。同時,還需要對系統的效能和負載進行充分的測試和評估,以確保系統的穩定性和可用性。

方案選擇

在此專案中,選擇的是基於時間視窗及令牌桶做組合使用進行限制,以下做個例子,設定

limit="rate=10r/s brust=10"

效果將是每秒鐘限制10條請求,可以允許突發的10個令牌桶做一秒的延時,在下一秒允許通行。

sequenceDiagram participant C participant S C->>S: 第一秒請求資料10條 Note right of S: 當前記錄請求10條 S->>C: 返回成功 C->>S: 第一秒繼續請求資料1條 Note right of S: 當前記錄請求10條+1條令牌桶 S->>C: 延時一秒再進行後續處理返回成功 C->>S: 第一秒繼續請求資料9條 Note right of S: 當前記錄請求10條+10條令牌桶 S->>C: 延時一秒再進行後續處理返回成功 C->>S: 第一秒繼續請求資料(1條-N條) Note right of S: 當前記錄當前秒已滿,返回拒絕 S->>C: 直接返回錯誤,頻率過快,已超時 C-->>S: 第二秒請求資料1條 Note right of S: 清除第一秒的記錄10條<br>10條令牌桶轉化第二秒10條記錄<br>故當前為請求數10條+1條令牌桶 S->>C: 延時一秒過時行後續處理返回成功 C-->>S: 第三秒請求資料1條 Note right of S: 清除第二秒的記錄10條<br>1條令牌桶轉化第三秒1條記錄<br>故當前為請求數1條 S->>C: 直接返回成功

以上是時序加令牌的請求資料和返回情況

限流設定

類似於nginx中的limit_req設定,分為limit_req_zonelimit_req兩部分,可分為兩個類,一個為zone,一個為關聯到zone名稱的具體專案

#[derive(Debug, Clone)]
pub struct LimitReqZone {
    /// 鍵值的匹配方式
    pub key: String,
    /// IP個數
    pub limit: u64,
    /// 週期內可以通行的資料
    pub nums: u64,
    /// 每個週期的時間
    pub per: Duration,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LimitReq {
    zone: String,
    burst: u64,
}

然後在http的根目錄下設定當前的zone空間,為一個HashMap結構,可以設定多種zone結構

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpConfig {
    // ...
    #[serde_as(as = "HashMap<_, DisplayFromStr>")]
    #[serde(default = "HashMap::new")]
    pub limit_req_zone: HashMap<String, LimitReqZone>,
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CommonConfig {
    // ...
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub limit_req: Option<LimitReq>,
}

因為並不是任何的請求都要進行限流,所以此處為Option,如果子級未設定,父級有設定,子級將會應用父級的設定。

以下展示在toml格式的設定

# 反向代理相關,七層協定為http及https
[http]

[http.limit_req_zone]
limit = "{client_ip} limit=10m rate=10r/s"
less = "{client_ip} limit=10m rate=10r/min"


# 反向代理中的具體服務,可設定多個多組
[[http.server]]
bind_addr = "0.0.0.0:82"
server_name = "soft.wm-proxy.com"
limit_req = "zone=limit brust=10"

# 按請求路徑進行rule匹配,可匹配method,看具體的處理的內容如檔案服務或者負載均衡
[[http.server.location]]
limit_req = "zone=less brust=1"
rule = "/root"
file_server = { browse = true }

[[http.server.location]]
rule = "/api"
file_server = { browse = true }

這樣子就可以實現api不同的進行不同的限速方案,可以實現更好的通用效果。

設定解析

  • LimitReqZone解析
    需要將"{client_ip} limit=10m rate=10r/s"轉成LimitReqZone結構,此處我們用的是FromStr介面,用空格分割,第一個欄位為key,後續用=做分割,得取相應的值
impl FromStr for LimitReqZone {
    type Err = ProxyError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let v = s.split(" ").collect::<Vec<&str>>();
        let key = v[0].to_string();
        let mut limit = 0;
        let mut nums = 0;
        let mut per = Duration::new(0, 0);
        for idx in 1..v.len() {
            let key_value = v[idx].split("=").map(|k| k.trim()).collect::<Vec<&str>>();
            if key_value.len() <= 1 {
                return Err(ProxyError::Extension("未知的LimitReq"));
            }
            match key_value[0] {
                "limit" => {
                    let s = ConfigSize::from_str(key_value[1])?;
                    limit = s.0;
                }
                "rate" => {
                    let rate_key = key_value[1]
                        .split("/")
                        .map(|k| k.trim())
                        .collect::<Vec<&str>>();
                    if rate_key.len() == 1 {
                        return Err(ProxyError::Extension("未知的LimitReq"));
                    }

                    let rate = rate_key[0].trim_end_matches("r");
                    nums = rate
                        .parse::<u64>()
                        .map_err(|_e| ProxyError::Extension("parse error"))?;
                    
                    let s = ConfigDuration::from_str(rate_key[1])?;
                    per = s.0;
                }
                _ => {
                    return Err(ProxyError::Extension("未知的LimitReq"));
                }
            }
        }

        Ok(LimitReqZone::new(key, limit, nums, per))
    }
}
  • LimitReq解析

需要將"zone=less brust=1"轉成LimitReq結構,此處我們用的是FromStr介面,用空格分割,將每個值用=做分割,得取相應的值

impl FromStr for LimitReq {
    type Err = ProxyError;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let v = s.split(" ").collect::<Vec<&str>>();
        let mut zone = String::new();
        let mut brust = 0;
        for idx in 0..v.len() {
            let key_value = v[idx].split("=").map(|k| k.trim()).collect::<Vec<&str>>();
            if key_value.len() <= 1 {
                return Err(ProxyError::Extension("未知的LimitReq"));
            }
            match key_value[0] {
                "zone" => {
                    zone = key_value[1].to_string();
                }
                "brust" => {
                    brust = key_value[1]
                        .parse::<u64>()
                        .map_err(|_e| ProxyError::Extension("parse error"))?;
                }
                _ => {
                    return Err(ProxyError::Extension("未知的LimitReq"));
                }
            }
        }

        Ok(LimitReq::new(zone, brust))
    }
}

限制實現

首先我們設定一個靜態可存取的全域性變數,因為所有的執行緒操作都需要彙總到此時判定是否合格

每個名稱空間裡,都將儲存不超過規格資料的IP,如果超過將直接返回失敗

pub struct LimitReqData {
    /// 記錄所有的ip資料的限制情況
    ips: HashMap<String, InnerLimit>,
    /// IP個數
    limit: u64,
    /// 週期內可以通行的資料
    nums: u64,
    /// 每個週期的時間
    per: Duration,
    /// 最後清理IP的時間
    last_remove: Instant,
}

全域性靜態資料

lazy_static! {
    static ref GLOABL_LIMIT_REQ: RwLock<HashMap<&'static str, LimitReqData>> =
        RwLock::new(HashMap::new());
}

返回結果

#[derive(Debug)]
pub enum LimitResult {
    Ok,
    Refuse,
    Delay(Duration),
}

所以的判斷是否通過,我們將通過以下函數返回相應的結果,從而使外部的函數可以進行相應的處理。

impl LimitReqData {
    pub fn recv_new_req(key: &str, ip: &String, burst: u64) -> ProtResult<LimitResult> {
        let mut write = GLOBAL_LIMIT_REQ
            .write()
            .map_err(|_| ProtError::Extension("unlock error"))?;
        if !write.contains_key(&*key) {
            return Ok(LimitResult::Ok);
        }
        write.get_mut(key).unwrap().inner_recv_new_req(ip, burst)
    }
}

小結

我們通過全域性共用資料,需要加鎖獲取該資料,來判定整體的KEY的流量情況,可能是IP,可能是IP+Cookie等,來靈活的針對使用者限流還是針對IP限流或者其它的業務情況進行合理的安排。

點選 [關注][在看][點贊] 是對作者最大的支援