作者:vivo 網際網路伺服器團隊- Jin Kai
本文從Java NIO網路程式設計的基礎知識講到了Tars框架使用NIO進行網路程式設計的原始碼分析。
Tars是騰訊開源的支援多語言的高效能RPC框架,起源於騰訊內部2008年至今一直使用的統一應用框架TAF(Total Application Framework),目前支援C++、Java、PHP、Nodejs、Go語言。
該框架為使用者提供了涉及到開發、運維、以及測試的一整套解決方案,幫助一個產品或者服務快速開發、部署、測試、上線。它集可延伸協定編解碼、高效能RPC通訊框架、名字路由與發現、釋出監控、紀錄檔統計、設定管理等於一體,通過它可以快速用微服務的方式構建自己的穩定可靠的分散式應用,並實現完整有效的服務治理。
官方倉庫地址:
https://github.com/TarsCloud/Tars
vivo推播平臺也深度使用了該框架,部署服務節點超過一千個,經過線上每日一百多億訊息推播量的考驗。
此前已在vivo網際網路技術公眾號釋出過《Tars Java 使用者端原始碼分析》,此篇文章為續集。
Tars-java 最新穩定版1.7.2以及之前的版本都使用Java NIO進行網路程式設計;本文將分別詳細介紹java NIO的原理和Tars 使用NIO進行網路程式設計的細節。
從1.4版本開始,Java提供了一種新的IO處理方式:NIO (New IO 或 Non-blocking IO) 是一個可以替代標準Java IO 的API,它是面向緩衝區而不是位元組流,它是非阻塞的,支援IO多路複用。
標準的IO基於位元組流進行操作的,而NIO是基於通道(Channel)和緩衝區(Buffer)進行操作。資料總是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中,下圖是一個完整流程。
Channel型別:
支援檔案讀寫資料的FileChannel
能通過UDP讀寫網路中的資料的DatagramChannel
能通過TCP讀寫網路資料的SocketChannel
可以監聽新進來的TCP連線,對每一個新進來的連線都會建立一個SocketChannel的ServerSocketChannel 。
SocketChannel:
開啟 SocketChannel:SocketChannel socketChannel = SocketChannel.open();
關閉 SocketChannel:socketChannel.close();
從Channel中讀取的資料放到Buffer: int bytesRead = inChannel.read(buf);
將Buffer中的資料寫到Channel: int bytesWritten = inChannel.write(buf);
ServerSocketChannel:
通過 ServerSocketChannel.accept() 方法監聽新進來的連線,當accept()方法返回的時候,它返回一個包含新進來的連線的SocketChannel,因此accept()方法會一直阻塞到有新連線到達。
通常不會僅僅只監聽一個連線,在while迴圈中呼叫 accept()方法. 如下面的例子:
程式碼1:
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
//do something with socketChannel...
}
ServerSocketChannel可以設定成非阻塞模式。在非阻塞模式下,accept() 方法會立刻返回,如果還沒有新進來的連線,返回的將是null。因此,需要檢查返回的SocketChannel是否是null。
程式碼2:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
serverSocketChannel.configureBlocking(false);
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
if(socketChannel != null){
//do something with socketChannel...
}
}
Buffer型別:
ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
Buffer的分配:
ByteBuffer buf = ByteBuffer.allocate(2048);
Buffer的讀寫:
一般是以下四個步驟:
寫入資料到Buffer,最大寫入量是capacity,寫模式下limit值即為capacity值,position即為寫到的位置。
呼叫flip()方法將Buffer從寫模式切換到讀模式,此時position移動到開始位置0,limit移動到position的位置。
從Buffer中讀取資料,在讀模式下可以讀取之前寫入到buffer的所有資料,即為limit位置。
呼叫clear()方法或者compact()方法。clear()方法將position設為0,limit被設定成capacity的值。compact()方法將所有未讀的資料拷貝到Buffer起始處,然後將position設到最後一個未讀元素後面。
mark() 與 reset()方法 通過呼叫Buffer.mark()方法,可以標記Buffer中的一個特定position,之後可以通過呼叫Buffer.reset()方法恢復到這個position。
duplicate() 此方法返回承載先前位元組緩衝區內容的新位元組緩衝區。
remaining()limit 減去 position的值
Java NIO引入了選擇器的概念,選擇器用於監聽多個通道的事件。單個的執行緒可以監聽多個資料通道。要使用Selector,得向Selector註冊Channel,然後呼叫它的select()方法。這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,執行緒就可以處理這些事件。
程式碼3:
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,Selectionkey.OP_READ);
注意register()方法的第二個引數,這是一個監聽的集合,即在通過Selector監聽Channel時關注什麼事件集合。
SelectionKey包含:
1) interest集合:selectionKey.interestOps() 可以監聽四種不同型別的事件:OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ
2) ready集合:selectionKey.readyOps(); ready 集合是通道已經準備就緒的操作的集合,提供4個方便的方法:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
3) Channel:selectionKey.channel();
4) Selector:selectionKey.selector();
5) 可選的附加物件:
selectionKey.attachment(); 可以將一個物件或者更多資訊附著到SelectionKey上,這樣就能方便的識別特定的通道。
提示:
OP_ACCEPT和OP_CONNECT的區別:簡單來說,使用者端建立連線是connect,伺服器準備接收連線是accept。一個典型的使用者端伺服器網路互動流程如下圖
selectedKeys()
一旦呼叫了select()方法,並且返回值表明有一個或更多個通道就緒了,然後可以通過呼叫selector的selectedKeys()方法,存取已選擇鍵集(selected key set)中的就緒通道。
wakeUp()
某個執行緒呼叫select()方法後阻塞了,即使沒有通道已經就緒,也有辦法讓其從select()方法返回。只要讓其它執行緒在阻塞執行緒呼叫select()方法的物件上呼叫Selector.wakeup()方法即可。阻塞在select()方法上的執行緒會立馬返回。如果有其它執行緒呼叫了wakeup()方法,但當前沒有執行緒阻塞在select()方法上,下個呼叫select()方法的執行緒會立即wake up。
close()
用完Selector後呼叫其close()方法會關閉該Selector,且使註冊到該Selector上的所有SelectionKey範例無效。通道本身並不會關閉。
通過Selector選擇通道:
int select() 阻塞直到至少有一個通道在你註冊的事件上就緒了
int select(long timeout) 增加最長阻塞毫秒數
int selectNow() 不會阻塞,不管什麼通道就緒都立刻返回
瞭解完 Java NIO的原理,我們來看看Tars是如何使用NIO進行網路程式設計的。
Tars的網路模型是多reactor多執行緒模型。有一點特殊的是tars的reactor執行緒組裡隨機選一個執行緒處理網路事件,並且該執行緒同時也能處理讀寫。
核心類之間的關係如下:
建立ServerSocketChannel,設定為非阻塞,並繫結埠
建立Selector物件
給ServerSocketChannel註冊SelectionKey.OP_ACCEPT事件
啟動一個執行緒迴圈,呼叫Selector的select方法來檢查IO就緒事件,一旦有IO就緒事件,就通知使用者執行緒去處理IO事件
如果有Accept事件,就建立一個SocketChannel,並註冊SelectionKey.OP_READ
如果有讀事件,判斷一下是否全包,如果全包,就交給後端執行緒處理
寫事件比較特殊。isWriteable表示的是本機的寫緩衝區是否可寫。這個在絕大多少情況下都是為真的。在Netty中只有寫半包的時候才需要註冊寫事件,如果一次寫就完全把資料寫入了緩衝區就不需要註冊寫事件。
Communicator.stringToProxy() 根據servantName等設定資訊建立通訊器。
ServantProxyFactory.getServantProxy() 呼叫工廠方法建立servant代理。
ObjectProxyFactory.getObjectProxy() 呼叫工廠方法建立obj代理。
TarsProtocolInvoker.create() 建立協定呼叫者。
ServantProtocolInvoker.initClient(Url url) 根據servantProxyConfig中的設定資訊找到servant的ip埠等進行初始化ServantClient。
ClientPoolManager.getSelectorManager() 如果第一次呼叫selectorManager是空的就會去初始化selectorManager。
reactorSet = new Reactor[selectorPoolSize]; SelectorManager初始化構造類中的會根據selectorPoolSize(預設是2)的設定建立Reactor執行緒陣列。執行緒名稱的字首是servant-proxy-加上CommunicatorId,CommunicatorId生成規則是由locator的地址生成的UUID。
啟動reactor執行緒。
tars支援TCP和UDP兩種協定,RPC場景下是使用TCP協定。
new SelectorManager() 根據設定資訊初始化selectorManager,執行緒池大小 processors > 8 ? 4 + (processors * 5 / 8) : processors + 1;執行緒名稱字首是server-tcp-reactor,然後啟動reactor執行緒陣列中的所有執行緒。
開啟伺服器端監聽的ServerSocketChannel,繫結伺服器端本地ip和監聽的埠號,設定TCP連線請求佇列的最大容量為1024;設定非阻塞模式。
選取reactor執行緒陣列中第0個執行緒作為伺服器端監聽連線OP_ACCEPT就緒事件的執行緒。
程式碼4:
public void bind(AppService appService) throws IOException {
// 此處略去非關鍵程式碼
if (endpoint.type().equals("tcp")) { // 1
this.selectorManager = new SelectorManager(Utils.getSelectorPoolSize(), new ServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false); // 2
this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay());
this.selectorManager.start();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(endpoint.host(), endpoint.port()), 1024); // 3
serverChannel.configureBlocking(false);
selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT); // 4
} else if (endpoint.type().equals("udp")) {
this.selectorManager = new SelectorManager(1, new ServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true);
this.selectorManager.start();
// UDP開啟的是DatagramChannel
DatagramChannel serverChannel = DatagramChannel.open();
DatagramSocket socket = serverChannel.socket();
socket.bind(new InetSocketAddress(endpoint.host(), endpoint.port()));
serverChannel.configureBlocking(false);
// UDP協定不需要建連,監聽的是OP_READ就緒事件
this.selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_READ);
}
}
多路複用器開始輪詢檢查 是否有就緒的事件。
處理register佇列中剩餘的channel註冊到當前reactor執行緒的多路複用器selector中。
獲取已選鍵集中所有就緒的channel。
更新Session中最近操作時間,Tars伺服器端啟動時會呼叫 startSessionManager() , 單執行緒每30s掃描一次session對談列表,會檢查每個session的 lastUpdateOperationTime 與當前時間的時間差,如果超過60秒會將過期session對應的channel踢除。
分發IO事件進行處理。
處理unregister佇列中剩餘的channel,從當前reactor執行緒的多路複用器selector中解除註冊。
程式碼5:
public void run() {
while (!Thread.interrupted()) {
selector.select(); // 1
processRegister(); // 2
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // 3
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (!key.isValid()) continue;
try {
if (key.attachment() != null && key.attachment() instanceof Session) {
((Session) key.attachment()).updateLastOperationTime(); //4
}
dispatchEvent(key); // 5
} catch (Throwable ex) {
disConnectWithException(key, ex);
}
}
processUnRegister(); // 6
}
}
每個reactor執行緒都有一個專門的Accepter類去處理各種IO事件。TCPAccepter可以處理全部的四種事件(OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ)、UDPAccepter由於不需要建立連線所以只需要處理讀和寫兩種事件。
1. 處理OP_ACCEPT
獲取channel,處理TCP請求。
為這個TCP請求建立TCPSession,對談的狀態是伺服器已連線
對談註冊到sessionManager中,Tars服務可設定最大連線數maxconns,如果超過就會關閉當前對談。
尋找下一個reactor執行緒進行多路複用器與channel的繫結。
程式碼6:
public void handleAcceptEvent(SelectionKey key) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 1
SocketChannel channel = server.accept();
channel.socket().setTcpNoDelay(selectorManager.isTcpNoDelay());
channel.configureBlocking(false);
Utils.setQosFlag(channel.socket());
TCPSession session = new TCPSession(selectorManager); // 2
session.setChannel(channel);
session.setStatus(SessionStatus.SERVER_CONNECTED);
session.setKeepAlive(selectorManager.isKeepAlive());
session.setTcpNoDelay(selectorManager.isTcpNoDelay());
SessionManager.getSessionManager().registerSession(session); // 3
selectorManager.nextReactor().registerChannel(channel, SelectionKey.OP_READ, session); // 4
}
2. 處理OP_CONNECT
獲取使用者端連線過來的channel通道
獲取Session
與伺服器建立連線,將關注的興趣OPS設定為ready就緒事件,session中的狀態修改為使用者端已連線
處理OP_CONNECT
程式碼7:
public void handleConnectEvent(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel(); // 1
TCPSession session = (TCPSession) key.attachment(); //2
if (session == null) throw new RuntimeException("The session is null when connecting to ...");
try { // 3
client.finishConnect();
key.interestOps(SelectionKey.OP_READ);
session.setStatus(SessionStatus.CLIENT_CONNECTED);
} finally {
session.finishConnect();
}
}
3.處理OP_WRITE、 處理OP_READ
程式碼8:
public void handleReadEvent(SelectionKey key) throws IOException {
TCPSession session = (TCPSession) key.attachment();
if (session == null) throw new RuntimeException("The session is null when reading data...");
session.read();
}
public void handleWriteEvent(SelectionKey key) throws IOException {
TCPSession session = (TCPSession) key.attachment();
if (session == null) throw new RuntimeException("The session is null when writing data...");
session.doWrite();
}
1. 讀事件處理
申請2k的ByteBuffer空間,讀取channel中的資料到readBuffer中。根據sessionStatus判斷是使用者端讀響應還是伺服器讀請求,分別進行處理。
程式碼9:
protected void read() throws IOException {
int ret = readChannel();
if (this.status == SessionStatus.CLIENT_CONNECTED) {
readResponse();
} else if (this.status == SessionStatus.SERVER_CONNECTED) {
readRequest();
} else {
throw new IllegalStateException("The current session status is invalid. [status:" + this.status + "]");
}
if (ret < 0) {
close();
return;
}
}
private int readChannel() throws IOException {
int readBytes = 0, ret = 0;
ByteBuffer data = ByteBuffer.allocate(1024 * 2); // 1
if (readBuffer == null) {
readBuffer = IoBuffer.allocate(bufferSize);
}
// 2
while ((ret = ((SocketChannel) channel).read(data)) > 0) {
data.flip(); // 3
readBytes += data.remaining();
readBuffer.put(data.array(), data.position(), data.remaining());
data.clear();
}
return ret < 0 ? ret : readBytes;
}
① 使用者端讀響應
從當前readBuffer中的內容複製到一個新的臨時buffer中,並且切換到讀模式,使用TarsCodec類解析出buffer內的協定欄位到response,WorkThread執行緒通知Ticket處理response。如果response為空,則重置tempBuffer到mark的位置,重新解析協定。
程式碼10:
public void readResponse() {
Response response = null;
IoBuffer tempBuffer = null;
tempBuffer = readBuffer.duplicate().flip();
while (true) {
tempBuffer.mark();
if (tempBuffer.remaining() > 0) {
response = selectorManager.getProtocolFactory().getDecoder().decodeResponse(tempBuffer, this);
} else {
response = null;
}
if (response != null) {
if (response.getTicketNumber() == Ticket.DEFAULT_TICKET_NUMBER) response.setTicketNumber(response.getSession().hashCode());
selectorManager.getThreadPool().execute(new WorkThread(response, selectorManager));
} else {
tempBuffer.reset();
readBuffer = resetIoBuffer(tempBuffer);
break;
}
}
}
② 伺服器讀請求
任務放入執行緒池交給 WorkThread執行緒,最終交給Processor類出構建請求的響應體,包括分散式上下文,然後經過FilterChain的處理,最終通過jdk提供的反射方法invoke伺服器端原生的方法然後返回response。如果執行緒池丟擲拒絕異常,則返回SERVEROVERLOAD = -9,伺服器端過載保護。如果request為空,則重置tempBuffer到mark的位置,重新解析協定。
程式碼11:
public void readRequest() {
Request request = null;
IoBuffer tempBuffer = readBuffer.duplicate().flip();
while (true) {
tempBuffer.mark();
if (tempBuffer.remaining() > 0) {
request = selectorManager.getProtocolFactory().getDecoder().decodeRequest(tempBuffer, this);
} else {
request = null;
}
if (request != null) {
try {
request.resetBornTime();
selectorManager.getThreadPool().execute(new WorkThread(request, selectorManager));
} catch (RejectedExecutionException e) {
selectorManager.getProcessor().overload(request, request.getIoSession());
} catch (Exception ex) {
ex.printStackTrace();
}
} else {
tempBuffer.reset();
readBuffer = resetIoBuffer(tempBuffer);
break;
}
}
}
2. 寫事件處理
同樣也包括使用者端寫請求和伺服器端寫響應兩種,其實這兩種都是往TCPSession中的LinkedBlockingQueue(有界佇列最大8K)中插入ByteBuffer。LinkedBlockingQueue中的ByteBuffer最終會由TCPAcceptor中的handleWriteEvent監聽寫就緒事件並消費。
程式碼12:
protected void write(IoBuffer buffer) throws IOException {
if (buffer == null) return;
if (channel == null || key == null) throw new IOException("Connection is closed");
if (!this.queue.offer(buffer.buf())) {
throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]");
}
if (key != null) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
key.selector().wakeup();
}
}
本文主要介紹了Java NIO程式設計的基礎知識 和 Tars-Java 1.7.2版本的網路程式設計模組的原始碼實現。
在最新的Tars-Java的master分支中我們可以發現網路程式設計已經由NIO改成了Netty,雖然Netty更加成熟穩定,但是作為學習者瞭解NIO的原理也是掌握網路程式設計的必經之路。
更多關於Tars框架的介紹可以存取:
本文分析原始碼地址(v1.7.x分支):
https://github.com/TarsCloud/TarsJava