Skip to content

Commit

Permalink
add javadoc; cancel appropriately
Browse files Browse the repository at this point in the history
  • Loading branch information
prwhelan committed Sep 5, 2024
1 parent f1306b3 commit 481bf55
Showing 1 changed file with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ public class StreamingRestChunkedToXContentListener implements ActionListener<In
private final AtomicBoolean isLastPart = new AtomicBoolean(false);
private final RestChannel channel;
private final ToXContent.Params params;

/**
* A listener for the first part of the next entry to become available for transmission.
* Chunks are sent one at a time through the completion of this listener.
* This listener is initialized in {@link #initializeStream(InferenceAction.Response)} before the first chunk is requested.
* When a chunk is ready, this listener is completed with the converted {@link ChunkedRestResponseBodyPart}.
* After transmitting the chunk, {@link #requestNextChunk(ActionListener)} will set the next listener and request the next
* chunk. This cycle will repeat until this listener completes with the DONE chunk and the stream closes.
*/
private SubscribableListener<ChunkedRestResponseBodyPart> nextBodyPartListener;

public StreamingRestChunkedToXContentListener(RestChannel channel) {
Expand Down Expand Up @@ -190,6 +199,8 @@ public void onSubscribe(Flow.Subscription subscription) {
if (isLastPart.get() == false) {
this.subscription = subscription;
subscription.request(1);
} else {
subscription.cancel();
}
}

Expand All @@ -207,7 +218,6 @@ public void onError(Throwable throwable) {
if (isLastPart.compareAndSet(false, true)) {
logger.error("A failure occurred in ElasticSearch while streaming the response.", throwable);
nextBodyPartListener().onResponse(new ServerSentEventResponseBodyPart(ServerSentEvents.ERROR, errorChunk(throwable)));
subscription.cancel();
}
}

Expand Down

0 comments on commit 481bf55

Please sign in to comment.