Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Refactor Join inside the planner #115813

Merged
merged 10 commits into from
Oct 31, 2024

Conversation

costin
Copy link
Member

@costin costin commented Oct 29, 2024

Previously the Join was modeled as a unary node, embedding the right side as a local relationship inside the node but not exposed as a child. This caused a lot the associated methods (like references, output and inputSet) to misbehave and the physical plan rules to pick incorrect information, such as trying to extract the local relationship fields from the underlying source - the fix was to the local relationship fields as ReferenceAttribute (which of course had its own set of issues). Essentially Join was acting both as a source and as a streaming operator.

This PR looks to address this partially by:

  • refactoring Join into a proper binary node with left and right branches which are used for its references and input/outputSet.
  • refactoring InlineStats to prefer composition and move the Aggregate on the join right branch. This reuses the Aggregate resolution out of the box; in the process remove the Stats interface.
  • update some of the planner rules that only worked with Unary nodes.
  • refactor Mapper into (coordinator) Mapper and LocalMapper.
  • remove Phased interface by moving its functionality inside the planner (no need to unpack the phased classes, the join already indicates the two branches needed).
  • massage the Phased execution inside EsqlSession
  • improve FieldExtractor to handle binary nodes
  • fix incorrect references in Lookup
  • generalize ProjectAwayColumns rule

Relates #112266

Not all inline and lookup tests are passing:

  • 2 lookup fields are failing due to name clashes (qualifiers should fix this)
  • 7 or so inline failures with a similar issue

I've disabled the tests for now to have them around once we complete
adding the functionality.

Previously the Join was modeled as a unary node, embedding the right
side as a local relationship inside the node but not exposed as a child.
This caused a lot the associated methods (like references, output and
inputSet) to misbehave and the physical plan rules to pick incorrect
information, such as trying to extract the local relationship fields
from the underlying source - the fix was to the local relationship
fields as ReferenceAttribute (which of course had its own set of
issues). Essentially Join was acting both as a source and as a streaming
operator.

This PR looks to address this partially by:
- refactoring Join into a proper binary node with left and right
 branches which are used for its references and input/outputSet.
- refactoring InlineStats to prefer composition and move the Aggregate
 on the join right branch. This reuses the Aggregate resolution out of
 the box; in the process remove the Stats interface.
- update some of the planner rules that only worked with Unary nodes.
- refactor Mapper into (coordinator) Mapper and LocalMapper.
- remove Phased interface by moving its functionality inside the planner
(no need to unpack the phased classes, the join already indicates the
two branches needed).
- massage the Phased execution inside EsqlSession
- improve FieldExtractor to handle binary nodes
- fix incorrect references in Lookup
- generalize ProjectAwayColumns rule

Relates elastic#112266

Not all inline and lookup tests are passing:
- 2 lookup fields are failing due to name clashes (qualifiers should
 fix this)
- 7 or so inline failures with a similar issue

I've disabled the tests for now to have them around once we complete
 adding the functionality.
