您好,我是湘王,這是我的部落格園,歡迎您來,歡迎您再來~
在Java NIO的三大核心中,除了Channel和Buffer,剩下的就是Selector了。有的地方叫它選擇器,也有叫多路複用器的(比如Netty)。
之前提過,資料總是從Channel讀取到Buffer,或者從Buffer寫入到Channel,單個執行緒可以監聽多個Channel——Selector就是這個執行緒背後的實現機制(所以得名Selector)。
Selector通過控制單個執行緒處理多個Channel,如果應用開啟了多個Channel,但每次傳輸的流量都很低,使用Selector就會很方便(至於為什麼,具體到Netty中再分析)。所以使用Selector的好處就顯而易見:用最少的資源實現最多的操作,避免了執行緒切換帶來的開銷。
還是以程式碼為例來演示Selector的作用。新建一個類,在main()方法中輸入下面的程式碼:
/** * NIO中的Selector * * @author xiangwang */ public class TestSelector { public static void main(String args[]) throws IOException { // 建立ServerSocketChannel ServerSocketChannel channel1 = ServerSocketChannel.open(); channel1.socket().bind(new InetSocketAddress("127.0.0.1", 8080)); channel1.configureBlocking(false); ServerSocketChannel channel2 = ServerSocketChannel.open(); channel2.socket().bind(new InetSocketAddress("127.0.0.1", 9090)); channel2.configureBlocking(false); // 建立一個Selector物件 Selector selector = Selector.open(); // 按照字面意思理解,應該是這樣的:selector.register(channel, event); // 但其實是這樣的:channel.register(selector, SelectionKey.OP_READ); // 四種監聽事件: // OP_CONNECT(連線就緒) // OP_ACCEPT(接收就緒) // OP_READ(讀就緒) // OP_WRITE(寫就緒) // 註冊Channel到Selector,事件一旦被觸發,監聽隨之結束 SelectionKey key1 = channel1.register(selector, SelectionKey.OP_ACCEPT); SelectionKey key2 = channel2.register(selector, SelectionKey.OP_ACCEPT); // 模板程式碼:在編寫程式時,大多數時間都是在模板程式碼中新增相應的業務程式碼 while(true) { int readyNum = selector.select(); if (readyNum == 0) { continue; } Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 輪詢 for (SelectionKey key : selectedKeys) { Channel channel = key.channel(); if (key.isConnectable()) { if (channel == channel1) { System.out.println("channel1連線就緒"); } else { System.out.println("channel2連線就緒"); } } else if (key.isAcceptable()) { if (channel == channel1) { System.out.println("channel1接收就緒"); } else { System.out.println("channel2接收就緒"); } } // 觸發後刪除,這裡不刪 // it.remove(); } } } }
程式碼寫好後啟動ServerSocketChannel服務,可以看到我這裡已經啟動成功:
然後在網上下載一個叫做SocketTest.jar的工具(在一些工具網站下載的時候當心中毒,如果不放心,可以私信我,給你地址),雙擊開啟,並按下圖方式執行:
點選「Connect」可以看到變化:
然後點選「Disconnect」,再輸入「9090」後,再點選「Connect」試試:
可以看到結果顯示結果變了:
兩次連線,列印了三條資訊:說明selector的輪詢在起作用(因為Set<SelectionKey>中包含了所有處於監聽的SelectionKey)。但是「接收就緒」監聽事件僅執行了一次就再不響應。如果感興趣的話你可以把OP_READ、OP_WRITE這些事件也執行一下試試看。
因為Selector是單執行緒輪詢監聽多個Channel,那麼如果Selector(執行緒)之間需要傳遞資料,怎麼辦呢?——Pipe登場了。Pipe就是一種用於Selector之間資料傳遞的「管道」。
先來看個圖:
可以清楚地看到它的工作方式。
還是用程式碼來解釋。
/** * NIO中的Pipe * * @author xiangwang */ public class TestPipe { public static void main(String args[]) throws IOException { // 開啟管道 Pipe pipe = Pipe.open(); // 將Buffer資料寫入到管道 Pipe.SinkChannel sinkChannel = pipe.sink(); ByteBuffer buffer = ByteBuffer.allocate(32); buffer.put("ByteBuffer".getBytes()); // 切換到寫模式 buffer.flip(); sinkChannel.write(buffer); // 從管道讀取資料 Pipe.SourceChannel sourceChannel = pipe.source(); buffer = ByteBuffer.allocate(32); sourceChannel.read(buffer); System.out.println(new String(buffer.array())); // 關閉管道 sinkChannel.close(); sourceChannel.close(); } }
之前說過,同步指的按順序一次完成一個任務,直到前一個任務完成並有了結果以後,才能再執行後面的任務。而非同步指的是前一個任務結束後,並不等待任務結果,而是繼續執行後一個任務,在所有任務都「執行」完後,通過任務的回撥函數去獲得結果。所以非同步使得應用效能有了極大的提高。為了更加生動地說明什麼是非同步,可以來做個實驗:
通過呼叫CompletableFuture.supplyAsync()方法可以很明顯地觀察到,處於位置2的「這一步先執行」會最先顯示,然後才執行位置1的程式碼。而這就是非同步的具體實現。
NIO為了支援非同步,升級到了NIO2,也就是AIO。而AIO引入了新的非同步Channel的概念,並提供了非同步FileChannel和非同步SocketChannel的實現。AIO的非同步SocketChannel是真正的非同步非阻塞I/O。通過程式碼可以更好地說明:
/** * AIO使用者端 * * @author xiangwang */ public class AioClient { public void start() throws IOException, InterruptedException { AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(); if (channel.isOpen()) { // socket接收緩衝區recbuf大小 channel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024); // socket傳送緩衝區recbuf大小 channel.setOption(StandardSocketOptions.SO_SNDBUF, 128 * 1024); // 保持長連線狀態 channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); // 連線到伺服器端 channel.connect(new InetSocketAddress(8080), null, new AioClientHandler(channel)); // 阻塞主程序 for(;;) { TimeUnit.SECONDS.sleep(1); } } else { throw new RuntimeException("Channel not opened!"); } } public static void main(String[] args) throws IOException, InterruptedException { new AioClient().start(); } }
/** * AIO使用者端CompletionHandler * * @author xiangwang */ public class AioClientHandler implements CompletionHandler<Void, AioClient> { private final AsynchronousSocketChannel channel; private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder(); private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); public AioClientHandler(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void failed(Throwable exc, AioClient attachment) { throw new RuntimeException("channel not opened!"); } @Override public void completed(Void result, AioClient attachment) { System.out.println("send message to server: "); try { // 將輸入內容寫到buffer String line = input.readLine(); channel.write(ByteBuffer.wrap(line.getBytes())); // 在作業系統中的Java本地方法native已經把資料寫到了buffer中 // 這裡只需要一個緩衝區能接收就行了 ByteBuffer buffer = ByteBuffer.allocate(1024); while (channel.read(buffer).get() != -1) { buffer.flip(); System.out.println("from server: " + decoder.decode(buffer).toString()); if (buffer.hasRemaining()) { buffer.compact(); } else { buffer.clear(); } // 將輸入內容寫到buffer line = input.readLine(); channel.write(ByteBuffer.wrap(line.getBytes())); } } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
/** * AIO伺服器端 * * @author xiangwang */ public class AioServer { public void start() throws InterruptedException, IOException { AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(); if (channel.isOpen()) { // socket接受緩衝區recbuf大小 channel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024); // 埠重用,防止程序意外終止,未釋放埠,重啟時失敗 // 因為直接殺程序,沒有顯式關閉通訊端來釋放埠,會等待一段時間後才可以重新use這個關口 // 解決辦法就是用SO_REUSEADDR channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.bind(new InetSocketAddress(8080)); } else { throw new RuntimeException("channel not opened!"); } // 處理client連線 channel.accept(null, new AioServerHandler(channel)); System.out.println("server started"); // 阻塞主程序 for(;;) { TimeUnit.SECONDS.sleep(1); } } public static void main(String[] args) throws IOException, InterruptedException { AioServer server = new AioServer(); server.start(); } }
/** * AIO伺服器端CompletionHandler * * @author xiangwang */ public class AioServerHandler implements CompletionHandler<AsynchronousSocketChannel, Void> { private final AsynchronousServerSocketChannel serverChannel; private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder(); private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); public AioServerHandler(AsynchronousServerSocketChannel serverChannel) { this.serverChannel = serverChannel; } @Override public void failed(Throwable exc, Void attachment) { // 處理下一次的client連線 serverChannel.accept(null, this); } @Override public void completed(AsynchronousSocketChannel result, Void attachment) { // 處理下一次的client連線,類似鏈式呼叫 serverChannel.accept(null, this); try { // 將輸入內容寫到buffer String line = input.readLine(); result.write(ByteBuffer.wrap(line.getBytes())); // 在作業系統中的Java本地方法native已經把資料寫到了buffer中 // 這裡只需要一個緩衝區能接收就行了 ByteBuffer buffer = ByteBuffer.allocate(1024); while (result.read(buffer).get() != -1) { buffer.flip(); System.out.println("from client: " + decoder.decode(buffer).toString()); if (buffer.hasRemaining()) { buffer.compact(); } else { buffer.clear(); } // 將輸入內容寫到buffer line = input.readLine(); result.write(ByteBuffer.wrap(line.getBytes())); } } catch (InterruptedException | ExecutionException | IOException e) { e.printStackTrace(); } } }
執行測試後顯示,不管是在使用者端還是在伺服器端,讀寫完全是非同步的。
感謝您的大駕光臨!諮詢技術、產品、運營和管理相關問題,請關注後留言。歡迎騷擾,不勝榮幸~