-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: split event listeners #26
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,10 @@ import io.getunleash.android.data.Toggle | |
import io.getunleash.android.data.UnleashContext | ||
import io.getunleash.android.data.UnleashState | ||
import io.getunleash.android.data.Variant | ||
import io.getunleash.android.events.UnleashEventListener | ||
import io.getunleash.android.events.UnleashImpressionEventListener | ||
import io.getunleash.android.events.UnleashListener | ||
import io.getunleash.android.events.UnleashReadyListener | ||
import io.getunleash.android.events.UnleashStateListener | ||
import io.getunleash.android.http.ClientBuilder | ||
import io.getunleash.android.http.NetworkStatusHelper | ||
import io.getunleash.android.metrics.MetricsCollector | ||
|
@@ -35,7 +38,6 @@ import kotlinx.coroutines.channels.BufferOverflow | |
import kotlinx.coroutines.flow.MutableSharedFlow | ||
import kotlinx.coroutines.flow.MutableStateFlow | ||
import kotlinx.coroutines.flow.asStateFlow | ||
import kotlinx.coroutines.flow.filter | ||
import kotlinx.coroutines.flow.first | ||
import kotlinx.coroutines.flow.takeWhile | ||
import kotlinx.coroutines.launch | ||
|
@@ -58,7 +60,7 @@ class DefaultUnleash( | |
private val unleashConfig: UnleashConfig, | ||
unleashContext: UnleashContext = UnleashContext(), | ||
cacheImpl: ToggleCache = InMemoryToggleCache(), | ||
eventListeners: List<UnleashEventListener> = emptyList(), | ||
eventListeners: List<UnleashListener> = emptyList(), | ||
private val lifecycle: Lifecycle = getLifecycle(androidContext), | ||
private val coroutineScope: CoroutineScope = unleashScope | ||
) : Unleash { | ||
|
@@ -72,11 +74,11 @@ class DefaultUnleash( | |
private val cache: ObservableToggleCache = ObservableCache(cacheImpl, coroutineScope) | ||
private var started = AtomicBoolean(false) | ||
private var ready = AtomicBoolean(false) | ||
private val readyFlow = MutableStateFlow(false) | ||
private val fetcher: UnleashFetcher? | ||
private val networkStatusHelper = NetworkStatusHelper(androidContext) | ||
private val impressionEventsFlow = MutableSharedFlow<ImpressionEvent>( | ||
extraBufferCapacity = 64, | ||
replay = 1, | ||
extraBufferCapacity = 1000, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this is too much, or maybe not. This is just to make sure we don't drop impression events, but we also protect the memory. Replay turns out to be useful for predictability, because sometimes the listeners don't get to register before the first event happens (same as with isReady flow). Arguably, maybe a replay is not needed for impression events |
||
onBufferOverflow = BufferOverflow.DROP_OLDEST | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may miss some updates if you're not fast enough |
||
) | ||
|
||
|
@@ -109,14 +111,13 @@ class DefaultUnleash( | |
} | ||
|
||
fun start( | ||
eventListeners: List<UnleashEventListener> = emptyList(), | ||
eventListeners: List<UnleashListener> = emptyList(), | ||
bootstrap: List<Toggle> = emptyList() | ||
) { | ||
if (!started.compareAndSet(false, true)) { | ||
Log.w(TAG, "Unleash already started, ignoring start call") | ||
return | ||
} | ||
eventListeners.forEach { addUnleashEventListener(it) } | ||
networkStatusHelper.registerNetworkListener(taskManager) | ||
if (unleashConfig.localStorageConfig.enabled) { | ||
val localBackup = getLocalBackup() | ||
|
@@ -127,6 +128,7 @@ class DefaultUnleash( | |
cache.subscribeTo(it.getFeaturesReceivedFlow()) | ||
} | ||
lifecycle.addObserver(taskManager) | ||
eventListeners.forEach { addUnleashEventListener(it) } | ||
if (bootstrap.isNotEmpty()) { | ||
Log.i(TAG, "Using provided bootstrap toggles") | ||
cache.write(UnleashState(unleashContextState.value, bootstrap.associateBy { it.name })) | ||
|
@@ -238,23 +240,25 @@ class DefaultUnleash( | |
unleashContextState.value = context | ||
} | ||
|
||
override fun addUnleashEventListener(listener: UnleashEventListener) { | ||
coroutineScope.launch { | ||
val firstReady = readyFlow.asStateFlow().filter { it }.first() | ||
Log.d(TAG, "Ready state changed to $firstReady, notifying $listener") | ||
override fun addUnleashEventListener(listener: UnleashListener) { | ||
|
||
if (listener is UnleashReadyListener) coroutineScope.launch { | ||
cache.getUpdatesFlow().first{ | ||
true | ||
} | ||
if (ready.compareAndSet(false, true)) { | ||
Log.d(TAG, "Unleash state changed to ready") | ||
} | ||
Log.d(TAG, "Notifying UnleashReadyListener") | ||
listener.onReady() | ||
} | ||
coroutineScope.launch { | ||
if (listener is UnleashStateListener) coroutineScope.launch { | ||
cache.getUpdatesFlow().collect { | ||
if (ready.compareAndSet(false, true)) { | ||
Log.d(TAG, "Unleash is now ready") | ||
readyFlow.value = true | ||
} | ||
Log.d(TAG, "Cache updated, notifying $listener that state changed") | ||
listener.onStateChanged() | ||
} | ||
} | ||
coroutineScope.launch { | ||
|
||
if (listener is UnleashImpressionEventListener) coroutineScope.launch { | ||
impressionEventsFlow.collect { event -> | ||
listener.onImpression(event) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ import io.getunleash.android.data.UnleashState | |
import io.getunleash.android.unleashScope | ||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.channels.BufferOverflow | ||
import kotlinx.coroutines.flow.Flow | ||
import kotlinx.coroutines.flow.MutableSharedFlow | ||
import kotlinx.coroutines.flow.asSharedFlow | ||
|
@@ -17,7 +18,10 @@ class ObservableCache(private val cache: ToggleCache, private val coroutineScope | |
private const val TAG = "ObservableCache" | ||
} | ||
|
||
private var events = MutableSharedFlow<UnleashState>() | ||
private var newStateEventFlow = MutableSharedFlow<UnleashState>( | ||
replay = 1, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes the cache more predictable. During testing I noticed that the coroutine that was adding a listener to the cache: https://github.com/Unleash/UnleashAndroidSdk/blob/3da55190599b660be58797075410dcc6f69e017b/unleashandroidsdk/src/main/java/io/getunleash/android/DefaultUnleash.kt#L246-L251 was executing after the first new state arrived (this is particularly true when using bootstraped toggles as they are written immediately to the cache). This caused the ready listener to miss the ready event. With replay = 1 in place, even in this situation, the listener that arrives later will be able to see the first UnleashState received, despite the execution order. Particularly for state, I believe having the latest always available even for new subscribers is important. We are also not having a queue of states, just keep the latest and drop the oldest as soon as new states arrive There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense to me |
||
onBufferOverflow = BufferOverflow.DROP_OLDEST | ||
) | ||
override fun read(): Map<String, Toggle> { | ||
return cache.read() | ||
} | ||
|
@@ -28,12 +32,11 @@ class ObservableCache(private val cache: ToggleCache, private val coroutineScope | |
|
||
override fun write(state: UnleashState) { | ||
cache.write(state) | ||
Log.d(TAG, "Done writing cache") | ||
Log.d(TAG, "Done writing cache with ${newStateEventFlow.subscriptionCount.value} subscribers") | ||
coroutineScope.launch { | ||
Log.d(TAG, "Emitting new state with ${state.toggles.size} toggles for ${state.context}") | ||
events.emit(state) | ||
Log.d(TAG, "Emitting new state with ${state.toggles.size} toggles") | ||
newStateEventFlow.emit(state) | ||
} | ||
Log.d(TAG, "Done sending event") | ||
} | ||
|
||
override fun subscribeTo(featuresReceived: Flow<UnleashState>) { | ||
|
@@ -49,6 +52,6 @@ class ObservableCache(private val cache: ToggleCache, private val coroutineScope | |
} | ||
|
||
override fun getUpdatesFlow(): Flow<UnleashState> { | ||
return events.asSharedFlow() | ||
return newStateEventFlow.asSharedFlow() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package io.getunleash.android.events | ||
|
||
import io.getunleash.android.data.ImpressionEvent | ||
|
||
interface UnleashListener | ||
interface UnleashReadyListener: UnleashListener { | ||
fun onReady() | ||
} | ||
|
||
interface UnleashStateListener: UnleashListener { | ||
fun onStateChanged() | ||
} | ||
|
||
interface UnleashImpressionEventListener: UnleashListener { | ||
fun onImpression(event: ImpressionEvent) | ||
} |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add the ability of sending multiple event listeners instead of just one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, Both
List<UnleashEventListener>
andList<UnleashListener>
, allow you to send more than one though? They accept a list of listeners. So the real change here is a rename of the Interface?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, previously it was just an event listener but the change from 1 to many was done in a previous PR, this comment is stale :D currently, only changing the interface