Skip to content

Commit

Permalink
Fix stale index deletion in snapshots for hashed prefix path type (#16617)
Browse files Browse the repository at this point in the history

Signed-off-by: Ashish Singh <[email protected]>
(cherry picked from commit 4cce608)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Nov 14, 2024
1 parent 77231fb commit e78439a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,37 @@ public class DeleteSnapshotIT extends AbstractSnapshotIntegTestCase {

private static final String REMOTE_REPO_NAME = "remote-store-repo-name";

public void testStaleIndexDeletion() throws Exception {
    // Fixture names; the index name deliberately starts with a dot.
    final String repoName = "test-restore-snapshot-repo";
    final String snapshotName1 = "test-restore-snapshot1";
    final String indexName1 = ".testindex1";

    final Path absolutePath = randomRepoPath().toAbsolutePath();
    logger.info("Path [{}]", absolutePath);

    final Client client = client();

    // Index a single document, then register an "fs" repository rooted at absolutePath.
    final String docId = Integer.toString(randomInt());
    index(indexName1, "_doc", docId, "value", "expected");
    createRepository(repoName, "fs", absolutePath);

    logger.info("--> snapshot");
    final CreateSnapshotResponse createSnapshotResponse = client.admin()
        .cluster()
        .prepareCreateSnapshot(repoName, snapshotName1)
        .setWaitForCompletion(true)
        .setIndices(indexName1)
        .get();

    // The snapshot must cover at least one shard, with every shard successful.
    assertTrue(createSnapshotResponse.getSnapshotInfo().successfulShards() > 0);
    assertEquals(createSnapshotResponse.getSnapshotInfo().totalShards(), createSnapshotResponse.getSnapshotInfo().successfulShards());
    assertEquals(SnapshotState.SUCCESS, createSnapshotResponse.getSnapshotInfo().state());

    // Delete the only snapshot and wait for the acknowledgement.
    assertAcked(startDeleteSnapshot(repoName, snapshotName1).get());

    // After deletion both the indices directory and the shard-paths directory should drain to zero files.
    final Path indicesDir = absolutePath.resolve(BlobStoreRepository.INDICES_DIR);
    final Path shardPathsDir = absolutePath.resolve(SnapshotShardPaths.DIR);
    assertBusy(() -> { assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(indicesDir)); });
    assertBusy(() -> { assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(shardPathsDir)); });

    // At the end exactly 2 files remain under the repository root: index-N and index.latest
    assertBusy(() -> { assertEquals(2, RemoteStoreBaseIntegTestCase.getFileCount(absolutePath)); });
}

