Skip to content

Commit

Permalink
Performance tuning - Write on demand using Pull and remove `toUnicast…
Browse files Browse the repository at this point in the history
…Publisher` (#8)

Motivation:

The conversion between `fs2.Stream` and Reactive Streams `Publisher`
is one of the bottlenecks in that benchmark of #4

Modifications:

- Write on demand instead of `Publisher`

Result:

- Before
  ```
  Running 30s test @ http://127.0.0.1:8080/hello
    12 threads and 400 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency    11.76ms    2.97ms  86.66ms   89.53%
        Req/Sec     1.71k     0.94k    3.64k    52.50%
    611170 requests in 30.02s, 107.62MB read
    Socket errors: connect 155, read 170, write 0, timeout 0
  Requests/sec:  20358.15
  Transfer/sec:      3.58MB
  ```
- After
  ```
  Running 30s test @ http://127.0.0.1:8080/http4s/thread
    12 threads and 400 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency     3.77ms    2.41ms 145.85ms   96.60%
      Req/Sec     5.39k     2.88k   59.63k    60.04%
    1932651 requests in 30.10s, 312.64MB read
    Socket errors: connect 155, read 168, write 0, timeout 0
  Requests/sec:  64207.03
  Transfer/sec:     10.39MB

  Running 30s test @ http://127.0.0.1:8080/http4s/thread
    12 threads and 400 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency     3.92ms    1.30ms  63.05ms   95.09%
      Req/Sec     5.11k     2.48k   10.36k    61.56%
    1831547 requests in 30.03s, 296.28MB read
    Socket errors: connect 155, read 160, write 0, timeout 0
  Requests/sec:  60989.03
  Transfer/sec:      9.87MBow it will be ignored.
  ```
  • Loading branch information
ikhoon authored Sep 2, 2020
1 parent 1cda05d commit e0b24a6
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 http4s.org
* Copyright 2020-2020 http4s.org
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand All @@ -24,7 +24,7 @@ object ArmeriaExampleApp {
ArmeriaServerBuilder[F]
.bindHttp(8080)
.withMeterRegistry(registry)
.withHttpRoutes("/http4s", ExampleService[F].routes)
.withHttpRoutes("/http4s", ExampleService[F].routes())
.withHttpService("/metrics", new PrometheusExpositionService(prometheusRegistry))
.withDecorator(
MetricCollectingService.newDecorator(MeterIdPrefixFunction.ofDefault("server")))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 http4s.org
* Copyright 2020-2020 http4s.org
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 http4s.org
* Copyright 2020-2020 http4s.org
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import cats.implicits._
import com.linecorp.armeria.common.{
HttpData,
HttpHeaderNames,
HttpHeaders,
HttpMethod,
HttpRequest,
HttpResponse,
HttpResponseWriter,
ResponseHeaders
}
import com.linecorp.armeria.common.util.Version
Expand All @@ -23,10 +26,9 @@ import io.chrisdavenport.vault.{Vault, Key => VaultKey}
import fs2._
import fs2.interop.reactivestreams._
import java.net.InetSocketAddress
import java.util.concurrent.CompletableFuture
import org.http4s.internal.CollectionCompat.CollectionConverters._
import ArmeriaHttp4sHandler.{RightUnit, defaultVault, toHttp4sMethod}
import org.http4s.internal.unsafeRunAsync
import ArmeriaHttp4sHandler.defaultVault
import org.http4s.server.{
DefaultServiceErrorHandler,
SecureSession,
Expand All @@ -44,55 +46,79 @@ private[armeria] class ArmeriaHttp4sHandler[F[_]](
)(implicit F: ConcurrentEffect[F])
extends HttpService {

val prefixLength = if (prefix.endsWith("/")) prefix.length - 1 else prefix.length
// micro-optimization: unwrap the service and call its .run directly
private val serviceFn: Request[F] => F[Response[F]] = service.run

override def serve(ctx: ServiceRequestContext, req: HttpRequest): HttpResponse = {
implicit val ec = ExecutionContext.fromExecutor(ctx.eventLoop())
val future = new CompletableFuture[HttpResponse]()
unsafeRunAsync(toRequest(ctx, req).fold(onParseFailure, handleRequest)) {
case Right(res) =>
IO.pure(discardReturn(future.complete(res)))
val responseWriter = HttpResponse.streaming()
unsafeRunAsync(
toRequest(ctx, req)
.fold(onParseFailure(_, responseWriter), handleRequest(_, responseWriter))) {
case Right(_) =>
IO.unit
case Left(ex) =>
IO.pure(discardReturn(future.completeExceptionally(ex)))
discardReturn(responseWriter.close(ex))
IO.unit
}
HttpResponse.from(future)
responseWriter
}

private def handleRequest(request: Request[F]): F[HttpResponse] =
private def handleRequest(request: Request[F], writer: HttpResponseWriter): F[Unit] =
serviceFn(request)
.recoverWith(serviceErrorHandler(request))
.map(toHttpResponse)
.flatMap(toHttpResponse(_, writer))

private def onParseFailure(parseFailure: ParseFailure): F[HttpResponse] = {
private def onParseFailure(parseFailure: ParseFailure, writer: HttpResponseWriter): F[Unit] = {
val response = Response[F](Status.BadRequest).withEntity(parseFailure.sanitized)
F.pure(toHttpResponse(response))
toHttpResponse(response, writer)
}

/** Converts http4s' [[Response]] to Armeria's [[HttpResponse]]. */
private def toHttpResponse(response: Response[F]): HttpResponse = {
val headers = Stream(toResponseHeaders(response.headers, response.status.some))
val body: Stream[F, HttpData] = response.body.chunks.map { chunk =>
val bytes = chunk.toBytes
HttpData.copyOf(bytes.values, bytes.offset, bytes.length)
private def toHttpResponse(response: Response[F], writer: HttpResponseWriter): F[Unit] = {
val headers = toHttpHeaders(response.headers, response.status.some)
writer.write(headers)
val body = response.body
if (body == EmptyBody) {
writer.close()
F.unit
} else
writeOnDemand(writer, body).stream
.onFinalize(maybeWriteTrailersAndClose(writer, response))
.compile
.drain
}

private def maybeWriteTrailersAndClose(writer: HttpResponseWriter, response: Response[F]) =
response.trailerHeaders.map { trailers =>
if (!trailers.isEmpty)
writer.write(toHttpHeaders(trailers, None))
writer.close()
}
val trailers = Stream
.eval(response.trailerHeaders)
.flatMap { trailers =>
if (trailers.isEmpty)
Stream.empty
else
Stream(toResponseHeaders(trailers, None))
}

HttpResponse.of((headers ++ body ++ trailers).toUnicastPublisher)
}
private def writeOnDemand(
writer: HttpResponseWriter,
body: Stream[F, Byte]): Pull[F, INothing, Unit] =
body.pull.uncons.flatMap {
case Some((head, tail)) =>
val bytes = head.toBytes
writer.write(HttpData.wrap(bytes.values, bytes.offset, bytes.length))
if (tail == Stream.empty)
Pull.done
else
Pull.eval(F.async[Unit] { cb =>
discardReturn(writer.whenConsumed().thenRun(() => cb(RightUnit)))
}) >> writeOnDemand(writer, tail)
case None =>
Pull.done
}

/** Converts Armeria's [[HttpRequest]] to http4s' [[Request]]. */
private def toRequest(ctx: ServiceRequestContext, req: HttpRequest): ParseResult[Request[F]] = {
val path = req.path()
for {
method <- Method.fromString(req.method().name())
method <- toHttp4sMethod(req.method())
uri <- Uri.requestTarget(path)
} yield Request(
method = method,
Expand All @@ -110,9 +136,9 @@ private[armeria] class ArmeriaHttp4sHandler[F[_]](
)
}

/** Converts http4s' [[Headers]] to Armeria's [[ResponseHeaders]]. */
private def toResponseHeaders(headers: Headers, status: Option[Status]): ResponseHeaders = {
val builder = status.fold(ResponseHeaders.builder())(s => ResponseHeaders.builder(s.code))
/** Converts http4s' [[Headers]] to Armeria's [[HttpHeaders]]. */
private def toHttpHeaders(headers: Headers, status: Option[Status]): HttpHeaders = {
val builder = status.fold(HttpHeaders.builder())(s => ResponseHeaders.builder(s.code))

for (header <- headers.toList)
builder.add(header.name.toString, header.value)
Expand All @@ -139,7 +165,7 @@ private[armeria] class ArmeriaHttp4sHandler[F[_]](
private def requestAttributes(ctx: ServiceRequestContext): Vault = {
val secure = ctx.sessionProtocol().isTls
defaultVault
.insert(Request.Keys.PathInfoCaret, prefix.length)
.insert(Request.Keys.PathInfoCaret, prefixLength)
.insert(ServiceRequestContexts.Key, ctx)
.insert(
Request.Keys.ConnectionInfo,
Expand Down Expand Up @@ -182,6 +208,32 @@ private[armeria] object ArmeriaHttp4sHandler {
ServerSoftware("armeria", Some(Version.get("armeria").artifactVersion()))

private val defaultVault: Vault = Vault.empty.insert(Request.Keys.ServerSoftware, serverSoftware)

private val OPTIONS: ParseResult[Method] = Right(Method.OPTIONS)
private val GET: ParseResult[Method] = Right(Method.GET)
private val HEAD: ParseResult[Method] = Right(Method.HEAD)
private val POST: ParseResult[Method] = Right(Method.POST)
private val PUT: ParseResult[Method] = Right(Method.PUT)
private val PATCH: ParseResult[Method] = Right(Method.PATCH)
private val DELETE: ParseResult[Method] = Right(Method.DELETE)
private val TRACE: ParseResult[Method] = Right(Method.TRACE)
private val CONNECT: ParseResult[Method] = Right(Method.CONNECT)

private val RightUnit = Right(())

private def toHttp4sMethod(method: HttpMethod): ParseResult[Method] =
method match {
case HttpMethod.OPTIONS => OPTIONS
case HttpMethod.GET => GET
case HttpMethod.HEAD => HEAD
case HttpMethod.POST => POST
case HttpMethod.PUT => PUT
case HttpMethod.PATCH => PATCH
case HttpMethod.DELETE => DELETE
case HttpMethod.TRACE => TRACE
case HttpMethod.CONNECT => CONNECT
case HttpMethod.UNKNOWN => Left(ParseFailure("Invalid method", method.name()))
}
}

object ServiceRequestContexts {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import com.linecorp.armeria.common.HttpStatus
import com.linecorp.armeria.server.logging.{ContentPreviewingService, LoggingService}
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import org.http4s.HttpRoutes
import org.http4s.{Header, Headers, HttpRoutes}
import org.http4s.dsl.io._
import org.http4s.multipart.Multipart
import org.scalatest.funsuite.AnyFunSuite
Expand All @@ -35,6 +35,10 @@ class ArmeriaServerBuilderSpec extends AnyFunSuite with IOServerFixture with Mat
case req @ POST -> Root / "echo" =>
Ok(req.body)

case GET -> Root / "trailers" =>
Ok("Hello").map(response =>
response.withTrailerHeaders(IO(Headers.of(Header("my-trailers", "foo")))))

case _ -> Root / "never" =>
IO.never

Expand Down Expand Up @@ -87,6 +91,12 @@ class ArmeriaServerBuilderSpec extends AnyFunSuite with IOServerFixture with Mat
.contentUtf8() must startWith(input)
}

test("be able to send trailers") {
val response = client.get("/service/trailers").aggregate().join()
response.status() must be(HttpStatus.OK)
response.trailers().get("my-trailers") must be("foo")
}

test("return a 503 if the server doesn't respond") {
val noTimeoutClient = WebClient
.builder(s"http://127.0.0.1:${httpPort.get}")
Expand Down

0 comments on commit e0b24a6

Please sign in to comment.