Reactor 模式與Tomcat中的Reactor

2023-06-23 18:00:14

系列文章目錄和關於我

參考:[nio.pdf (oswego.edu)](https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)

一丶什麼是Reactor

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.


Reactor模式是一種用於處理高並行的設計模式,也被稱為事件驅動模式。在這種模式中,應用程式會將輸入事件交給一個事件處理器,稱為Reactor,Reactor會監聽所有輸入事件,並將它們分發給相應的處理程式進行處理。這種模式可以大大提高應用程式的效能和可延伸性,因為它使用了非阻塞I/O和非同步處理技術,使得一個程序可以同時處理多個事件,而不會因為某個事件的處理時間過長而影響其他事件的處理。Reactor模式被廣泛應用於網路程式設計和作業系統級別的事件驅動程式。

二丶為什麼需要Reactor

1.傳統BIO

在傳統BIO模式中有多少個使用者端請求,就需要多少個對於的執行緒進行一對一的處理。

這種模型有如下缺點:

  • 同步阻塞IO,讀寫阻塞,大量執行緒掛起
  • 指定執行緒數的時候,只能依據系統的cpu核心數,無法根據並行請求數來指定。
  • 大量執行緒導致上下文切換開銷大,執行緒佔用記憶體大。

2.NIO

Java NIO 帶來非阻塞IO,和IO多路複用。

得益於非阻塞IO和IO多路複用,讓服務可以處理更多的並行請,不再受限於一個使用者端一個執行緒來處理,而是一個執行緒可以維護多個使用者端。

可以看到java 中NIO有點reactor的意思:

Selector多路複用器監聽IO事件進行分發,針對連線事件,讀寫事件進行不同的處理。


Reactor核心是Reactor加上對應的處理器Handler,Reactor在一個單獨的執行緒中執行,負責監聽和分發事件,將接收到的事件交給不同的Handler來處理,Handler是處理程式執行I/O事件的實際操作。

  • 高並行:Reactor模式可以在同一時間內處理大量的使用者端請求,提高了系統的並行處理能力。得益於Java NIO 非阻塞IO 於 IO多路複用
  • 可延伸性:Reactor模式可以很容易地擴充套件到更多的處理器,以滿足更高的並行量。
  • 編碼簡單:Reactor模式可以使編碼更加簡單明瞭,因為它將不同的事件分離開來處理,降低了程式碼的複雜度。例如Netty就使用了Reactor模式,程式設計師只需要寫如何處理事件
  • 效率高:Reactor模式採用非阻塞I/O和非同步處理技術,可以使得一個程序可以同時處理多個事件,而不會因為某個事件的處理時間過長而影響其他事件的處理,從而提高了系統的效率。
  • 可移植性好:Reactor模式可以很方便地移植到不同的平臺上,因為它遵循了標準的Java NIO介面,可以在不同的作業系統上實現。

三丶Reactor模型於簡單程式碼實現

1.單Reactor單執行緒模型

這個模型詮釋了Reactor模式的組成部分:

  • Reactor 負責分離通訊端,對於觸發connect的io事件交給Acceptor處理,對於IO讀寫事件交給Handler處理
  • Acceptor負責建立Handler,將Handler和socketChannel進行繫結,當socketChannel讀事件觸發後,Reactor進行分發給對應Handler處理。
public class Reactor implements Runnable {
    
