Skip to content

Commit

Permalink
Merge branch '28.0.1-confluent' of github.com:confluentinc/druid into…
Browse files Browse the repository at this point in the history
… 28.0.1-confluent
  • Loading branch information
pagrawal10 committed Jun 1, 2024
2 parents aa88da3 + 190f179 commit 319ed8a
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 33 deletions.
3 changes: 3 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ If the JVM does not support CPU time measurement for the current thread, `ingest
|`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included, and is only supported for Middle Manager nodes.| `category`, `workerVersion`|Varies|
|`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies|
|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies|
|`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/running/count`|Number of tasks running on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|

## Shuffle metrics (Native parallel task)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },

"worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" },
"worker/task/success/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" },
"worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider;
import org.apache.druid.utils.CollectionUtils;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -67,6 +71,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -76,7 +81,7 @@
* starts running and completed task on disk is deleted based on a periodic schedule where overlord is asked for
* active tasks to see which completed tasks are safe to delete.
*/
public class WorkerTaskManager
public class WorkerTaskManager implements IndexerTaskCountStatsProvider
{
private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class);

Expand Down Expand Up @@ -607,6 +612,34 @@ void addCompletedTask(final String taskId, final TaskAnnouncement taskAnnounceme
completedTasks.put(taskId, taskAnnouncement);
}

/**
 * Counts the elements of {@code taskList} grouped by the datasource name extracted from each element.
 *
 * @param taskList          elements to count
 * @param getDataSourceFunc extracts the datasource name from an element
 * @return a newly created {@link HashMap} from datasource name to the number of elements with that datasource
 */
private <T> Map<String, Long> getNumTasksPerDatasource(Collection<T> taskList, Function<T, String> getDataSourceFunc)
{
  // Every element contributes a count of one; counts for the same datasource are summed.
  return taskList.stream()
                 .collect(Collectors.toMap(getDataSourceFunc, item -> 1L, Long::sum, HashMap::new));
}

/**
 * Groups the tasks held in {@code runningTasks} by datasource and counts them.
 *
 * @return map from datasource name to the number of tasks found in {@code runningTasks}
 */
@Override
public Map<String, Long> getWorkerRunningTasks()
{
  // Unwrap each TaskDetails entry to its underlying Task before counting.
  final Collection<Task> running = CollectionUtils.mapValues(runningTasks, detail -> detail.task).values();
  return getNumTasksPerDatasource(running, Task::getDataSource);
}

/**
 * Groups the tasks held in {@code assignedTasks} by datasource and counts them.
 *
 * @return map from datasource name to the number of tasks found in {@code assignedTasks}
 */
@Override
public Map<String, Long> getWorkerAssignedTasks()
{
  final Map<String, Long> assignedPerDataSource =
      getNumTasksPerDatasource(assignedTasks.values(), Task::getDataSource);
  return assignedPerDataSource;
}

/**
 * Groups the announcements returned by {@code getCompletedTasks()} by their task datasource and counts them.
 *
 * @return map from datasource name to the number of completed-task announcements for that datasource
 */
@Override
public Map<String, Long> getWorkerCompletedTasks()
{
  final Map<String, Long> completedPerDataSource =
      getNumTasksPerDatasource(getCompletedTasks().values(), TaskAnnouncement::getTaskDataSource);
  return completedPerDataSource;
}

private static class TaskDetails
{
private final Task task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,11 @@ private NoopTask createNoopTask(String id)
return new NoopTask(id, null, null, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
}

/**
 * Creates a {@link NoopTask} with the given id and datasource. All other constructor arguments are the same
 * fixed values that {@link #createNoopTask(String)} passes.
 *
 * @param id         task id
 * @param dataSource datasource name to associate with the task
 */
private NoopTask createNoopTask(String id, String dataSource)
{
return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
}

/**
* Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for it to be complete. Common preamble
* for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}.
Expand Down Expand Up @@ -477,4 +482,43 @@ private Task setUpCompletedTasksCleanupTest() throws Exception
EasyMock.reset(overlordClient);
return task;
}

/**
 * Checks the per-datasource counts reported by {@link WorkerTaskManager#getWorkerRunningTasks()},
 * {@link WorkerTaskManager#getWorkerCompletedTasks()} and {@link WorkerTaskManager#getWorkerAssignedTasks()}
 * before tasks are assigned, while they run, and after they have all finished.
 */
@Test
public void getWorkerTaskStatsTest() throws Exception
{
  EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
  EasyMock.replay(overlordClient);

  Task task1 = createNoopTask("task1", "wikipedia");
  Task task2 = createNoopTask("task2", "wikipedia");
  Task task3 = createNoopTask("task3", "animals");

  workerTaskManager.start();
  // Before assigning tasks we should get no running tasks.
  Assert.assertEquals(0L, workerTaskManager.getWorkerRunningTasks().size());

  workerTaskManager.assignTask(task1);
  workerTaskManager.assignTask(task2);
  workerTaskManager.assignTask(task3);

  // NOTE(review): this fixed pause assumes all three tasks are started, and none has finished, within 25 ms;
  // it is timing-sensitive and may need a sturdier synchronization point.
  Thread.sleep(25);
  // Should return all 3 tasks as running.
  Assert.assertEquals(
      ImmutableMap.of(
          "wikipedia", 2L,
          "animals", 1L
      ),
      workerTaskManager.getWorkerRunningTasks()
  );

  // Poll until no task is running, but fail instead of hanging the build if that never happens.
  final long deadlineNanos = System.nanoTime() + 60_000_000_000L;
  Map<String, Long> runningTasks;
  do {
    if (System.nanoTime() - deadlineNanos > 0) {
      Assert.fail("Timed out waiting for running tasks to complete");
    }
    runningTasks = workerTaskManager.getWorkerRunningTasks();
    Thread.sleep(10);
  } while (!runningTasks.isEmpty());

  // When running tasks are empty, all tasks should be reported as completed.
  Assert.assertEquals(
      ImmutableMap.of(
          "wikipedia", 2L,
          "animals", 1L
      ),
      workerTaskManager.getWorkerCompletedTasks()
  );
  Assert.assertEquals(0L, workerTaskManager.getWorkerAssignedTasks().size());
}
}
28 changes: 7 additions & 21 deletions licenses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ name: AWS SDK for Java
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 1.12.497
version: 1.12.638
libraries:
- com.amazonaws: aws-java-sdk-core
- com.amazonaws: aws-java-sdk-ec2
Expand Down Expand Up @@ -505,7 +505,7 @@ name: Apache Commons Codec
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 1.13
version: 1.16.1
libraries:
- commons-codec: commons-codec
notices:
Expand Down Expand Up @@ -630,7 +630,7 @@ name: Apache Commons Compress
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 1.24.0
version: 1.26.0
libraries:
- org.apache.commons: commons-compress
notices:
Expand Down Expand Up @@ -963,7 +963,7 @@ name: org.bitbucket.b_c jose4j
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 0.9.3
version: 0.9.6
libraries:
- org.bitbucket.b_c: jose4j

Expand Down Expand Up @@ -2380,20 +2380,6 @@ libraries:

---

name: Ion Java
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 1.0.2
libraries:
- software.amazon.ion: ion-java
notices:
- ion-java: |
Amazon Ion Java
Copyright 2007-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
---

name: Apache Hadoop
license_category: binary
module: hadoop-client
Expand Down Expand Up @@ -3441,7 +3427,7 @@ name: PostgreSQL JDBC Driver
license_category: binary
module: extensions/druid-lookups-cached-single
license_name: BSD-2-Clause License
version: 42.6.0
version: 42.7.2
copyright: PostgreSQL Global Development Group
license_file_path: licenses/bin/postgresql.BSD2
libraries:
Expand All @@ -3453,7 +3439,7 @@ name: PostgreSQL JDBC Driver
license_category: binary
module: extensions/druid-lookups-cached-global
license_name: BSD-2-Clause License
version: 42.6.0
version: 42.7.2
copyright: PostgreSQL Global Development Group
license_file_path: licenses/bin/postgresql.BSD2
libraries:
Expand All @@ -3465,7 +3451,7 @@ name: PostgreSQL JDBC Driver
license_category: binary
module: extensions/postgresql-metadata-storage
license_name: BSD-2-Clause License
version: 42.6.0
version: 42.7.2
copyright: PostgreSQL Global Development Group
license_file_path: licenses/bin/postgresql.BSD2
libraries:
Expand Down
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
<mariadb.version>2.7.3</mariadb.version>
<netty3.version>3.10.6.Final</netty3.version>
<netty4.version>4.1.100.Final</netty4.version>
<postgresql.version>42.6.0</postgresql.version>
<postgresql.version>42.7.2</postgresql.version>
<protobuf.version>3.24.0</protobuf.version>
<resilience4j.version>1.3.1</resilience4j.version>
<slf4j.version>1.7.36</slf4j.version>
Expand All @@ -117,7 +117,7 @@
however it is required in some cases when running against mockito 4.x (mockito 4.x is required for Java <11).
We use the following property to pick the proper artifact based on Java version (see pre-java-11 profile) -->
<mockito.inline.artifact>core</mockito.inline.artifact>
<aws.sdk.version>1.12.497</aws.sdk.version>
<aws.sdk.version>1.12.638</aws.sdk.version>
<caffeine.version>2.8.0</caffeine.version>
<jacoco.version>0.8.7</jacoco.version>
<hibernate-validator.version>6.2.5.Final</hibernate-validator.version>
Expand Down Expand Up @@ -284,7 +284,7 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.13</version>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down Expand Up @@ -381,7 +381,7 @@
<dependency>
<groupId>org.bitbucket.b_c</groupId>
<artifactId>jose4j</artifactId>
<version>0.9.3</version>
<version>0.9.6</version>
</dependency>
<!-- transitive dependency of kafka-clientorg.apache.calcite:calcite-testkit
and kafka-protobuf-provider
Expand Down Expand Up @@ -570,7 +570,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.24.0</version>
<version>1.26.0</version>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
Expand Down
6 changes: 6 additions & 0 deletions processing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<!-- commons-codec is an optional dependency of commons-compress starting with 1.26.0 which we require at runtime -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.metrics;

import java.util.Map;

/**
 * Source of per-datasource task counts for an Indexer.
 * Each method returns a map keyed by datasource name whose value is the number of tasks in the given state.
 */
public interface IndexerTaskCountStatsProvider
{
  /**
   * @return map from datasource name to the number of tasks assigned to the Indexer
   */
  Map<String, Long> getWorkerAssignedTasks();

  /**
   * @return map from datasource name to the number of tasks running on the Indexer
   */
  Map<String, Long> getWorkerRunningTasks();

  /**
   * @return map from datasource name to the number of tasks completed by the Indexer
   */
  Map<String, Long> getWorkerCompletedTasks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.query.DruidMetrics;

import java.util.Map;
import java.util.Set;

public class WorkerTaskCountStatsMonitor extends AbstractMonitor
{
private final WorkerTaskCountStatsProvider statsProvider;
private final IndexerTaskCountStatsProvider indexerStatsProvider;
private final String workerCategory;
private final String workerVersion;
private final boolean isMiddleManager;
Expand All @@ -46,9 +48,11 @@ public WorkerTaskCountStatsMonitor(
this.isMiddleManager = nodeRoles.contains(NodeRole.MIDDLE_MANAGER);
if (isMiddleManager) {
this.statsProvider = injector.getInstance(WorkerTaskCountStatsProvider.class);
this.indexerStatsProvider = null;
this.workerCategory = statsProvider.getWorkerCategory();
this.workerVersion = statsProvider.getWorkerVersion();
} else {
this.indexerStatsProvider = injector.getInstance(IndexerTaskCountStatsProvider.class);
this.statsProvider = null;
this.workerCategory = null;
this.workerVersion = null;
Expand All @@ -64,6 +68,10 @@ public boolean doMonitor(ServiceEmitter emitter)
emit(emitter, "worker/taskSlot/idle/count", statsProvider.getWorkerIdleTaskSlotCount());
emit(emitter, "worker/taskSlot/total/count", statsProvider.getWorkerTotalTaskSlotCount());
emit(emitter, "worker/taskSlot/used/count", statsProvider.getWorkerUsedTaskSlotCount());
} else {
emit(emitter, "worker/task/running/count", indexerStatsProvider.getWorkerRunningTasks());
emit(emitter, "worker/task/assigned/count", indexerStatsProvider.getWorkerAssignedTasks());
emit(emitter, "worker/task/completed/count", indexerStatsProvider.getWorkerCompletedTasks());
}
return true;
}
Expand All @@ -77,4 +85,15 @@ private void emit(ServiceEmitter emitter, String metricName, Long value)
emitter.emit(builder.setMetric(metricName, value));
}
}

/**
 * Emits {@code metricName} once for every entry of {@code dataSourceTaskMap}, using the entry key as the
 * {@link DruidMetrics#DATASOURCE} dimension and the entry value as the metric value.
 * Entries whose value is {@code null} are skipped.
 *
 * @param emitter           emitter that receives the events
 * @param metricName        name of the metric to emit
 * @param dataSourceTaskMap map from datasource name to task count
 */
public void emit(ServiceEmitter emitter, String metricName, Map<String, Long> dataSourceTaskMap)
{
  for (Map.Entry<String, Long> entry : dataSourceTaskMap.entrySet()) {
    final Long taskCount = entry.getValue();
    if (taskCount == null) {
      continue;
    }
    final ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder();
    eventBuilder.setDimension(DruidMetrics.DATASOURCE, entry.getKey());
    emitter.emit(eventBuilder.setMetric(metricName, taskCount));
  }
}
}
Loading

0 comments on commit 319ed8a

Please sign in to comment.