Skip to content

Commit

Permalink
feat: add proxy apis
Browse files Browse the repository at this point in the history
  • Loading branch information
bgiori committed Aug 2, 2024
1 parent 13d4c49 commit 4ab3c0b
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 29 deletions.
38 changes: 21 additions & 17 deletions src/main/kotlin/LocalEvaluationClient.kt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
@file:OptIn(ExperimentalApi::class)

package com.amplitude.experiment

import com.amplitude.Amplitude
Expand All @@ -7,18 +9,17 @@ import com.amplitude.experiment.assignment.Assignment
import com.amplitude.experiment.assignment.AssignmentService
import com.amplitude.experiment.assignment.InMemoryAssignmentFilter
import com.amplitude.experiment.cohort.CohortApi
import com.amplitude.experiment.cohort.DirectCohortApi
import com.amplitude.experiment.cohort.DynamicCohortApi
import com.amplitude.experiment.cohort.InMemoryCohortStorage
import com.amplitude.experiment.deployment.DeploymentRunner
import com.amplitude.experiment.evaluation.EvaluationEngine
import com.amplitude.experiment.evaluation.EvaluationEngineImpl
import com.amplitude.experiment.evaluation.EvaluationFlag
import com.amplitude.experiment.evaluation.topologicalSort
import com.amplitude.experiment.flag.DirectFlagConfigApi
import com.amplitude.experiment.flag.DynamicFlagConfigApi
import com.amplitude.experiment.flag.InMemoryFlagConfigStorage
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
import com.amplitude.experiment.util.Logger
import com.amplitude.experiment.util.Once
import com.amplitude.experiment.util.USER_GROUP_TYPE
import com.amplitude.experiment.util.filterDefaultVariants
import com.amplitude.experiment.util.getGroupedCohortIds
Expand All @@ -35,12 +36,12 @@ class LocalEvaluationClient internal constructor(
private val httpClient: OkHttpClient = OkHttpClient(),
cohortApi: CohortApi? = getCohortDownloadApi(config, httpClient)
) {
private val startLock = Once()

private val assignmentService: AssignmentService? = createAssignmentService(apiKey)
private val serverUrl: HttpUrl = getServerUrl(config)
private val evaluation: EvaluationEngine = EvaluationEngineImpl()
private val metrics: LocalEvaluationMetrics = LocalEvaluationMetricsWrapper(config.metrics)
private val flagConfigApi = DirectFlagConfigApi(apiKey, serverUrl, httpClient)
private val flagConfigApi = DynamicFlagConfigApi(apiKey, serverUrl, getProxyUrl(config), httpClient)
private val flagConfigStorage = InMemoryFlagConfigStorage()
private val cohortStorage = if (config.cohortSyncConfiguration != null) {
InMemoryCohortStorage()
Expand Down Expand Up @@ -138,17 +139,16 @@ class LocalEvaluationClient internal constructor(
}
}.build()
}


}

