Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1017 from m99coder/indefinitly-sent-keep-alives
Browse files Browse the repository at this point in the history
Fixed check if keep alive limit is reached for indefinitely configured limit
  • Loading branch information
v-stepanov authored Feb 4, 2019
2 parents 23a2f29 + 613be22 commit a6f17e6
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private StreamParameters(
.filter(timeout -> timeout > 0 && timeout <= EventStreamConfig.MAX_STREAM_TIMEOUT)
.orElse((long) EventStreamConfig.generateDefaultStreamTimeout()));
this.maxUncommittedMessages = userParameters.getMaxUncommittedEvents().orElse(10);
this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit();
this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit().filter(v -> v != 0);
this.partitions = userParameters.getPartitions();
this.consumingClient = consumingClient;

Expand All @@ -78,8 +78,8 @@ public long getMessagesAllowedToSend(final long limit, final long sentSoFar) {
return streamLimitEvents.map(v -> Math.max(0, Math.min(limit, v - sentSoFar))).orElse(limit);
}

public boolean isStreamLimitReached(final long commitedEvents) {
return streamLimitEvents.map(v -> v <= commitedEvents).orElse(false);
public boolean isStreamLimitReached(final long committedEvents) {
return streamLimitEvents.map(v -> v <= committedEvents).orElse(false);
}

public boolean isKeepAliveLimitReached(final IntStream keepAlive) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ public void checkIsKeepAliveLimitReached() throws Exception {
assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 4, 12)), is(false));
}

@Test
public void checkIsKeepAliveLimitReachedIndefinitely() throws Exception {
final StreamParameters streamParameters = createStreamParameters(1, null, 0, null, 0, 0, 0, mock(Client.class));

assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 6, 12)), is(false));
assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 4, 12)), is(false));
}

@Test
public void checkGetMessagesAllowedToSend() throws Exception {
final StreamParameters streamParameters = createStreamParameters(1, 200L, 0, null, null, 0, 0,
Expand Down

0 comments on commit a6f17e6

Please sign in to comment.