gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
但凡代理之類,基本上都有修改頭引數的需求,就比如要獲取使用者端的真實IP,需要寫入
x-forward-for
表示使用者端的真實IP,要不然經過轉發後的HTTP無法獲取真實的使用者端地址。
所以需要在轉發的同時能進行處理頭部資訊的相關引數。故內網端不能僅做流量轉發。而且使用者端可能直接以純HTTP2
的協定請求內網的資料,所以同時需要支援HTTP/1.1及HTTP2
,由於以上需求,我們把之前的簡單的轉發邏輯改成以伺服器端接收使用者端請求的模式對資料進行重加工。
以下是資料從外網進入到內網伺服器的加工流程
以下是內網伺服器返回資料給外網使用者端的流程
如果我們這麼操作,當封包非常的大的時候例如1G,我們此時在記憶體中將有完整的1G記憶體,那麼此時只需有數個同一類的請求,將會耗盡我們的記憶體,所以我們必須不能這麼處理。
超大檔案必須將得到的資料及時的轉發給使用者端,此時在記憶體中的值才不至於太大,又能及時的傳輸給使用者端,要不然可能大檔案下載到中轉伺服器的時間內使用者端得不到任何資料就會空耗掉這時間。
因為http/1.1的chunked協定,由RFC 2616
定義,
分塊編碼(Transfer-Encoding: chunked)是超文字傳輸協定(HTTP)中的一種資料傳輸機制,允許HTTP由網頁伺服器傳送給使用者端的資料可以分成多個部分。分塊傳輸編碼只在HTTP協定1.1版本(HTTP/1.1)中提供,如果頭部中有該選項,則代表封包是chunked
格式。
資料分解成一系列資料塊,並以一個或多個塊傳送,這樣伺服器可以傳送資料而不需要預先知道傳送內容的總大小。
比如我們常看到的
for data in res.chunk() {
}
就是表示的是資料分段接收,對於巨量資料這個尤為重要。
此種報文的範例
這時,報文中的實體需要改為用一系列分塊來傳輸。
每個分塊包含十六進位制的長度值和資料,長度值獨佔一行,長度不包括它結尾的 CRLF(\r\n),也不包括分塊資料結尾的 CRLF。
最後一個分塊長度值必須為 0,對應的分塊資料沒有內容,表示實體結束。
例:
HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
a\r\n
01234567890\r\n
1e\r\n
wmproxy is very good nat tool\r\n
0\r\n
\r\n
此種報文中我們必須進行解析,因為使用者端可能是
keep-alive
選項,可以連續進行多發。所以收到的Request和Response都是連續的。必須知道何處結束才能繼續解析下一個Request/Response。http2不需要,因為http2自帶的data分包機制就有這些資料的處理
Request
的重寫,另一部分是對返回Response
的重寫。所以我們必須同時支援這兩種,且將其區分出來。每條header資訊我們將定定義一個可變長的陣列,如第一個字元為proxy
則表示對Request
修改。x-forward-for
需要末尾新增,我們用操作符+
,比如[proxy, +, x-forward-for, $client_ip]
+
,如[-, hidden]
[custom-key, custom-value]
?
,如[?, server, wmproxy]
所以我們client.yaml的設定新增至如下:
# 連線伺服器端地址
server: 127.0.0.1:8091
# 連線伺服器端是否加密
ts: true
# 內網對映設定的陣列
mappings:
#將localhost的域名轉發到原生的127.0.0.1:8080
- name: web
mode: http
local_addr: 127.0.0.1:8080
domain: localhost
headers:
- [proxy, +, x-forward-for, $client_ip]
- [-, hidden]
- [custom-key, custom-value]
- [?, server, wmproxy]
mappings的結構修改
pub struct MappingConfig {
pub name: String,
pub mode: String,
pub local_addr: Option<SocketAddr>,
#[serde(default = "default_domain")]
pub domain: String,
#[serde(default = "default_header")]
pub headers: Vec<Vec<String>>,
}
我們把headers定義成一個動態的陣列。根據不同的型別做不同的資料,因為長度有變化所以做不定長引數。
以下是程式碼解析
pub fn parse<T: Buf>(header: ProtFrameHeader, mut buf: T) -> ProxyResult<ProtMapping> {
must_have!(buf, 2)?;
let len = buf.get_u16() as usize;
let mut mappings = vec![];
for _ in 0..len {
let name = read_short_string(&mut buf)?;
let mode = read_short_string(&mut buf)?;
let domain = read_short_string(&mut buf)?;
let mut headers = vec![];
must_have!(buf, 2)?;
let len = buf.get_u16();
for _ in 0 .. len {
let mut header = vec![];
must_have!(buf, 1)?;
let sub_len = buf.get_u8();
for _ in 0..sub_len {
header.push(read_short_string(&mut buf)?);
}
headers.push(header);
}
mappings.push(MappingConfig::new(name, mode, domain, headers));
}
Ok(ProtMapping {
sock_map: header.sock_map(),
mappings,
})
}
如此解析成一個完整的對應域名的結構,因為伺服器端用不到local_addr所以不做傳輸。
核心處理程式碼在
trans/http.rs
下,外部傳入一個可讀可寫的stream,可能是TcpStream
也可能是TlsStream<TcpStream>
或者其它,同時把接收的SocketAddr
傳入,以方便後續獲取$client_ip
的標頭檔案資訊。
預處理
pub async fn process<T>(self, inbound: T, addr: SocketAddr) -> Result<(), ProxyError<T>>
where
T: AsyncRead + AsyncWrite + Unpin + Debug,
{
println!("new process {:?}", inbound);
let build = Client::builder();
let (virtual_sender, virtual_receiver) = channel::<ProtFrame>(10);
let stream = VirtualStream::new(self.sock_map, self.sender.clone(), virtual_receiver);
let mut client = Client::new(build.value().ok().unwrap(), stream);
let (receiver, sender) = client.split().unwrap();
let oper = HttpOper {
receiver,
sender,
sender_work: self.sender_work.clone(),
virtual_sender: Some(virtual_sender),
sock_map: self.sock_map,
mappings: self.mappings.clone(),
http_map: None,
};
let mut server = Server::new(inbound, Some(addr), oper);
tokio::spawn( async move {
let _ = client.wait_operate().await;
});
let _ret = server.incoming(Self::operate).await;
if _ret.is_err() {
println!("ret = {:?}", _ret);
}
Ok(())
}
此時我們建立一個虛擬的Stream來做雙邊互傳,但是此時我們還沒有收到任何的Request請求,我們並不知道當前的Host
,此時我們還未傳送ProtCreate
,等真正處理請求的時候做處理,HttpOper
是處理每個操作時均會帶的引數,我們可以根據自己需要帶上該引數。
後續處理,其中我們讀和寫都用RecvStream,做到讀多少資料轉發多少資料,以保證資料處理的及時性
async fn inner_operate(
mut req: Request<RecvStream>,
data: Arc<Mutex<HttpOper>>,
) -> ProtResult<Option<Response<RecvStream>>> {
println!("receiver req = {:?}", req.url());
let mut value = data.lock().await;
let sender = value.virtual_sender.take();
// 傳在該引數則為第一次, 第一次的時候傳送Create建立繫結連線
if sender.is_some() {
let host_name = req.get_host().unwrap_or(String::new());
// 取得相關的host資料,對內網的對映端做匹配,如果未匹配到返回錯誤,表示不支援
{
let mut config = None;
let mut is_find = false;
{
let read = value.mappings.read().await;
for v in &*read {
if v.domain == host_name {
is_find = true;
config = Some(v.clone());
}
}
}
if !is_find {
return Ok(Some(Response::builder().status(404).body("not found").ok().unwrap().into_type()));
}
value.http_map = config;
}
println!("do create prot {}, host = {:?}", value.sock_map, req.get_host());
let create = ProtCreate::new(value.sock_map, Some(req.get_host().unwrap_or(String::new())));
let _ = value.sender_work.send((create, sender.unwrap())).await;
}
if let Some(config) = &value.http_map {
// 複寫Request的標頭檔案資訊
HeaderHelper::rewrite_request(&mut req, &config.headers);
}
// 將請求傳送出去
value.sender.send(req).await?;
// 等待返回資料的到來
let mut res = value.receiver.recv().await;
if res.is_some() {
if let Some(config) = &value.http_map {
// 複寫Response的標頭檔案資訊
HeaderHelper::rewrite_response(res.as_mut().unwrap(), &config.headers);
}
return Ok(res);
} else {
return Ok(Some(Response::builder().status(503).body("cant trans").ok().unwrap().into_type()));
}
}
以下是直接HTTP/1.1的請求範例
以下是直接HTTP/1.1升級成HTTP2的請求範例
以下是直接HTTP2的請求範例
請求的返回結果均帶上了新增的頭部資訊,測試正常,至此HTTP的內網穿透資料打通。