From 2332bc1cdb220adbd69f9d717cfcc69802fc3e14 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Wed, 29 Nov 2017 17:13:48 +0100 Subject: [PATCH 1/6] ARUHA-1347 Do not fail subscription stats when Kafka cluster does not exist --- .../runtime/UnknownStorageTypeException.java | 12 ++++++++++++ .../nakadi/service/CursorOperationsService.java | 12 ++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/zalando/nakadi/exceptions/runtime/UnknownStorageTypeException.java diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/UnknownStorageTypeException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/UnknownStorageTypeException.java new file mode 100644 index 0000000000..603db17a1b --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/UnknownStorageTypeException.java @@ -0,0 +1,12 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class UnknownStorageTypeException extends MyNakadiRuntimeException1 { + + public UnknownStorageTypeException(final String message) { + super(message); + } + + public UnknownStorageTypeException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java b/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java index c21b03f559..af059b43a0 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java @@ -9,13 +9,16 @@ import org.zalando.nakadi.domain.PartitionEndStatistics; import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.ShiftedNakadiCursor; +import org.zalando.nakadi.domain.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation; import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; +import org.zalando.nakadi.exceptions.runtime.UnknownStorageTypeException; import org.zalando.nakadi.repository.TopicRepository; +import org.zalando.nakadi.repository.kafka.KafkaCursor; import org.zalando.nakadi.service.timeline.TimelineService; import java.util.Collections; @@ -195,8 +198,13 @@ private NakadiCursor moveBack(final ShiftedNakadiCursor cursor) throws InvalidCu } private long numberOfEventsBeforeCursor(final NakadiCursor initialCursor) { - final TopicRepository topicRepository = getTopicRepository(initialCursor.getTimeline()); - return topicRepository.numberOfEventsBeforeCursor(initialCursor); + final Storage.Type storageType = initialCursor.getTimeline().getStorage().getType(); + switch (storageType) { + case KAFKA: + return KafkaCursor.toKafkaOffset(initialCursor.getOffset()); + default: + throw new UnknownStorageTypeException("Unknown storage type: " + storageType.toString()); + } } private Timeline getTimeline(final String eventTypeName, final int order) { From f6be551a7f529cc2830062ff7241236f138b6006 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Wed, 29 Nov 2017 17:38:26 +0100 Subject: [PATCH 2/6] ARUHA-1347 fix tests --- .../zalando/nakadi/service/CursorOperationsServiceTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java b/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java index 5226c5ef00..72effa71cd 100644 --- a/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java @@ -5,6 +5,7 @@ import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.ShiftedNakadiCursor; +import org.zalando.nakadi.domain.Storage; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation; import org.zalando.nakadi.repository.TopicRepository; @@ -304,6 +305,10 @@ private Timeline mockTimeline(final int order, @Nullable final Long latestOffset final Timeline timeline = mock(Timeline.class); when(timeline.getOrder()).thenReturn(order); + final Storage storage = new Storage(); + storage.setType(Storage.Type.KAFKA); + when(timeline.getStorage()).thenReturn(storage); + if (latestOffset == null) { when(timeline.isActive()).thenReturn(false); when(timeline.getLatestPosition()).thenReturn(null); From c499b6f89585c0aa878ad8acd71523a6ae080898 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Wed, 29 Nov 2017 17:44:23 +0100 Subject: [PATCH 3/6] ARUHA-1347 remove numberOfEventsBeforeCursor from TopicRepository --- .../java/org/zalando/nakadi/repository/TopicRepository.java | 2 -- .../nakadi/repository/kafka/KafkaTopicRepository.java | 5 ----- .../org/zalando/nakadi/service/CursorOperationsService.java | 6 ++---- 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/repository/TopicRepository.java b/src/main/java/org/zalando/nakadi/repository/TopicRepository.java index e8eec0c632..4ee6047234 100644 --- a/src/main/java/org/zalando/nakadi/repository/TopicRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/TopicRepository.java @@ -77,8 +77,6 @@ void validateReadCursors(List cursors) throws InvalidCursorExcepti long totalEventsInPartition(Timeline timeline, String partition); - long numberOfEventsBeforeCursor(NakadiCursor cursor); - void setRetentionTime(String topic, Long retentionMs) throws TopicConfigException; NakadiCursor createBeforeBeginCursor(Timeline timeline, String partition); diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index e01b3ebd88..4158ee2a16 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -470,11 +470,6 @@ public long totalEventsInPartition(final Timeline timeline, final String partiti } } - public long numberOfEventsBeforeCursor(final NakadiCursor cursor) { - // could be -1 in case the cursor points to BEGIN - return KafkaCursor.toKafkaOffset(cursor.getOffset()); - } - @Override public NakadiCursor createBeforeBeginCursor(final Timeline timeline, final String partition) { return new KafkaCursor(timeline.getTopic(), KafkaCursor.toKafkaPartition(partition), -1) diff --git a/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java b/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java index af059b43a0..1de8964500 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java @@ -89,8 +89,7 @@ public List cursorsLag(final String eventTypeName, final List new InvalidCursorOperation(PARTITION_NOT_FOUND)); // trick to avoid -1 position - move cursor to previous timeline while there is no data before // it - while (getTopicRepository(newestPosition.getTimeline()) - .numberOfEventsBeforeCursor(newestPosition) == -1) { + while (numberOfEventsBeforeCursor(newestPosition) == -1) { final int prevOrder = newestPosition.getTimeline().getOrder() - 1; final Timeline prevTimeline = timelines.stream() .filter(t -> t.getOrder() == prevOrder) @@ -168,8 +167,7 @@ private NakadiCursor moveBack(final ShiftedNakadiCursor cursor) throws InvalidCu NakadiCursor currentCursor = new NakadiCursor(cursor.getTimeline(), cursor.getPartition(), cursor.getOffset()); long toMoveBack = -cursor.getShift(); while (toMoveBack > 0) { - final long totalBefore = getTopicRepository(currentCursor.getTimeline()) - .numberOfEventsBeforeCursor(currentCursor); + final long totalBefore = numberOfEventsBeforeCursor(currentCursor); if (totalBefore < toMoveBack) { toMoveBack -= totalBefore + 1; // +1 is because end is inclusive From 25de477ac4dabe175a75bc910ac19f32aa77de2d Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Wed, 29 Nov 2017 17:59:28 +0100 Subject: [PATCH 4/6] ARUHA-1347 Changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index acf6ed8ba9..28e8dd4458 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Fixed + +- Fixed issue with subscription stats, which would fail when the last committed offset is on a storage that was deleted + ## [2.3.0] - 2017-11-14 ### Added From 8018280d636e3097fade9d9c8823cb9a530b1103 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 30 Nov 2017 15:31:42 +0100 Subject: [PATCH 5/6] ARUHA-1347 Test --- .../webservice/hila/SubscriptionAT.java | 6 +++ .../SubscriptionControllerTest.java | 38 ++++++++++++++++++- .../org/zalando/nakadi/utils/TestUtils.java | 8 +++- 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java index c3931d51a1..3c0f8e72ff 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java @@ -388,6 +388,12 @@ public void whenStatsOnNotInitializedSubscriptionThanCorrectResponse() throws IO } } + @Test + public void whenStatsWithOffsetInDeletedStorageThenDoNotFail() throws IOException { + final String et = createEventType().getName(); + final Subscription s = createSubscriptionForEventType(et); + } + private Response commitCursors(final Subscription subscription, final String cursor, final String streamId) { return given() .body(cursor) diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index e0cc1c4a81..195aa1c669 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -18,12 +18,14 @@ import org.zalando.nakadi.domain.PaginationLinks; import org.zalando.nakadi.domain.PaginationWrapper; import org.zalando.nakadi.domain.PartitionEndStatistics; +import org.zalando.nakadi.domain.Storage; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionEventTypeStats; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.NoSuchSubscriptionException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; +import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.plugin.api.ApplicationService; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.repository.TopicRepository; @@ -49,6 +51,7 @@ import javax.ws.rs.core.Response; import java.util.Collections; +import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -74,6 +77,7 @@ import static org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup; import static org.zalando.nakadi.util.SubscriptionsUriHelper.createSubscriptionListUri; import static org.zalando.nakadi.utils.RandomSubscriptionBuilder.builder; +import static org.zalando.nakadi.utils.TestUtils.buildTimeline; import static org.zalando.nakadi.utils.TestUtils.buildTimelineWithTopic; import static org.zalando.nakadi.utils.TestUtils.createRandomSubscriptions; import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs; @@ -89,6 +93,7 @@ public class SubscriptionControllerTest { private final ZkSubscriptionClient zkSubscriptionClient; private final CursorConverter cursorConverter; private final CursorOperationsService cursorOperationsService; + private final TimelineService timelineService; private static final int PARTITIONS_PER_SUBSCRIPTION = 5; private static final Timeline TIMELINE = buildTimelineWithTopic("topic"); @@ -102,7 +107,7 @@ public SubscriptionControllerTest() throws Exception { final SubscriptionClientFactory zkSubscriptionClientFactory = mock(SubscriptionClientFactory.class); zkSubscriptionClient = mock(ZkSubscriptionClient.class); when(zkSubscriptionClientFactory.createClient(any(), any())).thenReturn(zkSubscriptionClient); - final TimelineService timelineService = mock(TimelineService.class); + timelineService = mock(TimelineService.class); when(timelineService.getActiveTimeline(any())).thenReturn(TIMELINE); when(timelineService.getTopicRepository((EventTypeBase) any())).thenReturn(topicRepository); when(timelineService.getTopicRepository((Timeline) any())).thenReturn(topicRepository); @@ -257,6 +262,37 @@ public void whenGetSubscriptionStatThenOk() throws Exception { TestUtils.JSON_TEST_HELPER.matchesObject(new ItemsWrapper<>(expectedStats)))); } + @Test + public void whenGetSubscriptionStatsWithOffsetFromDeletedStorageThenOk() throws Exception { + Storage deletedStorage = new Storage(); + final Timeline TIMELINE_DELETED_STORAGE = buildTimeline(TIMELINE.getEventType(), deletedStorage, "topic", new Date()); + final Subscription subscription = builder().withEventType(TIMELINE_DELETED_STORAGE.getEventType()).build(); + final Partition[] partitions = { + new Partition(TIMELINE.getEventType(), "0", "xz", null, Partition.State.ASSIGNED)}; + final ZkSubscriptionNode zkSubscriptionNode = new ZkSubscriptionNode(); + zkSubscriptionNode.setPartitions(partitions); + zkSubscriptionNode.setSessions(new Session[]{new Session("xz", 0)}); + when(subscriptionRepository.getSubscription(subscription.getId())).thenReturn(subscription); + when(zkSubscriptionClient.getZkSubscriptionNodeLocked()).thenReturn(zkSubscriptionNode); + final SubscriptionCursorWithoutToken currentOffset = + new SubscriptionCursorWithoutToken(TIMELINE.getEventType(), "0", "3"); + when(zkSubscriptionClient.getOffset(new EventTypePartition(TIMELINE.getEventType(), "0"))) + .thenReturn(currentOffset); + when(eventTypeRepository.findByName(TIMELINE.getEventType())) + .thenReturn(EventTypeTestBuilder.builder().name(TIMELINE.getEventType()).build()); + final List statistics = Collections.singletonList( + new KafkaPartitionEndStatistics(TIMELINE, 0, 13)); + when(topicRepository.loadTopicEndStatistics(eq(Collections.singletonList(TIMELINE)))).thenReturn(statistics); + final NakadiCursor currentCursor = mock(NakadiCursor.class); + when(currentCursor.getEventTypePartition()).thenReturn(new EventTypePartition(TIMELINE.getEventType(), "0")); + when(cursorConverter.convert((List) any())) + .thenReturn(Collections.singletonList(currentCursor)); + when(timelineService.getTopicRepository(TIMELINE_DELETED_STORAGE)).thenThrow(new MyNakadiRuntimeException1("Exception")); + + getSubscriptionStats(subscription.getId()) + .andExpect(status().isOk()); + } + @Test @SuppressWarnings("unchecked") public void whenGetSubscriptionNoEventTypesThenStatEmpty() throws Exception { diff --git a/src/test/java/org/zalando/nakadi/utils/TestUtils.java b/src/test/java/org/zalando/nakadi/utils/TestUtils.java index d999b83545..8f03dd3756 100644 --- a/src/test/java/org/zalando/nakadi/utils/TestUtils.java +++ b/src/test/java/org/zalando/nakadi/utils/TestUtils.java @@ -213,8 +213,12 @@ public static Timeline buildTimeline(final String etName) { return new Timeline(etName, 0, new Storage(), randomUUID(), new Date()); } - public static Timeline buildTimeline(final String etName, final String topic, final Date cratedAt) { - return new Timeline(etName, 0, new Storage(), topic, cratedAt); + public static Timeline buildTimeline(final String etName, final String topic, final Date createdAt) { + return new Timeline(etName, 0, new Storage(), topic, createdAt); + } + + public static Timeline buildTimeline(final String etName, final Storage storage, final String topic, final Date createdAt) { + return new Timeline(etName, 0, storage, topic, createdAt); } public static Timeline buildTimelineWithTopic(final String topic) { From 2d19b739e02a23ac31f8deced90ee779e40690e9 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 30 Nov 2017 16:12:14 +0100 Subject: [PATCH 6/6] ARUHA-1347 Move static method --- .../webservice/hila/SubscriptionAT.java | 6 ---- .../nakadi/repository/TopicRepository.java | 2 -- .../kafka/KafkaTopicRepository.java | 14 -------- .../service/CursorOperationsService.java | 17 +++++++-- .../SubscriptionControllerTest.java | 35 ------------------- .../org/zalando/nakadi/utils/TestUtils.java | 4 --- 6 files changed, 15 insertions(+), 63 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java index 3c0f8e72ff..c3931d51a1 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java @@ -388,12 +388,6 @@ public void whenStatsOnNotInitializedSubscriptionThanCorrectResponse() throws IO } } - @Test - public void whenStatsWithOffsetInDeletedStorageThenDoNotFail() throws IOException { - final String et = createEventType().getName(); - final Subscription s = createSubscriptionForEventType(et); - } - private Response commitCursors(final Subscription subscription, final String cursor, final String streamId) { return given() .body(cursor) diff --git a/src/main/java/org/zalando/nakadi/repository/TopicRepository.java b/src/main/java/org/zalando/nakadi/repository/TopicRepository.java index 4ee6047234..9b3915afb8 100644 --- a/src/main/java/org/zalando/nakadi/repository/TopicRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/TopicRepository.java @@ -75,8 +75,6 @@ void validateReadCursors(List cursors) throws InvalidCursorExcepti void validateCommitCursor(NakadiCursor cursor) throws InvalidCursorException; - long totalEventsInPartition(Timeline timeline, String partition); - void setRetentionTime(String topic, Long retentionMs) throws TopicConfigException; NakadiCursor createBeforeBeginCursor(Timeline timeline, String partition); diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index 4158ee2a16..4c8480c80a 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -33,7 +33,6 @@ import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.TopicCreationException; import org.zalando.nakadi.exceptions.TopicDeletionException; -import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation; import org.zalando.nakadi.exceptions.runtime.TopicConfigException; import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException; import org.zalando.nakadi.repository.EventConsumer; @@ -457,19 +456,6 @@ public int compareOffsets(final NakadiCursor first, final NakadiCursor second) t return KafkaCursor.fromNakadiCursor(first).compareTo(KafkaCursor.fromNakadiCursor(second)); } - // Method can work only with finished timeline (e.g. it will break for active timeline) - public long totalEventsInPartition(final Timeline timeline, final String partitionString) - throws InvalidCursorOperation { - final Timeline.StoragePosition positions = timeline.getLatestPosition(); - - try { - return 1 + ((Timeline.KafkaStoragePosition) positions).getLastOffsetForPartition( - KafkaCursor.toKafkaPartition(partitionString)); - } catch (final IllegalArgumentException ex) { - throw new InvalidCursorOperation(InvalidCursorOperation.Reason.PARTITION_NOT_FOUND); - } - } - @Override public NakadiCursor createBeforeBeginCursor(final Timeline timeline, final String partition) { return new KafkaCursor(timeline.getTopic(), KafkaCursor.toKafkaPartition(partition), -1) diff --git a/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java b/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java index 1de8964500..e4f02c5930 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java @@ -58,8 +58,7 @@ public long calculateDistance(final NakadiCursor initialCursor, final NakadiCurs for (int order = startOrder; order < Math.max(initialOrder, finalOrder); ++order) { final Timeline timeline = getTimeline(initialCursor.getEventType(), order); - final long eventsTotal = getTopicRepository(timeline).totalEventsInPartition( - timeline, initialCursor.getPartition()); + final long eventsTotal = totalEventsInPartition(timeline, initialCursor.getPartition()); result += (finalOrder > initialOrder) ? eventsTotal : -eventsTotal; } return result; @@ -221,4 +220,18 @@ private Timeline getTimeline(final String eventTypeName, final int order) { private TopicRepository getTopicRepository(final Timeline timeline) { return timelineService.getTopicRepository(timeline); } + + + // Method can work only with finished timeline (e.g. it will break for active timeline) + private long totalEventsInPartition(final Timeline timeline, final String partitionString) + throws InvalidCursorOperation { + final Timeline.StoragePosition positions = timeline.getLatestPosition(); + + try { + return 1 + ((Timeline.KafkaStoragePosition) positions).getLastOffsetForPartition( + KafkaCursor.toKafkaPartition(partitionString)); + } catch (final IllegalArgumentException ex) { + throw new InvalidCursorOperation(InvalidCursorOperation.Reason.PARTITION_NOT_FOUND); + } + } } diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index 195aa1c669..fccc0052b2 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -18,14 +18,12 @@ import org.zalando.nakadi.domain.PaginationLinks; import org.zalando.nakadi.domain.PaginationWrapper; import org.zalando.nakadi.domain.PartitionEndStatistics; -import org.zalando.nakadi.domain.Storage; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionEventTypeStats; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.NoSuchSubscriptionException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; -import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.plugin.api.ApplicationService; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.repository.TopicRepository; @@ -51,7 +49,6 @@ import javax.ws.rs.core.Response; import java.util.Collections; -import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -77,7 +74,6 @@ import static org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup; import static org.zalando.nakadi.util.SubscriptionsUriHelper.createSubscriptionListUri; import static org.zalando.nakadi.utils.RandomSubscriptionBuilder.builder; -import static org.zalando.nakadi.utils.TestUtils.buildTimeline; import static org.zalando.nakadi.utils.TestUtils.buildTimelineWithTopic; import static org.zalando.nakadi.utils.TestUtils.createRandomSubscriptions; import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs; @@ -262,37 +258,6 @@ public void whenGetSubscriptionStatThenOk() throws Exception { TestUtils.JSON_TEST_HELPER.matchesObject(new ItemsWrapper<>(expectedStats)))); } - @Test - public void whenGetSubscriptionStatsWithOffsetFromDeletedStorageThenOk() throws Exception { - Storage deletedStorage = new Storage(); - final Timeline TIMELINE_DELETED_STORAGE = buildTimeline(TIMELINE.getEventType(), deletedStorage, "topic", new Date()); - final Subscription subscription = builder().withEventType(TIMELINE_DELETED_STORAGE.getEventType()).build(); - final Partition[] partitions = { - new Partition(TIMELINE.getEventType(), "0", "xz", null, Partition.State.ASSIGNED)}; - final ZkSubscriptionNode zkSubscriptionNode = new ZkSubscriptionNode(); - zkSubscriptionNode.setPartitions(partitions); - zkSubscriptionNode.setSessions(new Session[]{new Session("xz", 0)}); - when(subscriptionRepository.getSubscription(subscription.getId())).thenReturn(subscription); - when(zkSubscriptionClient.getZkSubscriptionNodeLocked()).thenReturn(zkSubscriptionNode); - final SubscriptionCursorWithoutToken currentOffset = - new SubscriptionCursorWithoutToken(TIMELINE.getEventType(), "0", "3"); - when(zkSubscriptionClient.getOffset(new EventTypePartition(TIMELINE.getEventType(), "0"))) - .thenReturn(currentOffset); - when(eventTypeRepository.findByName(TIMELINE.getEventType())) - .thenReturn(EventTypeTestBuilder.builder().name(TIMELINE.getEventType()).build()); - final List statistics = Collections.singletonList( - new KafkaPartitionEndStatistics(TIMELINE, 0, 13)); - when(topicRepository.loadTopicEndStatistics(eq(Collections.singletonList(TIMELINE)))).thenReturn(statistics); - final NakadiCursor currentCursor = mock(NakadiCursor.class); - when(currentCursor.getEventTypePartition()).thenReturn(new EventTypePartition(TIMELINE.getEventType(), "0")); - when(cursorConverter.convert((List) any())) - .thenReturn(Collections.singletonList(currentCursor)); - when(timelineService.getTopicRepository(TIMELINE_DELETED_STORAGE)).thenThrow(new MyNakadiRuntimeException1("Exception")); - - getSubscriptionStats(subscription.getId()) - .andExpect(status().isOk()); - } - @Test @SuppressWarnings("unchecked") public void whenGetSubscriptionNoEventTypesThenStatEmpty() throws Exception { diff --git a/src/test/java/org/zalando/nakadi/utils/TestUtils.java b/src/test/java/org/zalando/nakadi/utils/TestUtils.java index 8f03dd3756..2bea9f2240 100644 --- a/src/test/java/org/zalando/nakadi/utils/TestUtils.java +++ b/src/test/java/org/zalando/nakadi/utils/TestUtils.java @@ -217,10 +217,6 @@ public static Timeline buildTimeline(final String etName, final String topic, fi return new Timeline(etName, 0, new Storage(), topic, createdAt); } - public static Timeline buildTimeline(final String etName, final Storage storage, final String topic, final Date createdAt) { - return new Timeline(etName, 0, storage, topic, createdAt); - } - public static Timeline buildTimelineWithTopic(final String topic) { return new Timeline(randomUUID(), 0, new Storage(), topic, new Date()); }