diff --git a/CHANGELOG.md b/CHANGELOG.md index 078f92b427..0320391a47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## [2.5.1] - 2018-01-03 + +### Added +- Added publishing of subscription log events + ## [2.5.0] - 2017-12-22 ### Added diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index 13abb08dc9..cdb4e4f653 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -1,9 +1,11 @@ package org.zalando.nakadi.service.subscription; import com.google.common.collect.ImmutableSet; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; @@ -37,6 +39,7 @@ import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorOperationsService; +import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.Result; import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; @@ -71,6 +74,8 @@ public class SubscriptionService { private final SubscriptionValidationService subscriptionValidationService; private final CursorConverter converter; private final CursorOperationsService cursorOperationsService; + private final NakadiKpiPublisher nakadiKpiPublisher; + private final String subLogEventType; @Autowired public SubscriptionService(final SubscriptionDbRepository subscriptionRepository, @@ -79,7 +84,9 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository final EventTypeRepository eventTypeRepository, final SubscriptionValidationService subscriptionValidationService, final CursorConverter converter, - final CursorOperationsService cursorOperationsService) { + final CursorOperationsService cursorOperationsService, + final NakadiKpiPublisher nakadiKpiPublisher, + @Value("${nakadi.kpi.event-types.nakadiSubscriptionLog}") final String subLogEventType) { this.subscriptionRepository = subscriptionRepository; this.subscriptionClientFactory = subscriptionClientFactory; this.timelineService = timelineService; @@ -87,6 +94,8 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository this.subscriptionValidationService = subscriptionValidationService; this.converter = converter; this.cursorOperationsService = cursorOperationsService; + this.nakadiKpiPublisher = nakadiKpiPublisher; + this.subLogEventType = subLogEventType; } public Subscription createSubscription(final SubscriptionBase subscriptionBase) @@ -94,7 +103,13 @@ public Subscription createSubscription(final SubscriptionBase subscriptionBase) NoEventTypeException, InconsistentStateException, WrongInitialCursorsException { subscriptionValidationService.validateSubscription(subscriptionBase); - return subscriptionRepository.createSubscription(subscriptionBase); + final Subscription subscription = subscriptionRepository.createSubscription(subscriptionBase); + + nakadiKpiPublisher.publish(subLogEventType, () -> new JSONObject() + .put("subscription_id", subscription.getId()) + .put("status", "created")); + + return subscription; } public Subscription getExistingSubscription(final SubscriptionBase subscriptionBase) @@ -159,6 +174,10 @@ public Result deleteSubscription(final String subscriptionId) { subscription, "subscription." + subscriptionId + ".delete_subscription"); zkSubscriptionClient.deleteSubscription(); + nakadiKpiPublisher.publish(subLogEventType, () -> new JSONObject() + .put("subscription_id", subscriptionId) + .put("status", "deleted")); + return Result.ok(); } catch (final NoSuchSubscriptionException e) { LOG.debug("Failed to find subscription: {}", subscriptionId, e); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 98192e252b..6de4d7fe19 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -111,6 +111,7 @@ nakadi: event-types: nakadiAccessLog: "nakadi.access.log" nakadiEventTypeLog: "nakadi.event.type.log" + nakadiSubscriptionLog: "nakadi.subscription.log" nakadiBatchPublished: "nakadi.batch.published" nakadiDataStreamed: "nakadi.data.streamed" hasher.salt: "salt" diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index fccc0052b2..bdea9c2d59 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -32,6 +32,7 @@ import org.zalando.nakadi.security.NakadiClient; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorOperationsService; +import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.subscription.SubscriptionService; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; @@ -81,6 +82,8 @@ public class SubscriptionControllerTest { private static final String PROBLEM_CONTENT_TYPE = "application/problem+json"; + private static final int PARTITIONS_PER_SUBSCRIPTION = 5; + private static final Timeline TIMELINE = buildTimelineWithTopic("topic"); private final SubscriptionDbRepository subscriptionRepository = mock(SubscriptionDbRepository.class); private final EventTypeRepository eventTypeRepository = mock(EventTypeRepository.class); @@ -90,8 +93,6 @@ public class SubscriptionControllerTest { private final CursorConverter cursorConverter; private final CursorOperationsService cursorOperationsService; private final TimelineService timelineService; - private static final int PARTITIONS_PER_SUBSCRIPTION = 5; - private static final Timeline TIMELINE = buildTimelineWithTopic("topic"); public SubscriptionControllerTest() throws Exception { final FeatureToggleService featureToggleService = mock(FeatureToggleService.class); @@ -111,9 +112,10 @@ public SubscriptionControllerTest() throws Exception { when(settings.getMaxSubscriptionPartitions()).thenReturn(PARTITIONS_PER_SUBSCRIPTION); cursorOperationsService = mock(CursorOperationsService.class); cursorConverter = mock(CursorConverter.class); + final NakadiKpiPublisher nakadiKpiPublisher = mock(NakadiKpiPublisher.class); final SubscriptionService subscriptionService = new SubscriptionService(subscriptionRepository, zkSubscriptionClientFactory, timelineService, eventTypeRepository, null, - cursorConverter, cursorOperationsService); + cursorConverter, cursorOperationsService, nakadiKpiPublisher, "subscription_log_et"); final SubscriptionController controller = new SubscriptionController(featureToggleService, subscriptionService); final ApplicationService applicationService = mock(ApplicationService.class); doReturn(true).when(applicationService).exists(any()); diff --git a/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java b/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java index 961f7688a1..c74329abb7 100644 --- a/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java @@ -5,7 +5,6 @@ import org.json.JSONObject; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionTemplate; import org.zalando.nakadi.config.NakadiSettings; @@ -27,16 +26,12 @@ import java.util.ArrayList; import java.util.Optional; -import java.util.function.Supplier; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -45,6 +40,7 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; +import static org.zalando.nakadi.utils.TestUtils.checkKPIEventSubmitted; public class EventTypeServiceTest { @@ -114,10 +110,10 @@ public void shouldRemoveEventTypeWhenTimelineCreationFails() throws Exception { } @Test - public void whenEventTypeCreatedTheKPIEventSubmitted() throws Exception { + public void whenEventTypeCreatedThenKPIEventSubmitted() throws Exception { final EventType et = buildDefaultEventType(); eventTypeService.create(et); - checkKPIEventSubmitted(KPI_ET_LOG_EVENT_TYPE, + checkKPIEventSubmitted(nakadiKpiPublisher, KPI_ET_LOG_EVENT_TYPE, new JSONObject() .put("event_type", et.getName()) .put("status", "created") @@ -125,13 +121,13 @@ public void whenEventTypeCreatedTheKPIEventSubmitted() throws Exception { } @Test - public void whenEventTypeUpdatedTheKPIEventSubmitted() throws Exception { + public void whenEventTypeUpdatedThenKPIEventSubmitted() throws Exception { final EventType et = buildDefaultEventType(); when(eventTypeRepository.findByName(et.getName())).thenReturn(et); when(schemaEvolutionService.evolve(any(), any())).thenReturn(et); eventTypeService.update(et.getName(), et); - checkKPIEventSubmitted(KPI_ET_LOG_EVENT_TYPE, + checkKPIEventSubmitted(nakadiKpiPublisher, KPI_ET_LOG_EVENT_TYPE, new JSONObject() .put("event_type", et.getName()) .put("status", "updated") @@ -139,22 +135,16 @@ public void whenEventTypeUpdatedTheKPIEventSubmitted() throws Exception { } @Test - public void whenEventTypeDeletedTheKPIEventSubmitted() throws Exception { + public void whenEventTypeDeletedThenKPIEventSubmitted() throws Exception { final EventType et = buildDefaultEventType(); when(eventTypeRepository.findByNameO(et.getName())).thenReturn(Optional.of(et)); eventTypeService.delete(et.getName()); - checkKPIEventSubmitted(KPI_ET_LOG_EVENT_TYPE, + checkKPIEventSubmitted(nakadiKpiPublisher, KPI_ET_LOG_EVENT_TYPE, new JSONObject() .put("event_type", et.getName()) .put("status", "deleted") .put("category", et.getCategory())); } - @SuppressWarnings("unchecked") - private void checkKPIEventSubmitted(final String eventType, final JSONObject expectedEvent) { - final ArgumentCaptor supplierCaptor = ArgumentCaptor.forClass(Supplier.class); - verify(nakadiKpiPublisher, times(1)).publish(eq(eventType), supplierCaptor.capture()); - assertThat(expectedEvent.similar(supplierCaptor.getValue().get()), is(true)); - } } diff --git a/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java b/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java new file mode 100644 index 0000000000..ba8fe41bff --- /dev/null +++ b/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java @@ -0,0 +1,73 @@ +package org.zalando.nakadi.service; + +import org.json.JSONObject; +import org.junit.Test; +import org.zalando.nakadi.domain.Subscription; +import org.zalando.nakadi.domain.SubscriptionBase; +import org.zalando.nakadi.repository.EventTypeRepository; +import org.zalando.nakadi.repository.db.SubscriptionDbRepository; +import org.zalando.nakadi.service.subscription.SubscriptionService; +import org.zalando.nakadi.service.subscription.SubscriptionValidationService; +import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; +import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; +import org.zalando.nakadi.service.timeline.TimelineService; +import org.zalando.nakadi.utils.RandomSubscriptionBuilder; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.zalando.nakadi.utils.TestUtils.checkKPIEventSubmitted; + +public class SubscriptionServiceTest { + + private static final String SUBSCRIPTION_LOG_ET = "subscription_log_et"; + + private final SubscriptionDbRepository subscriptionRepository; + private final NakadiKpiPublisher nakadiKpiPublisher; + private final SubscriptionService subscriptionService; + + public SubscriptionServiceTest() throws Exception { + final SubscriptionClientFactory zkSubscriptionClientFactory = mock(SubscriptionClientFactory.class); + final ZkSubscriptionClient zkSubscriptionClient = mock(ZkSubscriptionClient.class); + when(zkSubscriptionClientFactory.createClient(any(), any())).thenReturn(zkSubscriptionClient); + final TimelineService timelineService = mock(TimelineService.class); + final CursorOperationsService cursorOperationsService = mock(CursorOperationsService.class); + final CursorConverter cursorConverter = mock(CursorConverter.class); + final SubscriptionValidationService subscriptionValidationService = mock(SubscriptionValidationService.class); + final EventTypeRepository eventTypeRepository = mock(EventTypeRepository.class); + nakadiKpiPublisher = mock(NakadiKpiPublisher.class); + subscriptionRepository = mock(SubscriptionDbRepository.class); + + subscriptionService = new SubscriptionService(subscriptionRepository, zkSubscriptionClientFactory, + timelineService, eventTypeRepository, subscriptionValidationService, cursorConverter, + cursorOperationsService, nakadiKpiPublisher, SUBSCRIPTION_LOG_ET); + } + + @Test + public void whenSubscriptionCreatedThenKPIEventSubmitted() throws Exception { + final SubscriptionBase subscriptionBase = RandomSubscriptionBuilder.builder() + .buildSubscriptionBase(); + final Subscription subscription = RandomSubscriptionBuilder.builder() + .withId("my_subscription_id1") + .build(); + when(subscriptionRepository.createSubscription(subscriptionBase)).thenReturn(subscription); + + subscriptionService.createSubscription(subscriptionBase); + + checkKPIEventSubmitted(nakadiKpiPublisher, SUBSCRIPTION_LOG_ET, + new JSONObject() + .put("subscription_id", "my_subscription_id1") + .put("status", "created")); + } + + @Test + public void whenSubscriptionDeletedThenKPIEventSubmitted() throws Exception { + subscriptionService.deleteSubscription("my_subscription_id1"); + + checkKPIEventSubmitted(nakadiKpiPublisher, SUBSCRIPTION_LOG_ET, + new JSONObject() + .put("subscription_id", "my_subscription_id1") + .put("status", "deleted")); + } + +} diff --git a/src/test/java/org/zalando/nakadi/utils/TestUtils.java b/src/test/java/org/zalando/nakadi/utils/TestUtils.java index 2bea9f2240..f84e906f96 100644 --- a/src/test/java/org/zalando/nakadi/utils/TestUtils.java +++ b/src/test/java/org/zalando/nakadi/utils/TestUtils.java @@ -8,6 +8,7 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.json.JSONObject; +import org.mockito.ArgumentCaptor; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.test.web.servlet.MockMvc; @@ -23,6 +24,7 @@ import org.zalando.nakadi.plugin.api.authz.AuthorizationService; import org.zalando.nakadi.plugin.api.authz.Resource; import org.zalando.nakadi.problem.ValidationProblem; +import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.problem.Problem; import java.io.IOException; @@ -31,12 +33,18 @@ import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.function.Supplier; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup; import static org.zalando.nakadi.utils.RandomSubscriptionBuilder.builder; @@ -224,4 +232,12 @@ public static Timeline buildTimelineWithTopic(final String topic) { public static String toTimelineOffset(final long offset) { return String.format("001-0001-%018d", offset); } + + @SuppressWarnings("unchecked") + public static void checkKPIEventSubmitted(final NakadiKpiPublisher nakadiKpiPublisher, final String eventType, + final JSONObject expectedEvent) { + final ArgumentCaptor supplierCaptor = ArgumentCaptor.forClass(Supplier.class); + verify(nakadiKpiPublisher, times(1)).publish(eq(eventType), supplierCaptor.capture()); + assertThat(expectedEvent.similar(supplierCaptor.getValue().get()), is(true)); + } }