This analysis is based on gRPC Java 1.24.2.
// io.grpc.internal.GrpcUtil
public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
io.grpc.internal.KeepAliveManager manages the keepalive state and schedules and executes the ping tasks.
NettyChannelBuilder builder = NettyChannelBuilder.forTarget(String.format("grpc://%s", provider)) //
.usePlaintext() //
.defaultLoadBalancingPolicy(props.getBalancePolicy()) //
.maxInboundMessageSize(props.getMaxInboundMessageSize()) //
.keepAliveTime(1,TimeUnit.MINUTES)
.keepAliveWithoutCalls(true)
.keepAliveTimeout(10,TimeUnit.SECONDS)
.intercept(channelManager.getInterceptors()); //
What do the three settings keepAliveTime, keepAliveTimeout, and keepAliveWithoutCalls do?
Create & Start
NettyChannelBuilder
-----> NettyTransportFactory
---------> NettyClientTransport
-------------> KeepAliveManager & NettyClientHandler
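Responding to events
When an Active, Idle, DataReceived, Started, or Termination event occurs, the KeepAliveManager updates its keepalive state and schedules the task that sends the ping.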
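To make the event handling concrete, here is a minimal sketch of such a state machine. It is not the real KeepAliveManager: the class name KeepAliveStateSketch, the state names, and the two "timer fired" methods are assumptions made for illustration, and the real scheduler is replaced by explicit method calls. In this model keepAliveTime corresponds to the ping timer, keepAliveTimeout to the timeout timer, and keepAliveWithoutCalls decides whether pings continue while no call is in progress.

```java
/**
 * Hypothetical, simplified model of a client-side keepalive state machine.
 * Timers are replaced by explicit calls so the transitions can be followed by hand.
 */
class KeepAliveStateSketch {
  enum State { IDLE, PING_SCHEDULED, PING_DELAYED, PING_SENT, IDLE_AND_PING_SENT, DISCONNECTED }

  private final boolean keepAliveWithoutCalls;
  private State state = State.IDLE;
  private int pingsSent;

  KeepAliveStateSketch(boolean keepAliveWithoutCalls) {
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
  }

  /** Started: with keepAliveWithoutCalls the transport is treated as active right away. */
  void onTransportStarted() {
    if (keepAliveWithoutCalls) {
      onTransportActive();
    }
  }

  /** Active: a call is in progress, so a ping gets scheduled. */
  void onTransportActive() {
    if (state == State.IDLE) {
      state = State.PING_SCHEDULED;
    } else if (state == State.IDLE_AND_PING_SENT) {
      state = State.PING_SENT;
    }
  }

  /** Idle: no calls are left; pinging stops unless keepAliveWithoutCalls is set. */
  void onTransportIdle() {
    if (keepAliveWithoutCalls) {
      return;
    }
    if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) {
      state = State.IDLE;
    } else if (state == State.PING_SENT) {
      state = State.IDLE_AND_PING_SENT;
    }
  }

  /** DataReceived: the peer is alive, so a pending ping is postponed or an outstanding one is satisfied. */
  void onDataReceived() {
    if (state == State.PING_SCHEDULED) {
      state = State.PING_DELAYED;
    } else if (state == State.PING_SENT) {
      state = State.PING_SCHEDULED;
    } else if (state == State.IDLE_AND_PING_SENT) {
      state = State.IDLE;
    }
  }

  /** Stand-in for the keepAliveTime timer firing. */
  void onPingTimerFired() {
    if (state == State.PING_SCHEDULED) {
      state = State.PING_SENT;
      pingsSent++;
    } else if (state == State.PING_DELAYED) {
      state = State.PING_SCHEDULED; // data arrived recently: wait one more interval
    }
  }

  /** Stand-in for the keepAliveTimeout timer firing with no data received since the ping. */
  void onTimeoutTimerFired() {
    if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) {
      state = State.DISCONNECTED;
    }
  }

  /** Termination: the transport is gone, nothing is scheduled any more. */
  void onTransportTermination() {
    state = State.DISCONNECTED;
  }

  State state() {
    return state;
  }

  int pingsSent() {
    return pingsSent;
  }

  public static void main(String[] args) {
    KeepAliveStateSketch k = new KeepAliveStateSketch(true);
    k.onTransportStarted();   // PING_SCHEDULED
    k.onPingTimerFired();     // PING_SENT
    k.onTimeoutTimerFired();  // DISCONNECTED
    System.out.println(k.state() + " after " + k.pingsSent() + " ping(s)");
    // prints "DISCONNECTED after 1 ping(s)"
  }
}
```

With keepAliveWithoutCalls set to false, the same sketch stays in IDLE after onTransportStarted() and sends no ping until onTransportActive() is called.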
// Key code only; see `NettyServerBuilder` for the full source
ServerImpl server = new ServerImpl(
this,
buildTransportServers(getTracerFactories()),
Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
notifyTarget.notifyOnBuild(server);
}
return server;
// The NettyServer instances are created in the buildTransportServers method
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
workerEventLoopGroupPool, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
transportServers.add(transportServer);
}
Create & Start
NettyServerBuilder
---> NettyServer
---------> NettyServerTransport
-------------> NettyServerHandler
-----------------> KeepAliveEnforcer
Connection ready
The handlerAdded method of io.netty.channel.ChannelHandler is called. The method is described as:
Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.
NettyServerHandler(handlerAdded)
---> creates the KeepAliveManager object
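Responding to events
Same as on the client.
The server-side call chain above shows one class that exists only on the server: io.grpc.netty.KeepAliveEnforcer.
This class monitors how often the client sends pings, to make sure the rate stays within a permitted range.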
package io.grpc.netty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;
/** Monitors the client's PING usage to make sure the rate is permitted. */
class KeepAliveEnforcer {
@VisibleForTesting
static final int MAX_PING_STRIKES = 2;
@VisibleForTesting
static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);
private final boolean permitWithoutCalls;
private final long minTimeNanos;
private final Ticker ticker;
private final long epoch;
private long lastValidPingTime;
private boolean hasOutstandingCalls;
private int pingStrikes;
public KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
this(permitWithoutCalls, minTime, unit, SystemTicker.INSTANCE);
}
@VisibleForTesting
KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit, Ticker ticker) {
Preconditions.checkArgument(minTime >= 0, "minTime must be non-negative");
this.permitWithoutCalls = permitWithoutCalls;
this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
this.ticker = ticker;
this.epoch = ticker.nanoTime();
lastValidPingTime = epoch;
}
/** Returns {@code false} when client is misbehaving and should be disconnected. */
@CheckReturnValue
public boolean pingAcceptable() {
long now = ticker.nanoTime();
boolean valid;
if (!hasOutstandingCalls && !permitWithoutCalls) {
valid = compareNanos(lastValidPingTime + IMPLICIT_PERMIT_TIME_NANOS, now) <= 0;
} else {
valid = compareNanos(lastValidPingTime + minTimeNanos, now) <= 0;
}
if (!valid) {
pingStrikes++;
return !(pingStrikes > MAX_PING_STRIKES);
} else {
lastValidPingTime = now;
return true;
}
}
/**
* Reset any counters because PINGs are allowed in response to something sent. Typically called
* when sending HEADERS and DATA frames.
*/
public void resetCounters() {
lastValidPingTime = epoch;
pingStrikes = 0;
}
/** There are outstanding RPCs on the transport. */
public void onTransportActive() {
hasOutstandingCalls = true;
}
/** There are no outstanding RPCs on the transport. */
public void onTransportIdle() {
hasOutstandingCalls = false;
}
/**
* Positive when time1 is greater; negative when time2 is greater; 0 when equal. It is important
* to use something like this instead of directly comparing nano times. See {@link
* System#nanoTime}.
*/
private static long compareNanos(long time1, long time2) {
// Possibility of overflow/underflow is on purpose and necessary for correctness
return time1 - time2;
}
@VisibleForTesting
interface Ticker {
long nanoTime();
}
@VisibleForTesting
static class SystemTicker implements Ticker {
public static final SystemTicker INSTANCE = new SystemTicker();
@Override
public long nanoTime() {
return System.nanoTime();
}
}
}
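The comment on compareNanos says that the overflow is intentional. A small worked example may help to see why. The class name NanoCompareDemo and the concrete timestamps are made up for illustration; the only thing taken from the listing is the subtraction itself.

```java
/** Illustrative only: why nano times are compared by subtraction instead of with < or >. */
class NanoCompareDemo {
  /** Same idea as compareNanos in the listing: positive when time1 is later than time2. */
  static long compareNanos(long time1, long time2) {
    return time1 - time2; // wraps around on purpose
  }

  /** Naive check: breaks when deadline has wrapped past Long.MAX_VALUE. */
  static boolean naiveExpired(long deadline, long now) {
    return deadline <= now;
  }

  /** Wrap-safe check, in the style used by pingAcceptable. */
  static boolean safeExpired(long deadline, long now) {
    return compareNanos(deadline, now) <= 0;
  }

  public static void main(String[] args) {
    long lastPing = Long.MAX_VALUE - 5; // hypothetical nanoTime value close to overflow
    long deadline = lastPing + 10;      // wraps around to a large negative number
    long now = lastPing + 3;            // 3 ns after lastPing, 7 ns before the deadline

    System.out.println(naiveExpired(deadline, now)); // prints "true" (wrong)
    System.out.println(safeExpired(deadline, now));  // prints "false" (correct)
    System.out.println(compareNanos(deadline, now)); // prints "7"
  }
}
```

System.nanoTime() may return any long value, including negative ones, so only differences between two readings are meaningful.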
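The key method is pingAcceptable, which decides whether a client ping is accepted. The fields and constants it uses are:
lastValidPingTime: the time of the last valid client ping. When the connection is established it equals the time at which the KeepAliveEnforcer object was created; whenever a client ping is valid, it is set to the time of that ping.
hasOutstandingCalls: initially false. It becomes true when the connection becomes active and false when the connection becomes idle. For a blocking gRPC call, the connection becomes active when the call starts and idle when the call completes.
permitWithoutCalls: passed in when the NettyServer is created; the default is false.
IMPLICIT_PERMIT_TIME_NANOS: a constant, 2 h.
minTimeNanos: passed in when the NettyServer is created; the default is 5 min.
MAX_PING_STRIKES: a constant, 2.
resetCounters: called when data is sent on the connection, so whenever a gRPC call takes place, lastValidPingTime and pingStrikes are reset.
So for the client configuration shown earlier (keepAliveWithoutCalls(true)) not to be treated as misbehaving, the server's permitWithoutCalls must be set to true and the client's keepAliveTime must be >= minTimeNanos.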
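The effect of these values can be checked with a small simulation. The sketch below re-implements only the decision from pingAcceptable with a simulated clock. The class PingStrikeSimulation and its method are hypothetical, and it assumes that no HEADERS or DATA frames are sent between pings, so resetCounters is never triggered.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical simulation of the ping-strike rule; not part of gRPC. */
class PingStrikeSimulation {
  static final int MAX_PING_STRIKES = 2;
  static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

  /**
   * Sends {@code pings} pings at a fixed interval, starting one interval after the
   * connection was created. Returns the 1-based index of the first ping for which the
   * server would report the client as misbehaving, or -1 if that never happens.
   */
  static int firstRejectedPing(boolean permitWithoutCalls, boolean hasOutstandingCalls,
      long minTimeNanos, long pingIntervalNanos, int pings) {
    long now = 0;               // simulated clock; connection created at t = 0
    long lastValidPingTime = 0; // starts at the creation time, as in the listing
    int pingStrikes = 0;
    long requiredGap = (!hasOutstandingCalls && !permitWithoutCalls)
        ? IMPLICIT_PERMIT_TIME_NANOS
        : Math.min(minTimeNanos, IMPLICIT_PERMIT_TIME_NANOS);
    for (int i = 1; i <= pings; i++) {
      now += pingIntervalNanos;
      boolean valid = (lastValidPingTime + requiredGap) - now <= 0;
      if (valid) {
        lastValidPingTime = now;
      } else if (++pingStrikes > MAX_PING_STRIKES) {
        return i; // one strike too many
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    long oneMin = TimeUnit.MINUTES.toNanos(1);
    long fiveMin = TimeUnit.MINUTES.toNanos(5);
    // Idle client pinging every minute, server defaults (permitWithoutCalls = false).
    System.out.println(firstRejectedPing(false, false, fiveMin, oneMin, 10)); // prints "3"
    // permitWithoutCalls = true, but the client still pings faster than minTimeNanos.
    System.out.println(firstRejectedPing(true, false, fiveMin, oneMin, 10));  // prints "3"
    // permitWithoutCalls = true and the ping interval equals minTimeNanos.
    System.out.println(firstRejectedPing(true, false, fiveMin, fiveMin, 10)); // prints "-1"
  }
}
```

On a real server the two inputs are set through NettyServerBuilder.permitKeepAliveWithoutCalls(boolean) and NettyServerBuilder.permitKeepAliveTime(long, TimeUnit), which are the values passed to NettyServer as permitKeepAliveWithoutCalls and permitKeepAliveTimeInNanos in the listing above.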