From fc66a81a3555d0d892e0e1a15e1d4c99dc1b45cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=9D=E6=9C=9B?= Date: Wed, 7 Jul 2021 17:22:10 +0800 Subject: [PATCH 1/6] record rqeust arrive time and send to blocking queue time --- pom.xml | 2 +- .../com/alipay/remoting/InvokeContext.java | 40 +++++++++----- .../remoting/rpc/RpcClientRemoting.java | 8 ++- .../rpc/protocol/RpcCommandDecoder.java | 10 +++- .../rpc/protocol/RpcCommandDecoderV2.java | 11 +++- .../rpc/protocol/RpcRequestCommand.java | 35 ++++++++++++- .../rpc/protocol/RpcRequestProcessor.java | 9 ++++ .../util/ThreadLocalArriveTimeHolder.java | 52 +++++++++++++++++++ 8 files changed, 145 insertions(+), 22 deletions(-) create mode 100644 src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java diff --git a/pom.xml b/pom.xml index 5982049a..d4d0c5f7 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ 4.0.0 com.alipay.sofa bolt - 1.5.8 + 1.5.9 jar ${project.groupId}:${project.artifactId} diff --git a/src/main/java/com/alipay/remoting/InvokeContext.java b/src/main/java/com/alipay/remoting/InvokeContext.java index 6aae5614..4b8a94f3 100644 --- a/src/main/java/com/alipay/remoting/InvokeContext.java +++ b/src/main/java/com/alipay/remoting/InvokeContext.java @@ -26,28 +26,40 @@ */ public class InvokeContext { // ~~~ invoke context keys of client side - public final static String CLIENT_LOCAL_IP = "bolt.client.local.ip"; - public final static String CLIENT_LOCAL_PORT = "bolt.client.local.port"; - public final static String CLIENT_REMOTE_IP = "bolt.client.remote.ip"; - public final static String CLIENT_REMOTE_PORT = "bolt.client.remote.port"; + public final static String CLIENT_LOCAL_IP = "bolt.client.local.ip"; + public final static String CLIENT_LOCAL_PORT = "bolt.client.local.port"; + public final static String CLIENT_REMOTE_IP = "bolt.client.remote.ip"; + public final static String CLIENT_REMOTE_PORT = "bolt.client.remote.port"; /** time consumed during connection creating, this is a timespan */ - public final static String CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime"; + public final static String CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime"; + public final static String CLIENT_CONN_CREATE_START_IN_NANO = "bolt.client.conn.create.start.nano"; + public final static String CLIENT_CONN_CREATE_END_IN_NANO = "bolt.client.conn.create.end.nano"; // ~~~ invoke context keys of server side - public final static String SERVER_LOCAL_IP = "bolt.server.local.ip"; - public final static String SERVER_LOCAL_PORT = "bolt.server.local.port"; - public final static String SERVER_REMOTE_IP = "bolt.server.remote.ip"; - public final static String SERVER_REMOTE_PORT = "bolt.server.remote.port"; + public final static String SERVER_LOCAL_IP = "bolt.server.local.ip"; + public final static String SERVER_LOCAL_PORT = "bolt.server.local.port"; + public final static String SERVER_REMOTE_IP = "bolt.server.remote.ip"; + public final static String SERVER_REMOTE_PORT = "bolt.server.remote.port"; // ~~~ invoke context keys of bolt client and server side - public final static String BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id"; + public final static String BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id"; /** time consumed start from the time when request arrive, to the time when request be processed, this is a timespan */ - public final static String BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time"; - public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer"; - public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch"; + public final static String BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time"; + /** time request arrived in nano seconds , collected by System.nanoTime() */ + public final static String BOLT_PROCESS_ARRIVE_HEADER_IN_NANO = "bolt.invoke.request.arrive.header.in.nano"; + public final static String BOLT_PROCESS_ARRIVE_BODY_IN_NANO = "bolt.invoke.request.arrive.body.in.nano"; + + /** time before send request to user thread in nano seconds , collected by System.nanoTime() */ + public final static String BOLT_PROCESS_BEFORE_DISPATCH_IN_NANO = "bolt.invoke.before.dispatch.in.nano"; + + /** time before send request to user thread in nano seconds , collected by System.nanoTime() */ + public final static String BOLT_PROCESS_START_PROCESS_IN_NANO = "bolt.invoke.start.process.in.nano"; + + public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer"; + public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch"; // ~~~ constants - public final static int INITIAL_SIZE = 8; + public final static int INITIAL_SIZE = 8; /** context */ private ConcurrentHashMap context; diff --git a/src/main/java/com/alipay/remoting/rpc/RpcClientRemoting.java b/src/main/java/com/alipay/remoting/rpc/RpcClientRemoting.java index 5a500e2b..84028ca2 100644 --- a/src/main/java/com/alipay/remoting/rpc/RpcClientRemoting.java +++ b/src/main/java/com/alipay/remoting/rpc/RpcClientRemoting.java @@ -29,7 +29,7 @@ /** * Rpc client remoting - * + * * @author xiaomin.cxm * @version $Id: RpcClientRemoting.java, v 0.1 Apr 14, 2016 11:58:56 AM xiaomin.cxm Exp $ */ @@ -119,6 +119,8 @@ protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext in throws RemotingException, InterruptedException { long start = System.currentTimeMillis(); + long startInNano = System.nanoTime(); + Connection conn; try { conn = this.connectionManager.getAndCreateIfAbsent(url); @@ -126,6 +128,10 @@ protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext in if (null != invokeContext) { invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME, (System.currentTimeMillis() - start)); + invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATE_START_IN_NANO, + startInNano); + invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATE_END_IN_NANO, + System.nanoTime()); } } return conn; diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java index 1618279d..6ec24664 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.util.List; +import com.alipay.remoting.util.ThreadLocalArriveTimeHolder; import org.slf4j.Logger; import com.alipay.remoting.CommandCode; @@ -96,6 +97,9 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro byte[] clazz = null; byte[] header = null; byte[] content = null; + String remoteAddress = ctx.channel().remoteAddress().toString(); + String uniqueKey = remoteAddress + requestId; + ThreadLocalArriveTimeHolder.arrive(uniqueKey); if (in.readableBytes() >= classLen + headerLen + contentLen) { if (classLen > 0) { clazz = new byte[classLen]; @@ -117,7 +121,7 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro if (cmdCode == CommandCode.HEARTBEAT_VALUE) { command = new HeartbeatCommand(); } else { - command = createRequestCommand(cmdCode); + command = createRequestCommand(cmdCode, uniqueKey); } command.setType(type); command.setVersion(ver2); @@ -207,10 +211,12 @@ private ResponseCommand createResponseCommand(short cmdCode) { return command; } - private RpcRequestCommand createRequestCommand(short cmdCode) { + private RpcRequestCommand createRequestCommand(short cmdCode, String key) { RpcRequestCommand command = new RpcRequestCommand(); command.setCmdCode(RpcCommandCode.valueOf(cmdCode)); command.setArriveTime(System.currentTimeMillis()); + command.setArriveHeaderTimeInNano(ThreadLocalArriveTimeHolder.getAndClear(key)); + command.setArriveBodyTimeInNano(System.nanoTime()); return command; } diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java index 7e86ec71..0fa6bac9 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java @@ -20,6 +20,7 @@ import java.util.List; import com.alipay.remoting.log.BoltLoggerFactory; +import com.alipay.remoting.util.ThreadLocalArriveTimeHolder; import org.slf4j.Logger; import com.alipay.remoting.CommandCode; @@ -87,6 +88,7 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro in.readByte(); //protocol code byte version = in.readByte(); //protocol version byte type = in.readByte(); //type + String socketAddress = ctx.channel().remoteAddress().toString(); if (type == RpcCommandType.REQUEST || type == RpcCommandType.REQUEST_ONEWAY) { //decode request if (in.readableBytes() >= RpcProtocolV2.getRequestHeaderLength() - 3) { @@ -103,6 +105,8 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro byte[] header = null; byte[] content = null; + String uniqueKey = socketAddress + requestId; + ThreadLocalArriveTimeHolder.arrive(uniqueKey); // decide the at-least bytes length for each version int lengthAtLeastForV1 = classLen + headerLen + contentLen; boolean crcSwitchOn = ProtocolSwitch.isOn( @@ -139,7 +143,7 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro if (cmdCode == CommandCode.HEARTBEAT_VALUE) { command = new HeartbeatCommand(); } else { - command = createRequestCommand(cmdCode); + command = createRequestCommand(cmdCode, uniqueKey); } command.setType(type); command.setVersion(ver2); @@ -261,10 +265,13 @@ private ResponseCommand createResponseCommand(short cmdCode) { return command; } - private RpcRequestCommand createRequestCommand(short cmdCode) { + private RpcRequestCommand createRequestCommand(short cmdCode, String key) { RpcRequestCommand command = new RpcRequestCommand(); command.setCmdCode(RpcCommandCode.valueOf(cmdCode)); command.setArriveTime(System.currentTimeMillis()); + command.setArriveHeaderTimeInNano(ThreadLocalArriveTimeHolder.getAndClear(key)); + command.setArriveBodyTimeInNano(System.nanoTime()); + return command; } diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestCommand.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestCommand.java index 9787f420..4ef878de 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestCommand.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestCommand.java @@ -36,14 +36,20 @@ */ public class RpcRequestCommand extends RequestCommand { /** For serialization */ - private static final long serialVersionUID = -4602613826188210946L; + private static final long serialVersionUID = -4602613826188210946L; private Object requestObject; private String requestClass; private CustomSerializer customSerializer; private Object requestHeader; - private transient long arriveTime = -1; + private transient long arriveTime = -1; + + private transient long arriveHeaderTimeInNano = -1; + + private transient long arriveBodyTimeInNano = -1; + + private transient long beforeEnterQueueTime = -1; /** * create request command without id @@ -247,4 +253,29 @@ public long getArriveTime() { public void setArriveTime(long arriveTime) { this.arriveTime = arriveTime; } + + public long getArriveHeaderTimeInNano() { + return arriveHeaderTimeInNano; + } + + public void setArriveHeaderTimeInNano(long arriveHeaderTimeInNano) { + this.arriveHeaderTimeInNano = arriveHeaderTimeInNano; + } + + public long getArriveBodyTimeInNano() { + return arriveBodyTimeInNano; + } + + public void setArriveBodyTimeInNano(long arriveBodyTimeInNano) { + this.arriveBodyTimeInNano = arriveBodyTimeInNano; + } + + public long getBeforeEnterQueueTime() { + return beforeEnterQueueTime; + } + + public void setBeforeEnterQueueTime(long beforeEnterQueueTime) { + this.beforeEnterQueueTime = beforeEnterQueueTime; + } + } diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestProcessor.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestProcessor.java index 2b0babe3..6cf258d3 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestProcessor.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcRequestProcessor.java @@ -121,6 +121,7 @@ public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService } // use the final executor dispatch process task + cmd.setBeforeEnterQueueTime(System.nanoTime()); executor.execute(new ProcessTask(ctx, cmd)); } @@ -299,6 +300,14 @@ private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cm ctx.setRpcCommandType(cmd.getType()); ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_WAIT_TIME, currentTimestamp - cmd.getArriveTime()); + ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_ARRIVE_HEADER_IN_NANO, + cmd.getArriveHeaderTimeInNano()); + ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_ARRIVE_BODY_IN_NANO, + cmd.getArriveBodyTimeInNano()); + ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_BEFORE_DISPATCH_IN_NANO, + cmd.getBeforeEnterQueueTime()); + ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_START_PROCESS_IN_NANO, + System.nanoTime()); } /** diff --git a/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java b/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java new file mode 100644 index 00000000..68b67d69 --- /dev/null +++ b/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.remoting.util; + +import io.netty.util.concurrent.FastThreadLocal; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author zhaowang + * @version : ThreadLocalTimeHolder.java, v 0.1 2021年07月01日 3:05 下午 zhaowang + */ +public class ThreadLocalArriveTimeHolder { + private static FastThreadLocal> arriveTimeInNano = new FastThreadLocal>(); + + static { + arriveTimeInNano.set(new HashMap(256)); + } + + public static void arrive(String key) { + + Map map = arriveTimeInNano.get(); + if (map.get(key) == null) { + map.put(key, System.nanoTime()); + } + } + + public static long getAndClear(String key) { + Map map = arriveTimeInNano.get(); + Long result = map.remove(key); + if (result == null) { + return -1; + } + return result; + } + +} \ No newline at end of file From c45d09185c8e9abd81b5cd5fd8499598b90bed8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=9D=E6=9C=9B?= Date: Wed, 7 Jul 2021 19:09:45 +0800 Subject: [PATCH 2/6] add test --- .../util/ThreadLocalArriveTimeHolderTest.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java diff --git a/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java b/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java new file mode 100644 index 00000000..36cdd9ae --- /dev/null +++ b/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.remoting.util; + +import org.junit.Assert; +import org.junit.Test; + +/** + * @author zhaowang + * @version : ThreadLocalArriveTimeHolderTest.java, v 0.1 2021年07月07日 5:45 下午 zhaowang + */ +public class ThreadLocalArriveTimeHolderTest { + + @Test + public void test() { + long start = System.nanoTime(); + ThreadLocalArriveTimeHolder.arrive("a"); + long end = System.nanoTime(); + ThreadLocalArriveTimeHolder.arrive("a"); + long time = ThreadLocalArriveTimeHolder.getAndClear("a"); + Assert.assertTrue(time >= start); + Assert.assertTrue(time <= end); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear("a")); + } +} \ No newline at end of file From 51fc3076c1a43a3ec859c101c31ece3379589cbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=9D=E6=9C=9B?= Date: Thu, 8 Jul 2021 13:36:38 +0800 Subject: [PATCH 3/6] fix by code review --- .../remoting/config/switches/ProtocolSwitch.java | 4 ++-- .../remoting/rpc/protocol/RpcCommandDecoder.java | 12 +++++++++--- .../remoting/rpc/protocol/RpcCommandDecoderV2.java | 11 ++++++++--- .../util/ThreadLocalArriveTimeHolderTest.java | 6 ++++++ 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java b/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java index b85f29ab..ac91a17a 100644 --- a/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java +++ b/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java @@ -127,8 +127,8 @@ public static byte toByte(BitSet bs) { public static BitSet toBitSet(int value) { if (value < 0 || value > Byte.MAX_VALUE) { throw new IllegalArgumentException( - "The value " + value + " is out of byte range, should be limited between [" - + 0 + "] to [" + Byte.MAX_VALUE + "]"); + "The value " + value + " is out of byte range, should be limited between [" + 0 + + "] to [" + Byte.MAX_VALUE + "]"); } BitSet bs = new BitSet(); int index = 0; diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java index 6ec24664..edefd75a 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java @@ -17,6 +17,7 @@ package com.alipay.remoting.rpc.protocol; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; import com.alipay.remoting.util.ThreadLocalArriveTimeHolder; @@ -97,9 +98,14 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro byte[] clazz = null; byte[] header = null; byte[] content = null; - String remoteAddress = ctx.channel().remoteAddress().toString(); - String uniqueKey = remoteAddress + requestId; - ThreadLocalArriveTimeHolder.arrive(uniqueKey); + SocketAddress socketAddress = ctx.channel().remoteAddress(); + String uniqueKey = null; + if(socketAddress != null){ + String remoteAddress = socketAddress.toString(); + uniqueKey = remoteAddress + requestId; + ThreadLocalArriveTimeHolder.arrive(uniqueKey); + } + if (in.readableBytes() >= classLen + headerLen + contentLen) { if (classLen > 0) { clazz = new byte[classLen]; diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java index 0fa6bac9..0d73f0b4 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java @@ -17,6 +17,7 @@ package com.alipay.remoting.rpc.protocol; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; import com.alipay.remoting.log.BoltLoggerFactory; @@ -88,7 +89,6 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro in.readByte(); //protocol code byte version = in.readByte(); //protocol version byte type = in.readByte(); //type - String socketAddress = ctx.channel().remoteAddress().toString(); if (type == RpcCommandType.REQUEST || type == RpcCommandType.REQUEST_ONEWAY) { //decode request if (in.readableBytes() >= RpcProtocolV2.getRequestHeaderLength() - 3) { @@ -105,8 +105,13 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro byte[] header = null; byte[] content = null; - String uniqueKey = socketAddress + requestId; - ThreadLocalArriveTimeHolder.arrive(uniqueKey); + SocketAddress socketAddress = ctx.channel().remoteAddress(); + String uniqueKey = null; + if(socketAddress != null){ + String remoteAddress = socketAddress.toString(); + uniqueKey = remoteAddress + requestId; + ThreadLocalArriveTimeHolder.arrive(uniqueKey); + } // decide the at-least bytes length for each version int lengthAtLeastForV1 = classLen + headerLen + contentLen; boolean crcSwitchOn = ProtocolSwitch.isOn( diff --git a/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java b/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java index 36cdd9ae..ff3ede91 100644 --- a/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java +++ b/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java @@ -36,4 +36,10 @@ public void test() { Assert.assertTrue(time <= end); Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear("a")); } + + @Test + public void testRemoveNull() { + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(null)); + + } } \ No newline at end of file From f429064447209c307a71df29935a087d8590d0b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=9D=E6=9C=9B?= Date: Thu, 8 Jul 2021 14:25:57 +0800 Subject: [PATCH 4/6] fix unit test --- .../rpc/protocol/RpcCommandDecoder.java | 2 +- .../rpc/protocol/RpcCommandDecoderV2.java | 2 +- .../util/ThreadLocalArriveTimeHolder.java | 19 +++++++----- .../util/ThreadLocalArriveTimeHolderTest.java | 31 +++++++++++++++++++ 4 files changed, 45 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java index edefd75a..a116d34e 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java @@ -100,7 +100,7 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro byte[] content = null; SocketAddress socketAddress = ctx.channel().remoteAddress(); String uniqueKey = null; - if(socketAddress != null){ + if (socketAddress != null) { String remoteAddress = socketAddress.toString(); uniqueKey = remoteAddress + requestId; ThreadLocalArriveTimeHolder.arrive(uniqueKey); diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java index 0d73f0b4..849ae82a 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java @@ -107,7 +107,7 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro SocketAddress socketAddress = ctx.channel().remoteAddress(); String uniqueKey = null; - if(socketAddress != null){ + if (socketAddress != null) { String remoteAddress = socketAddress.toString(); uniqueKey = remoteAddress + requestId; ThreadLocalArriveTimeHolder.arrive(uniqueKey); diff --git a/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java b/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java index 68b67d69..479e8871 100644 --- a/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java +++ b/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java @@ -28,20 +28,15 @@ public class ThreadLocalArriveTimeHolder { private static FastThreadLocal> arriveTimeInNano = new FastThreadLocal>(); - static { - arriveTimeInNano.set(new HashMap(256)); - } - public static void arrive(String key) { - - Map map = arriveTimeInNano.get(); + Map map = getArriveTimeMap(); if (map.get(key) == null) { map.put(key, System.nanoTime()); } } public static long getAndClear(String key) { - Map map = arriveTimeInNano.get(); + Map map = getArriveTimeMap(); Long result = map.remove(key); if (result == null) { return -1; @@ -49,4 +44,14 @@ public static long getAndClear(String key) { return result; } + private static Map getArriveTimeMap() { + Map map = arriveTimeInNano.get(); + if (map == null) { + arriveTimeInNano.set(new HashMap(256)); + return arriveTimeInNano.get(); + } else { + return map; + } + } + } \ No newline at end of file diff --git a/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java b/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java index ff3ede91..634b6ce0 100644 --- a/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java +++ b/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java @@ -19,6 +19,9 @@ import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + /** * @author zhaowang * @version : ThreadLocalArriveTimeHolderTest.java, v 0.1 2021年07月07日 5:45 下午 zhaowang @@ -40,6 +43,34 @@ public void test() { @Test public void testRemoveNull() { Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(null)); + } + + @Test + public void testMultiThread() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + long start = System.nanoTime(); + ThreadLocalArriveTimeHolder.arrive("a"); + long end = System.nanoTime(); + ThreadLocalArriveTimeHolder.arrive("a"); + long time = ThreadLocalArriveTimeHolder.getAndClear("a"); + Assert.assertTrue(time >= start); + Assert.assertTrue(time <= end); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear("a")); + Runnable runnable = new Runnable() { + @Override + public void run() { + long start = System.nanoTime(); + ThreadLocalArriveTimeHolder.arrive("a"); + long end = System.nanoTime(); + long time = ThreadLocalArriveTimeHolder.getAndClear("a"); + Assert.assertTrue(time >= start); + Assert.assertTrue(time <= end); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear("a")); + countDownLatch.countDown(); + } + }; + new Thread(runnable).start(); + Assert.assertTrue(countDownLatch.await(2, TimeUnit.SECONDS)); } } \ No newline at end of file From c7f62f865245970810346371ba66bbe810581a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=9D=E6=9C=9B?= Date: Fri, 9 Jul 2021 15:56:39 +0800 Subject: [PATCH 5/6] use WeakHashMap to avoid memory leak --- .../com/alipay/remoting/BaseRemoting.java | 5 ++++ .../rpc/protocol/RpcCommandDecoder.java | 21 +++++++------- .../rpc/protocol/RpcCommandDecoderV2.java | 22 +++++++-------- .../util/ThreadLocalArriveTimeHolder.java | 27 ++++++++++-------- .../util/ThreadLocalArriveTimeHolderTest.java | 28 +++++++++++-------- 5 files changed, 58 insertions(+), 45 deletions(-) diff --git a/src/main/java/com/alipay/remoting/BaseRemoting.java b/src/main/java/com/alipay/remoting/BaseRemoting.java index 519f890e..b8fd5307 100644 --- a/src/main/java/com/alipay/remoting/BaseRemoting.java +++ b/src/main/java/com/alipay/remoting/BaseRemoting.java @@ -61,6 +61,7 @@ protected RemotingCommand invokeSync(final Connection conn, final RemotingComman final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext()); conn.addInvokeFuture(future); final int requestId = request.getId(); + InvokeContext invokeContext = request.getInvokeContext(); try { conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() { @@ -75,6 +76,10 @@ public void operationComplete(ChannelFuture f) throws Exception { } }); + + if (null != invokeContext) { + invokeContext.put("REQUEST_SEND", System.nanoTime()); + } } catch (Exception e) { conn.removeInvokeFuture(requestId); future.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e)); diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java index a116d34e..a00fb58b 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoder.java @@ -17,10 +17,10 @@ package com.alipay.remoting.rpc.protocol; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.List; import com.alipay.remoting.util.ThreadLocalArriveTimeHolder; +import io.netty.channel.Channel; import org.slf4j.Logger; import com.alipay.remoting.CommandCode; @@ -98,13 +98,8 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro byte[] clazz = null; byte[] header = null; byte[] content = null; - SocketAddress socketAddress = ctx.channel().remoteAddress(); - String uniqueKey = null; - if (socketAddress != null) { - String remoteAddress = socketAddress.toString(); - uniqueKey = remoteAddress + requestId; - ThreadLocalArriveTimeHolder.arrive(uniqueKey); - } + Channel channel = ctx.channel(); + ThreadLocalArriveTimeHolder.arrive(channel, requestId); if (in.readableBytes() >= classLen + headerLen + contentLen) { if (classLen > 0) { @@ -124,10 +119,14 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro return; } RequestCommand command; + + long headerArriveTimeInNano = ThreadLocalArriveTimeHolder.getAndClear( + channel, requestId); + if (cmdCode == CommandCode.HEARTBEAT_VALUE) { command = new HeartbeatCommand(); } else { - command = createRequestCommand(cmdCode, uniqueKey); + command = createRequestCommand(cmdCode, headerArriveTimeInNano); } command.setType(type); command.setVersion(ver2); @@ -217,11 +216,11 @@ private ResponseCommand createResponseCommand(short cmdCode) { return command; } - private RpcRequestCommand createRequestCommand(short cmdCode, String key) { + private RpcRequestCommand createRequestCommand(short cmdCode, long headerArriveTimeInNano) { RpcRequestCommand command = new RpcRequestCommand(); command.setCmdCode(RpcCommandCode.valueOf(cmdCode)); command.setArriveTime(System.currentTimeMillis()); - command.setArriveHeaderTimeInNano(ThreadLocalArriveTimeHolder.getAndClear(key)); + command.setArriveHeaderTimeInNano(headerArriveTimeInNano); command.setArriveBodyTimeInNano(System.nanoTime()); return command; } diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java index 849ae82a..c187ce70 100644 --- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java +++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcCommandDecoderV2.java @@ -17,11 +17,11 @@ package com.alipay.remoting.rpc.protocol; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.List; import com.alipay.remoting.log.BoltLoggerFactory; import com.alipay.remoting.util.ThreadLocalArriveTimeHolder; +import io.netty.channel.Channel; import org.slf4j.Logger; import com.alipay.remoting.CommandCode; @@ -105,13 +105,9 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro byte[] header = null; byte[] content = null; - SocketAddress socketAddress = ctx.channel().remoteAddress(); - String uniqueKey = null; - if (socketAddress != null) { - String remoteAddress = socketAddress.toString(); - uniqueKey = remoteAddress + requestId; - ThreadLocalArriveTimeHolder.arrive(uniqueKey); - } + Channel channel = ctx.channel(); + ThreadLocalArriveTimeHolder.arrive(channel, requestId); + // decide the at-least bytes length for each version int lengthAtLeastForV1 = classLen + headerLen + contentLen; boolean crcSwitchOn = ProtocolSwitch.isOn( @@ -144,11 +140,15 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) thro in.resetReaderIndex(); return; } + + long headerArriveTimeInNano = ThreadLocalArriveTimeHolder.getAndClear( + channel, requestId); + RequestCommand command; if (cmdCode == CommandCode.HEARTBEAT_VALUE) { command = new HeartbeatCommand(); } else { - command = createRequestCommand(cmdCode, uniqueKey); + command = createRequestCommand(cmdCode, headerArriveTimeInNano); } command.setType(type); command.setVersion(ver2); @@ -270,11 +270,11 @@ private ResponseCommand createResponseCommand(short cmdCode) { return command; } - private RpcRequestCommand createRequestCommand(short cmdCode, String key) { + private RpcRequestCommand createRequestCommand(short cmdCode, long headerArriveTimeInNano) { RpcRequestCommand command = new RpcRequestCommand(); command.setCmdCode(RpcCommandCode.valueOf(cmdCode)); command.setArriveTime(System.currentTimeMillis()); - command.setArriveHeaderTimeInNano(ThreadLocalArriveTimeHolder.getAndClear(key)); + command.setArriveHeaderTimeInNano(headerArriveTimeInNano); command.setArriveBodyTimeInNano(System.nanoTime()); return command; diff --git a/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java b/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java index 479e8871..daee05f8 100644 --- a/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java +++ b/src/main/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolder.java @@ -16,27 +16,29 @@ */ package com.alipay.remoting.util; +import io.netty.channel.Channel; import io.netty.util.concurrent.FastThreadLocal; import java.util.HashMap; import java.util.Map; +import java.util.WeakHashMap; /** * @author zhaowang * @version : ThreadLocalTimeHolder.java, v 0.1 2021年07月01日 3:05 下午 zhaowang */ public class ThreadLocalArriveTimeHolder { - private static FastThreadLocal> arriveTimeInNano = new FastThreadLocal>(); + private static FastThreadLocal>> arriveTimeInNano = new FastThreadLocal>>(); - public static void arrive(String key) { - Map map = getArriveTimeMap(); + public static void arrive(Channel channel, Integer key) { + Map map = getArriveTimeMap(channel); if (map.get(key) == null) { map.put(key, System.nanoTime()); } } - public static long getAndClear(String key) { - Map map = getArriveTimeMap(); + public static long getAndClear(Channel channel, Integer key) { + Map map = getArriveTimeMap(channel); Long result = map.remove(key); if (result == null) { return -1; @@ -44,14 +46,17 @@ public static long getAndClear(String key) { return result; } - private static Map getArriveTimeMap() { - Map map = arriveTimeInNano.get(); + private static Map getArriveTimeMap(Channel channel) { + WeakHashMap> map = arriveTimeInNano.get(); if (map == null) { - arriveTimeInNano.set(new HashMap(256)); - return arriveTimeInNano.get(); - } else { - return map; + arriveTimeInNano.set(new WeakHashMap>(256)); + map = arriveTimeInNano.get(); } + Map subMap = map.get(channel); + if (subMap == null) { + map.put(channel, new HashMap()); + } + return map.get(channel); } } \ No newline at end of file diff --git a/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java b/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java index 634b6ce0..8ac5802e 100644 --- a/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java +++ b/src/test/java/com/alipay/remoting/util/ThreadLocalArriveTimeHolderTest.java @@ -16,6 +16,7 @@ */ package com.alipay.remoting.util; +import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Assert; import org.junit.Test; @@ -30,43 +31,46 @@ public class ThreadLocalArriveTimeHolderTest { @Test public void test() { + EmbeddedChannel channel = new EmbeddedChannel(); long start = System.nanoTime(); - ThreadLocalArriveTimeHolder.arrive("a"); + ThreadLocalArriveTimeHolder.arrive(channel, 1); long end = System.nanoTime(); - ThreadLocalArriveTimeHolder.arrive("a"); - long time = ThreadLocalArriveTimeHolder.getAndClear("a"); + ThreadLocalArriveTimeHolder.arrive(channel, 1); + long time = ThreadLocalArriveTimeHolder.getAndClear(channel, 1); Assert.assertTrue(time >= start); Assert.assertTrue(time <= end); - Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear("a")); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(channel, 1)); } @Test public void testRemoveNull() { - Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(null)); + EmbeddedChannel channel = new EmbeddedChannel(); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(channel, 1)); } @Test public void testMultiThread() throws InterruptedException { + final EmbeddedChannel channel = new EmbeddedChannel(); final CountDownLatch countDownLatch = new CountDownLatch(1); long start = System.nanoTime(); - ThreadLocalArriveTimeHolder.arrive("a"); + ThreadLocalArriveTimeHolder.arrive(channel, 1); long end = System.nanoTime(); - ThreadLocalArriveTimeHolder.arrive("a"); - long time = ThreadLocalArriveTimeHolder.getAndClear("a"); + ThreadLocalArriveTimeHolder.arrive(channel, 1); + long time = ThreadLocalArriveTimeHolder.getAndClear(channel, 1); Assert.assertTrue(time >= start); Assert.assertTrue(time <= end); - Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear("a")); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(channel, 1)); Runnable runnable = new Runnable() { @Override public void run() { long start = System.nanoTime(); - ThreadLocalArriveTimeHolder.arrive("a"); + ThreadLocalArriveTimeHolder.arrive(channel, 1); long end = System.nanoTime(); - long time = ThreadLocalArriveTimeHolder.getAndClear("a"); + long time = ThreadLocalArriveTimeHolder.getAndClear(channel, 1); Assert.assertTrue(time >= start); Assert.assertTrue(time <= end); - Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear("a")); + Assert.assertEquals(-1, ThreadLocalArriveTimeHolder.getAndClear(channel, 1)); countDownLatch.countDown(); } }; From 55282376e748f73d35a46114d9edb86d8b41035f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=9D=E6=9C=9B?= Date: Thu, 15 Jul 2021 11:56:38 +0800 Subject: [PATCH 6/6] support record client send time --- src/main/java/com/alipay/remoting/BaseRemoting.java | 9 ++++++++- src/main/java/com/alipay/remoting/InvokeContext.java | 7 +++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/alipay/remoting/BaseRemoting.java b/src/main/java/com/alipay/remoting/BaseRemoting.java index b8fd5307..cd30c56d 100644 --- a/src/main/java/com/alipay/remoting/BaseRemoting.java +++ b/src/main/java/com/alipay/remoting/BaseRemoting.java @@ -62,6 +62,9 @@ protected RemotingCommand invokeSync(final Connection conn, final RemotingComman conn.addInvokeFuture(future); final int requestId = request.getId(); InvokeContext invokeContext = request.getInvokeContext(); + if (null != invokeContext) { + invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_BEFORE_SEND, System.nanoTime()); + } try { conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() { @@ -78,7 +81,7 @@ public void operationComplete(ChannelFuture f) throws Exception { }); if (null != invokeContext) { - invokeContext.put("REQUEST_SEND", System.nanoTime()); + invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_AFTER_SEND, System.nanoTime()); } } catch (Exception e) { conn.removeInvokeFuture(requestId); @@ -87,6 +90,10 @@ public void operationComplete(ChannelFuture f) throws Exception { } RemotingCommand response = future.waitResponse(timeoutMillis); + if (null != invokeContext) { + invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_RECEIVED, System.nanoTime()); + } + if (response == null) { conn.removeInvokeFuture(requestId); response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress()); diff --git a/src/main/java/com/alipay/remoting/InvokeContext.java b/src/main/java/com/alipay/remoting/InvokeContext.java index 4b8a94f3..13beede5 100644 --- a/src/main/java/com/alipay/remoting/InvokeContext.java +++ b/src/main/java/com/alipay/remoting/InvokeContext.java @@ -58,6 +58,13 @@ public class InvokeContext { public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer"; public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch"; + /** time before send request to net in nano seconds , collected by System.nanoTime() **/ + public final static String BOLT_PROCESS_CLIENT_BEFORE_SEND = "bolt.invoke.client.before.send"; + /** time after send request to net in nano seconds , collected by System.nanoTime() **/ + public final static String BOLT_PROCESS_CLIENT_AFTER_SEND = "bolt.invoke.client.after.send"; + /** time after receive response from server in nano seconds , collected by System.nanoTime() **/ + public final static String BOLT_PROCESS_CLIENT_RECEIVED = "bolt.invoke.client.received"; + // ~~~ constants public final static int INITIAL_SIZE = 8;