diff --git a/pom.xml b/pom.xml index 5982049a..d4d0c5f7 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ 4.0.0 com.alipay.sofa bolt - 1.5.8 + 1.5.9 jar ${project.groupId}:${project.artifactId} diff --git a/src/main/java/com/alipay/remoting/BaseRemoting.java b/src/main/java/com/alipay/remoting/BaseRemoting.java index 519f890e..cd30c56d 100644 --- a/src/main/java/com/alipay/remoting/BaseRemoting.java +++ b/src/main/java/com/alipay/remoting/BaseRemoting.java @@ -61,6 +61,10 @@ protected RemotingCommand invokeSync(final Connection conn, final RemotingComman final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext()); conn.addInvokeFuture(future); final int requestId = request.getId(); + InvokeContext invokeContext = request.getInvokeContext(); + if (null != invokeContext) { + invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_BEFORE_SEND, System.nanoTime()); + } try { conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() { @@ -75,6 +79,10 @@ public void operationComplete(ChannelFuture f) throws Exception { } }); + + if (null != invokeContext) { + invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_AFTER_SEND, System.nanoTime()); + } } catch (Exception e) { conn.removeInvokeFuture(requestId); future.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e)); @@ -82,6 +90,10 @@ public void operationComplete(ChannelFuture f) throws Exception { } RemotingCommand response = future.waitResponse(timeoutMillis); + if (null != invokeContext) { + invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_RECEIVED, System.nanoTime()); + } + if (response == null) { conn.removeInvokeFuture(requestId); response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress()); diff --git a/src/main/java/com/alipay/remoting/InvokeContext.java b/src/main/java/com/alipay/remoting/InvokeContext.java index 6aae5614..13beede5 100644 --- a/src/main/java/com/alipay/remoting/InvokeContext.java +++ b/src/main/java/com/alipay/remoting/InvokeContext.java @@ -26,28 +26,47 @@ */ public class InvokeContext { // ~~~ invoke context keys of client side - public final static String CLIENT_LOCAL_IP = "bolt.client.local.ip"; - public final static String CLIENT_LOCAL_PORT = "bolt.client.local.port"; - public final static String CLIENT_REMOTE_IP = "bolt.client.remote.ip"; - public final static String CLIENT_REMOTE_PORT = "bolt.client.remote.port"; + public final static String CLIENT_LOCAL_IP = "bolt.client.local.ip"; + public final static String CLIENT_LOCAL_PORT = "bolt.client.local.port"; + public final static String CLIENT_REMOTE_IP = "bolt.client.remote.ip"; + public final static String CLIENT_REMOTE_PORT = "bolt.client.remote.port"; /** time consumed during connection creating, this is a timespan */ - public final static String CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime"; + public final static String CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime"; + public final static String CLIENT_CONN_CREATE_START_IN_NANO = "bolt.client.conn.create.start.nano"; + public final static String CLIENT_CONN_CREATE_END_IN_NANO = "bolt.client.conn.create.end.nano"; // ~~~ invoke context keys of server side - public final static String SERVER_LOCAL_IP = "bolt.server.local.ip"; - public final static String SERVER_LOCAL_PORT = "bolt.server.local.port"; - public final static String SERVER_REMOTE_IP = "bolt.server.remote.ip"; - public final static String SERVER_REMOTE_PORT = "bolt.server.remote.port"; + public final static String SERVER_LOCAL_IP = "bolt.server.local.ip"; + public final static String SERVER_LOCAL_PORT = "bolt.server.local.port"; + public final static String SERVER_REMOTE_IP = "bolt.server.remote.ip"; + public final static String SERVER_REMOTE_PORT = "bolt.server.remote.port"; // ~~~ invoke context keys of bolt client and server side - public final static String BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id"; + public final static String BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id"; /** time consumed start from the time when request arrive, to the time when request be processed, this is a timespan */ - public final static String BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time"; - public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer"; - public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch"; + public final static String BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time"; + /** time request arrived in nano seconds , collected by System.nanoTime() */ + public final static String BOLT_PROCESS_ARRIVE_HEADER_IN_NANO = "bolt.invoke.request.arrive.header.in.nano"; + public final static String BOLT_PROCESS_ARRIVE_BODY_IN_NANO = "bolt.invoke.request.arrive.body.in.nano"; + + /** time before send request to user thread in nano seconds , collected by System.nanoTime() */ + public final static String BOLT_PROCESS_BEFORE_DISPATCH_IN_NANO = "bolt.invoke.before.dispatch.in.nano"; + + /** time before send request to user thread in nano seconds , collected by System.nanoTime() */ + public final static String BOLT_PROCESS_START_PROCESS_IN_NANO = "bolt.invoke.start.process.in.nano"; + + public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer"; + public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch"; + + /** time before send request to net in nano seconds , collected by System.nanoTime() **/ + public final static String BOLT_PROCESS_CLIENT_BEFORE_SEND = "bolt.invoke.client.before.send"; + /** time after send request to net in nano seconds , collected by System.nanoTime() **/ + public final static String BOLT_PROCESS_CLIENT_AFTER_SEND = "bolt.invoke.client.after.send"; + /** time after receive response from server in nano seconds , collected by System.nanoTime() **/ + public final static String BOLT_PROCESS_CLIENT_RECEIVED = "bolt.invoke.client.received"; // ~~~ constants - public final static int INITIAL_SIZE = 8; + public final static int INITIAL_SIZE = 8; /** context */ private ConcurrentHashMap context; diff --git a/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java b/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java index b85f29ab..ac91a17a 100644 --- a/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java +++ b/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java @@ -127,8 +127,8 @@ public static byte toByte(BitSet bs) { public static BitSet toBitSet(int value) { if (value < 0 || value > Byte.MAX_VALUE) { throw new IllegalArgumentException( - "The value " + value + " is out of byte range, should be limited between [" - + 0 + "] to [" + Byte.MAX_VALUE + "]"); + "The value " + value + " is out of byte range, should be limited between [" + 0 + + "] to [" + Byte.MAX_VALUE + "]"); } BitSet bs = new BitSet(); int index = 0; diff --git a/src/main/java/com/alipay/remoting/rpc/RpcClientRemoting.java b/src/main/java/com/alipay/remoting/rpc/RpcClientRemoting.java index 5a500e2b..84028ca2 100644 --- a/src/main/java/com/alipay/remoting/rpc/RpcClientRemoting.java +++ b/src/main/java/com/alipay/remoting/rpc/RpcClientRemoting.java @@ -29,7 +29,7 @@ /** * Rpc client remoting - * + * * @author xiaomin.cxm * @version $Id: RpcClientRemoting.java, v 0.1 Apr 14, 2016 11:58:56 AM xiaomin.cxm Exp $ */ @@ -119,6 +119,8 @@ protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext in throws RemotingException, InterruptedException { long start = System.currentTimeMillis(); + long startInNano = System.nanoTime(); + Connection conn; try { conn = this.connectionManager.getAndCreateIfAbsent(url); @@ -126,6 +128,10 @@ protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext in if (null != invokeContext) { invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME, (System.currentTimeMillis() - start)); + invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATE_START_IN_NANO, + startInNano); + invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATE_END_IN_NANO, + System.nanoTime()); } } return conn; diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java index 1618279d..a00fb58b 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java @@ -19,6 +19,8 @@ import java.net.InetSocketAddress; import java.util.List; +import com.alipay.remoting.util.ThreadLocalArriveTimeHolder; +import io.netty.channel.Channel; import org.slf4j.Logger; import com.alipay.remoting.CommandCode; @@ -96,6 +98,9 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro byte[] clazz = null; byte[] header = null; byte[] content = null; + Channel channel = ctx.channel(); + ThreadLocalArriveTimeHolder.arrive(channel, requestId); + if (in.readableBytes() >= classLen + headerLen + contentLen) { if (classLen > 0) { clazz = new byte[classLen]; @@ -114,10 +119,14 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro return; } RequestCommand command; + + long headerArriveTimeInNano = ThreadLocalArriveTimeHolder.getAndClear( + channel, requestId); + if (cmdCode == CommandCode.HEARTBEAT_VALUE) { command = new HeartbeatCommand(); } else { - command = createRequestCommand(cmdCode); + command = createRequestCommand(cmdCode, headerArriveTimeInNano); } command.setType(type); command.setVersion(ver2); @@ -207,10 +216,12 @@ private ResponseCommand createResponseCommand(short cmdCode) { return command; } - private RpcRequestCommand createRequestCommand(short cmdCode) { + private RpcRequestCommand createRequestCommand(short cmdCode, long headerArriveTimeInNano) { RpcRequestCommand command = new RpcRequestCommand(); command.setCmdCode(RpcCommandCode.valueOf(cmdCode)); command.setArriveTime(System.currentTimeMillis()); + command.setArriveHeaderTimeInNano(headerArriveTimeInNano); + command.setArriveBodyTimeInNano(System.nanoTime()); return command; } diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java index 7e86ec71..c187ce70 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java @@ -20,6 +20,8 @@ import java.util.List; import com.alipay.remoting.log.BoltLoggerFactory; +import com.alipay.remoting.util.ThreadLocalArriveTimeHolder; +import io.netty.channel.Channel; import org.slf4j.Logger; import com.alipay.remoting.CommandCode; @@ -103,6 +105,9 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro byte[] header = null; byte[] content = null; + Channel channel = ctx.channel(); + ThreadLocalArriveTimeHolder.arrive(channel, requestId); + // decide the at-least bytes length for each version int lengthAtLeastForV1 = classLen + headerLen + contentLen; boolean crcSwitchOn = ProtocolSwitch.isOn( @@ -135,11 +140,15 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro in.resetReaderIndex(); return; } + + long headerArriveTimeInNano = ThreadLocalArriveTimeHolder.getAndClear( + channel, requestId); + RequestCommand command; if (cmdCode == CommandCode.HEARTBEAT_VALUE) { command = new HeartbeatCommand(); } else { - command = createRequestCommand(cmdCode); + command = createRequestCommand(cmdCode, headerArriveTimeInNano); } command.setType(type); command.setVersion(ver2); @@ -261,10 +270,13 @@ private ResponseCommand createResponseCommand(short cmdCode) { return command; } - private RpcRequestCommand createRequestCommand(short cmdCode) { + private RpcRequestCommand createRequestCommand(short cmdCode, long headerArriveTimeInNano) { RpcRequestCommand command = new RpcRequestCommand(); command.setCmdCode(RpcCommandCode.valueOf(cmdCode)); command.setArriveTime(System.currentTimeMillis()); + command.setArriveHeaderTimeInNano(headerArriveTimeInNano); + command.setArriveBodyTimeInNano(System.nanoTime()); + return command; } diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestCommand.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestCommand.java index 9787f420..4ef878de 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestCommand.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestCommand.java @@ -36,14 +36,20 @@ */ public class RpcRequestCommand extends RequestCommand { /** For serialization */ - private static final long serialVersionUID = -4602613826188210946L; + private static final long serialVersionUID = -4602613826188210946L; private Object requestObject; private String requestClass; private CustomSerializer customSerializer; private Object requestHeader; - private transient long arriveTime = -1; + private transient long arriveTime = -1; + + private transient long arriveHeaderTimeInNano = -1; + + private transient long arriveBodyTimeInNano = -1; + + private transient long beforeEnterQueueTime = -1; /** * create request command without id @@ -247,4 +253,29 @@ public long getArriveTime() { public void setArriveTime(long arriveTime) { this.arriveTime = arriveTime; } + + public long getArriveHeaderTimeInNano() { + return arriveHeaderTimeInNano; + } + + public void setArriveHeaderTimeInNano(long arriveHeaderTimeInNano) { + this.arriveHeaderTimeInNano = arriveHeaderTimeInNano; + } + + public long getArriveBodyTimeInNano() { + return arriveBodyTimeInNano; + } + + public void setArriveBodyTimeInNano(long arriveBodyTimeInNano) { + this.arriveBodyTimeInNano = arriveBodyTimeInNano; + } + + public long getBeforeEnterQueueTime() { + return beforeEnterQueueTime; + } + + public void setBeforeEnterQueueTime(long beforeEnterQueueTime) { + this.beforeEnterQueueTime = beforeEnterQueueTime; + } + } diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestProcessor.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestProcessor.java index 2b0babe3..6cf258d3 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestProcessor.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestProcessor.java @@ -121,6 +121,7 @@ public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService } // use the final executor dispatch process task + cmd.setBeforeEnterQueueTime(System.nanoTime()); executor.execute(new ProcessTask(ctx, cmd)); } @@ -299,6 +300,14 @@ private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cm ctx.setRpcCommandType(cmd.getType()); ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_WAIT_TIME, currentTimestamp - cmd.getArriveTime()); + ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_ARRIVE_HEADER_IN_NANO, + cmd.getArriveHeaderTimeInNano()); + ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_ARRIVE_BODY_IN_NANO, + cmd.getArriveBodyTimeInNano()); + ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_BEFORE_DISPATCH_IN_NANO, + cmd.getBeforeEnterQueueTime()); + ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_START_PROCESS_IN_NANO, + System.nanoTime()); } /** diff --git a/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java b/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java new file mode 100644 index 00000000..daee05f8 --- /dev/null +++ b/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.remoting.util; + +import io.netty.channel.Channel; +import io.netty.util.concurrent.FastThreadLocal; + +import java.util.HashMap; +import java.util.Map; +import java.util.WeakHashMap; + +/** + * @author zhaowang + * @version : ThreadLocalTimeHolder.java, v 0.1 2021年07月01日 3:05 下午 zhaowang + */ +public class ThreadLocalArriveTimeHolder { + private static FastThreadLocal>> arriveTimeInNano = new FastThreadLocal>>(); + + public static void arrive(Channel channel, Integer key) { + Map map = getArriveTimeMap(channel); + if (map.get(key) == null) { + map.put(key, System.nanoTime()); + } + } + + public static long getAndClear(Channel channel, Integer key) { + Map map = getArriveTimeMap(channel); + Long result = map.remove(key); + if (result == null) { + return -1; + } + return result; + } + + private static Map getArriveTimeMap(Channel channel) { + WeakHashMap> map = arriveTimeInNano.get(); + if (map == null) { + arriveTimeInNano.set(new WeakHashMap>(256)); + map = arriveTimeInNano.get(); + } + Map subMap = map.get(channel); + if (subMap == null) { + map.put(channel, new HashMap()); + } + return map.get(channel); + } + +} \ No newline at end of file diff --git a/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java b/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java new file mode 100644 index 00000000..8ac5802e --- /dev/null +++ b/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.remoting.util; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @author zhaowang + * @version : ThreadLocalArriveTimeHolderTest.java, v 0.1 2021年07月07日 5:45 下午 zhaowang + */ +public class ThreadLocalArriveTimeHolderTest { + + @Test + public void test() { + EmbeddedChannel channel = new EmbeddedChannel(); + long start = System.nanoTime(); + ThreadLocalArriveTimeHolder.arrive(channel, 1); + long end = System.nanoTime(); + ThreadLocalArriveTimeHolder.arrive(channel, 1); + long time = ThreadLocalArriveTimeHolder.getAndClear(channel, 1); + Assert.assertTrue(time >= start); + Assert.assertTrue(time <= end); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(channel, 1)); + } + + @Test + public void testRemoveNull() { + EmbeddedChannel channel = new EmbeddedChannel(); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(channel, 1)); + } + + @Test + public void testMultiThread() throws InterruptedException { + final EmbeddedChannel channel = new EmbeddedChannel(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + long start = System.nanoTime(); + ThreadLocalArriveTimeHolder.arrive(channel, 1); + long end = System.nanoTime(); + ThreadLocalArriveTimeHolder.arrive(channel, 1); + long time = ThreadLocalArriveTimeHolder.getAndClear(channel, 1); + Assert.assertTrue(time >= start); + Assert.assertTrue(time <= end); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(channel, 1)); + + Runnable runnable = new Runnable() { + @Override + public void run() { + long start = System.nanoTime(); + ThreadLocalArriveTimeHolder.arrive(channel, 1); + long end = System.nanoTime(); + long time = ThreadLocalArriveTimeHolder.getAndClear(channel, 1); + Assert.assertTrue(time >= start); + Assert.assertTrue(time <= end); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(channel, 1)); + countDownLatch.countDown(); + } + }; + new Thread(runnable).start(); + Assert.assertTrue(countDownLatch.await(2, TimeUnit.SECONDS)); + } +} \ No newline at end of file