From 2a81833836244cdc386f930a47d16a2167e50f5f Mon Sep 17 00:00:00 2001 From: Ricardo de Cillo Date: Tue, 5 Feb 2019 18:37:03 +0100 Subject: [PATCH 1/2] Cursor commit, patch and get protected by blacklisting CONSUMER_APP In case a consumer application goes out of control and starts hammering the cursors endpoints. It's important because these endpoints are fairly expensive since they hit a non scalable component: Zookeeper. We consider that an app which is forbidden from consumption of events is also forbidden from commits, which is part of the stream-commit consumption process. --- .../nakadi/controller/CursorsController.java | 29 ++++++-- .../java/org/zalando/nakadi/view/Cursor.java | 3 + .../controller/CursorsControllerTest.java | 68 ++++++++++++++++++- 3 files changed, 95 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/controller/CursorsController.java b/src/main/java/org/zalando/nakadi/controller/CursorsController.java index f4a68ff86f..5895eeeb3c 100644 --- a/src/main/java/org/zalando/nakadi/controller/CursorsController.java +++ b/src/main/java/org/zalando/nakadi/controller/CursorsController.java @@ -13,6 +13,7 @@ import org.springframework.web.context.request.NativeWebRequest; import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.NakadiCursor; +import org.zalando.nakadi.exceptions.runtime.BlockedException; import org.zalando.nakadi.exceptions.runtime.CursorsAreEmptyException; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; @@ -20,6 +21,8 @@ import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.runtime.NoSuchSubscriptionException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; +import org.zalando.nakadi.security.Client; +import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorTokenService; import org.zalando.nakadi.service.CursorsService; @@ -46,19 +49,26 @@ public class CursorsController { private final CursorsService cursorsService; private final CursorConverter cursorConverter; private final CursorTokenService cursorTokenService; + private final BlacklistService blacklistService; @Autowired public CursorsController(final CursorsService cursorsService, final CursorConverter cursorConverter, - final CursorTokenService cursorTokenService) { + final CursorTokenService cursorTokenService, + final BlacklistService blacklistService) { this.cursorsService = cursorsService; this.cursorConverter = cursorConverter; this.cursorTokenService = cursorTokenService; + this.blacklistService = blacklistService; } @RequestMapping(path = "/subscriptions/{subscriptionId}/cursors", method = RequestMethod.GET) - public ItemsWrapper getCursors(@PathVariable("subscriptionId") final String subscriptionId) { + public ItemsWrapper getCursors(@PathVariable("subscriptionId") final String subscriptionId, + final Client client) { try { + if (blacklistService.isSubscriptionConsumptionBlocked(subscriptionId, client.getClientId())) { + throw new BlockedException("Application or subscription is blocked"); + } final List cursors = cursorsService.getSubscriptionCursors(subscriptionId) .stream() .map(cursor -> cursor.withToken(cursorTokenService.generateToken())) @@ -72,7 +82,8 @@ public ItemsWrapper getCursors(@PathVariable("subscriptionId @RequestMapping(value = "/subscriptions/{subscriptionId}/cursors", method = RequestMethod.POST) public ResponseEntity commitCursors(@PathVariable("subscriptionId") final String subscriptionId, @Valid @RequestBody final ItemsWrapper cursorsIn, - @NotNull @RequestHeader("X-Nakadi-StreamId") final String streamId) + @NotNull @RequestHeader("X-Nakadi-StreamId") final String streamId, + final Client client) throws NoSuchEventTypeException, NoSuchSubscriptionException, InvalidCursorException, @@ -82,6 +93,11 @@ public ResponseEntity commitCursors(@PathVariable("subscriptionId") final Str if (cursors.isEmpty()) { throw new CursorsAreEmptyException(); } + + if (blacklistService.isSubscriptionConsumptionBlocked(subscriptionId, client.getClientId())) { + throw new BlockedException("Application or subscription is blocked"); + } + final List items = cursorsService.commitCursors(streamId, subscriptionId, cursors); final boolean allCommited = items.stream().allMatch(item -> item); @@ -99,8 +115,13 @@ public ResponseEntity commitCursors(@PathVariable("subscriptionId") final Str public ResponseEntity resetCursors( @PathVariable("subscriptionId") final String subscriptionId, @Valid @RequestBody final ItemsWrapper cursors, - final NativeWebRequest request) + final NativeWebRequest request, + final Client client) throws NoSuchEventTypeException, InvalidCursorException, InternalNakadiException { + if (blacklistService.isSubscriptionConsumptionBlocked(subscriptionId, client.getClientId())) { + throw new BlockedException("Application or subscription is blocked"); + } + cursorsService.resetCursors(subscriptionId, convertToNakadiCursors(cursors), getUser(request)); return noContent().build(); } diff --git a/src/main/java/org/zalando/nakadi/view/Cursor.java b/src/main/java/org/zalando/nakadi/view/Cursor.java index e268eaf1e5..b173e091ad 100644 --- a/src/main/java/org/zalando/nakadi/view/Cursor.java +++ b/src/main/java/org/zalando/nakadi/view/Cursor.java @@ -4,6 +4,7 @@ import javax.annotation.concurrent.Immutable; import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; @Immutable public class Cursor { @@ -11,9 +12,11 @@ public class Cursor { public static final String BEFORE_OLDEST_OFFSET = "BEGIN"; @NotNull + @Size(min = 1, message = "cursor partition cannot be empty") private final String partition; @NotNull + @Size(min = 1, message = "cursor offset cannot be empty") private final String offset; public Cursor(@JsonProperty("partition") final String partition, @JsonProperty("offset") final String offset) { diff --git a/src/test/java/org/zalando/nakadi/controller/CursorsControllerTest.java b/src/test/java/org/zalando/nakadi/controller/CursorsControllerTest.java index aadb07e40f..2bfc966c94 100644 --- a/src/test/java/org/zalando/nakadi/controller/CursorsControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/CursorsControllerTest.java @@ -2,6 +2,7 @@ import com.google.common.collect.ImmutableList; import org.junit.Test; +import org.mockito.Mockito; import org.springframework.http.HttpStatus; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.test.web.servlet.MockMvc; @@ -21,7 +22,9 @@ import org.zalando.nakadi.plugin.api.authz.AuthorizationService; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.repository.db.SubscriptionDbRepository; +import org.zalando.nakadi.security.Client; import org.zalando.nakadi.security.ClientResolver; +import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorTokenService; import org.zalando.nakadi.service.CursorsService; @@ -37,12 +40,14 @@ import java.util.stream.IntStream; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.springframework.http.MediaType.APPLICATION_JSON; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.patch; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -50,6 +55,7 @@ import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; import static org.zalando.nakadi.utils.TestUtils.buildTimelineWithTopic; import static org.zalando.nakadi.utils.TestUtils.invalidProblem; +import static org.zalando.problem.Status.FORBIDDEN; import static org.zalando.problem.Status.NOT_FOUND; import static org.zalando.problem.Status.SERVICE_UNAVAILABLE; import static org.zalando.problem.Status.UNPROCESSABLE_ENTITY; @@ -78,6 +84,8 @@ public class CursorsControllerTest { private final SubscriptionDbRepository subscriptionRepository; private final CursorConverter cursorConverter; private final AuthorizationService authorizationService; + private final Client client; + private final BlacklistService blacklistService; public CursorsControllerTest() throws Exception { @@ -96,7 +104,12 @@ public CursorsControllerTest() throws Exception { final CursorTokenService tokenService = mock(CursorTokenService.class); when(tokenService.generateToken()).thenReturn(TOKEN); - final CursorsController controller = new CursorsController(cursorsService, cursorConverter, tokenService); + client = mock(Client.class); + + blacklistService = mock(BlacklistService.class); + + final CursorsController controller = new CursorsController(cursorsService, cursorConverter, tokenService, + blacklistService); final SecuritySettings settings = mock(SecuritySettings.class); doReturn(SecuritySettings.AuthMode.OFF).when(settings).getAuthMode(); @@ -181,6 +194,51 @@ public void whenCommitCursorWithoutEventTypeThenUnprocessableEntity() throws Exc invalidProblem("items[0].event_type", "may not be null")); } + @Test + public void whenCommitCursorWithEmptyPartitionThenUnprocessableEntity() throws Exception { + checkForProblem( + postCursorsString("{\"items\":[{\"offset\":\"0\",\"partition\":\"\",\"cursor_token\":\"x\"," + + "\"event_type\":\"et\"}]}"), + invalidProblem("items[0].partition", "cursor partition cannot be empty")); + } + + @Test + public void whenCommitCursorWithEmptyOffsetThenUnprocessableEntity() throws Exception { + checkForProblem( + postCursorsString("{\"items\":[{\"offset\":\"\",\"partition\":\"0\",\"cursor_token\":\"x\"," + + "\"event_type\":\"et\"}]}"), + invalidProblem("items[0].offset", "cursor offset cannot be empty")); + } + + @Test + public void whenSubscriptionConsumptionBlockedThenAccessDeniedOnPostCommit() throws Exception { + Mockito.when(blacklistService.isSubscriptionConsumptionBlocked(anyString(), any())).thenReturn(true); + final Problem expectedProblem = Problem.valueOf(FORBIDDEN, "Application or subscription is blocked"); + checkForProblem( + postCursorsString("{\"items\":[{\"offset\":\"0\",\"partition\":\"0\",\"cursor_token\":\"x\"," + + "\"event_type\":\"et\"}]}"), + expectedProblem); + } + + @Test + public void whenSubscriptionConsumptionBlockedThenAccessDeniedOnPatchCommit() throws Exception { + Mockito.when(blacklistService.isSubscriptionConsumptionBlocked(anyString(), any())).thenReturn(true); + final Problem expectedProblem = Problem.valueOf(FORBIDDEN, "Application or subscription is blocked"); + checkForProblem( + patchCursorsString("{\"items\":[{\"offset\":\"0\",\"partition\":\"0\",\"cursor_token\":\"x\"," + + "\"event_type\":\"et\"}]}"), + expectedProblem); + } + + @Test + public void whenSubscriptionConsumptionBlockedThenAccessDeniedOnGetCursors() throws Exception { + Mockito.when(blacklistService.isSubscriptionConsumptionBlocked(anyString(), any())).thenReturn(true); + final Problem expectedProblem = Problem.valueOf(FORBIDDEN, "Application or subscription is blocked"); + checkForProblem( + getCursors(), + expectedProblem); + } + private ResultActions getCursors() throws Exception { final MockHttpServletRequestBuilder requestBuilder = get("/subscriptions/" + SUBSCRIPTION_ID + "/cursors"); return mockMvc.perform(requestBuilder); @@ -198,6 +256,14 @@ private ResultActions postCursors(final List cursors) throws return postCursorsString(TestUtils.OBJECT_MAPPER.writeValueAsString(cursorsWrapper)); } + private ResultActions patchCursorsString(final String cursors) throws Exception { + final MockHttpServletRequestBuilder requestBuilder = patch("/subscriptions/" + SUBSCRIPTION_ID + "/cursors") + .header("X-Nakadi-StreamId", "test-stream-id") + .contentType(APPLICATION_JSON) + .content(cursors); + return mockMvc.perform(requestBuilder); + } + private ResultActions postCursorsString(final String cursors) throws Exception { final MockHttpServletRequestBuilder requestBuilder = post("/subscriptions/" + SUBSCRIPTION_ID + "/cursors") .header("X-Nakadi-StreamId", "test-stream-id") From fc27dae5274f122cc80b9c42fcbffb5f6a32be79 Mon Sep 17 00:00:00 2001 From: Ricardo de Cillo Date: Wed, 6 Feb 2019 12:26:13 +0100 Subject: [PATCH 2/2] Load subscription from cache That would have the side effect of addins a up to 5 minutes lag when changing blacklisting configuration, which should not be a problem. --- .../org/zalando/nakadi/service/BlacklistService.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/BlacklistService.java b/src/main/java/org/zalando/nakadi/service/BlacklistService.java index d7f4638fac..997d528c6e 100644 --- a/src/main/java/org/zalando/nakadi/service/BlacklistService.java +++ b/src/main/java/org/zalando/nakadi/service/BlacklistService.java @@ -10,7 +10,6 @@ import org.springframework.stereotype.Component; import org.zalando.nakadi.exceptions.runtime.NoSuchSubscriptionException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; -import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import javax.annotation.PostConstruct; @@ -27,17 +26,17 @@ public class BlacklistService { private static final Logger LOG = LoggerFactory.getLogger(BlacklistService.class); private static final String PATH_BLACKLIST = "/nakadi/blacklist"; - private final SubscriptionDbRepository subscriptionDbRepository; + private final SubscriptionCache subscriptionCache; private final ZooKeeperHolder zooKeeperHolder; private final NakadiAuditLogPublisher auditLogPublisher; private TreeCache blacklistCache; @Autowired - public BlacklistService(final SubscriptionDbRepository subscriptionDbRepository, + public BlacklistService(final SubscriptionCache subscriptionCache, final ZooKeeperHolder zooKeeperHolder, final NakadiAuditLogPublisher auditLogPublisher) { this.zooKeeperHolder = zooKeeperHolder; - this.subscriptionDbRepository = subscriptionDbRepository; + this.subscriptionCache = subscriptionCache; this.auditLogPublisher = auditLogPublisher; } @@ -81,7 +80,7 @@ public boolean isConsumptionBlocked(final String etName, final String appId) { public boolean isSubscriptionConsumptionBlocked(final String subscriptionId, final String appId) { try { return isSubscriptionConsumptionBlocked( - subscriptionDbRepository.getSubscription(subscriptionId).getEventTypes(), appId); + subscriptionCache.getSubscription(subscriptionId).getEventTypes(), appId); } catch (final NoSuchSubscriptionException e) { // It's fine, subscription doesn't exists. } catch (final ServiceTemporarilyUnavailableException e) {