Skip to content

Commit

Permalink
Merge branch 'main' into 1127/aws
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticmachine authored Oct 10, 2024
2 parents 283c281 + 9ae0140 commit 05f3656
Show file tree
Hide file tree
Showing 36 changed files with 3,474 additions and 1,850 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ class PublishPluginFuncTest extends AbstractGradleFuncTest {
<distribution>repo</distribution>
</license>
<license>
<name>Server Side Public License, v 1</name>
<url>https://www.mongodb.com/licensing/server-side-public-license</url>
<name>GNU Affero General Public License Version 3</name>
<url>https://raw.githubusercontent.com/elastic/elasticsearch/v1.0/licenses/AGPL-3.0+SSPL-1.0+ELASTIC-LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
<license>
<name>The OSI-approved Open Source license Version 3.0</name>
<url>https://raw.githubusercontent.com/elastic/elasticsearch/v1.0/licenses/AGPL-3.0+SSPL-1.0+ELASTIC-LICENSE-2.0.txt</url>
<name>Server Side Public License, v 1</name>
<url>https://www.mongodb.com/licensing/server-side-public-license</url>
<distribution>repo</distribution>
</license>
</licenses>
Expand Down Expand Up @@ -150,13 +150,13 @@ class PublishPluginFuncTest extends AbstractGradleFuncTest {
<distribution>repo</distribution>
</license>
<license>
<name>Server Side Public License, v 1</name>
<url>https://www.mongodb.com/licensing/server-side-public-license</url>
<name>GNU Affero General Public License Version 3</name>
<url>https://raw.githubusercontent.com/elastic/elasticsearch/v1.0/licenses/AGPL-3.0+SSPL-1.0+ELASTIC-LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
<license>
<name>The OSI-approved Open Source license Version 3.0</name>
<url>https://raw.githubusercontent.com/elastic/elasticsearch/v1.0/licenses/AGPL-3.0+SSPL-1.0+ELASTIC-LICENSE-2.0.txt</url>
<name>Server Side Public License, v 1</name>
<url>https://www.mongodb.com/licensing/server-side-public-license</url>
<distribution>repo</distribution>
</license>
</licenses>
Expand Down Expand Up @@ -239,13 +239,13 @@ class PublishPluginFuncTest extends AbstractGradleFuncTest {
<distribution>repo</distribution>
</license>
<license>
<name>Server Side Public License, v 1</name>
<url>https://www.mongodb.com/licensing/server-side-public-license</url>
<name>GNU Affero General Public License Version 3</name>
<url>https://raw.githubusercontent.com/elastic/elasticsearch/v1.0/licenses/AGPL-3.0+SSPL-1.0+ELASTIC-LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
<license>
<name>The OSI-approved Open Source license Version 3.0</name>
<url>https://raw.githubusercontent.com/elastic/elasticsearch/v1.0/licenses/AGPL-3.0+SSPL-1.0+ELASTIC-LICENSE-2.0.txt</url>
<name>Server Side Public License, v 1</name>
<url>https://www.mongodb.com/licensing/server-side-public-license</url>
<distribution>repo</distribution>
</license>
</licenses>
Expand Down Expand Up @@ -337,13 +337,13 @@ class PublishPluginFuncTest extends AbstractGradleFuncTest {
<distribution>repo</distribution>
</license>
<license>
<name>Server Side Public License, v 1</name>
<url>https://www.mongodb.com/licensing/server-side-public-license</url>
<name>GNU Affero General Public License Version 3</name>
<url>https://raw.githubusercontent.com/elastic/elasticsearch/v1.0/licenses/AGPL-3.0+SSPL-1.0+ELASTIC-LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
<license>
<name>The OSI-approved Open Source license Version 3.0</name>
<url>https://raw.githubusercontent.com/elastic/elasticsearch/v1.0/licenses/AGPL-3.0+SSPL-1.0+ELASTIC-LICENSE-2.0.txt</url>
<name>Server Side Public License, v 1</name>
<url>https://www.mongodb.com/licensing/server-side-public-license</url>
<distribution>repo</distribution>
</license>
</licenses>
Expand Down Expand Up @@ -415,13 +415,13 @@ class PublishPluginFuncTest extends AbstractGradleFuncTest {
<distribution>repo</distribution>
</license>
<license>
<name>Server Side Public License, v 1</name>
<url>https://www.mongodb.com/licensing/server-side-public-license</url>
<name>GNU Affero General Public License Version 3</name>
<url>https://raw.githubusercontent.com/elastic/elasticsearch/v2.0/licenses/AGPL-3.0+SSPL-1.0+ELASTIC-LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
<license>
<name>The OSI-approved Open Source license Version 3.0</name>
<url>https://raw.githubusercontent.com/elastic/elasticsearch/v2.0/licenses/AGPL-3.0+SSPL-1.0+ELASTIC-LICENSE-2.0.txt</url>
<name>Server Side Public License, v 1</name>
<url>https://www.mongodb.com/licensing/server-side-public-license</url>
<distribution>repo</distribution>
</license>
</licenses>
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/112905.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 112905
summary: "[ES|QL] Named parameter for field names and field name patterns"
area: ES|QL
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/114386.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114386
summary: Improve handling of failure to create persistent task
area: Task Management
type: bug
issues: []
3 changes: 3 additions & 0 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ tests:
- class: org.elasticsearch.xpack.inference.DefaultElserIT
method: testInferCreatesDefaultElser
issue: https://github.com/elastic/elasticsearch/issues/114503
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=synonyms/60_synonym_rule_get/Synonym set not found}
issue: https://github.com/elastic/elasticsearch/issues/114432

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.persistent;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class PersistentTaskCreationFailureIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(FailingCreationPersistentTasksPlugin.class);
}

private static boolean hasPersistentTask(ClusterState clusterState) {
return findTasks(clusterState, FailingCreationPersistentTaskExecutor.TASK_NAME).isEmpty() == false;
}

public void testPersistentTasksThatFailDuringCreationAreRemovedFromClusterState() {

final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final var plugins = StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false)
.flatMap(ps -> ps.filterPlugins(FailingCreationPersistentTasksPlugin.class))
.toList();
plugins.forEach(plugin -> plugin.hasFailedToCreateTask.set(false));

final var taskCreatedListener = ClusterServiceUtils.addTemporaryStateListener(
masterClusterService,
PersistentTaskCreationFailureIT::hasPersistentTask
);

taskCreatedListener.andThenAccept(v -> {
// enqueue some higher-priority cluster state updates to check that they do not cause retries of the failing task creation step
for (int i = 0; i < 5; i++) {
masterClusterService.submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
assertTrue(hasPersistentTask(currentState));

assertTrue(waitUntil(() -> {
final var completePersistentTaskPendingTasksCount = masterClusterService.getMasterService()
.pendingTasks()
.stream()
.filter(
pendingClusterTask -> pendingClusterTask.getSource().string().equals("finish persistent task (failed)")
)
.count();
assertThat(completePersistentTaskPendingTasksCount, lessThanOrEqualTo(1L));
return completePersistentTaskPendingTasksCount == 1L;
}));

return currentState.copyAndUpdateMetadata(
mdb -> mdb.putCustom(
PersistentTasksCustomMetadata.TYPE,
PersistentTasksCustomMetadata.builder(
PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(currentState)
)
// create and remove a fake task just to force a change in lastAllocationId so that
// PersistentTasksNodeService checks for changes and potentially retries
.addTask("test", "test", null, PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT)
.removeTask("test")
.build()
)
);
}

@Override
public void onFailure(Exception e) {
fail(e);
}
});
}
});

