From fcfc5a0a950dc3ab5e827dfe7ea920607260d33d Mon Sep 17 00:00:00 2001 From: Shephali Mittal Date: Tue, 6 Feb 2024 16:08:33 +0530 Subject: [PATCH] Add Tracing instrumentation for OpenSearch tasks management Signed-off-by: Shephali Mittal --- .../reindex/AsyncBulkByScrollActionTests.java | 3 +- .../main/java/org/opensearch/tasks/Task.java | 25 ++++++++++++ .../opensearch/tasks/TaskAwareRequest.java | 8 ++++ .../TaskCancellationMonitoringService.java | 2 + .../org/opensearch/tasks/TaskManager.java | 20 ++++++++-- .../tasks/TraceableTaskListener.java | 39 +++++++++++++++++++ .../telemetry/tracing/AttributeNames.java | 4 ++ .../telemetry/tracing/SpanBuilder.java | 12 ++++++ .../transport/TransportService.java | 9 +++-- .../node/tasks/TaskManagerTestCase.java | 6 ++- .../TransportActionFilterChainTests.java | 5 ++- .../client/node/NodeClientHeadersTests.java | 3 +- .../shard/PrimaryReplicaSyncerTests.java | 5 ++- .../PersistentTasksNodeServiceTests.java | 9 +++-- .../indices/RestValidateQueryActionTests.java | 3 +- .../opensearch/tasks/TaskManagerTests.java | 5 ++- .../transport/InboundHandlerTests.java | 2 +- ...enSearchIndexLevelReplicationTestCase.java | 3 +- .../test/tasks/MockTaskManager.java | 3 +- .../test/transport/MockTransportService.java | 5 ++- 20 files changed, 143 insertions(+), 28 deletions(-) create mode 100644 server/src/main/java/org/opensearch/tasks/TraceableTaskListener.java diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/AsyncBulkByScrollActionTests.java index 8ddc1ff778982..504e3d2ca7689 100644 --- a/modules/reindex/src/test/java/org/opensearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -89,6 +89,7 @@ import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.client.NoOpClient; import org.opensearch.threadpool.TestThreadPool; @@ -162,7 +163,7 @@ public void setupForTest() { testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest); listener = new PlainActionFuture<>(); scrollId = null; - taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE); testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest); testTask.setWorker(testRequest.getRequestsPerSecond(), null); worker = testTask.getWorkerState(); diff --git a/server/src/main/java/org/opensearch/tasks/Task.java b/server/src/main/java/org/opensearch/tasks/Task.java index a21a454a65d0e..3f0fabd9d88be 100644 --- a/server/src/main/java/org/opensearch/tasks/Task.java +++ b/server/src/main/java/org/opensearch/tasks/Task.java @@ -48,6 +48,7 @@ import org.opensearch.core.tasks.resourcetracker.ThreadResourceInfo; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.telemetry.tracing.Span; import java.io.IOException; import java.util.ArrayList; @@ -111,6 +112,8 @@ public class Task { */ private final long startTimeNanos; + private final Span span; + public Task(long id, String type, String action, String description, TaskId parentTask, Map headers) { this( id, @@ -148,6 +151,21 @@ public Task( this.headers = headers; this.resourceStats = resourceStats; this.resourceTrackingCompletionListeners = resourceTrackingCompletionListeners; + this.span = null; + } + + public Task(long id, String type, String action, String description, TaskId parentTask, Map headers, Span span) { + this.id = id; + this.type = type; + this.action = action; + this.description = description; + this.parentTask = parentTask; + this.startTime = System.currentTimeMillis(); + this.startTimeNanos = System.nanoTime(); + this.headers = headers; + this.resourceStats = new ConcurrentHashMap<>(); + this.resourceTrackingCompletionListeners = new ArrayList<>(); + this.span = span; } /** @@ -277,6 +295,13 @@ public TaskId getParentTaskId() { return parentTask; } + /** + * Returns span related to Task + */ + public Span getSpan() { + return span; + } + /** * Build a status for this task or null if this task doesn't have status. * Since most tasks don't have status this defaults to returning null. While diff --git a/server/src/main/java/org/opensearch/tasks/TaskAwareRequest.java b/server/src/main/java/org/opensearch/tasks/TaskAwareRequest.java index 6913f6cfc375c..4c830525abd28 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskAwareRequest.java +++ b/server/src/main/java/org/opensearch/tasks/TaskAwareRequest.java @@ -33,6 +33,7 @@ package org.opensearch.tasks; import org.opensearch.core.tasks.TaskId; +import org.opensearch.telemetry.tracing.Span; import java.util.Map; @@ -67,6 +68,13 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId return new Task(id, type, action, getDescription(), parentTaskId, headers); } + /** + * Returns the task object that should be used to keep track of the processing of the request. + */ + default Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers, Span span) { + return new Task(id, type, action, getDescription(), parentTaskId, headers, span); + } + /** * Returns optional description of the request to be displayed by the task manager */ diff --git a/server/src/main/java/org/opensearch/tasks/TaskCancellationMonitoringService.java b/server/src/main/java/org/opensearch/tasks/TaskCancellationMonitoringService.java index 343d4571593a7..654712d6fcfae 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskCancellationMonitoringService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskCancellationMonitoringService.java @@ -66,6 +66,8 @@ public TaskCancellationMonitoringService( cancellationStatsHolder = TASKS_TO_TRACK.stream() .collect(Collectors.toConcurrentMap(task -> task, task -> new TaskCancellationStatsHolder())); taskManager.addTaskEventListeners(this); + TraceableTaskListener traceableTaskListener = new TraceableTaskListener(); + taskManager.addTaskEventListeners(traceableTaskListener); } void doRun() { diff --git a/server/src/main/java/org/opensearch/tasks/TaskManager.java b/server/src/main/java/org/opensearch/tasks/TaskManager.java index a49968ab85e89..46d817eabf95d 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskManager.java +++ b/server/src/main/java/org/opensearch/tasks/TaskManager.java @@ -60,6 +60,10 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.tasks.TaskCancelledException; import org.opensearch.core.tasks.TaskId; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TcpChannel; @@ -132,22 +136,26 @@ public class TaskManager implements ClusterStateApplier { private final Set> taskResourceConsumer; private final List taskEventListeners = new ArrayList<>(); + private final Tracer tracer; + public static TaskManager createTaskManagerWithClusterSettings( Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, - Set taskHeaders + Set taskHeaders, + Tracer tracer ) { - final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders); + final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders, tracer); clusterSettings.addSettingsUpdateConsumer(TASK_RESOURCE_CONSUMERS_ENABLED, taskManager::setTaskResourceConsumersEnabled); return taskManager; } - public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders, Tracer tracer) { this.threadPool = threadPool; this.taskHeaders = new ArrayList<>(taskHeaders); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings); + this.tracer = tracer; taskResourceConsumer = new HashSet<>(); } @@ -203,7 +211,11 @@ public Task register(String type, String action, TaskAwareRequest request) { headers.put(key, httpHeader); } } - Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers); + Span span = tracer.startSpan(SpanBuilder.from("Task", request.getParentTask().getNodeId(), request)); + Task task; + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers, span); + } Objects.requireNonNull(task); assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId"; if (logger.isTraceEnabled()) { diff --git a/server/src/main/java/org/opensearch/tasks/TraceableTaskListener.java b/server/src/main/java/org/opensearch/tasks/TraceableTaskListener.java new file mode 100644 index 0000000000000..abd5dc4e2bb85 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/TraceableTaskListener.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import org.opensearch.telemetry.tracing.Span; + +/** + * Task Listener to ensure ending the span properly. + */ +public class TraceableTaskListener implements TaskManager.TaskEventListeners { + + /** + * End the span on task cancellation + * @param task task which got cancelled. + */ + public void onTaskCancelled(CancellableTask task) { + Span span = task.getSpan(); + if (span != null) { + span.endSpan(); + } + } + + /** + * End the span on task Completion + * @param task task which got completed. + */ + public void onTaskCompleted(Task task) { + Span span = task.getSpan(); + if (span != null) { + span.endSpan(); + } + } +} diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index b6b2cf360d1c5..a983a4fc2f1d9 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -94,4 +94,8 @@ private AttributeNames() { * Refresh Policy */ public static final String REFRESH_POLICY = "refresh_policy"; + + public static final String DESCRIPTION = "description"; + + public static final String PARENT_TASK_ID = "parent_task_id"; } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index 1dce422943b7a..5b8aabe42705c 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -14,6 +14,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.http.HttpRequest; import org.opensearch.rest.RestRequest; +import org.opensearch.tasks.TaskAwareRequest; import org.opensearch.telemetry.tracing.attributes.Attributes; import org.opensearch.transport.TcpChannel; import org.opensearch.transport.Transport; @@ -74,6 +75,10 @@ public static SpanCreationContext from(String spanName, String nodeId, Replicate return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, request)); } + public static SpanCreationContext from(String spanName, String nodeId, TaskAwareRequest request) { + return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(request)); + } + private static String createSpanName(HttpRequest httpRequest) { return httpRequest.method().name() + SEPARATOR + httpRequest.uri(); } @@ -87,6 +92,13 @@ private static Attributes buildSpanAttributes(HttpRequest httpRequest) { return attributes; } + private static Attributes buildSpanAttributes(TaskAwareRequest request) { + Attributes attributes = Attributes.create() + .addAttribute(AttributeNames.DESCRIPTION, request.getDescription()) + .addAttribute(AttributeNames.PARENT_TASK_ID, request.getParentTask().getId()); + return attributes; + } + private static void populateHeader(HttpRequest httpRequest, Attributes attributes) { HEADERS_TO_BE_ADDED_AS_ATTRIBUTES.forEach(x -> { if (httpRequest.getHeaders() != null diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index d50266d8c9e4a..d23f29d430699 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -232,7 +232,7 @@ public TransportService( setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings)); setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings)); tracerLog = Loggers.getLogger(logger, ".tracer"); - taskManager = createTaskManager(settings, clusterSettings, threadPool, taskHeaders); + taskManager = createTaskManager(settings, clusterSettings, threadPool, taskHeaders, tracer); this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); @@ -273,12 +273,13 @@ protected TaskManager createTaskManager( Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, - Set taskHeaders + Set taskHeaders, + Tracer tracer ) { if (clusterSettings != null) { - return TaskManager.createTaskManagerWithClusterSettings(settings, clusterSettings, threadPool, taskHeaders); + return TaskManager.createTaskManagerWithClusterSettings(settings, clusterSettings, threadPool, taskHeaders, tracer); } else { - return new TaskManager(settings, threadPool, taskHeaders); + return new TaskManager(settings, threadPool, taskHeaders, tracer); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index a3fa0f9cb16e4..966568305cbf3 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -60,6 +60,7 @@ import org.opensearch.tasks.TaskCancellationService; import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.tasks.MockTaskManager; @@ -226,12 +227,13 @@ protected TaskManager createTaskManager( Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, - Set taskHeaders + Set taskHeaders, + Tracer tracer ) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders); + return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders, tracer); } } }; diff --git a/server/src/test/java/org/opensearch/action/support/TransportActionFilterChainTests.java b/server/src/test/java/org/opensearch/action/support/TransportActionFilterChainTests.java index a4f40db365f9a..f4a114de8580f 100644 --- a/server/src/test/java/org/opensearch/action/support/TransportActionFilterChainTests.java +++ b/server/src/test/java/org/opensearch/action/support/TransportActionFilterChainTests.java @@ -43,6 +43,7 @@ import org.opensearch.node.Node; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; import org.junit.After; @@ -98,7 +99,7 @@ public void testActionFiltersRequest() throws InterruptedException { TransportAction transportAction = new TransportAction( actionName, actionFilters, - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()) + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE) ) { @Override protected void doExecute(Task task, TestRequest request, ActionListener listener) { @@ -183,7 +184,7 @@ public void exe TransportAction transportAction = new TransportAction( actionName, actionFilters, - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()) + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE) ) { @Override protected void doExecute(Task task, TestRequest request, ActionListener listener) { diff --git a/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java b/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java index 176c94b01c878..e5007e421b6b0 100644 --- a/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java +++ b/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java @@ -44,6 +44,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.threadpool.ThreadPool; import java.util.Collections; @@ -76,7 +77,7 @@ private Actions(Settings settings, ThreadPool threadPool, ActionType[] action private static class InternalTransportAction extends TransportAction { private InternalTransportAction(Settings settings, String actionName, ThreadPool threadPool) { - super(actionName, EMPTY_FILTERS, new TaskManager(settings, threadPool, Collections.emptySet())); + super(actionName, EMPTY_FILTERS, new TaskManager(settings, threadPool, Collections.emptySet(), NoopTracer.INSTANCE)); } @Override diff --git a/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java index b1bcaac2c1947..d397a12194cd9 100644 --- a/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java @@ -56,6 +56,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import java.io.IOException; import java.nio.ByteBuffer; @@ -74,7 +75,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { public void testSyncerSendsOffCorrectDocuments() throws Exception { IndexShard shard = newStartedShard(true); - TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE); AtomicBoolean syncActionCalled = new AtomicBoolean(); List resyncRequests = new ArrayList<>(); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { @@ -164,7 +165,7 @@ public void testSyncerOnClosingShard() throws Exception { threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); }; PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer( - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE), syncAction ); syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately diff --git a/server/src/test/java/org/opensearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/opensearch/persistent/PersistentTasksNodeServiceTests.java index a97e3504f4d34..03cadae56efd5 100644 --- a/server/src/test/java/org/opensearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/opensearch/persistent/PersistentTasksNodeServiceTests.java @@ -54,6 +54,7 @@ import org.opensearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -133,7 +134,7 @@ public void testStartTask() { PersistentTasksNodeService coordinator = new PersistentTasksNodeService( persistentTasksService, registry, - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE), executor ); @@ -248,7 +249,7 @@ public void testParamsStatusAndNodeTaskAreDelegated() throws Exception { PersistentTasksNodeService coordinator = new PersistentTasksNodeService( persistentTasksService, registry, - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE), executor ); @@ -305,7 +306,7 @@ public void sendCompletionRequest( int nonLocalNodesCount = randomInt(10); MockExecutor executor = new MockExecutor(); - TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE); PersistentTasksNodeService coordinator = new PersistentTasksNodeService(persistentTasksService, registry, taskManager, executor); ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY); @@ -394,7 +395,7 @@ public void sendCompletionRequest( PersistentTasksNodeService coordinator = new PersistentTasksNodeService( persistentTasksService, registry, - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE), executor ); diff --git a/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java index 3fb6764846da6..4a4e3469fadb9 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java @@ -50,6 +50,7 @@ import org.opensearch.search.AbstractSearchTestCase; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.rest.FakeRestChannel; import org.opensearch.test.rest.FakeRestRequest; import org.opensearch.threadpool.TestThreadPool; @@ -93,7 +94,7 @@ public class RestValidateQueryActionTests extends AbstractSearchTestCase { */ @BeforeClass public static void stubValidateQueryAction() { - final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE); final TransportAction transportAction = new TransportAction<>( ValidateQueryAction.NAME, diff --git a/server/src/test/java/org/opensearch/tasks/TaskManagerTests.java b/server/src/test/java/org/opensearch/tasks/TaskManagerTests.java index fac5c89cdfc92..08fed5f9909fa 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskManagerTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskManagerTests.java @@ -40,6 +40,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.action.ActionListener; import org.opensearch.core.tasks.TaskId; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.TestThreadPool; @@ -97,7 +98,7 @@ public void testResultsServiceRetryTotalTime() { } public void testTrackingChannelTask() throws Exception { - final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE); Set cancelledTasks = ConcurrentCollections.newConcurrentSet(); taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) { @Override @@ -145,7 +146,7 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF } public void testTrackingTaskAndCloseChannelConcurrently() throws Exception { - final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE); Set cancelledTasks = ConcurrentCollections.newConcurrentSet(); taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) { @Override diff --git a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java index e002297911788..7f9389de38176 100644 --- a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java @@ -89,7 +89,7 @@ public class InboundHandlerTests extends OpenSearchTestCase { @Before public void setUp() throws Exception { super.setUp(); - taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE); channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address()) { public void sendMessage(BytesReference reference, ActionListener listener) { super.sendMessage(reference, listener); diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 9800782272ede..e8740f4b04936 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -102,6 +102,7 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; @@ -221,7 +222,7 @@ protected class ReplicationGroup implements AutoCloseable, Iterable private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer( - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), + new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE), (request, parentTask, primaryAllocationId, primaryTerm, listener) -> { try { new ResyncAction(request, listener, ReplicationGroup.this).execute(); diff --git a/test/framework/src/main/java/org/opensearch/test/tasks/MockTaskManager.java b/test/framework/src/main/java/org/opensearch/test/tasks/MockTaskManager.java index 677ec7a0a6600..2f338f22e2203 100644 --- a/test/framework/src/main/java/org/opensearch/test/tasks/MockTaskManager.java +++ b/test/framework/src/main/java/org/opensearch/test/tasks/MockTaskManager.java @@ -43,6 +43,7 @@ import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskAwareRequest; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.threadpool.ThreadPool; import java.util.Collection; @@ -65,7 +66,7 @@ public class MockTaskManager extends TaskManager { private final Collection listeners = new CopyOnWriteArrayList<>(); public MockTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { - super(settings, threadPool, taskHeaders); + super(settings, threadPool, taskHeaders, NoopTracer.INSTANCE); } @Override diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java index 6bf5381b62cc9..f362083643101 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java @@ -277,12 +277,13 @@ protected TaskManager createTaskManager( Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, - Set taskHeaders + Set taskHeaders, + Tracer tracer ) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders); + return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders, tracer); } }