Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #848 from zalando/ARUHA-1501-choose-partition-hila
Browse files Browse the repository at this point in the history
Aruha 1501 choose partition hila
  • Loading branch information
v-stepanov authored Mar 26, 2018
2 parents 8d51e32 + ad8b37f commit 43f1803
Show file tree
Hide file tree
Showing 40 changed files with 1,003 additions and 261 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [2.6.0] - 2018-03-26

### Added
- Allow selecting the partitions to read from a subscription

## [2.5.10] - 2018-03-26

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void before() throws Exception {
final TimelineService timelineService = mock(TimelineService.class);
when(timelineService.getTopicRepository((Timeline) any())).thenReturn(topicRepository);
timeline = buildTimeline(etName, topic, CREATED_AT);
when(timelineService.getActiveTimeline(any())).thenReturn(timeline);
when(timelineService.getActiveTimeline(any(EventType.class))).thenReturn(timeline);

final Subscription subscription = mock(Subscription.class);
when(subscription.getId()).thenReturn(sid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.http.HttpStatus;
import org.hamcrest.Matchers;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -31,6 +32,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -49,12 +51,15 @@
import static org.hamcrest.Matchers.not;
import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN;
import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END;
import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.AUTO;
import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.DIRECT;
import static org.zalando.nakadi.utils.TestUtils.waitFor;
import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken;
import static org.zalando.nakadi.webservice.hila.StreamBatch.singleEventBatch;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.getNumberOfAssignedStreams;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvent;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvents;
import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN;
Expand Down Expand Up @@ -295,8 +300,12 @@ public void testGetSubscriptionStat() throws Exception {
List<SubscriptionEventTypeStats> subscriptionStats =
Collections.singletonList(new SubscriptionEventTypeStats(
eventType.getName(),
Collections.singletonList(
new SubscriptionEventTypeStats.Partition("0", "assigned", 15L, client.getSessionId())))
Collections.singletonList(new SubscriptionEventTypeStats.Partition(
"0",
"assigned",
15L,
client.getSessionId(),
AUTO)))
);
NakadiTestUtils.getSubscriptionStat(subscription)
.then()
Expand All @@ -309,14 +318,42 @@ public void testGetSubscriptionStat() throws Exception {
subscriptionStats =
Collections.singletonList(new SubscriptionEventTypeStats(
eventType.getName(),
Collections.singletonList(
new SubscriptionEventTypeStats.Partition("0", "assigned", 5L, client.getSessionId())))
Collections.singletonList(new SubscriptionEventTypeStats.Partition(
"0",
"assigned",
5L,
client.getSessionId(),
AUTO)))
);
NakadiTestUtils.getSubscriptionStat(subscription)
.then()
.content(new StringContains(JSON_TEST_HELPER.asJsonString(new ItemsWrapper<>(subscriptionStats))));
}

/**
 * Checks the subscription stats reported for a partition that a stream requested explicitly:
 * the expected entry is partition "0" in state "assigned", owned by the client's session and
 * carrying the DIRECT assignment type.
 */
@Test(timeout = 10000)
public void testGetSubscriptionStatWhenDirectAssignment() throws Exception {
    // open a single stream whose request body explicitly names partition 0 of the event type
    final String partitionRequest = "{\"partitions\":[" +
            "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"0\"}]}";
    final TestStreamingClient client = new TestStreamingClient(
            URL, subscription.getId(), "", Optional.empty(), Optional.of(partitionRequest));
    client.start();

    // do not query the stats before the rebalance has assigned the stream
    waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), Matchers.is(1)));

    // the response must contain the JSON of the expected stats entry
    NakadiTestUtils.getSubscriptionStat(subscription)
            .then()
            .content(new StringContains(JSON_TEST_HELPER.asJsonString(
                    new SubscriptionEventTypeStats(
                            eventType.getName(),
                            Collections.singletonList(new SubscriptionEventTypeStats.Partition(
                                    "0", "assigned", 0L, client.getSessionId(), DIRECT))))));
}

@Test
public void testSubscriptionStatsMultiET() throws IOException {
final List<EventType> eventTypes = Lists.newArrayList(createEventType(), createEventType());
Expand Down Expand Up @@ -345,15 +382,17 @@ public void testSubscriptionStatsMultiET() throws IOException {
"0",
"assigned",
1L,
client.getSessionId()
client.getSessionId(),
AUTO
))))))
.content(new StringContains(JSON_TEST_HELPER.asJsonString(new SubscriptionEventTypeStats(
eventTypes.get(1).getName(),
Collections.singletonList(new SubscriptionEventTypeStats.Partition(
"0",
"assigned",
2L,
client.getSessionId()
client.getSessionId(),
AUTO
))))));
client.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.zalando.nakadi.webservice.hila;

