Add new planner, JDBC driver, and DDL machinery #72

Merged 3 commits on Dec 5, 2024
11 changes: 0 additions & 11 deletions .github/workflows/integration-tests.yml
@@ -38,17 +38,6 @@ jobs:
run: make deploy
- name: Deploy Samples
run: make deploy-samples
- name: Wait for Readiness
run: kubectl wait pod hoptimator --for condition=Ready --timeout=10m
- name: Wait for Flink Jobs
run: |
i=0
while [ $i -ne 10 ]
do
kubectl wait flinkdeployments --all --for=jsonpath={.status.lifecycleState}=STABLE --timeout=1m && break || sleep 60
i=$(($i+1))
echo "No stable Flink jobs after $i tries..."
done
- name: Run Integration Tests
run: make integration-tests
- name: Capture Cluster State
2 changes: 0 additions & 2 deletions Dockerfile
@@ -1,8 +1,6 @@
FROM eclipse-temurin:18
WORKDIR /home/
ADD ./hoptimator-cli-integration/build/distributions/hoptimator-cli-integration.tar ./
ADD ./hoptimator-operator-integration/build/distributions/hoptimator-operator-integration.tar ./
ADD ./etc/* ./
ENTRYPOINT ["/bin/sh", "-c"]
CMD ["./hoptimator-cli-integration/bin/hoptimator-cli-integration -n '' -p '' -u jdbc:calcite:model=model.yaml"]

12 changes: 9 additions & 3 deletions Makefile
@@ -1,19 +1,25 @@

install:
./gradlew installDist

build:
./gradlew build
docker build . -t hoptimator
docker build hoptimator-flink-runner -t hoptimator-flink-runner

bounce: build undeploy deploy deploy-samples deploy-config
bounce: build undeploy deploy deploy-samples deploy-config deploy-demo

integration-tests:
./bin/hoptimator --run=./integration-tests.sql
echo "\nPASS"
echo "\nNOTHING TO DO FOR NOW"

clean:
./gradlew clean

deploy-demo:
kubectl apply -f ./deploy/samples/demodb.yaml

deploy: deploy-config
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy

undeploy:
93 changes: 37 additions & 56 deletions README.md
@@ -1,87 +1,68 @@
# Hoptimator

Multi-hop declarative data pipelines
## Intro

## What is Hoptimator?
Hoptimator gives you a SQL interface to a Kubernetes cluster. You can install databases, query tables, create views, and deploy data pipelines using just SQL.

Hoptimator is an SQL-based control plane for complex data pipelines.

Hoptimator turns high-level SQL _subscriptions_ into multi-hop data pipelines. Pipelines may involve an auto-generated Flink job (or similar) and any arbitrary resources required for the job to run.

## How does it work?

Hoptimator has a pluggable _adapter_ framework, which lets you wire up arbitrary data sources. Adapters loosely correspond to connectors in the underlying compute engine (e.g. Flink Connectors), but they may include custom control plane logic. For example, an adapter may create a cache or a CDC stream as part of a pipeline. This enables a single pipeline to span multiple "hops" across different systems (as opposed to, say, a single Flink job).

Hoptimator's pipelines tend to have the following general shape:

                                     _________
    topic1 ----------------------> |         |
    table2 --> CDC ---> topic2 --> | SQL job | --> topic4
    table3 --> rETL --> topic3 --> |_________|
To install a database, use `kubectl`:

```
$ kubectl apply -f my-database.yaml
```

The three data sources on the left correspond to three different adapters:
(`create database` is coming soon!)

1. `topic1` can be read directly from a Flink job, so the first adapter simply configures a Flink connector.
2. `table2` is inefficient for bulk access, so the second adapter creates a CDC stream (`topic2`) and configures a Flink connector to read from _that_.
3. `table3` is in cold storage, so the third adapter creates a reverse-ETL job to re-ingest the data into Kafka.
Then use Hoptimator DDL to create a materialized view:

In order to deploy such a pipeline, you only need to write one SQL query, called a _subscription_. Pipelines are constructed automatically based on subscriptions.
```
> create materialized view my.foo as select * from ads.page_views;
```

## Quick Start
Views created via DDL show up in Kubernetes as `views`:

### Prerequisites
```
$ kubectl get views
NAME     SCHEMA   VIEW   SQL
my-foo   MY       FOO    SELECT *...

1. `docker` is installed and docker daemon is running
2. `kubectl` is installed and cluster is running
   1. `minikube` can be used for a local cluster
3. `helm` for Kubernetes is installed
```

### Run
Materialized views result in `pipelines`:

```
$ make quickstart
... wait a while ...
$ ./bin/hoptimator
> !intro
> !q
$ kubectl get pipelines
NAME     SQL              STATUS
my-foo   INSERT INTO...   Ready.
```

## Subscriptions
## Quickstart

Subscriptions are SQL views that are automatically materialized by a pipeline.
Hoptimator requires a Kubernetes cluster. To connect from outside the cluster, make sure your `kubectl` is properly configured.

```
$ kubectl apply -f deploy/samples/subscriptions.yaml
$ make install # build and install SQL CLI
$ make deploy deploy-demo # install CRDs and K8s objects
$ ./hoptimator
> !intro
```

In response, the operator will deploy a Flink job and other resources:
## The SQL CLI

```
$ kubectl get subscriptions
$ kubectl get flinkdeployments
$ kubectl get kafkatopics
```
The `./hoptimator` script launches the [sqlline](https://github.com/julianhyde/sqlline) SQL CLI pre-configured to connect to `jdbc:hoptimator://`. The CLI includes some additional commands. See `!intro`.

You can verify the job is running by inspecting the output:
## The JDBC Driver

```
$ ./bin/hoptimator
> !tables
> SELECT * FROM RAWKAFKA."products" LIMIT 5;
> !q
```
To use Hoptimator from Java code, or from anything that supports JDBC, use the `jdbc:hoptimator://` JDBC driver.
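As a minimal sketch of what that looks like from Java: the URL and the `ads.page_views` table come from the examples above, while the class name, query, and column access are illustrative and assume the driver jar is on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class HoptimatorJdbcExample {

  public static void main(String[] args) throws SQLException {
    // Open a connection using the Hoptimator JDBC URL.
    try (Connection connection = DriverManager.getConnection("jdbc:hoptimator://");
         Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery(
             "SELECT * FROM ADS.PAGE_VIEWS LIMIT 5")) {
      while (resultSet.next()) {
        // Print the first column of each row.
        System.out.println(resultSet.getString(1));
      }
    }
  }
}
```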

## The Operator

Hoptimator-operator is a Kubernetes operator that orchestrates multi-hop data pipelines based on Subscriptions (a custom resource). When a Subscription is deployed, the operator:
`hoptimator-operator` turns materialized views into real data pipelines.

1. creates a _plan_ based on the Subscription SQL. The plan includes a set of _resources_ that make up a _pipeline_.
2. deploys each resource in the pipeline. This may involve creating Kafka topics, Flink jobs, etc.
3. reports Subscription status, which depends on the status of each resource in the pipeline.
## Extending Hoptimator

The operator is extensible via _adapters_. Among other responsibilities, adapters can implement custom control plane logic (see `ControllerProvider`), or they can depend on external operators. For example, the Kafka adapter actively manages Kafka topics using a custom controller. The Flink adapter defers to [flink-kubernetes-operator](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/) to manage Flink jobs.
Hoptimator can be extended via `TableTemplates`:

## The CLI
```
$ kubectl apply -f my-table-template.yaml
```

Hoptimator includes a SQL CLI based on [sqlline](https://github.com/julianhyde/sqlline). This is primarily for testing and debugging purposes, but it can also be useful for running ad-hoc queries. The CLI leverages the same adapters as the operator, but it doesn't deploy anything. Instead, queries run as local, in-process Flink jobs.
21 changes: 0 additions & 21 deletions deploy/hoptimator-pod.yaml

This file was deleted.

35 changes: 35 additions & 0 deletions deploy/samples/demodb.yaml
@@ -0,0 +1,35 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Database
metadata:
name: ads-database
spec:
schema: ADS
url: jdbc:demodb://ads
dialect: Calcite

---

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Database
metadata:
name: profile-database
spec:
schema: PROFILE
url: jdbc:demodb://profile
dialect: Calcite

---

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: TableTemplate
metadata:
name: demodb-template
spec:
databases:
- profile-database
- ads-database
connector: |
connector = demo
database = {{database}}
table = {{table}}

20 changes: 0 additions & 20 deletions etc/integration-tests.sql

This file was deleted.

18 changes: 18 additions & 0 deletions generate-models.sh
@@ -0,0 +1,18 @@
#!/bin/sh

docker pull ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6

docker run \
--rm \
--mount type=bind,src=/var/run/docker.sock,dst=/var/run/docker.sock \
--mount type=bind,src="$(pwd)",dst="$(pwd)" \
-ti \
--network host \
ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \
/generate.sh -o "$(pwd)/hoptimator-k8s" -n "" -p "com.linkedin.hoptimator.k8s" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/databases.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/pipelines.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/views.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/subscriptions.crd.yaml" \
&& echo "done."
48 changes: 24 additions & 24 deletions gradle/libs.versions.toml
@@ -1,31 +1,31 @@
[libraries]
assertj = "org.assertj:assertj-core:3.12.0"
avro = "org.apache.avro:avro:1.10.2"
calciteAvatica = "org.apache.calcite.avatica:avatica:1.23.0"
calciteCore = "org.apache.calcite:calcite-core:1.34.0"
flinkClients = "org.apache.flink:flink-clients:1.16.2"
flinkConnectorBase = "org.apache.flink:flink-connector-base:1.16.2"
flinkCore = "org.apache.flink:flink-core:1.16.2"
flinkCsv = "org.apache.flink:flink-csv:1.16.2"
flinkStreamingJava = "org.apache.flink:flink-streaming-java:1.16.2"
flinkTableApiJava = "org.apache.flink:flink-table-api-java:1.16.2"
flinkTableApiJavaBridge = "org.apache.flink:flink-table-api-java-bridge:1.16.2"
flinkTableCommon = "org.apache.flink:flink-table-common:1.16.2"
flinkTablePlanner = "org.apache.flink:flink-table-planner_2.12:1.16.2"
flinkTableRuntime = "org.apache.flink:flink-table-runtime:1.16.2"
flinkMetricsDropwizard = "org.apache.flink:flink-metrics-dropwizard:1.16.2"
flinkConnectorKafka = "org.apache.flink:flink-sql-connector-kafka:1.16.2"
flinkConnectorMySqlCdc = "com.ververica:flink-sql-connector-mysql-cdc:2.3.0"
calcite-avatica = "org.apache.calcite.avatica:avatica:1.23.0"
calcite-core = "org.apache.calcite:calcite-core:1.34.0"
calcite-server = "org.apache.calcite:calcite-server:1.34.0"
flink-clients = "org.apache.flink:flink-clients:1.16.2"
flink-connector-base = "org.apache.flink:flink-connector-base:1.16.2"
flink-core = "org.apache.flink:flink-core:1.16.2"
flink-csv = "org.apache.flink:flink-csv:1.16.2"
flink-streaming-java = "org.apache.flink:flink-streaming-java:1.16.2"
flink-table-api-java = "org.apache.flink:flink-table-api-java:1.16.2"
flink-table-api-java-bridge = "org.apache.flink:flink-table-api-java-bridge:1.16.2"
flink-table-common = "org.apache.flink:flink-table-common:1.16.2"
flink-table-planner = "org.apache.flink:flink-table-planner_2.12:1.16.2"
flink-table-runtime = "org.apache.flink:flink-table-runtime:1.16.2"
flink-connector-kafka = "org.apache.flink:flink-sql-connector-kafka:1.16.2"
flink-connector-mysql-cdc = "com.ververica:flink-sql-connector-mysql-cdc:2.3.0"
gson = "com.google.code.gson:gson:2.9.0"
jackson = "com.fasterxml.jackson.core:jackson-core:2.14.1"
jacksonYaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
javaxAnnotationApi = "javax.annotation:javax.annotation-api:1.3.2"
jackson-dataformat-yaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
javax-annotation-api = "javax.annotation:javax.annotation-api:1.3.2"
junit = "junit:junit:4.12"
kafkaClients = "org.apache.kafka:kafka-clients:2.7.1"
kubernetesClient = "io.kubernetes:client-java:16.0.2"
kubernetesExtendedClient = "io.kubernetes:client-java-extended:16.0.2"
slf4jSimple = "org.slf4j:slf4j-simple:1.7.30"
slf4jApi = "org.slf4j:slf4j-api:1.7.30"
kafka-clients = "org.apache.kafka:kafka-clients:2.7.1"
kubernetes-client = "io.kubernetes:client-java:16.0.2"
kubernetes-extended-client = "io.kubernetes:client-java-extended:16.0.2"
slf4j-simple = "org.slf4j:slf4j-simple:1.7.30"
slf4j-api = "org.slf4j:slf4j-api:1.7.30"
sqlline = "sqlline:sqlline:1.12.0"
commonsCli = 'commons-cli:commons-cli:1.4'

commons-cli = "commons-cli:commons-cli:1.4"
quidem = "net.hydromatic:quidem:0.11"
8 changes: 8 additions & 0 deletions hoptimator
@@ -0,0 +1,8 @@
#!/bin/sh

BASEDIR="$( cd "$( dirname "$0" )" && pwd )"

$BASEDIR/hoptimator-cli/build/install/hoptimator-cli/bin/hoptimator-cli sqlline.SqlLine \
-ac sqlline.HoptimatorAppConfig \
-u jdbc:hoptimator:// -n "" -p "" -nn "Hoptimator" $@

7 changes: 7 additions & 0 deletions hoptimator-api/build.gradle
@@ -0,0 +1,7 @@
plugins {
id 'java'
}

dependencies {
// plz keep it this way
}
12 changes: 12 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/Catalog.java
@@ -0,0 +1,12 @@
package com.linkedin.hoptimator;

import java.sql.Wrapper;
import java.sql.SQLException;

/** Registers a set of tables, possibly within schemas and sub-schemas. */
public interface Catalog {

String name();
String description();
void register(Wrapper parentSchema) throws SQLException;
}
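As a rough illustration of this contract, a catalog implementation might unwrap the parent `Wrapper` into a Calcite `SchemaPlus` and attach a sub-schema. The `DemoCatalog` name and the assumption that the parent unwraps to `SchemaPlus` are not from this PR; this is only a sketch.

```java
package com.linkedin.hoptimator.demo;

import java.sql.SQLException;
import java.sql.Wrapper;

import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.AbstractSchema;

import com.linkedin.hoptimator.Catalog;

/** Hypothetical catalog that registers an empty DEMO schema. */
public class DemoCatalog implements Catalog {

  @Override
  public String name() {
    return "demo";
  }

  @Override
  public String description() {
    return "Demo tables for local testing";
  }

  @Override
  public void register(Wrapper parentSchema) throws SQLException {
    // Assumes the parent schema can be unwrapped into a Calcite SchemaPlus.
    SchemaPlus parent = parentSchema.unwrap(SchemaPlus.class);
    parent.add("DEMO", new AbstractSchema());
  }
}
```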
8 changes: 8 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/CatalogProvider.java
@@ -0,0 +1,8 @@
package com.linkedin.hoptimator;

import java.util.Collection;

public interface CatalogProvider {

Collection<Catalog> catalogs();
}
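A provider could then expose the `DemoCatalog` sketched above. How providers are discovered (e.g. via `java.util.ServiceLoader`) is an assumption here, not something stated in this PR.

```java
package com.linkedin.hoptimator.demo;

import java.util.Collection;
import java.util.Collections;

import com.linkedin.hoptimator.Catalog;
import com.linkedin.hoptimator.CatalogProvider;

/** Hypothetical provider that exposes the DemoCatalog sketched above. */
public class DemoCatalogProvider implements CatalogProvider {

  @Override
  public Collection<Catalog> catalogs() {
    return Collections.singletonList(new DemoCatalog());
  }
}
```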
9 changes: 9 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/Connector.java
@@ -0,0 +1,9 @@
package com.linkedin.hoptimator;

import java.util.Map;
import java.sql.SQLException;

public interface Connector<T> {

Map<String, String> configure(T t) throws SQLException;
}
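For illustration, a connector keyed on a plain table name could emit options in the same shape as the `demodb-template` above. The `String` type parameter and the option keys are illustrative assumptions, not the project's actual wiring.

```java
package com.linkedin.hoptimator.demo;

import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;

import com.linkedin.hoptimator.Connector;

/** Hypothetical connector that maps a table name to demo connector options. */
public class DemoConnector implements Connector<String> {

  @Override
  public Map<String, String> configure(String tableName) throws SQLException {
    Map<String, String> options = new LinkedHashMap<>();
    options.put("connector", "demo");
    options.put("table", tableName);
    return options;
  }
}
```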
8 changes: 8 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorProvider.java
@@ -0,0 +1,8 @@
package com.linkedin.hoptimator;

import java.util.Collection;

public interface ConnectorProvider {

<T> Collection<Connector<T>> connectors(Class<T> clazz);
}
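A matching provider might select connectors by the requested class; the unchecked cast is the usual trade-off with this kind of generic lookup. Again, this is a sketch under the same assumptions as `DemoConnector`, not the project's actual implementation.

```java
package com.linkedin.hoptimator.demo;

import java.util.Collection;
import java.util.Collections;

import com.linkedin.hoptimator.Connector;
import com.linkedin.hoptimator.ConnectorProvider;

/** Hypothetical provider that offers DemoConnector for String-keyed tables. */
public class DemoConnectorProvider implements ConnectorProvider {

  @Override
  @SuppressWarnings("unchecked")
  public <T> Collection<Connector<T>> connectors(Class<T> clazz) {
    if (String.class.equals(clazz)) {
      return Collections.singletonList((Connector<T>) new DemoConnector());
    }
    return Collections.emptyList();
  }
}
```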