Skip to content

Commit

Permalink
Add Tracing instrumentation for OpenSearch tasks management
Browse files Browse the repository at this point in the history
Signed-off-by: Shephali Mittal <[email protected]>
  • Loading branch information
Shephali Mittal committed Feb 6, 2024
1 parent 554cbf7 commit 013e09f
Show file tree
Hide file tree
Showing 21 changed files with 144 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove concurrent segment search feature flag for GA launch ([#12074](https://github.com/opensearch-project/OpenSearch/pull/12074))
- Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022))
- [Metrics Framework] Adds support for Histogram metric ([#12062](https://github.com/opensearch-project/OpenSearch/pull/12062))
- [Tracing Instrumentation] Add instrumentation for OpenSearch tasks management ([#12187](https://github.com/opensearch-project/OpenSearch/pull/12187))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.client.NoOpClient;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -162,7 +163,7 @@ public void setupForTest() {
testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest);
listener = new PlainActionFuture<>();
scrollId = null;
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE);
testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest);
testTask.setWorker(testRequest.getRequestsPerSecond(), null);
worker = testTask.getWorkerState();
Expand Down
25 changes: 25 additions & 0 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.core.tasks.resourcetracker.ThreadResourceInfo;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.telemetry.tracing.Span;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -111,6 +112,8 @@ public class Task {
*/
private final long startTimeNanos;

private final Span span;

public Task(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
this(
id,
Expand Down Expand Up @@ -148,6 +151,21 @@ public Task(
this.headers = headers;
this.resourceStats = resourceStats;
this.resourceTrackingCompletionListeners = resourceTrackingCompletionListeners;
this.span = null;
}

public Task(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers, Span span) {
this.id = id;
this.type = type;
this.action = action;
this.description = description;
this.parentTask = parentTask;
this.startTime = System.currentTimeMillis();
this.startTimeNanos = System.nanoTime();
this.headers = headers;
this.resourceStats = new ConcurrentHashMap<>();
this.resourceTrackingCompletionListeners = new ArrayList<>();
this.span = span;
}

/**
Expand Down Expand Up @@ -277,6 +295,13 @@ public TaskId getParentTaskId() {
return parentTask;
}

/**
* Returns span related to Task
*/
public Span getSpan() {
return span;
}

/**
* Build a status for this task or null if this task doesn't have status.
* Since most tasks don't have status this defaults to returning null. While
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.tasks;

import org.opensearch.core.tasks.TaskId;
import org.opensearch.telemetry.tracing.Span;

import java.util.Map;

Expand Down Expand Up @@ -67,6 +68,13 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId
return new Task(id, type, action, getDescription(), parentTaskId, headers);
}

/**
* Returns the task object that should be used to keep track of the processing of the request.
*/
default Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers, Span span) {
return new Task(id, type, action, getDescription(), parentTaskId, headers, span);
}

/**
* Returns optional description of the request to be displayed by the task manager
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public TaskCancellationMonitoringService(
cancellationStatsHolder = TASKS_TO_TRACK.stream()
.collect(Collectors.toConcurrentMap(task -> task, task -> new TaskCancellationStatsHolder()));
taskManager.addTaskEventListeners(this);
TraceableTaskListener traceableTaskListener = new TraceableTaskListener();
taskManager.addTaskEventListeners(traceableTaskListener);
}

void doRun() {
Expand Down
20 changes: 16 additions & 4 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TcpChannel;

Expand Down Expand Up @@ -132,22 +136,26 @@ public class TaskManager implements ClusterStateApplier {
private final Set<Consumer<Task>> taskResourceConsumer;
private final List<TaskEventListeners> taskEventListeners = new ArrayList<>();

private final Tracer tracer;

public static TaskManager createTaskManagerWithClusterSettings(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Set<String> taskHeaders
Set<String> taskHeaders,
Tracer tracer
) {
final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders);
final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders, tracer);
clusterSettings.addSettingsUpdateConsumer(TASK_RESOURCE_CONSUMERS_ENABLED, taskManager::setTaskResourceConsumersEnabled);
return taskManager;
}

public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders, Tracer tracer) {
this.threadPool = threadPool;
this.taskHeaders = new ArrayList<>(taskHeaders);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings);
this.tracer = tracer;
taskResourceConsumer = new HashSet<>();
}

Expand Down Expand Up @@ -203,7 +211,11 @@ public Task register(String type, String action, TaskAwareRequest request) {
headers.put(key, httpHeader);
}
}
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
Span span = tracer.startSpan(SpanBuilder.from("Task", request.getParentTask().getNodeId(), request));
Task task;
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers, span);
}
Objects.requireNonNull(task);
assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId";
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.opensearch.telemetry.tracing.Span;

/**
* Task Listener to ensure ending the span properly.
*/
public class TraceableTaskListener implements TaskManager.TaskEventListeners {

/**
* End the span on task cancellation
* @param task task which got cancelled.
*/
public void onTaskCancelled(CancellableTask task) {
Span span = task.getSpan();
if (span != null) {
span.endSpan();
}
}

/**
* End the span on task Completion
* @param task task which got completed.
*/
public void onTaskCompleted(Task task) {
Span span = task.getSpan();
if (span != null) {
span.endSpan();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,8 @@ private AttributeNames() {
* Refresh Policy
*/
public static final String REFRESH_POLICY = "refresh_policy";

public static final String DESCRIPTION = "description";

public static final String PARENT_TASK_ID = "parent_task_id";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.RestRequest;
import org.opensearch.tasks.TaskAwareRequest;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.Transport;
Expand Down Expand Up @@ -74,6 +75,10 @@ public static SpanCreationContext from(String spanName, String nodeId, Replicate
return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, request));
}

public static SpanCreationContext from(String spanName, String nodeId, TaskAwareRequest request) {
return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(request));
}

private static String createSpanName(HttpRequest httpRequest) {
return httpRequest.method().name() + SEPARATOR + httpRequest.uri();
}
Expand All @@ -87,6 +92,13 @@ private static Attributes buildSpanAttributes(HttpRequest httpRequest) {
return attributes;
}

private static Attributes buildSpanAttributes(TaskAwareRequest request) {
Attributes attributes = Attributes.create()
.addAttribute(AttributeNames.DESCRIPTION, request.getDescription())
.addAttribute(AttributeNames.PARENT_TASK_ID, request.getParentTask().getId());
return attributes;
}

private static void populateHeader(HttpRequest httpRequest, Attributes attributes) {
HEADERS_TO_BE_ADDED_AS_ATTRIBUTES.forEach(x -> {
if (httpRequest.getHeaders() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public TransportService(
setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
tracerLog = Loggers.getLogger(logger, ".tracer");
taskManager = createTaskManager(settings, clusterSettings, threadPool, taskHeaders);
taskManager = createTaskManager(settings, clusterSettings, threadPool, taskHeaders, tracer);
this.interceptor = transportInterceptor;
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
Expand Down Expand Up @@ -273,12 +273,13 @@ protected TaskManager createTaskManager(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Set<String> taskHeaders
Set<String> taskHeaders,
Tracer tracer
) {
if (clusterSettings != null) {
return TaskManager.createTaskManagerWithClusterSettings(settings, clusterSettings, threadPool, taskHeaders);
return TaskManager.createTaskManagerWithClusterSettings(settings, clusterSettings, threadPool, taskHeaders, tracer);
} else {
return new TaskManager(settings, threadPool, taskHeaders);
return new TaskManager(settings, threadPool, taskHeaders, tracer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.tasks.TaskCancellationService;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.tasks.MockTaskManager;
Expand Down Expand Up @@ -226,12 +227,13 @@ protected TaskManager createTaskManager(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Set<String> taskHeaders
Set<String> taskHeaders,
Tracer tracer
) {
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
return new MockTaskManager(settings, threadPool, taskHeaders);
} else {
return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders);
return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders, tracer);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.node.Node;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void testActionFiltersRequest() throws InterruptedException {
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(
actionName,
actionFilters,
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE)
) {
@Override
protected void doExecute(Task task, TestRequest request, ActionListener<TestResponse> listener) {
Expand Down Expand Up @@ -183,7 +184,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void exe
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(
actionName,
actionFilters,
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE)
) {
@Override
protected void doExecute(Task task, TestRequest request, ActionListener<TestResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
Expand Down Expand Up @@ -76,7 +77,7 @@ private Actions(Settings settings, ThreadPool threadPool, ActionType<?>[] action
private static class InternalTransportAction extends TransportAction {

private InternalTransportAction(Settings settings, String actionName, ThreadPool threadPool) {
super(actionName, EMPTY_FILTERS, new TaskManager(settings, threadPool, Collections.emptySet()));
super(actionName, EMPTY_FILTERS, new TaskManager(settings, threadPool, Collections.emptySet(), NoopTracer.INSTANCE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.noop.NoopTracer;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -74,7 +75,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {

public void testSyncerSendsOffCorrectDocuments() throws Exception {
IndexShard shard = newStartedShard(true);
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE);
AtomicBoolean syncActionCalled = new AtomicBoolean();
List<ResyncReplicationRequest> resyncRequests = new ArrayList<>();
PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> {
Expand Down Expand Up @@ -164,7 +165,7 @@ public void testSyncerOnClosingShard() throws Exception {
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()),
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet(), NoopTracer.INSTANCE),
syncAction
);
syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately
Expand Down
Loading

0 comments on commit 013e09f

Please sign in to comment.