Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #825 from zalando/aruha-1402-kpi-token-realm
Browse files Browse the repository at this point in the history
KPI token realm
  • Loading branch information
rcillo authored Jan 18, 2018
2 parents 69a3956 + fa65407 commit d30b412
Show file tree
Hide file tree
Showing 19 changed files with 103 additions and 78 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Fixed
- Allow to update event types with recursion in schemas

### Changed
- Distinguish between employee token and service token in KPI events

## [2.5.2] - 2018-01-08

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ private void reportSLOs(final long startingNanos, final int totalSizeBytes, fina
.put("event_type", eventTypeName)
.put("app", applicationName)
.put("app_hashed", nakadiKpiPublisher.hash(applicationName))
.put("token_realm", client.getRealm())
.put("number_of_events", eventCount)
.put("ms_spent", msSpent)
.put("batch_size", totalSizeBytes));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public StreamingResponseBody streamEvents(
.withStreamTimeout(streamTimeout)
.withStreamKeepAliveLimit(streamKeepAliveLimit)
.withEtName(eventTypeName)
.withConsumingAppId(client.getClientId())
.withConsumingClient(client)
.withCursors(getStreamingStart(eventType, cursorsStr))
.withMaxMemoryUsageBytes(maxMemoryUsageBytes)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public StreamingResponseBody streamEvents(

final StreamParameters streamParameters = StreamParameters.of(batchLimit, streamLimit, batchTimeout,
streamTimeout, streamKeepAliveLimit, maxUncommittedSize,
nakadiSettings.getDefaultCommitTimeoutSeconds(), client.getClientId());
nakadiSettings.getDefaultCommitTimeoutSeconds(), client);
final Subscription subscription = subscriptionDbRepository.getSubscription(subscriptionId);

streamer = subscriptionStreamerFactory.build(subscription, streamParameters, output,
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/zalando/nakadi/security/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ public Client(final String clientId) {
public String getClientId() {
return clientId;
}

public String getRealm() {
return "";
}
}
26 changes: 16 additions & 10 deletions src/main/java/org/zalando/nakadi/security/ClientResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.MethodParameter;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.oauth2.common.exceptions.UnauthorizedUserException;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
Expand All @@ -15,9 +16,8 @@
import org.zalando.nakadi.util.FeatureToggleService;

import java.security.Principal;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.zalando.nakadi.config.SecuritySettings.AuthMode.OFF;

Expand All @@ -26,12 +26,10 @@ public class ClientResolver implements HandlerMethodArgumentResolver {

private static final String FULL_ACCESS_CLIENT_ID = "adminClientId";
private final SecuritySettings settings;
private final FeatureToggleService featureToggleService;

@Autowired
public ClientResolver(final SecuritySettings settings, final FeatureToggleService featureToggleService) {
this.settings = settings;
this.featureToggleService = featureToggleService;
}

@Override
Expand All @@ -50,15 +48,23 @@ public Client resolveArgument(final MethodParameter parameter,
return new FullAccessClient(clientId.orElse(FULL_ACCESS_CLIENT_ID));
}

return clientId.map(client -> new NakadiClient(client, getScopes()))
return clientId.map(client -> new NakadiClient(client, getRealm()))
.orElseThrow(() -> new UnauthorizedUserException("Client unauthorized"));
}

private Set<String> getScopes() {
final Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication instanceof OAuth2Authentication) {
return ((OAuth2Authentication) authentication).getOAuth2Request().getScope();
public String getRealm() {
try {
return Optional.of(SecurityContextHolder.getContext())
.map(SecurityContext::getAuthentication)
.map(authentication -> (OAuth2Authentication) authentication)
.map(OAuth2Authentication::getUserAuthentication)
.map(Authentication::getDetails)
.map(details -> (Map) details)
.map(details -> details.get("realm"))
.map(realm -> (String) realm)
.orElse("");
} catch (final ClassCastException e) {
return "";
}
return Collections.emptySet();
}
}
13 changes: 8 additions & 5 deletions src/main/java/org/zalando/nakadi/security/NakadiClient.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package org.zalando.nakadi.security;

import java.util.Set;

public class NakadiClient extends Client {

private final Set<String> scopes;
private final String realm;

public NakadiClient(final String clientId, final Set<String> scopes) {
public NakadiClient(final String clientId, final String realm) {
super(clientId);
this.scopes = scopes;
this.realm = realm;
}

@Override
public String getRealm() {
return realm;
}
}
21 changes: 12 additions & 9 deletions src/main/java/org/zalando/nakadi/service/EventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.metrics.StreamKpiData;
import org.zalando.nakadi.repository.EventConsumer;
import org.zalando.nakadi.security.Client;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -80,7 +81,8 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che
long bytesInMemory = 0;

while (connectionReady.get() &&
!blacklistService.isConsumptionBlocked(config.getEtName(), config.getConsumingAppId())) {
!blacklistService.isConsumptionBlocked(config.getEtName(), config.getConsumingClient()
.getClientId())) {

checkAuthorization.run();

Expand Down Expand Up @@ -146,7 +148,7 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che
final long count = kpiData.getAndResetNumberOfEventsSent();
final long bytes = kpiData.getAndResetBytesSent();

publishKpi(config.getConsumingAppId(), count, bytes);
publishKpi(config.getConsumingClient(), count, bytes);

lastKpiEventSent = System.currentTimeMillis();
}
Expand Down Expand Up @@ -183,10 +185,10 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che
LOG.info("Error occurred when streaming events (possibly server closed connection)", e);
} catch (final KafkaException e) {
LOG.error("Error occurred when polling events from kafka; consumer: {}, event-type: {}",
config.getConsumingAppId(), config.getEtName(), e);
config.getConsumingClient().getClientId(), config.getEtName(), e);
} finally {
publishKpi(
config.getConsumingAppId(),
config.getConsumingClient(),
kpiData.getAndResetNumberOfEventsSent(),
kpiData.getAndResetBytesSent());
}
Expand All @@ -196,19 +198,20 @@ private boolean isMemoryLimitReached(final long memoryUsed) {
return memoryUsed > config.getMaxMemoryUsageBytes();
}

private void publishKpi(final String appName, final long count, final long bytes) {
final String appNameHashed = kpiPublisher.hash(appName);
private void publishKpi(final Client client, final long count, final long bytes) {
final String appNameHashed = kpiPublisher.hash(client.getClientId());

LOG.info("[SLO] [streamed-data] api={} eventTypeName={} app={} appHashed={} numberOfEvents={} bytesStreamed={}",
"lola", config.getEtName(), appName, appNameHashed, count, bytes);
"lola", config.getEtName(), client.getClientId(), appNameHashed, count, bytes);

kpiPublisher.publish(
kpiDataStreamedEventType,
() -> new JSONObject()
.put("api", "lola")
.put("event_type", config.getEtName())
.put("app", appName)
.put("app_hashed", kpiPublisher.hash(appName))
.put("app", client.getClientId())
.put("app_hashed", appNameHashed)
.put("token_realm", client.getRealm())
.put("number_of_events", count)
.put("bytes_streamed", bytes));
}
Expand Down
19 changes: 10 additions & 9 deletions src/main/java/org/zalando/nakadi/service/EventStreamConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.exceptions.UnprocessableEntityException;
import org.zalando.nakadi.security.Client;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
Expand All @@ -27,12 +28,12 @@ public class EventStreamConfig {
private final int streamTimeout;
private final int streamKeepAliveLimit;
private final String etName;
private final String consumingAppId;
private final Client consumingClient;
private final long maxMemoryUsageBytes;

private EventStreamConfig(final List<NakadiCursor> cursors, final int batchLimit,
final int streamLimit, final int batchTimeout, final int streamTimeout,
final int streamKeepAliveLimit, final String etName, final String consumingAppId,
final int streamKeepAliveLimit, final String etName, final Client consumingClient,
final long maxMemoryUsageBytes) {
this.cursors = cursors;
this.batchLimit = batchLimit;
Expand All @@ -41,7 +42,7 @@ private EventStreamConfig(final List<NakadiCursor> cursors, final int batchLimit
this.streamTimeout = streamTimeout;
this.streamKeepAliveLimit = streamKeepAliveLimit;
this.etName = etName;
this.consumingAppId = consumingAppId;
this.consumingClient= consumingClient;
this.maxMemoryUsageBytes = maxMemoryUsageBytes;
}

Expand Down Expand Up @@ -73,8 +74,8 @@ public String getEtName() {
return etName;
}

public String getConsumingAppId() {
return consumingAppId;
public Client getConsumingClient() {
return consumingClient;
}

public long getMaxMemoryUsageBytes() {
Expand Down Expand Up @@ -133,7 +134,7 @@ public static class Builder {
private int streamKeepAliveLimit = STREAM_KEEP_ALIVE_LIMIT_DEFAULT;
private long maxMemoryUsageBytes = DEF_MAX_MEMORY_USAGE_BYTES;
private String etName;
private String consumingAppId;
private Client consumingClient;

public Builder withCursors(final List<NakadiCursor> cursors) {
this.cursors = cursors;
Expand Down Expand Up @@ -191,8 +192,8 @@ public Builder withEtName(final String etName) {
return this;
}

public Builder withConsumingAppId(final String consumingAppId) {
this.consumingAppId = consumingAppId;
public Builder withConsumingClient(final Client consumingClient) {
this.consumingClient = consumingClient;
return this;
}

Expand All @@ -206,7 +207,7 @@ public EventStreamConfig build() throws UnprocessableEntityException {
throw new UnprocessableEntityException("batch_limit can't be lower than 1");
}
return new EventStreamConfig(cursors, batchLimit, streamLimit, batchTimeout, streamTimeout,
streamKeepAliveLimit, etName, consumingAppId, maxMemoryUsageBytes);
streamKeepAliveLimit, etName, consumingClient, maxMemoryUsageBytes);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.zalando.nakadi.service.subscription;

import org.zalando.nakadi.exceptions.UnprocessableEntityException;
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.EventStreamConfig;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -38,12 +39,12 @@ public class StreamParameters {
// Applies to stream. Timeout without commits.
public final long commitTimeoutMillis;

private final String consumingAppId;
private final Client consumingClient;

private StreamParameters(
final int batchLimitEvents, @Nullable final Long streamLimitEvents, final long batchTimeoutMillis,
@Nullable final Long streamTimeoutSeconds, @Nullable final Integer batchKeepAliveIterations,
final int maxUncommittedMessages, final long commitTimeoutMillis, final String consumingAppId)
final int maxUncommittedMessages, final long commitTimeoutMillis, final Client consumingClient)
throws UnprocessableEntityException {
if (batchLimitEvents > 0) {
this.batchLimitEvents = batchLimitEvents;
Expand All @@ -59,7 +60,7 @@ private StreamParameters(
this.batchKeepAliveIterations = Optional.ofNullable(batchKeepAliveIterations);
this.maxUncommittedMessages = maxUncommittedMessages;
this.commitTimeoutMillis = commitTimeoutMillis;
this.consumingAppId = consumingAppId;
this.consumingClient = consumingClient;
}

public long getMessagesAllowedToSend(final long limit, final long sentSoFar) {
Expand All @@ -74,8 +75,8 @@ public boolean isKeepAliveLimitReached(final IntStream keepAlive) {
return batchKeepAliveIterations.map(it -> keepAlive.allMatch(v -> v >= it)).orElse(false);
}

public String getConsumingAppId() {
return consumingAppId;
public Client getConsumingClient() {
return consumingClient;
}

public static StreamParameters of(
Expand All @@ -86,7 +87,7 @@ public static StreamParameters of(
@Nullable final Integer batchKeepAliveIterations,
final int maxUncommittedMessages,
final long commitTimeoutSeconds,
final String consumingAppId) throws UnprocessableEntityException {
final Client client) throws UnprocessableEntityException {
return new StreamParameters(
batchLimitEvents,
streamLimitEvents,
Expand All @@ -95,6 +96,6 @@ public static StreamParameters of(
batchKeepAliveIterations,
maxUncommittedMessages,
TimeUnit.SECONDS.toMillis(commitTimeoutSeconds),
consumingAppId);
client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public boolean isConnectionReady() {
public boolean isSubscriptionConsumptionBlocked() {
return blacklistService.isSubscriptionConsumptionBlocked(
subscription.getEventTypes(),
parameters.getConsumingAppId());
parameters.getConsumingClient().getClientId());
}

public CursorTokenService getCursorTokenService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.zalando.nakadi.metrics.MetricUtils;
import org.zalando.nakadi.metrics.StreamKpiData;
import org.zalando.nakadi.repository.EventConsumer;
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.NakadiKpiPublisher;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.zk.ZkSubscription;
Expand Down Expand Up @@ -73,7 +74,7 @@ class StreamingState extends State {
@Override
public void onEnter() {
final String kafkaFlushedBytesMetricName = MetricUtils.metricNameForHiLAStream(
this.getContext().getParameters().getConsumingAppId(),
this.getContext().getParameters().getConsumingClient().getClientId(),
this.getContext().getSubscription().getId()
);
bytesSentMeterPerSubscription = this.getContext().getMetricRegistry().meter(kafkaFlushedBytesMetricName);
Expand Down Expand Up @@ -277,24 +278,26 @@ private void streamToOutput() {
}

private void publishKpi(final String eventTypeName) {
final String appName = getContext().getParameters().getConsumingAppId();
final Client client = getContext().getParameters().getConsumingClient();
final NakadiKpiPublisher kpiPublisher = getContext().getKpiPublisher();
final long bytes = kpiDataPerEventType.get(eventTypeName).getAndResetBytesSent();
final long count = kpiDataPerEventType.get(eventTypeName).getAndResetNumberOfEventsSent();
final String appNameHashed = kpiPublisher.hash(appName);
final String appNameHashed = kpiPublisher.hash(client.getClientId());

getLog().info("[SLO] [streamed-data] api={} eventTypeName={} app={} appHashed={} " +
"numberOfEvents={} bytesStreamed={} subscription={}", "hila",
eventTypeName, appName, appNameHashed, count, bytes, getContext().getSubscription().getId());
eventTypeName, client.getClientId(), appNameHashed, count, bytes,
getContext().getSubscription().getId());

kpiPublisher.publish(
getContext().getKpiDataStreamedEventType(),
() -> new JSONObject()
.put("api", "hila")
.put("subscription", getContext().getSubscription().getId())
.put("event_type", eventTypeName)
.put("app", appName)
.put("app", client.getClientId())
.put("app_hashed", appNameHashed)
.put("token_realm", client.getRealm())
.put("number_of_events", count)
.put("bytes_streamed", bytes));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ public void testAccessDenied() throws Exception {
}

private void writeStream() throws Exception {
final StreamingResponseBody responseBody = createStreamingResponseBody(new NakadiClient("clientId", null));
final StreamingResponseBody responseBody = createStreamingResponseBody(new NakadiClient("clientId", ""));
final OutputStream outputStream = mock(OutputStream.class);
responseBody.writeTo(outputStream);
}
Expand Down
Loading

0 comments on commit d30b412

Please sign in to comment.