diff --git a/CHANGELOG.md b/CHANGELOG.md index 9013803cac..e9cf88e55c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## [2.6.0] - 2018-03-26 + +### Added +- Allow to select partitions to read from a subscription + ## [2.5.10] - 2018-03-26 ### Added diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java index 39f0fa0734..202d87fa4c 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java @@ -97,7 +97,7 @@ public void before() throws Exception { final TimelineService timelineService = mock(TimelineService.class); when(timelineService.getTopicRepository((Timeline) any())).thenReturn(topicRepository); timeline = buildTimeline(etName, topic, CREATED_AT); - when(timelineService.getActiveTimeline(any())).thenReturn(timeline); + when(timelineService.getActiveTimeline(any(EventType.class))).thenReturn(timeline); final Subscription subscription = mock(Subscription.class); when(subscription.getId()).thenReturn(sid); diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index 9c787af5a8..fbe206c818 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -6,6 +6,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.http.HttpStatus; +import org.hamcrest.Matchers; import org.hamcrest.core.StringContains; import org.junit.Assert; import org.junit.Before; @@ -31,6 +32,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -49,12 +51,15 @@ import static org.hamcrest.Matchers.not; import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN; import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END; +import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.AUTO; +import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.DIRECT; import static org.zalando.nakadi.utils.TestUtils.waitFor; import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken; import static org.zalando.nakadi.webservice.hila.StreamBatch.singleEventBatch; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription; +import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.getNumberOfAssignedStreams; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvent; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvents; import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN; @@ -295,8 +300,12 @@ public void testGetSubscriptionStat() throws Exception { List subscriptionStats = Collections.singletonList(new SubscriptionEventTypeStats( eventType.getName(), - Collections.singletonList( - new SubscriptionEventTypeStats.Partition("0", "assigned", 15L, client.getSessionId()))) + Collections.singletonList(new SubscriptionEventTypeStats.Partition( + "0", + "assigned", + 15L, + client.getSessionId(), + AUTO))) ); NakadiTestUtils.getSubscriptionStat(subscription) .then() @@ -309,14 +318,42 @@ public void testGetSubscriptionStat() throws Exception { subscriptionStats = Collections.singletonList(new SubscriptionEventTypeStats( eventType.getName(), - Collections.singletonList( - new SubscriptionEventTypeStats.Partition("0", "assigned", 5L, client.getSessionId()))) + Collections.singletonList(new SubscriptionEventTypeStats.Partition( + "0", + "assigned", + 5L, + client.getSessionId(), + AUTO))) ); NakadiTestUtils.getSubscriptionStat(subscription) .then() .content(new StringContains(JSON_TEST_HELPER.asJsonString(new ItemsWrapper<>(subscriptionStats)))); } + @Test(timeout = 10000) + public void testGetSubscriptionStatWhenDirectAssignment() throws Exception { + // connect with 1 stream directly requesting the partition + final TestStreamingClient client = new TestStreamingClient(URL, subscription.getId(), "", + Optional.empty(), + Optional.of("{\"partitions\":[" + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"0\"}]}")); + client.start(); + // wait for rebalance to finish + waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), Matchers.is(1))); + + NakadiTestUtils.getSubscriptionStat(subscription) + .then() + .content(new StringContains(JSON_TEST_HELPER.asJsonString(new SubscriptionEventTypeStats( + eventType.getName(), + Collections.singletonList(new SubscriptionEventTypeStats.Partition( + "0", + "assigned", + 0L, + client.getSessionId(), + DIRECT + )))))); + } + @Test public void testSubscriptionStatsMultiET() throws IOException { final List eventTypes = Lists.newArrayList(createEventType(), createEventType()); @@ -345,7 +382,8 @@ public void testSubscriptionStatsMultiET() throws IOException { "0", "assigned", 1L, - client.getSessionId() + client.getSessionId(), + AUTO )))))) .content(new StringContains(JSON_TEST_HELPER.asJsonString(new SubscriptionEventTypeStats( eventTypes.get(1).getName(), @@ -353,7 +391,8 @@ public void testSubscriptionStatsMultiET() throws IOException { "0", "assigned", 2L, - client.getSessionId() + client.getSessionId(), + AUTO )))))); client.close(); } diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRebalanceAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRebalanceAT.java index 555419f9c0..f7522c9e25 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRebalanceAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRebalanceAT.java @@ -1,7 +1,9 @@ package org.zalando.nakadi.webservice.hila; +import com.google.common.collect.ImmutableSet; import org.junit.Before; import org.junit.Test; +import org.springframework.http.HttpStatus; import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionBase; @@ -14,6 +16,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import static com.google.common.collect.Sets.intersection; import static java.util.stream.Collectors.toList; @@ -28,6 +31,7 @@ import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createBusinessEventTypeWithPartitions; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription; +import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.getNumberOfAssignedStreams; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishBusinessEventWithUserDefinedPartition; public class HilaRebalanceAT extends BaseAT { @@ -80,8 +84,8 @@ public void whenRebalanceThenPartitionsAreEquallyDistributedAndCommittedOffsetsA .create(URL, subscription.getId(), "stream_limit=20&max_uncommitted_events=100") .start(); - // wait for rebalance process to start - Thread.sleep(1000); + // wait for rebalance process to finish + waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), is(2))); // write 5 more events to each partition publishBusinessEventWithUserDefinedPartition( @@ -90,7 +94,7 @@ public void whenRebalanceThenPartitionsAreEquallyDistributedAndCommittedOffsetsA x -> "blah_" + x, x -> String.valueOf(x % 8)); - // wait till all event arrive + // wait till all events arrive waitFor(() -> assertThat(clientB.getBatches(), hasSize(20))); waitFor(() -> assertThat(clientA.getBatches(), hasSize(60))); @@ -114,8 +118,8 @@ public void whenRebalanceThenPartitionsAreEquallyDistributedAndCommittedOffsetsA commitCursors(subscription.getId(), lastCursorsB, clientB.getSessionId()); waitFor(() -> assertThat(clientB.isRunning(), is(false))); - // wait for rebalance process to start - Thread.sleep(1000); + // wait for rebalance process to finish + waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), is(1))); // write 5 more events to each partition publishBusinessEventWithUserDefinedPartition( @@ -125,6 +129,132 @@ public void whenRebalanceThenPartitionsAreEquallyDistributedAndCommittedOffsetsA waitFor(() -> assertThat(clientA.getBatches(), hasSize(100))); } + @Test(timeout = 15000) + public void whenDirectlyRequestPartitionsTheyAssignedCorrectly() throws IOException { + // publish 2 events to each partition + publishBusinessEventWithUserDefinedPartition( + eventType.getName(), 16, x -> "blah" + x, x -> String.valueOf(x % 8)); + + // start a stream requesting to read from partitions 5, 6 + final TestStreamingClient client = new TestStreamingClient(URL, subscription.getId(), "", Optional.empty(), + Optional.of("{\"partitions\":[" + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"6\"}," + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"5\"}" + + "]}")); + client.start(); + + // wait till we receive 4 batches (2 per partition) + waitFor(() -> assertThat(client.getBatches(), hasSize(4))); + + // check that all batches are from partitions 5, 6 + checkAllEventsAreFromPartitions(client, ImmutableSet.of("5", "6")); + } + + @Test(timeout = 15000) + public void whenMixedStreamsThenPartitionsAssignedCorrectly() throws IOException, InterruptedException { + + // start 2 streams not specifying partitions directly + final TestStreamingClient autoClient1 = new TestStreamingClient(URL, subscription.getId(), ""); + autoClient1.start(); + final TestStreamingClient autoClient2 = new TestStreamingClient(URL, subscription.getId(), ""); + autoClient2.start(); + + // start a stream requesting to read from partition 6 + final TestStreamingClient directClient1 = new TestStreamingClient(URL, subscription.getId(), "", + Optional.empty(), + Optional.of("{\"partitions\":[" + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"6\"}]}")); + directClient1.start(); + + // start a stream requesting to read from partition 7 + final TestStreamingClient directClient2 = new TestStreamingClient(URL, subscription.getId(), "", + Optional.empty(), + Optional.of("{\"partitions\":[" + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"7\"}]}")); + directClient2.start(); + + // wait for rebalance to finish + waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), is(4))); + + // publish 2 events to each partition + publishBusinessEventWithUserDefinedPartition( + eventType.getName(), 16, x -> "blah" + x, x -> String.valueOf(x % 8)); + + // we should receive 2 batches for streams that directly requested 1 partition + waitFor(() -> assertThat(directClient1.getBatches(), hasSize(2))); + checkAllEventsAreFromPartitions(directClient1, ImmutableSet.of("6")); + + waitFor(() -> assertThat(directClient2.getBatches(), hasSize(2))); + checkAllEventsAreFromPartitions(directClient2, ImmutableSet.of("7")); + + // we should receive 6 batches for streams that use auto balance (they read 3 partitions each) + waitFor(() -> assertThat(autoClient1.getBatches(), hasSize(6))); + waitFor(() -> assertThat(autoClient2.getBatches(), hasSize(6))); + } + + @Test(timeout = 15000) + public void checkDirectAssignmentCorrectlyCapturesAndReleasesPartition() throws IOException, InterruptedException { + // launch two clients: one using auto-rebalance, second one directly reading from partition 6 + final TestStreamingClient autoClient = new TestStreamingClient(URL, subscription.getId(), + "max_uncommitted_events=100"); + autoClient.start(); + final TestStreamingClient directClient = new TestStreamingClient(URL, subscription.getId(), "", + Optional.empty(), + Optional.of("{\"stream_limit\":1,\"partitions\":[" + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"6\"}]}")); + directClient.start(); + + // wait for rebalance to finish and send 1 event to each partition + waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), is(2))); + publishBusinessEventWithUserDefinedPartition( + eventType.getName(), 8, x -> "blah" + x, x -> String.valueOf(x % 8)); + + waitFor(() -> assertThat(autoClient.getBatches(), hasSize(7))); + checkAllEventsAreFromPartitions(autoClient, ImmutableSet.of("0", "1", "2", "3", "4", "5", "7")); + waitFor(() -> assertThat(directClient.getBatches(), hasSize(1))); + checkAllEventsAreFromPartitions(directClient, ImmutableSet.of("6")); + + // commit cursors and wait for stream to be closed (because of reaching stream_limit) + commitCursors( + subscription.getId(), + directClient.getBatches().stream().map(StreamBatch::getCursor).collect(Collectors.toList()), + directClient.getSessionId()); + waitFor(() -> assertThat(directClient.isRunning(), is(false))); + + + // send 1 event to each partition + publishBusinessEventWithUserDefinedPartition( + eventType.getName(), 8, x -> "blah" + x, x -> String.valueOf(x % 8)); + + // the client with auto-balancing should now get all 8 new events + waitFor(() -> assertThat(autoClient.getBatches(), hasSize(7 + 8))); + checkAllEventsAreFromPartitions(autoClient, ImmutableSet.of("0", "1", "2", "3", "4", "5", "6", "7")); + } + + @Test(timeout = 15000) + public void whenTwoStreamsDirectlyRequestOnePartitionThenConflict() throws IOException, InterruptedException { + + // first stream wants to read partition 6 + final TestStreamingClient client1 = new TestStreamingClient(URL, subscription.getId(), "", Optional.empty(), + Optional.of("{\"partitions\":[" + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"6\"}" + + "]}")); + client1.start(); + waitFor(() -> assertThat(client1.getResponseCode(), is(HttpStatus.OK.value()))); + + // second stream wants to read partitions 5 and 6 + final TestStreamingClient client2 = new TestStreamingClient(URL, subscription.getId(), "", Optional.empty(), + Optional.of("{\"partitions\":[" + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"5\"}," + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"6\"}" + + "]}")); + client2.start(); + + // check that second client was disconnected and received status code 409 + waitFor(() -> assertThat(client2.isRunning(), is(false))); + assertThat(client2.getResponseCode(), is(HttpStatus.CONFLICT.value())); + } + @Test(timeout = 15000) public void whenNotCommittedThenEventsAreReplayedAfterRebalance() { publishBusinessEventWithUserDefinedPartition( @@ -143,6 +273,34 @@ public void whenNotCommittedThenEventsAreReplayedAfterRebalance() { waitFor(() -> assertThat(clientB.getBatches(), hasSize(2)), 10000); } + @Test(timeout = 15000) + public void whenNoFreeSlotsForAutoClientThenConflict() throws IOException, InterruptedException { + // direct client reads all but one partition + final TestStreamingClient directClient = new TestStreamingClient(URL, subscription.getId(), "", + Optional.empty(), + Optional.of("{\"partitions\":[" + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"0\"}," + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"1\"}," + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"2\"}," + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"3\"}," + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"4\"}," + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"5\"}," + + "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"6\"}" + + "]}")); + directClient.start(); + waitFor(() -> assertThat(directClient.getResponseCode(), is(HttpStatus.OK.value()))); + + // first auto-balance client reads remaining partition + final TestStreamingClient autoClient1 = new TestStreamingClient(URL, subscription.getId(), ""); + autoClient1.start(); + waitFor(() -> assertThat(autoClient1.getResponseCode(), is(HttpStatus.OK.value()))); + + // second auto-balance client has nothing to read - should get 409 (Conflict) status code + final TestStreamingClient autoClient2 = new TestStreamingClient(URL, subscription.getId(), ""); + autoClient2.start(); + waitFor(() -> assertThat(autoClient2.getResponseCode(), is(HttpStatus.CONFLICT.value()))); + } + public List getLastCursorsForPartitions(final TestStreamingClient client, final Set partitions) { if (!client.getBatches().isEmpty()) { @@ -159,6 +317,20 @@ public List getLastCursorsForPartitions(final TestStreamingC } } + private void checkAllEventsAreFromPartitions(final TestStreamingClient clientA, final Set partitions) { + // check that all batches belong to the specified set of partitions + final List batches = clientA.getBatches(); + final long batchesFromCorrectPartitions = batches.stream() + .filter(b -> partitions.contains(b.getCursor().getPartition())) + .count(); + assertThat(batchesFromCorrectPartitions, is((long) batches.size())); + // check that all partitions are present in batches + final long partitionsWithNoBatch = partitions.stream() + .filter(p -> !batches.stream().anyMatch(b -> b.getCursor().getPartition().equals(p))) + .count(); + assertThat(partitionsWithNoBatch, is(0L)); + } + private Set getUniquePartitionsStreamedToClient(final TestStreamingClient client) { return getUniquePartitionsStreamedToClient(client, 0, client.getBatches().size()); } diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java index c3931d51a1..e1f5fb01ed 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java @@ -32,6 +32,8 @@ import org.zalando.nakadi.webservice.utils.NakadiTestUtils; import org.zalando.nakadi.webservice.utils.TestStreamingClient; import org.zalando.nakadi.webservice.utils.ZookeeperTestUtils; +import org.zalando.problem.MoreStatus; +import org.zalando.problem.Problem; import java.io.IOException; import java.util.List; @@ -388,6 +390,47 @@ public void whenStatsOnNotInitializedSubscriptionThanCorrectResponse() throws IO } } + @Test + public void whenStreamDuplicatePartitionsThenUnprocessableEntity() throws IOException { + final String et = createEventType().getName(); + final Subscription s = createSubscriptionForEventType(et); + + final String body = "{\"partitions\":[" + + "{\"event_type\":\"et1\",\"partition\":\"0\"}," + + "{\"event_type\":\"et1\",\"partition\":\"0\"}]}"; + given().body(body) + .contentType(JSON) + .when() + .post("/subscriptions/{sid}/events", s.getId()) + .then() + .statusCode(HttpStatus.SC_UNPROCESSABLE_ENTITY) + .body(JSON_HELPER.matchesObject(Problem.valueOf( + MoreStatus.UNPROCESSABLE_ENTITY, + "Duplicated partition specified"))); + } + + @Test + public void whenStreamWrongPartitionsThenUnprocessableEntity() throws IOException { + final String et = createEventType().getName(); + final Subscription s = createSubscriptionForEventType(et); + + final String body = "{\"partitions\":[" + + "{\"event_type\":\"" + et + "\",\"partition\":\"0\"}," + + "{\"event_type\":\"" + et + "\",\"partition\":\"1\"}," + + "{\"event_type\":\"dummy-et-123\",\"partition\":\"0\"}]}"; + given().body(body) + .contentType(JSON) + .when() + .post("/subscriptions/{sid}/events", s.getId()) + .then() + .statusCode(HttpStatus.SC_UNPROCESSABLE_ENTITY) + .body(JSON_HELPER.matchesObject(Problem.valueOf( + MoreStatus.UNPROCESSABLE_ENTITY, + "Wrong partitions specified - some partitions don't belong to subscription: " + + "EventTypePartition{eventType='" + et + "', partition='1'}, " + + "EventTypePartition{eventType='dummy-et-123', partition='0'}"))); + } + private Response commitCursors(final Subscription subscription, final String cursor, final String streamId) { return given() .body(cursor) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/NakadiTestUtils.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/NakadiTestUtils.java index a6776f18f3..729c4470e2 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/NakadiTestUtils.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/NakadiTestUtils.java @@ -22,6 +22,7 @@ import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionBase; +import org.zalando.nakadi.domain.SubscriptionEventTypeStats; import org.zalando.nakadi.partitioning.PartitionStrategy; import org.zalando.nakadi.utils.EventTypeTestBuilder; import org.zalando.nakadi.utils.RandomSubscriptionBuilder; @@ -37,6 +38,7 @@ import java.util.stream.IntStream; import static com.jayway.restassured.RestAssured.given; +import static com.jayway.restassured.RestAssured.when; import static com.jayway.restassured.http.ContentType.JSON; import static java.text.MessageFormat.format; import static org.springframework.http.HttpStatus.OK; @@ -252,4 +254,25 @@ public static EventType getEventType(final String name) throws IOException { .get("/event-types/{name}", name) .getBody().asString(), EventType.class); } + + public static int getNumberOfAssignedStreams(final String sid) { + final Response response = when().get("/subscriptions/{sid}/stats", sid).thenReturn(); + final ItemsWrapper statsItems; + try { + statsItems = MAPPER.readValue( + response.print(), + new TypeReference>() { + }); + } catch (final IOException e) { + throw new AssertionError("Failed to get stats", e); + } + final long assignedUniqueStreamsCount = statsItems.getItems() + .stream() + .flatMap(stat -> stat.getPartitions().stream()) + .filter(p -> "assigned".equals(p.getState())) + .map(SubscriptionEventTypeStats.Partition::getStreamId) + .distinct() + .count(); + return (int) assignedUniqueStreamsCount; + } } diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java index b763428e17..220905712d 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,6 +13,7 @@ import javax.annotation.Nullable; import java.io.BufferedReader; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -43,11 +45,13 @@ public class TestStreamingClient implements Runnable { private HttpURLConnection connection; private String sessionId; private Optional token; + private final Optional bodyParams; private volatile int responseCode; private Consumer> batchesListener; private final CountDownLatch started = new CountDownLatch(1); - public TestStreamingClient(final String baseUrl, final String subscriptionId, final String params) { + public TestStreamingClient(final String baseUrl, final String subscriptionId, final String params, + final Optional token, final Optional bodyParams) { this.baseUrl = baseUrl; this.subscriptionId = subscriptionId; this.params = params; @@ -56,12 +60,17 @@ public TestStreamingClient(final String baseUrl, final String subscriptionId, fi this.sessionId = SESSION_ID_UNKNOWN; this.token = Optional.empty(); this.headers = new ConcurrentHashMap<>(); + this.token = token; + this.bodyParams = bodyParams; + } + + public TestStreamingClient(final String baseUrl, final String subscriptionId, final String params) { + this(baseUrl, subscriptionId, params, Optional.empty(), Optional.empty()); } public TestStreamingClient(final String baseUrl, final String subscriptionId, final String params, final Optional token) { - this(baseUrl, subscriptionId, params); - this.token = token; + this(baseUrl, subscriptionId, params, token, Optional.empty()); } public static TestStreamingClient create(final String baseUrl, final String subscriptionId, final String params) { @@ -79,6 +88,17 @@ public void run() { final String url = format("{0}/subscriptions/{1}/events?{2}", baseUrl, subscriptionId, params); connection = (HttpURLConnection) new URL(url).openConnection(); token.ifPresent(token -> connection.setRequestProperty("Authorization", "Bearer " + token)); + + if (bodyParams.isPresent()) { + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/json"); + try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) { + wr.write(bodyParams.get().getBytes(Charsets.UTF_8)); + wr.flush(); + } + } + responseCode = connection.getResponseCode(); connection.getHeaderFields().entrySet().stream() .filter(entry -> entry.getKey() != null) @@ -215,4 +235,4 @@ public String getHeaderValue(final String name) { } return values.get(0); } -} \ No newline at end of file +} diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java index 11a2155323..9f44f30288 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java @@ -3,36 +3,48 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.NativeWebRequest; import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; +import org.zalando.nakadi.exceptions.runtime.SubscriptionPartitionConflictException; +import org.zalando.nakadi.exceptions.runtime.WrongStreamParametersException; import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.service.ClosedConnectionsCrutch; +import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.subscription.StreamParameters; import org.zalando.nakadi.service.subscription.SubscriptionOutput; import org.zalando.nakadi.service.subscription.SubscriptionStreamer; import org.zalando.nakadi.service.subscription.SubscriptionStreamerFactory; -import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.subscription.SubscriptionValidationService; import org.zalando.nakadi.util.FlowIdUtils; +import org.zalando.nakadi.view.UserStreamParameters; +import org.zalando.problem.MoreStatus; import org.zalando.problem.Problem; +import org.zalando.problem.spring.web.advice.Responses; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.validation.Valid; import javax.ws.rs.core.Response; import java.io.IOException; import java.io.OutputStream; @@ -54,6 +66,7 @@ public class SubscriptionStreamController { private final BlacklistService blacklistService; private final MetricRegistry metricRegistry; private final SubscriptionDbRepository subscriptionDbRepository; + private final SubscriptionValidationService subscriptionValidationService; @Autowired public SubscriptionStreamController(final SubscriptionStreamerFactory subscriptionStreamerFactory, @@ -63,7 +76,8 @@ public SubscriptionStreamController(final SubscriptionStreamerFactory subscripti final NakadiSettings nakadiSettings, final BlacklistService blacklistService, @Qualifier("perPathMetricRegistry") final MetricRegistry metricRegistry, - final SubscriptionDbRepository subscriptionDbRepository) { + final SubscriptionDbRepository subscriptionDbRepository, + final SubscriptionValidationService subscriptionValidationService) { this.subscriptionStreamerFactory = subscriptionStreamerFactory; this.featureToggleService = featureToggleService; this.jsonMapper = objectMapper; @@ -72,6 +86,7 @@ public SubscriptionStreamController(final SubscriptionStreamerFactory subscripti this.blacklistService = blacklistService; this.metricRegistry = metricRegistry; this.subscriptionDbRepository = subscriptionDbRepository; + this.subscriptionValidationService = subscriptionValidationService; } private class SubscriptionOutputImpl implements SubscriptionOutput { @@ -105,8 +120,13 @@ public void onException(final Exception ex) { if (ex instanceof AccessDeniedException) { writeProblemResponse(response, out, Problem.valueOf(Response.Status.FORBIDDEN, ((AccessDeniedException) ex).explain())); - } - if (ex instanceof NakadiException) { + } else if (ex instanceof SubscriptionPartitionConflictException) { + writeProblemResponse(response, out, Problem.valueOf(Response.Status.CONFLICT, + ex.getMessage())); + } else if (ex instanceof WrongStreamParametersException) { + writeProblemResponse(response, out, Problem.valueOf(MoreStatus.UNPROCESSABLE_ENTITY, + ex.getMessage())); + } else if (ex instanceof NakadiException) { writeProblemResponse(response, out, ((NakadiException) ex).asProblem()); } else { writeProblemResponse(response, out, Problem.valueOf(Response.Status.SERVICE_UNAVAILABLE, @@ -127,19 +147,47 @@ public OutputStream getOutputStream() { } } + @RequestMapping(value = "/subscriptions/{subscription_id}/events", method = RequestMethod.POST) + public StreamingResponseBody streamEvents( + @PathVariable("subscription_id") final String subscriptionId, + @Valid @RequestBody final UserStreamParameters userParameters, + final HttpServletRequest request, + final HttpServletResponse response, + final Client client) { + + final StreamParameters streamParameters = StreamParameters.of(userParameters, + nakadiSettings.getDefaultCommitTimeoutSeconds(), client); + + return stream(subscriptionId, request, response, client, streamParameters); + } + @RequestMapping(value = "/subscriptions/{subscription_id}/events", method = RequestMethod.GET) public StreamingResponseBody streamEvents( @PathVariable("subscription_id") final String subscriptionId, - @RequestParam(value = "max_uncommitted_events", required = false, defaultValue = "10") - final int maxUncommittedSize, - @RequestParam(value = "batch_limit", required = false, defaultValue = "1") final int batchLimit, + @Nullable @RequestParam(value = "max_uncommitted_events", required = false) final Integer + maxUncommittedEvents, + @Nullable @RequestParam(value = "batch_limit", required = false) final Integer batchLimit, @Nullable @RequestParam(value = "stream_limit", required = false) final Long streamLimit, - @RequestParam(value = "batch_flush_timeout", required = false, defaultValue = "30") final int batchTimeout, + @Nullable @RequestParam(value = "batch_flush_timeout", required = false) final Integer batchTimeout, @Nullable @RequestParam(value = "stream_timeout", required = false) final Long streamTimeout, - @Nullable - @RequestParam(value = "stream_keep_alive_limit", required = false) final Integer streamKeepAliveLimit, - final HttpServletRequest request, final HttpServletResponse response, final Client client) - throws IOException { + @Nullable @RequestParam(value = "stream_keep_alive_limit", required = false) final Integer + streamKeepAliveLimit, + final HttpServletRequest request, final HttpServletResponse response, final Client client) { + + final UserStreamParameters userParameters = new UserStreamParameters(batchLimit, streamLimit, batchTimeout, + streamTimeout, streamKeepAliveLimit, maxUncommittedEvents, ImmutableList.of()); + + final StreamParameters streamParameters = StreamParameters.of(userParameters, + nakadiSettings.getDefaultCommitTimeoutSeconds(), client); + + return stream(subscriptionId, request, response, client, streamParameters); + } + + private StreamingResponseBody stream(final String subscriptionId, + final HttpServletRequest request, + final HttpServletResponse response, + final Client client, + final StreamParameters streamParameters) { final String flowId = FlowIdUtils.peek(); return outputStream -> { @@ -165,10 +213,9 @@ public StreamingResponseBody streamEvents( return; } - final StreamParameters streamParameters = StreamParameters.of(batchLimit, streamLimit, batchTimeout, - streamTimeout, streamKeepAliveLimit, maxUncommittedSize, - nakadiSettings.getDefaultCommitTimeoutSeconds(), client); final Subscription subscription = subscriptionDbRepository.getSubscription(subscriptionId); + subscriptionValidationService.validatePartitionsToStream(subscription, + streamParameters.getPartitions()); streamer = subscriptionStreamerFactory.build(subscription, streamParameters, output, connectionReady, blacklistService); @@ -194,4 +241,11 @@ private void writeProblemResponse(final HttpServletResponse response, jsonMapper.writer().writeValue(outputStream, problem); } + @ExceptionHandler(WrongStreamParametersException.class) + public ResponseEntity invalidEventTypeException(final WrongStreamParametersException exception, + final NativeWebRequest request) { + LOG.debug(exception.getMessage(), exception); + return Responses.create(MoreStatus.UNPROCESSABLE_ENTITY, exception.getMessage(), request); + } + } diff --git a/src/main/java/org/zalando/nakadi/domain/EventTypePartition.java b/src/main/java/org/zalando/nakadi/domain/EventTypePartition.java index d9ae99fbb3..e6fa4ee3c8 100644 --- a/src/main/java/org/zalando/nakadi/domain/EventTypePartition.java +++ b/src/main/java/org/zalando/nakadi/domain/EventTypePartition.java @@ -1,17 +1,24 @@ package org.zalando.nakadi.domain; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; import javax.annotation.concurrent.Immutable; +import javax.validation.constraints.NotNull; @Immutable public class EventTypePartition { + @NotNull private final String eventType; + @NotNull private final String partition; - public EventTypePartition(final String eventType, final String partition) { + @JsonCreator + public EventTypePartition(@JsonProperty("event_type") final String eventType, + @JsonProperty("partition") final String partition) { this.eventType = eventType; this.partition = partition; } diff --git a/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java b/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java index 858a54107a..212864546a 100644 --- a/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java +++ b/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; @@ -31,21 +32,46 @@ public List getPartitions() { @Immutable public static class Partition { + + public enum AssignmentType { + AUTO("auto"), + DIRECT("direct"); + + private final String description; + + AssignmentType(final String description) { + this.description = description; + } + + @JsonValue + public String getDescription() { + return description; + } + } + private final String partition; + private final String state; + @JsonInclude(JsonInclude.Include.NON_NULL) private final Long unconsumedEvents; + private final String streamId; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final AssignmentType assignmentType; + public Partition( @JsonProperty("partition") final String partition, @JsonProperty("state") final String state, @JsonProperty("unconsumed_events") @Nullable final Long unconsumedEvents, - @JsonProperty("stream_id") final String streamId) { + @JsonProperty("stream_id") final String streamId, + @JsonProperty("assignment_type") @Nullable final AssignmentType assignmentType) { this.partition = partition; this.state = state; this.unconsumedEvents = unconsumedEvents; this.streamId = streamId; + this.assignmentType = assignmentType; } public String getPartition() { @@ -64,5 +90,9 @@ public Long getUnconsumedEvents() { public String getStreamId() { return streamId; } + + public AssignmentType getAssignmentType() { + return assignmentType; + } } } diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/RebalanceConflictException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/RebalanceConflictException.java new file mode 100644 index 0000000000..60d29c255b --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/RebalanceConflictException.java @@ -0,0 +1,9 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class RebalanceConflictException extends MyNakadiRuntimeException1 { + + public RebalanceConflictException(final String msg) { + super(msg); + } + +} diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/SubscriptionPartitionConflictException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/SubscriptionPartitionConflictException.java new file mode 100644 index 0000000000..1eca7c8571 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/SubscriptionPartitionConflictException.java @@ -0,0 +1,23 @@ +package org.zalando.nakadi.exceptions.runtime; + +import org.zalando.nakadi.domain.EventTypePartition; + +import java.util.List; +import java.util.stream.Collectors; + +public class SubscriptionPartitionConflictException extends MyNakadiRuntimeException1 { + + private SubscriptionPartitionConflictException(final String msg) { + super(msg); + } + + public static SubscriptionPartitionConflictException of(final List conflictPartitions) { + final String partitionsDesc = conflictPartitions.stream() + .map(EventTypePartition::toString) + .collect(Collectors.joining(",")); + + return new SubscriptionPartitionConflictException("The following partitions are already requested by other " + + "stream(s) of this subscription: " + partitionsDesc); + } + +} diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/WrongStreamParametersException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/WrongStreamParametersException.java new file mode 100644 index 0000000000..b95549c8c7 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/WrongStreamParametersException.java @@ -0,0 +1,8 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class WrongStreamParametersException extends MyNakadiRuntimeException1 { + + public WrongStreamParametersException(final String message) { + super(message); + } +} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/IdleStreamWatcher.java b/src/main/java/org/zalando/nakadi/service/subscription/IdleStreamWatcher.java new file mode 100644 index 0000000000..654168c01e --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/subscription/IdleStreamWatcher.java @@ -0,0 +1,36 @@ +package org.zalando.nakadi.service.subscription; + +public class IdleStreamWatcher { + + private final long idleCloseTimeout; + + private long idleStartMillis; + private boolean isIdle; + + public IdleStreamWatcher(final long idleCloseTimeout) { + this.idleCloseTimeout = idleCloseTimeout; + } + + public long getIdleCloseTimeout() { + return idleCloseTimeout; + } + + public boolean idleStart() { + if (!isIdle) { + isIdle = true; + idleStartMillis = System.currentTimeMillis(); + return true; // just started to be idle + } else { + return false; // was already idle + } + } + + public void idleEnd() { + isIdle = false; + } + + public boolean isIdleForToolLong() { + final long currentMillis = System.currentTimeMillis(); + return isIdle && currentMillis >= idleStartMillis + idleCloseTimeout; + } +} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java index 572127a6dd..3f55488190 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java @@ -1,10 +1,12 @@ package org.zalando.nakadi.service.subscription; -import org.zalando.nakadi.exceptions.UnprocessableEntityException; +import org.zalando.nakadi.domain.EventTypePartition; +import org.zalando.nakadi.exceptions.runtime.WrongStreamParametersException; import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.EventStreamConfig; +import org.zalando.nakadi.view.UserStreamParameters; -import javax.annotation.Nullable; +import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -41,25 +43,25 @@ public class StreamParameters { private final Client consumingClient; - private StreamParameters( - final int batchLimitEvents, @Nullable final Long streamLimitEvents, final long batchTimeoutMillis, - @Nullable final Long streamTimeoutSeconds, @Nullable final Integer batchKeepAliveIterations, - final int maxUncommittedMessages, final long commitTimeoutMillis, final Client consumingClient) - throws UnprocessableEntityException { - if (batchLimitEvents > 0) { - this.batchLimitEvents = batchLimitEvents; - } else { - throw new UnprocessableEntityException("batch_limit can't be lower than 1"); + private final List partitions; + + private StreamParameters(final UserStreamParameters userParameters, final long commitTimeoutMillis, + final Client consumingClient) throws WrongStreamParametersException { + + this.batchLimitEvents = userParameters.getBatchLimit().orElse(1); + if (batchLimitEvents <= 0) { + throw new WrongStreamParametersException("batch_limit can't be lower than 1"); } - this.streamLimitEvents = Optional.ofNullable(streamLimitEvents).filter(v -> v != 0); - this.batchTimeoutMillis = batchTimeoutMillis; + this.streamLimitEvents = userParameters.getStreamLimit().filter(v -> v != 0); + this.batchTimeoutMillis = TimeUnit.SECONDS.toMillis(userParameters.getBatchFlushTimeout().orElse(30)); this.streamTimeoutMillis = TimeUnit.SECONDS.toMillis( - Optional.ofNullable(streamTimeoutSeconds) + userParameters.getStreamTimeout() .filter(timeout -> timeout > 0 && timeout <= EventStreamConfig.MAX_STREAM_TIMEOUT) .orElse((long) EventStreamConfig.generateDefaultStreamTimeout())); - this.batchKeepAliveIterations = Optional.ofNullable(batchKeepAliveIterations); - this.maxUncommittedMessages = maxUncommittedMessages; - this.commitTimeoutMillis = commitTimeoutMillis; + this.maxUncommittedMessages = userParameters.getMaxUncommittedEvents().orElse(10); + this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit(); + this.partitions = userParameters.getPartitions(); + this.commitTimeoutMillis = TimeUnit.SECONDS.toMillis(commitTimeoutMillis); this.consumingClient = consumingClient; } @@ -79,23 +81,14 @@ public Client getConsumingClient() { return consumingClient; } - public static StreamParameters of( - final int batchLimitEvents, - @Nullable final Long streamLimitEvents, - final long batchTimeoutSeconds, - @Nullable final Long streamTimeoutSeconds, - @Nullable final Integer batchKeepAliveIterations, - final int maxUncommittedMessages, - final long commitTimeoutSeconds, - final Client client) throws UnprocessableEntityException { - return new StreamParameters( - batchLimitEvents, - streamLimitEvents, - TimeUnit.SECONDS.toMillis(batchTimeoutSeconds), - streamTimeoutSeconds, - batchKeepAliveIterations, - maxUncommittedMessages, - TimeUnit.SECONDS.toMillis(commitTimeoutSeconds), - client); + public List getPartitions() { + return partitions; } + + public static StreamParameters of(final UserStreamParameters userStreamParameters, + final long commitTimeoutSeconds, + final Client client) throws WrongStreamParametersException { + return new StreamParameters(userStreamParameters, commitTimeoutSeconds, client); + } + } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancer.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancer.java similarity index 58% rename from src/main/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancer.java rename to src/main/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancer.java index baee1172f0..94d09db8b4 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancer.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancer.java @@ -1,5 +1,8 @@ package org.zalando.nakadi.service.subscription; +import com.google.common.collect.Lists; +import org.zalando.nakadi.domain.EventTypePartition; +import org.zalando.nakadi.exceptions.runtime.RebalanceConflictException; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; @@ -8,15 +11,61 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -class ExactWeightRebalancer implements BiFunction, Partition[], Partition[]> { +class SubscriptionRebalancer implements BiFunction, Partition[], Partition[]> { + @Override public Partition[] apply(final Collection sessions, final Partition[] currentPartitions) { + + final List activeSessions = sessions.stream() + .map(Session::getId) + .collect(Collectors.toList()); + final List partitionsLeft = Lists.newArrayList(currentPartitions); + final List changedPartitions = new ArrayList<>(); + + final List sessionsWithSpecifiedPartitions = sessions.stream() + .filter(s -> !s.getRequestedPartitions().isEmpty()) + .collect(Collectors.toList()); + + // go through all sessions that directly requested partitions to stream + for (final Session session : sessionsWithSpecifiedPartitions) { + for (final EventTypePartition requestedPartition : session.getRequestedPartitions()) { + + // find a partition that is requested and assign it to a session that requests it + final Partition partition = partitionsLeft.stream() + .filter(p -> p.getKey().equals(requestedPartition)) + .findFirst() + .orElseThrow(() -> new RebalanceConflictException( + "Two existing sessions request the same partition: " + requestedPartition)); + partitionsLeft.remove(partition); + + // if this partition is not assigned to this session - move it + if (!session.getId().equals(partition.getSession())) { + final Partition movedPartition = partition.moveToSessionId(session.getId(), activeSessions); + changedPartitions.add(movedPartition); + } + } + } + + // for the rest of partitions/sessions perform a rebalance based on partitions count + final List autoBalanceSessions = sessions.stream() + .filter(s -> s.getRequestedPartitions().isEmpty()) + .collect(Collectors.toList()); + + if (!autoBalanceSessions.isEmpty() && !partitionsLeft.isEmpty()) { + final Partition[] partitionsChangedByAutoRebalance = rebalanceByWeight( + autoBalanceSessions, + partitionsLeft.toArray(new Partition[partitionsLeft.size()])); + changedPartitions.addAll(Arrays.asList(partitionsChangedByAutoRebalance)); + } + return changedPartitions.toArray(new Partition[changedPartitions.size()]); + } + + private Partition[] rebalanceByWeight(final Collection sessions, final Partition[] currentPartitions) { final Map activeSessionWeights = sessions.stream() .collect(Collectors.toMap(Session::getId, Session::getWeight)); // sorted session ids. @@ -28,15 +77,15 @@ public Partition[] apply(final Collection sessions, final Partition[] c activeSessionIds.stream().mapToInt(activeSessionWeights::get).toArray()); // Stage 1. Select partitions that are not assigned to any EXISTING session. - final Set toRebalance = Stream.of(currentPartitions) + final List toRebalance = Stream.of(currentPartitions) .filter(p -> p.mustBeRebalanced(activeSessionIds)) - .collect(Collectors.toSet()); + .collect(Collectors.toList()); // State 2. Remove partitions from sessions that have too many of them. // 2.1. collect information per session. final Map> partitions = Stream.of(currentPartitions) .filter(p -> !toRebalance.contains(p)) - .collect(Collectors.groupingBy(Partition::getSessionOrNextSession)); + .collect(Collectors.groupingBy(Partition::getEffectiveSession)); // 2.2. Remove for (int idx = 0; idx < activeSessionIds.size(); ++idx) { final String sessionId = activeSessionIds.get(idx); @@ -45,8 +94,9 @@ public Partition[] apply(final Collection sessions, final Partition[] c while (toTake > 0) { final List candidates = partitions.get(sessionId); final Partition toTakeItem = candidates.stream() - .filter(p -> p.getState() == Partition.State.REASSIGNING).findAny().orElse( - candidates.get(candidates.size() - 1)); + .filter(p -> p.getState() == Partition.State.REASSIGNING) + .findAny() + .orElse(candidates.get(candidates.size() - 1)); candidates.remove(toTakeItem); toRebalance.add(toTakeItem); toTake -= 1; diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index 2765b64fc1..2c17420728 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -305,8 +305,13 @@ private List loadStats( .map(node -> node.guessStream(stat.getTimeline().getEventType(), stat.getPartition())) .orElse(""); + final SubscriptionEventTypeStats.Partition.AssignmentType assignmentType = subscriptionNode + .map(node -> node.getPartitionAssignmentType( + stat.getTimeline().getEventType(), stat.getPartition())) + .orElse(null); + resultPartitions.add(new SubscriptionEventTypeStats.Partition( - lastPosition.getPartition(), state.getDescription(), distance, streamId)); + lastPosition.getPartition(), state.getDescription(), distance, streamId, assignmentType)); } resultPartitions.sort(Comparator.comparing(SubscriptionEventTypeStats.Partition::getPartition)); result.add(new SubscriptionEventTypeStats(eventType.getName(), resultPartitions)); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java index f9f4b40c3d..e6249fcea3 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java @@ -85,7 +85,7 @@ public SubscriptionStreamer build( final AtomicBoolean connectionReady, final BlacklistService blacklistService) throws InternalNakadiException, NoSuchEventTypeException { - final Session session = Session.generate(1); + final Session session = Session.generate(1, streamParameters.getPartitions()); final String loggingPath = "subscription." + subscription.getId() + "." + session.getId(); // Create streaming context return new StreamingContext.Builder() @@ -95,7 +95,7 @@ public SubscriptionStreamer build( .setSession(session) .setTimer(executorService) .setZkClient(zkClientFactory.createClient(subscription, loggingPath)) - .setRebalancer(new ExactWeightRebalancer()) + .setRebalancer(new SubscriptionRebalancer()) .setKafkaPollTimeout(kafkaPollTimeout) .setLoggingPath(loggingPath) .setConnectionReady(connectionReady) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionValidationService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionValidationService.java index 30acd4258a..a4336e1ec8 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionValidationService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionValidationService.java @@ -7,6 +7,7 @@ import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.NakadiCursor; +import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionBase; import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.exceptions.InvalidCursorException; @@ -16,11 +17,13 @@ import org.zalando.nakadi.exceptions.runtime.RepositoryProblemException; import org.zalando.nakadi.exceptions.runtime.TooManyPartitionsException; import org.zalando.nakadi.exceptions.runtime.WrongInitialCursorsException; +import org.zalando.nakadi.exceptions.runtime.WrongStreamParametersException; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -57,13 +60,8 @@ public void validateSubscription(final SubscriptionBase subscription) final Map> eventTypesOrNone = getSubscriptionEventTypesOrNone(subscription); checkEventTypesExist(eventTypesOrNone); - final List eventTypes = eventTypesOrNone.values().stream() - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList()); - // check that maximum number of partitions is not exceeded - final List allPartitions = getAllPartitions(eventTypes); + final List allPartitions = getAllPartitions(subscription.getEventTypes()); if (allPartitions.size() > maxSubscriptionPartitions) { final String message = String.format( "total partition count for subscription is %d, but the maximum partition count is %d", @@ -77,6 +75,26 @@ public void validateSubscription(final SubscriptionBase subscription) } } + public void validatePartitionsToStream(final Subscription subscription, final List partitions) { + // check for duplicated partitions + final long uniquePartitions = partitions.stream().distinct().count(); + if (uniquePartitions < partitions.size()) { + throw new WrongStreamParametersException("Duplicated partition specified"); + } + // check that partitions belong to subscription + final List allPartitions = getAllPartitions(subscription.getEventTypes()); + final List wrongPartitions = partitions.stream() + .filter(p -> !allPartitions.contains(p)) + .collect(Collectors.toList()); + if (!wrongPartitions.isEmpty()) { + final String wrongPartitionsDesc = wrongPartitions.stream() + .map(EventTypePartition::toString) + .collect(Collectors.joining(", ")); + throw new WrongStreamParametersException("Wrong partitions specified - some partitions don't belong to " + + "subscription: " + wrongPartitionsDesc); + } + } + private void validateInitialCursors(final SubscriptionBase subscription, final List allPartitions) throws WrongInitialCursorsException, RepositoryProblemException { @@ -116,7 +134,7 @@ private void validateInitialCursors(final SubscriptionBase subscription, } } - private List getAllPartitions(final List eventTypes) { + private List getAllPartitions(final Collection eventTypes) { return eventTypes.stream() .map(timelineService::getActiveTimeline) .flatMap(timeline -> timelineService.getTopicRepository(timeline) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java b/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java index c9852a8a7a..aeb0914c43 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java @@ -125,7 +125,7 @@ public String getNextSession() { @Nullable @JsonIgnore - public String getSessionOrNextSession() { + public String getEffectiveSession() { if (state == State.REASSIGNING) { return nextSession; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/model/Session.java b/src/main/java/org/zalando/nakadi/service/subscription/model/Session.java index d369205a5b..e5e3e002c8 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/model/Session.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/model/Session.java @@ -2,18 +2,29 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.zalando.nakadi.domain.EventTypePartition; +import java.util.List; import java.util.UUID; public class Session { + private final String id; private final int weight; + private final List requestedPartitions; @JsonCreator public Session(@JsonProperty("id") final String id, - @JsonProperty("weight") final int weight) { + @JsonProperty("weight") final int weight, + @JsonProperty("requested_partitions") final List requestedPartitions) { this.id = id; this.weight = weight; + this.requestedPartitions = requestedPartitions; + } + + public Session(final String id, final int weight) { + this(id, weight, ImmutableList.of()); } public String getId() { @@ -24,9 +35,13 @@ public int getWeight() { return weight; } + public List getRequestedPartitions() { + return requestedPartitions; + } + @Override public String toString() { - return "Session{" + id + ", weight=" + weight + '}'; + return "Session{" + id + ", weight=" + weight + ", requestedPartitions=" + requestedPartitions + "}"; } @Override @@ -39,17 +54,20 @@ public boolean equals(final Object o) { } final Session session = (Session) o; - return weight == session.getWeight() && id.equals(session.getId()); + return weight == session.getWeight() + && id.equals(session.getId()) + && requestedPartitions.equals(session.getRequestedPartitions()); } @Override public int hashCode() { int result = id.hashCode(); result = 31 * result + weight; + result = 31 * result + requestedPartitions.hashCode(); return result; } - public static Session generate(final int weight) { - return new Session(UUID.randomUUID().toString(), weight); + public static Session generate(final int weight, final List requestedPartitions) { + return new Session(UUID.randomUUID().toString(), weight, requestedPartitions); } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java index 5ae73a3b7b..9fed3372b5 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.service.subscription.state; +import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.PartitionEndStatistics; import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.Subscription; @@ -10,14 +11,17 @@ import org.zalando.nakadi.exceptions.NoStreamingSlotsAvailable; import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; +import org.zalando.nakadi.exceptions.runtime.SubscriptionPartitionConflictException; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.subscription.model.Partition; +import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; import javax.ws.rs.core.Response; import java.io.IOException; +import java.util.Collection; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -42,7 +46,7 @@ protected Response.StatusType getStatus() { })); return; } - getZk().runLocked(this::createSubscriptionLocked); + getZk().runLocked(this::initializeStream); } /** @@ -54,13 +58,36 @@ protected Response.StatusType getStatus() { *