import com.google.common.collect.ImmutableSet;
import org.junit.Before;
import org.junit.Test;
import org.springframework.http.HttpStatus;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
Expand All @@ -14,6 +16,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.google.common.collect.Sets.intersection;
import static java.util.stream.Collectors.toList;
Expand All @@ -28,6 +31,7 @@
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createBusinessEventTypeWithPartitions;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.getNumberOfAssignedStreams;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishBusinessEventWithUserDefinedPartition;

public class HilaRebalanceAT extends BaseAT {
Expand Down Expand Up @@ -80,8 +84,8 @@ public void whenRebalanceThenPartitionsAreEquallyDistributedAndCommittedOffsetsA
.create(URL, subscription.getId(), "stream_limit=20&max_uncommitted_events=100")
.start();

// wait for rebalance process to start
Thread.sleep(1000);
// wait for rebalance process to finish
waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), is(2)));

// write 5 more events to each partition
publishBusinessEventWithUserDefinedPartition(
Expand All @@ -90,7 +94,7 @@ public void whenRebalanceThenPartitionsAreEquallyDistributedAndCommittedOffsetsA
x -> "blah_" + x,
x -> String.valueOf(x % 8));

// wait till all event arrive
// wait till all events arrive
waitFor(() -> assertThat(clientB.getBatches(), hasSize(20)));
waitFor(() -> assertThat(clientA.getBatches(), hasSize(60)));

Expand All @@ -114,8 +118,8 @@ public void whenRebalanceThenPartitionsAreEquallyDistributedAndCommittedOffsetsA
commitCursors(subscription.getId(), lastCursorsB, clientB.getSessionId());
waitFor(() -> assertThat(clientB.isRunning(), is(false)));

// wait for rebalance process to start
Thread.sleep(1000);
// wait for rebalance process to finish
waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), is(1)));

// write 5 more events to each partition
publishBusinessEventWithUserDefinedPartition(
Expand All @@ -125,6 +129,132 @@ public void whenRebalanceThenPartitionsAreEquallyDistributedAndCommittedOffsetsA
waitFor(() -> assertThat(clientA.getBatches(), hasSize(100)));
}

/**
 * Checks that a stream which explicitly requests partitions 5 and 6 receives batches
 * from exactly those two partitions.
 */
@Test(timeout = 15000)
public void whenDirectlyRequestPartitionsTheyAssignedCorrectly() throws IOException {
    // 16 events keyed by x % 8: two events for each of the partitions "0".."7"
    publishBusinessEventWithUserDefinedPartition(
            eventType.getName(), 16, x -> "blah" + x, x -> String.valueOf(x % 8));

    // open a stream whose request body names partitions 6 and 5
    final String etName = eventType.getName();
    final String partitionRequest = "{\"partitions\":[" +
            "{\"event_type\":\"" + etName + "\",\"partition\":\"6\"}," +
            "{\"event_type\":\"" + etName + "\",\"partition\":\"5\"}" +
            "]}";
    final TestStreamingClient client = new TestStreamingClient(
            URL, subscription.getId(), "", Optional.empty(), Optional.of(partitionRequest));
    client.start();

    // 2 requested partitions with 2 events each: wait for 4 batches
    waitFor(() -> assertThat(client.getBatches(), hasSize(4)));

    // every batch must come from partition 5 or 6, and both must be represented
    checkAllEventsAreFromPartitions(client, ImmutableSet.of("5", "6"));
}

/**
 * Runs two streams without a partition request next to two streams that explicitly request
 * partitions 6 and 7, then checks how many batches each stream receives after 2 events
 * were published per partition.
 */
