Skip to content

Commit

Permalink
[Remote Store] Rate limiter for low priority uploads (#14374) (#14421)
Browse files Browse the repository at this point in the history
---------
Signed-off-by: Gaurav Bafna <[email protected]>
(cherry picked from commit e22b651)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent f8a0448 commit ec0bed2
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))

### Dependencies
- Update to Apache Lucene 9.11.0 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,32 @@
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.admin.indices.shrink.ResizeType;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.VersionUtils;
import org.junit.Before;

import java.util.concurrent.ExecutionException;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteCloneIndexIT extends RemoteStoreBaseIntegTestCase {
Expand All @@ -69,6 +77,11 @@ protected boolean forbidPrivateIndexSettings() {
return false;
}

@Before
public void setup() {
asyncUploadMockFsRepo = true;
}

public void testCreateCloneIndex() {
Version version = VersionUtils.randomIndexCompatibleVersion(random());
int numPrimaryShards = randomIntBetween(1, 5);
Expand Down Expand Up @@ -140,6 +153,79 @@ public void testCreateCloneIndex() {

}

public void testCreateCloneIndexLowPriorityRateLimit() {
Version version = VersionUtils.randomIndexCompatibleVersion(random());
int numPrimaryShards = 1;
prepareCreate("source").setSettings(
Settings.builder().put(indexSettings()).put("number_of_shards", numPrimaryShards).put("index.version.created", version)
).get();
final int docs = randomIntBetween(0, 128);
for (int i = 0; i < docs; i++) {
client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
}
ByteSizeValue shardSize = client().admin().indices().prepareStats("source").execute().actionGet().getShards()[0].getStats()
.getStore()
.size();
logger.info("Shard size is {}", shardSize);
internalCluster().ensureAtLeastNumDataNodes(2);
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
// to the require._name below.
ensureGreen();
// relocate all shards to one node such that we can merge it.
client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get();
ensureGreen();

// disable rebalancing to be able to capture the right stats. balancing can move the target primary
// making it hard to pin point the source shards.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"))
.get();
try {
// apply rate limiter
setLowPriorityUploadRate(REPOSITORY_NAME, "1kb");
assertAcked(
client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setResizeType(ResizeType.CLONE)
.setSettings(Settings.builder().put("index.number_of_replicas", 0).putNull("index.blocks.write").build())
.get()
);
ensureGreen();
long uploadPauseTime = 0L;
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
uploadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getLowPriorityRemoteUploadThrottleTimeInNanos();
}
assertThat(uploadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(5, 10)).nanos()));
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// clean up
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
.put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)
)
.get();
}
}

protected void setLowPriorityUploadRate(String repoName, String value) throws ExecutionException, InterruptedException {
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
RepositoryMetadata rmd = res.repositories().get(0);
Settings.Builder settings = Settings.builder()
.put("location", rmd.settings().get("location"))
.put("max_remote_low_priority_upload_bytes_per_sec", value);
assertAcked(client().admin().cluster().preparePutRepository(repoName).setType(rmd.type()).setSettings(settings).get());
}

