Skip to content

Commit

Permalink
Merge branch 'main' into auto-ops/service-account
Browse files Browse the repository at this point in the history
  • Loading branch information
pickypg committed Jul 26, 2024
2 parents 81f89a7 + 5be31ac commit 9b4370b
Show file tree
Hide file tree
Showing 9 changed files with 449 additions and 38 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/111290.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 111290
summary: Fix enrich policy runner exception handling on empty segments response
area: Ingest Node
type: bug
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/111311.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 111311
summary: Adding support for data streams with a match-all template
area: Data streams
type: bug
issues:
- 111204
Original file line number Diff line number Diff line change
Expand Up @@ -1150,3 +1150,94 @@ setup:
name: simple-data-stream2
- is_true: acknowledged

---
"Create data stream with match all template":
- requires:
cluster_features: ["gte_v8.16.0"]
reason: "data streams support for match all templates only supported in 8.16"

- do:
allowed_warnings:
- "index template [match-all-template] has index patterns [*] matching patterns from existing older templates [.monitoring-logstash,.monitoring-es,.monitoring-beats,.monitoring-alerts-7,.monitoring-kibana] with patterns (.monitoring-logstash => [.monitoring-logstash-7-*],.monitoring-es => [.monitoring-es-7-*],.monitoring-beats => [.monitoring-beats-7-*],.monitoring-alerts-7 => [.monitoring-alerts-7],.monitoring-kibana => [.monitoring-kibana-7-*]); this template [match-all-template] will take precedence during new index creation"
indices.put_index_template:
name: match-all-template
body:
index_patterns: [ "*" ]
priority: 1
data_stream: {}

- do:
indices.create_data_stream:
name: match-all-data-stream
- is_true: acknowledged

- do:
cluster.health:
wait_for_status: green

- do:
indices.get_data_stream:
name: "*"
- match: { data_streams.0.name: match-all-data-stream }
- match: { data_streams.0.generation: 1 }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-match-all-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { data_streams.0.status: 'GREEN' }
- match: { data_streams.0.template: 'match-all-template' }
- match: { data_streams.0.hidden: false }

- do:
indices.delete_data_stream:
name: match-all-data-stream
- is_true: acknowledged

- do:
indices.delete_index_template:
name: match-all-template
- is_true: acknowledged

---
"Create hidden data stream with match all template":
- requires:
cluster_features: [ "gte_v8.16.0" ]
reason: "data streams support for match all templates only supported in 8.16"
- do:
allowed_warnings:
- "index template [match-all-hidden-template] has index patterns [*] matching patterns from existing older templates [.monitoring-logstash,.monitoring-es,.monitoring-beats,.monitoring-alerts-7,.monitoring-kibana] with patterns (.monitoring-logstash => [.monitoring-logstash-7-*],.monitoring-es => [.monitoring-es-7-*],.monitoring-beats => [.monitoring-beats-7-*],.monitoring-alerts-7 => [.monitoring-alerts-7],.monitoring-kibana => [.monitoring-kibana-7-*]); this template [match-all-hidden-template] will take precedence during new index creation"
indices.put_index_template:
name: match-all-hidden-template
body:
index_patterns: [ "*" ]
priority: 1
data_stream:
hidden: true
- do:
indices.create_data_stream:
name: match-all-hidden-data-stream
- is_true: acknowledged

- do:
cluster.health:
wait_for_status: green

- do:
indices.get_data_stream:
name: "*"
- length: { data_streams: 0 }

- do:
indices.get_data_stream:
name: ['*']
expand_wildcards: hidden
- length: { data_streams: 1 }
- match: { data_streams.0.name: match-all-hidden-data-stream }
- match: { data_streams.0.hidden: true }

- do:
indices.delete_data_stream:
name: match-all-hidden-data-stream
- is_true: acknowledged

