參考:[nio.pdf (oswego.edu)](https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.
Reactor模式是一種用於處理高並行的設計模式,也被稱為事件驅動模式。在這種模式中,應用程式會將輸入事件交給一個事件處理器,稱為Reactor,Reactor會監聽所有輸入事件,並將它們分發
給相應的處理程式進行處理。這種模式可以大大提高應用程式的效能和可延伸性,因為它使用了非阻塞I/O和非同步處理技術,使得一個程序可以同時處理多個事件,而不會因為某個事件的處理時間過長而影響其他事件的處理。Reactor模式被廣泛應用於網路程式設計和作業系統級別的事件驅動程式。
在傳統BIO模式中有多少個使用者端請求,就需要多少個對於的執行緒進行一對一的處理。
這種模型有如下缺點:
Java NIO 帶來非阻塞IO,和IO多路複用。
得益於非阻塞IO和IO多路複用,讓服務可以處理更多的並行請,不再受限於一個使用者端一個執行緒來處理,而是一個執行緒可以維護多個使用者端。
可以看到java 中NIO有點reactor的意思:
Selector多路複用器監聽IO事件進行分發,針對連線事件,讀寫事件進行不同的處理。
Reactor核心是Reactor加上對應的處理器Handler,Reactor在一個單獨的執行緒中執行,負責監聽和分發事件,將接收到的事件交給不同的Handler來處理,Handler是處理程式執行I/O事件的實際操作。
得益於Java NIO 非阻塞IO 於 IO多路複用
。例如Netty就使用了Reactor模式,程式設計師只需要寫如何處理事件
這個模型詮釋了Reactor模式的組成部分:
public class Reactor implements Runnable {
//多路複用器
final Selector selector;
//伺服器端Channel
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(
new InetSocketAddress(port));
serverSocket.configureBlocking(false);
// 註冊io多路複用器連線事件
SelectionKey sk =
serverSocket.register(selector,
SelectionKey.OP_ACCEPT);
// 將伺服器端Channel 關聯一個Acceptor
sk.attach(new Acceptor());
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext())
//分發
dispatch(it.next());
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
// 拿到關聯的acceptor 或者handler
Runnable r = (Runnable) (k.attachment());
if (r != null)
r.run();
}
//內部類 負責處理連線事件
class Acceptor implements Runnable {
public void run() {
try {
// 拿到Channel
SocketChannel c = serverSocket.accept();
if (c != null)
// 建立handler
new Handler(selector, c);
} catch (IOException ex) { /* ... */ }
}
}
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
static final int READING = 0, SENDING = 1;
int state = READING;
//設定非阻塞
//監聽可讀事件
Handler(Selector sel, SocketChannel c)
throws IOException {
socket = c;
c.configureBlocking(false);
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() {
return false;
}
boolean outputIsComplete() {
return false;
}
void process() {
}
public void run() {
try {
//如果可讀
if (state == READING) read();
//如果可寫
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
}
}
可以看到Reactor模式將Channel和Acceptor,Handler進行繫結依賴於SelectionKey#attach
方法,通過此方法在不同的事件發生時呼叫SelectionKey#attachment
方法,獲取到對應的處理程式進行處理。
Reactor由單執行緒執行,通過IO多路複用Selector監聽多個事件是否就緒,得益於Channel提供的非阻塞IO能力,當IO沒有就緒的時候,單執行緒不會阻塞而是繼續處理下一個。
由於其單執行緒的原因,無法利用計算機多核心資源,並且如果讀取請求內容處理的過程存在耗時操作(比如資料庫,rpc等)那麼回導致下一個事件得不到快速的響應。
引入多執行緒解決單執行緒Reactor的不足
可以看到多執行緒模型引入了執行緒池,對於就緒的可讀,可寫IO事件交給執行緒池進行處理。
主要是對單執行緒模型中的Handler進行改造,將處理邏輯提交到執行緒池中。
多執行緒模型涉及到共用資源的使用,不如讀寫Channel依賴的Buffer如何分配。
可以看到多執行緒模型的缺點:執行緒通訊和同步邏輯複雜,需要處理多執行緒安全問題。
在這種模型中,mainReactor負責處理連線建立事件,只需要一個執行緒即可。subReactor負責和建立連線的socket進行資料互動並處理業務邏輯,並且每一個subReactor可持有一個獨立的Selector進行IO多路複用事件監聽。
// SubReactor 池子,負責負載均衡的選擇SubReactor
public class SubReactorPool {
final static SubReactor[] subReactors;
static final AtomicInteger count = new AtomicInteger();
static {
int availableProcessors = Runtime.getRuntime().availableProcessors();
subReactors = new SubReactor[availableProcessors];
for (int i = 0; i < subReactors.length; i++) {
subReactors[i] = new SubReactor();
}
}
static class SubReactor implements Runnable{
// 業務處理執行緒池
final static Executor poolExecutor = Executors.newCachedThreadPool();
// io多路複用
Selector selector;
SubReactor() {
try {
selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void registry(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(selector,SelectionKey.OP_READ);
}
@Override
public void run() {
while (true){
try {
selector.select();
} catch (IOException e) {
throw new RuntimeException(e);
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey sk = iterator.next();
if (sk.isReadable()) {
poolExecutor.execute(()->new Handler(sk));
}
// 可寫,。。。。
iterator.remove();
}
}
}
}
//選擇合適的SubReactor
static SubReactor loadBalanceChoose(SocketChannel socketChannel){
int countInt = count.getAndAdd(1);
return subReactors[countInt % subReactors.length];
}
}
多Reactor解決了單個Selector註冊連線,讀寫事件,導致核心輪詢的時候需要判斷太多fd而效率緩慢的問題。
在Tomcat請求處理流程與原始碼淺析 - Cuzzz - 部落格園 (cnblogs.com)中,說到Tomcat Connector的設計
其中
下圖展示了Acceptor 和 Poller的共同作業
這一步藉助ServerSocketChannel#accept方法,進行等待使用者端連線,Acceptor單執行緒進行監聽。
這一步設定非阻塞,並且使用計數取模的方式實現多個Poller的負載均衡
然後將事件保證為PollerEvent 提交到Poller的阻塞佇列中
輪詢阻塞佇列中的PollerEvent,並且呼叫run方法,run方法會把事件註冊到Poller的Selector上,注意下面的註冊將NioSocketWrapper作為attachment進行了繫結
tomcat處理事件的時候,會建立出SocketProcessor進行處理,SocketProcessor是一個Runnable,最後會提交到執行緒池。