diff --git a/docs/changelog/111290.yaml b/docs/changelog/111290.yaml new file mode 100644 index 0000000000000..efcb01a4aedf9 --- /dev/null +++ b/docs/changelog/111290.yaml @@ -0,0 +1,5 @@ +pr: 111290 +summary: Fix enrich policy runner exception handling on empty segments response +area: Ingest Node +type: bug +issues: [] diff --git a/docs/changelog/111311.yaml b/docs/changelog/111311.yaml new file mode 100644 index 0000000000000..5786e11e885e2 --- /dev/null +++ b/docs/changelog/111311.yaml @@ -0,0 +1,6 @@ +pr: 111311 +summary: Adding support for data streams with a match-all template +area: Data streams +type: bug +issues: + - 111204 diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml index 35e3f38d55c26..39558d12b56cd 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml @@ -1150,3 +1150,94 @@ setup: name: simple-data-stream2 - is_true: acknowledged +--- +"Create data stream with match all template": + - requires: + cluster_features: ["gte_v8.16.0"] + reason: "data streams supoprt for match all templates only supported in 8.16" + + - do: + allowed_warnings: + - "index template [match-all-template] has index patterns [*] matching patterns from existing older templates [.monitoring-logstash,.monitoring-es,.monitoring-beats,.monitoring-alerts-7,.monitoring-kibana] with patterns (.monitoring-logstash => [.monitoring-logstash-7-*],.monitoring-es => [.monitoring-es-7-*],.monitoring-beats => [.monitoring-beats-7-*],.monitoring-alerts-7 => [.monitoring-alerts-7],.monitoring-kibana => [.monitoring-kibana-7-*]); this template [match-all-template] will take precedence during new index creation" + indices.put_index_template: + name: match-all-template + body: + index_patterns: [ "*" ] + priority: 1 + data_stream: {} + + - do: + indices.create_data_stream: + name: match-all-data-stream + - is_true: acknowledged + + - do: + cluster.health: + wait_for_status: green + + - do: + indices.get_data_stream: + name: "*" + - match: { data_streams.0.name: match-all-data-stream } + - match: { data_streams.0.generation: 1 } + - length: { data_streams.0.indices: 1 } + - match: { data_streams.0.indices.0.index_name: '/\.ds-match-all-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + - match: { data_streams.0.status: 'GREEN' } + - match: { data_streams.0.template: 'match-all-template' } + - match: { data_streams.0.hidden: false } + + - do: + indices.delete_data_stream: + name: match-all-data-stream + - is_true: acknowledged + + - do: + indices.delete_index_template: + name: match-all-template + - is_true: acknowledged + +--- +"Create hidden data stream with match all template": + - requires: + cluster_features: [ "gte_v8.16.0" ] + reason: "data streams supoprt for match all templates only supported in 8.16" + - do: + allowed_warnings: + - "index template [match-all-hidden-template] has index patterns [*] matching patterns from existing older templates [.monitoring-logstash,.monitoring-es,.monitoring-beats,.monitoring-alerts-7,.monitoring-kibana] with patterns (.monitoring-logstash => [.monitoring-logstash-7-*],.monitoring-es => [.monitoring-es-7-*],.monitoring-beats => [.monitoring-beats-7-*],.monitoring-alerts-7 => [.monitoring-alerts-7],.monitoring-kibana => [.monitoring-kibana-7-*]); this template [match-all-hidden-template] will take precedence during new index creation" + indices.put_index_template: + name: match-all-hidden-template + body: + index_patterns: [ "*" ] + priority: 1 + data_stream: + hidden: true + - do: + indices.create_data_stream: + name: match-all-hidden-data-stream + - is_true: acknowledged + + - do: + cluster.health: + wait_for_status: green + + - do: + indices.get_data_stream: + name: "*" + - length: { data_streams: 0 } + + - do: + indices.get_data_stream: + name: ['*'] + expand_wildcards: hidden + - length: { data_streams: 1 } + - match: { data_streams.0.name: match-all-hidden-data-stream } + - match: { data_streams.0.hidden: true } + + - do: + indices.delete_data_stream: + name: match-all-hidden-data-stream + - is_true: acknowledged + + - do: + indices.delete_index_template: + name: match-all-hidden-template diff --git a/muted-tests.yml b/muted-tests.yml index 2ba65aceff1d3..0d31ecb149c22 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -116,6 +116,10 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/111308 - class: org.elasticsearch.xpack.ml.integration.DatafeedJobsRestIT issue: https://github.com/elastic/elasticsearch/issues/111319 +- class: org.elasticsearch.xpack.searchablesnapshots.AzureSearchableSnapshotsIT + issue: https://github.com/elastic/elasticsearch/issues/111279 +- class: org.elasticsearch.repositories.azure.RepositoryAzureClientYamlTestSuiteIT + issue: https://github.com/elastic/elasticsearch/issues/111345 # Examples: # diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java index bd12cfdbc7962..429ebe365bbe1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java @@ -36,7 +36,7 @@ public class IndicesSegmentResponse extends ChunkedBroadcastResponse { private volatile Map indicesSegments; - IndicesSegmentResponse( + public IndicesSegmentResponse( ShardSegments[] shards, int totalShards, int successfulShards, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index e9658e71f895e..a84c1d4a782f4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -1309,7 +1309,12 @@ static List> findV2CandidateTemplates(Met for (Map.Entry entry : metadata.templatesV2().entrySet()) { final String name = entry.getKey(); final ComposableIndexTemplate template = entry.getValue(); - if (isHidden == false) { + /* + * We do not ordinarily return match-all templates for hidden indices. But all backing indices for data streams are hidden, + * and we do want to return even match-all templates for those. Not doing so can result in a situation where a data stream is + * built with a template that none of its indices match. + */ + if (isHidden == false || template.getDataStreamTemplate() != null) { final boolean matched = template.indexPatterns().stream().anyMatch(patternMatchPredicate); if (matched) { candidates.add(Tuple.tuple(name, template)); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java index d9c01953fbaab..eb00b8bf59594 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java @@ -2158,6 +2158,23 @@ public void testDataStreamsUsingTemplates() throws Exception { MetadataIndexTemplateService.innerRemoveIndexTemplateV2(stateWithTwoTemplates, "logs"); } + public void testDataStreamsUsingMatchAllTemplate() throws Exception { + ClusterState state = ClusterState.EMPTY_STATE; + final MetadataIndexTemplateService service = getMetadataIndexTemplateService(); + + ComposableIndexTemplate template = ComposableIndexTemplate.builder() + .indexPatterns(Collections.singletonList("*")) + .priority(100L) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(); + final String templateName = "all-data-streams-template"; + state = service.addIndexTemplateV2(state, false, templateName, template); + // When creating a data stream, we'll look for templates. The data stream is not hidden + assertThat(MetadataIndexTemplateService.findV2Template(state.metadata(), "some-data-stream", false), equalTo(templateName)); + // The write index for a data stream will be a hidden index. We need to make sure it matches the same template: + assertThat(MetadataIndexTemplateService.findV2Template(state.metadata(), "some-data-stream", true), equalTo(templateName)); + } + public void testRemovingHigherOrderTemplateOfDataStreamWithMultipleTemplates() throws Exception { ClusterState state = ClusterState.EMPTY_STATE; final MetadataIndexTemplateService service = getMetadataIndexTemplateService(); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index ca00f49100279..810fd03f061ea 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -25,11 +25,11 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; -import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.FilterClient; import org.elasticsearch.client.internal.OriginSettingClient; @@ -575,48 +575,82 @@ private void refreshEnrichIndex(final String destinationIndexName, final int att protected void ensureSingleSegment(final String destinationIndexName, final int attempt) { enrichOriginClient().admin() .indices() - .segments(new IndicesSegmentsRequest(destinationIndexName), new DelegatingActionListener<>(listener) { - @Override - public void onResponse(IndicesSegmentResponse indicesSegmentResponse) { - IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName); - if (indexSegments == null) { + .segments(new IndicesSegmentsRequest(destinationIndexName), listener.delegateFailureAndWrap((l, indicesSegmentResponse) -> { + int failedShards = indicesSegmentResponse.getFailedShards(); + if (failedShards > 0) { + // Encountered a problem while querying the segments for the enrich index. Try and surface the problem in the log. + logger.warn( + "Policy [{}]: Encountered [{}] shard level failures while querying the segments for enrich index [{}]. " + + "Turn on DEBUG logging for details.", + policyName, + failedShards, + enrichIndexName + ); + if (logger.isDebugEnabled()) { + DefaultShardOperationFailedException[] shardFailures = indicesSegmentResponse.getShardFailures(); + int failureNumber = 1; + String logPrefix = "Policy [" + policyName + "]: Encountered shard failure ["; + String logSuffix = " of " + + shardFailures.length + + "] while querying segments for enrich index [" + + enrichIndexName + + "]. Shard ["; + for (DefaultShardOperationFailedException shardFailure : shardFailures) { + logger.debug( + logPrefix + failureNumber + logSuffix + shardFailure.index() + "][" + shardFailure.shardId() + "]", + shardFailure.getCause() + ); + failureNumber++; + } + } + } + IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName); + if (indexSegments == null) { + if (indicesSegmentResponse.getShardFailures().length == 0) { throw new ElasticsearchException( "Could not locate segment information for newly created index [{}]", destinationIndexName ); + } else { + DefaultShardOperationFailedException shardFailure = indicesSegmentResponse.getShardFailures()[0]; + throw new ElasticsearchException( + "Could not obtain segment information for newly created index [{}]; shard info [{}][{}]", + shardFailure.getCause(), + destinationIndexName, + shardFailure.index(), + shardFailure.shardId() + ); } - Map indexShards = indexSegments.getShards(); - assert indexShards.size() == 1 : "Expected enrich index to contain only one shard"; - ShardSegments[] shardSegments = indexShards.get(0).shards(); - assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point"; - ShardSegments primarySegments = shardSegments[0]; - if (primarySegments.getSegments().size() > 1) { - int nextAttempt = attempt + 1; - if (nextAttempt > maxForceMergeAttempts) { - delegate.onFailure( - new ElasticsearchException( - "Force merging index [{}] attempted [{}] times but did not result in one segment.", - destinationIndexName, - attempt, - maxForceMergeAttempts - ) - ); - } else { - logger.debug( - "Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})", - policyName, - primarySegments.getSegments().size(), - nextAttempt, - maxForceMergeAttempts - ); - forceMergeEnrichIndex(destinationIndexName, nextAttempt); - } + } + Map indexShards = indexSegments.getShards(); + assert indexShards.size() == 1 : "Expected enrich index to contain only one shard"; + ShardSegments[] shardSegments = indexShards.get(0).shards(); + assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point"; + ShardSegments primarySegments = shardSegments[0]; + if (primarySegments.getSegments().size() > 1) { + int nextAttempt = attempt + 1; + if (nextAttempt > maxForceMergeAttempts) { + throw new ElasticsearchException( + "Force merging index [{}] attempted [{}] times but did not result in one segment.", + destinationIndexName, + attempt, + maxForceMergeAttempts + ); } else { - // Force merge down to one segment successful - setIndexReadOnly(destinationIndexName); + logger.debug( + "Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})", + policyName, + primarySegments.getSegments().size(), + nextAttempt, + maxForceMergeAttempts + ); + forceMergeEnrichIndex(destinationIndexName, nextAttempt); } + } else { + // Force merge down to one segment successful + setIndexReadOnly(destinationIndexName); } - }); + })); } private void setIndexReadOnly(final String destinationIndexName) { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index 8ce1e7f350ccb..7ba3b356d6015 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.Client; @@ -2048,6 +2049,254 @@ protected void ensureSingleSegment(String destinationIndexName, int attempt) { ensureEnrichIndexIsReadOnly(createdEnrichIndex); } + public void testRunnerWithEmptySegmentsResponse() throws Exception { + final String sourceIndex = "source-index"; + DocWriteResponse indexRequest = client().index(new IndexRequest().index(sourceIndex).id("id").source(""" + { + "field1": "value1", + "field2": 2, + "field3": "ignored", + "field4": "ignored", + "field5": "value5" + }""", XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + assertResponse( + client().search(new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))), + sourceSearchResponse -> { + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("field1"), is(equalTo("value1"))); + assertThat(sourceDocMap.get("field2"), is(equalTo(2))); + assertThat(sourceDocMap.get("field3"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field4"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); + } + ); + List enrichFields = List.of("field2", "field5"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + String createdEnrichIndex = ".enrich-test1-" + createTime; + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class); + Task asyncTask = testTaskManager.register("enrich", "policy_execution", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public void setRequestId(long requestId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return policyName; + } + }); + ExecuteEnrichPolicyTask task = ((ExecuteEnrichPolicyTask) asyncTask); + // The executor would wrap the listener in order to clean up the task in the + // task manager, but we're just testing the runner, so we make sure to clean + // up after ourselves. + ActionListener wrappedListener = ActionListener.runBefore( + listener, + () -> testTaskManager.unregister(task) + ); + + // Wrap the client so that when we receive the indices segments action, we intercept the request and complete it on another thread + // with an empty segments response. + Client client = new FilterClient(client()) { + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (action.equals(IndicesSegmentsAction.INSTANCE)) { + testThreadPool.generic().execute(() -> { + @SuppressWarnings("unchecked") + ActionListener castListener = ((ActionListener) listener); + castListener.onResponse(new IndicesSegmentResponse(new ShardSegments[0], 0, 0, 0, List.of())); + }); + } else { + super.doExecute(action, request, listener); + } + } + }; + + EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner( + policyName, + policy, + task, + wrappedListener, + clusterService, + getInstanceFromNode(IndicesService.class), + client, + resolver, + createdEnrichIndex, + randomIntBetween(1, 10000), + randomIntBetween(3, 10) + ); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + if (latch.await(1, TimeUnit.MINUTES) == false) { + fail("Timeout while waiting for runner to complete"); + } + Exception exceptionThrown = exception.get(); + if (exceptionThrown == null) { + fail("Expected exception to be thrown from segment api"); + } + + // Validate exception information + assertThat(exceptionThrown, instanceOf(ElasticsearchException.class)); + assertThat(exceptionThrown.getMessage(), containsString("Could not locate segment information for newly created index")); + } + + public void testRunnerWithShardFailuresInSegmentResponse() throws Exception { + final String sourceIndex = "source-index"; + DocWriteResponse indexRequest = client().index(new IndexRequest().index(sourceIndex).id("id").source(""" + { + "field1": "value1", + "field2": 2, + "field3": "ignored", + "field4": "ignored", + "field5": "value5" + }""", XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + assertResponse( + client().search(new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))), + sourceSearchResponse -> { + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("field1"), is(equalTo("value1"))); + assertThat(sourceDocMap.get("field2"), is(equalTo(2))); + assertThat(sourceDocMap.get("field3"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field4"), is(equalTo("ignored"))); + assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); + } + ); + List enrichFields = List.of("field2", "field5"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + String createdEnrichIndex = ".enrich-test1-" + createTime; + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class); + Task asyncTask = testTaskManager.register("enrich", "policy_execution", new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public void setRequestId(long requestId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ExecuteEnrichPolicyTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return policyName; + } + }); + ExecuteEnrichPolicyTask task = ((ExecuteEnrichPolicyTask) asyncTask); + // The executor would wrap the listener in order to clean up the task in the + // task manager, but we're just testing the runner, so we make sure to clean + // up after ourselves. + ActionListener wrappedListener = ActionListener.runBefore( + listener, + () -> testTaskManager.unregister(task) + ); + + // Wrap the client so that when we receive the indices segments action, we intercept the request and complete it on another thread + // with an failed segments response. + Client client = new FilterClient(client()) { + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (action.equals(IndicesSegmentsAction.INSTANCE)) { + testThreadPool.generic().execute(() -> { + @SuppressWarnings("unchecked") + ActionListener castListener = ((ActionListener) listener); + castListener.onResponse( + new IndicesSegmentResponse( + new ShardSegments[0], + 0, + 0, + 3, + List.of( + new DefaultShardOperationFailedException(createdEnrichIndex, 1, new ElasticsearchException("failure1")), + new DefaultShardOperationFailedException(createdEnrichIndex, 2, new ElasticsearchException("failure2")), + new DefaultShardOperationFailedException(createdEnrichIndex, 3, new ElasticsearchException("failure3")) + ) + ) + ); + }); + } else { + super.doExecute(action, request, listener); + } + } + }; + + EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner( + policyName, + policy, + task, + wrappedListener, + clusterService, + getInstanceFromNode(IndicesService.class), + client, + resolver, + createdEnrichIndex, + randomIntBetween(1, 10000), + randomIntBetween(3, 10) + ); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + if (latch.await(1, TimeUnit.MINUTES) == false) { + fail("Timeout while waiting for runner to complete"); + } + Exception exceptionThrown = exception.get(); + if (exceptionThrown == null) { + fail("Expected exception to be thrown from segment api"); + } + + // Validate exception information + assertThat(exceptionThrown, instanceOf(ElasticsearchException.class)); + assertThat(exceptionThrown.getMessage(), containsString("Could not obtain segment information for newly created index")); + assertThat(exceptionThrown.getCause(), instanceOf(ElasticsearchException.class)); + assertThat(exceptionThrown.getCause().getMessage(), containsString("failure1")); + } + public void testRunnerCancel() throws Exception { final String sourceIndex = "source-index"; DocWriteResponse indexRequest = client().index(new IndexRequest().index(sourceIndex).id("id").source(""" @@ -2495,7 +2744,7 @@ private ActionListener createTestListener( final CountDownLatch latch, final Consumer exceptionConsumer ) { - return new LatchedActionListener<>(ActionListener.wrap((r) -> logger.info("Run complete"), exceptionConsumer), latch); + return new LatchedActionListener<>(ActionListener.wrap((r) -> logger.debug("Run complete"), exceptionConsumer), latch); } private void validateMappingMetadata(Map mapping, String policyName, EnrichPolicy policy) {