diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/AuditLogAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/AuditLogAT.java new file mode 100644 index 0000000000..35aac75ed3 --- /dev/null +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/AuditLogAT.java @@ -0,0 +1,52 @@ +package org.zalando.nakadi.webservice; + +import org.apache.http.HttpStatus; +import org.junit.Test; +import org.zalando.nakadi.domain.Subscription; +import org.zalando.nakadi.domain.SubscriptionBase; +import org.zalando.nakadi.utils.RandomSubscriptionBuilder; +import org.zalando.nakadi.webservice.utils.TestStreamingClient; + +import java.io.IOException; + +import static com.jayway.restassured.RestAssured.given; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END; +import static org.zalando.nakadi.utils.TestUtils.waitFor; +import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType; +import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription; + +public class AuditLogAT extends BaseAT { + + private static final String AUDIT_LOG_ET = "nakadi.audit.log"; + + @Test + public void testAuditLogEventTypeIsCreated() { + given() + .get("/event-types/" + AUDIT_LOG_ET) + .then() + .statusCode(HttpStatus.SC_OK) + .body("name", equalTo("nakadi.audit.log")) + .body("owning_application", equalTo("stups_nakadi")); + } + + @Test(timeout = 10000L) + public void testAuditLogEventIsSent() throws IOException, InterruptedException { + // subscribe to audit log + final SubscriptionBase subscriptionBase = RandomSubscriptionBuilder.builder() + .withEventType(AUDIT_LOG_ET) + .withStartFrom(END) + .buildSubscriptionBase(); + final Subscription subscription = createSubscription(subscriptionBase); + final TestStreamingClient client = new TestStreamingClient(URL, subscription.getId(), "batch_limit=1"); + client.start(); + + // create event type + createEventType(); + + // expect event to be posted to audit log as a reaction to event type creation + waitFor(() -> assertThat(client.getBatches().size(), greaterThan(0))); + } +} diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java index 59e1a3b33a..3ab86e40cb 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java @@ -18,6 +18,7 @@ import org.zalando.nakadi.service.AuthorizationValidator; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorsService; +import org.zalando.nakadi.service.NakadiAuditLogPublisher; import org.zalando.nakadi.service.SubscriptionCache; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.zk.NewZkSubscriptionClient; @@ -112,7 +113,8 @@ public void before() throws Exception { uuidGenerator = mock(UUIDGenerator.class); when(uuidGenerator.isUUID(any())).thenReturn(true); cursorsService = new CursorsService(subscriptionRepo, subscriptionCache, null, mock(NakadiSettings.class), - zkSubscriptionFactory, cursorConverter, uuidGenerator, null, mock(AuthorizationValidator.class)); + zkSubscriptionFactory, cursorConverter, uuidGenerator, null, mock(AuthorizationValidator.class), + mock(NakadiAuditLogPublisher.class)); // Register cursors in converter registerNakadiCursor(NakadiCursor.of(buildTimeline(etName, topic, CREATED_AT), P1, NEW_OFFSET)); diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java index f2655ca5d4..5d60a3ddb6 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java @@ -27,7 +27,7 @@ public class SettingsControllerAT extends BaseAT { private static final CuratorFramework CURATOR = ZookeeperTestUtils.createCurator(ZOOKEEPER_URL); @Test - public void testBlockFlooder() throws Exception { + public void testBlacklistConsumerByEventType() throws Exception { final EventType eventType = NakadiTestUtils.createEventType(); blacklist(eventType.getName(), BlacklistService.Type.CONSUMER_ET); Assert.assertNotNull(CURATOR.checkExists() @@ -35,7 +35,7 @@ public void testBlockFlooder() throws Exception { } @Test - public void testUnblockFlooder() throws Exception { + public void testWhitelistConsumerByEventType() throws Exception { final EventType eventType = NakadiTestUtils.createEventType(); blacklist(eventType.getName(), BlacklistService.Type.CONSUMER_ET); @@ -46,7 +46,7 @@ public void testUnblockFlooder() throws Exception { } @Test - public void testGetFlooders() throws Exception { + public void testGetBlacklist() throws Exception { final EventType eventType = NakadiTestUtils.createEventType(); blacklist(eventType.getName(), BlacklistService.Type.CONSUMER_ET); TestUtils.waitFor( diff --git a/src/main/java/org/zalando/nakadi/config/RepositoriesConfig.java b/src/main/java/org/zalando/nakadi/config/RepositoriesConfig.java index e3a4c03559..31106468e2 100644 --- a/src/main/java/org/zalando/nakadi/config/RepositoriesConfig.java +++ b/src/main/java/org/zalando/nakadi/config/RepositoriesConfig.java @@ -15,6 +15,8 @@ import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.repository.zookeeper.ZookeeperConfig; import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.FeatureToggleService.Feature; +import org.zalando.nakadi.service.FeatureToggleService.FeatureWrapper; import org.zalando.nakadi.service.FeatureToggleServiceZk; import org.zalando.nakadi.service.timeline.TimelineSync; import org.zalando.nakadi.validation.EventBodyMustRespectSchema; @@ -22,6 +24,7 @@ import org.zalando.nakadi.validation.JsonSchemaEnrichment; import org.zalando.nakadi.validation.ValidationStrategy; +import java.util.Optional; import java.util.Set; @Configuration @@ -34,16 +37,16 @@ public class RepositoriesConfig { @Profile({"acceptanceTest", "local"}) @Bean - public FeatureToggleService featureToggleServiceLocal( - final ZooKeeperHolder zooKeeperHolder, final FeaturesConfig featuresConfig) { + public FeatureToggleService featureToggleServiceLocal(final ZooKeeperHolder zooKeeperHolder, + final FeaturesConfig featuresConfig) { final FeatureToggleService featureToggleService = new FeatureToggleServiceZk(zooKeeperHolder); if (featuresConfig.containsDefaults()) { final Set features = featuresConfig.getFeaturesWithDefaultState(); - for (final String feature: features) { - LOG.info("Setting feature {} to {}", feature, featuresConfig.getDefaultState(feature)); - featureToggleService.setFeature( - new FeatureToggleService.FeatureWrapper(FeatureToggleService.Feature.valueOf(feature), - featuresConfig.getDefaultState(feature))); + for (final String featureStr : features) { + final boolean defaultState = featuresConfig.getDefaultState(featureStr); + LOG.info("Setting feature {} to {}", featureStr, defaultState); + final FeatureWrapper featureWrapper = new FeatureWrapper(Feature.valueOf(featureStr), defaultState); + featureToggleService.setFeature(featureWrapper, Optional.empty()); } } return featureToggleService; diff --git a/src/main/java/org/zalando/nakadi/controller/CursorsController.java b/src/main/java/org/zalando/nakadi/controller/CursorsController.java index a1c76b1b81..f4a68ff86f 100644 --- a/src/main/java/org/zalando/nakadi/controller/CursorsController.java +++ b/src/main/java/org/zalando/nakadi/controller/CursorsController.java @@ -10,6 +10,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.NativeWebRequest; import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.exceptions.runtime.CursorsAreEmptyException; @@ -35,6 +36,7 @@ import static org.springframework.http.ResponseEntity.noContent; import static org.springframework.http.ResponseEntity.ok; +import static org.zalando.nakadi.util.RequestUtils.getUser; @RestController public class CursorsController { @@ -96,9 +98,10 @@ public ResponseEntity commitCursors(@PathVariable("subscriptionId") final Str @RequestMapping(value = "/subscriptions/{subscriptionId}/cursors", method = RequestMethod.PATCH) public ResponseEntity resetCursors( @PathVariable("subscriptionId") final String subscriptionId, - @Valid @RequestBody final ItemsWrapper cursors) + @Valid @RequestBody final ItemsWrapper cursors, + final NativeWebRequest request) throws NoSuchEventTypeException, InvalidCursorException, InternalNakadiException { - cursorsService.resetCursors(subscriptionId, convertToNakadiCursors(cursors)); + cursorsService.resetCursors(subscriptionId, convertToNakadiCursors(cursors), getUser(request)); return noContent().build(); } diff --git a/src/main/java/org/zalando/nakadi/controller/EventTypeController.java b/src/main/java/org/zalando/nakadi/controller/EventTypeController.java index 9a09e6abce..75eb670c6f 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventTypeController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventTypeController.java @@ -45,6 +45,7 @@ import static org.springframework.http.ResponseEntity.status; import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_EVENT_TYPE_CREATION; import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_EVENT_TYPE_DELETION; +import static org.zalando.nakadi.util.RequestUtils.getUser; @RestController @RequestMapping(value = "/event-types") @@ -88,13 +89,13 @@ public ResponseEntity create(@Valid @RequestBody final EventTypeBase eventTyp throw new ValidationException(errors); } - eventTypeService.create(eventType); + eventTypeService.create(eventType, getUser(request)); return ResponseEntity.status(HttpStatus.CREATED).headers(generateWarningHeaders(eventType)).build(); } @RequestMapping(value = "/{name:.+}", method = RequestMethod.DELETE) - public ResponseEntity delete(@PathVariable("name") final String eventTypeName) + public ResponseEntity delete(@PathVariable("name") final String eventTypeName, final NativeWebRequest request) throws EventTypeDeletionException, AccessDeniedException, NoSuchEventTypeException, @@ -105,7 +106,7 @@ public ResponseEntity delete(@PathVariable("name") final String eventTypeName throw new ForbiddenOperationException("Event Type deletion is disabled"); } - eventTypeService.delete(eventTypeName); + eventTypeService.delete(eventTypeName, getUser(request)); return status(HttpStatus.OK).build(); } @@ -127,7 +128,7 @@ public ResponseEntity update( throw new ValidationException(errors); } - eventTypeService.update(name, eventType); + eventTypeService.update(name, eventType, getUser(request)); return status(HttpStatus.OK).headers(generateWarningHeaders(eventType)).build(); } diff --git a/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java index baf663e3f9..c8fcbd3f05 100644 --- a/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/PostSubscriptionController.java @@ -29,6 +29,7 @@ import static org.apache.http.HttpHeaders.CONTENT_LOCATION; import static org.springframework.http.HttpStatus.OK; import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_SUBSCRIPTION_CREATION; +import static org.zalando.nakadi.util.RequestUtils.getUser; @RestController @@ -63,7 +64,8 @@ public ResponseEntity createOrGetSubscription(@Valid @RequestBody final Subsc throw new SubscriptionCreationDisabledException("Subscription creation is temporarily unavailable"); } try { - final Subscription subscription = subscriptionService.createSubscription(subscriptionBase); + final Subscription subscription = subscriptionService.createSubscription(subscriptionBase, + getUser(request)); return prepareLocationResponse(subscription); } catch (final DuplicatedSubscriptionException ex) { throw new InconsistentStateException("Unexpected problem occurred when creating subscription", ex); @@ -83,7 +85,7 @@ public ResponseEntity updateSubscription( if (errors.hasErrors()) { throw new ValidationException(errors); } - subscriptionService.updateSubscription(subscriptionId, subscription); + subscriptionService.updateSubscription(subscriptionId, subscription, getUser(request)); return ResponseEntity.noContent().build(); } diff --git a/src/main/java/org/zalando/nakadi/controller/SettingsController.java b/src/main/java/org/zalando/nakadi/controller/SettingsController.java index 79db6508dd..5563c103c4 100644 --- a/src/main/java/org/zalando/nakadi/controller/SettingsController.java +++ b/src/main/java/org/zalando/nakadi/controller/SettingsController.java @@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.NativeWebRequest; import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.ResourceAuthorization; import org.zalando.nakadi.exceptions.runtime.ForbiddenOperationException; @@ -20,6 +21,7 @@ import javax.validation.Valid; import static org.zalando.nakadi.domain.ResourceImpl.ADMIN_RESOURCE; +import static org.zalando.nakadi.util.RequestUtils.getUser; @RestController @RequestMapping(value = "/settings") @@ -29,7 +31,6 @@ public class SettingsController { private final FeatureToggleService featureToggleService; private final AdminService adminService; - @Autowired public SettingsController(final BlacklistService blacklistService, final FeatureToggleService featureToggleService, @@ -49,23 +50,25 @@ public ResponseEntity getBlacklist() throws ForbiddenOperationException { @RequestMapping(value = "/blacklist/{blacklist_type}/{name}", method = RequestMethod.PUT) public ResponseEntity blacklist(@PathVariable("blacklist_type") final BlacklistService.Type blacklistType, - @PathVariable("name") final String name) + @PathVariable("name") final String name, + final NativeWebRequest request) throws ForbiddenOperationException { if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) { throw new ForbiddenOperationException("Admin privileges are required to perform this operation"); } - blacklistService.blacklist(name, blacklistType); + blacklistService.blacklist(name, blacklistType, getUser(request)); return ResponseEntity.noContent().build(); } @RequestMapping(value = "/blacklist/{blacklist_type}/{name}", method = RequestMethod.DELETE) public ResponseEntity whitelist(@PathVariable("blacklist_type") final BlacklistService.Type blacklistType, - @PathVariable("name") final String name) + @PathVariable("name") final String name, + final NativeWebRequest request) throws ForbiddenOperationException { if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) { throw new ForbiddenOperationException("Admin privileges are required to perform this operation"); } - blacklistService.whitelist(name, blacklistType); + blacklistService.whitelist(name, blacklistType, getUser(request)); return ResponseEntity.noContent().build(); } @@ -79,12 +82,13 @@ public ResponseEntity getFeatures() } @RequestMapping(path = "/features", method = RequestMethod.POST) - public ResponseEntity setFeature(@RequestBody final FeatureToggleService.FeatureWrapper featureWrapper) + public ResponseEntity setFeature(@RequestBody final FeatureToggleService.FeatureWrapper featureWrapper, + final NativeWebRequest request) throws ForbiddenOperationException { if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) { throw new ForbiddenOperationException("Admin privileges are required to perform this operation"); } - featureToggleService.setFeature(featureWrapper); + featureToggleService.setFeature(featureWrapper, getUser(request)); return ResponseEntity.noContent().build(); } @@ -98,7 +102,8 @@ public ResponseEntity getAdmins() throws ForbiddenOperationException { @RequestMapping(path = "/admins", method = RequestMethod.POST) public ResponseEntity updateAdmins(@Valid @RequestBody final ResourceAuthorization authz, - final Errors errors) + final Errors errors, + final NativeWebRequest request) throws ValidationException, ForbiddenOperationException { if (!adminService.isAdmin(AuthorizationService.Operation.ADMIN)) { throw new ForbiddenOperationException("Admin privileges are required to perform this operation"); @@ -106,7 +111,7 @@ public ResponseEntity updateAdmins(@Valid @RequestBody final ResourceAuthoriz if (errors.hasErrors()) { throw new ValidationException(errors); } - adminService.updateAdmins(authz.toPermissionsList(ADMIN_RESOURCE)); + adminService.updateAdmins(authz.toPermissionsList(ADMIN_RESOURCE), getUser(request)); return ResponseEntity.ok().build(); } } diff --git a/src/main/java/org/zalando/nakadi/controller/StoragesController.java b/src/main/java/org/zalando/nakadi/controller/StoragesController.java index d943e2fd4f..d639b0b3a9 100644 --- a/src/main/java/org/zalando/nakadi/controller/StoragesController.java +++ b/src/main/java/org/zalando/nakadi/controller/StoragesController.java @@ -28,6 +28,7 @@ import static org.springframework.http.HttpStatus.NO_CONTENT; import static org.springframework.http.HttpStatus.OK; import static org.springframework.http.ResponseEntity.status; +import static org.zalando.nakadi.util.RequestUtils.getUser; @RestController public class StoragesController { @@ -60,7 +61,7 @@ public ResponseEntity createStorage(@RequestBody final String storage, if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) { throw new ForbiddenOperationException("Admin privileges required to perform this operation"); } - storageService.createStorage(new JSONObject(storage)); + storageService.createStorage(new JSONObject(storage), getUser(request)); return status(CREATED).build(); } @@ -81,7 +82,7 @@ public ResponseEntity deleteStorage(@PathVariable("id") final String id, fina if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) { throw new ForbiddenOperationException("Admin privileges required to perform this operation"); } - storageService.deleteStorage(id); + storageService.deleteStorage(id, getUser(request)); return status(NO_CONTENT).build(); } diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java index 73242840a2..8d0fedab33 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java @@ -26,6 +26,7 @@ import static org.springframework.http.HttpStatus.NO_CONTENT; import static org.springframework.http.HttpStatus.OK; import static org.springframework.http.ResponseEntity.status; +import static org.zalando.nakadi.util.RequestUtils.getUser; @RestController @@ -65,7 +66,7 @@ public ResponseEntity deleteSubscription(@PathVariable("id") final String sub final NativeWebRequest request) throws DbWriteOperationsBlockedException, NoSuchSubscriptionException, NoSuchEventTypeException, ServiceTemporarilyUnavailableException, InternalNakadiException { - subscriptionService.deleteSubscription(subscriptionId); + subscriptionService.deleteSubscription(subscriptionId, getUser(request)); return status(NO_CONTENT).build(); } diff --git a/src/main/java/org/zalando/nakadi/controller/TimelinesController.java b/src/main/java/org/zalando/nakadi/controller/TimelinesController.java index 179ca52561..afb2dc4138 100644 --- a/src/main/java/org/zalando/nakadi/controller/TimelinesController.java +++ b/src/main/java/org/zalando/nakadi/controller/TimelinesController.java @@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.NativeWebRequest; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; import org.zalando.nakadi.exceptions.runtime.RepositoryProblemException; @@ -20,6 +21,7 @@ import java.util.stream.Collectors; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; +import static org.zalando.nakadi.util.RequestUtils.getUser; @RestController @RequestMapping(value = "/event-types/{name}/timelines", produces = APPLICATION_JSON_VALUE) @@ -34,10 +36,11 @@ public TimelinesController(final TimelineService timelineService) { @RequestMapping(method = RequestMethod.POST) public ResponseEntity createTimeline(@PathVariable("name") final String eventTypeName, - @RequestBody final TimelineRequest timelineRequest) + @RequestBody final TimelineRequest timelineRequest, + final NativeWebRequest request) throws AccessDeniedException, TimelineException, TopicRepositoryException, InconsistentStateException, RepositoryProblemException { - timelineService.createTimeline(eventTypeName, timelineRequest.getStorageId()); + timelineService.createTimeline(eventTypeName, timelineRequest.getStorageId(), getUser(request)); return ResponseEntity.status(HttpStatus.CREATED).build(); } diff --git a/src/main/java/org/zalando/nakadi/domain/EventType.java b/src/main/java/org/zalando/nakadi/domain/EventType.java index c41b2430a2..6f2f18830d 100644 --- a/src/main/java/org/zalando/nakadi/domain/EventType.java +++ b/src/main/java/org/zalando/nakadi/domain/EventType.java @@ -49,6 +49,6 @@ public void setCreatedAt(final DateTime createdAt) { } public Resource asResource() { - return new ResourceImpl(getName(), ResourceImpl.EVENT_TYPE_RESOURCE, getAuthorization(), this); + return new ResourceImpl<>(getName(), ResourceImpl.EVENT_TYPE_RESOURCE, getAuthorization(), this); } } diff --git a/src/main/java/org/zalando/nakadi/domain/ResourceImpl.java b/src/main/java/org/zalando/nakadi/domain/ResourceImpl.java index a2e0cdef80..2fdf00c2b3 100644 --- a/src/main/java/org/zalando/nakadi/domain/ResourceImpl.java +++ b/src/main/java/org/zalando/nakadi/domain/ResourceImpl.java @@ -7,7 +7,7 @@ import java.util.List; import java.util.Optional; -public class ResourceImpl implements Resource { +public class ResourceImpl implements Resource { public static final String ALL_DATA_ACCESS_RESOURCE = "all-data-access"; public static final String ADMIN_RESOURCE = "nakadi"; diff --git a/src/main/java/org/zalando/nakadi/domain/Subscription.java b/src/main/java/org/zalando/nakadi/domain/Subscription.java index 20ac458903..69bdf7443d 100644 --- a/src/main/java/org/zalando/nakadi/domain/Subscription.java +++ b/src/main/java/org/zalando/nakadi/domain/Subscription.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.zalando.nakadi.plugin.api.authz.Resource; import javax.annotation.Nullable; @@ -56,7 +57,7 @@ public void setCreatedAt(final DateTime createdAt) { } public Resource asResource() { - return new ResourceImpl(id, ResourceImpl.SUBSCRIPTION_RESOURCE, getAuthorization(), this); + return new ResourceImpl<>(id, ResourceImpl.SUBSCRIPTION_RESOURCE, getAuthorization(), this); } @Nullable @@ -80,6 +81,12 @@ public boolean equals(final Object o) { return super.equals(that) && Objects.equals(id, that.id) && Objects.equals(createdAt, that.createdAt); } + public Subscription mergeFrom(final SubscriptionBase newValue) { + final Subscription subscription = new Subscription(id, createdAt, new DateTime(DateTimeZone.UTC), this); + subscription.setAuthorization(newValue.getAuthorization()); + return subscription; + } + @Override public int hashCode() { int result = super.hashCode(); diff --git a/src/main/java/org/zalando/nakadi/domain/SubscriptionBase.java b/src/main/java/org/zalando/nakadi/domain/SubscriptionBase.java index b0e93666a3..30498dd880 100644 --- a/src/main/java/org/zalando/nakadi/domain/SubscriptionBase.java +++ b/src/main/java/org/zalando/nakadi/domain/SubscriptionBase.java @@ -105,10 +105,6 @@ public void setInitialCursors(@Nullable final List messageDigestThreadLocal; + + @Autowired + public UsernameHasher(@Value("${nakadi.hasher.salt}") final String salt) { + this.salt = salt.getBytes(Charsets.UTF_8); + this.messageDigestThreadLocal = ThreadLocal.withInitial(DigestUtils::getSha256Digest); + } + + public String hash(final String value) { + final MessageDigest messageDigest = messageDigestThreadLocal.get(); + messageDigest.reset(); + messageDigest.update(salt); + messageDigest.update(value.getBytes(Charsets.UTF_8)); + return Hex.encodeHexString(messageDigest.digest()); + } +} diff --git a/src/main/java/org/zalando/nakadi/service/AdminService.java b/src/main/java/org/zalando/nakadi/service/AdminService.java index bc716fc064..ef160f1aac 100644 --- a/src/main/java/org/zalando/nakadi/service/AdminService.java +++ b/src/main/java/org/zalando/nakadi/service/AdminService.java @@ -5,6 +5,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.Permission; @@ -18,6 +19,7 @@ import org.zalando.nakadi.repository.db.AuthorizationDbRepository; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; @@ -36,24 +38,28 @@ public class AdminService { private final FeatureToggleService featureToggleService; private final NakadiSettings nakadiSettings; private Cache> resourceCache; + private final NakadiAuditLogPublisher auditLogPublisher; @Autowired public AdminService(final AuthorizationDbRepository authorizationDbRepository, final AuthorizationService authorizationService, final FeatureToggleService featureToggleService, - final NakadiSettings nakadiSettings) { + final NakadiSettings nakadiSettings, + @Lazy final NakadiAuditLogPublisher auditLogPublisher) { this.authorizationDbRepository = authorizationDbRepository; this.authorizationService = authorizationService; this.featureToggleService = featureToggleService; this.nakadiSettings = nakadiSettings; this.resourceCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build(); + this.auditLogPublisher = auditLogPublisher; } public List getAdmins() { return addDefaultAdmin(authorizationDbRepository.listAdmins()); } - public void updateAdmins(final List newAdmins) throws DbWriteOperationsBlockedException { + public void updateAdmins(final List newAdmins, final Optional user) + throws DbWriteOperationsBlockedException { if (featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_DB_WRITE_OPERATIONS)) { throw new DbWriteOperationsBlockedException("Cannot update admins: write operations on DB " + "are blocked by feature flag."); @@ -65,11 +71,19 @@ public void updateAdmins(final List newAdmins) throws DbWriteOperati final List delete = removeDefaultAdmin(currentAdmins.stream() .filter(p -> !newAdmins.stream().anyMatch(Predicate.isEqual(p))).collect(Collectors.toList())); authorizationDbRepository.update(add, delete); + + auditLogPublisher.publish( + Optional.of(ResourceAuthorization.fromPermissionsList(currentAdmins)), + Optional.of(ResourceAuthorization.fromPermissionsList(newAdmins)), + NakadiAuditLogPublisher.ResourceType.ADMINS, + NakadiAuditLogPublisher.ActionType.UPDATED, + "-", + user); } public boolean isAdmin(final AuthorizationService.Operation operation) throws PluginException { final List permissions = getAdmins(); - final Resource resource = new ResourceImpl(ADMIN_RESOURCE, ADMIN_RESOURCE, + final Resource resource = new ResourceImpl<>(ADMIN_RESOURCE, ADMIN_RESOURCE, ResourceAuthorization.fromPermissionsList(permissions), null); return authorizationService.isAuthorized(operation, resource); } @@ -77,8 +91,8 @@ public boolean isAdmin(final AuthorizationService.Operation operation) throws Pl public boolean hasAllDataAccess(final AuthorizationService.Operation operation) throws PluginException { try { final List permissions = resourceCache.get(ALL_DATA_ACCESS_RESOURCE, - () -> authorizationDbRepository.listAllDataAccess()); - final Resource resource = new ResourceImpl(ALL_DATA_ACCESS_RESOURCE, + authorizationDbRepository::listAllDataAccess); + final Resource resource = new ResourceImpl<>(ALL_DATA_ACCESS_RESOURCE, ALL_DATA_ACCESS_RESOURCE, ResourceAuthorization.fromPermissionsList(permissions), null); return authorizationService.isAuthorized(operation, resource); diff --git a/src/main/java/org/zalando/nakadi/service/BlacklistService.java b/src/main/java/org/zalando/nakadi/service/BlacklistService.java index 9d06d32fca..d7f4638fac 100644 --- a/src/main/java/org/zalando/nakadi/service/BlacklistService.java +++ b/src/main/java/org/zalando/nakadi/service/BlacklistService.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Set; @Component @@ -28,13 +29,16 @@ public class BlacklistService { private final SubscriptionDbRepository subscriptionDbRepository; private final ZooKeeperHolder zooKeeperHolder; + private final NakadiAuditLogPublisher auditLogPublisher; private TreeCache blacklistCache; @Autowired public BlacklistService(final SubscriptionDbRepository subscriptionDbRepository, - final ZooKeeperHolder zooKeeperHolder) { + final ZooKeeperHolder zooKeeperHolder, + final NakadiAuditLogPublisher auditLogPublisher) { this.zooKeeperHolder = zooKeeperHolder; this.subscriptionDbRepository = subscriptionDbRepository; + this.auditLogPublisher = auditLogPublisher; } @PostConstruct @@ -87,14 +91,14 @@ public boolean isSubscriptionConsumptionBlocked(final String subscriptionId, fin } public boolean isSubscriptionConsumptionBlocked(final Collection etNames, final String appId) { - return etNames.stream() - .map(etName -> isBlocked(Type.CONSUMER_ET, etName)).findFirst().orElse(false) || - isBlocked(Type.CONSUMER_APP, appId); + return etNames.stream() + .map(etName -> isBlocked(Type.CONSUMER_ET, etName)).findFirst().orElse(false) || + isBlocked(Type.CONSUMER_APP, appId); } - public Map getBlacklist() { + public Map>> getBlacklist() { return ImmutableMap.of( - "consumers", ImmutableMap.of( + "consumers", ImmutableMap.of( "event_types", getChildren(Type.CONSUMER_ET), "apps", getChildren(Type.CONSUMER_APP)), "producers", ImmutableMap.of( @@ -102,24 +106,50 @@ public Map getBlacklist() { "apps", getChildren(Type.PRODUCER_APP))); } - public void blacklist(final String name, final Type type) throws RuntimeException { + public void blacklist(final String name, final Type type, final Optional user) throws RuntimeException { try { + final boolean oldValue = isBlocked(type, name); + final CuratorFramework curator = zooKeeperHolder.get(); - final String path = createFlooderPath(name, type); + final String path = createBlacklistEntryPath(name, type); if (curator.checkExists().forPath(path) == null) { curator.create().creatingParentsIfNeeded().forPath(path); } + + final BlacklistEntry newEntry = new BlacklistEntry(type, name); + BlacklistEntry oldEntry = null; + NakadiAuditLogPublisher.ActionType actionType = NakadiAuditLogPublisher.ActionType.CREATED; + if (oldValue) { + oldEntry = newEntry; + actionType = NakadiAuditLogPublisher.ActionType.UPDATED; + } + auditLogPublisher.publish( + Optional.ofNullable(oldEntry), + Optional.of(newEntry), + NakadiAuditLogPublisher.ResourceType.BLACKLIST_ENTRY, + actionType, + newEntry.getId(), + user); } catch (final Exception e) { throw new RuntimeException("Issue occurred while creating node in zk", e); } } - public void whitelist(final String name, final Type type) throws RuntimeException { + public void whitelist(final String name, final Type type, final Optional user) throws RuntimeException { try { final CuratorFramework curator = zooKeeperHolder.get(); - final String path = createFlooderPath(name, type); + final String path = createBlacklistEntryPath(name, type); if (curator.checkExists().forPath(path) != null) { curator.delete().forPath(path); + + final BlacklistEntry entry = new BlacklistEntry(type, name); + auditLogPublisher.publish( + Optional.of(entry), + Optional.empty(), + NakadiAuditLogPublisher.ResourceType.BLACKLIST_ENTRY, + NakadiAuditLogPublisher.ActionType.DELETED, + entry.getId(), + user); } } catch (final Exception e) { throw new RuntimeException("Issue occurred while deleting node from zk", e); @@ -131,7 +161,7 @@ private Set getChildren(final Type type) { return currentChildren == null ? Collections.emptySet() : currentChildren.keySet(); } - private String createFlooderPath(final String name, final Type type) { + private String createBlacklistEntryPath(final String name, final Type type) { return type.getZkPath() + "/" + name; } @@ -152,4 +182,26 @@ public String getZkPath() { } } + public static class BlacklistEntry { + private Type type; + private String name; + + public BlacklistEntry(final Type type, final String name) { + this.type = type; + this.name = name; + } + + public Type getType() { + return type; + } + + public String getName() { + return name; + } + + public String getId() { + return String.format("%s:%s", type, name); + } + } + } diff --git a/src/main/java/org/zalando/nakadi/service/CursorsService.java b/src/main/java/org/zalando/nakadi/service/CursorsService.java index 1974107dbf..6b63fcf8e2 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorsService.java @@ -6,6 +6,7 @@ import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.CursorError; import org.zalando.nakadi.domain.EventTypePartition; +import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; @@ -36,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -53,6 +55,7 @@ public class CursorsService { private final AuthorizationValidator authorizationValidator; private final SubscriptionDbRepository subscriptionRepository; private final SubscriptionCache subscriptionCache; + private final NakadiAuditLogPublisher auditLogPublisher; @Autowired public CursorsService(final SubscriptionDbRepository subscriptionRepository, @@ -63,7 +66,8 @@ public CursorsService(final SubscriptionDbRepository subscriptionRepository, final CursorConverter cursorConverter, final UUIDGenerator uuidGenerator, final TimelineService timelineService, - final AuthorizationValidator authorizationValidator) { + final AuthorizationValidator authorizationValidator, + final NakadiAuditLogPublisher auditLogPublisher) { this.eventTypeCache = eventTypeCache; this.nakadiSettings = nakadiSettings; this.zkSubscriptionFactory = zkSubscriptionFactory; @@ -73,6 +77,7 @@ public CursorsService(final SubscriptionDbRepository subscriptionRepository, this.authorizationValidator = authorizationValidator; this.subscriptionRepository = subscriptionRepository; this.subscriptionCache = subscriptionCache; + this.auditLogPublisher = auditLogPublisher; } /** @@ -159,7 +164,7 @@ public List getSubscriptionCursors(final String return cursorsListBuilder.build(); } - public void resetCursors(final String subscriptionId, final List cursors) + public void resetCursors(final String subscriptionId, final List cursors, final Optional user) throws ServiceTemporarilyUnavailableException, NoSuchSubscriptionException, UnableProcessException, OperationTimeoutException, ZookeeperException, InternalNakadiException, NoSuchEventTypeException, InvalidCursorException { @@ -187,11 +192,23 @@ public void resetCursors(final String subscriptionId, final List c zkClient, subscription, timelineService, cursorConverter)); // add 1 second to commit timeout in order to give time to finish reset if there is uncommitted events if (!cursors.isEmpty()) { + final List oldCursors = getSubscriptionCursors(subscriptionId); + final long timeout = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout()) + TimeUnit.SECONDS.toMillis(1); - zkClient.resetCursors( - cursors.stream().map(cursorConverter::convertToNoToken).collect(Collectors.toList()), - timeout); + final List newCursors = cursors.stream() + .map(cursorConverter::convertToNoToken) + .collect(Collectors.toList()); + + zkClient.resetCursors(newCursors, timeout); + + auditLogPublisher.publish( + Optional.of(new ItemsWrapper<>(oldCursors)), + Optional.of(new ItemsWrapper<>(newCursors)), + NakadiAuditLogPublisher.ResourceType.CURSORS, + NakadiAuditLogPublisher.ActionType.UPDATED, + subscriptionId, + user); } } diff --git a/src/main/java/org/zalando/nakadi/service/EventTypeService.java b/src/main/java/org/zalando/nakadi/service/EventTypeService.java index 0692a6b60d..30278450eb 100644 --- a/src/main/java/org/zalando/nakadi/service/EventTypeService.java +++ b/src/main/java/org/zalando/nakadi/service/EventTypeService.java @@ -90,6 +90,7 @@ public class EventTypeService { private final TransactionTemplate transactionTemplate; private final NakadiKpiPublisher nakadiKpiPublisher; private final String etLogEventType; + private final NakadiAuditLogPublisher nakadiAuditLogPublisher; private final EventTypeOptionsValidator eventTypeOptionsValidator; private final AdminService adminService; @@ -108,6 +109,7 @@ public EventTypeService(final EventTypeRepository eventTypeRepository, final NakadiSettings nakadiSettings, final NakadiKpiPublisher nakadiKpiPublisher, @Value("${nakadi.kpi.event-types.nakadiEventTypeLog}") final String etLogEventType, + final NakadiAuditLogPublisher nakadiAuditLogPublisher, final EventTypeOptionsValidator eventTypeOptionsValidator, final AdminService adminService) { this.eventTypeRepository = eventTypeRepository; @@ -124,6 +126,7 @@ public EventTypeService(final EventTypeRepository eventTypeRepository, this.nakadiSettings = nakadiSettings; this.nakadiKpiPublisher = nakadiKpiPublisher; this.etLogEventType = etLogEventType; + this.nakadiAuditLogPublisher = nakadiAuditLogPublisher; this.eventTypeOptionsValidator = eventTypeOptionsValidator; this.adminService = adminService; } @@ -132,7 +135,7 @@ public List list() { return eventTypeRepository.list(); } - public void create(final EventTypeBase eventType) + public void create(final EventTypeBase eventType, final Optional user) throws TopicCreationException, InternalNakadiException, NoSuchPartitionStrategyException, @@ -176,6 +179,10 @@ public void create(final EventTypeBase eventType) .put("category", eventType.getCategory()) .put("authz", identifyAuthzState(eventType)) .put("compatibility_mode", eventType.getCompatibilityMode())); + + nakadiAuditLogPublisher.publish(Optional.empty(), Optional.of(eventType), + NakadiAuditLogPublisher.ResourceType.EVENT_TYPE, NakadiAuditLogPublisher.ActionType.CREATED, + eventType.getName(), user); } private void validateCompaction(final EventTypeBase eventType) throws @@ -212,8 +219,8 @@ private void setDefaultEventTypeOptions(final EventTypeBase eventType) { } } - public void delete(final String eventTypeName) throws EventTypeDeletionException, AccessDeniedException, - NoSuchEventTypeException, ConflictException, ServiceTemporarilyUnavailableException, + public void delete(final String eventTypeName, final Optional user) throws EventTypeDeletionException, + AccessDeniedException, NoSuchEventTypeException, ConflictException, ServiceTemporarilyUnavailableException, DbWriteOperationsBlockedException { if (featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_DB_WRITE_OPERATIONS)) { throw new DbWriteOperationsBlockedException("Cannot delete event type: write operations on DB " + @@ -277,6 +284,10 @@ public void delete(final String eventTypeName) throws EventTypeDeletionException .put("category", eventType.getCategory()) .put("authz", identifyAuthzState(eventType)) .put("compatibility_mode", eventType.getCompatibilityMode())); + + nakadiAuditLogPublisher.publish(Optional.of(eventType), Optional.empty(), + NakadiAuditLogPublisher.ResourceType.EVENT_TYPE, NakadiAuditLogPublisher.ActionType.DELETED, + eventType.getName(), user); } private Multimap deleteEventTypeIfNoSubscriptions(final String eventType) { @@ -314,7 +325,8 @@ private boolean hasSubscriptions(final String eventTypeName) { } public void update(final String eventTypeName, - final EventTypeBase eventTypeBase) + final EventTypeBase eventTypeBase, + final Optional user) throws TopicConfigException, InconsistentStateException, NakadiRuntimeException, @@ -327,9 +339,11 @@ public void update(final String eventTypeName, "are blocked by feature flag."); } Closeable updatingCloser = null; + final EventType original; + final EventType eventType; try { updatingCloser = timelineSync.workWithEventType(eventTypeName, nakadiSettings.getTimelineWaitTimeoutMs()); - final EventType original = eventTypeRepository.findByName(eventTypeName); + original = eventTypeRepository.findByName(eventTypeName); if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) { eventTypeOptionsValidator.checkRetentionTime(eventTypeBase.getOptions()); @@ -341,7 +355,7 @@ public void update(final String eventTypeName, validateSchema(eventTypeBase); validateAudience(original, eventTypeBase); partitionResolver.validate(eventTypeBase); - final EventType eventType = schemaEvolutionService.evolve(original, eventTypeBase); + eventType = schemaEvolutionService.evolve(original, eventTypeBase); eventType.setDefaultStatistic( validateStatisticsUpdate(original.getDefaultStatistic(), eventType.getDefaultStatistic())); updateRetentionTime(original, eventType); @@ -371,6 +385,10 @@ public void update(final String eventTypeName, .put("category", eventTypeBase.getCategory()) .put("authz", identifyAuthzState(eventTypeBase)) .put("compatibility_mode", eventTypeBase.getCompatibilityMode())); + + nakadiAuditLogPublisher.publish(Optional.of(original), Optional.of(eventType), + NakadiAuditLogPublisher.ResourceType.EVENT_TYPE, NakadiAuditLogPublisher.ActionType.UPDATED, + eventType.getName(), user); } private void updateRetentionTime(final EventType original, final EventType eventType) { diff --git a/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java b/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java index dd124c1e43..1a86034761 100644 --- a/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java +++ b/src/main/java/org/zalando/nakadi/service/FeatureToggleService.java @@ -5,15 +5,18 @@ import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; @Service public interface FeatureToggleService { - void setFeature(FeatureWrapper feature); + void setFeature(FeatureWrapper feature, Optional user); boolean isFeatureEnabled(Feature feature); + void setAuditLogPublisher(NakadiAuditLogPublisher auditLogPublisher); + default void checkFeatureOn(final Feature feature) { if (!isFeatureEnabled(feature)) { throw new FeatureNotAvailableException("Feature " + feature + " is disabled", feature); @@ -35,6 +38,7 @@ enum Feature { DISABLE_SUBSCRIPTION_CREATION("disable_subscription_creation"), REMOTE_TOKENINFO("remote_tokeninfo"), KPI_COLLECTION("kpi_collection"), + AUDIT_LOG_COLLECTION("audit_log_collection"), DISABLE_DB_WRITE_OPERATIONS("disable_db_write_operations"), DISABLE_LOG_COMPACTION("disable_log_compaction"); diff --git a/src/main/java/org/zalando/nakadi/service/FeatureToggleServiceZk.java b/src/main/java/org/zalando/nakadi/service/FeatureToggleServiceZk.java index 389b74e3e9..53cbcdd3bf 100644 --- a/src/main/java/org/zalando/nakadi/service/FeatureToggleServiceZk.java +++ b/src/main/java/org/zalando/nakadi/service/FeatureToggleServiceZk.java @@ -11,6 +11,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; public class FeatureToggleServiceZk implements FeatureToggleService { @@ -18,6 +19,8 @@ public class FeatureToggleServiceZk implements FeatureToggleService { private static final String PREFIX = "/nakadi/feature_toggle"; private final ZooKeeperHolder zkHolder; + + private NakadiAuditLogPublisher auditLogPublisher; private PathChildrenCache featuresCache; private final Map featurePaths; @@ -30,7 +33,7 @@ public FeatureToggleServiceZk(final ZooKeeperHolder zkHolder) { LOG.error(e.getMessage(), e); } featurePaths = new HashMap<>(Feature.values().length); - for (final Feature feature: Feature.values()) { + for (final Feature feature : Feature.values()) { featurePaths.put(feature, PREFIX + "/" + feature.getId()); } } @@ -53,8 +56,9 @@ public boolean isFeatureEnabled(final Feature feature) { } } - public void setFeature(final FeatureWrapper feature) { + public void setFeature(final FeatureWrapper feature, final Optional user) { try { + final boolean oldState = isFeatureEnabled(feature.getFeature()); final CuratorFramework curator = zkHolder.get(); final String path = getPath(feature.getFeature()); if (feature.isEnabled()) { @@ -64,6 +68,16 @@ public void setFeature(final FeatureWrapper feature) { curator.delete().forPath(path); LOG.debug("Feature {} disabled", feature.getFeature().getId()); } + + if (auditLogPublisher != null) { + auditLogPublisher.publish( + Optional.of(new FeatureToggleService.FeatureWrapper(feature.getFeature(), oldState)), + Optional.of(feature), + NakadiAuditLogPublisher.ResourceType.FEATURE, + NakadiAuditLogPublisher.ActionType.UPDATED, + feature.getFeature().getId(), + user); + } } catch (final KeeperException.NoNodeException nne) { LOG.debug("Feature {} was already disabled", feature.getFeature().getId()); } catch (final KeeperException.NodeExistsException nne) { @@ -76,4 +90,8 @@ public void setFeature(final FeatureWrapper feature) { private String getPath(final Feature feature) { return featurePaths.get(feature); } + + public void setAuditLogPublisher(final NakadiAuditLogPublisher auditLogPublisher) { + this.auditLogPublisher = auditLogPublisher; + } } diff --git a/src/main/java/org/zalando/nakadi/service/NakadiAuditLogInitialization.java b/src/main/java/org/zalando/nakadi/service/NakadiAuditLogInitialization.java new file mode 100644 index 0000000000..a32a0aea90 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/NakadiAuditLogInitialization.java @@ -0,0 +1,104 @@ +package org.zalando.nakadi.service; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.io.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import org.zalando.nakadi.domain.EventTypeBase; +import org.zalando.nakadi.exceptions.runtime.DuplicatedEventTypeNameException; +import org.zalando.nakadi.exceptions.runtime.NakadiBaseException; + +import java.io.IOException; +import java.util.Optional; + +@Component +@ConfigurationProperties(prefix = "nakadi.audit") +public class NakadiAuditLogInitialization { + private static final Logger LOG = LoggerFactory.getLogger(NakadiAuditLogInitialization.class); + + private final ObjectMapper objectMapper; + private final EventTypeService eventTypeService; + private final FeatureToggleService featureToggleService; + + private String eventType; + private String owningApplication; + private String authDataType; + private String authValue; + + @Autowired + public NakadiAuditLogInitialization(final ObjectMapper objectMapper, final EventTypeService eventTypeService, + final FeatureToggleService featureToggleService) { + this.objectMapper = objectMapper; + this.eventTypeService = eventTypeService; + this.featureToggleService = featureToggleService; + } + + @EventListener + public void onApplicationEvent(final ContextRefreshedEvent event) throws IOException { + if (!featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.AUDIT_LOG_COLLECTION)) { + LOG.debug("Audit log collection is disabled, skip creation of audit log event type"); + return; + } + + LOG.debug("Initializing Audit log event type"); + + String auditEventTypeString = Resources + .toString(Resources.getResource("audit_event_type.json"), Charsets.UTF_8); + + auditEventTypeString = auditEventTypeString.replaceAll("event_type_name_placeholder", eventType); + auditEventTypeString = auditEventTypeString.replaceAll("owning_application_placeholder", owningApplication); + auditEventTypeString = auditEventTypeString.replaceAll("auth_data_type_placeholder", authDataType); + auditEventTypeString = auditEventTypeString.replaceAll("auth_value_placeholder", authValue); + + final TypeReference typeReference = new TypeReference() { + }; + final EventTypeBase eventType = objectMapper.readValue(auditEventTypeString, typeReference); + + try { + eventTypeService.create(eventType, Optional.of(owningApplication)); + } catch (final DuplicatedEventTypeNameException e) { + LOG.debug("Audit event type already exists " + eventType.getName()); + } catch (final NakadiBaseException e) { + LOG.debug("Problem creating audit event type " + eventType.getName(), e); + } + } + + public String getEventType() { + return eventType; + } + + public void setEventType(final String eventType) { + this.eventType = eventType; + } + + public String getOwningApplication() { + return owningApplication; + } + + public void setOwningApplication(final String owningApplication) { + this.owningApplication = owningApplication; + } + + public String getAuthDataType() { + return authDataType; + } + + public void setAuthDataType(final String authDataType) { + this.authDataType = authDataType; + } + + public String getAuthValue() { + return authValue; + } + + public void setAuthValue(final String authValue) { + this.authValue = authValue; + } +} diff --git a/src/main/java/org/zalando/nakadi/service/NakadiAuditLogPublisher.java b/src/main/java/org/zalando/nakadi/service/NakadiAuditLogPublisher.java new file mode 100644 index 0000000000..b045d0a9ed --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/NakadiAuditLogPublisher.java @@ -0,0 +1,114 @@ +package org.zalando.nakadi.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.zalando.nakadi.security.UsernameHasher; + +import java.util.Optional; + +@Component +public class NakadiAuditLogPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(NakadiAuditLogPublisher.class); + + private final FeatureToggleService featureToggleService; + private final EventsProcessor eventsProcessor; + private final UsernameHasher usernameHasher; + private final String auditEventType; + private final ObjectMapper objectMapper; + + @Autowired + protected NakadiAuditLogPublisher(final FeatureToggleService featureToggleService, + final EventsProcessor eventsProcessor, + final ObjectMapper objectMapper, + final UsernameHasher usernameHasher, + @Value("${nakadi.audit.eventType}") final String auditEventType) { + this.eventsProcessor = eventsProcessor; + this.usernameHasher = usernameHasher; + this.objectMapper = objectMapper; + this.auditEventType = auditEventType; + this.featureToggleService = featureToggleService; + this.featureToggleService.setAuditLogPublisher(this); + } + + public void publish(final Optional previousState, + final Optional newState, + final ResourceType resourceType, + final ActionType actionType, + final String resourceId, + final Optional userOrNone) { + try { + if (!featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.AUDIT_LOG_COLLECTION)) { + return; + } + + final String user = userOrNone.orElse(""); + + final Optional previousEventText = previousState.map(this::serialize); + final Optional previousEventObject = previousEventText.map(JSONObject::new); + + final Optional newEventText = newState.map(this::serialize); + final Optional newEventObject = newEventText.map(JSONObject::new); + + final JSONObject payload = new JSONObject() + .put("previous_object", previousEventObject.orElse(null)) + .put("previous_text", previousEventText.orElse(null)) + .put("new_object", newEventObject.orElse(null)) + .put("new_text", newEventText.orElse(null)) + .put("resource_type", resourceType.name().toLowerCase()) + .put("resource_id", resourceId) + .put("user", user) + .put("user_hash", usernameHasher.hash(user)); + + final JSONObject dataEvent = new JSONObject() + .put("data_type", resourceType.name().toLowerCase()) + .put("data_op", actionType.getShortname()) + .put("data", payload); + + + eventsProcessor.enrichAndSubmit(auditEventType, dataEvent); + } catch (final Throwable e) { + LOG.error("Error occurred when submitting audit event for publishing", e); + } + } + + private String serialize(final Object state) { + try { + return objectMapper.writeValueAsString(state); + } catch (JsonProcessingException e) { + LOG.error("failed to publish audit log", e); + return null; + } + } + + public enum ResourceType { + EVENT_TYPE, + SUBSCRIPTION, + TIMELINE, + STORAGE, + FEATURE, + ADMINS, + CURSORS, + BLACKLIST_ENTRY + } + + public enum ActionType { + CREATED("C"), UPDATED("U"), DELETED("D"); + + private final String shortname; + + ActionType(final String shortname) { + this.shortname = shortname; + } + + public String getShortname() { + return shortname; + } + } +} diff --git a/src/main/java/org/zalando/nakadi/service/NakadiKpiInitialization.java b/src/main/java/org/zalando/nakadi/service/NakadiKpiInitialization.java index 622fe33008..c900890a6a 100644 --- a/src/main/java/org/zalando/nakadi/service/NakadiKpiInitialization.java +++ b/src/main/java/org/zalando/nakadi/service/NakadiKpiInitialization.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; @Component @ConfigurationProperties(prefix = "nakadi.kpi.event-types") @@ -53,7 +54,7 @@ public void onApplicationEvent(final ContextRefreshedEvent event) throws IOExcep LOG.debug("Initializing KPI event types"); - final String kpiEventTypesString = Resources + String kpiEventTypesString = Resources .toString(Resources.getResource("kpi_event_types.json"), Charsets.UTF_8); final Map replacements = new HashMap<>(); @@ -62,9 +63,11 @@ public void onApplicationEvent(final ContextRefreshedEvent event) throws IOExcep replacements.put("nakadi.data.streamed", nakadiDataStreamed); replacements.put("nakadi.batch.published", nakadiBatchPublished); replacements.put("nakadi.access.log", nakadiAccessLog); - replacements.put("owning_application", owningApplication); + replacements.put("owning_application_placeholder", owningApplication); - replacements.forEach((key, value) -> kpiEventTypesString.replaceAll(key, value)); + for (final Map.Entry entry : replacements.entrySet()) { + kpiEventTypesString = kpiEventTypesString.replaceAll(entry.getKey(), entry.getValue()); + } final TypeReference> typeReference = new TypeReference>() { }; @@ -73,7 +76,7 @@ public void onApplicationEvent(final ContextRefreshedEvent event) throws IOExcep eventTypes.forEach(et -> { try { - eventTypeService.create(et); + eventTypeService.create(et, Optional.of(owningApplication)); } catch (final DuplicatedEventTypeNameException e) { LOG.debug("KPI event type already exists " + et.getName()); } catch (final NakadiBaseException e) { diff --git a/src/main/java/org/zalando/nakadi/service/NakadiKpiPublisher.java b/src/main/java/org/zalando/nakadi/service/NakadiKpiPublisher.java index a4616ea89d..962ee7ef7b 100644 --- a/src/main/java/org/zalando/nakadi/service/NakadiKpiPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/NakadiKpiPublisher.java @@ -1,16 +1,12 @@ package org.zalando.nakadi.service; -import com.google.common.base.Charsets; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.codec.digest.DigestUtils; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.zalando.nakadi.security.UsernameHasher; -import java.security.MessageDigest; import java.util.function.Supplier; @Component @@ -18,19 +14,17 @@ public class NakadiKpiPublisher { private static final Logger LOG = LoggerFactory.getLogger(NakadiKpiPublisher.class); - private final ThreadLocal messageDigestThreadLocal; private final FeatureToggleService featureToggleService; private final EventsProcessor eventsProcessor; - private final byte[] salt; + private final UsernameHasher usernameHasher; @Autowired protected NakadiKpiPublisher(final FeatureToggleService featureToggleService, final EventsProcessor eventsProcessor, - @Value("${nakadi.hasher.salt}") final String salt) { + final UsernameHasher usernameHasher) { this.featureToggleService = featureToggleService; this.eventsProcessor = eventsProcessor; - this.salt = salt.getBytes(Charsets.UTF_8); - this.messageDigestThreadLocal = ThreadLocal.withInitial(DigestUtils::getSha256Digest); + this.usernameHasher = usernameHasher; } public void publish(final String etName, final Supplier eventSupplier) { @@ -47,11 +41,7 @@ public void publish(final String etName, final Supplier eventSupplie } public String hash(final String value) { - final MessageDigest messageDigest = messageDigestThreadLocal.get(); - messageDigest.reset(); - messageDigest.update(salt); - messageDigest.update(value.getBytes(Charsets.UTF_8)); - return Hex.encodeHexString(messageDigest.digest()); + return usernameHasher.hash(value); } } diff --git a/src/main/java/org/zalando/nakadi/service/StorageService.java b/src/main/java/org/zalando/nakadi/service/StorageService.java index 59ae711a18..4710405d0f 100644 --- a/src/main/java/org/zalando/nakadi/service/StorageService.java +++ b/src/main/java/org/zalando/nakadi/service/StorageService.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.function.Function; @Service public class StorageService { @@ -40,18 +41,21 @@ public class StorageService { private final DefaultStorage defaultStorage; private final CuratorFramework curator; private final FeatureToggleService featureToggleService; + private final NakadiAuditLogPublisher auditLogPublisher; @Autowired public StorageService(final ObjectMapper objectMapper, final StorageDbRepository storageDbRepository, @Qualifier("default_storage") final DefaultStorage defaultStorage, final ZooKeeperHolder zooKeeperHolder, - final FeatureToggleService featureToggleService) { + final FeatureToggleService featureToggleService, + final NakadiAuditLogPublisher auditLogPublisher) { this.objectMapper = objectMapper; this.storageDbRepository = storageDbRepository; this.defaultStorage = defaultStorage; this.curator = zooKeeperHolder.get(); this.featureToggleService = featureToggleService; + this.auditLogPublisher = auditLogPublisher; } @PostConstruct @@ -96,7 +100,7 @@ public Storage getStorage(final String id) throws NoSuchStorageException, Intern } } - public void createStorage(final JSONObject json) + public void createStorage(final JSONObject json, final Optional user) throws DbWriteOperationsBlockedException, DuplicatedStorageException, InternalNakadiException, UnknownStorageTypeException { if (featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_DB_WRITE_OPERATIONS)) { @@ -136,9 +140,17 @@ public void createStorage(final JSONObject json) LOG.error("DB error occurred when creating storage", e); throw new InternalNakadiException(e.getMessage()); } + + auditLogPublisher.publish( + Optional.empty(), + Optional.of(storage), + NakadiAuditLogPublisher.ResourceType.STORAGE, + NakadiAuditLogPublisher.ActionType.CREATED, + storage.getId(), + user); } - public void deleteStorage(final String id) + public void deleteStorage(final String id, final Optional user) throws DbWriteOperationsBlockedException, NoSuchStorageException, StorageIsUsedException, InternalNakadiException { if (featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_DB_WRITE_OPERATIONS)) { @@ -146,7 +158,18 @@ public void deleteStorage(final String id) "are blocked by feature flag."); } try { + final Optional storageOrNone = storageDbRepository.getStorage(id) + .map(Function.identity()); + storageDbRepository.deleteStorage(id); + + auditLogPublisher.publish( + storageOrNone, + Optional.empty(), + NakadiAuditLogPublisher.ResourceType.STORAGE, + NakadiAuditLogPublisher.ActionType.DELETED, + id, + user); } catch (final RepositoryProblemException e) { LOG.error("DB error occurred when deleting storage", e); throw new InternalNakadiException(e.getMessage()); @@ -159,12 +182,12 @@ public void deleteStorage(final String id) public Storage setDefaultStorage(final String defaultStorageId) throws NoSuchStorageException, InternalNakadiException { final Storage storage = getStorage(defaultStorageId); - try { - curator.setData().forPath(ZK_TIMELINES_DEFAULT_STORAGE, defaultStorageId.getBytes(Charsets.UTF_8)); - } catch (final Exception e) { - LOG.error("Error while setting default storage in zk {} ", e.getMessage(), e); - throw new InternalNakadiException("Error while setting default storage in zk"); - } + try { + curator.setData().forPath(ZK_TIMELINES_DEFAULT_STORAGE, defaultStorageId.getBytes(Charsets.UTF_8)); + } catch (final Exception e) { + LOG.error("Error while setting default storage in zk {} ", e.getMessage(), e); + throw new InternalNakadiException("Error while setting default storage in zk"); + } return storage; } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index b46fcd3868..785f800865 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -2,8 +2,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +44,7 @@ import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorOperationsService; import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.NakadiAuditLogPublisher; import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; @@ -55,7 +54,6 @@ import org.zalando.nakadi.util.SubscriptionsUriHelper; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; - import javax.annotation.Nullable; import java.time.Duration; import java.util.ArrayList; @@ -86,6 +84,7 @@ public class SubscriptionService { private final String subLogEventType; private final SubscriptionTimeLagService subscriptionTimeLagService; private final AuthorizationValidator authorizationValidator; + private final NakadiAuditLogPublisher nakadiAuditLogPublisher; @Autowired public SubscriptionService(final SubscriptionDbRepository subscriptionRepository, @@ -99,6 +98,7 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository final FeatureToggleService featureToggleService, final SubscriptionTimeLagService subscriptionTimeLagService, @Value("${nakadi.kpi.event-types.nakadiSubscriptionLog}") final String subLogEventType, + final NakadiAuditLogPublisher nakadiAuditLogPublisher, final AuthorizationValidator authorizationValidator) { this.subscriptionRepository = subscriptionRepository; this.subscriptionClientFactory = subscriptionClientFactory; @@ -111,10 +111,11 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository this.featureToggleService = featureToggleService; this.subscriptionTimeLagService = subscriptionTimeLagService; this.subLogEventType = subLogEventType; + this.nakadiAuditLogPublisher = nakadiAuditLogPublisher; this.authorizationValidator = authorizationValidator; } - public Subscription createSubscription(final SubscriptionBase subscriptionBase) + public Subscription createSubscription(final SubscriptionBase subscriptionBase, final Optional user) throws TooManyPartitionsException, RepositoryProblemException, DuplicatedSubscriptionException, NoSuchEventTypeException, InconsistentStateException, WrongInitialCursorsException, DbWriteOperationsBlockedException { @@ -131,10 +132,15 @@ public Subscription createSubscription(final SubscriptionBase subscriptionBase) .put("subscription_id", subscription.getId()) .put("status", "created")); + nakadiAuditLogPublisher.publish(Optional.empty(), Optional.of(subscription), + NakadiAuditLogPublisher.ResourceType.SUBSCRIPTION, NakadiAuditLogPublisher.ActionType.CREATED, + subscription.getId(), user); + return subscription; } - public Subscription updateSubscription(final String subscriptionId, final SubscriptionBase newValue) + public Subscription updateSubscription(final String subscriptionId, final SubscriptionBase newValue, + final Optional user) throws NoSuchSubscriptionException, SubscriptionUpdateConflictException { if (featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_DB_WRITE_OPERATIONS)) { throw new DbWriteOperationsBlockedException("Cannot create subscription: write operations on DB " + @@ -145,10 +151,14 @@ public Subscription updateSubscription(final String subscriptionId, final Subscr authorizationValidator.authorizeSubscriptionAdmin(old); subscriptionValidationService.validateSubscriptionChange(old, newValue); - old.mergeFrom(newValue); - old.setUpdatedAt(new DateTime(DateTimeZone.UTC)); - subscriptionRepository.updateSubscription(old); - return old; + final Subscription updated = old.mergeFrom(newValue); + subscriptionRepository.updateSubscription(updated); + + nakadiAuditLogPublisher.publish(Optional.of(old), Optional.of(updated), + NakadiAuditLogPublisher.ResourceType.SUBSCRIPTION, NakadiAuditLogPublisher.ActionType.UPDATED, + updated.getId(), user); + + return updated; } public Subscription getExistingSubscription(final SubscriptionBase subscriptionBase) @@ -197,7 +207,7 @@ public Subscription getSubscription(final String subscriptionId) return subscriptionRepository.getSubscription(subscriptionId); } - public void deleteSubscription(final String subscriptionId) + public void deleteSubscription(final String subscriptionId, final Optional user) throws DbWriteOperationsBlockedException, NoSuchSubscriptionException, NoSuchEventTypeException, ServiceTemporarilyUnavailableException, InternalNakadiException { if (featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_DB_WRITE_OPERATIONS)) { @@ -216,6 +226,10 @@ public void deleteSubscription(final String subscriptionId) nakadiKpiPublisher.publish(subLogEventType, () -> new JSONObject() .put("subscription_id", subscriptionId) .put("status", "deleted")); + + nakadiAuditLogPublisher.publish(Optional.of(subscription), Optional.empty(), + NakadiAuditLogPublisher.ResourceType.SUBSCRIPTION, NakadiAuditLogPublisher.ActionType.DELETED, + subscription.getId(), user); } public ItemsWrapper getSubscriptionStat(final String subscriptionId, diff --git a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java index e16e1a0f8c..b43fc67661 100644 --- a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java +++ b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java @@ -7,6 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionException; import org.springframework.transaction.support.TransactionTemplate; @@ -49,6 +50,7 @@ import org.zalando.nakadi.repository.db.TimelineDbRepository; import org.zalando.nakadi.service.AdminService; import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.NakadiAuditLogPublisher; import org.zalando.nakadi.service.NakadiCursorComparator; import javax.annotation.Nullable; @@ -75,6 +77,7 @@ public class TimelineService { private final AdminService adminService; private final FeatureToggleService featureToggleService; private final String compactedStorageName; + private final NakadiAuditLogPublisher auditLogPublisher; @Autowired public TimelineService(final EventTypeCache eventTypeCache, @@ -87,7 +90,8 @@ public TimelineService(final EventTypeCache eventTypeCache, @Qualifier("default_storage") final DefaultStorage defaultStorage, final AdminService adminService, final FeatureToggleService featureToggleService, - @Value("${nakadi.timelines.storage.compacted}") final String compactedStorageName) { + @Value("${nakadi.timelines.storage.compacted}") final String compactedStorageName, + @Lazy final NakadiAuditLogPublisher auditLogPublisher) { this.eventTypeCache = eventTypeCache; this.storageDbRepository = storageDbRepository; this.timelineSync = timelineSync; @@ -99,9 +103,10 @@ public TimelineService(final EventTypeCache eventTypeCache, this.adminService = adminService; this.featureToggleService = featureToggleService; this.compactedStorageName = compactedStorageName; + this.auditLogPublisher = auditLogPublisher; } - public void createTimeline(final String eventTypeName, final String storageId) + public void createTimeline(final String eventTypeName, final String storageId, final Optional user) throws AccessDeniedException, TimelineException, TopicRepositoryException, InconsistentStateException, RepositoryProblemException, DbWriteOperationsBlockedException { if (featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_DB_WRITE_OPERATIONS)) { @@ -135,6 +140,14 @@ public void createTimeline(final String eventTypeName, final String storageId) activeTimeline.getOrder() + 1, storage, newTopic, new Date()); switchTimelines(activeTimeline, nextTimeline); + + auditLogPublisher.publish( + Optional.empty(), + Optional.of(nextTimeline), + NakadiAuditLogPublisher.ResourceType.TIMELINE, + NakadiAuditLogPublisher.ActionType.CREATED, + String.valueOf(nextTimeline.getId()), + user); } catch (final TopicCreationException | TopicConfigException | ServiceTemporarilyUnavailableException | InternalNakadiException e) { throw new TimelineException("Internal service error", e); diff --git a/src/main/java/org/zalando/nakadi/util/RequestUtils.java b/src/main/java/org/zalando/nakadi/util/RequestUtils.java new file mode 100644 index 0000000000..a18d715aaa --- /dev/null +++ b/src/main/java/org/zalando/nakadi/util/RequestUtils.java @@ -0,0 +1,12 @@ +package org.zalando.nakadi.util; + +import org.springframework.web.context.request.NativeWebRequest; + +import java.security.Principal; +import java.util.Optional; + +public class RequestUtils { + public static Optional getUser(final NativeWebRequest request) { + return Optional.ofNullable(request.getUserPrincipal()).map(Principal::getName); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7454da00f5..39edc3771c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -112,6 +112,11 @@ nakadi: timelines.storage: default: "default" compacted: "default" + audit: + eventType: "nakadi.audit.log" + owningApplication: "stups_nakadi" + authDataType: "*" + authValue: "*" kpi: config: batch-collection-timeout: 1000 @@ -126,7 +131,7 @@ nakadi: nakadiSubscriptionLog: "nakadi.subscription.log" nakadiBatchPublished: "nakadi.batch.published" nakadiDataStreamed: "nakadi.data.streamed" - owning_application: "nakadi" + owning_application: "stups_nakadi" hasher.salt: "salt" twintip: @@ -150,6 +155,7 @@ nakadi: REMOTE_TOKENINFO: true KPI_COLLECTION: true DISABLE_DB_WRITE_OPERATIONS: false + AUDIT_LOG_COLLECTION: true kpi: config: stream-data-collection-frequency-ms: 100 diff --git a/src/main/resources/audit_event_type.json b/src/main/resources/audit_event_type.json new file mode 100644 index 0000000000..5554f7f116 --- /dev/null +++ b/src/main/resources/audit_event_type.json @@ -0,0 +1,50 @@ +{ + "name": "event_type_name_placeholder", + "owning_application": "owning_application_placeholder", + "category": "data", + "enrichment_strategies": [ + "metadata_enrichment" + ], + "partition_strategy": "hash", + "partition_key_fields": [ + "resource_id" + ], + "cleanup_policy": "delete", + "ordering_key_fields": [], + "ordering_instance_ids": [], + "schema": { + "type": "json_schema", + "schema": "{\"properties\": {\"previous_object\": { \"type\": \"object\", \"description\": \"When modifying an already existent entity, its value is captured in this field as a JSON object. So, for example, when changing an Event Type attribute, this field contains the entire state before the changes are applied\"},\"previous_text\": { \"type\": \"string\", \"description\": \"Contains the same information as the field `previous_object` but as text, since the data lake stores a flat map of all the fields in the object, destroying information about its structure. Storing the text makes sure that the original data is not lost by any transformation that the data lake may apply on the data\"},\"new_object\": { \"type\": \"object\", \"description\": \"New value submitted by the user\"},\"new_text\": { \"type\": \"string\", \"description\": \"New value submitted by the user as text, in order to preserve the structure, if needed\"},\"resource_type\": { \"x-extensible-enum\": [ \"event_type\", \"subscription\", \"timeline\", \"storage\", \"feature\", \"admins\", \"cursors\", \"blacklist_entry\" ], \"type\":\"string\" },\"resource_id\": { \"description\": \"Resource identifier. Together with `resource_type` allows for the selection of a resource\", \"type\": \"string\"},\"user\": { \"description\": \"User or service that requested the changes\", \"type\": \"string\"},\"user_hash\": { \"description\": \"User hashed\", \"type\": \"string\"}},\"required\": [\"user\", \"user_hash\", \"resource_id\", \"resource_type\"]}" + }, + "default_statistic": { + "messages_per_minute": 100, + "message_size": 100, + "read_parallelism": 10, + "write_parallelism": 10 + }, + "options": { + "retention_time": 345600000 + }, + "compatibility_mode": "forward", + "audience": "company-internal", + "authorization": { + "admins": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ], + "readers": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ], + "writers": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ] + } +} \ No newline at end of file diff --git a/src/main/resources/kpi_event_types.json b/src/main/resources/kpi_event_types.json index 2763871533..11300204c7 100644 --- a/src/main/resources/kpi_event_types.json +++ b/src/main/resources/kpi_event_types.json @@ -1,7 +1,7 @@ [ { "name": "nakadi.batch.published", - "owning_application": "owning_application", + "owning_application": "owning_application_placeholder", "category": "business", "enrichment_strategies": [ "metadata_enrichment" @@ -29,7 +29,7 @@ }, { "name": "nakadi.data.streamed", - "owning_application": "owning_application", + "owning_application": "owning_application_placeholder", "category": "business", "enrichment_strategies": [ "metadata_enrichment" @@ -65,7 +65,7 @@ }, { "name": "nakadi.access.log", - "owning_application": "owning_application", + "owning_application": "owning_application_placeholder", "category": "business", "enrichment_strategies": [ "metadata_enrichment" @@ -100,7 +100,7 @@ }, { "name": "nakadi.event.type.log", - "owning_application": "owning_application", + "owning_application": "owning_application_placeholder", "category": "business", "enrichment_strategies": [ "metadata_enrichment" @@ -136,7 +136,7 @@ }, { "name": "nakadi.subscription.log", - "owning_application": "owning_application", + "owning_application": "owning_application_placeholder", "category": "business", "enrichment_strategies": [ "metadata_enrichment" diff --git a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java index 8fca12851e..8e584e3d34 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java +++ b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java @@ -29,12 +29,13 @@ import org.zalando.nakadi.service.AuthorizationValidator; import org.zalando.nakadi.service.EventTypeService; import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.NakadiAuditLogPublisher; import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.service.timeline.TimelineSync; +import org.zalando.nakadi.service.validation.EventTypeOptionsValidator; import org.zalando.nakadi.util.UUIDGenerator; import org.zalando.nakadi.utils.TestUtils; -import org.zalando.nakadi.service.validation.EventTypeOptionsValidator; import org.zalando.nakadi.validation.SchemaEvolutionService; import org.zalando.problem.Problem; import uk.co.datumedge.hamcrest.json.SameJSONAs; @@ -83,6 +84,7 @@ public class EventTypeControllerTestCase { protected final AuthorizationValidator authorizationValidator = mock(AuthorizationValidator.class); protected final AdminService adminService = mock(AdminService.class); protected final NakadiKpiPublisher nakadiKpiPublisher = mock(NakadiKpiPublisher.class); + protected final NakadiAuditLogPublisher nakadiAuditLogPublisher = mock(NakadiAuditLogPublisher.class); protected MockMvc mockMvc; @@ -110,8 +112,9 @@ public void init() throws Exception { final EventTypeService eventTypeService = new EventTypeService(eventTypeRepository, timelineService, partitionResolver, enrichment, subscriptionRepository, schemaEvolutionService, partitionsCalculator, featureToggleService, authorizationValidator, timelineSync, transactionTemplate, nakadiSettings, - nakadiKpiPublisher, "et-log-event-type", eventTypeOptionsValidator, adminService); - final EventTypeController controller = new EventTypeController(eventTypeService,featureToggleService, + nakadiKpiPublisher, "et-log-event-type", nakadiAuditLogPublisher, eventTypeOptionsValidator, + adminService); + final EventTypeController controller = new EventTypeController(eventTypeService, featureToggleService, adminService, nakadiSettings); doReturn(randomUUID).when(uuid).randomUUID(); diff --git a/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java index d83ec8baa7..8f64144053 100644 --- a/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/PostSubscriptionControllerTest.java @@ -108,7 +108,7 @@ public void whenPostValidSubscriptionThenOk() throws Exception { final Subscription subscription = new Subscription("123", createdAt, createdAt, subscriptionBase); when(subscriptionService.getExistingSubscription(any())).thenThrow(new NoSuchSubscriptionException("", null)); - when(subscriptionService.createSubscription(any())).thenReturn(subscription); + when(subscriptionService.createSubscription(any(), any())).thenReturn(subscription); postSubscription(subscriptionBase) .andExpect(status().isCreated()) @@ -157,7 +157,7 @@ public void whenEventTypesIsEmptyThenUnprocessableEntity() throws Exception { @Test public void whenMoreThanAllowedEventTypeThenUnprocessableEntity() throws Exception { when(subscriptionService.getExistingSubscription(any())).thenThrow(new NoSuchSubscriptionException("", null)); - when(subscriptionService.createSubscription(any())).thenThrow(new TooManyPartitionsException("msg")); + when(subscriptionService.createSubscription(any(), any())).thenThrow(new TooManyPartitionsException("msg")); final SubscriptionBase subscriptionBase = builder().buildSubscriptionBase(); final Problem expectedProblem = Problem.valueOf(UNPROCESSABLE_ENTITY, "msg"); @@ -183,7 +183,7 @@ public void whenWrongStartFromThenBadRequest() throws Exception { public void whenEventTypeDoesNotExistThenUnprocessableEntity() throws Exception { final SubscriptionBase subscriptionBase = builder().buildSubscriptionBase(); when(subscriptionService.getExistingSubscription(any())).thenThrow(new NoSuchSubscriptionException("", null)); - when(subscriptionService.createSubscription(any())).thenThrow(new NoSuchEventTypeException("msg")); + when(subscriptionService.createSubscription(any(), any())).thenThrow(new NoSuchEventTypeException("msg")); final Problem expectedProblem = Problem.valueOf(UNPROCESSABLE_ENTITY, "msg"); checkForProblem(postSubscription(subscriptionBase), expectedProblem); @@ -196,7 +196,7 @@ public void whenSubscriptionExistsThenReturnIt() throws Exception { final Subscription existingSubscription = new Subscription("123", createdAt, createdAt, subscriptionBase); when(subscriptionService.getExistingSubscription(any())).thenReturn(existingSubscription); - when(subscriptionService.createSubscription(any())).thenThrow(new NoSuchEventTypeException("msg")); + when(subscriptionService.createSubscription(any(), any())).thenThrow(new NoSuchEventTypeException("msg")); postSubscription(subscriptionBase) .andExpect(status().isOk()) diff --git a/src/test/java/org/zalando/nakadi/controller/StoragesControllerTest.java b/src/test/java/org/zalando/nakadi/controller/StoragesControllerTest.java index ffce062e61..e4b41bf3e5 100644 --- a/src/test/java/org/zalando/nakadi/controller/StoragesControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/StoragesControllerTest.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; @@ -67,7 +68,7 @@ public void testListStorages() throws Exception { @Test public void testDeleteUnusedStorage() throws Exception { - doNothing().when(storageService).deleteStorage("s1"); + doNothing().when(storageService).deleteStorage("s1", Optional.empty()); when(adminService.isAdmin(AuthorizationService.Operation.WRITE)).thenReturn(true); mockMvc.perform(delete("/storages/s1") .principal(mockPrincipal("nakadi"))) @@ -77,7 +78,7 @@ public void testDeleteUnusedStorage() throws Exception { @Test public void testPostStorage() throws Exception { final JSONObject json = createJsonKafkaStorage("test_storage"); - doNothing().when(storageService).createStorage(any()); + doNothing().when(storageService).createStorage(any(), any()); when(adminService.isAdmin(AuthorizationService.Operation.WRITE)).thenReturn(true); mockMvc.perform(post("/storages") .contentType(APPLICATION_JSON) diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index 17b8d8a3ab..d65e32010a 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -37,6 +37,7 @@ import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorOperationsService; import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.NakadiAuditLogPublisher; import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.subscription.SubscriptionService; import org.zalando.nakadi.service.subscription.model.Partition; @@ -122,10 +123,11 @@ public SubscriptionControllerTest() throws Exception { cursorOperationsService = mock(CursorOperationsService.class); cursorConverter = mock(CursorConverter.class); final NakadiKpiPublisher nakadiKpiPublisher = mock(NakadiKpiPublisher.class); + final NakadiAuditLogPublisher nakadiAuditLogPublisher = mock(NakadiAuditLogPublisher.class); final SubscriptionService subscriptionService = new SubscriptionService(subscriptionRepository, zkSubscriptionClientFactory, timelineService, eventTypeRepository, null, cursorConverter, cursorOperationsService, nakadiKpiPublisher, featureToggleService, null, - "subscription_log_et", mock(AuthorizationValidator.class)); + "subscription_log_et", nakadiAuditLogPublisher, mock(AuthorizationValidator.class)); final SubscriptionController controller = new SubscriptionController(subscriptionService); final ApplicationService applicationService = mock(ApplicationService.class); doReturn(true).when(applicationService).exists(any()); diff --git a/src/test/java/org/zalando/nakadi/controller/TimelinesControllerTest.java b/src/test/java/org/zalando/nakadi/controller/TimelinesControllerTest.java index 59bcb52ba6..9daabc3c87 100644 --- a/src/test/java/org/zalando/nakadi/controller/TimelinesControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/TimelinesControllerTest.java @@ -15,8 +15,8 @@ import org.zalando.nakadi.domain.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.security.ClientResolver; -import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.util.PrincipalMockFactory; import org.zalando.nakadi.utils.TestUtils; import org.zalando.nakadi.view.TimelineView; @@ -49,7 +49,7 @@ public TimelinesControllerTest() { @Test public void whenPostTimelineThenCreated() throws Exception { - Mockito.doNothing().when(timelineService).createTimeline(Mockito.any(), Mockito.any()); + Mockito.doNothing().when(timelineService).createTimeline(Mockito.any(), Mockito.any(), Mockito.any()); mockMvc.perform(MockMvcRequestBuilders.post("/event-types/event_type/timelines") .contentType(MediaType.APPLICATION_JSON) .content(new JSONObject().put("storage_id", "default").toString()) diff --git a/src/test/java/org/zalando/nakadi/security/UsernameHasherTest.java b/src/test/java/org/zalando/nakadi/security/UsernameHasherTest.java new file mode 100644 index 0000000000..bf47440529 --- /dev/null +++ b/src/test/java/org/zalando/nakadi/security/UsernameHasherTest.java @@ -0,0 +1,17 @@ +package org.zalando.nakadi.security; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class UsernameHasherTest { + + @Test + public void teastHash() { + final UsernameHasher usernameHasher = new UsernameHasher("123"); + assertThat( + usernameHasher.hash("abc"), + equalTo("dd130a849d7b29e5541b05d2f7f86a4acd4f1ec598c1c9438783f56bc4f0ff80")); + } +} diff --git a/src/test/java/org/zalando/nakadi/service/AdminServiceTest.java b/src/test/java/org/zalando/nakadi/service/AdminServiceTest.java index 4d22fb86de..08383af94c 100644 --- a/src/test/java/org/zalando/nakadi/service/AdminServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/AdminServiceTest.java @@ -13,6 +13,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; @@ -62,8 +63,9 @@ public AdminServiceTest() { this.authorizationService = mock(AuthorizationService.class); this.nakadiSettings = mock(NakadiSettings.class); this.featureToggleService = mock(FeatureToggleService.class); + final NakadiAuditLogPublisher auditLogPublisher = mock(NakadiAuditLogPublisher.class); this.adminService = new AdminService(authorizationDbRepository, authorizationService, - featureToggleService, nakadiSettings); + featureToggleService, nakadiSettings, auditLogPublisher); this.adminList = new ArrayList<>(Arrays.asList(permAdminUser1, permAdminService1, permAdminService2, permReadUser1, permReadService1, permReadService2, permWriteUser1, permWriteService1, permWriteService2)); @@ -79,7 +81,7 @@ public AdminServiceTest() { @Test public void whenUpdateAdminsThenOk() { when(authorizationDbRepository.listAdmins()).thenReturn(adminList); - adminService.updateAdmins(newAuthz.toPermissionsList("nakadi")); + adminService.updateAdmins(newAuthz.toPermissionsList("nakadi"), Optional.empty()); } @Test @@ -89,7 +91,7 @@ public void whenUpdateThenDefaultAdminIsNotAddedToDB() { final List newList = new ArrayList<>(adminList); newList.addAll(defaultAdminPermissions); - adminService.updateAdmins(newList); + adminService.updateAdmins(newList, Optional.empty()); verify(authorizationDbRepository, times(0)).createPermission(any()); verify(authorizationDbRepository, times(0)).deletePermission(any()); } @@ -99,7 +101,7 @@ public void whenUpdateThenDefaultAdminIsNotDeletedFromDB() { when(nakadiSettings.getDefaultAdmin()).thenReturn(defaultAdmin); when(authorizationDbRepository.listAdmins()).thenReturn(adminList); - adminService.updateAdmins(adminList); + adminService.updateAdmins(adminList, Optional.empty()); verify(authorizationDbRepository, times(0)).createPermission(any()); verify(authorizationDbRepository, times(0)).deletePermission(any()); } @@ -117,7 +119,7 @@ public void whenAddNewAdminCallCreatePermission() { newList.add(new Permission("nakadi", AuthorizationService.Operation.READ, new ResourceAuthorizationAttribute("user", "user42"))); - adminService.updateAdmins(newList); + adminService.updateAdmins(newList, Optional.empty()); verify(authorizationDbRepository).update(addCaptor.capture(), deleteCaptor.capture()); assertEquals(1, addCaptor.getValue().size()); @@ -136,7 +138,7 @@ public void whenDeleteAdminCallDeletePermission() { final List newList = new ArrayList<>(adminList); newList.remove(permReadUser1); - adminService.updateAdmins(newList); + adminService.updateAdmins(newList, Optional.empty()); verify(authorizationDbRepository).update(addCaptor.capture(), deleteCaptor.capture()); assertEquals(0, addCaptor.getValue().size()); diff --git a/src/test/java/org/zalando/nakadi/service/CursorsServiceTest.java b/src/test/java/org/zalando/nakadi/service/CursorsServiceTest.java index d11937c616..5a35c54c3d 100644 --- a/src/test/java/org/zalando/nakadi/service/CursorsServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/CursorsServiceTest.java @@ -9,6 +9,7 @@ import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import java.util.Collections; +import java.util.Optional; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; @@ -23,7 +24,7 @@ public class CursorsServiceTest { public void setup() { authorizationValidator = mock(AuthorizationValidator.class); service = new CursorsService(mock(SubscriptionDbRepository.class), mock(SubscriptionCache.class), null, null, - null, null, null, null, authorizationValidator); + null, null, null, null, authorizationValidator, null); } @Test(expected = AccessDeniedException.class) @@ -31,7 +32,7 @@ public void whenResetCursorsThenAdminAccessChecked() throws Exception { doThrow(new AccessDeniedException(AuthorizationService.Operation.ADMIN, new ResourceImpl("", ResourceImpl.SUBSCRIPTION_RESOURCE, null, null))) .when(authorizationValidator).authorizeSubscriptionAdmin(any()); - service.resetCursors("test", Collections.emptyList()); + service.resetCursors("test", Collections.emptyList(), Optional.empty()); } @Test(expected = AccessDeniedException.class) diff --git a/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java b/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java index e572a5b5f1..287a67906a 100644 --- a/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java @@ -13,10 +13,10 @@ import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.enrichment.Enrichment; -import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.ConflictException; import org.zalando.nakadi.exceptions.runtime.EventTypeDeletionException; import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException; +import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.TopicCreationException; import org.zalando.nakadi.partitioning.PartitionResolver; import org.zalando.nakadi.repository.EventTypeRepository; @@ -64,6 +64,7 @@ public class EventTypeServiceTest { private final TransactionTemplate transactionTemplate = mock(TransactionTemplate.class); private final AuthorizationValidator authorizationValidator = mock(AuthorizationValidator.class); private final NakadiKpiPublisher nakadiKpiPublisher = mock(NakadiKpiPublisher.class); + private final NakadiAuditLogPublisher nakadiAuditLogPublisher = mock(NakadiAuditLogPublisher.class); private final AdminService adminService = mock(AdminService.class); private EventTypeService eventTypeService; @@ -74,7 +75,8 @@ public void setUp() { eventTypeService = new EventTypeService(eventTypeRepository, timelineService, partitionResolver, enrichment, subscriptionDbRepository, schemaEvolutionService, partitionsCalculator, featureToggleService, authorizationValidator, timelineSync, transactionTemplate, nakadiSettings, nakadiKpiPublisher, - KPI_ET_LOG_EVENT_TYPE, eventTypeOptionsValidator, adminService); + KPI_ET_LOG_EVENT_TYPE, nakadiAuditLogPublisher, eventTypeOptionsValidator, + adminService); when(transactionTemplate.execute(any())).thenAnswer(invocation -> { final TransactionCallback callback = (TransactionCallback) invocation.getArguments()[0]; return callback.doInTransaction(null); @@ -94,7 +96,7 @@ public void testFailToDeleteEventType() throws Exception { .listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 0, 1); doReturn(topicsToDelete).when(timelineService).deleteAllTimelinesForEventType(eventType.getName()); try { - eventTypeService.delete(eventType.getName()); + eventTypeService.delete(eventType.getName(), Optional.empty()); } catch (final EventTypeDeletionException e) { // check that topics are not deleted in Kafka verifyZeroInteractions(topicsToDelete); @@ -113,7 +115,7 @@ public void whenSubscriptionsExistThenCantDeleteEventType() throws Exception { .when(subscriptionDbRepository) .listSubscriptions(ImmutableSet.of(eventType.getName()), Optional.empty(), 0, 1); - eventTypeService.delete(eventType.getName()); + eventTypeService.delete(eventType.getName(), Optional.empty()); } @Test @@ -128,7 +130,7 @@ public void testFeatureToggleAllowsDeleteEventTypeWithSubscriptions() throws Exc when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS)) .thenReturn(true); - eventTypeService.delete(eventType.getName()); + eventTypeService.delete(eventType.getName(), Optional.empty()); // no exception should be thrown } @@ -140,7 +142,7 @@ public void testFeatureToggleDisableLogCompaction() throws Exception { when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_LOG_COMPACTION)) .thenReturn(true); - eventTypeService.create(eventType); + eventTypeService.create(eventType, Optional.empty()); } @Test @@ -149,7 +151,7 @@ public void shouldRemoveEventTypeWhenTimelineCreationFails() throws Exception { when(timelineService.createDefaultTimeline(any(), anyInt())) .thenThrow(new TopicCreationException("Failed to create topic")); try { - eventTypeService.create(eventType); + eventTypeService.create(eventType, Optional.empty()); fail("should throw TopicCreationException"); } catch (final TopicCreationException e) { // expected @@ -161,7 +163,7 @@ public void shouldRemoveEventTypeWhenTimelineCreationFails() throws Exception { @Test public void whenEventTypeCreatedThenKPIEventSubmitted() throws Exception { final EventType et = buildDefaultEventType(); - eventTypeService.create(et); + eventTypeService.create(et, Optional.empty()); checkKPIEventSubmitted(nakadiKpiPublisher, KPI_ET_LOG_EVENT_TYPE, new JSONObject() .put("event_type", et.getName()) @@ -177,7 +179,7 @@ public void whenEventTypeUpdatedThenKPIEventSubmitted() throws Exception { when(eventTypeRepository.findByName(et.getName())).thenReturn(et); when(schemaEvolutionService.evolve(any(), any())).thenReturn(et); - eventTypeService.update(et.getName(), et); + eventTypeService.update(et.getName(), et, Optional.empty()); checkKPIEventSubmitted(nakadiKpiPublisher, KPI_ET_LOG_EVENT_TYPE, new JSONObject() .put("event_type", et.getName()) @@ -192,7 +194,7 @@ public void whenEventTypeDeletedThenKPIEventSubmitted() throws Exception { final EventType et = buildDefaultEventType(); when(eventTypeRepository.findByNameO(et.getName())).thenReturn(Optional.of(et)); - eventTypeService.delete(et.getName()); + eventTypeService.delete(et.getName(), Optional.empty()); checkKPIEventSubmitted(nakadiKpiPublisher, KPI_ET_LOG_EVENT_TYPE, new JSONObject() .put("event_type", et.getName()) diff --git a/src/test/java/org/zalando/nakadi/service/NakadiAuditLogPublisherTest.java b/src/test/java/org/zalando/nakadi/service/NakadiAuditLogPublisherTest.java new file mode 100644 index 0000000000..4253adfb2f --- /dev/null +++ b/src/test/java/org/zalando/nakadi/service/NakadiAuditLogPublisherTest.java @@ -0,0 +1,97 @@ +package org.zalando.nakadi.service; + +import org.joda.time.DateTime; +import org.json.JSONObject; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.zalando.nakadi.config.JsonConfig; +import org.zalando.nakadi.domain.EventType; +import org.zalando.nakadi.security.UsernameHasher; + +import java.util.Optional; + +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; +import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs; + +public class NakadiAuditLogPublisherTest { + + @Test + public void testPublishAuditLog() { + final EventsProcessor processor = mock(EventsProcessor.class); + final FeatureToggleService toggle = mock(FeatureToggleService.class); + + when(toggle.isFeatureEnabled(FeatureToggleService.Feature.AUDIT_LOG_COLLECTION)).thenReturn(true); + + final NakadiAuditLogPublisher publisher = new NakadiAuditLogPublisher( + toggle, + processor, + new JsonConfig().jacksonObjectMapper(), + new UsernameHasher("salt"), + "audit-event-type"); + + final DateTime now = DateTime.parse("2019-01-16T13:44:16.819Z"); + final EventType et = buildDefaultEventType(); + et.setName("new-et-name"); + et.setCreatedAt(now); + et.setUpdatedAt(now); + et.getSchema().setCreatedAt(now); + publisher.publish(Optional.empty(), Optional.of(et), + NakadiAuditLogPublisher.ResourceType.EVENT_TYPE, + NakadiAuditLogPublisher.ActionType.CREATED, "et-name", Optional.of("user-name")); + + final ArgumentCaptor supplierCaptor = ArgumentCaptor.forClass(JSONObject.class); + verify(processor, times(1)).enrichAndSubmit(eq("audit-event-type"), supplierCaptor.capture()); + + assertThat(new JSONObject("{" + + "\"data_op\":\"C\"," + + "\"data\":{" + + "\"new_object\":{" + + "\"schema\":{" + + "\"schema\":\"{ \\\"properties\\\": { \\\"foo\\\": { \\\"type\\\": \\\"string\\\"" + + " } } }\"," + + "\"created_at\":\"2019-01-16T13:44:16.819Z\"," + + "\"type\":\"json_schema\"," + + "\"version\":\"1.0.0\"" + + "}," + + "\"compatibility_mode\":\"compatible\"," + + "\"ordering_key_fields\":[]," + + "\"created_at\":\"2019-01-16T13:44:16.819Z\"," + + "\"cleanup_policy\":\"delete\"," + + "\"ordering_instance_ids\":[]," + + "\"authorization\":null," + + "\"partition_key_fields\":[]," + + "\"updated_at\":\"2019-01-16T13:44:16.819Z\"," + + "\"default_statistic\":null," + + "\"name\":\"new-et-name\"," + + "\"options\":{\"retention_time\":172800000}," + + "\"partition_strategy\":\"random\"," + + "\"owning_application\":\"event-producer-application\"," + + "\"enrichment_strategies\":[]," + + "\"category\":\"undefined\"" + + "}," + + "\"new_text\":\"{\\\"name\\\":\\\"new-et-name\\\",\\\"owning_application\\\":\\\"event" + + "-producer-application\\\",\\\"category\\\":\\\"undefined\\\",\\\"enrichment_strategies\\\"" + + ":[],\\\"partition_strategy\\\":\\\"random\\\",\\\"partition_key_fields\\\":[],\\\"cleanup" + + "_policy\\\":\\\"delete\\\",\\\"ordering_key_fields\\\":[],\\\"ordering_instance_ids\\\":[" + + "],\\\"schema\\\":{\\\"type\\\":\\\"json_schema\\\",\\\"schema\\\":\\\"{ \\\\\\\"propertie" + + "s\\\\\\\": { \\\\\\\"foo\\\\\\\": { \\\\\\\"type\\\\\\\": \\\\\\\"string\\\\\\\" } } }\\\"" + + ",\\\"version\\\":\\\"1.0.0\\\",\\\"created_at\\\":\\\"2019-01-16T13:44:16.819Z\\\"},\\\"d" + + "efault_statistic\\\":null,\\\"options\\\":{\\\"retention_time\\\":172800000},\\\"authoriz" + + "ation\\\":null,\\\"compatibility_mode\\\":\\\"compatible\\\",\\\"updated_at\\\":\\\"2019-" + + "01-16T13:44:16.819Z\\\",\\\"created_at\\\":\\\"2019-01-16T13:44:16.819Z\\\"}\"," + + "\"resource_type\":\"event_type\"," + + "\"resource_id\":\"et-name\"," + + "\"user\":\"user-name\"," + + "\"user_hash\":\"89bc5f7398509d3ce86c013c138e11357ff7f589fca9d58cfce443c27f81956c\"" + + "}," + + "\"data_type\":\"event_type\"}").toString(), + sameJSONAs(supplierCaptor.getValue().toString())); + } + +} \ No newline at end of file diff --git a/src/test/java/org/zalando/nakadi/service/NakadiKpiPublisherTest.java b/src/test/java/org/zalando/nakadi/service/NakadiKpiPublisherTest.java index aca24d6a83..5c8f35810e 100644 --- a/src/test/java/org/zalando/nakadi/service/NakadiKpiPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/NakadiKpiPublisherTest.java @@ -3,8 +3,8 @@ import org.json.JSONObject; import org.junit.Test; import org.mockito.Mockito; +import org.zalando.nakadi.security.UsernameHasher; -import java.security.MessageDigest; import java.util.function.Supplier; import static org.hamcrest.MatcherAssert.assertThat; @@ -14,14 +14,15 @@ public class NakadiKpiPublisherTest { private final FeatureToggleService featureToggleService = Mockito.mock(FeatureToggleService.class); private final EventsProcessor eventsProcessor = Mockito.mock(EventsProcessor.class); - private MessageDigest messageDigest; + private final UsernameHasher usernameHasher = new UsernameHasher("123"); @Test public void testPublishWithFeatureToggleOn() throws Exception { Mockito.when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.KPI_COLLECTION)) .thenReturn(true); final Supplier dataSupplier = () -> null; - new NakadiKpiPublisher(featureToggleService, eventsProcessor, "123").publish("test_et_name", dataSupplier); + new NakadiKpiPublisher(featureToggleService, eventsProcessor, usernameHasher) + .publish("test_et_name", dataSupplier); Mockito.verify(eventsProcessor).enrichAndSubmit("test_et_name", dataSupplier.get()); } @@ -31,7 +32,7 @@ public void testPublishWithFeatureToggleOff() throws Exception { Mockito.when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.KPI_COLLECTION)) .thenReturn(false); final Supplier dataSupplier = () -> null; - new NakadiKpiPublisher(featureToggleService, eventsProcessor, "123") + new NakadiKpiPublisher(featureToggleService, eventsProcessor, usernameHasher) .publish("test_et_name", dataSupplier); Mockito.verify(eventsProcessor, Mockito.never()).enrichAndSubmit("test_et_name", dataSupplier.get()); @@ -39,7 +40,8 @@ public void testPublishWithFeatureToggleOff() throws Exception { @Test public void testHash() throws Exception { - final NakadiKpiPublisher publisher = new NakadiKpiPublisher(featureToggleService, eventsProcessor, "123"); + final NakadiKpiPublisher publisher = new NakadiKpiPublisher(featureToggleService, eventsProcessor, + usernameHasher); assertThat(publisher.hash("application"), equalTo("befee725ab2ed3b17020112089a693ad8d8cfbf62b2442dcb5b89d66ce72391e")); } diff --git a/src/test/java/org/zalando/nakadi/service/StorageServiceTest.java b/src/test/java/org/zalando/nakadi/service/StorageServiceTest.java index 3be14501bb..594fb7b580 100644 --- a/src/test/java/org/zalando/nakadi/service/StorageServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/StorageServiceTest.java @@ -11,6 +11,8 @@ import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.utils.TestUtils; +import java.util.Optional; + import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -26,8 +28,10 @@ public class StorageServiceTest { public void setUp() { featureToggleService = mock(FeatureToggleService.class); storageDbRepository = mock(StorageDbRepository.class); + final NakadiAuditLogPublisher auditLogPublisher = mock(NakadiAuditLogPublisher.class); storageService = new StorageService(TestUtils.OBJECT_MAPPER, storageDbRepository, - new DefaultStorage(mock(Storage.class)), mock(ZooKeeperHolder.class), featureToggleService); + new DefaultStorage(mock(Storage.class)), mock(ZooKeeperHolder.class), featureToggleService, + auditLogPublisher); when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_DB_WRITE_OPERATIONS)) .thenReturn(false); } @@ -39,26 +43,29 @@ public void testCreateStorage() { when(storageDbRepository.createStorage(any())).thenReturn(dbReply); final JSONObject storage = createTestStorageJson("s1"); - storageService.createStorage(storage); + storageService.createStorage(storage, Optional.empty()); } @Test - public void testDeleteUnusedStorage() throws Exception { - storageService.deleteStorage("s3"); + public void testDeleteUnusedStorage() { + when(storageDbRepository.getStorage(any())).thenReturn(Optional.empty()); + storageService.deleteStorage("s3", Optional.empty()); } @Test(expected = StorageIsUsedException.class) - public void testDeleteStorageInUse() throws Exception { + public void testDeleteStorageInUse() { + when(storageDbRepository.getStorage(any())).thenReturn(Optional.empty()); doThrow(new StorageIsUsedException("", null)).when(storageDbRepository).deleteStorage("s"); - storageService.deleteStorage("s"); + storageService.deleteStorage("s", Optional.empty()); } @Test(expected = NoSuchStorageException.class) - public void testDeleteNonExistingStorage() throws Exception { + public void testDeleteNonExistingStorage() { + when(storageDbRepository.getStorage(any())).thenReturn(Optional.empty()); doThrow(new NoSuchStorageException("")).when(storageDbRepository).deleteStorage("s"); - storageService.deleteStorage("s"); + storageService.deleteStorage("s", Optional.empty()); } private JSONObject createTestStorageJson(final String id) { diff --git a/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java b/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java index ebb31038b1..827882b589 100644 --- a/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java @@ -19,6 +19,8 @@ import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.utils.RandomSubscriptionBuilder; +import java.util.Optional; + import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; @@ -46,6 +48,7 @@ public void setUp() throws Exception { final CursorOperationsService cursorOperationsService = mock(CursorOperationsService.class); final CursorConverter cursorConverter = mock(CursorConverter.class); final EventTypeRepository eventTypeRepository = mock(EventTypeRepository.class); + final NakadiAuditLogPublisher nakadiAuditLogPublisher = mock(NakadiAuditLogPublisher.class); subscriptionValidationService = mock(SubscriptionValidationService.class); nakadiKpiPublisher = mock(NakadiKpiPublisher.class); subscriptionRepository = mock(SubscriptionDbRepository.class); @@ -55,7 +58,7 @@ public void setUp() throws Exception { subscriptionService = new SubscriptionService(subscriptionRepository, zkSubscriptionClientFactory, timelineService, eventTypeRepository, subscriptionValidationService, cursorConverter, cursorOperationsService, nakadiKpiPublisher, featureToggleService, null, SUBSCRIPTION_LOG_ET, - authorizationValidator); + nakadiAuditLogPublisher, authorizationValidator); } @Test @@ -68,7 +71,7 @@ public void whenSubscriptionCreatedThenKPIEventSubmitted() { subscription.setUpdatedAt(subscription.getCreatedAt()); when(subscriptionRepository.createSubscription(subscriptionBase)).thenReturn(subscription); - subscriptionService.createSubscription(subscriptionBase); + subscriptionService.createSubscription(subscriptionBase, Optional.empty()); checkKPIEventSubmitted(nakadiKpiPublisher, SUBSCRIPTION_LOG_ET, new JSONObject() @@ -78,7 +81,8 @@ public void whenSubscriptionCreatedThenKPIEventSubmitted() { @Test public void whenSubscriptionDeletedThenKPIEventSubmitted() { - subscriptionService.deleteSubscription("my_subscription_id1"); + when(subscriptionRepository.getSubscription(any())).thenReturn(new Subscription()); + subscriptionService.deleteSubscription("my_subscription_id1", Optional.empty()); checkKPIEventSubmitted(nakadiKpiPublisher, SUBSCRIPTION_LOG_ET, new JSONObject() @@ -94,7 +98,7 @@ public void whenSubscriptionCreatedAuthorizationIsValidated() { doThrow(new UnableProcessException("fake")) .when(subscriptionValidationService).validateSubscription(eq(subscriptionBase)); - subscriptionService.createSubscription(subscriptionBase); + subscriptionService.createSubscription(subscriptionBase, Optional.empty()); } @Test(expected = AccessDeniedException.class) @@ -106,7 +110,7 @@ public void whenSubscriptionModifiedAuthorizationIsValidated() throws NoSuchSubs final SubscriptionBase subscriptionBase = RandomSubscriptionBuilder.builder() .buildSubscriptionBase(); - subscriptionService.updateSubscription("test", subscriptionBase); + subscriptionService.updateSubscription("test", subscriptionBase, Optional.empty()); } @Test(expected = AccessDeniedException.class) @@ -115,6 +119,6 @@ public void whenSubscriptionDeletedAuthorizationIsValidated() { new ResourceImpl("", ResourceImpl.SUBSCRIPTION_RESOURCE, null, null))) .when(authorizationValidator).authorizeSubscriptionAdmin(any()); - subscriptionService.deleteSubscription("test"); + subscriptionService.deleteSubscription("test", Optional.empty()); } } diff --git a/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java b/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java index 03b324918e..5c27f5167b 100644 --- a/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java @@ -12,9 +12,9 @@ import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.Storage; import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException; -import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; import org.zalando.nakadi.exceptions.runtime.NotFoundException; import org.zalando.nakadi.exceptions.runtime.TimelineException; import org.zalando.nakadi.exceptions.runtime.TimelinesNotSupportedException; @@ -25,11 +25,13 @@ import org.zalando.nakadi.repository.db.TimelineDbRepository; import org.zalando.nakadi.service.AdminService; import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.NakadiAuditLogPublisher; import org.zalando.nakadi.utils.EventTypeTestBuilder; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static java.util.stream.IntStream.range; @@ -47,10 +49,12 @@ public class TimelineServiceTest { private final TimelineDbRepository timelineDbRepository = mock(TimelineDbRepository.class); private final TopicRepositoryHolder topicRepositoryHolder = mock(TopicRepositoryHolder.class); private final FeatureToggleService featureToggleService = mock(FeatureToggleService.class); + private final NakadiAuditLogPublisher auditLogPublisher = mock(NakadiAuditLogPublisher.class); private final TimelineService timelineService = new TimelineService(eventTypeCache, storageDbRepository, mock(TimelineSync.class), mock(NakadiSettings.class), timelineDbRepository, topicRepositoryHolder, new TransactionTemplate(mock(PlatformTransactionManager.class)), - new DefaultStorage(new Storage()), adminService, featureToggleService, "compacted-storage"); + new DefaultStorage(new Storage()), adminService, featureToggleService, "compacted-storage", + auditLogPublisher); @Test(expected = NotFoundException.class) public void testGetTimelinesNotFound() throws Exception { @@ -149,7 +153,7 @@ public void whenCreateTimelineForCompactedEventTypeThenException() throws Except when(adminService.isAdmin(any())).thenReturn(true); - timelineService.createTimeline("et1", "st1"); + timelineService.createTimeline("et1", "st1", Optional.empty()); } }