diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index ea8f980c14972..a531660cfcd36 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -183,7 +184,8 @@ public List getAllUploadedIndices public void getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - LatchedActionListener latchedActionListener + LatchedActionListener latchedActionListener, + Version version ) { ActionListener actionListener = ActionListener.wrap( @@ -191,7 +193,7 @@ public void getAsyncIndexRoutingReadAction( latchedActionListener::onFailure ); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor, version); remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index 17687199c39d6..0913df63cd89b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.routing.remote; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -71,7 +72,8 @@ public List getAllUploadedIndices public void getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - LatchedActionListener latchedActionListener + LatchedActionListener latchedActionListener, + Version version ) { // noop } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index d7ef3a29aa21f..6fd777a43107e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.routing.remote; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -31,7 +32,8 @@ public interface RemoteRoutingTableService extends LifecycleComponent { void getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - LatchedActionListener latchedActionListener + LatchedActionListener latchedActionListener, + Version version ); void getAsyncIndexRoutingTableDiffReadAction( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 8179b6883523e..84f765ee628d7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1231,7 +1231,8 @@ ClusterState readClusterStateInParallel( remoteRoutingTableService.getAsyncIndexRoutingReadAction( clusterUUID, indexRouting.getUploadedFilename(), - routingTableLatchedActionListener + routingTableLatchedActionListener, + manifest.getOpensearchVersion() ); } @@ -1366,7 +1367,8 @@ ClusterState readClusterStateInParallel( entry.getValue().getAttributeName(), clusterUUID, blobStoreRepository.getCompressor(), - namedWriteableRegistry + namedWriteableRegistry, + manifest.getOpensearchVersion() ), listener ); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java index e5e44525520f4..64cf79175392e 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.model; +import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterState.Custom; import org.opensearch.common.io.Streams; @@ -65,16 +66,17 @@ public RemoteClusterStateCustoms( final String customType, final String clusterUUID, final Compressor compressor, - final NamedWriteableRegistry namedWriteableRegistry + final NamedWriteableRegistry namedWriteableRegistry, + final Version version ) { super(clusterUUID, compressor, null); this.blobName = blobName; this.customType = customType; this.namedWriteableRegistry = namedWriteableRegistry; - this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>( - "cluster-state-custom", - is -> readFrom(is, namedWriteableRegistry, customType) - ); + this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>("cluster-state-custom", is -> { + is.setVersion(version); + return readFrom(is, namedWriteableRegistry, customType); + }); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java index 46c5074c48eb8..00b800bef1302 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.routingtable; +import org.opensearch.Version; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.common.io.Streams; import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity; @@ -37,8 +38,7 @@ public class RemoteIndexRoutingTable extends AbstractClusterMetadataWriteableBlo private long term; private long version; private BlobPathParameters blobPathParameters; - public static final ChecksumWritableBlobStoreFormat INDEX_ROUTING_TABLE_FORMAT = - new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom); + public ChecksumWritableBlobStoreFormat indexRoutingTableFormat; public RemoteIndexRoutingTable( IndexRoutingTable indexRoutingTable, @@ -52,6 +52,7 @@ public RemoteIndexRoutingTable( this.indexRoutingTable = indexRoutingTable; this.term = term; this.version = version; + this.indexRoutingTableFormat = new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom); } /** @@ -60,12 +61,16 @@ public RemoteIndexRoutingTable( * @param clusterUUID UUID of the cluster * @param compressor Compressor object */ - public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor) { + public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor, Version opensearchVersion) { super(clusterUUID, compressor); this.index = null; this.term = -1; this.version = -1; this.blobName = blobName; + this.indexRoutingTableFormat = new ChecksumWritableBlobStoreFormat<>("index-routing-table", is -> { + is.setVersion(opensearchVersion); + return IndexRoutingTable.readFrom(is); + }); } @Override @@ -104,11 +109,11 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return INDEX_ROUTING_TABLE_FORMAT.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput(); + return indexRoutingTableFormat.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput(); } @Override public IndexRoutingTable deserialize(InputStream in) throws IOException { - return INDEX_ROUTING_TABLE_FORMAT.deserialize(blobName, Streams.readFully(in)); + return indexRoutingTableFormat.deserialize(blobName, Streams.readFully(in)); } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 63501f878d55d..e861490ee6894 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -35,6 +35,7 @@ import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; @@ -71,7 +72,6 @@ import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_FILE; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; -import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT; import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.REMOTE_ROUTING_TABLE_DIFF_FORMAT; import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_FILE; import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_METADATA_PREFIX; @@ -568,8 +568,14 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( + uploadedFileName, + "cluster-uuid", + compressor, + Version.CURRENT + ); when(blobContainer.readBlob(indexName)).thenReturn( - INDEX_ROUTING_TABLE_FORMAT.serialize( + remoteIndexRoutingTable.indexRoutingTableFormat.serialize( clusterState.getRoutingTable().getIndicesRouting().get(indexName), uploadedFileName, compressor @@ -581,7 +587,8 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { remoteRoutingTableService.getAsyncIndexRoutingReadAction( "cluster-uuid", uploadedFileName, - new LatchedActionListener<>(listener, latch) + new LatchedActionListener<>(listener, latch), + Version.CURRENT ); latch.await(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index 67b1528466a9e..2be1f29d9b960 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -263,7 +264,8 @@ public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedExc custom.getWriteableName(), CLUSTER_UUID, compressor, - namedWriteableRegistry + namedWriteableRegistry, + Version.CURRENT ); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( remoteClusterStateCustoms.clusterStateCustomsFormat.serialize(custom, fileName, compressor).streamInput() diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java index 1b020e13324a4..589e8da1e7c2a 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.model; +import org.opensearch.Version; import org.opensearch.cluster.ClusterState.Custom; import org.opensearch.cluster.SnapshotsInProgress; import org.opensearch.common.blobstore.BlobPath; @@ -101,7 +102,8 @@ public void testClusterUUID() { "test-custom", clusterUUID, compressor, - namedWriteableRegistry + namedWriteableRegistry, + Version.CURRENT ); assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); } @@ -123,7 +125,8 @@ public void testFullBlobName() { "test-custom", clusterUUID, compressor, - namedWriteableRegistry + namedWriteableRegistry, + Version.CURRENT ); assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); } @@ -145,7 +148,8 @@ public void testBlobFileName() { "test-custom", clusterUUID, compressor, - namedWriteableRegistry + namedWriteableRegistry, + Version.CURRENT ); assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); } @@ -157,7 +161,8 @@ public void testBlobPathTokens() { "test-custom", clusterUUID, compressor, - namedWriteableRegistry + namedWriteableRegistry, + Version.CURRENT ); assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "clusterStateCustoms" })); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java index 29d4ffa978851..b2dd5c67c47dc 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java @@ -107,7 +107,12 @@ public void testClusterUUID() { ); assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID); - RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + TEST_BLOB_NAME, + clusterUUID, + compressor, + Version.CURRENT + ); assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); }); } @@ -137,7 +142,12 @@ public void testFullBlobName() { ); assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); - RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + TEST_BLOB_NAME, + clusterUUID, + compressor, + Version.CURRENT + ); assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); }); } @@ -167,14 +177,24 @@ public void testBlobFileName() { ); assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); - RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + TEST_BLOB_NAME, + clusterUUID, + compressor, + Version.CURRENT + ); assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); }); } public void testBlobPathTokens() { String uploadedFile = "user/local/opensearch/routingTable"; - RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(uploadedFile, clusterUUID, compressor); + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + uploadedFile, + clusterUUID, + compressor, + Version.CURRENT + ); assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "routingTable" })); }