Skip to content

Commit

Permalink
bug: Reactive Stream error handling is incorrect (#482)
Browse files Browse the repository at this point in the history
* Add test for error handling [skip ci]

* Potential fix
  • Loading branch information
timyates authored Jun 12, 2023
1 parent 8cb9c3e commit e1723f6
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.micronaut.servlet.jetty

import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.http.HttpHeaders
import io.micronaut.http.HttpRequest
import io.micronaut.http.HttpStatus
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.http.exceptions.HttpStatusException
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import spock.lang.Specification

@MicronautTest
@Property(name = "spec.name", value = SPEC_NAME)
class StreamErrorSpec extends Specification {

static final String SPEC_NAME = "StreamErrorSpec"

@Inject
@Client("/")
HttpClient client;

void "status error as first item"() {
when:
def response = client.toBlocking().exchange(HttpRequest.GET("/stream-error/status-error-as-first-item"), String)

then:
def e = thrown(HttpClientResponseException)
e.response.status == HttpStatus.NOT_FOUND
e.response.body() == "foo"
}

void "immediate status error"() {
when:
def response = client.toBlocking().exchange(HttpRequest.GET("/stream-error/status-error-immediate"), String)

then:
def e = thrown(HttpClientResponseException)
e.response.status == HttpStatus.NOT_FOUND
e.response.body() == "foo"
}

@Controller("/stream-error")
@Requires(property = "spec.name", value = SPEC_NAME)
static class StreamController {

@Get(uri = "/status-error-as-first-item")
Publisher<String> statusErrorAsFirstItem() {
return Flux.error(new HttpStatusException(HttpStatus.NOT_FOUND, (Object) "foo"));
}

@Get(uri = "/status-error-immediate")
Publisher<String> statusErrorImmediate() {
throw new HttpStatusException(HttpStatus.NOT_FOUND, (Object) "foo");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.micronaut.http.annotation.Produces;
import io.micronaut.http.codec.MediaTypeCodec;
import io.micronaut.http.cookie.Cookie;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.http.server.exceptions.InternalServerException;
import io.micronaut.servlet.http.ServletHttpResponse;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -98,6 +99,7 @@ public Publisher<MutableHttpResponse<?>> stream(Publisher<?> dataPublisher) {
boolean isJson = contentType.getSubtype().equals("json");
boolean first = true;
boolean raw = false;
boolean written = false;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
Expand Down Expand Up @@ -128,41 +130,7 @@ public void onNext(Object o) {
try {
if (outputStream.isReady() && !finished.get()) {

if (o instanceof byte[]) {
raw = true;
outputStream.write((byte[]) o);
flushIfReady();
} else if (o instanceof ByteBuffer) {
ByteBuffer buf = (ByteBuffer) o;
try {
raw = true;
outputStream.write(buf.toByteArray());
flushIfReady();
} finally {
if (buf instanceof ReferenceCounted) {
((ReferenceCounted) buf).release();
}
}
} else if (codec != null) {

if (isJson) {
if (first) {
outputStream.write('[');
first = false;
} else {
outputStream.write(',');
}
}
if (outputStream.isReady()) {
if (o instanceof CharSequence) {
outputStream.write(o.toString().getBytes(getCharacterEncoding()));
} else {
byte[] bytes = codec.encode(o);
outputStream.write(bytes);
}
flushIfReady();
}
}
writeToOutputStream(o);

if (outputStream.isReady()) {
subscription.request(1);
Expand All @@ -176,6 +144,45 @@ public void onNext(Object o) {
}
}

private void writeToOutputStream(Object o) throws IOException {
written = true;
if (o instanceof byte[]) {
raw = true;
outputStream.write((byte[]) o);
flushIfReady();
} else if (o instanceof ByteBuffer) {
ByteBuffer buf = (ByteBuffer) o;
try {
raw = true;
outputStream.write(buf.toByteArray());
flushIfReady();
} finally {
if (buf instanceof ReferenceCounted) {
((ReferenceCounted) buf).release();
}
}
} else if (codec != null) {

if (isJson) {
if (first) {
outputStream.write('[');
first = false;
} else {
outputStream.write(',');
}
}
if (outputStream.isReady()) {
if (o instanceof CharSequence) {
outputStream.write(o.toString().getBytes(getCharacterEncoding()));
} else {
byte[] bytes = codec.encode(o);
outputStream.write(bytes);
}
flushIfReady();
}
}
}

private void flushIfReady() throws IOException {
if (outputStream.isReady()) {
outputStream.flush();
Expand All @@ -185,11 +192,37 @@ private void flushIfReady() throws IOException {
@Override
public void onError(Throwable t) {
if (finished.compareAndSet(false, true)) {
emitter.error(t);
if (t instanceof HttpStatusException) {
maybeReportErrorDownstream(t);
} else {
emitter.error(t);
}
subscription.cancel();
}
}

private void maybeReportErrorDownstream(Throwable t) {
HttpStatusException httpStatusException = (HttpStatusException) t;
delegate.setStatus(httpStatusException.getStatus().getCode());
if (!written) {
try {
Object message = httpStatusException.getBody().orElse(httpStatusException.getMessage());
if (message instanceof CharSequence) {
outputStream.write(message.toString().getBytes(getCharacterEncoding()));
flushIfReady();
} else {
writeToOutputStream(message);
}
emitter.complete();
} catch (IOException e) {
emitter.error(e);
}
} else {
// Nothing we can do really...
emitter.error(t);
}
}

@Override
public void onComplete() {
if (finished.compareAndSet(false, true)) {
Expand Down

0 comments on commit e1723f6

Please sign in to comment.