Skip to content

Commit

Permalink
Move path generation logic to path input
Browse files Browse the repository at this point in the history
Signed-off-by: Himshikha Gupta <[email protected]>
  • Loading branch information
Himshikha Gupta committed Jun 7, 2024
1 parent 41a5f64 commit df1704f
Show file tree
Hide file tree
Showing 16 changed files with 297 additions and 198 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- Add upload flow for writing routing table to remote store ([#13870](https://github.com/opensearch-project/OpenSearch/pull/13870))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,9 +738,7 @@ public boolean equals(Object o) {
IndexShardRoutingTable that = (IndexShardRoutingTable) o;

if (!shardId.equals(that.shardId)) return false;
if (!new HashSet<>(shards).equals(new HashSet<>(that.shards))) return false;

return true;
return shards.size() == that.shards.size() && shards.containsAll(that.shards) && that.shards.containsAll(shards);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -61,6 +63,30 @@
*/
public class RemoteRoutingTableService extends AbstractLifecycleComponent {

/**
* This setting is used to set the remote routing table store blob store path type strategy.
*/
public static final Setting<RemoteStoreEnums.PathType> REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>(
"cluster.remote_store.routing_table.path_type",
RemoteStoreEnums.PathType.HASHED_PREFIX.toString(),
RemoteStoreEnums.PathType::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* This setting is used to set the remote routing table store blob store path hash algorithm strategy.
* This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING}
* is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
*/
public static final Setting<RemoteStoreEnums.PathHashAlgorithm> REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>(
"cluster.remote_store.routing_table.path_hash_algo",
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(),
RemoteStoreEnums.PathHashAlgorithm::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";
Expand All @@ -70,11 +96,29 @@ public class RemoteRoutingTableService extends AbstractLifecycleComponent {
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;
private RemoteStoreEnums.PathType pathType;
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
public RemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING);
this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING);
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting);
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting);
}

private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) {
this.pathType = pathType;
}

private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) {
this.pathHashAlgo = pathHashAlgo;
}

private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
Expand All @@ -90,6 +134,12 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {
}
};

/**
* Returns diff between the two routing tables, which includes upserts and deletes.
* @param before previous routing table
* @param after current routing table
* @return diff of the previous and current routing table
*/
public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(
RoutingTable before,
RoutingTable after
Expand All @@ -102,6 +152,15 @@ public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, Index
);
}

/**
* Create async action for writing one {@code IndexRoutingTable} to remote store
* @param clusterState current cluster state
* @param indexRouting indexRoutingTable to write to remote store
* @param latchedActionListener listener for handling async action response
* @param clusterBasePath base path for remote file
* @return returns runnable async action
* @throws IOException exception thrown on failure in writing to remote store
*/
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
ClusterState clusterState,
IndexRoutingTable indexRouting,
Expand All @@ -110,13 +169,13 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
) throws IOException {

BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN);
BlobPath path = RemoteStoreEnums.PathType.HASHED_PREFIX.path(
BlobPath path = pathType.path(
RemoteStorePathStrategy.BasePathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(),
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64
pathHashAlgo
);
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);

final String fileName = getIndexRoutingFileName();
final String fileName = getIndexRoutingFileName(clusterState.term(), clusterState.version());

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
Expand All @@ -138,6 +197,13 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener);
}

/**
* Combines IndicesRoutingMetadata from previous manifest and current uploaded indices, removes deleted indices.
* @param previousManifest previous manifest, used to get all existing indices routing paths
* @param indicesRoutingUploaded current uploaded indices routings
* @param indicesRoutingToDelete indices to delete
* @return combined list of metadata
*/
public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(
ClusterMetadataManifest previousManifest,
List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingUploaded,
Expand Down Expand Up @@ -208,9 +274,14 @@ private void uploadIndex(
}
}

