diff --git a/gateway-service-impl/build.gradle.kts b/gateway-service-impl/build.gradle.kts index 1e27419c..c999b63f 100644 --- a/gateway-service-impl/build.gradle.kts +++ b/gateway-service-impl/build.gradle.kts @@ -29,6 +29,8 @@ dependencies { // Needed by clusters snapshots implementation("com.fasterxml.jackson.core:jackson-annotations:2.11.1") implementation("com.fasterxml.jackson.core:jackson-databind:2.11.1") + annotationProcessor("org.projectlombok:lombok:1.18.18") + compileOnly("org.projectlombok:lombok:1.18.18") testImplementation("org.junit.jupiter:junit-jupiter:5.7.0") testImplementation("org.mockito:mockito-core:3.9.0") diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java index 5c7a9931..2e06a9cb 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/GatewayServiceImpl.java @@ -26,6 +26,7 @@ import org.hypertrace.gateway.service.entity.config.LogConfig; import org.hypertrace.gateway.service.explore.ExploreService; import org.hypertrace.gateway.service.logevent.LogEventsService; +import org.hypertrace.gateway.service.span.ClockskewAdjuster; import org.hypertrace.gateway.service.span.SpanService; import org.hypertrace.gateway.service.trace.TracesService; import org.hypertrace.gateway.service.v1.baseline.BaselineEntitiesRequest; @@ -91,7 +92,11 @@ public GatewayServiceImpl(Config appConfig) { new TracesService( queryServiceClient, qsRequestTimeout, attributeMetadataProvider, scopeFilterConfigs); this.spanService = - new SpanService(queryServiceClient, qsRequestTimeout, attributeMetadataProvider); + new SpanService( + queryServiceClient, + qsRequestTimeout, + attributeMetadataProvider, + ClockskewAdjuster.getAdjuster(appConfig)); this.entityService = new EntityService( queryServiceClient, @@ -174,7 +179,9 @@ public void getSpans( org.hypertrace.core.grpcutils.context.RequestContext.CURRENT .get() .getRequestHeaders()); + SpansResponse response = spanService.getSpansByFilter(context, request); + responseObserver.onNext(response); responseObserver.onCompleted(); } catch (Exception e) { diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java new file mode 100644 index 00000000..49a012c4 --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjuster.java @@ -0,0 +1,51 @@ +package org.hypertrace.gateway.service.span; + +import com.typesafe.config.Config; +import java.time.Duration; +import java.time.Instant; +import lombok.Data; +import lombok.experimental.Accessors; +import org.hypertrace.gateway.service.v1.span.SpanEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ClockskewAdjuster implements SpanTransformationStage { + + private static final Logger LOG = LoggerFactory.getLogger(ClockskewAdjuster.class); + + private static final String CONFIG_PATH = "clockskew.adjuster"; + private static final String FALLBACK_TYPE = "noop"; + + /** + * Returns a concrete implementation of {@link ClockskewAdjuster} based on the supplied type. If + * an incorrect configuration is supplied, then it fallbacks to {@link NoOpClockskewAdjuster}. + * This method never throws an exception. Any exception propagated up from downstream is swallowed + * and the fallback adjuster is returned + * + * @param appConfig the app configuration + * @return a concrete implementation of {@link ClockskewAdjuster} + */ + public static ClockskewAdjuster getAdjuster(Config appConfig) { + String type; + try { + type = appConfig.getString(CONFIG_PATH); + return ClockskewAdjusters.getAdjuster(type); + } catch (Exception e) { + LOG.warn( + "Some exception occurred while trying to get the clockskew adjuster, falling back to no-op adjuster"); + return ClockskewAdjusters.getAdjuster(FALLBACK_TYPE); + } + } + + @Data + @Accessors(chain = true, fluent = true) + static class Span { + private String id; + private String parentSpanId; + private Instant startTime; + private Instant endTime; + private Duration duration; + // the mutable builder object + private SpanEvent.Builder spanBuilder; + } +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java new file mode 100644 index 00000000..013b9544 --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/ClockskewAdjusters.java @@ -0,0 +1,33 @@ +package org.hypertrace.gateway.service.span; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Factory to return concrete instances of {@link ClockskewAdjuster} based on the supplied type */ +class ClockskewAdjusters { + + private static final Map> REGISTRY = + new HashMap<>(); + + private static final Logger LOG = LoggerFactory.getLogger(ClockskewAdjusters.class); + + private static final String JAEGAR_BASED = "jaegar"; + private static final String NOOP_BASED = "noop"; + + static { + REGISTRY.put(JAEGAR_BASED, JaegarBasedClockskewAdjuster::new); + REGISTRY.put(NOOP_BASED, NoOpClockskewAdjuster::new); + } + + static ClockskewAdjuster getAdjuster(String type) { + Supplier supplier = REGISTRY.get(type); + if (null == supplier) { + LOG.warn("No clockskew adjuster for the supplied config, falling back to no-op adjuster"); + return REGISTRY.get(NOOP_BASED).get(); + } + return supplier.get(); + } +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java new file mode 100644 index 00000000..59ce5be8 --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/JaegarBasedClockskewAdjuster.java @@ -0,0 +1,100 @@ +package org.hypertrace.gateway.service.span; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.NonNull; +import org.hypertrace.gateway.service.v1.common.Value; +import org.hypertrace.gateway.service.v1.common.ValueType; +import org.hypertrace.gateway.service.v1.span.SpanEvent; + +class JaegarBasedClockskewAdjuster extends ClockskewAdjuster { + + @Override + public List transform(@NonNull List spans) { + Map parentChildMap = new HashMap<>(); + Map idToSpanMap = + spans.stream() + .map( + spanBuilder -> { + Map attributesMap = spanBuilder.getAttributesMap(); + String spanId = attributesMap.get("EVENT.id").getString(), + parentSpanId = attributesMap.get("EVENT.parentSpanId").getString(); + Instant + startTime = + Instant.ofEpochMilli(attributesMap.get("EVENT.startTime").getLong()), + endTime = Instant.ofEpochMilli(attributesMap.get("EVENT.endTime").getLong()); + Duration duration = Duration.between(startTime, endTime); + return new Span() + .id(spanId) + .parentSpanId(parentSpanId) + .startTime(startTime) + .endTime(endTime) + .duration(duration) + .spanBuilder(spanBuilder); + }) + .peek( + span -> { + if (null != span.parentSpanId()) { + parentChildMap.putIfAbsent(span.parentSpanId(), span.id()); + } + }) + .collect(toMap(Span::id, span -> span)); + // Start from root spans, and adjust each parent-child pair + return idToSpanMap.entrySet().stream() + .filter(span -> null == span.getValue().parentSpanId()) + .peek(entry -> adjustSpan(entry.getValue(), idToSpanMap, parentChildMap)) + .map( + entry -> { + Span updatedSpan = entry.getValue(); + entry + .getValue() + .spanBuilder() + .putAttributes( + "EVENT.startTime", + Value.newBuilder() + .setValueType(ValueType.LONG) + .setLong(updatedSpan.startTime().toEpochMilli()) + .build()); + return updatedSpan.spanBuilder(); + }) + .collect(toList()); + } + + private void adjustSpan( + Span span, Map idToSpanMap, Map parentToChildMap) { + if (null != span) { + Span childSpan = idToSpanMap.get(parentToChildMap.get(span.id())); + Duration adjustment = getAdjustmentForChildSpan(childSpan, span); + adjustTimestamp(childSpan, adjustment); + adjustSpan(childSpan, idToSpanMap, parentToChildMap); + } + } + + private void adjustTimestamp(Span childSpan, Duration adjustment) { + childSpan.startTime().plus(adjustment); + } + + private Duration getAdjustmentForChildSpan(Span childSpan, Span parentSpan) { + // if child span is greater than parent span + if (childSpan.duration().compareTo(parentSpan.duration()) > 0) { + // in this case, we can only ensure that it does not start before its parent + if (childSpan.startTime().isBefore(parentSpan.startTime())) { + return Duration.between(parentSpan.startTime(), childSpan.startTime()); + } + return Duration.ofMillis(0); + } + // if child already fits in its parent, do not adjust + if (!childSpan.startTime().isBefore(parentSpan.startTime()) + && !childSpan.endTime().isAfter(parentSpan.endTime())) { + return Duration.ofMillis(0); + } + var latency = (parentSpan.duration().minus(childSpan.duration()).toMillis()) >> 1; + return Duration.between(childSpan.startTime(), parentSpan.startTime().plusMillis(latency)); + } +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java new file mode 100644 index 00000000..5106e7cc --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjuster.java @@ -0,0 +1,12 @@ +package org.hypertrace.gateway.service.span; + +import java.util.ArrayList; +import java.util.List; +import lombok.NonNull; +import org.hypertrace.gateway.service.v1.span.SpanEvent; + +public class NoOpClockskewAdjuster extends ClockskewAdjuster { + public List transform(@NonNull List spans) { + return new ArrayList<>(spans); + } +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java index 95c0020c..69ecc2b1 100644 --- a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanService.java @@ -44,16 +44,20 @@ public class SpanService { private final QueryServiceClient queryServiceClient; private final int requestTimeout; private final AttributeMetadataProvider attributeMetadataProvider; + private final SpanTransformationPipeline spanTransformationPipeline; private Timer queryExecutionTimer; public SpanService( QueryServiceClient queryServiceClient, int requestTimeout, - AttributeMetadataProvider attributeMetadataProvider) { + AttributeMetadataProvider attributeMetadataProvider, + ClockskewAdjuster clockskewAdjuster) { this.queryServiceClient = queryServiceClient; this.requestTimeout = requestTimeout; this.attributeMetadataProvider = attributeMetadataProvider; + spanTransformationPipeline = + SpanTransformationPipeline.getNewPipeline().addProcessingStage(clockskewAdjuster); initMetrics(); } @@ -89,7 +93,8 @@ Collection filterSpans( RequestContext context, SpansRequest request, Map attributeMetadataMap) { - return filterSpanEvents(context, request, attributeMetadataMap); + return spanTransformationPipeline.execute( + filterSpanEvents(context, request, attributeMetadataMap)); } @VisibleForTesting diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java new file mode 100644 index 00000000..ff293e6c --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationPipeline.java @@ -0,0 +1,50 @@ +package org.hypertrace.gateway.service.span; + +import static java.util.stream.Collectors.toList; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import org.hypertrace.gateway.service.v1.span.SpanEvent; +import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder; + +/** + * A pipeline of handlers that transform a list of spans through it. This class is not thread-safe. + */ +class SpanTransformationPipeline { + + private final Function, List> pipeline; + + private SpanTransformationPipeline() { + this.pipeline = ArrayList::new; + } + + public static SpanTransformationPipeline getNewPipeline() { + return new SpanTransformationPipeline(); + } + + private SpanTransformationPipeline( + Function, List> pipeline) { + this.pipeline = pipeline; + } + + public SpanTransformationPipeline addProcessingStage(SpanTransformationStage processingStage) { + Function, List> updatedPipeline = + pipeline.andThen(processingStage::transform); + return new SpanTransformationPipeline(updatedPipeline); + } + + /** + * Processes the passed list of spans through the pipeline + * + * @param spans list of spans to process + * @return processed spans + */ + public List execute(List spans) { + List mutableSpans = + spans.stream() + .map(span -> SpanEvent.newBuilder().putAllAttributes(span.getAttributesMap())) + .collect(toList()); + return pipeline.apply(mutableSpans).stream().map(SpanEvent.Builder::build).collect(toList()); + } +} diff --git a/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java new file mode 100644 index 00000000..7fe1a329 --- /dev/null +++ b/gateway-service-impl/src/main/java/org/hypertrace/gateway/service/span/SpanTransformationStage.java @@ -0,0 +1,10 @@ +package org.hypertrace.gateway.service.span; + +import java.util.List; +import lombok.NonNull; +import org.hypertrace.gateway.service.v1.span.SpanEvent; + +/** Represents a transformation stage in the {@link SpanTransformationPipeline} */ +interface SpanTransformationStage { + List transform(@NonNull List spans); +} diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/GatewayServiceImplTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/GatewayServiceImplTest.java index e4c90b4f..ee34da6f 100644 --- a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/GatewayServiceImplTest.java +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/GatewayServiceImplTest.java @@ -8,6 +8,7 @@ /** Unit tests for {@link GatewayServiceImpl} */ public class GatewayServiceImplTest { + private static Config appConfig; @BeforeAll diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjusterTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjusterTest.java new file mode 100644 index 00000000..c2cf4f21 --- /dev/null +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/NoOpClockskewAdjusterTest.java @@ -0,0 +1,45 @@ +package org.hypertrace.gateway.service.span; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.hypertrace.gateway.service.v1.common.Value; +import org.hypertrace.gateway.service.v1.common.ValueType; +import org.hypertrace.gateway.service.v1.span.SpanEvent; +import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder; +import org.junit.jupiter.api.Test; + +class NoOpClockskewAdjusterTest { + + private static final ClockskewAdjuster CLOCKSKEW_ADJUSTER = new NoOpClockskewAdjuster(); + + private static final String ATTRIBUTE_KEY_ID = "id"; + + @Test + void returnsUnchangedSpanList() { + var originalSpans = + List.of( + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, UUID.randomUUID().toString())), + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, UUID.randomUUID().toString()))); + assertEquals(originalSpans, CLOCKSKEW_ADJUSTER.transform(originalSpans)); + } + + @Test + void throwsNPEWhenNullSpanListArePassed() { + assertThrows(NullPointerException.class, () -> CLOCKSKEW_ADJUSTER.transform(null)); + } + + private static SpanEvent.Builder createSpanBuilder(Map attributes) { + Builder builder = SpanEvent.newBuilder(); + attributes.forEach( + (attributeName, attributeValue) -> { + builder.putAttributes( + attributeName, + Value.newBuilder().setValueType(ValueType.STRING).setString(attributeValue).build()); + }); + return builder; + } +} diff --git a/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java new file mode 100644 index 00000000..77665960 --- /dev/null +++ b/gateway-service-impl/src/test/java/org/hypertrace/gateway/service/span/SpanTransformationPipelineTest.java @@ -0,0 +1,79 @@ +package org.hypertrace.gateway.service.span; + +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import org.hypertrace.gateway.service.v1.common.Value; +import org.hypertrace.gateway.service.v1.common.ValueType; +import org.hypertrace.gateway.service.v1.span.SpanEvent; +import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder; +import org.junit.jupiter.api.Test; + +class SpanTransformationPipelineTest { + + private static final String ATTRIBUTE_KEY_ID = "id"; + + private static final List ORIGINAL_SPANS = + List.of( + createSpan(Map.of(ATTRIBUTE_KEY_ID, "firstSpan")), + createSpan(Map.of(ATTRIBUTE_KEY_ID, "secondSpan"))); + + @Test + void returnsUnchangedSpansIfNoTransformationsRegistered() { + assertEquals( + ORIGINAL_SPANS, SpanTransformationPipeline.getNewPipeline().execute(ORIGINAL_SPANS)); + } + + @Test + void appliesMultipleTransformationsInOrderOfRegistry() { + // first transformation stage + var firstTransformation = mock(SpanTransformationStage.class); + var firstTransformationResult = + List.of( + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "firstTransformationFirstSpan")), + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "firstTransformationSecondSpan"))); + + when(firstTransformation.transform(anyList())).thenReturn(firstTransformationResult); + + // second transformation stage + var secondTransformation = mock(SpanTransformationStage.class); + var secondTransformationResult = + List.of( + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "secondTransformationFirstSpan")), + createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, "secondTransformationSecondSpan"))); + + when(secondTransformation.transform(eq(firstTransformationResult))) + .thenReturn(secondTransformationResult); + + List pipelineTransformationExpectedResult = + secondTransformationResult.stream().map(Builder::build).collect(toList()); + + assertEquals( + pipelineTransformationExpectedResult, + SpanTransformationPipeline.getNewPipeline() + .addProcessingStage(firstTransformation) + .addProcessingStage(secondTransformation) + .execute(ORIGINAL_SPANS)); + } + + private static SpanEvent.Builder createSpanBuilder(Map attributes) { + Builder builder = SpanEvent.newBuilder(); + attributes.forEach( + (attributeName, attributeValue) -> { + builder.putAttributes( + attributeName, + Value.newBuilder().setValueType(ValueType.STRING).setString(attributeValue).build()); + }); + return builder; + } + + private static SpanEvent createSpan(Map attributes) { + return createSpanBuilder(attributes).build(); + } +}