Skip to content

Commit

Permalink
[CALCITE-4448] Use TableMacro user-defined table functions with Query…
Browse files Browse the repository at this point in the history
…ableTable
  • Loading branch information
vvysotskyi committed Jun 13, 2022
1 parent b0b3e3e commit d1cf4a3
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 157 deletions.
79 changes: 37 additions & 42 deletions core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.TableExpressionFactory;
import org.apache.calcite.materialize.Lattice;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
Expand All @@ -34,11 +35,8 @@
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.schema.ColumnStrategy;
import org.apache.calcite.schema.FilterableTable;
import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.Path;
import org.apache.calcite.schema.ProjectableFilterableTable;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
Expand Down Expand Up @@ -66,7 +64,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

Expand All @@ -77,7 +74,7 @@ public class RelOptTableImpl extends Prepare.AbstractPreparingTable {
private final @Nullable RelOptSchema schema;
private final RelDataType rowType;
private final @Nullable Table table;
private final @Nullable Function<Class, Expression> expressionFunction;
private final @Nullable TableExpressionFactory tableExpressionFactory;
private final ImmutableList<String> names;

/** Estimate for the row count, or null.
Expand All @@ -94,13 +91,13 @@ private RelOptTableImpl(
RelDataType rowType,
List<String> names,
@Nullable Table table,
@Nullable Function<Class, Expression> expressionFunction,
@Nullable TableExpressionFactory tableExpressionFactory,
@Nullable Double rowCount) {
this.schema = schema;
this.rowType = requireNonNull(rowType, "rowType");
this.names = ImmutableList.copyOf(names);
this.table = table; // may be null
this.expressionFunction = expressionFunction; // may be null
this.tableExpressionFactory = tableExpressionFactory; // may be null
this.rowCount = rowCount; // may be null
}

Expand All @@ -113,37 +110,61 @@ public static RelOptTableImpl create(
c -> expression, null);
}

@Deprecated // to be removed before 2.0
public static RelOptTableImpl create(
@Nullable RelOptSchema schema,
RelDataType rowType,
List<String> names,
Table table,
Expression expression) {
return create(schema, rowType, names, table, c -> expression);
}

/**
* Creates {@link RelOptTableImpl} instance with specified arguments
* and row count obtained from table statistic.
*
* @param schema table schema
* @param rowType table row type
* @param names full table path
* @param table table
* @param expressionFactory expression function for accessing table data
* in the generated code
*
* @return {@link RelOptTableImpl} instance
*/
public static RelOptTableImpl create(
@Nullable RelOptSchema schema,
RelDataType rowType,
List<String> names,
Table table,
TableExpressionFactory expressionFactory) {
return new RelOptTableImpl(schema, rowType, names, table,
c -> expression, table.getStatistic().getRowCount());
expressionFactory, table.getStatistic().getRowCount());
}

public static RelOptTableImpl create(@Nullable RelOptSchema schema, RelDataType rowType,
Table table, Path path) {
final SchemaPlus schemaPlus = MySchemaPlus.create(path);
return new RelOptTableImpl(schema, rowType, Pair.left(path), table,
getClassExpressionFunction(schemaPlus, Util.last(path).left, table),
c -> Schemas.getTableExpression(schemaPlus, Util.last(path).left, table, c),
table.getStatistic().getRowCount());
}

public static RelOptTableImpl create(@Nullable RelOptSchema schema, RelDataType rowType,
final CalciteSchema.TableEntry tableEntry, @Nullable Double rowCount) {
final Table table = tableEntry.getTable();
return new RelOptTableImpl(schema, rowType, tableEntry.path(),
table, getClassExpressionFunction(tableEntry, table), rowCount);
return new RelOptTableImpl(schema, rowType, tableEntry.path(), table,
c -> Schemas.getTableExpression(tableEntry.schema.plus(), tableEntry.name, table, c),
rowCount);
}

