在前面的章節中,我們介紹了在netty中可以使用kequeue或者epoll來實現更為高效的native傳輸方式。那麼kequeue和epoll和NIO傳輸協定有什麼不同呢?
本章將會以kequeue為例進行深入探討。
在上面我們介紹的native的例子中,關於kqueue的類有這樣幾個,分別是KQueueEventLoopGroup,KQueueServerSocketChannel和KQueueSocketChannel,通過簡單的替換和新增對應的依賴包,我們可以輕鬆的將普通的NIO netty服務替換成為native的Kqueue服務。
是時候揭開Kqueue的祕密了。
eventLoop和eventLoopGroup是用來接受event和事件處理的。先來看下KQueueEventLoopGroup的定義:
public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup
作為一個MultithreadEventLoopGroup,必須實現一個newChild方法,用來建立child EventLoop。在KQueueEventLoopGroup中,除了建構函式之外,額外需要實現的方法就是newChild:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
Integer maxEvents = (Integer) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new KQueueEventLoop(this, executor, maxEvents,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
newChild中的所有引數都是從KQueueEventLoopGroup的建構函式中傳入的。除了maxEvents,selectStrategyFactory和rejectedExecutionHandler之外,還可以接收taskQueueFactory和tailTaskQueueFactory兩個引數,最後把這些引數都傳到KQueueEventLoop的建構函式中去,最終返回一個KQueueEventLoop物件。
另外在使用KQueueEventLoopGroup之前我們還需要確保Kqueue在系統中是可用的,這個判斷是通過呼叫KQueue.ensureAvailability();
來實現的。
KQueue.ensureAvailability首先判斷是否定義了系統屬性io.netty.transport.noNative,如果定了,說明native transport被禁用了,後續也就沒有必要再進行判斷了。
如果io.netty.transport.noNative沒有被定義,那麼會呼叫Native.newKQueue()
來嘗試從native中獲取一個kqueue的FileDescriptor,如果上述的獲取過程中沒有任何異常,則說明kqueue在native方法中存在,我們可以繼續使用了。
以下是判斷kqueue是否可用的程式碼:
static {
Throwable cause = null;
if (SystemPropertyUtil.getBoolean("io.netty.transport.noNative", false)) {
cause = new UnsupportedOperationException(
"Native transport was explicit disabled with -Dio.netty.transport.noNative=true");
} else {
FileDescriptor kqueueFd = null;
try {
kqueueFd = Native.newKQueue();
} catch (Throwable t) {
cause = t;
} finally {
if (kqueueFd != null) {
try {
kqueueFd.close();
} catch (Exception ignore) {
// ignore
}
}
}
}
UNAVAILABILITY_CAUSE = cause;
}
KQueueEventLoop是從KQueueEventLoopGroup中建立出來的,用來執行具體的IO任務。
先來看一下KQueueEventLoop的定義:
final class KQueueEventLoop extends SingleThreadEventLoop
不管是NIO還是KQueue或者是Epoll,因為使用了更加高階的IO技術,所以他們使用的EventLoop都是SingleThreadEventLoop,也就是說使用單執行緒就夠了。
和KQueueEventLoopGroup一樣,KQueueEventLoop也需要判斷當前的系統環境是否支援kqueue:
static {
KQueue.ensureAvailability();
}
上一節講到了,KQueueEventLoopGroup會呼叫KQueueEventLoop的建構函式來返回一個eventLoop物件, 我們先來看下KQueueEventLoop的建構函式:
KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
this.kqueueFd = Native.newKQueue();
if (maxEvents == 0) {
allowGrowing = true;
maxEvents = 4096;
} else {
allowGrowing = false;
}
this.changeList = new KQueueEventArray(maxEvents);
this.eventList = new KQueueEventArray(maxEvents);
int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
if (result < 0) {
cleanup();
throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
}
}
傳入的maxEvents表示的是這個KQueueEventLoop能夠接受的最大的event個數。如果maxEvents=0,則表示KQueueEventLoop的event容量可以動態擴充套件,並且最大值是4096。否則的話,KQueueEventLoop的event容量不能擴充套件。
maxEvents是作為陣列的大小用來構建changeList和eventList。
KQueueEventLoop中還定義了一個map叫做channels,用來儲存註冊的channels:
private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);
來看一下channel的add和remote方法:
void add(AbstractKQueueChannel ch) {
assert inEventLoop();
AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch);
assert old == null || !old.isOpen();
}
void remove(AbstractKQueueChannel ch) throws Exception {
assert inEventLoop();
int fd = ch.fd().intValue();
AbstractKQueueChannel old = channels.remove(fd);
if (old != null && old != ch) {
channels.put(fd, old);
assert !ch.isOpen();
} else if (ch.isOpen()) {
ch.unregisterFilters();
}
}
可以看到新增和刪除的都是AbstractKQueueChannel,後面的章節中我們會詳細講解KQueueChannel,這裡我們只需要知道channel map中的key是kequeue中特有的FileDescriptor的int值。
再來看一下EventLoop中最重要的run方法:
protected void run() {
for (;;) {
try {
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
if (wakenUp == 1) {
wakeup();
}
default:
}
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processReady(strategy);
}
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
if (strategy > 0) {
processReady(strategy);
}
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
它的邏輯是先使用selectStrategy.calculateStrategy獲取當前的select strategy,然後根據strategy的值來判斷是否需要執行processReady方法,最後執行runAllTasks,從task queue中拿到要執行的任務去執行。
selectStrategy.calculateStrategy用來判斷當前的select狀態,預設情況下有三個狀態,分別是:SELECT,CONTINUE,BUSY_WAIT。 這三個狀態都是負數:
int SELECT = -1;
int CONTINUE = -2;
int BUSY_WAIT = -3;
分別表示當前的IO在slect的block狀態,或者跳過當前IO的狀態,和正在IO loop pull的狀態。BUSY_WAIT是一個非阻塞的IO PULL,kqueue並不支援,所以會fallback到SELECT。
除了這三個狀態之外,calculateStrategy還會返回一個正值,表示當前要執行的任務的個數。
在run方法中,如果strategy的結果是SELECT,那麼最終會呼叫Native.keventWait方法返回當前ready的events個數,並且將ready的event放到KQueueEventArray的eventList中去。
如果ready的event個數大於零,則會呼叫processReady方法對這些event進行狀態回撥處理。
怎麼處理的呢?下面是處理的核心邏輯:
AbstractKQueueChannel channel = channels.get(fd);
AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();
if (filter == Native.EVFILT_WRITE) {
unsafe.writeReady();
} else if (filter == Native.EVFILT_READ) {
unsafe.readReady(eventList.data(i));
} else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
unsafe.readEOF();
}
這裡的fd是從eventList中讀取到的:
final int fd = eventList.fd(i);
根據eventList的fd,我們可以從channels中拿到對應的KQueueChannel,然後根據event的filter狀態來決定KQueueChannel的具體操作,是writeReady,readReady或者readEOF。
最後就是執行runAllTasks方法了,runAllTasks的邏輯很簡單,就是從taskQueue中讀取任務然後執行。
KQueueServerSocketChannel是用在server端的channel:
public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel implements ServerSocketChannel {
KQueueServerSocketChannel繼承自AbstractKQueueServerChannel,除了建構函式之外,最重要的一個方法就是newChildChannel:
@Override
protected Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception {
return new KQueueSocketChannel(this, new BsdSocket(fd), address(address, offset, len));
}
這個方法用來建立一個新的child channel。從上面的程式碼中,我們可以看到生成的child channel是一個KQueueSocketChannel的範例。
它的建構函式接受三個引數,分別是parent channel,BsdSocket和InetSocketAddress。
KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remoteAddress) {
super(parent, fd, remoteAddress);
config = new KQueueSocketChannelConfig(this);
}
這裡的fd是socket accept acceptedAddress的結果:
int acceptFd = socket.accept(acceptedAddress);
下面是KQueueSocketChannel的定義:
public final class KQueueSocketChannel extends AbstractKQueueStreamChannel implements SocketChannel {
KQueueSocketChannel和KQueueServerSocketChannel的關係是父子的關係,在KQueueSocketChannel中有一個parent方法,用來返回ServerSocketChannel物件,這也是前面提到的newChildChannel方法中傳入KQueueSocketChannel建構函式中的serverChannel:
public ServerSocketChannel parent() {
return (ServerSocketChannel) super.parent();
}
KQueueSocketChannel還有一個特性就是支援tcp fastopen,它的本質是呼叫BsdSocket的connectx方法,在建立連線的同時傳遞資料:
int bytesSent = socket.connectx(
(InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
以上就是KqueueEventLoop和KqueueSocketChannel的詳細介紹,基本上和NIO沒有太大的區別,只不過效能根據優秀。
更多內容請參考 http://www.flydean.com/53-1-netty-kqueue-transport/
最通俗的解讀,最深刻的乾貨,最簡潔的教學,眾多你不知道的小技巧等你來發現!
歡迎關注我的公眾號:「程式那些事」,懂技術,更懂你!