@Test(timeout = 15000)
public void whenMixedStreamsThenPartitionsAssignedCorrectly() throws IOException, InterruptedException {
    final String subscriptionId = subscription.getId();

    // two streams that do not name any partitions
    final TestStreamingClient autoClient1 = new TestStreamingClient(URL, subscriptionId, "");
    autoClient1.start();
    final TestStreamingClient autoClient2 = new TestStreamingClient(URL, subscriptionId, "");
    autoClient2.start();

    // one stream explicitly requesting partition 6
    final String requestPartition6 = "{\"partitions\":[" +
            "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"6\"}]}";
    final TestStreamingClient directClient1 = new TestStreamingClient(
            URL, subscriptionId, "", Optional.empty(), Optional.of(requestPartition6));
    directClient1.start();

    // one stream explicitly requesting partition 7
    final String requestPartition7 = "{\"partitions\":[" +
            "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"7\"}]}";
    final TestStreamingClient directClient2 = new TestStreamingClient(
            URL, subscriptionId, "", Optional.empty(), Optional.of(requestPartition7));
    directClient2.start();

    // publish only after all 4 streams got their assignment
    waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), is(4)));

    // 16 events keyed by x % 8: two events for each of the partitions "0".."7"
    publishBusinessEventWithUserDefinedPartition(
            eventType.getName(), 16, x -> "blah" + x, x -> String.valueOf(x % 8));

    // each direct stream owns a single partition, hence 2 batches from that partition only
    waitFor(() -> assertThat(directClient1.getBatches(), hasSize(2)));
    checkAllEventsAreFromPartitions(directClient1, ImmutableSet.of("6"));

    waitFor(() -> assertThat(directClient2.getBatches(), hasSize(2)));
    checkAllEventsAreFromPartitions(directClient2, ImmutableSet.of("7"));

    // the two auto-balanced streams are expected to read 3 partitions each, i.e. 6 batches each
    waitFor(() -> assertThat(autoClient1.getBatches(), hasSize(6)));
    waitFor(() -> assertThat(autoClient2.getBatches(), hasSize(6)));
}

/**
 * Checks that a directly requested partition is taken away from an auto-balanced stream
 * while the direct stream is running, and goes back to the auto-balanced stream once the
 * direct stream has terminated.
 */
@Test(timeout = 15000)
public void checkDirectAssignmentCorrectlyCapturesAndReleasesPartition() throws IOException, InterruptedException {
    // first client relies on auto-rebalance
    final TestStreamingClient autoClient = new TestStreamingClient(
            URL, subscription.getId(), "max_uncommitted_events=100");
    autoClient.start();

    // second client explicitly requests partition 6 and sets stream_limit to 1
    final String directRequest = "{\"stream_limit\":1,\"partitions\":[" +
            "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"6\"}]}";
    final TestStreamingClient directClient = new TestStreamingClient(
            URL, subscription.getId(), "", Optional.empty(), Optional.of(directRequest));
    directClient.start();

    // once both streams are assigned, publish 1 event to each of the partitions "0".."7"
    waitFor(() -> assertThat(getNumberOfAssignedStreams(subscription.getId()), is(2)));
    publishBusinessEventWithUserDefinedPartition(
            eventType.getName(), 8, x -> "blah" + x, x -> String.valueOf(x % 8));

    // auto client gets everything except partition 6, direct client gets only partition 6
    waitFor(() -> assertThat(autoClient.getBatches(), hasSize(7)));
    checkAllEventsAreFromPartitions(autoClient, ImmutableSet.of("0", "1", "2", "3", "4", "5", "7"));
    waitFor(() -> assertThat(directClient.getBatches(), hasSize(1)));
    checkAllEventsAreFromPartitions(directClient, ImmutableSet.of("6"));

    // commit what the direct client received; the stream then closes (stream_limit reached)
    final List<SubscriptionCursor> directClientCursors = directClient.getBatches().stream()
            .map(StreamBatch::getCursor)
            .collect(Collectors.toList());
    commitCursors(subscription.getId(), directClientCursors, directClient.getSessionId());
    waitFor(() -> assertThat(directClient.isRunning(), is(false)));

    // publish 1 more event to each partition
    publishBusinessEventWithUserDefinedPartition(
            eventType.getName(), 8, x -> "blah" + x, x -> String.valueOf(x % 8));

    // with the direct stream gone, the auto client receives all 8 new events
    waitFor(() -> assertThat(autoClient.getBatches(), hasSize(7 + 8)));
    checkAllEventsAreFromPartitions(autoClient, ImmutableSet.of("0", "1", "2", "3", "4", "5", "6", "7"));
}

/**
 * Checks that a stream requesting a partition which another stream has already requested
 * directly is rejected with HTTP 409 (Conflict).
 */
