From 4c488126fb062bde00efa61dde82c026af5e2f72 Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Wed, 15 Jan 2025 18:22:25 -0500 Subject: [PATCH 1/3] Fix Flink SQL syntax and ability to push jobs to session cluster --- .github/workflows/integration-tests.yml | 8 ----- Makefile | 5 ++-- README.md | 29 +++++++++++++++++++ deploy/dev/kafka.yaml | 2 ++ deploy/samples/flinkDeployment.yaml | 5 ++-- deploy/samples/kafkadb.yaml | 4 ++- .../src/test/resources/kafka-ddl.id | 10 +++---- .../util/planner/ScriptImplementor.java | 20 +++++++++++-- .../test/resources/venice-ddl-insert-all.id | 4 +-- .../resources/venice-ddl-insert-partial.id | 4 +-- .../src/test/resources/venice-ddl-select.id | 4 +-- 11 files changed, 67 insertions(+), 28 deletions(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index bedaf141..0bdbb367 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -33,14 +33,6 @@ jobs: kind load docker-image hoptimator kind load docker-image hoptimator-flink-runner kind load docker-image hoptimator-flink-operator - - name: Deploy Dev Environment - run: make deploy-dev-environment - - name: Deploy Hoptimator - run: make deploy - - name: Deploy Samples - run: make deploy-samples - - name: Wait for Readiness - run: kubectl wait kafka/one --for=condition=Ready --timeout=10m -n kafka - name: Run Integration Tests run: make integration-tests - name: Capture Cluster State diff --git a/Makefile b/Makefile index e40df53c..f67a0409 100644 --- a/Makefile +++ b/Makefile @@ -64,6 +64,7 @@ undeploy-flink: kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping" helm uninstall flink-kubernetes-operator || echo "skipping" helm repo remove flink-operator-repo || echo "skipping" + kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping" deploy-kafka: deploy deploy-flink kubectl create namespace kafka || echo "skipping" @@ -73,7 +74,6 @@ deploy-kafka: deploy deploy-flink kubectl apply -f ./deploy/dev kubectl apply -f ./deploy/samples/demodb.yaml kubectl apply -f ./deploy/samples/kafkadb.yaml - kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping" undeploy-kafka: kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping" @@ -107,8 +107,7 @@ integration-tests: deploy-dev-environment deploy-samples kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid - ./gradlew intTest || kill `cat port-forward.pid` - kill `cat port-forward.pid` + ./gradlew intTest; status=$$?; kill `cat port-forward.pid`; exit $$status generate-models: ./generate-models.sh diff --git a/README.md b/README.md index dc42b090..43f1b67f 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,15 @@ The below setup will create a dev environment with various resources within Kube Commands `deploy-kafka`, `deploy-venice`, `deploy-flink`, etc. exist in isolation to deploy individual components. 
+### Kafka
+
+To produce/consume Kafka data, use the following commands:
+
+```
+  $ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
+  $ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning
+```
+
 ### Flink
 
 ```
@@ -80,6 +89,26 @@ to access the Flink dashboard.
 
 See the [Flink SQL Gateway Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql-gateway/overview/) for sample ad hoc queries through Flink.
 
+To push a Flink job directly to the Flink deployment created above, `kubectl apply` the following YAML:
+```
+  apiVersion: flink.apache.org/v1beta1
+  kind: FlinkSessionJob
+  metadata:
+    name: test-flink-session-job
+  spec:
+    deploymentName: basic-session-deployment
+    job:
+      entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
+      args:
+        - CREATE TABLE IF NOT EXISTS `datagen-table` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='datagen', 'number-of-rows'='10');
+        - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'topic'='existing-topic-1', 'value.format'='json');
+        - INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `datagen-table`;
+      jarURI: file:///opt/hoptimator-flink-runner.jar
+      parallelism: 1
+      upgradeMode: stateless
+      state: running
+```
+
 ## The SQL CLI
 
 The `./hoptimator` script launches the [sqlline](https://github.com/julianhyde/sqlline) SQL CLI pre-configured to connect to `jdbc:hoptimator://`. The CLI includes some additional commands. See `!intro`.
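Once applied, the Flink operator should pick the session job up and report progress on its custom resources. A minimal check, assuming the `basic-session-deployment` and `test-flink-session-job` names used in the Flink section above:

```
  $ kubectl get flinkdeployment basic-session-deployment
  $ kubectl get flinksessionjob test-flink-session-job -o yaml   # status.jobStatus.state should eventually report the job as running
```

The generated specs under `hoptimator-kafka/src/test/resources` follow the same FlinkSessionJob shape, so the same commands are useful when debugging jobs created by Hoptimator itself.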
diff --git a/deploy/dev/kafka.yaml b/deploy/dev/kafka.yaml index 237b9202..ce61b94f 100644 --- a/deploy/dev/kafka.yaml +++ b/deploy/dev/kafka.yaml @@ -30,6 +30,8 @@ spec: port: 9094 type: internal tls: false + configuration: + useServiceDnsDomain: true - name: tls port: 9093 type: internal diff --git a/deploy/samples/flinkDeployment.yaml b/deploy/samples/flinkDeployment.yaml index e7c89cfd..f2464add 100644 --- a/deploy/samples/flinkDeployment.yaml +++ b/deploy/samples/flinkDeployment.yaml @@ -3,10 +3,11 @@ kind: FlinkDeployment metadata: name: basic-session-deployment spec: - image: flink:1.18 + image: docker.io/library/hoptimator-flink-runner + imagePullPolicy: Never flinkVersion: v1_18 flinkConfiguration: - taskmanager.numberOfTaskSlots: "1" + taskmanager.numberOfTaskSlots: "3" serviceAccount: flink jobManager: resource: diff --git a/deploy/samples/kafkadb.yaml b/deploy/samples/kafkadb.yaml index 79263f04..6b4ff9a7 100644 --- a/deploy/samples/kafkadb.yaml +++ b/deploy/samples/kafkadb.yaml @@ -34,7 +34,9 @@ spec: connector: | connector = kafka topic = {{table}} - properties.bootstrap.servers = localhost:9092 + properties.bootstrap.servers = one-kafka-bootstrap.kafka.svc.cluster.local:9094 + value.format = json + scan.startup.mode = earliest-offset --- diff --git a/hoptimator-kafka/src/test/resources/kafka-ddl.id b/hoptimator-kafka/src/test/resources/kafka-ddl.id index 4b717ab6..e868d8d3 100644 --- a/hoptimator-kafka/src/test/resources/kafka-ddl.id +++ b/hoptimator-kafka/src/test/resources/kafka-ddl.id @@ -7,14 +7,14 @@ kind: FlinkSessionJob metadata: name: kafka-database-existing-topic-1 spec: - deploymentName: basic-session-deployment-example + deploymentName: basic-session-deployment job: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - - CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2') - - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1') - - INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2` - jarURI: local:///opt/hoptimator-flink-runner.jar + - CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.format'='json') + - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.format'='json') + - INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `existing-topic-2` + jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless state: running diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index 2fb4158e..ec8beee2 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -157,9 +157,15 @@ public void implement(SqlWriter w) { /** Implements an 
arbitrary RelNode as a query */ class QueryImplementor implements ScriptImplementor { private final RelNode relNode; + private final boolean dropDatabaseName; public QueryImplementor(RelNode relNode) { + this(relNode, true); + } + + public QueryImplementor(RelNode relNode, boolean dropDatabaseName) { this.relNode = relNode; + this.dropDatabaseName = dropDatabaseName; } @Override @@ -192,24 +198,32 @@ public SqlNode visit(SqlCall call) { private static class UnflattenMemberAccess extends SqlShuttle { private final Set sinkFieldList; + private final boolean dropDatabaseName; UnflattenMemberAccess(QueryImplementor outer) { this.sinkFieldList = outer.relNode.getRowType().getFieldList() .stream() .map(RelDataTypeField::getName) .collect(Collectors.toSet()); + this.dropDatabaseName = outer.dropDatabaseName; } // SqlShuttle gets called for every field in SELECT and every table name in FROM alike // For fields in SELECT, we want to unflatten them as `FOO_BAR`, for tables `FOO.BAR` + // or just `BAR` if we need to drop the database name (For Flink) @Override public SqlNode visit(SqlIdentifier id) { if (id.names.size() == 1 && sinkFieldList.contains(id.names.get(0))) { id.assignNamesFrom(new SqlIdentifier(id.names.get(0).replaceAll("\\$", "_"), SqlParserPos.ZERO)); } else { - SqlIdentifier replacement = new SqlIdentifier(id.names.stream() - .flatMap(x -> Stream.of(x.split("\\$"))) - .collect(Collectors.toList()), SqlParserPos.ZERO); + SqlIdentifier replacement; + if (id.names.size() == 2 && this.dropDatabaseName) { + replacement = new SqlIdentifier(id.names.subList(1, id.names.size()), SqlParserPos.ZERO); + } else { + replacement = new SqlIdentifier(id.names.stream() + .flatMap(x -> Stream.of(x.split("\\$"))) + .collect(Collectors.toList()), SqlParserPos.ZERO); + } id.assignNamesFrom(replacement); } return id; diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id index 98b9e2e1..dc1a4020 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id @@ -13,8 +13,8 @@ spec: args: - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - - INSERT INTO `test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store` - jarURI: local:///opt/hoptimator-flink-runner.jar + - INSERT INTO `test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `test-store` + jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless state: running diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id index cec687e0..ca82454a 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id @@ -13,8 +13,8 @@ spec: args: - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, 
`stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - - INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store` - jarURI: local:///opt/hoptimator-flink-runner.jar + - INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `test-store` + jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless state: running diff --git a/hoptimator-venice/src/test/resources/venice-ddl-select.id b/hoptimator-venice/src/test/resources/venice-ddl-select.id index 78ebc693..f9643a73 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-select.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-select.id @@ -13,8 +13,8 @@ spec: args: - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - CREATE TABLE IF NOT EXISTS `SINK` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH () - - INSERT INTO `SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1` - jarURI: local:///opt/hoptimator-flink-runner.jar + - INSERT INTO `SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `test-store-1` + jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless state: running From 11e3ecf3af4ad47b51e386cc244027c6cea76ce1 Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Wed, 15 Jan 2025 22:02:31 -0500 Subject: [PATCH 2/3] Update integration-test step --- .github/workflows/integration-tests.yml | 24 ++++++++++++++----- Makefile | 20 +++++++++++----- README.md | 2 +- deploy/dev/kafka.yaml | 5 +++- .../docker-compose-single-dc-setup.yaml | 2 ++ etc/cluster.yaml | 12 ++++++++++ 6 files changed, 51 insertions(+), 14 deletions(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 0bdbb367..971fe21e 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -34,15 +34,27 @@ jobs: kind load docker-image hoptimator-flink-runner kind load docker-image hoptimator-flink-operator - name: Run Integration Tests - run: make integration-tests + run: make integration-tests-kind + - name: Capture Integration Reports + if: failure() + uses: actions/upload-artifact@v4.6.0 + with: + name: reports + path: | + **/build/reports/tests/intTest/ - name: Capture Cluster State if: always() run: | - kubectl describe pods - kubectl describe deployments - kubectl describe kafkas -n kafka - kubectl describe flinkdeployments - kubectl describe subscriptions + kubectl get pods + kubectl get svc + kubectl get deployments + kubectl get pods -n kafka + kubectl get svc -n kafka + kubectl get deployments -n kafka + kubectl get kafkas -n kafka + kubectl get flinkdeployments + kubectl 
get subscriptions + kubectl get pipelines - name: Capture Flink Job Logs if: always() run: | diff --git a/Makefile b/Makefile index f67a0409..066689bf 100644 --- a/Makefile +++ b/Makefile @@ -25,8 +25,10 @@ undeploy-config: deploy: deploy-config kubectl apply -f ./hoptimator-k8s/src/main/resources/ kubectl apply -f ./deploy + kubectl apply -f ./deploy/dev/rbac.yaml undeploy: undeploy-config + kubectl delete -f ./deploy/dev/rbac.yaml || echo "skipping" kubectl delete -f ./deploy || echo "skipping" kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping" @@ -70,7 +72,6 @@ deploy-kafka: deploy deploy-flink kubectl create namespace kafka || echo "skipping" kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io - kubectl apply -f ./hoptimator-k8s/src/main/resources/ kubectl apply -f ./deploy/dev kubectl apply -f ./deploy/samples/demodb.yaml kubectl apply -f ./deploy/samples/kafkadb.yaml @@ -83,7 +84,6 @@ undeploy-kafka: kubectl delete -f ./deploy/samples/kafkadb.yaml || echo "skipping" kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping" kubectl delete -f ./deploy/dev || echo "skipping" - kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping" kubectl delete namespace kafka || echo "skipping" # Deploys Venice cluster in docker and creates two stores in Venice. Stores are not managed via K8s for now. @@ -101,13 +101,21 @@ deploy-dev-environment: deploy deploy-flink deploy-kafka deploy-venice undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy -# Integration tests expect K8s, Kafka, and Venice to be running +# Integration test setup intended to be run locally integration-tests: deploy-dev-environment deploy-samples kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka - kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid - ./gradlew intTest; status=$$?; kill `cat port-forward.pid`; exit $$status + kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & echo $$! 
> port-forward.pid + ./gradlew intTest || kill `cat port-forward.pid` + kill `cat port-forward.pid` + +# kind cluster used in github workflow needs to have different routing set up, avoiding the need to forward kafka ports +integration-tests-kind: deploy-dev-environment deploy-samples + kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka + kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka + kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka + ./gradlew intTest generate-models: ./generate-models.sh @@ -117,4 +125,4 @@ release: test -n "$(VERSION)" # MISSING ARG: $$VERSION ./gradlew publish -.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice integration-tests deploy-dev-environment undeploy-dev-environment generate-models release +.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice integration-tests integration-tests-kind deploy-dev-environment undeploy-dev-environment generate-models release diff --git a/README.md b/README.md index 43f1b67f..ec027d6c 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ The below setup will create a dev environment with various resources within Kube ``` $ make install # build and install SQL CLI $ make deploy-dev-environment # start all local dev setups - $ kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & # forward external Kafka port for use by SQL CLI + $ kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & # forward external Kafka port for use by SQL CLI $ ./hoptimator # start the SQL CLI > !intro ``` diff --git a/deploy/dev/kafka.yaml b/deploy/dev/kafka.yaml index ce61b94f..b3e92add 100644 --- a/deploy/dev/kafka.yaml +++ b/deploy/dev/kafka.yaml @@ -41,9 +41,12 @@ spec: type: nodeport tls: false configuration: + bootstrap: + nodePort: 31092 brokers: - broker: 0 - advertisedHost: localhost + advertisedHost: 127.0.0.1 + nodePort: 31234 config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 diff --git a/deploy/docker/venice/docker-compose-single-dc-setup.yaml b/deploy/docker/venice/docker-compose-single-dc-setup.yaml index 2eb1460a..ce9ecc72 100644 --- a/deploy/docker/venice/docker-compose-single-dc-setup.yaml +++ b/deploy/docker/venice/docker-compose-single-dc-setup.yaml @@ -16,6 +16,8 @@ services: hostname: kafka environment: - ZOOKEEPER_ADDRESS=zookeeper:2181 + ports: + - 9095:9092 depends_on: zookeeper: condition: service_healthy diff --git a/etc/cluster.yaml b/etc/cluster.yaml index d996fa3c..84a1a003 100644 --- a/etc/cluster.yaml +++ b/etc/cluster.yaml @@ -2,5 +2,17 @@ kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 # Additional worker nodes will not add any real resource capacity. However, # it should mean that there are more CPU reservations to hand out. 
+networking: + ipFamily: ipv4 + apiServerAddress: 127.0.0.1 nodes: - role: control-plane + extraPortMappings: + - containerPort: 31092 + hostPort: 9092 + listenAddress: "127.0.0.1" + protocol: TCP + - containerPort: 31234 + hostPort: 31234 + listenAddress: "127.0.0.1" + protocol: TCP \ No newline at end of file From 338d2caf01043d77033fef171f84dacbbd263f90 Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Thu, 16 Jan 2025 11:05:10 -0500 Subject: [PATCH 3/3] Fix hoptimator-operator crashing issue --- Dockerfile | 1 + deploy/dev/rbac.yaml | 4 +-- deploy/rbac.yaml | 8 ++--- .../linkedin/hoptimator/k8s/K8sContext.java | 29 ++++++++++++++----- hoptimator-operator-integration/build.gradle | 2 +- .../operator/PipelineOperatorApp.java | 2 +- 6 files changed, 31 insertions(+), 15 deletions(-) diff --git a/Dockerfile b/Dockerfile index d24bca8e..203ddc76 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,5 +2,6 @@ FROM eclipse-temurin:18 WORKDIR /home/ ADD ./hoptimator-operator-integration/build/distributions/hoptimator-operator-integration.tar ./ ADD ./etc/* ./ +ENV POD_NAMESPACE_FILEPATH=/var/run/secrets/kubernetes.io/serviceaccount/namespace ENTRYPOINT ["/bin/sh", "-c"] diff --git a/deploy/dev/rbac.yaml b/deploy/dev/rbac.yaml index 9595182f..9682cbc7 100644 --- a/deploy/dev/rbac.yaml +++ b/deploy/dev/rbac.yaml @@ -1,5 +1,5 @@ apiVersion: rbac.authorization.k8s.io/v1 -kind: RoleBinding +kind: ClusterRoleBinding metadata: name: hoptimator-operator namespace: default @@ -8,6 +8,6 @@ subjects: name: hoptimator-operator namespace: default roleRef: - kind: Role + kind: ClusterRole name: hoptimator-operator apiGroup: rbac.authorization.k8s.io diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 023a87e7..cef944b7 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -1,16 +1,16 @@ apiVersion: rbac.authorization.k8s.io/v1 -kind: Role +kind: ClusterRole metadata: namespace: default name: hoptimator-operator rules: - apiGroups: ["hoptimator.linkedin.com"] - resources: ["acls", "kafkatopics", "subscriptions", "sqljobs"] + resources: ["acls", "kafkatopics", "subscriptions", "sqljobs", "pipelines"] verbs: ["get", "watch", "list", "update", "create"] - apiGroups: ["hoptimator.linkedin.com"] - resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status"] + resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status", "pipelines/status"] verbs: ["get", "patch"] - apiGroups: ["flink.apache.org"] - resources: ["flinkdeployments"] + resources: ["flinkdeployments", "flinksessionjobs"] verbs: ["get", "update", "create"] diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java index d8088dfa..57051b12 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.io.Reader; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Optional; @@ -14,6 +15,7 @@ import io.kubernetes.client.informer.SharedInformerFactory; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.util.ClientBuilder; +import io.kubernetes.client.util.Config; import io.kubernetes.client.util.KubeConfig; import io.kubernetes.client.util.generic.GenericKubernetesApi; import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi; @@ 
-101,13 +103,26 @@ public static void useContext(K8sContext context) { } static K8sContext defaultContext() throws IOException { - File file = Paths.get(System.getProperty("user.home"), ".kube", "config").toFile(); - try (Reader r = Files.newBufferedReader(file.toPath())) { - KubeConfig kubeConfig = KubeConfig.loadKubeConfig(r); - kubeConfig.setFile(file); - ApiClient apiClient = ClientBuilder.kubeconfig(kubeConfig).build(); - String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse("default"); - return new K8sContext(kubeConfig.getCurrentContext(), namespace, apiClient); + Path path = Paths.get(System.getProperty("user.home"), ".kube", "config"); + if (Files.exists(path)) { + File file = path.toFile(); + try (Reader r = Files.newBufferedReader(file.toPath())) { + KubeConfig kubeConfig = KubeConfig.loadKubeConfig(r); + kubeConfig.setFile(file); + ApiClient apiClient = ClientBuilder.kubeconfig(kubeConfig).build(); + String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse("default"); + return new K8sContext(kubeConfig.getCurrentContext(), namespace, apiClient); + } + } else { + ApiClient apiClient = Config.defaultClient(); + String filePath = System.getenv("POD_NAMESPACE_FILEPATH"); + String namespace; + if (filePath == null) { + namespace = "default"; + } else { + namespace = new String(Files.readAllBytes(Paths.get(filePath))); + } + return new K8sContext("default", namespace, apiClient); } } } diff --git a/hoptimator-operator-integration/build.gradle b/hoptimator-operator-integration/build.gradle index 9d40238f..9ab687c8 100644 --- a/hoptimator-operator-integration/build.gradle +++ b/hoptimator-operator-integration/build.gradle @@ -7,7 +7,7 @@ plugins { dependencies { implementation project(':hoptimator-operator') implementation libs.slf4j.simple - + testImplementation libs.junit testImplementation libs.assertj } diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java index 77153656..5297c4cd 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java @@ -16,7 +16,7 @@ public class PipelineOperatorApp { - private static final Logger log = LoggerFactory.getLogger(HoptimatorOperatorApp.class); + private static final Logger log = LoggerFactory.getLogger(PipelineOperatorApp.class); public static void main(String[] args) throws Exception { new PipelineOperatorApp().run();