diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 28e8a9fa96469..c6f21897801ff 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -291,6 +291,9 @@ If the JVM does not support CPU time measurement for the current thread, `ingest |`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included, and is only supported for Middle Manager nodes.| `category`, `workerVersion`|Varies| |`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies| |`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies| +|`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| +|`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| +|`worker/task/running/count`|Number of tasks running on an indexer per emission period. 
This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| ## Shuffle metrics (Native parallel task) diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 3d2422ce235f8..4fcdba685c091 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -66,6 +66,9 @@ "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, + "worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : "count" }, + "worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : "count" }, + "worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, "worker/task/success/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, "worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 86bb642ac2f13..729ac1d1617e3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -51,12 +51,16 @@ import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; +import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider; +import 
org.apache.druid.utils.CollectionUtils; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -67,6 +71,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -76,7 +81,7 @@ * starts running and completed task on disk is deleted based on a periodic schedule where overlord is asked for * active tasks to see which completed tasks are safe to delete. */ -public class WorkerTaskManager +public class WorkerTaskManager implements IndexerTaskCountStatsProvider { private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class); @@ -607,6 +612,34 @@ void addCompletedTask(final String taskId, final TaskAnnouncement taskAnnounceme completedTasks.put(taskId, taskAnnouncement); } + private Map getNumTasksPerDatasource(Collection taskList, Function getDataSourceFunc) + { + final Map dataSourceToTaskCount = new HashMap<>(); + + for (T task : taskList) { + dataSourceToTaskCount.merge(getDataSourceFunc.apply(task), 1L, Long::sum); + } + return dataSourceToTaskCount; + } + + @Override + public Map getWorkerRunningTasks() + { + return getNumTasksPerDatasource(CollectionUtils.mapValues(runningTasks, detail -> detail.task).values(), Task::getDataSource); + } + + @Override + public Map getWorkerAssignedTasks() + { + return getNumTasksPerDatasource(assignedTasks.values(), Task::getDataSource); + } + + @Override + public Map getWorkerCompletedTasks() + { + return getNumTasksPerDatasource(this.getCompletedTasks().values(), TaskAnnouncement::getTaskDataSource); + } + private static class TaskDetails { private final Task task; diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 93c5635492dae..b492ae78d03e4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -447,6 +447,11 @@ private NoopTask createNoopTask(String id) return new NoopTask(id, null, null, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0)); } + private NoopTask createNoopTask(String id, String dataSource) + { + return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0)); + } + /** * Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for it to be complete. Common preamble * for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}. @@ -477,4 +482,43 @@ private Task setUpCompletedTasksCleanupTest() throws Exception EasyMock.reset(overlordClient); return task; } + + @Test + public void getWorkerTaskStatsTest() throws Exception + { + EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes(); + EasyMock.replay(overlordClient); + + Task task1 = createNoopTask("task1", "wikipedia"); + Task task2 = createNoopTask("task2", "wikipedia"); + Task task3 = createNoopTask("task3", "animals"); + + workerTaskManager.start(); + // before assigning tasks we should get no running tasks + Assert.assertEquals(workerTaskManager.getWorkerRunningTasks().size(), 0L); + + workerTaskManager.assignTask(task1); + workerTaskManager.assignTask(task2); + workerTaskManager.assignTask(task3); + + Thread.sleep(25); + //should return all 3 tasks as running + Assert.assertEquals(workerTaskManager.getWorkerRunningTasks(), ImmutableMap.of( + "wikipedia", 2L, + "animals", 1L + )); + + Map runningTasks; + do { + runningTasks = workerTaskManager.getWorkerRunningTasks(); + 
Thread.sleep(10); + } while (!runningTasks.isEmpty()); + + // When running tasks are empty all tasks should be reported as completed + Assert.assertEquals(workerTaskManager.getWorkerCompletedTasks(), ImmutableMap.of( + "wikipedia", 2L, + "animals", 1L + )); + Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L); + } } diff --git a/licenses.yaml b/licenses.yaml index d2469f1ca295e..ec14741b56f80 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -191,7 +191,7 @@ name: AWS SDK for Java license_category: binary module: java-core license_name: Apache License version 2.0 -version: 1.12.497 +version: 1.12.638 libraries: - com.amazonaws: aws-java-sdk-core - com.amazonaws: aws-java-sdk-ec2 @@ -505,7 +505,7 @@ name: Apache Commons Codec license_category: binary module: java-core license_name: Apache License version 2.0 -version: 1.13 +version: 1.16.1 libraries: - commons-codec: commons-codec notices: @@ -630,7 +630,7 @@ name: Apache Commons Compress license_category: binary module: java-core license_name: Apache License version 2.0 -version: 1.24.0 +version: 1.26.0 libraries: - org.apache.commons: commons-compress notices: @@ -963,7 +963,7 @@ name: org.bitbucket.b_c jose4j license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 0.9.3 +version: 0.9.6 libraries: - org.bitbucket.b_c: jose4j @@ -2380,20 +2380,6 @@ libraries: --- -name: Ion Java -license_category: binary -module: java-core -license_name: Apache License version 2.0 -version: 1.0.2 -libraries: - - software.amazon.ion: ion-java -notices: - - ion-java: | - Amazon Ion Java - Copyright 2007-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
- ---- - name: Apache Hadoop license_category: binary module: hadoop-client @@ -3441,7 +3427,7 @@ name: PostgreSQL JDBC Driver license_category: binary module: extensions/druid-lookups-cached-single license_name: BSD-2-Clause License -version: 42.6.0 +version: 42.7.2 copyright: PostgreSQL Global Development Group license_file_path: licenses/bin/postgresql.BSD2 libraries: @@ -3453,7 +3439,7 @@ name: PostgreSQL JDBC Driver license_category: binary module: extensions/druid-lookups-cached-global license_name: BSD-2-Clause License -version: 42.6.0 +version: 42.7.2 copyright: PostgreSQL Global Development Group license_file_path: licenses/bin/postgresql.BSD2 libraries: @@ -3465,7 +3451,7 @@ name: PostgreSQL JDBC Driver license_category: binary module: extensions/postgresql-metadata-storage license_name: BSD-2-Clause License -version: 42.6.0 +version: 42.7.2 copyright: PostgreSQL Global Development Group license_file_path: licenses/bin/postgresql.BSD2 libraries: diff --git a/pom.xml b/pom.xml index d2d24d6ec56b4..c0f3037f536e9 100644 --- a/pom.xml +++ b/pom.xml @@ -105,7 +105,7 @@ 2.7.3 3.10.6.Final 4.1.100.Final - 42.6.0 + 42.7.2 3.24.0 1.3.1 1.7.36 @@ -117,7 +117,7 @@ however it is required in some cases when running against mockito 4.x (mockito 4.x is required for Java <11. 
We use the following property to pick the proper artifact based on Java version (see pre-java-11 profile) --> core - 1.12.497 + 1.12.638 2.8.0 0.8.7 6.2.5.Final @@ -284,7 +284,7 @@ commons-codec commons-codec - 1.13 + 1.16.1 commons-io @@ -381,7 +381,7 @@ org.bitbucket.b_c jose4j - 0.9.3 + 0.9.6 + + commons-codec + commons-codec + runtime + org.apache.commons commons-math3 diff --git a/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java new file mode 100644 index 0000000000000..735bc27abb3a5 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import java.util.Map; + +/** + * Provides task count metrics for the indexers + * These metrics are reported by indexers + */ +public interface IndexerTaskCountStatsProvider +{ + /** + * Map from datasource name to the number of running tasks on the Indexer. + */ + Map getWorkerRunningTasks(); + + /** + * Map from datasource name to the number of assigned tasks to the Indexer. 
+ */ + Map getWorkerAssignedTasks(); + + /** + * Map from datasource name to the number of completed tasks by the Indexer. + */ + Map getWorkerCompletedTasks(); +} diff --git a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java index d6b322c2f0e98..d07311c1a4626 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java @@ -28,11 +28,13 @@ import org.apache.druid.java.util.metrics.AbstractMonitor; import org.apache.druid.query.DruidMetrics; +import java.util.Map; import java.util.Set; public class WorkerTaskCountStatsMonitor extends AbstractMonitor { private final WorkerTaskCountStatsProvider statsProvider; + private final IndexerTaskCountStatsProvider indexerStatsProvider; private final String workerCategory; private final String workerVersion; private final boolean isMiddleManager; @@ -46,9 +48,11 @@ public WorkerTaskCountStatsMonitor( this.isMiddleManager = nodeRoles.contains(NodeRole.MIDDLE_MANAGER); if (isMiddleManager) { this.statsProvider = injector.getInstance(WorkerTaskCountStatsProvider.class); + this.indexerStatsProvider = null; this.workerCategory = statsProvider.getWorkerCategory(); this.workerVersion = statsProvider.getWorkerVersion(); } else { + this.indexerStatsProvider = injector.getInstance(IndexerTaskCountStatsProvider.class); this.statsProvider = null; this.workerCategory = null; this.workerVersion = null; @@ -64,6 +68,10 @@ public boolean doMonitor(ServiceEmitter emitter) emit(emitter, "worker/taskSlot/idle/count", statsProvider.getWorkerIdleTaskSlotCount()); emit(emitter, "worker/taskSlot/total/count", statsProvider.getWorkerTotalTaskSlotCount()); emit(emitter, "worker/taskSlot/used/count", statsProvider.getWorkerUsedTaskSlotCount()); + } else { + emit(emitter, "worker/task/running/count", 
indexerStatsProvider.getWorkerRunningTasks()); + emit(emitter, "worker/task/assigned/count", indexerStatsProvider.getWorkerAssignedTasks()); + emit(emitter, "worker/task/completed/count", indexerStatsProvider.getWorkerCompletedTasks()); } return true; } @@ -77,4 +85,15 @@ private void emit(ServiceEmitter emitter, String metricName, Long value) emitter.emit(builder.setMetric(metricName, value)); } } + + public void emit(ServiceEmitter emitter, String metricName, Map dataSourceTaskMap) + { + for (Map.Entry dataSourceTaskCount : dataSourceTaskMap.entrySet()) { + if (dataSourceTaskCount.getValue() != null) { + ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + builder.setDimension(DruidMetrics.DATASOURCE, dataSourceTaskCount.getKey()); + emitter.emit(builder.setMetric(metricName, dataSourceTaskCount.getValue())); + } + } + } } diff --git a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java index fadb2f9882693..ff9fcffb8d999 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java @@ -31,14 +31,17 @@ import org.junit.Test; import javax.annotation.Nullable; +import java.util.Map; public class WorkerTaskCountStatsMonitorTest { private Injector injectorForMiddleManager; private Injector injectorForMiddleManagerNullStats; private Injector injectorForPeon; + private Injector injectorForIndexer; private WorkerTaskCountStatsProvider statsProvider; + private IndexerTaskCountStatsProvider indexerTaskStatsProvider; private WorkerTaskCountStatsProvider nullStatsProvider; @Before @@ -89,6 +92,36 @@ public String getWorkerVersion() } }; + indexerTaskStatsProvider = new IndexerTaskCountStatsProvider() + { + @Override + public Map getWorkerRunningTasks() + { + return ImmutableMap.of( + 
"wikipedia", 2L, + "animals", 3L + ); + } + + @Override + public Map getWorkerAssignedTasks() + { + return ImmutableMap.of( + "products", 3L, + "orders", 7L + ); + } + + @Override + public Map getWorkerCompletedTasks() + { + return ImmutableMap.of( + "inventory", 8L, + "metrics", 9L + ); + } + }; + nullStatsProvider = new WorkerTaskCountStatsProvider() { @Nullable @@ -156,6 +189,12 @@ public String getWorkerVersion() injectorForPeon = Guice.createInjector( ImmutableList.of(binder -> {}) ); + + injectorForIndexer = Guice.createInjector( + ImmutableList.of( + binder -> binder.bind(IndexerTaskCountStatsProvider.class).toInstance(indexerTaskStatsProvider) + ) + ); } @Test @@ -194,20 +233,49 @@ public void testMonitor() } @Test - public void testMonitorWithNulls() + public void testMonitorIndexer() { final WorkerTaskCountStatsMonitor monitor = - new WorkerTaskCountStatsMonitor(injectorForMiddleManagerNullStats, ImmutableSet.of(NodeRole.MIDDLE_MANAGER)); + new WorkerTaskCountStatsMonitor(injectorForIndexer, ImmutableSet.of(NodeRole.INDEXER)); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(6, emitter.getEvents().size()); + emitter.verifyValue( + "worker/task/running/count", + ImmutableMap.of("dataSource", "wikipedia"), + 2L + ); + emitter.verifyValue( + "worker/task/running/count", + ImmutableMap.of("dataSource", "animals"), + 3L + ); + emitter.verifyValue( + "worker/task/assigned/count", + ImmutableMap.of("dataSource", "products"), + 3L + ); + emitter.verifyValue( + "worker/task/assigned/count", + ImmutableMap.of("dataSource", "orders"), + 7L + ); + emitter.verifyValue( + "worker/task/completed/count", + ImmutableMap.of("dataSource", "inventory"), + 8L + ); + emitter.verifyValue( + "worker/task/completed/count", + ImmutableMap.of("dataSource", "metrics"), + 9L + ); } - @Test - public void testMonitorNotMiddleManager() + public void 
testMonitorWithNulls() { final WorkerTaskCountStatsMonitor monitor = - new WorkerTaskCountStatsMonitor(injectorForPeon, ImmutableSet.of(NodeRole.PEON)); + new WorkerTaskCountStatsMonitor(injectorForMiddleManagerNullStats, ImmutableSet.of(NodeRole.MIDDLE_MANAGER)); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); Assert.assertEquals(0, emitter.getEvents().size()); diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index aea0922efb266..7b90b2eddcd25 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -61,6 +61,7 @@ import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.ThreadingTaskRunner; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.WorkerTaskManager; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.shuffle.ShuffleModule; import org.apache.druid.java.util.common.logger.Logger; @@ -79,6 +80,7 @@ import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.CliIndexerServerModule; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; +import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider; import org.eclipse.jetty.server.Server; import java.util.List; @@ -152,6 +154,7 @@ public void configure(Binder binder) binder.bind(TaskRunner.class).to(ThreadingTaskRunner.class); binder.bind(QuerySegmentWalker.class).to(ThreadingTaskRunner.class); binder.bind(ThreadingTaskRunner.class).in(LazySingleton.class); + binder.bind(IndexerTaskCountStatsProvider.class).to(WorkerTaskManager.class); CliPeon.bindRowIngestionMeters(binder); CliPeon.bindChatHandler(binder);