Fix Flink SQL syntax and ability to push jobs to session cluster
jogrogan committed Jan 16, 2025
1 parent 7c0455f commit 4c48812
Showing 11 changed files with 67 additions and 28 deletions.
8 changes: 0 additions & 8 deletions .github/workflows/integration-tests.yml
@@ -33,14 +33,6 @@ jobs:
           kind load docker-image hoptimator
           kind load docker-image hoptimator-flink-runner
           kind load docker-image hoptimator-flink-operator
-      - name: Deploy Dev Environment
-        run: make deploy-dev-environment
-      - name: Deploy Hoptimator
-        run: make deploy
-      - name: Deploy Samples
-        run: make deploy-samples
-      - name: Wait for Readiness
-        run: kubectl wait kafka/one --for=condition=Ready --timeout=10m -n kafka
       - name: Run Integration Tests
         run: make integration-tests
       - name: Capture Cluster State
5 changes: 2 additions & 3 deletions Makefile
@@ -64,6 +64,7 @@ undeploy-flink:
 	kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
 	helm uninstall flink-kubernetes-operator || echo "skipping"
 	helm repo remove flink-operator-repo || echo "skipping"
+	kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
 
 deploy-kafka: deploy deploy-flink
 	kubectl create namespace kafka || echo "skipping"
@@ -73,7 +74,6 @@ deploy-kafka: deploy deploy-flink
 	kubectl apply -f ./deploy/dev
 	kubectl apply -f ./deploy/samples/demodb.yaml
 	kubectl apply -f ./deploy/samples/kafkadb.yaml
-	kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
 
 undeploy-kafka:
 	kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping"
@@ -107,8 +107,7 @@ integration-tests: deploy-dev-environment deploy-samples
 	kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
 	kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
 	kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
-	./gradlew intTest || kill `cat port-forward.pid`
-	kill `cat port-forward.pid`
+	./gradlew intTest; status=$$?; kill `cat port-forward.pid`; exit $$status
 
 generate-models:
 	./generate-models.sh
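
For reference, the `integration-tests` recipe above backgrounds a port-forward, runs the tests, kills the port-forward, and still surfaces the Gradle exit code. Outside of make (where `$$` escapes `$`), the same shell pattern reads:

```
kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $! > port-forward.pid
./gradlew intTest; status=$?
kill `cat port-forward.pid`
exit $status
```

Without the final `exit $status`, the recipe would report the status of `kill`, and a failing test run could pass silently.
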
29 changes: 29 additions & 0 deletions README.md
@@ -62,6 +62,15 @@ The below setup will create a dev environment with various resources within Kubernetes.
 
 Commands `deploy-kafka`, `deploy-venice`, `deploy-flink`, etc. exist in isolation to deploy individual components.
 
+### Kafka
+
+To produce/consume Kafka data, use the following commands:
+
+```
+$ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
+$ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning
+```
+
 ### Flink
 
 ```
@@ -80,6 +89,26 @@
 to access the Flink dashboard.
 
 See the [Flink SQL Gateway Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql-gateway/overview/)
 for sample ad-hoc queries through Flink.
 
+To push a Flink job directly to the Flink deployment created above, `kubectl apply` the following YAML:
+```
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkSessionJob
+metadata:
+  name: test-flink-session-job
+spec:
+  deploymentName: basic-session-deployment
+  job:
+    entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
+    args:
+      - CREATE TABLE IF NOT EXISTS `datagen-table` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='datagen', 'number-of-rows'='10');
+      - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'topic'='existing-topic-1', 'value.format'='json');
+      - INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `datagen-table`;
+    jarURI: file:///opt/hoptimator-flink-runner.jar
+    parallelism: 1
+    upgradeMode: stateless
+    state: running
+```
+
 ## The SQL CLI
 
 The `./hoptimator` script launches the [sqlline](https://github.com/julianhyde/sqlline) SQL CLI pre-configured to connect to `jdbc:hoptimator://`. The CLI includes some additional commands. See `!intro`.
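
For orientation, a minimal CLI session might look like the following sketch (`!tables` and `!quit` are stock sqlline commands; output omitted):

```
$ ./hoptimator
> !intro
> !tables
> !quit
```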
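
As a sketch of an ad-hoc query against the SQL Gateway referenced above, assuming its REST endpoint is reachable on localhost:8083 (the gateway's default REST port; the service name here is hypothetical):

```
$ kubectl port-forward svc/flink-sql-gateway 8083 &
$ curl -s -X POST http://localhost:8083/v1/sessions -H 'Content-Type: application/json' -d '{}'
{"sessionHandle": "..."}
$ curl -s -X POST http://localhost:8083/v1/sessions/<sessionHandle>/statements -H 'Content-Type: application/json' -d '{"statement": "SHOW TABLES;"}'
```
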
2 changes: 2 additions & 0 deletions deploy/dev/kafka.yaml
@@ -30,6 +30,8 @@ spec:
         port: 9094
         type: internal
         tls: false
+        configuration:
+          useServiceDnsDomain: true
       - name: tls
         port: 9093
         type: internal
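
`useServiceDnsDomain: true` tells Strimzi to advertise this listener with its fully-qualified `*.svc.cluster.local` name, which is the bootstrap address used throughout this change. A quick in-cluster sanity check (throwaway probe pod; the pod name is arbitrary):

```
$ kubectl run dns-probe -ti --image=busybox --rm=true --restart=Never -- nslookup one-kafka-bootstrap.kafka.svc.cluster.local
```
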
5 changes: 3 additions & 2 deletions deploy/samples/flinkDeployment.yaml
@@ -3,10 +3,11 @@ kind: FlinkDeployment
 metadata:
   name: basic-session-deployment
 spec:
-  image: flink:1.18
+  image: docker.io/library/hoptimator-flink-runner
+  imagePullPolicy: Never
   flinkVersion: v1_18
   flinkConfiguration:
-    taskmanager.numberOfTaskSlots: "1"
+    taskmanager.numberOfTaskSlots: "3"
   serviceAccount: flink
   jobManager:
     resource:
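
With this `FlinkDeployment` applied, a `FlinkSessionJob` such as the one added to the README can be pushed to the session cluster and inspected through the operator's CRDs, roughly as follows (the session-job file name is hypothetical):

```
$ kubectl apply -f deploy/samples/flinkDeployment.yaml
$ kubectl apply -f test-flink-session-job.yaml
$ kubectl get flinkdeployments,flinksessionjobs
```
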
4 changes: 3 additions & 1 deletion deploy/samples/kafkadb.yaml
@@ -34,7 +34,9 @@ spec:
   connector: |
     connector = kafka
     topic = {{table}}
-    properties.bootstrap.servers = localhost:9092
+    properties.bootstrap.servers = one-kafka-bootstrap.kafka.svc.cluster.local:9094
+    value.format = json
+    scan.startup.mode = earliest-offset
 ---
 
10 changes: 5 additions & 5 deletions hoptimator-kafka/src/test/resources/kafka-ddl.id
@@ -7,14 +7,14 @@ kind: FlinkSessionJob
 metadata:
   name: kafka-database-existing-topic-1
 spec:
-  deploymentName: basic-session-deployment-example
+  deploymentName: basic-session-deployment
   job:
     entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
     args:
-      - CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2')
-      - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1')
-      - INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2`
-    jarURI: local:///opt/hoptimator-flink-runner.jar
+      - CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.format'='json')
+      - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.format'='json')
+      - INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `existing-topic-2`
+    jarURI: file:///opt/hoptimator-flink-runner.jar
     parallelism: 1
     upgradeMode: stateless
     state: running
20 changes: 17 additions & 3 deletions
@@ -157,9 +157,15 @@ public void implement(SqlWriter w) {
 /** Implements an arbitrary RelNode as a query */
 class QueryImplementor implements ScriptImplementor {
   private final RelNode relNode;
+  private final boolean dropDatabaseName;
 
   public QueryImplementor(RelNode relNode) {
+    this(relNode, true);
+  }
+
+  public QueryImplementor(RelNode relNode, boolean dropDatabaseName) {
     this.relNode = relNode;
+    this.dropDatabaseName = dropDatabaseName;
   }
 
   @Override
@@ -192,24 +198,32 @@ public SqlNode visit(SqlCall call) {
 
   private static class UnflattenMemberAccess extends SqlShuttle {
     private final Set<String> sinkFieldList;
+    private final boolean dropDatabaseName;
 
     UnflattenMemberAccess(QueryImplementor outer) {
       this.sinkFieldList = outer.relNode.getRowType().getFieldList()
           .stream()
           .map(RelDataTypeField::getName)
           .collect(Collectors.toSet());
+      this.dropDatabaseName = outer.dropDatabaseName;
     }
 
     // SqlShuttle gets called for every field in SELECT and every table name in FROM alike
     // For fields in SELECT, we want to unflatten them as `FOO_BAR`, for tables `FOO.BAR`
+    // or just `BAR` if we need to drop the database name (For Flink)
     @Override
     public SqlNode visit(SqlIdentifier id) {
       if (id.names.size() == 1 && sinkFieldList.contains(id.names.get(0))) {
         id.assignNamesFrom(new SqlIdentifier(id.names.get(0).replaceAll("\\$", "_"), SqlParserPos.ZERO));
       } else {
-        SqlIdentifier replacement = new SqlIdentifier(id.names.stream()
-            .flatMap(x -> Stream.of(x.split("\\$")))
-            .collect(Collectors.toList()), SqlParserPos.ZERO);
+        SqlIdentifier replacement;
+        if (id.names.size() == 2 && this.dropDatabaseName) {
+          replacement = new SqlIdentifier(id.names.subList(1, id.names.size()), SqlParserPos.ZERO);
+        } else {
+          replacement = new SqlIdentifier(id.names.stream()
+              .flatMap(x -> Stream.of(x.split("\\$")))
+              .collect(Collectors.toList()), SqlParserPos.ZERO);
+        }
         id.assignNamesFrom(replacement);
       }
       return id;
4 changes: 2 additions & 2 deletions hoptimator-venice/src/test/resources/venice-ddl-insert-all.id
@@ -13,8 +13,8 @@ spec:
     args:
       - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
       - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
-      - INSERT INTO `test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
-    jarURI: local:///opt/hoptimator-flink-runner.jar
+      - INSERT INTO `test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `test-store`
+    jarURI: file:///opt/hoptimator-flink-runner.jar
     parallelism: 1
     upgradeMode: stateless
     state: running
4 changes: 2 additions & 2 deletions
@@ -13,8 +13,8 @@ spec:
     args:
       - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
      - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
-      - INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store`
-    jarURI: local:///opt/hoptimator-flink-runner.jar
+      - INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `test-store`
+    jarURI: file:///opt/hoptimator-flink-runner.jar
     parallelism: 1
     upgradeMode: stateless
     state: running
4 changes: 2 additions & 2 deletions hoptimator-venice/src/test/resources/venice-ddl-select.id
@@ -13,8 +13,8 @@ spec:
     args:
       - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
       - CREATE TABLE IF NOT EXISTS `SINK` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ()
-      - INSERT INTO `SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1`
-    jarURI: local:///opt/hoptimator-flink-runner.jar
+      - INSERT INTO `SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `test-store-1`
+    jarURI: file:///opt/hoptimator-flink-runner.jar
     parallelism: 1
     upgradeMode: stateless
     state: running
