Skip to content

Commit

Permalink
fix: drop the flushpolicies interface to keep it simple (#113)
Browse files Browse the repository at this point in the history
* fix: drop the flushpolicies interface to keep it simple

* refactor: make the write function return the current size
  • Loading branch information
nicklasl authored Apr 2, 2024
1 parent 9d10bfd commit ba04c2d
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 109 deletions.
29 changes: 0 additions & 29 deletions src/main/java/com/spotify/confidence/BatchSizeFlushPolicy.java

This file was deleted.

6 changes: 2 additions & 4 deletions src/main/java/com/spotify/confidence/Confidence.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.spotify.confidence;

import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.Closer;
Expand All @@ -10,7 +9,6 @@
import io.grpc.ManagedChannelBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -218,8 +216,8 @@ public Confidence build() {
final SystemClock clock = new SystemClock();
final GrpcEventUploader uploader =
new GrpcEventUploader(clientSecret, clock, DEFAULT_CHANNEL);
final List<FlushPolicy> flushPolicies = ImmutableList.of(new BatchSizeFlushPolicy(5));
final EventSenderEngine engine = new EventSenderEngineImpl(flushPolicies, uploader, clock);
final var maxBatchSize = 5;
final EventSenderEngine engine = new EventSenderEngineImpl(maxBatchSize, uploader, clock);
return Confidence.create(engine, flagResolverClient);
}
}
Expand Down
14 changes: 5 additions & 9 deletions src/main/java/com/spotify/confidence/EventSenderEngineImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@ class EventSenderEngineImpl implements EventSenderEngine {
private final BlockingQueue<String> shutdownQueue = new LinkedBlockingQueue<>(1);
private final EventSenderStorage eventStorage = new InMemoryStorage();
private final EventUploader eventUploader;
private final List<FlushPolicy> flushPolicies;
private final int maxBatchSize;
private final Clock clock;
private static final String UPLOAD_SIG = "UPLOAD";
private static final String SHUTDOWN_UPLOAD = "SHUTDOWN_UPLOAD";
private static final String SHUTDOWN_UPLOAD_COMPLETED = "SHUTDOWN_UPLOAD_COMPLETED";
private static final String SHUTDOWN_WRITE_COMPLETED = "SHUTDOWN_WRITE_COMPLETED";
private volatile boolean isStopped = false;

EventSenderEngineImpl(
List<FlushPolicy> flushPolicyList, EventUploader eventUploader, Clock clock) {
this.flushPolicies = flushPolicyList;
EventSenderEngineImpl(int maxBatchSize, EventUploader eventUploader, Clock clock) {
this.maxBatchSize = maxBatchSize;
this.eventUploader = eventUploader;
this.clock = clock;
writeThread.submit(new WritePoller());
Expand All @@ -43,13 +42,10 @@ public void run() {
Thread.yield();
continue;
}
eventStorage.write(event);
flushPolicies.forEach(FlushPolicy::hit);

if (flushPolicies.stream().anyMatch(FlushPolicy::shouldFlush)) {
final var numberOfPendingEvents = eventStorage.write(event);
if (numberOfPendingEvents >= maxBatchSize) {
eventStorage.createBatch();
uploadQueue.add(UPLOAD_SIG);
flushPolicies.forEach(FlushPolicy::reset);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.List;

interface EventSenderStorage {
void write(Event event);
int write(Event event);

void createBatch();

Expand Down
9 changes: 0 additions & 9 deletions src/main/java/com/spotify/confidence/FlushPolicy.java

This file was deleted.

3 changes: 2 additions & 1 deletion src/main/java/com/spotify/confidence/InMemoryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ class InMemoryStorage implements EventSenderStorage {
private final List<EventBatch> batches = new ArrayList<>();

@Override
public synchronized void write(Event event) {
public synchronized int write(Event event) {
this.events.add(event);
return this.events.size();
}

public synchronized void createBatch() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.spotify.confidence;

import static com.spotify.confidence.EventSenderTestUtils.getFlushPolicies;
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableMap;
Expand All @@ -23,10 +22,10 @@ public class ConfidenceEventSenderIntegrationTest {
@Test
public void testEngineUploads() throws IOException {
final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
final int batchSize = 6;
final int maxBatchSize = 6;
final int numEvents = 14;
final EventSenderEngine engine =
new EventSenderEngineImpl(getFlushPolicies(10000, batchSize), alwaysSucceedUploader, clock);
new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock);
final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient);
int size = 0;
while (size++ < numEvents) {
Expand All @@ -35,27 +34,27 @@ public void testEngineUploads() throws IOException {
}

confidence.close(); // Should trigger the upload of an additional incomplete batch
final int additionalBatch = (numEvents % batchSize) > 0 ? 1 : 0;
final int additionalBatch = (numEvents % maxBatchSize) > 0 ? 1 : 0;

assertThat(alwaysSucceedUploader.uploadCalls.size())
.isEqualTo((numEvents / batchSize + additionalBatch));
.isEqualTo((numEvents / maxBatchSize + additionalBatch));
final List<EventBatch> fullEventBatches =
alwaysSucceedUploader.uploadCalls.subList(0, alwaysSucceedUploader.uploadCalls.size() - 1);
assertThat(fullEventBatches.stream().allMatch(batch -> batch.events().size() == batchSize))
assertThat(fullEventBatches.stream().allMatch(batch -> batch.events().size() == maxBatchSize))
.isTrue();
if (additionalBatch != 0) {
final EventBatch lastBatch =
alwaysSucceedUploader.uploadCalls.get(alwaysSucceedUploader.uploadCalls.size() - 1);
assertThat(lastBatch.events().size()).isEqualTo(numEvents % batchSize);
assertThat(lastBatch.events().size()).isEqualTo(numEvents % maxBatchSize);
}
}

@Test
public void testEngineCloseSuccessfullyWithoutEventsQueued() throws IOException {
final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
final int batchSize = 6;
final int maxBatchSize = 6;
final EventSenderEngine engine =
new EventSenderEngineImpl(getFlushPolicies(10000, batchSize), alwaysSucceedUploader, clock);
new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock);
final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient);

confidence.close(); // Should trigger the upload of an additional incomplete batch
Expand All @@ -64,13 +63,12 @@ public void testEngineCloseSuccessfullyWithoutEventsQueued() throws IOException

@Test
public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException {
final int batchSize = 3;
final int maxBatchSize = 3;
final int numEvents = 14;
// This will fail at the 2nd and 5th upload
final List<Integer> failAtUploadWithIndex = List.of(2, 5);
final FakeUploader fakeUploader = new FakeUploader(failAtUploadWithIndex);
final EventSenderEngine engine =
new EventSenderEngineImpl(getFlushPolicies(10000, batchSize), fakeUploader, clock);
final EventSenderEngine engine = new EventSenderEngineImpl(maxBatchSize, fakeUploader, clock);
final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient);
int size = 0;
while (size++ < numEvents) {
Expand All @@ -80,26 +78,26 @@ public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException
}

confidence.close(); // Should trigger the upload of an additional incomplete batch
final int additionalBatch = (numEvents % batchSize) > 0 ? 1 : 0;
final int additionalBatch = (numEvents % maxBatchSize) > 0 ? 1 : 0;

// Verify we had the correct number of calls to the uploader (including retries)
assertThat(fakeUploader.uploadCalls.size())
.isEqualTo((numEvents / batchSize + additionalBatch) + failAtUploadWithIndex.size());
.isEqualTo((numEvents / maxBatchSize + additionalBatch) + failAtUploadWithIndex.size());
// Verify we had the correct number of unique calls to the uploader
assertThat(Set.copyOf(fakeUploader.uploadCalls).size())
.isEqualTo((numEvents / batchSize + additionalBatch));
.isEqualTo((numEvents / maxBatchSize + additionalBatch));
}

@Test
public void multiThreadTest() throws IOException {
final int numberOfThreads = 50;
final int numberOfEvents = 100000;
final int eventsPerThread = numberOfEvents / numberOfThreads;
final int batchSize = 30;
final int maxBatchSize = 30;

final FakeUploader alwaysSucceedUploader = new FakeUploader();
final EventSenderEngine engine =
new EventSenderEngineImpl(getFlushPolicies(10000, batchSize), alwaysSucceedUploader, clock);
new EventSenderEngineImpl(maxBatchSize, alwaysSucceedUploader, clock);
final Confidence confidence = Confidence.create(engine, fakeFlagResolverClient);
final List<Future<Boolean>> futures = new ArrayList<>();
final ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
Expand All @@ -126,8 +124,8 @@ public void multiThreadTest() throws IOException {
}
});
confidence.close();
final int additionalBatch = (numberOfEvents % batchSize) > 0 ? 1 : 0;
final int expectedNumberOfBatches = (numberOfEvents / batchSize) + additionalBatch;
final int additionalBatch = (numberOfEvents % maxBatchSize) > 0 ? 1 : 0;
final int expectedNumberOfBatches = (numberOfEvents / maxBatchSize) + additionalBatch;
assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(expectedNumberOfBatches);
}
}
37 changes: 0 additions & 37 deletions src/test/java/com/spotify/confidence/EventSenderTestUtils.java

This file was deleted.

0 comments on commit ba04c2d

Please sign in to comment.