public void testDeleteSnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2457,11 +2457,23 @@ private List<String> findMatchingShardPaths(String indexId, Map<String, BlobMeta
* @return An Optional containing the shard path with the highest generation number, or empty if the list is empty
*/
private Optional<String> findHighestGenerationShardPaths(List<String> matchingShardPaths) {
    // Nothing to choose from: report absence rather than failing.
    if (matchingShardPaths.isEmpty()) {
        return Optional.empty();
    }

    // The delimiter is escaped because String#split takes a regex; built once, outside the loop.
    final String delimiterRegex = "\\" + SnapshotShardPaths.DELIMITER;

    int maxGen = Integer.MIN_VALUE;
    String maxGenShardPath = null;

    for (String shardPath : matchingShardPaths) {
        String[] parts = shardPath.split(delimiterRegex);
        // Read the numeric field by counting from the END of the path (third from last).
        // A fixed index from the start (e.g. parts[2]) is wrong whenever the index name
        // itself contains the delimiter, which shifts every later field to the right.
        int shardCount = Integer.parseInt(parts[parts.length - 3]);
        // The first candidate is always accepted, so a non-empty list always yields a result;
        // later candidates replace it only when strictly greater (first one wins ties).
        if (maxGenShardPath == null || shardCount > maxGen) {
            maxGen = shardCount;
            maxGenShardPath = shardPath;
        }
    }
    assert maxGenShardPath != null : "Valid maxGenShardPath should be present";
    return Optional.of(maxGenShardPath);
}

/**
Expand Down Expand Up @@ -2724,22 +2736,28 @@ public void finalizeSnapshot(
* on account of new indexes by same index name being snapshotted that exists already in the repository's snapshots.
*/
private void cleanupRedundantSnapshotShardPaths(Set<String> updatedShardPathsIndexIds) {
Set<String> updatedIndexIds = updatedShardPathsIndexIds.stream()
.map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]))
.collect(Collectors.toSet());
Set<String> indexIdShardPaths = getSnapshotShardPaths().keySet();
List<String> staleShardPaths = indexIdShardPaths.stream().filter(s -> updatedShardPathsIndexIds.contains(s) == false).filter(s -> {
String indexId = getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]);
return updatedIndexIds.contains(indexId);
}).collect(Collectors.toList());
try {
Set<String> updatedIndexIds = updatedShardPathsIndexIds.stream()
.map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]))
.collect(Collectors.toSet());
logger.debug(new ParameterizedMessage("updatedIndexIds={}", updatedIndexIds));
Set<String> indexIdShardPaths = getSnapshotShardPaths().keySet();
logger.debug(new ParameterizedMessage("indexIdShardPaths={}", indexIdShardPaths));
List<String> staleShardPaths = indexIdShardPaths.stream()
.filter(s -> updatedShardPathsIndexIds.contains(s) == false)
.filter(s -> {
String indexId = getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]);
return updatedIndexIds.contains(indexId);
})
.collect(Collectors.toList());
logger.debug(new ParameterizedMessage("staleShardPaths={}", staleShardPaths));
deleteFromContainer(snapshotShardPathBlobContainer(), staleShardPaths);
} catch (IOException e) {
} catch (Exception e) {

Check warning on line 2755 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L2755

Added line #L2755 was not covered by tests
logger.warn(
new ParameterizedMessage(
"Repository [{}] Exception during snapshot stale index deletion {}",
"Repository [{}] Exception during snapshot stale index deletion for updatedIndexIds {}",
metadata.name(),
staleShardPaths
updatedShardPathsIndexIds
),
e
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,25 @@ public static SnapshotShardPaths fromXContent(XContentParser ignored) {
* Parses a shard path string and extracts relevant shard information.
*
* @param shardPath The shard path string to parse. Expected format is:
* [index_id]#[index_name]#[shard_count]#[path_type_code]#[path_hash_algorithm_code]
* snapshot_path_[index_id].[index_name].[shard_count].[path_type_code].[path_hash_algorithm_code]
* @return A {@link ShardInfo} object containing the parsed index ID and shard count.
* @throws IllegalArgumentException if the shard path format is invalid or cannot be parsed.
*/
public static ShardInfo parseShardPath(String shardPath) {
String[] parts = shardPath.split("\\" + SnapshotShardPaths.DELIMITER);
if (parts.length != 5) {
int len = parts.length;
if (len < 5) {
throw new IllegalArgumentException("Invalid shard path format: " + shardPath);
}
try {
IndexId indexId = new IndexId(parts[1], getIndexId(parts[0]), Integer.parseInt(parts[3]));
int shardCount = Integer.parseInt(parts[2]);
String indexName = shardPath.substring(
// First separator after index id
shardPath.indexOf(DELIMITER) + 1,
// Since we know there are exactly 3 fields at the end
shardPath.lastIndexOf(DELIMITER, shardPath.lastIndexOf(DELIMITER, shardPath.lastIndexOf(DELIMITER) - 1) - 1)
);
IndexId indexId = new IndexId(indexName, getIndexId(parts[0]), Integer.parseInt(parts[len - 2]));
int shardCount = Integer.parseInt(parts[len - 3]);
return new ShardInfo(indexId, shardCount);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid shard path format: " + shardPath, e);
Expand Down

0 comments on commit e78439a

Please sign in to comment.