Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' into cache-admins
Browse files Browse the repository at this point in the history
  • Loading branch information
ferbncode authored Feb 20, 2019
2 parents dd69b6d + 7d2b13c commit 3a8dbfe
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 10 deletions.
29 changes: 25 additions & 4 deletions src/main/java/org/zalando/nakadi/controller/CursorsController.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
import org.springframework.web.context.request.NativeWebRequest;
import org.zalando.nakadi.domain.ItemsWrapper;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.exceptions.runtime.BlockedException;
import org.zalando.nakadi.exceptions.runtime.CursorsAreEmptyException;
import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorException;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.runtime.NoSuchSubscriptionException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.BlacklistService;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.CursorTokenService;
import org.zalando.nakadi.service.CursorsService;
Expand All @@ -45,19 +48,26 @@ public class CursorsController {
private final CursorsService cursorsService;
private final CursorConverter cursorConverter;
private final CursorTokenService cursorTokenService;
private final BlacklistService blacklistService;

@Autowired
public CursorsController(final CursorsService cursorsService,
final CursorConverter cursorConverter,
final CursorTokenService cursorTokenService) {
final CursorTokenService cursorTokenService,
final BlacklistService blacklistService) {
this.cursorsService = cursorsService;
this.cursorConverter = cursorConverter;
this.cursorTokenService = cursorTokenService;
this.blacklistService = blacklistService;
}

@RequestMapping(path = "/subscriptions/{subscriptionId}/cursors", method = RequestMethod.GET)
public ItemsWrapper<SubscriptionCursor> getCursors(@PathVariable("subscriptionId") final String subscriptionId) {
public ItemsWrapper<SubscriptionCursor> getCursors(@PathVariable("subscriptionId") final String subscriptionId,
final Client client) {
try {
if (blacklistService.isSubscriptionConsumptionBlocked(subscriptionId, client.getClientId())) {
throw new BlockedException("Application or subscription is blocked");
}
final List<SubscriptionCursor> cursors = cursorsService.getSubscriptionCursors(subscriptionId)
.stream()
.map(cursor -> cursor.withToken(cursorTokenService.generateToken()))
Expand All @@ -71,7 +81,8 @@ public ItemsWrapper<SubscriptionCursor> getCursors(@PathVariable("subscriptionId
@RequestMapping(value = "/subscriptions/{subscriptionId}/cursors", method = RequestMethod.POST)
public ResponseEntity<?> commitCursors(@PathVariable("subscriptionId") final String subscriptionId,
@Valid @RequestBody final ItemsWrapper<SubscriptionCursor> cursorsIn,
@NotNull @RequestHeader("X-Nakadi-StreamId") final String streamId)
@NotNull @RequestHeader("X-Nakadi-StreamId") final String streamId,
final Client client)
throws NoSuchEventTypeException,
NoSuchSubscriptionException,
InvalidCursorException,
Expand All @@ -81,6 +92,11 @@ public ResponseEntity<?> commitCursors(@PathVariable("subscriptionId") final Str
if (cursors.isEmpty()) {
throw new CursorsAreEmptyException();
}

if (blacklistService.isSubscriptionConsumptionBlocked(subscriptionId, client.getClientId())) {
throw new BlockedException("Application or subscription is blocked");
}

final List<Boolean> items = cursorsService.commitCursors(streamId, subscriptionId, cursors);

final boolean allCommited = items.stream().allMatch(item -> item);
Expand All @@ -98,8 +114,13 @@ public ResponseEntity<?> commitCursors(@PathVariable("subscriptionId") final Str
public ResponseEntity<?> resetCursors(
@PathVariable("subscriptionId") final String subscriptionId,
@Valid @RequestBody final ItemsWrapper<SubscriptionCursorWithoutToken> cursors,
final NativeWebRequest request)
final NativeWebRequest request,
final Client client)
throws NoSuchEventTypeException, InvalidCursorException, InternalNakadiException {
if (blacklistService.isSubscriptionConsumptionBlocked(subscriptionId, client.getClientId())) {
throw new BlockedException("Application or subscription is blocked");
}

cursorsService.resetCursors(subscriptionId, convertToNakadiCursors(cursors));
return noContent().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.springframework.stereotype.Component;
import org.zalando.nakadi.exceptions.runtime.NoSuchSubscriptionException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;

import javax.annotation.PostConstruct;
Expand All @@ -27,17 +26,17 @@ public class BlacklistService {
private static final Logger LOG = LoggerFactory.getLogger(BlacklistService.class);
private static final String PATH_BLACKLIST = "/nakadi/blacklist";

private final SubscriptionDbRepository subscriptionDbRepository;
private final SubscriptionCache subscriptionCache;
private final ZooKeeperHolder zooKeeperHolder;
private final NakadiAuditLogPublisher auditLogPublisher;
private TreeCache blacklistCache;

@Autowired
public BlacklistService(final SubscriptionDbRepository subscriptionDbRepository,
public BlacklistService(final SubscriptionCache subscriptionCache,
final ZooKeeperHolder zooKeeperHolder,
final NakadiAuditLogPublisher auditLogPublisher) {
this.zooKeeperHolder = zooKeeperHolder;
this.subscriptionDbRepository = subscriptionDbRepository;
this.subscriptionCache = subscriptionCache;
this.auditLogPublisher = auditLogPublisher;
}

Expand Down Expand Up @@ -81,7 +80,7 @@ public boolean isConsumptionBlocked(final String etName, final String appId) {
public boolean isSubscriptionConsumptionBlocked(final String subscriptionId, final String appId) {
try {
return isSubscriptionConsumptionBlocked(
subscriptionDbRepository.getSubscription(subscriptionId).getEventTypes(), appId);
subscriptionCache.getSubscription(subscriptionId).getEventTypes(), appId);
} catch (final NoSuchSubscriptionException e) {
// It's fine, subscription doesn't exists.
} catch (final ServiceTemporarilyUnavailableException e) {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/zalando/nakadi/view/Cursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@

import javax.annotation.concurrent.Immutable;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;

@Immutable
public class Cursor {

public static final String BEFORE_OLDEST_OFFSET = "BEGIN";

@NotNull
@Size(min = 1, message = "cursor partition cannot be empty")
private final String partition;

@NotNull
@Size(min = 1, message = "cursor offset cannot be empty")
private final String offset;

public Cursor(@JsonProperty("partition") final String partition, @JsonProperty("offset") final String offset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableList;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.http.HttpStatus;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.test.web.servlet.MockMvc;
Expand All @@ -21,7 +22,9 @@
import org.zalando.nakadi.plugin.api.authz.AuthorizationService;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.security.ClientResolver;
import org.zalando.nakadi.service.BlacklistService;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.CursorTokenService;
import org.zalando.nakadi.service.CursorsService;
Expand All @@ -37,19 +40,22 @@
import java.util.stream.IntStream;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.patch;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup;
import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType;
import static org.zalando.nakadi.utils.TestUtils.buildTimelineWithTopic;
import static org.zalando.nakadi.utils.TestUtils.invalidProblem;
import static org.zalando.problem.Status.FORBIDDEN;
import static org.zalando.problem.Status.NOT_FOUND;
import static org.zalando.problem.Status.SERVICE_UNAVAILABLE;
import static org.zalando.problem.Status.UNPROCESSABLE_ENTITY;
Expand Down Expand Up @@ -78,6 +84,8 @@ public class CursorsControllerTest {
private final SubscriptionDbRepository subscriptionRepository;
private final CursorConverter cursorConverter;
private final AuthorizationService authorizationService;
private final Client client;
private final BlacklistService blacklistService;

public CursorsControllerTest() throws Exception {

Expand All @@ -96,7 +104,12 @@ public CursorsControllerTest() throws Exception {
final CursorTokenService tokenService = mock(CursorTokenService.class);
when(tokenService.generateToken()).thenReturn(TOKEN);

final CursorsController controller = new CursorsController(cursorsService, cursorConverter, tokenService);
client = mock(Client.class);

blacklistService = mock(BlacklistService.class);

final CursorsController controller = new CursorsController(cursorsService, cursorConverter, tokenService,
blacklistService);

final SecuritySettings settings = mock(SecuritySettings.class);
doReturn(SecuritySettings.AuthMode.OFF).when(settings).getAuthMode();
Expand Down Expand Up @@ -181,6 +194,51 @@ public void whenCommitCursorWithoutEventTypeThenUnprocessableEntity() throws Exc
invalidProblem("items[0].event_type", "may not be null"));
}

@Test
public void whenCommitCursorWithEmptyPartitionThenUnprocessableEntity() throws Exception {
checkForProblem(
postCursorsString("{\"items\":[{\"offset\":\"0\",\"partition\":\"\",\"cursor_token\":\"x\"," +
"\"event_type\":\"et\"}]}"),
invalidProblem("items[0].partition", "cursor partition cannot be empty"));
}

@Test
public void whenCommitCursorWithEmptyOffsetThenUnprocessableEntity() throws Exception {
checkForProblem(
postCursorsString("{\"items\":[{\"offset\":\"\",\"partition\":\"0\",\"cursor_token\":\"x\"," +
"\"event_type\":\"et\"}]}"),
invalidProblem("items[0].offset", "cursor offset cannot be empty"));
}

@Test
public void whenSubscriptionConsumptionBlockedThenAccessDeniedOnPostCommit() throws Exception {
Mockito.when(blacklistService.isSubscriptionConsumptionBlocked(anyString(), any())).thenReturn(true);
final Problem expectedProblem = Problem.valueOf(FORBIDDEN, "Application or subscription is blocked");
checkForProblem(
postCursorsString("{\"items\":[{\"offset\":\"0\",\"partition\":\"0\",\"cursor_token\":\"x\"," +
"\"event_type\":\"et\"}]}"),
expectedProblem);
}

@Test
public void whenSubscriptionConsumptionBlockedThenAccessDeniedOnPatchCommit() throws Exception {
Mockito.when(blacklistService.isSubscriptionConsumptionBlocked(anyString(), any())).thenReturn(true);
final Problem expectedProblem = Problem.valueOf(FORBIDDEN, "Application or subscription is blocked");
checkForProblem(
patchCursorsString("{\"items\":[{\"offset\":\"0\",\"partition\":\"0\",\"cursor_token\":\"x\"," +
"\"event_type\":\"et\"}]}"),
expectedProblem);
}

@Test
public void whenSubscriptionConsumptionBlockedThenAccessDeniedOnGetCursors() throws Exception {
Mockito.when(blacklistService.isSubscriptionConsumptionBlocked(anyString(), any())).thenReturn(true);
final Problem expectedProblem = Problem.valueOf(FORBIDDEN, "Application or subscription is blocked");
checkForProblem(
getCursors(),
expectedProblem);
}

private ResultActions getCursors() throws Exception {
final MockHttpServletRequestBuilder requestBuilder = get("/subscriptions/" + SUBSCRIPTION_ID + "/cursors");
return mockMvc.perform(requestBuilder);
Expand All @@ -198,6 +256,14 @@ private ResultActions postCursors(final List<SubscriptionCursor> cursors) throws
return postCursorsString(TestUtils.OBJECT_MAPPER.writeValueAsString(cursorsWrapper));
}

private ResultActions patchCursorsString(final String cursors) throws Exception {
final MockHttpServletRequestBuilder requestBuilder = patch("/subscriptions/" + SUBSCRIPTION_ID + "/cursors")
.header("X-Nakadi-StreamId", "test-stream-id")
.contentType(APPLICATION_JSON)
.content(cursors);
return mockMvc.perform(requestBuilder);
}

private ResultActions postCursorsString(final String cursors) throws Exception {
final MockHttpServletRequestBuilder requestBuilder = post("/subscriptions/" + SUBSCRIPTION_ID + "/cursors")
.header("X-Nakadi-StreamId", "test-stream-id")
Expand Down

0 comments on commit 3a8dbfe

Please sign in to comment.