Skip to content

Commit

Permalink
Return a CompletableFuture from BulkIngester.flush method
Browse files Browse the repository at this point in the history
  • Loading branch information
aidin36 committed May 13, 2024
1 parent 64ec619 commit e765121
Showing 1 changed file with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -80,9 +80,9 @@ private static class RequestExecution<Context> {
public final long id;
public final BulkRequest request;
public final List<Context> contexts;
public final CompletionStage<BulkResponse> futureResponse;
public final CompletableFuture<BulkResponse> futureResponse;

RequestExecution(long id, BulkRequest request, List<Context> contexts, CompletionStage<BulkResponse> futureResponse) {
RequestExecution(long id, BulkRequest request, List<Context> contexts, CompletableFuture<BulkResponse> futureResponse) {
this.id = id;
this.request = request;
this.contexts = contexts;
Expand Down Expand Up @@ -271,7 +271,11 @@ private void failsafeFlush() {
}
}

public void flush() {
/**
* @return A future of the response. The BulkResponse is empty if there was nothing to execute.
*/
@Nullable
public CompletableFuture<BulkResponse> flush() {
RequestExecution<Context> exec = sendRequestCondition.whenReadyIf(
() -> {
// May happen on manual and periodic flushes
Expand All @@ -294,7 +298,7 @@ public void flush() {
listener.beforeBulk(id, request, requestContexts);
}

CompletionStage<BulkResponse> result = client.bulk(request);
CompletableFuture<BulkResponse> result = client.bulk(request);
requestsInFlightCount++;

if (listener == null) {
Expand Down Expand Up @@ -327,7 +331,15 @@ public void flush() {
}
return null;
});

return exec.futureResponse;
}

return CompletableFuture.completedFuture(BulkResponse.of(b -> b
.errors(false)
.items(Collections.emptyList())
.took(1))
);
}

public void add(BulkOperation operation, Context context) {
Expand Down

0 comments on commit e765121

Please sign in to comment.