diff --git a/kyo-core/jvm/src/main/scala/kyo/Path.scala b/kyo-core/jvm/src/main/scala/kyo/Path.scala index 779e37f05..96fdbe2e2 100644 --- a/kyo-core/jvm/src/main/scala/kyo/Path.scala +++ b/kyo-core/jvm/src/main/scala/kyo/Path.scala @@ -152,7 +152,7 @@ class Path private (val path: List[String]) derives CanEqual: end if } - private def readLoop[A, ReadTpe, Res]( + private[kyo] def readLoop[A, ReadTpe, Res]( acquire: Res < IO, release: Res => Unit < Async, readOnce: Res => Maybe[ReadTpe] < IO, @@ -358,14 +358,17 @@ object Path: case s: String => List(s) case p: Path => p.path } - val javaPath = if flattened.isEmpty then Paths.get("") else Paths.get(flattened.head, flattened.tail*) - val normalizedPath = javaPath.normalize().toString - new Path(if normalizedPath.isEmpty then Nil else normalizedPath.split(File.separator).toList) + val javaPath = if flattened.isEmpty then Paths.get("") else Paths.get(flattened.head, flattened.tail*) + fromJavaPath(javaPath) end apply def apply(path: Part*): Path = apply(path.toList) + def fromJavaPath(path: JPath): Path = + val normalizedPath = path.normalize().toString + new Path(if normalizedPath.isEmpty then Nil else normalizedPath.split(File.separator).toList) + case class BasePaths( cache: Path, config: Path, diff --git a/kyo-sttp/jvm/src/main/scala/kyo/capabilities/KyoStreams.scala b/kyo-sttp/jvm/src/main/scala/kyo/capabilities/KyoStreams.scala new file mode 100644 index 000000000..1698b55b2 --- /dev/null +++ b/kyo-sttp/jvm/src/main/scala/kyo/capabilities/KyoStreams.scala @@ -0,0 +1,14 @@ +package kyo.capabilities + +import kyo.Abort +import kyo.Async +import kyo.IO +import kyo.Resource +import kyo.Stream +import sttp.capabilities.Streams + +trait KyoStreams extends Streams[KyoStreams]: + override type BinaryStream = Stream[Byte, Async & Resource] + override type Pipe[A, B] = Stream[A, Async & Resource] => Stream[B, Async & Resource] + +object KyoStreams extends KyoStreams diff --git a/kyo-sttp/jvm/src/main/scala/kyo/interop/Adapter.scala b/kyo-sttp/jvm/src/main/scala/kyo/interop/Adapter.scala new file mode 100644 index 000000000..520c264a6 --- /dev/null +++ b/kyo-sttp/jvm/src/main/scala/kyo/interop/Adapter.scala @@ -0,0 +1,401 @@ +package kyo.interop + +import java.util.concurrent.Flow.* +import kyo.* +import kyo.Emit.Ack +import kyo.Maybe.* +import kyo.Result.* +import kyo.kernel.ArrowEffect +import scala.annotation.tailrec + +object Adapters: + + private trait KyoSubscriber[A] extends Subscriber[A]: + def isDone: Boolean < IO + def await: Unit < (Async & Abort[SubscriberDone]) + def interrupt: Unit < IO + end KyoSubscriber + + abstract class SubscriberDone + case object SubscriberDone extends SubscriberDone + + private def makeSubscriber[A]( + bufferSize: Int + )( + using Frame + ): (KyoSubscriber[A], Fiber.Promise[Nothing, (Subscription, Queue[A])]) < IO = + IO.Unsafe.apply { + val queue = Queue.Unsafe.init[A](capacity = bufferSize, access = Access.MultiProducerMultiConsumer) + val onSubscribePromise = Fiber.Promise.Unsafe.init[Nothing, (Subscription, Queue[A])]() + val kyoSubscriber = new KyoSubscriber[A]: + val isSubscribedOrInterrupted = AtomicBoolean.Unsafe.init(false) + + // volatile is enough, as these below can only be set by the producer thread + @volatile + private var _isDone: Maybe[Result[Throwable, Unit]] = Maybe.empty + @volatile + private var _waitPromise: Maybe[Fiber.Promise.Unsafe[SubscriberDone, Unit]] = Maybe.empty + + override def isDone: Boolean < IO = IO(_isDone.isDefined) + + override def interrupt: Unit < IO = isSubscribedOrInterrupted.safe.set(true) + + private def failDone(doneResult: Result[Throwable, Unit]): Nothing < Abort[SubscriberDone] = + doneResult.fold { error => + Abort.panic(error.getFailure) + } { _ => + Abort.fail(SubscriberDone) + } + end failDone + + override def await: Unit < (Async & Abort[SubscriberDone]) = + _isDone.fold { + val awaitPromise = Fiber.Promise.Unsafe.init[SubscriberDone, Unit]() + _waitPromise = Maybe(awaitPromise) + queue.empty().fold { _ => + // subscriber is not done, but queue is closed + _waitPromise = Maybe.empty + fail(new IllegalStateException("Queue is closed")) + await + } { isEmpty => + if isEmpty then + _isDone.fold { + // queue is empty, we parks until notified + awaitPromise.safe.useResult { + case Success(_) => () + case Error(_: SubscriberDone) => Abort.fail(SubscriberDone) + case Error(t: Throwable) => Abort.panic(t) + } + } { doneResult => + // The producer has cancelled or errored in the meantime + _waitPromise = Maybe.empty + failDone(doneResult) + } + else + // An element has arrived in the meantime, we do not need to start waiting + _waitPromise = Maybe.empty + IO.unit + } + } { doneResult => + queue.empty().fold { _ => + // queue is closed + failDone(doneResult) + } { isEmpty => + if isEmpty then + failDone(doneResult) + else + // There are still elements in queue, we flush them first + IO.unit + } + } + end await + + private def fail(t: Throwable): Unit = + _isDone = Maybe(Result.fail(t)) + _waitPromise.foreach { p => + p.completeDiscard(Result.panic(t)) + } + end fail + + private def failNullAndThrow(msg: String): Nothing = + val e = new NullPointerException(msg) + fail(e) + throw e + end failNullAndThrow + + override def onSubscribe(s: Subscription): Unit = + if isNull(s) then + val e = new NullPointerException("s was null in onSubscribe") + onSubscribePromise.completeDiscard(Result.panic(e)) + throw e + else + val shouldCancel = isSubscribedOrInterrupted.getAndSet(true) + if shouldCancel then + s.cancel() + else + onSubscribePromise.completeDiscard(Result.success(s -> queue.safe)) + end if + + override def onNext(a: A): Unit = + if isNull(a) then + failNullAndThrow("a was null in onNext") + else + queue.offer(a) match + case Success(_) => _waitPromise.foreach { p => + p.completeDiscard(Result.success(())) + } + case _ => fail(new IllegalStateException("Queue is closed")) + + override def onError(t: Throwable): Unit = + if isNull(t) then + failNullAndThrow("t was null in onNext") + else + fail(t) + + override def onComplete(): Unit = + _isDone = Maybe(Result.success(())) + _waitPromise.foreach { p => + p.completeDiscard(Result.fail(SubscriberDone)) + } + end onComplete + + (kyoSubscriber, onSubscribePromise.safe) + } + end makeSubscriber + + private def createEmit[A]( + subscription: Subscription, + queue: Queue[A], + doAwait: () => Unit < (Async & Abort[SubscriberDone]), + checkDone: () => Boolean < IO + )( + using + Tag[A], + Frame + ): Ack < (Emit[Chunk[A]] & Async) = + def request(n: Long): Unit < IO = IO(subscription.request(n)) + + def stop: Unit < IO = IO(subscription.cancel()) + + def pullLoop(initialRequested: Long) = + Loop[Long, Ack, Emit[Chunk[A]] & Async](initialRequested) { requested => + for + pollSize <- IO(Math.min(requested, Int.MaxValue)) + seqResult <- Abort.run[Closed].apply[Seq[A], (Emit[Chunk[A]] & Async), Nothing](queue.drain) + outcome <- seqResult match + case Success(seq) => + if seq.isEmpty then + Abort.run[SubscriberDone](doAwait()).map { + case Success(_) => Loop.continue[Long, Ack, Emit[Chunk[A]] & Async](requested) + case _ => stop.map(_ => Loop.done[Long, Ack](Ack.Stop)) + } + else + Emit.andMap[Chunk[A], (Ack, Long), Emit[Chunk[A]] & Async](Chunk.from(seq)) { + case Ack.Stop => + for + leftOver <- queue.close + chunk = leftOver.map(Chunk.from(_)).getOrElse(Chunk.empty[A]) + _ <- stop + yield (Ack.Stop, -1L) + case continue: Ack => + checkDone().map { done => + if (requested == seq.size) && !done then + request(queue.capacity).map(_ => continue -> initialRequested) + else + continue -> (requested - seq.size) + } + }.map { (ack, nextRequested) => + ack match + case Ack.Stop => Loop.done[Long, Ack](ack) + case continue: Ack => Loop.continue[Long, Ack, Emit[Chunk[A]] & Async](nextRequested) + } + case _ => stop.map(_ => Loop.done[Long, Ack](Ack.Stop)) + yield outcome + } + + val initialRequested = queue.capacity.longValue + request(initialRequested).map(_ => pullLoop(initialRequested)) + end createEmit + + def publisherToStream[A]( + publisher: => Publisher[A], + bufferSize: Int = 16 + )( + using + Tag[A], + Frame + ): Stream[A, Async & Resource] = + val emit: Ack < (Emit[Chunk[A]] & Async & Resource) = + for + (subscriber, onSubscribePromise) <- Resource.acquireRelease(makeSubscriber[A](bufferSize)) { + (subscriber, onSubscribePromise) => + for + _ <- subscriber.interrupt + _ <- onSubscribePromise.interrupt + yield () + } + _ = publisher.subscribe(subscriber) + (subscription, queue) <- Resource.acquireRelease(onSubscribePromise.get) { + (subscription, queue) => + for + _ <- IO(subscription.cancel()) + _ <- queue.close + yield () + } + ack <- createEmit[A](subscription, queue, () => subscriber.await, () => subscriber.isDone) + yield ack + Stream(emit) + end publisherToStream + + private case class KyoSubscriptionState( + requested: Long, + maybeAwaitPromise: Maybe[(Int, Fiber.Promise[Unit, Int])] + ) derives CanEqual + + private class KyoSubscription[A](subscriber: Subscriber[A])(using Frame, AllowUnsafe) extends Subscription: + + private val initialState = KyoSubscriptionState(0L, Absent) + private val canceled = KyoSubscriptionState(-1L, Absent) + + private def requested(n: Long) = KyoSubscriptionState(n, Absent) + + private def awaiting(n: Int, p: Fiber.Promise[Unit, Int]) = KyoSubscriptionState(0L, Present(n -> p)) + + private val state: AtomicRef[KyoSubscriptionState] = AtomicRef.Unsafe.init(initialState).safe + + def offer(n: Int): Maybe[Int] < Async = + state.get.map { initialState => + Loop[KyoSubscriptionState, Maybe[Int], Async](initialState) { curState => + curState match + case `canceled` => + state.cas(curState, canceled).map { success => + if success then + Loop.done[KyoSubscriptionState, Maybe[Int]](Maybe.empty[Int]) + else + state.get.map(nextState => Loop.continue[KyoSubscriptionState, Maybe[Int], Async](nextState)) + } + case KyoSubscriptionState(0L, _) => + for + promise <- Fiber.Promise.init[Unit, Int] + nextState = awaiting(n, promise) + success <- state.cas(curState, nextState) + outcome <- if success then + promise.useResult { + case Success(accepted) => Maybe(accepted) + case _ => Maybe.empty[Int] + }.map(maybeAccepted => Loop.done[KyoSubscriptionState, Maybe[Int]](maybeAccepted)) + else + state.get.map(nextState => Loop.continue[KyoSubscriptionState, Maybe[Int], Async](nextState)) + yield outcome + case KyoSubscriptionState(requestedCount, _) => + val newRequestedCount = Math.max(requestedCount - n, 0L) + val accepted = Math.min(requestedCount, n.toLong).toInt + val nextState = requested(newRequestedCount) + state.cas(curState, nextState).map { success => + if success then + Loop.done[KyoSubscriptionState, Maybe[Int]](Maybe(accepted)) + else + state.get.map(nextState => Loop.continue[KyoSubscriptionState, Maybe[Int], Async](nextState)) + } + + } + } + end offer + + def isCanceled: Boolean < IO = state.get.map(_.requested < 0) + + override def request(n: Long): Unit = + if n <= 0 then subscriber.onError(new IllegalArgumentException("non-positive subscription request")) + val computation: Unit < IO = state.get.map { initialState => + Loop[KyoSubscriptionState, Unit, IO](initialState) { curState => + curState match + case `canceled` => + state.cas(curState, canceled).map { success => + if success then + Loop.done[KyoSubscriptionState, Unit](()) + else + state.get.map(nextState => Loop.continue[KyoSubscriptionState, Unit, IO](nextState)) + } + case KyoSubscriptionState(requestedCount, Present(offered -> awaitPromise)) => + val newRequestedCount = requestedCount + n + val accepted = Math.min(offered.toLong, newRequestedCount) + val remaining = newRequestedCount - accepted + val nextState = requested(remaining) + state.cas(curState, nextState).map { success => + if success then + awaitPromise.completeDiscard(Success(accepted.toInt)).map(_ => + Loop.done[KyoSubscriptionState, Unit](()) + ) + else + state.get.map(nextState => Loop.continue[KyoSubscriptionState, Unit, IO](nextState)) + } + case KyoSubscriptionState(requestedCount, _) if ((Long.MaxValue - n) > requestedCount) => + val nextState = requested(requestedCount + n) + state.cas(curState, nextState).map { success => + if success then + Loop.done[KyoSubscriptionState, Unit](()) + else + state.get.map(nextState => Loop.continue[KyoSubscriptionState, Unit, IO](nextState)) + } + case _ => + val nextState = requested(Long.MaxValue) + state.cas(curState, nextState).map { success => + if success then + Loop.done[KyoSubscriptionState, Unit](()) + else + state.get.map(nextState => Loop.continue[KyoSubscriptionState, Unit, IO](nextState)) + } + } + } + IO.Unsafe.runLazy(computation).eval + end request + + override def cancel(): Unit = + val computation: Unit < IO = + state.getAndSet(canceled).map { oldState => + oldState.maybeAwaitPromise.fold { + IO.unit + } { (_, awaitPromise) => + awaitPromise.completeDiscard(Result.fail(())) + } + } + IO.Unsafe.runLazy(computation).eval + end cancel + + end KyoSubscription + + def streamToPublisher[A]( + stream: Stream[A, Async & Resource] + )( + using + Tag[Emit[Chunk[A]]], + Frame + ): Publisher[A] < IO = + + def drainStreamComputation( + emit: Ack < (Emit[Chunk[A]] & Async & Resource), + subscription: KyoSubscription[? >: A], + f: Chunk[A] => Unit < IO + )( + using + tag: Tag[Emit[Chunk[A]]], + frame: Frame + ): Unit < (Async & Resource) = + ArrowEffect.handle(tag, emit.unit)( + [C] => + (input, cont) => + Loop[Chunk[A], Ack, Async & Resource](input) { curChunk => + if curChunk.nonEmpty then + for + maybeAccepted <- subscription.offer(curChunk.size) + outcome <- maybeAccepted match + case Present(accepted) => + f(curChunk.take(accepted)).andThen { + Loop.continue[Chunk[A], Ack, Async & Resource](curChunk.drop(accepted)) + } + case _ => + IO(Loop.done[Chunk[A], Ack](Ack.Stop)) + yield outcome + else + Loop.done(Ack.Continue()) + }.map(ack => cont(ack)) + ) + + IO.Unsafe { + new Publisher[A]: + override def subscribe(subscriber: Subscriber[? >: A]): Unit = + if isNull(subscriber) then + throw new NullPointerException("Subscriber must not be null.") + else + val computation: Unit < (Async & Resource) = + for + subscription <- Resource.acquireRelease(new KyoSubscription(subscriber))(_.cancel()) + _ = subscriber.onSubscribe(subscription) + _ <- drainStreamComputation(stream.emit, subscription, chunk => chunk.foreach(subscriber.onNext(_))) + _ <- subscription.isCanceled.map(isCanceled => if !isCanceled then subscriber.onComplete()) + yield () + + discard(IO.Unsafe.run(Abort.run(Async.runAndBlock(Duration.Infinity)(Resource.run(computation)))).eval) + } + end streamToPublisher +end Adapters diff --git a/kyo-sttp/jvm/src/main/scala/sttp/client3/HttpClientKyoBackend.scala b/kyo-sttp/jvm/src/main/scala/sttp/client3/HttpClientKyoBackend.scala index 7a265cb7e..b4c1670ab 100644 --- a/kyo-sttp/jvm/src/main/scala/sttp/client3/HttpClientKyoBackend.scala +++ b/kyo-sttp/jvm/src/main/scala/sttp/client3/HttpClientKyoBackend.scala @@ -4,17 +4,25 @@ import java.io.InputStream import java.io.UnsupportedEncodingException import java.net.http.HttpClient import java.net.http.HttpRequest +import java.net.http.HttpRequest.BodyPublisher +import java.net.http.HttpRequest.BodyPublishers import java.net.http.HttpResponse import java.net.http.HttpResponse.BodyHandlers +import java.nio.ByteBuffer import java.util.concurrent.Executor +import java.util.concurrent.Flow.Publisher import java.util.zip.GZIPInputStream import java.util.zip.InflaterInputStream +import java.util as ju import kyo.* +import kyo.capabilities.KyoStreams import kyo.internal.KyoSttpMonad import kyo.internal.KyoSttpMonad.* +import kyo.interop.Adapters +import scala.collection.mutable.ArrayBuffer import sttp.capabilities.WebSockets import sttp.client3.HttpClientBackend.EncodingHandler -import sttp.client3.HttpClientFutureBackend.InputStreamEncodingHandler +import sttp.client3.internal.* import sttp.client3.internal.NoStreams import sttp.client3.internal.emptyInputStream import sttp.client3.internal.httpclient.* @@ -27,8 +35,8 @@ class HttpClientKyoBackend private ( client: HttpClient, closeClient: Boolean, customizeRequest: HttpRequest => HttpRequest, - customEncodingHandler: InputStreamEncodingHandler -) extends HttpClientAsyncBackend[M, Nothing, WebSockets, InputStream, InputStream]( + customEncodingHandler: EncodingHandler[KyoStreams.BinaryStream] +) extends HttpClientAsyncBackend[M, KyoStreams, WebSockets, Publisher[ju.List[ByteBuffer]], KyoStreams.BinaryStream]( client, KyoSttpMonad, closeClient, @@ -36,25 +44,11 @@ class HttpClientKyoBackend private ( customEncodingHandler ): - override val streams: NoStreams = NoStreams - - override protected val bodyToHttpClient = - new BodyToHttpClient[KyoSttpMonad.M, Nothing]: - override val streams: NoStreams = NoStreams - override given monad: MonadError[KyoSttpMonad.M] = KyoSttpMonad - override def streamToPublisher(stream: Nothing) = - stream - - override protected val bodyFromHttpClient = - new InputStreamBodyFromHttpClient[KyoSttpMonad.M, Nothing]: - override def inputStreamToStream(is: InputStream) = - KyoSttpMonad.error(new IllegalStateException("Streaming is not supported")) - override val streams: NoStreams = NoStreams - override given monad: MonadError[KyoSttpMonad.M] = KyoSttpMonad - override def compileWebSocketPipe( - ws: WebSocket[KyoSttpMonad.M], - pipe: streams.Pipe[WebSocketFrame.Data[?], WebSocketFrame] - ) = pipe + override val streams: KyoStreams = KyoStreams + + override protected val bodyToHttpClient = new KyoBodyToHttpClient + + override protected val bodyFromHttpClient = new KyoBodyFromHttpClient override protected def createSimpleQueue[A] = Channel.initWith[A](Int.MaxValue)(new KyoSimpleQueue[A](_)) @@ -62,18 +56,26 @@ class HttpClientKyoBackend private ( override protected def createSequencer = Meter.initMutex.map(new KyoSequencer(_)) - override protected def standardEncoding: (InputStream, String) => InputStream = { - case (body, "gzip") => new GZIPInputStream(body) - case (body, "deflate") => new InflaterInputStream(body) - case (_, ce) => throw new UnsupportedEncodingException(s"Unsupported encoding: $ce") + override protected def standardEncoding: (KyoStreams.BinaryStream, String) => KyoStreams.BinaryStream = { + case (_, ce) => throw new UnsupportedEncodingException(s"Unsupported encoding: $ce") } - override protected def createBodyHandler: HttpResponse.BodyHandler[InputStream] = - BodyHandlers.ofInputStream() - - override protected def bodyHandlerBodyToBody(p: InputStream): InputStream = p - - override protected def emptyBody(): InputStream = emptyInputStream() + override protected def createBodyHandler: HttpResponse.BodyHandler[Publisher[ju.List[ByteBuffer]]] = + BodyHandlers.ofPublisher() + + override protected def bodyHandlerBodyToBody(p: Publisher[ju.List[ByteBuffer]]): KyoStreams.BinaryStream = + Adapters.publisherToStream(p).mapChunk { chunkList => + val builder = ArrayBuffer.newBuilder[Byte] + chunkList.foreach { list => + val iterator = list.iterator() + while iterator.hasNext() do + val bytes = iterator.next().safeRead() + builder ++= bytes + } + Chunk.from(builder.result().toArray) + } + + override protected def emptyBody(): KyoStreams.BinaryStream = Stream.empty[Byte] end HttpClientKyoBackend object HttpClientKyoBackend: @@ -84,7 +86,7 @@ object HttpClientKyoBackend: client: HttpClient, closeClient: Boolean, customizeRequest: HttpRequest => HttpRequest, - customEncodingHandler: InputStreamEncodingHandler + customEncodingHandler: EncodingHandler[KyoStreams.BinaryStream] ): SttpBackend[KyoSttpMonad.M, WebSockets] = new FollowRedirectsBackend( new HttpClientKyoBackend( @@ -98,7 +100,7 @@ object HttpClientKyoBackend: def apply( options: SttpBackendOptions = SttpBackendOptions.Default, customizeRequest: HttpRequest => HttpRequest = identity, - customEncodingHandler: InputStreamEncodingHandler = PartialFunction.empty, + customEncodingHandler: EncodingHandler[KyoStreams.BinaryStream] = PartialFunction.empty, executor: Option[Executor] = Some(r => r.run()) ): SttpBackend[KyoSttpMonad.M, WebSockets] = HttpClientKyoBackend( @@ -111,7 +113,7 @@ object HttpClientKyoBackend: def usingClient( client: HttpClient, customizeRequest: HttpRequest => HttpRequest = identity, - customEncodingHandler: InputStreamEncodingHandler = PartialFunction.empty + customEncodingHandler: EncodingHandler[KyoStreams.BinaryStream] = PartialFunction.empty ): SttpBackend[KyoSttpMonad.M, WebSockets] = HttpClientKyoBackend( client, diff --git a/kyo-sttp/jvm/src/main/scala/sttp/client3/KyoBodyFromHttpClient.scala b/kyo-sttp/jvm/src/main/scala/sttp/client3/KyoBodyFromHttpClient.scala new file mode 100644 index 000000000..877722a64 --- /dev/null +++ b/kyo-sttp/jvm/src/main/scala/sttp/client3/KyoBodyFromHttpClient.scala @@ -0,0 +1,103 @@ +package sttp.client3 + +import kyo.* +import kyo.Emit.Ack +import kyo.Result.Success +import kyo.capabilities.* +import kyo.capabilities.KyoStreams +import kyo.internal.KyoSttpMonad +import kyo.internal.KyoSttpMonad.* +import kyo.sink +import sttp.capabilities.Streams +import sttp.client3.internal.BodyFromResponseAs +import sttp.client3.internal.SttpFile +import sttp.client3.internal.httpclient.BodyFromHttpClient +import sttp.client3.ws.GotAWebSocketException +import sttp.client3.ws.NotAWebSocketException +import sttp.model.ResponseMetadata +import sttp.monad.MonadError +import sttp.ws.WebSocket +import sttp.ws.WebSocketClosed +import sttp.ws.WebSocketFrame + +final class KyoBodyFromHttpClient extends BodyFromHttpClient[KyoSttpMonad.M, KyoStreams, KyoStreams.BinaryStream]: + override val streams: KyoStreams = KyoStreams + override given monad: MonadError[KyoSttpMonad.M] = KyoSttpMonad + + override protected def bodyFromResponseAs + : BodyFromResponseAs[KyoSttpMonad.M, KyoStreams.BinaryStream, WebSocket[KyoSttpMonad.M], KyoStreams.BinaryStream] = + new BodyFromResponseAs[KyoSttpMonad.M, KyoStreams.BinaryStream, WebSocket[KyoSttpMonad.M], KyoStreams.BinaryStream]: + override protected def withReplayableBody( + response: KyoStreams.BinaryStream, + replayableBody: Either[Array[Byte], SttpFile] + ): KyoSttpMonad.M[KyoStreams.BinaryStream] = + replayableBody match + case Left(byteArray) => Stream.init(Chunk.from(byteArray)) + case Right(file) => IO(Path.fromJavaPath(file.toPath)).map(_.readBytesStream) + override protected def regularIgnore(response: KyoStreams.BinaryStream): KyoSttpMonad.M[Unit] = + Resource.run(response.runDiscard) + + override protected def regularAsByteArray( + response: KyoStreams.BinaryStream + ): KyoSttpMonad.M[Array[Byte]] = + Resource.run(response.run.map(_.toList.toArray)) + + override protected def regularAsFile( + response: KyoStreams.BinaryStream, + file: SttpFile + ): KyoSttpMonad.M[SttpFile] = IO(Path.fromJavaPath(file.toPath)) + .map(path => Resource.run(response.sink(path))) + .map(_ => file) + + override protected def regularAsStream( + response: KyoStreams.BinaryStream + ): KyoSttpMonad.M[(KyoStreams.BinaryStream, () => KyoSttpMonad.M[Unit])] = + IO(response, () => Resource.run(response.runDiscard)) + + override protected def handleWS[T]( + responseAs: WebSocketResponseAs[T, ?], + meta: ResponseMetadata, + ws: WebSocket[KyoSttpMonad.M] + ): KyoSttpMonad.M[T] = bodyFromWs(responseAs, ws, meta) + + override protected def cleanupWhenNotAWebSocket( + response: KyoStreams.BinaryStream, + e: NotAWebSocketException + ): KyoSttpMonad.M[Unit] = Resource.run(response.runDiscard) + + override protected def cleanupWhenGotWebSocket( + response: WebSocket[KyoSttpMonad.M], + e: GotAWebSocketException + ): KyoSttpMonad.M[Unit] = response.close() + end bodyFromResponseAs + + override def compileWebSocketPipe( + ws: WebSocket[KyoSttpMonad.M], + pipe: KyoStreams.Pipe[WebSocketFrame.Data[?], WebSocketFrame] + ): KyoSttpMonad.M[Unit] = + def receiveFrame: Result[WebSocketClosed, WebSocketFrame] < Async = + Abort.run(Abort.catching[WebSocketClosed](ws.receive())) + + def emitFromWebSocket: Ack < (Emit[Chunk[WebSocketFrame.Data[?]]] & Async) = + Loop[Unit, Ack, Emit[Chunk[WebSocketFrame.Data[?]]] & Async](()) { _ => + receiveFrame.map { + case Success(WebSocketFrame.Close(_, _)) => Loop.done[Unit, Ack](Ack.Stop) + case Success(WebSocketFrame.Ping(payload)) => + ws.send(WebSocketFrame.Pong(payload)).andThen(Loop.continue[Ack]) + case Success(WebSocketFrame.Pong(_)) => Loop.continue[Ack] + case Success(in: WebSocketFrame.Data[?]) => Emit.andMap(Chunk(in)) { + case Ack.Stop => Loop.done[Unit, Ack](Ack.Stop) + case _ => Loop.continue[Ack] + } + case _ => Loop.done[Unit, Ack](Ack.Stop) + } + } + + val pipeComputation: Unit < (Async & Resource) = pipe(Stream(emitFromWebSocket)) + .runForeach(dataFrame => ws.send(dataFrame)) + .andThen(Resource.ensure(ws.close())) + + Resource.run(pipeComputation) + end compileWebSocketPipe + +end KyoBodyFromHttpClient diff --git a/kyo-sttp/jvm/src/main/scala/sttp/client3/KyoBodyToHttpClient.scala b/kyo-sttp/jvm/src/main/scala/sttp/client3/KyoBodyToHttpClient.scala new file mode 100644 index 000000000..a9a38c8c8 --- /dev/null +++ b/kyo-sttp/jvm/src/main/scala/sttp/client3/KyoBodyToHttpClient.scala @@ -0,0 +1,25 @@ +package sttp.client3 + +import java.net.http.HttpRequest.BodyPublisher +import java.net.http.HttpRequest.BodyPublishers +import java.nio.ByteBuffer +import kyo.* +import kyo.capabilities.KyoStreams +import kyo.internal.KyoSttpMonad +import kyo.internal.KyoSttpMonad.* +import kyo.interop.Adapters +import sttp.client3.internal.httpclient.BodyToHttpClient +import sttp.monad.MonadError + +final class KyoBodyToHttpClient extends BodyToHttpClient[KyoSttpMonad.M, KyoStreams]: + override val streams: KyoStreams = KyoStreams + override given monad: MonadError[KyoSttpMonad.M] = KyoSttpMonad + override def streamToPublisher(stream: KyoStreams.BinaryStream): KyoSttpMonad.M[BodyPublisher] = + val byteBufferStream = stream.mapChunk { chunk => + Chunk(ByteBuffer.wrap(chunk.toArray)) + } + Adapters.streamToPublisher(byteBufferStream).map { publisher => + BodyPublishers.fromPublisher(publisher) + } + end streamToPublisher +end KyoBodyToHttpClient