Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/repairkafka3.7 #707

Merged
merged 23 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3feef5a
Fixed an issue where intercept methods for kafka3.7 were invalid
darkness-2nd Jul 21, 2024
2971c2b
Fixed an issue where intercept methods for kafka3.7 were invalid
darkness-2nd Jul 21, 2024
7db4f51
Revert "Fixed an issue where intercept methods for kafka3.7 were inva…
darkness-2nd Jul 22, 2024
1cbc3db
Revert "Fixed an issue where intercept methods for kafka3.7 were inva…
darkness-2nd Jul 22, 2024
6220bd4
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 22, 2024
1e2b82a
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 23, 2024
d875bd3
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 23, 2024
ef2193b
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 23, 2024
e20ef76
repair ci error
darkness-2nd Jul 23, 2024
3543204
add case of 3.6.0
darkness-2nd Jul 23, 2024
526abbb
fix upe
darkness-2nd Jul 23, 2024
abd3848
repair spring-kafka 2.2.x ci
darkness-2nd Jul 25, 2024
0cb524c
remove dead line in Bootstrap-plugins.md
darkness-2nd Jul 25, 2024
f31fb6f
repair the uncompatible code in kafka Case
darkness-2nd Jul 25, 2024
8297b83
test ci for 3.7 jdk ci
darkness-2nd Jul 25, 2024
73c85a0
add support version in Supported-list.md
darkness-2nd Jul 25, 2024
1cc9438
remove error class
darkness-2nd Jul 25, 2024
960be6c
remove error class
darkness-2nd Jul 25, 2024
14ebf37
Add comments for resolve the problem that Kafka3.7.x can not be inter…
darkness-2nd Jul 25, 2024
39353e8
revert the format codes
darkness-2nd Jul 25, 2024
3f51753
ignore 403 when checking markdown link
kezhenxu94 Jul 25, 2024
1a5f39c
Update CHANGES.md: Support kafka-clients-3.7.x intercept
darkness-2nd Jul 25, 2024
b876765
Update docs/en/setup/service-agent/java-agent/Supported-list.md
wu-sheng Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* kafak3.7 removed the method named pollForFetches with return type Fetch.
* When the kafka version < 3.7, the enhance method could be: org.apache.kafka.clients.consumer.KafkaConsumer#pollForFetches.
* But when the kafka version >= 3.7, the enhance method should be: org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration).
* And the return type was also changed, from org.apache.kafka.clients.consumer.internals.Fetch to org.apache.kafka.clients.consumer.ConsumerRecords.
*/
public class Kafka37ConsumerInterceptor extends KafkaConsumerInterceptor {

@Override
protected Map<TopicPartition, List<ConsumerRecord<?, ?>>> fetchRecords(Object retObj) {
Map<TopicPartition, List<ConsumerRecord<?, ?>>> rsp = new HashMap<>();
if (retObj instanceof ConsumerRecords) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why instanceof? Is there another case to have another parameter? The parent class has Map<TopicPartition, List<ConsumerRecord<?, ?>>> type as the parameter, but it should not affect this new Kafka37ConsumerInterceptor, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class should be removed, I missed it, I'll delete it at next commit

ConsumerRecords<?, ?> consumerRecords = (ConsumerRecords<?, ?>) retObj;
if (consumerRecords.isEmpty()) {
return rsp;
}
for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
int partition = consumerRecord.partition();
String topic = consumerRecord.topic();
TopicPartition topicPartition = new TopicPartition(topic, partition);
List<ConsumerRecord<?, ?>> recordsForTp = rsp.computeIfAbsent(topicPartition, k -> new ArrayList<>());
recordsForTp.add(consumerRecord);
}
}
return rsp;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.kafka.define;

import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the comment, I think we can focus on the difference instead of repeating what is already written in the parent class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the comment, I think we can focus on the difference instead of repeating what is already written in the parent class.

Yes, the difference is the method named pollForFetchs was removed from KafkaConsumer to another two classes, so the original interceptor can not intercept it. Because of the enhance class is changed, so I create two new classes to repair the uncompatible problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the difference is the method named pollForFetchs was removed from KafkaConsumer to another two classes, so the original interceptor can not intercept it. Because of the enhance class is changed, so I create two new classes to repair the uncompatible problem.

I meant you can modify the comment for this class......

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I forget to change the comment

* Here is the intercept process steps.
*
* <pre>
* 1. Record the topic when the client invoke <code>subscribed</code> method
* 2. Create the entry span when the client invoke the method <code>pollOnce</code>.
* 3. Extract all the <code>Trace Context</code> by iterate all <code>ConsumerRecord</code>
* 4. Stop the entry span when <code>pollOnce</code> method finished.
* </pre>
*/
public class Kafka37AsyncConsumerInstrumentation extends KafkaConsumerInstrumentation {

private static final String ENHANCE_CLASS_37_ASYNC = "org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer";

@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS_37_ASYNC);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.kafka.define;

import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

/**
* Here is the intercept process steps.
*
* <pre>
* 1. Record the topic when the client invoke <code>subscribed</code> method
* 2. Create the entry span when the client invoke the method <code>pollOnce</code>.
* 3. Extract all the <code>Trace Context</code> by iterate all <code>ConsumerRecord</code>
* 4. Stop the entry span when <code>pollOnce</code> method finished.
* </pre>
*/
public class Kafka37LegacyConsumerInstrumentation extends KafkaConsumerInstrumentation {

private static final String ENHANCE_CLASS_37_LEGACY = "org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer";

@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS_37_LEGACY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,103 +63,103 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
}
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
}

@Override
public String getConstructorInterceptor() {
return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
},
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_MAP_TYPE);
}

@Override
public String getConstructorInterceptor() {
return MAP_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
},
@Override
public String getConstructorInterceptor() {
return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
},
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_MAP_TYPE);
}

@Override
public String getConstructorInterceptor() {
return MAP_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert re-format.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert re-format.

I have already recerted it, this class dosen't change, I have already rollback it to the original version

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already recerted it, this class dosen't change, I have already rollback it to the original version

No, you didn't. Change is still there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already recerted it, this class dosen't change, I have already rollback it to the original version

No, you didn't. Change is still there.

Those changes are compare to the changes which are not correct I commit before...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already recerted it, this class dosen't change, I have already rollback it to the original version

No, you didn't. Change is still there.

I dont't know why.... I check out the original branch and copy the codes into the class file, but the IDE says that there is no difference.....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already recerted it, this class dosen't change, I have already rollback it to the original version

No, you didn't. Change is still there.

Resolved


};
}

@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client < 3.2
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD).and(returns(Map.class)));
}
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client < 3.2
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD).and(returns(Map.class)));
}

@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client >= 3.2
return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch")));
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client >= 3.2
return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch")));
}

@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS_KAFKA3_2;
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS_KAFKA3_2;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD)
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_NAME));
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD)
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_NAME));
}

@Override
public String getMethodsInterceptor() {
return SUBSCRIBE_INTERCEPT_CLASS;
}
@Override
public String getMethodsInterceptor() {
return SUBSCRIBE_INTERCEPT_CLASS;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD)
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_PATTERN));
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD)
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_PATTERN));
}

@Override
public String getMethodsInterceptor() {
return SUBSCRIBE_INTERCEPT_CLASS;
}
@Override
public String getMethodsInterceptor() {
return SUBSCRIBE_INTERCEPT_CLASS;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
Expand All @@ -184,4 +184,4 @@ public boolean isOverrideArgs() {
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstr
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.Kafka37AsyncConsumerInstrumentation
kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.Kafka37LegacyConsumerInstrumentation
Loading
Loading