diff --git a/src/main/java/com/spotify/confidence/BatchSizeFlushPolicy.java b/src/main/java/com/spotify/confidence/BatchSizeFlushPolicy.java deleted file mode 100644 index 44aac3ea..00000000 --- a/src/main/java/com/spotify/confidence/BatchSizeFlushPolicy.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.spotify.confidence; - -import java.util.concurrent.atomic.AtomicInteger; - -class BatchSizeFlushPolicy implements FlushPolicy { - - private final int size; - - public BatchSizeFlushPolicy(int size) { - this.size = size; - } - - private final AtomicInteger currentCount = new AtomicInteger(0); - - @Override - public void hit() { - currentCount.getAndIncrement(); - } - - @Override - public boolean shouldFlush() { - return currentCount.get() >= size; - } - - @Override - public void reset() { - currentCount.set(0); - } -} diff --git a/src/main/java/com/spotify/confidence/Confidence.java b/src/main/java/com/spotify/confidence/Confidence.java index 76c07662..5c99ffa2 100644 --- a/src/main/java/com/spotify/confidence/Confidence.java +++ b/src/main/java/com/spotify/confidence/Confidence.java @@ -1,7 +1,6 @@ package com.spotify.confidence; import com.google.common.annotations.Beta; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.io.Closer; @@ -10,7 +9,6 @@ import io.grpc.ManagedChannelBuilder; import java.io.Closeable; import java.io.IOException; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -218,8 +216,8 @@ public Confidence build() { final SystemClock clock = new SystemClock(); final GrpcEventUploader uploader = new GrpcEventUploader(clientSecret, clock, DEFAULT_CHANNEL); - final List flushPolicies = ImmutableList.of(new BatchSizeFlushPolicy(5)); - final EventSenderEngine engine = new EventSenderEngineImpl(flushPolicies, uploader, clock); + final var maxBatchSize = 5; + final EventSenderEngine engine = new EventSenderEngineImpl(maxBatchSize, uploader, clock); return Confidence.create(engine, flagResolverClient); } } diff --git a/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java b/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java index 16869adb..b8855201 100644 --- a/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java +++ b/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java @@ -14,7 +14,7 @@ class EventSenderEngineImpl implements EventSenderEngine { private final BlockingQueue shutdownQueue = new LinkedBlockingQueue<>(1); private final EventSenderStorage eventStorage = new InMemoryStorage(); private final EventUploader eventUploader; - private final List flushPolicies; + private final int maxBatchSize; private final Clock clock; private static final String UPLOAD_SIG = "UPLOAD"; private static final String SHUTDOWN_UPLOAD = "SHUTDOWN_UPLOAD"; @@ -22,9 +22,8 @@ class EventSenderEngineImpl implements EventSenderEngine { private static final String SHUTDOWN_WRITE_COMPLETED = "SHUTDOWN_WRITE_COMPLETED"; private volatile boolean isStopped = false; - EventSenderEngineImpl( - List flushPolicyList, EventUploader eventUploader, Clock clock) { - this.flushPolicies = flushPolicyList; + EventSenderEngineImpl(int maxBatchSize, EventUploader eventUploader, Clock clock) { + this.maxBatchSize = maxBatchSize; this.eventUploader = eventUploader; this.clock = clock; writeThread.submit(new WritePoller()); @@ -43,13 +42,10 @@ public void run() { Thread.yield(); continue; } - eventStorage.write(event); - flushPolicies.forEach(FlushPolicy::hit); - - if (flushPolicies.stream().anyMatch(FlushPolicy::shouldFlush)) { + final var numberOfPendingEvents = eventStorage.write(event); + if (numberOfPendingEvents >= maxBatchSize) { eventStorage.createBatch(); uploadQueue.add(UPLOAD_SIG); - flushPolicies.forEach(FlushPolicy::reset); } } } diff --git a/src/main/java/com/spotify/confidence/EventSenderStorage.java b/src/main/java/com/spotify/confidence/EventSenderStorage.java index d00bcc8c..eaf42d52 100644 --- a/src/main/java/com/spotify/confidence/EventSenderStorage.java +++ b/src/main/java/com/spotify/confidence/EventSenderStorage.java @@ -3,7 +3,7 @@ import java.util.List; interface EventSenderStorage { - void write(Event event); + int write(Event event); void createBatch(); diff --git a/src/main/java/com/spotify/confidence/FlushPolicy.java b/src/main/java/com/spotify/confidence/FlushPolicy.java deleted file mode 100644 index c47273b8..00000000 --- a/src/main/java/com/spotify/confidence/FlushPolicy.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.spotify.confidence; - -interface FlushPolicy { - void hit(); - - boolean shouldFlush(); - - void reset(); -} diff --git a/src/main/java/com/spotify/confidence/InMemoryStorage.java b/src/main/java/com/spotify/confidence/InMemoryStorage.java index 485e3596..86c6f9a6 100644 --- a/src/main/java/com/spotify/confidence/InMemoryStorage.java +++ b/src/main/java/com/spotify/confidence/InMemoryStorage.java @@ -8,8 +8,9 @@ class InMemoryStorage implements EventSenderStorage { private final List batches = new ArrayList<>(); @Override - public synchronized void write(Event event) { + public synchronized int write(Event event) { this.events.add(event); + return this.events.size(); } public synchronized void createBatch() { diff --git a/src/test/java/com/spotify/confidence/ConfidenceEventSenderIntegrationTest.java b/src/test/java/com/spotify/confidence/ConfidenceEventSenderIntegrationTest.java index c8c792f5..be5960c4 100644 --- a/src/test/java/com/spotify/confidence/ConfidenceEventSenderIntegrationTest.java +++ b/src/test/java/com/spotify/confidence/ConfidenceEventSenderIntegrationTest.java @@ -1,6 +1,5 @@ package com.spotify.confidence; -import static com.spotify.confidence.EventSenderTestUtils.getFlushPolicies; import static org.assertj.core.api.Assertions.assertThat; import com.google.common.collect.ImmutableMap; @@ -23,10 +22,10 @@ public class ConfidenceEventSenderIntegrationTest { @Test public void testEngineUploads() throws IOException { final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of()); - final int batchSize = 6; + final int maxBatchSize = 6; final int numEvents = 14; final EventSenderEngine engine = - new EventSenderEngineImpl(getFlushPolicies(10000, batchSize), alwaysSucceedUploader, clock); + new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock); final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient); int size = 0; while (size++ < numEvents) { @@ -35,27 +34,27 @@ public void testEngineUploads() throws IOException { } confidence.close(); // Should trigger the upload of an additional incomplete batch - final int additionalBatch = (numEvents % batchSize) > 0 ? 1 : 0; + final int additionalBatch = (numEvents % maxBatchSize) > 0 ? 1 : 0; assertThat(alwaysSucceedUploader.uploadCalls.size()) - .isEqualTo((numEvents / batchSize + additionalBatch)); + .isEqualTo((numEvents / maxBatchSize + additionalBatch)); final List fullEventBatches = alwaysSucceedUploader.uploadCalls.subList(0, alwaysSucceedUploader.uploadCalls.size() - 1); - assertThat(fullEventBatches.stream().allMatch(batch -> batch.events().size() == batchSize)) + assertThat(fullEventBatches.stream().allMatch(batch -> batch.events().size() == maxBatchSize)) .isTrue(); if (additionalBatch != 0) { final EventBatch lastBatch = alwaysSucceedUploader.uploadCalls.get(alwaysSucceedUploader.uploadCalls.size() - 1); - assertThat(lastBatch.events().size()).isEqualTo(numEvents % batchSize); + assertThat(lastBatch.events().size()).isEqualTo(numEvents % maxBatchSize); } } @Test public void testEngineCloseSuccessfullyWithoutEventsQueued() throws IOException { final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of()); - final int batchSize = 6; + final int maxBatchSize = 6; final EventSenderEngine engine = - new EventSenderEngineImpl(getFlushPolicies(10000, batchSize), alwaysSucceedUploader, clock); + new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock); final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient); confidence.close(); // Should trigger the upload of an additional incomplete batch @@ -64,13 +63,12 @@ public void testEngineCloseSuccessfullyWithoutEventsQueued() throws IOException @Test public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException { - final int batchSize = 3; + final int maxBatchSize = 3; final int numEvents = 14; // This will fail at the 2nd and 5th upload final List failAtUploadWithIndex = List.of(2, 5); final FakeUploader fakeUploader = new FakeUploader(failAtUploadWithIndex); - final EventSenderEngine engine = - new EventSenderEngineImpl(getFlushPolicies(10000, batchSize), fakeUploader, clock); + final EventSenderEngine engine = new EventSenderEngineImpl(maxBatchSize, fakeUploader, clock); final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient); int size = 0; while (size++ < numEvents) { @@ -80,14 +78,14 @@ public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException } confidence.close(); // Should trigger the upload of an additional incomplete batch - final int additionalBatch = (numEvents % batchSize) > 0 ? 1 : 0; + final int additionalBatch = (numEvents % maxBatchSize) > 0 ? 1 : 0; // Verify we had the correct number of calls to the uploader (including retries) assertThat(fakeUploader.uploadCalls.size()) - .isEqualTo((numEvents / batchSize + additionalBatch) + failAtUploadWithIndex.size()); + .isEqualTo((numEvents / maxBatchSize + additionalBatch) + failAtUploadWithIndex.size()); // Verify we had the correct number of unique calls to the uploader assertThat(Set.copyOf(fakeUploader.uploadCalls).size()) - .isEqualTo((numEvents / batchSize + additionalBatch)); + .isEqualTo((numEvents / maxBatchSize + additionalBatch)); } @Test @@ -95,11 +93,11 @@ public void multiThreadTest() throws IOException { final int numberOfThreads = 50; final int numberOfEvents = 100000; final int eventsPerThread = numberOfEvents / numberOfThreads; - final int batchSize = 30; + final int maxBatchSize = 30; final FakeUploader alwaysSucceedUploader = new FakeUploader(); final EventSenderEngine engine = - new EventSenderEngineImpl(getFlushPolicies(10000, batchSize), alwaysSucceedUploader, clock); + new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock); final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient); final List> futures = new ArrayList<>(); final ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); @@ -126,8 +124,8 @@ public void multiThreadTest() throws IOException { } }); confidence.close(); - final int additionalBatch = (numberOfEvents % batchSize) > 0 ? 1 : 0; - final int expectedNumberOfBatches = (numberOfEvents / batchSize) + additionalBatch; + final int additionalBatch = (numberOfEvents % maxBatchSize) > 0 ? 1 : 0; + final int expectedNumberOfBatches = (numberOfEvents / maxBatchSize) + additionalBatch; assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(expectedNumberOfBatches); } } diff --git a/src/test/java/com/spotify/confidence/EventSenderTestUtils.java b/src/test/java/com/spotify/confidence/EventSenderTestUtils.java deleted file mode 100644 index 4496a214..00000000 --- a/src/test/java/com/spotify/confidence/EventSenderTestUtils.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.spotify.confidence; - -import java.util.ArrayList; -import java.util.List; - -public final class EventSenderTestUtils { - - public EventSenderTestUtils() {} - - static List getFlushPolicies(int minInterval, int minSize) { - final List flushPolicyList = new ArrayList<>(); - final FlushPolicy sizeFlushPolicy = new BatchSizeFlushPolicy(minSize); - - final FlushPolicy intervalFlushPolicy = - new FlushPolicy() { - - long lastFlush = System.currentTimeMillis(); - - @Override - public void hit() {} - - @Override - public boolean shouldFlush() { - final long currentTime = System.currentTimeMillis(); - return currentTime - lastFlush > minInterval; - } - - @Override - public void reset() { - lastFlush = System.currentTimeMillis(); - } - }; - flushPolicyList.add(sizeFlushPolicy); - flushPolicyList.add(intervalFlushPolicy); - return flushPolicyList; - } -}