Skip to content

Commit

Permalink
Strict nullability with : Any, and made RxBoxImp a bit more effic…
Browse files Browse the repository at this point in the history
…ient (#12)
  • Loading branch information
nedtwigg authored Jan 24, 2025
2 parents 68cc247 + 9621865 commit 4b46450
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 144 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

## [Unreleased]
### Changed
- **BREAKING** Replace `RxJava Disposable` with `Kotlin Job`, and remove `rxjava` completely. ([#10](https://github.com/diffplug/durian-rx/pull/10))
- Add strict nullability to RxBox and improve efficiency. ([#12](https://github.com/diffplug/durian-rx/pull/12))
- Bump required java from 11 to 17. ([#9](https://github.com/diffplug/durian-rx/pull/9))
- Replace `RxJava Disposable` with `Kotlin Job`, and remove `rxjava` completely. ([#10](https://github.com/diffplug/durian-rx/pull/10))

## [4.0.1] - 2022-12-20
### Fixed
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/diffplug/common/rx/ForwardingBox.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kotlinx.coroutines.flow.Flow
*
* Especially useful for overridding set().
*/
open class ForwardingBox<T, BoxType : Box<T>>
open class ForwardingBox<T : Any, BoxType : Box<T>>
protected constructor(protected val delegate: BoxType) : Box<T> {
override fun get(): T {
return delegate.get()
Expand All @@ -37,7 +37,7 @@ protected constructor(protected val delegate: BoxType) : Box<T> {
delegate.set(value)
}

class Cas<T> protected constructor(delegate: CasBox<T>) :
class Cas<T : Any> protected constructor(delegate: CasBox<T>) :
ForwardingBox<T, CasBox<T>>(delegate), CasBox<T> {
override fun compareAndSet(expect: T, update: T): Boolean {
return delegate.compareAndSet(expect, update)
Expand All @@ -48,21 +48,21 @@ protected constructor(protected val delegate: BoxType) : Box<T> {
}
}

class Lock<T> protected constructor(delegate: LockBox<T>) :
class Lock<T : Any> protected constructor(delegate: LockBox<T>) :
ForwardingBox<T, LockBox<T>>(delegate), LockBox<T> {
override fun lock(): Any {
return delegate.lock()
}
}

open class Rx<T> protected constructor(delegate: RxBox<T>) :
open class Rx<T : Any> protected constructor(delegate: RxBox<T>) :
ForwardingBox<T, RxBox<T>>(delegate), RxBox<T> {
override fun asFlow(): Flow<T> {
return delegate.asFlow()
}
}

class RxLock<T> protected constructor(delegate: RxLockBox<T>) :
class RxLock<T : Any> protected constructor(delegate: RxLockBox<T>) :
ForwardingBox<T, RxLockBox<T>>(delegate), RxLockBox<T> {
override fun lock(): Any {
return delegate.lock()
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/com/diffplug/common/rx/GuardedExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.concurrent.Executor
import java.util.function.Supplier
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.Flow

/**
Expand All @@ -33,12 +32,11 @@ import kotlinx.coroutines.flow.Flow
*/
open class GuardedExecutor(val delegate: RxExecutor, val guard: Chit) : Executor, RxSubscriber {
override fun execute(command: Runnable) {
delegate.executor().execute(guard.guard(command))
delegate.executor.execute(guard.guard(command))
}

/** Creates a runnable which runs on this Executor iff the guard widget is not disposed. */
fun wrap(delegate: Runnable): Runnable {
Objects.requireNonNull(delegate)
return Runnable { execute(guard.guard(delegate)) }
}

Expand All @@ -48,7 +46,7 @@ open class GuardedExecutor(val delegate: RxExecutor, val guard: Chit) : Executor
guard.runWhenDisposed { job.cancel() }
job
} else {
SupervisorJob().apply { cancel() }
Rx.sentinelJob
}
}

Expand Down
58 changes: 0 additions & 58 deletions src/main/java/com/diffplug/common/rx/MappedImp.java

This file was deleted.

43 changes: 43 additions & 0 deletions src/main/java/com/diffplug/common/rx/MappedImp.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2020 DiffPlug
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.diffplug.common.rx

import com.diffplug.common.base.Box
import com.diffplug.common.base.Converter
import java.util.function.Function

internal open class MappedImp<T : Any, R : Any, BoxType : Box<T>>(
@JvmField protected val delegate: BoxType,
@JvmField protected val converter: Converter<T, R>
) : Box<R> {
override fun get(): R = converter.convertNonNull(delegate.get())

override fun set(value: R) = delegate.set(converter.revertNonNull(value))

/** Shortcut for doing a set() on the result of a get(). */
override fun modify(mutator: Function<in R, out R>): R {
val result = Box.Nullable.ofNull<R>()
delegate.modify { input: T ->
val unmappedResult = mutator.apply(converter.convertNonNull(input))
result.set(unmappedResult)
converter.revertNonNull(unmappedResult)
}
return result.get()
}

override fun toString(): String =
"[" + delegate + " mapped to " + get() + " by " + converter + "]"
}
8 changes: 6 additions & 2 deletions src/main/java/com/diffplug/common/rx/MultiSelectModel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,14 @@ class MultiSelectModel<T : Any>(

companion object {
/** Creates an Optional<Either> from an Either<Optional>. </Optional></Either> */
fun <T, U> optEitherFrom(either: Either<Optional<T>, Optional<U>>): Optional<Either<T, U>> {
fun <T : Any, U : Any> optEitherFrom(
either: Either<Optional<T>, Optional<U>>
): Optional<Either<T, U>> {
return either.fold({ leftOpt: Optional<T> ->
leftOpt.map { l: T -> Either.createLeft(l) }
}) { rightOpt: Optional<U> -> rightOpt.map { r: U -> Either.createRight(r) } }
}) { rightOpt: Optional<U> ->
rightOpt.map { r: U -> Either.createRight(r) }
}
}
}
}
8 changes: 5 additions & 3 deletions src/main/java/com/diffplug/common/rx/Rx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ import kotlinx.coroutines.flow.merge
* (https://diffplug.github.io/durian-swt/javadoc/snapshot/com/diffplug/common/swt/SwtExec.html)
*/
object Rx {
@JvmStatic
fun <T> createEmitFlow() =
MutableSharedFlow<T>(replay = 0, extraBufferCapacity = 1, BufferOverflow.SUSPEND)

Expand Down Expand Up @@ -132,6 +133,7 @@ object Rx {
* Creates an Rx instance which will call the given consumer whenever the followed stream or
* future completes, whether with an error or not, and the error (if present) will be logged.
*/
@JvmStatic
fun <T> onTerminateLogError(onTerminate: Consumer<Optional<Throwable>>): RxListener<T> {
return RxListener(Consumers.doNothing(), DefaultTerminate(onTerminate))
}
Expand Down Expand Up @@ -371,7 +373,7 @@ object Rx {

/** Reliable way to sync two RxBox to each other. */
@JvmStatic
fun <T> sync(left: RxBox<T>, right: RxBox<T>) {
fun <T : Any> sync(left: RxBox<T>, right: RxBox<T>) {
sync(sameThreadExecutor(), left, right)
}

Expand All @@ -380,7 +382,7 @@ object Rx {
* changes
*/
@JvmStatic
fun <T> sync(subscriber: RxSubscriber, left: RxBox<T>, right: RxBox<T>) {
fun <T : Any> sync(subscriber: RxSubscriber, left: RxBox<T>, right: RxBox<T>) {
val firstChange = Box.Nullable.ofNull<Either<T, T>?>()
subscriber.subscribe(left) { leftVal: T ->
// the left changed before we could acknowledge it
Expand Down Expand Up @@ -449,5 +451,5 @@ object Rx {
}
}

val sentinelJob: Job = Job().apply { cancel() }
@JvmStatic val sentinelJob: Job = Job().apply { cancel() }
}
36 changes: 15 additions & 21 deletions src/main/java/com/diffplug/common/rx/RxBox.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map

/** [RxGetter] and [Box] combined in one: a value you can set, get, and subscribe to. */
interface RxBox<T> : RxGetter<T>, Box<T> {
interface RxBox<T : Any> : RxGetter<T>, Box<T> {
/** Returns a read-only version of this `RxBox`. */
fun readOnly(): RxGetter<T> {
return this
}
fun readOnly(): RxGetter<T> = this

/** Maps one `RxBox` to another `RxBox`. */
override fun <R> map(converter: Converter<T, R>): RxBox<R> {
override fun <R : Any> map(converter: Converter<T, R>): RxBox<R> {
return RxBoxImp.Mapped(this, converter)
}

Expand Down Expand Up @@ -70,30 +68,26 @@ interface RxBox<T> : RxGetter<T>, Box<T> {

companion object {
/** Creates an `RxBox` with the given initial value. */
@JvmStatic
fun <T> of(initial: T): RxBox<T> {
return RxBoxImp(initial)
}
@JvmStatic fun <T : Any> of(initial: T): RxBox<T> = RxBoxImp(initial)

/**
* Creates an `RxBox` which implements the "getter" part with `RxGetter`, and the setter part
* with `Consumer`.
*/
@JvmStatic
fun <T> from(getter: RxGetter<T>, setter: Consumer<T>): RxBox<T> {
return object : RxBox<T> {
override fun asFlow(): Flow<T> {
return getter.asFlow()
}
fun <T : Any> from(getter: RxGetter<T>, setter: Consumer<T>): RxBox<T> =
object : RxBox<T> {
override fun asFlow(): Flow<T> {
return getter.asFlow()
}

override fun get(): T {
return getter.get()
}
override fun get(): T {
return getter.get()
}

override fun set(value: T) {
setter.accept(value)
override fun set(value: T) {
setter.accept(value)
}
}
}
}
}
}
18 changes: 6 additions & 12 deletions src/main/java/com/diffplug/common/rx/RxBoxImp.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,22 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map

internal open class RxBoxImp<T> private constructor(initial: T, subject: MutableStateFlow<T>) :
RxBox<T> {
private var value: T = initial
private val subject: MutableStateFlow<T> = subject

constructor(initial: T) : this(initial, MutableStateFlow(initial)) {}
internal open class RxBoxImp<T : Any>(initial: T) : RxBox<T> {
private val subject = MutableStateFlow(initial)

override fun set(newValue: T) {
if (newValue != value) {
value = newValue
if (subject.value != newValue) {
subject.value = newValue
}
}

override fun get(): T = value
override fun get(): T = subject.value

override fun asFlow(): Flow<T> = subject

internal class Mapped<T, R>(delegate: RxBox<T>, converter: Converter<T, R>) :
internal class Mapped<T : Any, R : Any>(delegate: RxBox<T>, converter: Converter<T, R>) :
MappedImp<T, R, RxBox<T>>(delegate, converter), RxBox<R> {
val flow: Flow<R> =
delegate.asFlow().map { a: T -> converter.convertNonNull(a) }.distinctUntilChanged()
val flow: Flow<R> = delegate.asFlow().map(converter::convertNonNull).distinctUntilChanged()

override fun asFlow(): Flow<R> = flow
}
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/com/diffplug/common/rx/RxExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,13 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch

class RxExecutor
internal constructor(private val executor: Executor, val dispatcher: CoroutineDispatcher) :
class RxExecutor internal constructor(val executor: Executor, val dispatcher: CoroutineDispatcher) :
RxSubscriber {

interface Has : Executor {
val rxExecutor: RxExecutor
}

fun executor() = executor

override fun <T> subscribe(flow: Flow<T>, listener: RxListener<T>) {
subscribeDisposable(flow, listener)
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/diffplug/common/rx/RxGetter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import kotlinx.coroutines.flow.map
* not change (e.g. a field is set to its current value, which produces no change) then the
* `Observable` will not fire.
*/
interface RxGetter<T> : IFlowable<T>, Supplier<T> {
interface RxGetter<T : Any> : IFlowable<T>, Supplier<T> {
/**
* Maps an `RxGetter` to a new `RxGetter` by applying the `mapper` function to all of its values.
*
Expand All @@ -46,7 +46,7 @@ interface RxGetter<T> : IFlowable<T>, Supplier<T> {
* * Incorrect: `("A", "B", "C") -> map(String::length) = (1, 1, 1)`
* * Correct: `("A", "B", "C") -> map(String::length) = (1)`
*/
fun <R> map(mapper: Function<in T, out R>): RxGetter<R> {
fun <R : Any> map(mapper: Function<in T, out R>): RxGetter<R> {
val src = this
val mapped = src.asFlow().map { t: T -> mapper.apply(t) }
val observable = mapped.distinctUntilChanged()
Expand All @@ -70,7 +70,7 @@ interface RxGetter<T> : IFlowable<T>, Supplier<T> {
* recorded by a non-volatile field.
*/
@JvmStatic
fun <T> from(observable: Flow<T>, initialValue: T): RxGetter<T> {
fun <T : Any> from(observable: Flow<T>, initialValue: T): RxGetter<T> {
val box = Box.of(initialValue)
subscribe(observable) { value: T -> box.set(value) }
return object : RxGetter<T> {
Expand All @@ -90,7 +90,7 @@ interface RxGetter<T> : IFlowable<T>, Supplier<T> {
* As with [.map], the observable only emits a new value if its value has changed.
*/
@JvmStatic
fun <T1, T2, R> combineLatest(
fun <T1 : Any, T2 : Any, R : Any> combineLatest(
t: RxGetter<out T1>,
u: RxGetter<out T2>,
combine: BiFunction<in T1, in T2, out R>
Expand Down
Loading

0 comments on commit 4b46450

Please sign in to comment.