diff --git a/http-server-jetty/src/test/groovy/io/micronaut/servlet/jetty/StreamErrorSpec.groovy b/http-server-jetty/src/test/groovy/io/micronaut/servlet/jetty/StreamErrorSpec.groovy new file mode 100644 index 000000000..614d95a0d --- /dev/null +++ b/http-server-jetty/src/test/groovy/io/micronaut/servlet/jetty/StreamErrorSpec.groovy @@ -0,0 +1,65 @@ +package io.micronaut.servlet.jetty + +import io.micronaut.context.annotation.Property +import io.micronaut.context.annotation.Requires +import io.micronaut.http.HttpHeaders +import io.micronaut.http.HttpRequest +import io.micronaut.http.HttpStatus +import io.micronaut.http.MediaType +import io.micronaut.http.annotation.Controller +import io.micronaut.http.annotation.Get +import io.micronaut.http.client.HttpClient +import io.micronaut.http.client.annotation.Client +import io.micronaut.http.client.exceptions.HttpClientResponseException +import io.micronaut.http.exceptions.HttpStatusException +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import spock.lang.Specification + +@MicronautTest +@Property(name = "spec.name", value = SPEC_NAME) +class StreamErrorSpec extends Specification { + + static final String SPEC_NAME = "StreamErrorSpec" + + @Inject + @Client("/") + HttpClient client; + + void "status error as first item"() { + when: + def response = client.toBlocking().exchange(HttpRequest.GET("/stream-error/status-error-as-first-item"), String) + + then: + def e = thrown(HttpClientResponseException) + e.response.status == HttpStatus.NOT_FOUND + e.response.body() == "foo" + } + + void "immediate status error"() { + when: + def response = client.toBlocking().exchange(HttpRequest.GET("/stream-error/status-error-immediate"), String) + + then: + def e = thrown(HttpClientResponseException) + e.response.status == HttpStatus.NOT_FOUND + e.response.body() == "foo" + } + + @Controller("/stream-error") + @Requires(property = "spec.name", value = SPEC_NAME) + static class StreamController { + + @Get(uri = "/status-error-as-first-item") + Publisher statusErrorAsFirstItem() { + return Flux.error(new HttpStatusException(HttpStatus.NOT_FOUND, (Object) "foo")); + } + + @Get(uri = "/status-error-immediate") + Publisher statusErrorImmediate() { + throw new HttpStatusException(HttpStatus.NOT_FOUND, (Object) "foo"); + } + } +} diff --git a/servlet-engine/src/main/java/io/micronaut/servlet/engine/DefaultServletHttpResponse.java b/servlet-engine/src/main/java/io/micronaut/servlet/engine/DefaultServletHttpResponse.java index b30213a6e..152a7afc7 100644 --- a/servlet-engine/src/main/java/io/micronaut/servlet/engine/DefaultServletHttpResponse.java +++ b/servlet-engine/src/main/java/io/micronaut/servlet/engine/DefaultServletHttpResponse.java @@ -33,6 +33,7 @@ import io.micronaut.http.annotation.Produces; import io.micronaut.http.codec.MediaTypeCodec; import io.micronaut.http.cookie.Cookie; +import io.micronaut.http.exceptions.HttpStatusException; import io.micronaut.http.server.exceptions.InternalServerException; import io.micronaut.servlet.http.ServletHttpResponse; import org.reactivestreams.Publisher; @@ -98,6 +99,7 @@ public Publisher> stream(Publisher dataPublisher) { boolean isJson = contentType.getSubtype().equals("json"); boolean first = true; boolean raw = false; + boolean written = false; @Override public void onSubscribe(Subscription s) { subscription = s; @@ -128,41 +130,7 @@ public void onNext(Object o) { try { if (outputStream.isReady() && !finished.get()) { - if (o instanceof byte[]) { - raw = true; - outputStream.write((byte[]) o); - flushIfReady(); - } else if (o instanceof ByteBuffer) { - ByteBuffer buf = (ByteBuffer) o; - try { - raw = true; - outputStream.write(buf.toByteArray()); - flushIfReady(); - } finally { - if (buf instanceof ReferenceCounted) { - ((ReferenceCounted) buf).release(); - } - } - } else if (codec != null) { - - if (isJson) { - if (first) { - outputStream.write('['); - first = false; - } else { - outputStream.write(','); - } - } - if (outputStream.isReady()) { - if (o instanceof CharSequence) { - outputStream.write(o.toString().getBytes(getCharacterEncoding())); - } else { - byte[] bytes = codec.encode(o); - outputStream.write(bytes); - } - flushIfReady(); - } - } + writeToOutputStream(o); if (outputStream.isReady()) { subscription.request(1); @@ -176,6 +144,45 @@ public void onNext(Object o) { } } + private void writeToOutputStream(Object o) throws IOException { + written = true; + if (o instanceof byte[]) { + raw = true; + outputStream.write((byte[]) o); + flushIfReady(); + } else if (o instanceof ByteBuffer) { + ByteBuffer buf = (ByteBuffer) o; + try { + raw = true; + outputStream.write(buf.toByteArray()); + flushIfReady(); + } finally { + if (buf instanceof ReferenceCounted) { + ((ReferenceCounted) buf).release(); + } + } + } else if (codec != null) { + + if (isJson) { + if (first) { + outputStream.write('['); + first = false; + } else { + outputStream.write(','); + } + } + if (outputStream.isReady()) { + if (o instanceof CharSequence) { + outputStream.write(o.toString().getBytes(getCharacterEncoding())); + } else { + byte[] bytes = codec.encode(o); + outputStream.write(bytes); + } + flushIfReady(); + } + } + } + private void flushIfReady() throws IOException { if (outputStream.isReady()) { outputStream.flush(); @@ -185,11 +192,37 @@ private void flushIfReady() throws IOException { @Override public void onError(Throwable t) { if (finished.compareAndSet(false, true)) { - emitter.error(t); + if (t instanceof HttpStatusException) { + maybeReportErrorDownstream(t); + } else { + emitter.error(t); + } subscription.cancel(); } } + private void maybeReportErrorDownstream(Throwable t) { + HttpStatusException httpStatusException = (HttpStatusException) t; + delegate.setStatus(httpStatusException.getStatus().getCode()); + if (!written) { + try { + Object message = httpStatusException.getBody().orElse(httpStatusException.getMessage()); + if (message instanceof CharSequence) { + outputStream.write(message.toString().getBytes(getCharacterEncoding())); + flushIfReady(); + } else { + writeToOutputStream(message); + } + emitter.complete(); + } catch (IOException e) { + emitter.error(e); + } + } else { + // Nothing we can do really... + emitter.error(t); + } + } + @Override public void onComplete() { if (finished.compareAndSet(false, true)) {