Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
ARUHA-1347 Move static method
Browse files Browse the repository at this point in the history
  • Loading branch information
Lionel Montrieux committed Nov 30, 2017
1 parent 8018280 commit 2d19b73
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,12 +388,6 @@ public void whenStatsOnNotInitializedSubscriptionThanCorrectResponse() throws IO
}
}

@Test
public void whenStatsWithOffsetInDeletedStorageThenDoNotFail() throws IOException {
final String et = createEventType().getName();
final Subscription s = createSubscriptionForEventType(et);
}

private Response commitCursors(final Subscription subscription, final String cursor, final String streamId) {
return given()
.body(cursor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ void validateReadCursors(List<NakadiCursor> cursors) throws InvalidCursorExcepti

void validateCommitCursor(NakadiCursor cursor) throws InvalidCursorException;

long totalEventsInPartition(Timeline timeline, String partition);

void setRetentionTime(String topic, Long retentionMs) throws TopicConfigException;

NakadiCursor createBeforeBeginCursor(Timeline timeline, String partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.TopicCreationException;
import org.zalando.nakadi.exceptions.TopicDeletionException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation;
import org.zalando.nakadi.exceptions.runtime.TopicConfigException;
import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException;
import org.zalando.nakadi.repository.EventConsumer;
Expand Down Expand Up @@ -457,19 +456,6 @@ public int compareOffsets(final NakadiCursor first, final NakadiCursor second) t
return KafkaCursor.fromNakadiCursor(first).compareTo(KafkaCursor.fromNakadiCursor(second));
}

// Method can work only with finished timeline (e.g. it will break for active timeline)
public long totalEventsInPartition(final Timeline timeline, final String partitionString)
throws InvalidCursorOperation {
final Timeline.StoragePosition positions = timeline.getLatestPosition();

try {
return 1 + ((Timeline.KafkaStoragePosition) positions).getLastOffsetForPartition(
KafkaCursor.toKafkaPartition(partitionString));
} catch (final IllegalArgumentException ex) {
throw new InvalidCursorOperation(InvalidCursorOperation.Reason.PARTITION_NOT_FOUND);
}
}

@Override
public NakadiCursor createBeforeBeginCursor(final Timeline timeline, final String partition) {
return new KafkaCursor(timeline.getTopic(), KafkaCursor.toKafkaPartition(partition), -1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public long calculateDistance(final NakadiCursor initialCursor, final NakadiCurs

for (int order = startOrder; order < Math.max(initialOrder, finalOrder); ++order) {
final Timeline timeline = getTimeline(initialCursor.getEventType(), order);
final long eventsTotal = getTopicRepository(timeline).totalEventsInPartition(
timeline, initialCursor.getPartition());
final long eventsTotal = totalEventsInPartition(timeline, initialCursor.getPartition());
result += (finalOrder > initialOrder) ? eventsTotal : -eventsTotal;
}
return result;
Expand Down Expand Up @@ -221,4 +220,18 @@ private Timeline getTimeline(final String eventTypeName, final int order) {
private TopicRepository getTopicRepository(final Timeline timeline) {
return timelineService.getTopicRepository(timeline);
}


// Method can work only with finished timeline (e.g. it will break for active timeline)
private long totalEventsInPartition(final Timeline timeline, final String partitionString)
throws InvalidCursorOperation {
final Timeline.StoragePosition positions = timeline.getLatestPosition();

try {
return 1 + ((Timeline.KafkaStoragePosition) positions).getLastOffsetForPartition(
KafkaCursor.toKafkaPartition(partitionString));
} catch (final IllegalArgumentException ex) {
throw new InvalidCursorOperation(InvalidCursorOperation.Reason.PARTITION_NOT_FOUND);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
import org.zalando.nakadi.domain.PaginationLinks;
import org.zalando.nakadi.domain.PaginationWrapper;
import org.zalando.nakadi.domain.PartitionEndStatistics;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionEventTypeStats;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.NoSuchSubscriptionException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.plugin.api.ApplicationService;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.TopicRepository;
Expand All @@ -51,7 +49,6 @@

import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand All @@ -77,7 +74,6 @@
import static org.springframework.test.web.servlet.setup.MockMvcBuilders.standaloneSetup;
import static org.zalando.nakadi.util.SubscriptionsUriHelper.createSubscriptionListUri;
import static org.zalando.nakadi.utils.RandomSubscriptionBuilder.builder;
import static org.zalando.nakadi.utils.TestUtils.buildTimeline;
import static org.zalando.nakadi.utils.TestUtils.buildTimelineWithTopic;
import static org.zalando.nakadi.utils.TestUtils.createRandomSubscriptions;
import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs;
Expand Down Expand Up @@ -262,37 +258,6 @@ public void whenGetSubscriptionStatThenOk() throws Exception {
TestUtils.JSON_TEST_HELPER.matchesObject(new ItemsWrapper<>(expectedStats))));
}

@Test
public void whenGetSubscriptionStatsWithOffsetFromDeletedStorageThenOk() throws Exception {
Storage deletedStorage = new Storage();
final Timeline TIMELINE_DELETED_STORAGE = buildTimeline(TIMELINE.getEventType(), deletedStorage, "topic", new Date());
final Subscription subscription = builder().withEventType(TIMELINE_DELETED_STORAGE.getEventType()).build();
final Partition[] partitions = {
new Partition(TIMELINE.getEventType(), "0", "xz", null, Partition.State.ASSIGNED)};
final ZkSubscriptionNode zkSubscriptionNode = new ZkSubscriptionNode();
zkSubscriptionNode.setPartitions(partitions);
zkSubscriptionNode.setSessions(new Session[]{new Session("xz", 0)});
when(subscriptionRepository.getSubscription(subscription.getId())).thenReturn(subscription);
when(zkSubscriptionClient.getZkSubscriptionNodeLocked()).thenReturn(zkSubscriptionNode);
final SubscriptionCursorWithoutToken currentOffset =
new SubscriptionCursorWithoutToken(TIMELINE.getEventType(), "0", "3");
when(zkSubscriptionClient.getOffset(new EventTypePartition(TIMELINE.getEventType(), "0")))
.thenReturn(currentOffset);
when(eventTypeRepository.findByName(TIMELINE.getEventType()))
.thenReturn(EventTypeTestBuilder.builder().name(TIMELINE.getEventType()).build());
final List<PartitionEndStatistics> statistics = Collections.singletonList(
new KafkaPartitionEndStatistics(TIMELINE, 0, 13));
when(topicRepository.loadTopicEndStatistics(eq(Collections.singletonList(TIMELINE)))).thenReturn(statistics);
final NakadiCursor currentCursor = mock(NakadiCursor.class);
when(currentCursor.getEventTypePartition()).thenReturn(new EventTypePartition(TIMELINE.getEventType(), "0"));
when(cursorConverter.convert((List<SubscriptionCursorWithoutToken>) any()))
.thenReturn(Collections.singletonList(currentCursor));
when(timelineService.getTopicRepository(TIMELINE_DELETED_STORAGE)).thenThrow(new MyNakadiRuntimeException1("Exception"));

getSubscriptionStats(subscription.getId())
.andExpect(status().isOk());
}

@Test
@SuppressWarnings("unchecked")
public void whenGetSubscriptionNoEventTypesThenStatEmpty() throws Exception {
Expand Down
4 changes: 0 additions & 4 deletions src/test/java/org/zalando/nakadi/utils/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,6 @@ public static Timeline buildTimeline(final String etName, final String topic, fi
return new Timeline(etName, 0, new Storage(), topic, createdAt);
}

public static Timeline buildTimeline(final String etName, final Storage storage, final String topic, final Date createdAt) {
return new Timeline(etName, 0, storage, topic, createdAt);
}

public static Timeline buildTimelineWithTopic(final String topic) {
return new Timeline(randomUUID(), 0, new Storage(), topic, new Date());
}
Expand Down

0 comments on commit 2d19b73

Please sign in to comment.