Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1113 from zalando/fix-slo-lightstep
Browse files Browse the repository at this point in the history
take batch size for lightstep slo
  • Loading branch information
adyach authored Oct 31, 2019
2 parents 5804eb7 + 522fbae commit 81e2128
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.NativeWebRequest;
import org.zalando.nakadi.domain.EventPublishResult;
import org.zalando.nakadi.domain.EventPublishingStatus;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
Expand Down Expand Up @@ -68,23 +67,18 @@ public EventPublishingController(final EventPublisher publisher,
@RequestMapping(value = "/event-types/{eventTypeName}/events", method = POST)
public ResponseEntity postEvent(@PathVariable final String eventTypeName,
@RequestBody final String eventsAsString,
final NativeWebRequest request,
final HttpServletRequest request,
final Client client)
throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException,
InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException {
LOG.trace("Received event {} for event type {}", eventsAsString, eventTypeName);
final HttpServletRequest httpServletRequest = request.getNativeRequest(HttpServletRequest.class);
final Span publishingSpan = TracingService.extractSpan(httpServletRequest, "publish_events")
.setTag("event_type", eventTypeName)
.setTag("slo_bucket", getSLOBucket(httpServletRequest.getContentLength()))
.setTag(Tags.SPAN_KIND_PRODUCER, client.getClientId());
final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(eventTypeName);
if (blacklistService.isProductionBlocked(eventTypeName, client.getClientId())) {
throw new BlockedException("Application or event type is blocked");
}
try {
final ResponseEntity response = postEventInternal(
eventTypeName, eventsAsString, eventTypeMetrics, client, publishingSpan);
eventTypeName, eventsAsString, eventTypeMetrics, client, request);
eventTypeMetrics.incrementResponseCount(response.getStatusCode().value());
return response;
} catch (final NoSuchEventTypeException exception) {
Expand All @@ -96,27 +90,24 @@ public ResponseEntity postEvent(@PathVariable final String eventTypeName,
}
}

private String getSLOBucket(final long contentLength) {
if (contentLength > 50000 || contentLength == 0) {
return ">50K";
} else if (contentLength < 5000) {
return "<5K";
}
return "5K-50K";
}

private ResponseEntity postEventInternal(final String eventTypeName,
final String eventsAsString,
final EventTypeMetrics eventTypeMetrics,
final Client client, final Span parentSpan)
final Client client,
final HttpServletRequest request)
throws AccessDeniedException, ServiceTemporarilyUnavailableException, InternalNakadiException,
EventTypeTimeoutException, NoSuchEventTypeException {
final long startingNanos = System.nanoTime();
try {
final EventPublishResult result = publisher.publish(eventsAsString, eventTypeName, parentSpan);
final int totalSizeBytes = eventsAsString.getBytes(Charsets.UTF_8).length;
final Span publishingSpan = TracingService.extractSpan(request, "publish_events")
.setTag("event_type", eventTypeName)
.setTag("slo_bucket", TracingService.getSLOBucket(totalSizeBytes))
.setTag(Tags.SPAN_KIND_PRODUCER, client.getClientId());

final EventPublishResult result = publisher.publish(eventsAsString, eventTypeName, publishingSpan);

final int eventCount = result.getResponses().size();
final int totalSizeBytes = eventsAsString.getBytes(Charsets.UTF_8).length;

reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount);
reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client);
Expand Down
23 changes: 17 additions & 6 deletions src/main/java/org/zalando/nakadi/service/TracingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.util.GlobalTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.TimeUnit;

@Component
public class TracingService {
private static final Logger LOG = LoggerFactory.getLogger(TracingService.class);

private static final String BUCKET_NAME_5_50_KB = "5K-50K";
private static final String BUCKET_NAME_5_KB = "<5K";
private static final String BUCKET_NAME_MORE_THAN_50_KB = ">50K";

private static final Long BUCKET_5_KB = 5000L;
private static final Long BUCKET_MORE_THAN_50_KB = 50000L;

public static void logErrorInSpan(final Span span, final String error) {
if (error != null) {
Expand All @@ -27,7 +29,7 @@ public static void logStreamCloseReason(final Span span, final String error) {
span.log(ImmutableMap.of("stream.close.reason", error));
}
}

public static Span extractSpan(final HttpServletRequest request, final String operation) {
final Span span = (Span) request.getAttribute("span");
if (span != null) {
Expand Down Expand Up @@ -74,4 +76,13 @@ public static Span getNewSpanWithParent(final Span span, final String operationN
.asChildOf(span).start();
}

public static String getSLOBucket(final long batchSize) {
if (batchSize > BUCKET_MORE_THAN_50_KB) {
return BUCKET_NAME_MORE_THAN_50_KB;
} else if (batchSize < BUCKET_5_KB) {
return BUCKET_NAME_5_KB;
}
return BUCKET_NAME_5_50_KB;
}

}

0 comments on commit 81e2128

Please sign in to comment.