-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merge span normalizer into grouper #384
base: main
Are you sure you want to change the base?
Changes from all commits
ee9aab6
3c11b4c
23b0cde
ddcd352
641a5f5
a0c7eef
93ab874
dc060e7
fd7aaab
96936e0
e08b840
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,13 +6,18 @@ | |
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG; | ||
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME; | ||
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE; | ||
import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.BYPASS_OUTPUT_TOPIC_CONFIG_KEY; | ||
import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY; | ||
|
||
import com.typesafe.config.Config; | ||
import io.jaegertracing.api_v2.JaegerSpanInternalModel; | ||
import java.util.List; | ||
import java.util.Map; | ||
import org.apache.kafka.common.serialization.Serde; | ||
import org.apache.kafka.common.serialization.Serdes; | ||
import org.apache.kafka.streams.StreamsBuilder; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.apache.kafka.streams.kstream.Consumed; | ||
import org.apache.kafka.streams.kstream.KStream; | ||
import org.apache.kafka.streams.kstream.Named; | ||
import org.apache.kafka.streams.kstream.Produced; | ||
|
@@ -29,6 +34,13 @@ | |
import org.hypertrace.core.spannormalizer.SpanIdentity; | ||
import org.hypertrace.core.spannormalizer.TraceIdentity; | ||
import org.hypertrace.core.spannormalizer.TraceState; | ||
import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanPreProcessor; | ||
import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanSerde; | ||
import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToAvroRawSpanTransformer; | ||
import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToLogRecordsTransformer; | ||
import org.hypertrace.core.spannormalizer.jaeger.PreProcessedSpan; | ||
import org.hypertrace.core.spannormalizer.rawspan.ByPassPredicate; | ||
import org.hypertrace.core.spannormalizer.rawspan.RawSpanToStructuredTraceTransformer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -44,17 +56,19 @@ public StreamsBuilder buildTopology( | |
Map<String, Object> properties, | ||
StreamsBuilder streamsBuilder, | ||
Map<String, KStream<?, ?>> inputStreams) { | ||
|
||
Config jobConfig = getJobConfig(properties); | ||
String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY); | ||
String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); | ||
String bypassOutputTopic = jobConfig.getString(BYPASS_OUTPUT_TOPIC_CONFIG_KEY); | ||
String outputTopicRawLogs = jobConfig.getString(OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY); | ||
|
||
KStream<TraceIdentity, RawSpan> inputStream = | ||
(KStream<TraceIdentity, RawSpan>) inputStreams.get(inputTopic); | ||
KStream<byte[], JaegerSpanInternalModel.Span> inputStream = | ||
(KStream<byte[], JaegerSpanInternalModel.Span>) inputStreams.get(inputTopic); | ||
if (inputStream == null) { | ||
inputStream = | ||
streamsBuilder | ||
// read the input topic | ||
.stream(inputTopic); | ||
streamsBuilder.stream( | ||
inputTopic, Consumed.with(Serdes.ByteArray(), new JaegerSpanSerde())); | ||
inputStreams.put(inputTopic, inputStream); | ||
} | ||
|
||
|
@@ -75,7 +89,34 @@ public StreamsBuilder buildTopology( | |
streamsBuilder.addStateStore(spanStoreBuilder); | ||
streamsBuilder.addStateStore(traceStateStoreBuilder); | ||
|
||
StreamPartitioner<TraceIdentity, StructuredTrace> groupPartitioner = | ||
KStream<byte[], PreProcessedSpan> preProcessedStream = | ||
inputStream.transform(() -> new JaegerSpanPreProcessor(getGrpcChannelRegistry())); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How are we ensuring that this jaeger-spans topic is consumed exactly from the offset where the span-normalizer was decommissioned? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good point. Since they are different applications, it will be difficult to get them in sync. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. My suggestion would be the following: There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We probably need to keep both applications for now, and may need to move |
||
|
||
// logs output | ||
preProcessedStream.transform(JaegerSpanToLogRecordsTransformer::new).to(outputTopicRawLogs); | ||
|
||
KStream<TraceIdentity, RawSpan>[] branches = | ||
preProcessedStream | ||
.transform(JaegerSpanToAvroRawSpanTransformer::new) | ||
.branch(new ByPassPredicate(jobConfig), (key, value) -> true); | ||
|
||
KStream<TraceIdentity, RawSpan> bypassTopicBranch = branches[0]; | ||
KStream<TraceIdentity, RawSpan> outputTopicBranch = branches[1]; | ||
|
||
StreamPartitioner<String, StructuredTrace> tenantIsolationPartitionerForBypassTopic = | ||
new GroupPartitionerBuilder<String, StructuredTrace>() | ||
.buildPartitioner( | ||
"spans", | ||
jobConfig, | ||
(traceid, span) -> traceid, | ||
new KeyHashPartitioner<>(), | ||
getGrpcChannelRegistry()); | ||
|
||
bypassTopicBranch | ||
.transform(RawSpanToStructuredTraceTransformer::new) | ||
.to(bypassOutputTopic, Produced.with(null, null, tenantIsolationPartitionerForBypassTopic)); | ||
|
||
StreamPartitioner<TraceIdentity, StructuredTrace> tenantIsolationPartitionerForOutputTopic = | ||
new GroupPartitionerBuilder<TraceIdentity, StructuredTrace>() | ||
.buildPartitioner( | ||
"spans", | ||
|
@@ -85,10 +126,10 @@ public StreamsBuilder buildTopology( | |
getGrpcChannelRegistry()); | ||
|
||
Produced<TraceIdentity, StructuredTrace> outputTopicProducer = | ||
Produced.with(null, null, groupPartitioner); | ||
Produced.with(null, null, tenantIsolationPartitionerForOutputTopic); | ||
outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER); | ||
|
||
inputStream | ||
outputTopicBranch | ||
.transform( | ||
RawSpansProcessor::new, | ||
Named.as(RawSpansProcessor.class.getSimpleName()), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs to be an incremental change. We need to read from both
jaeger-spans
and raw-spans-from-jaeger-spans
topics in this PR. Only in a follow-up PR, some time down the line, can we read just from
jaeger-spans