From f41b1719dd941b80370bad33e0eaec8d5eb400ef Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Tue, 14 Jan 2025 10:58:30 +0000 Subject: [PATCH 1/6] remove cpuTimeAcc from createSegmentMapF (cherry picked from commit e875e92b41ae24fe049364c317d2d830fcf06ccd) --- .../BaseLeafFrameProcessorFactory.java | 3 +-- .../BroadcastJoinSegmentMapFnProcessor.java | 2 +- .../msq/querykit/InputNumberDataSource.java | 11 ----------- .../querykit/SimpleSegmentMapFnProcessor.java | 2 +- .../org/apache/druid/query/DataSource.java | 6 ++++-- .../apache/druid/query/FilteredDataSource.java | 18 ++++-------------- .../query/FrameBasedInlineDataSource.java | 7 ------- .../apache/druid/query/InlineDataSource.java | 12 ------------ .../org/apache/druid/query/JoinDataSource.java | 16 ++-------------- .../apache/druid/query/LookupDataSource.java | 13 ------------- .../apache/druid/query/QueryDataSource.java | 6 ++---- .../apache/druid/query/TableDataSource.java | 13 ------------- .../apache/druid/query/UnionDataSource.java | 12 ------------ .../apache/druid/query/UnnestDataSource.java | 14 +++----------- .../apache/druid/query/union/UnionQuery.java | 3 +-- .../druid/query/QueryDataSourceTest.java | 9 +++------ .../druid/query/QueryRunnerTestHelper.java | 2 +- .../druid/segment/join/NoopDataSource.java | 14 -------------- .../appenderator/SinkQuerySegmentWalker.java | 14 +++++++------- .../druid/server/LocalQuerySegmentWalker.java | 6 +----- .../server/coordination/ServerManager.java | 8 +++++--- .../server/TestClusterQuerySegmentWalker.java | 4 +--- .../calcite/external/ExternalDataSource.java | 4 +--- 23 files changed, 38 insertions(+), 161 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java index 013b6d4c93c0..3e6db7753933 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java @@ -58,7 +58,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; @@ -163,7 +162,7 @@ public ProcessorsAndChannels makeProcessors( if (segmentMapFnProcessor == null) { final Function segmentMapFn = - query.getDataSource().createSegmentMapFunction(query, new AtomicLong()); + query.getDataSource().createSegmentMapFunction(query); processorManager = processorManagerFn.apply(ImmutableList.of(segmentMapFn)); } else { processorManager = new ChainedProcessorManager<>(ProcessorManagers.of(() -> segmentMapFnProcessor), processorManagerFn); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java index b47a450aa000..d3ce49d5f8e0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java @@ -193,7 +193,7 @@ private void addFrame(final int channelNumber, final Frame frame) private Function createSegmentMapFunction() { - return inlineChannelData(query.getDataSource()).createSegmentMapFunction(query, new AtomicLong()); + return inlineChannelData(query.getDataSource()).createSegmentMapFunction(query); } DataSource inlineChannelData(final DataSource originalDataSource) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java index ccd3aef7573a..04a734f646ca 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java @@ -24,16 +24,11 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.DataSource; -import org.apache.druid.query.Query; import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.segment.SegmentReference; - import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; /** * Represents an input number, i.e., a positional index into @@ -96,12 +91,6 @@ public boolean isConcrete() return true; } - @Override - public Function createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc) - { - return Function.identity(); - } - @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java index 22ee0dd7fb4a..9e5a8964bf2f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java @@ -64,7 +64,7 @@ public List outputChannels() @Override public ReturnOrAwait> runIncrementally(final IntSet readableInputs) { - return ReturnOrAwait.returnObject(query.getDataSource().createSegmentMapFunction(query, new AtomicLong())); + return ReturnOrAwait.returnObject(query.getDataSource().createSegmentMapFunction(query)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index 360c339627f9..0f6e9f2d6b95 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -108,7 +107,10 @@ public interface DataSource * @param cpuTimeAcc the cpu time accumulator * @return the segment function */ - Function createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc); + default Function createSegmentMapFunction(Query query) + { + return Function.identity(); + } /** * Returns an updated datasource based on the specified new source. diff --git a/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java b/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java index 1644b3218d38..81779ea4cb50 100644 --- a/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java @@ -27,13 +27,12 @@ import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.FilteredSegment; import org.apache.druid.segment.SegmentReference; -import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; + import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -122,19 +121,10 @@ public boolean isConcrete() } @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTimeAccumulator - ) + public Function createSegmentMapFunction(Query query) { - final Function segmentMapFn = base.createSegmentMapFunction( - query, - cpuTimeAccumulator - ); - return JvmUtils.safeAccumulateThreadCpuTime( - cpuTimeAccumulator, - () -> baseSegment -> new FilteredSegment(segmentMapFn.apply(baseSegment), filter) - ); + final Function segmentMapFn = base.createSegmentMapFunction( query ); + return baseSegment -> new FilteredSegment(segmentMapFn.apply(baseSegment), filter); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java index 68d3db9a36d3..6d5d71b0d598 100644 --- a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java @@ -37,7 +37,6 @@ import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -170,12 +169,6 @@ public boolean isConcrete() return true; } - @Override - public Function createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc) - { - return Function.identity(); - } - @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java index a14eb63fe6f5..334c051e1647 100644 --- a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java @@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.RowAdapter; -import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -39,8 +38,6 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -242,15 +239,6 @@ public boolean isConcrete() return true; } - @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTimeAcc - ) - { - return Function.identity(); - } - @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java index 2eb459cf8cc3..40fd10fba223 100644 --- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java @@ -54,8 +54,6 @@ import org.apache.druid.segment.join.filter.JoinFilterPreAnalysisKey; import org.apache.druid.segment.join.filter.JoinableClauses; import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig; -import org.apache.druid.utils.JvmUtils; - import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; @@ -65,7 +63,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -301,14 +298,12 @@ public Set getVirtualColumnCandidates() @Override public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTimeAccumulator + Query query ) { return createSegmentMapFunctionInternal( analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null), analysis.getPreJoinableClauses(), - cpuTimeAccumulator, analysis.getBaseQuery().orElse(query) ); } @@ -444,14 +439,10 @@ private DataSourceAnalysis getAnalysisForDataSource() private Function createSegmentMapFunctionInternal( @Nullable final Filter baseFilter, final List clauses, - final AtomicLong cpuTimeAccumulator, final Query query ) { // compute column correlations here and RHS correlated values - return JvmUtils.safeAccumulateThreadCpuTime( - cpuTimeAccumulator, - () -> { if (clauses.isEmpty()) { return Function.identity(); } else { @@ -510,8 +501,7 @@ private Function createSegmentMapFunctionInt baseMapFn = Function.identity(); } else { baseMapFn = left.createSegmentMapFunction( - query, - cpuTimeAccumulator + query ); } return baseSegment -> @@ -522,8 +512,6 @@ private Function createSegmentMapFunctionInt joinFilterPreAnalysis ); } - } - ); } /** diff --git a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java index 873ca8893214..0f4dc46f315e 100644 --- a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java @@ -24,14 +24,10 @@ import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.segment.SegmentReference; - import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; /** * Represents a lookup. @@ -100,15 +96,6 @@ public boolean isConcrete() return true; } - @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTime - ) - { - return Function.identity(); - } - @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java index 08dc44126fee..35f548def51c 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java @@ -32,7 +32,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @JsonTypeName("query") @@ -110,12 +109,11 @@ public boolean isConcrete() @Override public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTime + Query query ) { final Query subQuery = this.getQuery(); - return subQuery.getDataSource().createSegmentMapFunction(subQuery, cpuTime); + return subQuery.getDataSource().createSegmentMapFunction(subQuery); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/TableDataSource.java b/processing/src/main/java/org/apache/druid/query/TableDataSource.java index fe9cf46e37b9..01599156e168 100644 --- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java @@ -25,14 +25,10 @@ import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.segment.SegmentReference; - import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; @JsonTypeName("table") public class TableDataSource implements DataSource @@ -97,15 +93,6 @@ public boolean isConcrete() return true; } - @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTime - ) - { - return Function.identity(); - } - @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java index 0384ecbc8852..405547129bd2 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java @@ -27,14 +27,11 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.segment.SegmentReference; import org.apache.druid.utils.CollectionUtils; import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -143,15 +140,6 @@ public boolean isConcrete() return dataSources.stream().allMatch(DataSource::isConcrete); } - @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTime - ) - { - return Function.identity(); - } - @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java index 06acca13a754..c2e26902bc62 100644 --- a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java @@ -28,13 +28,10 @@ import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.UnnestSegment; import org.apache.druid.segment.VirtualColumn; -import org.apache.druid.utils.JvmUtils; - import javax.annotation.Nullable; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -135,18 +132,13 @@ public boolean isConcrete() @Override public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTimeAccumulator + Query query ) { final Function segmentMapFn = base.createSegmentMapFunction( - query, - cpuTimeAccumulator - ); - return JvmUtils.safeAccumulateThreadCpuTime( - cpuTimeAccumulator, - () -> baseSegment -> new UnnestSegment(segmentMapFn.apply(baseSegment), virtualColumn, unnestFilter) + query ); + return baseSegment -> new UnnestSegment(segmentMapFn.apply(baseSegment), virtualColumn, unnestFilter); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java b/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java index a999a23fde20..2d16f674e674 100644 --- a/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java +++ b/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java @@ -49,7 +49,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; public class UnionQuery implements Query @@ -271,7 +270,7 @@ public boolean isConcrete() } @Override - public Function createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc) + public Function createSegmentMapFunction(Query query) { throw methodNotSupported(); } diff --git a/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java index 96d8ddd05a57..cde52961abcc 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java @@ -32,7 +32,6 @@ import org.junit.rules.ExpectedException; import java.util.Collections; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; public class QueryDataSourceTest @@ -174,17 +173,15 @@ public void test_serde() throws Exception public void test_withSegmentMapFunction() { Function parentsegmentMapFunction = queryDataSource.createSegmentMapFunction( - groupByQuery, - new AtomicLong() + groupByQuery ); Function childsegmentMapFunction = queryOnTableDataSource.createSegmentMapFunction( - groupByQuery, - new AtomicLong() + groupByQuery ); // The segment functions should both be identity functions and equal Assert.assertEquals(parentsegmentMapFunction, childsegmentMapFunction); } - + } diff --git a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java index 120ef503b8a0..2ddbdd20e4e6 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java @@ -559,7 +559,7 @@ public static > QueryRunner makeQueryRunnerWith { final DataSource base = query.getDataSource(); - final SegmentReference segmentReference = base.createSegmentMapFunction(query, new AtomicLong()) + final SegmentReference segmentReference = base.createSegmentMapFunction(query) .apply(ReferenceCountingSegment.wrapRootGenerationSegment(adapter)); return makeQueryRunner(factory, segmentReference, runnerName); } diff --git a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java index c211137c85e8..e3fb8e66ab26 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java +++ b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java @@ -20,15 +20,10 @@ package org.apache.druid.segment.join; import org.apache.druid.query.DataSource; -import org.apache.druid.query.Query; import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.segment.SegmentReference; - import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; /** * A datasource that returns nothing. Only used to test un-registered datasources. @@ -71,15 +66,6 @@ public boolean isConcrete() return false; } - @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTime - ) - { - return Function.identity(); - } - @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index c256e82c6d2d..6460f51ca57a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -75,6 +75,7 @@ import org.apache.druid.timeline.partition.IntegerPartitionChunk; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.utils.CloseableUtils; +import org.apache.druid.utils.JvmUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -201,11 +202,10 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final } // segmentMapFn maps each base Segment into a joined Segment if necessary. - final Function segmentMapFn = - dataSourceFromQuery.createSegmentMapFunction( - query, - cpuTimeAccumulator - ); + final Function segmentMapFn = JvmUtils.safeAccumulateThreadCpuTime( + cpuTimeAccumulator, + () -> dataSourceFromQuery.createSegmentMapFunction(query) + ); // We compute the join cache key here itself so it doesn't need to be re-computed for every segment final Optional cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey()); @@ -453,7 +453,7 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final // with subsegments (hydrants). return segmentId + "_H" + hydrantNumber; } - + /** * This class is responsible for emitting query/segment/time, query/wait/time and query/segmentAndCache/Time metrics for a Sink. * It accumulates query/segment/time and query/segmentAndCache/time metric for each FireHydrant at the level of Sink. @@ -599,7 +599,7 @@ private long getSegmentAndCacheTime() } } } - + private static class SinkHolder implements Overshadowable { private final Sink sink; diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java index ae6b67deb5f7..9d036fb47c38 100644 --- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -96,11 +96,7 @@ public QueryRunner getQueryRunnerForIntervals(final Query query, final final AtomicLong cpuAccumulator = new AtomicLong(0L); final Function segmentMapFn = dataSourceFromQuery - .createSegmentMapFunction( - query, - cpuAccumulator - ); - + .createSegmentMapFunction(query); final QueryRunnerFactory> queryRunnerFactory = conglomerate.findFactory(query); final QueryRunner baseRunner = queryRunnerFactory.mergeRunners( diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index 3317375347db..b3297ce9a604 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -66,6 +66,7 @@ import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; +import org.apache.druid.utils.JvmUtils; import org.joda.time.Interval; import java.util.Collections; @@ -194,9 +195,10 @@ public QueryRunner getQueryRunnerForSegments(Query theQuery, Iterable< } else { return new ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs)); } - final Function segmentMapFn = - dataSourceFromQuery - .createSegmentMapFunction(newQuery, cpuTimeAccumulator); + final Function segmentMapFn = JvmUtils.safeAccumulateThreadCpuTime( + cpuTimeAccumulator, + () -> dataSourceFromQuery.createSegmentMapFunction(newQuery) + ); // We compute the datasource's cache key here itself so it doesn't need to be re-computed for every segment final Optional cacheKeyPrefix = Optional.ofNullable(dataSourceFromQuery.getCacheKey()); diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java index 21178fe140a7..a4b4c055afb6 100644 --- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java @@ -62,7 +62,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -147,8 +146,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final } final Function segmentMapFn = dataSourceFromQuery.createSegmentMapFunction( - query, - new AtomicLong() + query ); final QueryRunner baseRunner = new FinalizeResultsQueryRunner<>( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java index e74a0eaefb64..c35dda4cda1c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -126,8 +125,7 @@ public boolean isConcrete() @Override public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTime + Query query ) { return Function.identity(); From faac1c611b22ff69df20564e5674db824a51427c Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Tue, 14 Jan 2025 11:03:37 +0000 Subject: [PATCH 2/6] undo --- .../druid/query/FrameBasedInlineDataSource.java | 7 +++++++ .../org/apache/druid/query/InlineDataSource.java | 12 ++++++++++++ .../org/apache/druid/query/LookupDataSource.java | 13 +++++++++++++ .../org/apache/druid/query/TableDataSource.java | 13 +++++++++++++ .../org/apache/druid/query/UnionDataSource.java | 12 ++++++++++++ 5 files changed, 57 insertions(+) diff --git a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java index 6d5d71b0d598..68d3db9a36d3 100644 --- a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java @@ -37,6 +37,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -169,6 +170,12 @@ public boolean isConcrete() return true; } + @Override + public Function createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc) + { + return Function.identity(); + } + @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java index 334c051e1647..a14eb63fe6f5 100644 --- a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -38,6 +39,8 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -239,6 +242,15 @@ public boolean isConcrete() return true; } + @Override + public Function createSegmentMapFunction( + Query query, + AtomicLong cpuTimeAcc + ) + { + return Function.identity(); + } + @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java index 0f4dc46f315e..873ca8893214 100644 --- a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java @@ -24,10 +24,14 @@ import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.segment.SegmentReference; + import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; /** * Represents a lookup. @@ -96,6 +100,15 @@ public boolean isConcrete() return true; } + @Override + public Function createSegmentMapFunction( + Query query, + AtomicLong cpuTime + ) + { + return Function.identity(); + } + @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/processing/src/main/java/org/apache/druid/query/TableDataSource.java b/processing/src/main/java/org/apache/druid/query/TableDataSource.java index 01599156e168..fe9cf46e37b9 100644 --- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java @@ -25,10 +25,14 @@ import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.segment.SegmentReference; + import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; @JsonTypeName("table") public class TableDataSource implements DataSource @@ -93,6 +97,15 @@ public boolean isConcrete() return true; } + @Override + public Function createSegmentMapFunction( + Query query, + AtomicLong cpuTime + ) + { + return Function.identity(); + } + @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java index 405547129bd2..0384ecbc8852 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java @@ -27,11 +27,14 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.segment.SegmentReference; import org.apache.druid.utils.CollectionUtils; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -140,6 +143,15 @@ public boolean isConcrete() return dataSources.stream().allMatch(DataSource::isConcrete); } + @Override + public Function createSegmentMapFunction( + Query query, + AtomicLong cpuTime + ) + { + return Function.identity(); + } + @Override public DataSource withUpdatedDataSource(DataSource newSource) { From 9aaff07891ccec37c0246712b3fba4d92faaab05 Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Tue, 14 Jan 2025 11:04:59 +0000 Subject: [PATCH 3/6] cleanup --- .../druid/msq/querykit/InputNumberDataSource.java | 10 ++++++++++ .../main/java/org/apache/druid/query/DataSource.java | 5 +---- .../apache/druid/query/FrameBasedInlineDataSource.java | 3 +-- .../java/org/apache/druid/query/InlineDataSource.java | 6 +----- .../java/org/apache/druid/query/LookupDataSource.java | 6 +----- .../java/org/apache/druid/query/TableDataSource.java | 6 +----- .../java/org/apache/druid/query/UnionDataSource.java | 6 +----- .../org/apache/druid/segment/join/NoopDataSource.java | 10 ++++++++++ 8 files changed, 26 insertions(+), 26 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java index 04a734f646ca..05560796435b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java @@ -24,11 +24,15 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.DataSource; +import org.apache.druid.query.Query; import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.segment.SegmentReference; + import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.function.Function; /** * Represents an input number, i.e., a positional index into @@ -91,6 +95,12 @@ public boolean isConcrete() return true; } + @Override + public Function createSegmentMapFunction(Query query) + { + return Function.identity(); + } + @Override public DataSource withUpdatedDataSource(DataSource newSource) { diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index 0f6e9f2d6b95..38ad52e24a94 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -107,10 +107,7 @@ public interface DataSource * @param cpuTimeAcc the cpu time accumulator * @return the segment function */ - default Function createSegmentMapFunction(Query query) - { - return Function.identity(); - } + Function createSegmentMapFunction(Query query); /** * Returns an updated datasource based on the specified new source. diff --git a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java index 68d3db9a36d3..060c06f3adfd 100644 --- a/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java @@ -37,7 +37,6 @@ import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -171,7 +170,7 @@ public boolean isConcrete() } @Override - public Function createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc) + public Function createSegmentMapFunction(Query query) { return Function.identity(); } diff --git a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java index a14eb63fe6f5..1326938eddc6 100644 --- a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -243,10 +242,7 @@ public boolean isConcrete() } @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTimeAcc - ) + public Function createSegmentMapFunction(Query query) { return Function.identity(); } diff --git a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java index 873ca8893214..84a23cc02826 100644 --- a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -101,10 +100,7 @@ public boolean isConcrete() } @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTime - ) + public Function createSegmentMapFunction(Query query) { return Function.identity(); } diff --git a/processing/src/main/java/org/apache/druid/query/TableDataSource.java b/processing/src/main/java/org/apache/druid/query/TableDataSource.java index fe9cf46e37b9..ca5e46eee065 100644 --- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @JsonTypeName("table") @@ -98,10 +97,7 @@ public boolean isConcrete() } @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTime - ) + public Function createSegmentMapFunction(Query query) { return Function.identity(); } diff --git a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java index 0384ecbc8852..3673608c0c8b 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java @@ -33,7 +33,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -144,10 +143,7 @@ public boolean isConcrete() } @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTime - ) + public Function createSegmentMapFunction(Query query) { return Function.identity(); } diff --git a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java index e3fb8e66ab26..70e5b110de46 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java +++ b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java @@ -20,10 +20,14 @@ package org.apache.druid.segment.join; import org.apache.druid.query.DataSource; +import org.apache.druid.query.Query; import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.segment.SegmentReference; + import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.function.Function; /** * A datasource that returns nothing. Only used to test un-registered datasources. @@ -66,6 +70,12 @@ public boolean isConcrete() return false; } + @Override + public Function createSegmentMapFunction(Query query) + { + return Function.identity(); + } + @Override public DataSource withUpdatedDataSource(DataSource newSource) { From 0b483a7a11b577f7a3962c9efce9aecf1fd79357 Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Tue, 14 Jan 2025 11:10:37 +0000 Subject: [PATCH 4/6] cleanup --- .../main/java/org/apache/druid/query/FilteredDataSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java b/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java index 81779ea4cb50..0716fd156ba9 100644 --- a/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/FilteredDataSource.java @@ -123,7 +123,7 @@ public boolean isConcrete() @Override public Function createSegmentMapFunction(Query query) { - final Function segmentMapFn = base.createSegmentMapFunction( query ); + final Function segmentMapFn = base.createSegmentMapFunction(query); return baseSegment -> new FilteredSegment(segmentMapFn.apply(baseSegment), filter); } From 9e23d7d5d067e789c0cc3709f6c47045b09884e6 Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Thu, 23 Jan 2025 15:34:13 +0000 Subject: [PATCH 5/6] correct indent --- .../apache/druid/query/JoinDataSource.java | 138 +++++++++--------- .../apache/druid/query/UnnestDataSource.java | 2 + .../druid/query/QueryRunnerTestHelper.java | 1 - 3 files changed, 72 insertions(+), 69 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java index 40fd10fba223..344be256477e 100644 --- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java @@ -54,7 +54,9 @@ import org.apache.druid.segment.join.filter.JoinFilterPreAnalysisKey; import org.apache.druid.segment.join.filter.JoinableClauses; import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig; + import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -443,75 +445,75 @@ private Function createSegmentMapFunctionInt ) { // compute column correlations here and RHS correlated values - if (clauses.isEmpty()) { - return Function.identity(); - } else { - final JoinableClauses joinableClauses = JoinableClauses.createClauses( - clauses, - joinableFactoryWrapper.getJoinableFactory() - ); - final JoinFilterRewriteConfig filterRewriteConfig = JoinFilterRewriteConfig.forQuery(query); - - // Pick off any join clauses that can be converted into filters. - final Set requiredColumns = query.getRequiredColumns(); - final Filter baseFilterToUse; - final List clausesToUse; - - if (requiredColumns != null && filterRewriteConfig.isEnableRewriteJoinToFilter()) { - final Pair, List> conversionResult = JoinableFactoryWrapper.convertJoinsToFilters( - joinableClauses.getJoinableClauses(), - requiredColumns, - Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(), Integer.MAX_VALUE)) - ); - - baseFilterToUse = - Filters.maybeAnd( - Lists.newArrayList( - Iterables.concat( - Collections.singleton(baseFilter), - conversionResult.lhs - ) - ) - ).orElse(null); - clausesToUse = conversionResult.rhs; - } else { - baseFilterToUse = baseFilter; - clausesToUse = joinableClauses.getJoinableClauses(); - } - - // Analyze remaining join clauses to see if filters on them can be pushed down. - final JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis( - new JoinFilterPreAnalysisKey( - filterRewriteConfig, - clausesToUse, - query.getVirtualColumns(), - Filters.maybeAnd(Arrays.asList(baseFilterToUse, Filters.toFilter(query.getFilter()))) - .orElse(null) + if (clauses.isEmpty()) { + return Function.identity(); + } else { + final JoinableClauses joinableClauses = JoinableClauses.createClauses( + clauses, + joinableFactoryWrapper.getJoinableFactory() + ); + final JoinFilterRewriteConfig filterRewriteConfig = JoinFilterRewriteConfig.forQuery(query); + + // Pick off any join clauses that can be converted into filters. + final Set requiredColumns = query.getRequiredColumns(); + final Filter baseFilterToUse; + final List clausesToUse; + + if (requiredColumns != null && filterRewriteConfig.isEnableRewriteJoinToFilter()) { + final Pair, List> conversionResult = JoinableFactoryWrapper.convertJoinsToFilters( + joinableClauses.getJoinableClauses(), + requiredColumns, + Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(), Integer.MAX_VALUE)) + ); + + baseFilterToUse = + Filters.maybeAnd( + Lists.newArrayList( + Iterables.concat( + Collections.singleton(baseFilter), + conversionResult.lhs + ) ) - ); - final Function baseMapFn; - // A join data source is not concrete - // And isConcrete() of an unnest datasource delegates to its base - // Hence, in the case of a Join -> Unnest -> Join - // if we just use isConcrete on the left - // the segment map function for the unnest would never get called - // This calls us to delegate to the segmentMapFunction of the left - // only when it is not a JoinDataSource - if (left instanceof JoinDataSource) { - baseMapFn = Function.identity(); - } else { - baseMapFn = left.createSegmentMapFunction( - query - ); - } - return baseSegment -> - new HashJoinSegment( - baseMapFn.apply(baseSegment), - baseFilterToUse, - GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()), - joinFilterPreAnalysis - ); - } + ).orElse(null); + clausesToUse = conversionResult.rhs; + } else { + baseFilterToUse = baseFilter; + clausesToUse = joinableClauses.getJoinableClauses(); + } + + // Analyze remaining join clauses to see if filters on them can be pushed down. + final JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis( + new JoinFilterPreAnalysisKey( + filterRewriteConfig, + clausesToUse, + query.getVirtualColumns(), + Filters.maybeAnd(Arrays.asList(baseFilterToUse, Filters.toFilter(query.getFilter()))) + .orElse(null) + ) + ); + final Function baseMapFn; + // A join data source is not concrete + // And isConcrete() of an unnest datasource delegates to its base + // Hence, in the case of a Join -> Unnest -> Join + // if we just use isConcrete on the left + // the segment map function for the unnest would never get called + // This calls us to delegate to the segmentMapFunction of the left + // only when it is not a JoinDataSource + if (left instanceof JoinDataSource) { + baseMapFn = Function.identity(); + } else { + baseMapFn = left.createSegmentMapFunction( + query + ); + } + return baseSegment -> + new HashJoinSegment( + baseMapFn.apply(baseSegment), + baseFilterToUse, + GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()), + joinFilterPreAnalysis + ); + } } /** diff --git a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java index c2e26902bc62..b6b89823fe5b 100644 --- a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java @@ -28,7 +28,9 @@ import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.UnnestSegment; import org.apache.druid.segment.VirtualColumn; + import javax.annotation.Nullable; + import java.util.List; import java.util.Objects; import java.util.Set; diff --git a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java index 2ddbdd20e4e6..1d0a21061df3 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java @@ -79,7 +79,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.Stream; From 3d2ea3029e90150a8efb6be76fbf98b345f0f125 Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Thu, 30 Jan 2025 14:23:10 +0000 Subject: [PATCH 6/6] correct RestrictedDatasource --- .../druid/query/RestrictedDataSource.java | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java b/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java index f7f91072fc64..7f1303b4a47b 100644 --- a/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java @@ -29,14 +29,11 @@ import org.apache.druid.query.policy.Policy; import org.apache.druid.segment.RestrictedSegment; import org.apache.druid.segment.SegmentReference; -import org.apache.druid.utils.JvmUtils; - import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -125,18 +122,10 @@ public boolean isConcrete() } @Override - public Function createSegmentMapFunction( - Query query, - AtomicLong cpuTimeAccumulator - ) + public Function createSegmentMapFunction(Query query) { - return JvmUtils.safeAccumulateThreadCpuTime( - cpuTimeAccumulator, - () -> base.createSegmentMapFunction( - query, - cpuTimeAccumulator - ).andThen((segment) -> (new RestrictedSegment(segment, policy))) - ); + final Function segmentMapFn = base.createSegmentMapFunction(query); + return baseSegment -> new RestrictedSegment(segmentMapFn.apply(baseSegment), policy); } @Override