wmproxy
是由Rust
編寫,已實現http/https
代理,socks5
代理, 反向代理,靜態檔案伺服器,內網穿透,設定熱更新等, 後續將實現websocket
代理等,同時會將實現過程分享出來, 感興趣的可以一起造個輪子法
gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
四層代理,也稱為網路層代理,是基於IP地址和埠號的代理方式。它只關心封包的源IP地址、目的IP地址、源埠號和目的埠號,不關心封包的具體內容。四層代理主要通過報文中的目標地址和埠,再加上負載均衡裝置設定的伺服器選擇方式,決定最終選擇的內部伺服器。
因為四層代理不用處理任何相關的包資訊,只需將包資料傳遞給正確的伺服器即可,所以實現相對比較簡單。
以下是OSI七層模型的示意圖,來源於網上
雙端建立連線,也就是收到使用者端的連線的時候,同時建立一條通往伺服器端的連線,然後做雙向繫結即可完成服務。
四層代理還有udp的轉發需求,需要同步將udp的資料進行轉發,udp的處理方式處理會相對複雜一些,因為當前地址只有繫結一份,但是可能來自各種不同的地址,不同的使用者端的(remote_ip
, remote_port
)我們需要當成一個全新的使用者端。
而且有時候無法主動感知是否已經被斷開了,所以也必須有超時機制,好在超時的時候能及時釋放掉連線,好讓系統及時的socket資源。
tcp找到相應的地址,連線,並雙向繫結即可
pub async fn process<T>(
data: Arc<Mutex<StreamConfig>>,
local_addr: SocketAddr,
mut inbound: T,
_addr: SocketAddr,
) -> ProxyResult<()>
where
T: AsyncRead + AsyncWrite + Unpin + std::marker::Send + 'static,
{
let value = data.lock().await;
for (_, s) in value.server.iter().enumerate() {
if s.bind_addr.port() == local_addr.port() {
let addr = ReverseHelper::get_upstream_addr(&s.upstream, "")?;
let mut connect = HealthCheck::connect(&addr).await?;
copy_bidirectional(&mut inbound, &mut connect).await?;
break;
}
}
Ok(())
}
UDP相對比較複雜,下面我們先列舉內部的流程圖
在stream繫結的時候,要區分出TCP還是UDP的,做分別的繫結
/// stream的繫結,按bind_mode區分出udp或者是tcp,返回相應的列表
pub async fn bind(&mut self) -> ProxyResult<(Vec<TcpListener>, Vec<StreamUdp>)> {
let mut listeners = vec![];
let mut udp_listeners = vec![];
let mut bind_port = HashSet::new();
for value in &self.server.clone() {
if bind_port.contains(&value.bind_addr.port()) {
continue;
}
bind_port.insert(value.bind_addr.port());
if value.bind_mode == "udp" {
let listener = Helper::bind_upd(value.bind_addr).await?;
udp_listeners.push(StreamUdp::new(listener, value.clone()));
} else {
let listener = Helper::bind(value.bind_addr).await?;
listeners.push(listener);
}
}
Ok((listeners, udp_listeners))
}
我們會對連線做分別的監聽,下面是udp的獲取是否有新資料:
async fn multi_udp_listen_work(
listens: &mut Vec<StreamUdp>,
) -> (io::Result<(Vec<u8>, SocketAddr)>, usize) {
if !listens.is_empty() {
let (data, index, _) =
select_all(listens.iter_mut().map(|listener| {
listener.next().boxed()
})).await;
if data.is_none() {
return (Err(io::Error::new(io::ErrorKind::InvalidInput, "read none data")), index)
}
(data.unwrap(), index)
} else {
let pend = std::future::pending();
let () = pend.await;
unreachable!()
}
}
此處我們用next,也就是我們實現了 futures_core::Stream
介面,用Poll的方式來註冊實現有事件的時候來通知。
在tokio中,在read或者write的時候返回
Poll::Pending
,將會將socket的可讀可寫註冊到底層,如果一旦系統可讀可寫就會通知該介面,將會重新執行一遍futures_core::Stream
我們將同時可以處理可讀可寫可傳送事件,如果介面超時我們將關閉相應的介面。
impl Stream for StreamUdp {
type Item = io::Result<(Vec<u8>, SocketAddr)>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let _ = self.poll_write(cx)?;
let _ = self.poll_sender(cx)?;
self.poll_read(cx)
}
}
下面是主要的StreamUdp
類
/// Udp轉發的處理結構,快取一些數值以做中轉
pub struct StreamUdp {
/// 讀的緩衝類,避免每次都釋放
pub buf: BinaryMut,
/// 核心的udp繫結埠
pub socket: UdpSocket,
pub server: ServerConfig,
/// 如果接收該資料大小為0,那麼則代表通知資料關閉
pub receiver: Receiver<(Vec<u8>, SocketAddr)>,
/// 將傳送器傳達給每個子協程
pub sender: Sender<(Vec<u8>, SocketAddr)>,
/// 接收的快取資料,無法保證全部直接進行傳送完畢
pub cache_data: LinkedList<(Vec<u8>, SocketAddr)>,
/// 傳送的快取資料,無法保證全部直接進行傳送完畢
pub send_cache_data: LinkedList<(Vec<u8>, SocketAddr)>,
/// 每個地址繫結的物件,包含Sender,最後操作時間,超時時間
remote_sockets: HashMap<SocketAddr, InnerUdp>,
}
我們自己開一個udp伺服器端,繫結了原生的8089
,我們將接收到的資料前面加上from server:
並進行返回,代理端我們繫結了84
的埠,並將udp資料轉發給8089
端:
use tokio::net::UdpSocket;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let sock = UdpSocket::bind("0.0.0.0:8089").await?;
let mut buf = [0; 1024];
loop {
let (len, addr) = sock.recv_from(&mut buf).await?;
let mut vec = "from server: ".as_bytes().to_vec();
vec.extend(&buf[..len]);
let _ = sock.send_to(&vec, addr).await?;
}
}
使用者端我們用nc
執行:
可以看出兩個使用者端互相獨立,彼此返回的資料均符合預期,正常的接收及返回。
TCP我們繫結了83埠並轉發到HTTP的本地埠8080,我們用curl
進行測試,符合預期,如圖:
至此四層的反向代理TCP/UDP均已完成,也符合預期。
點選 [關注],[在看],[點贊] 是對作者最大的支援