groupings();
-
-}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/SurrogateLogicalPlan.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/SurrogateLogicalPlan.java
new file mode 100644
index 0000000000000..96a64452ea762
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/SurrogateLogicalPlan.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.logical;
+
+/**
+ * Interface signaling to the planner that the declaring plan should be replaced with the surrogate plan.
+ * This usually occurs for predefined commands that get "normalized" into a more generic form.
+ * @see org.elasticsearch.xpack.esql.expression.SurrogateExpression
+ */
+public interface SurrogateLogicalPlan {
+ /**
+ * Returns the plan that should replace the declaring plan.
+ */
+ LogicalPlan surrogate();
+}
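For orientation: a hedged sketch of how a planner rule could apply this interface. The rule shown is illustrative, not part of this diff; it assumes the `transformUp(Function)` tree API that this change uses elsewhere (e.g. in InlineJoin.replaceStub below).

    // Sketch: normalize any SurrogateLogicalPlan node into its generic form.
    // The substitution itself is all the interface promises; where exactly the
    // planner runs this rule is an assumption here.
    LogicalPlan normalized = plan.transformUp(
        p -> p instanceof SurrogateLogicalPlan surrogate ? surrogate.surrogate() : p
    );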
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
new file mode 100644
index 0000000000000..87c9db1db4807
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.logical.join;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockUtils;
+import org.elasticsearch.xpack.esql.core.expression.Alias;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.Project;
+import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
+import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Specialized type of join where the source of the left and right plans is the same. The plans themselves can contain different nodes,
+ * however at their core both rely on the same source.
+ * Furthermore, this type of join indicates that the right side performs a subquery identical to the left side - meaning its result is
+ * required before joining with the left side.
+ *
+ * This helps the model since any transformation applied to the source needs to show up on both sides of the join. Due to the
+ * immutability of the tree (which uses value instead of reference semantics), even if the same node instance were used, a transformation
+ * applied on one side (which would create a new source) would not be reflected on the other side (which would still use the old source
+ * instance).
+ * This dedicated class handles that by replacing the source of the right side with a StubRelation that simply copies the output of the
+ * source, making the plan easy to serialize/deserialize as well as to traverse.
+ */
+public class InlineJoin extends Join {
+ public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+ LogicalPlan.class,
+ "InlineJoin",
+ InlineJoin::readFrom
+ );
+
+ /**
+ * Replaces the source of the target plan with a stub preserving the output of the source plan.
+ */
+ public static LogicalPlan stubSource(UnaryPlan sourcePlan, LogicalPlan target) {
+ return sourcePlan.replaceChild(new StubRelation(sourcePlan.source(), target.output()));
+ }
+
+ /**
+ * Replaces the stubbed source with the actual source.
+ */
+ public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) {
+ return stubbed.transformUp(StubRelation.class, stubRelation -> source);
+ }
+
+ /**
+ * TODO: perform better planning
+ * Keep the join in place or replace it with a projection in case no grouping is necessary.
+ */
+ public static LogicalPlan inlineData(InlineJoin target, LocalRelation data) {
+ if (target.config().matchFields().isEmpty()) {
+ List<Attribute> schema = data.output();
+ Block[] blocks = data.supplier().get();
+ List<Alias> aliases = new ArrayList<>(schema.size());
+ for (int i = 0; i < schema.size(); i++) {
+ Attribute attr = schema.get(i);
+ aliases.add(new Alias(attr.source(), attr.name(), Literal.of(attr, BlockUtils.toJavaObject(blocks[i], 0))));
+ }
+ LogicalPlan left = target.left();
+ return new Project(target.source(), left, CollectionUtils.combine(left.output(), aliases));
+ } else {
+ return target.replaceRight(data);
+ }
+ }
+
+ public InlineJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) {
+ super(source, left, right, config);
+ }
+
+ public InlineJoin(
+ Source source,
+ LogicalPlan left,
+ LogicalPlan right,
+ JoinType type,
+ List<Attribute> matchFields,
+ List<Attribute> leftFields,
+ List<Attribute> rightFields
+ ) {
+ super(source, left, right, type, matchFields, leftFields, rightFields);
+ }
+
+ private static InlineJoin readFrom(StreamInput in) throws IOException {
+ PlanStreamInput planInput = (PlanStreamInput) in;
+ Source source = Source.readFrom(planInput);
+ LogicalPlan left = in.readNamedWriteable(LogicalPlan.class);
+ LogicalPlan right = in.readNamedWriteable(LogicalPlan.class);
+ JoinConfig config = new JoinConfig(in);
+ return new InlineJoin(source, left, replaceStub(left, right), config);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return ENTRY.name;
+ }
+
+ @Override
+ protected NodeInfo<InlineJoin> info() {
+ // Do not just add the JoinConfig as a whole - this would prevent correctly registering the
+ // expressions and references.
+ JoinConfig config = config();
+ return NodeInfo.create(
+ this,
+ InlineJoin::new,
+ left(),
+ right(),
+ config.type(),
+ config.matchFields(),
+ config.leftFields(),
+ config.rightFields()
+ );
+ }
+
+ @Override
+ public Join replaceChildren(LogicalPlan left, LogicalPlan right) {
+ return new InlineJoin(source(), left, right, config());
+ }
+}
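Taken together, stubSource and replaceStub form a round trip over the shared source. A hedged sketch (assuming, as stubSource requires, that the right side is a UnaryPlan sitting directly on top of the shared source):

    // Minimal sketch - not part of this change.
    static LogicalPlan stubRoundTrip(LogicalPlan left, UnaryPlan right) {
        // Swap the shared source on the right for a StubRelation that only
        // carries the left side's output attributes - cheap to serialize and traverse.
        LogicalPlan stubbedRight = InlineJoin.stubSource(right, left);
        // ... the stubbed right side travels over the wire or gets optimized ...
        // Afterwards (as readFrom above does) the real source is put back.
        return InlineJoin.replaceStub(left, stubbedRight);
    }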
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java
index e920028f04cb9..f9be61ed2c8d7 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java
@@ -61,7 +61,7 @@ public Join(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
- source().writeTo(out);
+ Source.EMPTY.writeTo(out);
out.writeNamedWriteable(left());
out.writeNamedWriteable(right());
config.writeTo(out);
@@ -76,11 +76,6 @@ public JoinConfig config() {
return config;
}
- @Override
- protected AttributeSet computeReferences() {
- return Expressions.references(config.leftFields()).combine(Expressions.references(config.rightFields()));
- }
-
@Override
protected NodeInfo<Join> info() {
// Do not just add the JoinConfig as a whole - this would prevent correctly registering the
@@ -98,10 +93,6 @@ protected NodeInfo<Join> info() {
}
@Override
- public Join replaceChildren(List<LogicalPlan> newChildren) {
- return new Join(source(), newChildren.get(0), newChildren.get(1), config);
- }
-
public Join replaceChildren(LogicalPlan left, LogicalPlan right) {
return new Join(source(), left, right, config);
}
@@ -126,7 +117,7 @@ public static List<Attribute> computeOutput(List<Attribute> leftOutput, List<At
case LEFT -> {
// Right side becomes nullable.
List<Attribute> fieldsAddedFromRight = removeCollisionsWithMatchFields(rightOutput, matchFieldSet, matchFieldNames);
- yield mergeOutputAttributes(makeNullable(makeReference(fieldsAddedFromRight)), leftOutput);
+ yield mergeOutputAttributes(fieldsAddedFromRight, leftOutput);
}
default -> throw new UnsupportedOperationException("Other JOINs than LEFT not supported");
};
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/StubRelation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/StubRelation.java
new file mode 100644
index 0000000000000..4f04024d61d46
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/StubRelation.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.logical.join;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Collections.emptyList;
+
+/**
+ * Synthetic {@link LogicalPlan} used by the planner to indicate that the child plan is defined elsewhere in the tree.
+ * Essentially it refers to another node in the plan and acts as a stand-in for that relation's output.
+ * Used for duplicating parts of the plan without having to clone the nodes.
+ */
+public class StubRelation extends LeafPlan {
+ public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+ LogicalPlan.class,
+ "StubRelation",
+ StubRelation::new
+ );
+
+ private final List<Attribute> output;
+
+ public StubRelation(Source source, List<Attribute> output) {
+ super(source);
+ this.output = output;
+ }
+
+ public StubRelation(StreamInput in) throws IOException {
+ this(Source.readFrom((PlanStreamInput) in), emptyList());
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ Source.EMPTY.writeTo(out);
+ }
+
+ @Override
+ public List<Attribute> output() {
+ return output;
+ }
+
+ @Override
+ public boolean expressionsResolved() {
+ return true;
+ }
+
+ @Override
+ protected NodeInfo<StubRelation> info() {
+ return NodeInfo.create(this, StubRelation::new, output);
+ }
+
+ @Override
+ public String commandName() {
+ return "";
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(StubRelation.class, output);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return ENTRY.name;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ StubRelation other = (StubRelation) obj;
+ return Objects.equals(output, other.output());
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplier.java
index 8bcf5c472b2d0..c076a23891bd8 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplier.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplier.java
@@ -17,10 +17,10 @@
/**
* A {@link LocalSupplier} that contains already filled {@link Block}s.
*/
-class ImmediateLocalSupplier implements LocalSupplier {
+public class ImmediateLocalSupplier implements LocalSupplier {
private final Block[] blocks;
- ImmediateLocalSupplier(Block[] blocks) {
+ public ImmediateLocalSupplier(Block[] blocks) {
this.blocks = blocks;
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/BinaryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/BinaryExec.java
new file mode 100644
index 0000000000000..6f200bad17a72
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/BinaryExec.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.physical;
+
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+public abstract class BinaryExec extends PhysicalPlan {
+
+ private final PhysicalPlan left, right;
+
+ protected BinaryExec(Source source, PhysicalPlan left, PhysicalPlan right) {
+ super(source, Arrays.asList(left, right));
+ this.left = left;
+ this.right = right;
+ }
+
+ @Override
+ public final BinaryExec replaceChildren(List<PhysicalPlan> newChildren) {
+ return replaceChildren(newChildren.get(0), newChildren.get(1));
+ }
+
+ protected abstract BinaryExec replaceChildren(PhysicalPlan newLeft, PhysicalPlan newRight);
+
+ public PhysicalPlan left() {
+ return left;
+ }
+
+ public PhysicalPlan right() {
+ return right;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ Source.EMPTY.writeTo(out);
+ out.writeNamedWriteable(left);
+ out.writeNamedWriteable(right);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(left, right);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ BinaryExec other = (BinaryExec) obj;
+ return Objects.equals(left, other.left) && Objects.equals(right, other.right);
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExec.java
index 7594c971b7ffc..5b1ee14642dbe 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExec.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExec.java
@@ -111,6 +111,10 @@ public PhysicalPlan estimateRowSize(State state) {
: new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
}
+ public FragmentExec withFragment(LogicalPlan fragment) {
+ return Objects.equals(fragment, this.fragment) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
+ }
+
public FragmentExec withFilter(QueryBuilder filter) {
return Objects.equals(filter, this.esFilter) ? this : new FragmentExec(source(), fragment, filter, estimatedRowSize, reducer);
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java
index 5b83c4d95cabf..4574c3720f8ee 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java
@@ -22,14 +22,13 @@
import java.util.Objects;
import java.util.Set;
-public class HashJoinExec extends UnaryExec implements EstimatesRowSize {
+public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
PhysicalPlan.class,
"HashJoinExec",
HashJoinExec::new
);
- private final LocalSourceExec joinData;
private final List<Attribute> matchFields;
private final List<Attribute> leftFields;
private final List<Attribute> rightFields;
@@ -38,15 +37,14 @@ public class HashJoinExec extends UnaryExec implements EstimatesRowSize {
public HashJoinExec(
Source source,
- PhysicalPlan child,
- LocalSourceExec hashData,
+ PhysicalPlan left,
+ PhysicalPlan hashData,
List<Attribute> matchFields,
List<Attribute> leftFields,
List<Attribute> rightFields,
List<Attribute> output
) {
- super(source, child);
- this.joinData = hashData;
+ super(source, left, hashData);
this.matchFields = matchFields;
this.leftFields = leftFields;
this.rightFields = rightFields;
@@ -54,8 +52,7 @@ public HashJoinExec(
}
private HashJoinExec(StreamInput in) throws IOException {
- super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(PhysicalPlan.class));
- this.joinData = new LocalSourceExec(in);
+ super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(PhysicalPlan.class), in.readNamedWriteable(PhysicalPlan.class));
this.matchFields = in.readNamedWriteableCollectionAsList(Attribute.class);
this.leftFields = in.readNamedWriteableCollectionAsList(Attribute.class);
this.rightFields = in.readNamedWriteableCollectionAsList(Attribute.class);
@@ -64,9 +61,7 @@ private HashJoinExec(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
- source().writeTo(out);
- out.writeNamedWriteable(child());
- joinData.writeTo(out);
+ super.writeTo(out);
out.writeNamedWriteableCollection(matchFields);
out.writeNamedWriteableCollection(leftFields);
out.writeNamedWriteableCollection(rightFields);
@@ -78,8 +73,8 @@ public String getWriteableName() {
return ENTRY.name;
}
- public LocalSourceExec joinData() {
- return joinData;
+ public PhysicalPlan joinData() {
+ return right();
}
public List<Attribute> matchFields() {
@@ -97,7 +92,7 @@ public List<Attribute> rightFields() {
public Set<Attribute> addedFields() {
if (lazyAddedFields == null) {
lazyAddedFields = outputSet();
- lazyAddedFields.removeAll(child().output());
+ lazyAddedFields.removeAll(left().output());
}
return lazyAddedFields;
}
@@ -113,19 +108,25 @@ public List<Attribute> output() {
return output;
}
+ @Override
+ public AttributeSet inputSet() {
+ // TODO: this is a hack until qualifiers land since the right side is always materialized
+ return left().outputSet();
+ }
+
@Override
protected AttributeSet computeReferences() {
return Expressions.references(leftFields);
}
@Override
- public HashJoinExec replaceChild(PhysicalPlan newChild) {
- return new HashJoinExec(source(), newChild, joinData, matchFields, leftFields, rightFields, output);
+ public HashJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
+ return new HashJoinExec(source(), left, right, matchFields, leftFields, rightFields, output);
}
@Override
protected NodeInfo<? extends PhysicalPlan> info() {
- return NodeInfo.create(this, HashJoinExec::new, child(), joinData, matchFields, leftFields, rightFields, output);
+ return NodeInfo.create(this, HashJoinExec::new, left(), right(), matchFields, leftFields, rightFields, output);
}
@Override
@@ -140,8 +141,7 @@ public boolean equals(Object o) {
return false;
}
HashJoinExec hash = (HashJoinExec) o;
- return joinData.equals(hash.joinData)
- && matchFields.equals(hash.matchFields)
+ return matchFields.equals(hash.matchFields)
&& leftFields.equals(hash.leftFields)
&& rightFields.equals(hash.rightFields)
&& output.equals(hash.output);
@@ -149,6 +149,6 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), joinData, matchFields, leftFields, rightFields, output);
+ return Objects.hash(super.hashCode(), matchFields, leftFields, rightFields, output);
}
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/PhysicalPlan.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/PhysicalPlan.java
index 9ddcd97218069..ecf78908d6d3e 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/PhysicalPlan.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/PhysicalPlan.java
@@ -43,6 +43,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
ProjectExec.ENTRY,
RowExec.ENTRY,
ShowExec.ENTRY,
+ SubqueryExec.ENTRY,
TopNExec.ENTRY
);
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/SubqueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/SubqueryExec.java
new file mode 100644
index 0000000000000..adc84f06a939e
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/SubqueryExec.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.physical;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Physical plan representing a subquery, meaning a section of the plan that needs to be executed independently.
+ */
+public class SubqueryExec extends UnaryExec {
+
+ public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+ PhysicalPlan.class,
+ "SubqueryExec",
+ SubqueryExec::new
+ );
+
+ public SubqueryExec(Source source, PhysicalPlan child) {
+ super(source, child);
+ }
+
+ private SubqueryExec(StreamInput in) throws IOException {
+ super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(PhysicalPlan.class));
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ source().writeTo(out);
+ out.writeNamedWriteable(child());
+ }
+
+ @Override
+ public SubqueryExec replaceChild(PhysicalPlan newChild) {
+ return new SubqueryExec(source(), newChild);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return ENTRY.name;
+ }
+
+ @Override
+ protected NodeInfo<? extends PhysicalPlan> info() {
+ return NodeInfo.create(this, SubqueryExec::new, child());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (super.equals(o) == false) return false;
+ SubqueryExec that = (SubqueryExec) o;
+ return Objects.equals(child(), that.child());
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+}
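This hunk only introduces the node itself; its wiring into the larger plan is not shown in this diff. As a hedged illustration of intent, a planner rule could wrap a plan section like so:

    // Assumed usage: mark a section so it runs independently and its
    // materialized result is fed back into the enclosing plan.
    static PhysicalPlan markIndependent(PhysicalPlan section) {
        return new SubqueryExec(section.source(), section);
    }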
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
index dc732258d9fa5..0d0b8dda5fc74 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
@@ -496,18 +496,19 @@ private PhysicalOperation planEnrich(EnrichExec enrich, LocalExecutionPlannerCon
}
private PhysicalOperation planHashJoin(HashJoinExec join, LocalExecutionPlannerContext context) {
- PhysicalOperation source = plan(join.child(), context);
+ PhysicalOperation source = plan(join.left(), context);
int positionsChannel = source.layout.numberOfChannels();
Layout.Builder layoutBuilder = source.layout.builder();
for (Attribute f : join.output()) {
- if (join.child().outputSet().contains(f)) {
+ if (join.left().outputSet().contains(f)) {
continue;
}
layoutBuilder.append(f);
}
Layout layout = layoutBuilder.build();
- Block[] localData = join.joinData().supplier().get();
+ LocalSourceExec localSourceExec = (LocalSourceExec) join.joinData();
+ Block[] localData = localSourceExec.supplier().get();
RowInTableLookupOperator.Key[] keys = new RowInTableLookupOperator.Key[join.leftFields().size()];
int[] blockMapping = new int[join.leftFields().size()];
@@ -515,8 +516,9 @@ private PhysicalOperation planHashJoin(HashJoinExec join, LocalExecutionPlannerC
Attribute left = join.leftFields().get(k);
Attribute right = join.rightFields().get(k);
Block localField = null;
- for (int l = 0; l < join.joinData().output().size(); l++) {
- if (join.joinData().output().get(l).name().equals((((NamedExpression) right).name()))) {
+ List<Attribute> output = join.joinData().output();
+ for (int l = 0; l < output.size(); l++) {
+ if (output.get(l).name().equals(right.name())) {
localField = localData[l];
}
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java
deleted file mode 100644
index a8f820c8ef3fd..0000000000000
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.planner;
-
-import org.elasticsearch.common.lucene.BytesRefs;
-import org.elasticsearch.compute.aggregation.AggregatorMode;
-import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
-import org.elasticsearch.xpack.esql.core.expression.Attribute;
-import org.elasticsearch.xpack.esql.core.expression.Literal;
-import org.elasticsearch.xpack.esql.core.tree.Source;
-import org.elasticsearch.xpack.esql.core.type.DataType;
-import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
-import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
-import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
-import org.elasticsearch.xpack.esql.plan.logical.Dissect;
-import org.elasticsearch.xpack.esql.plan.logical.Enrich;
-import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
-import org.elasticsearch.xpack.esql.plan.logical.Eval;
-import org.elasticsearch.xpack.esql.plan.logical.Filter;
-import org.elasticsearch.xpack.esql.plan.logical.Grok;
-import org.elasticsearch.xpack.esql.plan.logical.Limit;
-import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
-import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
-import org.elasticsearch.xpack.esql.plan.logical.Project;
-import org.elasticsearch.xpack.esql.plan.logical.Row;
-import org.elasticsearch.xpack.esql.plan.logical.TopN;
-import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
-import org.elasticsearch.xpack.esql.plan.logical.join.Join;
-import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
-import org.elasticsearch.xpack.esql.plan.logical.join.JoinType;
-import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
-import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
-import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
-import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
-import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
-import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
-import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
-import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
-import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
-import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
-import org.elasticsearch.xpack.esql.plan.physical.GrokExec;
-import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
-import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
-import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
-import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
-import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
-import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
-import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
-import org.elasticsearch.xpack.esql.plan.physical.RowExec;
-import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
-import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
-import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * This class is part of the planner
- *
- * Translates the logical plan into a physical plan. This is where we start to decide what will be executed on the data nodes and what
- * will be executed on the coordinator nodes. This step creates {@link org.elasticsearch.xpack.esql.plan.physical.FragmentExec} instances,
- * which represent logical plan fragments to be sent to the data nodes and {@link org.elasticsearch.xpack.esql.plan.physical.ExchangeExec}
- * instances, which represent data being sent back from the data nodes to the coordinating node.
- */
-public class Mapper {
-
- private final EsqlFunctionRegistry functionRegistry;
- private final boolean localMode; // non-coordinator (data node) mode
-
- public Mapper(EsqlFunctionRegistry functionRegistry) {
- this.functionRegistry = functionRegistry;
- localMode = false;
- }
-
- public Mapper(boolean localMode) {
- this.functionRegistry = null;
- this.localMode = localMode;
- }
-
- public PhysicalPlan map(LogicalPlan p) {
- //
- // Leaf Node
- //
-
- // Source
- if (p instanceof EsRelation esRelation) {
- return localMode ? new EsSourceExec(esRelation) : new FragmentExec(p);
- }
-
- if (p instanceof Row row) {
- return new RowExec(row.source(), row.fields());
- }
-
- if (p instanceof LocalRelation local) {
- return new LocalSourceExec(local.source(), local.output(), local.supplier());
- }
-
- // Commands
- if (p instanceof ShowInfo showInfo) {
- return new ShowExec(showInfo.source(), showInfo.output(), showInfo.values());
- }
-
- //
- // Unary Plan
- //
- if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
- // When we have remote enrich, we want to put it under FragmentExec, so it would be executed remotely.
- // We're only going to do it on the coordinator node.
- // The way we're going to do it is as follows:
- // 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
- // 2. Put this Enrich under it, removing everything that was below it previously.
- // 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
- // FragmentExec.
- // 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich.
- // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
-
- var child = map(enrich.child());
- AtomicBoolean hasFragment = new AtomicBoolean(false);
-
- var childTransformed = child.transformUp((f) -> {
- // Once we reached FragmentExec, we stuff our Enrich under it
- if (f instanceof FragmentExec) {
- hasFragment.set(true);
- return new FragmentExec(p);
- }
- if (f instanceof EnrichExec enrichExec) {
- // It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
- assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
- return enrichExec.child();
- }
- if (f instanceof UnaryExec unaryExec) {
- if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
- return f;
- } else {
- return unaryExec.child();
- }
- }
- // Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
- return f;
- });
-
- if (hasFragment.get()) {
- return childTransformed;
- }
- }
-
- if (p instanceof UnaryPlan ua) {
- var child = map(ua.child());
- if (child instanceof FragmentExec) {
- // COORDINATOR enrich must not be included to the fragment as it has to be executed on the coordinating node
- if (p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
- assert localMode == false : "coordinator enrich must not be included to a fragment and re-planned locally";
- child = addExchangeForFragment(enrich.child(), child);
- return map(enrich, child);
- }
- // in case of a fragment, push to it any current streaming operator
- if (isPipelineBreaker(p) == false) {
- return new FragmentExec(p);
- }
- }
- return map(ua, child);
- }
-
- if (p instanceof BinaryPlan bp) {
- var left = map(bp.left());
- var right = map(bp.right());
-
- if (left instanceof FragmentExec) {
- if (right instanceof FragmentExec) {
- throw new EsqlIllegalArgumentException("can't plan binary [" + p.nodeName() + "]");
- }
- // in case of a fragment, push to it any current streaming operator
- return new FragmentExec(p);
- }
- if (right instanceof FragmentExec) {
- // in case of a fragment, push to it any current streaming operator
- return new FragmentExec(p);
- }
- return map(bp, left, right);
- }
-
- throw new EsqlIllegalArgumentException("unsupported logical plan node [" + p.nodeName() + "]");
- }
-
- static boolean isPipelineBreaker(LogicalPlan p) {
- return p instanceof Aggregate || p instanceof TopN || p instanceof Limit || p instanceof OrderBy;
- }
-
- private PhysicalPlan map(UnaryPlan p, PhysicalPlan child) {
- //
- // Pipeline operators
- //
- if (p instanceof Filter f) {
- return new FilterExec(f.source(), child, f.condition());
- }
-
- if (p instanceof Project pj) {
- return new ProjectExec(pj.source(), child, pj.projections());
- }
-
- if (p instanceof Eval eval) {
- return new EvalExec(eval.source(), child, eval.fields());
- }
-
- if (p instanceof Dissect dissect) {
- return new DissectExec(dissect.source(), child, dissect.input(), dissect.parser(), dissect.extractedFields());
- }
-
- if (p instanceof Grok grok) {
- return new GrokExec(grok.source(), child, grok.input(), grok.parser(), grok.extractedFields());
- }
-
- if (p instanceof Enrich enrich) {
- return new EnrichExec(
- enrich.source(),
- child,
- enrich.mode(),
- enrich.policy().getType(),
- enrich.matchField(),
- BytesRefs.toString(enrich.policyName().fold()),
- enrich.policy().getMatchField(),
- enrich.concreteIndices(),
- enrich.enrichFields()
- );
- }
-
- if (p instanceof MvExpand mvExpand) {
- MvExpandExec result = new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
- if (mvExpand.limit() != null) {
- // MvExpand could have an inner limit
- // see PushDownAndCombineLimits rule
- return new LimitExec(result.source(), result, new Literal(Source.EMPTY, mvExpand.limit(), DataType.INTEGER));
- }
- return result;
- }
-
- //
- // Pipeline breakers
- //
- if (p instanceof Limit limit) {
- return map(limit, child);
- }
-
- if (p instanceof OrderBy o) {
- return map(o, child);
- }
-
- if (p instanceof TopN topN) {
- return map(topN, child);
- }
-
- if (p instanceof Aggregate aggregate) {
- return map(aggregate, child);
- }
-
- throw new EsqlIllegalArgumentException("unsupported logical plan node [" + p.nodeName() + "]");
- }
-
- private PhysicalPlan map(Aggregate aggregate, PhysicalPlan child) {
- List<Attribute> intermediateAttributes = AbstractPhysicalOperationProviders.intermediateAttributes(
- aggregate.aggregates(),
- aggregate.groupings()
- );
- // in local mode the only aggregate that can appear is the partial side under an exchange
- if (localMode) {
- child = aggExec(aggregate, child, AggregatorMode.INITIAL, intermediateAttributes);
- }
- // otherwise create both sides of the aggregate (for parallelism purposes), if no fragment is present
- // TODO: might be easier long term to end up with just one node and split if necessary instead of doing that always at this stage
- else {
- child = addExchangeForFragment(aggregate, child);
- // exchange was added - use the intermediates for the output
- if (child instanceof ExchangeExec exchange) {
- child = new ExchangeExec(child.source(), intermediateAttributes, true, exchange.child());
- }
- // if no exchange was added, create the partial aggregate
- else {
- child = aggExec(aggregate, child, AggregatorMode.INITIAL, intermediateAttributes);
- }
-
- // regardless, always add the final agg
- child = aggExec(aggregate, child, AggregatorMode.FINAL, intermediateAttributes);
- }
-
- return child;
- }
-
- private static AggregateExec aggExec(
- Aggregate aggregate,
- PhysicalPlan child,
- AggregatorMode aggMode,
- List<Attribute> intermediateAttributes
- ) {
- return new AggregateExec(
- aggregate.source(),
- child,
- aggregate.groupings(),
- aggregate.aggregates(),
- aggMode,
- intermediateAttributes,
- null
- );
- }
-
- private PhysicalPlan map(Limit limit, PhysicalPlan child) {
- child = addExchangeForFragment(limit, child);
- return new LimitExec(limit.source(), child, limit.limit());
- }
-
- private PhysicalPlan map(OrderBy o, PhysicalPlan child) {
- child = addExchangeForFragment(o, child);
- return new OrderExec(o.source(), child, o.order());
- }
-
- private PhysicalPlan map(TopN topN, PhysicalPlan child) {
- child = addExchangeForFragment(topN, child);
- return new TopNExec(topN.source(), child, topN.order(), topN.limit(), null);
- }
-
- private PhysicalPlan addExchangeForFragment(LogicalPlan logical, PhysicalPlan child) {
- // in case of fragment, preserve the streaming operator (order-by, limit or topN) for local replanning
- // no need to do it for an aggregate since it gets split
- // and clone it as a physical node along with the exchange
- if (child instanceof FragmentExec) {
- child = new FragmentExec(logical);
- child = new ExchangeExec(child.source(), child);
- }
- return child;
- }
-
- private PhysicalPlan map(BinaryPlan p, PhysicalPlan lhs, PhysicalPlan rhs) {
- if (p instanceof Join join) {
- PhysicalPlan hash = tryHashJoin(join, lhs, rhs);
- if (hash != null) {
- return hash;
- }
- }
- throw new EsqlIllegalArgumentException("unsupported logical plan node [" + p.nodeName() + "]");
- }
-
- private PhysicalPlan tryHashJoin(Join join, PhysicalPlan lhs, PhysicalPlan rhs) {
- JoinConfig config = join.config();
- if (config.type() != JoinType.LEFT) {
- return null;
- }
- if (rhs instanceof LocalSourceExec local) {
- return new HashJoinExec(
- join.source(),
- lhs,
- local,
- config.matchFields(),
- config.leftFields(),
- config.rightFields(),
- join.output()
- );
- }
- return null;
- }
-}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
index 7868984d6b6e2..1758edb386e59 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
@@ -49,6 +49,8 @@
import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
+import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
+import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.stats.SearchStats;
@@ -88,7 +90,7 @@ public static PhysicalPlan dataNodeReductionPlan(LogicalPlan plan, PhysicalPlan
if (pipelineBreakers.isEmpty() == false) {
UnaryPlan pipelineBreaker = (UnaryPlan) pipelineBreakers.get(0);
if (pipelineBreaker instanceof TopN) {
- Mapper mapper = new Mapper(true);
+ LocalMapper mapper = new LocalMapper();
var physicalPlan = EstimatesRowSize.estimateRowSize(0, mapper.map(plan));
return physicalPlan.collectFirstChildren(TopNExec.class::isInstance).get(0);
} else if (pipelineBreaker instanceof Limit limit) {
@@ -96,7 +98,7 @@ public static PhysicalPlan dataNodeReductionPlan(LogicalPlan plan, PhysicalPlan
} else if (pipelineBreaker instanceof OrderBy order) {
return new OrderExec(order.source(), unused, order.order());
} else if (pipelineBreaker instanceof Aggregate) {
- Mapper mapper = new Mapper(true);
+ LocalMapper mapper = new LocalMapper();
var physicalPlan = EstimatesRowSize.estimateRowSize(0, mapper.map(plan));
var aggregate = (AggregateExec) physicalPlan.collectFirstChildren(AggregateExec.class::isInstance).get(0);
return aggregate.withMode(AggregatorMode.INITIAL);
@@ -151,13 +153,13 @@ public static PhysicalPlan localPlan(
LocalLogicalPlanOptimizer logicalOptimizer,
LocalPhysicalPlanOptimizer physicalOptimizer
) {
- final Mapper mapper = new Mapper(true);
+ final LocalMapper localMapper = new LocalMapper();
var isCoordPlan = new Holder<>(Boolean.TRUE);
var localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> {
isCoordPlan.set(Boolean.FALSE);
var optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
- var physicalFragment = mapper.map(optimizedFragment);
+ var physicalFragment = localMapper.map(optimizedFragment);
var filter = f.esFilter();
if (filter != null) {
physicalFragment = physicalFragment.transformUp(
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java
new file mode 100644
index 0000000000000..ceffae704cff0
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner.mapper;
+
+import org.elasticsearch.compute.aggregation.AggregatorMode;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
+import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
+import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
+import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
+import org.elasticsearch.xpack.esql.plan.logical.Limit;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
+import org.elasticsearch.xpack.esql.plan.logical.TopN;
+import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
+import org.elasticsearch.xpack.esql.plan.logical.join.Join;
+import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
+import org.elasticsearch.xpack.esql.plan.logical.join.JoinType;
+import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
+import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
+import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
+import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
+import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
+import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
+
+import java.util.List;
+
+/**
+ * Maps a (local) logical plan into a (local) physical plan. This class is the equivalent of {@link Mapper} but for data nodes.
+ */
+public class LocalMapper {
+
+ public PhysicalPlan map(LogicalPlan p) {
+
+ if (p instanceof LeafPlan leaf) {
+ return mapLeaf(leaf);
+ }
+
+ if (p instanceof UnaryPlan unary) {
+ return mapUnary(unary);
+ }
+
+ if (p instanceof BinaryPlan binary) {
+ return mapBinary(binary);
+ }
+
+ return MapperUtils.unsupported(p);
+ }
+
+ private PhysicalPlan mapLeaf(LeafPlan leaf) {
+ if (leaf instanceof EsRelation esRelation) {
+ return new EsSourceExec(esRelation);
+ }
+
+ return MapperUtils.mapLeaf(leaf);
+ }
+
+ private PhysicalPlan mapUnary(UnaryPlan unary) {
+ PhysicalPlan mappedChild = map(unary.child());
+
+ //
+ // Pipeline breakers
+ //
+
+ if (unary instanceof Aggregate aggregate) {
+ List<Attribute> intermediate = MapperUtils.intermediateAttributes(aggregate);
+ return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
+ }
+
+ if (unary instanceof Limit limit) {
+ return new LimitExec(limit.source(), mappedChild, limit.limit());
+ }
+
+ if (unary instanceof OrderBy o) {
+ return new OrderExec(o.source(), mappedChild, o.order());
+ }
+
+ if (unary instanceof TopN topN) {
+ return new TopNExec(topN.source(), mappedChild, topN.order(), topN.limit(), null);
+ }
+
+ //
+ // Pipeline operators
+ //
+
+ return MapperUtils.mapUnary(unary, mappedChild);
+ }
+
+ private PhysicalPlan mapBinary(BinaryPlan binary) {
+ // special handling for inlinejoin - join + subquery which has to be executed first (async) and replaced by its result
+ if (binary instanceof Join join) {
+ JoinConfig config = join.config();
+ if (config.type() != JoinType.LEFT) {
+ throw new EsqlIllegalArgumentException("unsupported join type [" + config.type() + "]");
+ }
+
+ PhysicalPlan left = map(binary.left());
+ PhysicalPlan right = map(binary.right());
+
+ if (right instanceof LocalSourceExec == false) {
+ throw new EsqlIllegalArgumentException("right side of a join must be a local source");
+ }
+
+ return new HashJoinExec(
+ join.source(),
+ left,
+ right,
+ config.matchFields(),
+ config.leftFields(),
+ config.rightFields(),
+ join.output()
+ );
+ }
+
+ return MapperUtils.unsupported(binary);
+ }
+}
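Usage mirrors the PlannerUtils.localPlan change above: the data node locally optimizes the fragment it received, then maps it with the LocalMapper (here `fragmentExec` stands for the FragmentExec being re-planned):

    LocalMapper localMapper = new LocalMapper();
    var optimizedFragment = logicalOptimizer.localOptimize(fragmentExec.fragment());
    PhysicalPlan physicalFragment = localMapper.map(optimizedFragment);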
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
new file mode 100644
index 0000000000000..b717af650b7a6
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner.mapper;
+
+import org.elasticsearch.compute.aggregation.AggregatorMode;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.util.Holder;
+import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
+import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
+import org.elasticsearch.xpack.esql.plan.logical.Enrich;
+import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
+import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
+import org.elasticsearch.xpack.esql.plan.logical.Limit;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
+import org.elasticsearch.xpack.esql.plan.logical.TopN;
+import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
+import org.elasticsearch.xpack.esql.plan.logical.join.Join;
+import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
+import org.elasticsearch.xpack.esql.plan.logical.join.JoinType;
+import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
+import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
+import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
+import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
+import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
+import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
+import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
+import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
+import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
+
+import java.util.List;
+
+/**
+ * This class is part of the planner
+ *
+ * Translates the logical plan into a physical plan. This is where we start to decide what will be executed on the data nodes and what
+ * will be executed on the coordinator nodes. This step creates {@link org.elasticsearch.xpack.esql.plan.physical.FragmentExec} instances,
+ * which represent logical plan fragments to be sent to the data nodes and {@link org.elasticsearch.xpack.esql.plan.physical.ExchangeExec}
+ * instances, which represent data being sent back from the data nodes to the coordinating node.
+ */
+public class Mapper {
+
+ public PhysicalPlan map(LogicalPlan p) {
+
+ if (p instanceof LeafPlan leaf) {
+ return mapLeaf(leaf);
+ }
+
+ if (p instanceof UnaryPlan unary) {
+ return mapUnary(unary);
+ }
+
+ if (p instanceof BinaryPlan binary) {
+ return mapBinary(binary);
+ }
+
+ return MapperUtils.unsupported(p);
+ }
+
+ private PhysicalPlan mapLeaf(LeafPlan leaf) {
+ if (leaf instanceof EsRelation esRelation) {
+ return new FragmentExec(esRelation);
+ }
+
+ return MapperUtils.mapLeaf(leaf);
+ }
+
+ private PhysicalPlan mapUnary(UnaryPlan unary) {
+ PhysicalPlan mappedChild = map(unary.child());
+
+ //
+ // TODO - this is hard to follow and needs reworking
+ // https://github.com/elastic/elasticsearch/issues/115897
+ //
+ if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
+ // When we have remote enrich, we want to put it under FragmentExec, so it would be executed remotely.
+ // We're only going to do it on the coordinator node.
+ // The way we're going to do it is as follows:
+ // 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
+ // 2. Put this Enrich under it, removing everything that was below it previously.
+ // 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
+ // FragmentExec.
+ // 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich.
+ // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
+ Holder<Boolean> hasFragment = new Holder<>(false);
+
+ var childTransformed = mappedChild.transformUp(f -> {
+ // Once we reached FragmentExec, we stuff our Enrich under it
+ if (f instanceof FragmentExec) {
+ hasFragment.set(true);
+ return new FragmentExec(enrich);
+ }
+ if (f instanceof EnrichExec enrichExec) {
+ // It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
+ assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
+ return enrichExec.child();
+ }
+ if (f instanceof UnaryExec unaryExec) {
+ if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
+ return f;
+ } else {
+ return unaryExec.child();
+ }
+ }
+ // Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
+ return f;
+ });
+
+ if (hasFragment.get()) {
+ return childTransformed;
+ }
+ }
+
+ if (mappedChild instanceof FragmentExec) {
+ // COORDINATOR enrich must not be included to the fragment as it has to be executed on the coordinating node
+ if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
+ mappedChild = addExchangeForFragment(enrich.child(), mappedChild);
+ return MapperUtils.mapUnary(unary, mappedChild);
+ }
+ // in case of a fragment, push to it any current streaming operator
+ if (isPipelineBreaker(unary) == false) {
+ return new FragmentExec(unary);
+ }
+ }
+
+ //
+ // Pipeline breakers
+ //
+ if (unary instanceof Aggregate aggregate) {
+ List<Attribute> intermediate = MapperUtils.intermediateAttributes(aggregate);
+
+ // create both sides of the aggregate (for parallelism purposes), if no fragment is present
+ // TODO: might be easier long term to end up with just one node and split if necessary instead of doing that always at this
+ // stage
+ mappedChild = addExchangeForFragment(aggregate, mappedChild);
+
+ // exchange was added - use the intermediates for the output
+ if (mappedChild instanceof ExchangeExec exchange) {
+ mappedChild = new ExchangeExec(mappedChild.source(), intermediate, true, exchange.child());
+ }
+ // if no exchange was added (aggregation happening on the coordinator), create the initial agg
+ else {
+ mappedChild = MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
+ }
+
+ // always add the final/reduction agg
+ return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.FINAL, intermediate);
+ }
+
+ if (unary instanceof Limit limit) {
+ mappedChild = addExchangeForFragment(limit, mappedChild);
+ return new LimitExec(limit.source(), mappedChild, limit.limit());
+ }
+
+ if (unary instanceof OrderBy o) {
+ mappedChild = addExchangeForFragment(o, mappedChild);
+ return new OrderExec(o.source(), mappedChild, o.order());
+ }
+
+ if (unary instanceof TopN topN) {
+ mappedChild = addExchangeForFragment(topN, mappedChild);
+ return new TopNExec(topN.source(), mappedChild, topN.order(), topN.limit(), null);
+ }
+
+ //
+ // Pipeline operators
+ //
+ return MapperUtils.mapUnary(unary, mappedChild);
+ }
+
+ private PhysicalPlan mapBinary(BinaryPlan bp) {
+ if (bp instanceof Join join) {
+ JoinConfig config = join.config();
+ if (config.type() != JoinType.LEFT) {
+ throw new EsqlIllegalArgumentException("unsupported join type [" + config.type() + "]");
+ }
+
+ PhysicalPlan left = map(bp.left());
+
+ // only broadcast joins supported for now - hence push down as a streaming operator
+ if (left instanceof FragmentExec fragment) {
+ return new FragmentExec(bp);
+ }
+
+ PhysicalPlan right = map(bp.right());
+ // no fragment means lookup
+ if (right instanceof LocalSourceExec localData) {
+ return new HashJoinExec(
+ join.source(),
+ left,
+ localData,
+ config.matchFields(),
+ config.leftFields(),
+ config.rightFields(),
+ join.output()
+ );
+ }
+ }
+
+ return MapperUtils.unsupported(bp);
+ }
+
+ public static boolean isPipelineBreaker(LogicalPlan p) {
+ return p instanceof Aggregate || p instanceof TopN || p instanceof Limit || p instanceof OrderBy;
+ }
+
+ private PhysicalPlan addExchangeForFragment(LogicalPlan logical, PhysicalPlan child) {
+ // in case of fragment, preserve the streaming operator (order-by, limit or topN) for local replanning
+ // no need to do it for an aggregate since it gets split
+ // and clone it as a physical node along with the exchange
+ if (child instanceof FragmentExec) {
+ child = new FragmentExec(logical);
+ child = new ExchangeExec(child.source(), child);
+ }
+ return child;
+ }
+}
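To make the coordinator/data-node split concrete, a sketch comparing the two mappers on the same logical fragment. The plan shapes are traced from the code above; the `fragment` variable is illustrative:

    // Given a logical fragment of the shape Aggregate(EsRelation):
    //
    // The coordinator-side Mapper keeps the source as a FragmentExec and splits
    // the aggregation around the exchange:
    //   AggregateExec[FINAL](ExchangeExec(FragmentExec(Aggregate(EsRelation))))
    PhysicalPlan coordinatorPlan = new Mapper().map(fragment);

    // The LocalMapper, run on the data node during local re-planning, produces
    // only the INITIAL half and reads straight from the index:
    //   AggregateExec[INITIAL](EsSourceExec)
    PhysicalPlan dataNodePlan = new LocalMapper().map(fragment);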
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java
new file mode 100644
index 0000000000000..213e33f3712b1
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner.mapper;
+
+import org.elasticsearch.common.lucene.BytesRefs;
+import org.elasticsearch.compute.aggregation.AggregatorMode;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
+import org.elasticsearch.xpack.esql.plan.logical.Dissect;
+import org.elasticsearch.xpack.esql.plan.logical.Enrich;
+import org.elasticsearch.xpack.esql.plan.logical.Eval;
+import org.elasticsearch.xpack.esql.plan.logical.Filter;
+import org.elasticsearch.xpack.esql.plan.logical.Grok;
+import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
+import org.elasticsearch.xpack.esql.plan.logical.Project;
+import org.elasticsearch.xpack.esql.plan.logical.Row;
+import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
+import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
+import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
+import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
+import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
+import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
+import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
+import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
+import org.elasticsearch.xpack.esql.plan.physical.GrokExec;
+import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
+import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
+import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
+import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
+import org.elasticsearch.xpack.esql.plan.physical.RowExec;
+import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
+import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
+
+import java.util.List;
+
+/**
+ * Class for sharing code across Mappers.
+ */
+class MapperUtils {
+ private MapperUtils() {}
+
+ static PhysicalPlan mapLeaf(LeafPlan p) {
+ if (p instanceof Row row) {
+ return new RowExec(row.source(), row.fields());
+ }
+
+ if (p instanceof LocalRelation local) {
+ return new LocalSourceExec(local.source(), local.output(), local.supplier());
+ }
+
+ // Commands
+ if (p instanceof ShowInfo showInfo) {
+ return new ShowExec(showInfo.source(), showInfo.output(), showInfo.values());
+ }
+
+ return unsupported(p);
+ }
+
+ static PhysicalPlan mapUnary(UnaryPlan p, PhysicalPlan child) {
+ if (p instanceof Filter f) {
+ return new FilterExec(f.source(), child, f.condition());
+ }
+
+ if (p instanceof Project pj) {
+ return new ProjectExec(pj.source(), child, pj.projections());
+ }
+
+ if (p instanceof Eval eval) {
+ return new EvalExec(eval.source(), child, eval.fields());
+ }
+
+ if (p instanceof Dissect dissect) {
+ return new DissectExec(dissect.source(), child, dissect.input(), dissect.parser(), dissect.extractedFields());
+ }
+
+ if (p instanceof Grok grok) {
+ return new GrokExec(grok.source(), child, grok.input(), grok.parser(), grok.extractedFields());
+ }
+
+ if (p instanceof Enrich enrich) {
+ return new EnrichExec(
+ enrich.source(),
+ child,
+ enrich.mode(),
+ enrich.policy().getType(),
+ enrich.matchField(),
+ BytesRefs.toString(enrich.policyName().fold()),
+ enrich.policy().getMatchField(),
+ enrich.concreteIndices(),
+ enrich.enrichFields()
+ );
+ }
+
+ if (p instanceof MvExpand mvExpand) {
+ MvExpandExec result = new MvExpandExec(mvExpand.source(), child, mvExpand.target(), mvExpand.expanded());
+ if (mvExpand.limit() != null) {
+ // MvExpand could have an inner limit
+ // see PushDownAndCombineLimits rule
+ return new LimitExec(result.source(), result, new Literal(Source.EMPTY, mvExpand.limit(), DataType.INTEGER));
+ }
+ return result;
+ }
+
+ return unsupported(p);
+ }
+
+ static List<Attribute> intermediateAttributes(Aggregate aggregate) {
+ List<Attribute> intermediateAttributes = AbstractPhysicalOperationProviders.intermediateAttributes(
+ aggregate.aggregates(),
+ aggregate.groupings()
+ );
+ return intermediateAttributes;
+ }
+
+ static AggregateExec aggExec(Aggregate aggregate, PhysicalPlan child, AggregatorMode aggMode, List<Attribute> intermediateAttributes) {
+ return new AggregateExec(
+ aggregate.source(),
+ child,
+ aggregate.groupings(),
+ aggregate.aggregates(),
+ aggMode,
+ intermediateAttributes,
+ null
+ );
+ }
+
+ static PhysicalPlan unsupported(LogicalPlan p) {
+ throw new EsqlIllegalArgumentException("unsupported logical plan node [" + p.nodeName() + "]");
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
index 193930cdf711d..507339ba145fa 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
@@ -39,8 +39,8 @@
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
-import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;
+import org.elasticsearch.xpack.esql.session.EsqlSession.PlanRunner;
import org.elasticsearch.xpack.esql.session.Result;
import java.io.IOException;
@@ -49,7 +49,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executor;
-import java.util.function.BiConsumer;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
@@ -171,10 +170,10 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
 remoteClusterService.isSkipUnavailable(clusterAlias),
request.includeCCSMetadata()
);
- BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase = (physicalPlan, resultListener) -> computeService.execute(
+ PlanRunner planRunner = (plan, resultListener) -> computeService.execute(
sessionId,
(CancellableTask) task,
- physicalPlan,
+ plan,
configuration,
executionInfo,
resultListener
@@ -186,7 +185,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
 listener.map(result -> toResponse(task, request, configuration, result))
);
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/CcsUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/CcsUtils.java
new file mode 100644
index 0000000000000..a9314e6f65d87
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/CcsUtils.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.session;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.RemoteClusterAware;
+import org.elasticsearch.transport.RemoteTransportException;
+import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
+import org.elasticsearch.xpack.esql.analysis.Analyzer;
+import org.elasticsearch.xpack.esql.index.IndexResolution;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
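+/**
+ * Utilities for keeping the {@link EsqlExecutionInfo} of a cross-cluster (CCS) query up to date
+ * during planning and execution.
+ */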
+class CcsUtils {
+
+ private CcsUtils() {}
+
+ /**
+ * ActionListener that receives LogicalPlan or error from logical planning.
+ * Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so
+ * the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error.
+ */
+ abstract static class CssPartialErrorsActionListener implements ActionListener<LogicalPlan> {
+ private final EsqlExecutionInfo executionInfo;
+ private final ActionListener<Result> listener;
+
+ CssPartialErrorsActionListener(EsqlExecutionInfo executionInfo, ActionListener<Result> listener) {
+ this.executionInfo = executionInfo;
+ this.listener = listener;
+ }
+
+ /**
+ * Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
+ *
+ * For cases where field-caps had no indices to search and the remotes were unavailable, we
+ * return an empty successful response (200) if all remotes are marked with skip_unavailable=true.
+ *
+ * Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match
+ * on any of the requested clusters.
+ */
+ private boolean returnSuccessWithEmptyResult(Exception e) {
+ if (executionInfo.isCrossClusterSearch() == false) {
+ return false;
+ }
+
+ if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) {
+ for (String clusterAlias : executionInfo.clusterAliases()) {
+ if (executionInfo.isSkipUnavailable(clusterAlias) == false
+ && clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ if (returnSuccessWithEmptyResult(e)) {
+ executionInfo.markEndQuery();
+ Exception exceptionForResponse;
+ if (e instanceof ConnectTransportException) {
+ // when field-caps has no field info (since no clusters could be connected to or had matching indices)
+ // it just throws the first exception in its list, so this odd special handling is here to avoid
+ // having one specific remote alias name in all failure lists in the metadata response
+ exceptionForResponse = new RemoteTransportException(
+ "connect_transport_exception - unable to connect to remote cluster",
+ null
+ );
+ } else {
+ exceptionForResponse = e;
+ }
+ for (String clusterAlias : executionInfo.clusterAliases()) {
+ executionInfo.swapCluster(clusterAlias, (k, v) -> {
+ EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(
+ executionInfo.overallTook()
+ ).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0);
+ if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
+ // never mark local cluster as skipped
+ builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
+ } else {
+ builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
+ // add this exception to the failures list only if there is no failure already recorded there
+ if (v.getFailures() == null || v.getFailures().size() == 0) {
+ builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
+ }
+ }
+ return builder.build();
+ });
+ }
+ listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
+ } else {
+ listener.onFailure(e);
+ }
+ }
+ }
+
+ // visible for testing
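+ // Rebuilds the index expression from all non-SKIPPED clusters, qualifying remote indices as "clusterAlias:index"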
+ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
+ StringBuilder sb = new StringBuilder();
+ for (String clusterAlias : executionInfo.clusterAliases()) {
+ EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
+ if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
+ if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
+ sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
+ } else {
+ String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
+ for (String index : indexExpression.split(",")) {
+ sb.append(clusterAlias).append(':').append(index).append(',');
+ }
+ }
+ }
+ }
+
+ if (sb.length() > 0) {
+ return sb.substring(0, sb.length() - 1);
+ } else {
+ return "";
+ }
+ }
+
+ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map<String, FieldCapabilitiesFailure> unavailable) {
+ for (Map.Entry<String, FieldCapabilitiesFailure> entry : unavailable.entrySet()) {
+ String clusterAlias = entry.getKey();
+ boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable();
+ RemoteTransportException e = new RemoteTransportException(
+ Strings.format("Remote cluster [%s] (with setting skip_unavailable=%s) is not available", clusterAlias, skipUnavailable),
+ entry.getValue().getException()
+ );
+ if (skipUnavailable) {
+ execInfo.swapCluster(
+ clusterAlias,
+ (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED)
+ .setTotalShards(0)
+ .setSuccessfulShards(0)
+ .setSkippedShards(0)
+ .setFailedShards(0)
+ .setFailures(List.of(new ShardSearchFailure(e)))
+ .build()
+ );
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ // visible for testing
+ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
+ Set<String> clustersWithResolvedIndices = new HashSet<>();
+ // determine missing clusters
+ for (String indexName : indexResolution.get().indexNameWithModes().keySet()) {
+ clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName));
+ }
+ Set<String> clustersRequested = executionInfo.clusterAliases();
+ Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
+ clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet());
+ /*
+ * These are clusters in the original request that are not present in the field-caps response. They were
+ * specified with an index or indices that do not exist, so the search on that cluster is done.
+ * Mark it as SKIPPED with 0 shards searched and took=0.
+ */
+ for (String c : clustersWithNoMatchingIndices) {
+ // TODO: in a follow-on PR, throw a VerificationException (400 status code) for local and remotes with skip_unavailable=false if
+ // they were requested with one or more concrete indices
+ // for now we never mark the local cluster as SKIPPED
+ final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c)
+ ? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL
+ : EsqlExecutionInfo.Cluster.Status.SKIPPED;
+ executionInfo.swapCluster(
+ c,
+ (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status)
+ .setTook(new TimeValue(0))
+ .setTotalShards(0)
+ .setSuccessfulShards(0)
+ .setSkippedShards(0)
+ .setFailedShards(0)
+ .build()
+ );
+ }
+ }
+
+ // visible for testing
+ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
+ // TODO: this logic assumes a single phase execution model, so it may need to be altered once INLINESTATS is made CCS compatible
+ if (execInfo.isCrossClusterSearch()) {
+ execInfo.markEndPlanning();
+ for (String clusterAlias : execInfo.clusterAliases()) {
+ EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias);
+ if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
+ execInfo.swapCluster(
+ clusterAlias,
+ (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime())
+ .setTotalShards(0)
+ .setSuccessfulShards(0)
+ .setSkippedShards(0)
+ .setFailedShards(0)
+ .build()
+ );
+ }
+ }
+ }
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index 1e78f454b7531..a4405c32ff91c 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -7,16 +7,15 @@
package org.elasticsearch.xpack.esql.session;
-import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
-import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
@@ -25,9 +24,6 @@
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
-import org.elasticsearch.transport.ConnectTransportException;
-import org.elasticsearch.transport.RemoteClusterAware;
-import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
@@ -62,24 +58,24 @@
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.Keep;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.plan.logical.Phased;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
+import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
+import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
+import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
-import org.elasticsearch.xpack.esql.planner.Mapper;
+import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.stats.PlanningMetrics;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -91,6 +87,14 @@ public class EsqlSession {
private static final Logger LOGGER = LogManager.getLogger(EsqlSession.class);
+ /**
+ * Interface for running the underlying plan.
+ * Abstracts away the underlying execution engine.
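+ * A typical implementation delegates to the compute service, e.g.
+ * {@code (plan, listener) -> computeService.execute(sessionId, task, plan, configuration, executionInfo, listener)}.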
+ */
+ public interface PlanRunner {
+ void run(PhysicalPlan plan, ActionListener<Result> listener);
+ }
+
private final String sessionId;
private final Configuration configuration;
private final IndexResolver indexResolver;
@@ -140,158 +144,107 @@ public String sessionId() {
/**
* Execute an ESQL request.
*/
- public void execute(
- EsqlQueryRequest request,
- EsqlExecutionInfo executionInfo,
- BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
- ActionListener listener
- ) {
+ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener listener) {
LOGGER.debug("ESQL query:\n{}", request.query());
analyzedPlan(
parse(request.query(), request.params()),
executionInfo,
- new LogicalPlanActionListener(request, executionInfo, runPhase, listener)
- );
- }
-
- /**
- * ActionListener that receives LogicalPlan or error from logical planning.
- * Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so
- * the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error.
- */
- class LogicalPlanActionListener implements ActionListener<LogicalPlan> {
- private final EsqlQueryRequest request;
- private final EsqlExecutionInfo executionInfo;
- private final BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase;
- private final ActionListener listener;
-
- LogicalPlanActionListener(
- EsqlQueryRequest request,
- EsqlExecutionInfo executionInfo,
- BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
- ActionListener listener
- ) {
- this.request = request;
- this.executionInfo = executionInfo;
- this.runPhase = runPhase;
- this.listener = listener;
- }
-
- @Override
- public void onResponse(LogicalPlan analyzedPlan) {
- executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(analyzedPlan), listener);
- }
-
- /**
- * Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
- *
- * For cases where field-caps had no indices to search and the remotes were unavailable, we
- * return an empty successful response (200) if all remotes are marked with skip_unavailable=true.
- *
- * Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match
- * on any of the requested clusters.
- */
- private boolean returnSuccessWithEmptyResult(Exception e) {
- if (executionInfo.isCrossClusterSearch() == false) {
- return false;
- }
-
- if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) {
- for (String clusterAlias : executionInfo.clusterAliases()) {
- if (executionInfo.isSkipUnavailable(clusterAlias) == false
- && clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
- return false;
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public void onFailure(Exception e) {
- if (returnSuccessWithEmptyResult(e)) {
- executionInfo.markEndQuery();
- Exception exceptionForResponse;
- if (e instanceof ConnectTransportException) {
- // when field-caps has no field info (since no clusters could be connected to or had matching indices)
- // it just throws the first exception in its list, so this odd special handling is here is to avoid
- // having one specific remote alias name in all failure lists in the metadata response
- exceptionForResponse = new RemoteTransportException(
- "connect_transport_exception - unable to connect to remote cluster",
- null
- );
- } else {
- exceptionForResponse = e;
- }
- for (String clusterAlias : executionInfo.clusterAliases()) {
- executionInfo.swapCluster(clusterAlias, (k, v) -> {
- EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(
- executionInfo.overallTook()
- ).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0);
- if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
- // never mark local cluster as skipped
- builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
- } else {
- builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
- // add this exception to the failures list only if there is no failure already recorded there
- if (v.getFailures() == null || v.getFailures().size() == 0) {
- builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
- }
- }
- return builder.build();
- });
+ new CcsUtils.CssPartialErrorsActionListener(executionInfo, listener) {
+ @Override
+ public void onResponse(LogicalPlan analyzedPlan) {
+ executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener);
}
- listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo));
- } else {
- listener.onFailure(e);
}
- }
+ );
}
/**
* Execute an analyzed plan. Most code should prefer calling {@link #execute} but
- * this is public for testing. See {@link Phased} for the sequence of operations.
+ * this is public for testing.
*/
public void executeOptimizedPlan(
EsqlQueryRequest request,
EsqlExecutionInfo executionInfo,
- BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
+ PlanRunner planRunner,
LogicalPlan optimizedPlan,
ActionListener listener
) {
- LogicalPlan firstPhase = Phased.extractFirstPhase(optimizedPlan);
- updateExecutionInfoAtEndOfPlanning(executionInfo);
- if (firstPhase == null) {
- runPhase.accept(logicalPlanToPhysicalPlan(optimizedPlan, request), listener);
+ PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
+ // TODO: this could be snuck into the underlying listener
+ CcsUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
+ // execute any potential subplans
+ executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener);
+ }
+
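+ // Pairs the physical subplan to execute with the logical node whose result it computes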
+ private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {};
+
+ private void executeSubPlans(
+ PhysicalPlan physicalPlan,
+ PlanRunner runner,
+ EsqlExecutionInfo executionInfo,
+ EsqlQueryRequest request,
+ ActionListener listener
+ ) {
+ List<PlanTuple> subplans = new ArrayList<>();
+
+ // Currently INLINESTATS is limited to being planned as a streaming operator, so its joins are present inside the fragment as logical plans
+ // Below they get collected, each translated into a separate coordinator-based plan, and the results 'broadcast' back as a local relation
+ physicalPlan.forEachUp(FragmentExec.class, f -> {
+ f.fragment().forEachUp(InlineJoin.class, ij -> {
+ // extract the right side of the plan and replace its source
+ LogicalPlan subplan = InlineJoin.replaceStub(ij.left(), ij.right());
+ // mark the new root node as optimized
+ subplan.setOptimized();
+ PhysicalPlan subqueryPlan = logicalPlanToPhysicalPlan(subplan, request);
+ subplans.add(new PlanTuple(subqueryPlan, ij.right()));
+ });
+ });
+
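+ // execute the subplans one at a time, inlining each result into the main plan before running it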
+ Iterator<PlanTuple> iterator = subplans.iterator();
+
+ // TODO: merge into one method
+ if (subplans.size() > 0) {
+ // code-path to execute subplans
+ executeSubPlan(new ArrayList<>(), physicalPlan, iterator, executionInfo, runner, listener);
} else {
- executePhased(new ArrayList<>(), optimizedPlan, request, executionInfo, firstPhase, runPhase, listener);
+ // execute main plan
+ runner.run(physicalPlan, listener);
}
}
- private void executePhased(
+ private void executeSubPlan(
List<DriverProfile> profileAccumulator,
- LogicalPlan mainPlan,
- EsqlQueryRequest request,
+ PhysicalPlan plan,
+ Iterator<PlanTuple> subPlanIterator,
EsqlExecutionInfo executionInfo,
- LogicalPlan firstPhase,
- BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
+ PlanRunner runner,
ActionListener listener
) {
- PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan(firstPhase), request);
- runPhase.accept(physicalPlan, listener.delegateFailureAndWrap((next, result) -> {
+ PlanTuple tuple = subPlanIterator.next();
+
+ runner.run(tuple.physical, listener.delegateFailureAndWrap((next, result) -> {
try {
profileAccumulator.addAll(result.profiles());
- LogicalPlan newMainPlan = optimizedPlan(Phased.applyResultsFromFirstPhase(mainPlan, physicalPlan.output(), result.pages()));
- LogicalPlan newFirstPhase = Phased.extractFirstPhase(newMainPlan);
- if (newFirstPhase == null) {
- PhysicalPlan finalPhysicalPlan = logicalPlanToPhysicalPlan(newMainPlan, request);
- runPhase.accept(finalPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
+ LocalRelation resultWrapper = resultToPlan(tuple.logical, result);
+
+ // replace the original logical plan with the backing result
+ final PhysicalPlan newPlan = plan.transformUp(FragmentExec.class, f -> {
+ LogicalPlan frag = f.fragment();
+ return f.withFragment(
+ frag.transformUp(
+ InlineJoin.class,
+ ij -> ij.right() == tuple.logical ? InlineJoin.inlineData(ij, resultWrapper) : ij
+ )
+ );
+ });
+ if (subPlanIterator.hasNext() == false) {
+ runner.run(newPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
profileAccumulator.addAll(finalResult.profiles());
finalListener.onResponse(new Result(finalResult.schema(), finalResult.pages(), profileAccumulator, executionInfo));
}));
} else {
- executePhased(profileAccumulator, newMainPlan, request, executionInfo, newFirstPhase, runPhase, next);
+ // continue executing the subplans
+ executeSubPlan(profileAccumulator, newPlan, subPlanIterator, executionInfo, runner, next);
}
} finally {
Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks)));
@@ -299,6 +252,14 @@ private void executePhased(
}));
}
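+ // Wraps the pages of a subplan result into a LocalRelation that replaces the InlineJoin's right-hand side in the main plan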
+ private LocalRelation resultToPlan(LogicalPlan plan, Result result) {
+ List<Page> pages = result.pages();
+ List<Attribute> schema = result.schema();
+ Block[] blocks = SessionUtils.fromPages(schema, pages);
+ return new LocalRelation(plan.source(), schema, LocalSupplier.of(blocks));
+ }
+
private LogicalPlan parse(String query, QueryParams params) {
var parsed = new EsqlParser().createStatement(query, params);
LOGGER.debug("Parsed logical plan:\n{}", parsed);
@@ -347,8 +308,8 @@ private void preAnalyze(
// TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index
// resolution to updateExecutionInfo
if (indexResolution.isValid()) {
- updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
- updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
+ CcsUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
+ CcsUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
@@ -422,7 +383,7 @@ private void preAnalyzeIndices(
}
// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
// based only on available clusters (which could now be an empty list)
- String indexExpressionToResolve = createIndexExpressionFromAvailableClusters(executionInfo);
+ String indexExpressionToResolve = CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo);
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())));
@@ -440,30 +401,6 @@ private void preAnalyzeIndices(
}
}
- // visible for testing
- static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
- StringBuilder sb = new StringBuilder();
- for (String clusterAlias : executionInfo.clusterAliases()) {
- EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
- if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
- if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
- sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
- } else {
- String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
- for (String index : indexExpression.split(",")) {
- sb.append(clusterAlias).append(':').append(index).append(',');
- }
- }
- }
- }
-
- if (sb.length() > 0) {
- return sb.substring(0, sb.length() - 1);
- } else {
- return "";
- }
- }
-
static Set<String> fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchFields) {
if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) {
// no explicit columns selection, for example "from employees"
@@ -607,86 +544,4 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
LOGGER.debug("Optimized physical plan:\n{}", plan);
return plan;
}
-
- static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map<String, FieldCapabilitiesFailure> unavailable) {
- for (Map.Entry<String, FieldCapabilitiesFailure> entry : unavailable.entrySet()) {
- String clusterAlias = entry.getKey();
- boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable();
- RemoteTransportException e = new RemoteTransportException(
- Strings.format("Remote cluster [%s] (with setting skip_unavailable=%s) is not available", clusterAlias, skipUnavailable),
- entry.getValue().getException()
- );
- if (skipUnavailable) {
- execInfo.swapCluster(
- clusterAlias,
- (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED)
- .setTotalShards(0)
- .setSuccessfulShards(0)
- .setSkippedShards(0)
- .setFailedShards(0)
- .setFailures(List.of(new ShardSearchFailure(e)))
- .build()
- );
- } else {
- throw e;
- }
- }
- }
-
- // visible for testing
- static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
- Set<String> clustersWithResolvedIndices = new HashSet<>();
- // determine missing clusters
- for (String indexName : indexResolution.get().indexNameWithModes().keySet()) {
- clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName));
- }
- Set<String> clustersRequested = executionInfo.clusterAliases();
- Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
- clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet());
- /*
- * These are clusters in the original request that are not present in the field-caps response. They were
- * specified with an index or indices that do not exist, so the search on that cluster is done.
- * Mark it as SKIPPED with 0 shards searched and took=0.
- */
- for (String c : clustersWithNoMatchingIndices) {
- // TODO: in a follow-on PR, throw a Verification(400 status code) for local and remotes with skip_unavailable=false if
- // they were requested with one or more concrete indices
- // for now we never mark the local cluster as SKIPPED
- final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c)
- ? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL
- : EsqlExecutionInfo.Cluster.Status.SKIPPED;
- executionInfo.swapCluster(
- c,
- (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status)
- .setTook(new TimeValue(0))
- .setTotalShards(0)
- .setSuccessfulShards(0)
- .setSkippedShards(0)
- .setFailedShards(0)
- .build()
- );
- }
- }
-
- // visible for testing
- static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
- // TODO: this logic assumes a single phase execution model, so it may need to altered once INLINESTATS is made CCS compatible
- if (execInfo.isCrossClusterSearch()) {
- execInfo.markEndPlanning();
- for (String clusterAlias : execInfo.clusterAliases()) {
- EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias);
- if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
- execInfo.swapCluster(
- clusterAlias,
- (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime())
- .setTotalShards(0)
- .setSuccessfulShards(0)
- .setSkippedShards(0)
- .setFailedShards(0)
- .build()
- );
- }
- }
- }
- }
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java
new file mode 100644
index 0000000000000..85abc635967a6
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.session;
+
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockUtils;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.planner.PlannerUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
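+/**
+ * Helper methods shared by the ESQL session for materializing intermediate query results.
+ */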
+public class SessionUtils {
+
+ private SessionUtils() {}
+
+ public static Block[] fromPages(List<Attribute> schema, List<Page> pages) {
+ // Limit ourselves to 1mb of results similar to LOOKUP for now.
+ long bytesUsed = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum();
+ if (bytesUsed > ByteSizeValue.ofMb(1).getBytes()) {
+ throw new IllegalArgumentException("first phase result too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb");
+ }
+ int positionCount = pages.stream().mapToInt(Page::getPositionCount).sum();
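+ // one builder per output column; each page's blocks get appended position by position below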
+ Block.Builder[] builders = new Block.Builder[schema.size()];
+ Block[] blocks;
+ try {
+ for (int b = 0; b < builders.length; b++) {
+ builders[b] = PlannerUtils.toElementType(schema.get(b).dataType())
+ .newBlockBuilder(positionCount, PlannerUtils.NON_BREAKING_BLOCK_FACTORY);
+ }
+ for (Page p : pages) {
+ for (int b = 0; b < builders.length; b++) {
+ builders[b].copyFrom(p.getBlock(b), 0, p.getPositionCount());
+ }
+ }
+ blocks = Block.Builder.buildAll(builders);
+ } finally {
+ Releasables.closeExpectNoException(builders);
+ }
+ return blocks;
+ }
+
+ public static List