Fix CCR compatibility with remote translogs #1271

Merged
1 commit merged on Oct 18, 2023
src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt (12 additions & 3 deletions)
@@ -144,12 +144,14 @@
import java.util.function.Supplier

import org.opensearch.index.engine.NRTReplicationEngine
+ import org.opensearch.replication.util.ValidationUtil


@OpenForTesting
internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, RepositoryPlugin, EnginePlugin {

private lateinit var client: Client
+ private lateinit var clusterService: ClusterService
private lateinit var threadPool: ThreadPool
private lateinit var replicationMetadataManager: ReplicationMetadataManager
private lateinit var replicationSettings: ReplicationSettings
@@ -207,6 +209,7 @@
repositoriesService: Supplier<RepositoriesService>): Collection<Any> {
this.client = client
this.threadPool = threadPool
+ this.clusterService = clusterService
this.replicationMetadataManager = ReplicationMetadataManager(clusterService, client,
ReplicationMetadataStore(client, clusterService, xContentRegistry))
this.replicationSettings = ReplicationSettings(clusterService)
@@ -379,9 +382,15 @@
}

override fun getCustomTranslogDeletionPolicyFactory(): Optional<TranslogDeletionPolicyFactory> {
- return Optional.of(TranslogDeletionPolicyFactory{
- indexSettings, retentionLeasesSupplier -> ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
- })
+ // We don't need a retention-lease translog deletion policy for remote-store-enabled clusters,
+ // as we fetch the operations directly from Lucene in such cases.
+ return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) {
+ Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier ->
+ ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
+ })
+ } else {
+ Optional.empty()
+ }
}
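Note for readers outside the PR: returning Optional.empty() lets core OpenSearch fall back to its default translog deletion policy. The retention-lease policy being skipped exists so the leader retains every translog operation a follower may still fetch. A minimal sketch of that idea, with illustrative names rather than the plugin's actual implementation:

```kotlin
import org.opensearch.index.seqno.RetentionLease

// Sketch only: a retention-lease-aware policy must keep every operation at or above
// the lowest sequence number that some lease holder (a follower) still needs.
fun minSeqNoToRetain(leases: Collection<RetentionLease>): Long =
    leases.minOfOrNull { it.retainingSequenceNumber() } ?: Long.MAX_VALUE
```

On a remote-store-enabled cluster the leader serves changes from Lucene instead (see TransportGetChangesAction below), so there is nothing for a translog-side policy to protect and the default policy suffices.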

override fun onIndexModule(indexModule: IndexModule) {
src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
@@ -27,17 +27,15 @@
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.common.unit.TimeValue
import org.opensearch.core.index.shard.ShardId
+ import org.opensearch.index.shard.IndexShard
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_EXECUTOR_NAME_LEADER
import org.opensearch.replication.seqno.RemoteClusterStats
import org.opensearch.replication.seqno.RemoteClusterTranslogService
import org.opensearch.replication.seqno.RemoteShardMetric
- import org.opensearch.replication.util.completeWith
- import org.opensearch.replication.util.coroutineContext
- import org.opensearch.replication.util.stackTraceToString
- import org.opensearch.replication.util.waitForGlobalCheckpoint
+ import org.opensearch.replication.util.*
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportActionProxy
import org.opensearch.transport.TransportService
@@ -79,7 +77,8 @@
indexMetric.lastFetchTime.set(relativeStartNanos)

val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
- if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
+ val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService)
+ if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
// There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
// the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
// should catch and start a new poll.
@@ -88,18 +87,18 @@
// At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
// to the translog, which means we can't return those changes. Return to the caller to retry.
// TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
- if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
- assert(gcp > indexShard.lastSyncedGlobalCheckpoint) { "Checkpoint didn't advance at all" }
+ if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
+ assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" }
throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few milliseconds...")
}
}

relativeStartNanos = System.nanoTime()
// At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
- val toSeqNo = min(indexShard.lastSyncedGlobalCheckpoint, request.toSeqNo)
+ val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo)

var ops: List<Translog.Operation> = listOf()
- var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId)
+ var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false
if(fetchFromTranslog) {
try {
ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
@@ -137,12 +136,22 @@
indexMetric.ops.addAndGet(ops.size.toLong())

ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }

- GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, indexShard.lastSyncedGlobalCheckpoint)
+ GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled))
}
}
}

+ private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long {
+ // We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. In remote-store-enabled
+ // clusters the semantics are slightly different and we can't use lastSyncedGlobalCheckpoint, so we fall
+ // back to lastKnownGlobalCheckpoint in such cases.
+ return if (isRemoteStoreEnabled) {
+ indexShard.lastKnownGlobalCheckpoint
+ } else {
+ indexShard.lastSyncedGlobalCheckpoint
+ }
+ }
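The distinction the helper leans on: lastSyncedGlobalCheckpoint reflects what has been durably persisted alongside the local translog, while lastKnownGlobalCheckpoint is the in-memory value; with a remote translog the local-sync notion no longer applies. As context for the OpenSearchTimeoutException above, here is a hedged sketch of the caller-side long poll (pollChanges and fetchChanges are illustrative names, not code from this PR):

```kotlin
import kotlinx.coroutines.delay
import org.opensearch.OpenSearchTimeoutException
import org.opensearch.replication.action.changes.GetChangesResponse

// Sketch only: keep long-polling the leader until the global checkpoint advances far
// enough for the requested sequence-number range to be served.
suspend fun pollChanges(
    fromSeqNo: Long,
    fetchChanges: suspend (Long) -> GetChangesResponse // assumed transport call
): GetChangesResponse {
    while (true) {
        try {
            return fetchChanges(fromSeqNo)
        } catch (e: OpenSearchTimeoutException) {
            delay(50) // checkpoint not synced yet; back off briefly, then poll again
        }
    }
}
```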


private fun isTranslogPruningByRetentionLeaseEnabled(shardId: ShardId): Boolean {
val enabled = clusterService.state().metadata.indices.get(shardId.indexName)
@@ -162,7 +171,9 @@
}

override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
+ val shardIt = state.routingTable().shardRoutingTable(request.request().shardId)
// Random active shards
- return state.routingTable().shardRoutingTable(request.request().shardId).activeInitializingShardsRandomIt()
+ return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt()
+ else shardIt.activeInitializingShardsRandomIt()
}
}
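A note on the shards() change above: with retention-lease pruning, any active copy can serve changes from its local translog, so random routing spreads load across copies. On a remote-store cluster the operations are read from Lucene, and routing to the primary via primaryShardIt() presumably guarantees the serving copy has all operations up to the global checkpoint.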
RemoteClusterRepository.kt
@@ -245,6 +245,9 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata

// Remove translog pruning for the follower index
builder.remove(REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING.key)
+ builder.remove(IndexMetadata.SETTING_REMOTE_STORE_ENABLED)
+ builder.remove(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY)
+ builder.remove(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY)

val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder)
indexMetadata.aliases.values.forEach {
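Why the extra removals: the follower restores its index metadata from the leader, and leader-side remote-store settings must not carry over, or the follower would try to bind to the leader's remote repositories. A distilled sketch of the same transformation, restating the change above rather than adding plugin code:

```kotlin
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.settings.Settings

// Sketch only: strip leader-only remote-store settings before building the
// follower's IndexMetadata from the leader's settings.
fun stripRemoteStoreSettings(leaderSettings: Settings): Settings {
    val builder = Settings.builder().put(leaderSettings)
    builder.remove(IndexMetadata.SETTING_REMOTE_STORE_ENABLED)
    builder.remove(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY)
    builder.remove(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY)
    return builder.build()
}
```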
src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt
@@ -24,8 +24,11 @@ import org.opensearch.env.Environment
import org.opensearch.index.IndexNotFoundException
import java.io.UnsupportedEncodingException
import org.opensearch.cluster.service.ClusterService
+ import org.opensearch.node.Node
+ import org.opensearch.node.remotestore.RemoteStoreNodeAttribute
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING
+ import org.opensearch.replication.action.changes.TransportGetChangesAction
import java.nio.file.Files
import java.nio.file.Path
import java.util.Locale
@@ -154,4 +157,8 @@ object ValidationUtil {

}

+ fun isRemoteStoreEnabledCluster(clusterService: ClusterService): Boolean {
+ return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false
+ }

}
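isRemoteStoreEnabledCluster keys off node attributes: remote-store nodes advertise their repositories as node.attr.remote_store.* settings, so a non-empty getByPrefix(...) result marks the cluster as remote-store enabled. A sketch of node settings this check would classify as remote-store enabled (the attribute suffixes are assumptions based on typical remote-store configuration, not taken from this PR):

```kotlin
import org.opensearch.common.settings.Settings

// Sketch only: node settings carrying "node.attr.remote_store.*" attributes.
// The exact suffixes below are illustrative; verify against your OpenSearch version.
val remoteStoreNodeSettings: Settings = Settings.builder()
    .put("node.attr.remote_store.segment.repository", "my-segment-repo")
    .put("node.attr.remote_store.translog.repository", "my-translog-repo")
    .build()

// getByPrefix("node.attr.remote_store") is non-empty for these settings, so the
// check above would report the cluster as remote-store enabled.
```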