diff --git a/deploy/dev/kafka.yaml b/deploy/dev/kafka.yaml index 5fd9089..33441ff 100644 --- a/deploy/dev/kafka.yaml +++ b/deploy/dev/kafka.yaml @@ -23,7 +23,7 @@ metadata: namespace: kafka spec: kafka: - version: 3.6.1 + version: 3.8.0 replicas: 1 listeners: - name: plain @@ -40,7 +40,7 @@ spec: transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 - inter.broker.protocol.version: "3.4" + inter.broker.protocol.version: "3.8" allow.everyone.if.no.acl.found: true storage: type: ephemeral diff --git a/deploy/hoptimator-operator-deployment.yaml b/deploy/hoptimator-operator-deployment.yaml index 8487f4f..75e84b0 100644 --- a/deploy/hoptimator-operator-deployment.yaml +++ b/deploy/hoptimator-operator-deployment.yaml @@ -19,7 +19,7 @@ spec: - name: hoptimator-operator image: docker.io/library/hoptimator imagePullPolicy: Never - command: ["./hoptimator-operator-integration/bin/hoptimator-operator-integration", "/etc/config/model.yaml"] + command: ["./hoptimator-operator-integration/bin/hoptimator-operator-integration", "jdbc:calcite:model=/etc/config/model.yaml"] volumeMounts: - name: config-volume mountPath: /etc/config diff --git a/hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java b/hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java index b0702e4..0407712 100644 --- a/hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java +++ b/hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java @@ -137,7 +137,7 @@ public void execute(String line, DispatchCallback dispatchCallback) { String connectionUrl = sqlline.getConnectionMetadata().getUrl(); try { - HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties); + HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties); RelNode plan = planner.logical(sql); String avroSchema = AvroConverter.avro("OutputNamespace", "OutputName", plan.getRowType()).toString(true); sqlline.output(avroSchema); @@ -205,7 +205,7 @@ public void execute(String line, DispatchCallback dispatchCallback) { String connectionUrl = sqlline.getConnectionMetadata().getUrl(); try { InsertInto insertInto = parseInsertInto(sql); - HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties); + HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties); PipelineRel plan = planner.pipeline(insertInto.query); PipelineRel.Implementor impl = new PipelineRel.Implementor(plan); HopTable sink = planner.database(insertInto.database) @@ -280,7 +280,7 @@ public void execute(String line, DispatchCallback dispatchCallback) { String connectionUrl = sqlline.getConnectionMetadata().getUrl(); try { InsertInto insertInto = parseInsertInto(sql); - HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties); + HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties); PipelineRel plan = planner.pipeline(insertInto.query); sqlline.output("PLAN:"); sqlline.output(plan.explain()); @@ -383,7 +383,7 @@ public void execute(String line, DispatchCallback dispatchCallback) { throw new IllegalArgumentException("Expected one of 'not', 'empty', or 'value'"); } - HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties); + HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties); PipelineRel plan = planner.pipeline(query); PipelineRel.Implementor impl = new PipelineRel.Implementor(plan); String pipelineSql = impl.query().sql(MysqlSqlDialect.DEFAULT); @@ -475,7 +475,7 @@ public void execute(String line, DispatchCallback dispatchCallback) { String connectionUrl = sqlline.getConnectionMetadata().getUrl(); try { InsertInto insertInto = parseInsertInto(sql); - HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties); + HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties); PipelineRel plan = planner.pipeline(insertInto.query); PipelineRel.Implementor impl = new PipelineRel.Implementor(plan); HopTable sink = planner.database(insertInto.database) @@ -607,7 +607,7 @@ public void execute(String line, DispatchCallback dispatchCallback) { String connectionUrl = sqlline.getConnectionMetadata().getUrl(); try { InsertInto insertInto = parseInsertInto(sql); - HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties); + HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties); PipelineRel plan = planner.pipeline(insertInto.query); PipelineRel.Implementor impl = new PipelineRel.Implementor(plan); HopTable sink = planner.database(insertInto.database) diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java index bb0734b..b4f2867 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java @@ -33,7 +33,7 @@ public class HoptimatorOperatorApp { private static final Logger log = LoggerFactory.getLogger(HoptimatorOperatorApp.class); - final String modelPath; + final String url; final String namespace; final ApiClient apiClient; final Predicate subscriptionFilter; @@ -41,9 +41,9 @@ public class HoptimatorOperatorApp { final Resource.Environment environment; /** This constructor is likely to evolve and break. */ - public HoptimatorOperatorApp(String modelPath, String namespace, ApiClient apiClient, + public HoptimatorOperatorApp(String url, String namespace, ApiClient apiClient, Predicate subscriptionFilter, Properties properties) { - this.modelPath = modelPath; + this.url = url; this.namespace = namespace; this.apiClient = apiClient; this.subscriptionFilter = subscriptionFilter; @@ -53,7 +53,7 @@ public HoptimatorOperatorApp(String modelPath, String namespace, ApiClient apiCl public static void main(String[] args) throws Exception { if (args.length < 1) { - throw new IllegalArgumentException("Missing model file argument."); + throw new IllegalArgumentException("Missing JDBC URL argument."); } Options options = new Options(); @@ -76,18 +76,17 @@ public static void main(String[] args) throws Exception { return; } - String modelFileInput = cmd.getArgs()[0]; + String urlInput = cmd.getArgs()[0]; String namespaceInput = cmd.getOptionValue("namespace", "default"); - new HoptimatorOperatorApp(modelFileInput, namespaceInput, Config.defaultClient(), null, + new HoptimatorOperatorApp(urlInput, namespaceInput, Config.defaultClient(), null, new Properties()).run(); } public void run() throws Exception { - HoptimatorPlanner.Factory plannerFactory = HoptimatorPlanner.Factory.fromModelFile(modelPath, - properties); + HoptimatorPlanner.Factory plannerFactory = HoptimatorPlanner.Factory.fromJdbc(url, properties); - // ensure model file works, and that static classes are initialized in the main thread + // ensure JDBC connection works, and that static classes are initialized in the main thread HoptimatorPlanner planner = plannerFactory.makePlanner(); apiClient.setHttpClient(apiClient.getHttpClient().newBuilder() diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java index f663c1d..7e15ca5 100644 --- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java +++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java @@ -1,9 +1,12 @@ package com.linkedin.hoptimator.planner; +import org.apache.calcite.adapter.jdbc.JdbcSchema; +import org.apache.calcite.adapter.jdbc.JdbcCatalogSchema; import org.apache.calcite.config.CalciteConnectionConfig; import org.apache.calcite.config.CalciteConnectionConfigImpl; import org.apache.calcite.jdbc.Driver; import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.model.ModelHandler; import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.rules.CoreRules; @@ -16,7 +19,6 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.model.ModelHandler; import org.apache.calcite.tools.RuleSet; import org.apache.calcite.tools.RuleSets; import org.apache.calcite.tools.Frameworks; @@ -26,11 +28,14 @@ import com.linkedin.hoptimator.catalog.Database; import com.linkedin.hoptimator.catalog.DatabaseSchema; +import java.io.IOException; import java.util.Arrays; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.NoSuchElementException; +import java.sql.SQLException; +import javax.sql.DataSource; /** A one-shot stateful object, which creates Pipelines from SQL. */ public class HoptimatorPlanner { @@ -67,8 +72,20 @@ public class HoptimatorPlanner { public interface Factory { HoptimatorPlanner makePlanner() throws Exception; - static Factory fromModelFile(String filePath, Properties properties) { - return () -> HoptimatorPlanner.fromModelFile(filePath, properties); + static Factory fromSchema(String catalog, Schema schema) { + return () -> HoptimatorPlanner.fromSchema(catalog, schema); + } + + static Factory fromDataSource(String catalog, DataSource dataSource) { + return () -> HoptimatorPlanner.fromDataSource(catalog, dataSource); + } + + static Factory fromJdbc(String url, String catalog, String username, String password) { + return () -> HoptimatorPlanner.fromJdbc(url, catalog, username, password); + } + + static Factory fromJdbc(String url, Properties properties) { + return () -> HoptimatorPlanner.fromJdbc(url, properties); } } @@ -131,22 +148,38 @@ public Database database(String name) { return ((DatabaseSchema) subSchema).database(); } - public static HoptimatorPlanner fromModelFile(String filePath, Properties properties) throws Exception { - String uri = filePath; - if (uri.startsWith("jdbc:calcite:model=")) { - uri = uri.substring("jdbc:calcite:model=".length()); - } + public static HoptimatorPlanner fromSchema(String name, Schema schema) { + SchemaPlus rootSchema = Frameworks.createRootSchema(true); + rootSchema.add(name == null ? "ROOT" : name, schema); + return new HoptimatorPlanner(rootSchema); + } + + public static HoptimatorPlanner fromDataSource(String catalog, DataSource dataSource) { + Schema schema = JdbcCatalogSchema.create(null, catalog, dataSource, catalog); + return fromSchema(catalog, schema); + } + + public static HoptimatorPlanner fromModelFile(String filePath, Properties properties) + throws SQLException, IOException { Driver driver = new Driver(); CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(properties); CalciteConnection connection = (CalciteConnection) driver.connect("jdbc:calcite:", properties); SchemaPlus schema = connection.getRootSchema(); - ModelHandler modelHandler = new ModelHandler(connection, uri); // side-effect: modifies connection + ModelHandler modelHandler = new ModelHandler(connection, filePath); // side-effect: modifies connection return new HoptimatorPlanner(schema); } - public static HoptimatorPlanner fromSchema(String name, Schema schema) { - SchemaPlus rootSchema = Frameworks.createRootSchema(true); - rootSchema.add(name, schema); - return new HoptimatorPlanner(rootSchema); + public static HoptimatorPlanner fromJdbc(String url, String catalog, String username, String password) { + DataSource dataSource = JdbcSchema.dataSource(url, null, username, password); + return fromDataSource(catalog, dataSource); + } + + public static HoptimatorPlanner fromJdbc(String url, Properties properties) throws SQLException, IOException { + if (url.startsWith("jdbc:calcite:model=")) { + return fromModelFile(url.substring("jdbc:calcite:model=".length()), properties); + } else { + return fromJdbc(url, properties.getProperty("catalog"), properties.getProperty("username"), + properties.getProperty("password")); + } } }