Skip to content

Commit

Permalink
Push down completely when group-by Aggregation with only one device e…
Browse files Browse the repository at this point in the history
…ntry
  • Loading branch information
Wei-hao-Li authored Oct 15, 2024
1 parent e022f22 commit 9c8c1ed
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,15 @@ private PushDownLevel calculatePushDownLevel(
}

// calculate DataSet part
boolean singleDeviceEntry = tableScanNode.getDeviceEntries().size() < 2;
if (groupingKeys.isEmpty()) {
// GlobalAggregation
if (tableScanNode.getDeviceEntries().size() < 2) {
if (singleDeviceEntry) {
return PushDownLevel.COMPLETE;
} else {
// We need to two-stage Aggregation to combine Aggregation result of different DeviceEntry
return PushDownLevel.PARTIAL;
}
// We need to two-stage Aggregation to combine Aggregation result of different DeviceEntry
return PushDownLevel.PARTIAL;
}

List<FunctionCall> dateBinFunctionsOfTime = new ArrayList<>();
Expand All @@ -185,8 +187,9 @@ private PushDownLevel calculatePushDownLevel(
// appear in groupingKeys.

return PushDownLevel.NOOP;
} else if (ImmutableSet.copyOf(groupingKeys)
.containsAll(tableScanNode.getIdColumnsInTableStore(metadata, session))) {
} else if (singleDeviceEntry
|| ImmutableSet.copyOf(groupingKeys)
.containsAll(tableScanNode.getIdColumnsInTableStore(metadata, session))) {
// If all ID columns appear in groupingKeys and no Measurement column appears, we can push
// down completely.
return PushDownLevel.COMPLETE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,35 @@ public void completePushDownTest() {
"testdb.table1",
ImmutableList.of("tag1", "tag2", "tag3", "attr1", "time", "count"),
ImmutableSet.of("tag1", "tag2", "tag3", "attr1", "time", "s2")))));

// Global Aggregation or partialPushDown Aggregation with only one deviceEntry

// Output - AggTableScan
assertPlan(
planTester.createPlan("SELECT count(s2) FROM table1 where tag1='beijing' and tag2='A1'"),
output(
aggregationTableScan(
singleGroupingSet(),
ImmutableList.of(), // UnStreamable
Optional.empty(),
SINGLE,
"testdb.table1",
ImmutableList.of("count"),
ImmutableSet.of("s2"))));

assertPlan(
planTester.createPlan(
"SELECT count(s2) FROM table1 where tag1='beijing' and tag2='A1' group by tag3"),
output(
project(
aggregationTableScan(
singleGroupingSet("tag3"),
ImmutableList.of("tag3"),
Optional.empty(),
SINGLE,
"testdb.table1",
ImmutableList.of("tag3", "count"),
ImmutableSet.of("s2", "tag3")))));
}

@Test
Expand Down

0 comments on commit 9c8c1ed

Please sign in to comment.