Java I/O(4):AIO和NIO中的Selector

2022-10-21 12:02:43

您好,我是湘王,這是我的部落格園,歡迎您來,歡迎您再來~

 

Java NIO的三大核心中,除了Channel和Buffer,剩下的就是Selector了。有的地方叫它選擇器,也有叫多路複用器的(比如Netty)。

之前提過,資料總是從Channel讀取到Buffer,或者從Buffer寫入到Channel,單個執行緒可以監聽多個Channel——Selector就是這個執行緒背後的實現機制(所以得名Selector)。

 

 

 

Selector通過控制單個執行緒處理多個Channel,如果應用開啟了多個Channel,但每次傳輸的流量都很低,使用Selector就會很方便(至於為什麼,具體到Netty中再分析)。所以使用Selector的好處就顯而易見:用最少的資源實現最多的操作,避免了執行緒切換帶來的開銷。

還是以程式碼為例來演示Selector的作用。新建一個類,main()方法中輸入下面的程式碼:

/**
 * NIO中的Selector
 *
 * @author xiangwang
 */
public class TestSelector {
    public static void main(String args[]) throws IOException {
        // 建立ServerSocketChannel
        ServerSocketChannel channel1 = ServerSocketChannel.open();
        channel1.socket().bind(new InetSocketAddress("127.0.0.1", 8080));
        channel1.configureBlocking(false);
        ServerSocketChannel channel2 = ServerSocketChannel.open();
        channel2.socket().bind(new InetSocketAddress("127.0.0.1", 9090));
        channel2.configureBlocking(false);

        // 建立一個Selector物件
        Selector selector = Selector.open();
        // 按照字面意思理解,應該是這樣的:selector.register(channel, event);
        // 但其實是這樣的:channel.register(selector, SelectionKey.OP_READ);
        // 四種監聽事件:
        // OP_CONNECT(連線就緒)
        // OP_ACCEPT(接收就緒)
        // OP_READ(讀就緒)
        // OP_WRITE(寫就緒)
        // 註冊Channel到Selector,事件一旦被觸發,監聽隨之結束
        SelectionKey key1 = channel1.register(selector, SelectionKey.OP_ACCEPT);
        SelectionKey key2 = channel2.register(selector, SelectionKey.OP_ACCEPT);

        // 模板程式碼:在編寫程式時,大多數時間都是在模板程式碼中新增相應的業務程式碼
        while(true) {
            int readyNum = selector.select();
            if (readyNum == 0) {
                continue;
            }

            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            // 輪詢
            for (SelectionKey key : selectedKeys) {
                Channel channel = key.channel();
                if (key.isConnectable()) {
                    if (channel == channel1) {
                        System.out.println("channel1連線就緒");
                    } else {
                        System.out.println("channel2連線就緒");
                    }
                } else if (key.isAcceptable()) {
                    if (channel == channel1) {
                        System.out.println("channel1接收就緒");
                    } else {
                        System.out.println("channel2接收就緒");
                    }
                }
                // 觸發後刪除,這裡不刪
                // it.remove();
            }
        }
    }
}

 

 

程式碼寫好後啟動ServerSocketChannel服務,可以看到我這裡已經啟動成功:

 

 

 

然後在網上下載一個叫做SocketTest.jar的工具(在一些工具網站下載的時候當心中毒,如果不放心,可以私信我,給你地址),雙擊開啟,並按下圖方式執行:

 

 

 

點選「Connect」可以看到變化:

 

 

 

然後點選「Disconnect」,再輸入9090」後,再點選「Connect」試試:

 

 

 

可以看到結果顯示結果變了:

 

 

 

兩次連線,列印了三條資訊:說明selector的輪詢在起作用(因為Set<SelectionKey>中包含了所有處於監聽的SelectionKey)。但是「接收就緒」監聽事件僅執行了一次就再不響應。如果感興趣的話你可以把OP_READ、OP_WRITE這些事件也執行一下試試看。

 

因為Selector是單執行緒輪詢監聽多個Channel,那麼如果Selector(執行緒)之間需要傳遞資料,怎麼辦呢?——Pipe登場了。Pipe就是一種用於Selector之間資料傳遞的「管道」。

先來看個圖:

 

 

 

可以清楚地看到它的工作方式。

還是用程式碼來解釋。

/**
 * NIO中的Pipe
 *
 * @author xiangwang
 */
public class TestPipe {
    public static void main(String args[]) throws IOException {
        // 開啟管道
        Pipe pipe = Pipe.open();

        // 將Buffer資料寫入到管道
        Pipe.SinkChannel sinkChannel = pipe.sink();
        ByteBuffer buffer = ByteBuffer.allocate(32);
        buffer.put("ByteBuffer".getBytes());
        // 切換到寫模式
        buffer.flip();
        sinkChannel.write(buffer);

        // 從管道讀取資料
        Pipe.SourceChannel sourceChannel = pipe.source();
        buffer = ByteBuffer.allocate(32);
        sourceChannel.read(buffer);
        System.out.println(new String(buffer.array()));

        // 關閉管道
        sinkChannel.close();
        sourceChannel.close();
    }
}

 

 

