Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1002 from zalando/aruha-2123-audit-log
Browse files Browse the repository at this point in the history
audit log
  • Loading branch information
v-stepanov authored Jan 28, 2019
2 parents cba71a5 + 6175bc6 commit c5ab265
Show file tree
Hide file tree
Showing 47 changed files with 869 additions and 168 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.zalando.nakadi.webservice;

import org.apache.http.HttpStatus;
import org.junit.Test;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import org.zalando.nakadi.utils.RandomSubscriptionBuilder;
import org.zalando.nakadi.webservice.utils.TestStreamingClient;

import java.io.IOException;

import static com.jayway.restassured.RestAssured.given;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END;
import static org.zalando.nakadi.utils.TestUtils.waitFor;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;

public class AuditLogAT extends BaseAT {

private static final String AUDIT_LOG_ET = "nakadi.audit.log";

@Test
public void testAuditLogEventTypeIsCreated() {
given()
.get("/event-types/" + AUDIT_LOG_ET)
.then()
.statusCode(HttpStatus.SC_OK)
.body("name", equalTo("nakadi.audit.log"))
.body("owning_application", equalTo("stups_nakadi"));
}

@Test(timeout = 10000L)
public void testAuditLogEventIsSent() throws IOException, InterruptedException {
// subscribe to audit log
final SubscriptionBase subscriptionBase = RandomSubscriptionBuilder.builder()
.withEventType(AUDIT_LOG_ET)
.withStartFrom(END)
.buildSubscriptionBase();
final Subscription subscription = createSubscription(subscriptionBase);
final TestStreamingClient client = new TestStreamingClient(URL, subscription.getId(), "batch_limit=1");
client.start();

// create event type
createEventType();

// expect event to be posted to audit log as a reaction to event type creation
waitFor(() -> assertThat(client.getBatches().size(), greaterThan(0)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.zalando.nakadi.service.AuthorizationValidator;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.CursorsService;
import org.zalando.nakadi.service.NakadiAuditLogPublisher;
import org.zalando.nakadi.service.SubscriptionCache;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.zk.NewZkSubscriptionClient;
Expand Down Expand Up @@ -112,7 +113,8 @@ public void before() throws Exception {
uuidGenerator = mock(UUIDGenerator.class);
when(uuidGenerator.isUUID(any())).thenReturn(true);
cursorsService = new CursorsService(subscriptionRepo, subscriptionCache, null, mock(NakadiSettings.class),
zkSubscriptionFactory, cursorConverter, uuidGenerator, null, mock(AuthorizationValidator.class));
zkSubscriptionFactory, cursorConverter, uuidGenerator, null, mock(AuthorizationValidator.class),
mock(NakadiAuditLogPublisher.class));

// Register cursors in converter
registerNakadiCursor(NakadiCursor.of(buildTimeline(etName, topic, CREATED_AT), P1, NEW_OFFSET));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public class SettingsControllerAT extends BaseAT {
private static final CuratorFramework CURATOR = ZookeeperTestUtils.createCurator(ZOOKEEPER_URL);

@Test
public void testBlockFlooder() throws Exception {
public void testBlacklistConsumerByEventType() throws Exception {
final EventType eventType = NakadiTestUtils.createEventType();
blacklist(eventType.getName(), BlacklistService.Type.CONSUMER_ET);
Assert.assertNotNull(CURATOR.checkExists()
.forPath("/nakadi/blacklist/consumers/event_types/" + eventType.getName()));
}

@Test
public void testUnblockFlooder() throws Exception {
public void testWhitelistConsumerByEventType() throws Exception {
final EventType eventType = NakadiTestUtils.createEventType();
blacklist(eventType.getName(), BlacklistService.Type.CONSUMER_ET);

Expand All @@ -46,7 +46,7 @@ public void testUnblockFlooder() throws Exception {
}

@Test
public void testGetFlooders() throws Exception {
public void testGetBlacklist() throws Exception {
final EventType eventType = NakadiTestUtils.createEventType();
blacklist(eventType.getName(), BlacklistService.Type.CONSUMER_ET);
TestUtils.waitFor(
Expand Down
17 changes: 10 additions & 7 deletions src/main/java/org/zalando/nakadi/config/RepositoriesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.repository.zookeeper.ZookeeperConfig;
import org.zalando.nakadi.service.FeatureToggleService;
import org.zalando.nakadi.service.FeatureToggleService.Feature;
import org.zalando.nakadi.service.FeatureToggleService.FeatureWrapper;
import org.zalando.nakadi.service.FeatureToggleServiceZk;
import org.zalando.nakadi.service.timeline.TimelineSync;
import org.zalando.nakadi.validation.EventBodyMustRespectSchema;
import org.zalando.nakadi.validation.EventMetadataValidationStrategy;
import org.zalando.nakadi.validation.JsonSchemaEnrichment;
import org.zalando.nakadi.validation.ValidationStrategy;

import java.util.Optional;
import java.util.Set;

@Configuration
Expand All @@ -34,16 +37,16 @@ public class RepositoriesConfig {

@Profile({"acceptanceTest", "local"})
@Bean
public FeatureToggleService featureToggleServiceLocal(
final ZooKeeperHolder zooKeeperHolder, final FeaturesConfig featuresConfig) {
public FeatureToggleService featureToggleServiceLocal(final ZooKeeperHolder zooKeeperHolder,
final FeaturesConfig featuresConfig) {
final FeatureToggleService featureToggleService = new FeatureToggleServiceZk(zooKeeperHolder);
if (featuresConfig.containsDefaults()) {
final Set<String> features = featuresConfig.getFeaturesWithDefaultState();
for (final String feature: features) {
LOG.info("Setting feature {} to {}", feature, featuresConfig.getDefaultState(feature));
featureToggleService.setFeature(
new FeatureToggleService.FeatureWrapper(FeatureToggleService.Feature.valueOf(feature),
featuresConfig.getDefaultState(feature)));
for (final String featureStr : features) {
final boolean defaultState = featuresConfig.getDefaultState(featureStr);
LOG.info("Setting feature {} to {}", featureStr, defaultState);
final FeatureWrapper featureWrapper = new FeatureWrapper(Feature.valueOf(featureStr), defaultState);
featureToggleService.setFeature(featureWrapper, Optional.empty());
}
}
return featureToggleService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.NativeWebRequest;
import org.zalando.nakadi.domain.ItemsWrapper;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.exceptions.runtime.CursorsAreEmptyException;
Expand All @@ -35,6 +36,7 @@

import static org.springframework.http.ResponseEntity.noContent;
import static org.springframework.http.ResponseEntity.ok;
import static org.zalando.nakadi.util.RequestUtils.getUser;

@RestController
public class CursorsController {
Expand Down Expand Up @@ -96,9 +98,10 @@ public ResponseEntity<?> commitCursors(@PathVariable("subscriptionId") final Str
@RequestMapping(value = "/subscriptions/{subscriptionId}/cursors", method = RequestMethod.PATCH)
public ResponseEntity<?> resetCursors(
@PathVariable("subscriptionId") final String subscriptionId,
@Valid @RequestBody final ItemsWrapper<SubscriptionCursorWithoutToken> cursors)
@Valid @RequestBody final ItemsWrapper<SubscriptionCursorWithoutToken> cursors,
final NativeWebRequest request)
throws NoSuchEventTypeException, InvalidCursorException, InternalNakadiException {
cursorsService.resetCursors(subscriptionId, convertToNakadiCursors(cursors));
cursorsService.resetCursors(subscriptionId, convertToNakadiCursors(cursors), getUser(request));
return noContent().build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.springframework.http.ResponseEntity.status;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_EVENT_TYPE_CREATION;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_EVENT_TYPE_DELETION;
import static org.zalando.nakadi.util.RequestUtils.getUser;

@RestController
@RequestMapping(value = "/event-types")
Expand Down Expand Up @@ -88,13 +89,13 @@ public ResponseEntity<?> create(@Valid @RequestBody final EventTypeBase eventTyp
throw new ValidationException(errors);
}

eventTypeService.create(eventType);
eventTypeService.create(eventType, getUser(request));

return ResponseEntity.status(HttpStatus.CREATED).headers(generateWarningHeaders(eventType)).build();
}

@RequestMapping(value = "/{name:.+}", method = RequestMethod.DELETE)
public ResponseEntity<?> delete(@PathVariable("name") final String eventTypeName)
public ResponseEntity<?> delete(@PathVariable("name") final String eventTypeName, final NativeWebRequest request)
throws EventTypeDeletionException,
AccessDeniedException,
NoSuchEventTypeException,
Expand All @@ -105,7 +106,7 @@ public ResponseEntity<?> delete(@PathVariable("name") final String eventTypeName
throw new ForbiddenOperationException("Event Type deletion is disabled");
}

eventTypeService.delete(eventTypeName);
eventTypeService.delete(eventTypeName, getUser(request));

return status(HttpStatus.OK).build();
}
Expand All @@ -127,7 +128,7 @@ public ResponseEntity<?> update(
throw new ValidationException(errors);
}

eventTypeService.update(name, eventType);
eventTypeService.update(name, eventType, getUser(request));

return status(HttpStatus.OK).headers(generateWarningHeaders(eventType)).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.http.HttpHeaders.CONTENT_LOCATION;
import static org.springframework.http.HttpStatus.OK;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.DISABLE_SUBSCRIPTION_CREATION;
import static org.zalando.nakadi.util.RequestUtils.getUser;


@RestController
Expand Down Expand Up @@ -63,7 +64,8 @@ public ResponseEntity<?> createOrGetSubscription(@Valid @RequestBody final Subsc
throw new SubscriptionCreationDisabledException("Subscription creation is temporarily unavailable");
}
try {
final Subscription subscription = subscriptionService.createSubscription(subscriptionBase);
final Subscription subscription = subscriptionService.createSubscription(subscriptionBase,
getUser(request));
return prepareLocationResponse(subscription);
} catch (final DuplicatedSubscriptionException ex) {
throw new InconsistentStateException("Unexpected problem occurred when creating subscription", ex);
Expand All @@ -83,7 +85,7 @@ public ResponseEntity<?> updateSubscription(
if (errors.hasErrors()) {
throw new ValidationException(errors);
}
subscriptionService.updateSubscription(subscriptionId, subscription);
subscriptionService.updateSubscription(subscriptionId, subscription, getUser(request));
return ResponseEntity.noContent().build();

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.NativeWebRequest;
import org.zalando.nakadi.domain.ItemsWrapper;
import org.zalando.nakadi.domain.ResourceAuthorization;
import org.zalando.nakadi.exceptions.runtime.ForbiddenOperationException;
Expand All @@ -20,6 +21,7 @@
import javax.validation.Valid;

import static org.zalando.nakadi.domain.ResourceImpl.ADMIN_RESOURCE;
import static org.zalando.nakadi.util.RequestUtils.getUser;

@RestController
@RequestMapping(value = "/settings")
Expand All @@ -29,7 +31,6 @@ public class SettingsController {
private final FeatureToggleService featureToggleService;
private final AdminService adminService;


@Autowired
public SettingsController(final BlacklistService blacklistService,
final FeatureToggleService featureToggleService,
Expand All @@ -49,23 +50,25 @@ public ResponseEntity<?> getBlacklist() throws ForbiddenOperationException {

@RequestMapping(value = "/blacklist/{blacklist_type}/{name}", method = RequestMethod.PUT)
public ResponseEntity blacklist(@PathVariable("blacklist_type") final BlacklistService.Type blacklistType,
@PathVariable("name") final String name)
@PathVariable("name") final String name,
final NativeWebRequest request)
throws ForbiddenOperationException {
if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) {
throw new ForbiddenOperationException("Admin privileges are required to perform this operation");
}
blacklistService.blacklist(name, blacklistType);
blacklistService.blacklist(name, blacklistType, getUser(request));
return ResponseEntity.noContent().build();
}

@RequestMapping(value = "/blacklist/{blacklist_type}/{name}", method = RequestMethod.DELETE)
public ResponseEntity whitelist(@PathVariable("blacklist_type") final BlacklistService.Type blacklistType,
@PathVariable("name") final String name)
@PathVariable("name") final String name,
final NativeWebRequest request)
throws ForbiddenOperationException {
if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) {
throw new ForbiddenOperationException("Admin privileges are required to perform this operation");
}
blacklistService.whitelist(name, blacklistType);
blacklistService.whitelist(name, blacklistType, getUser(request));
return ResponseEntity.noContent().build();
}

Expand All @@ -79,12 +82,13 @@ public ResponseEntity<?> getFeatures()
}

@RequestMapping(path = "/features", method = RequestMethod.POST)
public ResponseEntity<?> setFeature(@RequestBody final FeatureToggleService.FeatureWrapper featureWrapper)
public ResponseEntity<?> setFeature(@RequestBody final FeatureToggleService.FeatureWrapper featureWrapper,
final NativeWebRequest request)
throws ForbiddenOperationException {
if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) {
throw new ForbiddenOperationException("Admin privileges are required to perform this operation");
}
featureToggleService.setFeature(featureWrapper);
featureToggleService.setFeature(featureWrapper, getUser(request));
return ResponseEntity.noContent().build();
}

Expand All @@ -98,15 +102,16 @@ public ResponseEntity<?> getAdmins() throws ForbiddenOperationException {

@RequestMapping(path = "/admins", method = RequestMethod.POST)
public ResponseEntity<?> updateAdmins(@Valid @RequestBody final ResourceAuthorization authz,
final Errors errors)
final Errors errors,
final NativeWebRequest request)
throws ValidationException, ForbiddenOperationException {
if (!adminService.isAdmin(AuthorizationService.Operation.ADMIN)) {
throw new ForbiddenOperationException("Admin privileges are required to perform this operation");
}
if (errors.hasErrors()) {
throw new ValidationException(errors);
}
adminService.updateAdmins(authz.toPermissionsList(ADMIN_RESOURCE));
adminService.updateAdmins(authz.toPermissionsList(ADMIN_RESOURCE), getUser(request));
return ResponseEntity.ok().build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.springframework.http.HttpStatus.NO_CONTENT;
import static org.springframework.http.HttpStatus.OK;
import static org.springframework.http.ResponseEntity.status;
import static org.zalando.nakadi.util.RequestUtils.getUser;

@RestController
public class StoragesController {
Expand Down Expand Up @@ -60,7 +61,7 @@ public ResponseEntity<?> createStorage(@RequestBody final String storage,
if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) {
throw new ForbiddenOperationException("Admin privileges required to perform this operation");
}
storageService.createStorage(new JSONObject(storage));
storageService.createStorage(new JSONObject(storage), getUser(request));
return status(CREATED).build();
}

Expand All @@ -81,7 +82,7 @@ public ResponseEntity<?> deleteStorage(@PathVariable("id") final String id, fina
if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) {
throw new ForbiddenOperationException("Admin privileges required to perform this operation");
}
storageService.deleteStorage(id);
storageService.deleteStorage(id, getUser(request));
return status(NO_CONTENT).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.springframework.http.HttpStatus.NO_CONTENT;
import static org.springframework.http.HttpStatus.OK;
import static org.springframework.http.ResponseEntity.status;
import static org.zalando.nakadi.util.RequestUtils.getUser;


@RestController
Expand Down Expand Up @@ -65,7 +66,7 @@ public ResponseEntity<?> deleteSubscription(@PathVariable("id") final String sub
final NativeWebRequest request)
throws DbWriteOperationsBlockedException, NoSuchSubscriptionException, NoSuchEventTypeException,
ServiceTemporarilyUnavailableException, InternalNakadiException {
subscriptionService.deleteSubscription(subscriptionId);
subscriptionService.deleteSubscription(subscriptionId, getUser(request));
return status(NO_CONTENT).build();
}

Expand Down
Loading

0 comments on commit c5ab265

Please sign in to comment.