diff --git a/CHANGELOG.md b/CHANGELOG.md index 4269b6b544..008cf6e588 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed - Allow to update event types with recursion in schemas +### Changed +- Distinguish between employee token and service token in KPI events + ## [2.5.2] - 2018-01-08 ### Fixed diff --git a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java index f20c527a1a..562ea70915 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java @@ -132,6 +132,7 @@ private void reportSLOs(final long startingNanos, final int totalSizeBytes, fina .put("event_type", eventTypeName) .put("app", applicationName) .put("app_hashed", nakadiKpiPublisher.hash(applicationName)) + .put("token_realm", client.getRealm()) .put("number_of_events", eventCount) .put("ms_spent", msSpent) .put("batch_size", totalSizeBytes)); diff --git a/src/main/java/org/zalando/nakadi/controller/EventStreamController.java b/src/main/java/org/zalando/nakadi/controller/EventStreamController.java index 8adbc3bd4a..f7f7d62f9d 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventStreamController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventStreamController.java @@ -237,7 +237,7 @@ public StreamingResponseBody streamEvents( .withStreamTimeout(streamTimeout) .withStreamKeepAliveLimit(streamKeepAliveLimit) .withEtName(eventTypeName) - .withConsumingAppId(client.getClientId()) + .withConsumingClient(client) .withCursors(getStreamingStart(eventType, cursorsStr)) .withMaxMemoryUsageBytes(maxMemoryUsageBytes) .build(); diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java index 483a450d37..8bbb32a920 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java @@ -167,7 +167,7 @@ public StreamingResponseBody streamEvents( final StreamParameters streamParameters = StreamParameters.of(batchLimit, streamLimit, batchTimeout, streamTimeout, streamKeepAliveLimit, maxUncommittedSize, - nakadiSettings.getDefaultCommitTimeoutSeconds(), client.getClientId()); + nakadiSettings.getDefaultCommitTimeoutSeconds(), client); final Subscription subscription = subscriptionDbRepository.getSubscription(subscriptionId); streamer = subscriptionStreamerFactory.build(subscription, streamParameters, output, diff --git a/src/main/java/org/zalando/nakadi/security/Client.java b/src/main/java/org/zalando/nakadi/security/Client.java index 2ac13c4302..cefad014f9 100644 --- a/src/main/java/org/zalando/nakadi/security/Client.java +++ b/src/main/java/org/zalando/nakadi/security/Client.java @@ -11,4 +11,8 @@ public Client(final String clientId) { public String getClientId() { return clientId; } + + public String getRealm() { + return ""; + } } diff --git a/src/main/java/org/zalando/nakadi/security/ClientResolver.java b/src/main/java/org/zalando/nakadi/security/ClientResolver.java index 7b0d229f97..c3d2f980e7 100644 --- a/src/main/java/org/zalando/nakadi/security/ClientResolver.java +++ b/src/main/java/org/zalando/nakadi/security/ClientResolver.java @@ -3,6 +3,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.MethodParameter; import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.oauth2.common.exceptions.UnauthorizedUserException; import org.springframework.security.oauth2.provider.OAuth2Authentication; @@ -15,9 +16,8 @@ import org.zalando.nakadi.util.FeatureToggleService; import java.security.Principal; -import java.util.Collections; +import java.util.Map; import java.util.Optional; -import java.util.Set; import static org.zalando.nakadi.config.SecuritySettings.AuthMode.OFF; @@ -26,12 +26,10 @@ public class ClientResolver implements HandlerMethodArgumentResolver { private static final String FULL_ACCESS_CLIENT_ID = "adminClientId"; private final SecuritySettings settings; - private final FeatureToggleService featureToggleService; @Autowired public ClientResolver(final SecuritySettings settings, final FeatureToggleService featureToggleService) { this.settings = settings; - this.featureToggleService = featureToggleService; } @Override @@ -50,15 +48,23 @@ public Client resolveArgument(final MethodParameter parameter, return new FullAccessClient(clientId.orElse(FULL_ACCESS_CLIENT_ID)); } - return clientId.map(client -> new NakadiClient(client, getScopes())) + return clientId.map(client -> new NakadiClient(client, getRealm())) .orElseThrow(() -> new UnauthorizedUserException("Client unauthorized")); } - private Set getScopes() { - final Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); - if (authentication instanceof OAuth2Authentication) { - return ((OAuth2Authentication) authentication).getOAuth2Request().getScope(); + public String getRealm() { + try { + return Optional.of(SecurityContextHolder.getContext()) + .map(SecurityContext::getAuthentication) + .map(authentication -> (OAuth2Authentication) authentication) + .map(OAuth2Authentication::getUserAuthentication) + .map(Authentication::getDetails) + .map(details -> (Map) details) + .map(details -> details.get("realm")) + .map(realm -> (String) realm) + .orElse(""); + } catch (final ClassCastException e) { + return ""; } - return Collections.emptySet(); } } diff --git a/src/main/java/org/zalando/nakadi/security/NakadiClient.java b/src/main/java/org/zalando/nakadi/security/NakadiClient.java index 553982aa96..2b0bc82159 100644 --- a/src/main/java/org/zalando/nakadi/security/NakadiClient.java +++ b/src/main/java/org/zalando/nakadi/security/NakadiClient.java @@ -1,13 +1,16 @@ package org.zalando.nakadi.security; -import java.util.Set; - public class NakadiClient extends Client { - private final Set scopes; + private final String realm; - public NakadiClient(final String clientId, final Set scopes) { + public NakadiClient(final String clientId, final String realm) { super(clientId); - this.scopes = scopes; + this.realm = realm; + } + + @Override + public String getRealm() { + return realm; } } diff --git a/src/main/java/org/zalando/nakadi/service/EventStream.java b/src/main/java/org/zalando/nakadi/service/EventStream.java index 899f154332..b3edc30164 100644 --- a/src/main/java/org/zalando/nakadi/service/EventStream.java +++ b/src/main/java/org/zalando/nakadi/service/EventStream.java @@ -10,6 +10,7 @@ import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.metrics.StreamKpiData; import org.zalando.nakadi.repository.EventConsumer; +import org.zalando.nakadi.security.Client; import java.io.IOException; import java.io.OutputStream; @@ -80,7 +81,8 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che long bytesInMemory = 0; while (connectionReady.get() && - !blacklistService.isConsumptionBlocked(config.getEtName(), config.getConsumingAppId())) { + !blacklistService.isConsumptionBlocked(config.getEtName(), config.getConsumingClient() + .getClientId())) { checkAuthorization.run(); @@ -146,7 +148,7 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che final long count = kpiData.getAndResetNumberOfEventsSent(); final long bytes = kpiData.getAndResetBytesSent(); - publishKpi(config.getConsumingAppId(), count, bytes); + publishKpi(config.getConsumingClient(), count, bytes); lastKpiEventSent = System.currentTimeMillis(); } @@ -183,10 +185,10 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che LOG.info("Error occurred when streaming events (possibly server closed connection)", e); } catch (final KafkaException e) { LOG.error("Error occurred when polling events from kafka; consumer: {}, event-type: {}", - config.getConsumingAppId(), config.getEtName(), e); + config.getConsumingClient().getClientId(), config.getEtName(), e); } finally { publishKpi( - config.getConsumingAppId(), + config.getConsumingClient(), kpiData.getAndResetNumberOfEventsSent(), kpiData.getAndResetBytesSent()); } @@ -196,19 +198,20 @@ private boolean isMemoryLimitReached(final long memoryUsed) { return memoryUsed > config.getMaxMemoryUsageBytes(); } - private void publishKpi(final String appName, final long count, final long bytes) { - final String appNameHashed = kpiPublisher.hash(appName); + private void publishKpi(final Client client, final long count, final long bytes) { + final String appNameHashed = kpiPublisher.hash(client.getClientId()); LOG.info("[SLO] [streamed-data] api={} eventTypeName={} app={} appHashed={} numberOfEvents={} bytesStreamed={}", - "lola", config.getEtName(), appName, appNameHashed, count, bytes); + "lola", config.getEtName(), client.getClientId(), appNameHashed, count, bytes); kpiPublisher.publish( kpiDataStreamedEventType, () -> new JSONObject() .put("api", "lola") .put("event_type", config.getEtName()) - .put("app", appName) - .put("app_hashed", kpiPublisher.hash(appName)) + .put("app", client.getClientId()) + .put("app_hashed", appNameHashed) + .put("token_realm", client.getRealm()) .put("number_of_events", count) .put("bytes_streamed", bytes)); } diff --git a/src/main/java/org/zalando/nakadi/service/EventStreamConfig.java b/src/main/java/org/zalando/nakadi/service/EventStreamConfig.java index 3598579ccb..cdfc900449 100644 --- a/src/main/java/org/zalando/nakadi/service/EventStreamConfig.java +++ b/src/main/java/org/zalando/nakadi/service/EventStreamConfig.java @@ -2,6 +2,7 @@ import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.exceptions.UnprocessableEntityException; +import org.zalando.nakadi.security.Client; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; @@ -27,12 +28,12 @@ public class EventStreamConfig { private final int streamTimeout; private final int streamKeepAliveLimit; private final String etName; - private final String consumingAppId; + private final Client consumingClient; private final long maxMemoryUsageBytes; private EventStreamConfig(final List cursors, final int batchLimit, final int streamLimit, final int batchTimeout, final int streamTimeout, - final int streamKeepAliveLimit, final String etName, final String consumingAppId, + final int streamKeepAliveLimit, final String etName, final Client consumingClient, final long maxMemoryUsageBytes) { this.cursors = cursors; this.batchLimit = batchLimit; @@ -41,7 +42,7 @@ private EventStreamConfig(final List cursors, final int batchLimit this.streamTimeout = streamTimeout; this.streamKeepAliveLimit = streamKeepAliveLimit; this.etName = etName; - this.consumingAppId = consumingAppId; + this.consumingClient= consumingClient; this.maxMemoryUsageBytes = maxMemoryUsageBytes; } @@ -73,8 +74,8 @@ public String getEtName() { return etName; } - public String getConsumingAppId() { - return consumingAppId; + public Client getConsumingClient() { + return consumingClient; } public long getMaxMemoryUsageBytes() { @@ -133,7 +134,7 @@ public static class Builder { private int streamKeepAliveLimit = STREAM_KEEP_ALIVE_LIMIT_DEFAULT; private long maxMemoryUsageBytes = DEF_MAX_MEMORY_USAGE_BYTES; private String etName; - private String consumingAppId; + private Client consumingClient; public Builder withCursors(final List cursors) { this.cursors = cursors; @@ -191,8 +192,8 @@ public Builder withEtName(final String etName) { return this; } - public Builder withConsumingAppId(final String consumingAppId) { - this.consumingAppId = consumingAppId; + public Builder withConsumingClient(final Client consumingClient) { + this.consumingClient = consumingClient; return this; } @@ -206,7 +207,7 @@ public EventStreamConfig build() throws UnprocessableEntityException { throw new UnprocessableEntityException("batch_limit can't be lower than 1"); } return new EventStreamConfig(cursors, batchLimit, streamLimit, batchTimeout, streamTimeout, - streamKeepAliveLimit, etName, consumingAppId, maxMemoryUsageBytes); + streamKeepAliveLimit, etName, consumingClient, maxMemoryUsageBytes); } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java index d07c4269d7..572127a6dd 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java @@ -1,6 +1,7 @@ package org.zalando.nakadi.service.subscription; import org.zalando.nakadi.exceptions.UnprocessableEntityException; +import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.EventStreamConfig; import javax.annotation.Nullable; @@ -38,12 +39,12 @@ public class StreamParameters { // Applies to stream. Timeout without commits. public final long commitTimeoutMillis; - private final String consumingAppId; + private final Client consumingClient; private StreamParameters( final int batchLimitEvents, @Nullable final Long streamLimitEvents, final long batchTimeoutMillis, @Nullable final Long streamTimeoutSeconds, @Nullable final Integer batchKeepAliveIterations, - final int maxUncommittedMessages, final long commitTimeoutMillis, final String consumingAppId) + final int maxUncommittedMessages, final long commitTimeoutMillis, final Client consumingClient) throws UnprocessableEntityException { if (batchLimitEvents > 0) { this.batchLimitEvents = batchLimitEvents; @@ -59,7 +60,7 @@ private StreamParameters( this.batchKeepAliveIterations = Optional.ofNullable(batchKeepAliveIterations); this.maxUncommittedMessages = maxUncommittedMessages; this.commitTimeoutMillis = commitTimeoutMillis; - this.consumingAppId = consumingAppId; + this.consumingClient = consumingClient; } public long getMessagesAllowedToSend(final long limit, final long sentSoFar) { @@ -74,8 +75,8 @@ public boolean isKeepAliveLimitReached(final IntStream keepAlive) { return batchKeepAliveIterations.map(it -> keepAlive.allMatch(v -> v >= it)).orElse(false); } - public String getConsumingAppId() { - return consumingAppId; + public Client getConsumingClient() { + return consumingClient; } public static StreamParameters of( @@ -86,7 +87,7 @@ public static StreamParameters of( @Nullable final Integer batchKeepAliveIterations, final int maxUncommittedMessages, final long commitTimeoutSeconds, - final String consumingAppId) throws UnprocessableEntityException { + final Client client) throws UnprocessableEntityException { return new StreamParameters( batchLimitEvents, streamLimitEvents, @@ -95,6 +96,6 @@ public static StreamParameters of( batchKeepAliveIterations, maxUncommittedMessages, TimeUnit.SECONDS.toMillis(commitTimeoutSeconds), - consumingAppId); + client); } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index b2fffc7eb4..298273c90a 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -250,7 +250,7 @@ public boolean isConnectionReady() { public boolean isSubscriptionConsumptionBlocked() { return blacklistService.isSubscriptionConsumptionBlocked( subscription.getEventTypes(), - parameters.getConsumingAppId()); + parameters.getConsumingClient().getClientId()); } public CursorTokenService getCursorTokenService() { diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index ec753d3bf5..ea444cb567 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -16,6 +16,7 @@ import org.zalando.nakadi.metrics.MetricUtils; import org.zalando.nakadi.metrics.StreamKpiData; import org.zalando.nakadi.repository.EventConsumer; +import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.zk.ZkSubscription; @@ -73,7 +74,7 @@ class StreamingState extends State { @Override public void onEnter() { final String kafkaFlushedBytesMetricName = MetricUtils.metricNameForHiLAStream( - this.getContext().getParameters().getConsumingAppId(), + this.getContext().getParameters().getConsumingClient().getClientId(), this.getContext().getSubscription().getId() ); bytesSentMeterPerSubscription = this.getContext().getMetricRegistry().meter(kafkaFlushedBytesMetricName); @@ -277,15 +278,16 @@ private void streamToOutput() { } private void publishKpi(final String eventTypeName) { - final String appName = getContext().getParameters().getConsumingAppId(); + final Client client = getContext().getParameters().getConsumingClient(); final NakadiKpiPublisher kpiPublisher = getContext().getKpiPublisher(); final long bytes = kpiDataPerEventType.get(eventTypeName).getAndResetBytesSent(); final long count = kpiDataPerEventType.get(eventTypeName).getAndResetNumberOfEventsSent(); - final String appNameHashed = kpiPublisher.hash(appName); + final String appNameHashed = kpiPublisher.hash(client.getClientId()); getLog().info("[SLO] [streamed-data] api={} eventTypeName={} app={} appHashed={} " + "numberOfEvents={} bytesStreamed={} subscription={}", "hila", - eventTypeName, appName, appNameHashed, count, bytes, getContext().getSubscription().getId()); + eventTypeName, client.getClientId(), appNameHashed, count, bytes, + getContext().getSubscription().getId()); kpiPublisher.publish( getContext().getKpiDataStreamedEventType(), @@ -293,8 +295,9 @@ private void publishKpi(final String eventTypeName) { .put("api", "hila") .put("subscription", getContext().getSubscription().getId()) .put("event_type", eventTypeName) - .put("app", appName) + .put("app", client.getClientId()) .put("app_hashed", appNameHashed) + .put("token_realm", client.getRealm()) .put("number_of_events", count) .put("bytes_streamed", bytes)); } diff --git a/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java index 22d14f7076..831bbfcd5d 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java @@ -467,7 +467,7 @@ public void testAccessDenied() throws Exception { } private void writeStream() throws Exception { - final StreamingResponseBody responseBody = createStreamingResponseBody(new NakadiClient("clientId", null)); + final StreamingResponseBody responseBody = createStreamingResponseBody(new NakadiClient("clientId", "")); final OutputStream outputStream = mock(OutputStream.class); responseBody.writeTo(outputStream); } diff --git a/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java index 080e192e82..aa4882fc6d 100644 --- a/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java @@ -26,9 +26,6 @@ import org.zalando.nakadi.utils.TestUtils; import org.zalando.problem.Problem; -import java.util.HashSet; -import java.util.Set; - import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -235,13 +232,6 @@ private ResultActions postSubscriptionAsJson(final String subscription) throws E private class TestHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver { - private Set scopes = new HashSet<>(); - - public TestHandlerMethodArgumentResolver addScope(final Set scopes) { - this.scopes = scopes; - return this; - } - @Override public boolean supportsParameter(final MethodParameter parameter) { return true; @@ -252,7 +242,7 @@ public Object resolveArgument(final MethodParameter parameter, final ModelAndViewContainer mavContainer, final NativeWebRequest webRequest, final WebDataBinderFactory binderFactory) throws Exception { - return new NakadiClient("nakadiClientId", scopes); + return new NakadiClient("nakadiClientId", ""); } } } diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index bdea9c2d59..ea87e1794f 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -50,7 +50,6 @@ import javax.ws.rs.core.Response; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -327,13 +326,6 @@ private void checkForProblem(final ResultActions resultActions, final Problem ex private class TestHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver { - private Set scopes = new HashSet<>(); - - public TestHandlerMethodArgumentResolver addScope(final Set scopes) { - this.scopes = scopes; - return this; - } - @Override public boolean supportsParameter(final MethodParameter parameter) { return true; @@ -344,7 +336,7 @@ public Object resolveArgument(final MethodParameter parameter, final ModelAndViewContainer mavContainer, final NativeWebRequest webRequest, final WebDataBinderFactory binderFactory) throws Exception { - return new NakadiClient("nakadiClientId", scopes); + return new NakadiClient("nakadiClientId", ""); } } } diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index 7dfcded0fd..5ba9c6bf00 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -446,7 +446,7 @@ public void testWrite() throws Exception { Mockito.when(cache.getEventType(eventType.getName())).thenReturn(eventType); mockSuccessfulValidation(eventType); final EventPublishResult result = publisher.publish(buildDefaultBatch(0).toString(), eventType.getName(), - new NakadiClient(CLIENT_ID, null)); + new NakadiClient(CLIENT_ID, "")); Assert.assertEquals(result.getStatus(), EventPublishingStatus.SUBMITTED); } diff --git a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java index e2c012ba23..25f7c16df7 100644 --- a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java @@ -19,6 +19,7 @@ import org.zalando.nakadi.repository.db.EventTypeCache; import org.zalando.nakadi.repository.kafka.KafkaCursor; import org.zalando.nakadi.repository.kafka.NakadiKafkaConsumer; +import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.converter.CursorConverterImpl; import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.utils.TestUtils; @@ -98,6 +99,7 @@ public void whenIOExceptionThenStreamIsClosed() throws NakadiException, Interrup .withCursors(ImmutableList.of(NakadiCursor.of(TIMELINE, "0", "0"))) .withBatchLimit(1) .withBatchTimeout(1) + .withConsumingClient(mock(Client.class)) .build(); final OutputStream outputStreamMock = mock(OutputStream.class); final EventStream eventStream = new EventStream( @@ -126,6 +128,7 @@ public void whenCrutchWorkedThenStreamIsClosed() throws NakadiException, Interru .withCursors(ImmutableList.of(NakadiCursor.of(TIMELINE, "0", "0"))) .withBatchLimit(1) .withBatchTimeout(1) + .withConsumingClient(mock(Client.class)) .build(); final EventStream eventStream = new EventStream( emptyConsumer(), mock(OutputStream.class), config, mock(BlacklistService.class), cursorConverter, @@ -153,6 +156,7 @@ public void whenAuthorizationChangedStreamClosed() throws NakadiException, Inter .withCursors(ImmutableList.of(NakadiCursor.of(TIMELINE, "0", "0"))) .withBatchLimit(1) .withBatchTimeout(1) + .withConsumingClient(mock(Client.class)) .build(); final EventStream eventStream = new EventStream( emptyConsumer(), mock(OutputStream.class), config, mock(BlacklistService.class), cursorConverter, @@ -195,6 +199,7 @@ public void whenStreamTimeoutIsSetThenStreamIsClosed() throws NakadiException, I .withStreamTimeout(1) .withBatchTimeout(1) .withCursors(new ArrayList<>()) + .withConsumingClient(mock(Client.class)) .build(); final EventStream eventStream = new EventStream( emptyConsumer(), mock(OutputStream.class), config, mock(BlacklistService.class), cursorConverter, @@ -211,6 +216,7 @@ public void whenStreamLimitIsSetThenStreamIsClosed() throws NakadiException, IOE .withCursors(ImmutableList.of(NakadiCursor.of(TIMELINE, "0", "0"))) .withBatchLimit(1) .withStreamLimit(1) + .withConsumingClient(mock(Client.class)) .build(); final EventStream eventStream = new EventStream(endlessDummyConsumer(), mock(OutputStream.class), config, mock(BlacklistService.class), cursorConverter, BYTES_FLUSHED_METER, writerProvider, kpiPublisher, @@ -228,6 +234,7 @@ public void whenKeepAliveLimitIsSetThenStreamIsClosed() throws NakadiException, .withBatchLimit(1) .withBatchTimeout(1) .withStreamKeepAliveLimit(1) + .withConsumingClient(mock(Client.class)) .build(); final EventStream eventStream = new EventStream( emptyConsumer(), mock(OutputStream.class), config, mock(BlacklistService.class), cursorConverter, @@ -245,6 +252,7 @@ public void whenNoEventsToReadThenKeepAliveIsSent() throws NakadiException, IOEx .withBatchLimit(1) .withBatchTimeout(1) .withStreamTimeout(2) + .withConsumingClient(mock(Client.class)) .build(); final ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -271,6 +279,7 @@ public void whenBatchSizeIsSetThenGetEventsInBatches() throws NakadiException, I .withBatchLimit(5) .withBatchTimeout(1) .withStreamTimeout(1) + .withConsumingClient(mock(Client.class)) .build(); final ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -299,6 +308,7 @@ public void whenReadingEventsTheOrderIsCorrect() throws NakadiException, IOExcep .withCursors(ImmutableList.of(NakadiCursor.of(TIMELINE, "0", "0"))) .withBatchLimit(1) .withStreamLimit(4) + .withConsumingClient(mock(Client.class)) .build(); final ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -344,6 +354,7 @@ public void whenReadFromMultiplePartitionsThenGroupedInBatchesAccordingToPartiti .withBatchLimit(2) .withStreamLimit(6) .withBatchTimeout(30) + .withConsumingClient(mock(Client.class)) .build(); final ByteArrayOutputStream out = new ByteArrayOutputStream(); diff --git a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java index 1f49747d5d..cbd78c6814 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java @@ -2,6 +2,7 @@ import org.junit.Test; import org.zalando.nakadi.exceptions.UnprocessableEntityException; +import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.EventStreamConfig; import java.util.concurrent.TimeUnit; @@ -12,17 +13,19 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; public class StreamParametersTest { @Test(expected = UnprocessableEntityException.class) public void whenBatchLimitLowerOrEqualToZeroTheException() throws Exception { - StreamParameters.of(0, null, 0, null, null, 0, 0, ""); + StreamParameters.of(0, null, 0, null, null, 0, 0, mock(Client.class)); } @Test public void checkParamsAreTransformedCorrectly() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, null, 10, 60L, null, 1000, 20, ""); + final StreamParameters streamParameters = StreamParameters.of(1, null, 10, 60L, null, 1000, 20, + mock(Client.class)); assertThat(streamParameters.batchLimitEvents, equalTo(1)); assertThat(streamParameters.batchTimeoutMillis, equalTo(10000L)); @@ -33,7 +36,7 @@ public void checkParamsAreTransformedCorrectly() throws Exception { @Test public void whenStreamTimeoutOmittedThenItIsGenerated() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, null, 0, null, null, 0, 0, ""); + final StreamParameters streamParameters = StreamParameters.of(1, null, 0, null, null, 0, 0, mock(Client.class)); checkStreamTimeoutIsGeneratedCorrectly(streamParameters); } @@ -41,14 +44,14 @@ public void whenStreamTimeoutOmittedThenItIsGenerated() throws Exception { @Test public void whenStreamTimeoutIsGreaterThanMaxThenItIsGenerated() throws Exception { final StreamParameters streamParameters = StreamParameters.of(1, null, 0, - EventStreamConfig.MAX_STREAM_TIMEOUT + 1L, null, 0, 0, ""); + EventStreamConfig.MAX_STREAM_TIMEOUT + 1L, null, 0, 0, mock(Client.class)); checkStreamTimeoutIsGeneratedCorrectly(streamParameters); } @Test public void checkIsStreamLimitReached() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, 150L, 0, null, null, 0, 0, ""); + final StreamParameters streamParameters = StreamParameters.of(1, 150L, 0, null, null, 0, 0, mock(Client.class)); assertThat(streamParameters.isStreamLimitReached(140), is(false)); assertThat(streamParameters.isStreamLimitReached(151), is(true)); @@ -57,7 +60,7 @@ public void checkIsStreamLimitReached() throws Exception { @Test public void checkIsKeepAliveLimitReached() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, null, 0, null, 5, 0, 0, ""); + final StreamParameters streamParameters = StreamParameters.of(1, null, 0, null, 5, 0, 0, mock(Client.class)); assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 6, 12)), is(true)); assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 4, 12)), is(false)); @@ -65,7 +68,7 @@ public void checkIsKeepAliveLimitReached() throws Exception { @Test public void checkgetMessagesAllowedToSend() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, 200L, 0, null, null, 0, 0, ""); + final StreamParameters streamParameters = StreamParameters.of(1, 200L, 0, null, null, 0, 0, mock(Client.class)); assertThat(streamParameters.getMessagesAllowedToSend(50, 190), equalTo(10L)); assertThat(streamParameters.getMessagesAllowedToSend(50, 120), equalTo(50L)); diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java index e091759a0c..55fd9e1a7f 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java @@ -17,6 +17,7 @@ import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.repository.TopicRepository; +import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.subscription.StreamParameters; import org.zalando.nakadi.service.subscription.StreamingContext; @@ -71,6 +72,9 @@ public void prepareMocks() throws Exception { cursorConverter = mock(CursorConverter.class); when(contextMock.getCursorConverter()).thenReturn(cursorConverter); + final Client client = mock(Client.class); + when(client.getClientId()).thenReturn("consumingAppId"); + final StreamParameters spMock = StreamParameters.of( 1000, 100L, @@ -79,7 +83,7 @@ public void prepareMocks() throws Exception { 100, 100, 100, - "consumingAppId" + client ); when(contextMock.getParameters()).thenReturn(spMock);