fix: fix flag push fallback #37

Merged · 7 commits · Nov 1, 2024
14 changes: 11 additions & 3 deletions src/main/kotlin/deployment/DeploymentRunner.kt
@@ -20,7 +20,8 @@ import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

private const val MIN_COHORT_POLLING_INTERVAL = 60000L
private const val FLAG_POLLING_JITTER = 1000L
private const val FLAG_STREAMING_RETRY_DELAY = 15000L
private const val FLAG_RETRY_JITTER = 1000L

internal class DeploymentRunner(
private val config: LocalEvaluationConfig,
@@ -51,14 +52,21 @@ internal class DeploymentRunner(
FlagConfigFallbackRetryWrapper(
FlagConfigStreamer(flagConfigStreamApi, flagConfigStorage, cohortLoader, cohortStorage, metrics),
amplitudeFlagConfigPoller,
FLAG_POLLING_JITTER
FLAG_STREAMING_RETRY_DELAY,
FLAG_RETRY_JITTER,
config.flagConfigPollerIntervalMillis,
0,
)
else amplitudeFlagConfigPoller
private val flagConfigUpdater =
if (flagConfigProxyApi != null)
FlagConfigFallbackRetryWrapper(
FlagConfigPoller(flagConfigProxyApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics),
amplitudeFlagConfigPoller
amplitudeFlagConfigPoller,
config.flagConfigPollerIntervalMillis,
0,
if (flagConfigStreamApi != null) FLAG_STREAMING_RETRY_DELAY else config.flagConfigPollerIntervalMillis,
if (flagConfigStreamApi != null) FLAG_RETRY_JITTER else 0,
)
else
amplitudeFlagConfigUpdater
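Taken together, the runner now hands the fallback wrapper two separate (delay, jitter) pairs: one for retrying the main updater and one for retrying the fallback's start. Below is a minimal, self-contained sketch of that shape; `Updater`, `FakeUpdater`, and `FallbackWrapper` are stand-ins rather than the SDK's real classes, and only the parameter layout and the streaming-case values mirror the diff.

```kotlin
// Illustrative stand-ins only: the SDK's real types are FlagConfigUpdater,
// FlagConfigStreamer, FlagConfigPoller, and FlagConfigFallbackRetryWrapper.
interface Updater {
    fun start(onError: (() -> Unit)? = null)
    fun stop()
}

class FakeUpdater(private val name: String) : Updater {
    override fun start(onError: (() -> Unit)?) = println("start $name")
    override fun stop() = println("stop $name")
}

// Same parameter shape as the new wrapper constructor: separate (delay, jitter)
// pairs for retrying the main updater and for retrying the fallback's start.
class FallbackWrapper(
    private val main: Updater,
    private val fallback: Updater?,
    private val retryDelayMillis: Long,
    private val maxJitterMillis: Long,
    private val fallbackStartRetryDelayMillis: Long,
    private val fallbackMaxJitterMillis: Long,
) : Updater {
    override fun start(onError: (() -> Unit)?) {
        println(
            "main retries every ~${retryDelayMillis}ms (±${maxJitterMillis}ms jitter); " +
                "fallback start retries every ~${fallbackStartRetryDelayMillis}ms (±${fallbackMaxJitterMillis}ms jitter)"
        )
        main.start { fallback?.start() } // the real wrapper also schedules retries; see FlagConfigUpdater.kt below
    }

    override fun stop() {
        main.stop()
        fallback?.stop()
    }
}

fun main() {
    val pollerIntervalMillis = 60_000L
    // The streaming case, composed as DeploymentRunner now builds it:
    val updater = FallbackWrapper(
        main = FakeUpdater("streamer"),
        fallback = FakeUpdater("amplitude poller"),
        retryDelayMillis = 15_000L,                           // FLAG_STREAMING_RETRY_DELAY
        maxJitterMillis = 1_000L,                             // FLAG_RETRY_JITTER
        fallbackStartRetryDelayMillis = pollerIntervalMillis, // config.flagConfigPollerIntervalMillis
        fallbackMaxJitterMillis = 0L,
    )
    updater.start()
}
```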
120 changes: 76 additions & 44 deletions src/main/kotlin/flag/FlagConfigUpdater.kt
@@ -122,15 +122,15 @@ internal class FlagConfigPoller(
) {
private val lock: ReentrantLock = ReentrantLock()
private val pool = Executors.newScheduledThreadPool(1, daemonFactory)
private var scheduledFuture: ScheduledFuture<*>? = null // @GuardedBy(lock)
private var scheduledFuture: ScheduledFuture<*>? = null

/**
* Start will fetch once, then start poller to poll flag configs.
*/
override fun start(onError: (() -> Unit)?) {
refresh()
lock.withLock {
stopInternal()
stop()
scheduledFuture = pool.scheduleWithFixedDelay(
{
try {
@@ -148,16 +148,11 @@
}
}

// @GuardedBy(lock)
private fun stopInternal() {
// Pause only stop the task scheduled. It doesn't stop the executor.
scheduledFuture?.cancel(true)
scheduledFuture = null
}

override fun stop() {
lock.withLock {
stopInternal()
// Pause only stop the task scheduled. It doesn't stop the executor.
scheduledFuture?.cancel(true)
scheduledFuture = null
}
}
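The change above also inlines the former `stopInternal()` helper: `start()` now calls `stop()` while holding the lock, which is safe because the lock is a `ReentrantLock`. A small standalone sketch of that start/stop pattern, using a hypothetical `Poller` class rather than the SDK's `FlagConfigPoller`:

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// Standalone sketch of the pattern above: start() cancels any previously scheduled
// task by calling stop() (safe because ReentrantLock is reentrant), then schedules
// a new fixed-delay task; stop() only cancels the task, it does not stop the executor.
class Poller(private val intervalMillis: Long, private val poll: () -> Unit) {
    private val lock = ReentrantLock()
    private val pool = Executors.newScheduledThreadPool(1)
    private var scheduledFuture: ScheduledFuture<*>? = null

    fun start() {
        poll() // fetch once before polling starts
        lock.withLock {
            stop()
            scheduledFuture = pool.scheduleWithFixedDelay({ poll() }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)
        }
    }

    fun stop() {
        lock.withLock {
            scheduledFuture?.cancel(true)
            scheduledFuture = null
        }
    }

    fun shutdown() {
        stop()
        pool.shutdown()
    }
}

fun main() {
    val poller = Poller(1_000L) { println("poll") }
    poller.start()
    Thread.sleep(3_500L) // observe a few polls
    poller.shutdown()
}
```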

@@ -227,24 +222,21 @@ internal class FlagConfigStreamer(
private const val RETRY_DELAY_MILLIS_DEFAULT = 15 * 1000L
private const val MAX_JITTER_MILLIS_DEFAULT = 2000L

/**
* This is a wrapper class around flag config updaters.
* This provides retry capability in case errors encountered during update asynchronously, as well as fallbacks when an updater failed.
*
* `mainUpdater` cannot be a FlagConfigFallbackRetryWrapper.
* The developer should restructure arguments to make sure `mainUpdater` is never a `FlagConfigFallbackRetryWrapper`.
* All retry and fallback structures can be normalized into `mainUpdater`s not being `FlagConfigFallbackRetryWrapper`s.
*/
internal class FlagConfigFallbackRetryWrapper(
private val mainUpdater: FlagConfigUpdater,
private val fallbackUpdater: FlagConfigUpdater?,
retryDelayMillis: Long = RETRY_DELAY_MILLIS_DEFAULT,
maxJitterMillis: Long = MAX_JITTER_MILLIS_DEFAULT,
fallbackStartRetryDelayMillis: Long = RETRY_DELAY_MILLIS_DEFAULT,
fallbackMaxJitterMillis: Long = MAX_JITTER_MILLIS_DEFAULT,
) : FlagConfigUpdater {
private val lock: ReentrantLock = ReentrantLock()
private val reconnIntervalRange = max(0, retryDelayMillis - maxJitterMillis)..(min(retryDelayMillis, Long.MAX_VALUE - maxJitterMillis) + maxJitterMillis)
private val executor = Executors.newScheduledThreadPool(1, daemonFactory)
private var retryTask: ScheduledFuture<*>? = null // @GuardedBy(lock)
private val fallbackReconnIntervalRange = max(0, fallbackStartRetryDelayMillis - fallbackMaxJitterMillis)..(min(fallbackStartRetryDelayMillis, Long.MAX_VALUE - fallbackMaxJitterMillis) + fallbackMaxJitterMillis)
private val executor = Executors.newScheduledThreadPool(2, daemonFactory)
private var retryTask: ScheduledFuture<*>? = null
private var fallbackRetryTask: ScheduledFuture<*>? = null
private var isRunning = false

/**
* Since the wrapper retries the mainUpdater, there is never a terminal error case, so onError is never called.
@@ -254,8 +246,9 @@
* If the main start fails, the fallback updater tries to start.
* If the fallback start fails as well, an exception is thrown.
* If the fallback start succeeds, the overall start succeeds and main enters the retry loop.
* After started, if main failed, fallback is started and main enters retry loop.
* Fallback success or failures status is not monitored. It's suggested to wrap fallback into a retry wrapper.
* After starting, if main fails, main enters the retry loop and the fallback is started.
* If the fallback fails to start, it enters a start-retry loop until it starts successfully.
* If the fallback starts successfully but fails later, that failure is not monitored; it's recommended to wrap the fallback in a FlagConfigFallbackRetryWrapper.
*/
override fun start(onError: (() -> Unit)?) {
if (mainUpdater is FlagConfigFallbackRetryWrapper) {
@@ -268,58 +261,97 @@
try {
mainUpdater.start {
lock.withLock {
scheduleRetry() // Don't care if poller start error or not, always retry.
try {
fallbackUpdater?.start()
} catch (_: Throwable) {
if (isRunning) {
scheduleRetry() // Don't care if poller start error or not, always retry.
fallbackStart()
}
}
}
fallbackUpdater?.stop()
fallbackStop()
} catch (t: Throwable) {
Logger.e("Primary flag configs start failed, start fallback. Error: ", t)
if (fallbackUpdater == null) {
// No fallback, main start failed is wrapper start fail
Logger.e("Main flag configs start failed, no fallback. Error: ", t)
throw t
}
Logger.w("Main flag configs start failed, starting fallback. Error: ", t)
fallbackUpdater.start()
scheduleRetry()
}
isRunning = true
}
}

override fun stop() {
lock.withLock {
mainUpdater.stop()
fallbackUpdater?.stop()
isRunning = false
retryTask?.cancel(true)
fallbackStop()
mainUpdater.stop()
}
}

override fun shutdown() {
lock.withLock {
isRunning = false
retryTask?.cancel(true)
fallbackStop()
mainUpdater.shutdown()
fallbackUpdater?.shutdown()
retryTask?.cancel(true)
}
}

// @GuardedBy(lock)
private fun scheduleRetry() {
retryTask = executor.schedule({
try {
mainUpdater.start {
scheduleRetry() // Don't care if poller start error or not, always retry stream.
try {
fallbackUpdater?.start()
} catch (_: Throwable) {
lock.withLock {
retryTask = executor.schedule(
{
lock.withLock {
if (!isRunning) {
return@schedule
}
try {
mainUpdater.start {
lock.withLock {
if (isRunning) {
scheduleRetry() // Don't care if poller start error or not, always retry.
fallbackStart()
}
}
}
fallbackStop()
} catch (_: Throwable) {
scheduleRetry()
}
}
}
fallbackUpdater?.stop()
},
reconnIntervalRange.random(),
TimeUnit.MILLISECONDS
)
}
}

private fun fallbackStart() {
lock.withLock {
try {
fallbackUpdater?.start()
} catch (_: Throwable) {
scheduleRetry()
if (isRunning) {
fallbackRetryTask = executor.schedule(
{
fallbackStart()
},
fallbackReconnIntervalRange.random(),
TimeUnit.MILLISECONDS
)
} else {}
}
}, reconnIntervalRange.random(), TimeUnit.MILLISECONDS)
}
}


private fun fallbackStop() {
lock.withLock {
fallbackUpdater?.stop()
fallbackRetryTask?.cancel(true)
}
}
}
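Throughout the wrapper, the retry delay is drawn from ranges like `reconnIntervalRange` and `fallbackReconnIntervalRange`: the configured delay plus or minus the jitter, clamped at zero and guarded against `Long` overflow, with `random()` picking the actual schedule delay. A small standalone sketch of that computation; the default constants come from the diff, while the poller-interval value is only an example:

```kotlin
import kotlin.math.max
import kotlin.math.min

// Builds the same kind of interval the wrapper uses for scheduling retries:
// [delay - jitter, delay + jitter], clamped at 0 and guarded against Long overflow.
fun retryIntervalRange(delayMillis: Long, jitterMillis: Long): LongRange =
    max(0, delayMillis - jitterMillis)..(min(delayMillis, Long.MAX_VALUE - jitterMillis) + jitterMillis)

fun main() {
    val mainRange = retryIntervalRange(15_000L, 2_000L) // RETRY_DELAY_MILLIS_DEFAULT, MAX_JITTER_MILLIS_DEFAULT
    val fallbackRange = retryIntervalRange(60_000L, 0L) // e.g. a poller interval with no jitter

    println("main retry delay drawn from $mainRange, e.g. ${mainRange.random()} ms")
    println("fallback start retry delay drawn from $fallbackRange, e.g. ${fallbackRange.random()} ms")
}
```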