safeAwait(
l -> internalCluster().getInstance(PersistentTasksService.class)
.sendStartRequest(
UUIDs.base64UUID(),
FailingCreationPersistentTaskExecutor.TASK_NAME,
new FailingCreationTaskParams(),
null,
l.map(ignored -> null)
)
);

safeAwait(
taskCreatedListener.<Void>andThen(
(l, v) -> ClusterServiceUtils.addTemporaryStateListener(
masterClusterService,
clusterState -> hasPersistentTask(clusterState) == false
).addListener(l)
)
);

assertEquals(1L, plugins.stream().filter(plugin -> plugin.hasFailedToCreateTask.get()).count());
}

public static class FailingCreationPersistentTasksPlugin extends Plugin implements PersistentTaskPlugin {

private final AtomicBoolean hasFailedToCreateTask = new AtomicBoolean();

@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
ClusterService clusterService,
ThreadPool threadPool,
Client client,
SettingsModule settingsModule,
IndexNameExpressionResolver expressionResolver
) {
return List.of(new FailingCreationPersistentTaskExecutor(hasFailedToCreateTask));
}

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return List.of(
new NamedWriteableRegistry.Entry(
PersistentTaskParams.class,
FailingCreationPersistentTaskExecutor.TASK_NAME,
FailingCreationTaskParams::new
)
);
}

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return List.of(
new NamedXContentRegistry.Entry(
PersistentTaskParams.class,
new ParseField(FailingCreationPersistentTaskExecutor.TASK_NAME),
p -> {
p.skipChildren();
return new FailingCreationTaskParams();
}
)
);
}
}

