diff --git a/docs/changelog/110630.yaml b/docs/changelog/110630.yaml new file mode 100644 index 0000000000000..9bf78e1209753 --- /dev/null +++ b/docs/changelog/110630.yaml @@ -0,0 +1,5 @@ +pr: 110630 +summary: Telemetry for inference adaptive allocations +area: Machine Learning +type: feature +issues: [] diff --git a/docs/reference/ingest/processors/geoip.asciidoc b/docs/reference/ingest/processors/geoip.asciidoc index 738ac234d6162..230146d483144 100644 --- a/docs/reference/ingest/processors/geoip.asciidoc +++ b/docs/reference/ingest/processors/geoip.asciidoc @@ -64,12 +64,12 @@ depend on what has been found and which properties were configured in `propertie * If the GeoIP2 Domain database is used, then the following fields may be added under the `target_field`: `ip`, and `domain`. The fields actually added depend on what has been found and which properties were configured in `properties`. * If the GeoIP2 ISP database is used, then the following fields may be added under the `target_field`: `ip`, `asn`, -`organization_name`, `network`, `isp`, `isp_organization`, `mobile_country_code`, and `mobile_network_code`. The fields actually added +`organization_name`, `network`, `isp`, `isp_organization_name`, `mobile_country_code`, and `mobile_network_code`. The fields actually added depend on what has been found and which properties were configured in `properties`. 
* If the GeoIP2 Enterprise database is used, then the following fields may be added under the `target_field`: `ip`, `country_iso_code`, `country_name`, `continent_code`, `continent_name`, `region_iso_code`, `region_name`, `city_name`, `timezone`, `location`, `asn`, `organization_name`, `network`, `hosting_provider`, `tor_exit_node`, `anonymous_vpn`, `anonymous`, `public_proxy`, -`residential_proxy`, `domain`, `isp`, `isp_organization`, `mobile_country_code`, `mobile_network_code`, `user_type`, and +`residential_proxy`, `domain`, `isp`, `isp_organization_name`, `mobile_country_code`, `mobile_network_code`, `user_type`, and `connection_type`. The fields actually added depend on what has been found and which properties were configured in `properties`. preview::["Do not use the GeoIP2 Anonymous IP, GeoIP2 Connection Type, GeoIP2 Domain, GeoIP2 ISP, and GeoIP2 Enterprise databases in production environments. This functionality is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features."] diff --git a/docs/reference/search/retriever.asciidoc b/docs/reference/search/retriever.asciidoc index b86339b905631..1b7376c21daab 100644 --- a/docs/reference/search/retriever.asciidoc +++ b/docs/reference/search/retriever.asciidoc @@ -209,6 +209,11 @@ GET /index/_search The `text_similarity_reranker` is a type of retriever that enhances search results by re-ranking documents based on semantic similarity to a specified inference text, using a machine learning model. +[TIP] +==== +Refer to <> for a high level overview of semantic reranking. +==== + ===== Prerequisites To use `text_similarity_reranker` you must first set up a `rerank` task using the <>. @@ -262,13 +267,13 @@ GET /index/_search "text_similarity_reranker": { "retriever": { "standard": { ... 
} - } - }, - "field": "text", - "inference_id": "my-cohere-rerank-model", - "inference_text": "Most famous landmark in Paris", - "rank_window_size": 100, - "min_score": 0.5 + }, + "field": "text", + "inference_id": "my-cohere-rerank-model", + "inference_text": "Most famous landmark in Paris", + "rank_window_size": 100, + "min_score": 0.5 + } } } ---- diff --git a/docs/reference/search/search-your-data/retrievers-reranking/semantic-reranking.asciidoc b/docs/reference/search/search-your-data/retrievers-reranking/semantic-reranking.asciidoc index 75c06aa953302..f25741fca0b8f 100644 --- a/docs/reference/search/search-your-data/retrievers-reranking/semantic-reranking.asciidoc +++ b/docs/reference/search/search-your-data/retrievers-reranking/semantic-reranking.asciidoc @@ -5,7 +5,7 @@ preview::[] [TIP] ==== -This overview focuses more on the high-level concepts and use cases for semantic reranking. For full implementation details on how to set up and use semantic reranking in {es}, see the <> in the Search API docs. +This overview focuses more on the high-level concepts and use cases for semantic reranking. For full implementation details on how to set up and use semantic reranking in {es}, see the <> in the Search API docs. ==== Rerankers improve the relevance of results from earlier-stage retrieval mechanisms. @@ -89,11 +89,16 @@ In {es}, semantic rerankers are implemented using the {es} <>. +. *Choose a reranking model*. +Currently you can: + +** Integrate directly with the <> using the `rerank` task type +** Integrate directly with the <> using the `rerank` task type +** Upload a model to {es} from Hugging Face with {eland-docs}/machine-learning.html#ml-nlp-pytorch[Eland] +*** Then set up an <> with the `rerank` task type +. *Create a `rerank` task using the <>*. The Inference API creates an inference endpoint and configures your chosen machine learning model to perform the reranking task. -. Define a `text_similarity_reranker` retriever in your search request. +. 
*Define a `text_similarity_reranker` retriever in your search request*. The retriever syntax makes it simple to configure both the retrieval and reranking of search results in a single API call. .*Example search request* with semantic reranker @@ -127,20 +132,6 @@ POST _search // TEST[skip:TBD] ============== -[discrete] -[[semantic-reranking-types]] -==== Supported reranking types - -The following `text_similarity_reranker` model configuration options are available. - -*Text similarity with cross-encoder* - -This solution uses a hosted or 3rd party inference service which relies on a cross-encoder model. -The model receives the text fields from the _top-K_ documents, as well as the search query, and calculates scores directly, which are then used to rerank the documents. - -Used with the Cohere inference service rolled out in 8.13, turn on semantic reranking that works out of the box. -Check out our https://github.com/elastic/elasticsearch-labs/blob/main/notebooks/integrations/cohere/cohere-elasticsearch.ipynb[Python notebook] for using Cohere with {es}. 
- [discrete] [[semantic-reranking-learn-more]] ==== Learn more diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APMMeterRegistry.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APMMeterRegistry.java index 382fc9417eac0..831e2f19e0126 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APMMeterRegistry.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/APMMeterRegistry.java @@ -38,6 +38,7 @@ import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.telemetry.metric.MeterRegistry; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; @@ -96,11 +97,11 @@ public DoubleCounter getDoubleCounter(String name) { } @Override - public DoubleAsyncCounter registerDoubleAsyncCounter( + public DoubleAsyncCounter registerDoublesAsyncCounter( String name, String description, String unit, - Supplier observer + Supplier> observer ) { try (ReleasableLock lock = registerLock.acquire()) { return register(doubleAsynchronousCounters, new DoubleAsyncCounterAdapter(meter, name, description, unit, observer)); @@ -125,7 +126,12 @@ public DoubleUpDownCounter getDoubleUpDownCounter(String name) { } @Override - public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier observer) { + public DoubleGauge registerDoublesGauge( + String name, + String description, + String unit, + Supplier> observer + ) { try (ReleasableLock lock = registerLock.acquire()) { return register(doubleGauges, new DoubleGaugeAdapter(meter, name, description, unit, observer)); } @@ -156,7 +162,12 @@ public LongCounter registerLongCounter(String name, String description, String u } @Override - public LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier observer) { + public LongAsyncCounter registerLongsAsyncCounter( + String name, + String description, + String unit, + 
Supplier> observer + ) { try (ReleasableLock lock = registerLock.acquire()) { return register(longAsynchronousCounters, new LongAsyncCounterAdapter(meter, name, description, unit, observer)); } @@ -185,7 +196,7 @@ public LongUpDownCounter getLongUpDownCounter(String name) { } @Override - public LongGauge registerLongGauge(String name, String description, String unit, Supplier observer) { + public LongGauge registerLongsGauge(String name, String description, String unit, Supplier> observer) { try (ReleasableLock lock = registerLock.acquire()) { return register(longGauges, new LongGaugeAdapter(meter, name, description, unit, observer)); } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleAsyncCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleAsyncCounterAdapter.java index 6b17a83619ef7..ab735c41ca890 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleAsyncCounterAdapter.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleAsyncCounterAdapter.java @@ -15,12 +15,19 @@ import org.elasticsearch.telemetry.metric.DoubleAsyncCounter; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; +import java.util.Collection; import java.util.Objects; import java.util.function.Supplier; public class DoubleAsyncCounterAdapter extends AbstractInstrument implements DoubleAsyncCounter { - public DoubleAsyncCounterAdapter(Meter meter, String name, String description, String unit, Supplier observer) { + public DoubleAsyncCounterAdapter( + Meter meter, + String name, + String description, + String unit, + Supplier> observer + ) { super(meter, new Builder(name, description, unit, observer)); } @@ -30,9 +37,9 @@ public void close() throws Exception { } private static class Builder extends AbstractInstrument.Builder { - private final Supplier observer; + private final Supplier> observer; - private 
Builder(String name, String description, String unit, Supplier observer) { + private Builder(String name, String description, String unit, Supplier> observer) { super(name, description, unit); this.observer = Objects.requireNonNull(observer); } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java index ed6ecee66d696..2a9c2d45981ed 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java @@ -14,6 +14,7 @@ import org.elasticsearch.telemetry.apm.AbstractInstrument; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; +import java.util.Collection; import java.util.Objects; import java.util.function.Supplier; @@ -24,7 +25,13 @@ public class DoubleGaugeAdapter extends AbstractInstrument observer) { + public DoubleGaugeAdapter( + Meter meter, + String name, + String description, + String unit, + Supplier> observer + ) { super(meter, new Builder(name, description, unit, observer)); } @@ -34,9 +41,9 @@ public void close() throws Exception { } private static class Builder extends AbstractInstrument.Builder { - private final Supplier observer; + private final Supplier> observer; - private Builder(String name, String description, String unit, Supplier observer) { + private Builder(String name, String description, String unit, Supplier> observer) { super(name, description, unit); this.observer = Objects.requireNonNull(observer); } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongAsyncCounterAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongAsyncCounterAdapter.java index 14c58139d03e1..1bc21ef2c831c 100644 --- 
a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongAsyncCounterAdapter.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongAsyncCounterAdapter.java @@ -15,12 +15,19 @@ import org.elasticsearch.telemetry.metric.LongAsyncCounter; import org.elasticsearch.telemetry.metric.LongWithAttributes; +import java.util.Collection; import java.util.Objects; import java.util.function.Supplier; public class LongAsyncCounterAdapter extends AbstractInstrument implements LongAsyncCounter { - public LongAsyncCounterAdapter(Meter meter, String name, String description, String unit, Supplier observer) { + public LongAsyncCounterAdapter( + Meter meter, + String name, + String description, + String unit, + Supplier> observer + ) { super(meter, new Builder(name, description, unit, observer)); } @@ -30,9 +37,9 @@ public void close() throws Exception { } private static class Builder extends AbstractInstrument.Builder { - private final Supplier observer; + private final Supplier> observer; - private Builder(String name, String description, String unit, Supplier observer) { + private Builder(String name, String description, String unit, Supplier> observer) { super(name, description, unit); this.observer = Objects.requireNonNull(observer); } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java index 52c19c80c284f..eab9ed2eb5278 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java @@ -14,6 +14,7 @@ import org.elasticsearch.telemetry.apm.AbstractInstrument; import org.elasticsearch.telemetry.metric.LongWithAttributes; +import java.util.Collection; import java.util.Objects; import java.util.function.Supplier; @@ -21,7 
+22,7 @@ * LongGaugeAdapter wraps an otel ObservableLongGauge */ public class LongGaugeAdapter extends AbstractInstrument implements org.elasticsearch.telemetry.metric.LongGauge { - public LongGaugeAdapter(Meter meter, String name, String description, String unit, Supplier observer) { + public LongGaugeAdapter(Meter meter, String name, String description, String unit, Supplier> observer) { super(meter, new Builder(name, description, unit, observer)); } @@ -31,11 +32,11 @@ public void close() throws Exception { } private static class Builder extends AbstractInstrument.Builder { - private final Supplier observer; + private final Supplier> observer; - private Builder(String name, String description, String unit, Supplier observer) { + private Builder(String name, String description, String unit, Supplier> observer) { super(name, description, unit); - this.observer = Objects.requireNonNull(observer); + this.observer = observer; } @Override diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java index 3e8ab415bd25e..1d760c8c12791 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/OtelHelper.java @@ -17,6 +17,7 @@ import org.elasticsearch.telemetry.metric.DoubleWithAttributes; import org.elasticsearch.telemetry.metric.LongWithAttributes; +import java.util.Collection; import java.util.Map; import java.util.function.Consumer; import java.util.function.Supplier; @@ -53,37 +54,45 @@ static Attributes fromMap(Map attributes) { return builder.build(); } - static Consumer doubleMeasurementCallback(Supplier observer) { + static Consumer doubleMeasurementCallback(Supplier> observer) { return measurement -> { - DoubleWithAttributes observation; + Collection observations; try { - observation = observer.get(); + 
observations = observer.get(); } catch (RuntimeException err) { assert false : "observer must not throw [" + err.getMessage() + "]"; logger.error("doubleMeasurementCallback observer unexpected error", err); return; } - if (observation == null) { + if (observations == null) { return; } - measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes())); + for (DoubleWithAttributes observation : observations) { + if (observation != null) { + measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes())); + } + } }; } - static Consumer longMeasurementCallback(Supplier observer) { + static Consumer longMeasurementCallback(Supplier> observer) { return measurement -> { - LongWithAttributes observation; + Collection observations; try { - observation = observer.get(); + observations = observer.get(); } catch (RuntimeException err) { assert false : "observer must not throw [" + err.getMessage() + "]"; logger.error("longMeasurementCallback observer unexpected error", err); return; } - if (observation == null) { + if (observations == null) { return; } - measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes())); + for (LongWithAttributes observation : observations) { + if (observation != null) { + measurement.record(observation.value(), OtelHelper.fromMap(observation.attributes())); + } + } }; } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index e39705a71f56c..82b9e930280b7 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -592,7 +592,7 @@ private Map retrieveEnterpriseGeoData(GeoIpDatabase geoIpDatabas } case ISP_ORGANIZATION_NAME -> { if (ispOrganization != null) { - geoData.put("isp_organization", ispOrganization); + 
geoData.put("isp_organization_name", ispOrganization); } } case MOBILE_COUNTRY_CODE -> { @@ -660,7 +660,7 @@ private Map retrieveIspGeoData(GeoIpDatabase geoIpDatabase, Inet } case ISP_ORGANIZATION_NAME -> { if (ispOrganization != null) { - geoData.put("isp_organization", ispOrganization); + geoData.put("isp_organization_name", ispOrganization); } } case MOBILE_COUNTRY_CODE -> { diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 6276155d9f083..87d1881a9e743 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -463,7 +463,7 @@ public void testEnterprise() throws Exception { assertThat(geoData.get("residential_proxy"), equalTo(false)); assertThat(geoData.get("domain"), equalTo("frpt.net")); assertThat(geoData.get("isp"), equalTo("Fairpoint Communications")); - assertThat(geoData.get("isp_organization"), equalTo("Fairpoint Communications")); + assertThat(geoData.get("isp_organization_name"), equalTo("Fairpoint Communications")); assertThat(geoData.get("user_type"), equalTo("residential")); assertThat(geoData.get("connection_type"), equalTo("Cable/DSL")); } @@ -497,7 +497,7 @@ public void testIsp() throws Exception { assertThat(geoData.get("organization_name"), equalTo("CELLCO-PART")); assertThat(geoData.get("network"), equalTo("149.101.100.0/28")); assertThat(geoData.get("isp"), equalTo("Verizon Wireless")); - assertThat(geoData.get("isp_organization"), equalTo("Verizon Wireless")); + assertThat(geoData.get("isp_organization_name"), equalTo("Verizon Wireless")); assertThat(geoData.get("mobile_network_code"), equalTo("004")); assertThat(geoData.get("mobile_country_code"), equalTo("310")); } diff --git 
a/modules/repository-azure/src/yamlRestTest/java/org/elasticsearch/repositories/azure/RepositoryAzureClientYamlTestSuiteIT.java b/modules/repository-azure/src/yamlRestTest/java/org/elasticsearch/repositories/azure/RepositoryAzureClientYamlTestSuiteIT.java index ba476e754a59e..1fd04e73364ab 100644 --- a/modules/repository-azure/src/yamlRestTest/java/org/elasticsearch/repositories/azure/RepositoryAzureClientYamlTestSuiteIT.java +++ b/modules/repository-azure/src/yamlRestTest/java/org/elasticsearch/repositories/azure/RepositoryAzureClientYamlTestSuiteIT.java @@ -14,6 +14,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.core.Booleans; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestTrustStore; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; @@ -30,7 +31,9 @@ public class RepositoryAzureClientYamlTestSuiteIT extends ESClientYamlSuiteTestC private static final String AZURE_TEST_SASTOKEN = System.getProperty("test.azure.sas_token"); private static AzureHttpFixture fixture = new AzureHttpFixture( - USE_FIXTURE ? AzureHttpFixture.Protocol.HTTPS : AzureHttpFixture.Protocol.NONE, + USE_FIXTURE + ? ESTestCase.inFipsJvm() ? 
AzureHttpFixture.Protocol.HTTP : AzureHttpFixture.Protocol.HTTPS + : AzureHttpFixture.Protocol.NONE, AZURE_TEST_ACCOUNT, AZURE_TEST_CONTAINER, AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT) @@ -59,7 +62,11 @@ public class RepositoryAzureClientYamlTestSuiteIT extends ESClientYamlSuiteTestC s -> USE_FIXTURE ) .setting("thread_pool.repository_azure.max", () -> String.valueOf(randomIntBetween(1, 10)), s -> USE_FIXTURE) - .systemProperty("javax.net.ssl.trustStore", () -> trustStore.getTrustStorePath().toString(), s -> USE_FIXTURE) + .systemProperty( + "javax.net.ssl.trustStore", + () -> trustStore.getTrustStorePath().toString(), + s -> USE_FIXTURE && ESTestCase.inFipsJvm() == false + ) .build(); @ClassRule(order = 1) diff --git a/muted-tests.yml b/muted-tests.yml index 0d31ecb149c22..d106ca3c9d701 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -44,9 +44,6 @@ tests: - class: "org.elasticsearch.xpack.test.rest.XPackRestIT" issue: "https://github.com/elastic/elasticsearch/issues/109687" method: "test {p0=sql/translate/Translate SQL}" -- class: "org.elasticsearch.action.admin.indices.rollover.RolloverIT" - issue: "https://github.com/elastic/elasticsearch/issues/110034" - method: "testRolloverWithClosedWriteIndex" - class: org.elasticsearch.index.store.FsDirectoryFactoryTests method: testStoreDirectory issue: https://github.com/elastic/elasticsearch/issues/110210 @@ -111,11 +108,29 @@ tests: - class: org.elasticsearch.upgrades.LogsIndexModeFullClusterRestartIT method: testLogsIndexing {cluster=UPGRADED} issue: https://github.com/elastic/elasticsearch/issues/111306 -- class: org.elasticsearch.xpack.core.ml.job.config.DetectionRuleTests - method: testEqualsAndHashcode - issue: https://github.com/elastic/elasticsearch/issues/111308 - class: org.elasticsearch.xpack.ml.integration.DatafeedJobsRestIT issue: https://github.com/elastic/elasticsearch/issues/111319 +- class: org.elasticsearch.xpack.esql.analysis.VerifierTests + method: testMatchFilter + 
issue: https://github.com/elastic/elasticsearch/issues/111380 +- class: org.elasticsearch.xpack.ml.integration.InferenceIngestInputConfigIT + method: testIngestWithInputFields + issue: https://github.com/elastic/elasticsearch/issues/111383 +- class: org.elasticsearch.xpack.ml.integration.InferenceIngestInputConfigIT + method: testIngestWithMultipleInputFields + issue: https://github.com/elastic/elasticsearch/issues/111384 +- class: org.elasticsearch.xpack.restart.FullClusterRestartIT + method: testTransformLegacyTemplateCleanup {cluster=OLD} + issue: https://github.com/elastic/elasticsearch/issues/111385 +- class: org.elasticsearch.xpack.restart.FullClusterRestartIT + method: testRollupAfterRestart {cluster=OLD} + issue: https://github.com/elastic/elasticsearch/issues/111392 +- class: org.elasticsearch.xpack.restart.FullClusterRestartIT + method: testRollupAfterRestart {cluster=UPGRADED} + issue: https://github.com/elastic/elasticsearch/issues/111393 +- class: org.elasticsearch.xpack.security.authc.oidc.OpenIdConnectAuthIT + method: testAuthenticateWithCodeFlowAndClientPost + issue: https://github.com/elastic/elasticsearch/issues/111396 - class: org.elasticsearch.xpack.searchablesnapshots.AzureSearchableSnapshotsIT issue: https://github.com/elastic/elasticsearch/issues/111279 - class: org.elasticsearch.repositories.azure.RepositoryAzureClientYamlTestSuiteIT diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/rollover/RolloverIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/rollover/RolloverIT.java index 4d52383bfc4e1..a568424300e75 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/rollover/RolloverIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/rollover/RolloverIT.java @@ -665,7 +665,7 @@ public void testRolloverWithClosedWriteIndex() throws Exception { assertAcked(prepareCreate(openNonwriteIndex).addAlias(new 
Alias(aliasName)).get()); assertAcked(prepareCreate(closedIndex).addAlias(new Alias(aliasName)).get()); assertAcked(prepareCreate(writeIndexPrefix + "000001").addAlias(new Alias(aliasName).writeIndex(true)).get()); - + ensureGreen(openNonwriteIndex, closedIndex, writeIndexPrefix + "000001"); index(closedIndex, null, "{\"foo\": \"bar\"}"); index(aliasName, null, "{\"foo\": \"bar\"}"); index(aliasName, null, "{\"foo\": \"bar\"}"); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java new file mode 100644 index 0000000000000..70add580f8d8c --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.action.support.replication; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.function.BiFunction; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class TransportReplicationActionBypassCircuitBreakerOnReplicaIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(TestPlugin.class); + } + + public static class Request extends ReplicationRequest { + public Request(ShardId shardId) { + super(shardId); + } + + public Request(StreamInput in) throws IOException { + super(in); + } + + @Override + public String toString() { + return 
"test-request"; + } + } + + public static class Response extends ReplicationResponse { + public Response() {} + + public Response(StreamInput in) throws IOException { + super(in); + } + } + + public static class TestAction extends TransportReplicationAction { + private static final String ACTION_NAME = "internal:test-replication-action"; + private static final ActionType TYPE = new ActionType<>(ACTION_NAME); + + @Inject + public TestAction( + Settings settings, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters + ) { + super( + settings, + ACTION_NAME, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + Request::new, + Request::new, + threadPool.executor(ThreadPool.Names.GENERIC), + SyncGlobalCheckpointAfterOperation.DoNotSync, + PrimaryActionExecution.RejectOnOverload, + ReplicaActionExecution.BypassCircuitBreaker + ); + } + + @Override + protected Response newResponseInstance(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void shardOperationOnPrimary( + Request shardRequest, + IndexShard primary, + ActionListener> listener + ) { + listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); + } + + @Override + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + listener.onResponse(new ReplicaResult()); + } + } + + public static class TestPlugin extends Plugin implements ActionPlugin { + + public TestPlugin() {} + + @Override + public List> getActions() { + return List.of(new ActionHandler<>(TestAction.TYPE, TestAction.class)); + } + } + + private enum PrimaryOrReplica implements BiFunction { + PRIMARY { + @Override + public String apply(String primaryName, String replicaName) { + return primaryName; + } + }, + REPLICA { + @Override + public String 
apply(String primaryName, String replicaName) { + return replicaName; + } + } + } + + public void testActionCompletesWhenReplicaCircuitBreakersAreAtCapacity() { + maxOutCircuitBreakersAndExecuteAction(PrimaryOrReplica.REPLICA); + } + + public void testActionFailsWhenPrimaryCircuitBreakersAreAtCapacity() { + AssertionError assertionError = assertThrows( + AssertionError.class, + () -> maxOutCircuitBreakersAndExecuteAction(PrimaryOrReplica.PRIMARY) + ); + assertNotNull( + "Not caused by CircuitBreakingException " + ExceptionsHelper.stackTrace(assertionError), + ExceptionsHelper.unwrap(assertionError, CircuitBreakingException.class) + ); + } + + private void maxOutCircuitBreakersAndExecuteAction(PrimaryOrReplica nodeToMaxOutCircuitBreakers) { + internalCluster().startMasterOnlyNodes(2); + String primary = internalCluster().startDataOnlyNode(); + assertAcked( + prepareCreate("test").setSettings( + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + ) + ); + + String replica = internalCluster().startDataOnlyNode(); + String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + ensureGreen("test"); + + try ( + var ignored = fullyAllocateCircuitBreakerOnNode( + nodeToMaxOutCircuitBreakers.apply(primary, replica), + CircuitBreaker.IN_FLIGHT_REQUESTS + ) + ) { + PlainActionFuture testActionResult = new PlainActionFuture<>(); + client(coordinator).execute(TestAction.TYPE, new Request(new ShardId(resolveIndex("test"), 0)), testActionResult); + safeGet(testActionResult); + } + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java index c4737468a766c..459ca39e86b0e 100644 --- 
a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java @@ -107,7 +107,8 @@ public TestAction( Request::new, threadPool.executor(ThreadPool.Names.GENERIC), SyncGlobalCheckpointAfterOperation.DoNotSync, - PrimaryActionExecution.RejectOnOverload + PrimaryActionExecution.RejectOnOverload, + ReplicaActionExecution.SubjectToCircuitBreaker ); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java index 71c6ef956c4d4..66fc6e0236b53 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java @@ -17,9 +17,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.disruption.NetworkDisruption; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; @@ -259,46 +257,31 @@ public void testJoinWaitsForCircuitBreaker() throws InterruptedException { logger.info("--> master [{}], victim [{}]", masterName, victimName); // fill up the circuit breaker to breaking point - final var circuitBreaker = internalCluster().getInstance(CircuitBreakerService.class, victimName) - .getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); - long allocationSize = 1; - while (true) { - try { - 
circuitBreaker.addEstimateBytesAndMaybeBreak(allocationSize, "test"); - } catch (CircuitBreakingException e) { - circuitBreaker.addWithoutBreaking(allocationSize); - break; - } - allocationSize <<= 1; - assert 0 <= allocationSize; - } - - // drop the victim from the cluster with a network disruption - final var masterTransportService = MockTransportService.getInstance(masterName); - masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(TransportService.class, victimName)); - logger.info("--> waiting for victim's departure"); - ensureStableCluster(2, masterName); - - // verify that the victim sends no joins while the circuit breaker is breaking - final var victimTransportService = MockTransportService.getInstance(victimName); - victimTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - assertNotEquals(action, JoinHelper.JOIN_ACTION_NAME); - connection.sendRequest(requestId, action, request, options); - }); - - // fix the network disruption - logger.info("--> removing network disruption"); - masterTransportService.clearAllRules(); - ensureStableCluster(2, masterName); - - // permit joins again - victimTransportService.addSendBehavior(null); - - // release the breaker - logger.info("--> releasing allocations from circuit breaker"); - while (0 < allocationSize) { - circuitBreaker.addWithoutBreaking(-allocationSize); - allocationSize >>= 1; + try (var ignored = fullyAllocateCircuitBreakerOnNode(victimName, CircuitBreaker.IN_FLIGHT_REQUESTS)) { + + // drop the victim from the cluster with a network disruption + final var masterTransportService = MockTransportService.getInstance(masterName); + masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(TransportService.class, victimName)); + logger.info("--> waiting for victim's departure"); + ensureStableCluster(2, masterName); + + // verify that the victim sends no joins while the circuit breaker is breaking + final var 
victimTransportService = MockTransportService.getInstance(victimName); + victimTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + assertNotEquals(action, JoinHelper.JOIN_ACTION_NAME); + connection.sendRequest(requestId, action, request, options); + }); + + // fix the network disruption + logger.info("--> removing network disruption"); + masterTransportService.clearAllRules(); + ensureStableCluster(2, masterName); + + // permit joins again + victimTransportService.addSendBehavior(null); + + // release the breaker + logger.info("--> releasing allocations from circuit breaker"); } logger.info("--> waiting for cluster to heal"); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 643f92ec3378f..3ea246ca0e611 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -68,7 +68,8 @@ public TransportVerifyShardBeforeCloseAction( ShardRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT), SyncGlobalCheckpointAfterOperation.DoNotSync, - PrimaryActionExecution.RejectOnOverload + PrimaryActionExecution.RejectOnOverload, + ReplicaActionExecution.SubjectToCircuitBreaker ); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 69e1309b89aef..dedd4a27678ea 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -60,7 +60,8 @@ public 
TransportShardFlushAction( ShardFlushRequest::new, threadPool.executor(ThreadPool.Names.FLUSH), SyncGlobalCheckpointAfterOperation.DoNotSync, - PrimaryActionExecution.RejectOnOverload + PrimaryActionExecution.RejectOnOverload, + ReplicaActionExecution.SubjectToCircuitBreaker ); transportService.registerRequestHandler( PRE_SYNCED_FLUSH_ACTION_NAME, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java index e93b3983ee85b..1de5988da26c9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java @@ -69,7 +69,8 @@ public TransportVerifyShardIndexBlockAction( ShardRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT), SyncGlobalCheckpointAfterOperation.DoNotSync, - PrimaryActionExecution.RejectOnOverload + PrimaryActionExecution.RejectOnOverload, + ReplicaActionExecution.SubjectToCircuitBreaker ); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index cc4edcf0efb81..15ff792a888e6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -70,7 +70,8 @@ public TransportShardRefreshAction( ShardRefreshReplicaRequest::new, threadPool.executor(ThreadPool.Names.REFRESH), SyncGlobalCheckpointAfterOperation.DoNotSync, - PrimaryActionExecution.RejectOnOverload + PrimaryActionExecution.RejectOnOverload, + ReplicaActionExecution.SubjectToCircuitBreaker ); // registers the unpromotable 
version of shard refresh action new TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index fc9df7bbf73b9..56dd651f1611e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -117,7 +117,8 @@ public TransportShardBulkAction( ExecutorSelector.getWriteExecutorForShard(threadPool), PrimaryActionExecution.RejectOnOverload, indexingPressure, - systemIndices + systemIndices, + ReplicaActionExecution.SubjectToCircuitBreaker ); this.updateHelper = updateHelper; this.mappingUpdatedAction = mappingUpdatedAction; diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 5a891f33480fa..9d40764951f26 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -73,7 +73,8 @@ public TransportResyncReplicationAction( ExecutorSelector.getWriteExecutorForShard(threadPool), PrimaryActionExecution.Force, /* we should never reject resync because of thread pool capacity on primary */ indexingPressure, - systemIndices + systemIndices, + ReplicaActionExecution.SubjectToCircuitBreaker ); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java index 7ad81154691c0..da8479873a4b6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java 
@@ -42,7 +42,13 @@ public void start() { } } - static void doCheckNoMissingShards(String phaseName, SearchRequest request, GroupShardsIterator shardsIts) { + protected String missingShardsErrorMessage(StringBuilder missingShards) { + return "Search rejected due to missing shards [" + + missingShards + + "]. Consider using `allow_partial_search_results` setting to bypass this error."; + } + + protected void doCheckNoMissingShards(String phaseName, SearchRequest request, GroupShardsIterator shardsIts) { assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults"; if (request.allowPartialSearchResults() == false) { final StringBuilder missingShards = new StringBuilder(); @@ -58,9 +64,7 @@ static void doCheckNoMissingShards(String phaseName, SearchRequest request, Grou } if (missingShards.isEmpty() == false) { // Status red - shard is missing all copies and would produce partial results for an index search - final String msg = "Search rejected due to missing shards [" - + missingShards - + "]. 
Consider using `allow_partial_search_results` setting to bypass this error."; + final String msg = missingShardsErrorMessage(missingShards); throw new SearchPhaseExecutionException(phaseName, msg, null, ShardSearchFailure.EMPTY_ARRAY); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 91784ba331857..92d90fa8e55ad 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -213,6 +213,11 @@ SearchPhase openPointInTimePhase( searchRequest.getMaxConcurrentShardRequests(), clusters ) { + @Override + protected String missingShardsErrorMessage(StringBuilder missingShards) { + return "[open_point_in_time] action requires all shards to be available. Missing shards: [" + missingShards + "]"; + } + @Override protected void executePhaseOnShard( SearchShardIterator shardIt, diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 3c97bda2ef8d0..7e3e5bdee206d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -117,6 +117,20 @@ protected enum SyncGlobalCheckpointAfterOperation { AttemptAfterSuccess } + /** + * Execution of the replica action + */ + protected enum ReplicaActionExecution { + /** + * Will only execute when permitted by the configured circuit breakers + */ + SubjectToCircuitBreaker, + /** + * Will bypass the configured circuit breaker checks + */ + BypassCircuitBreaker + } + /** * The timeout for retrying replication requests. 
*/ @@ -170,12 +184,14 @@ protected TransportReplicationAction( Writeable.Reader replicaRequestReader, Executor executor, SyncGlobalCheckpointAfterOperation syncGlobalCheckpointAfterOperation, - PrimaryActionExecution primaryActionExecution + PrimaryActionExecution primaryActionExecution, + ReplicaActionExecution replicaActionExecution ) { // TODO: consider passing the executor, investigate doExecute and let InboundHandler/TransportAction handle concurrency. super(actionName, actionFilters, transportService.getTaskManager(), EsExecutors.DIRECT_EXECUTOR_SERVICE); assert syncGlobalCheckpointAfterOperation != null : "Must specify global checkpoint sync behaviour"; assert primaryActionExecution != null : "Must specify primary action execution behaviour"; + assert replicaActionExecution != null : "Must specify replica action execution behaviour"; this.threadPool = threadPool; this.transportService = transportService; this.clusterService = clusterService; @@ -209,12 +225,15 @@ protected TransportReplicationAction( this::handlePrimaryRequest ); - // we must never reject on because of thread pool capacity on replicas + boolean canTripCircuitBreakerOnReplica = switch (replicaActionExecution) { + case BypassCircuitBreaker -> false; + case SubjectToCircuitBreaker -> true; + }; transportService.registerRequestHandler( transportReplicaAction, executor, - true, - true, + true, // we must never reject because of thread pool capacity on replicas + canTripCircuitBreakerOnReplica, in -> new ConcreteReplicaRequest<>(replicaRequestReader, in), this::handleReplicaRequest ); diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index f380710cc0794..90b636ed69e24 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ 
b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -78,7 +78,8 @@ protected TransportWriteAction( BiFunction executorFunction, PrimaryActionExecution primaryActionExecution, IndexingPressure indexingPressure, - SystemIndices systemIndices + SystemIndices systemIndices, + ReplicaActionExecution replicaActionExecution ) { // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the // ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class. @@ -95,7 +96,8 @@ protected TransportWriteAction( replicaRequest, EsExecutors.DIRECT_EXECUTOR_SERVICE, SyncGlobalCheckpointAfterOperation.AttemptAfterSuccess, - primaryActionExecution + primaryActionExecution, + replicaActionExecution ); this.executorFunction = executorFunction; this.indexingPressure = indexingPressure; diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index a051d9c2df430..e7f97173de288 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -64,7 +64,8 @@ public GlobalCheckpointSyncAction( Request::new, threadPool.executor(ThreadPool.Names.WRITE), SyncGlobalCheckpointAfterOperation.DoNotSync, - PrimaryActionExecution.Force + PrimaryActionExecution.Force, + ReplicaActionExecution.SubjectToCircuitBreaker ); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 0aa0f0b8d1556..af23386254b66 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -83,7 +83,8 @@ public 
RetentionLeaseBackgroundSyncAction( Request::new, threadPool.executor(ThreadPool.Names.MANAGEMENT), SyncGlobalCheckpointAfterOperation.DoNotSync, - PrimaryActionExecution.RejectOnOverload + PrimaryActionExecution.RejectOnOverload, + ReplicaActionExecution.SubjectToCircuitBreaker ); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 0efcf8ac9298b..1678e0021df59 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -93,7 +93,8 @@ public RetentionLeaseSyncAction( new ManagementOnlyExecutorFunction(threadPool), PrimaryActionExecution.RejectOnOverload, indexingPressure, - systemIndices + systemIndices, + ReplicaActionExecution.SubjectToCircuitBreaker ); } diff --git a/server/src/main/java/org/elasticsearch/inference/ServiceSettings.java b/server/src/main/java/org/elasticsearch/inference/ServiceSettings.java index 34a58f83963ce..58e87105f70a3 100644 --- a/server/src/main/java/org/elasticsearch/inference/ServiceSettings.java +++ b/server/src/main/java/org/elasticsearch/inference/ServiceSettings.java @@ -9,6 +9,7 @@ package org.elasticsearch.inference; import org.elasticsearch.common.io.stream.VersionedNamedWriteable; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper; import org.elasticsearch.xcontent.ToXContentObject; @@ -48,5 +49,6 @@ default DenseVectorFieldMapper.ElementType elementType() { * be chosen when initializing a deployment within their service. In this situation, return null. 
* @return the model used to perform inference or null if the model is not defined */ + @Nullable String modelId(); } diff --git a/server/src/main/java/org/elasticsearch/telemetry/metric/MeterRegistry.java b/server/src/main/java/org/elasticsearch/telemetry/metric/MeterRegistry.java index 0f690558361e4..12c62859fd372 100644 --- a/server/src/main/java/org/elasticsearch/telemetry/metric/MeterRegistry.java +++ b/server/src/main/java/org/elasticsearch/telemetry/metric/MeterRegistry.java @@ -8,6 +8,8 @@ package org.elasticsearch.telemetry.metric; +import java.util.Collection; +import java.util.Collections; import java.util.function.Supplier; /** @@ -15,6 +17,7 @@ * only be registered once. * TODO(stu): describe name, unit and description */ + public interface MeterRegistry { /** * Register a {@link DoubleCounter}. The returned object may be reused. @@ -57,7 +60,20 @@ public interface MeterRegistry { * Must not throw an exception and must be safe to call from different threads. * @return the registered meter. */ - DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier observer); + default DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier observer) { + return registerDoublesGauge(name, description, unit, () -> Collections.singleton(observer.get())); + } + + /** + * Register a {@link DoubleGauge}. The returned object may be reused. + * @param name name of the gauge + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @param observer callback to use. This is called once during reporting period. + * Must not throw an exception and must be safe to call from different threads. + * @return the registered meter. + */ + DoubleGauge registerDoublesGauge(String name, String description, String unit, Supplier> observer); /** * Retrieved a previously registered {@link DoubleGauge}. 
@@ -98,7 +114,23 @@ public interface MeterRegistry { * @param unit the unit (bytes, sec, hour) * @param observer a callback to provide a metric value upon observation (metric interval) */ - LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier observer); + default LongAsyncCounter registerLongAsyncCounter(String name, String description, String unit, Supplier observer) { + return registerLongsAsyncCounter(name, description, unit, () -> Collections.singleton(observer.get())); + } + + /** + * Register a {@link LongAsyncCounter} with an asynchronous callback. The returned object may be reused. + * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @param observer a callback to provide a metric values upon observation (metric interval) + */ + LongAsyncCounter registerLongsAsyncCounter( + String name, + String description, + String unit, + Supplier> observer + ); /** * Retrieved a previously registered {@link LongAsyncCounter}. @@ -114,7 +146,28 @@ public interface MeterRegistry { * @param unit the unit (bytes, sec, hour) * @param observer a callback to provide a metric value upon observation (metric interval) */ - DoubleAsyncCounter registerDoubleAsyncCounter(String name, String description, String unit, Supplier observer); + default DoubleAsyncCounter registerDoubleAsyncCounter( + String name, + String description, + String unit, + Supplier observer + ) { + return registerDoublesAsyncCounter(name, description, unit, () -> Collections.singleton(observer.get())); + } + + /** + * Register a {@link DoubleAsyncCounter} with an asynchronous callback. The returned object may be reused. 
+ * @param name name of the counter + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @param observer a callback to provide a metric values upon observation (metric interval) + */ + DoubleAsyncCounter registerDoublesAsyncCounter( + String name, + String description, + String unit, + Supplier> observer + ); /** * Retrieved a previously registered {@link DoubleAsyncCounter}. @@ -155,7 +208,20 @@ public interface MeterRegistry { * Must not throw an exception and must be safe to call from different threads. * @return the registered meter. */ - LongGauge registerLongGauge(String name, String description, String unit, Supplier observer); + default LongGauge registerLongGauge(String name, String description, String unit, Supplier observer) { + return registerLongsGauge(name, description, unit, () -> Collections.singleton(observer.get())); + } + + /** + * Register a {@link LongGauge}. The returned object may be reused. + * @param name name of the gauge + * @param description description of purpose + * @param unit the unit (bytes, sec, hour) + * @param observer callback to use. This is called once during reporting period. + * Must not throw an exception and must be safe to call from different threads. + * @return the registered meter. + */ + LongGauge registerLongsGauge(String name, String description, String unit, Supplier> observer); /** * Retrieved a previously registered {@link LongGauge}. 
@@ -204,7 +270,12 @@ public DoubleUpDownCounter getDoubleUpDownCounter(String name) { } @Override - public DoubleGauge registerDoubleGauge(String name, String description, String unit, Supplier observer) { + public DoubleGauge registerDoublesGauge( + String name, + String description, + String unit, + Supplier> observer + ) { return DoubleGauge.NOOP; } @@ -229,11 +300,11 @@ public LongCounter registerLongCounter(String name, String description, String u } @Override - public LongAsyncCounter registerLongAsyncCounter( + public LongAsyncCounter registerLongsAsyncCounter( String name, String description, String unit, - Supplier observer + Supplier> observer ) { return LongAsyncCounter.NOOP; } @@ -244,11 +315,11 @@ public LongAsyncCounter getLongAsyncCounter(String name) { } @Override - public DoubleAsyncCounter registerDoubleAsyncCounter( + public DoubleAsyncCounter registerDoublesAsyncCounter( String name, String description, String unit, - Supplier observer + Supplier> observer ) { return DoubleAsyncCounter.NOOP; } @@ -274,7 +345,12 @@ public LongUpDownCounter getLongUpDownCounter(String name) { } @Override - public LongGauge registerLongGauge(String name, String description, String unit, Supplier observer) { + public LongGauge registerLongsGauge( + String name, + String description, + String unit, + Supplier> observer + ) { return LongGauge.NOOP; } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 04ad7d410e9b0..1e891aa80b696 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1519,7 +1519,8 @@ private class TestAction extends TransportReplicationAction EsExecutors.DIRECT_EXECUTOR_SERVICE, 
PrimaryActionExecution.RejectOnOverload, new IndexingPressure(Settings.EMPTY), - EmptySystemIndices.INSTANCE + EmptySystemIndices.INSTANCE, + ReplicaActionExecution.SubjectToCircuitBreaker ); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; @@ -456,7 +457,8 @@ protected TestAction( (service, ignore) -> EsExecutors.DIRECT_EXECUTOR_SERVICE, PrimaryActionExecution.RejectOnOverload, new IndexingPressure(settings), - EmptySystemIndices.INSTANCE + EmptySystemIndices.INSTANCE, + ReplicaActionExecution.SubjectToCircuitBreaker ); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; diff --git a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java index 33693c297f166..97fe0ad1370ef 100644 --- a/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java +++ b/test/framework/src/main/java/org/elasticsearch/telemetry/RecordingMeterRegistry.java @@ -23,6 +23,7 @@ import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.telemetry.metric.MeterRegistry; +import java.util.Collection; import java.util.function.Supplier; /** @@ -76,6 +77,16 @@ public DoubleGauge registerDoubleGauge(String name, String description, String u return instrument; } + @Override + public DoubleGauge registerDoublesGauge( + String name, + String description, + String unit, + Supplier> observer + ) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public DoubleGauge getDoubleGauge(String name) { return (DoubleGauge) recorder.getInstrument(InstrumentType.DOUBLE_GAUGE, name); @@ -115,6 +126,16 @@ public LongAsyncCounter registerLongAsyncCounter(String name, String description return instrument; } + @Override + public LongAsyncCounter registerLongsAsyncCounter( + String name, + String description, 
+ String unit, + Supplier> observer + ) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public LongAsyncCounter getLongAsyncCounter(String name) { return (LongAsyncCounter) recorder.getInstrument(InstrumentType.LONG_ASYNC_COUNTER, name); @@ -132,6 +153,16 @@ public DoubleAsyncCounter registerDoubleAsyncCounter( return instrument; } + @Override + public DoubleAsyncCounter registerDoublesAsyncCounter( + String name, + String description, + String unit, + Supplier> observer + ) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public DoubleAsyncCounter getDoubleAsyncCounter(String name) { return (DoubleAsyncCounter) recorder.getInstrument(InstrumentType.DOUBLE_ASYNC_COUNTER, name); @@ -170,6 +201,11 @@ public LongGauge registerLongGauge(String name, String description, String unit, return instrument; } + @Override + public LongGauge registerLongsGauge(String name, String description, String unit, Supplier> observer) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public LongGauge getLongGauge(String name) { return (LongGauge) recorder.getInstrument(InstrumentType.LONG_GAUGE, name); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index db6fc9ea696d5..33d36ed5e2cdb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -82,6 +82,8 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import 
org.elasticsearch.common.network.NetworkAddress; @@ -100,6 +102,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; @@ -119,6 +122,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.monitor.jvm.HotThreads; import org.elasticsearch.node.NodeMocksPlugin; @@ -2543,4 +2547,41 @@ public Settings onNodeStopped(String nodeName) throws Exception { } }); } + + /** + * Allocate the entire capacity of a circuit breaker on a specific node + * + * @param targetNode The node on which to allocate + * @param breakerName The circuit breaker to allocate + * @return A {@link Releasable} which will de-allocate the amount allocated + */ + protected static Releasable fullyAllocateCircuitBreakerOnNode(String targetNode, String breakerName) { + final var circuitBreaker = internalCluster().getInstance(CircuitBreakerService.class, targetNode).getBreaker(breakerName); + final long totalAllocated = fullyAllocate(circuitBreaker); + return () -> circuitBreaker.addWithoutBreaking(-totalAllocated); + } + + /** + * Fully allocate a circuit breaker + * + * @param circuitBreaker The circuit breaker to allocate + * @return the amount of bytes allocated + */ + private static long fullyAllocate(CircuitBreaker circuitBreaker) { + long allocationSize = 1; + long totalAllocated = 0; + while (true) { + try { + circuitBreaker.addEstimateBytesAndMaybeBreak(allocationSize, "test"); + totalAllocated += allocationSize; + } catch (CircuitBreakingException e) { + circuitBreaker.addWithoutBreaking(allocationSize); + 
totalAllocated += allocationSize; + break; + } + allocationSize <<= 1; + assert 0 <= allocationSize; + } + return totalAllocated; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestTrustStore.java b/test/framework/src/main/java/org/elasticsearch/test/TestTrustStore.java index 01069b8fb295c..e17a309dbc9c8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestTrustStore.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestTrustStore.java @@ -22,6 +22,7 @@ import java.util.Objects; import static org.apache.lucene.tests.util.LuceneTestCase.createTempDir; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class TestTrustStore extends ExternalResource { @@ -35,6 +36,7 @@ public TestTrustStore(CheckedSupplier pemStreamSupplie private Path trustStorePath; public Path getTrustStorePath() { + assertFalse("Tests in FIPS mode cannot supply a custom trust store", ESTestCase.inFipsJvm()); return Objects.requireNonNullElseGet(trustStorePath, () -> ESTestCase.fail(null, "trust store not created")); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 2d0c43315f746..28e52cbef5df0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -69,7 +69,8 @@ public TransportBulkShardOperationsAction( ExecutorSelector.getWriteExecutorForShard(threadPool), PrimaryActionExecution.RejectOnOverload, indexingPressure, - systemIndices + systemIndices, + ReplicaActionExecution.SubjectToCircuitBreaker ); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectionRuleTests.java 
b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectionRuleTests.java index d716f34f86e6c..127088d82bade 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectionRuleTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectionRuleTests.java @@ -139,6 +139,8 @@ protected DetectionRule mutateInstance(DetectionRule instance) { if (actions.contains(RuleAction.FORCE_TIME_SHIFT) && params.getForceTimeShift() == null) { params = new RuleParams(new RuleParamsForForceTimeShift(randomLong())); + } else if (actions.contains(RuleAction.FORCE_TIME_SHIFT) == false && params.getForceTimeShift() != null) { + params = new RuleParams(); } return new DetectionRule.Builder(conditions).setActions(actions).setScope(scope).setParams(params).build(); diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java index fce2c54c535c9..ec9398358d180 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.inference.InferenceServiceExtension; import org.elasticsearch.inference.InferenceServiceRegistry; +import org.elasticsearch.node.PluginComponentBinding; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.plugins.MapperPlugin; @@ -84,8 +85,8 @@ import org.elasticsearch.xpack.inference.services.huggingface.elser.HuggingFaceElserService; import org.elasticsearch.xpack.inference.services.mistral.MistralService; import org.elasticsearch.xpack.inference.services.openai.OpenAiService; -import 
org.elasticsearch.xpack.inference.telemetry.InferenceAPMStats; -import org.elasticsearch.xpack.inference.telemetry.StatsMap; +import org.elasticsearch.xpack.inference.telemetry.ApmInferenceStats; +import org.elasticsearch.xpack.inference.telemetry.InferenceStats; import java.util.ArrayList; import java.util.Collection; @@ -196,10 +197,10 @@ public Collection createComponents(PluginServices services) { var actionFilter = new ShardBulkInferenceActionFilter(registry, modelRegistry); shardBulkInferenceActionFilter.set(actionFilter); - var statsFactory = new InferenceAPMStats.Factory(services.telemetryProvider().getMeterRegistry()); - var statsMap = new StatsMap<>(InferenceAPMStats::key, statsFactory::newInferenceRequestAPMCounter); + var meterRegistry = services.telemetryProvider().getMeterRegistry(); + var stats = new PluginComponentBinding<>(InferenceStats.class, ApmInferenceStats.create(meterRegistry)); - return List.of(modelRegistry, registry, httpClientManager, statsMap); + return List.of(modelRegistry, registry, httpClientManager, stats); } @Override diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportInferenceAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportInferenceAction.java index 575697b5d0d39..b7fff3b704695 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportInferenceAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportInferenceAction.java @@ -21,22 +21,26 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.inference.action.InferenceAction; import org.elasticsearch.xpack.inference.registry.ModelRegistry; +import org.elasticsearch.xpack.inference.telemetry.InferenceStats; public class TransportInferenceAction extends HandledTransportAction { private final ModelRegistry modelRegistry; private final 
InferenceServiceRegistry serviceRegistry; + private final InferenceStats inferenceStats; @Inject public TransportInferenceAction( TransportService transportService, ActionFilters actionFilters, ModelRegistry modelRegistry, - InferenceServiceRegistry serviceRegistry + InferenceServiceRegistry serviceRegistry, + InferenceStats inferenceStats ) { super(InferenceAction.NAME, transportService, actionFilters, InferenceAction.Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); this.modelRegistry = modelRegistry; this.serviceRegistry = serviceRegistry; + this.inferenceStats = inferenceStats; } @Override @@ -76,6 +80,7 @@ protected void doExecute(Task task, InferenceAction.Request request, ActionListe unparsedModel.settings(), unparsedModel.secrets() ); + inferenceStats.incrementRequestCount(model); inferOnService(model, request, service.get(), delegate); }); diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/cohere/embeddings/CohereEmbeddingsModel.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/cohere/embeddings/CohereEmbeddingsModel.java index 538d88a59ca76..fea5226bf9c6f 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/cohere/embeddings/CohereEmbeddingsModel.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/cohere/embeddings/CohereEmbeddingsModel.java @@ -28,7 +28,7 @@ public static CohereEmbeddingsModel of(CohereEmbeddingsModel model, Map serviceSettings, @@ -37,7 +37,7 @@ public CohereEmbeddingsModel( ConfigurationParseContext context ) { this( - modelId, + inferenceId, taskType, service, CohereEmbeddingsServiceSettings.fromMap(serviceSettings, context), diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsServiceSettings.java 
b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsServiceSettings.java index d474e935fbda7..6ef1f6f0feefe 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsServiceSettings.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsServiceSettings.java @@ -150,7 +150,7 @@ public OpenAiEmbeddingsServiceSettings( @Nullable RateLimitSettings rateLimitSettings ) { this.uri = uri; - this.modelId = modelId; + this.modelId = Objects.requireNonNull(modelId); this.organizationId = organizationId; this.similarity = similarity; this.dimensions = dimensions; diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/ApmInferenceStats.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/ApmInferenceStats.java new file mode 100644 index 0000000000000..ae14a0792dead --- /dev/null +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/ApmInferenceStats.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.inference.telemetry; + +import org.elasticsearch.inference.Model; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.MeterRegistry; + +import java.util.HashMap; +import java.util.Objects; + +public class ApmInferenceStats implements InferenceStats { + private final LongCounter inferenceAPMRequestCounter; + + public ApmInferenceStats(LongCounter inferenceAPMRequestCounter) { + this.inferenceAPMRequestCounter = Objects.requireNonNull(inferenceAPMRequestCounter); + } + + @Override + public void incrementRequestCount(Model model) { + var service = model.getConfigurations().getService(); + var taskType = model.getTaskType(); + var modelId = model.getServiceSettings().modelId(); + + var attributes = new HashMap(5); + attributes.put("service", service); + attributes.put("task_type", taskType.toString()); + if (modelId != null) { + attributes.put("model_id", modelId); + } + + inferenceAPMRequestCounter.incrementBy(1, attributes); + } + + public static ApmInferenceStats create(MeterRegistry meterRegistry) { + return new ApmInferenceStats( + meterRegistry.registerLongCounter( + "es.inference.requests.count.total", + "Inference API request counts for a particular service, task type, model ID", + "operations" + ) + ); + } +} diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/InferenceAPMStats.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/InferenceAPMStats.java deleted file mode 100644 index 76977fef76045..0000000000000 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/InferenceAPMStats.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. 
Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.inference.telemetry; - -import org.elasticsearch.inference.Model; -import org.elasticsearch.telemetry.metric.LongCounter; -import org.elasticsearch.telemetry.metric.MeterRegistry; - -import java.util.Map; -import java.util.Objects; - -public class InferenceAPMStats extends InferenceStats { - - private final LongCounter inferenceAPMRequestCounter; - - public InferenceAPMStats(Model model, MeterRegistry meterRegistry) { - super(model); - this.inferenceAPMRequestCounter = meterRegistry.registerLongCounter( - "es.inference.requests.count", - "Inference API request counts for a particular service, task type, model ID", - "operations" - ); - } - - @Override - public void increment() { - super.increment(); - inferenceAPMRequestCounter.incrementBy(1, Map.of("service", service, "task_type", taskType.toString(), "model_id", modelId)); - } - - public static final class Factory { - private final MeterRegistry meterRegistry; - - public Factory(MeterRegistry meterRegistry) { - this.meterRegistry = Objects.requireNonNull(meterRegistry); - } - - public InferenceAPMStats newInferenceRequestAPMCounter(Model model) { - return new InferenceAPMStats(model, meterRegistry); - } - } -} diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/InferenceStats.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/InferenceStats.java index d639f9da71f56..d080e818e45fc 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/InferenceStats.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/InferenceStats.java @@ -8,52 +8,14 @@ package org.elasticsearch.xpack.inference.telemetry; import org.elasticsearch.inference.Model; -import org.elasticsearch.inference.TaskType; -import 
org.elasticsearch.xpack.core.inference.InferenceRequestStats; -import java.util.Objects; -import java.util.concurrent.atomic.LongAdder; +public interface InferenceStats { -public class InferenceStats implements Stats { - protected final String service; - protected final TaskType taskType; - protected final String modelId; - protected final LongAdder counter = new LongAdder(); + /** + * Increment the counter for a particular value in a thread safe manner. + * @param model the model to increment request count for + */ + void incrementRequestCount(Model model); - public static String key(Model model) { - StringBuilder builder = new StringBuilder(); - builder.append(model.getConfigurations().getService()); - builder.append(":"); - builder.append(model.getTaskType()); - - if (model.getServiceSettings().modelId() != null) { - builder.append(":"); - builder.append(model.getServiceSettings().modelId()); - } - - return builder.toString(); - } - - public InferenceStats(Model model) { - Objects.requireNonNull(model); - - service = model.getConfigurations().getService(); - taskType = model.getTaskType(); - modelId = model.getServiceSettings().modelId(); - } - - @Override - public void increment() { - counter.increment(); - } - - @Override - public long getCount() { - return counter.sum(); - } - - @Override - public InferenceRequestStats toSerializableForm() { - return new InferenceRequestStats(service, taskType, modelId, getCount()); - } + InferenceStats NOOP = model -> {}; } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/Stats.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/Stats.java deleted file mode 100644 index bb1e9c98fc2cb..0000000000000 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/Stats.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.inference.telemetry; - -import org.elasticsearch.xpack.core.inference.SerializableStats; - -public interface Stats { - - /** - * Increase the counter by one. - */ - void increment(); - - /** - * Return the current value of the counter. - * @return the current value of the counter - */ - long getCount(); - - /** - * Convert the object into a serializable form that can be written across nodes and returned in xcontent format. - * @return the serializable format of the object - */ - SerializableStats toSerializableForm(); -} diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/StatsMap.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/StatsMap.java deleted file mode 100644 index 1cfecfb4507d6..0000000000000 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/StatsMap.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.inference.telemetry; - -import org.elasticsearch.xpack.core.inference.SerializableStats; - -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Function; -import java.util.stream.Collectors; - -/** - * A map to provide tracking incrementing statistics. 
- * - * @param The input to derive the keys and values for the map - * @param The type of the values stored in the map - */ -public class StatsMap { - - private final ConcurrentMap stats = new ConcurrentHashMap<>(); - private final Function keyCreator; - private final Function valueCreator; - - /** - * @param keyCreator a function for creating a key in the map based on the input provided - * @param valueCreator a function for creating a value in the map based on the input provided - */ - public StatsMap(Function keyCreator, Function valueCreator) { - this.keyCreator = Objects.requireNonNull(keyCreator); - this.valueCreator = Objects.requireNonNull(valueCreator); - } - - /** - * Increment the counter for a particular value in a thread safe manner. - * @param input the input to derive the appropriate key in the map - */ - public void increment(Input input) { - var value = stats.computeIfAbsent(keyCreator.apply(input), key -> valueCreator.apply(input)); - value.increment(); - } - - /** - * Build a map that can be serialized. This takes a snapshot of the current state. Any concurrent calls to increment may or may not - * be represented in the resulting serializable map. - * @return a map that is more easily serializable - */ - public Map toSerializableMap() { - return stats.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toSerializableForm())); - } -} diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/telemetry/ApmInferenceStatsTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/telemetry/ApmInferenceStatsTests.java new file mode 100644 index 0000000000000..1a5aba5f89ad2 --- /dev/null +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/telemetry/ApmInferenceStatsTests.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.inference.telemetry; + +import org.elasticsearch.inference.Model; +import org.elasticsearch.inference.ModelConfigurations; +import org.elasticsearch.inference.ServiceSettings; +import org.elasticsearch.inference.TaskType; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.MeterRegistry; +import org.elasticsearch.test.ESTestCase; + +import java.util.Map; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ApmInferenceStatsTests extends ESTestCase { + + public void testRecordWithModel() { + var longCounter = mock(LongCounter.class); + + var stats = new ApmInferenceStats(longCounter); + + stats.incrementRequestCount(model("service", TaskType.ANY, "modelId")); + + verify(longCounter).incrementBy( + eq(1L), + eq(Map.of("service", "service", "task_type", TaskType.ANY.toString(), "model_id", "modelId")) + ); + } + + public void testRecordWithoutModel() { + var longCounter = mock(LongCounter.class); + + var stats = new ApmInferenceStats(longCounter); + + stats.incrementRequestCount(model("service", TaskType.ANY, null)); + + verify(longCounter).incrementBy(eq(1L), eq(Map.of("service", "service", "task_type", TaskType.ANY.toString()))); + } + + public void testCreation() { + assertNotNull(ApmInferenceStats.create(MeterRegistry.NOOP)); + } + + private Model model(String service, TaskType taskType, String modelId) { + var configuration = mock(ModelConfigurations.class); + when(configuration.getService()).thenReturn(service); + var settings = mock(ServiceSettings.class); + if (modelId != null) { + when(settings.modelId()).thenReturn(modelId); + } + + var model = mock(Model.class); + when(model.getTaskType()).thenReturn(taskType); + 
when(model.getConfigurations()).thenReturn(configuration); + when(model.getServiceSettings()).thenReturn(settings); + + return model; + } +} diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/telemetry/StatsMapTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/telemetry/StatsMapTests.java deleted file mode 100644 index fcd8d3d7cefbc..0000000000000 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/telemetry/StatsMapTests.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.inference.telemetry; - -import org.elasticsearch.inference.TaskType; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.inference.services.ConfigurationParseContext; -import org.elasticsearch.xpack.inference.services.cohere.embeddings.CohereEmbeddingsModel; -import org.elasticsearch.xpack.inference.services.cohere.embeddings.CohereEmbeddingsServiceSettingsTests; -import org.elasticsearch.xpack.inference.services.cohere.embeddings.CohereEmbeddingsTaskSettingsTests; -import org.elasticsearch.xpack.inference.services.openai.embeddings.OpenAiEmbeddingsModel; -import org.elasticsearch.xpack.inference.services.openai.embeddings.OpenAiEmbeddingsServiceSettingsTests; -import org.elasticsearch.xpack.inference.services.openai.embeddings.OpenAiEmbeddingsTaskSettingsTests; - -import java.util.Map; - -import static org.hamcrest.Matchers.is; - -public class StatsMapTests extends ESTestCase { - public void testAddingEntry_InitializesTheCountToOne() { - var stats = new StatsMap<>(InferenceStats::key, InferenceStats::new); - - stats.increment( - new OpenAiEmbeddingsModel( - "inference_id", - TaskType.TEXT_EMBEDDING, - 
"openai", - OpenAiEmbeddingsServiceSettingsTests.getServiceSettingsMap("modelId", null, null), - OpenAiEmbeddingsTaskSettingsTests.getTaskSettingsMap(null), - null, - ConfigurationParseContext.REQUEST - ) - ); - - var converted = stats.toSerializableMap(); - - assertThat( - converted, - is( - Map.of( - "openai:text_embedding:modelId", - new org.elasticsearch.xpack.core.inference.InferenceRequestStats("openai", TaskType.TEXT_EMBEDDING, "modelId", 1) - ) - ) - ); - } - - public void testIncrementingWithSeparateModels_IncrementsTheCounterToTwo() { - var stats = new StatsMap<>(InferenceStats::key, InferenceStats::new); - - var model1 = new OpenAiEmbeddingsModel( - "inference_id", - TaskType.TEXT_EMBEDDING, - "openai", - OpenAiEmbeddingsServiceSettingsTests.getServiceSettingsMap("modelId", null, null), - OpenAiEmbeddingsTaskSettingsTests.getTaskSettingsMap(null), - null, - ConfigurationParseContext.REQUEST - ); - - var model2 = new OpenAiEmbeddingsModel( - "inference_id", - TaskType.TEXT_EMBEDDING, - "openai", - OpenAiEmbeddingsServiceSettingsTests.getServiceSettingsMap("modelId", null, null), - OpenAiEmbeddingsTaskSettingsTests.getTaskSettingsMap(null), - null, - ConfigurationParseContext.REQUEST - ); - - stats.increment(model1); - stats.increment(model2); - - var converted = stats.toSerializableMap(); - - assertThat( - converted, - is( - Map.of( - "openai:text_embedding:modelId", - new org.elasticsearch.xpack.core.inference.InferenceRequestStats("openai", TaskType.TEXT_EMBEDDING, "modelId", 2) - ) - ) - ); - } - - public void testNullModelId_ResultsInKeyWithout() { - var stats = new StatsMap<>(InferenceStats::key, InferenceStats::new); - - stats.increment( - new CohereEmbeddingsModel( - "inference_id", - TaskType.TEXT_EMBEDDING, - "cohere", - CohereEmbeddingsServiceSettingsTests.getServiceSettingsMap(null, null, null), - CohereEmbeddingsTaskSettingsTests.getTaskSettingsMap(null, null), - null, - ConfigurationParseContext.REQUEST - ) - ); - - var converted = 
stats.toSerializableMap(); - - assertThat( - converted, - is( - Map.of( - "cohere:text_embedding", - new org.elasticsearch.xpack.core.inference.InferenceRequestStats("cohere", TaskType.TEXT_EMBEDDING, null, 1) - ) - ) - ); - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 22a9c2dbcc281..c4bf92401be9d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1283,6 +1283,7 @@ public Collection createComponents(PluginServices services) { clusterService, client, inferenceAuditor, + telemetryProvider.getMeterRegistry(), mlAssignmentNotifier, machineLearningExtension.get().isAnomalyDetectionEnabled(), machineLearningExtension.get().isDataFrameAnalyticsEnabled(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index a1664b7023fc0..2b3ed3f7a656c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsFeatureFlag; @@ -68,6 +69,7 @@ public final class MlInitializationService implements ClusterStateListener { ClusterService clusterService, Client client, InferenceAuditor inferenceAuditor, + MeterRegistry meterRegistry, 
MlAssignmentNotifier mlAssignmentNotifier, boolean isAnomalyDetectionEnabled, boolean isDataFrameAnalyticsEnabled, @@ -87,7 +89,7 @@ public final class MlInitializationService implements ClusterStateListener { isDataFrameAnalyticsEnabled, isNlpEnabled ), - new AdaptiveAllocationsScalerService(threadPool, clusterService, client, inferenceAuditor, isNlpEnabled), + new AdaptiveAllocationsScalerService(threadPool, clusterService, client, inferenceAuditor, meterRegistry, isNlpEnabled), clusterService ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java index 15f647bc76697..044556d1b30ac 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScaler.java @@ -35,10 +35,15 @@ public class AdaptiveAllocationsScaler { private final KalmanFilter1d inferenceTimeEstimator; private int numberOfAllocations; + private int neededNumberOfAllocations; private Integer minNumberOfAllocations; private Integer maxNumberOfAllocations; private boolean dynamicsChanged; + private Double lastMeasuredRequestRate; + private Double lastMeasuredInferenceTime; + private Long lastMeasuredQueueSize; + AdaptiveAllocationsScaler(String deploymentId, int numberOfAllocations) { this.deploymentId = deploymentId; // A smoothing factor of 100 roughly means the last 100 measurements have an effect @@ -51,9 +56,14 @@ public class AdaptiveAllocationsScaler { requestRateEstimator = new KalmanFilter1d(deploymentId + ":rate", 100, true); inferenceTimeEstimator = new KalmanFilter1d(deploymentId + ":time", 100, false); this.numberOfAllocations = numberOfAllocations; - this.minNumberOfAllocations = null; - 
this.maxNumberOfAllocations = null; - this.dynamicsChanged = false; + neededNumberOfAllocations = numberOfAllocations; + minNumberOfAllocations = null; + maxNumberOfAllocations = null; + dynamicsChanged = false; + + lastMeasuredRequestRate = null; + lastMeasuredInferenceTime = null; + lastMeasuredQueueSize = null; } void setMinMaxNumberOfAllocations(Integer minNumberOfAllocations, Integer maxNumberOfAllocations) { @@ -62,6 +72,8 @@ void setMinMaxNumberOfAllocations(Integer minNumberOfAllocations, Integer maxNum } void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSeconds, int numberOfAllocations) { + lastMeasuredQueueSize = stats.pendingCount(); + // The request rate (per second) is the request count divided by the time. // Assuming a Poisson process for the requests, the variance in the request // count equals the mean request count, and the variance in the request rate @@ -74,6 +86,7 @@ void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSe double requestRateEstimate = requestRateEstimator.hasValue() ? requestRateEstimator.estimate() : requestRate; double requestRateVariance = Math.max(1.0, requestRateEstimate * timeIntervalSeconds) / Math.pow(timeIntervalSeconds, 2); requestRateEstimator.add(requestRate, requestRateVariance, false); + lastMeasuredRequestRate = requestRate; if (stats.requestCount() > 0 && Double.isNaN(stats.inferenceTime()) == false) { // The inference time distribution is unknown. For simplicity, we assume @@ -86,6 +99,9 @@ void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSe double inferenceTimeEstimate = inferenceTimeEstimator.hasValue() ? 
inferenceTimeEstimator.estimate() : inferenceTime; double inferenceTimeVariance = Math.pow(inferenceTimeEstimate, 2) / stats.requestCount(); inferenceTimeEstimator.add(inferenceTime, inferenceTimeVariance, dynamicsChanged); + lastMeasuredInferenceTime = inferenceTime; + } else { + lastMeasuredInferenceTime = null; } this.numberOfAllocations = numberOfAllocations; @@ -104,6 +120,14 @@ void process(AdaptiveAllocationsScalerService.Stats stats, double timeIntervalSe return requestRateUpper * inferenceTimeUpper; } + Double getRequestRateEstimate() { + return requestRateEstimator.hasValue() ? requestRateEstimator.estimate() : null; + } + + Double getInferenceTimeEstimate() { + return inferenceTimeEstimator.hasValue() ? inferenceTimeEstimator.estimate() : null; + } + Integer scale() { if (requestRateEstimator.hasValue() == false) { return null; @@ -121,6 +145,8 @@ Integer scale() { numberOfAllocations--; } + this.neededNumberOfAllocations = numberOfAllocations; + if (maxNumberOfAllocations == null) { numberOfAllocations = Math.min(numberOfAllocations, MAX_NUMBER_OF_ALLOCATIONS_SAFEGUARD); } @@ -161,4 +187,28 @@ Integer scale() { return null; } } + + public String getDeploymentId() { + return deploymentId; + } + + public long getNumberOfAllocations() { + return numberOfAllocations; + } + + public long getNeededNumberOfAllocations() { + return neededNumberOfAllocations; + } + + public Double getLastMeasuredRequestRate() { + return lastMeasuredRequestRate; + } + + public Double getLastMeasuredInferenceTime() { + return lastMeasuredInferenceTime; + } + + public Long getLastMeasuredQueueSize() { + return lastMeasuredQueueSize; + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java index 30e3871ad5ad0..063ecae3726b1 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java @@ -19,6 +19,9 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.telemetry.metric.DoubleWithAttributes; +import org.elasticsearch.telemetry.metric.LongWithAttributes; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; @@ -30,11 +33,15 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; /** * Periodically schedules adaptive allocations scaling. 
This process consists @@ -75,6 +82,108 @@ Stats sub(Stats value) { } } + private class Metrics { + + private final List metrics = new ArrayList<>(); + + Metrics() {} + + void init() { + if (metrics.isEmpty() == false) { + return; + } + metrics.add( + meterRegistry.registerLongsGauge( + "es.ml.trained_models.adaptive_allocations.actual_number_of_allocations.current", + "the actual number of allocations", + "", + () -> observeLong(AdaptiveAllocationsScaler::getNumberOfAllocations) + ) + ); + metrics.add( + meterRegistry.registerLongsGauge( + "es.ml.trained_models.adaptive_allocations.needed_number_of_allocations.current", + "the number of allocations needed according to the adaptive allocations scaler", + "", + () -> observeLong(AdaptiveAllocationsScaler::getNeededNumberOfAllocations) + ) + ); + metrics.add( + meterRegistry.registerDoublesGauge( + "es.ml.trained_models.adaptive_allocations.measured_request_rate.current", + "the request rate reported by the stats API", + "1/s", + () -> observeDouble(AdaptiveAllocationsScaler::getLastMeasuredRequestRate) + ) + ); + metrics.add( + meterRegistry.registerDoublesGauge( + "es.ml.trained_models.adaptive_allocations.estimated_request_rate.current", + "the request rate estimated by the adaptive allocations scaler", + "1/s", + () -> observeDouble(AdaptiveAllocationsScaler::getRequestRateEstimate) + ) + ); + metrics.add( + meterRegistry.registerDoublesGauge( + "es.ml.trained_models.adaptive_allocations.measured_inference_time.current", + "the inference time reported by the stats API", + "s", + () -> observeDouble(AdaptiveAllocationsScaler::getLastMeasuredInferenceTime) + ) + ); + metrics.add( + meterRegistry.registerDoublesGauge( + "es.ml.trained_models.adaptive_allocations.estimated_inference_time.current", + "the inference time estimated by the adaptive allocations scaler", + "s", + () -> observeDouble(AdaptiveAllocationsScaler::getInferenceTimeEstimate) + ) + ); + metrics.add( + meterRegistry.registerLongsGauge( + 
"es.ml.trained_models.adaptive_allocations.queue_size.current", + "the queue size reported by the stats API", + "", + () -> observeLong(AdaptiveAllocationsScaler::getLastMeasuredQueueSize) + ) + ); + } + + Collection<LongWithAttributes> observeLong(Function<AdaptiveAllocationsScaler, Long> getValue) { + List<LongWithAttributes> observations = new ArrayList<>(); + for (AdaptiveAllocationsScaler scaler : scalers.values()) { + Long value = getValue.apply(scaler); + if (value != null) { + observations.add(new LongWithAttributes(value, Map.of("deployment_id", scaler.getDeploymentId()))); + } + } + return observations; + } + + Collection<DoubleWithAttributes> observeDouble(Function<AdaptiveAllocationsScaler, Double> getValue) { + List<DoubleWithAttributes> observations = new ArrayList<>(); + for (AdaptiveAllocationsScaler scaler : scalers.values()) { + Double value = getValue.apply(scaler); + if (value != null) { + observations.add(new DoubleWithAttributes(value, Map.of("deployment_id", scaler.getDeploymentId()))); + } + } + return observations; + } + + void close() { + for (AutoCloseable metric : metrics) { + try { + metric.close(); + } catch (Exception e) { + // do nothing + } + } + metrics.clear(); + } + } + /** * The time interval between the adaptive allocations triggers. 
*/ @@ -92,6 +201,8 @@ Stats sub(Stats value) { private final ClusterService clusterService; private final Client client; private final InferenceAuditor inferenceAuditor; + private final MeterRegistry meterRegistry; + private final Metrics metrics; private final boolean isNlpEnabled; private final Map> lastInferenceStatsByDeploymentAndNode; private Long lastInferenceStatsTimestampMillis; @@ -106,9 +217,10 @@ public AdaptiveAllocationsScalerService( ClusterService clusterService, Client client, InferenceAuditor inferenceAuditor, + MeterRegistry meterRegistry, boolean isNlpEnabled ) { - this(threadPool, clusterService, client, inferenceAuditor, isNlpEnabled, DEFAULT_TIME_INTERVAL_SECONDS); + this(threadPool, clusterService, client, inferenceAuditor, meterRegistry, isNlpEnabled, DEFAULT_TIME_INTERVAL_SECONDS); } // visible for testing @@ -117,6 +229,7 @@ public AdaptiveAllocationsScalerService( ClusterService clusterService, Client client, InferenceAuditor inferenceAuditor, + MeterRegistry meterRegistry, boolean isNlpEnabled, int timeIntervalSeconds ) { @@ -124,6 +237,7 @@ public AdaptiveAllocationsScalerService( this.clusterService = clusterService; this.client = client; this.inferenceAuditor = inferenceAuditor; + this.meterRegistry = meterRegistry; this.isNlpEnabled = isNlpEnabled; this.timeIntervalSeconds = timeIntervalSeconds; @@ -131,11 +245,13 @@ public AdaptiveAllocationsScalerService( lastInferenceStatsTimestampMillis = null; lastScaleUpTimesMillis = new HashMap<>(); scalers = new HashMap<>(); + metrics = new Metrics(); busy = new AtomicBoolean(false); } public synchronized void start() { updateAutoscalers(clusterService.state()); + metrics.init(); clusterService.addListener(this); if (scalers.isEmpty() == false) { startScheduling(); @@ -144,6 +260,7 @@ public synchronized void start() { public synchronized void stop() { stopScheduling(); + metrics.close(); } @Override diff --git 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 2f251e3b0aee6..a5b9597886e15 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.inference.adaptiveallocations.AdaptiveAllocationsScalerService; @@ -40,6 +41,7 @@ public class MlInitializationServiceTests extends ESTestCase { private ClusterService clusterService; private Client client; private InferenceAuditor inferenceAuditor; + private MeterRegistry meterRegistry; private MlAssignmentNotifier mlAssignmentNotifier; @Before @@ -49,6 +51,7 @@ public void setUpMocks() { clusterService = mock(ClusterService.class); client = mock(Client.class); inferenceAuditor = mock(InferenceAuditor.class); + meterRegistry = mock(MeterRegistry.class); mlAssignmentNotifier = mock(MlAssignmentNotifier.class); when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME); @@ -75,6 +78,7 @@ public void testInitialize() { clusterService, client, inferenceAuditor, + meterRegistry, mlAssignmentNotifier, true, true, @@ -91,6 +95,7 @@ public void testInitialize_noMasterNode() { clusterService, client, inferenceAuditor, + meterRegistry, mlAssignmentNotifier, true, true, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java index 3ad44f256dc66..4aaddc91231f3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ScalingExecutorBuilder; import org.elasticsearch.threadpool.TestThreadPool; @@ -55,6 +56,7 @@ public class AdaptiveAllocationsScalerServiceTests extends ESTestCase { private ClusterService clusterService; private Client client; private InferenceAuditor inferenceAuditor; + private MeterRegistry meterRegistry; @Override @Before @@ -66,6 +68,7 @@ public void setUp() throws Exception { clusterService = mock(ClusterService.class); client = mock(Client.class); inferenceAuditor = mock(InferenceAuditor.class); + meterRegistry = mock(MeterRegistry.class); } @Override @@ -156,6 +159,7 @@ public void test() throws IOException { clusterService, client, inferenceAuditor, + meterRegistry, true, 1 ); diff --git a/x-pack/plugin/repositories-metering-api/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java b/x-pack/plugin/repositories-metering-api/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java index 0e7eeb965d1f3..3a66854191088 100644 --- a/x-pack/plugin/repositories-metering-api/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java +++ 
b/x-pack/plugin/repositories-metering-api/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Booleans; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestTrustStore; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.xpack.repositories.metering.AbstractRepositoriesMeteringAPIRestTestCase; @@ -28,7 +29,9 @@ public class AzureRepositoriesMeteringIT extends AbstractRepositoriesMeteringAPI private static final String AZURE_TEST_SASTOKEN = System.getProperty("test.azure.sas_token"); private static AzureHttpFixture fixture = new AzureHttpFixture( - USE_FIXTURE ? AzureHttpFixture.Protocol.HTTPS : AzureHttpFixture.Protocol.NONE, + USE_FIXTURE + ? ESTestCase.inFipsJvm() ? AzureHttpFixture.Protocol.HTTP : AzureHttpFixture.Protocol.HTTPS + : AzureHttpFixture.Protocol.NONE, AZURE_TEST_ACCOUNT, AZURE_TEST_CONTAINER, AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT) @@ -57,7 +60,11 @@ public class AzureRepositoriesMeteringIT extends AbstractRepositoriesMeteringAPI () -> "ignored;DefaultEndpointsProtocol=https;BlobEndpoint=" + fixture.getAddress(), s -> USE_FIXTURE ) - .systemProperty("javax.net.ssl.trustStore", () -> trustStore.getTrustStorePath().toString(), s -> USE_FIXTURE) + .systemProperty( + "javax.net.ssl.trustStore", + () -> trustStore.getTrustStorePath().toString(), + s -> USE_FIXTURE && ESTestCase.inFipsJvm() == false + ) .build(); @ClassRule(order = 1) diff --git a/x-pack/plugin/searchable-snapshots/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java b/x-pack/plugin/searchable-snapshots/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java index d86632d77b51c..d2cdef121fe40 100644 --- 
a/x-pack/plugin/searchable-snapshots/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java +++ b/x-pack/plugin/searchable-snapshots/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Booleans; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestTrustStore; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.rest.ESRestTestCase; @@ -29,7 +30,9 @@ public class AzureSearchableSnapshotsIT extends AbstractSearchableSnapshotsRestT private static final String AZURE_TEST_SASTOKEN = System.getProperty("test.azure.sas_token"); private static AzureHttpFixture fixture = new AzureHttpFixture( - USE_FIXTURE ? AzureHttpFixture.Protocol.HTTPS : AzureHttpFixture.Protocol.NONE, + USE_FIXTURE + ? ESTestCase.inFipsJvm() ? AzureHttpFixture.Protocol.HTTP : AzureHttpFixture.Protocol.HTTPS + : AzureHttpFixture.Protocol.NONE, AZURE_TEST_ACCOUNT, AZURE_TEST_CONTAINER, AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT) diff --git a/x-pack/plugin/snapshot-based-recoveries/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/AzureSnapshotBasedRecoveryIT.java b/x-pack/plugin/snapshot-based-recoveries/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/AzureSnapshotBasedRecoveryIT.java index 8895574f85d02..bac69158a860c 100644 --- a/x-pack/plugin/snapshot-based-recoveries/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/AzureSnapshotBasedRecoveryIT.java +++ b/x-pack/plugin/snapshot-based-recoveries/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/AzureSnapshotBasedRecoveryIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings; import 
org.elasticsearch.core.Booleans; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestTrustStore; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.junit.ClassRule; @@ -28,7 +29,9 @@ public class AzureSnapshotBasedRecoveryIT extends AbstractSnapshotBasedRecoveryR private static final String AZURE_TEST_SASTOKEN = System.getProperty("test.azure.sas_token"); private static AzureHttpFixture fixture = new AzureHttpFixture( - USE_FIXTURE ? AzureHttpFixture.Protocol.HTTPS : AzureHttpFixture.Protocol.NONE, + USE_FIXTURE + ? ESTestCase.inFipsJvm() ? AzureHttpFixture.Protocol.HTTP : AzureHttpFixture.Protocol.HTTPS + : AzureHttpFixture.Protocol.NONE, AZURE_TEST_ACCOUNT, AZURE_TEST_CONTAINER, AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT) @@ -59,7 +62,11 @@ public class AzureSnapshotBasedRecoveryIT extends AbstractSnapshotBasedRecoveryR s -> USE_FIXTURE ) .setting("xpack.license.self_generated.type", "trial") - .systemProperty("javax.net.ssl.trustStore", () -> trustStore.getTrustStorePath().toString(), s -> USE_FIXTURE) + .systemProperty( + "javax.net.ssl.trustStore", + () -> trustStore.getTrustStorePath().toString(), + s -> USE_FIXTURE && ESTestCase.inFipsJvm() == false + ) .build(); @ClassRule(order = 1) diff --git a/x-pack/plugin/snapshot-repo-test-kit/qa/azure/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/AzureSnapshotRepoTestKitIT.java b/x-pack/plugin/snapshot-repo-test-kit/qa/azure/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/AzureSnapshotRepoTestKitIT.java index 7451f37cd0e40..d0b5fca158472 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/qa/azure/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/AzureSnapshotRepoTestKitIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/qa/azure/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/AzureSnapshotRepoTestKitIT.java @@ -10,6 +10,7 @@ import 
org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Booleans; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestTrustStore; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.junit.ClassRule; @@ -27,7 +28,9 @@ public class AzureSnapshotRepoTestKitIT extends AbstractSnapshotRepoTestKitRestT private static final String AZURE_TEST_SASTOKEN = System.getProperty("test.azure.sas_token"); private static AzureHttpFixture fixture = new AzureHttpFixture( - USE_FIXTURE ? AzureHttpFixture.Protocol.HTTPS : AzureHttpFixture.Protocol.NONE, + USE_FIXTURE + ? ESTestCase.inFipsJvm() ? AzureHttpFixture.Protocol.HTTP : AzureHttpFixture.Protocol.HTTPS + : AzureHttpFixture.Protocol.NONE, AZURE_TEST_ACCOUNT, AZURE_TEST_CONTAINER, AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT) @@ -62,7 +65,11 @@ public class AzureSnapshotRepoTestKitIT extends AbstractSnapshotRepoTestKitRestT c.systemProperty("test.repository_test_kit.skip_cas", "true"); } }) - .systemProperty("javax.net.ssl.trustStore", () -> trustStore.getTrustStorePath().toString(), s -> USE_FIXTURE) + .systemProperty( + "javax.net.ssl.trustStore", + () -> trustStore.getTrustStorePath().toString(), + s -> USE_FIXTURE && ESTestCase.inFipsJvm() == false + ) .build(); @ClassRule(order = 1) diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java index f0d4eaec7c848..73a90f247810e 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java @@ -22,6 +22,7 
@@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -40,7 +41,6 @@ import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; -import org.elasticsearch.test.junit.annotations.TestIssueLogging; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.junit.Before; @@ -58,12 +58,14 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC; import static org.elasticsearch.repositories.blobstore.testkit.ContendedRegisterAnalyzeAction.bytesFromLong; import static org.elasticsearch.repositories.blobstore.testkit.ContendedRegisterAnalyzeAction.longFromBytes; import static org.hamcrest.Matchers.allOf; @@ -87,9 +89,26 @@ protected Collection> nodePlugins() { return List.of(TestPlugin.class, LocalStateCompositeXPackPlugin.class, SnapshotRepositoryTestKit.class); } + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return 
Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + // no throttling, so that even analyses which run to completion do not take too long + .put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), ByteSizeValue.ZERO) + .build(); + } + @Before public void createBlobStore() { - createRepositoryNoVerify("test-repo", TestPlugin.DISRUPTABLE_REPO_TYPE); + createRepository( + "test-repo", + TestPlugin.DISRUPTABLE_REPO_TYPE, + randomRepositorySettings() + // no throttling, so that even analyses which run to completion do not take too long + .put(MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO) + .put(MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO), + false + ); blobStore = new DisruptableBlobStore(); for (final RepositoriesService repositoriesService : internalCluster().getInstances(RepositoriesService.class)) { @@ -105,8 +124,7 @@ public void testSuccess() { final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo"); request.blobCount(1); request.maxBlobSize(ByteSizeValue.ofBytes(10L)); - - analyseRepository(request); + safeAwait((ActionListener l) -> analyseRepository(request, l)); } public void testFailsOnReadError() { @@ -125,7 +143,8 @@ public byte[] onRead(byte[] actualContents, long position, long length) throws I } }); - final Exception exception = expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + final Exception exception = analyseRepositoryExpectFailure(request); + assertAnalysisFailureMessage(exception.getMessage()); final IOException ioException = (IOException) ExceptionsHelper.unwrap(exception, IOException.class); assert ioException != null : exception; assertThat(ioException.getMessage(), equalTo("simulated")); @@ -149,7 +168,7 @@ public byte[] onRead(byte[] actualContents, long position, long length) { } }); - expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + 
assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage()); } public void testFailsOnChecksumMismatch() { @@ -180,7 +199,7 @@ public byte[] onRead(byte[] actualContents, long position, long length) { } }); - expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage()); } public void testFailsOnWriteException() { @@ -201,7 +220,8 @@ public void onWrite() throws IOException { }); - final Exception exception = expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + final Exception exception = analyseRepositoryExpectFailure(request); + assertAnalysisFailureMessage(exception.getMessage()); final IOException ioException = (IOException) ExceptionsHelper.unwrap(exception, IOException.class); assert ioException != null : exception; assertThat(ioException.getMessage(), equalTo("simulated")); @@ -223,7 +243,7 @@ public Map onList(Map actualListing) }); - expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage()); } public void testFailsOnListingException() { @@ -243,7 +263,7 @@ public Map onList(Map actualListing) } }); - expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage()); } public void testFailsOnDeleteException() { @@ -258,7 +278,7 @@ public void onDelete() throws IOException { } }); - expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage()); } public void testFailsOnIncompleteDelete() { @@ -286,7 +306,7 @@ public Map onList(Map actualListing) } }); - expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + 
assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage()); } public void testFailsIfBlobCreatedOnAbort() { @@ -303,12 +323,21 @@ public boolean createBlobOnAbort() { } }); - try { - analyseRepository(request); - assertFalse(writeWasAborted.get()); - } catch (RepositoryVerificationException e) { - assertTrue(writeWasAborted.get()); - } + safeAwait((ActionListener l) -> analyseRepository(request, l.delegateResponse((ll, e) -> { + if (ExceptionsHelper.unwrapCause(e) instanceof RepositoryVerificationException repositoryVerificationException) { + assertAnalysisFailureMessage(repositoryVerificationException.getMessage()); + assertTrue( + "did not abort write, so why did the verification fail?", + // clear flag for final assertion + writeWasAborted.compareAndSet(true, false) + ); + ll.onResponse(null); + } else { + ll.onFailure(e); + } + }))); + + assertFalse(writeWasAborted.get()); } public void testFailsIfRegisterIncorrect() { @@ -325,7 +354,7 @@ public BytesReference onContendedCompareAndExchange(BytesRegister register, Byte return register.compareAndExchange(expected, updated); } }); - expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage()); } public void testFailsIfRegisterHoldsSpuriousValue() { @@ -355,14 +384,22 @@ public BytesReference onContendedCompareAndExchange(BytesRegister register, Byte return register.compareAndExchange(expected, updated); } }); - try { - analyseRepository(request); - assertFalse(sawSpuriousValue.get()); - } catch (RepositoryVerificationException e) { - if (sawSpuriousValue.get() == false) { - fail(e, "did not see spurious value, so why did the verification fail?"); + + safeAwait((ActionListener l) -> analyseRepository(request, l.delegateResponse((ll, e) -> { + if (ExceptionsHelper.unwrapCause(e) instanceof RepositoryVerificationException repositoryVerificationException) { + 
assertAnalysisFailureMessage(repositoryVerificationException.getMessage()); + assertTrue( + "did not see spurious value, so why did the verification fail?", + // clear flag for final assertion + sawSpuriousValue.compareAndSet(true, false) + ); + ll.onResponse(null); + } else { + ll.onFailure(e); } - } + }))); + + assertFalse(sawSpuriousValue.get()); } private static void assertAnalysisFailureMessage(String message) { @@ -387,7 +424,7 @@ public boolean compareAndExchangeReturnsWitness(String key) { return isContendedRegisterKey(key) == false; } }); - final var exception = expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + final var exception = analyseRepositoryExpectFailure(request); assertThat( exception.getMessage(), allOf( @@ -409,7 +446,7 @@ public boolean compareAndExchangeReturnsWitness(String key) { return false; } }); - final var exception = expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + final var exception = analyseRepositoryExpectFailure(request); assertAnalysisFailureMessage(exception.getMessage()); assertThat( asInstanceOf(RepositoryVerificationException.class, ExceptionsHelper.unwrapCause(exception.getCause())).getMessage(), @@ -417,10 +454,6 @@ public boolean compareAndExchangeReturnsWitness(String key) { ); } - @TestIssueLogging( - issueUrl = "https://github.com/elastic/elasticsearch/issues/111343", - value = "org.elasticsearch.repositories.blobstore.testkit:TRACE" - ) public void testFailsIfEmptyRegisterRejected() { final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo"); blobStore.setDisruption(new Disruption() { @@ -429,7 +462,7 @@ public boolean acceptsEmptyRegister() { return false; } }); - final var exception = expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + final var exception = analyseRepositoryExpectFailure(request); assertAnalysisFailureMessage(exception.getMessage()); final var 
cause = ExceptionsHelper.unwrapCause(exception.getCause()); if (cause instanceof IOException ioException) { @@ -445,8 +478,18 @@ public boolean acceptsEmptyRegister() { } } - private void analyseRepository(RepositoryAnalyzeAction.Request request) { - client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(5L, TimeUnit.MINUTES); + private RepositoryVerificationException analyseRepositoryExpectFailure(RepositoryAnalyzeAction.Request request) { + return asInstanceOf( + RepositoryVerificationException.class, + ExceptionsHelper.unwrapCause(safeAwaitFailure(RepositoryAnalyzeAction.Response.class, l -> analyseRepository(request, l))) + ); + } + + private void analyseRepository(RepositoryAnalyzeAction.Request request, ActionListener listener) { + client().execute(RepositoryAnalyzeAction.INSTANCE, request, listener.delegateFailureAndWrap((l, response) -> { + RepositoryAnalysisSuccessIT.assertNoThrottling(response); + l.onResponse(response); + })); } private static void assertPurpose(OperationPurpose purpose) { diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java index 47b44f41f72d2..e4d9bf9041b4a 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobContainer; import 
org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; @@ -26,7 +27,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; @@ -38,6 +38,7 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ObjectPath; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.junit.Before; @@ -51,15 +52,18 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_RESTORE_BYTES_PER_SEC; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_SNAPSHOT_BYTES_PER_SEC; import static org.elasticsearch.repositories.blobstore.testkit.ContendedRegisterAnalyzeAction.longFromBytes; import static org.elasticsearch.repositories.blobstore.testkit.RepositoryAnalysisFailureIT.isContendedRegisterKey; +import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; @@ -83,9 +87,21 @@ protected Collection> 
nodePlugins() { return List.of(TestPlugin.class, LocalStateCompositeXPackPlugin.class, SnapshotRepositoryTestKit.class); } + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), ByteSizeValue.ZERO) + .build(); + } + public void testRepositoryAnalysis() { - final Settings.Builder settings = Settings.builder(); + final Settings.Builder settings = Settings.builder() + // no throttling + .put(MAX_SNAPSHOT_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO) + .put(MAX_RESTORE_BYTES_PER_SEC.getKey(), ByteSizeValue.ZERO); + if (randomBoolean()) { settings.put(BASE_PATH_SETTING_KEY, randomAlphaOfLength(10)); } @@ -136,11 +152,22 @@ public void testRepositoryAnalysis() { blobStore.setMaxTotalBlobSize(request.getMaxTotalDataSize().getBytes()); } - request.timeout(TimeValue.timeValueSeconds(20)); - - client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(30L, TimeUnit.SECONDS); + request.timeout(SAFE_AWAIT_TIMEOUT); + final RepositoryAnalyzeAction.Response response = safeAwait(l -> client().execute(RepositoryAnalyzeAction.INSTANCE, request, l)); assertThat(blobStore.currentPath, nullValue()); + + assertNoThrottling(response); + } + + static void assertNoThrottling(RepositoryAnalyzeAction.Response response) { + try { + final var responseMap = convertToMap(response); + assertEquals(Strings.toString(response), 0, (int) ObjectPath.eval("summary.write.total_throttled_nanos", responseMap)); + assertEquals(Strings.toString(response), 0, (int) ObjectPath.eval("summary.read.total_throttled_nanos", responseMap)); + } catch (IOException e) { + fail(e); + } } public static class TestPlugin extends Plugin implements RepositoryPlugin { diff --git a/x-pack/plugin/sql/qa/jdbc/single-node/src/javaRestTest/java/org/elasticsearch/xpack/sql/qa/jdbc/single_node/JdbcShardFailureIT.java 
b/x-pack/plugin/sql/qa/jdbc/single-node/src/javaRestTest/java/org/elasticsearch/xpack/sql/qa/jdbc/single_node/JdbcShardFailureIT.java index f83047411f0b0..0e0f7dc9722d9 100644 --- a/x-pack/plugin/sql/qa/jdbc/single-node/src/javaRestTest/java/org/elasticsearch/xpack/sql/qa/jdbc/single_node/JdbcShardFailureIT.java +++ b/x-pack/plugin/sql/qa/jdbc/single-node/src/javaRestTest/java/org/elasticsearch/xpack/sql/qa/jdbc/single_node/JdbcShardFailureIT.java @@ -76,7 +76,7 @@ public void createTestIndex() throws IOException { public void testPartialResponseHandling() throws SQLException { try (Connection c = esJdbc(); Statement s = c.createStatement()) { SQLException exception = expectThrows(SQLException.class, () -> s.executeQuery("SELECT * FROM test ORDER BY test_field ASC")); - assertThat(exception.getMessage(), containsString("Search rejected due to missing shards")); + assertThat(exception.getMessage(), containsString("[open_point_in_time] action requires all shards to be available")); } } } diff --git a/x-pack/plugin/sql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcShardFailureIT.java b/x-pack/plugin/sql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcShardFailureIT.java index dc9989b26c3b2..91f3ab029f55c 100644 --- a/x-pack/plugin/sql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcShardFailureIT.java +++ b/x-pack/plugin/sql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcShardFailureIT.java @@ -89,7 +89,7 @@ public void testPartialResponseHandling() throws Exception { createTestIndex(); try (Connection c = esJdbc(); Statement s = c.createStatement()) { SQLException exception = expectThrows(SQLException.class, () -> s.executeQuery("SELECT * FROM test ORDER BY test_field ASC")); - assertThat(exception.getMessage(), containsString("Search rejected due to missing shards")); + 
assertThat(exception.getMessage(), containsString("[open_point_in_time] action requires all shards to be available")); } }