Skip to content

Commit

Permalink
Add Tracing instrumentation for OpenSearch tasks management
Browse files Browse the repository at this point in the history
Signed-off-by: Shephali Mittal <[email protected]>
  • Loading branch information
Shephali Mittal committed Feb 6, 2024
1 parent 554cbf7 commit fcfc5a0
Show file tree
Hide file tree
Showing 20 changed files with 143 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.client.NoOpClient;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -162,7 +163,7 @@ public void setupForTest() {
testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest);
listener = new PlainActionFuture<>();
scrollId = null;
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE);
testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest);
testTask.setWorker(testRequest.getRequestsPerSecond(), null);
worker = testTask.getWorkerState();
Expand Down
25 changes: 25 additions & 0 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.core.tasks.resourcetracker.ThreadResourceInfo;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.telemetry.tracing.Span;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -111,6 +112,8 @@ public class Task {
*/
private final long startTimeNanos;

/**
 * Tracing span associated with this task. May be {@code null}: the constructor that
 * does not receive a span assigns {@code null} here.
 */
private final Span span;

public Task(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
this(
id,
Expand Down Expand Up @@ -148,6 +151,21 @@ public Task(
this.headers = headers;
this.resourceStats = resourceStats;
this.resourceTrackingCompletionListeners = resourceTrackingCompletionListeners;
this.span = null;
}

/**
 * Creates a task that carries the tracing span handed in by the caller.
 * <p>
 * The start time (wall clock and nano clock) is captured at construction, and the task
 * starts out with an empty resource-stats map and no resource tracking completion listeners.
 *
 * @param id          task id
 * @param type        task type
 * @param action      action name
 * @param description task description
 * @param parentTask  id of the parent task
 * @param headers     headers attached to the task
 * @param span        span to associate with this task; stored as-is and exposed through {@link #getSpan()}
 */
public Task(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers, Span span) {
    // What the task is.
    this.id = id;
    this.type = type;
    this.action = action;
    this.description = description;

    // Where it comes from and what travels with it.
    this.parentTask = parentTask;
    this.headers = headers;
    this.span = span;

    // When it started: wall clock first, then the nano clock.
    this.startTime = System.currentTimeMillis();
    this.startTimeNanos = System.nanoTime();

    // Fresh, empty resource tracking state.
    this.resourceStats = new ConcurrentHashMap<>();
    this.resourceTrackingCompletionListeners = new ArrayList<>();
}

/**
Expand Down Expand Up @@ -277,6 +295,13 @@ public TaskId getParentTaskId() {
return parentTask;
}

/**
 * Returns the tracing span associated with this task.
 *
 * @return the span given at construction, or {@code null} when the task was built without one
 */
public Span getSpan() {
    return this.span;
}

/**
* Build a status for this task or null if this task doesn't have status.
* Since most tasks don't have status this defaults to returning null. While
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.tasks;

import org.opensearch.core.tasks.TaskId;
import org.opensearch.telemetry.tracing.Span;

import java.util.Map;

Expand Down Expand Up @@ -67,6 +68,13 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId
return new Task(id, type, action, getDescription(), parentTaskId, headers);
}

/**
 * Returns the task object that should be used to keep track of the processing of the request,
 * associating the given tracing span with it.
 * <p>
 * NOTE(review): this default always builds a plain {@link Task}. An implementer that overrides
 * only the span-less {@code createTask} overload to return its own task type would presumably
 * need to override this overload as well, otherwise callers of this method receive a plain
 * {@link Task} instead of that type. Confirm against the implementers.
 *
 * @param id           task id
 * @param type         task type
 * @param action       action name
 * @param parentTaskId id of the parent task
 * @param headers      headers to attach to the task
 * @param span         span to associate with the created task
 */
default Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers, Span span) {
    final Task tracedTask = new Task(id, type, action, getDescription(), parentTaskId, headers, span);
    return tracedTask;
}

/**
* Returns optional description of the request to be displayed by the task manager
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public TaskCancellationMonitoringService(
cancellationStatsHolder = TASKS_TO_TRACK.stream()
.collect(Collectors.toConcurrentMap(task -> task, task -> new TaskCancellationStatsHolder()));
taskManager.addTaskEventListeners(this);
TraceableTaskListener traceableTaskListener = new TraceableTaskListener();
taskManager.addTaskEventListeners(traceableTaskListener);
}

void doRun() {
Expand Down
20 changes: 16 additions & 4 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TcpChannel;

Expand Down Expand Up @@ -132,22 +136,26 @@ public class TaskManager implements ClusterStateApplier {
private final Set<Consumer<Task>> taskResourceConsumer;
private final List<TaskEventListeners> taskEventListeners = new ArrayList<>();

private final Tracer tracer;

public static TaskManager createTaskManagerWithClusterSettings(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Set<String> taskHeaders
Set<String> taskHeaders,
Tracer tracer
) {
final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders);
final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders, tracer);
clusterSettings.addSettingsUpdateConsumer(TASK_RESOURCE_CONSUMERS_ENABLED, taskManager::setTaskResourceConsumersEnabled);
return taskManager;
}

public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
this.threadPool = threadPool;
this.taskHeaders = new ArrayList<>(taskHeaders);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings);
this.tracer = tracer;
taskResourceConsumer = new HashSet<>();
}

Expand Down Expand Up @@ -203,7 +211,11 @@ public Task register(String type, String action, TaskAwareRequest request) {
headers.put(key, httpHeader);
}
}
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
Span span = tracer.startSpan(SpanBuilder.from("Task", request.getParentTask().getNodeId(), request));
Task task;
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers, span);
}
Objects.requireNonNull(task);
assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId";
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.opensearch.telemetry.tracing.Span;

/**
 * Task event listener that ends the span attached to a task once the task is
 * cancelled or completed.
 */
public class TraceableTaskListener implements TaskManager.TaskEventListeners {

    /**
     * Ends the task's span, if it has one, when the task is cancelled.
     * <p>
     * NOTE(review): if a cancelled task is also reported through
     * {@link #onTaskCompleted(Task)}, {@code endSpan()} would run twice on the same span.
     * Confirm that ending a span twice is harmless.
     *
     * @param task task which got cancelled.
     */
    public void onTaskCancelled(CancellableTask task) {
        finishIfPresent(task.getSpan());
    }

    /**
     * Ends the task's span, if it has one, when the task completes.
     *
     * @param task task which got completed.
     */
    public void onTaskCompleted(Task task) {
        finishIfPresent(task.getSpan());
    }

    /**
     * Ends the given span; does nothing for a task that carries no span.
     */
    private static void finishIfPresent(Span taskSpan) {
        if (taskSpan == null) {
            return;
        }
        taskSpan.endSpan();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,8 @@ private AttributeNames() {
* Refresh Policy
*/
public static final String REFRESH_POLICY = "refresh_policy";

/**
 * Description of a task-aware request
 */
public static final String DESCRIPTION = "description";

/**
 * Id of the parent task of a task-aware request
 */
public static final String PARENT_TASK_ID = "parent_task_id";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.RestRequest;
import org.opensearch.tasks.TaskAwareRequest;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.Transport;
Expand Down Expand Up @@ -74,6 +75,10 @@ public static SpanCreationContext from(String spanName, String nodeId, Replicate
return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, request));
}

/**
 * Builds a server-side span creation context for a task-aware request.
 * <p>
 * NOTE(review): {@code nodeId} is accepted but not used here; only the attributes
 * derived from {@code request} are attached. Confirm whether the node id should be
 * recorded as an attribute as well.
 *
 * @param spanName name of the span
 * @param nodeId   node id (currently unused)
 * @param request  request whose description and parent task id become span attributes
 * @return the span creation context
 */
public static SpanCreationContext from(String spanName, String nodeId, TaskAwareRequest request) {
    final Attributes taskAttributes = buildSpanAttributes(request);
    return SpanCreationContext.server().name(spanName).attributes(taskAttributes);
}

private static String createSpanName(HttpRequest httpRequest) {
return httpRequest.method().name() + SEPARATOR + httpRequest.uri();
}
Expand All @@ -87,6 +92,13 @@ private static Attributes buildSpanAttributes(HttpRequest httpRequest) {
return attributes;
}

/**
 * Collects the span attributes for a task-aware request: its description and the
 * id of its parent task.
 *
 * @param request request to read the attribute values from
 * @return attributes holding the description and the parent task id
 */
private static Attributes buildSpanAttributes(TaskAwareRequest request) {
    return Attributes.create()
        .addAttribute(AttributeNames.DESCRIPTION, request.getDescription())
        .addAttribute(AttributeNames.PARENT_TASK_ID, request.getParentTask().getId());
}

private static void populateHeader(HttpRequest httpRequest, Attributes attributes) {
HEADERS_TO_BE_ADDED_AS_ATTRIBUTES.forEach(x -> {
if (httpRequest.getHeaders() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public TransportService(
setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
tracerLog = Loggers.getLogger(logger, ".tracer");
taskManager = createTaskManager(settings, clusterSettings, threadPool, taskHeaders);
taskManager = createTaskManager(settings, clusterSettings, threadPool, taskHeaders, tracer);
this.interceptor = transportInterceptor;
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
Expand Down Expand Up @@ -273,12 +273,13 @@ protected TaskManager createTaskManager(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Set<String> taskHeaders
Set<String> taskHeaders,
Tracer tracer
) {
if (clusterSettings != null) {
return TaskManager.createTaskManagerWithClusterSettings(settings, clusterSettings, threadPool, taskHeaders);
return TaskManager.createTaskManagerWithClusterSettings(settings, clusterSettings, threadPool, taskHeaders, tracer);
} else {
return new TaskManager(settings, threadPool, taskHeaders);
return new TaskManager(settings, threadPool, taskHeaders, tracer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.tasks.TaskCancellationService;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.tasks.MockTaskManager;
Expand Down Expand Up @@ -226,12 +227,13 @@ protected TaskManager createTaskManager(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Set<String> taskHeaders
Set<String> taskHeaders,
Tracer tracer
) {
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
return new MockTaskManager(settings, threadPool, taskHeaders);
} else {
return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders);
return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders, tracer);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.node.Node;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void testActionFiltersRequest() throws InterruptedException {
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(
actionName,
actionFilters,
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE)
) {
@Override
protected void doExecute(Task task, TestRequest request, ActionListener<TestResponse> listener) {
Expand Down Expand Up @@ -183,7 +184,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void exe
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(
actionName,
actionFilters,
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE)
) {
@Override
protected void doExecute(Task task, TestRequest request, ActionListener<TestResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
Expand Down Expand Up @@ -76,7 +77,7 @@ private Actions(Settings settings, ThreadPool threadPool, ActionType<?>[] action
private static class InternalTransportAction extends TransportAction {

private InternalTransportAction(Settings settings, String actionName, ThreadPool threadPool) {
super(actionName, EMPTY_FILTERS, new TaskManager(settings, threadPool, Collections.emptySet()));
super(actionName, EMPTY_FILTERS, new TaskManager(settings, threadPool, Collections.emptySet(), NoopTracer.INSTANCE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.noop.NoopTracer;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -74,7 +75,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {

public void testSyncerSendsOffCorrectDocuments() throws Exception {
IndexShard shard = newStartedShard(true);
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE);
AtomicBoolean syncActionCalled = new AtomicBoolean();
List<ResyncReplicationRequest> resyncRequests = new ArrayList<>();
PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> {
Expand Down Expand Up @@ -164,7 +165,7 @@ public void testSyncerOnClosingShard() throws Exception {
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()),
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE),
syncAction
);
syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -133,7 +134,7 @@ public void testStartTask() {
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(
persistentTasksService,
registry,
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()),
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE),
executor
);

Expand Down Expand Up @@ -248,7 +249,7 @@ public void testParamsStatusAndNodeTaskAreDelegated() throws Exception {
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(
persistentTasksService,
registry,
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()),
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE),
executor
);

Expand Down Expand Up @@ -305,7 +306,7 @@ public void sendCompletionRequest(

int nonLocalNodesCount = randomInt(10);
MockExecutor executor = new MockExecutor();
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE);
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(persistentTasksService, registry, taskManager, executor);

ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY);
Expand Down Expand Up @@ -394,7 +395,7 @@ public void sendCompletionRequest(
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(
persistentTasksService,
registry,
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()),
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE),
executor
);

Expand Down
Loading

0 comments on commit fcfc5a0

Please sign in to comment.