diff --git a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java index 3f6eac268b..0b294a5d6c 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java @@ -14,7 +14,6 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.context.request.NativeWebRequest; import org.zalando.nakadi.domain.EventPublishResult; import org.zalando.nakadi.domain.EventPublishingStatus; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; @@ -68,23 +67,18 @@ public EventPublishingController(final EventPublisher publisher, @RequestMapping(value = "/event-types/{eventTypeName}/events", method = POST) public ResponseEntity postEvent(@PathVariable final String eventTypeName, @RequestBody final String eventsAsString, - final NativeWebRequest request, + final HttpServletRequest request, final Client client) throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException, InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException { LOG.trace("Received event {} for event type {}", eventsAsString, eventTypeName); - final HttpServletRequest httpServletRequest = request.getNativeRequest(HttpServletRequest.class); - final Span publishingSpan = TracingService.extractSpan(httpServletRequest, "publish_events") - .setTag("event_type", eventTypeName) - .setTag("slo_bucket", getSLOBucket(httpServletRequest.getContentLength())) - .setTag(Tags.SPAN_KIND_PRODUCER, client.getClientId()); final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(eventTypeName); if (blacklistService.isProductionBlocked(eventTypeName, client.getClientId())) { throw new BlockedException("Application or event type is blocked"); } try { final ResponseEntity response = postEventInternal( - eventTypeName, eventsAsString, eventTypeMetrics, client, publishingSpan); + eventTypeName, eventsAsString, eventTypeMetrics, client, request); eventTypeMetrics.incrementResponseCount(response.getStatusCode().value()); return response; } catch (final NoSuchEventTypeException exception) { @@ -96,27 +90,24 @@ public ResponseEntity postEvent(@PathVariable final String eventTypeName, } } - private String getSLOBucket(final long contentLength) { - if (contentLength > 50000 || contentLength == 0) { - return ">50K"; - } else if (contentLength < 5000) { - return "<5K"; - } - return "5K-50K"; - } - private ResponseEntity postEventInternal(final String eventTypeName, final String eventsAsString, final EventTypeMetrics eventTypeMetrics, - final Client client, final Span parentSpan) + final Client client, + final HttpServletRequest request) throws AccessDeniedException, ServiceTemporarilyUnavailableException, InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException { final long startingNanos = System.nanoTime(); try { - final EventPublishResult result = publisher.publish(eventsAsString, eventTypeName, parentSpan); + final int totalSizeBytes = eventsAsString.getBytes(Charsets.UTF_8).length; + final Span publishingSpan = TracingService.extractSpan(request, "publish_events") + .setTag("event_type", eventTypeName) + .setTag("slo_bucket", TracingService.getSLOBucket(totalSizeBytes)) + .setTag(Tags.SPAN_KIND_PRODUCER, client.getClientId()); + + final EventPublishResult result = publisher.publish(eventsAsString, eventTypeName, publishingSpan); final int eventCount = result.getResponses().size(); - final int totalSizeBytes = eventsAsString.getBytes(Charsets.UTF_8).length; reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount); reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client); diff --git a/src/main/java/org/zalando/nakadi/service/TracingService.java b/src/main/java/org/zalando/nakadi/service/TracingService.java index dadf357ba1..5986436204 100644 --- a/src/main/java/org/zalando/nakadi/service/TracingService.java +++ b/src/main/java/org/zalando/nakadi/service/TracingService.java @@ -5,16 +5,18 @@ import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.util.GlobalTracer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; import javax.servlet.http.HttpServletRequest; import java.util.concurrent.TimeUnit; -@Component public class TracingService { - private static final Logger LOG = LoggerFactory.getLogger(TracingService.class); + + private static final String BUCKET_NAME_5_50_KB = "5K-50K"; + private static final String BUCKET_NAME_5_KB = "<5K"; + private static final String BUCKET_NAME_MORE_THAN_50_KB = ">50K"; + + private static final Long BUCKET_5_KB = 5000L; + private static final Long BUCKET_MORE_THAN_50_KB = 50000L; public static void logErrorInSpan(final Span span, final String error) { if (error != null) { @@ -27,7 +29,7 @@ public static void logStreamCloseReason(final Span span, final String error) { span.log(ImmutableMap.of("stream.close.reason", error)); } } - + public static Span extractSpan(final HttpServletRequest request, final String operation) { final Span span = (Span) request.getAttribute("span"); if (span != null) { @@ -74,4 +76,13 @@ public static Span getNewSpanWithParent(final Span span, final String operationN .asChildOf(span).start(); } + public static String getSLOBucket(final long batchSize) { + if (batchSize > BUCKET_MORE_THAN_50_KB) { + return BUCKET_NAME_MORE_THAN_50_KB; + } else if (batchSize < BUCKET_5_KB) { + return BUCKET_NAME_5_KB; + } + return BUCKET_NAME_5_50_KB; + } + }