17. 從零開始編寫一個類nginx工具, Rust中一些功能的實現

2023-10-24 18:00:41

wmproxy

wmproxy將用Rust實現http/https代理, socks5代理, 反向代理, 靜態檔案伺服器,後續將實現websocket代理, 內外網穿透等, 會將實現過程分享出來, 感興趣的可以一起造個輪子法

專案地址

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

紀錄檔功能

為了更容易理解程式中發生的情況,我們可能想要新增一些紀錄檔語句。通常在編寫應用程式時這很容易。「在某種程度上,紀錄檔記錄與使用 println! 相同,只是你可以指定訊息的重要性」。
在rust中定義的紀錄檔級別有5種分別為errorwarninfodebugtrace
定義紀錄檔的級別是表示只關係這級別的紀錄檔及更高階別的紀錄檔:

定義log,則包含所有的級別
定義warn,則只會顯示error或者warn的訊息

要嚮應用程式新增紀錄檔記錄,你需要兩樣東西:

  1. log crate,rust官方指定的紀錄檔級別庫
  2. 一個實際將紀錄檔輸出寫到有用位置的介面卡

當下我們選用的是流行的根據環境變數指定的介面卡env_logger,它會根據環境變數中設定的值,紀錄檔等級,或者只開啟指定的庫等功能,或者不同的庫分配不同的等級等。

Linux或者MacOs上開啟功能

env RUST_LOG=debug cargo run 

Windows PowerShell上開啟功能

$env:RUST_LOG="debug"
cargo run

Windows CMD上開啟功能

set RUST_LOG="debug"
cargo run

如果我們指定庫等級可以設定

RUST_LOG="info,wenmeng=warn,webparse=warn"

這樣就可以減少第三方庫打紀錄檔給程式帶來的干擾

需要在Cargo.toml中參照

[dependencies]
log = "0.4.20"
env_logger = "0.10.0"

以下是示意程式碼

use log::{info, warn};
fn main() {
    env_logger::init();
    info!("歡迎使用軟體wmproxy");
    warn!("現在已經成功啟動");
}

println!將會直接輸出到stdout,當紀錄檔資料多的時候,無法進行關閉,做為第三方庫,就不能干擾參照庫的正常看紀錄檔,所以這隻能偵錯的時候使用,或者少量的關鍵地方使用。

多個TcpListener的Accept

因為當前支援多個埠繫結,或者設定沒有設定,存在None的情況,我們需要同時在一個執行緒中await所有的TcpListener。
在這裡我們先用的是tokio::select!對多個TcpListener同時進行await。
如果此時我們沒有繫結proxy的繫結地址,此時listener為None,但我們需要進行判斷才知道他是否為None,如果我們用以下寫法:

use tokio::net::TcpListener;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut listener: Option<TcpListener> = None;
    tokio::select! {
        // 加了if條件判斷是否有值
        Ok((conn, addr)) = listener.as_mut().unwrap().accept(), if listener.is_some() => {
            println!("accept addr = {:?}", addr);
        }
    }
    Ok(())
}

此時我們試執行,依然報以下錯誤:

thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', examples/udp.rs:9:46

也就是即使加了if條件我們也正確的執行我們的操作,因為tokio::select的每個分支必須返回Fut,此時如果為None,就不能返回Fut違反了該函數的定義,那麼我們做以下封裝:

async fn tcp_listen_work(listen: &Option<TcpListener>) -> Option<(TcpStream, SocketAddr)> {
    if listen.is_some() {
        match listen.as_ref().unwrap().accept().await {
            Ok((tcp, addr)) => Some((tcp, addr)),
            Err(_e) => None,
        }
    } else {
        // 如果為None的時候,就永遠返回Poll::Pending
        let pend = std::future::pending();
        let () = pend.await;
        None
    }
}

如果為None的話,將其返回Poll::Pending,則該分支await的時候永遠不會等到結果。
那麼最終的的程式碼示意如下:

#[tokio::main]
async fn main() -> io::Result<()> {

    let listener: Option<TcpListener> = TcpListener::bind("127.0.0.1:8090").await.ok();
    tokio::select! {
        Some((conn, addr)) = tcp_listen_work(&listener) => {
            println!("accept addr = {:?}", addr);
        }
    }
    Ok(())
}

另一種在反向代理的時候因為server的數量是不定的,所以監聽的TcpListener也是不定的,此時我們用Vec<TcpListener>來做表示,那麼此時,我們如何通過tokio::select來一次性await所有的accept呢?
此時我們藉助futures庫中的select_all來監聽,但是select_all又不允許空的Vec,因為他要返回一個Fut,空的無法返回一個Fut,所以此時我們也要對其進行封裝:

