Skip to content

Commit

Permalink
fix: eventsender engine uploads on a cadence (#112)
Browse files Browse the repository at this point in the history
* fix: eventsender engine uploads on a cadence

* refactor: extract to constant
  • Loading branch information
nicklasl authored Apr 9, 2024
1 parent 774e671 commit 1c895d0
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 9 deletions.
4 changes: 3 additions & 1 deletion src/main/java/com/spotify/confidence/Confidence.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
@Beta
public abstract class Confidence implements EventSender, Closeable {

private static final int FLUSH_TIMEOUT_MILLISECONDS = 500;
protected Map<String, ConfidenceValue> context = Maps.newHashMap();

private Confidence() {}
Expand Down Expand Up @@ -217,7 +218,8 @@ public Confidence build() {
final GrpcEventUploader uploader =
new GrpcEventUploader(clientSecret, clock, DEFAULT_CHANNEL);
final var maxBatchSize = 5;
final EventSenderEngine engine = new EventSenderEngineImpl(maxBatchSize, uploader, clock);
final EventSenderEngine engine =
new EventSenderEngineImpl(maxBatchSize, uploader, clock, FLUSH_TIMEOUT_MILLISECONDS);
return Confidence.create(engine, flagResolverClient);
}
}
Expand Down
36 changes: 33 additions & 3 deletions src/main/java/com/spotify/confidence/EventSenderEngineImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import org.slf4j.Logger;

class EventSenderEngineImpl implements EventSenderEngine {
static final String EVENT_NAME_PREFIX = "eventDefinitions/";
Expand All @@ -20,12 +21,19 @@ class EventSenderEngineImpl implements EventSenderEngine {
private static final String SHUTDOWN_UPLOAD = "SHUTDOWN_UPLOAD";
private static final String SHUTDOWN_UPLOAD_COMPLETED = "SHUTDOWN_UPLOAD_COMPLETED";
private static final String SHUTDOWN_WRITE_COMPLETED = "SHUTDOWN_WRITE_COMPLETED";
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private final int flushTimeoutMilliseconds;
private ScheduledFuture<?> pendingFlush;
private volatile boolean isStopped = false;

EventSenderEngineImpl(int maxBatchSize, EventUploader eventUploader, Clock clock) {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(EventSenderEngineImpl.class);

EventSenderEngineImpl(
int maxBatchSize, EventUploader eventUploader, Clock clock, int flushTimeoutMilliseconds) {
this.maxBatchSize = maxBatchSize;
this.eventUploader = eventUploader;
this.clock = clock;
this.flushTimeoutMilliseconds = flushTimeoutMilliseconds;
writeThread.submit(new WritePoller());
uploadThread.submit(new UploadPoller());
}
Expand All @@ -44,13 +52,34 @@ public void run() {
}
final var numberOfPendingEvents = eventStorage.write(event);
if (numberOfPendingEvents >= maxBatchSize) {
eventStorage.createBatch();
uploadQueue.add(UPLOAD_SIG);
flush();
} else {
scheduleFlush();
}
}
}
}

private void scheduleFlush() {
// Cancel the existing scheduled task if it exists
if (pendingFlush != null && !pendingFlush.isDone()) {
pendingFlush.cancel(false);
}
if (flushTimeoutMilliseconds > 0) {
pendingFlush =
executorService.schedule(this::flush, flushTimeoutMilliseconds, TimeUnit.MILLISECONDS);
}
}

private void flush() {
// Cancel the existing scheduled task if it exists
if (pendingFlush != null && !pendingFlush.isDone()) {
pendingFlush.cancel(false);
}
eventStorage.createBatch();
uploadQueue.add(UPLOAD_SIG);
}

class UploadPoller implements Runnable {
@Override
public void run() {
Expand Down Expand Up @@ -78,6 +107,7 @@ public void run() {
public void send(
String name, ConfidenceValue.Struct context, Optional<ConfidenceValue.Struct> message) {
if (!isStopped) {
log.trace("Sending event: {}", name);
writeQueue.add(
new Event(
EVENT_NAME_PREFIX + name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void testEngineUploads() throws IOException {
final int maxBatchSize = 6;
final int numEvents = 14;
final EventSenderEngine engine =
new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock);
new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock, 500);
final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient);
int size = 0;
while (size++ < numEvents) {
Expand Down Expand Up @@ -54,21 +54,44 @@ public void testEngineCloseSuccessfullyWithoutEventsQueued() throws IOException
final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
final int maxBatchSize = 6;
final EventSenderEngine engine =
new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock);
new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock, 500);
final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient);

confidence.close(); // Should trigger the upload of an additional incomplete batch
assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(0);
}

@Test
public void testEngineUploadsTriggeredByFlushTimeout() throws IOException, InterruptedException {
final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
final int maxBatchSize = 6;
final EventSenderEngine engine =
new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock, 100);
final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient);

// send only one event
confidence.send(
"navigate", ConfidenceValue.of(ImmutableMap.of("key", ConfidenceValue.of("size"))));

// wait for the flush timeout to trigger the upload
Thread.sleep(200);
// assert
assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(1);
assertThat(alwaysSucceedUploader.uploadCalls.get(0).events().size()).isEqualTo(1);

// close
confidence.close();
}

@Test
public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException {
final int maxBatchSize = 3;
final int numEvents = 14;
// This will fail at the 2nd and 5th upload
final List<Integer> failAtUploadWithIndex = List.of(2, 5);
final FakeUploader fakeUploader = new FakeUploader(failAtUploadWithIndex);
final EventSenderEngine engine = new EventSenderEngineImpl(maxBatchSize, fakeUploader, clock);
final EventSenderEngine engine =
new EventSenderEngineImpl(maxBatchSize, fakeUploader, clock, 500);
final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient);
int size = 0;
while (size++ < numEvents) {
Expand Down Expand Up @@ -97,7 +120,7 @@ public void multiThreadTest() throws IOException {

final FakeUploader alwaysSucceedUploader = new FakeUploader();
final EventSenderEngine engine =
new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock);
new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock, 500);
final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient);
final List<Future<Boolean>> futures = new ArrayList<>();
final ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</encoder>
</appender>

<root level="debug">
<root level="trace">
<appender-ref ref="STDOUT" />
</root>
</configuration>

0 comments on commit 1c895d0

Please sign in to comment.