- do:
indices.delete_index_template:
name: match-all-hidden-template
4 changes: 4 additions & 0 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ tests:
issue: https://github.com/elastic/elasticsearch/issues/111308
- class: org.elasticsearch.xpack.ml.integration.DatafeedJobsRestIT
issue: https://github.com/elastic/elasticsearch/issues/111319
- class: org.elasticsearch.xpack.searchablesnapshots.AzureSearchableSnapshotsIT
issue: https://github.com/elastic/elasticsearch/issues/111279
- class: org.elasticsearch.repositories.azure.RepositoryAzureClientYamlTestSuiteIT
issue: https://github.com/elastic/elasticsearch/issues/111345

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class IndicesSegmentResponse extends ChunkedBroadcastResponse {

private volatile Map<String, IndexSegments> indicesSegments;

IndicesSegmentResponse(
public IndicesSegmentResponse(
ShardSegments[] shards,
int totalShards,
int successfulShards,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,12 @@ static List<Tuple<String, ComposableIndexTemplate>> findV2CandidateTemplates(Met
for (Map.Entry<String, ComposableIndexTemplate> entry : metadata.templatesV2().entrySet()) {
final String name = entry.getKey();
final ComposableIndexTemplate template = entry.getValue();
if (isHidden == false) {
/*
* We do not ordinarily return match-all templates for hidden indices. But all backing indices for data streams are hidden,
* and we do want to return even match-all templates for those. Not doing so can result in a situation where a data stream is
* built with a template that none of its indices match.
*/
if (isHidden == false || template.getDataStreamTemplate() != null) {
final boolean matched = template.indexPatterns().stream().anyMatch(patternMatchPredicate);
if (matched) {
candidates.add(Tuple.tuple(name, template));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2158,6 +2158,23 @@ public void testDataStreamsUsingTemplates() throws Exception {
MetadataIndexTemplateService.innerRemoveIndexTemplateV2(stateWithTwoTemplates, "logs");
}

public void testDataStreamsUsingMatchAllTemplate() throws Exception {
    // Verifies that a match-all ("*") template that declares a data stream is found by
    // findV2Template for both non-hidden and hidden index name lookups.
    final MetadataIndexTemplateService templateService = getMetadataIndexTemplateService();
    final String templateName = "all-data-streams-template";

    final ComposableIndexTemplate matchAllTemplate = ComposableIndexTemplate.builder()
        .indexPatterns(Collections.singletonList("*"))
        .priority(100L)
        .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
        .build();

    ClusterState clusterState = templateService.addIndexTemplateV2(ClusterState.EMPTY_STATE, false, templateName, matchAllTemplate);

    // Template resolution at data stream creation time: the data stream name itself is not hidden.
    assertThat(MetadataIndexTemplateService.findV2Template(clusterState.metadata(), "some-data-stream", false), equalTo(templateName));
    // Backing indices of a data stream are hidden, and must still resolve to the same template.
    assertThat(MetadataIndexTemplateService.findV2Template(clusterState.metadata(), "some-data-stream", true), equalTo(templateName));
}

public void testRemovingHigherOrderTemplateOfDataStreamWithMultipleTemplates() throws Exception {
ClusterState state = ClusterState.EMPTY_STATE;
final MetadataIndexTemplateService service = getMetadataIndexTemplateService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.FilterClient;
import org.elasticsearch.client.internal.OriginSettingClient;
Expand Down Expand Up @@ -575,48 +575,82 @@ private void refreshEnrichIndex(final String destinationIndexName, final int att
protected void ensureSingleSegment(final String destinationIndexName, final int attempt) {
enrichOriginClient().admin()
.indices()
.segments(new IndicesSegmentsRequest(destinationIndexName), new DelegatingActionListener<>(listener) {
@Override
public void onResponse(IndicesSegmentResponse indicesSegmentResponse) {
IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName);
if (indexSegments == null) {
.segments(new IndicesSegmentsRequest(destinationIndexName), listener.delegateFailureAndWrap((l, indicesSegmentResponse) -> {
int failedShards = indicesSegmentResponse.getFailedShards();
if (failedShards > 0) {
// Encountered a problem while querying the segments for the enrich index. Try and surface the problem in the log.
logger.warn(
"Policy [{}]: Encountered [{}] shard level failures while querying the segments for enrich index [{}]. "
+ "Turn on DEBUG logging for details.",
policyName,
failedShards,
enrichIndexName
);
if (logger.isDebugEnabled()) {
DefaultShardOperationFailedException[] shardFailures = indicesSegmentResponse.getShardFailures();
int failureNumber = 1;
String logPrefix = "Policy [" + policyName + "]: Encountered shard failure [";
String logSuffix = " of "
+ shardFailures.length
+ "] while querying segments for enrich index ["
+ enrichIndexName
+ "]. Shard [";
for (DefaultShardOperationFailedException shardFailure : shardFailures) {
logger.debug(
logPrefix + failureNumber + logSuffix + shardFailure.index() + "][" + shardFailure.shardId() + "]",
shardFailure.getCause()
);
failureNumber++;
}
}
}
IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName);
if (indexSegments == null) {
if (indicesSegmentResponse.getShardFailures().length == 0) {
throw new ElasticsearchException(
"Could not locate segment information for newly created index [{}]",
destinationIndexName
);
} else {
DefaultShardOperationFailedException shardFailure = indicesSegmentResponse.getShardFailures()[0];
throw new ElasticsearchException(
"Could not obtain segment information for newly created index [{}]; shard info [{}][{}]",
shardFailure.getCause(),
destinationIndexName,
shardFailure.index(),
shardFailure.shardId()
);
}
Map<Integer, IndexShardSegments> indexShards = indexSegments.getShards();
assert indexShards.size() == 1 : "Expected enrich index to contain only one shard";
ShardSegments[] shardSegments = indexShards.get(0).shards();
assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point";
ShardSegments primarySegments = shardSegments[0];
if (primarySegments.getSegments().size() > 1) {
int nextAttempt = attempt + 1;
if (nextAttempt > maxForceMergeAttempts) {
delegate.onFailure(
new ElasticsearchException(
"Force merging index [{}] attempted [{}] times but did not result in one segment.",
destinationIndexName,
attempt,
maxForceMergeAttempts
)
);
} else {
logger.debug(
"Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})",
policyName,
primarySegments.getSegments().size(),
nextAttempt,
maxForceMergeAttempts
);
forceMergeEnrichIndex(destinationIndexName, nextAttempt);
}
}
Map<Integer, IndexShardSegments> indexShards = indexSegments.getShards();
assert indexShards.size() == 1 : "Expected enrich index to contain only one shard";
ShardSegments[] shardSegments = indexShards.get(0).shards();
assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point";
ShardSegments primarySegments = shardSegments[0];
if (primarySegments.getSegments().size() > 1) {
int nextAttempt = attempt + 1;
if (nextAttempt > maxForceMergeAttempts) {
throw new ElasticsearchException(
"Force merging index [{}] attempted [{}] times but did not result in one segment.",
destinationIndexName,
attempt,
maxForceMergeAttempts
);
} else {
// Force merge down to one segment successful
setIndexReadOnly(destinationIndexName);
logger.debug(
"Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})",
policyName,
primarySegments.getSegments().size(),
nextAttempt,
maxForceMergeAttempts
);
forceMergeEnrichIndex(destinationIndexName, nextAttempt);
}
} else {
// Force merge down to one segment successful
setIndexReadOnly(destinationIndexName);
}
});
}));
}

private void setIndexReadOnly(final String destinationIndexName) {
Expand Down
Loading

0 comments on commit 9b4370b

Please sign in to comment.