diff --git a/README.md b/README.md index 1a6419b..d4fc299 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,7 @@ # Jetty ReactiveStream HttpClient -A [ReactiveStreams](http://www.reactive-streams.org/) wrapper around -[Jetty](https://eclipse.org/jetty)'s -[HttpClient](https://www.eclipse.org/jetty/documentation/jetty-12/programming-guide/index.html#pg-client-http). +A [ReactiveStreams](http://www.reactive-streams.org/) wrapper around [Jetty](https://eclipse.dev/jetty)'s [HttpClient](https://www.eclipse.dev/jetty/documentation/jetty-12/programming-guide/index.html#pg-client-http). ## Versions @@ -82,9 +80,7 @@ int status = Single.fromPublisher(publisher) The response content is processed by passing a `BiFunction` to `ReactiveRequest.response()`. -The `BiFunction` takes as parameters the `ReactiveResponse` and a `Publisher` for the response -content, and must return a `Publisher` of items of type `T` that is the result of the response -content processing. +The `BiFunction` takes as parameters the `ReactiveResponse` and a `Publisher` for the response content, and must return a `Publisher` of items of type `T` that is the result of the response content processing. Built-in utility functions can be found in `ReactiveResponse.Content`. @@ -100,8 +96,21 @@ Publisher response = request.response(ReactiveResponse.Content Publisher string = request.response(ReactiveResponse.Content.asString()); ``` -Alternatively, you can write your own processing `BiFunction` using any -ReactiveStreams library, such as RxJava 2 (which provides class `Flowable`): +#### Example: discarding non 200 OK response content + +```java +Publisher> publisher = request.response((response, content) -> { + if (response.getStatus() == HttpStatus.OK_200) { + return ReactiveResponse.Content.asStringResult().apply(response, content); + } else { + return ReactiveResponse.Content.asDiscardResult().apply(response, content); + } +}); +``` + +Class `ReactiveResponse.Result` is a Java `record` that holds the response and the response content to allow application code to implement logic that uses both response information such as response status code and response headers, and response content information. + +Alternatively, you can write your own processing `BiFunction` using any ReactiveStreams library, such as RxJava 2 (which provides class `Flowable`): #### Example: discarding non 200 OK response content @@ -123,7 +132,7 @@ Publisher publisher = reactiveRequest.response((reactiveResponse, }); ``` -Then the response content (if any) can be further processed: +The response content (if any) can be further processed: ```java Single contentLength = Flowable.fromPublisher(publisher) @@ -139,11 +148,9 @@ Single contentLength = Flowable.fromPublisher(publisher) ### Providing Request Content -Request content can be provided in a ReactiveStreams way, through the `ReactiveRequest.Content` -class, which _is-a_ `Publisher` with the additional specification of the content length -and the content type. -Below you can find an example using the utility methods in `ReactiveRequest.Content` -to create request content from a String: +Request content can be provided in a ReactiveStreams way, through the `ReactiveRequest.Content` class, which _is-a_ `Publisher` with the additional specification of the content length and the content type. + +Below you can find an example using the utility methods in `ReactiveRequest.Content` to create request content from a String: ```java HttpClient httpClient = ...; @@ -177,14 +184,11 @@ ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localho ### Events -If you are interested in the request and/or response events that are emitted -by the Jetty HttpClient APIs, you can obtain a `Publisher` for request and/or -response events, and subscribe a listener to be notified of the events. +If you are interested in the request and/or response events that are emitted by the Jetty HttpClient APIs, you can obtain a `Publisher` for request and/or response events, and subscribe a listener to be notified of the events. The event `Publisher`s are "hot" producers and do no buffer events. -If you subscribe to an event `Publisher` after the events have started, the -`Subscriber` will not be notified of events that already happened, and will -be notified of any event that will happen. + +If you subscribe to an event `Publisher` after the events have started, the `Subscriber` will not be notified of events that already happened, and will be notified of any event that will happen. ```java HttpClient httpClient = ...; diff --git a/src/main/java/org/eclipse/jetty/reactive/client/ReactiveResponse.java b/src/main/java/org/eclipse/jetty/reactive/client/ReactiveResponse.java index d18a11d..c93d267 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/ReactiveResponse.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/ReactiveResponse.java @@ -18,13 +18,15 @@ import java.nio.ByteBuffer; import java.util.Locale; import java.util.function.BiFunction; - import org.eclipse.jetty.client.Response; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.Content.Chunk; -import org.eclipse.jetty.reactive.client.internal.BufferingProcessor; +import org.eclipse.jetty.reactive.client.internal.ByteBufferBufferingProcessor; import org.eclipse.jetty.reactive.client.internal.DiscardingProcessor; +import org.eclipse.jetty.reactive.client.internal.ResultProcessor; +import org.eclipse.jetty.reactive.client.internal.StringBufferingProcessor; +import org.eclipse.jetty.reactive.client.internal.Transformer; import org.reactivestreams.Publisher; /** @@ -143,14 +145,113 @@ public static BiFunction, Publisher, Publisher> asString() { + return asString(StringBufferingProcessor.DEFAULT_MAX_CAPACITY); + } + + /** + * @return a response content processing function that converts the content to a string + * up to the specified maximum capacity in bytes. + */ + public static BiFunction, Publisher> asString(int maxCapacity) { + return (response, content) -> { + StringBufferingProcessor result = new StringBufferingProcessor(response, maxCapacity); + content.subscribe(result); + return result; + }; + } + + /** + * @return a response content processing function that converts the content to a {@link ByteBuffer} + * up to {@value ByteBufferBufferingProcessor#DEFAULT_MAX_CAPACITY} bytes. + */ + public static BiFunction, Publisher> asByteBuffer() { + return asByteBuffer(ByteBufferBufferingProcessor.DEFAULT_MAX_CAPACITY); + } + + /** + * @return a response content processing function that converts the content to a {@link ByteBuffer} + * up to the specified maximum capacity in bytes. + */ + public static BiFunction, Publisher> asByteBuffer(int maxCapacity) { return (response, content) -> { - BufferingProcessor result = new BufferingProcessor(response); + ByteBufferBufferingProcessor result = new ByteBufferBufferingProcessor(response, maxCapacity); content.subscribe(result); return result; }; } + + /** + * @return a response content processing function that discards the content + * and produces a {@link Result} with a {@code null} content of the given type. + * + * @param the type of the content + */ + public static BiFunction, Publisher>> asDiscardResult() { + return (response, content) -> { + ResultProcessor resultProcessor = new ResultProcessor<>(response); + discard().apply(response, content).subscribe(resultProcessor); + Transformer, Result> transformer = new Transformer<>(r -> null); + resultProcessor.subscribe(transformer); + return transformer; + }; + } + + /** + * @return a response content processing function that converts the content to a string + * up to {@value StringBufferingProcessor#DEFAULT_MAX_CAPACITY} bytes, + * and produces a {@link Result} with the string content. + */ + public static BiFunction, Publisher>> asStringResult() { + return asStringResult(StringBufferingProcessor.DEFAULT_MAX_CAPACITY); + } + + /** + * @return a response content processing function that converts the content to a string + * up to the specified maximum capacity in bytes, + * and produces a {@link Result} with the string content. + */ + public static BiFunction, Publisher>> asStringResult(int maxCapacity) { + return (response, content) -> { + ResultProcessor resultProcessor = new ResultProcessor<>(response); + asString(maxCapacity).apply(response, content).subscribe(resultProcessor); + return resultProcessor; + }; + } + + /** + * @return a response content processing function that converts the content to a {@link ByteBuffer} + * up to {@value ByteBufferBufferingProcessor#DEFAULT_MAX_CAPACITY} bytes, + * and produces a {@link Result} with the {@link ByteBuffer} content. + */ + public static BiFunction, Publisher>> asByteBufferResult() { + return asByteBufferResult(ByteBufferBufferingProcessor.DEFAULT_MAX_CAPACITY); + } + + /** + * @return a response content processing function that converts the content to a {@link ByteBuffer} + * up to the specified maximum capacity in bytes, + * and produces a {@link Result} with the {@link ByteBuffer} content. + */ + public static BiFunction, Publisher>> asByteBufferResult(int maxCapacity) { + return (response, content) -> { + ResultProcessor resultProcessor = new ResultProcessor<>(response); + asByteBuffer(maxCapacity).apply(response, content).subscribe(resultProcessor); + return resultProcessor; + }; + } + } + + /** + *

