Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Moved filter to constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
Marco Lehmann committed Feb 4, 2019
1 parent 6fc19dc commit 613be22
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private StreamParameters(
.filter(timeout -> timeout > 0 && timeout <= EventStreamConfig.MAX_STREAM_TIMEOUT)
.orElse((long) EventStreamConfig.generateDefaultStreamTimeout()));
this.maxUncommittedMessages = userParameters.getMaxUncommittedEvents().orElse(10);
this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit();
this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit().filter(v -> v != 0);
this.partitions = userParameters.getPartitions();
this.consumingClient = consumingClient;

Expand All @@ -78,12 +78,12 @@ public long getMessagesAllowedToSend(final long limit, final long sentSoFar) {
return streamLimitEvents.map(v -> Math.max(0, Math.min(limit, v - sentSoFar))).orElse(limit);
}

public boolean isStreamLimitReached(final long commitedEvents) {
return streamLimitEvents.map(v -> v <= commitedEvents).orElse(false);
public boolean isStreamLimitReached(final long committedEvents) {
return streamLimitEvents.map(v -> v <= committedEvents).orElse(false);
}

public boolean isKeepAliveLimitReached(final IntStream keepAlive) {
return batchKeepAliveIterations.map(it -> keepAlive.allMatch(v -> v >= it) && it > 0).orElse(false);
return batchKeepAliveIterations.map(it -> keepAlive.allMatch(v -> v >= it)).orElse(false);
}

public Client getConsumingClient() {
Expand Down

0 comments on commit 613be22

Please sign in to comment.