Skip to content

Commit

Permalink
Fix shared poller's buffer (#556)
Browse files Browse the repository at this point in the history
Signed-off-by: Steven Sheehy <[email protected]>
  • Loading branch information
steven-sheehy authored Feb 25, 2020
1 parent 7483e89 commit bdb6cb2
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ public void init() {
.doOnComplete(() -> log.info("Completed polling"))
.doOnNext(context::onNext)
.doOnSubscribe(context::onStart)
.replay(listenerProperties.getBufferSize())
.autoConnect(0, disposable -> pollerDisposable = disposable);
.cache(listenerProperties.getBufferSize());

pollerDisposable = poller.subscribe();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,26 @@ void multipleSubscribers() {
.thenCancel()
.verify(Duration.ofMillis(100));
}

@Test
void bufferFilled() {
int bufferSize = listenerProperties.getBufferSize();
listenerProperties.setBufferSize(2);
topicListener.init();

TopicMessageFilter filter = TopicMessageFilter.builder()
.startTime(Instant.EPOCH)
.build();

getTopicListener().listen(filter)
.map(TopicMessage::getSequenceNumber)
.as(StepVerifier::create)
.thenAwait(Duration.ofMillis(50))
.then(() -> domainBuilder.topicMessages(5).blockLast())
.expectNext(1L, 2L, 3L, 4L, 5L)
.thenCancel()
.verify(Duration.ofMillis(500));

listenerProperties.setBufferSize(bufferSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -232,9 +231,9 @@ void timeout() {
pollingTopicMessageRetriever.retrieve(filter)
.map(TopicMessage::getSequenceNumber)
.as(StepVerifier::create)
.expectNext(1L)
.expectError(TimeoutException.class)
.verify(Duration.ofMillis(500));
.thenConsumeWhile(i -> true)
.expectTimeout(Duration.ofMillis(500))
.verify();

retrieverProperties.setMaxPageSize(maxPageSize);
retrieverProperties.setTimeout(timeout);
Expand Down

0 comments on commit bdb6cb2

Please sign in to comment.