Skip to content

Commit

Permalink
fix: Response Errors don't trigger retries (#108)
Browse files Browse the repository at this point in the history
fix: Event errors dont trigger retries
  • Loading branch information
fabriziodemaria authored Mar 26, 2024
1 parent 4106c9f commit 81cfcfd
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public CompletableFuture<Boolean> upload(EventBatch batch) {

return GrpcUtil.toCompletableFuture(
stub.withDeadlineAfter(5, TimeUnit.SECONDS).publishEvents(request))
.thenApply(publishEventsResponse -> publishEventsResponse.getErrorsCount() == 0)
.thenApply(publishEventsResponse -> true)
.exceptionally(
(throwable -> {
// TODO update to use some user-configurable logging
Expand Down
46 changes: 42 additions & 4 deletions src/test/java/com/spotify/confidence/GrpcEventUploaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableMap;
import com.spotify.confidence.events.v1.EventError;
import com.spotify.confidence.events.v1.EventError.Reason;
import com.spotify.confidence.events.v1.EventsServiceGrpc;
import com.spotify.confidence.events.v1.PublishEventsRequest;
import com.spotify.confidence.events.v1.PublishEventsResponse;
Expand Down Expand Up @@ -132,9 +134,28 @@ public void testMapsMultiEventBatchToProtobuf() {
}
}

@Test
public void testMapsMultiEventBatchToProtobufSparseErrors()
throws ExecutionException, InterruptedException {
fakedEventsService.resultType = ResultType.FIRST_EVENT_ERROR;
final EventBatch batch =
new EventBatch(
List.of(
new Event("event1", messageStruct("m1"), contextStruct("c1"), 1337),
new Event("event2", messageStruct("m2"), contextStruct("c2"), 1338),
new Event("event3", messageStruct("m3"), contextStruct("c3"), 1339),
new Event("event4", messageStruct("m4"), contextStruct("c4"), 1340)));
final CompletableFuture<Boolean> completableFuture = uploader.upload(batch);
assertThat(fakedEventsService.requests).hasSize(1);
final PublishEventsRequest request = fakedEventsService.requests.get(0);
assertThat(request.getEventsList()).hasSize(4);
final Boolean result = completableFuture.get();
assertThat(result).isTrue();
}

@Test
public void testServiceThrows() throws ExecutionException, InterruptedException {
fakedEventsService.shouldError = true;
fakedEventsService.resultType = ResultType.REQUEST_ERROR;
final EventBatch batch =
new EventBatch(List.of(new Event("event1", messageStruct("1"), contextStruct("1"), 1337)));
final CompletableFuture<Boolean> completableFuture = uploader.upload(batch);
Expand All @@ -151,21 +172,38 @@ private ConfidenceValue.Struct messageStruct(String s) {
return ConfidenceValue.of(ImmutableMap.of("messageKey", ConfidenceValue.of("value_" + s)));
}

private enum ResultType {
REQUEST_ERROR,
FIRST_EVENT_ERROR,
SUCCESS
}

private static class FakedEventsService extends EventsServiceGrpc.EventsServiceImplBase {
public boolean shouldError;

public ResultType resultType;
final List<PublishEventsRequest> requests = new ArrayList<>();

public void clear() {
requests.clear();
shouldError = false;
resultType = ResultType.SUCCESS;
}

@Override
public void publishEvents(
PublishEventsRequest request, StreamObserver<PublishEventsResponse> responseObserver) {
requests.add(request);
if (shouldError) {
if (resultType == ResultType.REQUEST_ERROR) {
responseObserver.onError(new RuntimeException("error"));
} else if (resultType == ResultType.FIRST_EVENT_ERROR) {
responseObserver.onNext(
PublishEventsResponse.newBuilder()
.addErrors(
0,
EventError.newBuilder()
.setReason(Reason.EVENT_SCHEMA_VALIDATION_FAILED)
.build())
.build());
responseObserver.onCompleted();
} else {
responseObserver.onNext(PublishEventsResponse.newBuilder().build());
responseObserver.onCompleted();
Expand Down

0 comments on commit 81cfcfd

Please sign in to comment.