A record holding the {@link ReactiveResponse} and the response content.

+ * + * @param response the response + * @param content the response content + * @param the type of the response content + */ + public record Result(ReactiveResponse response, T content) { } public static class Event { diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractBufferingProcessor.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractBufferingProcessor.java new file mode 100644 index 0000000..960a846 --- /dev/null +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractBufferingProcessor.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.jetty.reactive.client.internal; + +import java.util.ArrayList; +import java.util.List; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.reactive.client.ReactiveResponse; + +/** + *

A {@link org.reactivestreams.Processor} that buffers {@link Content.Chunk}s + * up to a max capacity.

+ *

When the {@code complete} event is received from upstream, {@link #process(List)} + * is invoked to processes the chunks and produce a single item of type {@code T}, + * that is then published to downstream.

+ * + * @param the type of the item resulted from processing the chunks + */ +public abstract class AbstractBufferingProcessor extends AbstractSingleProcessor { + public static final int DEFAULT_MAX_CAPACITY = 2 * 1024 * 1024; + + private final List chunks = new ArrayList<>(); + private final ReactiveResponse response; + private final int maxCapacity; + private int capacity; + + public AbstractBufferingProcessor(ReactiveResponse response, int maxCapacity) { + this.response = response; + this.maxCapacity = maxCapacity; + } + + public ReactiveResponse getResponse() { + return response; + } + + @Override + public void onNext(Content.Chunk chunk) { + capacity += chunk.remaining(); + if ((maxCapacity > 0 && capacity > maxCapacity) || capacity < 0) { + upStreamCancel(); + onError(new IllegalStateException("buffering capacity %d exceeded".formatted(maxCapacity))); + return; + } + chunks.add(chunk); + upStreamRequest(1); + } + + @Override + public void onError(Throwable throwable) { + chunks.forEach(Content.Chunk::release); + super.onError(throwable); + } + + @Override + public void onComplete() { + T result = process(chunks); + downStreamOnNext(result); + super.onComplete(); + } + + @Override + public void cancel() { + chunks.forEach(Content.Chunk::release); + super.cancel(); + } + + protected abstract T process(List chunks); +} diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSingleProcessor.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSingleProcessor.java index 4c6a223..bef698d 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSingleProcessor.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/AbstractSingleProcessor.java @@ -16,7 +16,6 @@ package org.eclipse.jetty.reactive.client.internal; import java.util.Objects; - import org.eclipse.jetty.util.MathUtils; import org.eclipse.jetty.util.thread.AutoLock; import org.reactivestreams.Processor; @@ -55,7 +54,7 @@ public void cancel() { super.cancel(); } - private void upStreamCancel() { + protected void upStreamCancel() { Subscription upStream; try (AutoLock ignored = lock()) { upStream = this.upStream; diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/ByteBufferBufferingProcessor.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/ByteBufferBufferingProcessor.java new file mode 100644 index 0000000..8944616 --- /dev/null +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/ByteBufferBufferingProcessor.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.jetty.reactive.client.internal; + +import java.nio.ByteBuffer; +import java.util.List; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.reactive.client.ReactiveResponse; + +public class ByteBufferBufferingProcessor extends AbstractBufferingProcessor { + public ByteBufferBufferingProcessor(ReactiveResponse response, int maxCapacity) { + super(response, maxCapacity); + } + + @Override + protected ByteBuffer process(List chunks) { + int length = chunks.stream().mapToInt(Content.Chunk::remaining).sum(); + ByteBuffer result = ByteBuffer.allocateDirect(length); + for (Content.Chunk chunk : chunks) { + result.put(chunk.getByteBuffer()); + chunk.release(); + } + return result.flip(); + } +} diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/ResultProcessor.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/ResultProcessor.java new file mode 100644 index 0000000..c2a801c --- /dev/null +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/ResultProcessor.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.jetty.reactive.client.internal; + +import org.eclipse.jetty.reactive.client.ReactiveResponse; + +/** + *

A {@link org.reactivestreams.Processor} that receives (typically) + * a single item of response content of type {@code T} from upstream, + * and produces a single {@link ReactiveResponse.Result} that wraps + * the {@link ReactiveResponse} and the response content item.

+ * + * @param the type of the response content associated + */ +public class ResultProcessor extends AbstractSingleProcessor> { + private final ReactiveResponse response; + private T item; + + public ResultProcessor(ReactiveResponse response) { + this.response = response; + } + + @Override + public void onNext(T item) { + this.item = item; + } + + @Override + public void onComplete() { + downStreamOnNext(new ReactiveResponse.Result<>(response, item)); + super.onComplete(); + } +} diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/BufferingProcessor.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/StringBufferingProcessor.java similarity index 63% rename from src/main/java/org/eclipse/jetty/reactive/client/internal/BufferingProcessor.java rename to src/main/java/org/eclipse/jetty/reactive/client/internal/StringBufferingProcessor.java index 4992f0b..8692af7 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/BufferingProcessor.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/StringBufferingProcessor.java @@ -17,28 +17,18 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.List; - +import java.util.Objects; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.reactive.client.ReactiveResponse; -public class BufferingProcessor extends AbstractSingleProcessor { - private final List chunks = new ArrayList<>(); - private final ReactiveResponse response; - - public BufferingProcessor(ReactiveResponse response) { - this.response = response; - } - - @Override - public void onNext(Content.Chunk chunk) { - chunks.add(chunk); - upStreamRequest(1); +public class StringBufferingProcessor extends AbstractBufferingProcessor { + public StringBufferingProcessor(ReactiveResponse response, int maxCapacity) { + super(response, maxCapacity); } @Override - public void onComplete() { + protected String process(List chunks) { int length = chunks.stream().mapToInt(Content.Chunk::remaining).sum(); byte[] bytes = new byte[length]; int offset = 0; @@ -48,14 +38,7 @@ public void onComplete() { offset += l; chunk.release(); } - - String encoding = response.getEncoding(); - if (encoding == null) { - encoding = StandardCharsets.UTF_8.name(); - } - - downStreamOnNext(new String(bytes, Charset.forName(encoding))); - - super.onComplete(); + String encoding = Objects.requireNonNullElse(getResponse().getEncoding(), StandardCharsets.UTF_8.name()); + return new String(bytes, Charset.forName(encoding)); } } diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/Transformer.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/Transformer.java new file mode 100644 index 0000000..887b6ef --- /dev/null +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/Transformer.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.jetty.reactive.client.internal; + +import java.util.Objects; +import java.util.function.Function; + +/** + *

A {@link org.reactivestreams.Processor} that applies a function + * to transform items from input type {@code I} to output type {@code O}.

+ * + * @param the input type + * @param the output type + */ +public class Transformer extends AbstractSingleProcessor { + private final Function transformer; + + public Transformer(Function transformer) { + this.transformer = Objects.requireNonNull(transformer); + } + + @Override + public void onNext(I i) { + downStreamOnNext(transformer.apply(i)); + } +} diff --git a/src/test/java/org/eclipse/jetty/reactive/client/RxJava2Test.java b/src/test/java/org/eclipse/jetty/reactive/client/RxJava2Test.java index 818658f..f2ae536 100644 --- a/src/test/java/org/eclipse/jetty/reactive/client/RxJava2Test.java +++ b/src/test/java/org/eclipse/jetty/reactive/client/RxJava2Test.java @@ -17,6 +17,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; @@ -27,11 +28,10 @@ import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; - import io.reactivex.rxjava3.core.Emitter; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Single; @@ -42,15 +42,14 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.RetainableByteBuffer; -import org.eclipse.jetty.reactive.client.internal.AbstractSingleProcessor; import org.eclipse.jetty.reactive.client.internal.AbstractSinglePublisher; -import org.eclipse.jetty.reactive.client.internal.BufferingProcessor; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.util.Blocker; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.MathUtils; import org.eclipse.jetty.util.thread.AutoLock; import org.junit.jupiter.api.Disabled; @@ -58,7 +57,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -81,7 +79,8 @@ public boolean handle(Request request, Response response, Callback callback) { CountDownLatch contentLatch = new CountDownLatch(1); CountDownLatch responseLatch = new CountDownLatch(1); - ReactiveRequest request = ReactiveRequest.newBuilder(httpClient().newRequest("https://example.org")).build(); + URI uri = URI.create("https://example.org"); + ReactiveRequest request = ReactiveRequest.newBuilder(httpClient().newRequest(uri)).build(); Flowable.fromPublisher(request.response((reactiveResponse, chunkPublisher) -> Flowable.fromPublisher(chunkPublisher) .map(chunk -> { ByteBuffer byteBuffer = chunk.getByteBuffer(); @@ -370,15 +369,13 @@ public boolean handle(Request request, Response response, Callback callback) { }); ReactiveRequest request = ReactiveRequest.newBuilder(httpClient(), uri()).build(); - Pair> pair = Single.fromPublisher(request.response((response, content) -> - Flowable.just(new Pair<>(response, content)))).blockingGet(); - ReactiveResponse response = pair.one; + ReactiveResponse.Result> result = Single.fromPublisher(request.response((response, content) -> + Flowable.just(new ReactiveResponse.Result<>(response, content)))).blockingGet(); - assertEquals(response.getStatus(), HttpStatus.OK_200); + assertEquals(result.response().getStatus(), HttpStatus.OK_200); - BufferingProcessor processor = new BufferingProcessor(response); - pair.two.subscribe(processor); - String responseContent = Single.fromPublisher(processor).blockingGet(); + String responseContent = Single.fromPublisher(ReactiveResponse.Content.asString().apply(result.response(), result.content())) + .blockingGet(); assertEquals(responseContent, pangram); } @@ -525,7 +522,7 @@ public boolean handle(Request request, Response response, Callback callback) thr } else { return ReactiveResponse.Content.asString().apply(response, content); } - })).subscribe((status, failure) -> { + })).subscribe((content, failure) -> { if (failure != null) { latch.countDown(); } @@ -562,68 +559,67 @@ public boolean handle(Request request, Response response, Callback callback) thr ReactiveRequest request = ReactiveRequest.newBuilder(httpClient().newRequest(uri())).build(); - // RxJava2 API always call request(Long.MAX_VALUE), - // but I want to control backpressure explicitly, - // so below the RxJava2 APIs are not used (much). - - Publisher bufRespPub = request.response((response, content) -> { - BufferedResponse result = new BufferedResponse(response); + Publisher> publisher = request.response((response, content) -> { if (response.getStatus() == HttpStatus.OK_200) { - Processor processor = new AbstractSingleProcessor<>() { - @Override - public void onNext(Content.Chunk chunk) { - // Accumulate the chunks, without consuming - // the buffers nor completing the callbacks. - result.chunks.add(chunk); - upStreamRequest(1); - } - - @Override - public void onComplete() { - downStreamOnNext(result); - super.onComplete(); - } - }; - content.subscribe(processor); - return processor; + return ReactiveResponse.Content.asStringResult().apply(response, content); } else { - ReactiveResponse.Content.discard().apply(response, content); - return Flowable.just(result); + return ReactiveResponse.Content.asDiscardResult().apply(response, content); } }); - AtomicReference ref = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - bufRespPub.subscribe(new Subscriber<>() { - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(1); - } + ReactiveResponse.Result result = Single.fromPublisher(publisher) + .blockingGet(); - @Override - public void onNext(BufferedResponse bufferedResponse) { - ref.set(bufferedResponse); - } + assertEquals(result.response().getStatus(), HttpStatus.OK_200); +; + String expected = StandardCharsets.UTF_8.decode(ByteBuffer.allocate(original.length) + .put(original) + .flip()).toString(); + assertEquals(expected, result.content()); + } + @ParameterizedTest + @MethodSource("protocols") + public void testSlowDownload(String protocol) throws Exception { + byte[] bytes = new byte[512 * 1024]; + ThreadLocalRandom.current().nextBytes(bytes); + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + prepare(protocol, new Handler.Abstract() { @Override - public void onError(Throwable throwable) { - } + public boolean handle(Request request, Response response, Callback callback) { + new IteratingCallback() { + @Override + protected Action process() throws Exception { + // Write slowly 1 byte at a time. + if (byteBuffer.position() % 256 == 0) { + Thread.sleep(1); + } + ByteBuffer data = ByteBuffer.wrap(new byte[]{byteBuffer.get()}); + boolean last = !byteBuffer.hasRemaining(); + response.write(last, data, this); + return last ? Action.SUCCEEDED : Action.SCHEDULED; + } - @Override - public void onComplete() { - latch.countDown(); + @Override + protected void onCompleteSuccess() { + callback.succeeded(); + } + + @Override + protected void onCompleteFailure(Throwable cause) { + callback.failed(cause); + } + }.iterate(); + return true; } }); - assertTrue(latch.await(5, TimeUnit.SECONDS)); - BufferedResponse bufferedResponse = ref.get(); - assertEquals(bufferedResponse.response.getStatus(), HttpStatus.OK_200); - ByteBuffer content = ByteBuffer.allocate(content1.length + content2.length); - bufferedResponse.chunks.forEach(chunk -> { - content.put(chunk.getByteBuffer()); - chunk.release(); - }); - assertEquals(content.flip(), ByteBuffer.wrap(original)); + ReactiveRequest request = ReactiveRequest.newBuilder(httpClient().newRequest(uri())).build(); + + ReactiveResponse.Result result = Single.fromPublisher(request.response(ReactiveResponse.Content.asByteBufferResult())) + .blockingGet(); + assertEquals(HttpStatus.OK_200, result.response().getStatus()); + assertArrayEquals(bytes, BufferUtil.toArray(result.content())); } @ParameterizedTest @@ -690,15 +686,6 @@ public boolean handle(Request request, Response response, Callback callback) { assertEquals(text1 + text2, completable.get(5, TimeUnit.SECONDS)); } - private record Pair(X one, Y two) { - } - - private record BufferedResponse(ReactiveResponse response, List chunks) { - private BufferedResponse(ReactiveResponse response) { - this(response, new ArrayList<>()); - } - } - private static class ChunkListSinglePublisher extends AbstractSinglePublisher { private final List byteBuffers = new ArrayList<>(); private boolean complete;