async fn multi_tcp_listen_work(listens: &mut Vec<TcpListener>) -> (io::Result<(TcpStream, SocketAddr)>, usize) {
    if !listens.is_empty() {
        let (conn, index, _) = select_all(listens.iter_mut()
                .map(|listener| listener.accept().boxed())).await;
        (conn, index)
    } else {
        let pend = std::future::pending();
        let () = pend.await;
        unreachable!()
    }
}

此時監聽從8091-8099,我們的最終程式碼:

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener: Option<TcpListener> = TcpListener::bind("127.0.0.1:8090").await.ok();
    let mut listeners = vec![];
    for i in 8091..8099 {
        listeners.push(TcpListener::bind(format!("127.0.0.1:{}", i)).await?);
    }
    tokio::select! {
        Some((conn, addr)) = tcp_listen_work(&listener) => {
            println!("accept addr = {:?}", addr);
        }
        (result, index) = multi_tcp_listen_work(&mut listeners) => {
            println!("index receiver = {:?}", index)
        }
    }
    Ok(())
}

如果此時我們用

telnet 127.0.0.1 8098

那麼我們就可以看到輸出:

index receiver = 7

表示程式碼已正確的執行。

Rust中資料在多個執行緒中的共用

Rust中每個物件的所有權都僅只能有一個物件擁有,那麼我們資料在在多個地方共用的時候可以怎麼辦呢?
在單執行緒中,我們可以用use std::rc::Rc;

Rc的特點

  1. 單執行緒的參照計數
  2. 不可變參照
  3. 非執行緒安全,即僅能在單執行緒中使用
    Rc參照計數中還有一個弱參照稱為Weak,弱參照表示持有物件的一個指標,但是不新增參照計數,也不會影響資料刪除,不保證一定能取得到資料。
    因為其不能修改資料,所以也常用RefCell做配合,來做參照計數的修改。
    以下是一個父類別子類用弱參照計數實現的方案:
use std::rc::Rc;
use std::rc::Weak;
use std::cell::RefCell;

/// 父類別擁有者
struct Owner {
    name: String,
    gadgets: RefCell<Vec<Weak<Gadget>>>,
}

/// 子類物件
struct Gadget {
    id: i32,
    owner: Rc<Owner>,
}

fn main() {
    let gadget_owner: Rc<Owner> = Rc::new(
        Owner {
            name: "wmproxy".to_string(),
            gadgets: RefCell::new(vec![]),
        }
    );
    
    // 生成兩個小工具
    let gadget1 = Rc::new(
        Gadget {
            id: 1,
            owner: Rc::clone(&gadget_owner),
        }
    );
    let gadget2 = Rc::new(
        Gadget {
            id: 2,
            owner: Rc::clone(&gadget_owner),
        }
    );

    {
        let mut gadgets = gadget_owner.gadgets.borrow_mut();
        gadgets.push(Rc::downgrade(&gadget1));
        gadgets.push(Rc::downgrade(&gadget2));
    }

    for gadget_weak in gadget_owner.gadgets.borrow().iter() {
        let gadget = gadget_weak.upgrade().unwrap();
        println!("小工具 {} 的擁有者:{}", gadget.id, gadget.owner.name);
    }
}

因為其並未實現Send函數,所以無法在多執行緒種傳遞。在多執行緒中,我們需要用Arc,但是在Arc獲取可變物件的時候有限制,必須他是唯一參照的時候才能修改。

use std::sync::Arc;
fn main() {
    let mut x = Arc::new(3);
    *Arc::get_mut(&mut x).unwrap() = 4;
    assert_eq!(*x, 4);
    
    let _y = Arc::clone(&x);
    assert!(Arc::get_mut(&mut x).is_none());
}

所以我們在多執行緒中的參照需要修改的時候,通常會用Atomic或者Mutex來做資料的寫入的唯一性。

#![allow(unused)]
fn main() {
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::sync::mpsc::channel;
    
    const N: usize = 10;
    
    let data = Arc::new(Mutex::new(0));
    
    let (tx, rx) = channel();
    for _ in 0..N {
        let (data, tx) = (Arc::clone(&data), tx.clone());
        thread::spawn(move || {
            // 共用資料data,保證線上程中只會同時有一個物件擁有修改許可權,也相當於擁有所有權,10個執行緒,每個執行緒+1,最終結果必須等於10
            let mut data = data.lock().unwrap();
            *data += 1;
            if *data == N {
                tx.send(()).unwrap();
            }
        });
    }
    rx.recv().unwrap();
    assert!(*data.lock().unwrap() == 10);
}

結語

以上是三種編寫Rust中常碰見的情況,也是在此專案中應用解決過的方案,在瞭解原理的情況下,解決問題可以有不同的思路。理解了原理,你就知道他設計的初衷,更好的幫助你學習相關的Rust知識。