From 744eb1e5d822f8e3564914cde52881620b77de78 Mon Sep 17 00:00:00 2001 From: darkness-2nd <81127133+darkness-2nd@users.noreply.github.com> Date: Thu, 25 Jul 2024 16:33:03 +0800 Subject: [PATCH] Support Kafka 3.7+ (#707) --- .dlc.json | 3 +- CHANGES.md | 2 +- .../Kafka37AsyncConsumerInstrumentation.java | 40 +++++++++++++++++++ .../Kafka37LegacyConsumerInstrumentation.java | 40 +++++++++++++++++++ .../src/main/resources/skywalking-plugin.def | 4 +- .../service-agent/java-agent/Plugin-list.md | 1 + .../java-agent/Supported-list.md | 2 +- .../kafka/controller/CaseController.java | 15 ++++++- .../kafka-scenario/support-version.list | 3 ++ 9 files changed, 104 insertions(+), 6 deletions(-) create mode 100644 apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/Kafka37AsyncConsumerInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/Kafka37LegacyConsumerInstrumentation.java diff --git a/.dlc.json b/.dlc.json index 1ae0883a40..6a207d2c58 100644 --- a/.dlc.json +++ b/.dlc.json @@ -36,6 +36,7 @@ 200, 301, 302, - 401 + 401, + 403 ] } diff --git a/CHANGES.md b/CHANGES.md index 623b5015f1..d2ab11f4ee 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -22,7 +22,7 @@ Release Notes. * Improve 4x performance of ContextManagerExtendService.createTraceContext() * Add a plugin that supports the Solon framework. * Fixed issues in the MySQL component where the executeBatch method could result in empty SQL statements . - +* Support kafka-clients-3.7.x intercept All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/213?closed=1) diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/Kafka37AsyncConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/Kafka37AsyncConsumerInstrumentation.java new file mode 100644 index 0000000000..7c24a135ac --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/Kafka37AsyncConsumerInstrumentation.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.kafka.define; + +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * For Kafka 3.7.x change + * + *
+ * 1. The method named pollForFetchs was removed from KafkaConsumer to+ */ +public class Kafka37AsyncConsumerInstrumentation extends KafkaConsumerInstrumentation { + + private static final String ENHANCE_CLASS_37_ASYNC = "org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS_37_ASYNC); + } +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/Kafka37LegacyConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/Kafka37LegacyConsumerInstrumentation.java new file mode 100644 index 0000000000..95f33bfe82 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/Kafka37LegacyConsumerInstrumentation.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.kafka.define; + +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * For Kafka 3.7.x change + * + *AsyncKafkaConsumer
andLegacyKafkaConsumer
+ * 2. Because of the enhance class was changed, so we should create new Instrumentation to intercept the method + *
+ * 1. The method named pollForFetchs was removed from KafkaConsumer to+ */ +public class Kafka37LegacyConsumerInstrumentation extends KafkaConsumerInstrumentation { + + private static final String ENHANCE_CLASS_37_LEGACY = "org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer"; + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS_37_LEGACY); + } +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def index d807aceefe..a85deb260b 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def @@ -18,4 +18,6 @@ kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstr kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation -kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation \ No newline at end of file +kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation +kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.Kafka37AsyncConsumerInstrumentation +kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.Kafka37LegacyConsumerInstrumentation diff --git a/docs/en/setup/service-agent/java-agent/Plugin-list.md b/docs/en/setup/service-agent/java-agent/Plugin-list.md index e15b3b1f00..d2c07c5a41 100644 --- a/docs/en/setup/service-agent/java-agent/Plugin-list.md +++ b/docs/en/setup/service-agent/java-agent/Plugin-list.md @@ -54,6 +54,7 @@ - jetty-client-9.x - jetty-server-9.x - kafka-0.11.x/1.x/2.x +- kafka-3.7.x - kotlin-coroutine - lettuce-5.x - light4j diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md index c91c25d4a2..cd7c02f78b 100644 --- a/docs/en/setup/service-agent/java-agent/Supported-list.md +++ b/docs/en/setup/service-agent/java-agent/Supported-list.md @@ -76,7 +76,7 @@ metrics based on the tracing data. * MQ * [RocketMQ](https://github.com/apache/rocketmq) 3.x-> 5.x * [RocketMQ-gRPC](http://github.com/apache/rocketmq-clients) 5.x - * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.2.3 + * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.7.1 * [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka)) * [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4 * [RabbitMQ](https://www.rabbitmq.com/) 3.x-> 5.x diff --git a/test/plugin/scenarios/kafka-scenario/src/main/java/test/apache/skywalking/apm/testcase/kafka/controller/CaseController.java b/test/plugin/scenarios/kafka-scenario/src/main/java/test/apache/skywalking/apm/testcase/kafka/controller/CaseController.java index ea8bb6e3c1..ee6458bdc1 100644 --- a/test/plugin/scenarios/kafka-scenario/src/main/java/test/apache/skywalking/apm/testcase/kafka/controller/CaseController.java +++ b/test/plugin/scenarios/kafka-scenario/src/main/java/test/apache/skywalking/apm/testcase/kafka/controller/CaseController.java @@ -24,6 +24,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.Collection; import java.util.regex.Pattern; import java.util.List; import java.util.ArrayList; @@ -32,10 +33,10 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -270,7 +271,17 @@ public void run() { consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerAsyncKafkaConsumer
andLegacyKafkaConsumer
+ * 2. Because of the enhance class was changed, so we should create new Instrumentation to intercept the method + *