private fun getCohortDownloadApi(config: LocalEvaluationConfig, httpClient: OkHttpClient): CohortApi? {
return if (config.cohortSyncConfiguration != null) {
DirectCohortApi(
DynamicCohortApi(
apiKey = config.cohortSyncConfiguration.apiKey,
secretKey = config.cohortSyncConfiguration.secretKey,
maxCohortSize = config.cohortSyncConfiguration.maxCohortSize,
serverUrl = getCohortServerUrl(config),
proxyUrl = getProxyUrl(config),
httpClient = httpClient,
)
} else {
Expand All @@ -157,21 +157,25 @@ private fun getCohortDownloadApi(config: LocalEvaluationConfig, httpClient: OkHt
}

private fun getServerUrl(config: LocalEvaluationConfig): HttpUrl {
if (config.serverZone == LocalEvaluationConfig.Defaults.SERVER_ZONE) {
return config.serverUrl.toHttpUrl()
return if (config.serverZone == LocalEvaluationConfig.Defaults.SERVER_ZONE) {
config.serverUrl.toHttpUrl()
} else {
return when (config.serverZone) {
when (config.serverZone) {
ServerZone.US -> US_SERVER_URL.toHttpUrl()
ServerZone.EU -> EU_SERVER_URL.toHttpUrl()
}
}
}

private fun getProxyUrl(config: LocalEvaluationConfig): HttpUrl? {
return config.evaluationProxyConfiguration?.proxyUrl?.toHttpUrl()
}

private fun getCohortServerUrl(config: LocalEvaluationConfig): HttpUrl {
if (config.serverZone == LocalEvaluationConfig.Defaults.SERVER_ZONE) {
return config.cohortServerUrl.toHttpUrl()
return if (config.serverZone == LocalEvaluationConfig.Defaults.SERVER_ZONE) {
config.cohortServerUrl.toHttpUrl()
} else {
return when (config.serverZone) {
when (config.serverZone) {
ServerZone.US -> US_SERVER_URL.toHttpUrl()
ServerZone.EU -> EU_SERVER_URL.toHttpUrl()
}
Expand All @@ -182,10 +186,10 @@ private fun getEventServerUrl(
config: LocalEvaluationConfig,
assignmentConfiguration: AssignmentConfiguration
): String {
if (config.serverZone == LocalEvaluationConfig.Defaults.SERVER_ZONE) {
return assignmentConfiguration.serverUrl
return if (config.serverZone == LocalEvaluationConfig.Defaults.SERVER_ZONE) {
assignmentConfiguration.serverUrl
} else {
return when (config.serverZone) {
when (config.serverZone) {
ServerZone.US -> US_EVENT_SERVER_URL
ServerZone.EU -> EU_EVENT_SERVER_URL
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/kotlin/LocalEvaluationConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,10 @@ interface LocalEvaluationMetrics {
fun onAssignmentEventFailure(exception: Exception)
fun onFlagConfigFetch()
fun onFlagConfigFetchFailure(exception: Exception)
fun onFlagConfigFetchOriginFallback(exception: Exception)
fun onCohortDownload()
fun onCohortDownloadFailure(exception: Exception)
fun onCohortDownloadOriginFallback(exception: Exception)
fun onProxyCohortMembership()
fun onProxyCohortMembershipFailure(exception: Exception)
}
20 changes: 19 additions & 1 deletion src/main/kotlin/cohort/CohortApi.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.amplitude.experiment.cohort

import com.amplitude.experiment.LIBRARY_VERSION
import com.amplitude.experiment.LocalEvaluationMetrics
import com.amplitude.experiment.util.BackoffConfig
import com.amplitude.experiment.util.HttpErrorResponseException
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
import com.amplitude.experiment.util.Logger
import com.amplitude.experiment.util.backoff
import com.amplitude.experiment.util.get
Expand Down Expand Up @@ -41,12 +43,14 @@ internal interface CohortApi {
fun getCohort(cohortId: String, cohort: Cohort?): Cohort
}

internal class DirectCohortApi(
internal class DynamicCohortApi(
apiKey: String,
secretKey: String,
private val maxCohortSize: Int,
private val serverUrl: HttpUrl,
private val proxyUrl: HttpUrl?,
private val httpClient: OkHttpClient,
private val metrics: LocalEvaluationMetrics = LocalEvaluationMetricsWrapper()
) : CohortApi {

private val token = Base64.getEncoder().encodeToString("$apiKey:$secretKey".toByteArray())
Expand All @@ -58,6 +62,19 @@ internal class DirectCohortApi(
)

override fun getCohort(cohortId: String, cohort: Cohort?): Cohort {
return if (proxyUrl != null) {
try {
getCohort(proxyUrl, cohortId, cohort)
} catch (e: Exception) {
metrics.onCohortDownloadOriginFallback(e)
getCohort(serverUrl, cohortId, cohort)
}
} else {
getCohort(serverUrl, cohortId, cohort)
}
}

private fun getCohort(url: HttpUrl, cohortId: String, cohort: Cohort?): Cohort {
Logger.d("getCohortMembers($cohortId): start")
val future = backoff(backoffConfig, {
val headers = mapOf(
Expand Down Expand Up @@ -98,4 +115,5 @@ internal class DirectCohortApi(
throw e.cause ?: e
}
}

}
52 changes: 43 additions & 9 deletions src/main/kotlin/flag/FlagConfigApi.kt
Original file line number Diff line number Diff line change
@@ -1,28 +1,62 @@
package com.amplitude.experiment.flag

import com.amplitude.experiment.LocalEvaluationMetrics
import com.amplitude.experiment.evaluation.EvaluationFlag
import com.amplitude.experiment.util.BackoffConfig
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
import com.amplitude.experiment.util.backoff
import com.amplitude.experiment.util.get
import okhttp3.HttpUrl
import okhttp3.OkHttpClient
import java.util.concurrent.ExecutionException

internal interface FlagConfigApi {
fun getFlagConfigs(): List<EvaluationFlag>
}

internal class DirectFlagConfigApi(
internal class DynamicFlagConfigApi(
private val deploymentKey: String,
private val serverUrl: HttpUrl,
private val proxyUrl: HttpUrl?,
private val httpClient: OkHttpClient,
private val metrics: LocalEvaluationMetrics = LocalEvaluationMetricsWrapper()
) : FlagConfigApi {

private val backoffConfig = BackoffConfig(
attempts = 3,
min = 500,
max = 2000,
scalar = 2.0,
)

override fun getFlagConfigs(): List<EvaluationFlag> {
return httpClient.get<List<EvaluationFlag>>(
serverUrl = serverUrl,
path = "sdk/v2/flags",
headers = mapOf(
"Authorization" to "Api-Key $deploymentKey",
),
queries = mapOf("v" to "0")
).get()
return if (proxyUrl != null) {
try {
getFlagConfigs(proxyUrl)
} catch (e: Exception) {
metrics.onFlagConfigFetchOriginFallback(e)
getFlagConfigs(serverUrl)
}
} else {
getFlagConfigs(serverUrl)
}
}

private fun getFlagConfigs(url: HttpUrl): List<EvaluationFlag> {
val future = backoff(backoffConfig) {
httpClient.get<List<EvaluationFlag>>(
serverUrl = url,
path = "sdk/v2/flags",
headers = mapOf(
"Authorization" to "Api-Key $deploymentKey",
),
queries = mapOf("v" to "0")
)
}
try {
return future.get()
} catch(e: ExecutionException) {
throw e.cause ?: e
}
}
}
10 changes: 10 additions & 0 deletions src/main/kotlin/util/Metrics.kt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ internal class LocalEvaluationMetricsWrapper(
executor?.execute { metrics.onFlagConfigFetchFailure(exception) }
}

override fun onFlagConfigFetchOriginFallback(exception: Exception) {
val metrics = metrics ?: return
executor?.execute { metrics.onFlagConfigFetchOriginFallback(exception) }
}

override fun onCohortDownload() {
val metrics = metrics ?: return
executor?.execute { metrics.onCohortDownload() }
Expand All @@ -76,6 +81,11 @@ internal class LocalEvaluationMetricsWrapper(
executor?.execute { metrics.onCohortDownloadFailure(exception) }
}

override fun onCohortDownloadOriginFallback(exception: Exception) {
val metrics = metrics ?: return
executor?.execute { metrics.onCohortDownloadOriginFallback(exception) }
}

override fun onProxyCohortMembership() {
val metrics = metrics ?: return
executor?.execute { metrics.onProxyCohortMembership() }
Expand Down
6 changes: 5 additions & 1 deletion src/main/kotlin/util/Request.kt
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,9 @@ internal inline fun <reified T> OkHttpClient.get(
headers: Map<String, String>? = null,
queries: Map<String, String>? = null,
): CompletableFuture<T> {
return this.get<T>(serverUrl, path, headers, queries) {}
return this.get<T>(serverUrl, path, headers, queries) { response ->
if (!response.isSuccessful) {
throw HttpErrorResponseException(response.code)
}
}
}
2 changes: 2 additions & 0 deletions src/test/kotlin/LocalEvaluationClientTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.amplitude.experiment

import com.amplitude.experiment.cohort.Cohort
import com.amplitude.experiment.cohort.CohortApi
import com.amplitude.experiment.util.Logger
import com.amplitude.experiment.util.SystemLogger
import io.mockk.every
import io.mockk.mockk
import org.junit.Assert
Expand Down
3 changes: 2 additions & 1 deletion src/test/kotlin/cohort/CohortDownloadApiTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class CohortDownloadApiTest {
private val httpClient = OkHttpClient()
private val server = MockWebServer()
private val url = server.url("/")
private val api = DirectCohortApi(apiKey,secretKey, maxCohortSize, url, httpClient)
private val proxyUrl = server.url("/")
private val api = DynamicCohortApi(apiKey,secretKey, maxCohortSize, url, proxyUrl, httpClient)
@Test
fun `cohort download, success`() {
val response = cohortResponse("a", setOf("1"))
Expand Down

0 comments on commit 4ab3c0b

Please sign in to comment.