Skip to content

Commit

Permalink
Added test for slow downloads.
Browse files Browse the repository at this point in the history
Added maxCapacity for downloaded strings.
Introduced facility to obtain the response content as ByteBuffer.
Introduced ReactiveResponse.Result to carry both the response and the response content.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Nov 17, 2023
1 parent a7cec54 commit c016737
Show file tree
Hide file tree
Showing 9 changed files with 400 additions and 122 deletions.
44 changes: 24 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

# Jetty ReactiveStream HttpClient

A [ReactiveStreams](http://www.reactive-streams.org/) wrapper around
[Jetty](https://eclipse.org/jetty)'s
[HttpClient](https://www.eclipse.org/jetty/documentation/jetty-12/programming-guide/index.html#pg-client-http).
A [ReactiveStreams](http://www.reactive-streams.org/) wrapper around [Jetty](https://eclipse.dev/jetty)'s [HttpClient](https://www.eclipse.dev/jetty/documentation/jetty-12/programming-guide/index.html#pg-client-http).

## Versions

Expand Down Expand Up @@ -82,9 +80,7 @@ int status = Single.fromPublisher(publisher)

The response content is processed by passing a `BiFunction` to `ReactiveRequest.response()`.

The `BiFunction` takes as parameters the `ReactiveResponse` and a `Publisher` for the response
content, and must return a `Publisher` of items of type `T` that is the result of the response
content processing.
The `BiFunction` takes as parameters the `ReactiveResponse` and a `Publisher` for the response content, and must return a `Publisher` of items of type `T` that is the result of the response content processing.

Built-in utility functions can be found in `ReactiveResponse.Content`.

Expand All @@ -100,8 +96,21 @@ Publisher<ReactiveResponse> response = request.response(ReactiveResponse.Content
Publisher<String> string = request.response(ReactiveResponse.Content.asString());
```

Alternatively, you can write your own processing `BiFunction` using any
ReactiveStreams library, such as RxJava 2 (which provides class `Flowable`):
#### Example: discarding non 200 OK response content

```java
Publisher<ReactiveResponse.Result<String>> publisher = request.response((response, content) -> {
if (response.getStatus() == HttpStatus.OK_200) {
return ReactiveResponse.Content.asStringResult().apply(response, content);
} else {
return ReactiveResponse.Content.<String>asDiscardResult().apply(response, content);
}
});
```

Class `ReactiveResponse.Result` is a Java `record` that holds the response and the response content to allow application code to implement logic that uses both response information such as response status code and response headers, and response content information.

Alternatively, you can write your own processing `BiFunction` using any ReactiveStreams library, such as RxJava 2 (which provides class `Flowable`):

#### Example: discarding non 200 OK response content

Expand All @@ -123,7 +132,7 @@ Publisher<Content.Chunk> publisher = reactiveRequest.response((reactiveResponse,
});
```

Then the response content (if any) can be further processed:
The response content (if any) can be further processed:

```java
Single<Long> contentLength = Flowable.fromPublisher(publisher)
Expand All @@ -139,11 +148,9 @@ Single<Long> contentLength = Flowable.fromPublisher(publisher)

### Providing Request Content

Request content can be provided in a ReactiveStreams way, through the `ReactiveRequest.Content`
class, which _is-a_ `Publisher` with the additional specification of the content length
and the content type.
Below you can find an example using the utility methods in `ReactiveRequest.Content`
to create request content from a String:
Request content can be provided in a ReactiveStreams way, through the `ReactiveRequest.Content` class, which _is-a_ `Publisher` with the additional specification of the content length and the content type.

Below you can find an example using the utility methods in `ReactiveRequest.Content` to create request content from a String:

```java
HttpClient httpClient = ...;
Expand Down Expand Up @@ -177,14 +184,11 @@ ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localho

### Events

If you are interested in the request and/or response events that are emitted
by the Jetty HttpClient APIs, you can obtain a `Publisher` for request and/or
response events, and subscribe a listener to be notified of the events.
If you are interested in the request and/or response events that are emitted by the Jetty HttpClient APIs, you can obtain a `Publisher` for request and/or response events, and subscribe a listener to be notified of the events.

The event `Publisher`s are "hot" producers and do no buffer events.
If you subscribe to an event `Publisher` after the events have started, the
`Subscriber` will not be notified of events that already happened, and will
be notified of any event that will happen.

If you subscribe to an event `Publisher` after the events have started, the `Subscriber` will not be notified of events that already happened, and will be notified of any event that will happen.

```java
HttpClient httpClient = ...;
Expand Down
107 changes: 104 additions & 3 deletions src/main/java/org/eclipse/jetty/reactive/client/ReactiveResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.function.BiFunction;

import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.Content.Chunk;
import org.eclipse.jetty.reactive.client.internal.BufferingProcessor;
import org.eclipse.jetty.reactive.client.internal.ByteBufferBufferingProcessor;
import org.eclipse.jetty.reactive.client.internal.DiscardingProcessor;
import org.eclipse.jetty.reactive.client.internal.ResultProcessor;
import org.eclipse.jetty.reactive.client.internal.StringBufferingProcessor;
import org.eclipse.jetty.reactive.client.internal.Transformer;
import org.reactivestreams.Publisher;

/**
Expand Down Expand Up @@ -143,14 +145,113 @@ public static BiFunction<ReactiveResponse, Publisher<Chunk>, Publisher<ReactiveR

/**
* @return a response content processing function that converts the content to a string
* up to {@value StringBufferingProcessor#DEFAULT_MAX_CAPACITY} bytes.
*/
public static BiFunction<ReactiveResponse, Publisher<Chunk>, Publisher<String>> asString() {
return asString(StringBufferingProcessor.DEFAULT_MAX_CAPACITY);
}

/**
* @return a response content processing function that converts the content to a string
* up to the specified maximum capacity in bytes.
*/
public static BiFunction<ReactiveResponse, Publisher<Chunk>, Publisher<String>> asString(int maxCapacity) {
return (response, content) -> {
StringBufferingProcessor result = new StringBufferingProcessor(response, maxCapacity);
content.subscribe(result);
return result;
};
}

/**
* @return a response content processing function that converts the content to a {@link ByteBuffer}
* up to {@value ByteBufferBufferingProcessor#DEFAULT_MAX_CAPACITY} bytes.
*/
public static BiFunction<ReactiveResponse, Publisher<Chunk>, Publisher<ByteBuffer>> asByteBuffer() {
return asByteBuffer(ByteBufferBufferingProcessor.DEFAULT_MAX_CAPACITY);
}

/**
* @return a response content processing function that converts the content to a {@link ByteBuffer}
* up to the specified maximum capacity in bytes.
*/
public static BiFunction<ReactiveResponse, Publisher<Chunk>, Publisher<ByteBuffer>> asByteBuffer(int maxCapacity) {
return (response, content) -> {
BufferingProcessor result = new BufferingProcessor(response);
ByteBufferBufferingProcessor result = new ByteBufferBufferingProcessor(response, maxCapacity);
content.subscribe(result);
return result;
};
}

/**
* @return a response content processing function that discards the content
* and produces a {@link Result} with a {@code null} content of the given type.
*
* @param <T> the type of the content
*/
public static <T> BiFunction<ReactiveResponse, Publisher<Chunk>, Publisher<Result<T>>> asDiscardResult() {
return (response, content) -> {
ResultProcessor<ReactiveResponse> resultProcessor = new ResultProcessor<>(response);
discard().apply(response, content).subscribe(resultProcessor);
Transformer<Result<ReactiveResponse>, Result<T>> transformer = new Transformer<>(r -> null);
resultProcessor.subscribe(transformer);
return transformer;
};
}

/**
* @return a response content processing function that converts the content to a string
* up to {@value StringBufferingProcessor#DEFAULT_MAX_CAPACITY} bytes,
* and produces a {@link Result} with the string content.
*/
public static BiFunction<ReactiveResponse, Publisher<Chunk>, Publisher<Result<String>>> asStringResult() {
return asStringResult(StringBufferingProcessor.DEFAULT_MAX_CAPACITY);
}

/**
* @return a response content processing function that converts the content to a string
* up to the specified maximum capacity in bytes,
* and produces a {@link Result} with the string content.
*/
public static BiFunction<ReactiveResponse, Publisher<Chunk>, Publisher<Result<String>>> asStringResult(int maxCapacity) {
return (response, content) -> {
ResultProcessor<String> resultProcessor = new ResultProcessor<>(response);
asString(maxCapacity).apply(response, content).subscribe(resultProcessor);
return resultProcessor;
};
}

/**
* @return a response content processing function that converts the content to a {@link ByteBuffer}
* up to {@value ByteBufferBufferingProcessor#DEFAULT_MAX_CAPACITY} bytes,
* and produces a {@link Result} with the {@link ByteBuffer} content.
*/
public static BiFunction<ReactiveResponse, Publisher<Chunk>, Publisher<Result<ByteBuffer>>> asByteBufferResult() {
return asByteBufferResult(ByteBufferBufferingProcessor.DEFAULT_MAX_CAPACITY);
}

/**
* @return a response content processing function that converts the content to a {@link ByteBuffer}
* up to the specified maximum capacity in bytes,
* and produces a {@link Result} with the {@link ByteBuffer} content.
*/
public static BiFunction<ReactiveResponse, Publisher<Chunk>, Publisher<Result<ByteBuffer>>> asByteBufferResult(int maxCapacity) {
return (response, content) -> {
ResultProcessor<ByteBuffer> resultProcessor = new ResultProcessor<>(response);
asByteBuffer(maxCapacity).apply(response, content).subscribe(resultProcessor);
return resultProcessor;
};
}
}

/**
* <p>A record holding the {@link ReactiveResponse} and the response content.</p>
*
* @param response the response
* @param content the response content
* @param <T> the type of the response content
*/
public record Result<T>(ReactiveResponse response, T content) {
}

public static class Event {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.reactive.client.internal;

import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.reactive.client.ReactiveResponse;

/**
* <p>A {@link org.reactivestreams.Processor} that buffers {@link Content.Chunk}s
* up to a max capacity.</p>
* <p>When the {@code complete} event is received from upstream, {@link #process(List)}
* is invoked to processes the chunks and produce a single item of type {@code T},
* that is then published to downstream.</p>
*
* @param <T> the type of the item resulted from processing the chunks
*/
public abstract class AbstractBufferingProcessor<T> extends AbstractSingleProcessor<Content.Chunk, T> {
public static final int DEFAULT_MAX_CAPACITY = 2 * 1024 * 1024;

private final List<Content.Chunk> chunks = new ArrayList<>();
private final ReactiveResponse response;
private final int maxCapacity;
private int capacity;

public AbstractBufferingProcessor(ReactiveResponse response, int maxCapacity) {
this.response = response;
this.maxCapacity = maxCapacity;
}

public ReactiveResponse getResponse() {
return response;
}

@Override
public void onNext(Content.Chunk chunk) {
capacity += chunk.remaining();
if ((maxCapacity > 0 && capacity > maxCapacity) || capacity < 0) {
upStreamCancel();
onError(new IllegalStateException("buffering capacity %d exceeded".formatted(maxCapacity)));
return;
}
chunks.add(chunk);
upStreamRequest(1);
}

@Override
public void onError(Throwable throwable) {
chunks.forEach(Content.Chunk::release);
super.onError(throwable);
}

@Override
public void onComplete() {
T result = process(chunks);
downStreamOnNext(result);
super.onComplete();
}

@Override
public void cancel() {
chunks.forEach(Content.Chunk::release);
super.cancel();
}

protected abstract T process(List<Content.Chunk> chunks);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.eclipse.jetty.reactive.client.internal;

import java.util.Objects;

import org.eclipse.jetty.util.MathUtils;
import org.eclipse.jetty.util.thread.AutoLock;
import org.reactivestreams.Processor;
Expand Down Expand Up @@ -55,7 +54,7 @@ public void cancel() {
super.cancel();
}

private void upStreamCancel() {
protected void upStreamCancel() {
Subscription upStream;
try (AutoLock ignored = lock()) {
upStream = this.upStream;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.reactive.client.internal;

import java.nio.ByteBuffer;
import java.util.List;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.reactive.client.ReactiveResponse;

public class ByteBufferBufferingProcessor extends AbstractBufferingProcessor<ByteBuffer> {
public ByteBufferBufferingProcessor(ReactiveResponse response, int maxCapacity) {
super(response, maxCapacity);
}

@Override
protected ByteBuffer process(List<Content.Chunk> chunks) {
int length = chunks.stream().mapToInt(Content.Chunk::remaining).sum();
ByteBuffer result = ByteBuffer.allocateDirect(length);
for (Content.Chunk chunk : chunks) {
result.put(chunk.getByteBuffer());
chunk.release();
}
return result.flip();
}
}
Loading

0 comments on commit c016737

Please sign in to comment.