Skip to content

Commit

Permalink
Fire subscribers outside of provider lock
Browse files Browse the repository at this point in the history
  • Loading branch information
NichtStudioCode committed Oct 25, 2024
1 parent fca6320 commit f4a4fb4
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package xyz.xenondevs.commons.provider

import xyz.xenondevs.commons.collections.isNullOrEmpty
import xyz.xenondevs.commons.collections.weakHashSet
import java.util.*
import java.util.concurrent.locks.ReentrantLock
Expand Down Expand Up @@ -32,11 +33,14 @@ private class ParentProviderWrapper<C, P>(
val ignored: Set<AbstractProvider<*>> // set of providers which should not cause parent.onChildChanged when updating child (only one level deep)
) {

fun onChildChanged(changed: AbstractProvider<*>) {
fun onChildChanged(
changed: AbstractProvider<*>,
preparedSubscribers: MutableList<() -> Unit>
) {
if (changed in ignored)
return

parent.onChildChanged(child, push)
parent.onChildChanged(child, push, preparedSubscribers)
}

}
Expand Down Expand Up @@ -66,9 +70,11 @@ abstract class AbstractProvider<T>(
private var weakInactiveChildren: MutableSet<AbstractProvider<*>>? = null
private var subscribers: MutableList<(T) -> Unit>? = null
private var weakSubscribers: MutableMap<Any, ArrayList<(Any, T) -> Unit>>? = null
private var immediateSubscribers: MutableList<(T) -> Unit>? = null
private var weakImmediateSubscribers: MutableMap<Any, ArrayList<(Any, T) -> Unit>>? = null

val parents: Set<AbstractProvider<*>>
get() = lock.withLock {
get() = lock.withLock {
val parents = HashSet<AbstractProvider<*>>()
activeParents?.forEach { parents.add(it.parent) }
inactiveParents?.forEach { parents.add(it) }
Expand Down Expand Up @@ -104,10 +110,18 @@ abstract class AbstractProvider<T>(
}
}

override fun set(value: T) = lock.withLock {
if (this.value != value) {
this.value = value
onSelfChanged()
override fun set(value: T) {
val preparedSubscribers = ArrayList<() -> Unit>(0)

lock.withLock {
if (this.value != value) {
this.value = value
onSelfChanged(preparedSubscribers)
}
}

for (subscriber in preparedSubscribers) {
subscriber()
}
}

Expand All @@ -117,72 +131,119 @@ abstract class AbstractProvider<T>(
subscribers!!.add(action)
}

fun subscribeImmediate(action: (T) -> Unit): Unit = lock.withLock {
if (immediateSubscribers == null)
immediateSubscribers = ArrayList(1)
immediateSubscribers!!.add(action)
}

@Suppress("UNCHECKED_CAST")
override fun <R : Any> subscribeWeak(owner: R, action: (R, T) -> Unit): Unit = lock.withLock {
if (weakSubscribers == null)
weakSubscribers = WeakHashMap(1)
weakSubscribers!!.getOrPut(owner) { ArrayList(1) } += action as (Any, T) -> Unit
}

@Suppress("UNCHECKED_CAST")
fun <R : Any> subscribeWeakImmediate(owner: R, action: (R, T) -> Unit): Unit = lock.withLock {
if (weakImmediateSubscribers == null)
weakImmediateSubscribers = WeakHashMap(1)
weakImmediateSubscribers!!.getOrPut(owner) { ArrayList(1) } += action as (Any, T) -> Unit
}

override fun unsubscribe(action: (T) -> Unit): Unit = lock.withLock {
subscribers?.remove(action)
}

fun unsubscribeImmediate(action: (T) -> Unit): Unit = lock.withLock {
immediateSubscribers?.remove(action)
}

@Suppress("UNCHECKED_CAST")
override fun <R : Any> unsubscribeWeak(owner: R, action: (R, T) -> Unit): Unit = lock.withLock {
action as (Any, T) -> Unit
weakSubscribers?.get(owner)?.remove(action)
}

@Suppress("UNCHECKED_CAST")
fun <R : Any> unsubscribeWeakImmediate(owner: R, action: (R, T) -> Unit): Unit = lock.withLock {
action as (Any, T) -> Unit
weakImmediateSubscribers?.get(owner)?.remove(action)
}

override fun <R : Any> unsubscribeWeak(owner: R): Unit = lock.withLock {
weakSubscribers?.remove(owner)
}

fun onSelfChanged() {
fun <R : Any> unsubscribeWeakImmediate(owner: R): Unit = lock.withLock {
weakImmediateSubscribers?.remove(owner)
}

fun onSelfChanged(preparedSubscribers: MutableList<() -> Unit>) {
assert(lock.isHeldByCurrentThread)

fireSubscribers()
prepareSubscribers(preparedSubscribers)

activeParents?.forEach { it.onChildChanged(this) }
activeChildren?.forEach { it.onParentChanged(this) }
activeParents?.forEach { it.onChildChanged(this, preparedSubscribers) }
activeChildren?.forEach { it.onParentChanged(this, preparedSubscribers) }
}

fun onParentChanged(changedParent: AbstractProvider<*>) {
fun onParentChanged(
changedParent: AbstractProvider<*>,
preparedSubscribers: MutableList<() -> Unit>
) {
assert(lock.isHeldByCurrentThread)

value = InternalProviderValue.Uninitialized

fireSubscribers()
prepareSubscribers(preparedSubscribers)

activeParents?.forEach {
if (it.parent != changedParent)
it.onChildChanged(changedParent)
it.onChildChanged(changedParent, preparedSubscribers)
}
activeChildren?.forEach { it.onParentChanged(this) }
activeChildren?.forEach { it.onParentChanged(this, preparedSubscribers) }
}

fun <C> onChildChanged(changedChild: AbstractProvider<C>, push: PushFunction<C, T>) {
fun <C> onChildChanged(
changedChild: AbstractProvider<C>,
push: PushFunction<C, T>,
preparedSubscribers: MutableList<() -> Unit>
) {
assert(lock.isHeldByCurrentThread)

value = PushSource(changedChild, push)
fireSubscribers()
prepareSubscribers(preparedSubscribers)

activeParents?.forEach { it.onChildChanged(changedChild) }
activeParents?.forEach { it.onChildChanged(changedChild, preparedSubscribers) }
activeChildren?.forEach {
if (it != changedChild)
it.onParentChanged(this)
it.onParentChanged(this, preparedSubscribers)
}
}

private fun fireSubscribers() { // TODO: fire outside of lock ?
private fun prepareSubscribers(preparedSubscribers: MutableList<() -> Unit>) {
assert(lock.isHeldByCurrentThread)

if (subscribers.isNullOrEmpty() && weakSubscribers.isNullOrEmpty())
return
if (subscribers.isNullOrEmpty()
&& weakSubscribers.isNullOrEmpty()
&& immediateSubscribers.isNullOrEmpty()
&& weakImmediateSubscribers.isNullOrEmpty()
) return

val value = get()
subscribers?.forEach { it(value) }
subscribers?.forEach { subscriber ->
preparedSubscribers += { subscriber(value) }
}
weakSubscribers?.forEach { (owner, actions) ->
for (action in actions) {
preparedSubscribers += { action(owner, value) }
}
}
immediateSubscribers?.forEach { subscriber ->
subscriber(value)
}
weakImmediateSubscribers?.forEach { (owner, actions) ->
for (action in actions) {
action(owner, value)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import java.lang.ref.WeakReference
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// TODO: locks don't need to be changed if all parents have the same lock

/**
* Creates and returns a new [Provider] that combines all values of [providers].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

package xyz.xenondevs.commons.provider

import java.lang.ref.WeakReference
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import java.lang.ref.WeakReference

/**
* Creates a new [MutableProvider] that defaults to [value] if the value of [this][MutableProvider] is null.
Expand All @@ -16,7 +16,7 @@ fun <T : Any> MutableProvider<T?>.defaultsTo(value: T): MutableProvider<T> =
/**
* Creates a new [MutableProvider] that defaults to [value] if the value of [this][MutableProvider] is null.
* The default value is propagated upwards when the value of the returned provider is loaded.
*
*
* The returned provider will only be stored in a [WeakReference] in the parent provider ([this][MutableProvider]).
*/
fun <T : Any> MutableProvider<T?>.weakDefaultsTo(value: T): MutableProvider<T> =
Expand Down Expand Up @@ -73,7 +73,7 @@ private class MutableDefaultValueProvider<T : Any>(
var value = parent.get()
if (value == null) {
value = defaultValue
parent.onChildChanged(this) { it }
parent.onChildChanged(this, { it }, ArrayList(0)) // fixme: does not fire subscribers
}

return value
Expand Down Expand Up @@ -102,7 +102,7 @@ private class MutableDefaultProviderProvider<T : Any>(
var value = provider.get()
if (value == null) {
value = defaultProvider.get()
provider.onChildChanged(this) { it }
provider.onChildChanged(this, { it }, ArrayList(0))
}

return value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private class FlatMappingProvider<P, T>(
provider.addChild(active = true, weak = weak, child = this)
addParent(provider, ignored = setOf(parent)) { it }

parent.subscribeWeak(this) { thisRef, parentValue ->
parent.subscribeWeakImmediate(this) { thisRef, parentValue ->
thisRef.handleProviderUpdate(transform(parentValue))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ private class ObservedListProvider<T>(
var observedList: ObservableList<T>? = null
observedList = ObservableList(list) {
if (this.observedList === observedList) {
lock.withLock { onSelfChanged() }
val preparedSubscribers = ArrayList<() -> Unit>(0)
lock.withLock { onSelfChanged(preparedSubscribers) }
preparedSubscribers.forEach { it() }
}
}
this.observedList = observedList
Expand Down Expand Up @@ -160,7 +162,9 @@ private class ObservedMapProvider<K, V>(
var observedMap: ObservableMap<K, V>? = null
observedMap = ObservableMap(map) {
if (this.observedMap === observedMap) {
lock.withLock { onSelfChanged() }
val preparedSubscribers = ArrayList<() -> Unit>(0)
lock.withLock { onSelfChanged(preparedSubscribers) }
preparedSubscribers.forEach { it() }
}
}
this.observedMap = observedMap
Expand Down Expand Up @@ -193,7 +197,9 @@ private class ObservedSetProvider<T>(
var observedSet: ObservableSet<T>? = null
observedSet = ObservableSet(set) {
if (this.observedSet === observedSet) {
lock.withLock { onSelfChanged() }
val preparedSubscribers = ArrayList<() -> Unit>(0)
lock.withLock { onSelfChanged(preparedSubscribers) }
preparedSubscribers.forEach { it() }
}
}
this.observedSet = observedSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,18 @@ class ProviderTest {
assertEquals(1, mirror3)
}

@OptIn(UnstableProviderApi::class)
@Test
fun testProviderSubscriberCalledOutsideLock() {
val provider = mutableProvider(0) as AbstractProvider<Int>
val mapped = provider.map({ it + 1 }, { it - 1 }) as AbstractProvider<Int>

provider.subscribe { assertEquals(false, provider.lock.isHeldByCurrentThread) }
provider.subscribeWeak(this) { _, _ -> assertEquals(false, provider.lock.isHeldByCurrentThread) }
mapped.subscribe { assertEquals(false, provider.lock.isHeldByCurrentThread) }
mapped.subscribeWeak(this) { _, _ -> assertEquals(false, provider.lock.isHeldByCurrentThread) }

provider.set(1)
}

}

0 comments on commit f4a4fb4

Please sign in to comment.