diff --git a/.github/workflows/plugins-test.2.yaml b/.github/workflows/plugins-test.2.yaml
index a439aa44bb..ccb6a7865d 100644
--- a/.github/workflows/plugins-test.2.yaml
+++ b/.github/workflows/plugins-test.2.yaml
@@ -81,6 +81,7 @@ jobs:
- jersey-2.26.x-2.39.x-scenario
- websphere-liberty-23.x-scenario
- nacos-client-2.x-scenario
+ - rocketmq-5-grpc-scenario
steps:
- uses: actions/checkout@v2
with:
diff --git a/CHANGES.md b/CHANGES.md
index d465d2d7f3..f6e20621b3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -152,7 +152,8 @@ Callable {
* Merge two instrumentation classes to avoid duplicate enhancements in MySQL plugins.
* Support asynchronous invocation in jetty client 9.0 and 9.x plugin
* Add nacos-client 2.x plugin
-* Staticize the tags for preventing synchronization in JDK 8
+* Staticize the tags for preventing synchronization in JDK 8
+* Add RocketMQ-Client-Java 5.x plugin
* Fix NullPointerException in lettuce-5.x-plugin.
#### Documentation
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml
index d44f682ec8..0d610b4250 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -134,6 +134,7 @@
rocketMQ-5.x-pluginwebsphere-liberty-23.x-pluginaerospike-plugin
+ rocketMQ-client-java-5.x-pluginpom
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/pom.xml
new file mode 100644
index 0000000000..dffc94662b
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/pom.xml
@@ -0,0 +1,54 @@
+
+
+
+
+
+ apm-sdk-plugin
+ org.apache.skywalking
+ 9.0.0-SNAPSHOT
+
+ 4.0.0
+
+ apm-rocketmq-client-java-5.x-plugin
+ rocketMQ-client-java-5.x-plugin
+
+
+ UTF-8
+ 5.0.5
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-client-java
+ ${rocketmq-client-java.version}
+ provided
+
+
+
+
+
+
+ maven-deploy-plugin
+
+
+
+
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
new file mode 100644
index 0000000000..513da2581c
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
+
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ConsumerEnhanceInfos;
+
+import java.lang.reflect.Method;
+
+public class MessageListenerInterceptor implements InstanceMethodsAroundInterceptor {
+
+ public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ MessageView messageView = (MessageView) allArguments[0];
+
+ ContextCarrier contextCarrier = getContextCarrierFromMessage(messageView);
+
+ AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + messageView.getTopic()
+ + "/Consumer", contextCarrier);
+ Tags.MQ_TOPIC.set(span, messageView.getTopic());
+
+ Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
+ if (skyWalkingDynamicField != null) {
+ ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) skyWalkingDynamicField;
+ Tags.MQ_BROKER.set(span, consumerEnhanceInfos.getNamesrvAddr());
+ span.setPeer(consumerEnhanceInfos.getNamesrvAddr());
+ }
+
+ span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
+ SpanLayer.asMQ(span);
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ ConsumeResult status = (ConsumeResult) ret;
+ if (ConsumeResult.FAILURE.equals(status)) {
+ AbstractSpan activeSpan = ContextManager.activeSpan();
+ activeSpan.errorOccurred();
+ Tags.MQ_STATUS.set(activeSpan, status.name());
+ }
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+
+ private ContextCarrier getContextCarrierFromMessage(MessageView message) {
+ ContextCarrier contextCarrier = new ContextCarrier();
+
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ next.setHeadValue(message.getProperties().get(next.getHeadKey()));
+ }
+
+ return contextCarrier;
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
new file mode 100644
index 0000000000..258fcfd62d
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
+
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageBuilder;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.apis.producer.Transaction;
+import org.apache.rocketmq.client.java.impl.ClientImpl;
+import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.util.StringUtil;
+
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MessageSendInterceptor} create exit span when the method {@link org.apache.rocketmq.client.java.impl.producer.ProducerImpl#send(Message)}
+ * and {@link org.apache.rocketmq.client.java.impl.producer.ProducerImpl#send(Message, Transaction)} execute.
+ */
+public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor {
+
+ public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ Message message = (Message) allArguments[0];
+ ClientImpl producerImpl = (ClientImpl) objInst;
+
+ ContextCarrier contextCarrier = new ContextCarrier();
+ String namingServiceAddress = producerImpl.getClientConfiguration().getEndpoints();
+ AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);
+ span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
+ Tags.MQ_BROKER.set(span, namingServiceAddress);
+ Tags.MQ_TOPIC.set(span, message.getTopic());
+ Collection keys = message.getKeys();
+ if (!CollectionUtil.isEmpty(keys)) {
+ span.tag(Tags.ofKey("mq.message.keys"), keys.stream().collect(Collectors.joining(",")));
+ }
+ Optional tag = message.getTag();
+ if (tag.isPresent()) {
+ span.tag(Tags.ofKey("mq.message.tags"), tag.get());
+ }
+
+ contextCarrier.extensionInjector().injectSendingTimestamp();
+ SpanLayer.asMQ(span);
+
+ Map properties = message.getProperties();
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ if (!StringUtil.isEmpty(next.getHeadValue())) {
+ properties.put(next.getHeadKey(), next.getHeadValue());
+ }
+ }
+
+ MessageBuilder messageBuilder = new MessageBuilderImpl();
+ messageBuilder.setTopic(message.getTopic());
+ if (message.getTag().isPresent()) {
+ messageBuilder.setTag(message.getTag().get());
+ }
+ messageBuilder.setKeys(message.getKeys().toArray(new String[0]));
+ if (message.getMessageGroup().isPresent()) {
+ messageBuilder.setMessageGroup(message.getMessageGroup().get());
+ }
+
+ byte[] body = new byte[message.getBody().limit()];
+ message.getBody().get(body);
+ messageBuilder.setBody(body);
+ if (message.getDeliveryTimestamp().isPresent()) {
+ messageBuilder.setDeliveryTimestamp(message.getDeliveryTimestamp().get());
+ }
+ properties.entrySet().forEach(item -> messageBuilder.addProperty(item.getKey(), item.getValue()));
+ allArguments[0] = messageBuilder.build();
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
+ SendReceipt sendReceipt = (SendReceipt) ret;
+ if (sendReceipt != null && sendReceipt.getMessageId() != null) {
+ AbstractSpan activeSpan = ContextManager.activeSpan();
+ activeSpan.tag(Tags.ofKey("mq.message.id"), sendReceipt.getMessageId().toString());
+ }
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+
+ private String buildOperationName(String topicName) {
+ return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/PushConsumerImplInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/PushConsumerImplInterceptor.java
new file mode 100644
index 0000000000..d6aa1ec5fa
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/PushConsumerImplInterceptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+import org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ConsumerEnhanceInfos;
+
+import java.util.Map;
+
+/**
+ * {@link PushConsumerImplInterceptor} create exit span when the method {@link org.apache.rocketmq.client.java.impl.consumer.PushConsumerImpl#PushConsumerImpl(ClientConfiguration, String, Map, MessageListener, int, int, int)} execute.
+ */
+public class PushConsumerImplInterceptor implements InstanceConstructorInterceptor {
+
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+ ClientConfiguration clientConfiguration = (ClientConfiguration) allArguments[0];
+ String namesrvAddr = clientConfiguration.getEndpoints();
+ ConsumerEnhanceInfos consumerEnhanceInfos = new ConsumerEnhanceInfos(namesrvAddr);
+
+ if (allArguments[3] instanceof EnhancedInstance) {
+ EnhancedInstance enhancedMessageListener = (EnhancedInstance) allArguments[3];
+ enhancedMessageListener.setSkyWalkingDynamicField(consumerEnhanceInfos);
+ }
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ConsumerEnhanceInfos.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ConsumerEnhanceInfos.java
new file mode 100644
index 0000000000..e85c48b8da
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ConsumerEnhanceInfos.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;
+
+public class ConsumerEnhanceInfos {
+
+ private String namesrvAddr;
+
+ public ConsumerEnhanceInfos(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/MessageListenerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/MessageListenerInstrumentation.java
new file mode 100644
index 0000000000..7bf170cb6a
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/MessageListenerInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+public class MessageListenerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.apis.consumer.MessageListener";
+ private static final String SEND_MESSAGE_METHOD_NAME = "consume";
+ private static final String SEND_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.MessageListenerInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return HierarchyMatch.byHierarchyMatch(new String[]{ENHANCE_CLASS});
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named(SEND_MESSAGE_METHOD_NAME);
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return SEND_METHOD_INTERCEPTOR;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ProducerImplInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ProducerImplInstrumentation.java
new file mode 100644
index 0000000000..22a8c123c2
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ProducerImplInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class ProducerImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.java.impl.producer.ProducerImpl";
+ private static final String SEND_MESSAGE_METHOD_NAME = "send";
+ private static final String SEND_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.MessageSendInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ return named(SEND_MESSAGE_METHOD_NAME).and(takesArgumentWithType(0, "org.apache.rocketmq.client.apis.message.Message"));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return SEND_METHOD_INTERCEPTOR;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return true;
+ }
+ }
+ };
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/PushConsumerImplInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/PushConsumerImplInstrumentation.java
new file mode 100644
index 0000000000..5b68b5acaf
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/PushConsumerImplInstrumentation.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class PushConsumerImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.java.impl.consumer.PushConsumerImpl";
+ private static final String CONSTRUCTOR_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.PushConsumerImplInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[]{
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher getConstructorMatcher() {
+ return takesArguments(7);
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return CONSTRUCTOR_METHOD_INTERCEPTOR;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[0];
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 0000000000..ed2f03fde6
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.MessageListenerInstrumentation
+rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ProducerImplInstrumentation
+rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.PushConsumerImplInstrumentation
diff --git a/docs/en/setup/service-agent/java-agent/Plugin-list.md b/docs/en/setup/service-agent/java-agent/Plugin-list.md
index 186d1507f4..d1137699ab 100644
--- a/docs/en/setup/service-agent/java-agent/Plugin-list.md
+++ b/docs/en/setup/service-agent/java-agent/Plugin-list.md
@@ -86,7 +86,8 @@
- resteasy-server-6.x
- rocketMQ-3.x
- rocketMQ-4.x
-- rocketMQ-5.x
+- rocketMQ-5.x
+- rocketMQ-client-java-5.x
- sentinel-1.x
- servicecomb-2.x
- sharding-sphere-3.x
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index 9d172e5e15..c106d1a2ed 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -72,6 +72,7 @@ metrics based on the tracing data.
* [Nacos-Client](https://github.com/alibaba/nacos) 2.x (Optional²)
* MQ
* [RocketMQ](https://github.com/apache/rocketmq) 3.x-> 5.x
+ * [RocketMQ-gRPC](http://github.com/apache/rocketmq-clients) 5.x
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.2.3
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh b/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh
new file mode 100644
index 0000000000..cfaffd692b
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+home="$(cd "$(dirname $0)"; pwd)"
+
+java -Dendpoints=${ENDPOINTS} -DnameServer=${NAME_SERVER} -jar ${agent_opts} ${home}/../libs/rocketmq-5-grpc-scenario.jar &
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml b/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml
new file mode 100644
index 0000000000..3f228e2725
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+segmentItems:
+ - serviceName: rocketmq-5-grpc-scenario
+ segmentSize: ge 2
+ segments:
+ - segmentId: not null
+ spans:
+ - operationName: RocketMQ/TopicTest/Producer
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 38
+ isError: false
+ spanType: Exit
+ peer: not null
+ tags:
+ - { key: mq.broker, value: not null }
+ - { key: mq.topic, value: TopicTest }
+ - { key: mq.message.keys, value: KeyA }
+ - { key: mq.message.tags, value: TagA }
+ - { key: mq.message.id, value: not null }
+ skipAnalysis: 'false'
+ - operationName: GET:/case/rocketmq-5-grpc-scenario
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 14
+ isError: false
+ spanType: Entry
+ peer: ''
+ tags:
+ - { key: url, value: 'http://localhost:8080/rocketmq-5-grpc-scenario/case/rocketmq-5-grpc-scenario' }
+ - { key: http.method, value: GET }
+ - { key: http.status_code, value: '200' }
+ skipAnalysis: 'false'
+ - segmentId: not null
+ spans:
+ - operationName: RocketMQ/TopicTest/Consumer
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 39
+ isError: false
+ spanType: Entry
+ peer: not blank
+ tags:
+ - { key: transmission.latency, value: not null }
+ - { key: mq.topic, value: TopicTest }
+ - { key: mq.broker, value: not blank }
+ refs:
+ - { parentEndpoint: GET:/case/rocketmq-5-grpc-scenario, networkAddress: not null,
+ refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService: not null, traceId: not null }
+ skipAnalysis: 'false'
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/configuration.yml b/test/plugin/scenarios/rocketmq-5-grpc-scenario/configuration.yml
new file mode 100644
index 0000000000..c51eb261d7
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/configuration.yml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+type: jvm
+entryService: http://localhost:8080/rocketmq-5-grpc-scenario/case/rocketmq-5-grpc-scenario
+healthCheck: http://localhost:8080/rocketmq-5-grpc-scenario/case/healthCheck
+startScript: ./bin/startup.sh
+environment:
+ - ENDPOINTS=broker:8081
+ - NAME_SERVER=namesrv:9876
+depends_on:
+ - namesrv
+ - broker
+dependencies:
+ namesrv:
+ image: apache/rocketmq:${CASE_SERVER_IMAGE_VERSION}
+ hostname: namesrv
+ command: sh mqnamesrv
+ broker:
+ image: apache/rocketmq:${CASE_SERVER_IMAGE_VERSION}
+ hostname: broker
+ command: sh mqbroker -n namesrv:9876 --enable-proxy
+ environment:
+ - autoCreateTopicEnable=true
+ depends_on:
+ - namesrv
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/pom.xml b/test/plugin/scenarios/rocketmq-5-grpc-scenario/pom.xml
new file mode 100644
index 0000000000..d27b6488b1
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/pom.xml
@@ -0,0 +1,135 @@
+
+
+
+
+ org.apache.skywalking.apm.testcase
+ rocketmq-5-grpc-scenario
+ 1.0.0
+ jar
+
+ 4.0.0
+
+ skywalking-rocketmq-5-grpc-scenario
+
+
+ UTF-8
+ 1.8
+ 3.8.1
+ 5.0.5
+ 5.1.1
+ 2.1.6.RELEASE
+ 1.18.20
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ ${spring.boot.version}
+ pom
+ import
+
+
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-client-java
+ ${rocketmq.client.version}
+
+
+
+ org.apache.rocketmq
+ rocketmq-tools
+ ${rocketmq.version}
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-log4j2
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+ provided
+
+
+
+
+
+ rocketmq-5-grpc-scenario
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring.boot.version}
+
+
+
+ repackage
+
+
+
+
+
+ maven-compiler-plugin
+ ${maven-compiler-plugin.version}
+
+
+ ${compiler.version}
+ ${project.build.sourceEncoding}
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+ assemble
+ package
+
+ single
+
+
+
+ src/main/assembly/assembly.xml
+
+ ./target/
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/assembly/assembly.xml b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/assembly/assembly.xml
new file mode 100644
index 0000000000..acf7530727
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/assembly/assembly.xml
@@ -0,0 +1,41 @@
+
+
+
+
+ zip
+
+
+
+
+ ./bin
+ 0775
+
+
+
+
+
+
+ ./libs
+ 0775
+
+
+
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/Application.java b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/Application.java
new file mode 100644
index 0000000000..f9098c93c9
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/Application.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package test.apache.skywalking.apm.testcase.rocketmq.client.java;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ try {
+ SpringApplication.run(Application.class, args);
+ } catch (Exception e) {
+ // Never do this
+ }
+ }
+}
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java
new file mode 100644
index 0000000000..ba0e43815d
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package test.apache.skywalking.apm.testcase.rocketmq.client.java.controller;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.tools.command.MQAdminStartup;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+@RestController
+@RequestMapping("/case")
+@Slf4j
+public class CaseController {
+
+ private static final String SUCCESS = "Success";
+
+ @Value("${endpoints}")
+ private String endpoints;
+
+ @Value("${nameServer}")
+ private String nameServer;
+
+ static final String TOPIC = "TopicTest";
+ static final String TAG = "TagA";
+ static final String GROUP = "group1";
+
+ Producer producer;
+
+ PushConsumer consumer;
+
+ @RequestMapping("/rocketmq-5-grpc-scenario")
+ @ResponseBody
+ public String testcase() {
+ try {
+ ClientServiceProvider provider = ClientServiceProvider.loadService();
+ ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+ .setEndpoints(endpoints)
+ .enableSsl(false)
+ .build();
+ // start producer
+ if (producer == null) {
+ producer = provider.newProducerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ .build();
+ }
+
+ // send msg
+ Message message = provider.newMessageBuilder()
+ // Set topic for the current message.
+ .setTopic(TOPIC)
+ // Message secondary classifier of message besides topic.
+ .setTag(TAG)
+ // Key(s) of the message, another way to mark message besides message id.
+ .setKeys("KeyA")
+ .setBody("This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8))
+ .build();
+ SendReceipt sendReceipt = producer.send(message);
+
+ // start consumer
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG);
+ if (consumer == null) {
+ consumer = provider.newPushConsumerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ .setConsumerGroup(GROUP)
+ .setSubscriptionExpressions(Collections.singletonMap(TOPIC, filterExpression))
+ .setMessageListener(new MyConsumer())
+ .build();
+ }
+ } catch (Exception e) {
+ log.error("consumer start error", e);
+ }
+ }
+ });
+ thread.start();
+ } catch (Exception e) {
+ log.error("testcase error", e);
+ }
+ return SUCCESS;
+ }
+
+ @RequestMapping("/healthCheck")
+ @ResponseBody
+ public String healthCheck() throws Exception {
+ System.setProperty(MixAll.ROCKETMQ_HOME_ENV, this.getClass().getResource("/").getPath());
+ String[] subArgs = new String[]{
+ "updateTopic",
+ "-n",
+ nameServer,
+ "-c",
+ "DefaultCluster",
+ "-t",
+ "TopicTest"};
+ MQAdminStartup.main(subArgs);
+
+ subArgs = new String[]{
+ "updateSubGroup",
+ "-n",
+ nameServer,
+ "-c",
+ "DefaultCluster",
+ "-g",
+ "group1"};
+ MQAdminStartup.main(subArgs);
+
+ ClientServiceProvider provider = ClientServiceProvider.loadService();
+ ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+ .setEndpoints(endpoints)
+ .enableSsl(false)
+ .build();
+ // start producer
+ Producer producer = provider.newProducerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ .build();
+ return SUCCESS;
+ }
+
+ public static class MyConsumer implements MessageListener {
+
+ @Override
+ public ConsumeResult consume(MessageView messageView) {
+ log.info("Consume message successfully, messageId={},messageBody={}", messageView.getMessageId(),
+ messageView.getBody().toString());
+ return ConsumeResult.SUCCESS;
+ }
+ }
+
+}
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/resources/application.yaml b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/resources/application.yaml
new file mode 100644
index 0000000000..576aa83e65
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/resources/application.yaml
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+server:
+ port: 8080
+ servlet:
+ context-path: /rocketmq-5-grpc-scenario
+logging:
+ config: classpath:log4j2.xml
\ No newline at end of file
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/resources/config/tools.yml b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/resources/config/tools.yml
new file mode 100644
index 0000000000..9a37259370
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/resources/config/tools.yml
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+accessKey: rocketmq2
+secretKey: 12345678
+
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/resources/log4j2.xml b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/resources/log4j2.xml
new file mode 100644
index 0000000000..9849ed5a8a
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/resources/log4j2.xml
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list b/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list
new file mode 100644
index 0000000000..a9219d9010
--- /dev/null
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# lists your version here (Contains only the last version number of each minor version.)
+
+5.1.1