網路程式設計之IO模型

2023-12-18 15:02:41

我們討論網路程式設計中的IO模型時,需要先明確什麼是IO以及IO操作為什麼在程式開發中是很關鍵的一部分,首先我們看下IO的定義。

IO的定義

IO操作(Input/Output操作)是計算機系統中的一種重要操作,用於資料的輸入和輸出,通常涉及到計算機與外部裝置(如硬碟、網路卡、鍵盤、滑鼠、印表機等)之間的資料傳輸和互動的都可以認為是IO操作。

IO操作可以分為兩種主要型別:

1 輸入操作(Input)指從外部裝置或資料來源中讀取資料到計算機記憶體或程式當中。例如從硬碟讀取檔案內容、從鍵盤接收使用者的輸入、從網路卡接收資料等.

2 輸出操作(Output)指將計算機記憶體中的資料寫入到外部裝置或資料目標中。例如將資料寫入到硬碟上的檔案、文字列印輸出、將資料傳送到網路上等都屬於輸出操作。

無論是哪種I/O 操作分為兩個部分:

  • 資料準備,將資料載入到核心快取。(資料載入到作業系統)
  • 將核心快取中的資料載入到使用者快取(從作業系統複製到應用中)

因此開發工作當中涉及到的網路讀寫、資料庫操作、檔案操作、紀錄檔列印的,都可以歸為IO操作,IO操作的效能可以受到多種因素的影響,包括硬體效能、作業系統的優化、檔案系統的效能等等。而網路IO特指計算機程式與網路之間進行資料互動的過程,在編碼層面我們可以簡單把它定義成的Socket通訊端操作,如Socket的建立和關閉,資料的傳送和接收。

IO的重要性

那麼為什麼我們要關心IO操並對應存在多種IO模型呢, 因為IO操作的本質是要與硬體進行資料互動,這個過程是需要時間的,返回的結果也是需要等待的,從而也就造成了我們經常說的阻塞問題,你的程式是需要停在那裡等待的,所以在大部分程式開發場景中,IO操作通常是最耗時的操作之一,並且最容易成為效能瓶頸的關鍵因素,特別是一旦你的程式開始上規模的,同樣的一段程式,一旦需要處理的數量級上去,就會升級成一個複雜的問題;同理網路IO之所以重要且被人反覆提及,是因為網路開發特別是伺服器端的開發中不可能處理的是單一連結,處理100個連結與處理100萬個連結面對的效能挑戰是不能同日而語的,這也是C10K問題的由來,所以一些需要處理海量連結的服務應用,比如IOT物聯網服務,推播服務, 為了提高IO操作的效能,通常會使用一些阻塞、非阻塞IO、非同步IO模型來優化IO對整個應用的效能影響,也就是我們經常聽到的BIO、NIO、AIO等IO模型,當然這只是解決上面所說問題的方案其中的一個環節。

IO依據阻塞,非阻塞,同步,非同步等特點可以劃分為阻塞IO(BIO)、非阻塞IO(NIO)、多路複用IO(multiplexing IO)、非同步IO(AIO)。每一種IO都有他們的使用場景和優勢,其中平常我們說的NIO已經包含了IO的多路複用,以下這張圖各個IO和阻塞非阻塞,同步非同步之間的關係。

其中阻塞、非阻塞、多路IO複用這些需要輪詢處理的都是同步IO,真正的非同步IO中程式只需要等待一個完成的訊號的通知,也就是我們通常說的非同步回撥機制。所以拉一個子執行緒去輪訓或使用select、poll、epoll都不是非同步。

網路程式設計IO模型

上面我們闡述了IO的定義以及重要性,現在結合Java程式碼的具體實現看下不同IO模型的具體實現與特點。

1、 阻塞IO模型

BIO(Blocking I/O)

  • BIO是最傳統的阻塞I/O模型,意味著當一個執行緒執行I/O操作時,它會一直等待直到操作完成。
  • 每個I/O操作都需要一個獨立的執行緒來處理,這會導致執行緒數量的大幅增加,降低了系統的並行效能。
  • 適用於一般連線數不多的應用場景。
