diff --git a/unleashandroidsdk/src/main/java/io/getunleash/android/DefaultUnleash.kt b/unleashandroidsdk/src/main/java/io/getunleash/android/DefaultUnleash.kt index a067dcd..7c1245b 100644 --- a/unleashandroidsdk/src/main/java/io/getunleash/android/DefaultUnleash.kt +++ b/unleashandroidsdk/src/main/java/io/getunleash/android/DefaultUnleash.kt @@ -257,22 +257,26 @@ class DefaultUnleash( } override fun setContext(context: UnleashContext) { - unleashContextState.value = context if (started.get()) { - refreshTogglesNow() + runBlocking { + withContext(Dispatchers.IO) { + fetcher.refreshTogglesWithContext(context) + } + } } + unleashContextState.value = context } @Throws(TimeoutException::class) override fun setContextWithTimeout(context: UnleashContext, timeout: Long) { - unleashContextState.value = context if (started.get()) { runBlocking { withTimeout(timeout) { - fetcher.refreshToggles() + fetcher.refreshTogglesWithContext(context) } } } + unleashContextState.value = context } override fun setContextAsync(context: UnleashContext) { diff --git a/unleashandroidsdk/src/main/java/io/getunleash/android/polling/UnleashFetcher.kt b/unleashandroidsdk/src/main/java/io/getunleash/android/polling/UnleashFetcher.kt index 93c7fa8..d383820 100644 --- a/unleashandroidsdk/src/main/java/io/getunleash/android/polling/UnleashFetcher.kt +++ b/unleashandroidsdk/src/main/java/io/getunleash/android/polling/UnleashFetcher.kt @@ -11,6 +11,13 @@ import io.getunleash.android.errors.ServerException import io.getunleash.android.events.HeartbeatEvent import io.getunleash.android.http.Throttler import io.getunleash.android.unleashScope +import java.io.Closeable +import java.io.IOException +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow @@ -30,54 +37,55 @@ import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.Response import okhttp3.internal.closeQuietly -import java.io.Closeable -import java.io.IOException -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException /** - * Http Client for fetching data from Unleash Proxy. - * By default creates an OkHttpClient with readTimeout set to 2 seconds and a cache of 10 MBs - * @param httpClient - the http client to use for fetching toggles from Unleash proxy + * Http Client for fetching data from Unleash Proxy. By default creates an OkHttpClient with + * readTimeout set to 2 seconds and a cache of 10 MBs + * @param httpClient + * - the http client to use for fetching toggles from Unleash proxy */ open class UnleashFetcher( - unleashConfig: UnleashConfig, - private val httpClient: OkHttpClient, - private val unleashContext: StateFlow, + unleashConfig: UnleashConfig, + private val httpClient: OkHttpClient, + private val unleashContext: StateFlow, ) : Closeable { companion object { private const val TAG = "UnleashFetcher" } - + @Volatile private var contextForLastFetch: UnleashContext? = null private val proxyUrl = unleashConfig.proxyUrl?.toHttpUrl() - private val applicationHeaders = unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy) + private val applicationHeaders = + unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy) private val appName = unleashConfig.appName private var etag: String? = null - private val featuresReceivedFlow = MutableSharedFlow( - replay = 1, - onBufferOverflow = BufferOverflow.DROP_OLDEST - ) - private val fetcherHeartbeatFlow = MutableSharedFlow( - extraBufferCapacity = 5, - onBufferOverflow = BufferOverflow.DROP_OLDEST - ) + private val featuresReceivedFlow = + MutableSharedFlow( + replay = 1, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + private val fetcherHeartbeatFlow = + MutableSharedFlow( + extraBufferCapacity = 5, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) private val coroutineContextForContextChange: CoroutineContext = Dispatchers.IO private val currentCall = AtomicReference(null) private val throttler = - Throttler( - TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval), - longestAcceptableIntervalSeconds = 300, - proxyUrl.toString() - ) + Throttler( + TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval), + longestAcceptableIntervalSeconds = 300, + proxyUrl.toString() + ) fun getFeaturesReceivedFlow() = featuresReceivedFlow.asSharedFlow() fun startWatchingContext() { unleashScope.launch { - unleashContext.distinctUntilChanged { old, new -> old != new }.collect { + unleashContext.collect { + if (it == contextForLastFetch) { + Log.d(TAG, "Context unchanged, skipping refresh toggles") + return@collect + } withContext(coroutineContextForContextChange) { Log.d(TAG, "Unleash context changed: $it") refreshToggles() @@ -89,7 +97,7 @@ open class UnleashFetcher( suspend fun refreshToggles(): ToggleResponse { if (throttler.performAction()) { Log.d(TAG, "Refreshing toggles") - val response = refreshTogglesWithContext(unleashContext.value) + val response = doFetchToggles(unleashContext.value) fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message)) return response } @@ -98,15 +106,28 @@ open class UnleashFetcher( return ToggleResponse(Status.THROTTLED) } - internal suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse { + suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse { + if (throttler.performAction()) { + Log.d(TAG, "Refreshing toggles") + val response = doFetchToggles(ctx) + fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message)) + return response + } + Log.i(TAG, "Skipping refresh toggles due to throttling") + fetcherHeartbeatFlow.emit(HeartbeatEvent(Status.THROTTLED)) + return ToggleResponse(Status.THROTTLED) + } + + internal suspend fun doFetchToggles(ctx: UnleashContext): ToggleResponse { + contextForLastFetch = ctx val response = fetchToggles(ctx) if (response.isSuccess()) { - val toggles = response.config!!.toggles.groupBy { it.name } - .mapValues { (_, v) -> v.first() } + val toggles = + response.config!!.toggles.groupBy { it.name }.mapValues { (_, v) -> v.first() } Log.d( - TAG, - "Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow" + TAG, + "Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow" ) featuresReceivedFlow.emit(UnleashState(ctx, toggles)) return ToggleResponse(response.status, toggles) @@ -124,12 +145,14 @@ open class UnleashFetcher( private suspend fun fetchToggles(ctx: UnleashContext): FetchResponse { if (proxyUrl == null) { - return FetchResponse(Status.FAILED, error = IllegalStateException("Proxy URL is not set")) + return FetchResponse( + Status.FAILED, + error = IllegalStateException("Proxy URL is not set") + ) } val contextUrl = buildContextUrl(ctx) try { - val request = Request.Builder().url(contextUrl) - .headers(applicationHeaders.toHeaders()) + val request = Request.Builder().url(contextUrl).headers(applicationHeaders.toHeaders()) if (etag != null) { request.header("If-None-Match", etag!!) } @@ -137,13 +160,16 @@ open class UnleashFetcher( val inFlightCall = currentCall.get() if (!currentCall.compareAndSet(inFlightCall, call)) { return FetchResponse( - Status.FAILED, - error = IllegalStateException("Failed to set new call while ${inFlightCall?.request()?.url} is in flight") + Status.FAILED, + error = + IllegalStateException( + "Failed to set new call while ${inFlightCall?.request()?.url} is in flight" + ) ) - } else if (inFlightCall != null && !inFlightCall.isCanceled()) { + } else if (inFlightCall != null && !inFlightCall.isCanceled() && !inFlightCall.isExecuted()) { Log.d( - TAG, - "Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}" + TAG, + "Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}" ) inFlightCall.cancel() } @@ -159,23 +185,21 @@ open class UnleashFetcher( res.body?.use { b -> try { val proxyResponse: ProxyResponse = - proxyResponseAdapter.fromJson(b.string())!! + proxyResponseAdapter.fromJson(b.string())!! FetchResponse(Status.SUCCESS, proxyResponse) } catch (e: Exception) { // If we fail to parse, just keep data FetchResponse(Status.FAILED, error = e) } - } ?: FetchResponse(Status.FAILED, error = NoBodyException()) + } + ?: FetchResponse(Status.FAILED, error = NoBodyException()) } - res.code == 304 -> { FetchResponse(Status.NOT_MODIFIED) } - res.code == 401 -> { FetchResponse(Status.FAILED, error = NotAuthorizedException()) } - else -> { FetchResponse(Status.FAILED, error = ServerException(res.code)) } @@ -188,31 +212,33 @@ open class UnleashFetcher( private suspend fun Call.await(): Response { return suspendCancellableCoroutine { continuation -> - enqueue(object : Callback { - override fun onResponse(call: Call, response: Response) { - continuation.resume(response) - } + enqueue( + object : Callback { + override fun onResponse(call: Call, response: Response) { + continuation.resume(response) + } - override fun onFailure(call: Call, e: IOException) { - // Don't bother with resuming the continuation if it is already cancelled. - if (continuation.isCancelled) return - continuation.resumeWithException(e) - } - }) + override fun onFailure(call: Call, e: IOException) { + // Don't bother with resuming the continuation if it is already + // cancelled. + if (continuation.isCancelled) return + continuation.resumeWithException(e) + } + } + ) continuation.invokeOnCancellation { try { cancel() } catch (ex: Throwable) { - //Ignore cancel exception + // Ignore cancel exception } } } } private fun buildContextUrl(ctx: UnleashContext): HttpUrl { - var contextUrl = proxyUrl!!.newBuilder() - .addQueryParameter("appName", appName) + var contextUrl = proxyUrl!!.newBuilder().addQueryParameter("appName", appName) if (ctx.userId != null) { contextUrl.addQueryParameter("userId", ctx.userId) } diff --git a/unleashandroidsdk/src/test/java/io/getunleash/android/DefaultUnleashTest.kt b/unleashandroidsdk/src/test/java/io/getunleash/android/DefaultUnleashTest.kt index 1a9844e..09b808f 100644 --- a/unleashandroidsdk/src/test/java/io/getunleash/android/DefaultUnleashTest.kt +++ b/unleashandroidsdk/src/test/java/io/getunleash/android/DefaultUnleashTest.kt @@ -15,11 +15,12 @@ import io.getunleash.android.polling.Status import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.fail import org.assertj.core.groups.Tuple import org.awaitility.Awaitility.await +import org.junit.Assert import org.junit.Test import org.mockito.Mockito.mock -import org.mockito.Mockito.verify import org.robolectric.shadows.ShadowLog import java.io.File import java.util.concurrent.TimeUnit @@ -299,7 +300,7 @@ class DefaultUnleashTest : BaseTest() { .proxyUrl(server.url("").toString()) .clientKey("key-123") .pollingStrategy.enabled(true) - .pollingStrategy.delay(10000) // delay enough so it won't trigger a new request + .pollingStrategy.delay(20000) // delay enough so it won't trigger a new request .metricsStrategy.enabled(false) .localStorageConfig.enabled(false) .build(), @@ -327,29 +328,84 @@ class DefaultUnleashTest : BaseTest() { })) await().atMost(5, TimeUnit.SECONDS).until { - togglesUpdated > 0 + togglesUpdated == 1 } // change context to force a refresh - unleash.setContext(UnleashContext(userId = "2")) + unleash.setContextAsync(UnleashContext(userId = "2")) await().atMost(2, TimeUnit.SECONDS).until { - togglesChecked > 0 + togglesChecked == 1 } - unleash.setContext(UnleashContext(userId = "3")) + unleash.setContextAsync(UnleashContext(userId = "3")) await().atMost(2, TimeUnit.SECONDS).until { - togglesFailed > 0 + togglesFailed == 1 } // too fast request after an error should be throttled - unleash.setContext(UnleashContext(userId = "4")) + unleash.setContextAsync(UnleashContext(userId = "4")) await().atMost(2, TimeUnit.SECONDS).until { - togglesThrottled > 0 + togglesThrottled == 1 } assertThat(togglesUpdated).isEqualTo(1) assertThat(togglesChecked).isEqualTo(1) - assertThat(togglesFailed).isEqualTo(1) + assertThat(togglesFailed).isEqualTo(1) assertThat(togglesThrottled).isEqualTo(1) } + @Test + fun `when set context call we only refresh once`() { + val server = MockWebServer() + server.enqueue( + MockResponse().setBody( + this::class.java.classLoader?.getResource("sample-response.json")!!.readText() + ) + ) + server.enqueue( + MockResponse().setResponseCode(304).setBody("") + ) + server.enqueue( + MockResponse().setResponseCode(304).setBody("") + ) + val unleash = DefaultUnleash( + androidContext = mock(Context::class.java), + unleashConfig = UnleashConfig.newBuilder("test-android-app") + .proxyUrl(server.url("").toString()) + .clientKey("key-123") + .pollingStrategy.enabled(true) + .pollingStrategy.delay(20000) // delay enough so it won't trigger a new request + .metricsStrategy.enabled(false) + .localStorageConfig.enabled(false) + .build(), + unleashContext = UnleashContext(userId = "1"), + lifecycle = mock(Lifecycle::class.java), + ) + + var togglesUpdated = 0 + var togglesChecked = 0 + + unleash.start(eventListeners = listOf(object : UnleashFetcherHeartbeatListener { + override fun togglesUpdated() { + togglesUpdated++ + } + + override fun togglesChecked() { + togglesChecked++ + } + + override fun onError(event: HeartbeatEvent) { + Assert.fail("Should not have errors") + } + })) + + await().atMost(5, TimeUnit.SECONDS).until { + togglesUpdated == 1 + } + + // change context to force a refresh + unleash.setContext(UnleashContext(userId = "2")) + assertThat(togglesChecked).isEqualTo(1) + unleash.setContext(UnleashContext(userId = "3")) + assertThat(togglesChecked).isEqualTo(2) + } @Test fun `if unleash is not started, setting context does not poll, until start is called`() { diff --git a/unleashandroidsdk/src/test/java/io/getunleash/android/polling/UnleashFetcherTest.kt b/unleashandroidsdk/src/test/java/io/getunleash/android/polling/UnleashFetcherTest.kt index 5efba1e..381b771 100644 --- a/unleashandroidsdk/src/test/java/io/getunleash/android/polling/UnleashFetcherTest.kt +++ b/unleashandroidsdk/src/test/java/io/getunleash/android/polling/UnleashFetcherTest.kt @@ -77,12 +77,12 @@ class UnleashFetcherTest : BaseTest() { runBlocking { launch { println("Setting context to 123") - unleashFetcher.refreshTogglesWithContext(UnleashContext(userId = "123")) + unleashFetcher.doFetchToggles(UnleashContext(userId = "123")) } delay(150) launch { println("Setting context to 321") - unleashFetcher.refreshTogglesWithContext(UnleashContext(userId = "321")) + unleashFetcher.doFetchToggles(UnleashContext(userId = "321")) } }