之前說過,同步指的按順序一次完成一個任務,直到前一個任務完成並有了結果以後,才能再執行後面的任務。而非同步指的是前一個任務結束後,並不等待任務結果,而是繼續執行後一個任務,在所有任務都「執行」完後,通過任務的回撥函數去獲得結果。所以非同步使得應用效能有了極大的提高。為了更加生動地說明什麼是非同步,可以來做個實驗:

 

 

 

通過呼叫CompletableFuture.supplyAsync()方法可以很明顯地觀察到,處於位置2的「這一步先執行」會最先顯示,然後才執行位置1的程式碼。而這就是非同步的具體實現。

NIO為了支援非同步,升級到了NIO2,也就是AIO。而AIO引入了新的非同步Channel的概念,並提供了非同步FileChannel和非同步SocketChannel的實現。AIO的非同步SocketChannel是真正的非同步非阻塞I/O。通過程式碼可以更好地說明:

/**
 * AIO使用者端
 *
 * @author xiangwang
 */
public class AioClient {
    public void start() throws IOException, InterruptedException {
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        if (channel.isOpen()) {
            // socket接收緩衝區recbuf大小
            channel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024);
            // socket傳送緩衝區recbuf大小
            channel.setOption(StandardSocketOptions.SO_SNDBUF, 128 * 1024);
            // 保持長連線狀態
            channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
            // 連線到伺服器端
            channel.connect(new InetSocketAddress(8080), null,
                    new AioClientHandler(channel));
            // 阻塞主程序
            for(;;) {
                TimeUnit.SECONDS.sleep(1);
            }
        } else {
            throw new RuntimeException("Channel not opened!");
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        new AioClient().start();
    }
}

 

/**
 * AIO使用者端CompletionHandler
 *
 * @author xiangwang
 */
public class AioClientHandler implements CompletionHandler<Void, AioClient> {
    private final AsynchronousSocketChannel channel;
    private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
    private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in));

    public AioClientHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }
    @Override
    public void failed(Throwable exc, AioClient attachment) {
        throw new RuntimeException("channel not opened!");
    }
    @Override
    public void completed(Void result, AioClient attachment) {
        System.out.println("send message to server: ");
        try {
            // 將輸入內容寫到buffer
            String line = input.readLine();
            channel.write(ByteBuffer.wrap(line.getBytes()));
            // 在作業系統中的Java本地方法native已經把資料寫到了buffer中
            // 這裡只需要一個緩衝區能接收就行了
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (channel.read(buffer).get() != -1) {
                buffer.flip();
                System.out.println("from server: " + decoder.decode(buffer).toString());
                if (buffer.hasRemaining()) {
                    buffer.compact();
                } else {
                    buffer.clear();
                }
                // 將輸入內容寫到buffer
                line = input.readLine();
                channel.write(ByteBuffer.wrap(line.getBytes()));
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

 

/**
 * AIO伺服器端
 *
 * @author xiangwang
 */
public class AioServer {
    public void start() throws InterruptedException, IOException {
        AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
        if (channel.isOpen()) {
            // socket接受緩衝區recbuf大小
            channel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
            // 埠重用,防止程序意外終止,未釋放埠,重啟時失敗
            // 因為直接殺程序,沒有顯式關閉通訊端來釋放埠,會等待一段時間後才可以重新use這個關口
            // 解決辦法就是用SO_REUSEADDR
            channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            channel.bind(new InetSocketAddress(8080));
        } else {
            throw new RuntimeException("channel not opened!");
        }
        // 處理client連線
        channel.accept(null, new AioServerHandler(channel));
        System.out.println("server started");
        // 阻塞主程序
        for(;;) {
            TimeUnit.SECONDS.sleep(1);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        AioServer server = new AioServer();
        server.start();
    }
}

 

/**
 * AIO伺服器端CompletionHandler
 *
 * @author xiangwang
 */
public class AioServerHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
    private final AsynchronousServerSocketChannel serverChannel;
    private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
    private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in));

    public AioServerHandler(AsynchronousServerSocketChannel serverChannel) {
        this.serverChannel = serverChannel;
    }
    @Override
    public void failed(Throwable exc, Void attachment) {
        // 處理下一次的client連線
        serverChannel.accept(null, this);
    }
    @Override
    public void completed(AsynchronousSocketChannel result, Void attachment) {
        // 處理下一次的client連線,類似鏈式呼叫
        serverChannel.accept(null, this);
        try {
            // 將輸入內容寫到buffer
            String line = input.readLine();
            result.write(ByteBuffer.wrap(line.getBytes()));
            // 在作業系統中的Java本地方法native已經把資料寫到了buffer中
            // 這裡只需要一個緩衝區能接收就行了
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (result.read(buffer).get() != -1) {
                buffer.flip();
                System.out.println("from client: " + decoder.decode(buffer).toString());
                if (buffer.hasRemaining()) {
                    buffer.compact();
                } else {
                    buffer.clear();
                }
                // 將輸入內容寫到buffer
                line = input.readLine();
                result.write(ByteBuffer.wrap(line.getBytes()));
            }
        } catch (InterruptedException | ExecutionException | IOException e) {
            e.printStackTrace();
        }
    }
}

 

 

執行測試後顯示,不管是在使用者端還是在伺服器端,讀寫完全是非同步的。

 

 


 

 

感謝您的大駕光臨!諮詢技術、產品、運營和管理相關問題,請關注後留言。歡迎騷擾,不勝榮幸~