diff --git a/build.sbt b/build.sbt
index 3d4f09b1..390e4c9d 100644
--- a/build.sbt
+++ b/build.sbt
@@ -13,7 +13,7 @@ scalaVersion in ThisBuild := "2.11.2"
crossScalaVersions in ThisBuild := Seq("2.10.4", "2.11.2")
libraryDependencies ++= Seq(
- "io.reactivex" % "rxjava" % "1.0.0-rc.4",
+ "io.reactivex" % "rxjava" % "1.0.0-rc.5",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
diff --git a/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala
index ea5b8ac1..9b1c3828 100755
--- a/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala
+++ b/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala
@@ -391,20 +391,6 @@ class RxScalaDemo extends JUnitSuite {
waitFor(Olympics.yearTicks)
}
- @Test def groupByUntilExample() {
- val numbers = Observable.interval(250 millis).take(14)
- val grouped = numbers.groupByUntil(x => x % 2){ case (key, obs) => obs.filter(x => x == 7) }
- val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
- sequenced.subscribe(x => println(s"Emitted group: $x"))
- }
-
- @Test def groupByUntilExample2() {
- val numbers = Observable.interval(250 millis).take(14)
- val grouped = numbers.groupByUntil(x => x % 2, x => x * 10){ case (key, obs) => Observable.interval(2 seconds) }
- val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
- sequenced.toBlocking.foreach(x => println(s"Emitted group: $x"))
- }
-
@Test def combineLatestExample() {
val firstCounter = Observable.interval(250 millis)
val secondCounter = Observable.interval(550 millis)
@@ -554,7 +540,7 @@ class RxScalaDemo extends JUnitSuite {
println(doubleAverage(Observable.empty).toBlocking.single)
println(doubleAverage(List(0.0).toObservable).toBlocking.single)
println(doubleAverage(List(4.44).toObservable).toBlocking.single)
- println(doubleAverage(List(1, 2, 3.5).toObservable).toBlocking.single)
+ println(doubleAverage(List(1.0, 2.0, 3.5).toObservable).toBlocking.single)
}
@Test def testSum() {
@@ -1115,20 +1101,51 @@ class RxScalaDemo extends JUnitSuite {
Observable[String]({ subscriber =>
println("subscribing")
subscriber.onError(new RuntimeException("always fails"))
- }).retryWhen(attempts => {
- attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
+ }).retryWhen({ throwableObservable =>
+ throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => {
println("delay retry by " + i + " second(s)")
Observable.timer(Duration(i, TimeUnit.SECONDS))
})
}).toBlocking.foreach(s => println(s))
}
+ @Test def retryWhenDifferentExceptionsExample(): Unit = {
+ var observableCreateCount = 1 // Just to support switching which Exception is produced
+ Observable[String]({ subscriber =>
+ println("subscribing")
+ if (observableCreateCount <= 2) {
+ subscriber.onError(new IOException("IO Fail"))
+ } else {
+ subscriber.onError(new RuntimeException("Other failure"))
+ }
+ observableCreateCount += 1
+ }).retryWhen({ throwableObservable =>
+ throwableObservable.zip(Observable.from(1 to 3)).flatMap({ case (error, retryCount) =>
+ error match {
+ // Only retry 2 times if we get a IOException and then error out with the third IOException.
+ // Let the other Exception's pass through and complete the Observable.
+ case _: IOException =>
+ if (retryCount <= 3) {
+ println("IOException delay retry by " + retryCount + " second(s)")
+ Observable.timer(Duration(retryCount, TimeUnit.SECONDS))
+ } else {
+ Observable.error(error)
+ }
+
+ case _ =>
+ println("got error " + error + ", will stop retrying")
+ Observable.empty
+ }
+ })
+ }).toBlocking.foreach(s => println(s))
+ }
+
@Test def repeatWhenExample(): Unit = {
Observable[String]({ subscriber =>
println("subscribing")
subscriber.onCompleted()
- }).repeatWhen(attempts => {
- attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
+ }).repeatWhen({ unitObservable =>
+ unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => {
println("delay repeat by " + i + " second(s)")
Observable.timer(Duration(i, TimeUnit.SECONDS))
})
diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala
index 0c87b6d7..5c75863d 100755
--- a/src/main/scala/rx/lang/scala/Observable.scala
+++ b/src/main/scala/rx/lang/scala/Observable.scala
@@ -2260,57 +2260,6 @@ trait Observable[+T]
}
}
- /**
- * Groups the items emitted by this Observable according to a specified discriminator function and terminates these groups
- * according to a function.
- *
- * @param f
- * a function that extracts the key from an item
- * @param closings
- * the function that accepts the key of a given group and an observable representing that group, and returns
- * an observable that emits a single Closing when the group should be closed.
- * @tparam K
- * the type of the keys returned by the discriminator function.
- * @return an Observable that emits `(key, observable)` pairs, where `observable`
- * contains all items for which `f` returned `key` before `closings` emits a value.
- */
- def groupByUntil[K](f: T => K)(closings: (K, Observable[T])=>Observable[Any]): Observable[(K, Observable[T])] = {
- val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Any]] =
- (jGrObs: rx.observables.GroupedObservable[K, _ <: T]) => closings(jGrObs.getKey, toScalaObservable[T](jGrObs)).asJavaObservable
- val o1 = asJavaObservable.groupByUntil[K, Any](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
- val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
- toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
- }
-
- /**
- * Groups the items emitted by an [[Observable]] (transformed by a selector) according to a specified key selector function
- * until the duration Observable expires for the key.
- *
- *
- *
- * Note: The `Observable` in the pair `(K, Observable[V])` will cache the items it is to emit until such time as it
- * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those `Observable` that
- * do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like `take(0)` to them.
- *
- * @param keySelector a function to extract the key for each item
- * @param valueSelector a function to map each item emitted by the source [[Observable]] to an item emitted by one
- * of the resulting `Observable[V]`s
- * @param closings a function to signal the expiration of a group
- * @return an [[Observable]] that emits pairs of key and `Observable[V]`, each of which corresponds to a key
- * value and each of which emits all items emitted by the source [[Observable]] during that
- * key's duration that share that same key value, transformed by the value selector
- */
- def groupByUntil[K, V](keySelector: T => K, valueSelector: T => V)(closings: (K, Observable[V]) => Observable[Any]): Observable[(K, Observable[V])] = {
- val jKeySelector: Func1[_ >: T, _ <: K] = keySelector
- val jValueSelector: Func1[_ >: T, _ <: V] = valueSelector
- val jDurationSelector = new Func1[rx.observables.GroupedObservable[_ <: K, _ <: V], rx.Observable[_ <: Any]] {
- override def call(jgo: rx.observables.GroupedObservable[_ <: K, _ <: V]): rx.Observable[_ <: Any] = closings(jgo.getKey, toScalaObservable[V](jgo))
- }
- val f = (o: rx.observables.GroupedObservable[K, _ <: V]) => (o.getKey, toScalaObservable[V](o))
- val jo = asJavaObservable.groupByUntil[K, V, Any](jKeySelector, jValueSelector, jDurationSelector).map[(K, Observable[V])](f)
- toScalaObservable[(K, Observable[V])](jo)
- }
-
/**
* Correlates the items emitted by two Observables based on overlapping durations.
*
@@ -3265,7 +3214,7 @@ trait Observable[+T] /** * Returns an Observable that emits the same values as the source observable with the exception of an * `onError`. An `onError` notification from the source will result in the emission of a - * [[Notification]] to the Observable provided as an argument to the `notificationHandler` + * [[Throwable]] to the Observable provided as an argument to the `notificationHandler` * function. If the Observable returned `onCompletes` or `onErrors` then `retry` will call * `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will * resubscribe to the source Observable. @@ -3276,21 +3225,25 @@ trait Observable[+T] * * This retries 3 times, each time incrementing the number of seconds it waits. * - *
+ * @example + * + * This retries 3 times, each time incrementing the number of seconds it waits. + * + * {{{ * Observable[String]({ subscriber => * println("subscribing") * subscriber.onError(new RuntimeException("always fails")) - * }).retryWhen(attempts => { - * attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => { + * }).retryWhen({ throwableObservable => + * throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => { * println("delay retry by " + i + " second(s)") * Observable.timer(Duration(i, TimeUnit.SECONDS)) * }) * }).toBlocking.foreach(s => println(s)) - *+ * }}} * * Output is: * - *
+ * {{{ * subscribing * delay retry by 1 second(s) * subscribing @@ -3298,23 +3251,25 @@ trait Observable[+T] * subscribing * delay retry by 3 second(s) * subscribing - *+ * }}} + * *
*
@@ -3333,18 +3288,19 @@ trait Observable[+T]
*
*
+ *
+ * @example
+ *
+ * This repeats 3 times, each time incrementing the number of seconds it waits.
+ *
+ * {{{
+ * Observable[String]({ subscriber =>
+ * println("subscribing")
+ * subscriber.onCompleted()
+ * }).repeatWhen({ unitObservable =>
+ * unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => {
+ * println("delay repeat by " + i + " second(s)")
+ * Observable.timer(Duration(i, TimeUnit.SECONDS))
+ * })
+ * }).toBlocking.foreach(s => println(s))
+ * }}}
+ *
+ * Output is:
+ *
+ * {{{
+ * subscribing
+ * delay repeat by 1 second(s)
+ * subscribing
+ * delay repeat by 2 second(s)
+ * subscribing
+ * delay repeat by 3 second(s)
+ * subscribing
+ * }}}
+ *
*