public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException {
asyncUploadMockFsRepo = false;
Version version = VersionUtils.randomIndexCompatibleVersion(random());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;

public class MockFsMetadataSupportedRepository extends FsRepository {
public class MockFsMetadataSupportedRepository extends ReloadableFsRepository {

public static Setting<Boolean> TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting(
"mock_fs_repository.trigger_data_integrity_failure",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class RemoteDirectory extends Directory {

private final UnaryOperator<OffsetRangeInputStream> uploadRateLimiter;

private final UnaryOperator<OffsetRangeInputStream> lowPriorityUploadRateLimiter;

private final UnaryOperator<InputStream> downloadRateLimiter;

/**
Expand All @@ -76,15 +78,17 @@ public BlobContainer getBlobContainer() {
}

public RemoteDirectory(BlobContainer blobContainer) {
this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity());
this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity(), UnaryOperator.identity());
}

public RemoteDirectory(
BlobContainer blobContainer,
UnaryOperator<OffsetRangeInputStream> uploadRateLimiter,
UnaryOperator<OffsetRangeInputStream> lowPriorityUploadRateLimiter,
UnaryOperator<InputStream> downloadRateLimiter
) {
this.blobContainer = blobContainer;
this.lowPriorityUploadRateLimiter = lowPriorityUploadRateLimiter;
this.uploadRateLimiter = uploadRateLimiter;
this.downloadRateLimiter = downloadRateLimiter;
}
Expand Down Expand Up @@ -357,13 +361,23 @@ private void uploadBlob(
remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported();
}
lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15);
RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
if (lowPriorityUpload) {
offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply(
new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)
);
} else {
offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply(
new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)
);
}
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
src,
remoteFileName,
contentLength,
true,
lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL,
(size, position) -> uploadRateLimiter.apply(new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)),
offsetRangeInputStreamSupplier,
expectedChecksum,
remoteIntegrityEnabled
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
RemoteDirectory dataDirectory = new RemoteDirectory(
blobStoreRepository.blobStore().blobContainer(dataPath),
blobStoreRepository::maybeRateLimitRemoteUploadTransfers,
blobStoreRepository::maybeRateLimitLowPriorityRemoteUploadTransfers,
blobStoreRepository::maybeRateLimitRemoteDownloadTransfers
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public long getRemoteUploadThrottleTimeInNanos() {
return in.getRemoteUploadThrottleTimeInNanos();
}

@Override
public long getLowPriorityRemoteUploadThrottleTimeInNanos() {
return in.getRemoteUploadThrottleTimeInNanos();
}

@Override
public long getRemoteDownloadThrottleTimeInNanos() {
return in.getRemoteDownloadThrottleTimeInNanos();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,17 @@ default void deleteSnapshotsAndReleaseLockFiles(
long getRestoreThrottleTimeInNanos();

/**
* Returns restore throttle time in nanoseconds
* Returns upload throttle time in nanoseconds
*/
long getRemoteUploadThrottleTimeInNanos();

/**
* Returns low priority upload throttle time in nanoseconds
*/
default long getLowPriorityRemoteUploadThrottleTimeInNanos() {
return 0;
}

/**
* Returns restore throttle time in nanoseconds
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private volatile RateLimiter remoteUploadRateLimiter;

private volatile RateLimiter remoteUploadLowPriorityRateLimiter;

private volatile RateLimiter remoteDownloadRateLimiter;

private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric();
Expand All @@ -327,6 +329,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final CounterMetric remoteUploadRateLimitingTimeInNanos = new CounterMetric();

private final CounterMetric remoteUploadLowPriorityRateLimitingTimeInNanos = new CounterMetric();

public static final ChecksumBlobStoreFormat<Metadata> GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"metadata",
METADATA_NAME_FORMAT,
Expand Down Expand Up @@ -449,6 +453,11 @@ private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) {
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO);
remoteUploadLowPriorityRateLimiter = getRateLimiter(
metadata.settings(),
"max_remote_low_priority_upload_bytes_per_sec",
ByteSizeValue.ZERO
);
remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO);
readOnly = READONLY_SETTING.get(metadata.settings());
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
Expand Down Expand Up @@ -1968,6 +1977,11 @@ public long getRemoteUploadThrottleTimeInNanos() {
return remoteUploadRateLimitingTimeInNanos.count();
}

@Override
public long getLowPriorityRemoteUploadThrottleTimeInNanos() {
return remoteUploadLowPriorityRateLimitingTimeInNanos.count();
}

@Override
public long getRemoteDownloadThrottleTimeInNanos() {
return remoteDownloadRateLimitingTimeInNanos.count();
Expand Down Expand Up @@ -3315,6 +3329,20 @@ public OffsetRangeInputStream maybeRateLimitRemoteUploadTransfers(OffsetRangeInp
);
}

public OffsetRangeInputStream maybeRateLimitLowPriorityRemoteUploadTransfers(OffsetRangeInputStream offsetRangeInputStream) {
return maybeRateLimitRemoteTransfers(
maybeRateLimitRemoteTransfers(
offsetRangeInputStream,
() -> remoteUploadRateLimiter,
remoteUploadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_UPLOAD
),
() -> remoteUploadLowPriorityRateLimiter,
remoteUploadLowPriorityRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_UPLOAD
);
}

public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream) {
return maybeRateLimit(
maybeRateLimit(
Expand Down

0 comments on commit ec0bed2

Please sign in to comment.