From 2d19b739e02a23ac31f8deced90ee779e40690e9 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 30 Nov 2017 16:12:14 +0100 Subject: [PATCH] ARUHA-1347 Move static method --- .../webservice/hila/SubscriptionAT.java | 6 ---- .../nakadi/repository/TopicRepository.java | 2 -- .../kafka/KafkaTopicRepository.java | 14 -------- .../service/CursorOperationsService.java | 17 +++++++-- .../SubscriptionControllerTest.java | 35 ------------------- .../org/zalando/nakadi/utils/TestUtils.java | 4 --- 6 files changed, 15 insertions(+), 63 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java index 3c0f8e72ff..c3931d51a1 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/SubscriptionAT.java @@ -388,12 +388,6 @@ public void whenStatsOnNotInitializedSubscriptionThanCorrectResponse() throws IO } } - @Test - public void whenStatsWithOffsetInDeletedStorageThenDoNotFail() throws IOException { - final String et = createEventType().getName(); - final Subscription s = createSubscriptionForEventType(et); - } - private Response commitCursors(final Subscription subscription, final String cursor, final String streamId) { return given() .body(cursor) diff --git a/src/main/java/org/zalando/nakadi/repository/TopicRepository.java b/src/main/java/org/zalando/nakadi/repository/TopicRepository.java index 4ee6047234..9b3915afb8 100644 --- a/src/main/java/org/zalando/nakadi/repository/TopicRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/TopicRepository.java @@ -75,8 +75,6 @@ void validateReadCursors(List cursors) throws InvalidCursorExcepti void validateCommitCursor(NakadiCursor cursor) throws InvalidCursorException; - long totalEventsInPartition(Timeline timeline, String partition); - void setRetentionTime(String topic, Long retentionMs) throws TopicConfigException; NakadiCursor createBeforeBeginCursor(Timeline timeline, String partition); diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index 4158ee2a16..4c8480c80a 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -33,7 +33,6 @@ import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.TopicCreationException; import org.zalando.nakadi.exceptions.TopicDeletionException; -import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation; import org.zalando.nakadi.exceptions.runtime.TopicConfigException; import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException; import org.zalando.nakadi.repository.EventConsumer; @@ -457,19 +456,6 @@ public int compareOffsets(final NakadiCursor first, final NakadiCursor second) t return KafkaCursor.fromNakadiCursor(first).compareTo(KafkaCursor.fromNakadiCursor(second)); } - // Method can work only with finished timeline (e.g. it will break for active timeline) - public long totalEventsInPartition(final Timeline timeline, final String partitionString) - throws InvalidCursorOperation { - final Timeline.StoragePosition positions = timeline.getLatestPosition(); - - try { - return 1 + ((Timeline.KafkaStoragePosition) positions).getLastOffsetForPartition( - KafkaCursor.toKafkaPartition(partitionString)); - } catch (final IllegalArgumentException ex) { - throw new InvalidCursorOperation(InvalidCursorOperation.Reason.PARTITION_NOT_FOUND); - } - } - @Override public NakadiCursor createBeforeBeginCursor(final Timeline timeline, final String partition) { return new KafkaCursor(timeline.getTopic(), KafkaCursor.toKafkaPartition(partition), -1) diff --git a/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java b/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java index 1de8964500..e4f02c5930 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorOperationsService.java @@ -58,8 +58,7 @@ public long calculateDistance(final NakadiCursor initialCursor, final NakadiCurs for (int order = startOrder; order < Math.max(initialOrder, finalOrder); ++order) { final Timeline timeline = getTimeline(initialCursor.getEventType(), order); - final long eventsTotal = getTopicRepository(timeline).totalEventsInPartition( - timeline, initialCursor.getPartition()); + final long eventsTotal = totalEventsInPartition(timeline, initialCursor.getPartition()); result += (finalOrder > initialOrder) ? eventsTotal : -eventsTotal; } return result; @@ -221,4 +220,18 @@ private Timeline getTimeline(final String eventTypeName, final int order) { private TopicRepository getTopicRepository(final Timeline timeline) { return timelineService.getTopicRepository(timeline); } + + + // Method can work only with finished timeline (e.g. it will break for active timeline) + private long totalEventsInPartition(final Timeline timeline, final String partitionString) + throws InvalidCursorOperation { + final Timeline.StoragePosition positions = timeline.getLatestPosition(); + + try { + return 1 + ((Timeline.KafkaStoragePosition) positions).getLastOffsetForPartition( + KafkaCursor.toKafkaPartition(partitionString)); + } catch (final IllegalArgumentException ex) { + throw new InvalidCursorOperation(InvalidCursorOperation.Reason.PARTITION_NOT_FOUND); + } + } } diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index 195aa1c669..fccc0052b2 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -18,14 +18,12 @@ import org.zalando.nakadi.domain.PaginationLinks; import org.zalando.nakadi.domain.PaginationWrapper; import org.zalando.nakadi.domain.PartitionEndStatistics; -import org.zalando.nakadi.domain.Storage; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionEventTypeStats; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.NoSuchSubscriptionException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; -import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.plugin.api.ApplicationService; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.repository.TopicRepository; @@ -51,7 +49,6 @@ import javax.ws.rs.core.Response; import java.util.Collections; -import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -77,7 +74,6 @@ import static org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup; import static org.zalando.nakadi.util.SubscriptionsUriHelper.createSubscriptionListUri; import static org.zalando.nakadi.utils.RandomSubscriptionBuilder.builder; -import static org.zalando.nakadi.utils.TestUtils.buildTimeline; import static org.zalando.nakadi.utils.TestUtils.buildTimelineWithTopic; import static org.zalando.nakadi.utils.TestUtils.createRandomSubscriptions; import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs; @@ -262,37 +258,6 @@ public void whenGetSubscriptionStatThenOk() throws Exception { TestUtils.JSON_TEST_HELPER.matchesObject(new ItemsWrapper<>(expectedStats)))); } - @Test - public void whenGetSubscriptionStatsWithOffsetFromDeletedStorageThenOk() throws Exception { - Storage deletedStorage = new Storage(); - final Timeline TIMELINE_DELETED_STORAGE = buildTimeline(TIMELINE.getEventType(), deletedStorage, "topic", new Date()); - final Subscription subscription = builder().withEventType(TIMELINE_DELETED_STORAGE.getEventType()).build(); - final Partition[] partitions = { - new Partition(TIMELINE.getEventType(), "0", "xz", null, Partition.State.ASSIGNED)}; - final ZkSubscriptionNode zkSubscriptionNode = new ZkSubscriptionNode(); - zkSubscriptionNode.setPartitions(partitions); - zkSubscriptionNode.setSessions(new Session[]{new Session("xz", 0)}); - when(subscriptionRepository.getSubscription(subscription.getId())).thenReturn(subscription); - when(zkSubscriptionClient.getZkSubscriptionNodeLocked()).thenReturn(zkSubscriptionNode); - final SubscriptionCursorWithoutToken currentOffset = - new SubscriptionCursorWithoutToken(TIMELINE.getEventType(), "0", "3"); - when(zkSubscriptionClient.getOffset(new EventTypePartition(TIMELINE.getEventType(), "0"))) - .thenReturn(currentOffset); - when(eventTypeRepository.findByName(TIMELINE.getEventType())) - .thenReturn(EventTypeTestBuilder.builder().name(TIMELINE.getEventType()).build()); - final List statistics = Collections.singletonList( - new KafkaPartitionEndStatistics(TIMELINE, 0, 13)); - when(topicRepository.loadTopicEndStatistics(eq(Collections.singletonList(TIMELINE)))).thenReturn(statistics); - final NakadiCursor currentCursor = mock(NakadiCursor.class); - when(currentCursor.getEventTypePartition()).thenReturn(new EventTypePartition(TIMELINE.getEventType(), "0")); - when(cursorConverter.convert((List) any())) - .thenReturn(Collections.singletonList(currentCursor)); - when(timelineService.getTopicRepository(TIMELINE_DELETED_STORAGE)).thenThrow(new MyNakadiRuntimeException1("Exception")); - - getSubscriptionStats(subscription.getId()) - .andExpect(status().isOk()); - } - @Test @SuppressWarnings("unchecked") public void whenGetSubscriptionNoEventTypesThenStatEmpty() throws Exception { diff --git a/src/test/java/org/zalando/nakadi/utils/TestUtils.java b/src/test/java/org/zalando/nakadi/utils/TestUtils.java index 8f03dd3756..2bea9f2240 100644 --- a/src/test/java/org/zalando/nakadi/utils/TestUtils.java +++ b/src/test/java/org/zalando/nakadi/utils/TestUtils.java @@ -217,10 +217,6 @@ public static Timeline buildTimeline(final String etName, final String topic, fi return new Timeline(etName, 0, new Storage(), topic, createdAt); } - public static Timeline buildTimeline(final String etName, final Storage storage, final String topic, final Date createdAt) { - return new Timeline(etName, 0, storage, topic, createdAt); - } - public static Timeline buildTimelineWithTopic(final String topic) { return new Timeline(randomUUID(), 0, new Storage(), topic, new Date()); }