1、I/O 模型簡單的理解:就是用什麼樣的通道進行資料的傳送和接收,很大程度上決定了程式通訊的效能
2、Java 共支援 3 種網路程式設計模型/IO 模式:BIO、NIO、AIO
3、Java BIO : 同步並阻塞(傳統阻塞型),伺服器實現模式為一個連線一個執行緒,即使用者端有連線請求時伺服器端就需要啟動一個執行緒進行處理,如果這個連線不做任何事情會造成不必要的執行緒開銷
4、Java NIO :
同步非阻塞
,伺服器實現模式為一個執行緒處理多個請求(連線)
,即使用者端傳送的連線請求都會註冊到多路複用器
上,多路複用器輪詢到連線有 I/O 請求就進行處理
5、Java AIO(NIO.2) :
非同步非阻塞
,AIO 引入非同步通道的概念,採用了 Proactor 模式,簡化了程式編寫,有效的請求才啟動執行緒,它的特點是先由作業系統完成後才通知伺服器端程式啟動執行緒去處理,一般適用於連線數較多且連線時間較長
的應用
BIO :特點:同步並阻塞;使用場景:一個連線對應一個執行緒 2.執行緒開銷大 連線數目比較小且固定的架構,伺服器資源要求比較高,程式簡單易理解
NIO :特點:同步非阻塞;使用場景:一個執行緒處理多個請求(連線) 2.多路複用器輪詢到連線有 I/O 請求 連線數目多且連線比較短(輕操作),比如聊天伺服器,彈幕系統,伺服器間通訊等。程式設計比較複雜
AIO :特點:非同步非阻塞;使用場景:採用了 Proactor 模式 連線數較多且連線時間較長,比如相簿伺服器,充分呼叫 OS 參與並行操作,程式設計比較複雜
- Java BIO 就是傳統的 java io 程式設計,其相關的類和介面在 java.io
- BIO(blocking I/O): 同步阻塞,伺服器實現模式為一個連線一個執行緒,即使用者端有連線請求時伺服器端就需要啟動一個執行緒進行處理,如果這個連線不做任何事情會造成不必要的執行緒開銷,可以通過執行緒池機制改善(實現多個客戶連線伺服器)。
- BIO 方式適用於連線數目比較小且固定的架構,這種方式對伺服器資源要求比較高,並行侷限於應用中,JDK1.4以前的唯一選擇,程式簡單易理解
BIO 程式設計流程的梳理:
伺服器端啟動一個 ServerSocket
使用者端啟動 Socket 對伺服器進行通訊,預設情況下伺服器端需要對每個客戶建立一個執行緒與之通訊
使用者端發出請求後, 先諮詢伺服器是否有執行緒響應,如果沒有則會等待,或者被拒絕
如果有響應,使用者端執行緒會等待請求結束後,在繼續執行
範例說明:
- 使用 BIO 模型編寫一個伺服器端,監聽 6666 埠,當有使用者端連線時,就啟動一個執行緒與之通訊。
- 要求使用執行緒池機制改善,可以連線多個使用者端.
- 伺服器端可以接收使用者端傳送的資料(telnet 方式即可)。
package com.sun.bio; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2022/12/20 10:49 **/ public class BioServer { public static void main(String[] args) throws Exception { // 1、建立執行緒池,為多個使用者端提供執行緒處理請求 ExecutorService executorService = Executors.newCachedThreadPool(); // 2、建立socket伺服器,繫結埠9999 ServerSocket serverSocket = new ServerSocket(9999); // 3、迴圈接收使用者端請求 while (true) { // 4、接收使用者端請求,獲取請求的使用者端 Socket acceptSocket = serverSocket.accept(); // 5、執行緒池處理請求 executorService.submit(() -> { handle(acceptSocket); }); } } // 6、處理請求的方法 private static void handle(Socket socket) { System.out.println("處理當前請求的執行緒ID:" + Thread.currentThread().getId()); InputStream inputStream = null; byte[] bytes = new byte[1024]; try { inputStream = socket.getInputStream(); while (true) { int read = inputStream.read(bytes); if (read != -1) { System.out.println("執行緒ID[" + Thread.currentThread().getId() + "]接收到的資料:" + new String(bytes, 0, read)); } else { break; } } } catch (Exception e) { e.printStackTrace(); }finally { try { inputStream.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
- 每個請求都需要建立獨立的執行緒,與對應的使用者端進行資料 Read,業務處理,資料 Write 。
- 當並行數較大時,需要建立大量執行緒來處理連線,系統資源佔用較大。
- 連線建立後,如果當前執行緒暫時沒有資料可讀,則執行緒就阻塞在 Read 操作上,造成執行緒資源浪費
Java NIO 全稱 java non-blocking IO,是指 JDK 提供的新 API。從 JDK1.4 開始,Java 提供了一系列改進的輸入/輸出的新特性,被統稱為 NIO(即 New IO),是同步非阻塞的
NIO 相關類都被放在 java.nio 包及子包下,並且對原 java.io 包中的很多類進行改寫。
NIO 有三大核心部分:Channel( 通道),Buffer( 緩衝區), Selector( 選擇器)
NIO 是區面向緩衝區,向或者面向塊程式設計的。資料讀取到一個它稍後處理的緩衝區,需要時可在緩衝區中前後移動,這就增加了處理過程中的靈活性,使用它可以提供非阻塞式的高伸縮性網路
Java NIO 的非阻塞模式,使一個執行緒從某通道傳送請求或者讀取資料,但是它僅能得到目前可用的資料,如果目前沒有資料可用時,就什麼都不會獲取,而不是保持執行緒阻塞,所以直至資料變的可以讀取之前,該執行緒可以繼續做其他的事情。非阻塞寫也是如此,一個執行緒請求寫入一些資料到某通道,但不需要等待它完全寫入,這個執行緒同時可以去做別的事情。
通俗理解:NIO 是可以做到用一個執行緒來處理多個操作的。 假設有 10000 個請求過來,根據實際情況,可以分配50 或者 100 個執行緒來處理。不像之前的阻塞 IO 那樣,非得分配 10000 個。
HTTP2.0 使用了多路複用的技術,做到同一個連線並行處理多個請求,而且並行請求的數量比 HTTP1.1 大了好幾個數量級
案例說明 NIO 的 Buffer
package com.sun.netty.Buffer; import java.nio.IntBuffer; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2022/12/20 11:29 **/ public class BasicBuffer { public static void main(String[] args) { // 1、建立一個儲存int型別資料的Buffer,儲存容量為5 IntBuffer intBuffer = IntBuffer.allocate(5); // 2、向Buffer中新增資料 for (int i = 0; i < intBuffer.capacity(); i++) { intBuffer.put(i*2); } // 3、如何從Buffer中讀取資料? // 3.1、讀寫轉換操作,必須做 intBuffer.flip(); // 4、正式讀取 while(intBuffer.hasRemaining()){ System.out.println(intBuffer.get()); } } }
1、BIO 以流的方式處理資料,而 NIO 以塊的方式處理資料,塊 I/O 的效率比流 I/O 高很多
2、BIO 是阻塞的,NIO 則是非阻塞的
3、BIO 基於位元組流和字元流進行操作,而 NIO 基於 Channel(通道)和 Buffer(緩衝區)進行操作,資料總是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。Selector(選擇器)用於監聽多個通道的事件(比如:連線請求,資料到達等),因此使用單個執行緒就可以 監聽多個使用者端通道
關係圖的說明:
- 每個 channel 都會對應一個 Buffer
- Selector 對應一個執行緒, 一個執行緒對應多個 channel(連線)
- 該圖反應了有三個 channel 註冊到 該 selector //程式
- 程式切換到哪個 channel 是有事件決定的, Event 就是一個重要的概念
- Selector 會根據不同的事件,在各個通道上切換
- Buffer 就是一個記憶體塊 , 底層是有一個陣列
- 資料的讀取寫入是通過 Buffer, 這個和 BIO , BIO 中要麼是輸入流,或者是輸出流, 不能雙向,但是 NIO 的 Buffer 是可以讀也可以寫, 需要 flip 方法切換channel 是雙向的, 可以返回底層作業系統的情況, 比如 Linux , 底層的作業系統通道就是雙向的.
緩衝區(Buffer):
緩衝區本質上是一個可以讀寫資料的記憶體塊,可以理解成是一個容器物件( 含陣列),該物件提供了一組方法,可以更輕鬆地使用記憶體塊,,緩衝區物件內建了一些機制,能夠跟蹤和記錄緩衝區的狀態變化情況。Channel 提供從檔案、網路讀取資料的渠道,但是讀取或寫入的資料都必須經由 Buffer.
1、在 NIO 中,Buffer 是一個頂層父類別,它是一個抽象類, 類的層級關係圖:
2、Buffer 類定義了所有的緩衝區都具有的四個屬性來提供關於其所包含的資料元素的資訊:
3、Buffer 類相關方法一覽
從前面可以看出對於 Java 中的基本資料型別(boolean 除外),都有一個 Buffer 型別與之相對應,最常用的自然是 ByteBuffer 類(二進位制資料),該類的主要方法如下:
1、NIO 的通道類似於流,但有些區別如下:
- 通道可以
同時進行讀寫
,而流只能讀或者只能寫- 通道可以實現
非同步讀寫
資料- 通道可以
從緩衝讀資料
,也可以寫資料到緩衝:
2、BIO 中的 stream 是單向的,例如 FileInputStream 物件只能進行讀取資料的操作,而 NIO 中的通道(Channel)是雙向的,可以讀操作,也可以寫操作。
3、Channel 在 NIO 中是一個介面 public interface Channel extends Closeable{}
4、常用的Channel 類有 :
- FileChannel
- DatagramChannel
- ServerSocketChannel
- SocketChannel
【ServerSocketChanne 類似 ServerSocket , SocketChannel 類似 Socket】
5、FileChannel 用於檔案的資料讀寫,DatagramChannel 用於 UDP 的資料讀寫,ServerSocketChannel 和SocketChannel 用於 TCP 的資料讀寫。
6、圖示
FileChannel 主要用來對本地檔案進行 IO 操作,常見的方法有
public int read(ByteBuffer dst) ,從通道讀取資料並放到緩衝區中
public int write(ByteBuffer src) ,把緩衝區的資料寫到通道中
public long transferFrom(ReadableByteChannel src, long position, long count),從目標通道中複製資料到當前通道
public long transferTo(long position, long count, WritableByteChannel target),把資料從當前通道複製給目標通道
package com.sun.netty.Buffer; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2022/12/21 9:30 **/ public class FileChannel01 { public static void main(String[] args) throws Exception { String str="hello,你好!"; // 1、建立檔案輸出流 FileOutputStream fileOutputStream = new FileOutputStream("1.txt"); // 2、根據檔案輸出流獲取channel FileChannel channel = fileOutputStream.getChannel(); // 3、定義Buffer ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 4、將資料放入緩衝區Buffer byteBuffer.put(str.getBytes(StandardCharsets.UTF_8)); // 5、切記!!!Buffer讀寫轉換 byteBuffer.flip(); // 5、將緩衝區資料寫入管道channel channel.write(byteBuffer); // 6、關閉資源 fileOutputStream.close(); } }
package com.sun.netty.Buffer; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2022/12/21 9:44 **/ public class FileChannel02 { public static void main(String[] args) throws Exception { // 1、獲取檔案輸入流 FileInputStream fileInputStream = new FileInputStream("1.txt"); // 2、通過檔案輸入流獲取channel FileChannel channel = fileInputStream.getChannel(); // 3、建立Buffer ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 4、將檔案從管道讀取到快取中 int read = channel.read(byteBuffer); // 5、輸出讀取文字 System.out.println(new String(byteBuffer.array(),0,read)); // 6、關閉資源 fileInputStream.close(); } }
package com.sun.netty.Buffer; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2022/12/21 9:53 **/ public class FileChannel03 { public static void main(String[] args) throws Exception { // 1、獲取檔案輸入流及channel FileInputStream fileInputStream = new FileInputStream("th.jpg"); FileChannel inputStreamChannel = fileInputStream.getChannel(); // 2、獲取檔案輸出流及channel FileOutputStream fileOutputStream = new FileOutputStream("th_copy.jpg"); FileChannel outputStreamChannel = fileOutputStream.getChannel(); // 3、獲取緩衝區 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 4、迴圈讀取檔案並儲存流內容到目標檔案 while (true) { // 5、切記此步驟,漏寫則會執行不成功 byteBuffer.clear(); int read = inputStreamChannel.read(byteBuffer); if (read == -1) { break; } // 6、切記!!!讀寫轉換 byteBuffer.flip(); outputStreamChannel.write(byteBuffer); } outputStreamChannel.close(); inputStreamChannel.close(); } }
package com.sun.netty.Buffer; import java.io.FileInputStream; import java.io.FileOutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2022/12/21 9:53 **/ public class FileChannel04 { public static void main(String[] args) throws Exception { // 1、獲取檔案輸入流及channel FileInputStream fileInputStream = new FileInputStream("th.jpg"); FileChannel inputStreamChannel = fileInputStream.getChannel(); // 2、獲取檔案輸出流及channel FileOutputStream fileOutputStream = new FileOutputStream("th_copy2.jpg"); FileChannel outputStreamChannel = fileOutputStream.getChannel(); // 3、獲取緩衝區 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 4、此處有兩種方式 // 4.1 transferFrom() // outputStreamChannel.transferFrom(inputStreamChannel,0, inputStreamChannel.size()); // 4.2 transferTo() inputStreamChannel.transferTo(0, inputStreamChannel.size(), outputStreamChannel); // 5、關閉資源 outputStreamChannel.close(); inputStreamChannel.close(); } }
1、存入、讀取型別
ByteBuffer 支援型別化的 put 和 get, put
放入的是什麼資料型別
,get 就應該使用相應的資料型別來取出
,否則可能有BufferUnderflowException 異常
package com.sun.netty.Buffer; import java.nio.ByteBuffer; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2022/12/21 11:04 **/ public class NIOBufferPutGet { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(64 ); //型別化方式放入資料 buffer.putInt(100); buffer.putLong(9); buffer.putChar('強'); buffer.putShort((short) 4); //取出,順序與放入的順序一致,求型別一致 buffer.flip(); System.out.println(buffer.getInt()); System.out.println(buffer.getLong()); System.out.println(buffer.getChar()); System.out.println(buffer.getShort()); } }
2、可以將一個普通 Buffer 轉成唯讀 Buffer
package com.sun.netty.Buffer; import java.nio.ByteBuffer; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2022/12/21 11:07 **/ public class ReadOnlyBuffer { public static void main(String[] args) { // 1、建立一個 buffer ByteBuffer buffer = ByteBuffer.allocate(64); for (int i = 0; i < 64; i++) { //給其放入0-63個數位 buffer.put((byte) i); } // 2、讀寫轉換 buffer.flip(); // 3、得到一個唯讀的 Buffer ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); System.out.println(readOnlyBuffer.getClass()); // class java.nio.HeapByteBufferR // 4、讀取 while (readOnlyBuffer.hasRemaining()) { // 判斷是否還有資料 System.out.println(readOnlyBuffer.get()); // 取出,並給position+1 } // 5、測試只能讀取,不能在put寫入 readOnlyBuffer.put((byte) 100); // ReadOnlyBufferException } }
3、NIO 還提供了 MappedByteBuffer, 可以讓檔案直接在記憶體(堆外的記憶體)中進行修改, 而如何同步到檔案由 NIO 來完成
/* 說明 1. MappedByteBuffer 可讓檔案直接在記憶體(堆外記憶體)修改, 作業系統不需要拷貝一次 */ public class MappedByteBufferTest { public static void main(String[] args) throws Exception { RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw"); //獲取對應的通道 FileChannel channel = randomAccessFile.getChannel(); /** * 引數1: FileChannel.MapMode.READ_WRITE 使用的讀寫模式 * 引數2: 0 : 可以直接修改的起始位置,位元組位置 * 引數3: 5: 是對映到記憶體的大小(不是索引位置) ,即將 1.txt 的多少個位元組對映到記憶體 * 可以直接修改的範圍就是 0-5 * 實際型別 DirectByteBuffer */ MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5); mappedByteBuffer.put(0, (byte) 'H'); mappedByteBuffer.put(3, (byte) '9'); mappedByteBuffer.put(5, (byte) 'Y');//IndexOutOfBoundsException //關閉資源 randomAccessFile.close(); System.out.println("修改成功~~"); } }
4、NIO 還支援 通過多個 Buffer (即 Buffer 陣列) 完成讀寫操作,即 Scattering 和Gathering
/** * Scattering:將資料寫入到 buffer 時,可以採用 buffer 陣列,依次寫入 [分散] * Gathering: 從 buffer 讀取資料時,可以採用 buffer 陣列,依次讀 */ public class ScatteringAndGatheringTest { public static void main(String[] args) throws Exception { //使用 ServerSocketChannel 和 SocketChannel 網路 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress(7000); //繫結埠到 socket ,並啟動 serverSocketChannel.socket().bind(inetSocketAddress); //建立 buffer 陣列 ByteBuffer[] byteBuffers = new ByteBuffer[2]; byteBuffers[0] = ByteBuffer.allocate(5); byteBuffers[1] = ByteBuffer.allocate(3); //等使用者端連線(telnet) SocketChannel socketChannel = serverSocketChannel.accept(); int messageLength = 8; //假定從使用者端接收 8 個位元組 //迴圈的讀取 while (true) { int byteRead = 0; while (byteRead < messageLength ) { long l = socketChannel.read(byteBuffers); byteRead += l; //累計讀取的位元組數 System.out.println("byteRead=" + byteRead); //使用流列印, 看看當前的這個 buffer 的 position 和 limit Arrays.asList(byteBuffers).stream().map(buffer -> "postion=" + buffer.position() + ", limit=" + buffer.limit()).forEach(System.out::println); } //將所有的 buffer 進行 flip Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip()); //將資料讀出顯示到使用者端 long byteWirte = 0; while (byteWirte < messageLength) { long l = socketChannel.write(byteBuffers); // byteWirte += l; } //將所有的 buffer 進行 clear Arrays.asList(byteBuffers).forEach(buffer-> { buffer.clear(); }); System.out.println("byteRead:=" + byteRead + " byteWrite=" + byteWirte + ", messagelength" + messageLength); } } }
1、Java 的 NIO,用非阻塞的 IO 方式。可以用一個執行緒,處理多個的使用者端連線,就會使用到 Selector(選擇器)
2、Selector 能夠檢測多個註冊的通道上是否有事件發生(注意:多個 Channel 以事件的方式可以註冊到同一個Selector),如果有事件發生,便獲取事件然後針對每個事件進行相應的處理。這樣就可以只用一個單執行緒去管理多個通道,也就是管理多個連線和請求。
3、只有在 連線/通道 真正有讀寫事件發生時,才會進行讀寫,就大大地減少了系統開銷,並且不必為每個連線都建立一個執行緒,不用去維護多個執行緒
4、避免了多執行緒之間的上下文切換導致的開銷
- Netty 的 IO 執行緒 NioEventLoop 聚合了 Selector(選擇器,也叫多路複用器),可以同時並行處理成百上千個使用者端連線。
- 當執行緒從某使用者端 Socket 通道進行讀寫資料時,若沒有資料可用時,該執行緒可以進行其他任務。
- 執行緒通常將非阻塞 IO 的空閒時間用於在其他通道上執行 IO 操作,所以單獨的執行緒可以管理多個輸入和輸出通道。
- 由於讀寫操作都是非阻塞的,這就可以充分提升 IO 執行緒的執行效率,避免由於頻繁 I/O 阻塞導致的執行緒掛起。
- 一個 I/O 執行緒可以並行處理 N 個使用者端連線和讀寫操作,這從根本上解決了傳統同步阻塞 I/O 一連線一執行緒模型,架構的效能、彈性伸縮能力和可靠性都得到了極大的提升。
Selector 類是一個抽象類
, 常用方法和說明如下:
1、NIO 中的 ServerSocketChannel 功能類似 ServerSocket,SocketChannel 功能類似 Socket
2、Selector 相關方法說明
- selector.select()//阻塞
- selector.select(1000);//阻塞 1000 毫秒,在 1000 毫秒後返回
- selector.wakeup();//喚醒 selector
- selector.selectNow();//不阻塞,立馬返還
對上圖的說明:
- 當用戶端連線時,會通過 ServerSocketChannel 得到 SocketChannel
- Selector 進行監聽 select 方法, 返回有事件發生的通道的個數.
- 將 socketChannel 註冊到 Selector 上, register(Selector sel, int ops), 一個 selector 上可以註冊多個 SocketChannel
- 註冊後返回一個 SelectionKey, 會和該 Selector 關聯(集合)
- 進一步得到各個 SelectionKey (有事件發生)
- 在通過 SelectionKey 反向獲取 SocketChannel , 方法 channel()
- 可以通過 得到的 channel , 完成業務處理
程式碼:
NIOServer:伺服器
package com.sun.netty.Selector; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2022/12/21 17:06 **/ public class NIOServer { public static void main(String[] args) throws Exception { // 1、建立ServerSocketChannel物件 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 2、建立Selector物件 Selector selector = Selector.open(); // 3、繫結埠 serverSocketChannel.bind(new InetSocketAddress(8989)); // 4、設定ServerSocketChannel為非阻塞 serverSocketChannel.configureBlocking(false); // 5、繫結ServerSocketChannel到Selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 6、迴圈接收使用者端訊息 while (true) { if (selector.select(60000) == 0) { System.out.println("等待60秒鐘無連線..."); continue; } // 如果返回的>0, 就獲取到相關的 selectionKey 集合 // 1.如果返回的>0, 表示已經獲取到關注的事件 // 2. selector.selectedKeys() 返回關注事件的集合 // 通過 selectionKeys 反向獲取通道 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 遍歷 Set<SelectionKey>, 使用迭代器遍歷 Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { // 獲取到 SelectionKey SelectionKey selectionKey = iterator.next(); // 根據 key 對應的通道發生的事件做相應處理 if (selectionKey.isAcceptable()) { // 如果是 OP_ACCEPT, 有新的使用者端連線 // 該使用者端生成一個 SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("伺服器端連線上一個請求:" + socketChannel.hashCode()); // 將 SocketChannel 設定為非阻塞 socketChannel.configureBlocking(false); // 將 socketChannel 註冊到 selector, 關注事件為 OP_READ, 同時給 socketChannel // 關聯一個 Buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } if (selectionKey.isReadable()) { // 發生 OP_READ // 通過 key 反向獲取到對應 channel SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); // 獲取到該 channel 關聯的 buffer ByteBuffer buffer = (ByteBuffer) selectionKey.attachment(); buffer.clear(); socketChannel.read(buffer); if (buffer.capacity() - buffer.remaining() == 0) { System.out.println("使用者端:" + socketChannel.hashCode() + "斷開連線..."); } else { System.out.println("來自使用者端" + socketChannel.hashCode() + "的訊息:" + new String(buffer.array(), 0, buffer.capacity() - buffer.remaining())); } } // 手動從集合中移動當前的 selectionKey, 防止重複操作 iterator.remove(); } } } }
NIOClient:使用者端
package com.sun.netty.Selector; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2022/12/21 17:57 **/ public class NIOClient { public static void main(String[] args) throws Exception { //得到一個網路通道 SocketChannel socketChannel = SocketChannel.open(); //設定非阻塞 socketChannel.configureBlocking(false); //提供伺服器端的 ip 和 埠 InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8989); //連線伺服器 if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("因為連線需要時間,使用者端不會阻塞,可以做其它工作.."); } } //...如果連線成功,就傳送資料 String str = "hello, 阿昌~"; //Wraps a byte array into a buffer ByteBuffer buffer = ByteBuffer.wrap(str.getBytes()); //傳送資料,將 buffer 資料寫入 channel socketChannel.write(buffer); System.in.read(); } }
1、SelectionKey,表示 Selector 和網路通道的註冊關係
int OP_ACCEPT:有新的網路連線可以 accept,值為 16
int OP_CONNECT:代表連線已經建立,值為 8
int OP_READ:代表讀操作,值為 1
int OP_WRITE:代表寫操作,值為 4
原始碼中:
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
2、SelectionKey 相關方法
ServerSocketChannel 在伺服器端監聽新的使用者端 Socket 連線
專門負責監聽新的使用者端,獲取對應的SocketChannel
SocketChannel,網路 IO 通道,具體負責進行讀寫操作。
NIO 把緩衝區的資料寫入通道,或者把通道里的資料讀到緩衝區。
範例要求:
1、編寫一個 NIO 群聊系統,實現伺服器端和使用者端之間的資料簡單通訊(非阻塞)
2、實現多人群聊
3、伺服器端:可以監測使用者上線,離線,並實現訊息轉發功能
4、使用者端:通過 channel 可以無阻塞傳送訊息給其它所有使用者,同時可以接受其它使用者傳送的訊息(有伺服器轉發得到)
5、目的:進一步理解 NIO 非阻塞網路程式設計機制
程式碼:
伺服器端
package com.sun.netty.GroupChat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; /** * 群聊伺服器端 * 1、訊息接收、轉發 * 2、上線、離線提醒 */ public class GroupChatServer { // 定義全域性變數 private ServerSocketChannel serverSocketChannel; private Selector selector; private static final int PORT = 9999; // 構造方法初始化 public GroupChatServer() { try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(PORT)); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } // 監聽方法 public void listen() { try { while (true) { int selectCount = selector.select(); if (selectCount > 0) { Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator(); while (selectionKeyIterator.hasNext()) { SelectionKey selectionKey = selectionKeyIterator.next(); if (selectionKey.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println(socketChannel.getRemoteAddress() + ":已上線..."); } if (selectionKey.isReadable()) { readData(selectionKey); } } // 注意點!!! selectionKeyIterator.remove(); } else { System.out.println("伺服器端等待連線..."); } } } catch (IOException e) { e.printStackTrace(); } finally { // 省略 } } private void readData(SelectionKey selectionKey) { SocketChannel socketChannel = null; try { socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = socketChannel.read(buffer); if (read > 0) { String msg = new String(buffer.array()); System.out.println("來自使用者端[" + socketChannel.getRemoteAddress() + "]:" + msg); transferToOtherClient(selectionKey, msg); } } catch (IOException e) { try { System.out.println("使用者端[" + socketChannel.getRemoteAddress() + "]:離線了..."); selectionKey.cancel(); socketChannel.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } private void transferToOtherClient(SelectionKey selectionKey, String msg) throws IOException { System.out.println("伺服器正在轉發訊息..."); Iterator<SelectionKey> iterator = selector.keys().iterator(); SocketChannel senderChannel = (SocketChannel) selectionKey.channel(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); Channel channel = key.channel(); if (channel instanceof SocketChannel && key != selectionKey) { SocketChannel receiveSocketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes()); receiveSocketChannel.write(byteBuffer); } } } public static void main(String[] args) { GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } }
使用者端
package com.sun.netty.GroupChat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; public class GroupChatClient { private static final String ip = "127.0.0.1"; private static final int PORT = 9999; private SocketChannel channel; private Selector selector; private String username; public GroupChatClient() { try { channel = SocketChannel.open(new InetSocketAddress(ip, PORT)); channel.configureBlocking(false); selector = Selector.open(); channel.register(selector, SelectionKey.OP_READ); username = channel.getLocalAddress().toString(); System.out.println(username + ":上線成功..."); } catch (IOException e) { e.printStackTrace(); } } public void sendMsg(String msg) { // 注意點!!! msg = username + ":" + msg; try { channel.write(ByteBuffer.wrap(msg.getBytes())); } catch (IOException e) { e.printStackTrace(); } } public void getMsg() { try { int selectCount = selector.select(); if (selectCount > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); channel.read(byteBuffer); System.out.println(new String(byteBuffer.array()).trim()); } } iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { GroupChatClient client = new GroupChatClient(); new Thread(() -> { while (true) { client.getMsg(); try { Thread.currentThread().sleep(3000); } catch (Exception e) { e.printStackTrace(); } } }).start(); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String next = scanner.nextLine(); client.sendMsg(next); } } }
涉及計算機系統底層,如果想要詳細瞭解請點選此處檢視相關pdf。零拷貝原理
1、JDK 7 引入了 Asynchronous I/O,即 AIO。在進行 I/O 程式設計中,常用到兩種模式:Reactor 和 Proactor。
2、Java 的NIO 就是 Reactor,當有事件觸發時,伺服器端得到通知,進行相應的處理
3、AIO 即 NIO2.0,叫做非同步不阻塞的 IO。
4、AIO 引入非同步通道的概念,採用了 Proactor 模式,簡化了程式編寫,有效的請求才啟動執行緒,它的特點是先由作業系統完成後才通知伺服器端程式啟動執行緒去處理,一般適用於連線數較多且連線時間較長的應用 目前 AIO 還沒有廣泛應用,Netty 也是基於 NIO,而不是 AIO, 因此我們就不詳解 AIO 了
5、有興趣可參考 <<Java 新 一 代 網 絡 編 程 模 型 AIO 原 理 及 Linux 系 統 AIO 介 紹 >> http://www.52im.net/thread-306-1-1.html