20. 從零用Rust編寫正反向代理,四層反向代理stream(tcp與udp)實現

2023-10-30 12:03:09

wmproxy

wmproxy是由Rust編寫,已實現http/https代理,socks5代理, 反向代理,靜態檔案伺服器,內網穿透,設定熱更新等, 後續將實現websocket代理等,同時會將實現過程分享出來, 感興趣的可以一起造個輪子法

專案地址

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

四層代理

四層代理,也稱為網路層代理,是基於IP地址和埠號的代理方式。它只關心封包的源IP地址、目的IP地址、源埠號和目的埠號,不關心封包的具體內容。四層代理主要通過報文中的目標地址和埠,再加上負載均衡裝置設定的伺服器選擇方式,決定最終選擇的內部伺服器。

因為四層代理不用處理任何相關的包資訊,只需將包資料傳遞給正確的伺服器即可,所以實現相對比較簡單。

以下是OSI七層模型的示意圖,來源於網上

實現方式

雙端建立連線,也就是收到使用者端的連線的時候,同時建立一條通往伺服器端的連線,然後做雙向繫結即可完成服務。

四層代理還有udp的轉發需求,需要同步將udp的資料進行轉發,udp的處理方式處理會相對複雜一些,因為當前地址只有繫結一份,但是可能來自各種不同的地址,不同的使用者端的(remote_ip, remote_port)我們需要當成一個全新的使用者端。

而且有時候無法主動感知是否已經被斷開了,所以也必須有超時機制,好在超時的時候能及時釋放掉連線,好讓系統及時的socket資源。

TCP實現

tcp找到相應的地址,連線,並雙向繫結即可

pub async fn process<T>(
    data: Arc<Mutex<StreamConfig>>,
    local_addr: SocketAddr,
    mut inbound: T,
    _addr: SocketAddr,
) -> ProxyResult<()>
where
    T: AsyncRead + AsyncWrite + Unpin + std::marker::Send + 'static,
{
    let value = data.lock().await;
    for (_, s) in value.server.iter().enumerate() {
        if s.bind_addr.port() == local_addr.port() {
            let addr = ReverseHelper::get_upstream_addr(&s.upstream, "")?;
            let mut connect = HealthCheck::connect(&addr).await?;
            copy_bidirectional(&mut inbound, &mut connect).await?;
            break;
        }
    }
    Ok(())
}

UDP實現

UDP相對比較複雜,下面我們先列舉內部的流程圖

flowchart TD A[繫結反向udp埠] B[使用者端] H{是否第一次} I[建立非同步協程] D[非同步協程中] B <-->|根據地址連線傳送資料到| A A --> H H -->|是|I I -->|將Receiver傳到以接收資料| D H -->|否,將資料Sender給|D D -->|非同步讀取資料並行送|A

在stream繫結的時候,要區分出TCP還是UDP的,做分別的繫結

/// stream的繫結,按bind_mode區分出udp或者是tcp,返回相應的列表
pub async fn bind(&mut self) -> ProxyResult<(Vec<TcpListener>, Vec<StreamUdp>)> {
    let mut listeners = vec![];
    let mut udp_listeners = vec![];
    let mut bind_port = HashSet::new();
    for value in &self.server.clone() {
        if bind_port.contains(&value.bind_addr.port()) {
            continue;
        }
        bind_port.insert(value.bind_addr.port());
        if value.bind_mode == "udp" {
            let listener = Helper::bind_upd(value.bind_addr).await?;
            udp_listeners.push(StreamUdp::new(listener, value.clone()));
        } else {
            let listener = Helper::bind(value.bind_addr).await?;
            listeners.push(listener);
        }
    }
    Ok((listeners, udp_listeners))
}

我們會對連線做分別的監聽,下面是udp的獲取是否有新資料:

async fn multi_udp_listen_work(
    listens: &mut Vec<StreamUdp>,
) -> (io::Result<(Vec<u8>, SocketAddr)>, usize) {
    if !listens.is_empty() {
        let (data, index, _) =
            select_all(listens.iter_mut().map(|listener| {
                listener.next().boxed()
            })).await;
        if data.is_none() {
            return (Err(io::Error::new(io::ErrorKind::InvalidInput, "read none data")), index)
        }
        (data.unwrap(), index)
    } else {
        let pend = std::future::pending();
        let () = pend.await;
        unreachable!()
    }
}

此處我們用next,也就是我們實現了 futures_core::Stream介面,用Poll的方式來註冊實現有事件的時候來通知。

在tokio中,在read或者write的時候返回Poll::Pending,將會將socket的可讀可寫註冊到底層,如果一旦系統可讀可寫就會通知該介面,將會重新執行一遍futures_core::Stream

我們將同時可以處理可讀可寫可傳送事件,如果介面超時我們將關閉相應的介面。

impl Stream for StreamUdp {
    type Item = io::Result<(Vec<u8>, SocketAddr)>;
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let _ = self.poll_write(cx)?;
        let _ = self.poll_sender(cx)?;
        self.poll_read(cx)
    }
}

下面是主要的StreamUdp

/// Udp轉發的處理結構,快取一些數值以做中轉
pub struct StreamUdp {
    /// 讀的緩衝類,避免每次都釋放
    pub buf: BinaryMut,
    /// 核心的udp繫結埠
    pub socket: UdpSocket,
    pub server: ServerConfig,

    /// 如果接收該資料大小為0,那麼則代表通知資料關閉
    pub receiver: Receiver<(Vec<u8>, SocketAddr)>,
    /// 將傳送器傳達給每個子協程
    pub sender: Sender<(Vec<u8>, SocketAddr)>,

    /// 接收的快取資料,無法保證全部直接進行傳送完畢
    pub cache_data: LinkedList<(Vec<u8>, SocketAddr)>,
    /// 傳送的快取資料,無法保證全部直接進行傳送完畢
    pub send_cache_data: LinkedList<(Vec<u8>, SocketAddr)>,
    /// 每個地址繫結的物件,包含Sender,最後操作時間,超時時間
    remote_sockets: HashMap<SocketAddr, InnerUdp>,
}

結果測試

我們自己開一個udp伺服器端,繫結了原生的8089,我們將接收到的資料前面加上from server:並進行返回,代理端我們繫結了84的埠,並將udp資料轉發給8089端:

use tokio::net::UdpSocket;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let sock = UdpSocket::bind("0.0.0.0:8089").await?;
    let mut buf = [0; 1024];
    loop {
        let (len, addr) = sock.recv_from(&mut buf).await?;
        let mut vec = "from server: ".as_bytes().to_vec();
        vec.extend(&buf[..len]);
        let _ = sock.send_to(&vec, addr).await?;
    }
}

使用者端我們用nc執行:

可以看出兩個使用者端互相獨立,彼此返回的資料均符合預期,正常的接收及返回。

TCP我們繫結了83埠並轉發到HTTP的本地埠8080,我們用curl進行測試,符合預期,如圖:

結語

至此四層的反向代理TCP/UDP均已完成,也符合預期。

點選 [關注][在看][點贊] 是對作者最大的支援