@Test(timeout = 15000)
public void whenTwoStreamsDirectlyRequestOnePartitionThenConflict() throws IOException, InterruptedException {
    // first stream explicitly requests partition 6 and must be accepted
    final String firstRequest = "{\"partitions\":[" +
            "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"6\"}" +
            "]}";
    final TestStreamingClient client1 = new TestStreamingClient(
            URL, subscription.getId(), "", Optional.empty(), Optional.of(firstRequest));
    client1.start();
    waitFor(() -> assertThat(client1.getResponseCode(), is(HttpStatus.OK.value())));

    // second stream requests partitions 5 and 6, so it overlaps with the first on partition 6
    final String secondRequest = "{\"partitions\":[" +
            "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"5\"}," +
            "{\"event_type\":\"" + eventType.getName() + "\",\"partition\":\"6\"}" +
            "]}";
    final TestStreamingClient client2 = new TestStreamingClient(
            URL, subscription.getId(), "", Optional.empty(), Optional.of(secondRequest));
    client2.start();

    // the second client must stop running and report status code 409
    waitFor(() -> assertThat(client2.isRunning(), is(false)));
    assertThat(client2.getResponseCode(), is(HttpStatus.CONFLICT.value()));
}

@Test(timeout = 15000)
public void whenNotCommittedThenEventsAreReplayedAfterRebalance() {
publishBusinessEventWithUserDefinedPartition(
Expand All @@ -143,6 +273,34 @@ public void whenNotCommittedThenEventsAreReplayedAfterRebalance() {
waitFor(() -> assertThat(clientB.getBatches(), hasSize(2)), 10000);
}

/**
 * Checks that an auto-balanced stream is rejected with HTTP 409 (Conflict) when a direct
 * stream holds partitions 0-6 and another auto-balanced stream is already connected.
 */
@Test(timeout = 15000)
public void whenNoFreeSlotsForAutoClientThenConflict() throws IOException, InterruptedException {
    // direct client requests all but one partition: build the request body for partitions 0..6
    final String etName = eventType.getName();
    final StringBuilder directRequest = new StringBuilder("{\"partitions\":[");
    for (int partition = 0; partition <= 6; partition++) {
        if (partition > 0) {
            directRequest.append(',');
        }
        directRequest.append("{\"event_type\":\"").append(etName)
                .append("\",\"partition\":\"").append(partition).append("\"}");
    }
    directRequest.append("]}");

    final TestStreamingClient directClient = new TestStreamingClient(
            URL, subscription.getId(), "", Optional.empty(), Optional.of(directRequest.toString()));
    directClient.start();
    waitFor(() -> assertThat(directClient.getResponseCode(), is(HttpStatus.OK.value())));

    // first auto-balanced client is expected to take the remaining partition
    final TestStreamingClient autoClient1 = new TestStreamingClient(URL, subscription.getId(), "");
    autoClient1.start();
    waitFor(() -> assertThat(autoClient1.getResponseCode(), is(HttpStatus.OK.value())));

    // second auto-balanced client has nothing left to read and must get 409 (Conflict)
    final TestStreamingClient autoClient2 = new TestStreamingClient(URL, subscription.getId(), "");
    autoClient2.start();
    waitFor(() -> assertThat(autoClient2.getResponseCode(), is(HttpStatus.CONFLICT.value())));
}

public List<SubscriptionCursor> getLastCursorsForPartitions(final TestStreamingClient client,
final Set<String> partitions) {
if (!client.getBatches().isEmpty()) {
Expand All @@ -159,6 +317,20 @@ public List<SubscriptionCursor> getLastCursorsForPartitions(final TestStreamingC
}
}

/**
 * Asserts that the batches received by the client cover exactly the given partitions:
 * every batch belongs to one of them, and each of them appears in at least one batch.
 *
 * @param clientA    streaming client whose received batches are inspected
 * @param partitions the complete set of partition ids the batches are expected to come from
 */
private void checkAllEventsAreFromPartitions(final TestStreamingClient clientA, final Set<String> partitions) {
    // One pass over a single snapshot of the batches. Comparing the sets covers both
    // directions (no foreign partition, no missing partition) and, on failure, reports
    // the partition ids that differ instead of bare counters.
    final Set<String> streamedPartitions = clientA.getBatches().stream()
            .map(batch -> batch.getCursor().getPartition())
            .collect(Collectors.toSet());
    assertThat(streamedPartitions, is(partitions));
}

private Set<String> getUniquePartitionsStreamedToClient(final TestStreamingClient client) {
return getUniquePartitionsStreamedToClient(client, 0, client.getBatches().size());
}
Expand Down
Loading

0 comments on commit 43f1803

Please sign in to comment.