From bb131f99f41bb5f98e7b41b4a048c55dbd1af1d4 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Mon, 4 Nov 2024 11:30:54 -0500 Subject: [PATCH 01/15] Bump versions in README to 2.19.0 and 2.18.1 (#16554) Signed-off-by: Craig Perkins --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 95fbac7bbecf1..6ae7e12948670 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,8 @@ [![Security Vulnerabilities](https://img.shields.io/github/issues/opensearch-project/OpenSearch/security%20vulnerability?labelColor=red)](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A"security%20vulnerability") [![Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch)](https://github.com/opensearch-project/OpenSearch/issues) [![Open Pull Requests](https://img.shields.io/github/issues-pr/opensearch-project/OpenSearch)](https://github.com/opensearch-project/OpenSearch/pulls) -[![2.18.0 Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch/v2.18.0)](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A"v2.18.0") -[![2.17.2 Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch/v2.17.2)](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A"v2.17.2") +[![2.19.0 Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch/v2.19.0)](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A"v2.19.0") +[![2.18.1 Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch/v2.18.1)](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A"v2.18.1") [![3.0.0 Open Issues](https://img.shields.io/github/issues/opensearch-project/OpenSearch/v3.0.0)](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A"v3.0.0") [![GHA gradle check](https://github.com/opensearch-project/OpenSearch/actions/workflows/gradle-check.yml/badge.svg)](https://github.com/opensearch-project/OpenSearch/actions/workflows/gradle-check.yml) [![GHA validate pull request](https://github.com/opensearch-project/OpenSearch/actions/workflows/wrapper.yml/badge.svg)](https://github.com/opensearch-project/OpenSearch/actions/workflows/wrapper.yml) From f32f5c66fa19286881eaed1160798a6883ba7d5d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Nov 2024 12:02:57 -0500 Subject: [PATCH 02/15] Bump org.apache.hadoop:hadoop-minicluster from 3.4.0 to 3.4.1 in /test/fixtures/hdfs-fixture (#16550) * Bump org.apache.hadoop:hadoop-minicluster in /test/fixtures/hdfs-fixture Bumps org.apache.hadoop:hadoop-minicluster from 3.4.0 to 3.4.1. --- updated-dependencies: - dependency-name: org.apache.hadoop:hadoop-minicluster dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 1 + test/fixtures/hdfs-fixture/build.gradle | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index edbf7c8ed065c..52c4d5a0b478a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521)) - Bump `com.google.apis:google-api-services-compute` from v1-rev20240407-2.0.0 to v1-rev20241015-2.0.0 ([#16502](https://github.com/opensearch-project/OpenSearch/pull/16502)) - Bump `com.azure:azure-storage-blob` from 12.23.0 to 12.28.1 ([#16501](https://github.com/opensearch-project/OpenSearch/pull/16501)) +- Bump `org.apache.hadoop:hadoop-minicluster` from 3.4.0 to 3.4.1 ([#16550](https://github.com/opensearch-project/OpenSearch/pull/16550)) ### Changed diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index d27273f357758..18bcee8b338fc 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -37,7 +37,7 @@ versions << [ ] dependencies { - api("org.apache.hadoop:hadoop-minicluster:3.4.0") { + api("org.apache.hadoop:hadoop-minicluster:3.4.1") { exclude module: 'websocket-client' exclude module: 'jettison' exclude module: 'netty' From 4c35a2b418aa4cf14b8f49d7639d386b96752ae8 Mon Sep 17 00:00:00 2001 From: kkewwei Date: Tue, 5 Nov 2024 05:23:32 +0800 Subject: [PATCH 03/15] fix rollover alias supports restored searchable snapshot index (#16483) Signed-off-by: kkewwei Signed-off-by: kkewwei --- CHANGELOG.md | 1 + .../alias/TransportIndicesAliasesAction.java | 4 +- .../rollover/TransportRolloverAction.java | 13 +- .../put/TransportUpdateSettingsAction.java | 7 +- .../cluster/block/ClusterBlocks.java | 26 ++- .../cluster/block/ClusterBlocksTests.java | 177 ++++++++++++++++++ 6 files changed, 215 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 52c4d5a0b478a..bbb30d78aa5d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Ensure index templates are not applied to system indices ([#16418](https://github.com/opensearch-project/OpenSearch/pull/16418)) - Remove resource usages object from search response headers ([#16532](https://github.com/opensearch-project/OpenSearch/pull/16532)) - Support retrieving doc values of unsigned long field ([#16543](https://github.com/opensearch-project/OpenSearch/pull/16543)) +- Fix rollover alias supports restored searchable snapshot index([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483)) ### Security diff --git a/server/src/main/java/org/opensearch/action/admin/indices/alias/TransportIndicesAliasesAction.java b/server/src/main/java/org/opensearch/action/admin/indices/alias/TransportIndicesAliasesAction.java index 81cb3102cfcb9..42e02e9e1aff8 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/alias/TransportIndicesAliasesAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/alias/TransportIndicesAliasesAction.java @@ -41,7 +41,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; import org.opensearch.cluster.block.ClusterBlockException; -import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.AliasAction; import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.metadata.IndexAbstraction; @@ -123,7 +123,7 @@ protected ClusterBlockException checkBlock(IndicesAliasesRequest request, Cluste for (IndicesAliasesRequest.AliasActions aliasAction : request.aliasActions()) { Collections.addAll(indices, aliasAction.indices()); } - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indices.toArray(new String[0])); + return ClusterBlocks.indicesWithRemoteSnapshotBlockedException(indices, state); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java index 3b11a3d82d707..28d1d14655e3b 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -44,7 +44,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.block.ClusterBlockException; -import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; @@ -62,8 +62,10 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -127,11 +129,10 @@ protected ClusterBlockException checkBlock(RolloverRequest request, ClusterState request.indicesOptions().expandWildcardsClosed() ); - return state.blocks() - .indicesBlockedException( - ClusterBlockLevel.METADATA_WRITE, - indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request) - ); + return ClusterBlocks.indicesWithRemoteSnapshotBlockedException( + new HashSet<>(Arrays.asList(indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request))), + state + ); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java index 09cceca52ce23..779b136abef5c 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java @@ -42,7 +42,7 @@ import org.opensearch.cluster.ack.ClusterStateUpdateResponse; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; -import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MetadataUpdateSettingsService; import org.opensearch.cluster.service.ClusterService; @@ -118,9 +118,8 @@ protected ClusterBlockException checkBlock(UpdateSettingsRequest request, Cluste return globalBlock; } if (request.settings().size() == 1 && // we have to allow resetting these settings otherwise users can't unblock an index - IndexMetadata.INDEX_BLOCKS_METADATA_SETTING.exists(request.settings()) - || IndexMetadata.INDEX_READ_ONLY_SETTING.exists(request.settings()) - || IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.exists(request.settings())) { + ClusterBlocks.INDEX_DATA_READ_ONLY_BLOCK_SETTINGS.stream() + .anyMatch(booleanSetting -> booleanSetting.exists(request.settings()))) { return null; } diff --git a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java index 615ea18315cd1..c894fa5dce714 100644 --- a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java @@ -33,19 +33,23 @@ package org.opensearch.cluster.block; import org.opensearch.cluster.AbstractDiffable; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.Diff; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MetadataIndexStateService; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.settings.Setting; import org.opensearch.common.util.set.Sets; import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.VerifiableWriteable; import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.IndexModule; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; @@ -57,6 +61,7 @@ import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableSet; import static java.util.stream.Collectors.toSet; +import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; /** * Represents current cluster level blocks to block dirty operations done against the cluster. @@ -66,7 +71,11 @@ @PublicApi(since = "1.0.0") public class ClusterBlocks extends AbstractDiffable implements VerifiableWriteable { public static final ClusterBlocks EMPTY_CLUSTER_BLOCK = new ClusterBlocks(emptySet(), Map.of()); - + public static final Set> INDEX_DATA_READ_ONLY_BLOCK_SETTINGS = Set.of( + IndexMetadata.INDEX_READ_ONLY_SETTING, + IndexMetadata.INDEX_BLOCKS_METADATA_SETTING, + IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING + ); private final Set global; private final Map> indicesBlocks; @@ -276,6 +285,21 @@ public ClusterBlockException indicesAllowReleaseResources(String[] indices) { return new ClusterBlockException(indexLevelBlocks); } + public static ClusterBlockException indicesWithRemoteSnapshotBlockedException(Collection concreteIndices, ClusterState state) { + for (String index : concreteIndices) { + if (state.blocks().indexBlocked(ClusterBlockLevel.METADATA_WRITE, index)) { + IndexMetadata indexMeta = state.metadata().index(index); + if (indexMeta != null + && (IndexModule.Type.REMOTE_SNAPSHOT.match(indexMeta.getSettings().get(INDEX_STORE_TYPE_SETTING.getKey())) == false + || ClusterBlocks.INDEX_DATA_READ_ONLY_BLOCK_SETTINGS.stream() + .anyMatch(booleanSetting -> booleanSetting.exists(indexMeta.getSettings())))) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices.toArray(new String[0])); + } + } + } + return null; + } + @Override public String toString() { if (global.isEmpty() && indices().isEmpty()) { diff --git a/server/src/test/java/org/opensearch/cluster/block/ClusterBlocksTests.java b/server/src/test/java/org/opensearch/cluster/block/ClusterBlocksTests.java index 839e831d38b1b..47e3d0cb44cc9 100644 --- a/server/src/test/java/org/opensearch/cluster/block/ClusterBlocksTests.java +++ b/server/src/test/java/org/opensearch/cluster/block/ClusterBlocksTests.java @@ -8,12 +8,40 @@ package org.opensearch.cluster.block; +import com.carrotsearch.randomizedtesting.RandomizedTest; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.AliasMetadata; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; import org.opensearch.test.OpenSearchTestCase; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import static org.opensearch.cluster.block.ClusterBlockTests.randomClusterBlock; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_METADATA_BLOCK; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_BLOCK; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_WRITE_BLOCK; +import static org.opensearch.cluster.metadata.IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; public class ClusterBlocksTests extends OpenSearchTestCase { @@ -52,4 +80,153 @@ public void testWriteVerifiableTo() throws Exception { clusterBlocks2.writeVerifiableTo(checksumOut2); assertEquals(checksumOut.getChecksum(), checksumOut2.getChecksum()); } + + public void testGlobalBlock() { + String index = "test-000001"; + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("cluster")); + Set indices = new HashSet<>(); + indices.add(index); + + // no global blocks + { + stateBuilder.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK); + ClusterState clusterState = stateBuilder.build(); + clusterState.blocks(); + assertNull(ClusterBlocks.indicesWithRemoteSnapshotBlockedException(indices, clusterState)); + } + + // has global block + { + for (ClusterBlock block : Arrays.asList( + INDEX_READ_ONLY_BLOCK, + INDEX_READ_BLOCK, + INDEX_WRITE_BLOCK, + INDEX_METADATA_BLOCK, + INDEX_READ_ONLY_ALLOW_DELETE_BLOCK, + REMOTE_READ_ONLY_ALLOW_DELETE + )) { + stateBuilder.blocks(ClusterBlocks.builder().addGlobalBlock(block).build()); + ClusterState clusterState = stateBuilder.build(); + clusterState.blocks(); + assertNull(ClusterBlocks.indicesWithRemoteSnapshotBlockedException(indices, clusterState)); + } + } + } + + public void testIndexWithBlock() { + String index = "test-000001"; + Set indices = new HashSet<>(); + indices.add(index); + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("cluster")); + stateBuilder.blocks(ClusterBlocks.builder().addIndexBlock(index, IndexMetadata.INDEX_METADATA_BLOCK)); + stateBuilder.metadata(Metadata.builder().put(createIndexMetadata(index, false, null, null), false)); + ClusterState clusterState = stateBuilder.build(); + clusterState.blocks(); + assertNotNull(ClusterBlocks.indicesWithRemoteSnapshotBlockedException(indices, stateBuilder.build())); + } + + public void testRemoteIndexBlock() { + String remoteIndex = "remote_index"; + Set indices = new HashSet<>(); + indices.add(remoteIndex); + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("cluster")); + + { + IndexMetadata remoteSnapshotIndexMetadata = createIndexMetadata(remoteIndex, true, null, null); + stateBuilder.metadata(Metadata.builder().put(remoteSnapshotIndexMetadata, false)); + stateBuilder.blocks(ClusterBlocks.builder().addBlocks(remoteSnapshotIndexMetadata)); + + ClusterState clusterState = stateBuilder.build(); + assertTrue(clusterState.blocks().hasIndexBlock(remoteIndex, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE)); + clusterState.blocks(); + assertNull(ClusterBlocks.indicesWithRemoteSnapshotBlockedException(indices, clusterState)); + } + + // searchable snapshot index with block + { + Setting setting = RandomizedTest.randomFrom(new ArrayList<>(ClusterBlocks.INDEX_DATA_READ_ONLY_BLOCK_SETTINGS)); + IndexMetadata remoteSnapshotIndexMetadata = createIndexMetadata(remoteIndex, true, null, setting); + stateBuilder.metadata(Metadata.builder().put(remoteSnapshotIndexMetadata, false)); + stateBuilder.blocks(ClusterBlocks.builder().addBlocks(remoteSnapshotIndexMetadata)); + ClusterState clusterState = stateBuilder.build(); + clusterState.blocks(); + assertNotNull(ClusterBlocks.indicesWithRemoteSnapshotBlockedException(indices, clusterState)); + } + } + + public void testRemoteIndexWithoutBlock() { + String remoteIndex = "remote_index"; + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("cluster")); + + String alias = "alias1"; + IndexMetadata remoteSnapshotIndexMetadata = createIndexMetadata(remoteIndex, true, alias, null); + String index = "test-000001"; + IndexMetadata indexMetadata = createIndexMetadata(index, false, alias, null); + stateBuilder.metadata(Metadata.builder().put(remoteSnapshotIndexMetadata, false).put(indexMetadata, false)); + + Set indices = new HashSet<>(); + indices.add(remoteIndex); + ClusterState clusterState = stateBuilder.build(); + clusterState.blocks(); + assertNull(ClusterBlocks.indicesWithRemoteSnapshotBlockedException(indices, clusterState)); + } + + public void testRemoteIndexWithIndexBlock() { + String index = "test-000001"; + String remoteIndex = "remote_index"; + String alias = "alias1"; + { + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("cluster")); + IndexMetadata remoteSnapshotIndexMetadata = createIndexMetadata(remoteIndex, true, alias, null); + IndexMetadata indexMetadata = createIndexMetadata(index, false, alias, null); + stateBuilder.metadata(Metadata.builder().put(remoteSnapshotIndexMetadata, false).put(indexMetadata, false)) + .blocks(ClusterBlocks.builder().addBlocks(remoteSnapshotIndexMetadata)); + ClusterState clusterState = stateBuilder.build(); + clusterState.blocks(); + assertNull(ClusterBlocks.indicesWithRemoteSnapshotBlockedException(Collections.singleton(index), clusterState)); + clusterState.blocks(); + assertNull(ClusterBlocks.indicesWithRemoteSnapshotBlockedException(Collections.singleton(remoteIndex), clusterState)); + } + + { + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("cluster")); + Setting setting = RandomizedTest.randomFrom(new ArrayList<>(ClusterBlocks.INDEX_DATA_READ_ONLY_BLOCK_SETTINGS)); + IndexMetadata remoteSnapshotIndexMetadata = createIndexMetadata(remoteIndex, true, alias, setting); + IndexMetadata indexMetadata = createIndexMetadata(index, false, alias, null); + stateBuilder.metadata(Metadata.builder().put(remoteSnapshotIndexMetadata, false).put(indexMetadata, false)) + .blocks(ClusterBlocks.builder().addBlocks(remoteSnapshotIndexMetadata)); + ClusterState clusterState = stateBuilder.build(); + assertNull(ClusterBlocks.indicesWithRemoteSnapshotBlockedException(Collections.singleton(index), clusterState)); + assertNotNull(ClusterBlocks.indicesWithRemoteSnapshotBlockedException(Collections.singleton(remoteIndex), clusterState)); + } + } + + private IndexMetadata createIndexMetadata(String index, boolean isRemoteIndex, String alias, Setting blockSetting) { + IndexMetadata.Builder builder = IndexMetadata.builder(index).settings(createIndexSettingBuilder(isRemoteIndex, blockSetting)); + if (alias != null) { + AliasMetadata.Builder aliasBuilder = AliasMetadata.builder(alias); + return builder.putAlias(aliasBuilder.build()).build(); + } + return builder.build(); + } + + private Settings.Builder createIndexSettingBuilder(boolean isRemoteIndex, Setting blockSetting) { + Settings.Builder builder = Settings.builder() + .put(IndexMetadata.SETTING_INDEX_UUID, "abc") + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 1); + + if (isRemoteIndex) { + builder.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + .put(IndexSettings.SEARCHABLE_SNAPSHOT_REPOSITORY.getKey(), "repo") + .put(IndexSettings.SEARCHABLE_SNAPSHOT_ID_NAME.getKey(), "snapshot"); + } + if (blockSetting != null) { + builder.put(blockSetting.getKey(), true); + } + + return builder; + } } From b25e10afb9e216c547a59409d909ec1ecae101ec Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 5 Nov 2024 13:53:35 -0500 Subject: [PATCH 04/15] Ensure support of the transport-nio by security plugin (HTTP) (#16474) * Ensure support of the transport-nio by security plugin (HTTP) Signed-off-by: Andriy Redko * Add header verifier and decompressor support of secure NIO transport variant Signed-off-by: Andriy Redko --------- Signed-off-by: Andriy Redko --- CHANGELOG.md | 1 + .../ssl/SecureNetty4HttpServerTransport.java | 4 +- .../http/netty4/Netty4HttpClient.java | 2 - plugins/transport-nio/build.gradle | 5 +- ...-native-unix-common-4.1.114.Final.jar.sha1 | 1 + .../opensearch/http/nio/NioPipeliningIT.java | 4 +- .../http/nio/HttpReadWriteHandler.java | 39 +- .../org/opensearch/http/nio/NettyAdaptor.java | 20 +- .../http/nio/NioHttpServerTransport.java | 130 +++- .../org/opensearch/http/nio/ssl/SslUtils.java | 48 ++ .../opensearch/http/nio/ssl/package-info.java | 12 + .../transport/nio/NioTransportPlugin.java | 35 ++ .../opensearch/http/nio/NioHttpClient.java | 54 +- .../http/nio/NioHttpServerTransportTests.java | 12 +- .../SecureNioHttpServerTransportTests.java | 558 ++++++++++++++++++ .../src/test/resources/README.txt | 14 + .../src/test/resources/certificate.crt | 22 + .../src/test/resources/certificate.key | 28 + .../reactor/netty4/ReactorHttpClient.java | 4 +- .../SecureHttpTransportSettingsProvider.java | 10 + 20 files changed, 966 insertions(+), 37 deletions(-) create mode 100644 plugins/transport-nio/licenses/netty-transport-native-unix-common-4.1.114.Final.jar.sha1 create mode 100644 plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/SslUtils.java create mode 100644 plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/package-info.java create mode 100644 plugins/transport-nio/src/test/java/org/opensearch/http/nio/ssl/SecureNioHttpServerTransportTests.java create mode 100644 plugins/transport-nio/src/test/resources/README.txt create mode 100644 plugins/transport-nio/src/test/resources/certificate.crt create mode 100644 plugins/transport-nio/src/test/resources/certificate.key diff --git a/CHANGELOG.md b/CHANGELOG.md index bbb30d78aa5d0..bba62e97a49e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Latency and Memory allocation improvements to Multi Term Aggregation queries ([#14993](https://github.com/opensearch-project/OpenSearch/pull/14993)) - Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111)) +- Ensure support of the transport-nio by security plugin ([#16474](https://github.com/opensearch-project/OpenSearch/pull/16474)) - Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795)) - Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471)) - Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284)) diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransport.java index 978c92870bd75..e3a6dbb4c57b5 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransport.java @@ -67,8 +67,8 @@ * @see SecuritySSLNettyHttpServerTransport */ public class SecureNetty4HttpServerTransport extends Netty4HttpServerTransport { - public static final String REQUEST_HEADER_VERIFIER = "HeaderVerifier"; - public static final String REQUEST_DECOMPRESSOR = "RequestDecompressor"; + public static final String REQUEST_HEADER_VERIFIER = SecureHttpTransportSettingsProvider.REQUEST_HEADER_VERIFIER; + public static final String REQUEST_DECOMPRESSOR = SecureHttpTransportSettingsProvider.REQUEST_DECOMPRESSOR; private static final Logger logger = LogManager.getLogger(SecureNetty4HttpServerTransport.class); private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java index ef6b67ea44299..cf841f2e24b1e 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java @@ -315,13 +315,11 @@ private static class CountDownLatchHandlerHttp2 extends AwaitableChannelInitiali private final CountDownLatch latch; private final Collection content; - private final boolean secure; private Http2SettingsHandler settingsHandler; CountDownLatchHandlerHttp2(final CountDownLatch latch, final Collection content, final boolean secure) { this.latch = latch; this.content = content; - this.secure = secure; } @Override diff --git a/plugins/transport-nio/build.gradle b/plugins/transport-nio/build.gradle index ee557aa0efc79..c0f0150378434 100644 --- a/plugins/transport-nio/build.gradle +++ b/plugins/transport-nio/build.gradle @@ -50,6 +50,7 @@ dependencies { api "io.netty:netty-handler:${versions.netty}" api "io.netty:netty-resolver:${versions.netty}" api "io.netty:netty-transport:${versions.netty}" + api "io.netty:netty-transport-native-unix-common:${versions.netty}" } tasks.named("dependencyLicenses").configure { @@ -151,10 +152,6 @@ thirdPartyAudit { 'io.netty.internal.tcnative.SessionTicketKey', 'io.netty.internal.tcnative.SniHostNameMatcher', - // from io.netty.channel.unix (netty) - 'io.netty.channel.unix.FileDescriptor', - 'io.netty.channel.unix.UnixChannel', - 'reactor.blockhound.BlockHound$Builder', 'reactor.blockhound.integration.BlockHoundIntegration' ) diff --git a/plugins/transport-nio/licenses/netty-transport-native-unix-common-4.1.114.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-transport-native-unix-common-4.1.114.Final.jar.sha1 new file mode 100644 index 0000000000000..a80b9e51be74b --- /dev/null +++ b/plugins/transport-nio/licenses/netty-transport-native-unix-common-4.1.114.Final.jar.sha1 @@ -0,0 +1 @@ +d1171bb99411f282068f49d780cedf8c9adeabfd \ No newline at end of file diff --git a/plugins/transport-nio/src/internalClusterTest/java/org/opensearch/http/nio/NioPipeliningIT.java b/plugins/transport-nio/src/internalClusterTest/java/org/opensearch/http/nio/NioPipeliningIT.java index 4f26e8ae65259..c4541e3b1c7d3 100644 --- a/plugins/transport-nio/src/internalClusterTest/java/org/opensearch/http/nio/NioPipeliningIT.java +++ b/plugins/transport-nio/src/internalClusterTest/java/org/opensearch/http/nio/NioPipeliningIT.java @@ -61,8 +61,8 @@ public void testThatNioHttpServerSupportsPipelining() throws Exception { TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses(); TransportAddress transportAddress = randomFrom(boundAddresses); - try (NioHttpClient nettyHttpClient = new NioHttpClient()) { - Collection responses = nettyHttpClient.get(transportAddress.address(), requests); + try (NioHttpClient client = NioHttpClient.http()) { + Collection responses = client.get(transportAddress.address(), requests); assertThat(responses, hasSize(5)); Collection opaqueIds = NioHttpClient.returnOpaqueIds(responses); diff --git a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/HttpReadWriteHandler.java index d44515f3dc727..6438cca9cc33d 100644 --- a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/HttpReadWriteHandler.java @@ -32,6 +32,7 @@ package org.opensearch.http.nio; +import org.opensearch.common.Nullable; import org.opensearch.common.unit.TimeValue; import org.opensearch.http.HttpHandlingSettings; import org.opensearch.http.HttpPipelinedRequest; @@ -44,6 +45,8 @@ import org.opensearch.nio.TaskScheduler; import org.opensearch.nio.WriteOperation; +import javax.net.ssl.SSLEngine; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -58,6 +61,7 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.ssl.SslHandler; public class HttpReadWriteHandler implements NioChannelHandler { @@ -77,6 +81,28 @@ public HttpReadWriteHandler( HttpHandlingSettings settings, TaskScheduler taskScheduler, LongSupplier nanoClock + ) { + this( + nioHttpChannel, + transport, + settings, + taskScheduler, + nanoClock, + null, /* no header verifier */ + new HttpContentDecompressor(), + null /* no SSL/TLS */ + ); + } + + HttpReadWriteHandler( + NioHttpChannel nioHttpChannel, + NioHttpServerTransport transport, + HttpHandlingSettings settings, + TaskScheduler taskScheduler, + LongSupplier nanoClock, + @Nullable ChannelHandler headerVerifier, + ChannelHandler decompressor, + @Nullable SSLEngine sslEngine ) { this.nioHttpChannel = nioHttpChannel; this.transport = transport; @@ -85,6 +111,12 @@ public HttpReadWriteHandler( this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis()); List handlers = new ArrayList<>(8); + + SslHandler sslHandler = null; + if (sslEngine != null) { + sslHandler = new SslHandler(sslEngine); + } + HttpRequestDecoder decoder = new HttpRequestDecoder( settings.getMaxInitialLineLength(), settings.getMaxHeaderSize(), @@ -92,7 +124,10 @@ public HttpReadWriteHandler( ); decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); handlers.add(decoder); - handlers.add(new HttpContentDecompressor()); + if (headerVerifier != null) { + handlers.add(headerVerifier); + } + handlers.add(decompressor); handlers.add(new HttpResponseEncoder()); handlers.add(new HttpObjectAggregator(settings.getMaxContentLength())); if (settings.isCompression()) { @@ -102,7 +137,7 @@ public HttpReadWriteHandler( handlers.add(new NioHttpResponseCreator()); handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents())); - adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0])); + adaptor = new NettyAdaptor(sslHandler, handlers.toArray(new ChannelHandler[0])); adaptor.addCloseListener((v, e) -> nioHttpChannel.close()); } diff --git a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NettyAdaptor.java b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NettyAdaptor.java index 0b7f4ee7646d1..426690b4b696d 100644 --- a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NettyAdaptor.java +++ b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NettyAdaptor.java @@ -33,6 +33,7 @@ package org.opensearch.http.nio; import org.opensearch.ExceptionsHelper; +import org.opensearch.common.Nullable; import org.opensearch.nio.FlushOperation; import org.opensearch.nio.Page; import org.opensearch.nio.WriteOperation; @@ -49,6 +50,7 @@ import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.ssl.SslHandler; class NettyAdaptor { @@ -56,9 +58,13 @@ class NettyAdaptor { private final LinkedList flushOperations = new LinkedList<>(); NettyAdaptor(ChannelHandler... handlers) { - nettyChannel = new EmbeddedChannel(); - nettyChannel.pipeline().addLast("write_captor", new ChannelOutboundHandlerAdapter() { + this(null, handlers); + } + NettyAdaptor(@Nullable SslHandler sslHandler, ChannelHandler... handlers) { + this.nettyChannel = new EmbeddedChannel(); + + nettyChannel.pipeline().addLast("write_captor", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { // This is a little tricky. The embedded channel will complete the promise once it writes the message @@ -75,12 +81,22 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } }); + if (sslHandler != null) { + nettyChannel.pipeline().addAfter("write_captor", "ssl_handler", sslHandler); + } nettyChannel.pipeline().addLast(handlers); } public void close() throws Exception { assert flushOperations.isEmpty() : "Should close outbound operations before calling close"; + final SslHandler sslHandler = (SslHandler) nettyChannel.pipeline().get("ssl_handler"); + if (sslHandler != null) { + // The nettyChannel.close() or sslHandler.closeOutbound() futures will block indefinitely, + // removing the handler instead from the channel. + nettyChannel.pipeline().remove(sslHandler); + } + ChannelFuture closeFuture = nettyChannel.close(); // This should be safe as we are not a real network channel closeFuture.await(); diff --git a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NioHttpServerTransport.java index ecf9ad9f17f87..9eca5fc87120d 100644 --- a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NioHttpServerTransport.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.Nullable; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -47,6 +48,8 @@ import org.opensearch.http.AbstractHttpServerTransport; import org.opensearch.http.HttpChannel; import org.opensearch.http.HttpServerChannel; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.nio.ssl.SslUtils; import org.opensearch.nio.BytesChannelContext; import org.opensearch.nio.ChannelFactory; import org.opensearch.nio.Config; @@ -56,16 +59,28 @@ import org.opensearch.nio.NioSocketChannel; import org.opensearch.nio.ServerChannelContext; import org.opensearch.nio.SocketChannelContext; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportAdapterProvider; import org.opensearch.transport.nio.NioGroupFactory; import org.opensearch.transport.nio.PageAllocator; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; + import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.Collections; +import java.util.List; +import java.util.Optional; import java.util.function.Consumer; +import java.util.stream.Collectors; + +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.HttpContentDecompressor; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; @@ -83,6 +98,9 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { private static final Logger logger = LogManager.getLogger(NioHttpServerTransport.class); + public static final String REQUEST_HEADER_VERIFIER = SecureHttpTransportSettingsProvider.REQUEST_HEADER_VERIFIER; + public static final String REQUEST_DECOMPRESSOR = SecureHttpTransportSettingsProvider.REQUEST_DECOMPRESSOR; + protected final PageAllocator pageAllocator; private final NioGroupFactory nioGroupFactory; @@ -97,6 +115,34 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { private volatile NioGroup nioGroup; private ChannelFactory channelFactory; + private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; + + public NioHttpServerTransport( + Settings settings, + NetworkService networkService, + BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, + ThreadPool threadPool, + NamedXContentRegistry xContentRegistry, + Dispatcher dispatcher, + NioGroupFactory nioGroupFactory, + ClusterSettings clusterSettings, + Tracer tracer + ) { + this( + settings, + networkService, + bigArrays, + pageCacheRecycler, + threadPool, + xContentRegistry, + dispatcher, + nioGroupFactory, + clusterSettings, + null, + tracer + ); + } public NioHttpServerTransport( Settings settings, @@ -108,6 +154,7 @@ public NioHttpServerTransport( Dispatcher dispatcher, NioGroupFactory nioGroupFactory, ClusterSettings clusterSettings, + @Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider, Tracer tracer ) { super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer); @@ -127,6 +174,7 @@ public NioHttpServerTransport( this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings); this.tcpSendBufferSize = Math.toIntExact(SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings).getBytes()); this.tcpReceiveBufferSize = Math.toIntExact(SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes()); + this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider; logger.debug( "using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}]," @@ -178,8 +226,8 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws IOExcep return httpServerChannel; } - protected ChannelFactory channelFactory() { - return new HttpChannelFactory(); + protected ChannelFactory channelFactory() throws SSLException { + return new HttpChannelFactory(secureHttpTransportSettingsProvider); } protected void acceptChannel(NioSocketChannel socketChannel) { @@ -187,8 +235,11 @@ protected void acceptChannel(NioSocketChannel socketChannel) { } private class HttpChannelFactory extends ChannelFactory { + private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; + private final ChannelInboundHandlerAdapter headerVerifier; + private final TransportAdapterProvider decompressorProvider; - private HttpChannelFactory() { + private HttpChannelFactory(@Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider) { super( tcpNoDelay, tcpKeepAlive, @@ -199,17 +250,85 @@ private HttpChannelFactory() { tcpSendBufferSize, tcpReceiveBufferSize ); + this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider; + + final List headerVerifiers = getHeaderVerifiers(secureHttpTransportSettingsProvider); + final Optional> decompressorProviderOpt = getDecompressorProvider( + secureHttpTransportSettingsProvider + ); + + // There could be multiple request decompressor providers configured, using the first one + decompressorProviderOpt.ifPresent(p -> logger.debug("Using request decompressor provider: {}", p)); + + if (headerVerifiers.size() > 1) { + throw new IllegalArgumentException( + "Cannot have more than one header verifier configured, supplied " + headerVerifiers.size() + ); + } + + this.headerVerifier = headerVerifiers.isEmpty() ? null : headerVerifiers.get(0); + this.decompressorProvider = decompressorProviderOpt.orElseGet(() -> new TransportAdapterProvider() { + @Override + public String name() { + return REQUEST_DECOMPRESSOR; + } + + @Override + public Optional create(Settings settings, HttpServerTransport transport, Class adapterClass) { + return Optional.empty(); + } + }); + + } + + private List getHeaderVerifiers( + @Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider + ) { + if (secureHttpTransportSettingsProvider == null) { + return Collections.emptyList(); + } + + return secureHttpTransportSettingsProvider.getHttpTransportAdapterProviders(settings) + .stream() + .filter(p -> REQUEST_HEADER_VERIFIER.equalsIgnoreCase(p.name())) + .map(p -> p.create(settings, NioHttpServerTransport.this, ChannelInboundHandlerAdapter.class)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + + private Optional> getDecompressorProvider( + @Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider + ) { + if (secureHttpTransportSettingsProvider == null) { + return Optional.empty(); + } + + return secureHttpTransportSettingsProvider.getHttpTransportAdapterProviders(settings) + .stream() + .filter(p -> REQUEST_DECOMPRESSOR.equalsIgnoreCase(p.name())) + .findFirst(); } @Override - public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel, Config.Socket socketConfig) { + public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel, Config.Socket socketConfig) throws IOException { + SSLEngine engine = null; + if (secureHttpTransportSettingsProvider != null) { + engine = secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(settings, NioHttpServerTransport.this) + .orElseGet(SslUtils::createDefaultServerSSLEngine); + } + NioHttpChannel httpChannel = new NioHttpChannel(channel); HttpReadWriteHandler handler = new HttpReadWriteHandler( httpChannel, NioHttpServerTransport.this, handlingSettings, selector.getTaskScheduler(), - threadPool::relativeTimeInMillis + threadPool::relativeTimeInMillis, + headerVerifier, + decompressorProvider.create(settings, NioHttpServerTransport.this, ChannelInboundHandlerAdapter.class) + .orElseGet(HttpContentDecompressor::new), + engine ); Consumer exceptionHandler = (e) -> onException(httpChannel, e); SocketChannelContext context = new BytesChannelContext( @@ -244,6 +363,5 @@ public NioHttpServerChannel createServerChannel( httpServerChannel.setContext(context); return httpServerChannel; } - } } diff --git a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/SslUtils.java b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/SslUtils.java new file mode 100644 index 0000000000000..afd67f9799273 --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/SslUtils.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ +package org.opensearch.http.nio.ssl; + +import org.opensearch.OpenSearchSecurityException; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import java.security.NoSuchAlgorithmException; + +public class SslUtils { + private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" }; + + private SslUtils() { + + } + + public static SSLEngine createDefaultServerSSLEngine() { + try { + final SSLEngine engine = SSLContext.getDefault().createSSLEngine(); + engine.setEnabledProtocols(DEFAULT_SSL_PROTOCOLS); + engine.setUseClientMode(false); + return engine; + } catch (final NoSuchAlgorithmException ex) { + throw new OpenSearchSecurityException("Unable to initialize default server SSL engine", ex); + } + } + + public static SSLEngine createDefaultClientSSLEngine() { + try { + final SSLEngine engine = SSLContext.getDefault().createSSLEngine(); + engine.setEnabledProtocols(DEFAULT_SSL_PROTOCOLS); + engine.setUseClientMode(true); + return engine; + } catch (final NoSuchAlgorithmException ex) { + throw new OpenSearchSecurityException("Unable to initialize default client SSL engine", ex); + } + } +} diff --git a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/package-info.java b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/package-info.java new file mode 100644 index 0000000000000..a67f8247ebd4d --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * SSL supporting utility classes + */ +package org.opensearch.http.nio.ssl; diff --git a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java index d4be876867651..7707369b59120 100644 --- a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java @@ -47,9 +47,11 @@ import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.HttpServerTransport.Dispatcher; import org.opensearch.http.nio.NioHttpServerTransport; import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transport; @@ -66,6 +68,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { public static final String NIO_TRANSPORT_NAME = "nio-transport"; public static final String NIO_HTTP_TRANSPORT_NAME = "nio-http-transport"; + public static final String NIO_SECURE_HTTP_TRANSPORT_NAME = "nio-http-transport-secure"; private static final Logger logger = LogManager.getLogger(NioTransportPlugin.class); @@ -140,6 +143,38 @@ public Map> getHttpTransports( ); } + @Override + public Map> getSecureHttpTransports( + Settings settings, + ThreadPool threadPool, + BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, + CircuitBreakerService circuitBreakerService, + NamedXContentRegistry xContentRegistry, + NetworkService networkService, + Dispatcher dispatcher, + ClusterSettings clusterSettings, + SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider, + Tracer tracer + ) { + return Collections.singletonMap( + NIO_SECURE_HTTP_TRANSPORT_NAME, + () -> new NioHttpServerTransport( + settings, + networkService, + bigArrays, + pageCacheRecycler, + threadPool, + xContentRegistry, + dispatcher, + getNioGroupFactory(settings), + clusterSettings, + secureHttpTransportSettingsProvider, + tracer + ) + ); + } + private synchronized NioGroupFactory getNioGroupFactory(Settings settings) { NioGroupFactory nioGroupFactory = groupFactory.get(); if (nioGroupFactory != null) { diff --git a/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpClient.java b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpClient.java index 45e51c6855f79..ff878eb55e411 100644 --- a/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpClient.java +++ b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpClient.java @@ -71,6 +71,7 @@ import java.util.function.Consumer; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelHandler; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -83,6 +84,10 @@ import io.netty.handler.codec.http.HttpRequestEncoder; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseDecoder; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; import static io.netty.handler.codec.http.HttpHeaderNames.HOST; @@ -92,7 +97,7 @@ /** * Tiny helper to send http requests over nio. */ -class NioHttpClient implements Closeable { +public class NioHttpClient implements Closeable { static Collection returnOpaqueIds(Collection responses) { List list = new ArrayList<>(responses.size()); @@ -105,9 +110,11 @@ static Collection returnOpaqueIds(Collection responses private static final Logger logger = LogManager.getLogger(NioHttpClient.class); private final NioSelectorGroup nioGroup; + private final boolean secure; - NioHttpClient() { + private NioHttpClient(final boolean secure) { try { + this.secure = secure; nioGroup = new NioSelectorGroup( daemonThreadFactory(Settings.EMPTY, "nio-http-client"), 1, @@ -118,6 +125,14 @@ static Collection returnOpaqueIds(Collection responses } } + public static NioHttpClient http() { + return new NioHttpClient(false); + } + + public static NioHttpClient https() { + return new NioHttpClient(true); + } + public Collection get(InetSocketAddress remoteAddress, String... uris) throws InterruptedException { Collection requests = new ArrayList<>(uris.length); for (int i = 0; i < uris.length; i++) { @@ -138,7 +153,8 @@ public final FullHttpResponse send(InetSocketAddress remoteAddress, FullHttpRequ public final NioSocketChannel connect(InetSocketAddress remoteAddress) { ChannelFactory factory = new ClientChannelFactory( new CountDownLatch(0), - new ArrayList<>() + new ArrayList<>(), + secure ); try { NioSocketChannel nioSocketChannel = nioGroup.openChannel(remoteAddress, factory); @@ -160,7 +176,7 @@ private synchronized Collection sendRequests(InetSocketAddress final CountDownLatch latch = new CountDownLatch(requests.size()); final Collection content = Collections.synchronizedList(new ArrayList<>(requests.size())); - ChannelFactory factory = new ClientChannelFactory(latch, content); + ChannelFactory factory = new ClientChannelFactory(latch, content, secure); NioSocketChannel nioSocketChannel = null; try { @@ -196,8 +212,9 @@ private class ClientChannelFactory extends ChannelFactory content; + private final boolean secure; - private ClientChannelFactory(CountDownLatch latch, Collection content) { + private ClientChannelFactory(CountDownLatch latch, Collection content, final boolean secure) { super( NetworkService.TCP_NO_DELAY.get(Settings.EMPTY), NetworkService.TCP_KEEP_ALIVE.get(Settings.EMPTY), @@ -210,12 +227,14 @@ private ClientChannelFactory(CountDownLatch latch, Collection ); this.latch = latch; this.content = content; + this.secure = secure; } @Override - public NioSocketChannel createChannel(NioSelector selector, java.nio.channels.SocketChannel channel, Config.Socket socketConfig) { + public NioSocketChannel createChannel(NioSelector selector, java.nio.channels.SocketChannel channel, Config.Socket socketConfig) + throws IOException { NioSocketChannel nioSocketChannel = new NioSocketChannel(channel); - HttpClientHandler handler = new HttpClientHandler(nioSocketChannel, latch, content); + HttpClientHandler handler = new HttpClientHandler(nioSocketChannel, latch, content, secure); Consumer exceptionHandler = (e) -> { latch.countDown(); onException(e); @@ -249,17 +268,34 @@ private static class HttpClientHandler implements NioChannelHandler { private final CountDownLatch latch; private final Collection content; - private HttpClientHandler(NioSocketChannel channel, CountDownLatch latch, Collection content) { + private HttpClientHandler( + NioSocketChannel channel, + CountDownLatch latch, + Collection content, + final boolean secure + ) throws IOException { this.latch = latch; this.content = content; final int maxContentLength = Math.toIntExact(new ByteSizeValue(100, ByteSizeUnit.MB).getBytes()); List handlers = new ArrayList<>(5); + + SslHandler sslHandler = null; + if (secure) { + sslHandler = new SslHandler( + SslContextBuilder.forClient() + .clientAuth(ClientAuth.NONE) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build() + .newEngine(UnpooledByteBufAllocator.DEFAULT) + ); + } + handlers.add(new HttpResponseDecoder()); handlers.add(new HttpRequestEncoder()); handlers.add(new HttpContentDecompressor()); handlers.add(new HttpObjectAggregator(maxContentLength)); - adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0])); + adaptor = new NettyAdaptor(sslHandler, handlers.toArray(new ChannelHandler[0])); adaptor.addCloseListener((v, e) -> channel.close()); } diff --git a/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpServerTransportTests.java index 09594673de5b2..61b42f2a6b77a 100644 --- a/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpServerTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpServerTransportTests.java @@ -193,7 +193,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, ) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (NioHttpClient client = new NioHttpClient()) { + try (NioHttpClient client = NioHttpClient.http()) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); request.headers().set(HttpHeaderNames.EXPECT, expectation); HttpUtil.setContentLength(request, contentLength); @@ -310,7 +310,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); // Test pre-flight request - try (NioHttpClient client = new NioHttpClient()) { + try (NioHttpClient client = NioHttpClient.http()) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/"); request.headers().add(CorsHandler.ORIGIN, "test-cors.org"); request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST"); @@ -327,7 +327,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } // Test short-circuited request - try (NioHttpClient client = new NioHttpClient()) { + try (NioHttpClient client = NioHttpClient.http()) { final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); request.headers().add(CorsHandler.ORIGIN, "google.com"); @@ -384,7 +384,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (NioHttpClient client = new NioHttpClient()) { + try (NioHttpClient client = NioHttpClient.http()) { DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip")); final FullHttpResponse response = client.send(remoteAddress.address(), request); @@ -451,7 +451,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (NioHttpClient client = new NioHttpClient()) { + try (NioHttpClient client = NioHttpClient.http()) { final String url = "/" + new String(new byte[maxInitialLineLength], Charset.forName("UTF-8")); final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); @@ -514,7 +514,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); - try (NioHttpClient client = new NioHttpClient()) { + try (NioHttpClient client = NioHttpClient.http()) { NioSocketChannel channel = null; try { CountDownLatch channelClosedLatch = new CountDownLatch(1); diff --git a/plugins/transport-nio/src/test/java/org/opensearch/http/nio/ssl/SecureNioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/ssl/SecureNioHttpServerTransportTests.java new file mode 100644 index 0000000000000..1adfe0370344c --- /dev/null +++ b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/ssl/SecureNioHttpServerTransportTests.java @@ -0,0 +1,558 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.nio.ssl; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.common.network.NetworkAddress; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.MockBigArrays; +import org.opensearch.common.util.MockPageCacheRecycler; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.http.BindHttpException; +import org.opensearch.http.CorsHandler; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.HttpTransportSettings; +import org.opensearch.http.NullDispatcher; +import org.opensearch.http.nio.NioHttpClient; +import org.opensearch.http.nio.NioHttpServerTransport; +import org.opensearch.nio.NioSocketChannel; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.plugins.TransportExceptionHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.nio.NioGroupFactory; +import org.junit.After; +import org.junit.Before; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; +import static org.opensearch.core.rest.RestStatus.OK; +import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN; +import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ENABLED; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Tests for the secure {@link NioHttpServerTransport} class. + */ +public class SecureNioHttpServerTransportTests extends OpenSearchTestCase { + + private NetworkService networkService; + private ThreadPool threadPool; + private MockBigArrays bigArrays; + private MockPageCacheRecycler pageRecycler; + private ClusterSettings clusterSettings; + private SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; + + @Before + public void setup() throws Exception { + networkService = new NetworkService(Collections.emptyList()); + threadPool = new TestThreadPool("test"); + pageRecycler = new MockPageCacheRecycler(Settings.EMPTY); + bigArrays = new MockBigArrays(pageRecycler, new NoneCircuitBreakerService()); + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() { + @Override + public Optional buildHttpServerExceptionHandler(Settings settings, HttpServerTransport transport) { + return Optional.empty(); + } + + @Override + public Optional buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException { + try { + SSLEngine engine = SslContextBuilder.forServer( + SecureNioHttpServerTransportTests.class.getResourceAsStream("/certificate.crt"), + SecureNioHttpServerTransportTests.class.getResourceAsStream("/certificate.key") + ).trustManager(InsecureTrustManagerFactory.INSTANCE).build().newEngine(UnpooledByteBufAllocator.DEFAULT); + return Optional.of(engine); + } catch (final IOException ex) { + throw new SSLException(ex); + } + } + }; + } + + @After + public void shutdown() throws Exception { + if (threadPool != null) { + threadPool.shutdownNow(); + } + threadPool = null; + networkService = null; + bigArrays = null; + clusterSettings = null; + } + + /** + * Test that {@link NioHttpServerTransport} supports the "Expect: 100-continue" HTTP header + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectContinueHeader() throws InterruptedException { + final Settings settings = createSettings(); + final int contentLength = randomIntBetween(1, HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).bytesAsInt()); + runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.CONTINUE); + } + + /** + * Test that {@link NioHttpServerTransport} responds to a + * 100-continue expectation with too large a content-length + * with a 413 status. + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedException { + final String key = HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(); + final int maxContentLength = randomIntBetween(1, 104857600); + final Settings settings = createBuilderWithPort().put(key, maxContentLength + "b").build(); + final int contentLength = randomIntBetween(maxContentLength + 1, Integer.MAX_VALUE); + runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE); + } + + /** + * Test that {@link NioHttpServerTransport} responds to an unsupported expectation with a 417 status. + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectUnsupportedExpectation() throws InterruptedException { + Settings settings = createSettings(); + runExpectHeaderTest(settings, "chocolate=yummy", 0, HttpResponseStatus.EXPECTATION_FAILED); + } + + private void runExpectHeaderTest( + final Settings settings, + final String expectation, + final int contentLength, + final HttpResponseStatus expectedStatus + ) throws InterruptedException { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + @Override + public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { + channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))); + } + + @Override + public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(); + } + }; + try ( + NioHttpServerTransport transport = new NioHttpServerTransport( + settings, + networkService, + bigArrays, + pageRecycler, + threadPool, + xContentRegistry(), + dispatcher, + new NioGroupFactory(settings, logger), + clusterSettings, + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + try (NioHttpClient client = NioHttpClient.https()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); + request.headers().set(HttpHeaderNames.EXPECT, expectation); + HttpUtil.setContentLength(request, contentLength); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(expectedStatus)); + if (expectedStatus.equals(HttpResponseStatus.CONTINUE)) { + final FullHttpRequest continuationRequest = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, + HttpMethod.POST, + "/", + Unpooled.EMPTY_BUFFER + ); + final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), continuationRequest); + try { + assertThat(continuationResponse.status(), is(HttpResponseStatus.OK)); + assertThat( + new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), + is("done") + ); + } finally { + continuationResponse.release(); + } + } + } finally { + response.release(); + } + } + } + } + + public void testBindUnavailableAddress() { + Settings initialSettings = createSettings(); + try ( + NioHttpServerTransport transport = new NioHttpServerTransport( + initialSettings, + networkService, + bigArrays, + pageRecycler, + threadPool, + xContentRegistry(), + new NullDispatcher(), + new NioGroupFactory(Settings.EMPTY, logger), + clusterSettings, + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + Settings settings = Settings.builder() + .put("http.port", remoteAddress.getPort()) + .put("network.host", remoteAddress.getAddress()) + .build(); + try ( + NioHttpServerTransport otherTransport = new NioHttpServerTransport( + settings, + networkService, + bigArrays, + pageRecycler, + threadPool, + xContentRegistry(), + new NullDispatcher(), + new NioGroupFactory(Settings.EMPTY, logger), + clusterSettings, + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start); + assertEquals("Failed to bind to " + NetworkAddress.format(remoteAddress.address()), bindHttpException.getMessage()); + } + } + } + + public void testCorsRequest() throws InterruptedException { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError(); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(); + } + + }; + + final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true) + .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org") + .build(); + + try ( + NioHttpServerTransport transport = new NioHttpServerTransport( + settings, + networkService, + bigArrays, + pageRecycler, + threadPool, + xContentRegistry(), + dispatcher, + new NioGroupFactory(settings, logger), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + // Test pre-flight request + try (NioHttpClient client = NioHttpClient.https()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/"); + request.headers().add(CorsHandler.ORIGIN, "test-cors.org"); + request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST"); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + assertThat(response.headers().get(CorsHandler.ACCESS_CONTROL_ALLOW_ORIGIN), equalTo("test-cors.org")); + assertThat(response.headers().get(CorsHandler.VARY), equalTo(CorsHandler.ORIGIN)); + assertTrue(response.headers().contains(CorsHandler.DATE)); + } finally { + response.release(); + } + } + + // Test short-circuited request + try (NioHttpClient client = NioHttpClient.https()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + request.headers().add(CorsHandler.ORIGIN, "google.com"); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.FORBIDDEN)); + } finally { + response.release(); + } + } + } + } + + public void testLargeCompressedResponse() throws InterruptedException { + final String responseString = randomAlphaOfLength(4 * 1024 * 1024); + final String url = "/thing"; + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + if (url.equals(request.uri())) { + channel.sendResponse(new BytesRestResponse(OK, responseString)); + } else { + logger.error("--> Unexpected successful uri [{}]", request.uri()); + throw new AssertionError(); + } + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(); + } + + }; + + try ( + NioHttpServerTransport transport = new NioHttpServerTransport( + Settings.EMPTY, + networkService, + bigArrays, + pageRecycler, + threadPool, + xContentRegistry(), + dispatcher, + new NioGroupFactory(Settings.EMPTY, logger), + clusterSettings, + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (NioHttpClient client = NioHttpClient.https()) { + DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip")); + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + byte[] bytes = new byte[response.content().readableBytes()]; + response.content().readBytes(bytes); + assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(responseString)); + } finally { + response.release(); + } + } + } + } + + public void testBadRequest() throws InterruptedException { + final AtomicReference causeReference = new AtomicReference<>(); + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError(); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + causeReference.set(cause); + try { + final OpenSearchException e = new OpenSearchException("you sent a bad request and you should feel bad"); + channel.sendResponse(new BytesRestResponse(channel, BAD_REQUEST, e)); + } catch (final IOException e) { + throw new AssertionError(e); + } + } + + }; + + final Settings settings; + final int maxInitialLineLength; + final Setting httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; + if (randomBoolean()) { + maxInitialLineLength = httpMaxInitialLineLengthSetting.getDefault(Settings.EMPTY).bytesAsInt(); + settings = createSettings(); + } else { + maxInitialLineLength = randomIntBetween(1, 8192); + settings = createBuilderWithPort().put(httpMaxInitialLineLengthSetting.getKey(), maxInitialLineLength + "b").build(); + } + + try ( + NioHttpServerTransport transport = new NioHttpServerTransport( + settings, + networkService, + bigArrays, + pageRecycler, + threadPool, + xContentRegistry(), + dispatcher, + new NioGroupFactory(settings, logger), + clusterSettings, + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (NioHttpClient client = NioHttpClient.https()) { + final String url = "/" + new String(new byte[maxInitialLineLength], Charset.forName("UTF-8")); + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.BAD_REQUEST)); + assertThat( + new String(response.content().array(), Charset.forName("UTF-8")), + containsString("you sent a bad request and you should feel bad") + ); + } finally { + response.release(); + } + } + } + + assertNotNull(causeReference.get()); + assertThat(causeReference.get(), instanceOf(TooLongFrameException.class)); + } + + public void testReadTimeout() throws Exception { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError("Should not have received a dispatched request"); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError("Should not have received a dispatched request"); + } + + }; + + Settings settings = createBuilderWithPort().put( + HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), + new TimeValue(randomIntBetween(100, 300)) + ).build(); + + try ( + NioHttpServerTransport transport = new NioHttpServerTransport( + settings, + networkService, + bigArrays, + pageRecycler, + threadPool, + xContentRegistry(), + dispatcher, + new NioGroupFactory(settings, logger), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (NioHttpClient client = NioHttpClient.https()) { + NioSocketChannel channel = null; + try { + CountDownLatch channelClosedLatch = new CountDownLatch(1); + channel = client.connect(remoteAddress.address()); + channel.addCloseListener((r, t) -> channelClosedLatch.countDown()); + assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES)); + } finally { + if (channel != null) { + channel.close(); + } + } + } + } + } + + private Settings createSettings() { + return createBuilderWithPort().build(); + } + + private Settings.Builder createBuilderWithPort() { + return Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()); + } +} diff --git a/plugins/transport-nio/src/test/resources/README.txt b/plugins/transport-nio/src/test/resources/README.txt new file mode 100644 index 0000000000000..a4353cee45a97 --- /dev/null +++ b/plugins/transport-nio/src/test/resources/README.txt @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# +# This is README describes how the certificates in this directory were created. +# This file can also be executed as a script +# + +# 1. Create certificate key + +openssl req -x509 -sha256 -newkey rsa:2048 -keyout certificate.key -out certificate.crt -days 1024 -nodes + +# 2. Export the certificate in pkcs12 format + +openssl pkcs12 -export -in certificate.crt -inkey certificate.key -out server.p12 -name netty4-secure -password pass:password + diff --git a/plugins/transport-nio/src/test/resources/certificate.crt b/plugins/transport-nio/src/test/resources/certificate.crt new file mode 100644 index 0000000000000..54c78fdbcf6de --- /dev/null +++ b/plugins/transport-nio/src/test/resources/certificate.crt @@ -0,0 +1,22 @@ +-----BEGIN CERTIFICATE----- +MIIDkzCCAnugAwIBAgIUddAawr5zygcd+Dcn9WVDpO4BJ7YwDQYJKoZIhvcNAQEL +BQAwWTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MB4X +DTI0MDMxNDE5NDQzOVoXDTI3MDEwMjE5NDQzOVowWTELMAkGA1UEBhMCQVUxEzAR +BgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5 +IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAzjOKkg6Iba5zfZ8b/RYw+PGmGEfbdGuuF10Wz4Jmx/Nk4VfDLxdh +TW8VllUL2JD7uPkjABj7pW3awAbvIJ+VGbKqfBr1Nsz0mPPzhT8cfuMH/FDZgQs3 +4HuqDKr0LfC1Kw5E3WF0GVMBDNu0U+nKoeqySeYjGdxDnd3W4cqK5AnUxL0RnIny +Bw7ZuhcU55XndH/Xauro/2EpvJduDsWMdqt7ZfIf1TOmaiQHK+82yb/drVaJbczK +uTpn1Kv2bnzkQEckgq+z1dLNOOyvP2xf+nsziw5ilJe92e5GJOUJYFAlEgUAGpfD +dv6j/gTRYvdJCJItOQEQtektNCAZsoc0wwIDAQABo1MwUTAdBgNVHQ4EFgQUzHts +wIt+zhB/R4U4Do2P6rr0YhkwHwYDVR0jBBgwFoAUzHtswIt+zhB/R4U4Do2P6rr0 +YhkwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAveh870jJX7vt +oLCrdugsyo79pR4f7Nr1kUy3jJrfoaoUmrjiiiHWgT22fGwp7j1GZF2mVfo8YVaK +63YNn5gB2NNZhguPOFC4AdvHRYOKRBOaOvWK8oq7BcJ//18JYI/pPnpgkYvJjqv4 +gFKaZX9qWtujHpAmKiVGs7pwYGNXfixPHRNV4owcfHMIH5dhbbqT49j94xVpjbXs +OymKtFl4kpCE/0LzKFrFcuu55Am1VLBHx2cPpHLOipgUcF5BHFlQ8AXiCMOwfPAw +d22mLB6Gt1oVEpyvQHYd3e04FetEXQ9E8T+NKWZx/8Ucf+IWBYmZBRxch6O83xgk +bAbGzqkbzQ== +-----END CERTIFICATE----- diff --git a/plugins/transport-nio/src/test/resources/certificate.key b/plugins/transport-nio/src/test/resources/certificate.key new file mode 100644 index 0000000000000..228350180935d --- /dev/null +++ b/plugins/transport-nio/src/test/resources/certificate.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDOM4qSDohtrnN9 +nxv9FjD48aYYR9t0a64XXRbPgmbH82ThV8MvF2FNbxWWVQvYkPu4+SMAGPulbdrA +Bu8gn5UZsqp8GvU2zPSY8/OFPxx+4wf8UNmBCzfge6oMqvQt8LUrDkTdYXQZUwEM +27RT6cqh6rJJ5iMZ3EOd3dbhyorkCdTEvRGcifIHDtm6FxTnled0f9dq6uj/YSm8 +l24OxYx2q3tl8h/VM6ZqJAcr7zbJv92tVoltzMq5OmfUq/ZufORARySCr7PV0s04 +7K8/bF/6ezOLDmKUl73Z7kYk5QlgUCUSBQAal8N2/qP+BNFi90kIki05ARC16S00 +IBmyhzTDAgMBAAECggEAVOdiElvLjyX6xeoC00YU6hxOIMdNtHU2HMamwtDV01UD +38mMQ9KjrQelYt4n34drLrHe2IZw75/5J4JzagJrmUY47psHBwaDXItuZRokeJaw +zhLYTEs7OcKRtV+a5WOspUrdzi33aQoFb67zZG3qkpsZyFXrdBV+/fy/Iv+MCvLH +xR0jQ5mzE3cw20R7S4nddChBA/y8oKGOo6QRf2SznC1jL/+yolHvJPEn1v8AUxYm +BMPHxj1O0c4M4IxnJQ3Y5Jy9OaFMyMsFlF1hVhc/3LDDxDyOuBsVsFDicojyrRea +GKngIke0yezy7Wo4NUcp8YQhafonpWVsSJJdOUotcQKBgQD0rihFBXVtcG1d/Vy7 +FvLHrmccD56JNV744LSn2CDM7W1IulNbDUZINdCFqL91u5LpxozeE1FPY1nhwncJ +N7V7XYCaSLCuV1YJzRmUCjnzk2RyopGpzWog3f9uUFGgrk1HGbNAv99k/REya6Iu +IRSkuQhaJOj3bRXzonh0K4GjewKBgQDXvamtCioOUMSP8vq919YMkBw7F+z/fr0p +pamO8HL9eewAUg6N92JQ9kobSo/GptdmdHIjs8LqnS5C3H13GX5Qlf5GskOlCpla +V55ElaSp0gvKwWE168U7gQH4etPQAXXJrOGFaGbPj9W81hTUud7HVE88KYdfWTBo +I7TuE25tWQKBgBRjcr2Vn9xXsvVTCGgamG5lLPhcoNREGz7X0pXt34XT/vhBdnKu +331i5pZMom+YCrzqK5DRwUPBPpseTjb5amj2OKIijn5ojqXQbmI0m/GdBZC71TF2 +CXLlrMQvcy3VeGEFVjd+BYpvwAAYkfIQFZ1IQdbpHnSHpX2guzLK8UmDAoGBANUy +PIcf0EetUVHfkCIjNQfdMcjD8BTcLhsF9vWmcDxFTA9VB8ULf0D64mjt2f85yQsa +b+EQN8KZ6alxMxuLOeRxFYLPj0F9o+Y/R8wHBV48kCKhz2r1v0b6SfQ/jSm1B61x +BrxLW64qOdIOzS8bLyhUDKkrcPesr8V548aRtUKhAoGBAKlNJFd8BCGKD9Td+3dE +oP1iHTX5XZ+cQIqL0e+GMQlK4HnQP566DFZU5/GHNNAfmyxd5iSRwhTqPMHRAmOb +pqQwsyufx0dFeIBxeSO3Z6jW5h2sl4nBipZpw9bzv6EBL1xRr0SfMNZzdnf4JFzc +0htGo/VO93Z2pv8w7uGUz1nN +-----END PRIVATE KEY----- diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java index 0953e51484bd3..8d20650d76583 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java @@ -181,7 +181,7 @@ private List processRequestsWithBody( private List sendRequests( final InetSocketAddress remoteAddress, final Collection requests, - boolean orderer + boolean ordered ) { final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); try { @@ -209,7 +209,7 @@ private List sendRequests( ) .toArray(Mono[]::new); - if (orderer == false) { + if (ordered == false) { return ParallelFlux.from(monos).sequential().collectList().block(); } else { return Flux.concat(monos).flatMapSequential(r -> Mono.just(r)).collectList().block(); diff --git a/server/src/main/java/org/opensearch/plugins/SecureHttpTransportSettingsProvider.java b/server/src/main/java/org/opensearch/plugins/SecureHttpTransportSettingsProvider.java index ff86cbc04e240..b7a47b0f4c742 100644 --- a/server/src/main/java/org/opensearch/plugins/SecureHttpTransportSettingsProvider.java +++ b/server/src/main/java/org/opensearch/plugins/SecureHttpTransportSettingsProvider.java @@ -27,6 +27,16 @@ */ @ExperimentalApi public interface SecureHttpTransportSettingsProvider { + /** + * The well-known name of header verifier {@link TransportAdapterProvider} provider instance + */ + final String REQUEST_HEADER_VERIFIER = "HeaderVerifier"; + + /** + * The well-known name of request decompressor {@link TransportAdapterProvider} provider instance + */ + final String REQUEST_DECOMPRESSOR = "RequestDecompressor"; + /** * Collection of additional {@link TransportAdapterProvider}s that are specific to particular HTTP transport * @param settings settings From e07499a771afbc335e1f7f08a82f8197e5826939 Mon Sep 17 00:00:00 2001 From: Robson Araujo Date: Tue, 5 Nov 2024 11:08:13 -0800 Subject: [PATCH 05/15] Improve performance for resolving derived fields (#16564) Doing the type check before the string comparison makes it much faster to resolve derived fields. Signed-off-by: Robson Araujo --- .../opensearch/index/mapper/DefaultDerivedFieldResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/mapper/DefaultDerivedFieldResolver.java b/server/src/main/java/org/opensearch/index/mapper/DefaultDerivedFieldResolver.java index 4dd17703b6f55..1e8ef4134a8e7 100644 --- a/server/src/main/java/org/opensearch/index/mapper/DefaultDerivedFieldResolver.java +++ b/server/src/main/java/org/opensearch/index/mapper/DefaultDerivedFieldResolver.java @@ -72,7 +72,7 @@ public Set resolvePattern(String pattern) { Set derivedFields = new HashSet<>(); if (queryShardContext != null && queryShardContext.getMapperService() != null) { for (MappedFieldType fieldType : queryShardContext.getMapperService().fieldTypes()) { - if (Regex.simpleMatch(pattern, fieldType.name()) && fieldType instanceof DerivedFieldType) { + if (fieldType instanceof DerivedFieldType && Regex.simpleMatch(pattern, fieldType.name())) { derivedFields.add(fieldType.name()); } } From 4213cc27305c37ea71e5b5a5addd17e5383e8029 Mon Sep 17 00:00:00 2001 From: Finn Date: Tue, 5 Nov 2024 13:02:07 -0800 Subject: [PATCH 06/15] Make cacheEntry.getIndexInput() privileged when fetching blobs from remote snapshot (#16544) * Make cacheEntry.getIndexInput() privileged when fetching blobs from remote store Signed-off-by: Finn Carroll * Rebase Signed-off-by: Finn Carroll * Spotless apply Signed-off-by: Finn Carroll * Clean up doPrivileged calls Signed-off-by: Finn Carroll * Comment Signed-off-by: Finn Carroll * Move fetchBlob to PrivilegedExceptionAction. Catch and unwrap IOException. Signed-off-by: Finn Carroll * Unused import Signed-off-by: Finn Carroll * Update server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java Co-authored-by: Andriy Redko Signed-off-by: Finn * Typo 'thrown'. Catch and throw unknown exception as IOException. Signed-off-by: Finn Carroll --------- Signed-off-by: Finn Carroll Signed-off-by: Finn Co-authored-by: Andriy Redko --- CHANGELOG.md | 1 + .../store/remote/utils/TransferManager.java | 64 +++++++++++-------- 2 files changed, 40 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bba62e97a49e0..b94483c42c6f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remove resource usages object from search response headers ([#16532](https://github.com/opensearch-project/OpenSearch/pull/16532)) - Support retrieving doc values of unsigned long field ([#16543](https://github.com/opensearch-project/OpenSearch/pull/16543)) - Fix rollover alias supports restored searchable snapshot index([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483)) +- Fix permissions error on scripted query against remote snapshot ([#16544](https://github.com/opensearch-project/OpenSearch/pull/16544)) ### Security diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 94c25202ac90c..77a8ccfafbac2 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -24,7 +24,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.security.AccessController; -import java.security.PrivilegedAction; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,39 +57,52 @@ public TransferManager(final StreamReader streamReader, final FileCache fileCach /** * Given a blobFetchRequestList, return it's corresponding IndexInput. + * + * Note: Scripted queries/aggs may trigger a blob fetch within a new security context. + * As such the following operations require elevated permissions. + * + * cacheEntry.getIndexInput() downloads new blobs from the remote store to local fileCache. + * fileCache.compute() as inserting into the local fileCache may trigger an eviction. + * * @param blobFetchRequest to fetch * @return future of IndexInput augmented with internal caching maintenance tasks */ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException { - final Path key = blobFetchRequest.getFilePath(); logger.trace("fetchBlob called for {}", key.toString()); - // We need to do a privileged action here in order to fetch from remote - // and write/evict from local file cache in case this is invoked as a side - // effect of a plugin (such as a scripted search) that doesn't have the - // necessary permissions. - final CachedIndexInput cacheEntry = AccessController.doPrivileged((PrivilegedAction) () -> { - return fileCache.compute(key, (path, cachedIndexInput) -> { - if (cachedIndexInput == null || cachedIndexInput.isClosed()) { - logger.trace("Transfer Manager - IndexInput closed or not in cache"); - // Doesn't exist or is closed, either way create a new one - return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); - } else { - logger.trace("Transfer Manager - Already in cache"); - // already in the cache and ready to be used (open) - return cachedIndexInput; + try { + return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { + CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { + if (cachedIndexInput == null || cachedIndexInput.isClosed()) { + logger.trace("Transfer Manager - IndexInput closed or not in cache"); + // Doesn't exist or is closed, either way create a new one + return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); + } else { + logger.trace("Transfer Manager - Already in cache"); + // already in the cache and ready to be used (open) + return cachedIndexInput; + } + }); + + // Cache entry was either retrieved from the cache or newly added, either + // way the reference count has been incremented by one. We can only + // decrement this reference _after_ creating the clone to be returned. + try { + return cacheEntry.getIndexInput().clone(); + } finally { + fileCache.decRef(key); } }); - }); - - // Cache entry was either retrieved from the cache or newly added, either - // way the reference count has been incremented by one. We can only - // decrement this reference _after_ creating the clone to be returned. - try { - return cacheEntry.getIndexInput().clone(); - } finally { - fileCache.decRef(key); + } catch (PrivilegedActionException e) { + final Exception cause = e.getException(); + if (cause instanceof IOException) { + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new IOException(cause); + } } } From 034bd2b6483c180b4a4439d62452cc50198c37fb Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 6 Nov 2024 16:02:00 +0800 Subject: [PATCH 07/15] Bump com.google.apis:google-api-services-compute from v1-rev20241015-2.0.0 to v1-rev20241021-2.0.0 in /plugins/discovery-gce (#16548) * Bump com.google.apis:google-api-services-compute Bumps com.google.apis:google-api-services-compute from v1-rev20241015-2.0.0 to v1-rev20241021-2.0.0. --- updated-dependencies: - dependency-name: com.google.apis:google-api-services-compute dependency-type: direct:production ... Signed-off-by: dependabot[bot] * Updating SHAs Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 2 +- plugins/discovery-gce/build.gradle | 2 +- .../google-api-services-compute-v1-rev20241015-2.0.0.jar.sha1 | 1 - .../google-api-services-compute-v1-rev20241021-2.0.0.jar.sha1 | 1 + 4 files changed, 3 insertions(+), 3 deletions(-) delete mode 100644 plugins/discovery-gce/licenses/google-api-services-compute-v1-rev20241015-2.0.0.jar.sha1 create mode 100644 plugins/discovery-gce/licenses/google-api-services-compute-v1-rev20241021-2.0.0.jar.sha1 diff --git a/CHANGELOG.md b/CHANGELOG.md index b94483c42c6f4..6d9cf50d0da5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Dependencies - Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521)) -- Bump `com.google.apis:google-api-services-compute` from v1-rev20240407-2.0.0 to v1-rev20241015-2.0.0 ([#16502](https://github.com/opensearch-project/OpenSearch/pull/16502)) +- Bump `com.google.apis:google-api-services-compute` from v1-rev20240407-2.0.0 to v1-rev20241021-2.0.0 ([#16502](https://github.com/opensearch-project/OpenSearch/pull/16502), [#16548](https://github.com/opensearch-project/OpenSearch/pull/16548)) - Bump `com.azure:azure-storage-blob` from 12.23.0 to 12.28.1 ([#16501](https://github.com/opensearch-project/OpenSearch/pull/16501)) - Bump `org.apache.hadoop:hadoop-minicluster` from 3.4.0 to 3.4.1 ([#16550](https://github.com/opensearch-project/OpenSearch/pull/16550)) diff --git a/plugins/discovery-gce/build.gradle b/plugins/discovery-gce/build.gradle index 4e05544a33f1d..5f4670357f927 100644 --- a/plugins/discovery-gce/build.gradle +++ b/plugins/discovery-gce/build.gradle @@ -18,7 +18,7 @@ opensearchplugin { } dependencies { - api "com.google.apis:google-api-services-compute:v1-rev20241015-2.0.0" + api "com.google.apis:google-api-services-compute:v1-rev20241021-2.0.0" api "com.google.api-client:google-api-client:1.35.2" api "com.google.oauth-client:google-oauth-client:1.36.0" api "com.google.http-client:google-http-client:${versions.google_http_client}" diff --git a/plugins/discovery-gce/licenses/google-api-services-compute-v1-rev20241015-2.0.0.jar.sha1 b/plugins/discovery-gce/licenses/google-api-services-compute-v1-rev20241015-2.0.0.jar.sha1 deleted file mode 100644 index 1de9a570242fd..0000000000000 --- a/plugins/discovery-gce/licenses/google-api-services-compute-v1-rev20241015-2.0.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -83d293916d59ced480e48fd8c0aefb643e27566c \ No newline at end of file diff --git a/plugins/discovery-gce/licenses/google-api-services-compute-v1-rev20241021-2.0.0.jar.sha1 b/plugins/discovery-gce/licenses/google-api-services-compute-v1-rev20241021-2.0.0.jar.sha1 new file mode 100644 index 0000000000000..309d10035f35a --- /dev/null +++ b/plugins/discovery-gce/licenses/google-api-services-compute-v1-rev20241021-2.0.0.jar.sha1 @@ -0,0 +1 @@ +cc3bd864ec5ac819699ea24a64109bfda42cb55c \ No newline at end of file From 9f790ee1e89063fd38501e64faf1df7109b3f4ec Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Wed, 6 Nov 2024 11:48:05 -0800 Subject: [PATCH 08/15] Fix non-x64 bwc build targets (#16575) There were a few issues here: the '-x64' suffix was being unconditionally appeneded, debian uses underscores not hyphens, and the rpm target uses the '.86_64' suffix. Signed-off-by: Andrew Ross --- .../InternalDistributionBwcSetupPlugin.java | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/buildSrc/src/main/java/org/opensearch/gradle/internal/InternalDistributionBwcSetupPlugin.java b/buildSrc/src/main/java/org/opensearch/gradle/internal/InternalDistributionBwcSetupPlugin.java index 0502280cb69ad..846c7e0d46b70 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/internal/InternalDistributionBwcSetupPlugin.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/internal/InternalDistributionBwcSetupPlugin.java @@ -181,15 +181,19 @@ private static List resolveArchiveProjects(File checkoutDir if (name.contains("zip") || name.contains("tar")) { int index = name.lastIndexOf('-'); String baseName = name.substring(0, index); - classifier = "-" + baseName + "-x64"; + classifier = "-" + baseName; + // The x64 variants do not have the architecture built into the task name, so it needs to be appended + if (name.equals("darwin-tar") || name.equals("linux-tar") || name.equals("windows-zip")) { + classifier += "-x64"; + } extension = name.substring(index + 1); if (extension.equals("tar")) { extension += ".gz"; } } else if (name.contains("deb")) { - classifier = "-amd64"; + classifier = "_amd64"; } else if (name.contains("rpm")) { - classifier = "-x64"; + classifier = ".x86_64"; } } else { extension = name.substring(4); @@ -256,9 +260,21 @@ private static class DistributionProject { this.name = name; this.projectPath = baseDir + "/" + name; if (version.onOrAfter("1.1.0")) { + // Deb uses underscores (I don't know why...): + // https://github.com/opensearch-project/OpenSearch/blob/f6d9a86f0e2e8241fd58b7e8b6cdeaf931b5108f/distribution/packages/build.gradle#L139 + final String separator = name.equals("deb") ? "_" : "-"; this.distFile = new File( checkoutDir, - baseDir + "/" + name + "/build/distributions/opensearch-min-" + version + "-SNAPSHOT" + classifier + "." + extension + baseDir + + "/" + + name + + "/build/distributions/opensearch-min" + + separator + + version + + "-SNAPSHOT" + + classifier + + "." + + extension ); } else { this.distFile = new File( From aa5c39bbbce5bfcb06f4892ff5d6ccaea79126b2 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Thu, 7 Nov 2024 10:44:40 -0500 Subject: [PATCH 09/15] Detect Breaking Changes check does not fail on new method added to an @PublicApi interface (#16585) Signed-off-by: Andriy Redko --- server/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/server/build.gradle b/server/build.gradle index c19e171c90f96..d3c7d4089125c 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -414,6 +414,7 @@ tasks.register("japicmp", me.champeau.gradle.japicmp.JapicmpTask) { onlyModified = true failOnModification = true ignoreMissingClasses = true + failOnSourceIncompatibility = true annotationIncludes = ['@org.opensearch.common.annotation.PublicApi', '@org.opensearch.common.annotation.DeprecatedApi'] annotationExcludes = ['@org.opensearch.common.annotation.InternalApi'] txtOutputFile = layout.buildDirectory.file("reports/java-compatibility/report.txt") From 9b7681c1e56db5dd61787bd6f1ff9015781a8717 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Thu, 7 Nov 2024 10:29:42 -0800 Subject: [PATCH 10/15] Make IndexStoreListener a pluggable interface (#16583) Signed-off-by: Jay Deng --- CHANGELOG.md | 3 +- .../org/opensearch/env/NodeEnvironment.java | 15 +--- .../index/store/IndexStoreListener.java | 73 +++++++++++++++++++ .../remote/filecache/FileCacheCleaner.java | 3 +- .../main/java/org/opensearch/node/Node.java | 22 +++++- .../opensearch/plugins/IndexStorePlugin.java | 9 +++ .../opensearch/env/NodeEnvironmentTests.java | 42 ++++++++--- 7 files changed, 138 insertions(+), 29 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/store/IndexStoreListener.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d9cf50d0da5a..60535b2cca895 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795)) - Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471)) - Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284)) -- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483/files)) +- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483)) +- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583)) ### Dependencies - Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521)) diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java index 709c0eba4f57f..5c6e44d063dd7 100644 --- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java @@ -71,6 +71,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.FsDirectoryFactory; +import org.opensearch.index.store.IndexStoreListener; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsProbe; import org.opensearch.monitor.jvm.JvmInfo; @@ -1412,18 +1413,4 @@ private static void tryWriteTempFile(Path path) throws IOException { } } } - - /** - * A listener that is executed on per-index and per-shard store events, like deleting shard path - * - * @opensearch.internal - */ - public interface IndexStoreListener { - default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {} - - default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {} - - IndexStoreListener EMPTY = new IndexStoreListener() { - }; - } } diff --git a/server/src/main/java/org/opensearch/index/store/IndexStoreListener.java b/server/src/main/java/org/opensearch/index/store/IndexStoreListener.java new file mode 100644 index 0000000000000..5a8dd28d43bbc --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/IndexStoreListener.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.IndexSettings; + +import java.util.Collections; +import java.util.List; + +/** + * A listener that is executed on per-index and per-shard store events, like deleting shard path + * + * @opensearch.api + */ +@PublicApi(since = "2.19.0") +public interface IndexStoreListener { + default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {} + + default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {} + + IndexStoreListener EMPTY = new IndexStoreListener() { + }; + + /** + * A Composite listener that multiplexes calls to each of the listeners methods. + * + * @opensearch.api + */ + @PublicApi(since = "2.19.0") + final class CompositeIndexStoreListener implements IndexStoreListener { + private final List listeners; + private final static Logger logger = LogManager.getLogger(CompositeIndexStoreListener.class); + + public CompositeIndexStoreListener(List listeners) { + this.listeners = Collections.unmodifiableList(listeners); + } + + @Override + public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) { + for (IndexStoreListener listener : listeners) { + try { + listener.beforeShardPathDeleted(shardId, indexSettings, env); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("beforeShardPathDeleted listener [{}] failed", listener), e); + } + } + } + + @Override + public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) { + for (IndexStoreListener listener : listeners) { + try { + listener.beforeIndexPathDeleted(index, indexSettings, env); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("beforeIndexPathDeleted listener [{}] failed", listener), e); + } + } + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java index 0261ab24dfa7a..3cdd41b94a5e9 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java @@ -18,6 +18,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.IndexStoreListener; import java.io.IOException; import java.nio.file.DirectoryStream; @@ -33,7 +34,7 @@ * * @opensearch.internal */ -public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener { +public class FileCacheCleaner implements IndexStoreListener { private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class); private final Provider fileCacheProvider; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e74fca60b0201..c78ee6711dcda 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -157,6 +157,7 @@ import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.remote.RemoteIndexPathUploader; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; +import org.opensearch.index.store.IndexStoreListener; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheCleaner; @@ -548,10 +549,27 @@ protected Node( */ this.environment = new Environment(settings, initialEnvironment.configDir(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings)); Environment.assertEquivalent(initialEnvironment, this.environment); + Stream indexStoreListenerStream = pluginsService.filterPlugins(IndexStorePlugin.class) + .stream() + .map(IndexStorePlugin::getIndexStoreListener) + .filter(Optional::isPresent) + .map(Optional::get); + // FileCache is only initialized on search nodes, so we only create FileCacheCleaner on search nodes as well if (DiscoveryNode.isSearchNode(settings) == false) { - nodeEnvironment = new NodeEnvironment(tmpSettings, environment); + nodeEnvironment = new NodeEnvironment( + settings, + environment, + new IndexStoreListener.CompositeIndexStoreListener(indexStoreListenerStream.collect(Collectors.toList())) + ); } else { - nodeEnvironment = new NodeEnvironment(settings, environment, new FileCacheCleaner(this::fileCache)); + nodeEnvironment = new NodeEnvironment( + settings, + environment, + new IndexStoreListener.CompositeIndexStoreListener( + Stream.concat(indexStoreListenerStream, Stream.of(new FileCacheCleaner(this::fileCache))) + .collect(Collectors.toList()) + ) + ); } logger.info( "node name [{}], node ID [{}], cluster name [{}], roles {}", diff --git a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java index ebd5717a00319..f0df8a122ed7d 100644 --- a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java @@ -39,11 +39,13 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.IndexStoreListener; import org.opensearch.indices.recovery.RecoveryState; import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.Optional; /** * A plugin that provides alternative directory implementations. @@ -105,4 +107,11 @@ interface RecoveryStateFactory { default Map getRecoveryStateFactories() { return Collections.emptyMap(); } + + /** + * The {@link IndexStoreListener}s for this plugin which are triggered upon shard/index path deletion + */ + default Optional getIndexStoreListener() { + return Optional.empty(); + } } diff --git a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java index 962eb743dca6e..3ee9e859c198f 100644 --- a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java @@ -45,6 +45,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.gateway.MetadataStateFormat; import org.opensearch.index.IndexSettings; +import org.opensearch.index.store.IndexStoreListener; import org.opensearch.node.Node; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.NodeRoles; @@ -360,24 +361,39 @@ protected void doRun() throws Exception { } public void testIndexStoreListener() throws Exception { - final AtomicInteger shardCounter = new AtomicInteger(0); - final AtomicInteger indexCounter = new AtomicInteger(0); + final AtomicInteger shardCounter1 = new AtomicInteger(0); + final AtomicInteger shardCounter2 = new AtomicInteger(0); + final AtomicInteger indexCounter1 = new AtomicInteger(0); + final AtomicInteger indexCounter2 = new AtomicInteger(0); final Index index = new Index("foo", "fooUUID"); final ShardId shardId = new ShardId(index, 0); - final NodeEnvironment.IndexStoreListener listener = new NodeEnvironment.IndexStoreListener() { + final IndexStoreListener listener1 = new IndexStoreListener() { @Override public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) { assertEquals(shardId, inShardId); - shardCounter.incrementAndGet(); + shardCounter1.incrementAndGet(); } @Override public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) { assertEquals(index, inIndex); - indexCounter.incrementAndGet(); + indexCounter1.incrementAndGet(); } }; - final NodeEnvironment env = newNodeEnvironment(listener); + final IndexStoreListener listener2 = new IndexStoreListener() { + @Override + public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) { + assertEquals(shardId, inShardId); + shardCounter2.incrementAndGet(); + } + + @Override + public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) { + assertEquals(index, inIndex); + indexCounter2.incrementAndGet(); + } + }; + final NodeEnvironment env = newNodeEnvironment(new IndexStoreListener.CompositeIndexStoreListener(List.of(listener1, listener2))); for (Path path : env.indexPaths(index)) { Files.createDirectories(path.resolve("0")); @@ -386,26 +402,30 @@ public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, N for (Path path : env.indexPaths(index)) { assertTrue(Files.exists(path.resolve("0"))); } - assertEquals(0, shardCounter.get()); + assertEquals(0, shardCounter1.get()); + assertEquals(0, shardCounter2.get()); env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings); for (Path path : env.indexPaths(index)) { assertFalse(Files.exists(path.resolve("0"))); } - assertEquals(1, shardCounter.get()); + assertEquals(1, shardCounter1.get()); + assertEquals(1, shardCounter2.get()); for (Path path : env.indexPaths(index)) { assertTrue(Files.exists(path)); } - assertEquals(0, indexCounter.get()); + assertEquals(0, indexCounter1.get()); + assertEquals(0, indexCounter2.get()); env.deleteIndexDirectorySafe(index, 5000, idxSettings); for (Path path : env.indexPaths(index)) { assertFalse(Files.exists(path)); } - assertEquals(1, indexCounter.get()); + assertEquals(1, indexCounter1.get()); + assertEquals(1, indexCounter2.get()); assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty()); env.close(); } @@ -680,7 +700,7 @@ public NodeEnvironment newNodeEnvironment() throws IOException { return newNodeEnvironment(Settings.EMPTY); } - public NodeEnvironment newNodeEnvironment(NodeEnvironment.IndexStoreListener listener) throws IOException { + public NodeEnvironment newNodeEnvironment(IndexStoreListener listener) throws IOException { Settings build = buildEnvSettings(Settings.EMPTY); return new NodeEnvironment(build, TestEnvironment.newEnvironment(build), listener); } From 5909e1ad30ae00476d121536a5cb415eafc15a9e Mon Sep 17 00:00:00 2001 From: "mend-for-github-com[bot]" <50673670+mend-for-github-com[bot]@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:56:58 -0500 Subject: [PATCH 11/15] Update dependency org.apache.zookeeper:zookeeper to v3.9.3 (#16593) Co-authored-by: mend-for-github-com[bot] <50673670+mend-for-github-com[bot]@users.noreply.github.com> --- test/fixtures/hdfs-fixture/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index 18bcee8b338fc..8a402879970d7 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -71,7 +71,7 @@ dependencies { api "org.jetbrains.kotlin:kotlin-stdlib:${versions.kotlin}" api "org.eclipse.jetty:jetty-server:${versions.jetty}" api "org.eclipse.jetty.websocket:javax-websocket-server-impl:${versions.jetty}" - api 'org.apache.zookeeper:zookeeper:3.9.2' + api 'org.apache.zookeeper:zookeeper:3.9.3' api "org.apache.commons:commons-text:1.12.0" api "commons-net:commons-net:3.11.1" api "ch.qos.logback:logback-core:1.5.12" From e68838819710d7040cf2b591590285f1b86f0da0 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Fri, 8 Nov 2024 10:42:53 -0500 Subject: [PATCH 12/15] [AUTO] [main] Add bwc version 2.18.1. (#16573) * Add bwc version 2.18.1 Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * Update Version.java Signed-off-by: Andriy Redko --------- Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Signed-off-by: Andriy Redko Co-authored-by: opensearch-ci-bot <83309141+opensearch-ci-bot@users.noreply.github.com> Co-authored-by: Andriy Redko --- .ci/bwcVersions | 1 + libs/core/src/main/java/org/opensearch/Version.java | 1 + 2 files changed, 2 insertions(+) diff --git a/.ci/bwcVersions b/.ci/bwcVersions index 17bb5a7df9b21..d1b4e4c509cb9 100644 --- a/.ci/bwcVersions +++ b/.ci/bwcVersions @@ -41,4 +41,5 @@ BWC_VERSION: - "2.17.1" - "2.17.2" - "2.18.0" + - "2.18.1" - "2.19.0" \ No newline at end of file diff --git a/libs/core/src/main/java/org/opensearch/Version.java b/libs/core/src/main/java/org/opensearch/Version.java index 4d685e3bc654a..ec0a18dbbf882 100644 --- a/libs/core/src/main/java/org/opensearch/Version.java +++ b/libs/core/src/main/java/org/opensearch/Version.java @@ -112,6 +112,7 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_2_17_1 = new Version(2170199, org.apache.lucene.util.Version.LUCENE_9_11_1); public static final Version V_2_17_2 = new Version(2170299, org.apache.lucene.util.Version.LUCENE_9_11_1); public static final Version V_2_18_0 = new Version(2180099, org.apache.lucene.util.Version.LUCENE_9_12_0); + public static final Version V_2_18_1 = new Version(2180199, org.apache.lucene.util.Version.LUCENE_9_12_0); public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_12_0); public static final Version V_2_19_0 = new Version(2190099, org.apache.lucene.util.Version.LUCENE_9_12_0); public static final Version CURRENT = V_3_0_0; From 10873f16e43780dbac4bf879e3324285461581cc Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Mon, 11 Nov 2024 16:45:43 +0530 Subject: [PATCH 13/15] Increase segrep pressure checkpoint default limit to 10 (#16577) Signed-off-by: Gaurav Bafna --- CHANGELOG.md | 1 + .../opensearch/index/SegmentReplicationPressureService.java | 2 +- .../index/SegmentReplicationPressureServiceTests.java | 3 +++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60535b2cca895..a0529d8fa6b63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795)) - Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471)) - Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284)) +- Increase segrep pressure checkpoint default limit to 30 ([#16577](https://github.com/opensearch-project/OpenSearch/pull/16577/files)) - Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483)) - Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583)) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 297fe093f7f4e..03b162a9c1755 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -60,7 +60,7 @@ public class SegmentReplicationPressureService implements Closeable { public static final Setting MAX_INDEXING_CHECKPOINTS = Setting.intSetting( "segrep.pressure.checkpoint.limit", - 4, + 30, 1, Setting.Property.Dynamic, Setting.Property.NodeScope diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java index a9725f638cc53..166c0e16bfe8b 100644 --- a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java @@ -33,6 +33,7 @@ import org.mockito.stubbing.Answer; import static java.util.Arrays.asList; +import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS; import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING; import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING; import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED; @@ -53,6 +54,7 @@ public class SegmentReplicationPressureServiceTests extends OpenSearchIndexLevel .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) .put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueSeconds(5)) + .put(MAX_INDEXING_CHECKPOINTS.getKey(), 4) .build(); public void testIsSegrepLimitBreached() throws Exception { @@ -200,6 +202,7 @@ public void testFailStaleReplicaTask() throws Exception { .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) .put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMillis(10)) .put(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.getKey(), TimeValue.timeValueMillis(20)) + .put(MAX_INDEXING_CHECKPOINTS.getKey(), 4) .build(); try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { From 607a08e465014a9f8615ee30f3d5d402284ea9ff Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Nov 2024 09:56:42 -0500 Subject: [PATCH 14/15] Bump lycheeverse/lychee-action from 2.0.2 to 2.1.0 (#16610) * Bump lycheeverse/lychee-action from 2.0.2 to 2.1.0 Bumps [lycheeverse/lychee-action](https://github.com/lycheeverse/lychee-action) from 2.0.2 to 2.1.0. - [Release notes](https://github.com/lycheeverse/lychee-action/releases) - [Commits](https://github.com/lycheeverse/lychee-action/compare/v2.0.2...v2.1.0) --- updated-dependencies: - dependency-name: lycheeverse/lychee-action dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- .github/workflows/links.yml | 2 +- CHANGELOG.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/links.yml b/.github/workflows/links.yml index cadbe71bb6ea8..3697750dab97a 100644 --- a/.github/workflows/links.yml +++ b/.github/workflows/links.yml @@ -13,7 +13,7 @@ jobs: - uses: actions/checkout@v4 - name: lychee Link Checker id: lychee - uses: lycheeverse/lychee-action@v2.0.2 + uses: lycheeverse/lychee-action@v2.1.0 with: args: --accept=200,403,429 --exclude-mail **/*.html **/*.md **/*.txt **/*.json --exclude-file .lychee.excludes fail: true diff --git a/CHANGELOG.md b/CHANGELOG.md index a0529d8fa6b63..e95a95990beaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `com.google.apis:google-api-services-compute` from v1-rev20240407-2.0.0 to v1-rev20241021-2.0.0 ([#16502](https://github.com/opensearch-project/OpenSearch/pull/16502), [#16548](https://github.com/opensearch-project/OpenSearch/pull/16548)) - Bump `com.azure:azure-storage-blob` from 12.23.0 to 12.28.1 ([#16501](https://github.com/opensearch-project/OpenSearch/pull/16501)) - Bump `org.apache.hadoop:hadoop-minicluster` from 3.4.0 to 3.4.1 ([#16550](https://github.com/opensearch-project/OpenSearch/pull/16550)) +- Bump `lycheeverse/lychee-action` from 2.0.2 to 2.1.0 ([#16610](https://github.com/opensearch-project/OpenSearch/pull/16610)) ### Changed From 6e34a8024a2b884143f101f03e6ebffab9eed6e6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Nov 2024 12:25:47 -0500 Subject: [PATCH 15/15] Bump me.champeau.gradle.japicmp from 0.4.4 to 0.4.5 in /server (#16614) * Bump me.champeau.gradle.japicmp from 0.4.4 to 0.4.5 in /server Bumps me.champeau.gradle.japicmp from 0.4.4 to 0.4.5. --- updated-dependencies: - dependency-name: me.champeau.gradle.japicmp dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 1 + server/build.gradle | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e95a95990beaf..d3086096cb8f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `com.azure:azure-storage-blob` from 12.23.0 to 12.28.1 ([#16501](https://github.com/opensearch-project/OpenSearch/pull/16501)) - Bump `org.apache.hadoop:hadoop-minicluster` from 3.4.0 to 3.4.1 ([#16550](https://github.com/opensearch-project/OpenSearch/pull/16550)) - Bump `lycheeverse/lychee-action` from 2.0.2 to 2.1.0 ([#16610](https://github.com/opensearch-project/OpenSearch/pull/16610)) +- Bump `me.champeau.gradle.japicmp` from 0.4.4 to 0.4.5 ([#16614](https://github.com/opensearch-project/OpenSearch/pull/16614)) ### Changed diff --git a/server/build.gradle b/server/build.gradle index d3c7d4089125c..f1679ccfbec30 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -36,7 +36,7 @@ plugins { id('opensearch.publish') id('opensearch.internal-cluster-test') id('opensearch.optional-dependencies') - id('me.champeau.gradle.japicmp') version '0.4.4' + id('me.champeau.gradle.japicmp') version '0.4.5' } publishing {