Skip to content

Commit

Permalink
EventStream.fromPublisher (#121)
Browse files Browse the repository at this point in the history
Arman's implementation from #114, rebased on master, moved into a separate package, etc.
  • Loading branch information
raquo authored Feb 26, 2024
1 parent 57b0938 commit 5eeef88
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 0 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,21 @@ If you have an `Observable[Future[A]]`, you can flatten it into `Observable[A]`
A failed future results in an error (see [Error Handling](#error-handling)).


#### Creating Observables from other Streaming APIs

`EventStream.fromPublisher[A]` creates a stream that subscribes to a [Flow.Publisher](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html), and emits the values that it produces.

`Flow.Publisher` is a Java [Reactive Streams](http://www.reactive-streams.org/) interface that is useful for interoperating between streaming APIs. For example, you can transform an [FS2](https://fs2.io/) `Stream[IO, A]` into an Airstream `EventStream[A]`.

The resulting `EventStream` creates a new `Flow.Subscriber` and subscribes it to the publisher every time the `EventStream` is [started](https://github.com/raquo/Airstream#starting-observables), and cancels the subscription when the stream is stopped.


```scala
import cats.effect.unsafe.implicits._ // imports implicit IORuntime
EventStream.fromPublisher(fs2Stream.unsafeToPublisher())
```


#### `EventStream.fromJsPromise` and `Signal.fromJsPromise`

Behave the same as `fromFuture` above, but accept `js.Promise` instead. Useful for integration with JS libraries and APIs.
Expand Down
10 changes: 10 additions & 0 deletions src/main/scala/com/raquo/airstream/core/EventStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import com.raquo.airstream.debug.{DebuggableStream, Debugger, DebuggerStream}
import com.raquo.airstream.distinct.DistinctStream
import com.raquo.airstream.eventbus.EventBus
import com.raquo.airstream.extensions._
import com.raquo.airstream.javaflow.FlowPublisherStream
import com.raquo.airstream.misc._
import com.raquo.airstream.split.{SplittableOneStream, SplittableStream}
import com.raquo.airstream.status.{AsyncStatusObservable, Status}
import com.raquo.airstream.timing._
import com.raquo.ew.JsArray

import java.util.concurrent.Flow
import scala.annotation.unused
import scala.concurrent.{ExecutionContext, Future}
import scala.scalajs.js
Expand Down Expand Up @@ -388,6 +390,14 @@ object EventStream {
new JsPromiseStream[A](promise, emitOnce)
}

/** Create a stream from a [[java.util.concurrent.Flow.Publisher]]
* - Use this to bring in events from other streaming libraries
* that can provide a `Flow.Publisher`, such as FS2 an Monix.
*/
def fromPublisher[A](publisher: Flow.Publisher[A], emitOnce: Boolean = false): EventStream[A] = {
FlowPublisherStream(publisher, emitOnce)
}

/** Easy helper for custom events. See [[CustomStreamSource]] for docs.
*
* Provide `start` and `stop` callbacks that will be called when the stream
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.raquo.airstream.javaflow

import com.raquo.airstream.core.EventStream

import java.util.concurrent.Flow

object FlowPublisherStream {

def apply[A](publisher: Flow.Publisher[A], emitOnce: Boolean = false): EventStream[A] = {
var subscription: Flow.Subscription = null

EventStream.fromCustomSource[A](
shouldStart = startIndex => if (emitOnce) startIndex == 1 else true,
start = (fireEvent, fireError, _, _) => {
val subscriber = new Flow.Subscriber[A] {
def onNext(value: A): Unit = fireEvent(value)
def onError(err: Throwable): Unit = fireError(err)
def onComplete(): Unit = ()
def onSubscribe(sub: Flow.Subscription): Unit = {
sub.request(Long.MaxValue) // unlimited demand for events
subscription = sub
}
}
publisher.subscribe(subscriber)
},
stop = _ => {
if (subscription ne null) {
subscription.cancel()
subscription = null
}
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.raquo.airstream.fixtures.{Effect, TestableOwner}
import com.raquo.airstream.ownership.Owner
import org.scalactic.anyvals.NonEmptyList

import java.util.concurrent.Flow
import scala.collection.mutable

class EventStreamSpec extends UnitSpec {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.raquo.airstream.javaflow

import com.raquo.airstream.UnitSpec
import com.raquo.airstream.core.EventStream
import com.raquo.airstream.fixtures.{Effect, TestableOwner}
import com.raquo.airstream.ownership.Owner

import java.util.concurrent.Flow
import scala.collection.mutable

class FlowPublisherStreamSpec extends UnitSpec {

class RangePublisher(range: Range) extends Flow.Publisher[Int] {
def subscribe(subscriber: Flow.Subscriber[_ >: Int]): Unit = {
val subscription = new Flow.Subscription {
def request(n: Long): Unit = range.foreach(subscriber.onNext(_))
def cancel(): Unit = ()
}
subscriber.onSubscribe(subscription)
}
}

it("EventStream.fromPublisher") {

implicit val owner: Owner = new TestableOwner

val range = 1 to 3
val stream = EventStream.fromPublisher(new RangePublisher(range))

val effects = mutable.Buffer[Effect[_]]()
val sub1 = stream.foreach(newValue => effects += Effect("obs1", newValue))

effects.toList shouldBe range.map(i => Effect("obs1", i))
effects.clear()

sub1.kill()

val sub2 = stream.foreach(newValue => effects += Effect("obs2", newValue))

effects.toList shouldBe range.map(i => Effect("obs2", i))
effects.clear()
}
}

0 comments on commit 5eeef88

Please sign in to comment.