From 1c895d0757205090b7967a4717d6cc4ccc968c72 Mon Sep 17 00:00:00 2001 From: Nicklas Lundin Date: Tue, 9 Apr 2024 06:35:18 +0200 Subject: [PATCH] fix: eventsender engine uploads on a cadence (#112) * fix: eventsender engine uploads on a cadence * refactor: extract to constant --- .../com/spotify/confidence/Confidence.java | 4 ++- .../confidence/EventSenderEngineImpl.java | 36 +++++++++++++++++-- .../ConfidenceEventSenderIntegrationTest.java | 31 +++++++++++++--- src/test/resources/logback-test.xml | 2 +- 4 files changed, 64 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/spotify/confidence/Confidence.java b/src/main/java/com/spotify/confidence/Confidence.java index 5c99ffa2..7a5399f7 100644 --- a/src/main/java/com/spotify/confidence/Confidence.java +++ b/src/main/java/com/spotify/confidence/Confidence.java @@ -21,6 +21,7 @@ @Beta public abstract class Confidence implements EventSender, Closeable { + private static final int FLUSH_TIMEOUT_MILLISECONDS = 500; protected Map context = Maps.newHashMap(); private Confidence() {} @@ -217,7 +218,8 @@ public Confidence build() { final GrpcEventUploader uploader = new GrpcEventUploader(clientSecret, clock, DEFAULT_CHANNEL); final var maxBatchSize = 5; - final EventSenderEngine engine = new EventSenderEngineImpl(maxBatchSize, uploader, clock); + final EventSenderEngine engine = + new EventSenderEngineImpl(maxBatchSize, uploader, clock, FLUSH_TIMEOUT_MILLISECONDS); return Confidence.create(engine, flagResolverClient); } } diff --git a/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java b/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java index b8855201..181a4e5a 100644 --- a/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java +++ b/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.*; +import org.slf4j.Logger; class EventSenderEngineImpl implements EventSenderEngine { static final String EVENT_NAME_PREFIX = "eventDefinitions/"; @@ -20,12 +21,19 @@ class EventSenderEngineImpl implements EventSenderEngine { private static final String SHUTDOWN_UPLOAD = "SHUTDOWN_UPLOAD"; private static final String SHUTDOWN_UPLOAD_COMPLETED = "SHUTDOWN_UPLOAD_COMPLETED"; private static final String SHUTDOWN_WRITE_COMPLETED = "SHUTDOWN_WRITE_COMPLETED"; + private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + private final int flushTimeoutMilliseconds; + private ScheduledFuture pendingFlush; private volatile boolean isStopped = false; - EventSenderEngineImpl(int maxBatchSize, EventUploader eventUploader, Clock clock) { + private static final Logger log = org.slf4j.LoggerFactory.getLogger(EventSenderEngineImpl.class); + + EventSenderEngineImpl( + int maxBatchSize, EventUploader eventUploader, Clock clock, int flushTimeoutMilliseconds) { this.maxBatchSize = maxBatchSize; this.eventUploader = eventUploader; this.clock = clock; + this.flushTimeoutMilliseconds = flushTimeoutMilliseconds; writeThread.submit(new WritePoller()); uploadThread.submit(new UploadPoller()); } @@ -44,13 +52,34 @@ public void run() { } final var numberOfPendingEvents = eventStorage.write(event); if (numberOfPendingEvents >= maxBatchSize) { - eventStorage.createBatch(); - uploadQueue.add(UPLOAD_SIG); + flush(); + } else { + scheduleFlush(); } } } } + private void scheduleFlush() { + // Cancel the existing scheduled task if it exists + if (pendingFlush != null && !pendingFlush.isDone()) { + pendingFlush.cancel(false); + } + if (flushTimeoutMilliseconds > 0) { + pendingFlush = + executorService.schedule(this::flush, flushTimeoutMilliseconds, TimeUnit.MILLISECONDS); + } + } + + private void flush() { + // Cancel the existing scheduled task if it exists + if (pendingFlush != null && !pendingFlush.isDone()) { + pendingFlush.cancel(false); + } + eventStorage.createBatch(); + uploadQueue.add(UPLOAD_SIG); + } + class UploadPoller implements Runnable { @Override public void run() { @@ -78,6 +107,7 @@ public void run() { public void send( String name, ConfidenceValue.Struct context, Optional message) { if (!isStopped) { + log.trace("Sending event: {}", name); writeQueue.add( new Event( EVENT_NAME_PREFIX + name, diff --git a/src/test/java/com/spotify/confidence/ConfidenceEventSenderIntegrationTest.java b/src/test/java/com/spotify/confidence/ConfidenceEventSenderIntegrationTest.java index be5960c4..b88bc4b2 100644 --- a/src/test/java/com/spotify/confidence/ConfidenceEventSenderIntegrationTest.java +++ b/src/test/java/com/spotify/confidence/ConfidenceEventSenderIntegrationTest.java @@ -25,7 +25,7 @@ public void testEngineUploads() throws IOException { final int maxBatchSize = 6; final int numEvents = 14; final EventSenderEngine engine = - new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock); + new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock, 500); final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient); int size = 0; while (size++ < numEvents) { @@ -54,13 +54,35 @@ public void testEngineCloseSuccessfullyWithoutEventsQueued() throws IOException final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of()); final int maxBatchSize = 6; final EventSenderEngine engine = - new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock); + new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock, 500); final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient); confidence.close(); // Should trigger the upload of an additional incomplete batch assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(0); } + @Test + public void testEngineUploadsTriggeredByFlushTimeout() throws IOException, InterruptedException { + final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of()); + final int maxBatchSize = 6; + final EventSenderEngine engine = + new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock, 100); + final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient); + + // send only one event + confidence.send( + "navigate", ConfidenceValue.of(ImmutableMap.of("key", ConfidenceValue.of("size")))); + + // wait for the flush timeout to trigger the upload + Thread.sleep(200); + // assert + assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(1); + assertThat(alwaysSucceedUploader.uploadCalls.get(0).events().size()).isEqualTo(1); + + // close + confidence.close(); + } + @Test public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException { final int maxBatchSize = 3; @@ -68,7 +90,8 @@ public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException // This will fail at the 2nd and 5th upload final List failAtUploadWithIndex = List.of(2, 5); final FakeUploader fakeUploader = new FakeUploader(failAtUploadWithIndex); - final EventSenderEngine engine = new EventSenderEngineImpl(maxBatchSize, fakeUploader, clock); + final EventSenderEngine engine = + new EventSenderEngineImpl(maxBatchSize, fakeUploader, clock, 500); final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient); int size = 0; while (size++ < numEvents) { @@ -97,7 +120,7 @@ public void multiThreadTest() throws IOException { final FakeUploader alwaysSucceedUploader = new FakeUploader(); final EventSenderEngine engine = - new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock); + new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock, 500); final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient); final List> futures = new ArrayList<>(); final ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 6eea4306..d870ae88 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -5,7 +5,7 @@ - + \ No newline at end of file