Skip to content

Commit

Permalink
Support loading arbitrary JDBC connections
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Nov 5, 2024
1 parent 377908f commit 4fb0254
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 31 deletions.
4 changes: 2 additions & 2 deletions deploy/dev/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ metadata:
namespace: kafka
spec:
kafka:
version: 3.6.1
version: 3.8.0
replicas: 1
listeners:
- name: plain
Expand All @@ -40,7 +40,7 @@ spec:
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.4"
inter.broker.protocol.version: "3.8"
allow.everyone.if.no.acl.found: true
storage:
type: ephemeral
Expand Down
2 changes: 1 addition & 1 deletion deploy/hoptimator-operator-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ spec:
- name: hoptimator-operator
image: docker.io/library/hoptimator
imagePullPolicy: Never
command: ["./hoptimator-operator-integration/bin/hoptimator-operator-integration", "/etc/config/model.yaml"]
command: ["./hoptimator-operator-integration/bin/hoptimator-operator-integration", "jdbc:calcite:model=/etc/config/model.yaml"]
volumeMounts:
- name: config-volume
mountPath: /etc/config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {

String connectionUrl = sqlline.getConnectionMetadata().getUrl();
try {
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
RelNode plan = planner.logical(sql);
String avroSchema = AvroConverter.avro("OutputNamespace", "OutputName", plan.getRowType()).toString(true);
sqlline.output(avroSchema);
Expand Down Expand Up @@ -205,7 +205,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
try {
InsertInto insertInto = parseInsertInto(sql);
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
PipelineRel plan = planner.pipeline(insertInto.query);
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
HopTable sink = planner.database(insertInto.database)
Expand Down Expand Up @@ -280,7 +280,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
try {
InsertInto insertInto = parseInsertInto(sql);
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
PipelineRel plan = planner.pipeline(insertInto.query);
sqlline.output("PLAN:");
sqlline.output(plan.explain());
Expand Down Expand Up @@ -383,7 +383,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
throw new IllegalArgumentException("Expected one of 'not', 'empty', or 'value'");
}

HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
PipelineRel plan = planner.pipeline(query);
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
String pipelineSql = impl.query().sql(MysqlSqlDialect.DEFAULT);
Expand Down Expand Up @@ -475,7 +475,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
try {
InsertInto insertInto = parseInsertInto(sql);
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
PipelineRel plan = planner.pipeline(insertInto.query);
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
HopTable sink = planner.database(insertInto.database)
Expand Down Expand Up @@ -607,7 +607,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
try {
InsertInto insertInto = parseInsertInto(sql);
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
PipelineRel plan = planner.pipeline(insertInto.query);
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
HopTable sink = planner.database(insertInto.database)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@
public class HoptimatorOperatorApp {
private static final Logger log = LoggerFactory.getLogger(HoptimatorOperatorApp.class);

final String modelPath;
final String url;
final String namespace;
final ApiClient apiClient;
final Predicate<V1alpha1Subscription> subscriptionFilter;
final Properties properties;
final Resource.Environment environment;

/** This constructor is likely to evolve and break. */
public HoptimatorOperatorApp(String modelPath, String namespace, ApiClient apiClient,
public HoptimatorOperatorApp(String url, String namespace, ApiClient apiClient,
Predicate<V1alpha1Subscription> subscriptionFilter, Properties properties) {
this.modelPath = modelPath;
this.url = url;
this.namespace = namespace;
this.apiClient = apiClient;
this.subscriptionFilter = subscriptionFilter;
Expand All @@ -53,7 +53,7 @@ public HoptimatorOperatorApp(String modelPath, String namespace, ApiClient apiCl

public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException("Missing model file argument.");
throw new IllegalArgumentException("Missing JDBC URL argument.");
}

Options options = new Options();
Expand All @@ -76,18 +76,17 @@ public static void main(String[] args) throws Exception {
return;
}

String modelFileInput = cmd.getArgs()[0];
String urlInput = cmd.getArgs()[0];
String namespaceInput = cmd.getOptionValue("namespace", "default");

new HoptimatorOperatorApp(modelFileInput, namespaceInput, Config.defaultClient(), null,
new HoptimatorOperatorApp(urlInput, namespaceInput, Config.defaultClient(), null,
new Properties()).run();
}

public void run() throws Exception {
HoptimatorPlanner.Factory plannerFactory = HoptimatorPlanner.Factory.fromModelFile(modelPath,
properties);
HoptimatorPlanner.Factory plannerFactory = HoptimatorPlanner.Factory.fromJdbc(url, properties);

// ensure model file works, and that static classes are initialized in the main thread
// ensure JDBC connection works, and that static classes are initialized in the main thread
HoptimatorPlanner planner = plannerFactory.makePlanner();

apiClient.setHttpClient(apiClient.getHttpClient().newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.linkedin.hoptimator.planner;

import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.adapter.jdbc.JdbcCatalogSchema;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.model.ModelHandler;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;
Expand All @@ -16,7 +19,6 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.model.ModelHandler;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;
import org.apache.calcite.tools.Frameworks;
Expand All @@ -26,11 +28,14 @@
import com.linkedin.hoptimator.catalog.Database;
import com.linkedin.hoptimator.catalog.DatabaseSchema;

import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.NoSuchElementException;
import java.sql.SQLException;
import javax.sql.DataSource;

/** A one-shot stateful object, which creates Pipelines from SQL. */
public class HoptimatorPlanner {
Expand Down Expand Up @@ -67,8 +72,20 @@ public class HoptimatorPlanner {
public interface Factory {
HoptimatorPlanner makePlanner() throws Exception;

static Factory fromModelFile(String filePath, Properties properties) {
return () -> HoptimatorPlanner.fromModelFile(filePath, properties);
static Factory fromSchema(String catalog, Schema schema) {
return () -> HoptimatorPlanner.fromSchema(catalog, schema);
}

static Factory fromDataSource(String catalog, DataSource dataSource) {
return () -> HoptimatorPlanner.fromDataSource(catalog, dataSource);
}

static Factory fromJdbc(String url, String catalog, String username, String password) {
return () -> HoptimatorPlanner.fromJdbc(url, catalog, username, password);
}

static Factory fromJdbc(String url, Properties properties) {
return () -> HoptimatorPlanner.fromJdbc(url, properties);
}
}

Expand Down Expand Up @@ -131,22 +148,38 @@ public Database database(String name) {
return ((DatabaseSchema) subSchema).database();
}

public static HoptimatorPlanner fromModelFile(String filePath, Properties properties) throws Exception {
String uri = filePath;
if (uri.startsWith("jdbc:calcite:model=")) {
uri = uri.substring("jdbc:calcite:model=".length());
}
public static HoptimatorPlanner fromSchema(String name, Schema schema) {
SchemaPlus rootSchema = Frameworks.createRootSchema(true);
rootSchema.add(name == null ? "ROOT" : name, schema);
return new HoptimatorPlanner(rootSchema);
}

public static HoptimatorPlanner fromDataSource(String catalog, DataSource dataSource) {
Schema schema = JdbcCatalogSchema.create(null, catalog, dataSource, catalog);
return fromSchema(catalog, schema);
}

public static HoptimatorPlanner fromModelFile(String filePath, Properties properties)
throws SQLException, IOException {
Driver driver = new Driver();
CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(properties);
CalciteConnection connection = (CalciteConnection) driver.connect("jdbc:calcite:", properties);
SchemaPlus schema = connection.getRootSchema();
ModelHandler modelHandler = new ModelHandler(connection, uri); // side-effect: modifies connection
ModelHandler modelHandler = new ModelHandler(connection, filePath); // side-effect: modifies connection
return new HoptimatorPlanner(schema);
}

public static HoptimatorPlanner fromSchema(String name, Schema schema) {
SchemaPlus rootSchema = Frameworks.createRootSchema(true);
rootSchema.add(name, schema);
return new HoptimatorPlanner(rootSchema);
public static HoptimatorPlanner fromJdbc(String url, String catalog, String username, String password) {
DataSource dataSource = JdbcSchema.dataSource(url, null, username, password);
return fromDataSource(catalog, dataSource);
}

public static HoptimatorPlanner fromJdbc(String url, Properties properties) throws SQLException, IOException {
if (url.startsWith("jdbc:calcite:model=")) {
return fromModelFile(url.substring("jdbc:calcite:model=".length()), properties);
} else {
return fromJdbc(url, properties.getProperty("catalog"), properties.getProperty("username"),
properties.getProperty("password"));
}
}
}

0 comments on commit 4fb0254

Please sign in to comment.