    //多路複用器
    final Selector selector;
	//伺服器端Channel
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(
                new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        // 註冊io多路複用器連線事件
        SelectionKey sk =
                serverSocket.register(selector,
                        SelectionKey.OP_ACCEPT);
        // 將伺服器端Channel 關聯一個Acceptor
        sk.attach(new Acceptor());
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext())
                    //分發
                    dispatch(it.next());
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        // 拿到關聯的acceptor 或者handler
        Runnable r = (Runnable) (k.attachment());
        if (r != null)
            r.run();
    }
	
    //內部類 負責處理連線事件
    class Acceptor implements Runnable {
        public void run() {
            try {
                // 拿到Channel
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    // 建立handler
                    new Handler(selector, c);
            } catch (IOException ex) { /* ... */ }
        }
    }

    final class Handler implements Runnable {
        final SocketChannel socket;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(1024);
        ByteBuffer output = ByteBuffer.allocate(1024);
        static final int READING = 0, SENDING = 1;
        int state = READING;

        //設定非阻塞
        //監聽可讀事件
        Handler(Selector sel, SocketChannel c)
                throws IOException {
            socket = c;
            c.configureBlocking(false);
            sk = socket.register(sel, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }

        boolean inputIsComplete() {
            return false;
        }

        boolean outputIsComplete() {
            return false;
        }

        void process() {
        }

        public void run() {
            try {
                //如果可讀
                if (state == READING) read();
				//如果可寫
                else if (state == SENDING) send();
            } catch (IOException ex) { /* ... */ }
        }

        void read() throws IOException {
            socket.read(input);
            if (inputIsComplete()) {
                process();
                state = SENDING;
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }

        void send() throws IOException {
            socket.write(output);
            if (outputIsComplete()) sk.cancel();
        }
    }
}

可以看到Reactor模式將Channel和Acceptor,Handler進行繫結依賴於SelectionKey#attach方法,通過此方法在不同的事件發生時呼叫SelectionKey#attachment方法,獲取到對應的處理程式進行處理。

Reactor由單執行緒執行,通過IO多路複用Selector監聽多個事件是否就緒,得益於Channel提供的非阻塞IO能力,當IO沒有就緒的時候,單執行緒不會阻塞而是繼續處理下一個。

由於其單執行緒的原因,無法利用計算機多核心資源,並且如果讀取請求內容處理的過程存在耗時操作(比如資料庫,rpc等)那麼回導致下一個事件得不到快速的響應。

2.單Reactor多執行緒模型

引入多執行緒解決單執行緒Reactor的不足

可以看到多執行緒模型引入了執行緒池,對於就緒的可讀,可寫IO事件交給執行緒池進行處理。

主要是對單執行緒模型中的Handler進行改造,將處理邏輯提交到執行緒池中。

多執行緒模型涉及到共用資源的使用,不如讀寫Channel依賴的Buffer如何分配。

可以看到多執行緒模型的缺點:執行緒通訊和同步邏輯複雜,需要處理多執行緒安全問題。

3.多Reactor多執行緒模型

在這種模型中,mainReactor負責處理連線建立事件,只需要一個執行緒即可。subReactor負責和建立連線的socket進行資料互動並處理業務邏輯,並且每一個subReactor可持有一個獨立的Selector進行IO多路複用事件監聽。

// SubReactor 池子,負責負載均衡的選擇SubReactor
public class SubReactorPool {
    final static SubReactor[] subReactors;
    static final AtomicInteger count = new AtomicInteger();
    static {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        subReactors = new SubReactor[availableProcessors];
        for (int i = 0; i < subReactors.length; i++) {
            subReactors[i] = new SubReactor();
        }
    }

    static class SubReactor implements Runnable{
        // 業務處理執行緒池
        final static Executor poolExecutor = Executors.newCachedThreadPool();
		// io多路複用
        Selector selector;
		
        SubReactor()  {
            try {
                selector = Selector.open();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        public void registry(SocketChannel socketChannel) throws ClosedChannelException {
            socketChannel.register(selector,SelectionKey.OP_READ);
        }
        @Override
        public void run() {
            while (true){
                try {
                    selector.select();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey sk = iterator.next();
                    if (sk.isReadable()) {
                        poolExecutor.execute(()->new Handler(sk));
                    }
                    // 可寫,。。。。
                    iterator.remove();
                }

            }
        }

    }
	
    
    //選擇合適的SubReactor
    static SubReactor loadBalanceChoose(SocketChannel socketChannel){
        int countInt = count.getAndAdd(1);
        return subReactors[countInt % subReactors.length];
    }
}

多Reactor解決了單個Selector註冊連線,讀寫事件,導致核心輪詢的時候需要判斷太多fd而效率緩慢的問題。

四丶Tomcat中Reactor

Tomcat請求處理流程與原始碼淺析 - Cuzzz - 部落格園 (cnblogs.com)中,說到Tomcat Connector的設計

其中

  • Endpoint:tomcat中沒有這個介面,只有AbstractEndpoint,它負責啟動執行緒來監聽伺服器埠,並且在接受到資料後交給Processor處理
  • Processor:Processor讀取到使用者端請求後按照請求地址對映到具體的容器進行處理,這個過程請求對映,Processor實現請求對映依賴於Mapper物件,在容器發生註冊和登出的時候,MapperListener會監聽到對應的事件,從而來變更Mapper中維護的請求對映資訊。
  • ProtocolHandler:協定處理器,針對不同的IO方式(NIO,BIO等)和不同的協定(Http,AJP)具備不同的實現,ProtocolHandler包含一個Endpoint來開啟埠監聽,並且包含一個Processor用於按照協定讀取資料並將請求交給容器處理。
  • Acceptor:Acceptor實現了Runnable介面,可以作為一個執行緒啟動,使用Socket API監聽指定埠,用於接收使用者請求。
  • Poller:主要用於監測註冊在原始 scoket 上的事件是否發生,Acceptor接受到請求後,會註冊到Poller的佇列中。

下圖展示了Acceptor 和 Poller的共同作業

1.Acceptor 等待使用者端連線

這一步藉助ServerSocketChannel#accept方法,進行等待使用者端連線,Acceptor單執行緒進行監聽。

2.Acceptor選擇Poller進行註冊

這一步設定非阻塞,並且使用計數取模的方式實現多個Poller的負載均衡

然後將事件保證為PollerEvent 提交到Poller的阻塞佇列中

3.Poller 輪詢阻塞佇列中的PollerEvent並註冊到Selector上

輪詢阻塞佇列中的PollerEvent,並且呼叫run方法,run方法會把事件註冊到Poller的Selector上,注意下面的註冊將NioSocketWrapper作為attachment進行了繫結

4.Poller中Selector IO多路複用處理事件,並處理事件

tomcat處理事件的時候,會建立出SocketProcessor進行處理,SocketProcessor是一個Runnable,最後會提交到執行緒池。