Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated to use correct comparison for distinctUntilChanged #97

Merged
merged 5 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,22 +257,26 @@ class DefaultUnleash(
}

override fun setContext(context: UnleashContext) {
unleashContextState.value = context
if (started.get()) {
refreshTogglesNow()
runBlocking {
withContext(Dispatchers.IO) {
fetcher.refreshTogglesWithContext(context)
}
}
}
unleashContextState.value = context
}

@Throws(TimeoutException::class)
override fun setContextWithTimeout(context: UnleashContext, timeout: Long) {
unleashContextState.value = context
if (started.get()) {
runBlocking {
withTimeout(timeout) {
fetcher.refreshToggles()
fetcher.refreshTogglesWithContext(context)
}
}
}
unleashContextState.value = context
}

override fun setContextAsync(context: UnleashContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import io.getunleash.android.errors.ServerException
import io.getunleash.android.events.HeartbeatEvent
import io.getunleash.android.http.Throttler
import io.getunleash.android.unleashScope
import java.io.Closeable
import java.io.IOException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
Expand All @@ -30,54 +37,55 @@ import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.internal.closeQuietly
import java.io.Closeable
import java.io.IOException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

/**
* Http Client for fetching data from Unleash Proxy.
* By default creates an OkHttpClient with readTimeout set to 2 seconds and a cache of 10 MBs
* @param httpClient - the http client to use for fetching toggles from Unleash proxy
* Http Client for fetching data from Unleash Proxy. By default creates an OkHttpClient with
* readTimeout set to 2 seconds and a cache of 10 MBs
* @param httpClient
* - the http client to use for fetching toggles from Unleash proxy
*/
open class UnleashFetcher(
unleashConfig: UnleashConfig,
private val httpClient: OkHttpClient,
private val unleashContext: StateFlow<UnleashContext>,
unleashConfig: UnleashConfig,
private val httpClient: OkHttpClient,
private val unleashContext: StateFlow<UnleashContext>,
) : Closeable {
companion object {
private const val TAG = "UnleashFetcher"
}

@Volatile private var contextForLastFetch: UnleashContext? = null
private val proxyUrl = unleashConfig.proxyUrl?.toHttpUrl()
private val applicationHeaders = unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy)
private val applicationHeaders =
unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy)
private val appName = unleashConfig.appName
private var etag: String? = null
private val featuresReceivedFlow = MutableSharedFlow<UnleashState>(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
private val fetcherHeartbeatFlow = MutableSharedFlow<HeartbeatEvent>(
extraBufferCapacity = 5,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
private val featuresReceivedFlow =
MutableSharedFlow<UnleashState>(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
private val fetcherHeartbeatFlow =
MutableSharedFlow<HeartbeatEvent>(
extraBufferCapacity = 5,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
private val coroutineContextForContextChange: CoroutineContext = Dispatchers.IO
private val currentCall = AtomicReference<Call?>(null)
private val throttler =
Throttler(
TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval),
longestAcceptableIntervalSeconds = 300,
proxyUrl.toString()
)
Throttler(
TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval),
longestAcceptableIntervalSeconds = 300,
proxyUrl.toString()
)

fun getFeaturesReceivedFlow() = featuresReceivedFlow.asSharedFlow()

fun startWatchingContext() {
unleashScope.launch {
unleashContext.distinctUntilChanged { old, new -> old != new }.collect {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-state-flow/

Values in state flow are conflated using Any.equals comparison in a similar way to distinctUntilChanged operator. It is used to conflate incoming updates to value in MutableStateFlow and to suppress emission of the values to collectors when new value is equal to the previously emitted one. State flow behavior with classes that violate the contract for Any.equals is unspecified.

unleashContext.collect {
if (it == contextForLastFetch) {
Log.d(TAG, "Context unchanged, skipping refresh toggles")
return@collect
}
withContext(coroutineContextForContextChange) {
Log.d(TAG, "Unleash context changed: $it")
refreshToggles()
Expand All @@ -89,7 +97,7 @@ open class UnleashFetcher(
suspend fun refreshToggles(): ToggleResponse {
if (throttler.performAction()) {
Log.d(TAG, "Refreshing toggles")
val response = refreshTogglesWithContext(unleashContext.value)
val response = doFetchToggles(unleashContext.value)
fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message))
return response
}
Expand All @@ -98,15 +106,28 @@ open class UnleashFetcher(
return ToggleResponse(Status.THROTTLED)
}

internal suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse {
suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse {
if (throttler.performAction()) {
Log.d(TAG, "Refreshing toggles")
val response = doFetchToggles(ctx)
fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message))
return response
}
Log.i(TAG, "Skipping refresh toggles due to throttling")
fetcherHeartbeatFlow.emit(HeartbeatEvent(Status.THROTTLED))
return ToggleResponse(Status.THROTTLED)
}

internal suspend fun doFetchToggles(ctx: UnleashContext): ToggleResponse {
contextForLastFetch = ctx
val response = fetchToggles(ctx)
if (response.isSuccess()) {

val toggles = response.config!!.toggles.groupBy { it.name }
.mapValues { (_, v) -> v.first() }
val toggles =
response.config!!.toggles.groupBy { it.name }.mapValues { (_, v) -> v.first() }
Log.d(
TAG,
"Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow"
TAG,
"Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow"
)
featuresReceivedFlow.emit(UnleashState(ctx, toggles))
return ToggleResponse(response.status, toggles)
Expand All @@ -124,26 +145,31 @@ open class UnleashFetcher(

private suspend fun fetchToggles(ctx: UnleashContext): FetchResponse {
if (proxyUrl == null) {
return FetchResponse(Status.FAILED, error = IllegalStateException("Proxy URL is not set"))
return FetchResponse(
Status.FAILED,
error = IllegalStateException("Proxy URL is not set")
)
}
val contextUrl = buildContextUrl(ctx)
try {
val request = Request.Builder().url(contextUrl)
.headers(applicationHeaders.toHeaders())
val request = Request.Builder().url(contextUrl).headers(applicationHeaders.toHeaders())
if (etag != null) {
request.header("If-None-Match", etag!!)
}
val call = this.httpClient.newCall(request.build())
val inFlightCall = currentCall.get()
if (!currentCall.compareAndSet(inFlightCall, call)) {
return FetchResponse(
Status.FAILED,
error = IllegalStateException("Failed to set new call while ${inFlightCall?.request()?.url} is in flight")
Status.FAILED,
error =
IllegalStateException(
"Failed to set new call while ${inFlightCall?.request()?.url} is in flight"
)
)
} else if (inFlightCall != null && !inFlightCall.isCanceled()) {
} else if (inFlightCall != null && !inFlightCall.isCanceled() && !inFlightCall.isExecuted()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not cancel completed requests

Log.d(
TAG,
"Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}"
TAG,
"Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}"
)
inFlightCall.cancel()
}
Expand All @@ -159,23 +185,21 @@ open class UnleashFetcher(
res.body?.use { b ->
try {
val proxyResponse: ProxyResponse =
proxyResponseAdapter.fromJson(b.string())!!
proxyResponseAdapter.fromJson(b.string())!!
FetchResponse(Status.SUCCESS, proxyResponse)
} catch (e: Exception) {
// If we fail to parse, just keep data
FetchResponse(Status.FAILED, error = e)
}
} ?: FetchResponse(Status.FAILED, error = NoBodyException())
}
?: FetchResponse(Status.FAILED, error = NoBodyException())
}

res.code == 304 -> {
FetchResponse(Status.NOT_MODIFIED)
}

res.code == 401 -> {
FetchResponse(Status.FAILED, error = NotAuthorizedException())
}

else -> {
FetchResponse(Status.FAILED, error = ServerException(res.code))
}
Expand All @@ -188,31 +212,33 @@ open class UnleashFetcher(

private suspend fun Call.await(): Response {
return suspendCancellableCoroutine { continuation ->
enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
continuation.resume(response)
}
enqueue(
object : Callback {
override fun onResponse(call: Call, response: Response) {
continuation.resume(response)
}

override fun onFailure(call: Call, e: IOException) {
// Don't bother with resuming the continuation if it is already cancelled.
if (continuation.isCancelled) return
continuation.resumeWithException(e)
}
})
override fun onFailure(call: Call, e: IOException) {
// Don't bother with resuming the continuation if it is already
// cancelled.
if (continuation.isCancelled) return
continuation.resumeWithException(e)
}
}
)

continuation.invokeOnCancellation {
try {
cancel()
} catch (ex: Throwable) {
//Ignore cancel exception
// Ignore cancel exception
}
}
}
}

private fun buildContextUrl(ctx: UnleashContext): HttpUrl {
var contextUrl = proxyUrl!!.newBuilder()
.addQueryParameter("appName", appName)
var contextUrl = proxyUrl!!.newBuilder().addQueryParameter("appName", appName)
if (ctx.userId != null) {
contextUrl.addQueryParameter("userId", ctx.userId)
}
Expand Down
Loading