Skip to content

Commit

Permalink
Use opensearch version for deserializing ClusterState.Custom and Rout…
Browse files Browse the repository at this point in the history
…ing table

Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed Oct 27, 2024
1 parent f98da36 commit 7a49332
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -183,15 +184,16 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
public void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<IndexRoutingTable> latchedActionListener
LatchedActionListener<IndexRoutingTable> latchedActionListener,
Version version
) {

ActionListener<IndexRoutingTable> actionListener = ActionListener.wrap(
latchedActionListener::onResponse,
latchedActionListener::onFailure
);

RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor, version);

remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.cluster.routing.remote;

import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -71,7 +72,8 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
public void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<IndexRoutingTable> latchedActionListener
LatchedActionListener<IndexRoutingTable> latchedActionListener,
Version version
) {
// noop
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.cluster.routing.remote;

import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand All @@ -31,7 +32,8 @@ public interface RemoteRoutingTableService extends LifecycleComponent {
void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<IndexRoutingTable> latchedActionListener
LatchedActionListener<IndexRoutingTable> latchedActionListener,
Version version
);

void getAsyncIndexRoutingTableDiffReadAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,8 @@ ClusterState readClusterStateInParallel(
remoteRoutingTableService.getAsyncIndexRoutingReadAction(
clusterUUID,
indexRouting.getUploadedFilename(),
routingTableLatchedActionListener
routingTableLatchedActionListener,
manifest.getOpensearchVersion()
);
}

Expand Down Expand Up @@ -1366,7 +1367,8 @@ ClusterState readClusterStateInParallel(
entry.getValue().getAttributeName(),
clusterUUID,
blobStoreRepository.getCompressor(),
namedWriteableRegistry
namedWriteableRegistry,
manifest.getOpensearchVersion()
),
listener
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote.model;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.common.io.Streams;
Expand Down Expand Up @@ -65,16 +66,17 @@ public RemoteClusterStateCustoms(
final String customType,
final String clusterUUID,
final Compressor compressor,
final NamedWriteableRegistry namedWriteableRegistry
final NamedWriteableRegistry namedWriteableRegistry,
final Version version
) {
super(clusterUUID, compressor, null);
this.blobName = blobName;
this.customType = customType;
this.namedWriteableRegistry = namedWriteableRegistry;
this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>(
"cluster-state-custom",
is -> readFrom(is, namedWriteableRegistry, customType)
);
this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>("cluster-state-custom", is -> {
is.setVersion(version);
return readFrom(is, namedWriteableRegistry, customType);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote.routingtable;

import org.opensearch.Version;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
Expand Down Expand Up @@ -37,8 +38,7 @@ public class RemoteIndexRoutingTable extends AbstractClusterMetadataWriteableBlo
private long term;
private long version;
private BlobPathParameters blobPathParameters;
public static final ChecksumWritableBlobStoreFormat<IndexRoutingTable> INDEX_ROUTING_TABLE_FORMAT =
new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom);
public ChecksumWritableBlobStoreFormat<IndexRoutingTable> indexRoutingTableFormat;

public RemoteIndexRoutingTable(
IndexRoutingTable indexRoutingTable,
Expand All @@ -52,6 +52,7 @@ public RemoteIndexRoutingTable(
this.indexRoutingTable = indexRoutingTable;
this.term = term;
this.version = version;
this.indexRoutingTableFormat = new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom);
}

/**
Expand All @@ -60,12 +61,16 @@ public RemoteIndexRoutingTable(
* @param clusterUUID UUID of the cluster
* @param compressor Compressor object
*/
public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor) {
public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor, Version opensearchVersion) {
super(clusterUUID, compressor);
this.index = null;
this.term = -1;
this.version = -1;
this.blobName = blobName;
this.indexRoutingTableFormat = new ChecksumWritableBlobStoreFormat<>("index-routing-table", is -> {
is.setVersion(opensearchVersion);
return IndexRoutingTable.readFrom(is);
});
}

@Override
Expand Down Expand Up @@ -104,11 +109,11 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return INDEX_ROUTING_TABLE_FORMAT.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput();
return indexRoutingTableFormat.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput();
}

@Override
public IndexRoutingTable deserialize(InputStream in) throws IOException {
return INDEX_ROUTING_TABLE_FORMAT.deserialize(blobName, Streams.readFully(in));
return indexRoutingTableFormat.deserialize(blobName, Streams.readFully(in));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
Expand Down Expand Up @@ -71,7 +72,6 @@
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_FILE;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT;
import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.REMOTE_ROUTING_TABLE_DIFF_FORMAT;
import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_FILE;
import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_METADATA_PREFIX;
Expand Down Expand Up @@ -568,8 +568,14 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception {
String indexName = randomAlphaOfLength(randomIntBetween(1, 50));
ClusterState clusterState = createClusterState(indexName);
String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName);
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(
uploadedFileName,
"cluster-uuid",
compressor,
Version.CURRENT
);
when(blobContainer.readBlob(indexName)).thenReturn(
INDEX_ROUTING_TABLE_FORMAT.serialize(
remoteIndexRoutingTable.indexRoutingTableFormat.serialize(
clusterState.getRoutingTable().getIndicesRouting().get(indexName),
uploadedFileName,
compressor
Expand All @@ -581,7 +587,8 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception {
remoteRoutingTableService.getAsyncIndexRoutingReadAction(
"cluster-uuid",
uploadedFileName,
new LatchedActionListener<>(listener, latch)
new LatchedActionListener<>(listener, latch),
Version.CURRENT
);
latch.await();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote;

import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -263,7 +264,8 @@ public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedExc
custom.getWriteableName(),
CLUSTER_UUID,
compressor,
namedWriteableRegistry
namedWriteableRegistry,
Version.CURRENT
);
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
remoteClusterStateCustoms.clusterStateCustomsFormat.serialize(custom, fileName, compressor).streamInput()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote.model;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -101,7 +102,8 @@ public void testClusterUUID() {
"test-custom",
clusterUUID,
compressor,
namedWriteableRegistry
namedWriteableRegistry,
Version.CURRENT
);
assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID));
}
Expand All @@ -123,7 +125,8 @@ public void testFullBlobName() {
"test-custom",
clusterUUID,
compressor,
namedWriteableRegistry
namedWriteableRegistry,
Version.CURRENT
);
assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME));
}
Expand All @@ -145,7 +148,8 @@ public void testBlobFileName() {
"test-custom",
clusterUUID,
compressor,
namedWriteableRegistry
namedWriteableRegistry,
Version.CURRENT
);
assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME));
}
Expand All @@ -157,7 +161,8 @@ public void testBlobPathTokens() {
"test-custom",
clusterUUID,
compressor,
namedWriteableRegistry
namedWriteableRegistry,
Version.CURRENT
);
assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "clusterStateCustoms" }));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ public void testClusterUUID() {
);
assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID);

RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor);
RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(
TEST_BLOB_NAME,
clusterUUID,
compressor,
Version.CURRENT
);
assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID);
});
}
Expand Down Expand Up @@ -137,7 +142,12 @@ public void testFullBlobName() {
);
assertThat(remoteObjectForUpload.getFullBlobName(), nullValue());

RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor);
RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(
TEST_BLOB_NAME,
clusterUUID,
compressor,
Version.CURRENT
);
assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME));
});
}
Expand Down Expand Up @@ -167,14 +177,24 @@ public void testBlobFileName() {
);
assertThat(remoteObjectForUpload.getBlobFileName(), nullValue());

RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor);
RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(
TEST_BLOB_NAME,
clusterUUID,
compressor,
Version.CURRENT
);
assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME));
});
}

public void testBlobPathTokens() {
String uploadedFile = "user/local/opensearch/routingTable";
RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(uploadedFile, clusterUUID, compressor);
RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(
uploadedFile,
clusterUUID,
compressor,
Version.CURRENT
);
assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "routingTable" }));
}

Expand Down

0 comments on commit 7a49332

Please sign in to comment.