Skip to content
This repository has been archived by the owner on Jun 26, 2024. It is now read-only.

Handling Clock-Skews in Distributed Traces #90

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions gateway-service-impl/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ dependencies {
// Needed by clusters snapshots
implementation("com.fasterxml.jackson.core:jackson-annotations:2.11.1")
implementation("com.fasterxml.jackson.core:jackson-databind:2.11.1")
annotationProcessor("org.projectlombok:lombok:1.18.18")
compileOnly("org.projectlombok:lombok:1.18.18")

testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
testImplementation("org.mockito:mockito-core:3.9.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.hypertrace.gateway.service.entity.config.LogConfig;
import org.hypertrace.gateway.service.explore.ExploreService;
import org.hypertrace.gateway.service.logevent.LogEventsService;
import org.hypertrace.gateway.service.span.ClockskewAdjuster;
import org.hypertrace.gateway.service.span.SpanService;
import org.hypertrace.gateway.service.trace.TracesService;
import org.hypertrace.gateway.service.v1.baseline.BaselineEntitiesRequest;
Expand Down Expand Up @@ -91,7 +92,11 @@ public GatewayServiceImpl(Config appConfig) {
new TracesService(
queryServiceClient, qsRequestTimeout, attributeMetadataProvider, scopeFilterConfigs);
this.spanService =
new SpanService(queryServiceClient, qsRequestTimeout, attributeMetadataProvider);
new SpanService(
queryServiceClient,
qsRequestTimeout,
attributeMetadataProvider,
ClockskewAdjuster.getAdjuster(appConfig));
this.entityService =
new EntityService(
queryServiceClient,
Expand Down Expand Up @@ -174,7 +179,9 @@ public void getSpans(
org.hypertrace.core.grpcutils.context.RequestContext.CURRENT
.get()
.getRequestHeaders());

SpansResponse response = spanService.getSpansByFilter(context, request);

responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.hypertrace.gateway.service.span;

import com.typesafe.config.Config;
import java.time.Duration;
import java.time.Instant;
import lombok.Data;
import lombok.experimental.Accessors;
import org.hypertrace.gateway.service.v1.span.SpanEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for the {@link SpanTransformationStage}s that correct clock skew between spans.
 *
 * <p>Concrete implementations are obtained through {@link #getAdjuster(Config)}, which resolves
 * the implementation from the application configuration.
 */
public abstract class ClockskewAdjuster implements SpanTransformationStage {

  private static final Logger LOG = LoggerFactory.getLogger(ClockskewAdjuster.class);

  private static final String CONFIG_PATH = "clockskew.adjuster";
  private static final String FALLBACK_TYPE = "noop";

  /**
   * Returns a concrete implementation of {@link ClockskewAdjuster} based on the type configured
   * at {@code clockskew.adjuster}. If the configuration is missing or cannot be read, this method
   * falls back to the adjuster registered for the {@code noop} type. Any exception raised while
   * reading the configuration or creating the configured adjuster is logged (with its cause) and
   * the fallback adjuster is returned instead.
   *
   * @param appConfig the app configuration
   * @return a concrete implementation of {@link ClockskewAdjuster}
   */
  public static ClockskewAdjuster getAdjuster(Config appConfig) {
    try {
      // An absent key is an ordinary situation, so handle it without relying on an exception.
      if (!appConfig.hasPath(CONFIG_PATH)) {
        LOG.info(
            "No clockskew adjuster configured at '{}', falling back to no-op adjuster",
            CONFIG_PATH);
        return ClockskewAdjusters.getAdjuster(FALLBACK_TYPE);
      }
      return ClockskewAdjusters.getAdjuster(appConfig.getString(CONFIG_PATH));
    } catch (Exception e) {
      // Keep the service usable, but do not hide the reason for the fallback.
      LOG.warn(
          "Some exception occurred while trying to get the clockskew adjuster, falling back to no-op adjuster",
          e);
      return ClockskewAdjusters.getAdjuster(FALLBACK_TYPE);
    }
  }

  /**
   * Mutable working view of a span used while adjusting timestamps. Lombok generates fluent,
   * chainable accessors (for example {@code span.id()} and {@code span.id("x")}).
   */
  @Data
  @Accessors(chain = true, fluent = true)
  static class Span {
    private String id;
    private String parentSpanId;
    private Instant startTime;
    private Instant endTime;
    private Duration duration;
    // the mutable builder object
    private SpanEvent.Builder spanBuilder;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.hypertrace.gateway.service.span;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory to return concrete instances of {@link ClockskewAdjuster} based on the supplied type.
 *
 * <p>This is a static-only utility class and cannot be instantiated.
 */
class ClockskewAdjusters {

  private static final Map<String, Supplier<? extends ClockskewAdjuster>> REGISTRY =
      new HashMap<>();

  private static final Logger LOG = LoggerFactory.getLogger(ClockskewAdjusters.class);

  private static final String JAEGAR_BASED = "jaegar";
  private static final String NOOP_BASED = "noop";

  static {
    REGISTRY.put(JAEGAR_BASED, JaegarBasedClockskewAdjuster::new);
    REGISTRY.put(NOOP_BASED, NoOpClockskewAdjuster::new);
  }

  private ClockskewAdjusters() {
    // static factory methods only
  }

  /**
   * Creates a new adjuster for the given type.
   *
   * @param type the registered adjuster type, e.g. {@code "jaegar"} or {@code "noop"}
   * @return a new adjuster of the requested type, or a new no-op adjuster when the type is not
   *     registered (including {@code null})
   */
  static ClockskewAdjuster getAdjuster(String type) {
    Supplier<? extends ClockskewAdjuster> supplier = REGISTRY.get(type);
    if (null == supplier) {
      // Name the offending value so that a misconfiguration can be diagnosed from the log.
      LOG.warn(
          "No clockskew adjuster for the supplied config '{}', falling back to no-op adjuster",
          type);
      return REGISTRY.get(NOOP_BASED).get();
    }
    return supplier.get();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.hypertrace.gateway.service.span;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.NonNull;
import org.hypertrace.gateway.service.v1.common.Value;
import org.hypertrace.gateway.service.v1.common.ValueType;
import org.hypertrace.gateway.service.v1.span.SpanEvent;

/**
 * A {@link ClockskewAdjuster} that shifts child spans in time so that they fit inside their parent
 * span.
 *
 * <p>Spans are linked through the {@code EVENT.id} and {@code EVENT.parentSpanId} attributes.
 * Starting from the spans that have no parent in the supplied list, every parent-child pair is
 * visited top-down, so a child is always compared against the already adjusted times of its
 * parent. An adjusted span is moved as a whole: both {@code EVENT.startTime} and {@code
 * EVENT.endTime} are shifted by the same amount and written back to the span's builder.
 *
 * <p>Spans that lack the id, start time or end time attribute, spans whose id duplicates an
 * earlier span, and spans that are part of a parent cycle are returned unchanged.
 */
class JaegarBasedClockskewAdjuster extends ClockskewAdjuster {

  private static final String ID_ATTRIBUTE = "EVENT.id";
  private static final String PARENT_ID_ATTRIBUTE = "EVENT.parentSpanId";
  private static final String START_TIME_ATTRIBUTE = "EVENT.startTime";
  private static final String END_TIME_ATTRIBUTE = "EVENT.endTime";

  /**
   * Adjusts the timestamps of the supplied spans in place.
   *
   * @param spans the span builders to adjust; the builders themselves are mutated
   * @return a new list holding every supplied builder, in the supplied order
   */
  @Override
  public List<SpanEvent.Builder> transform(@NonNull List<? extends SpanEvent.Builder> spans) {
    Map<String, Span> idToSpanMap = new HashMap<>();
    for (SpanEvent.Builder spanBuilder : spans) {
      Span span = toSpan(spanBuilder);
      if (null != span) {
        // the first span wins when ids collide; later duplicates pass through unadjusted
        idToSpanMap.putIfAbsent(span.id(), span);
      }
    }

    // A parent can have any number of children, so keep all of them.
    Map<String, List<Span>> parentToChildrenMap = new HashMap<>();
    Deque<Span> pending = new ArrayDeque<>();
    for (Span span : idToSpanMap.values()) {
      String parentSpanId = span.parentSpanId();
      // NOTE(review): an unset parent id presumably arrives as an empty string rather than null -
      // both are treated as "no parent" here; confirm against the query service response.
      boolean hasKnownParent =
          null != parentSpanId
              && !parentSpanId.isEmpty()
              && idToSpanMap.containsKey(parentSpanId);
      if (hasKnownParent) {
        parentToChildrenMap.computeIfAbsent(parentSpanId, key -> new ArrayList<>()).add(span);
      } else {
        // no parent in this list: nothing to adjust against, so this span anchors its subtree
        pending.push(span);
      }
    }

    // Walk iteratively from the roots. Every span has a single parent, so the spans reachable
    // from a root form a tree and the walk terminates; spans in a parent cycle are never reached.
    while (!pending.isEmpty()) {
      Span parentSpan = pending.pop();
      for (Span childSpan : parentToChildrenMap.getOrDefault(parentSpan.id(), List.of())) {
        adjustTimestamp(childSpan, getAdjustmentForChildSpan(childSpan, parentSpan));
        pending.push(childSpan);
      }
    }

    // Every span is returned, not only the roots, and the caller's ordering is preserved.
    return new ArrayList<>(spans);
  }

  /**
   * Builds the working view of a span from its attributes.
   *
   * @return the working view, or {@code null} when the id, start time or end time attribute is
   *     missing
   */
  private static Span toSpan(SpanEvent.Builder spanBuilder) {
    Map<String, Value> attributesMap = spanBuilder.getAttributesMap();
    Value id = attributesMap.get(ID_ATTRIBUTE);
    Value startTimeValue = attributesMap.get(START_TIME_ATTRIBUTE);
    Value endTimeValue = attributesMap.get(END_TIME_ATTRIBUTE);
    if (null == id || null == startTimeValue || null == endTimeValue) {
      return null;
    }
    Value parentId = attributesMap.get(PARENT_ID_ATTRIBUTE);
    Instant startTime = Instant.ofEpochMilli(startTimeValue.getLong());
    Instant endTime = Instant.ofEpochMilli(endTimeValue.getLong());
    return new Span()
        .id(id.getString())
        .parentSpanId(null == parentId ? null : parentId.getString())
        .startTime(startTime)
        .endTime(endTime)
        .duration(Duration.between(startTime, endTime))
        .spanBuilder(spanBuilder);
  }

  /**
   * Shifts the span by the given amount and writes the new start and end times to its builder.
   * A zero adjustment leaves the span and its builder untouched.
   */
  private void adjustTimestamp(Span childSpan, Duration adjustment) {
    if (adjustment.isZero()) {
      return;
    }
    // Instant is immutable: plus() returns a new value that has to be stored explicitly.
    Instant adjustedStartTime = childSpan.startTime().plus(adjustment);
    Instant adjustedEndTime = childSpan.endTime().plus(adjustment);
    childSpan.startTime(adjustedStartTime).endTime(adjustedEndTime);
    childSpan.spanBuilder().putAttributes(START_TIME_ATTRIBUTE, toLongValue(adjustedStartTime));
    childSpan.spanBuilder().putAttributes(END_TIME_ATTRIBUTE, toLongValue(adjustedEndTime));
  }

  private static Value toLongValue(Instant instant) {
    return Value.newBuilder()
        .setValueType(ValueType.LONG)
        .setLong(instant.toEpochMilli())
        .build();
  }

  /**
   * Computes how far the child span has to be moved to fit inside its parent.
   *
   * @return the amount to add to the child's timestamps; zero when no adjustment is needed
   */
  private Duration getAdjustmentForChildSpan(Span childSpan, Span parentSpan) {
    boolean startsBeforeParent = childSpan.startTime().isBefore(parentSpan.startTime());
    // if child span is longer than parent span
    if (childSpan.duration().compareTo(parentSpan.duration()) > 0) {
      // in this case, we can only ensure that it does not start before its parent:
      // move it forward so that both start at the same instant
      if (startsBeforeParent) {
        return Duration.between(childSpan.startTime(), parentSpan.startTime());
      }
      return Duration.ZERO;
    }
    // if child already fits in its parent, do not adjust
    if (!startsBeforeParent && !childSpan.endTime().isAfter(parentSpan.endTime())) {
      return Duration.ZERO;
    }
    // centre the child inside the parent, splitting the spare time evenly on both sides
    long latencyMillis = parentSpan.duration().minus(childSpan.duration()).toMillis() / 2;
    return Duration.between(
        childSpan.startTime(), parentSpan.startTime().plusMillis(latencyMillis));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.hypertrace.gateway.service.span;

import java.util.ArrayList;
import java.util.List;
import lombok.NonNull;
import org.hypertrace.gateway.service.v1.span.SpanEvent;

public class NoOpClockskewAdjuster extends ClockskewAdjuster {
public List<SpanEvent.Builder> transform(@NonNull List<? extends SpanEvent.Builder> spans) {
return new ArrayList<>(spans);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,20 @@ public class SpanService {
private final QueryServiceClient queryServiceClient;
private final int requestTimeout;
private final AttributeMetadataProvider attributeMetadataProvider;
private final SpanTransformationPipeline spanTransformationPipeline;

private Timer queryExecutionTimer;

public SpanService(
QueryServiceClient queryServiceClient,
int requestTimeout,
AttributeMetadataProvider attributeMetadataProvider) {
AttributeMetadataProvider attributeMetadataProvider,
ClockskewAdjuster clockskewAdjuster) {
this.queryServiceClient = queryServiceClient;
this.requestTimeout = requestTimeout;
this.attributeMetadataProvider = attributeMetadataProvider;
spanTransformationPipeline =
SpanTransformationPipeline.getNewPipeline().addProcessingStage(clockskewAdjuster);
initMetrics();
}

Expand Down Expand Up @@ -89,7 +93,8 @@ Collection<SpanEvent> filterSpans(
RequestContext context,
SpansRequest request,
Map<String, AttributeMetadata> attributeMetadataMap) {
return filterSpanEvents(context, request, attributeMetadataMap);
return spanTransformationPipeline.execute(
filterSpanEvents(context, request, attributeMetadataMap));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.hypertrace.gateway.service.span;

import static java.util.stream.Collectors.toList;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.hypertrace.gateway.service.v1.span.SpanEvent;
import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder;

/**
 * An ordered chain of {@link SpanTransformationStage}s through which a list of spans is passed.
 * Adding a stage yields a new pipeline and leaves the receiver as it was. This class is not
 * thread-safe.
 */
class SpanTransformationPipeline {

  private final Function<List<? extends SpanEvent.Builder>, List<SpanEvent.Builder>> pipeline;

  private SpanTransformationPipeline(
      Function<List<? extends SpanEvent.Builder>, List<SpanEvent.Builder>> pipeline) {
    this.pipeline = pipeline;
  }

  /** Creates a pipeline without stages; it merely copies its input into a new list. */
  private SpanTransformationPipeline() {
    this(builders -> new ArrayList<SpanEvent.Builder>(builders));
  }

  /** Returns a fresh pipeline that has no processing stages yet. */
  public static SpanTransformationPipeline getNewPipeline() {
    return new SpanTransformationPipeline();
  }

  /**
   * Appends a stage to the end of this pipeline.
   *
   * @param processingStage the stage to run after all stages added so far
   * @return a new pipeline that contains the additional stage
   */
  public SpanTransformationPipeline addProcessingStage(SpanTransformationStage processingStage) {
    Function<List<? extends SpanEvent.Builder>, List<SpanEvent.Builder>> extended =
        this.pipeline.andThen(processingStage::transform);
    return new SpanTransformationPipeline(extended);
  }

  /**
   * Runs the given spans through every stage of the pipeline.
   *
   * @param spans list of spans to process
   * @return the processed spans, rebuilt from the builders the last stage returned
   */
  public List<SpanEvent> execute(List<SpanEvent> spans) {
    // Stages operate on builders, so hand them a builder carrying each span's attributes.
    List<Builder> builders = new ArrayList<>();
    for (SpanEvent span : spans) {
      builders.add(SpanEvent.newBuilder().putAllAttributes(span.getAttributesMap()));
    }

    List<SpanEvent> processed = new ArrayList<>();
    for (SpanEvent.Builder builder : pipeline.apply(builders)) {
      processed.add(builder.build());
    }
    return processed;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.hypertrace.gateway.service.span;

import java.util.List;
import lombok.NonNull;
import org.hypertrace.gateway.service.v1.span.SpanEvent;

/**
 * Represents a transformation stage in the {@link SpanTransformationPipeline}.
 *
 * <p>The pipeline chains stages together, feeding the list returned by one stage into the next.
 */
interface SpanTransformationStage {
/**
 * Transforms the supplied span builders.
 *
 * @param spans the span builders to transform; annotated as non-null
 * @return the span builders to hand over to the next stage of the pipeline
 */
List<SpanEvent.Builder> transform(@NonNull List<? extends SpanEvent.Builder> spans);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

/** Unit tests for {@link GatewayServiceImpl} */
public class GatewayServiceImplTest {

private static Config appConfig;

@BeforeAll
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.hypertrace.gateway.service.span;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.hypertrace.gateway.service.v1.common.Value;
import org.hypertrace.gateway.service.v1.common.ValueType;
import org.hypertrace.gateway.service.v1.span.SpanEvent;
import org.hypertrace.gateway.service.v1.span.SpanEvent.Builder;
import org.junit.jupiter.api.Test;

/** Unit tests for {@link NoOpClockskewAdjuster} */
class NoOpClockskewAdjusterTest {

  private static final String ATTRIBUTE_KEY_ID = "id";

  private static final ClockskewAdjuster CLOCKSKEW_ADJUSTER = new NoOpClockskewAdjuster();

  /** The no-op adjuster must hand back a list equal to the one it received. */
  @Test
  void returnsUnchangedSpanList() {
    SpanEvent.Builder firstSpan =
        createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, UUID.randomUUID().toString()));
    SpanEvent.Builder secondSpan =
        createSpanBuilder(Map.of(ATTRIBUTE_KEY_ID, UUID.randomUUID().toString()));
    List<SpanEvent.Builder> input = List.of(firstSpan, secondSpan);

    List<SpanEvent.Builder> output = CLOCKSKEW_ADJUSTER.transform(input);

    assertEquals(input, output);
  }

  /** A null span list is rejected with a {@link NullPointerException}. */
  @Test
  void throwsNPEWhenNullSpanListArePassed() {
    assertThrows(NullPointerException.class, () -> CLOCKSKEW_ADJUSTER.transform(null));
  }

  /** Creates a span builder carrying the given entries as string-typed attributes. */
  private static SpanEvent.Builder createSpanBuilder(Map<String, String> attributes) {
    Builder spanBuilder = SpanEvent.newBuilder();
    for (Map.Entry<String, String> attribute : attributes.entrySet()) {
      Value stringValue =
          Value.newBuilder()
              .setValueType(ValueType.STRING)
              .setString(attribute.getValue())
              .build();
      spanBuilder.putAttributes(attribute.getKey(), stringValue);
    }
    return spanBuilder;
  }
}
Loading