Add new planner, JDBC driver, and DDL machinery
ryannedolan committed Nov 26, 2024
1 parent 67fda04 commit 7d2a3f1
Showing 160 changed files with 9,055 additions and 1,471 deletions.
2 changes: 0 additions & 2 deletions Dockerfile
@@ -1,8 +1,6 @@
FROM eclipse-temurin:18
WORKDIR /home/
ADD ./hoptimator-cli-integration/build/distributions/hoptimator-cli-integration.tar ./
ADD ./hoptimator-operator-integration/build/distributions/hoptimator-operator-integration.tar ./
ADD ./etc/* ./
ENTRYPOINT ["/bin/sh", "-c"]
CMD ["./hoptimator-cli-integration/bin/hoptimator-cli-integration -n '' -p '' -u jdbc:calcite:model=model.yaml"]

12 changes: 9 additions & 3 deletions Makefile
@@ -1,19 +1,25 @@

install:
	./gradlew installDist

build:
	./gradlew build
	docker build . -t hoptimator
	docker build hoptimator-flink-runner -t hoptimator-flink-runner

bounce: build undeploy deploy deploy-samples deploy-config
bounce: build undeploy deploy deploy-samples deploy-config deploy-demo

integration-tests:
	./bin/hoptimator --run=./integration-tests.sql
	echo "\nPASS"
	echo "\nNOTHING TO DO FOR NOW"

clean:
	./gradlew clean

deploy-demo:
	kubectl apply -f ./deploy/samples/demodb.yaml

deploy: deploy-config
	kubectl apply -f ./hoptimator-k8s/src/main/resources/
	kubectl apply -f ./deploy

undeploy:
93 changes: 37 additions & 56 deletions README.md
@@ -1,87 +1,68 @@
# Hoptimator

Multi-hop declarative data pipelines
## Intro

Hoptimator gives you a SQL interface to a Kubernetes cluster. You can install databases, query tables, create views, and deploy data pipelines using just SQL.

## What is Hoptimator?

Hoptimator is an SQL-based control plane for complex data pipelines.

Hoptimator turns high-level SQL _subscriptions_ into multi-hop data pipelines. Pipelines may involve an auto-generated Flink job (or similar) and any arbitrary resources required for the job to run.

## How does it work?

Hoptimator has a pluggable _adapter_ framework, which lets you wire up arbitrary data sources. Adapters loosely correspond to connectors in the underlying compute engine (e.g. Flink Connectors), but they may include custom control plane logic. For example, an adapter may create a cache or a CDC stream as part of a pipeline. This enables a single pipeline to span multiple "hops" across different systems (as opposed to, say, a single Flink job).

Hoptimator's pipelines tend to have the following general shape:

                                   _________
    topic1 ----------------------> |         |
    table2 --> CDC ---> topic2 --> | SQL job | --> topic4
    table3 --> rETL --> topic3 --> |_________|

The three data sources on the left correspond to three different adapters:

1. `topic1` can be read directly from a Flink job, so the first adapter simply configures a Flink connector.
2. `table2` is inefficient for bulk access, so the second adapter creates a CDC stream (`topic2`) and configures a Flink connector to read from _that_.
3. `table3` is in cold storage, so the third adapter creates a reverse-ETL job to re-ingest the data into Kafka.

In order to deploy such a pipeline, you only need to write one SQL query, called a _subscription_. Pipelines are constructed automatically based on subscriptions.

To install a database, use `kubectl`:

```
$ kubectl apply -f my-database.yaml
```

(`create database` is coming soon!)

Then use Hoptimator DDL to create a materialized view:

```
> create materialized view my.foo as select * from ads.page_views;
```
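
The same DDL can be issued programmatically. Below is a minimal sketch, assuming the Hoptimator JDBC driver is on the classpath and self-registers for `jdbc:hoptimator://` URLs:

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class CreateViewExample {
  public static void main(String[] args) throws SQLException {
    // Blank credentials, mirroring the ./hoptimator launcher script.
    try (Connection conn = DriverManager.getConnection("jdbc:hoptimator://", "", "");
        Statement stmt = conn.createStatement()) {
      // Same DDL as the sqlline session above.
      stmt.execute("CREATE MATERIALIZED VIEW my.foo AS SELECT * FROM ads.page_views");
    }
  }
}
```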

Views created via DDL show up in Kubernetes as `views`:

```
$ kubectl get views
NAME     SCHEMA    VIEW   SQL
my-foo   DEFAULT   FOO    SELECT *...
```

Materialized views result in `pipelines`:

```
$ kubectl get pipelines
NAME     SQL             STATUS
my-foo   INSERT INTO...  Ready.
```

## Quick Start

### Prerequisites

1. `docker` is installed and the Docker daemon is running.
2. `kubectl` is installed and a cluster is running.
   1. `minikube` can be used for a local cluster.
3. `helm` for Kubernetes is installed.

### Run

```
$ make quickstart
... wait a while ...
$ ./bin/hoptimator
> !intro
> !q
```

## Quickstart

Hoptimator requires a Kubernetes cluster. To connect from outside a Kubernetes cluster, make sure your `kubectl` is properly configured.

```
$ make install              # build and install SQL CLI
$ make deploy deploy-demo   # install CRDs and K8s objects
$ ./hoptimator
> !intro
```

## Subscriptions

Subscriptions are SQL views that are automatically materialized by a pipeline.

```
$ kubectl apply -f deploy/samples/subscriptions.yaml
```

In response, the operator will deploy a Flink job and other resources:

```
$ kubectl get subscriptions
$ kubectl get flinkdeployments
$ kubectl get kafkatopics
```

You can verify the job is running by inspecting the output:

```
$ ./bin/hoptimator
> !tables
> SELECT * FROM RAWKAFKA."products" LIMIT 5;
> !q
```

## The SQL CLI

The `./hoptimator` script launches the [sqlline](https://github.com/julianhyde/sqlline) SQL CLI pre-configured to connect to `jdbc:hoptimator://`. The CLI includes some additional commands. See `!intro`.

## The JDBC Driver

To use Hoptimator from Java code, or from anything that supports JDBC, use the `jdbc:hoptimator://` JDBC driver.
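
For example, a query against the demo `ads.page_views` table might look like this (a sketch; it assumes the driver is on the classpath and the demo database is deployed):

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class QueryExample {
  public static void main(String[] args) throws SQLException {
    try (Connection conn = DriverManager.getConnection("jdbc:hoptimator://", "", "");
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT * FROM ads.page_views")) {
      int columns = rs.getMetaData().getColumnCount();
      while (rs.next()) {
        // Print each row tab-separated.
        StringBuilder row = new StringBuilder();
        for (int i = 1; i <= columns; i++) {
          row.append(rs.getString(i));
          if (i < columns) {
            row.append('\t');
          }
        }
        System.out.println(row);
      }
    }
  }
}
```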

## The Operator

`hoptimator-operator` turns materialized views into real data pipelines.

Hoptimator-operator is a Kubernetes operator that orchestrates multi-hop data pipelines based on Subscriptions (a custom resource). When a Subscription is deployed, the operator:

1. creates a _plan_ based on the Subscription SQL. The plan includes a set of _resources_ that make up a _pipeline_.
2. deploys each resource in the pipeline. This may involve creating Kafka topics, Flink jobs, etc.
3. reports Subscription status, which depends on the status of each resource in the pipeline.

The operator is extensible via _adapters_. Among other responsibilities, adapters can implement custom control plane logic (see `ControllerProvider`), or they can depend on external operators. For example, the Kafka adapter actively manages Kafka topics using a custom controller. The Flink adapter defers to [flink-kubernetes-operator](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/) to manage Flink jobs.

## Extending Hoptimator

Hoptimator can be extended via `TableTemplates`:

```
$ kubectl apply -f my-table-template.yaml
```

## The CLI

Hoptimator includes a SQL CLI based on [sqlline](https://github.com/julianhyde/sqlline). This is primarily for testing and debugging purposes, but it can also be useful for running ad-hoc queries. The CLI leverages the same adapters as the operator, but it doesn't deploy anything. Instead, queries run as local, in-process Flink jobs.
21 changes: 0 additions & 21 deletions deploy/hoptimator-pod.yaml

This file was deleted.

35 changes: 35 additions & 0 deletions deploy/samples/demodb.yaml
@@ -0,0 +1,35 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Database
metadata:
  name: ads-database
spec:
  schema: ADS
  url: jdbc:demodb://ads
  dialect: Calcite

---

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Database
metadata:
  name: profile-database
spec:
  schema: PROFILE
  url: jdbc:demodb://profile
  dialect: Calcite

---

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: TableTemplate
metadata:
  name: demodb-template
spec:
  databases:
    - profile-database
    - ads-database
  connector: |
    connector = demo
    database = {{database}}
    table = {{table}}
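
The `connector` block above is a template: the `{{database}}` and `{{table}}` placeholders are evidently filled in per table when a pipeline is planned. A minimal sketch of that substitution — the `render` helper and the `members` table name are illustrative, not part of this commit:

```
import java.util.LinkedHashMap;
import java.util.Map;

public class TemplateExample {

  /** Replace {{var}} placeholders with values; illustrative only. */
  static String render(String template, Map<String, String> values) {
    String result = template;
    for (Map.Entry<String, String> e : values.entrySet()) {
      result = result.replace("{{" + e.getKey() + "}}", e.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> values = new LinkedHashMap<>();
    values.put("database", "profile");
    values.put("table", "members");  // hypothetical table name
    String template = "connector = demo\ndatabase = {{database}}\ntable = {{table}}\n";
    System.out.print(render(template, values));
    // connector = demo
    // database = profile
    // table = members
  }
}
```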
20 changes: 0 additions & 20 deletions etc/integration-tests.sql

This file was deleted.

18 changes: 18 additions & 0 deletions generate-models.sh
@@ -0,0 +1,18 @@
#!/bin/sh

docker pull ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6

docker run \
--rm \
--mount type=bind,src=/var/run/docker.sock,dst=/var/run/docker.sock \
--mount type=bind,src="$(pwd)",dst="$(pwd)" \
-ti \
--network host \
ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \
/generate.sh -o "$(pwd)/hoptimator-k8s" -n "" -p "com.linkedin.hoptimator.k8s" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/databases.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/pipelines.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/views.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/subscriptions.crd.yaml" \
&& echo "done."
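
The generated model classes can then be used with the Kubernetes Java client already declared in `libs.versions.toml`. A sketch, assuming crd-model-gen's usual naming convention (`V1alpha1Database` and `V1alpha1DatabaseList` under `com.linkedin.hoptimator.k8s.models`):

```
import com.linkedin.hoptimator.k8s.models.V1alpha1Database;
import com.linkedin.hoptimator.k8s.models.V1alpha1DatabaseList;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.generic.GenericKubernetesApi;

public class ListDatabases {
  public static void main(String[] args) throws Exception {
    ApiClient client = Config.defaultClient();
    // Group/version/plural must match databases.crd.yaml.
    GenericKubernetesApi<V1alpha1Database, V1alpha1DatabaseList> api =
        new GenericKubernetesApi<>(V1alpha1Database.class, V1alpha1DatabaseList.class,
            "hoptimator.linkedin.com", "v1alpha1", "databases", client);
    for (V1alpha1Database db : api.list().throwsApiException().getObject().getItems()) {
      System.out.println(db.getMetadata().getName());
    }
  }
}
```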
48 changes: 24 additions & 24 deletions gradle/libs.versions.toml
@@ -1,31 +1,31 @@
[libraries]
assertj = "org.assertj:assertj-core:3.12.0"
avro = "org.apache.avro:avro:1.10.2"
calciteAvatica = "org.apache.calcite.avatica:avatica:1.23.0"
calciteCore = "org.apache.calcite:calcite-core:1.34.0"
flinkClients = "org.apache.flink:flink-clients:1.16.2"
flinkConnectorBase = "org.apache.flink:flink-connector-base:1.16.2"
flinkCore = "org.apache.flink:flink-core:1.16.2"
flinkCsv = "org.apache.flink:flink-csv:1.16.2"
flinkStreamingJava = "org.apache.flink:flink-streaming-java:1.16.2"
flinkTableApiJava = "org.apache.flink:flink-table-api-java:1.16.2"
flinkTableApiJavaBridge = "org.apache.flink:flink-table-api-java-bridge:1.16.2"
flinkTableCommon = "org.apache.flink:flink-table-common:1.16.2"
flinkTablePlanner = "org.apache.flink:flink-table-planner_2.12:1.16.2"
flinkTableRuntime = "org.apache.flink:flink-table-runtime:1.16.2"
flinkMetricsDropwizard = "org.apache.flink:flink-metrics-dropwizard:1.16.2"
flinkConnectorKafka = "org.apache.flink:flink-sql-connector-kafka:1.16.2"
flinkConnectorMySqlCdc = "com.ververica:flink-sql-connector-mysql-cdc:2.3.0"
calcite-avatica = "org.apache.calcite.avatica:avatica:1.23.0"
calcite-core = "org.apache.calcite:calcite-core:1.34.0"
calcite-server = "org.apache.calcite:calcite-server:1.34.0"
flink-clients = "org.apache.flink:flink-clients:1.16.2"
flink-connector-base = "org.apache.flink:flink-connector-base:1.16.2"
flink-core = "org.apache.flink:flink-core:1.16.2"
flink-csv = "org.apache.flink:flink-csv:1.16.2"
flink-streaming-java = "org.apache.flink:flink-streaming-java:1.16.2"
flink-table-api-java = "org.apache.flink:flink-table-api-java:1.16.2"
flink-table-api-java-bridge = "org.apache.flink:flink-table-api-java-bridge:1.16.2"
flink-table-common = "org.apache.flink:flink-table-common:1.16.2"
flink-table-planner = "org.apache.flink:flink-table-planner_2.12:1.16.2"
flink-table-runtime = "org.apache.flink:flink-table-runtime:1.16.2"
flink-connector-kafka = "org.apache.flink:flink-sql-connector-kafka:1.16.2"
flink-connector-mysql-cdc = "com.ververica:flink-sql-connector-mysql-cdc:2.3.0"
gson = "com.google.code.gson:gson:2.9.0"
jackson = "com.fasterxml.jackson.core:jackson-core:2.14.1"
jacksonYaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
javaxAnnotationApi = "javax.annotation:javax.annotation-api:1.3.2"
jackson-dataformat-yaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
javax-annotation-api = "javax.annotation:javax.annotation-api:1.3.2"
junit = "junit:junit:4.12"
kafkaClients = "org.apache.kafka:kafka-clients:2.7.1"
kubernetesClient = "io.kubernetes:client-java:16.0.2"
kubernetesExtendedClient = "io.kubernetes:client-java-extended:16.0.2"
slf4jSimple = "org.slf4j:slf4j-simple:1.7.30"
slf4jApi = "org.slf4j:slf4j-api:1.7.30"
kafka-clients = "org.apache.kafka:kafka-clients:2.7.1"
kubernetes-client = "io.kubernetes:client-java:16.0.2"
kubernetes-extended-client = "io.kubernetes:client-java-extended:16.0.2"
slf4j-simple = "org.slf4j:slf4j-simple:1.7.30"
slf4j-api = "org.slf4j:slf4j-api:1.7.30"
sqlline = "sqlline:sqlline:1.12.0"
commonsCli = 'commons-cli:commons-cli:1.4'

commons-cli = "commons-cli:commons-cli:1.4"
quidem = "net.hydromatic:quidem:0.11"
8 changes: 8 additions & 0 deletions hoptimator
@@ -0,0 +1,8 @@
#!/bin/sh

BASEDIR="$( cd "$( dirname "$0" )" && pwd )"

$BASEDIR/hoptimator-cli/build/install/hoptimator-cli/bin/hoptimator-cli sqlline.SqlLine \
-ac sqlline.HoptimatorAppConfig \
-u jdbc:hoptimator:// -n "" -p "" -nn "Hoptimator" $@

7 changes: 7 additions & 0 deletions hoptimator-api/build.gradle
@@ -0,0 +1,7 @@
plugins {
  id 'java'
}

dependencies {
  // plz keep it this way
}
12 changes: 12 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/Catalog.java
@@ -0,0 +1,12 @@
package com.linkedin.hoptimator;

import java.sql.Wrapper;
import java.sql.SQLException;

/** Registers a set of tables, possibly within schemas and sub-schemas. */
public interface Catalog {

  String name();
  String description();
  void register(Wrapper parentSchema) throws SQLException;
}
8 changes: 8 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/CatalogProvider.java
@@ -0,0 +1,8 @@
package com.linkedin.hoptimator;

import java.util.Collection;

public interface CatalogProvider {

  Collection<Catalog> catalogs();
}
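
As a sketch of how these two interfaces might be implemented — the `DemoCatalog` class is hypothetical, and unwrapping the parent schema to a Calcite `SchemaPlus` is an assumption based on the Calcite dependencies in this commit:

```
package com.linkedin.hoptimator.demo;  // hypothetical package

import java.sql.SQLException;
import java.sql.Wrapper;
import java.util.Collection;
import java.util.Collections;

import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.AbstractSchema;

import com.linkedin.hoptimator.Catalog;
import com.linkedin.hoptimator.CatalogProvider;

/** Registers an empty ADS schema; a real catalog would add tables. */
class DemoCatalog implements Catalog {

  @Override
  public String name() {
    return "ads";
  }

  @Override
  public String description() {
    return "Demo ads database";
  }

  @Override
  public void register(Wrapper parentSchema) throws SQLException {
    // Assumption: the parent can be unwrapped to a Calcite SchemaPlus.
    SchemaPlus parent = parentSchema.unwrap(SchemaPlus.class);
    parent.add("ADS", new AbstractSchema());
  }
}

/** Presumably discovered via java.util.ServiceLoader. */
public class DemoCatalogProvider implements CatalogProvider {

  @Override
  public Collection<Catalog> catalogs() {
    return Collections.singletonList(new DemoCatalog());
  }
}
```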
9 changes: 9 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/Connector.java
@@ -0,0 +1,9 @@
package com.linkedin.hoptimator;

import java.util.Map;
import java.sql.SQLException;

public interface Connector<T> {

  Map<String, String> configure(T t) throws SQLException;
}
8 changes: 8 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorProvider.java
@@ -0,0 +1,8 @@
package com.linkedin.hoptimator;

import java.util.Collection;

public interface ConnectorProvider {

  <T> Collection<Connector<T>> connectors(Class<T> clazz);
}
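
A sketch of a connector pair matching the demodb `TableTemplate` above — `DemoTable` is a hypothetical stand-in for whatever type the planner actually passes:

```
package com.linkedin.hoptimator.demo;  // hypothetical package

import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import com.linkedin.hoptimator.Connector;
import com.linkedin.hoptimator.ConnectorProvider;

/** Hypothetical table descriptor. */
class DemoTable {
  final String database;
  final String table;

  DemoTable(String database, String table) {
    this.database = database;
    this.table = table;
  }
}

/** Produces connector configs like those in demodb.yaml's TableTemplate. */
class DemoConnector implements Connector<DemoTable> {

  @Override
  public Map<String, String> configure(DemoTable t) throws SQLException {
    Map<String, String> config = new LinkedHashMap<>();
    config.put("connector", "demo");
    config.put("database", t.database);
    config.put("table", t.table);
    return config;
  }
}

/** Presumably discovered via java.util.ServiceLoader; the wiring isn't shown in this commit. */
public class DemoConnectorProvider implements ConnectorProvider {

  @Override
  @SuppressWarnings("unchecked")
  public <T> Collection<Connector<T>> connectors(Class<T> clazz) {
    if (DemoTable.class.isAssignableFrom(clazz)) {
      return Collections.singletonList((Connector<T>) new DemoConnector());
    }
    return Collections.emptyList();
  }
}
```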
8 changes: 8 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/Database.java
@@ -0,0 +1,8 @@
package com.linkedin.hoptimator;

/** A collection of tables, as populated by a Catalog. */
public interface Database {

  /** Name of the database. */
  String databaseName();
}
14 changes: 14 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployable.java
@@ -0,0 +1,14 @@
package com.linkedin.hoptimator;

import java.util.List;
import java.sql.SQLException;

public interface Deployable {

  void create() throws SQLException;
  void delete() throws SQLException;
  void update() throws SQLException;

  /** Render a list of specs, usually YAML. */
  List<String> specify() throws SQLException;
}
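
A sketch of a `Deployable` for a view — the class and the YAML shape are assumptions modeled on the `views` resources shown in the README; a real implementation would talk to the Kubernetes API in `create`/`update`/`delete`:

```
package com.linkedin.hoptimator.demo;  // hypothetical package

import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

import com.linkedin.hoptimator.Deployable;

public class DemoViewDeployable implements Deployable {

  private final String name;
  private final String sql;

  public DemoViewDeployable(String name, String sql) {
    this.name = name;
    this.sql = sql;
  }

  @Override
  public void create() throws SQLException {
    // A real implementation would apply the rendered specs to Kubernetes.
  }

  @Override
  public void delete() throws SQLException {
    // ... and delete the corresponding objects here.
  }

  @Override
  public void update() throws SQLException {
    // ... and patch them here.
  }

  /** Render a single YAML spec, per the interface contract. */
  @Override
  public List<String> specify() throws SQLException {
    return Collections.singletonList(
        "apiVersion: hoptimator.linkedin.com/v1alpha1\n"
            + "kind: View\n"
            + "metadata:\n"
            + "  name: " + name + "\n"
            + "spec:\n"
            + "  sql: " + sql + "\n");
  }
}
```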