Skip to content

Commit

Permalink
Merge branch '2.x' into backport-2.x-16296
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Oct 14, 2024
2 parents ee29108 + 9950cba commit 62596bd
Show file tree
Hide file tree
Showing 27 changed files with 581 additions and 493 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove Identity FeatureFlag ([#16024](https://github.com/opensearch-project/OpenSearch/pull/16024))
- Ensure RestHandler.Wrapper delegates all implementations to the wrapped handler ([#16154](https://github.com/opensearch-project/OpenSearch/pull/16154))
- Optimise clone operation for incremental full cluster snapshots ([#16296](https://github.com/opensearch-project/OpenSearch/pull/16296))
- Code cleanup: Remove ApproximateIndexOrDocValuesQuery ([#16273](https://github.com/opensearch-project/OpenSearch/pull/16273))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
package org.opensearch.snapshots;

import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.remotestore.RemoteSnapshotIT;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -479,7 +481,6 @@ public void testCloneSnapshotV2MasterSwitch() throws Exception {
assertThat(snapInfo, containsInAnyOrder(csr.getSnapshotInfo(), csr2.getSnapshotInfo()));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/16191")
public void testDeleteWhileV2CreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
Expand Down Expand Up @@ -538,7 +539,7 @@ public void testDeleteWhileV2CreateOngoing() throws Exception {
assertThat(snapInfo, contains(csr.getSnapshotInfo()));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/16191")
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/16205")
public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
Expand Down Expand Up @@ -569,6 +570,7 @@ public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception {
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);
ensureGreen();

startFullSnapshot(repoName, "snapshot-v1").actionGet();
startFullSnapshot(repoName, "snapshot-v1-2").actionGet();
Expand Down Expand Up @@ -597,6 +599,148 @@ public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception {
assertTrue(startCloneSnapshot.actionGet().isAcknowledged());
List<SnapshotInfo> snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertEquals(3, snapInfo.size());

RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, "snapshot-v1-2-clone")
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.get();
assertEquals(restoreSnapshotResponse.status(), RestStatus.OK);
ensureGreen();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/16205")
public void testDeleteAndCloneV1WhileCreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String repoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false);
createRepository(repoName, "mock", settings);

Client client = client();
Settings indexSettings = getIndexSettings(10, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

startFullSnapshot(repoName, "snapshot-v1").actionGet();
startFullSnapshot(repoName, "snapshot-v1-2").actionGet();

blockClusterManagerOnWriteIndexFile(repoName);

final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-v2");
awaitNumberOfSnapshotsInProgress(1);

ActionFuture<AcknowledgedResponse> startDeleteSnapshot = startDeleteSnapshot(repoName, "snapshot-v1");
ActionFuture<AcknowledgedResponse> startCloneSnapshot = startCloneSnapshot(repoName, "snapshot-v1-2", "snapshot-v1-2-clone");

unblockNode(repoName, clusterManagerName);
snapshotFuture.actionGet();
assertTrue(startDeleteSnapshot.actionGet().isAcknowledged());
assertTrue(startCloneSnapshot.actionGet().isAcknowledged());
List<SnapshotInfo> snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertEquals(3, snapInfo.size());

RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, "snapshot-v1-2-clone")
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.get();
assertEquals(restoreSnapshotResponse.status(), RestStatus.OK);
ensureGreen();
}

public void testCloneV1WhileV2CreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String repoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false);
createRepository(repoName, "mock", settings);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

startFullSnapshot(repoName, "snapshot-v1").actionGet();

// Creating a v2 repo
settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
createRepository(repoName, "mock", settings);

blockClusterManagerOnWriteIndexFile(repoName);

final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-v2");
awaitNumberOfSnapshotsInProgress(1);

ActionFuture<AcknowledgedResponse> startCloneSnapshot = startCloneSnapshot(repoName, "snapshot-v1", "snapshot-v1-2-clone");

unblockNode(repoName, clusterManagerName);
CreateSnapshotResponse csr = snapshotFuture.actionGet();
assertTrue(csr.getSnapshotInfo().getPinnedTimestamp() != 0);
assertTrue(startCloneSnapshot.actionGet().isAcknowledged());
List<SnapshotInfo> snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertEquals(3, snapInfo.size());

RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, "snapshot-v1-2-clone")
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.get();
assertEquals(restoreSnapshotResponse.status(), RestStatus.OK);
ensureGreen();
}

protected ActionFuture<AcknowledgedResponse> startCloneSnapshot(String repoName, String sourceSnapshotName, String snapshotName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return allocation.decision(
Decision.YES,
NAME,
getDecisionDetails(true, shardRouting, targetNode, " for strict compatibility mode")
getDecisionDetails(true, shardRouting, targetNode, " for strict compatibility mode", allocation)
);
}

Expand All @@ -103,19 +103,23 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return allocation.decision(
isNoDecision ? Decision.NO : Decision.YES,
NAME,
getDecisionDetails(!isNoDecision, shardRouting, targetNode, reason)
getDecisionDetails(!isNoDecision, shardRouting, targetNode, reason, allocation)
);
} else if (migrationDirection.equals(Direction.DOCREP)) {
// docrep migration direction is currently not supported
return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, shardRouting, targetNode, " for DOCREP direction"));
return allocation.decision(
Decision.YES,
NAME,
getDecisionDetails(true, shardRouting, targetNode, " for DOCREP direction", allocation)
);
} else {
// check for remote store backed indices
if (remoteSettingsBackedIndex && targetNode.isRemoteStoreNode() == false) {
// allocations and relocations must be to a remote node
String reason = new StringBuilder(" because a remote store backed index's shard copy can only be ").append(
(shardRouting.assignedToNode() == false) ? "allocated" : "relocated"
).append(" to a remote node").toString();
return allocation.decision(Decision.NO, NAME, getDecisionDetails(false, shardRouting, targetNode, reason));
return allocation.decision(Decision.NO, NAME, getDecisionDetails(false, shardRouting, targetNode, reason, allocation));
}

if (shardRouting.primary()) {
Expand All @@ -128,9 +132,9 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
// handle scenarios for allocation of a new shard's primary copy
private Decision primaryShardDecision(ShardRouting primaryShardRouting, DiscoveryNode targetNode, RoutingAllocation allocation) {
if (targetNode.isRemoteStoreNode() == false) {
return allocation.decision(Decision.NO, NAME, getDecisionDetails(false, primaryShardRouting, targetNode, ""));
return allocation.decision(Decision.NO, NAME, getDecisionDetails(false, primaryShardRouting, targetNode, "", allocation));
}
return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, primaryShardRouting, targetNode, ""));
return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, primaryShardRouting, targetNode, "", allocation));
}

// Checks if primary shard is on a remote node.
Expand All @@ -150,20 +154,41 @@ private Decision replicaShardDecision(ShardRouting replicaShardRouting, Discover
return allocation.decision(
Decision.NO,
NAME,
getDecisionDetails(false, replicaShardRouting, targetNode, " since primary shard copy is not yet migrated to remote")
getDecisionDetails(
false,
replicaShardRouting,
targetNode,
" since primary shard copy is not yet migrated to remote",
allocation
)
);
}
return allocation.decision(
Decision.YES,
NAME,
getDecisionDetails(true, replicaShardRouting, targetNode, " since primary shard copy has been migrated to remote")
getDecisionDetails(
true,
replicaShardRouting,
targetNode,
" since primary shard copy has been migrated to remote",
allocation
)
);
}
return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, replicaShardRouting, targetNode, ""));
return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, replicaShardRouting, targetNode, "", allocation));
}

// get detailed reason for the decision
private String getDecisionDetails(boolean isYes, ShardRouting shardRouting, DiscoveryNode targetNode, String reason) {
private String getDecisionDetails(
boolean isYes,
ShardRouting shardRouting,
DiscoveryNode targetNode,
String reason,
RoutingAllocation allocation
) {
if (allocation.debugDecision() == false) {
return null;
}
return new StringBuilder("[").append(migrationDirection.direction)
.append(" migration_direction]: ")
.append(shardRouting.primary() ? "primary" : "replica")
Expand Down
Loading

0 comments on commit 62596bd

Please sign in to comment.