public class BioServer {  
      
    public static void main(String[] args) throws IOException, InterruptedException {  
         
       ExecutorService executor = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),  
             new ThreadPoolExecutor.CallerRunsPolicy());  
       ServerSocket serverSocket = new ServerSocket();  
       serverSocket.bind(new InetSocketAddress("127.0.0.1",9091));//繫結IP地址與埠,定義一個伺服器端socket,開啟監聽  
       while (true) {  
          Socket socket = serverSocket.accept();//這裡如果沒有使用者端連結,會一直阻塞等待  
          //Thread.sleep(1000*30);  
          executor.execute(new BioServerHandler(socket));  
            
       }  
    }  
}

public class BioServerHandler implements Runnable{  
  
    private final Socket socket;  
      
    public BioServerHandler(Socket socket) {  
       this.socket=socket;  
    }  
      
    @Override  
    public void run() {  
       // TODO Auto-generated method stub  
       try {  
            
          while (true) {  
             byte[] rbytes = new byte[1024];  
             InputStream inputStream =  socket.getInputStream(); //通過IO輸入流接受訊息  
                int rlength=inputStream.read(rbytes, 0, 1024); //訊息長度  
             byte[] bytes = new byte[rlength];  
             System.arraycopy(rbytes, 0, bytes, 0, rlength);  
             String message = new String(bytes);  
             System.out.printf("Client: %s%n", message);  
  
             PrintStream writer = new PrintStream(socket.getOutputStream()); //通過IO輸出流傳送訊息  
             writer.println("Hello BIO Client");  
          }  
  
            
       } catch (IOException e) {  
          // TODO Auto-generated catch block  
          e.printStackTrace();  
       }  
         
    }  
  
}

2、 NIO模型

NIO 和多路複用是密切相關的概念,它們通常結合使用,特別是在網路程式設計中NIO 通常通過多路複用機制來實現非阻塞 I/O。多路複用允許一個執行緒同時監視多個通道的狀態,當某個通道有資料可讀或可寫時,多路複用機制會通知應用程式,這使得應用程式能夠高效地處理多個通道的 I/O 事件。

NIO引入了Channel和Buffer的概念,允許一個執行緒管理多個Channel,從而提高了系統的並行效能。

  1. Channel和Buffer:NIO引入了Channel和Buffer的概念,允許一個執行緒管理多個Channel。Channel表示與資料來源(如檔案或網路通訊端)的連線,而Buffer是用於讀取或寫入資料的緩衝區。執行緒可以將資料從Channel讀取到Buffer,或者將資料從Buffer寫入到Channel,而無需等待資料準備好。

  2. Selector(選擇器):Selector是NIO的核心元件之一,它允許一個執行緒同時管理多個Channel。Selector可以檢測多個Channel是否準備好讀取或寫入資料,從而使執行緒能夠非阻塞地等待資料準備好的通知。這種機制稱為事件驅動,允許一個執行緒同時監視多個通道上的事件,只處理已經準備好的事件,而不是等待每個通道的資料準備好。

  3. 非阻塞呼叫:NIO中的Channel和Socket通常可以設定為非阻塞模式。在非阻塞模式下,當進行讀取或寫入操作時,如果沒有資料準備好,不會阻塞執行緒,而是立即返回一個狀態碼,表示沒有資料可用,這樣執行緒可以繼續處理其他Channel,而不會被一個阻塞的操作阻塞。

綜合上述機制,NIO允許一個執行緒同時處理多個連線,並在資料準備好時進行處理,而不會阻塞等待資料準備好。這種基於底層事件驅動的方式提高了應用系統的並行效能,特別適用於需要處理大量連線的場景。