* 4. Switches to streaming state. */ - private void createSubscriptionLocked() { + private void initializeStream() { final boolean subscriptionJustInitialized = initializeSubscriptionLocked(getZk(), getContext().getSubscription(), getContext().getTimelineService(), getContext().getCursorConverter()); if (!subscriptionJustInitialized) { + // check if there are streaming slots available + final Collection sessions = getZk().listSessions(); final Partition[] partitions = getZk().getTopology().getPartitions(); - if (getZk().listSessions().size() >= partitions.length) { - switchState(new CleanupState(new NoStreamingSlotsAvailable(partitions.length))); + final List requestedPartitions = getContext().getParameters().getPartitions(); + if (requestedPartitions.isEmpty()) { + final long directlyRequestedPartitionsCount = sessions.stream() + .flatMap(s -> s.getRequestedPartitions().stream()) + .distinct() + .count(); + final long autoSlotsCount = partitions.length - directlyRequestedPartitionsCount; + final long autoBalanceSessionsCount = sessions.stream() + .filter(s -> s.getRequestedPartitions().isEmpty()) + .count(); + if (autoBalanceSessionsCount >= autoSlotsCount) { + switchState(new CleanupState(new NoStreamingSlotsAvailable(partitions.length))); + return; + } + } + + // check if the requested partitions are not directly requested by another stream(s) + final List conflictPartitions = sessions.stream() + .flatMap(s -> s.getRequestedPartitions().stream()) + .filter(requestedPartitions::contains) + .collect(Collectors.toList()); + if (!conflictPartitions.isEmpty()) { + switchState(new CleanupState(SubscriptionPartitionConflictException.of(conflictPartitions))); return; } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index 0db8fafe2b..372b765896 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -18,6 +18,7 @@ import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.NakadiKpiPublisher; +import org.zalando.nakadi.service.subscription.IdleStreamWatcher; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.zk.ZkSubscription; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; @@ -61,6 +62,7 @@ class StreamingState extends State { // Uncommitted offsets are calculated right on exiting from Streaming state. private Map uncommittedOffsets; private Closeable cursorResetSubscription; + private IdleStreamWatcher idleStreamWatcher; /** * Time that is used for commit timeout check. Commit timeout check is working only in case when there is something @@ -83,6 +85,8 @@ public void onEnter() { kpiDataPerEventType = this.getContext().getSubscription().getEventTypes().stream() .collect(Collectors.toMap(et -> et, et -> new StreamKpiData())); + idleStreamWatcher = new IdleStreamWatcher(getParameters().commitTimeoutMillis * 2); + this.eventConsumer = getContext().getTimelineService().createEventConsumer(null); recreateTopologySubscription(); @@ -375,6 +379,7 @@ void reactOnTopologyChange() { .filter(p -> getSessionId().equals(p.getSession())) .toArray(Partition[]::new); addTask(() -> refreshTopologyUnlocked(assignedPartitions)); + trackIdleness(topology); } void recheckTopology() { @@ -640,4 +645,27 @@ private void removeFromStreaming(final EventTypePartition key) { } } } + + /** + * If stream doesn't have any partitions - start timer that will close this session + * in commitTimeout*2 if it doesn't get any partitions during that time + * @param topology the new topology + */ + private void trackIdleness(final ZkSubscriptionClient.Topology topology) { + final boolean hasAnyAssignment = Stream.of(topology.getPartitions()) + .anyMatch(p -> getSessionId().equals(p.getSession()) || getSessionId().equals(p.getNextSession())); + if (hasAnyAssignment) { + idleStreamWatcher.idleEnd(); + } else { + final boolean justSwitchedToIdle = idleStreamWatcher.idleStart(); + if (justSwitchedToIdle) { + scheduleTask(() -> { + if (idleStreamWatcher.isIdleForToolLong()) { + shutdownGracefully("There are no available partitions to read"); + } + }, idleStreamWatcher.getIdleCloseTimeout(), TimeUnit.MILLISECONDS); + } + } + } + } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 54fe153f6d..0296f9689d 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -175,8 +175,8 @@ public final void registerSession(final Session session) { getLog().info("Registering session " + session); try { final String clientPath = getSubscriptionPath("/sessions/" + session.getId()); - getCurator().create().withMode(CreateMode.EPHEMERAL).forPath(clientPath, - String.valueOf(session.getWeight()).getBytes(UTF_8)); + final byte[] sessionData = serializeSession(session); + getCurator().create().withMode(CreateMode.EPHEMERAL).forPath(clientPath, sessionData); } catch (final Exception e) { throw new NakadiRuntimeException(e); } @@ -257,13 +257,8 @@ public final Collection listSessions() return loadDataAsync( zkSessions, key -> getSubscriptionPath("/sessions/" + key), - (key, data) -> { - try { - return deserializeSession(key, data); - } catch (final IOException e) { - throw new NakadiRuntimeException(e); - } - }).values(); + this::deserializeSession + ).values(); } @Override @@ -469,5 +464,7 @@ protected abstract byte[] createTopologyAndOffsets(Collection subscribeForTopologyChanges(final Runnable getSubscriptionPath(NODE_TOPOLOGY)); } + protected byte[] serializeSession(final Session session) + throws NakadiRuntimeException { + try { + return objectMapper.writeValueAsBytes(session); + } catch (final JsonProcessingException e) { + throw new NakadiRuntimeException(e); + } + } + + protected Session deserializeSession(final String sessionId, final byte[] sessionZkData) + throws NakadiRuntimeException { + try { + // old version of session: zkNode data is session weight + final int weight = Integer.parseInt(new String(sessionZkData, UTF_8)); + return new Session(sessionId, weight, ImmutableList.of()); + } catch (final NumberFormatException nfe) { + // new version of session: zkNode data is session object as json + try { + return objectMapper.readValue(sessionZkData, Session.class); + } catch (final IOException e) { + throw new NakadiRuntimeException(e); + } + } + } protected String getOffsetPath(final EventTypePartition etp) { return getSubscriptionPath("/offsets/" + etp.getEventType() + "/" + etp.getPartition()); @@ -148,17 +174,6 @@ public Map getOffsets( new SubscriptionCursorWithoutToken(etp.getEventType(), etp.getPartition(), new String(value, UTF_8))); } - protected Session deserializeSession(final String sessionId, final byte[] sessionZkData) throws IOException { - try { - // old version of session: zkNode data is session weight - final int weight = Integer.parseInt(new String(sessionZkData, UTF_8)); - return new Session(sessionId, weight); - } catch (final NumberFormatException nfe) { - // new version of session: zkNode data is session object as json - return objectMapper.readValue(sessionZkData, Session.class); - } - } - @Override public void transfer(final String sessionId, final Collection partitions) throws NakadiRuntimeException, SubscriptionNotInitializedException { diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java index f6cda8af99..62c2da4649 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java @@ -30,7 +30,7 @@ public interface ZkSubscriptionClient { /** * Makes runLocked on subscription, using zk path /nakadi/locks/subscription_{subscriptionId} - * Lock is created as an ephemeral node, so it will be deleted if nakadi go down. After obtaining runLocked, + * Lock is created as an ephemeral node, so it will be deleted if nakadi goes down. After obtaining runLocked, * provided function will be called under subscription runLocked * * @param function Function to call in context of runLocked. diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java index 7778cc464d..52d80f80d0 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.service.subscription.zk; +import org.zalando.nakadi.domain.SubscriptionEventTypeStats; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; @@ -7,6 +8,9 @@ import java.util.Collection; import java.util.Optional; +import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.AUTO; +import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.DIRECT; + public final class ZkSubscriptionNode { private final Collection partitions; @@ -41,4 +45,15 @@ public String guessStream(final String eventType, final String partition) { .orElse(null); } + @Nullable + public SubscriptionEventTypeStats.Partition.AssignmentType getPartitionAssignmentType(final String eventType, + final String partition) { + return partitions.stream() + .filter(p -> p.getPartition().equals(partition) && p.getEventType().equals(eventType)) + .flatMap(p -> sessions.stream().filter(s -> s.getId().equalsIgnoreCase(p.getSession()))) + .findAny() + .map(s -> s.getRequestedPartitions().isEmpty() ? AUTO : DIRECT) + .orElse(null); + } + } diff --git a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java index d1f76f33c6..017cfd167e 100644 --- a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java +++ b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java @@ -191,9 +191,8 @@ public List getAllTimelinesOrdered(final String eventType) return eventTypeCache.getTimelinesOrdered(eventType); } - public Timeline getActiveTimeline(final EventTypeBase eventType) throws TimelineException { + public Timeline getActiveTimeline(final String eventTypeName) throws TimelineException { try { - final String eventTypeName = eventType.getName(); final List timelines = eventTypeCache.getTimelinesOrdered(eventTypeName); final ListIterator rIterator = timelines.listIterator(timelines.size()); while (rIterator.hasPrevious()) { @@ -205,11 +204,15 @@ public Timeline getActiveTimeline(final EventTypeBase eventType) throws Timeline throw new TimelineException(String.format("No timelines for event type %s", eventTypeName)); } catch (final NakadiException e) { - LOG.error("Failed to get timeline for event type {}", eventType.getName(), e); + LOG.error("Failed to get timeline for event type {}", eventTypeName, e); throw new TimelineException("Failed to get timeline", e); } } + public Timeline getActiveTimeline(final EventTypeBase eventType) throws TimelineException { + return getActiveTimeline(eventType.getName()); + } + public TopicRepository getTopicRepository(final Storage storage) { return topicRepositoryHolder.getTopicRepository(storage); } diff --git a/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java b/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java new file mode 100644 index 0000000000..879e960d68 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java @@ -0,0 +1,72 @@ +package org.zalando.nakadi.view; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.zalando.nakadi.domain.EventTypePartition; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Optional; + +public class UserStreamParameters { + + private final Optional batchLimit; + + private final Optional streamLimit; + + private final Optional batchFlushTimeout; + + private final Optional streamTimeout; + + private final Optional streamKeepAliveLimit; + + private final Optional maxUncommittedEvents; + + private final List partitions; + + @JsonCreator + public UserStreamParameters(@JsonProperty("batch_limit") @Nullable final Integer batchLimit, + @JsonProperty("stream_limit") @Nullable final Long streamLimit, + @JsonProperty("batch_flush_timeout") @Nullable final Integer batchFlushTimeout, + @JsonProperty("stream_timeout") @Nullable final Long streamTimeout, + @JsonProperty("stream_keep_alive_limit") @Nullable final Integer streamKeepAliveLimit, + @JsonProperty("max_uncommitted_events") @Nullable final Integer maxUncommittedEvents, + @JsonProperty("partitions") @Nullable final List partitions) { + this.batchLimit = Optional.ofNullable(batchLimit); + this.streamLimit = Optional.ofNullable(streamLimit); + this.batchFlushTimeout = Optional.ofNullable(batchFlushTimeout); + this.streamTimeout = Optional.ofNullable(streamTimeout); + this.streamKeepAliveLimit = Optional.ofNullable(streamKeepAliveLimit); + this.maxUncommittedEvents = Optional.ofNullable(maxUncommittedEvents); + this.partitions = partitions == null ? ImmutableList.of() : partitions; + } + + public Optional getBatchLimit() { + return batchLimit; + } + + public Optional getStreamLimit() { + return streamLimit; + } + + public Optional getBatchFlushTimeout() { + return batchFlushTimeout; + } + + public Optional getStreamTimeout() { + return streamTimeout; + } + + public Optional getStreamKeepAliveLimit() { + return streamKeepAliveLimit; + } + + public Optional getMaxUncommittedEvents() { + return maxUncommittedEvents; + } + + public List getPartitions() { + return partitions; + } +} diff --git a/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java index d7b5889e47..9175fb10ee 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventStreamControllerTest.java @@ -306,7 +306,7 @@ public void whenNoCursorsThenLatestOffsetsAreUsed() throws NakadiException, IOEx final List tps2 = ImmutableList.of( new KafkaPartitionStatistics(timeline, 0, 0, 87), new KafkaPartitionStatistics(timeline, 1, 0, 34)); - when(timelineService.getActiveTimeline(any())).thenReturn(timeline); + when(timelineService.getActiveTimeline(any(EventType.class))).thenReturn(timeline); when(topicRepositoryMock.loadTopicStatistics(eq(Collections.singletonList(timeline)))) .thenReturn(tps2); diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index 48adabb6aa..c733356a9d 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -11,6 +11,7 @@ import org.springframework.web.method.support.HandlerMethodArgumentResolver; import org.springframework.web.method.support.ModelAndViewContainer; import org.zalando.nakadi.config.NakadiSettings; +import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.EventTypeBase; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.ItemsWrapper; @@ -76,6 +77,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup; +import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.AUTO; import static org.zalando.nakadi.util.SubscriptionsUriHelper.createSubscriptionListUri; import static org.zalando.nakadi.utils.RandomSubscriptionBuilder.builder; import static org.zalando.nakadi.utils.TestUtils.buildTimelineWithTopic; @@ -110,7 +112,7 @@ public SubscriptionControllerTest() throws Exception { zkSubscriptionClient = mock(ZkSubscriptionClient.class); when(zkSubscriptionClientFactory.createClient(any(), any())).thenReturn(zkSubscriptionClient); timelineService = mock(TimelineService.class); - when(timelineService.getActiveTimeline(any())).thenReturn(TIMELINE); + when(timelineService.getActiveTimeline(any(EventType.class))).thenReturn(TIMELINE); when(timelineService.getTopicRepository((EventTypeBase) any())).thenReturn(topicRepository); when(timelineService.getTopicRepository((Timeline) any())).thenReturn(topicRepository); final NakadiSettings settings = mock(NakadiSettings.class); @@ -258,7 +260,8 @@ public void whenGetSubscriptionStatThenOk() throws Exception { final List expectedStats = Collections.singletonList(new SubscriptionEventTypeStats( TIMELINE.getEventType(), - Collections.singletonList(new SubscriptionEventTypeStats.Partition("0", "assigned", 10L, "xz"))) + Collections.singletonList( + new SubscriptionEventTypeStats.Partition("0", "assigned", 10L, "xz", AUTO))) ); getSubscriptionStats(subscription.getId()) diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionStreamControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionStreamControllerTest.java deleted file mode 100644 index eaa53330cd..0000000000 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionStreamControllerTest.java +++ /dev/null @@ -1,106 +0,0 @@ -package org.zalando.nakadi.controller; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.MetricRegistry; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; -import org.zalando.nakadi.config.JsonConfig; -import org.zalando.nakadi.config.NakadiSettings; -import org.zalando.nakadi.exceptions.InvalidCursorException; -import org.zalando.nakadi.exceptions.NakadiException; -import org.zalando.nakadi.repository.EventTypeRepository; -import org.zalando.nakadi.repository.db.SubscriptionDbRepository; -import org.zalando.nakadi.security.Client; -import org.zalando.nakadi.security.FullAccessClient; -import org.zalando.nakadi.service.AuthorizationValidator; -import org.zalando.nakadi.service.BlacklistService; -import org.zalando.nakadi.service.ClosedConnectionsCrutch; -import org.zalando.nakadi.service.EventTypeChangeListener; -import org.zalando.nakadi.service.subscription.SubscriptionStreamerFactory; -import org.zalando.nakadi.service.FeatureToggleService; -import org.zalando.nakadi.utils.JsonTestHelper; -import org.zalando.problem.Problem; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API; -import static org.zalando.problem.MoreStatus.UNPROCESSABLE_ENTITY; - -public class SubscriptionStreamControllerTest { - - private static final Client FULL_ACCESS_CLIENT = new FullAccessClient("clientId"); - - private HttpServletRequest requestMock; - private HttpServletResponse responseMock; - - private SubscriptionStreamController controller; - private JsonTestHelper jsonHelper; - - private SubscriptionStreamerFactory subscriptionStreamerFactory; - - private SubscriptionDbRepository subscriptionDbRepository; - private EventTypeRepository eventTypeRepository; - private AuthorizationValidator authorizationValidator; - private EventTypeChangeListener eventTypeChangeListener; - - @Before - public void setup() throws NakadiException, UnknownHostException, InvalidCursorException { - final ObjectMapper objectMapper = new JsonConfig().jacksonObjectMapper(); - jsonHelper = new JsonTestHelper(objectMapper); - - requestMock = mock(HttpServletRequest.class); - responseMock = mock(HttpServletResponse.class); - - final MetricRegistry metricRegistry = mock(MetricRegistry.class); - when(metricRegistry.counter(any())).thenReturn(mock(Counter.class)); - - final ClosedConnectionsCrutch crutch = mock(ClosedConnectionsCrutch.class); - when(crutch.listenForConnectionClose(requestMock)).thenReturn(new AtomicBoolean(true)); - - final BlacklistService blacklistService = Mockito.mock(BlacklistService.class); - Mockito.when(blacklistService.isSubscriptionConsumptionBlocked(any(String.class), any(String.class))) - .thenReturn(false); - - final FeatureToggleService featureToggleService = mock(FeatureToggleService.class); - when(featureToggleService.isFeatureEnabled(HIGH_LEVEL_API)).thenReturn(true); - - final NakadiSettings nakadiSettings = mock(NakadiSettings.class); - - subscriptionStreamerFactory = mock(SubscriptionStreamerFactory.class); - subscriptionDbRepository = mock(SubscriptionDbRepository.class); - eventTypeRepository = mock(EventTypeRepository.class); - authorizationValidator = mock(AuthorizationValidator.class); - eventTypeChangeListener = mock(EventTypeChangeListener.class); - - controller = new SubscriptionStreamController(subscriptionStreamerFactory, featureToggleService, objectMapper, - crutch, nakadiSettings, blacklistService, metricRegistry, subscriptionDbRepository); - } - - @Test - public void whenBatchLimitLowerThan1ThenUnprocessableEntity() throws Exception { - final StreamingResponseBody responseBody = controller.streamEvents("abc", 0, 0, null, 10, null, null, - requestMock, responseMock, FULL_ACCESS_CLIENT); - - final Problem expectedProblem = Problem.valueOf(UNPROCESSABLE_ENTITY, "batch_limit can't be lower than 1"); - assertThat(responseToString(responseBody), jsonHelper.matchesObject(expectedProblem)); - } - - protected String responseToString(final StreamingResponseBody responseBody) throws IOException { - final ByteArrayOutputStream out = new ByteArrayOutputStream(); - responseBody.writeTo(out); - return out.toString(); - } - -} diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index 6a4c5be46a..2eaf3b178a 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -88,7 +88,7 @@ public EventPublisherTest() { Mockito.when(ts.getTopicRepository((Timeline) any())).thenReturn(topicRepository); Mockito.when(ts.getTopicRepository((EventTypeBase) any())).thenReturn(topicRepository); final Timeline timeline = Mockito.mock(Timeline.class); - Mockito.when(ts.getActiveTimeline(any())).thenReturn(timeline); + Mockito.when(ts.getActiveTimeline(any(EventType.class))).thenReturn(timeline); publisher = new EventPublisher(ts, cache, partitionResolver, enrichment, nakadiSettings, timelineSync, authzValidator); diff --git a/src/test/java/org/zalando/nakadi/service/SubscriptionValidationServiceTest.java b/src/test/java/org/zalando/nakadi/service/SubscriptionValidationServiceTest.java index e76a2e04db..0e9c645317 100644 --- a/src/test/java/org/zalando/nakadi/service/SubscriptionValidationServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/SubscriptionValidationServiceTest.java @@ -78,7 +78,7 @@ public void setUp() throws InternalNakadiException { final Timeline timeline = mock(Timeline.class); when(timeline.getTopic()).thenReturn(topicForET(et.getName())); when(timeline.getEventType()).thenReturn(et.getName()); - when(timelineService.getActiveTimeline(eq(et))).thenReturn(timeline); + when(timelineService.getActiveTimeline(eq(et.getName()))).thenReturn(timeline); } when(timelineService.getTopicRepository((Timeline) any())).thenReturn(topicRepository); when(timelineService.getTopicRepository((EventType) any())).thenReturn(topicRepository); diff --git a/src/test/java/org/zalando/nakadi/service/subscription/IdleStreamWatcherTest.java b/src/test/java/org/zalando/nakadi/service/subscription/IdleStreamWatcherTest.java new file mode 100644 index 0000000000..c0e4969871 --- /dev/null +++ b/src/test/java/org/zalando/nakadi/service/subscription/IdleStreamWatcherTest.java @@ -0,0 +1,25 @@ +package org.zalando.nakadi.service.subscription; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class IdleStreamWatcherTest { + + @Test + public void checkIdleStreamWatcher() throws InterruptedException { + final IdleStreamWatcher idleStreamWatcher = new IdleStreamWatcher(100); + + assertThat(idleStreamWatcher.isIdleForToolLong(), is(false)); // not idle at all + + idleStreamWatcher.idleStart(); + assertThat(idleStreamWatcher.isIdleForToolLong(), is(false)); // is idle not long enough + + Thread.sleep(120); + assertThat(idleStreamWatcher.isIdleForToolLong(), is(true)); // is idle for too long + + idleStreamWatcher.idleEnd(); + assertThat(idleStreamWatcher.isIdleForToolLong(), is(false)); // not idle at all + } +} diff --git a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java index cbd78c6814..1a1bc932d8 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java @@ -1,9 +1,12 @@ package org.zalando.nakadi.service.subscription; +import com.google.common.collect.ImmutableList; import org.junit.Test; import org.zalando.nakadi.exceptions.UnprocessableEntityException; +import org.zalando.nakadi.exceptions.runtime.WrongStreamParametersException; import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.EventStreamConfig; +import org.zalando.nakadi.view.UserStreamParameters; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -17,14 +20,14 @@ public class StreamParametersTest { - @Test(expected = UnprocessableEntityException.class) + @Test(expected = WrongStreamParametersException.class) public void whenBatchLimitLowerOrEqualToZeroTheException() throws Exception { - StreamParameters.of(0, null, 0, null, null, 0, 0, mock(Client.class)); + createStreamParameters(0, null, 0, null, null, 0, 0, mock(Client.class)); } @Test public void checkParamsAreTransformedCorrectly() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, null, 10, 60L, null, 1000, 20, + final StreamParameters streamParameters = createStreamParameters(1, null, 10, 60L, null, 1000, 20, mock(Client.class)); assertThat(streamParameters.batchLimitEvents, equalTo(1)); @@ -36,14 +39,15 @@ public void checkParamsAreTransformedCorrectly() throws Exception { @Test public void whenStreamTimeoutOmittedThenItIsGenerated() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, null, 0, null, null, 0, 0, mock(Client.class)); + final StreamParameters streamParameters = createStreamParameters(1, null, 0, null, null, 0, 0, + mock(Client.class)); checkStreamTimeoutIsGeneratedCorrectly(streamParameters); } @Test public void whenStreamTimeoutIsGreaterThanMaxThenItIsGenerated() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, null, 0, + final StreamParameters streamParameters = createStreamParameters(1, null, 0, EventStreamConfig.MAX_STREAM_TIMEOUT + 1L, null, 0, 0, mock(Client.class)); checkStreamTimeoutIsGeneratedCorrectly(streamParameters); @@ -51,7 +55,8 @@ public void whenStreamTimeoutIsGreaterThanMaxThenItIsGenerated() throws Exceptio @Test public void checkIsStreamLimitReached() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, 150L, 0, null, null, 0, 0, mock(Client.class)); + final StreamParameters streamParameters = createStreamParameters(1, 150L, 0, null, null, 0, 0, + mock(Client.class)); assertThat(streamParameters.isStreamLimitReached(140), is(false)); assertThat(streamParameters.isStreamLimitReached(151), is(true)); @@ -60,15 +65,16 @@ public void checkIsStreamLimitReached() throws Exception { @Test public void checkIsKeepAliveLimitReached() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, null, 0, null, 5, 0, 0, mock(Client.class)); + final StreamParameters streamParameters = createStreamParameters(1, null, 0, null, 5, 0, 0, mock(Client.class)); assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 6, 12)), is(true)); assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 4, 12)), is(false)); } @Test - public void checkgetMessagesAllowedToSend() throws Exception { - final StreamParameters streamParameters = StreamParameters.of(1, 200L, 0, null, null, 0, 0, mock(Client.class)); + public void checkGetMessagesAllowedToSend() throws Exception { + final StreamParameters streamParameters = createStreamParameters(1, 200L, 0, null, null, 0, 0, + mock(Client.class)); assertThat(streamParameters.getMessagesAllowedToSend(50, 190), equalTo(10L)); assertThat(streamParameters.getMessagesAllowedToSend(50, 120), equalTo(50L)); @@ -80,4 +86,18 @@ private void checkStreamTimeoutIsGeneratedCorrectly(final StreamParameters strea assertThat(streamParameters.streamTimeoutMillis, greaterThanOrEqualTo( TimeUnit.SECONDS.toMillis((long) EventStreamConfig.MAX_STREAM_TIMEOUT - 1200))); } + + public static StreamParameters createStreamParameters(final int batchLimitEvents, + final Long streamLimitEvents, + final int batchTimeoutSeconds, + final Long streamTimeoutSeconds, + final Integer batchKeepAliveIterations, + final int maxUncommittedMessages, + final long commitTimeoutSeconds, + final Client client) throws UnprocessableEntityException { + final UserStreamParameters userParams = new UserStreamParameters(batchLimitEvents, streamLimitEvents, + batchTimeoutSeconds, streamTimeoutSeconds, batchKeepAliveIterations, maxUncommittedMessages, + ImmutableList.of()); + return StreamParameters.of(userParams, commitTimeoutSeconds, client); + } } diff --git a/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java b/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java index 66166dd4ee..e6076265e6 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.service.subscription; +import com.google.common.collect.ImmutableList; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -37,7 +38,7 @@ public OutputStream getOutputStream() { return new StreamingContext.Builder() .setOut(output) .setParameters(null) - .setSession(Session.generate(1)) + .setSession(Session.generate(1, ImmutableList.of())) .setTimer(null) .setZkClient(null) .setRebalancer(null) @@ -154,4 +155,4 @@ public void onEnter() { Mockito.verify(ctxSpy).unregisterSession(); Mockito.verify(ctxSpy).switchState(Mockito.isA(DummyState.class)); } -} \ No newline at end of file +} diff --git a/src/test/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancerTest.java b/src/test/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancerTest.java similarity index 51% rename from src/test/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancerTest.java rename to src/test/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancerTest.java index f9ee2154bf..123fcfb0d9 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancerTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancerTest.java @@ -1,14 +1,15 @@ package org.zalando.nakadi.service.subscription; +import com.google.common.collect.ImmutableList; import org.junit.Test; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; -import java.util.Arrays; import java.util.List; import java.util.stream.Stream; +import static com.google.common.collect.Sets.newHashSet; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.emptyArray; import static org.junit.Assert.assertArrayEquals; @@ -17,44 +18,120 @@ import static org.junit.Assert.assertTrue; import static org.zalando.nakadi.service.subscription.model.Partition.State.ASSIGNED; import static org.zalando.nakadi.service.subscription.model.Partition.State.REASSIGNING; +import static org.zalando.nakadi.service.subscription.model.Partition.State.UNASSIGNED; -public class ExactWeightRebalancerTest { +public class SubscriptionRebalancerTest { @Test(expected = IllegalArgumentException.class) public void splitByWeightShouldAcceptOnlyCorrectData1() { - ExactWeightRebalancer.splitByWeight(1, new int[]{1, 1}); + SubscriptionRebalancer.splitByWeight(1, new int[]{1, 1}); } @Test(expected = IllegalArgumentException.class) public void splitByWeightShouldAcceptOnlyCorrectData2() { - ExactWeightRebalancer.splitByWeight(2, new int[]{1, 0}); + SubscriptionRebalancer.splitByWeight(2, new int[]{1, 0}); } @Test public void splitByWeightMustCorrectlyWorkOnDifferentValues() { assertArrayEquals( new int[]{1, 1}, - ExactWeightRebalancer.splitByWeight(2, new int[]{1, 1})); + SubscriptionRebalancer.splitByWeight(2, new int[]{1, 1})); assertArrayEquals( new int[]{2, 1}, - ExactWeightRebalancer.splitByWeight(3, new int[]{1, 1})); + SubscriptionRebalancer.splitByWeight(3, new int[]{1, 1})); assertArrayEquals( new int[]{1, 2}, - ExactWeightRebalancer.splitByWeight(3, new int[]{1, 2})); + SubscriptionRebalancer.splitByWeight(3, new int[]{1, 2})); assertArrayEquals( new int[]{34, 33, 33}, - ExactWeightRebalancer.splitByWeight(100, new int[]{1, 1, 1})); + SubscriptionRebalancer.splitByWeight(100, new int[]{1, 1, 1})); assertArrayEquals( new int[]{26, 25, 50}, - ExactWeightRebalancer.splitByWeight(101, new int[]{1, 1, 2})); + SubscriptionRebalancer.splitByWeight(101, new int[]{1, 1, 2})); + } + + @Test + public void directlyRequestedPartitionsAreCaptured() { + final Partition[] changeset = new SubscriptionRebalancer().apply( + ImmutableList.of( + new Session("s1", 1), + new Session("s2", 1), + new Session("s3", 1, ImmutableList.of( + new EventTypePartition("et1", "p1"), + new EventTypePartition("et1", "p4")))), + new Partition[]{ + new Partition("et1", "p1", "s7", null, ASSIGNED), + new Partition("et1", "p2", "s7", null, ASSIGNED), + new Partition("et1", "p3", "s7", null, ASSIGNED), + new Partition("et1", "p4", "s7", null, ASSIGNED)}); + + assertEquals(newHashSet(changeset), newHashSet( + new Partition("et1", "p1", "s3", null, ASSIGNED), + new Partition("et1", "p2", "s1", null, ASSIGNED), + new Partition("et1", "p3", "s2", null, ASSIGNED), + new Partition("et1", "p4", "s3", null, ASSIGNED))); + } + + @Test + public void directlyAssignedPartitionsAreNotTransferred() { + final Partition[] changeset = new SubscriptionRebalancer().apply( + ImmutableList.of( + new Session("s1", 1, ImmutableList.of(new EventTypePartition("et1", "p1"))), + new Session("s2", 1)), + new Partition[]{ + new Partition("et1", "p1", "s1", null, ASSIGNED), + new Partition("et1", "p2", null, null, UNASSIGNED)}); + + assertEquals(newHashSet(changeset), newHashSet( + new Partition("et1", "p2", "s2", null, ASSIGNED))); + } + + @Test + public void directlyRequestedPartitionsAreTransferred() { + final Partition[] changeset = new SubscriptionRebalancer().apply( + ImmutableList.of( + new Session("s1", 1), + new Session("s2", 1), + new Session("s3", 1, ImmutableList.of( + new EventTypePartition("et1", "p1"), + new EventTypePartition("et1", "p4")))), + new Partition[]{ + new Partition("et1", "p1", "s1", null, ASSIGNED), + new Partition("et1", "p2", "s1", null, ASSIGNED), + new Partition("et1", "p3", "s2", null, ASSIGNED), + new Partition("et1", "p4", "s2", null, ASSIGNED)}); + + assertEquals(newHashSet(changeset), newHashSet( + new Partition("et1", "p1", "s1", "s3", REASSIGNING), + new Partition("et1", "p4", "s2", "s3", REASSIGNING))); + } + + @Test + public void onlyDirectSessionsWorkFine() { + final Partition[] changeset = new SubscriptionRebalancer().apply( + ImmutableList.of( + new Session("s1", 1, ImmutableList.of(new EventTypePartition("et1", "p3"))), + new Session("s2", 1, ImmutableList.of(new EventTypePartition("et1", "p2")))), + new Partition[]{ + new Partition("et1", "p1", null, null, UNASSIGNED), + new Partition("et1", "p2", null, null, UNASSIGNED), + new Partition("et1", "p3", null, null, UNASSIGNED), + new Partition("et1", "p4", null, null, UNASSIGNED)}); + + assertEquals(newHashSet(changeset), newHashSet( + new Partition("et1", "p3", "s1", null, ASSIGNED), + new Partition("et1", "p2", "s2", null, ASSIGNED))); } @Test public void rebalanceShouldHaveEmptyChangesetForBalancedData() { - final ExactWeightRebalancer rebalancer = new ExactWeightRebalancer(); + final SubscriptionRebalancer rebalancer = new SubscriptionRebalancer(); // 1. Data contains only assigned - final List sessions = Arrays.asList(new Session("0", 1), new Session("1", 1)); + final List sessions = ImmutableList.of( + new Session("0", 1), + new Session("1", 1)); assertThat(rebalancer.apply(sessions, new Partition[]{ new Partition("0", "0", "0", null, ASSIGNED), @@ -84,8 +161,8 @@ public void rebalanceShouldHaveEmptyChangesetForBalancedData() { @Test public void rebalanceShouldRemoveDeadSessions() { - final Partition[] changeset = new ExactWeightRebalancer().apply( - Arrays.asList(new Session("1", 1), new Session("2", 1)), + final Partition[] changeset = new SubscriptionRebalancer().apply( + ImmutableList.of(new Session("1", 1), new Session("2", 1)), new Partition[]{ new Partition("0", "0", "0", null, ASSIGNED), new Partition("0", "1", "0", "1", REASSIGNING), @@ -106,8 +183,8 @@ public void rebalanceShouldRemoveDeadSessions() { @Test public void rebalanceShouldMoveToReassigningState() { - final Partition[] changeset = new ExactWeightRebalancer().apply( - Arrays.asList(new Session("1", 1), new Session("2", 1), new Session("3", 1)), + final Partition[] changeset = new SubscriptionRebalancer().apply( + ImmutableList.of(new Session("1", 1), new Session("2", 1), new Session("3", 1)), new Partition[]{ new Partition("0", "0", "1", null, ASSIGNED), new Partition("0", "1", "1", null, ASSIGNED), @@ -125,8 +202,8 @@ public void rebalanceShouldMoveToReassigningState() { @Test public void rebalanceShouldTakeRebalancingPartitions() { - final Partition[] changeset = new ExactWeightRebalancer().apply( - Arrays.asList(new Session("1", 1), new Session("2", 1), new Session("3", 1)), + final Partition[] changeset = new SubscriptionRebalancer().apply( + ImmutableList.of(new Session("1", 1), new Session("2", 1), new Session("3", 1)), new Partition[]{ new Partition("0", "0", "1", null, ASSIGNED), new Partition("0", "1", "1", null, ASSIGNED), diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java index 0f7e060b77..c582af5d65 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java @@ -35,6 +35,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.zalando.nakadi.service.subscription.StreamParametersTest.createStreamParameters; public class StreamingStateTest { @@ -73,7 +74,7 @@ public void prepareMocks() throws Exception { final Client client = mock(Client.class); when(client.getClientId()).thenReturn("consumingAppId"); - final StreamParameters spMock = StreamParameters.of( + final StreamParameters spMock = createStreamParameters( 1000, 100L, 100, diff --git a/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java b/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java index f8ddac5fc3..4da299035f 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableList; import org.junit.Before; import org.junit.Test; +import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; @@ -10,6 +11,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; +import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.AUTO; +import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.DIRECT; public class ZkSubscriptionNodeTest { @@ -25,7 +28,7 @@ public void before() { ); final List sessions = ImmutableList.of( - new Session("stream1", 1), + new Session("stream1", 1, ImmutableList.of(new EventTypePartition("et1", "0"))), new Session("stream2", 1), new Session("stream3", 1), new Session("stream4", 1) @@ -49,4 +52,12 @@ public void whenGuessStateThenOk() { assertThat(zkSubscriptionNode.guessState("et2", "0"), equalTo(Partition.State.UNASSIGNED)); assertThat(zkSubscriptionNode.guessState("et2", "1"), equalTo(Partition.State.UNASSIGNED)); } + + @Test + public void whenGetPartitionAssignmentTypeThenOk() { + assertThat(zkSubscriptionNode.getPartitionAssignmentType("et1", "0"), equalTo(DIRECT)); + assertThat(zkSubscriptionNode.getPartitionAssignmentType("et1", "1"), equalTo(AUTO)); + assertThat(zkSubscriptionNode.getPartitionAssignmentType("et2", "0"), equalTo(AUTO)); + assertThat(zkSubscriptionNode.getPartitionAssignmentType("et2", "1"), equalTo(null)); + } }