Skip to content

Commit

Permalink
feat: add flag push (#30)
Browse files Browse the repository at this point in the history
* added flag push

* added config, minor renames

* add test, fixed small bugs

* remove updater onError cb, add updater tests

* fix bugs, added tests

* cleanup

* changed visibility modifiers

* added locks to ensure concurrency

* pr comments

* style

* added comment, fix max jitter for retry wrapper

* lint

* chore delete unused method
  • Loading branch information
zhukaihan authored Oct 21, 2024
1 parent fbd7b81 commit 27caeb5
Show file tree
Hide file tree
Showing 16 changed files with 1,471 additions and 87 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
testImplementation("io.mockk:mockk:${Versions.mockk}")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:${Versions.serializationRuntime}")
implementation("com.squareup.okhttp3:okhttp:${Versions.okhttp}")
implementation("com.squareup.okhttp3:okhttp-sse:${Versions.okhttpSse}")
implementation("com.amplitude:evaluation-core:${Versions.evaluationCore}")
implementation("com.amplitude:java-sdk:${Versions.amplitudeAnalytics}")
implementation("org.json:json:${Versions.json}")
Expand Down
1 change: 1 addition & 0 deletions buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ object Versions {
const val serializationRuntime = "1.4.1"
const val json = "20231013"
const val okhttp = "4.12.0"
const val okhttpSse = "4.12.0" // Update this alongside okhttp. Note this library isn't stable and may contain breaking changes. Search uses of okhttp3.internal classes before updating.
const val evaluationCore = "2.0.0-beta.2"
const val amplitudeAnalytics = "1.12.3"
const val mockk = "1.13.9"
Expand Down
20 changes: 19 additions & 1 deletion src/main/kotlin/LocalEvaluationClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amplitude.experiment.evaluation.EvaluationEngineImpl
import com.amplitude.experiment.evaluation.EvaluationFlag
import com.amplitude.experiment.evaluation.topologicalSort
import com.amplitude.experiment.flag.DynamicFlagConfigApi
import com.amplitude.experiment.flag.FlagConfigStreamApi
import com.amplitude.experiment.flag.InMemoryFlagConfigStorage
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
import com.amplitude.experiment.util.Logger
Expand All @@ -42,8 +43,12 @@ class LocalEvaluationClient internal constructor(
) {
private val assignmentService: AssignmentService? = createAssignmentService(apiKey)
private val serverUrl: HttpUrl = getServerUrl(config)
private val streamServerUrl: HttpUrl = getStreamServerUrl(config)
private val evaluation: EvaluationEngine = EvaluationEngineImpl()
private val flagConfigApi = DynamicFlagConfigApi(apiKey, serverUrl, getProxyUrl(config), httpClient, metrics)
private val flagConfigApi = DynamicFlagConfigApi(apiKey, serverUrl, null, httpClient, metrics)
private val proxyUrl: HttpUrl? = getProxyUrl(config)
private val flagConfigProxyApi = if (proxyUrl == null) null else DynamicFlagConfigApi(apiKey, proxyUrl, null, httpClient)
private val flagConfigStreamApi = if (config.streamUpdates) FlagConfigStreamApi(apiKey, streamServerUrl, httpClient, config.streamFlagConnTimeoutMillis) else null
private val flagConfigStorage = InMemoryFlagConfigStorage()
private val cohortStorage = if (config.cohortSyncConfig == null) {
null
Expand All @@ -60,6 +65,8 @@ class LocalEvaluationClient internal constructor(
private val deploymentRunner = DeploymentRunner(
config = config,
flagConfigApi = flagConfigApi,
flagConfigProxyApi = flagConfigProxyApi,
flagConfigStreamApi = flagConfigStreamApi,
flagConfigStorage = flagConfigStorage,
cohortApi = cohortApi,
cohortStorage = cohortStorage,
Expand Down Expand Up @@ -190,6 +197,17 @@ private fun getServerUrl(config: LocalEvaluationConfig): HttpUrl {
}
}

private fun getStreamServerUrl(config: LocalEvaluationConfig): HttpUrl {
return if (config.streamServerUrl == LocalEvaluationConfig.Defaults.STREAM_SERVER_URL) {
when (config.serverZone) {
ServerZone.US -> US_STREAM_SERVER_URL.toHttpUrl()
ServerZone.EU -> EU_STREAM_SERVER_URL.toHttpUrl()
}
} else {
config.streamServerUrl.toHttpUrl()
}
}

private fun getProxyUrl(config: LocalEvaluationConfig): HttpUrl? {
return config.evaluationProxyConfig?.proxyUrl?.toHttpUrl()
}
Expand Down
32 changes: 32 additions & 0 deletions src/main/kotlin/LocalEvaluationConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ class LocalEvaluationConfig internal constructor(
@JvmField
val flagConfigPollerRequestTimeoutMillis: Long = Defaults.FLAG_CONFIG_POLLER_REQUEST_TIMEOUT_MILLIS,
@JvmField
val streamUpdates: Boolean = Defaults.STREAM_UPDATES,
@JvmField
val streamServerUrl: String = Defaults.STREAM_SERVER_URL,
@JvmField
val streamFlagConnTimeoutMillis: Long = Defaults.STREAM_FLAG_CONN_TIMEOUT_MILLIS,
@JvmField
val assignmentConfiguration: AssignmentConfiguration? = Defaults.ASSIGNMENT_CONFIGURATION,
@JvmField
val cohortSyncConfig: CohortSyncConfig? = Defaults.COHORT_SYNC_CONFIGURATION,
Expand Down Expand Up @@ -76,6 +82,12 @@ class LocalEvaluationConfig internal constructor(
*/
const val FLAG_CONFIG_POLLER_REQUEST_TIMEOUT_MILLIS = 10_000L

const val STREAM_UPDATES = false

const val STREAM_SERVER_URL = US_STREAM_SERVER_URL

const val STREAM_FLAG_CONN_TIMEOUT_MILLIS = 1_500L

/**
* null
*/
Expand Down Expand Up @@ -111,6 +123,9 @@ class LocalEvaluationConfig internal constructor(
private var serverUrl = Defaults.SERVER_URL
private var flagConfigPollerIntervalMillis = Defaults.FLAG_CONFIG_POLLER_INTERVAL_MILLIS
private var flagConfigPollerRequestTimeoutMillis = Defaults.FLAG_CONFIG_POLLER_REQUEST_TIMEOUT_MILLIS
private var streamUpdates = Defaults.STREAM_UPDATES
private var streamServerUrl = Defaults.STREAM_SERVER_URL
private var streamFlagConnTimeoutMillis = Defaults.STREAM_FLAG_CONN_TIMEOUT_MILLIS
private var assignmentConfiguration = Defaults.ASSIGNMENT_CONFIGURATION
private var cohortSyncConfiguration = Defaults.COHORT_SYNC_CONFIGURATION
private var evaluationProxyConfiguration = Defaults.EVALUATION_PROXY_CONFIGURATION
Expand All @@ -136,6 +151,18 @@ class LocalEvaluationConfig internal constructor(
this.flagConfigPollerRequestTimeoutMillis = flagConfigPollerRequestTimeoutMillis
}

fun streamUpdates(streamUpdates: Boolean) = apply {
this.streamUpdates = streamUpdates
}

fun streamServerUrl(streamServerUrl: String) = apply {
this.streamServerUrl = streamServerUrl
}

fun streamFlagConnTimeoutMillis(streamFlagConnTimeoutMillis: Long) = apply {
this.streamFlagConnTimeoutMillis = streamFlagConnTimeoutMillis
}

fun enableAssignmentTracking(assignmentConfiguration: AssignmentConfiguration) = apply {
this.assignmentConfiguration = assignmentConfiguration
}
Expand All @@ -161,6 +188,9 @@ class LocalEvaluationConfig internal constructor(
serverZone = serverZone,
flagConfigPollerIntervalMillis = flagConfigPollerIntervalMillis,
flagConfigPollerRequestTimeoutMillis = flagConfigPollerRequestTimeoutMillis,
streamUpdates = streamUpdates,
streamServerUrl = streamServerUrl,
streamFlagConnTimeoutMillis = streamFlagConnTimeoutMillis,
assignmentConfiguration = assignmentConfiguration,
cohortSyncConfig = cohortSyncConfiguration,
evaluationProxyConfig = evaluationProxyConfiguration,
Expand Down Expand Up @@ -207,6 +237,8 @@ interface LocalEvaluationMetrics {
fun onFlagConfigFetch()
fun onFlagConfigFetchFailure(exception: Exception)
fun onFlagConfigFetchOriginFallback(exception: Exception)
fun onFlagConfigStream()
fun onFlagConfigStreamFailure(exception: Exception?)
fun onCohortDownload()
fun onCohortDownloadTooLarge(exception: Exception)
fun onCohortDownloadFailure(exception: Exception)
Expand Down
2 changes: 2 additions & 0 deletions src/main/kotlin/ServerZone.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.amplitude.experiment

internal const val US_SERVER_URL = "https://api.lab.amplitude.com"
internal const val EU_SERVER_URL = "https://api.lab.eu.amplitude.com"
internal const val US_STREAM_SERVER_URL = "https://stream.lab.amplitude.com"
internal const val EU_STREAM_SERVER_URL = "https://stream.lab.eu.amplitude.com"
internal const val US_COHORT_SERVER_URL = "https://cohort-v2.lab.amplitude.com"
internal const val EU_COHORT_SERVER_URL = "https://cohort-v2.lab.eu.amplitude.com"
internal const val US_EVENT_SERVER_URL = "https://api2.amplitude.com/2/httpapi"
Expand Down
107 changes: 31 additions & 76 deletions src/main/kotlin/deployment/DeploymentRunner.kt
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
@file:OptIn(ExperimentalApi::class)

package com.amplitude.experiment.deployment

import com.amplitude.experiment.ExperimentalApi
import com.amplitude.experiment.LocalEvaluationConfig
import com.amplitude.experiment.LocalEvaluationMetrics
import com.amplitude.experiment.cohort.CohortApi
import com.amplitude.experiment.cohort.CohortLoader
import com.amplitude.experiment.cohort.CohortStorage
import com.amplitude.experiment.flag.FlagConfigApi
import com.amplitude.experiment.flag.FlagConfigFallbackRetryWrapper
import com.amplitude.experiment.flag.FlagConfigPoller
import com.amplitude.experiment.flag.FlagConfigStorage
import com.amplitude.experiment.flag.FlagConfigStreamApi
import com.amplitude.experiment.flag.FlagConfigStreamer
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
import com.amplitude.experiment.util.Logger
import com.amplitude.experiment.util.Once
import com.amplitude.experiment.util.daemonFactory
import com.amplitude.experiment.util.getAllCohortIds
import com.amplitude.experiment.util.wrapMetrics
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

private const val MIN_COHORT_POLLING_INTERVAL = 60000L
private const val FLAG_POLLING_JITTER = 1000L

internal class DeploymentRunner(
private val config: LocalEvaluationConfig,
private val flagConfigApi: FlagConfigApi,
private val flagConfigProxyApi: FlagConfigApi? = null,
private val flagConfigStreamApi: FlagConfigStreamApi? = null,
private val flagConfigStorage: FlagConfigStorage,
cohortApi: CohortApi?,
private val cohortStorage: CohortStorage?,
Expand All @@ -39,21 +40,31 @@ internal class DeploymentRunner(
null
}
private val cohortPollingInterval: Long = getCohortPollingInterval()
// Fallback in this order: proxy, stream, poll.
private val amplitudeFlagConfigPoller = FlagConfigFallbackRetryWrapper(
FlagConfigPoller(flagConfigApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics),
null,
config.flagConfigPollerIntervalMillis,
)
private val amplitudeFlagConfigUpdater =
if (flagConfigStreamApi != null)
FlagConfigFallbackRetryWrapper(
FlagConfigStreamer(flagConfigStreamApi, flagConfigStorage, cohortLoader, cohortStorage, metrics),
amplitudeFlagConfigPoller,
FLAG_POLLING_JITTER
)
else amplitudeFlagConfigPoller
private val flagConfigUpdater =
if (flagConfigProxyApi != null)
FlagConfigFallbackRetryWrapper(
FlagConfigPoller(flagConfigProxyApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics),
amplitudeFlagConfigPoller
)
else
amplitudeFlagConfigUpdater

fun start() = lock.once {
refresh()
poller.scheduleWithFixedDelay(
{
try {
refresh()
} catch (t: Throwable) {
Logger.e("Refresh flag configs failed.", t)
}
},
config.flagConfigPollerIntervalMillis,
config.flagConfigPollerIntervalMillis,
TimeUnit.MILLISECONDS
)
flagConfigUpdater.start()
if (cohortLoader != null) {
poller.scheduleWithFixedDelay(
{
Expand All @@ -79,63 +90,7 @@ internal class DeploymentRunner(

fun stop() {
poller.shutdown()
}

fun refresh() {
Logger.d("Refreshing flag configs.")
// Get updated flags from the network.
val flagConfigs = wrapMetrics(
metric = metrics::onFlagConfigFetch,
failure = metrics::onFlagConfigFetchFailure,
) {
flagConfigApi.getFlagConfigs()
}

// Remove flags that no longer exist.
val flagKeys = flagConfigs.map { it.key }.toSet()
flagConfigStorage.removeIf { !flagKeys.contains(it.key) }

// Get all flags from storage
val storageFlags = flagConfigStorage.getFlagConfigs()

// Load cohorts for each flag if applicable and put the flag in storage.
val futures = ConcurrentHashMap<String, CompletableFuture<*>>()
for (flagConfig in flagConfigs) {
if (cohortLoader == null) {
flagConfigStorage.putFlagConfig(flagConfig)
continue
}
val cohortIds = flagConfig.getAllCohortIds()
val storageCohortIds = storageFlags[flagConfig.key]?.getAllCohortIds() ?: emptySet()
val cohortsToLoad = cohortIds - storageCohortIds
if (cohortsToLoad.isEmpty()) {
flagConfigStorage.putFlagConfig(flagConfig)
continue
}
for (cohortId in cohortsToLoad) {
futures.putIfAbsent(
cohortId,
cohortLoader.loadCohort(cohortId).handle { _, exception ->
if (exception != null) {
Logger.e("Failed to load cohort $cohortId", exception)
}
flagConfigStorage.putFlagConfig(flagConfig)
}
)
}
}
futures.values.forEach { it.join() }

// Delete unused cohorts
if (cohortStorage != null) {
val flagCohortIds = flagConfigStorage.getFlagConfigs().values.toList().getAllCohortIds()
val storageCohortIds = cohortStorage.getCohorts().keys
val deletedCohortIds = storageCohortIds - flagCohortIds
for (deletedCohortId in deletedCohortIds) {
cohortStorage.deleteCohort(deletedCohortId)
}
}
Logger.d("Refreshed ${flagConfigs.size} flag configs.")
flagConfigUpdater.shutdown()
}

private fun getCohortPollingInterval(): Long {
Expand Down
Loading

0 comments on commit 27caeb5

Please sign in to comment.