Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/repairkafka3.7 #707

Merged
merged 23 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3feef5a
Fixed an issue where intercept methods for kafka3.7 were invalid
darkness-2nd Jul 21, 2024
2971c2b
Fixed an issue where intercept methods for kafka3.7 were invalid
darkness-2nd Jul 21, 2024
7db4f51
Revert "Fixed an issue where intercept methods for kafka3.7 were inva…
darkness-2nd Jul 22, 2024
1cbc3db
Revert "Fixed an issue where intercept methods for kafka3.7 were inva…
darkness-2nd Jul 22, 2024
6220bd4
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 22, 2024
1e2b82a
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 23, 2024
d875bd3
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 23, 2024
ef2193b
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 23, 2024
e20ef76
repair ci error
darkness-2nd Jul 23, 2024
3543204
add case of 3.6.0
darkness-2nd Jul 23, 2024
526abbb
fix upe
darkness-2nd Jul 23, 2024
abd3848
repair spring-kafka 2.2.x ci
darkness-2nd Jul 25, 2024
0cb524c
remove dead line in Bootstrap-plugins.md
darkness-2nd Jul 25, 2024
f31fb6f
repair the uncompatible code in kafka Case
darkness-2nd Jul 25, 2024
8297b83
test ci for 3.7 jdk ci
darkness-2nd Jul 25, 2024
73c85a0
add support version in Supported-list.md
darkness-2nd Jul 25, 2024
1cc9438
remove error class
darkness-2nd Jul 25, 2024
960be6c
remove error class
darkness-2nd Jul 25, 2024
14ebf37
Add comments for resolve the problem that Kafka3.7.x can not be inter…
darkness-2nd Jul 25, 2024
39353e8
revert the format codes
darkness-2nd Jul 25, 2024
3f51753
ignore 403 when checking markdown link
kezhenxu94 Jul 25, 2024
1a5f39c
Update CHANGES.md: Support kafka-clients-3.7.x intercept
darkness-2nd Jul 25, 2024
b876765
Update docs/en/setup/service-agent/java-agent/Supported-list.md
wu-sheng Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;

import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class Kafka37ConsumerInterceptor extends KafkaConsumerInterceptor {

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
if (ret instanceof ConsumerRecords) {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
ConsumerRecords<?, ?> consumerRecords = (ConsumerRecords<?, ?>) ret;
if (consumerRecords.count() == 0) {
return ret;
}
for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
if (consumerRecord == null) {
continue;
}
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
Iterator<Header> iterator = consumerRecord.headers().headers(next.getHeadKey()).iterator();
if (iterator.hasNext()) {
next.setHeadValue(new String(iterator.next().value(), StandardCharsets.UTF_8));
}
}
String operationName = OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId();
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier).start(requiredInfo.getStartTime());
activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER);
SpanLayer.asMQ(activeSpan);
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers());
Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics());
activeSpan.setPeer(requiredInfo.getBrokerServers());
ContextManager.stopSpan();
}
}
return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
public static final String INTERCEPTOR_CLASS_KAFKA3_2 = "org.apache.skywalking.apm.plugin.kafka.Kafka3ConsumerInterceptor";
public static final String ENHANCE_METHOD = "pollOnce";
public static final String ENHANCE_METHOD_KAFKA3_7 = "poll";
public static final String INTERCEPTOR_CLASS_KAFKA3_7 = "org.apache.skywalking.apm.plugin.kafka.Kafka37ConsumerInterceptor";
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
public static final String SUBSCRIBE_METHOD = "subscribe";
Expand All @@ -62,104 +64,121 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {

@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
}
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not reformat, thr code style file is in the source codes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not reformat, thr code style file is in the source codes.

I reset the java file which I reformat


@Override
public String getConstructorInterceptor() {
return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
},
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_MAP_TYPE);
}

@Override
public String getConstructorInterceptor() {
return MAP_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
},
@Override
public String getConstructorInterceptor() {
return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
},
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_MAP_TYPE);
}

@Override
public String getConstructorInterceptor() {
return MAP_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert re-format.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert re-format.

I have already recerted it, this class dosen't change, I have already rollback it to the original version

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already recerted it, this class dosen't change, I have already rollback it to the original version

No, you didn't. Change is still there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already recerted it, this class dosen't change, I have already rollback it to the original version

No, you didn't. Change is still there.

Those changes are compare to the changes which are not correct I commit before...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already recerted it, this class dosen't change, I have already rollback it to the original version

No, you didn't. Change is still there.

I dont't know why.... I check out the original branch and copy the codes into the class file, but the IDE says that there is no difference.....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already recerted it, this class dosen't change, I have already rollback it to the original version

No, you didn't. Change is still there.

Resolved


};
}

@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client < 3.2
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD).and(returns(Map.class)));
}
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client < 3.2
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD).and(returns(Map.class)));
}

@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client >= 3.2
return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch")));
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client >= 3.2
return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch")));
}

@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS_KAFKA3_2;
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS_KAFKA3_2;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD)
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_NAME));
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client >= 3.7
return named(ENHANCE_METHOD_KAFKA3_7).and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords")));
}

@Override
public String getMethodsInterceptor() {
return SUBSCRIBE_INTERCEPT_CLASS;
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS_KAFKA3_7;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD)
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_PATTERN));
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD)
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_NAME));
}

@Override
public String getMethodsInterceptor() {
return SUBSCRIBE_INTERCEPT_CLASS;
}
@Override
public String getMethodsInterceptor() {
return SUBSCRIBE_INTERCEPT_CLASS;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD)
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_PATTERN));
}

@Override
public String getMethodsInterceptor() {
return SUBSCRIBE_INTERCEPT_CLASS;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
Expand Down