Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/master' into aruha-1402-kpi-toke…
Browse files Browse the repository at this point in the history
…n-realm
  • Loading branch information
rcillo committed Jan 18, 2018
2 parents 275907f + 69a3956 commit fa65407
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 54 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- Allow to patch subscription cursors in case when they were not initialized

### Fixed
- Allow to update event types with recursion in schemas

Expand Down
4 changes: 4 additions & 0 deletions docs/_data/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,10 @@ paths:
During this operation the subscription's consumers will be disconnected. The request can hang up until
subscription commit timeout. During that time requests to subscription streaming endpoint
will be rejected with 409. The clients should reconnect once the request is finished with 204.
In case, when subscription was never streamed, and therefore does not have cursors initialized, this call
will first initialize starting offsets, and then perform actual patch.
In order to provide explicit cursor initialization functionality this method supports empty cursors list,
allowing to initialize subscription cursors without side effects.
parameters:
- name: subscription_id
in: path
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.zalando.nakadi.webservice.hila;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -19,14 +20,15 @@
import org.zalando.nakadi.utils.JsonTestHelper;
import org.zalando.nakadi.utils.RandomSubscriptionBuilder;
import org.zalando.nakadi.view.Cursor;
import org.zalando.nakadi.view.EventTypePartitionView;
import org.zalando.nakadi.view.SubscriptionCursor;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;
import org.zalando.nakadi.webservice.BaseAT;
import org.zalando.nakadi.webservice.SettingsControllerAT;
import org.zalando.nakadi.webservice.utils.NakadiTestUtils;
import org.zalando.nakadi.webservice.utils.TestStreamingClient;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,7 +41,6 @@
import static org.apache.http.HttpStatus.SC_CONFLICT;
import static org.apache.http.HttpStatus.SC_NO_CONTENT;
import static org.apache.http.HttpStatus.SC_OK;
import static org.apache.http.HttpStatus.SC_UNPROCESSABLE_ENTITY;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -439,13 +440,81 @@ public void whenResetCursorsThenStreamFromResetCursorOffset() throws Exception {
Assert.assertEquals("001-0001-000000000000000005", client2.getBatches().get(0).getCursor().getOffset());
}