private String getIndexRoutingFileName() {
return String.join(DELIMITER, INDEX_ROUTING_FILE_PREFIX, RemoteStoreUtils.invertLong(System.currentTimeMillis()));

private String getIndexRoutingFileName(long term, long version) {
return String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(term),
RemoteStoreUtils.invertLong(version),
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public BlobPath parent() {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BlobPath strings = (BlobPath) o;
return Objects.equals(paths, strings.paths);
BlobPath that = (BlobPath) o;
return Objects.equals(paths, that.paths);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
Expand Down Expand Up @@ -721,6 +722,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
RemoteRoutingTableService.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING,
RemoteRoutingTableService.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING,

// Admission Control Settings
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public RemoteClusterStateService(
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings)
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings))
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings, clusterSettings))
: Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.opensearch.index.remote.RemoteStoreEnums.DataType;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy.RemoteStorePathInput;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -141,7 +141,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
DataCategory dataCategory = entry.getKey();
for (DataType type : entry.getValue()) {
for (int shardNo = 0; shardNo < shardCount; shardNo++) {
RemoteStorePathInput pathInput = RemoteStorePathInput.builder()
PathInput pathInput = PathInput.builder()
.basePath(new BlobPath().add(basePath))
.indexUUID(indexUUID)
.shardId(Integer.toString(shardNo))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.hash.FNV1a;
import org.opensearch.index.remote.RemoteStorePathStrategy.RemoteStorePathInput;

import java.util.HashMap;
import java.util.Locale;
Expand Down Expand Up @@ -95,14 +94,7 @@ public enum PathType {
public BlobPath generatePath(RemoteStorePathStrategy.BasePathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.isNull(hashAlgorithm) : "hashAlgorithm is expected to be null with fixed remote store path type";
// Hash algorithm is not used in FIXED path type
BlobPath path = pathInput.basePath().add(pathInput.indexUUID());
if (pathInput instanceof RemoteStorePathInput) {
RemoteStorePathInput remoteStorePathInput = (RemoteStorePathInput) pathInput;
path.add(remoteStorePathInput.shardId())
.add(remoteStorePathInput.dataCategory().getName())
.add(remoteStorePathInput.dataType().getName());
}
return path;
return pathInput.basePath().add(pathInput.variablePath());
}

@Override
Expand All @@ -114,17 +106,7 @@ boolean requiresHashAlgorithm() {
@Override
public BlobPath generatePath(RemoteStorePathStrategy.BasePathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
BlobPath path = BlobPath.cleanPath()
.add(hashAlgorithm.hash(pathInput))
.add(pathInput.basePath())
.add(pathInput.indexUUID());
if (pathInput instanceof RemoteStorePathInput) {
RemoteStorePathInput remoteStorePathInput = (RemoteStorePathInput) pathInput;
path.add(remoteStorePathInput.shardId())
.add(remoteStorePathInput.dataCategory().getName())
.add(remoteStorePathInput.dataType().getName());
}
return path;
return BlobPath.cleanPath().add(hashAlgorithm.hash(pathInput)).add(pathInput.basePath()).add(pathInput.variablePath());
}

@Override
Expand All @@ -136,14 +118,7 @@ boolean requiresHashAlgorithm() {
@Override
public BlobPath generatePath(RemoteStorePathStrategy.BasePathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
BlobPath path = pathInput.basePath().add(hashAlgorithm.hash(pathInput)).add(pathInput.indexUUID());
if (pathInput instanceof RemoteStorePathInput) {
RemoteStorePathInput remoteStorePathInput = (RemoteStorePathInput) pathInput;
path.add(remoteStorePathInput.shardId())
.add(remoteStorePathInput.dataCategory().getName())
.add(remoteStorePathInput.dataType().getName());
}
return path;
return pathInput.basePath().add(hashAlgorithm.hash(pathInput)).add(pathInput.variablePath());
}

@Override
Expand Down Expand Up @@ -195,16 +170,7 @@ public static PathType fromCode(int code) {
* @return the blob path for the path input.
*/
public BlobPath path(RemoteStorePathStrategy.BasePathInput pathInput, PathHashAlgorithm hashAlgorithm) {
if (pathInput instanceof RemoteStorePathInput) {
RemoteStorePathInput remoteStorePathInput = (RemoteStorePathInput) pathInput;
DataCategory dataCategory = remoteStorePathInput.dataCategory();
DataType dataType = remoteStorePathInput.dataType();
assert dataCategory.isSupportedDataType(dataType) : "category:"
+ dataCategory
+ " type:"
+ dataType
+ " are not supported together";
}

return generatePath(pathInput, hashAlgorithm);
}

Expand Down Expand Up @@ -239,14 +205,11 @@ public enum PathHashAlgorithm {
FNV_1A_BASE64(0) {
@Override
String hash(RemoteStorePathStrategy.BasePathInput pathInput) {
String input = pathInput.indexUUID();
if (pathInput instanceof RemoteStorePathInput) {
RemoteStorePathInput remoteStorePathInput = (RemoteStorePathInput) pathInput;
input += remoteStorePathInput.shardId() + remoteStorePathInput.dataCategory().getName() + remoteStorePathInput
.dataType()
.getName();
StringBuilder input = new StringBuilder();
for (String paths : pathInput.variablePath().toArray()) {
input.append(paths);
}
long hash = FNV1a.hash64(input);
long hash = FNV1a.hash64(input.toString());
return longToUrlBase64(hash);
}
},
Expand All @@ -257,14 +220,11 @@ String hash(RemoteStorePathStrategy.BasePathInput pathInput) {
FNV_1A_COMPOSITE_1(1) {
@Override
String hash(RemoteStorePathStrategy.BasePathInput pathInput) {
String input = pathInput.indexUUID();
if (pathInput instanceof RemoteStorePathInput) {
RemoteStorePathInput remoteStorePathInput = (RemoteStorePathInput) pathInput;
input += remoteStorePathInput.shardId() + remoteStorePathInput.dataCategory().getName() + remoteStorePathInput
.dataType()
.getName();
StringBuilder input = new StringBuilder();
for (String paths : pathInput.variablePath().toArray()) {
input.append(paths);
}
long hash = FNV1a.hash64(input);
long hash = FNV1a.hash64(input.toString());
return longToCompositeBase64AndBinaryEncoding(hash, 20);
}
};
Expand Down
Loading

0 comments on commit df1704f

Please sign in to comment.