Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #791 from zalando/ARUHA-1347
Browse files Browse the repository at this point in the history
Aruha 1347 - do not fail when getting subscriptions stats with deleted storage
  • Loading branch information
antban authored Dec 5, 2017
2 parents ab5b42f + 2d19b73 commit 3ff1034
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Fixed

- Fixed issue with subscription stats, which would fail when the last committed offset is on a storage that was deleted

## [2.3.0] - 2017-11-14

### Added
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.zalando.nakadi.exceptions.runtime;

public class UnknownStorageTypeException extends MyNakadiRuntimeException1 {

public UnknownStorageTypeException(final String message) {
super(message);
}

public UnknownStorageTypeException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ void validateReadCursors(List<NakadiCursor> cursors) throws InvalidCursorExcepti

void validateCommitCursor(NakadiCursor cursor) throws InvalidCursorException;

long totalEventsInPartition(Timeline timeline, String partition);

long numberOfEventsBeforeCursor(NakadiCursor cursor);

void setRetentionTime(String topic, Long retentionMs) throws TopicConfigException;

NakadiCursor createBeforeBeginCursor(Timeline timeline, String partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.TopicCreationException;
import org.zalando.nakadi.exceptions.TopicDeletionException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation;
import org.zalando.nakadi.exceptions.runtime.TopicConfigException;
import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException;
import org.zalando.nakadi.repository.EventConsumer;
Expand Down Expand Up @@ -457,24 +456,6 @@ public int compareOffsets(final NakadiCursor first, final NakadiCursor second) t
return KafkaCursor.fromNakadiCursor(first).compareTo(KafkaCursor.fromNakadiCursor(second));
}

// Method can work only with finished timeline (e.g. it will break for active timeline)
public long totalEventsInPartition(final Timeline timeline, final String partitionString)
throws InvalidCursorOperation {
final Timeline.StoragePosition positions = timeline.getLatestPosition();

try {
return 1 + ((Timeline.KafkaStoragePosition) positions).getLastOffsetForPartition(
KafkaCursor.toKafkaPartition(partitionString));
} catch (final IllegalArgumentException ex) {
throw new InvalidCursorOperation(InvalidCursorOperation.Reason.PARTITION_NOT_FOUND);
}
}

public long numberOfEventsBeforeCursor(final NakadiCursor cursor) {
// could be -1 in case the cursor points to BEGIN
return KafkaCursor.toKafkaOffset(cursor.getOffset());
}

@Override
public NakadiCursor createBeforeBeginCursor(final Timeline timeline, final String partition) {
return new KafkaCursor(timeline.getTopic(), KafkaCursor.toKafkaPartition(partition), -1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
import org.zalando.nakadi.domain.PartitionEndStatistics;
import org.zalando.nakadi.domain.PartitionStatistics;
import org.zalando.nakadi.domain.ShiftedNakadiCursor;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.InvalidCursorException;
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.exceptions.runtime.UnknownStorageTypeException;
import org.zalando.nakadi.repository.TopicRepository;
import org.zalando.nakadi.repository.kafka.KafkaCursor;
import org.zalando.nakadi.service.timeline.TimelineService;

import java.util.Collections;
Expand Down Expand Up @@ -55,8 +58,7 @@ public long calculateDistance(final NakadiCursor initialCursor, final NakadiCurs

for (int order = startOrder; order < Math.max(initialOrder, finalOrder); ++order) {
final Timeline timeline = getTimeline(initialCursor.getEventType(), order);
final long eventsTotal = getTopicRepository(timeline).totalEventsInPartition(
timeline, initialCursor.getPartition());
final long eventsTotal = totalEventsInPartition(timeline, initialCursor.getPartition());
result += (finalOrder > initialOrder) ? eventsTotal : -eventsTotal;
}
return result;
Expand Down Expand Up @@ -86,8 +88,7 @@ public List<NakadiCursorLag> cursorsLag(final String eventTypeName, final List<N
.findAny().orElseThrow(() -> new InvalidCursorOperation(PARTITION_NOT_FOUND));
// trick to avoid -1 position - move cursor to previous timeline while there is no data before
// it
while (getTopicRepository(newestPosition.getTimeline())
.numberOfEventsBeforeCursor(newestPosition) == -1) {
while (numberOfEventsBeforeCursor(newestPosition) == -1) {
final int prevOrder = newestPosition.getTimeline().getOrder() - 1;
final Timeline prevTimeline = timelines.stream()
.filter(t -> t.getOrder() == prevOrder)
Expand Down Expand Up @@ -165,8 +166,7 @@ private NakadiCursor moveBack(final ShiftedNakadiCursor cursor) throws InvalidCu
NakadiCursor currentCursor = new NakadiCursor(cursor.getTimeline(), cursor.getPartition(), cursor.getOffset());
long toMoveBack = -cursor.getShift();
while (toMoveBack > 0) {
final long totalBefore = getTopicRepository(currentCursor.getTimeline())
.numberOfEventsBeforeCursor(currentCursor);
final long totalBefore = numberOfEventsBeforeCursor(currentCursor);
if (totalBefore < toMoveBack) {
toMoveBack -= totalBefore + 1; // +1 is because end is inclusive

Expand Down Expand Up @@ -195,8 +195,13 @@ private NakadiCursor moveBack(final ShiftedNakadiCursor cursor) throws InvalidCu
}

private long numberOfEventsBeforeCursor(final NakadiCursor initialCursor) {
final TopicRepository topicRepository = getTopicRepository(initialCursor.getTimeline());
return topicRepository.numberOfEventsBeforeCursor(initialCursor);
final Storage.Type storageType = initialCursor.getTimeline().getStorage().getType();
switch (storageType) {
case KAFKA:
return KafkaCursor.toKafkaOffset(initialCursor.getOffset());
default:
throw new UnknownStorageTypeException("Unknown storage type: " + storageType.toString());
}
}

private Timeline getTimeline(final String eventTypeName, final int order) {
Expand All @@ -215,4 +220,18 @@ private Timeline getTimeline(final String eventTypeName, final int order) {
private TopicRepository getTopicRepository(final Timeline timeline) {
return timelineService.getTopicRepository(timeline);
}


// Method can work only with finished timeline (e.g. it will break for active timeline)
private long totalEventsInPartition(final Timeline timeline, final String partitionString)
throws InvalidCursorOperation {
final Timeline.StoragePosition positions = timeline.getLatestPosition();

try {
return 1 + ((Timeline.KafkaStoragePosition) positions).getLastOffsetForPartition(
KafkaCursor.toKafkaPartition(partitionString));
} catch (final IllegalArgumentException ex) {
throw new InvalidCursorOperation(InvalidCursorOperation.Reason.PARTITION_NOT_FOUND);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class SubscriptionControllerTest {
private final ZkSubscriptionClient zkSubscriptionClient;
private final CursorConverter cursorConverter;
private final CursorOperationsService cursorOperationsService;
private final TimelineService timelineService;
private static final int PARTITIONS_PER_SUBSCRIPTION = 5;
private static final Timeline TIMELINE = buildTimelineWithTopic("topic");

Expand All @@ -102,7 +103,7 @@ public SubscriptionControllerTest() throws Exception {
final SubscriptionClientFactory zkSubscriptionClientFactory = mock(SubscriptionClientFactory.class);
zkSubscriptionClient = mock(ZkSubscriptionClient.class);
when(zkSubscriptionClientFactory.createClient(any(), any())).thenReturn(zkSubscriptionClient);
final TimelineService timelineService = mock(TimelineService.class);
timelineService = mock(TimelineService.class);
when(timelineService.getActiveTimeline(any())).thenReturn(TIMELINE);
when(timelineService.getTopicRepository((EventTypeBase) any())).thenReturn(topicRepository);
when(timelineService.getTopicRepository((Timeline) any())).thenReturn(topicRepository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.zalando.nakadi.config.NakadiSettings;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.domain.ShiftedNakadiCursor;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation;
import org.zalando.nakadi.repository.TopicRepository;
Expand Down Expand Up @@ -304,6 +305,10 @@ private Timeline mockTimeline(final int order, @Nullable final Long latestOffset
final Timeline timeline = mock(Timeline.class);
when(timeline.getOrder()).thenReturn(order);

final Storage storage = new Storage();
storage.setType(Storage.Type.KAFKA);
when(timeline.getStorage()).thenReturn(storage);

if (latestOffset == null) {
when(timeline.isActive()).thenReturn(false);
when(timeline.getLatestPosition()).thenReturn(null);
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/zalando/nakadi/utils/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ public static Timeline buildTimeline(final String etName) {
return new Timeline(etName, 0, new Storage(), randomUUID(), new Date());
}

public static Timeline buildTimeline(final String etName, final String topic, final Date cratedAt) {
return new Timeline(etName, 0, new Storage(), topic, cratedAt);
public static Timeline buildTimeline(final String etName, final String topic, final Date createdAt) {
return new Timeline(etName, 0, new Storage(), topic, createdAt);
}

public static Timeline buildTimelineWithTopic(final String topic) {
Expand Down

0 comments on commit 3ff1034

Please sign in to comment.