diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index f8d485f3..3434b721 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Created by weijiesun on 17-9-13. @@ -62,6 +63,8 @@ public void initChannel(SocketChannel ch) { pipeline.addLast("ClientHandler", new ReplicaSession.DefaultHandler()); } }); + + this.firstRecentTimedOutMs = new AtomicLong(0); } // You can specify a message response filter with constructor or with "setMessageResponseFilter" function. @@ -71,6 +74,7 @@ public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTi this(address, rpcGroup, socketTimeout); this.filter = filter; } + public void setMessageResponseFilter(MessageResponseFilter filter) { this.filter = filter; } @@ -220,8 +224,25 @@ private void tryNotifyWithSequenceID( entry.timeoutTask.cancel(true); entry.op.rpc_error.errno = errno; entry.callback.run(); - } - else { + + if (errno == error_types.ERR_TIMEOUT) { + long firstTs = firstRecentTimedOutMs.get(); + if (firstTs == 0) { + // it is the first timeout in the window. + firstRecentTimedOutMs.set(System.currentTimeMillis()); + } else if (System.currentTimeMillis() - firstTs >= sessionResetTimeWindowMs) { + // ensure that closeSession() will be invoked only once. + if (firstRecentTimedOutMs.compareAndSet(firstTs, 0)) { + logger.warn("{}: actively close the session because it's not responding for {} seconds", + name(), + sessionResetTimeWindowMs / 1000); + closeSession(); + } + } + } else { + firstRecentTimedOutMs.set(0); + } + } else { logger.warn("{}: {} is removed by others, current error {}, isTimeoutTask {}", name(), seqID, errno.toString(), isTimeoutTask); } @@ -305,11 +326,19 @@ private final static class VolatileFields { public ConnState state = ConnState.DISCONNECTED; public Channel nettyChannel = null; } + private volatile VolatileFields fields = new VolatileFields(); - private rpc_address address; + private final rpc_address address; private Bootstrap boot; private EventLoopGroup rpcGroup; + // Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs` + // are timed out, in that case we suspect that the server is unavailable. + + // Timestamp of the first timed out rpc. + private AtomicLong firstRecentTimedOutMs; + private static final long sessionResetTimeWindowMs = 10 * 1000; // 10s + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSession.class); }