/**
* Creates a copy of this RelOptTable. The new RelOptTable will have newRowType.
*/
public RelOptTableImpl copy(RelDataType newRowType) {
return new RelOptTableImpl(this.schema, newRowType, this.names, this.table,
this.expressionFunction, this.rowCount);
this.tableExpressionFactory, this.rowCount);
}

@Override public String toString() {
Expand All @@ -155,32 +176,6 @@ public RelOptTableImpl copy(RelDataType newRowType) {
+ '}';
}

private static Function<Class, Expression> getClassExpressionFunction(
CalciteSchema.TableEntry tableEntry, Table table) {
return getClassExpressionFunction(tableEntry.schema.plus(), tableEntry.name,
table);
}

private static Function<Class, Expression> getClassExpressionFunction(
final SchemaPlus schema, final String tableName, final Table table) {
if (table instanceof QueryableTable) {
final QueryableTable queryableTable = (QueryableTable) table;
return clazz -> queryableTable.getExpression(schema, tableName, clazz);
} else if (table instanceof ScannableTable
|| table instanceof FilterableTable
|| table instanceof ProjectableFilterableTable) {
return clazz -> Schemas.tableExpression(schema, Object[].class, tableName,
table.getClass());
} else if (table instanceof StreamableTable) {
return getClassExpressionFunction(schema, tableName,
((StreamableTable) table).stream());
} else {
return input -> {
throw new UnsupportedOperationException();
};
}
}

public static RelOptTableImpl create(@Nullable RelOptSchema schema,
RelDataType rowType, Table table, ImmutableList<String> names) {
assert table instanceof TranslatableTable
Expand Down Expand Up @@ -211,18 +206,18 @@ public static RelOptTableImpl create(@Nullable RelOptSchema schema,
}

@Override public @Nullable Expression getExpression(Class clazz) {
if (expressionFunction == null) {
if (tableExpressionFactory == null) {
return null;
}
return expressionFunction.apply(clazz);
return tableExpressionFactory.create(clazz);
}

@Override protected RelOptTable extend(Table extendedTable) {
RelOptSchema schema = requireNonNull(getRelOptSchema(), "relOptSchema");
final RelDataType extendedRowType =
extendedTable.getRowType(schema.getTypeFactory());
return new RelOptTableImpl(schema, extendedRowType, getQualifiedName(),
extendedTable, expressionFunction, getRowCount());
extendedTable, tableExpressionFactory, getRowCount());
}

@Override public boolean equals(@Nullable Object obj) {
Expand Down Expand Up @@ -274,7 +269,7 @@ public static RelOptTableImpl create(@Nullable RelOptSchema schema,
}
final RelOptTable relOptTable =
new RelOptTableImpl(this.schema, b.build(), this.names, this.table,
this.expressionFunction, this.rowCount) {
this.tableExpressionFactory, this.rowCount) {
@Override public <T extends Object> @Nullable T unwrap(Class<T> clazz) {
if (clazz.isAssignableFrom(InitializerExpressionFactory.class)) {
return clazz.cast(NullInitializerExpressionFactory.INSTANCE);
Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/org/apache/calcite/schema/Schemas.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,35 @@ public static Expression tableExpression(SchemaPlus schema, Type elementType,
return EnumUtils.convert(expression, clazz);
}

/**
* Generates an expression with which table can be referenced in
* generated code.
*
* @param schema Schema
* @param tableName Table name (unique within schema)
* @param table Table to be referenced
* @param clazz Class that provides specific methods for accessing table data.
* It may differ from the {@code table} class; for example {@code clazz} may be
* {@code MongoTable.MongoQueryable}, though {@code table} is {@code MongoTable}
*/
public static Expression getTableExpression(SchemaPlus schema, String tableName,
Table table, Class<?> clazz) {
if (table instanceof QueryableTable) {
QueryableTable queryableTable = (QueryableTable) table;
return queryableTable.getExpression(schema, tableName, clazz);
} else if (table instanceof ScannableTable
|| table instanceof FilterableTable
|| table instanceof ProjectableFilterableTable) {
return tableExpression(schema, Object[].class, tableName,
table.getClass());
} else if (table instanceof StreamableTable) {
return getTableExpression(schema, tableName,
((StreamableTable) table).stream(), clazz);
} else {
throw new UnsupportedOperationException();
}
}

public static DataContext createDataContext(
Connection connection, @Nullable SchemaPlus rootSchema) {
return DataContexts.of((CalciteConnection) connection, rootSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.apache.calcite.sql2rel;

import org.apache.calcite.avatica.util.Spaces;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.linq4j.tree.TableExpressionFactory;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptSamplingParameters;
Expand Down Expand Up @@ -90,6 +92,7 @@
import org.apache.calcite.schema.ColumnStrategy;
import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.ModifiableView;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.Wrapper;
Expand Down Expand Up @@ -2667,8 +2670,13 @@ protected void convertCollectionTable(
(SqlUserDefinedTableMacro) operator;
final TranslatableTable table = udf.getTable(callBinding);
final RelDataType rowType = table.getRowType(typeFactory);
RelOptTable relOptTable = RelOptTableImpl.create(null, rowType, table,
udf.getNameAsId().names);
CalciteSchema schema = Schemas.subSchema(
catalogReader.getRootSchema(), udf.getNameAsId().skipLast(1).names);
TableExpressionFactory expressionFunction =
clazz -> Schemas.getTableExpression(Objects.requireNonNull(schema, "schema").plus(),
Util.last(udf.getNameAsId().names), table, clazz);
RelOptTable relOptTable = RelOptTableImpl.create(null, rowType,
udf.getNameAsId().names, table, expressionFunction);
RelNode converted = toRel(relOptTable, ImmutableList.of());
bb.setRoot(converted, true);
return;
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/java/org/apache/calcite/test/JdbcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7262,8 +7262,8 @@ private void checkGetTimestamp(Connection con) throws SQLException {
}

// add tables and retrieve with various case sensitivities
final TableInRootSchemaTest.SimpleTable table =
new TableInRootSchemaTest.SimpleTable();
final Smalls.SimpleTable table =
new Smalls.SimpleTable();
a2Schema.add("table1", table);
a2Schema.add("TABLE1", table);
a2Schema.add("tabLe1", table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.calcite.test;

import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.util.Smalls;

import com.google.common.collect.ImmutableMultiset;

Expand All @@ -43,7 +44,7 @@ class RelMdColumnOriginsTest {
connection.unwrap(CalciteConnection.class);

calciteConnection.getRootSchema().add("T1",
new TableInRootSchemaTest.SimpleTable());
new Smalls.SimpleTable());
Statement statement = calciteConnection.createStatement();
ResultSet resultSet =
statement.executeQuery("SELECT TABLE1.ID, TABLE2.ID FROM "
Expand Down
22 changes: 22 additions & 0 deletions core/src/test/java/org/apache/calcite/test/TableFunctionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -547,4 +547,26 @@ private Connection getConnectionWithMultiplyFunction() throws SQLException {
assertThat(CalciteAssert.toString(resultSet), equalTo(expected));
}
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-4448">[CALCITE-4448]
* Use TableMacro user-defined table functions with QueryableTable</a>. */
@Test void testQueryableTableWithTableMacro() throws SQLException {
try (Connection connection =
DriverManager.getConnection("jdbc:calcite:")) {
CalciteConnection calciteConnection =
connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
SchemaPlus schema = rootSchema.add("s", new AbstractSchema());
schema.add("simple", new Smalls.SimpleTableMacro());

String sql = "select * from table(\"s\".\"simple\"())";
ResultSet resultSet = connection.createStatement().executeQuery(sql);
String expected = "A=foo; B=5\n"
+ "A=bar; B=4\n"
+ "A=foo; B=3\n";
assertThat(CalciteAssert.toString(resultSet),
equalTo(expected));
}
}
}
Loading

0 comments on commit d1cf4a3

Please sign in to comment.