public static class FailingCreationTaskParams implements PersistentTaskParams {
public FailingCreationTaskParams() {}

public FailingCreationTaskParams(StreamInput in) {}

@Override
public String getWriteableName() {
return FailingCreationPersistentTaskExecutor.TASK_NAME;
}

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersion.current();
}

@Override
public void writeTo(StreamOutput out) throws IOException {}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.endObject();
return builder;
}
}

static class FailingCreationPersistentTaskExecutor extends PersistentTasksExecutor<FailingCreationTaskParams> {
static final String TASK_NAME = "cluster:admin/persistent/test_creation_failure";

private final AtomicBoolean hasFailedToCreateTask;

FailingCreationPersistentTaskExecutor(AtomicBoolean hasFailedToCreateTask) {
super(TASK_NAME, r -> fail("execution is unexpected"));
this.hasFailedToCreateTask = hasFailedToCreateTask;
}

@Override
protected AllocatedPersistentTask createTask(
long id,
String type,
String action,
TaskId parentTaskId,
PersistentTasksCustomMetadata.PersistentTask<FailingCreationTaskParams> taskInProgress,
Map<String, String> headers
) {
assertTrue("already failed before", hasFailedToCreateTask.compareAndSet(false, true));
throw new RuntimeException("simulated");
}

@Override
protected void nodeOperation(AllocatedPersistentTask task, FailingCreationTaskParams params, PersistentTaskState state) {
fail("execution is unexpected");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public T get(long key) {
* an insertion.
*/
public T put(long key, T value) {
assert value != null : "Null values are not supported";
if (size >= maxSize) {
assert size == maxSize;
grow();
Expand Down Expand Up @@ -94,9 +95,6 @@ public T remove(long key) {
}

private T set(long key, T value) {
if (value == null) {
throw new IllegalArgumentException("Null values are not supported");
}
for (long i = slot(hash(key), mask);; i = nextSlot(i, mask)) {
final T previous = values.getAndSet(i, value);
if (previous == null) {
Expand All @@ -116,7 +114,7 @@ private T set(long key, T value) {

@Override
public Iterator<Cursor<T>> iterator() {
return new Iterator<Cursor<T>>() {
return new Iterator<>() {

boolean cached;
final Cursor<T> cursor;
Expand Down Expand Up @@ -181,9 +179,21 @@ protected boolean used(long bucket) {
protected void removeAndAdd(long index) {
final long key = keys.get(index);
final T value = values.getAndSet(index, null);
--size;
final T removed = set(key, value);
assert removed == null;
reset(key, value);
}

private void reset(long key, T value) {
final ObjectArray<T> values = this.values;
final long mask = this.mask;
for (long i = slot(hash(key), mask);; i = nextSlot(i, mask)) {
final T previous = values.get(i);
if (previous == null) {
// slot was free
keys.set(i, key);
values.set(i, value);
break;
}
}
}

public static final class Cursor<T> {
Expand Down
Loading

0 comments on commit 05f3656

Please sign in to comment.