Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #817 from zalando/ARUHA-1428
Browse files Browse the repository at this point in the history
ARUHA-1428: Added submitting of subscription log events;
  • Loading branch information
v-stepanov authored Jan 3, 2018
2 parents 6e4906a + 6618bc6 commit eb6c7ed
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [2.5.1] - 2018-01-03

### Added
- Added publishing of subscription log events

## [2.5.0] - 2017-12-22

### Added
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.zalando.nakadi.service.subscription;

import com.google.common.collect.ImmutableSet;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
Expand Down Expand Up @@ -37,6 +39,7 @@
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.CursorOperationsService;
import org.zalando.nakadi.service.NakadiKpiPublisher;
import org.zalando.nakadi.service.Result;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
Expand Down Expand Up @@ -71,6 +74,8 @@ public class SubscriptionService {
private final SubscriptionValidationService subscriptionValidationService;
private final CursorConverter converter;
private final CursorOperationsService cursorOperationsService;
private final NakadiKpiPublisher nakadiKpiPublisher;
private final String subLogEventType;

@Autowired
public SubscriptionService(final SubscriptionDbRepository subscriptionRepository,
Expand All @@ -79,22 +84,32 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository
final EventTypeRepository eventTypeRepository,
final SubscriptionValidationService subscriptionValidationService,
final CursorConverter converter,
final CursorOperationsService cursorOperationsService) {
final CursorOperationsService cursorOperationsService,
final NakadiKpiPublisher nakadiKpiPublisher,
@Value("${nakadi.kpi.event-types.nakadiSubscriptionLog}") final String subLogEventType) {
this.subscriptionRepository = subscriptionRepository;
this.subscriptionClientFactory = subscriptionClientFactory;
this.timelineService = timelineService;
this.eventTypeRepository = eventTypeRepository;
this.subscriptionValidationService = subscriptionValidationService;
this.converter = converter;
this.cursorOperationsService = cursorOperationsService;
this.nakadiKpiPublisher = nakadiKpiPublisher;
this.subLogEventType = subLogEventType;
}

public Subscription createSubscription(final SubscriptionBase subscriptionBase)
throws TooManyPartitionsException, RepositoryProblemException, DuplicatedSubscriptionException,
NoEventTypeException, InconsistentStateException, WrongInitialCursorsException {

subscriptionValidationService.validateSubscription(subscriptionBase);
return subscriptionRepository.createSubscription(subscriptionBase);
final Subscription subscription = subscriptionRepository.createSubscription(subscriptionBase);

nakadiKpiPublisher.publish(subLogEventType, () -> new JSONObject()
.put("subscription_id", subscription.getId())
.put("status", "created"));

return subscription;
}

public Subscription getExistingSubscription(final SubscriptionBase subscriptionBase)
Expand Down Expand Up @@ -159,6 +174,10 @@ public Result<Void> deleteSubscription(final String subscriptionId) {
subscription, "subscription." + subscriptionId + ".delete_subscription");
zkSubscriptionClient.deleteSubscription();

nakadiKpiPublisher.publish(subLogEventType, () -> new JSONObject()
.put("subscription_id", subscriptionId)
.put("status", "deleted"));

return Result.ok();
} catch (final NoSuchSubscriptionException e) {
LOG.debug("Failed to find subscription: {}", subscriptionId, e);
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ nakadi:
event-types:
nakadiAccessLog: "nakadi.access.log"
nakadiEventTypeLog: "nakadi.event.type.log"
nakadiSubscriptionLog: "nakadi.subscription.log"
nakadiBatchPublished: "nakadi.batch.published"
nakadiDataStreamed: "nakadi.data.streamed"
hasher.salt: "salt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.zalando.nakadi.security.NakadiClient;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.CursorOperationsService;
import org.zalando.nakadi.service.NakadiKpiPublisher;
import org.zalando.nakadi.service.subscription.SubscriptionService;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.model.Session;
Expand Down Expand Up @@ -81,6 +82,8 @@
public class SubscriptionControllerTest {

private static final String PROBLEM_CONTENT_TYPE = "application/problem+json";
private static final int PARTITIONS_PER_SUBSCRIPTION = 5;
private static final Timeline TIMELINE = buildTimelineWithTopic("topic");

private final SubscriptionDbRepository subscriptionRepository = mock(SubscriptionDbRepository.class);
private final EventTypeRepository eventTypeRepository = mock(EventTypeRepository.class);
Expand All @@ -90,8 +93,6 @@ public class SubscriptionControllerTest {
private final CursorConverter cursorConverter;
private final CursorOperationsService cursorOperationsService;
private final TimelineService timelineService;
private static final int PARTITIONS_PER_SUBSCRIPTION = 5;
private static final Timeline TIMELINE = buildTimelineWithTopic("topic");

public SubscriptionControllerTest() throws Exception {
final FeatureToggleService featureToggleService = mock(FeatureToggleService.class);
Expand All @@ -111,9 +112,10 @@ public SubscriptionControllerTest() throws Exception {
when(settings.getMaxSubscriptionPartitions()).thenReturn(PARTITIONS_PER_SUBSCRIPTION);
cursorOperationsService = mock(CursorOperationsService.class);
cursorConverter = mock(CursorConverter.class);
final NakadiKpiPublisher nakadiKpiPublisher = mock(NakadiKpiPublisher.class);
final SubscriptionService subscriptionService = new SubscriptionService(subscriptionRepository,
zkSubscriptionClientFactory, timelineService, eventTypeRepository, null,
cursorConverter, cursorOperationsService);
cursorConverter, cursorOperationsService, nakadiKpiPublisher, "subscription_log_et");
final SubscriptionController controller = new SubscriptionController(featureToggleService, subscriptionService);
final ApplicationService applicationService = mock(ApplicationService.class);
doReturn(true).when(applicationService).exists(any());
Expand Down
24 changes: 7 additions & 17 deletions src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.zalando.nakadi.config.NakadiSettings;
Expand All @@ -27,16 +26,12 @@

import java.util.ArrayList;
import java.util.Optional;
import java.util.function.Supplier;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand All @@ -45,6 +40,7 @@
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType;
import static org.zalando.nakadi.utils.TestUtils.checkKPIEventSubmitted;

public class EventTypeServiceTest {

Expand Down Expand Up @@ -114,47 +110,41 @@ public void shouldRemoveEventTypeWhenTimelineCreationFails() throws Exception {
}

@Test
public void whenEventTypeCreatedTheKPIEventSubmitted() throws Exception {
public void whenEventTypeCreatedThenKPIEventSubmitted() throws Exception {
final EventType et = buildDefaultEventType();
eventTypeService.create(et);
checkKPIEventSubmitted(KPI_ET_LOG_EVENT_TYPE,
checkKPIEventSubmitted(nakadiKpiPublisher, KPI_ET_LOG_EVENT_TYPE,
new JSONObject()
.put("event_type", et.getName())
.put("status", "created")
.put("category", et.getCategory()));
}

@Test
public void whenEventTypeUpdatedTheKPIEventSubmitted() throws Exception {
public void whenEventTypeUpdatedThenKPIEventSubmitted() throws Exception {
final EventType et = buildDefaultEventType();
when(eventTypeRepository.findByName(et.getName())).thenReturn(et);
when(schemaEvolutionService.evolve(any(), any())).thenReturn(et);

eventTypeService.update(et.getName(), et);
checkKPIEventSubmitted(KPI_ET_LOG_EVENT_TYPE,
checkKPIEventSubmitted(nakadiKpiPublisher, KPI_ET_LOG_EVENT_TYPE,
new JSONObject()
.put("event_type", et.getName())
.put("status", "updated")
.put("category", et.getCategory()));
}

@Test
public void whenEventTypeDeletedTheKPIEventSubmitted() throws Exception {
public void whenEventTypeDeletedThenKPIEventSubmitted() throws Exception {
final EventType et = buildDefaultEventType();
when(eventTypeRepository.findByNameO(et.getName())).thenReturn(Optional.of(et));

eventTypeService.delete(et.getName());
checkKPIEventSubmitted(KPI_ET_LOG_EVENT_TYPE,
checkKPIEventSubmitted(nakadiKpiPublisher, KPI_ET_LOG_EVENT_TYPE,
new JSONObject()
.put("event_type", et.getName())
.put("status", "deleted")
.put("category", et.getCategory()));
}

@SuppressWarnings("unchecked")
private void checkKPIEventSubmitted(final String eventType, final JSONObject expectedEvent) {
final ArgumentCaptor<Supplier> supplierCaptor = ArgumentCaptor.forClass(Supplier.class);
verify(nakadiKpiPublisher, times(1)).publish(eq(eventType), supplierCaptor.capture());
assertThat(expectedEvent.similar(supplierCaptor.getValue().get()), is(true));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.zalando.nakadi.service;

import org.json.JSONObject;
import org.junit.Test;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.service.subscription.SubscriptionService;
import org.zalando.nakadi.service.subscription.SubscriptionValidationService;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.utils.RandomSubscriptionBuilder;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.zalando.nakadi.utils.TestUtils.checkKPIEventSubmitted;

public class SubscriptionServiceTest {

private static final String SUBSCRIPTION_LOG_ET = "subscription_log_et";

private final SubscriptionDbRepository subscriptionRepository;
private final NakadiKpiPublisher nakadiKpiPublisher;
private final SubscriptionService subscriptionService;

public SubscriptionServiceTest() throws Exception {
final SubscriptionClientFactory zkSubscriptionClientFactory = mock(SubscriptionClientFactory.class);
final ZkSubscriptionClient zkSubscriptionClient = mock(ZkSubscriptionClient.class);
when(zkSubscriptionClientFactory.createClient(any(), any())).thenReturn(zkSubscriptionClient);
final TimelineService timelineService = mock(TimelineService.class);
final CursorOperationsService cursorOperationsService = mock(CursorOperationsService.class);
final CursorConverter cursorConverter = mock(CursorConverter.class);
final SubscriptionValidationService subscriptionValidationService = mock(SubscriptionValidationService.class);
final EventTypeRepository eventTypeRepository = mock(EventTypeRepository.class);
nakadiKpiPublisher = mock(NakadiKpiPublisher.class);
subscriptionRepository = mock(SubscriptionDbRepository.class);

subscriptionService = new SubscriptionService(subscriptionRepository, zkSubscriptionClientFactory,
timelineService, eventTypeRepository, subscriptionValidationService, cursorConverter,
cursorOperationsService, nakadiKpiPublisher, SUBSCRIPTION_LOG_ET);
}

@Test
public void whenSubscriptionCreatedThenKPIEventSubmitted() throws Exception {
final SubscriptionBase subscriptionBase = RandomSubscriptionBuilder.builder()
.buildSubscriptionBase();
final Subscription subscription = RandomSubscriptionBuilder.builder()
.withId("my_subscription_id1")
.build();
when(subscriptionRepository.createSubscription(subscriptionBase)).thenReturn(subscription);

subscriptionService.createSubscription(subscriptionBase);

checkKPIEventSubmitted(nakadiKpiPublisher, SUBSCRIPTION_LOG_ET,
new JSONObject()
.put("subscription_id", "my_subscription_id1")
.put("status", "created"));
}

@Test
public void whenSubscriptionDeletedThenKPIEventSubmitted() throws Exception {
subscriptionService.deleteSubscription("my_subscription_id1");

checkKPIEventSubmitted(nakadiKpiPublisher, SUBSCRIPTION_LOG_ET,
new JSONObject()
.put("subscription_id", "my_subscription_id1")
.put("status", "deleted"));
}

}
16 changes: 16 additions & 0 deletions src/test/java/org/zalando/nakadi/utils/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.json.JSONObject;
import org.mockito.ArgumentCaptor;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.test.web.servlet.MockMvc;
Expand All @@ -23,6 +24,7 @@
import org.zalando.nakadi.plugin.api.authz.AuthorizationService;
import org.zalando.nakadi.plugin.api.authz.Resource;
import org.zalando.nakadi.problem.ValidationProblem;
import org.zalando.nakadi.service.NakadiKpiPublisher;
import org.zalando.problem.Problem;

import java.io.IOException;
Expand All @@ -31,12 +33,18 @@
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup;
import static org.zalando.nakadi.utils.RandomSubscriptionBuilder.builder;
Expand Down Expand Up @@ -224,4 +232,12 @@ public static Timeline buildTimelineWithTopic(final String topic) {
public static String toTimelineOffset(final long offset) {
return String.format("001-0001-%018d", offset);
}

@SuppressWarnings("unchecked")
public static void checkKPIEventSubmitted(final NakadiKpiPublisher nakadiKpiPublisher, final String eventType,
final JSONObject expectedEvent) {
final ArgumentCaptor<Supplier> supplierCaptor = ArgumentCaptor.forClass(Supplier.class);
verify(nakadiKpiPublisher, times(1)).publish(eq(eventType), supplierCaptor.capture());
assertThat(expectedEvent.similar(supplierCaptor.getValue().get()), is(true));
}
}

0 comments on commit eb6c7ed

Please sign in to comment.