@Test
public void whenResetWithEmptyCursorsThen422() throws Exception {
@Test(timeout = 15000)
public void whenPatchThenCursorsAreInitializedToDefault() throws Exception {
final EventType et = createEventType();
publishEvents(et.getName(), 10, i -> "{\"foo\": \"bar\"}");
Thread.sleep(1000L);
final Subscription s = createSubscription(RandomSubscriptionBuilder.builder()
.withEventType(et.getName())
.withStartFrom(END)
.buildSubscriptionBase());
given()
.body(MAPPER.writeValueAsString(new ItemsWrapper<>(new ArrayList<SubscriptionCursor>())))
.body(MAPPER.writeValueAsString(new ItemsWrapper<>(Collections.emptyList())))
.contentType(JSON)
.patch("/subscriptions/{id}/cursors", subscription.getId())
.patch("/subscriptions/{id}/cursors", s.getId())
.then()
.statusCode(SC_UNPROCESSABLE_ENTITY);
.statusCode(SC_NO_CONTENT);

final ItemsWrapper<SubscriptionCursor> subscriptionCursors = MAPPER.readValue(
given().get("/subscriptions/{id}/cursors", s.getId()).getBody().asString(),
new TypeReference<ItemsWrapper<SubscriptionCursor>>() {
}
);
final List<EventTypePartitionView> etStats = MAPPER.readValue(
given().get("/event-types/{et}/partitions", et.getName()).getBody().asString(),
new TypeReference<List<EventTypePartitionView>>() {
}
);
Assert.assertEquals(subscriptionCursors.getItems().size(), etStats.size());
subscriptionCursors.getItems().forEach(sCursor -> {
final boolean offsetSame = etStats.stream()
.anyMatch(ss -> ss.getPartitionId().equals(sCursor.getPartition()) &&
ss.getNewestAvailableOffset().equals(sCursor.getOffset()));
// Check that after patch cursors are the same as END
Assert.assertTrue(offsetSame);
});
}

@Test(timeout = 15000)
public void whenPatchThenCursorsAreInitializedAndPatched() throws Exception {
final EventType et = createEventType();
publishEvents(et.getName(), 10, i -> "{\"foo\": \"bar\"}");
final List<EventTypePartitionView> etStats = MAPPER.readValue(
given().get("/event-types/{et}/partitions", et.getName()).getBody().asString(),
new TypeReference<List<EventTypePartitionView>>() {
}
);
final EventTypePartitionView begin = etStats.get(0);
final Subscription s = createSubscription(RandomSubscriptionBuilder.builder()
.withEventType(et.getName())
.withStartFrom(END)
.buildSubscriptionBase());
given()
.body(MAPPER.writeValueAsString(new ItemsWrapper<>(Collections.singletonList(
new SubscriptionCursorWithoutToken(
et.getName(), begin.getPartitionId(), begin.getOldestAvailableOffset())
))))
.contentType(JSON)
.patch("/subscriptions/{id}/cursors", s.getId())
.then()
.statusCode(SC_NO_CONTENT);

final ItemsWrapper<SubscriptionCursor> subscriptionCursors = MAPPER.readValue(
given().get("/subscriptions/{id}/cursors", s.getId()).getBody().asString(),
new TypeReference<ItemsWrapper<SubscriptionCursor>>() {
}
);

Assert.assertEquals(subscriptionCursors.getItems().size(), etStats.size());

subscriptionCursors.getItems().forEach(item -> {
if (item.getPartition().equals(begin.getPartitionId())) {
Assert.assertEquals(begin.getOldestAvailableOffset(), item.getOffset());
} else {
Assert.assertEquals(begin.getNewestAvailableOffset(), item.getOffset());
}
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.zalando.nakadi.exceptions.InvalidCursorException;
import org.zalando.nakadi.exceptions.InvalidStreamIdException;
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.UnableProcessException;
Expand Down Expand Up @@ -46,10 +47,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.springframework.http.HttpStatus.OK;
import static org.springframework.http.ResponseEntity.noContent;
import static org.springframework.http.ResponseEntity.ok;
import static org.springframework.http.ResponseEntity.status;
import static org.zalando.nakadi.util.FeatureToggleService.Feature.HIGH_LEVEL_API;
import static org.zalando.problem.MoreStatus.UNPROCESSABLE_ENTITY;
import static org.zalando.problem.spring.web.advice.Responses.create;
Expand All @@ -76,17 +75,16 @@ public CursorsController(final CursorsService cursorsService,
}

@RequestMapping(path = "/subscriptions/{subscriptionId}/cursors", method = RequestMethod.GET)
public ResponseEntity<?> getCursors(@PathVariable("subscriptionId") final String subscriptionId,
final NativeWebRequest request) {
public ItemsWrapper<SubscriptionCursor> getCursors(@PathVariable("subscriptionId") final String subscriptionId) {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);
try {
final List<SubscriptionCursor> cursors = cursorsService.getSubscriptionCursors(subscriptionId)
.stream()
.map(cursor -> cursor.withToken(cursorTokenService.generateToken()))
.collect(Collectors.toList());
return status(OK).body(new ItemsWrapper<>(cursors));
return new ItemsWrapper<>(cursors);
} catch (final NakadiException e) {
return create(e.asProblem(), request);
throw new NakadiRuntimeException(e);
}
}

Expand Down Expand Up @@ -138,9 +136,6 @@ public ResponseEntity<?> resetCursors(
@Valid @RequestBody final ItemsWrapper<SubscriptionCursorWithoutToken> cursors,
final NativeWebRequest request) {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);
if (cursors.getItems().isEmpty()) {
throw new CursorsAreEmptyException();
}
try {
cursorsService.resetCursors(subscriptionId, convertToNakadiCursors(cursors));
return noContent().build();
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/org/zalando/nakadi/service/CursorsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.zalando.nakadi.repository.db.EventTypeCache;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.state.StartingState;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.SubscriptionNotInitializedException;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
Expand Down Expand Up @@ -154,18 +155,24 @@ public void resetCursors(final String subscriptionId, final List<NakadiCursor> c
final Map<TopicRepository, List<NakadiCursor>> topicRepositories = cursors.stream().collect(
Collectors.groupingBy(
c -> timelineService.getTopicRepository(c.getTimeline())));
for (final Map.Entry<TopicRepository, List<NakadiCursor>> entry: topicRepositories.entrySet()) {
for (final Map.Entry<TopicRepository, List<NakadiCursor>> entry : topicRepositories.entrySet()) {
entry.getKey().validateReadCursors(entry.getValue());
}

final ZkSubscriptionClient zkClient = zkSubscriptionFactory.createClient(
subscription, "subscription." + subscriptionId + ".reset_cursors");

// In case if subscription was never initialized - initialize it
zkClient.runLocked(() -> StartingState.initializeSubscriptionLocked(
zkClient, subscription, timelineService, cursorConverter));
// add 1 second to commit timeout in order to give time to finish reset if there is uncommitted events
final long timeout = TimeUnit.SECONDS.toMillis(nakadiSettings.getDefaultCommitTimeoutSeconds()) +
TimeUnit.SECONDS.toMillis(1);
zkClient.resetCursors(
cursors.stream().map(cursorConverter::convertToNoToken).collect(Collectors.toList()),
timeout);
if (!cursors.isEmpty()) {
final long timeout = TimeUnit.SECONDS.toMillis(nakadiSettings.getDefaultCommitTimeoutSeconds()) +
TimeUnit.SECONDS.toMillis(1);
zkClient.resetCursors(
cursors.stream().map(cursorConverter::convertToNoToken).collect(Collectors.toList()),
timeout);
}
}

private void validateSubscriptionCommitCursors(final Subscription subscription, final List<NakadiCursor> cursors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.model.Session;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

Expand Down Expand Up @@ -55,14 +56,9 @@ protected Response.StatusType getStatus() {
* 4. Switches to streaming state.
*/
private void createSubscriptionLocked() {
// check that subscription initialized in zk.
if (!getZk().isSubscriptionCreatedAndInitialized()) {
final List<SubscriptionCursorWithoutToken> cursors = calculateStartPosition(
getContext().getSubscription(),
getContext().getTimelineService(),
getContext().getCursorConverter());
getZk().fillEmptySubscription(cursors);
} else {
final boolean subscriptionJustInitialized = initializeSubscriptionLocked(getZk(),
getContext().getSubscription(), getContext().getTimelineService(), getContext().getCursorConverter());
if (!subscriptionJustInitialized) {
final Session[] sessions = getZk().listSessions();
final Partition[] partitions = getZk().listPartitions();
if (sessions.length >= partitions.length) {
Expand Down Expand Up @@ -98,6 +94,20 @@ protected Response.StatusType getStatus() {
}
}

public static boolean initializeSubscriptionLocked(
final ZkSubscriptionClient zkClient,
final Subscription subscription,
final TimelineService timelineService,
final CursorConverter cursorConverter) {
if (!zkClient.isSubscriptionCreatedAndInitialized()) {
final List<SubscriptionCursorWithoutToken> cursors = calculateStartPosition(
subscription, timelineService, cursorConverter);
zkClient.fillEmptySubscription(cursors);
return true;
}
return false;
}

public interface PositionCalculator {
Subscription.InitialPosition getType();

Expand All @@ -117,30 +127,30 @@ public List<SubscriptionCursorWithoutToken> calculate(
final Subscription subscription,
final TimelineService timelineService,
final CursorConverter converter) {
return subscription.getEventTypes()
.stream()
.map(et -> {
try {
// get oldest active timeline
return timelineService.getActiveTimelinesOrdered(et).get(0);
} catch (final NakadiException e) {
throw new NakadiRuntimeException(e);
}
})
.collect(groupingBy(Timeline::getStorage)) // for performance reasons. See ARUHA-1387
.values()
.stream()
.flatMap(timelines -> {
try {
return timelineService.getTopicRepository(timelines.get(0))
.loadTopicStatistics(timelines).stream();
} catch (final ServiceUnavailableException e) {
throw new NakadiRuntimeException(e);
}
})
.map(PartitionStatistics::getBeforeFirst)
.map(converter::convertToNoToken)
.collect(Collectors.toList());
return subscription.getEventTypes()
.stream()
.map(et -> {
try {
// get oldest active timeline
return timelineService.getActiveTimelinesOrdered(et).get(0);
} catch (final NakadiException e) {
throw new NakadiRuntimeException(e);
}
})
.collect(groupingBy(Timeline::getStorage)) // for performance reasons. See ARUHA-1387
.values()
.stream()
.flatMap(timelines -> {
try {
return timelineService.getTopicRepository(timelines.get(0))
.loadTopicStatistics(timelines).stream();
} catch (final ServiceUnavailableException e) {
throw new NakadiRuntimeException(e);
}
})
.map(PartitionStatistics::getBeforeFirst)
.map(converter::convertToNoToken)
.collect(Collectors.toList());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ public class EventTypePartitionView {
private String oldestAvailableOffset;
private String newestAvailableOffset;

public EventTypePartitionView() {
}

public EventTypePartitionView(final String eventType, final String partitionId, final String oldestAvailableOffset,
final String newestAvailableOffset) {
this.eventType = eventType;
Expand Down

0 comments on commit fa65407

Please sign in to comment.