From e5013454179f5fd0bdbb7f55a1efc16babb00bf0 Mon Sep 17 00:00:00 2001 From: sricharanvuppu <113983630+sricharanvuppu@users.noreply.github.com> Date: Wed, 28 Jun 2023 10:32:00 +0530 Subject: [PATCH 1/3] Handling OpenSearchRejectExecuteException Exception (#1004) * Handling OpenSearchRejectExecuteException Exception * introduced writersPerShard setting. Signed-off-by: sricharanvuppu (cherry picked from commit 448e7a7501e5ff8740dc2d7635c08ae62d19147e) Signed-off-by: sricharanvuppu --- .../replication/ReplicationException.kt | 14 +++- .../replication/ReplicationPlugin.kt | 16 ++-- .../replication/ReplicationSettings.kt | 2 + .../task/shard/ShardReplicationTask.kt | 3 +- .../task/shard/TranslogSequencer.kt | 76 ++++++++++++++----- .../opensearch/replication/util/Extensions.kt | 25 ++++-- .../task/shard/TranslogSequencerTests.kt | 2 +- 7 files changed, 99 insertions(+), 39 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationException.kt b/src/main/kotlin/org/opensearch/replication/ReplicationException.kt index 89d2456c..891be0a3 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationException.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationException.kt @@ -12,22 +12,28 @@ package org.opensearch.replication import org.opensearch.OpenSearchException +import org.opensearch.OpenSearchStatusException import org.opensearch.action.ShardOperationFailedException import org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE import org.opensearch.index.shard.ShardId +import org.opensearch.rest.RestStatus /** * Base class replication exceptions. Note: Replication process may throw exceptions that do not derive from this such as * [org.opensearch.ResourceAlreadyExistsException], [org.opensearch.index.IndexNotFoundException] or * [org.opensearch.index.shard.ShardNotFoundException]. */ -class ReplicationException: OpenSearchException { +class ReplicationException: OpenSearchStatusException { - constructor(message: String, vararg args: Any) : super(message, *args) + constructor(message: String, status : RestStatus, cause: Throwable, vararg args: Any) : super(message, status, cause, *args) - constructor(message: String, cause: Throwable, vararg args: Any) : super(message, cause, *args) + constructor(message: String, vararg args: Any) : super(message, RestStatus.INTERNAL_SERVER_ERROR, *args) - constructor(message: String, shardFailures: Array) : super(message) { + constructor(message: String, status: RestStatus, vararg args: Any) : super(message, status, *args) + + constructor(cause: Throwable, status: RestStatus, vararg args: Any) : super(cause.message, status, *args) + + constructor(message: String, shardFailures: Array): super(message, shardFailures.firstOrNull()?.status()?:RestStatus.INTERNAL_SERVER_ERROR) { shardFailures.firstOrNull()?.let { setShard(ShardId(it.index(), INDEX_UUID_NA_VALUE, it.shardId())) // Add first failure as cause and rest as suppressed... diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt index 952c462a..a61bb2bc 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt @@ -171,6 +171,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, Setting.Property.Dynamic, Setting.Property.NodeScope) val REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_readers_per_shard", 2, 1, Setting.Property.Dynamic, Setting.Property.NodeScope) + val REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_writers_per_shard", 2, 1, + Setting.Property.Dynamic, Setting.Property.NodeScope) val REPLICATION_PARALLEL_READ_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.follower.poll_interval", TimeValue.timeValueMillis(50), TimeValue.timeValueMillis(1), TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope) val REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.autofollow.fetch_poll_interval", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30), @@ -342,14 +344,14 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, override fun getSettings(): List> { return listOf(REPLICATED_INDEX_SETTING, REPLICATION_FOLLOWER_OPS_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE, - REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD, - REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS, - REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL, - REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL, - REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING, - REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE) + REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD, + REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS, + REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL, + REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL, + REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING, + REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE, + REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) } - override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry, clusterService: ClusterService, recoverySettings: RecoverySettings): Map { val repoFactory = Repository.Factory { repoMetadata: RepositoryMetadata -> diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt b/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt index a6d1bbd3..2b516f8e 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt @@ -24,6 +24,7 @@ open class ReplicationSettings(clusterService: ClusterService) { @Volatile var chunkSize = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE.get(clusterService.settings) @Volatile var concurrentFileChunks = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS.get(clusterService.settings) @Volatile var readersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD) + @Volatile var writersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) @Volatile var batchSize = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE) @Volatile var pollDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL) @Volatile var autofollowFetchPollDuration = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL) @@ -41,6 +42,7 @@ open class ReplicationSettings(clusterService: ClusterService) { clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE) { value: ByteSizeValue -> this.chunkSize = value} clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS) { value: Int -> this.concurrentFileChunks = value} clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD) { value: Int -> this.readersPerShard = value} + clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) { value: Int -> this.writersPerShard = value} clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE) { batchSize = it } clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL) { pollDuration = it } clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION) { leaseRenewalMaxFailureDuration = it } diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt index 24128f8f..77792279 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt @@ -214,7 +214,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: // Since this setting is not dynamic, setting update would only reflect after pause-resume or on a new replication job. val rateLimiter = Semaphore(replicationSettings.readersPerShard) val sequencer = TranslogSequencer(scope, replicationMetadata, followerShardId, leaderAlias, leaderShardId.indexName, - TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats) + TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats, replicationSettings.writersPerShard) val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings) coroutineScope { @@ -250,7 +250,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: followerClusterStats.stats[followerShardId]!!.opsReadFailures.addAndGet(1) logInfo("Unable to get changes from seqNo: $fromSeqNo. ${e.stackTraceToString()}") changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1) - // Propagate 4xx exceptions up the chain and halt replication as they are irrecoverable val range4xx = 400.rangeTo(499) if (e is OpenSearchException && diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index be5fe89c..92e69bd3 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -21,17 +21,22 @@ import org.opensearch.replication.util.suspendExecuteWithRetries import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ObsoleteCoroutinesApi -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.actor import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore import org.opensearch.client.Client +import org.opensearch.OpenSearchException +import org.opensearch.action.support.TransportActions import org.opensearch.common.logging.Loggers +import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog import org.opensearch.tasks.TaskId import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit +import org.opensearch.rest.RestStatus + /** * A TranslogSequencer allows multiple producers of [Translog.Operation]s to write them in sequence number order to an @@ -49,17 +54,19 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: private val followerShardId: ShardId, private val leaderAlias: String, private val leaderIndexName: String, private val parentTaskId: TaskId, private val client: Client, initialSeqNo: Long, - private val followerClusterStats: FollowerClusterStats) { + private val followerClusterStats: FollowerClusterStats, writersPerShard : Int) { private val unAppliedChanges = ConcurrentHashMap() private val log = Loggers.getLogger(javaClass, followerShardId)!! private val completed = CompletableDeferred() - private val sequencer = scope.actor(capacity = Channel.UNLIMITED) { + private val sequencer = scope.actor(capacity = 0) { // Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will // raise the same exception. See [SendChannel.close] method for details. + val rateLimiter = Semaphore(writersPerShard) var highWatermark = initialSeqNo for (m in channel) { + rateLimiter.acquire() while (unAppliedChanges.containsKey(highWatermark + 1)) { val next = unAppliedChanges.remove(highWatermark + 1)!! val replayRequest = ReplayChangesRequest(followerShardId, next.changes, next.maxSeqNoOfUpdatesOrDeletes, @@ -69,25 +76,55 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: var relativeStartNanos = System.nanoTime() val retryOnExceptions = ArrayList>() retryOnExceptions.add(MappingNotAvailableException::class.java) + var tryReplay = true + try { + while (tryReplay) { + tryReplay = false + try { + val replayResponse = client.suspendExecuteWithRetries( + replicationMetadata, + ReplayChangesAction.INSTANCE, + replayRequest, + log = log, + retryOn = retryOnExceptions + ) + if (replayResponse.shardInfo.failed > 0) { + replayResponse.shardInfo.failures.forEachIndexed { i, failure -> + log.error("Failed replaying changes. Failure:$i:$failure}") + } + followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet( + replayResponse.shardInfo.failed.toLong() + ) + throw ReplicationException( + "failed to replay changes", + replayResponse.shardInfo.failures + ) + } - val replayResponse = client.suspendExecuteWithRetries( - replicationMetadata, - ReplayChangesAction.INSTANCE, - replayRequest, - log = log, - retryOn = retryOnExceptions - ) - if (replayResponse.shardInfo.failed > 0) { - replayResponse.shardInfo.failures.forEachIndexed { i, failure -> - log.error("Failed replaying changes. Failure:$i:$failure}") + val tookInNanos = System.nanoTime() - relativeStartNanos + followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet( + TimeUnit.NANOSECONDS.toMillis(tookInNanos) + ) + followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet( + replayRequest.changes.size.toLong() + ) + } catch (e: OpenSearchException) { + if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass) + || TransportActions.isShardNotAvailableException(e) + // This waits for the dependencies to load and retry. Helps during boot-up + || e.status().status >= 500 + || e.status() == RestStatus.TOO_MANY_REQUESTS)) { + tryReplay = true + } + else { + log.error("Got non-retriable Exception:${e.message} with status:${e.status()}") + throw e + } + } } - followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet(replayResponse.shardInfo.failed.toLong()) - throw ReplicationException("failed to replay changes", replayResponse.shardInfo.failures) + } finally { + rateLimiter.release() } - - val tookInNanos = System.nanoTime() - relativeStartNanos - followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos)) - followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong()) } highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark } @@ -100,6 +137,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: completed.await() } + suspend fun send(changes : GetChangesResponse) { unAppliedChanges[changes.fromSeqNo] = changes sequencer.send(Unit) diff --git a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt index 96749f7b..643cc010 100644 --- a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt +++ b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt @@ -29,6 +29,7 @@ import org.opensearch.action.index.IndexResponse import org.opensearch.action.support.TransportActions import org.opensearch.client.Client import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.index.store.Store @@ -43,6 +44,7 @@ import org.opensearch.transport.NodeDisconnectedException import org.opensearch.transport.NodeNotConnectedException import java.io.PrintWriter import java.io.StringWriter +import java.lang.Exception /* * Extension function to use the store object @@ -110,7 +112,8 @@ suspend fun Client.suspendExecuteWith defaultContext: Boolean = false): Resp { var currentBackoff = backoff retryOn.addAll(defaultRetryableExceptions()) - repeat(numberOfRetries - 1) { + var retryException: Exception + repeat(numberOfRetries - 1) { index -> try { return suspendExecute(replicationMetadata, action, req, injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) @@ -122,19 +125,29 @@ suspend fun Client.suspendExecuteWith // This waits for the dependencies to load and retry. Helps during boot-up || e.status().status >= 500 || e.status() == RestStatus.TOO_MANY_REQUESTS)) { - log.warn("Encountered a failure while executing in $req. Retrying in ${currentBackoff/1000} seconds" + - ".", e) - delay(currentBackoff) - currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOut) + retryException = e; } else { throw e } + } catch (e: OpenSearchRejectedExecutionException) { + if(index < numberOfRetries-2) { + retryException = e; + } + else { + throw ReplicationException(e, RestStatus.TOO_MANY_REQUESTS) + } } + log.warn( + "Encountered a failure while executing in $req. Retrying in ${currentBackoff / 1000} seconds" + + ".", retryException + ) + delay(currentBackoff) + currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOut) + } return suspendExecute(replicationMetadata, action, req, injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) // last attempt } - /** * Restore shard from leader cluster with retries. * Only specified error are retried diff --git a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt index ed5afb06..b1ffc495 100644 --- a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt @@ -84,7 +84,7 @@ class TranslogSequencerTests : OpenSearchTestCase() { stats.stats[followerShardId] = FollowerShardMetric() val startSeqNo = randomNonNegativeLong() val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID, - client, startSeqNo, stats) + client, startSeqNo, stats, 2) // Send requests out of order (shuffled seqNo) and await for them to be processed. var batchSeqNo = startSeqNo From 9ca0978bd8fa38797eff855ab33dd6d8d0340ac4 Mon Sep 17 00:00:00 2001 From: sricharanvuppu Date: Wed, 28 Jun 2023 14:49:00 +0530 Subject: [PATCH 2/3] correcting OpenSearchRejectedExecutionException import Signed-off-by: sricharanvuppu --- src/main/kotlin/org/opensearch/replication/util/Extensions.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt index 643cc010..56f35fb9 100644 --- a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt +++ b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt @@ -28,8 +28,8 @@ import org.opensearch.action.index.IndexRequestBuilder import org.opensearch.action.index.IndexResponse import org.opensearch.action.support.TransportActions import org.opensearch.client.Client +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException import org.opensearch.common.util.concurrent.ThreadContext -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.index.store.Store From 411c0e0f46200175eeebd9f7a905828a9bce9df6 Mon Sep 17 00:00:00 2001 From: sricharanvuppu Date: Tue, 26 Sep 2023 18:14:00 +0530 Subject: [PATCH 3/3] adding missing checkpoint and correcting follower stats test case Signed-off-by: sricharanvuppu --- .../task/shard/TranslogSequencer.kt | 18 ++++++---- .../integ/rest/StartReplicationIT.kt | 35 +++++++++---------- .../task/shard/TranslogSequencerTests.kt | 16 +++++++-- 3 files changed, 42 insertions(+), 27 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index 92e69bd3..97127291 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -31,6 +31,7 @@ import org.opensearch.common.logging.Loggers import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog +import org.opensearch.replication.util.indicesService import org.opensearch.tasks.TaskId import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap @@ -60,18 +61,22 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: private val log = Loggers.getLogger(javaClass, followerShardId)!! private val completed = CompletableDeferred() + val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) + val indexShard = followerIndexService.getShard(followerShardId.id) + private val sequencer = scope.actor(capacity = 0) { + // Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will // raise the same exception. See [SendChannel.close] method for details. val rateLimiter = Semaphore(writersPerShard) var highWatermark = initialSeqNo for (m in channel) { - rateLimiter.acquire() while (unAppliedChanges.containsKey(highWatermark + 1)) { val next = unAppliedChanges.remove(highWatermark + 1)!! val replayRequest = ReplayChangesRequest(followerShardId, next.changes, next.maxSeqNoOfUpdatesOrDeletes, - leaderAlias, leaderIndexName) + leaderAlias, leaderIndexName) replayRequest.parentTask = parentTaskId + rateLimiter.acquire() launch { var relativeStartNanos = System.nanoTime() val retryOnExceptions = ArrayList>() @@ -108,15 +113,16 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet( replayRequest.changes.size.toLong() ) + followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint } catch (e: OpenSearchException) { if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass) || TransportActions.isShardNotAvailableException(e) // This waits for the dependencies to load and retry. Helps during boot-up || e.status().status >= 500 || e.status() == RestStatus.TOO_MANY_REQUESTS)) { - tryReplay = true - } - else { + tryReplay = true + } + else { log.error("Got non-retriable Exception:${e.message} with status:${e.status()}") throw e } @@ -142,4 +148,4 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: unAppliedChanges[changes.fromSeqNo] = changes sequencer.send(Unit) } -} +} \ No newline at end of file diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index 01f280e0..bb0048f4 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -865,36 +865,31 @@ class StartReplicationIT: MultiClusterRestTestCase() { }, 60L, TimeUnit.SECONDS) } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/cross-cluster-replication/issues/176") fun `test follower stats`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - val leaderIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader" val followerIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower" - val leaderIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader" val followerIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower" -// val followerIndex2 = "follower_index_2" -// val followerIndex3 = "follower_index_3" createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create( - CreateIndexRequest(leaderIndexName), - RequestOptions.DEFAULT + CreateIndexRequest(leaderIndexName), + RequestOptions.DEFAULT ) assertThat(createIndexResponse.isAcknowledged).isTrue() followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName, followerIndexName), - TimeValue.timeValueSeconds(10), - true + StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + true ) followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName2, followerIndexName2), - TimeValue.timeValueSeconds(10), - true + StartReplicationRequest("source", leaderIndexName, followerIndexName2), + TimeValue.timeValueSeconds(10), + true ) followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName3, followerIndexName3), - TimeValue.timeValueSeconds(10), - true + StartReplicationRequest("source", leaderIndexName, followerIndexName3), + TimeValue.timeValueSeconds(10), + true ) val docCount = 50 for (i in 1..docCount) { @@ -902,12 +897,16 @@ class StartReplicationIT: MultiClusterRestTestCase() { leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) } followerClient.pauseReplication(followerIndexName2) - val stats = followerClient.followerStats() + followerClient.stopReplication(followerIndexName3) + var stats = followerClient.followerStats() assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1") assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1") assertThat(stats.getValue("num_failed_indices").toString()).isEqualTo("0") assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1") - assertThat(stats.getValue("operations_written").toString()).isEqualTo("50") + assertBusy({ + stats = followerClient.followerStats() + assertThat(stats.getValue("operations_written").toString()).isEqualTo("50") + }, 60, TimeUnit.SECONDS) assertThat(stats.getValue("operations_read").toString()).isEqualTo("50") assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0") assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0") diff --git a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt index b1ffc495..623a6396 100644 --- a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt @@ -23,14 +23,19 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ObsoleteCoroutinesApi import kotlinx.coroutines.test.runBlockingTest import org.assertj.core.api.Assertions.assertThat +import org.mockito.Mockito import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest import org.opensearch.action.ActionResponse import org.opensearch.action.ActionType import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo import org.opensearch.common.settings.Settings +import org.opensearch.index.IndexService +import org.opensearch.index.shard.IndexShard import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog +import org.opensearch.indices.IndicesService +import org.opensearch.replication.util.indicesService import org.opensearch.tasks.TaskId.EMPTY_TASK_ID import org.opensearch.test.OpenSearchTestCase import org.opensearch.test.OpenSearchTestCase.randomList @@ -67,7 +72,7 @@ class TranslogSequencerTests : OpenSearchTestCase() { val leaderIndex = "leaderIndex" val followerShardId = ShardId("follower", "follower_uuid", 0) val replicationMetadata = ReplicationMetadata(leaderAlias, ReplicationStoreMetadataType.INDEX.name, ReplicationOverallState.RUNNING.name, "test user", - ReplicationContext(followerShardId.indexName, null), ReplicationContext(leaderIndex, null), Settings.EMPTY) + ReplicationContext(followerShardId.indexName, null), ReplicationContext(leaderIndex, null), Settings.EMPTY) val client = RequestCapturingClient() init { closeAfterSuite(client) @@ -83,8 +88,13 @@ class TranslogSequencerTests : OpenSearchTestCase() { val stats = FollowerClusterStats() stats.stats[followerShardId] = FollowerShardMetric() val startSeqNo = randomNonNegativeLong() + indicesService = Mockito.mock(IndicesService::class.java) + val followerIndexService = Mockito.mock(IndexService::class.java) + val indexShard = Mockito.mock(IndexShard::class.java) + Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService) + Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard) val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID, - client, startSeqNo, stats, 2) + client, startSeqNo, stats, 2) // Send requests out of order (shuffled seqNo) and await for them to be processed. var batchSeqNo = startSeqNo @@ -110,7 +120,7 @@ class TranslogSequencerTests : OpenSearchTestCase() { val changes = randomList(1, randomIntBetween(1, 512)) { seqNo = seqNo.inc() Translog.Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), seqNo, - 1L, "{}".toByteArray(Charsets.UTF_8)) + 1L, "{}".toByteArray(Charsets.UTF_8)) } return Pair(GetChangesResponse(changes, startSeqNo.inc(), startSeqNo, -1), seqNo) }