public class NioServer {  
    public static void main(String[] args) {  
       try {  
          // TODO Auto-generated method stub  
            // 1.獲取Selector選擇器  
            Selector selector = Selector.open();  
            // 2.獲取通道  
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  
            // 3.設定為非阻塞  
            serverSocketChannel.configureBlocking(false);  
            // 4.繫結連線  
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8091));  
            // 5.將通道註冊到選擇器上,並註冊的操作為:「接收」操作  
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  
              
              
            // 6.採用輪詢的方式,查詢獲取「準備就緒」的註冊過的操作  
            while (true) {  
                selector.select(); //阻塞
                Set<SelectionKey> selectionKeys = selector.selectedKeys();  
                Iterator<SelectionKey> iterator = selectionKeys.iterator();  
                while (iterator.hasNext()) {  
                    SelectionKey selectionKey = iterator.next();  
                    new NioServerHandler(serverSocketChannel, selectionKey).handle();  
                }  
               selectionKeys.clear();  
            }  
              
              
       } catch (Exception e) {  
          // TODO: handle exception  
       }  
         
  
    }  
  
}
public class NioServerHandler {  
      
    private ServerSocketChannel serverSocketChannel;  
      
    private SelectionKey selectionKey;  
      
    public  NioServerHandler(ServerSocketChannel serverSocketChannel, SelectionKey selectionKey) {  
       this.serverSocketChannel=serverSocketChannel;  
       this.selectionKey=selectionKey;  
    }  
  
    public void handle() {  
        ByteBuffer inputBuff = ByteBuffer.allocate(1024); // 分配讀ByteBuffer  
        ByteBuffer outputBuff = ByteBuffer.allocate(1024); // 分配寫ByteBuffer  
        try {  
            if (selectionKey.isAcceptable()) { //連結事件  
                SocketChannel socketChannel = serverSocketChannel.accept();  
                if (socketChannel == null) {  
                    return;  
                }  
                socketChannel.configureBlocking(false);  
                socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);  
            }  
  
            if (selectionKey.isReadable()) {//讀事件  
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();  
                if (socketChannel == null) {  
                    return;  
                }  
  
                inputBuff.clear();  
                int length = socketChannel.read(inputBuff);  
                if (length == -1) {  
                    socketChannel.close();  
                    selectionKey.cancel();  
                    return;  
                }  
  
                inputBuff.flip();  
               
             byte[] bytes = new byte[length];  
             System.arraycopy(inputBuff.array(), 0, bytes, 0, length);  
                  
                System.err.println( BytesUtils.toHexString(bytes));  
  
  
                socketChannel.register(selectionKey.selector(), SelectionKey.OP_WRITE);  
                selectionKey.selector().wakeup();//喚醒選擇器  
            }  
  
            if (selectionKey.isWritable()) {//寫事件  
                outputBuff.clear();  
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();  
                if (socketChannel == null) {  
                    return;  
                }  
                String message = "Hello, Client. " + UUID.randomUUID();  
                System.err.println(message);  
                outputBuff.put(message.getBytes(StandardCharsets.UTF_8));  
                outputBuff.flip();  
                socketChannel.write(outputBuff);  
  
                socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);  
                selectionKey.selector().wakeup();//喚醒選擇器  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
}

3、 AIO模型

AIO(Asynchronous I/O),也稱為非同步 I/O,是一種用於處理輸入和輸出操作的程式設計模型。它與傳統的BIO和 NIO 模型有一些顯著的不同之處。它不需要像BIO與NIO通過輪詢方式去檢查資料是否準備好,而是由作業系統完成。當資料準備好後,作業系統會通知應用程式,並在回撥函數中進行處理。

AIO使用了三個核心元件:AsynchronousChannel、CompletionHandler和
AsynchronousServerSocketChannel。其中,AsynchronousChannel是讀/寫資料的通道,CompletionHandler是I/O操作完成時的回撥方法,AsynchronousServerSocketChannel是非同步伺服器端通訊端通道,用於監聽使用者端的連線請求。
以下是 AIO 的主要特點:

  1. 非同步操作:AIO 的最重要特點是非同步操作。在 AIO 模型中,應用程式發起 I/O 操作後,不需要等待操作完成,而是可以繼續執行其他任務。當操作完成時,作業系統會通知應用程式,這種模型不會阻塞應用程式的執行。

  2. 回撥機制:AIO 使用回撥機制來處理 I/O 完成事件。應用程式在發起非同步操作時需要提供一個回撥函數或回撥物件,用於處理操作完成後的事件通知。這使得 AIO 程式設計更具事件驅動的特性。