@costin costin added the :Analytics/ES|QL AKA ESQL label Oct 29, 2024
@elasticsearchmachine elasticsearchmachine added Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.0.0 labels Oct 29, 2024
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@@ -599,7 +600,10 @@ else if (Files.isDirectory(path)) {
Files.walkFileTree(path, EnumSet.allOf(FileVisitOption.class), 1, new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Regex.simpleMatch(filePattern, file.toString())) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix to make CsvTests work inside the IDE against individual files ("lookup.csv-spec" vs "*.csv-spec)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commented tests are failing - the plan is to revisit them once lookup join is properly added. Right now this is not a priority...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment so that these tests can easily be found later. As they are now, it's just an ignored set of tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only two tests are failing and the issue is name collision/merging in join.
Because there's no qualifier, given a field x we cannot determine at runtime from what side to extract the field - e.g.

FROM l | LOOKUP r ON k | KEEP a

during physical planning we know k is available on both l and r but have no idea whether a comes from l or r and thus cannot determine on what branch to place the field extractors.
There are different solutions to this approach but are outside of this PR - the most comprehensive one imo is name qualifiers but it's also the most complicated.

@@ -405,8 +405,8 @@ protected LogicalPlan doRule(LogicalPlan plan) {
childrenOutput.addAll(output);
}

if (plan instanceof Stats stats) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By exposing the Aggregate, there's no reason to have the Stats interface anymore.
The upside is the Aggregate resolution gets reused automatically (including nested filters, etc..)

@@ -47,7 +44,7 @@ public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry) {
this.indexResolver = indexResolver;
this.preAnalyzer = new PreAnalyzer();
this.functionRegistry = new EsqlFunctionRegistry();
this.mapper = new Mapper(functionRegistry);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

functionRegistry is not used inside the Mapper.

@@ -60,7 +57,7 @@ public void esql(
EnrichPolicyResolver enrichPolicyResolver,
EsqlExecutionInfo executionInfo,
IndicesExpressionGrouper indicesExpressionGrouper,
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduced a interface inside the generic BiConsumer to preserve the generics signature.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just renames

@@ -45,7 +44,7 @@ public PhysicalPlan apply(PhysicalPlan plan) {
Holder<AttributeSet> requiredAttributes = new Holder<>(plan.outputSet());

// This will require updating should we choose to have non-unary execution plans in the future.
return plan.transformDown(UnaryExec.class, currentPlanNode -> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the rule work across all type of nodes - this took a while to figure out.

@@ -40,7 +42,12 @@ public class InsertFieldExtraction extends Rule<PhysicalPlan, PhysicalPlan> {
public PhysicalPlan apply(PhysicalPlan plan) {
// apply the plan locally, adding a field extractor right before data is loaded
// by going bottom-up
plan = plan.transformUp(UnaryExec.class, p -> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the rule to work across all node types.

@@ -84,7 +84,7 @@
*/
public class LogicalPlanBuilder extends ExpressionBuilder {

private int queryDepth = 0;
interface PlanFactory extends Function<LogicalPlan, LogicalPlan> {}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this interface at the beginning since otherwise in case of merging failure, when the file gets messed up, IntelliJ gets very confused and marks the whole file full of errors. By declaring it up front, it properly picks it up and significantly reduces the errors making it easy to determine where the missing bracket is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few convenient methods.

* </p>
*/
public class InlineStats extends UnaryPlan implements NamedWriteable, Phased, Stats {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Phased is no longer needed since there are no longer embedded nodes - these are now exposed in the plan, as first-class expressions; it's up to the planner, mapper and session to properly address them.

Comment on lines -129 to 108
AttributeSet set = child().outputSet();
this.lazyOutput = mergeOutputAttributes(aggregate.output(), aggregate.child().output());
}
return lazyOutput;
}

// TODO: in case of inlinestats, the join key is always the grouping
private JoinConfig joinConfig() {
List<Expression> groupings = aggregate.groupings();
List<Attribute> namedGroupings = new ArrayList<>(groupings.size());
for (Expression g : groupings) {
namedGroupings.add(Expressions.attribute(g));
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part needs to be revisited and JoinConfig refactored to better incorporate the field resolution.

Comment on lines -80 to -82
protected AttributeSet computeReferences() {
return Expressions.references(config.leftFields()).combine(Expressions.references(config.rightFields()));
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already happening in the parent.

@@ -126,7 +117,7 @@ public static List<Attribute> computeOutput(List<Attribute> leftOutput, List<Att
case LEFT -> {
// Right side becomes nullable.
List<Attribute> fieldsAddedFromRight = removeCollisionsWithMatchFields(rightOutput, matchFieldSet, matchFieldNames);
yield mergeOutputAttributes(makeNullable(makeReference(fieldsAddedFromRight)), leftOutput);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this nifty workaround was used to prevent extraction from the source - now it's no longer needed (and furthermore creates problems in the planner).

Comment on lines -42 to +41
LocalSourceExec hashData,
PhysicalPlan left,
PhysicalPlan hashData,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the 'embedded' subplan, part of the overall plan.

/**
* Physical plan representing a subquery, meaning a section of the plan that needs to be executed independently.
*/
public class SubqueryExec extends UnaryExec {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to check but I believe we might not need this yet.

Comment on lines +178 to +200
private void executeSubPlans(
PhysicalPlan physicalPlan,
PlanRunner runner,
EsqlExecutionInfo executionInfo,
EsqlQueryRequest request,
ActionListener<Result> listener
) {
List<PlanTuple> subplans = new ArrayList<>();

// Currently the inlinestats are limited and supported as streaming operators, thus present inside the fragment as logical plans
// Below they get collected, translated into a separate, coordinator based plan and the results 'broadcasted' as a local relation
physicalPlan.forEachUp(FragmentExec.class, f -> {
f.fragment().forEachUp(InlineJoin.class, ij -> {
// extract the right side of the plan and replace its source
LogicalPlan subplan = InlineJoin.replaceStub(ij.left(), ij.right());
// mark the new root node as optimized
subplan.setOptimized();
PhysicalPlan subqueryPlan = logicalPlanToPhysicalPlan(subplan, request);
subplans.add(new PlanTuple(subqueryPlan, ij.right()));
});
});

Iterator<PlanTuple> iterator = subplans.iterator();
Copy link
Member Author

@costin costin Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

multi "phase" execution - currently highly tailored to InlineJoin.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Utils methods from Phased.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be resurrected as logical plan tests.

Copy link
Member Author

@costin costin Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@quux00 example of moving the CCS stuff outside EsqlSession. Essentially all static methods and the listener can easily be moved.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, it's mostly similar to what I did in my refactoring PR: https://github.com/elastic/elasticsearch/pull/115976/files#diff-4365e19b82cdd92b19ceba22a3178836bf06c2241f9ff4d1e9100fdef3b93dfd

So we can go with yours.

* Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so
* the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error.
*/
abstract static class CssPartialErrorsActionListener implements ActionListener<LogicalPlan> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@quux00 Moved the listener out, keeping only the necessary bits and letting the caller implement the onResult()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a slightly different approach in my refactoring PR: https://github.com/elastic/elasticsearch/pull/115976/files#diff-40060e2ec9003953a228c4a03bdc80a301949b0d4e3dccc1978798e33a992a73R146

but this works, so if you prefer your model, that's fine with me.

runPhase.accept(logicalPlanToPhysicalPlan(optimizedPlan, request), listener);
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
// TODO: this could be snuck into the underlying listener
CcsUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@quux00 let me know if this makes sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing to test is to make sure it doesn't throw assertion errors when running with INLINESTATS, since that can do planning twice and this only wants to be called once (until we make it work with the 2 phase execution model). If it passes that, then it's fine for now. Based on reading the code below I think we're OK since the recursion is in executeSubPlans.

Block[] blocks = SessionUtils.fromPages(schema, pages);
return new LocalRelation(plan.source(), schema, LocalSupplier.of(blocks));
}

private LogicalPlan parse(String query, QueryParams params) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@quux00 please double check this section.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, looks good. Same thing I did in my refactoring PR. As long as the various CrossCluster IT tests (like CrossClustersQueryIT) pass, you're good.

Copy link
Contributor

@astefan astefan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment so that these tests can easily be found later. As they are now, it's just an ignored set of tests.


@Override
public String commandName() {
return "<telemetry needs decoupling from the logical plan>";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion would be to create an issue and add a TODO here. I think this thing here will actually report a metric with this text string. CC @luigidellaquila

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the update to the TelemetryIT where a ghost STATS command is counted, separately from INLINESTATS
02fab88 (#115813)

package org.elasticsearch.xpack.esql.plan.logical;

/**
* Interface signaling to the planner that the declaring plan.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the javadoc is incomplete :-).

/**
* Class for sharing code across Mappers.
*/
class Common {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name doesn't say much and I think it could help a better one. Maybe MapperUtils?

// collect source attributes and add the extractor
var extractor = new FieldExtractExec(p.source(), p.child(), List.copyOf(missing));
p = p.replaceChild(extractor);
// identify child (for binary nodes) that exports doc values and place the field extractor there
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it confusing to name this "doc values". Isn't this about doc id? (EsQueryExec.DOC_ID_FIELD)

Copy link
Contributor

@quux00 quux00 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM (I only reviewed the parts flagged around the CCS refactorings.) There are a few other minor things I did my refactoring PR, but we can probably close that PR in favor of yours and I'll make the other minor tweaks in my next non-refactoring PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, it's mostly similar to what I did in my refactoring PR: https://github.com/elastic/elasticsearch/pull/115976/files#diff-4365e19b82cdd92b19ceba22a3178836bf06c2241f9ff4d1e9100fdef3b93dfd

So we can go with yours.

* Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so
* the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error.
*/
abstract static class CssPartialErrorsActionListener implements ActionListener<LogicalPlan> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a slightly different approach in my refactoring PR: https://github.com/elastic/elasticsearch/pull/115976/files#diff-40060e2ec9003953a228c4a03bdc80a301949b0d4e3dccc1978798e33a992a73R146

but this works, so if you prefer your model, that's fine with me.

runPhase.accept(logicalPlanToPhysicalPlan(optimizedPlan, request), listener);
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
// TODO: this could be snuck into the underlying listener
CcsUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing to test is to make sure it doesn't throw assertion errors when running with INLINESTATS, since that can do planning twice and this only wants to be called once (until we make it work with the 2 phase execution model). If it passes that, then it's fine for now. Based on reading the code below I think we're OK since the recursion is in executeSubPlans.

Block[] blocks = SessionUtils.fromPages(schema, pages);
return new LocalRelation(plan.source(), schema, LocalSupplier.of(blocks));
}

private LogicalPlan parse(String query, QueryParams params) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, looks good. Same thing I did in my refactoring PR. As long as the various CrossCluster IT tests (like CrossClustersQueryIT) pass, you're good.

@@ -45,7 +45,7 @@ public void testCreateIndexExpressionFromAvailableClusters() {
executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true));

String indexExpr = EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo);
String indexExpr = CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a follow on PR, I'll remove this test and create a CCSUtils test to house these tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @quux00 - hope I didn't waste too much of your time with the duplicate PR. I ended up doing them while working on the merging conflicts and made the pings hoping to draw your attention before too long.

I'll try to get this is in ASAP so you can continue to add your own modification in.

@costin costin force-pushed the esql/inlinestats-as-join branch 2 times, most recently from 06da516 to 02fab88 Compare October 31, 2024 02:12
@costin costin enabled auto-merge (squash) October 31, 2024 04:33
@costin costin merged commit 4ee98e8 into elastic:main Oct 31, 2024
16 checks passed
@costin costin deleted the esql/inlinestats-as-join branch October 31, 2024 05:35
@costin
Copy link
Member Author

costin commented Oct 31, 2024

Backport to 8.x through #116045

elasticsearchmachine pushed a commit that referenced this pull request Oct 31, 2024
* ESQL: Refactor Join inside the planner (#115813)

First PR that introduces a Join as a first class citizen in the planner.
Previously the Join was modeled as a unary node, embedding the right
side as a local relationship inside the node but not exposed as a child.
This caused a lot the associated methods (like references, output and
inputSet) to misbehave and the physical plan rules to pick incorrect
information, such as trying to extract the local relationship fields
from the underlying source - the fix was to the local relationship
fields as ReferenceAttribute (which of course had its own set of
issues). Essentially Join was acting both as a source and as a streaming
operator.

This PR looks to partially address this by:
- refactoring Join into a proper binary node with left and right
 branches which are used for its references and input/outputSet.
- refactoring InlineStats to prefer composition and move the Aggregate
 on the join right branch. This reuses the Aggregate resolution out of
 the box; in the process remove the Stats interface.
- update some of the planner rules that only worked with Unary nodes.
- refactor Mapper into (coordinator) Mapper and LocalMapper.
- remove Phased interface by moving its functionality inside the planner
(no need to unpack the phased classes, the join already indicates the
two branches needed).
- massage the Phased execution inside EsqlSession
- improve FieldExtractor to handle binary nodes
- fix incorrect references in Lookup
- generalize ProjectAwayColumns rule

Relates #112266

Not all inline and lookup tests are passing:
- 2 lookup fields are failing due to name clashes (qualifiers should
 fix this)
- 7 or so inline failures with a similar issue

I've disabled the tests for now to have them around once we complete
 adding the functionality.

(cherry picked from commit 4ee98e8)

* ES|QL: Mute test for #116003 (#116005)

(cherry picked from commit 681f509)

---------

Co-authored-by: Luigi Dell'Aquila <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Analytics/ES|QL AKA ESQL >refactoring Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.0.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants