Skip to content

Commit

Permalink
Support Fill in Table Model
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 committed Sep 27, 2024
1 parent 2d88c35 commit 133d41b
Show file tree
Hide file tree
Showing 11 changed files with 741 additions and 43 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ public class StatementAnalyzer {
private final StatementAnalyzerFactory statementAnalyzerFactory;

private Analysis analysis;

private boolean hasFillInParentScope = false;
private final MPPQueryContext queryContext;

private final AccessControl accessControl;
Expand Down Expand Up @@ -508,6 +510,7 @@ protected Scope visitExplainAnalyze(ExplainAnalyze node, Optional<Scope> context
@Override
protected Scope visitQuery(Query node, Optional<Scope> context) {
Scope withScope = analyzeWith(node, context);
hasFillInParentScope = node.getFill().isPresent() || hasFillInParentScope;
Scope queryBodyScope = process(node.getQueryBody(), withScope);

if (node.getFill().isPresent()) {
Expand All @@ -521,7 +524,8 @@ protected Scope visitQuery(Query node, Optional<Scope> context) {

if ((queryBodyScope.getOuterQueryParent().isPresent() || !isTopLevel)
&& !node.getLimit().isPresent()
&& !node.getOffset().isPresent()) {
&& !node.getOffset().isPresent()
&& hasFillInParentScope) {
// not the root scope and ORDER BY is ineffective
analysis.markRedundantOrderBy(node.getOrderBy().get());
warningCollector.add(
Expand Down Expand Up @@ -696,6 +700,14 @@ private boolean tryProcessRecursiveQuery(
throw new SemanticException(
"immediate WITH clause in recursive query is not supported");
});
withQuery
.getQuery()
.getFill()
.ifPresent(
orderBy -> {
throw new SemanticException(
"immediate FILL clause in recursive query is not supported");
});
withQuery
.getQuery()
.getOrderBy()
Expand Down Expand Up @@ -790,6 +802,7 @@ protected Scope visitTableSubquery(TableSubquery node, Optional<Scope> scope) {
protected Scope visitQuerySpecification(QuerySpecification node, Optional<Scope> scope) {
// TODO: extract candidate names from SELECT, WHERE, HAVING, GROUP BY and ORDER BY expressions
// to pass down to analyzeFrom
hasFillInParentScope = node.getFill().isPresent() || hasFillInParentScope;

Scope sourceScope = analyzeFrom(node, scope);

Expand All @@ -802,9 +815,12 @@ protected Scope visitQuerySpecification(QuerySpecification node, Optional<Scope>

Scope outputScope = computeAndAssignOutputScope(node, scope, sourceScope);

if (node.getFill().isPresent()) {
analyzeFill(node.getFill().get(), outputScope);
}
node.getFill()
.ifPresent(
fill -> {
Scope fillScope = computeAndAssignFillScope(fill, sourceScope, outputScope);
analyzeFill(fill, fillScope);
});

List<Expression> orderByExpressions = emptyList();
Optional<Scope> orderByScope = Optional.empty();
Expand All @@ -816,7 +832,8 @@ protected Scope visitQuerySpecification(QuerySpecification node, Optional<Scope>

if ((sourceScope.getOuterQueryParent().isPresent() || !isTopLevel)
&& !node.getLimit().isPresent()
&& !node.getOffset().isPresent()) {
&& !node.getOffset().isPresent()
&& hasFillInParentScope) {
// not the root scope and ORDER BY is ineffective
analysis.markRedundantOrderBy(orderBy);
warningCollector.add(
Expand Down Expand Up @@ -1471,6 +1488,18 @@ private Scope computeAndAssignOrderByScope(OrderBy node, Scope sourceScope, Scop
return orderByScope;
}

private Scope computeAndAssignFillScope(Fill node, Scope sourceScope, Scope outputScope) {
// Fill should "see" both output and FROM fields during initial analysis and
// non-aggregation query planning
Scope fillScope =
Scope.builder()
.withParent(sourceScope)
.withRelationType(outputScope.getRelationId(), outputScope.getRelationType())
.build();
analysis.setScope(node, fillScope);
return fillScope;
}

@Override
protected Scope visitSubqueryExpression(SubqueryExpression node, Optional<Scope> context) {
return process(node.getQuery(), context);
Expand Down Expand Up @@ -2098,7 +2127,7 @@ private void analyzeFill(Fill node, Scope scope) {
Analysis.FillAnalysis fillAnalysis;
if (node.getFillMethod() == FillPolicy.PREVIOUS) {
if (node.getTimeDurationThreshold().isPresent()) {
FieldReference helperColumn = getHelperColumn(node, scope);
FieldReference helperColumn = getHelperColumn(node, scope, FillPolicy.PREVIOUS);
ExpressionAnalyzer.analyzeExpression(
metadata,
queryContext,
Expand Down Expand Up @@ -2131,7 +2160,7 @@ private void analyzeFill(Fill node, Scope scope) {
correlationSupport);
fillAnalysis = new Analysis.ValueFillAnalysis(literal);
} else if (node.getFillMethod() == FillPolicy.LINEAR) {
FieldReference helperColumn = getHelperColumn(node, scope);
FieldReference helperColumn = getHelperColumn(node, scope, FillPolicy.LINEAR);
ExpressionAnalyzer.analyzeExpression(
metadata,
queryContext,
Expand All @@ -2151,19 +2180,21 @@ private void analyzeFill(Fill node, Scope scope) {
analysis.setFill(node, fillAnalysis);
}

private FieldReference getHelperColumn(Fill node, Scope scope) {
private FieldReference getHelperColumn(Fill node, Scope scope, FillPolicy fillMethod) {
FieldReference helperColumn;
if (node.getIndex().isPresent()) {
long ordinal = node.getIndex().get().getParsedValue();
if (ordinal < 1 || ordinal > scope.getRelationType().getVisibleFieldCount()) {
throw new SemanticException(
String.format("LINEAR FILL position %s is not in select list", ordinal));
String.format(
"%s FILL position %s is not in select list", fillMethod.name(), ordinal));
} else if (!isTimestampType(
scope.getRelationType().getFieldByIndex((int) ordinal).getType())) {
scope.getRelationType().getFieldByIndex((int) ordinal - 1).getType())) {
throw new SemanticException(
String.format(
"Type of helper column for LINEAR FILL should only be TIMESTAMP, but type of the column you specify is %s",
scope.getRelationType().getFieldByIndex((int) ordinal).getType()));
"Type of helper column for %s FILL should only be TIMESTAMP, but type of the column you specify is %s",
fillMethod.name(),
scope.getRelationType().getFieldByIndex((int) ordinal - 1).getType()));
} else {
helperColumn = new FieldReference(toIntExact(ordinal - 1));
}
Expand All @@ -2179,7 +2210,9 @@ private FieldReference getHelperColumn(Fill node, Scope scope) {
}
if (index == -1) {
throw new SemanticException(
"Cannot infer the helper column for LINEAR FILL, there exists no column whose type is TIMESTAMP");
String.format(
"Cannot infer the helper column for %s FILL, there exists no column whose type is TIMESTAMP",
fillMethod.name()));
}
helperColumn = new FieldReference(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected Binary visitLiteral(Literal node, Void context) {

@Override
protected Binary visitBooleanLiteral(BooleanLiteral node, Void context) {
return new Binary(BytesUtils.boolToBytes(node.getValue()));
return new Binary(String.valueOf(node.getValue()), charset);
}

@Override
Expand All @@ -60,7 +60,7 @@ protected Binary visitLongLiteral(LongLiteral node, Void context) {

@Override
protected Binary visitDoubleLiteral(DoubleLiteral node, Void context) {
return new Binary(BytesUtils.doubleToBytes(node.getValue()));
return new Binary(String.valueOf(node.getValue()), charset);
}

@Override
Expand All @@ -70,7 +70,7 @@ protected Binary visitStringLiteral(StringLiteral node, Void context) {

@Override
protected Binary visitBinaryLiteral(BinaryLiteral node, Void context) {
return new Binary(node.toHexString(), charset);
return new Binary(BytesUtils.parseBlobByteArrayToString(node.getValue()), charset);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected Long visitLongLiteral(LongLiteral node, Void context) {

@Override
protected Long visitDoubleLiteral(DoubleLiteral node, Void context) {
return null;
return (long) node.getValue();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public QueryPlanner(
public RelationPlan plan(Query query) {
PlanBuilder builder = planQueryBody(query.getQueryBody());

builder = fill(builder, query.getFill());

// TODO result is :input[0], :input[1], :input[2]
List<Analysis.SelectExpression> selectExpressions = analysis.getSelectExpressions(query);
List<Expression> outputs =
Expand All @@ -132,8 +134,6 @@ public RelationPlan plan(Query query) {
builder.appendProjections(
Iterables.concat(orderBy, outputs), symbolAllocator, queryContext);
}

builder = fill(builder, query.getFill());
Optional<OrderingScheme> orderingScheme =
orderingScheme(builder, query.getOrderBy(), analysis.getOrderByExpressions(query));
builder = sort(builder, orderingScheme);
Expand Down Expand Up @@ -166,6 +166,23 @@ public RelationPlan plan(QuerySpecification node) {
}

List<Expression> outputs = outputExpressions(selectExpressions);

if (node.getFill().isPresent()) {
// Add projections for the outputs of SELECT, but stack them on top of the ones from the FROM
// clause so both are visible
// when resolving the ORDER BY clause.
builder = builder.appendProjections(outputs, symbolAllocator, queryContext);
// The new scope is the composite of the fields from the FROM and SELECT clause (local nested
// scopes). Fields from the bottom of
// the scope stack need to be placed first to match the expected layout for nested scopes.
List<Symbol> newFields = new ArrayList<>(builder.getTranslations().getFieldSymbolsList());

outputs.stream().map(builder::translate).forEach(newFields::add);

builder = builder.withScope(analysis.getScope(node.getFill().get()), newFields);
builder = fill(builder, node.getFill());
}

if (node.getOrderBy().isPresent()) {
// ORDER BY requires outputs of SELECT to be visible.
// For queries with aggregation, it also requires grouping keys and translated aggregations.
Expand Down Expand Up @@ -201,7 +218,6 @@ public RelationPlan plan(QuerySpecification node) {
Iterables.concat(orderBy, outputs), symbolAllocator, queryContext);
}

builder = fill(builder, node.getFill());
Optional<OrderingScheme> orderingScheme =
orderingScheme(builder, node.getOrderBy(), analysis.getOrderByExpressions(node));
builder = sort(builder, orderingScheme);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;

Expand Down Expand Up @@ -142,6 +143,9 @@ public List<PlanNode> visitOutput(OutputNode node, PlanContext context) {

@Override
public List<PlanNode> visitFill(FillNode node, PlanContext context) {
if (!(node instanceof ValueFillNode)) {
context.clearExpectedOrderingScheme();
}
List<PlanNode> childrenNodes = node.getChild().accept(this, context);
OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
if (childOrdering != null) {
Expand All @@ -154,6 +158,7 @@ public List<PlanNode> visitFill(FillNode node, PlanContext context) {
}

node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, childrenNodes));
context.setHasSeenFill(true);
return Collections.singletonList(node);
}

Expand Down Expand Up @@ -227,8 +232,7 @@ public List<PlanNode> visitProject(ProjectNode node, PlanContext context) {

@Override
public List<PlanNode> visitTopK(TopKNode node, PlanContext context) {
context.expectedOrderingScheme = node.getOrderingScheme();
context.hasSortProperty = true;
context.setExpectedOrderingScheme(node.getOrderingScheme());
nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());

checkArgument(
Expand Down Expand Up @@ -259,8 +263,7 @@ public List<PlanNode> visitTopK(TopKNode node, PlanContext context) {

@Override
public List<PlanNode> visitSort(SortNode node, PlanContext context) {
context.expectedOrderingScheme = node.getOrderingScheme();
context.hasSortProperty = true;
context.setExpectedOrderingScheme(node.getOrderingScheme());
nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());

List<PlanNode> childrenNodes = node.getChild().accept(this, context);
Expand All @@ -285,8 +288,7 @@ public List<PlanNode> visitSort(SortNode node, PlanContext context) {

@Override
public List<PlanNode> visitStreamSort(StreamSortNode node, PlanContext context) {
context.expectedOrderingScheme = node.getOrderingScheme();
context.hasSortProperty = true;
context.setExpectedOrderingScheme(node.getOrderingScheme());
nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());

List<PlanNode> childrenNodes = node.getChild().accept(this, context);
Expand Down Expand Up @@ -825,6 +827,7 @@ public List<PlanNode> visitTableDeviceFetch(

public static class PlanContext {
final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
boolean hasSeenFill = false;
boolean hasExchangeNode = false;
boolean hasSortProperty = false;
OrderingScheme expectedOrderingScheme;
Expand All @@ -837,5 +840,19 @@ public PlanContext() {
public NodeDistribution getNodeDistribution(PlanNodeId nodeId) {
return this.nodeDistributionMap.get(nodeId);
}

public void clearExpectedOrderingScheme() {
expectedOrderingScheme = null;
hasSortProperty = false;
}

public void setExpectedOrderingScheme(OrderingScheme expectedOrderingScheme) {
this.expectedOrderingScheme = expectedOrderingScheme;
hasSortProperty = true;
}

public void setHasSeenFill(boolean hasSeenFill) {
this.hasSeenFill = hasSeenFill;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException {
}

public static PreviousFillNode deserialize(ByteBuffer byteBuffer) {
boolean isNull = ReadWriteIOUtils.readBool(byteBuffer);
boolean hasValue = ReadWriteIOUtils.readBool(byteBuffer);
TimeDuration timeDuration = null;
if (!isNull) {
if (hasValue) {
timeDuration = TimeDuration.deserialize(byteBuffer);
}
isNull = ReadWriteIOUtils.readBool(byteBuffer);
hasValue = ReadWriteIOUtils.readBool(byteBuffer);
Symbol helperColumn = null;
if (!isNull) {
if (hasValue) {
helperColumn = Symbol.deserialize(byteBuffer);
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
Expand Down
Loading

0 comments on commit 133d41b

Please sign in to comment.