  3. 提高並行效能:AIO 能夠在高並行環境中提供更好的效能,因為它在NIO允許一個執行緒管理多個 I/O 操作的基礎上實現了非同步回撥,因此可以更好地處理大量連線。

  4. 複雜性:AIO 程式設計相對複雜,因為它涉及到回撥和狀態管理。編寫和維護 AIO 程式碼可能需要更多的工作,但可以提供更好的效能和響應性。

總之AIO 是一種適合高並行、低延遲要求的應用程式的程式設計模型。它的非同步特性和事件驅動的方式使得應用程式能夠更好地利用系統資源,提供更好的效能和響應性。然而AIO通常比傳統BIO或 NIO的具體實現更復雜,因此設計和編碼的複雜度會相對較高。

public class AioServer {  
  
    static ExecutorService executor = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),  
            new ThreadPoolExecutor.CallerRunsPolicy());  
    public static void main(String[] args) throws IOException, InterruptedException {  
        AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();  
        serverChannel.bind(new InetSocketAddress("127.0.0.1", 8091));  
  
            serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {  
                @Override  
                public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {  
                    // 繼續接受下一個連線  
                    serverChannel.accept(null, this);  
  
                    executor.execute(new AioServerHandler(clientChannel));  
                }  
  
                @Override  
                public void failed(Throwable exc, Void attachment) {  
                    exc.printStackTrace();  
                }  
            });  
  
            Thread.currentThread().join();  
        }  
          
}
public class AioServerHandler implements Runnable {  
  
    private AsynchronousSocketChannel clientChannel;  
    public  AioServerHandler(AsynchronousSocketChannel clientChannel){  
        this.clientChannel=clientChannel;  
    }  
    @Override  
    public void run() {  
        ByteBuffer inputBuff = ByteBuffer.allocate(1024); // 分配讀ByteBuffer  
        ByteBuffer outputBuff = ByteBuffer.allocate(1024); // 分配寫ByteBuffer  
        clientChannel.read(inputBuff, null, new CompletionHandler<Integer, Void>() {  
            @Override  
            public void completed(Integer bytesRead, Void attachment) {  
                if (bytesRead == -1) {  
                    // 使用者端關閉連線  
                    try {  
                        clientChannel.close();  
                    } catch (Exception e) {  
                        e.printStackTrace();  
                    }  
                    return;  
                }  
  
                inputBuff.flip();  
                byte[] data = new byte[bytesRead];  
                inputBuff.get(data);  
                String message = new String(data);  
                System.out.println("Received message: " + message);  
  
  
                outputBuff.clear();  
  
                outputBuff.put(message.getBytes(StandardCharsets.UTF_8));  
                outputBuff.flip();  
                clientChannel.write(outputBuff, null, new CompletionHandler<Integer, Void>() {  
                    @Override  
                    public void completed(Integer result, Void attachment) {  
                        System.out.println("Sent response: " + message);  
                    }  
  
                    @Override  
                    public void failed(Throwable exc, Void attachment) {  
                        exc.printStackTrace();  
                    }  
                });  
  
                inputBuff.clear();  
                clientChannel.read(inputBuff, null, this);  
            }  
  
            @Override  
            public void failed(Throwable exce, Void attachment) {  
                exce.printStackTrace();  
                try {  
                    clientChannel.close();  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
    }  
}

結語

​瞭解不同IO模型的特點與具體實現是我們設計與開發大型應用程式的基礎,有助於我們根據不同的應用場景與效能要求選用最適合的模型並進行架構設計;同時一些常用的網路服務架構如Mina與Netty也都基於高效能的IO模型進行了深度的封裝與擴充套件實現,我們可以通過結合這些框架的原始碼加深對網路IO模型在實際開發中應用的理解。後續我也會基於本章內容繼續豐富完善,闡述一個完整的網路應用程式需要具備的各種功能並進行實現。







github地址:https://github.com/dafanjoy/jcode

關注微信公眾號,檢視更多技術文章。