前言:
當我們在部署web服務的時候,web容器通常都會記錄來自使用者端的存取紀錄檔。而當我們使用Thrift RPC服務的時候,Thrift服務則不會給我們自動記錄使用者端的存取紀錄檔。
通過這篇文章,你可以學習到如何使用在Thrift伺服器端新增使用者端的存取紀錄檔。
面臨的問題:
要在Thrift伺服器端新增使用者端的存取紀錄檔,我們需要解決兩個問題:
- 找到合適的攔截點記錄資訊
- 收集存取紀錄檔中需要的資訊
尋找合適的攔截點:
我們都知道,Thrift協定為我們提供了thrift檔案向各種程式語言轉換的程式。通過觀察,我們會發現Thrift將IDL中定義的每個方法抽象為一個類,即ProcessFunction類。
該類負責從輸入中讀取引數,呼叫使用者編寫的服務將響應寫回到輸出中。該類是如何發揮作用的,下面這張類圖可以比較清晰地說明。
當我們使用Thrift.exe可執行程式處理IDL檔案的時候,Processor會被自動建立出來。它負責把實際的方法實現和方法的key關聯起來,放到Map中維護。
以TMultiplexedProcessor為例,TMultiplexedProcessor會將所有註冊的Processor都儲存到SERVICE_PROCESSOR_MAP中。
public boolean process(TProtocol iprot, TProtocol oprot) throws TException {
/*
先讀取訊息頭
*/
TMessage message = iprot.readMessageBegin();
if (message.type != TMessageType.CALL && message.type != TMessageType.ONEWAY) {
// TODO Apache Guys - Can the server ever get an EXCEPTION or REPLY?
// TODO Should we check for this here?
throw new TException("This should not have happened!?");
}
// Extract the service name
int index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR);
if (index < 0) {
throw new TException("Service name not found in message name: " + message.name + ". Did you " +
"forget to use a TMultiplexProtocol in your client?");
}
// 從message中讀取serviceName
String serviceName = message.name.substring(0, index);
TProcessor actualProcessor = SERVICE_PROCESSOR_MAP.get(serviceName);
if (actualProcessor == null) {
throw new TException("Service name not found: " + serviceName + ". Did you forget " +
"to call registerProcessor()?");
}
// Create a new TMessage, removing the service name
TMessage standardMessage = new TMessage(
message.name.substring(serviceName.length()+TMultiplexedProtocol.SEPARATOR.length()),
message.type,
message.seqid
);
//由真實的處理器對輸入資訊進行處理
return actualProcessor.process(new StoredMessageProtocol(iprot, standardMessage), oprot);
}
actualProcessor的process過程如下,其具體的實現邏輯在TBaseProcessor中。
public boolean process(TProtocol in, TProtocol out) throws TException {
//讀取訊息頭
TMessage msg = in.readMessageBegin();
//從方法集合中獲取對應的方法處理類
ProcessFunction fn = processMap.get(msg.name);
if (fn == null) {
TProtocolUtil.skip(in, TType.STRUCT);
in.readMessageEnd();
TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
return true;
}
//進行具體的處理,ProcessFunction物件是實際方法的裝飾器,
//process內部會呼叫實際方法的處理邏輯
fn.process(msg.seqid, in, out, iface);
return true;
}
通過上面的分析,我們可以在ProcessFunction中新增有關的access紀錄檔。但是這其中有一個問題,就是經過ThriftServer對thrift請求的解析以及訊息內容處理,在到達ProcessFunction::process方法的時候,我們已經無法獲取到使用者端的遠端IP地址了。
接下來,我們就要考慮如何收集存取紀錄檔需要的資訊了。
如何收集存取紀錄檔需要的資訊:
從上面ProcessFunction中的process方法中,我們可以看出將使用者端的IP地址儲存到iprot中,是一個不錯的選擇。
那麼,接下來我們需要找到iprot這個物件引數是在什麼地方被建立的,以及在合適的地方將使用者端的IP地址寫入到這個物件中。
經過分析,我們會發現TNonblockingServer是NIO伺服器的實現,它通過Selector來檢查IO就緒狀態,進而呼叫相關的Channel。
就方法呼叫而言,它處理的是讀事件,用AbstractNonblockingServer的handelRead()來進一步處理。
protected void handleRead(SelectionKey key) throws IOException {
FrameBuffer buffer = (FrameBuffer) key.attachment();
if (!buffer.read()) {
cleanupSelectionKey(key);
return;
}
// if the buffer's frame read is complete, invoke the method.
if (buffer.isFrameFullyRead()) {
if (!requestInvoke(buffer)) {
cleanupSelectionKey(key);
}
}
}
SelectionKey中有使用者端的IP地址,FrameBuffer則是處理方法呼叫的緩衝區物件,其內部的invoke方法會對Processor中的方法進行實際呼叫。
因此,在handleRead方法中新增兩行程式碼將使用者端的IP地址寫入inProt_中就可以帶入ProcessFunction中了。
SocketChannel socketChannel = (SocketChannel)key.channel();
buffer.inProt_.setClientAddr(socketChannel.getRemoteAddress().toString());