From f4fb89e799bf19e227e1ab24c074270fd26df86e Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Thu, 25 Jul 2024 14:46:32 +0200 Subject: [PATCH] feat(ingest/spark): Promote beta plugin (#10881) Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- build.gradle | 3 +- docs-website/filterTagIndexes.json | 2 +- docs-website/sidebars.js | 10 +- docs/lineage/openlineage.md | 8 +- metadata-ingestion/README.md | 2 +- .../docs/sources/databricks/README.md | 2 +- .../README.md | 93 +++++---- .../build.gradle | 0 .../scripts/check_jar.sh | 0 .../datahub/spark/DatahubEventEmitter.java | 27 +++ .../datahub/spark/DatahubSparkListener.java | 188 ++++++++++++----- .../spark/conf/DatahubEmitterConfig.java | 0 .../spark/conf/FileDatahubEmitterConfig.java | 18 ++ .../spark/conf/KafkaDatahubEmitterConfig.java | 18 ++ .../spark/conf/RestDatahubEmitterConfig.java | 0 .../spark/conf/S3DatahubEmitterConfig.java | 18 ++ .../datahub/spark/conf/SparkAppContext.java | 0 .../datahub/spark/conf/SparkConfigParser.java | 29 ++- .../datahub/spark/conf/SparkLineageConf.java | 0 .../SparkStreamingEventToDatahub.java | 0 .../spark/agent/util/PlanUtils.java | 0 .../spark/agent/util/RddPathUtils.java | 0 .../agent/util/RemovePathPatternUtils.java | 0 .../agent/vendor/redshift/Constants.java | 0 .../agent/vendor/redshift/RedshiftVendor.java | 0 .../redshift/lifecycle/RedshiftDataset.java | 0 .../lifecycle/RedshiftRelationVisitor.java | 0 .../lifecycle/RedshiftVisitorFactory.java | 0 .../plan/RedshiftEventHandlerFactory.java | 0 ...shiftSaveIntoDataSourceCommandBuilder.java | 0 .../io/openlineage/spark/api/Vendors.java | 0 .../io/openlineage/spark/api/VendorsImpl.java | 0 .../datahub/spark/HdfsPathDatasetTest.java | 0 .../spark/OpenLineageEventToDatahubTest.java | 60 ++++++ .../src/test/resources/log4j.properties | 0 .../resources/ol_events/gs_input_output.json | 62 ++++++ .../ol_events/redshift_lineage_spark.json | 0 .../redshift_mixed_case_lineage_spark.json | 0 
.../ol_events/sample_failed_spark.json | 0 .../test/resources/ol_events/sample_glue.json | 0 .../sample_glue_ol_0_17_changes.json | 168 ++++++++++++++++ .../resources/ol_events/sample_spark.json | 0 .../log4j-defaults.properties | 0 .../java/datahub-client/build.gradle | 11 +- .../java/datahub-client/scripts/check_jar.sh | 4 + .../java/datahub/client/file/FileEmitter.java | 2 +- .../datahub/client/kafka/KafkaEmitter.java | 1 + .../java/datahub/client/s3/S3Emitter.java | 190 ++++++++++++++++++ .../datahub/client/s3/S3EmitterConfig.java | 24 +++ .../converter/OpenLineageToDataHub.java | 21 +- .../InsertIntoHadoopFsRelationVisitor.java | 79 -------- .../README.md | 2 +- .../build.gradle | 0 .../scripts/check_jar.sh | 5 +- .../spark-smoke-test/docker/.dockerignore | 0 .../docker/SparkBase.Dockerfile | 0 .../docker/SparkMaster.Dockerfile | 0 .../docker/SparkSlave.Dockerfile | 0 .../docker/SparkSubmit.Dockerfile | 0 .../spark-smoke-test/docker/build_images.sh | 0 .../docker/spark-docker-compose.yml | 0 .../golden_json/JavaHdfsIn2HdfsOut1.json | 0 .../golden_json/JavaHdfsIn2HdfsOut2.json | 0 .../JavaHdfsIn2HiveCreateInsertTable.json | 0 .../JavaHdfsIn2HiveCreateTable.json | 0 .../golden_json/JavaHiveInHiveOut.json | 0 .../golden_json/PythonHdfsIn2HdfsOut1.json | 0 .../golden_json/PythonHdfsIn2HdfsOut2.json | 0 .../PythonHdfsIn2HiveCreateInsertTable.json | 0 .../PythonHdfsIn2HiveCreateTable.json | 0 .../golden_json/PythonHiveInHiveOut.json | 0 .../HdfsIn2HdfsOut1.py | 0 .../HdfsIn2HdfsOut2.py | 0 .../HdfsIn2HiveCreateInsertTable.py | 0 .../HdfsIn2HiveCreateTable.py | 0 .../HiveInHiveOut.py | 0 .../python_test_run.sh | 0 .../spark-smoke-test/requirements.txt | 0 .../resources/data/in1.csv/part1.csv | 0 .../resources/data/in2.csv/part1.csv | 0 .../setup_spark_smoke_test.sh | 0 .../spark-smoke-test/smoke.sh | 0 .../spark-smoke-test/spark-docker.conf | 0 .../test-spark-lineage/.gitignore | 0 .../test-spark-lineage/build.gradle | 0 .../gradle/wrapper/gradle-wrapper.jar | Bin 
.../gradle/wrapper/gradle-wrapper.properties | 0 .../test-spark-lineage/gradlew | 0 .../test-spark-lineage/gradlew.bat | 0 .../test-spark-lineage/java_test_run.sh | 0 .../test-spark-lineage/settings.gradle | 0 .../test/spark/lineage/HdfsIn2HdfsOut1.java | 0 .../test/spark/lineage/HdfsIn2HdfsOut2.java | 0 .../lineage/HdfsIn2HiveCreateInsertTable.java | 0 .../spark/lineage/HdfsIn2HiveCreateTable.java | 0 .../test/spark/lineage/HiveInHiveOut.java | 0 .../spark/lineage/HiveInHiveOut_test1.java | 0 .../main/java/test/spark/lineage/Utils.java | 0 .../spark-smoke-test/test_e2e.py | 0 .../spark-smoke-test/url_list.txt | 0 .../datahub/spark/DatahubSparkListener.java | 0 .../java/datahub/spark/DatasetExtractor.java | 0 .../consumer/impl/CoalesceJobsEmitter.java | 0 .../spark/consumer/impl/McpEmitter.java | 61 +++++- .../java/datahub/spark/model/AppEndEvent.java | 0 .../datahub/spark/model/AppStartEvent.java | 0 .../datahub/spark/model/DatasetLineage.java | 0 .../datahub/spark/model/LineageConsumer.java | 0 .../datahub/spark/model/LineageEvent.java | 0 .../datahub/spark/model/LineageUtils.java | 0 .../spark/model/SQLQueryExecEndEvent.java | 0 .../spark/model/SQLQueryExecStartEvent.java | 0 .../model/dataset/CatalogTableDataset.java | 0 .../spark/model/dataset/HdfsPathDataset.java | 0 .../spark/model/dataset/JdbcDataset.java | 0 .../spark/model/dataset/SparkDataset.java | 0 .../datahub/spark/TestCoalesceJobLineage.java | 0 .../datahub/spark/TestSparkJobsLineage.java | 0 .../src/test/resources/data/in1.csv/part1.csv | 0 .../src/test/resources/data/in2.csv/part1.csv | 0 .../src/test/resources/data/in3.csv/part1.csv | 0 .../src/test/resources/data/in4.csv/part1.csv | 0 .../resources/expected/test1HdfsInOut.json | 0 .../expected/test2HdfsInHiveOut.json | 0 .../expected/test3HdfsJdbcInJdbcOut.json | 0 .../expected/test4HiveInHiveOut.json | 0 .../expected/test5HdfsInJdbcOut.json | 0 .../test6HdfsJdbcInJdbcOutTwoLevel.json | 0 .../expected/test7HdfsInPersistHdfsOut.json | 0 
.../test8PersistHdfsJdbcInJdbcOut.json | 0 .../expected/test9PersistJdbcInHdfsOut.json | 0 .../expected/testHdfsInOutCoalesce.json | 0 .../expected/testHiveInHiveOutCoalesce.json | 0 .../apache/spark/log4j-defaults.properties | 0 .../java/spark-lineage/bin/.gitignore | 2 - .../test-spark-lineage/bin/.gitignore | 2 - settings.gradle | 4 +- 137 files changed, 906 insertions(+), 210 deletions(-) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/README.md (61%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/build.gradle (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/scripts/check_jar.sh (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/datahub/spark/DatahubEventEmitter.java (92%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/datahub/spark/DatahubSparkListener.java (66%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/datahub/spark/conf/DatahubEmitterConfig.java (100%) create mode 100644 metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/FileDatahubEmitterConfig.java create mode 100644 metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/datahub/spark/conf/RestDatahubEmitterConfig.java (100%) create mode 100644 metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/S3DatahubEmitterConfig.java rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/datahub/spark/conf/SparkAppContext.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/datahub/spark/conf/SparkConfigParser.java (90%) rename metadata-integration/java/{spark-lineage-beta => 
acryl-spark-lineage}/src/main/java/datahub/spark/conf/SparkLineageConf.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java (100%) rename metadata-integration/java/{spark-lineage-beta => 
acryl-spark-lineage}/src/main/java/io/openlineage/spark/api/Vendors.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/main/java/io/openlineage/spark/api/VendorsImpl.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/test/java/datahub/spark/HdfsPathDatasetTest.java (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java (92%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/test/resources/log4j.properties (100%) create mode 100644 metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/gs_input_output.json rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/test/resources/ol_events/redshift_lineage_spark.json (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/test/resources/ol_events/redshift_mixed_case_lineage_spark.json (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/test/resources/ol_events/sample_failed_spark.json (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/test/resources/ol_events/sample_glue.json (100%) create mode 100644 metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/sample_glue_ol_0_17_changes.json rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/test/resources/ol_events/sample_spark.json (100%) rename metadata-integration/java/{spark-lineage-beta => acryl-spark-lineage}/src/test/resources/org.apache.spark/log4j-defaults.properties (100%) create mode 100644 metadata-integration/java/datahub-client/src/main/java/datahub/client/s3/S3Emitter.java create mode 100644 metadata-integration/java/datahub-client/src/main/java/datahub/client/s3/S3EmitterConfig.java delete mode 100644 
metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/plan/InsertIntoHadoopFsRelationVisitor.java rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/README.md (99%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/build.gradle (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/scripts/check_jar.sh (94%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/docker/.dockerignore (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/docker/SparkBase.Dockerfile (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/docker/SparkMaster.Dockerfile (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/docker/SparkSlave.Dockerfile (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/docker/SparkSubmit.Dockerfile (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/docker/build_images.sh (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/docker/spark-docker-compose.yml (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/golden_json/JavaHdfsIn2HdfsOut1.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/golden_json/JavaHdfsIn2HdfsOut2.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/golden_json/JavaHdfsIn2HiveCreateInsertTable.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/golden_json/JavaHdfsIn2HiveCreateTable.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/golden_json/JavaHiveInHiveOut.json (100%) rename 
metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/golden_json/PythonHdfsIn2HdfsOut1.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/golden_json/PythonHdfsIn2HdfsOut2.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/golden_json/PythonHdfsIn2HiveCreateInsertTable.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/golden_json/PythonHdfsIn2HiveCreateTable.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/golden_json/PythonHiveInHiveOut.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut1.py (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut2.py (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/python-spark-lineage-test/HdfsIn2HiveCreateInsertTable.py (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/python-spark-lineage-test/HdfsIn2HiveCreateTable.py (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/python-spark-lineage-test/HiveInHiveOut.py (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/python-spark-lineage-test/python_test_run.sh (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/requirements.txt (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/resources/data/in1.csv/part1.csv (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/resources/data/in2.csv/part1.csv (100%) rename metadata-integration/java/{spark-lineage => 
spark-lineage-legacy}/spark-smoke-test/setup_spark_smoke_test.sh (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/smoke.sh (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/spark-docker.conf (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/.gitignore (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/build.gradle (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/gradle/wrapper/gradle-wrapper.jar (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/gradle/wrapper/gradle-wrapper.properties (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/gradlew (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/gradlew.bat (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/java_test_run.sh (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/settings.gradle (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HdfsOut1.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HdfsOut2.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HiveCreateInsertTable.java (100%) rename metadata-integration/java/{spark-lineage => 
spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HiveCreateTable.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HiveInHiveOut.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HiveInHiveOut_test1.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/Utils.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/test_e2e.py (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/spark-smoke-test/url_list.txt (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/DatahubSparkListener.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/DatasetExtractor.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/consumer/impl/CoalesceJobsEmitter.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/consumer/impl/McpEmitter.java (60%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/AppEndEvent.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/AppStartEvent.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/DatasetLineage.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/LineageConsumer.java (100%) rename metadata-integration/java/{spark-lineage => 
spark-lineage-legacy}/src/main/java/datahub/spark/model/LineageEvent.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/LineageUtils.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/SQLQueryExecEndEvent.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/dataset/CatalogTableDataset.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/dataset/JdbcDataset.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/main/java/datahub/spark/model/dataset/SparkDataset.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/java/datahub/spark/TestCoalesceJobLineage.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/java/datahub/spark/TestSparkJobsLineage.java (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/data/in1.csv/part1.csv (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/data/in2.csv/part1.csv (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/data/in3.csv/part1.csv (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/data/in4.csv/part1.csv (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/expected/test1HdfsInOut.json (100%) rename metadata-integration/java/{spark-lineage 
=> spark-lineage-legacy}/src/test/resources/expected/test2HdfsInHiveOut.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/expected/test3HdfsJdbcInJdbcOut.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/expected/test4HiveInHiveOut.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/expected/test5HdfsInJdbcOut.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/expected/test6HdfsJdbcInJdbcOutTwoLevel.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/expected/test7HdfsInPersistHdfsOut.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/expected/test8PersistHdfsJdbcInJdbcOut.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/expected/test9PersistJdbcInHdfsOut.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/expected/testHdfsInOutCoalesce.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/expected/testHiveInHiveOutCoalesce.json (100%) rename metadata-integration/java/{spark-lineage => spark-lineage-legacy}/src/test/resources/org/apache/spark/log4j-defaults.properties (100%) delete mode 100644 metadata-integration/java/spark-lineage/bin/.gitignore delete mode 100644 metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/bin/.gitignore diff --git a/build.gradle b/build.gradle index 569987cf73c6c..adb45705c0ebd 100644 --- a/build.gradle +++ b/build.gradle @@ -54,7 +54,7 @@ buildscript { ext.hazelcastVersion = '5.3.6' ext.ebeanVersion = '12.16.1' ext.googleJavaFormatVersion = '1.18.1' - ext.openLineageVersion = '1.16.0' + ext.openLineageVersion = '1.19.0' ext.logbackClassicJava8 = 
'1.2.12' ext.docker_registry = 'acryldata' @@ -111,6 +111,7 @@ project.ext.externalDependency = [ 'avroCompiler': 'org.apache.avro:avro-compiler:1.11.3', 'awsGlueSchemaRegistrySerde': 'software.amazon.glue:schema-registry-serde:1.1.17', 'awsMskIamAuth': 'software.amazon.msk:aws-msk-iam-auth:2.0.3', + 'awsS3': 'software.amazon.awssdk:s3:2.26.21', 'awsSecretsManagerJdbc': 'com.amazonaws.secretsmanager:aws-secretsmanager-jdbc:1.0.13', 'awsPostgresIamAuth': 'software.amazon.jdbc:aws-advanced-jdbc-wrapper:1.0.2', 'awsRds':'software.amazon.awssdk:rds:2.18.24', diff --git a/docs-website/filterTagIndexes.json b/docs-website/filterTagIndexes.json index 8caff3497a200..2fa00120fd2cc 100644 --- a/docs-website/filterTagIndexes.json +++ b/docs-website/filterTagIndexes.json @@ -562,7 +562,7 @@ } }, { - "Path": "docs/metadata-integration/java/spark-lineage-beta", + "Path": "docs/metadata-integration/java/acryl-spark-lineage", "imgPath": "img/logos/platforms/spark.svg", "Title": "Spark", "Description": "Spark is a data processing tool that enables fast and efficient processing of large-scale data sets using distributed computing.", diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index 8e48062af6d4d..a7aece48be3bc 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -419,17 +419,13 @@ module.exports = { }, { type: "doc", - id: "metadata-integration/java/spark-lineage/README", - label: "Spark (Legacy)", - }, - { - type: "doc", - id: "metadata-integration/java/spark-lineage-beta/README", + id: "metadata-integration/java/acryl-spark-lineage/README", label: "Spark", }, //"docker/airflow/local_airflow", "metadata-ingestion/integration_docs/great-expectations", "metadata-integration/java/datahub-protobuf/README", + //"metadata-integration/java/spark-lineage-legacy/README", //"metadata-ingestion/source-docs-template", { type: "autogenerated", @@ -886,7 +882,7 @@ module.exports = { //"docs/how/graph-onboarding", //"docs/demo/graph-onboarding", 
//"metadata-integration/java/spark-lineage/README", - // "metadata-integration/java/spark-lineage-beta/README.md + // "metadata-integration/java/acryl-spark-lineage/README.md // "metadata-integration/java/openlineage-converter/README" //"metadata-ingestion-modules/airflow-plugin/README" //"metadata-ingestion-modules/dagster-plugin/README" diff --git a/docs/lineage/openlineage.md b/docs/lineage/openlineage.md index 0b9423bf2c4da..c91aa7499802c 100644 --- a/docs/lineage/openlineage.md +++ b/docs/lineage/openlineage.md @@ -6,7 +6,7 @@ DataHub, now supports [OpenLineage](https://openlineage.io/) integration. With t - **REST Endpoint Support**: DataHub now includes a REST endpoint that can understand OpenLineage events. This allows users to send lineage information directly to DataHub, enabling easy integration with various data processing frameworks. -- **[Spark Event Listener Plugin](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta)**: DataHub provides a Spark Event Listener plugin that seamlessly integrates OpenLineage's Spark plugin. This plugin enhances DataHub's OpenLineage support by offering additional features such as PathSpec support, column-level lineage, patch support and more. +- **[Spark Event Listener Plugin](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage)**: DataHub provides a Spark Event Listener plugin that seamlessly integrates OpenLineage's Spark plugin. This plugin enhances DataHub's OpenLineage support by offering additional features such as PathSpec support, column-level lineage, patch support and more. ## OpenLineage Support with DataHub @@ -73,7 +73,7 @@ The transport should look like this: #### Known Limitations With Spark and Airflow we recommend using the Spark Lineage or DataHub's Airflow plugin for tighter integration with DataHub. 
-- **[PathSpec](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta/#configuring-hdfs-based-dataset-urns) Support**: While the REST endpoint supports OpenLineage messages, full [PathSpec](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta/#configuring-hdfs-based-dataset-urns)) support is not yet available. +- **[PathSpec](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns) Support**: While the REST endpoint supports OpenLineage messages, full [PathSpec](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns)) support is not yet available. - **Column-level Lineage**: DataHub's current OpenLineage support does not provide full column-level lineage tracking. - etc... @@ -83,10 +83,10 @@ DataHub's Spark Event Listener plugin enhances OpenLineage support by providing #### How to Use -Follow the guides of the Spark Lineage plugin page for more information on how to set up the Spark Lineage plugin. The guide can be found [here](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta) +Follow the guides of the Spark Lineage plugin page for more information on how to set up the Spark Lineage plugin. 
The guide can be found [here](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage) ## References - [OpenLineage](https://openlineage.io/) - [DataHub OpenAPI Guide](../api/openapi/openapi-usage-guide.md) -- [DataHub Spark Lineage Plugin](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta) +- [DataHub Spark Lineage Plugin](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage) diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 54478fddbe2d0..f5add58a0543a 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -19,7 +19,7 @@ Integration can be divided into two concepts based on the method: ### Push-based Integration Push-based integrations allow you to emit metadata directly from your data systems when metadata changes. -Examples of push-based integrations include [Airflow](../docs/lineage/airflow.md), [Spark](../metadata-integration/java/spark-lineage/README.md), [Great Expectations](./integration_docs/great-expectations.md) and [Protobuf Schemas](../metadata-integration/java/datahub-protobuf/README.md). This allows you to get low-latency metadata integration from the "active" agents in your data ecosystem. +Examples of push-based integrations include [Airflow](../docs/lineage/airflow.md), [Spark](../metadata-integration/java/acryl-spark-lineage/README.md), [Great Expectations](./integration_docs/great-expectations.md) and [Protobuf Schemas](../metadata-integration/java/datahub-protobuf/README.md). This allows you to get low-latency metadata integration from the "active" agents in your data ecosystem. 
### Pull-based Integration diff --git a/metadata-ingestion/docs/sources/databricks/README.md b/metadata-ingestion/docs/sources/databricks/README.md index a6cf39084c6ab..5147ddcd80680 100644 --- a/metadata-ingestion/docs/sources/databricks/README.md +++ b/metadata-ingestion/docs/sources/databricks/README.md @@ -11,7 +11,7 @@ The alternative way to integrate is via the Hive connector. The [Hive starter re ## Databricks Spark -To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/spark-lineage-beta/README.md#configuration-instructions-databricks). +To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/acryl-spark-lineage/README.md#configuration-instructions-databricks). 
## Watch the DataHub Talk at the Data and AI Summit 2022 diff --git a/metadata-integration/java/spark-lineage-beta/README.md b/metadata-integration/java/acryl-spark-lineage/README.md similarity index 61% rename from metadata-integration/java/spark-lineage-beta/README.md rename to metadata-integration/java/acryl-spark-lineage/README.md index b0753936dd677..81108aa7b914d 100644 --- a/metadata-integration/java/spark-lineage-beta/README.md +++ b/metadata-integration/java/acryl-spark-lineage/README.md @@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co ```text #Configuring DataHub spark agent jar -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.15 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server http://localhost:8080 ``` @@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080 ## spark-submit command line ```sh -spark-submit --packages io.acryl:acryl-spark-lineage:0.2.13 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py +spark-submit --packages io.acryl:acryl-spark-lineage:0.2.15 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py ``` ### Configuration Instructions: Amazon EMR @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html) ```text -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.15 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server https://your_datahub_host/gms #If you have authentication set up then you also need to specify the Datahub access token @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh spark = SparkSession.builder .master("spark://spark-master:7077") 
.appName("test-application") -.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.13") +.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.15") .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.rest.server", "http://localhost:8080") .enableHiveSupport() @@ -157,34 +157,44 @@ information like tokens. ## Configuration Options -| Field | Required | Default | Description | -|---------------------------------------------------------------------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.13 | -| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener | -| spark.datahub.rest.server | ✅ | | Datahub server url eg: | -| spark.datahub.rest.token | | | Authentication token. | -| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! | -| spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed | -| spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries | -| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance | -| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance (it is usefult to set if you have it in your glue ingestion) | -| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD | -| spark.datahub.metadata.dataset.hivePlatformAlias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. 
If you are using Glue as your Hive metastore, set this config flag to `glue` | -| spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions | -| spark.datahub.metadata.remove_partition_pattern | | | Remove partition pattern. (e.g. /partition=\d+) It change database/table/partition=123 to database/table | -| spark.datahub.coalesce_jobs | | true | Only one datajob(task) will be emitted containing all input and output datasets for the spark application | -| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true | -| spark.datahub.metadata.dataset.materialize | | false | Materialize Datasets in DataHub | -| spark.datahub.platform.s3.path_spec_list | | | List of pathspec per platform | -| spark.datahub.metadata.dataset.experimental_include_schema_metadata | false | | Emit dataset schema metadata based on the spark | -| spark.datahub.flow_name | | | If it is set it will be used as the DataFlow name otherwise it uses spark app name as flow_name | -| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` | -| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow | -| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow | -| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run. | -| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. 
By default, it is disabled. | -| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default, it is disabled. | -| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the s3 location instead of the Hive table. By default, it is disabled. | +| Field | Required | Default | Description | +|--------------------------------------------------------|----------|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.15 | +| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener | +| spark.datahub.emitter | | rest | Specify the ways to emit metadata. By default it sends to DataHub using REST emitter. Valid options are rest, kafka or file | +| spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg: | +| spark.datahub.rest.token | | | Authentication token. | +| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! | +| spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed | +| spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries | +| spark.datahub.file.filename | | | The file where metadata will be written if file emitter is set | +| spark.datahub.kafka.bootstrap | | | The Kafka bootstrap server url to use if the Kafka emitter is set | +| spark.datahub.kafka.schema_registry_url | | | The Schema registry url to use if the Kafka emitter is set | +| spark.datahub.kafka.schema_registry_config. | | | Additional config to pass in to the Schema Registry Client | +| spark.datahub.kafka.producer_config. | | | Additional config to pass in to the Kafka producer. 
For example: `--conf "spark.datahub.kafka.producer_config.client.id=my_client_id"` | +| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance | +| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance (it is useful to set if you have it in your glue ingestion) | +| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD | +| spark.datahub.metadata.dataset.hivePlatformAlias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` | +| spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions | +| spark.datahub.metadata.remove_partition_pattern | | | Remove partition pattern. (e.g. /partition=\d+) It changes database/table/partition=123 to database/table | +| spark.datahub.coalesce_jobs | | true | Only one datajob(task) will be emitted containing all input and output datasets for the spark application | +| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true | +| spark.datahub.metadata.dataset.materialize | | false | Materialize Datasets in DataHub | +| spark.datahub.platform.s3.path_spec_list | | | List of pathspec per platform | +| spark.datahub.metadata.dataset.include_schema_metadata | false | | Emit dataset schema metadata based on the spark execution. 
It is recommended to get schema information from platform specific DataHub sources as this is less reliable | +| spark.datahub.flow_name | | | If it is set it will be used as the DataFlow name otherwise it uses spark app name as flow_name | +| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` | +| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow | +| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow | +| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run. | +| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled. | +| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default, it is disabled. | +| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the s3 location instead of the Hive table. By default, it is disabled. 
| +| spark.datahub.s3.bucket | | | The name of the bucket where metadata will be written if s3 emitter is set | +| spark.datahub.s3.prefix | | | The prefix for the file where metadata will be written on s3 if s3 emitter is set | +| spark.datahub.s3.filename | | | The name of the file where metadata will be written; if it is not set, a random filename will be used on s3 if s3 emitter is set | + ## What to Expect: The Metadata Model @@ -205,8 +215,6 @@ For Spark on Databricks, The following custom properties in pipelines and tasks relate to the Spark UI: - appName and appId in a pipeline can be used to determine the Spark application -- description and SQLQueryId in a task can be used to determine the Query Execution within the application on the SQL - tab of Spark UI - Other custom properties of pipelines and tasks capture the start and end times of execution etc. For Spark on Databricks, pipeline start time is the cluster start time. @@ -222,6 +230,7 @@ This initial release has been tested with the following environments: - spark-submit of Python/Java applications to local and remote servers - Standalone Java applications - Databricks Standalone Cluster +- EMR Testing with Databricks Standard and High-concurrency Cluster is not done yet. @@ -340,20 +349,32 @@ log4j.logger.datahub.client.rest=DEBUG Use Java 8 to build the project. The project uses Gradle as the build tool. 
To build the project, run the following command: ```shell -./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:spark-lineage-beta:shadowJar +./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:acryl-spark-lineage:shadowJar ``` ## Known limitations + ## Changelog + +### Version 0.2.15 +- Add Kafka emitter to emit lineage to kafka +- Add File emitter to emit lineage to file +- Add S3 emitter to save mcps to s3 +- Upgrading OpenLineage to 1.19.0 +- Renaming project to acryl-datahub-spark-lineage +- Supporting OpenLineage 1.17+ glue identifier changes +- Fix handling OpenLineage input/output where wasn't any facet attached + ### Version 0.2.14 - Fix warning about MeterFilter warning from Micrometer ### Version 0.2.13 -- Silencing some chatty warnings in RddPathUtils +- Add kafka emitter to emit lineage to kafka ### Version 0.2.12 +- Silencing some chatty warnings in RddPathUtils +### Version 0.2.11 - Add option to lowercase dataset URNs - Add option to set platform instance and/or env per platform with `spark.datahub.platform..env` and `spark.datahub.platform..platform_instance` config parameter - Fixing platform instance setting for datasets when `spark.datahub.metadata.dataset.platformInstance` is set diff --git a/metadata-integration/java/spark-lineage-beta/build.gradle b/metadata-integration/java/acryl-spark-lineage/build.gradle similarity index 100% rename from metadata-integration/java/spark-lineage-beta/build.gradle rename to metadata-integration/java/acryl-spark-lineage/build.gradle diff --git a/metadata-integration/java/spark-lineage-beta/scripts/check_jar.sh b/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh similarity index 100% rename from metadata-integration/java/spark-lineage-beta/scripts/check_jar.sh rename to metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java 
b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java similarity index 92% rename from metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java index dc274ad7df3b4..7a5fdeaeb8e0d 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java @@ -13,10 +13,16 @@ import com.linkedin.domain.Domains; import com.linkedin.mxe.MetadataChangeProposal; import datahub.client.Emitter; +import datahub.client.file.FileEmitter; +import datahub.client.kafka.KafkaEmitter; import datahub.client.rest.RestEmitter; +import datahub.client.s3.S3Emitter; import datahub.event.EventFormatter; import datahub.event.MetadataChangeProposalWrapper; +import datahub.spark.conf.FileDatahubEmitterConfig; +import datahub.spark.conf.KafkaDatahubEmitterConfig; import datahub.spark.conf.RestDatahubEmitterConfig; +import datahub.spark.conf.S3DatahubEmitterConfig; import datahub.spark.conf.SparkLineageConf; import io.datahubproject.openlineage.converter.OpenLineageToDataHub; import io.datahubproject.openlineage.dataset.DatahubDataset; @@ -71,6 +77,27 @@ private Optional getEmitter() { RestDatahubEmitterConfig datahubRestEmitterConfig = (RestDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig(); emitter = Optional.of(new RestEmitter(datahubRestEmitterConfig.getRestEmitterConfig())); + } else if (datahubConf.getDatahubEmitterConfig() instanceof KafkaDatahubEmitterConfig) { + KafkaDatahubEmitterConfig datahubKafkaEmitterConfig = + (KafkaDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig(); + try { + emitter = + Optional.of(new KafkaEmitter(datahubKafkaEmitterConfig.getKafkaEmitterConfig())); + } catch (IOException e) { + throw new RuntimeException(e); 
+ } + } else if (datahubConf.getDatahubEmitterConfig() instanceof FileDatahubEmitterConfig) { + FileDatahubEmitterConfig datahubFileEmitterConfig = + (FileDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig(); + emitter = Optional.of(new FileEmitter(datahubFileEmitterConfig.getFileEmitterConfig())); + } else if (datahubConf.getDatahubEmitterConfig() instanceof S3DatahubEmitterConfig) { + S3DatahubEmitterConfig datahubFileEmitterConfig = + (S3DatahubEmitterConfig) datahubConf.getDatahubEmitterConfig(); + try { + emitter = Optional.of(new S3Emitter(datahubFileEmitterConfig.getS3EmitterConfig())); + } catch (IOException e) { + throw new RuntimeException(e); + } } else { log.error( "DataHub Transport {} not recognized. DataHub Lineage emission will not work", diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java similarity index 66% rename from metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index 96fa74d1bca1f..d64e159482c1b 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -5,9 +5,15 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import datahub.client.file.FileEmitterConfig; +import datahub.client.kafka.KafkaEmitterConfig; import datahub.client.rest.RestEmitterConfig; +import datahub.client.s3.S3EmitterConfig; import datahub.spark.conf.DatahubEmitterConfig; +import datahub.spark.conf.FileDatahubEmitterConfig; +import datahub.spark.conf.KafkaDatahubEmitterConfig; import datahub.spark.conf.RestDatahubEmitterConfig; +import 
datahub.spark.conf.S3DatahubEmitterConfig; import datahub.spark.conf.SparkAppContext; import datahub.spark.conf.SparkConfigParser; import datahub.spark.conf.SparkLineageConf; @@ -98,57 +104,138 @@ public void onApplicationStart(SparkListenerApplicationStart applicationStart) { public Optional initializeEmitter(Config sparkConf) { String emitterType = - sparkConf.hasPath(SparkConfigParser.TRANSPORT_KEY) - ? sparkConf.getString(SparkConfigParser.TRANSPORT_KEY) + sparkConf.hasPath(SparkConfigParser.EMITTER_TYPE) + ? sparkConf.getString(SparkConfigParser.EMITTER_TYPE) : "rest"; - if (emitterType.equals("rest")) { - String gmsUrl = - sparkConf.hasPath(SparkConfigParser.GMS_URL_KEY) - ? sparkConf.getString(SparkConfigParser.GMS_URL_KEY) - : "http://localhost:8080"; - String token = - sparkConf.hasPath(SparkConfigParser.GMS_AUTH_TOKEN) - ? sparkConf.getString(SparkConfigParser.GMS_AUTH_TOKEN) - : null; - boolean disableSslVerification = - sparkConf.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY) - && sparkConf.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY); - - int retry_interval_in_sec = - sparkConf.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC) - ? sparkConf.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC) - : 5; - - int max_retries = - sparkConf.hasPath(SparkConfigParser.MAX_RETRIES) - ? sparkConf.getInt(SparkConfigParser.MAX_RETRIES) - : 0; - - log.info( - "REST Emitter Configuration: GMS url {}{}", - gmsUrl, - (sparkConf.hasPath(SparkConfigParser.GMS_URL_KEY) ? "" : "(default)")); - if (token != null) { - log.info("REST Emitter Configuration: Token {}", "XXXXX"); - } + switch (emitterType) { + case "rest": + String gmsUrl = + sparkConf.hasPath(SparkConfigParser.GMS_URL_KEY) + ? sparkConf.getString(SparkConfigParser.GMS_URL_KEY) + : "http://localhost:8080"; + String token = + sparkConf.hasPath(SparkConfigParser.GMS_AUTH_TOKEN) + ? 
sparkConf.getString(SparkConfigParser.GMS_AUTH_TOKEN) + : null; + boolean disableSslVerification = + sparkConf.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY) + && sparkConf.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY); + + int retry_interval_in_sec = + sparkConf.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC) + ? sparkConf.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC) + : 5; + + int max_retries = + sparkConf.hasPath(SparkConfigParser.MAX_RETRIES) + ? sparkConf.getInt(SparkConfigParser.MAX_RETRIES) + : 0; + + log.info( + "REST Emitter Configuration: GMS url {}{}", + gmsUrl, + (sparkConf.hasPath(SparkConfigParser.GMS_URL_KEY) ? "" : "(default)")); + if (token != null) { + log.info("REST Emitter Configuration: Token {}", "XXXXX"); + } - if (disableSslVerification) { - log.warn("REST Emitter Configuration: ssl verification will be disabled."); - } + if (disableSslVerification) { + log.warn("REST Emitter Configuration: ssl verification will be disabled."); + } - RestEmitterConfig restEmitterConf = - RestEmitterConfig.builder() - .server(gmsUrl) - .token(token) - .disableSslVerification(disableSslVerification) - .maxRetries(max_retries) - .retryIntervalSec(retry_interval_in_sec) - .build(); - return Optional.of(new RestDatahubEmitterConfig(restEmitterConf)); - } else { - log.error( - "DataHub Transport {} not recognized. 
DataHub Lineage emission will not work", - emitterType); + RestEmitterConfig restEmitterConf = + RestEmitterConfig.builder() + .server(gmsUrl) + .token(token) + .disableSslVerification(disableSslVerification) + .maxRetries(max_retries) + .retryIntervalSec(retry_interval_in_sec) + .build(); + return Optional.of(new RestDatahubEmitterConfig(restEmitterConf)); + case "kafka": + KafkaEmitterConfig.KafkaEmitterConfigBuilder kafkaEmitterConfig = + KafkaEmitterConfig.builder(); + if (sparkConf.hasPath(SparkConfigParser.KAFKA_EMITTER_BOOTSTRAP)) { + kafkaEmitterConfig.bootstrap( + sparkConf.getString(SparkConfigParser.KAFKA_EMITTER_BOOTSTRAP)); + } + if (sparkConf.hasPath(SparkConfigParser.KAFKA_EMITTER_SCHEMA_REGISTRY_URL)) { + kafkaEmitterConfig.schemaRegistryUrl( + sparkConf.getString(SparkConfigParser.KAFKA_EMITTER_SCHEMA_REGISTRY_URL)); + } + + if (sparkConf.hasPath(KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG)) { + Map schemaRegistryConfig = new HashMap<>(); + sparkConf + .getConfig(KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG) + .entrySet() + .forEach( + entry -> { + schemaRegistryConfig.put( + entry.getKey(), entry.getValue().unwrapped().toString()); + }); + kafkaEmitterConfig.schemaRegistryConfig(schemaRegistryConfig); + } + + if (sparkConf.hasPath(KAFKA_EMITTER_PRODUCER_CONFIG)) { + Map kafkaConfig = new HashMap<>(); + sparkConf + .getConfig(KAFKA_EMITTER_PRODUCER_CONFIG) + .entrySet() + .forEach( + entry -> { + kafkaConfig.put(entry.getKey(), entry.getValue().unwrapped().toString()); + }); + kafkaEmitterConfig.producerConfig(kafkaConfig); + } + + return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build())); + case "file": + log.info("File Emitter Configuration: File emitter will be used"); + FileEmitterConfig.FileEmitterConfigBuilder fileEmitterConfig = FileEmitterConfig.builder(); + fileEmitterConfig.fileName(sparkConf.getString(SparkConfigParser.FILE_EMITTER_FILE_NAME)); + return Optional.of(new FileDatahubEmitterConfig(fileEmitterConfig.build())); + case 
"s3": + log.info("S3 Emitter Configuration: S3 emitter will be used"); + S3EmitterConfig.S3EmitterConfigBuilder s3EmitterConfig = S3EmitterConfig.builder(); + if (sparkConf.hasPath(SparkConfigParser.S3_EMITTER_BUCKET)) { + s3EmitterConfig.bucketName(sparkConf.getString(SparkConfigParser.S3_EMITTER_BUCKET)); + } + + if (sparkConf.hasPath(SparkConfigParser.S3_EMITTER_PREFIX)) { + s3EmitterConfig.pathPrefix(sparkConf.getString(SparkConfigParser.S3_EMITTER_PREFIX)); + } + + if (sparkConf.hasPath(SparkConfigParser.S3_EMITTER_REGION)) { + s3EmitterConfig.region(sparkConf.getString(SparkConfigParser.S3_EMITTER_REGION)); + } + + if (sparkConf.hasPath(S3_EMITTER_PROFILE)) { + s3EmitterConfig.profileName(sparkConf.getString(S3_EMITTER_PROFILE)); + } + + if (sparkConf.hasPath(S3_EMITTER_ENDPOINT)) { + s3EmitterConfig.endpoint(sparkConf.getString(S3_EMITTER_ENDPOINT)); + } + + if (sparkConf.hasPath(S3_EMITTER_ACCESS_KEY)) { + s3EmitterConfig.accessKey(sparkConf.getString(S3_EMITTER_ACCESS_KEY)); + } + + if (sparkConf.hasPath(S3_EMITTER_SECRET_KEY)) { + s3EmitterConfig.secretKey(sparkConf.getString(S3_EMITTER_SECRET_KEY)); + } + + if (sparkConf.hasPath(S3_EMITTER_FILE_NAME)) { + s3EmitterConfig.fileName(sparkConf.getString(S3_EMITTER_FILE_NAME)); + } + + return Optional.of(new S3DatahubEmitterConfig(s3EmitterConfig.build())); + default: + log.error( + "DataHub Transport {} not recognized. 
DataHub Lineage emission will not work", + emitterType); + break; } return Optional.empty(); @@ -171,9 +258,9 @@ private synchronized SparkLineageConf loadDatahubConfig( } log.info("Datahub configuration: {}", datahubConf.root().render()); - Optional restEmitter = initializeEmitter(datahubConf); + Optional emitterConfig = initializeEmitter(datahubConf); SparkLineageConf sparkLineageConf = - SparkLineageConf.toSparkLineageConf(datahubConf, appContext, restEmitter.orElse(null)); + SparkLineageConf.toSparkLineageConf(datahubConf, appContext, emitterConfig.orElse(null)); long elapsedTime = System.currentTimeMillis() - startTime; log.debug("loadDatahubConfig completed successfully in {} ms", elapsedTime); @@ -182,6 +269,7 @@ private synchronized SparkLineageConf loadDatahubConfig( public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { long startTime = System.currentTimeMillis(); + initializeContextFactoryIfNotInitialized(); log.debug("Application end called"); listener.onApplicationEnd(applicationEnd); diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/DatahubEmitterConfig.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/DatahubEmitterConfig.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/DatahubEmitterConfig.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/DatahubEmitterConfig.java diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/FileDatahubEmitterConfig.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/FileDatahubEmitterConfig.java new file mode 100644 index 0000000000000..03bc4238b6f39 --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/FileDatahubEmitterConfig.java @@ -0,0 +1,18 @@ +package datahub.spark.conf; + +import 
datahub.client.file.FileEmitterConfig; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Setter +@ToString +@Getter +public class FileDatahubEmitterConfig implements DatahubEmitterConfig { + final String type = "file"; + FileEmitterConfig fileEmitterConfig; + + public FileDatahubEmitterConfig(FileEmitterConfig fileEmitterConfig) { + this.fileEmitterConfig = fileEmitterConfig; + } +} diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java new file mode 100644 index 0000000000000..6ed66dbc9230f --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java @@ -0,0 +1,18 @@ +package datahub.spark.conf; + +import datahub.client.kafka.KafkaEmitterConfig; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Setter +@ToString +@Getter +public class KafkaDatahubEmitterConfig implements DatahubEmitterConfig { + final String type = "kafka"; + KafkaEmitterConfig kafkaEmitterConfig; + + public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) { + this.kafkaEmitterConfig = kafkaEmitterConfig; + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/RestDatahubEmitterConfig.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/RestDatahubEmitterConfig.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/RestDatahubEmitterConfig.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/RestDatahubEmitterConfig.java diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/S3DatahubEmitterConfig.java 
b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/S3DatahubEmitterConfig.java new file mode 100644 index 0000000000000..40b446c12ca3a --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/S3DatahubEmitterConfig.java @@ -0,0 +1,18 @@ +package datahub.spark.conf; + +import datahub.client.s3.S3EmitterConfig; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Setter +@ToString +@Getter +public class S3DatahubEmitterConfig implements DatahubEmitterConfig { + final String type = "s3"; + S3EmitterConfig s3EmitterConfig; + + public S3DatahubEmitterConfig(S3EmitterConfig s3EmitterConfig) { + this.s3EmitterConfig = s3EmitterConfig; + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkAppContext.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkAppContext.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkAppContext.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkAppContext.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java similarity index 90% rename from metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java index c854861af2f81..630f10b08b411 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -25,12 +25,26 @@ public class SparkConfigParser { public static final String 
PARENT_JOB_KEY = "parent.datajob_urn"; - public static final String TRANSPORT_KEY = "transport"; + public static final String EMITTER_TYPE = "emitter"; public static final String GMS_URL_KEY = "rest.server"; public static final String GMS_AUTH_TOKEN = "rest.token"; + public static final String FILE_EMITTER_FILE_NAME = "file.filename"; public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification"; public static final String MAX_RETRIES = "rest.max_retries"; public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec"; + public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap"; + public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url"; + public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config"; + public static final String KAFKA_EMITTER_PRODUCER_CONFIG = "kafka.producer_config"; + + public static final String S3_EMITTER_BUCKET = "s3.bucket"; + public static final String S3_EMITTER_REGION = "s3.region"; + public static final String S3_EMITTER_ENDPOINT = "s3.endpoint"; + public static final String S3_EMITTER_PREFIX = "s3.prefix"; + public static final String S3_EMITTER_ACCESS_KEY = "s3.access_key"; + public static final String S3_EMITTER_SECRET_KEY = "s3.secret_key"; + public static final String S3_EMITTER_PROFILE = "s3.profile"; + public static final String S3_EMITTER_FILE_NAME = "s3.filename"; public static final String COALESCE_KEY = "coalesce_jobs"; public static final String PATCH_ENABLED = "patch.enabled"; @@ -46,8 +60,10 @@ public class SparkConfigParser { public static final String DATASET_MATERIALIZE_KEY = "metadata.dataset.materialize"; public static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance"; - public static final String DATASET_INCLUDE_SCHEMA_METADATA = + public static final String DATASET_INCLUDE_SCHEMA_METADATA_DEPRECATED_ALIAS = 
"metadata.dataset.experimental_include_schema_metadata"; + public static final String DATASET_INCLUDE_SCHEMA_METADATA = + "metadata.dataset.include_schema_metadata"; public static final String SPARK_PLATFORM_INSTANCE_KEY = "platformInstance"; public static final String REMOVE_PARTITION_PATTERN = "metadata.remove_partition_pattern"; public static final String SPARK_APP_NAME = "spark.app.name"; @@ -293,8 +309,13 @@ public static boolean isDatasetMaterialize(Config datahubConfig) { } public static boolean isIncludeSchemaMetadata(Config datahubConfig) { - return datahubConfig.hasPath(DATASET_INCLUDE_SCHEMA_METADATA) - && datahubConfig.getBoolean(DATASET_INCLUDE_SCHEMA_METADATA); + if (datahubConfig.hasPath(DATASET_INCLUDE_SCHEMA_METADATA)) { + return datahubConfig.getBoolean(DATASET_INCLUDE_SCHEMA_METADATA); + } else { + // TODO: Deprecate eventually + return datahubConfig.hasPath(DATASET_INCLUDE_SCHEMA_METADATA_DEPRECATED_ALIAS) + && datahubConfig.getBoolean(DATASET_INCLUDE_SCHEMA_METADATA_DEPRECATED_ALIAS); + } } public static String getPipelineName(Config datahubConfig, SparkAppContext appContext) { diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkLineageConf.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkLineageConf.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java rename to 
metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java similarity index 100% rename from 
metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java rename to 
metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java rename to 
metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/Vendors.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/api/Vendors.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/Vendors.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/api/Vendors.java diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/VendorsImpl.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/api/VendorsImpl.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/VendorsImpl.java rename to metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/api/VendorsImpl.java diff --git a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/HdfsPathDatasetTest.java b/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/HdfsPathDatasetTest.java similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/HdfsPathDatasetTest.java rename to metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/HdfsPathDatasetTest.java diff --git a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java b/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java similarity index 92% rename from metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java rename to 
metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java index b42c87618d2b6..ef2b17e9932f2 100644 --- a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java +++ b/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java @@ -513,6 +513,33 @@ public void testProcessGlueOlEvent() throws URISyntaxException, IOException { } } + public void testProcess_OL17_GlueOlEvent() throws URISyntaxException, IOException { + DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + + String olEvent = + IOUtils.toString( + this.getClass().getResourceAsStream("/ol_events/sample_glue_ol_0_17_changes.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build()); + + assertNotNull(datahubJob); + + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:glue,my_glue_database.my_glue_table,DEV)", + dataset.getUrn().toString()); + } + for (DatahubDataset dataset : datahubJob.getOutSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:glue,my_glue_database.my_output_glue_table,DEV)", + dataset.getUrn().toString()); + } + } + public void testProcessGlueOlEventSymlinkDisabled() throws URISyntaxException, IOException { DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = DatahubOpenlineageConfig.builder(); @@ -754,4 +781,37 @@ public void testProcessRedshiftOutputLowercasedUrns() throws URISyntaxException, dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift"); } } + + public void testProcessGCSInputsOutputs() throws URISyntaxException, IOException { + 
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + builder.lowerCaseDatasetUrns(true); + builder.materializeDataset(true); + builder.includeSchemaMetadata(true); + + String olEvent = + IOUtils.toString( + this.getClass().getResourceAsStream("/ol_events/gs_input_output.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build()); + + assertNotNull(datahubJob); + + assertEquals(1, datahubJob.getInSet().size()); + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:gcs,my-gs-input-bucket/path/to/my-input-file.csv,DEV)", + dataset.getUrn().toString()); + } + assertEquals(1, datahubJob.getOutSet().size()); + + for (DatahubDataset dataset : datahubJob.getOutSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:gcs,my-gs-output-bucket/path/to/my-output-file.csv,DEV)", + dataset.getUrn().toString()); + } + } } diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/log4j.properties b/metadata-integration/java/acryl-spark-lineage/src/test/resources/log4j.properties similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/test/resources/log4j.properties rename to metadata-integration/java/acryl-spark-lineage/src/test/resources/log4j.properties diff --git a/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/gs_input_output.json b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/gs_input_output.json new file mode 100644 index 0000000000000..ff08d6c3d1d0e --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/gs_input_output.json @@ -0,0 +1,62 @@ +{ + "eventTime": "2024-07-23T14:42:58.176Z", + "producer": 
"https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", + "eventType": "COMPLETE", + "run": { + "runId": "0190e00b-80a0-7902-829f-10faa3197778", + "facets": { + "parent": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet", + "run": { + "runId": "0190e00b-80a0-7902-829f-10faa3197778" + }, + "job": { + "namespace": "default", + "name": "my-job" + } + }, + "processing_engine": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet", + "version": "3.3.4", + "name": "spark", + "openlineageAdapterVersion": "3.3.2_2.12" + }, + "spark_properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "properties": { + "spark.master": "spark://my-spark-master:7077", + "spark.app.name": "mySparkApp" + } + } + } + }, + "job": { + "namespace": "default", + "name": "my_job", + "facets": { + "jobType": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", + "processingType": "BATCH", + "integration": "SPARK", + "jobType": "RDD_JOB" + } + } + }, + "inputs": [ + { + "namespace": "gs://my-gs-input-bucket", + "name": "/path/to/my-input-file.csv" + } + ], + "outputs": [ + { + "namespace": "gs://my-gs-output-bucket", + "name": "/path/to/my-output-file.csv" + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_lineage_spark.json 
b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/redshift_lineage_spark.json similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_lineage_spark.json rename to metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/redshift_lineage_spark.json diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_mixed_case_lineage_spark.json b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/redshift_mixed_case_lineage_spark.json similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_mixed_case_lineage_spark.json rename to metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/redshift_mixed_case_lineage_spark.json diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/sample_failed_spark.json b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/sample_failed_spark.json similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/sample_failed_spark.json rename to metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/sample_failed_spark.json diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/sample_glue.json b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/sample_glue.json similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/sample_glue.json rename to metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/sample_glue.json diff --git a/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/sample_glue_ol_0_17_changes.json b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/sample_glue_ol_0_17_changes.json new file mode 
100644 index 0000000000000..d2b92964b873a --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/sample_glue_ol_0_17_changes.json @@ -0,0 +1,168 @@ +{ + "eventTime": "2024-05-31T17:01:26.465Z", + "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", + "eventType": "START", + "run": { + "runId": "3ad2a5ec-1c8b-4bda-84f4-1492758af65c", + "facets": { + "parent": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet", + "run": { + "runId": "f03077f2-c077-4472-987e-b89b1c741c86" + }, + "job": { + "namespace": "default", + "name": "simple_app_parquet_with_persist_without_coalesce_s3_demo" + } + }, + "processing_engine": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet", + "version": "3.3.0-amzn-1", + "name": "spark" + }, + "environment-properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "environment-properties": {} + }, + "spark_properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "properties": { + "spark.master": "jes", + "spark.app.name": "SimpleAppParquetWithPersistWithoutCoalesceS3-Demo" + } + }, + "spark_version": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "spark-version": "3.3.0-amzn-1" + } + } + }, 
+ "job": { + "namespace": "default", + "name": "simple_app_parquet_with_persist_without_coalesce_s3_demo.execute_insert_into_hadoop_fs_relation_command.sample_data_output", + "facets": { + "jobType": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", + "processingType": "BATCH", + "integration": "SPARK", + "jobType": "JOB" + } + } + }, + "inputs": [ + { + "namespace": "s3://my-bucket-test", + "name": "/sample_data/input_data.parquet", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "s3://my-bucket-test", + "uri": "s3://my-bucket-test" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "field_1", + "type": "integer" + }, + { + "name": "field_2", + "type": "string" + } + ] + }, + "symlinks": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet", + "identifiers": [ + { + "namespace": "arn:aws:glue:us-west-2:123456789012", + "name": "table/my_glue_database/my_glue_table", + "type": "TABLE" + } + ] + } + }, + "inputFacets": {} + } + ], + "outputs": [ + { + "namespace": "s3://my-bucket-test", + "name": "mydata _output", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": 
"s3://my-bucket-test", + "uri": "s3://my-bucket-test" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "field1", + "type": "long" + }, + { + "name": "field2", + "type": "string" + } + ] + }, + "columnLineage": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet", + "fields": { + "field1": { + "inputFields": [ + { + "namespace": "s3://my-bucket-test", + "name": "/output/field1.parquet", + "field": "field1" + } + ] + }, + "field2": { + "inputFields": [ + { + "namespace": "s3://my-bucket-test", + "name": "/output/field2.parquet", + "field": "field2" + } + ] + } + } + }, + "symlinks": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet", + "identifiers": [ + { + "namespace": "arn:aws:glue:us-east-1:123456789012/", + "name": "table/my_glue_database/my_output_glue_table", + "type": "TABLE" + } + ] + }, + "lifecycleStateChange": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet", + "lifecycleStateChange": "OVERWRITE" + } + }, + "outputFacets": {} + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/sample_spark.json b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/sample_spark.json similarity index 100% rename from 
metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/sample_spark.json rename to metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/sample_spark.json diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/org.apache.spark/log4j-defaults.properties b/metadata-integration/java/acryl-spark-lineage/src/test/resources/org.apache.spark/log4j-defaults.properties similarity index 100% rename from metadata-integration/java/spark-lineage-beta/src/test/resources/org.apache.spark/log4j-defaults.properties rename to metadata-integration/java/acryl-spark-lineage/src/test/resources/org.apache.spark/log4j-defaults.properties diff --git a/metadata-integration/java/datahub-client/build.gradle b/metadata-integration/java/datahub-client/build.gradle index 3f54497919062..d9087347e1b5c 100644 --- a/metadata-integration/java/datahub-client/build.gradle +++ b/metadata-integration/java/datahub-client/build.gradle @@ -21,13 +21,15 @@ dependencies { exclude group: "org.apache.avro" } implementation externalDependency.avro + implementation externalDependency.httpClient + constraints { implementation('commons-collections:commons-collections:3.2.2') { because 'Vulnerability Issue' } } - compileOnly externalDependency.httpClient + implementation externalDependency.awsS3 implementation externalDependency.jacksonDataBind runtimeOnly externalDependency.jna @@ -41,7 +43,6 @@ dependencies { testImplementation externalDependency.mockServer testImplementation externalDependency.mockServerClient testImplementation externalDependency.testContainers - testImplementation externalDependency.httpClient testRuntimeOnly externalDependency.logbackClassic } @@ -105,6 +106,8 @@ shadowJar { relocate 'com.thoughtworks.paranamer', 'datahub.shaded.com.thoughtworks.paranamer' relocate 'org.xerial.snappy', 'datahub.shaded.org.xerial.snappy' relocate 'org.apache.kafka', 'datahub.shaded.org.apache.kafka' + relocate 'org.apache.http', 
'datahub.shaded.org.apache.http' + relocate 'software.amazon.awssdk', 'datahub.shaded.software.amazon.awssdk' relocate 'io.confluent', 'datahub.shaded.io.confluent' relocate 'org.apache.zookeeper', 'datahub.shaded.org.apache.zookeeper' relocate 'org.apache.yetus', 'datahub.shaded.org.apache.yetus' @@ -121,7 +124,9 @@ shadowJar { relocate 'org.eclipse.parsson', 'datahub.shaded.parsson' relocate 'jakarta.json', 'datahub.shaded.json' relocate 'io.netty', 'datahub.shaded.io.netty' - + relocate 'org.apache.hc', 'datahub.shaded.org.apache.hc' + relocate 'org.reactivestreams', 'datahub.shaded.org.reactivestreams' + relocate 'software.amazon.eventstream', 'datahub.shaded.software.amazon.eventstream' finalizedBy checkShadowJar } diff --git a/metadata-integration/java/datahub-client/scripts/check_jar.sh b/metadata-integration/java/datahub-client/scripts/check_jar.sh index e2c9ec16d49f8..f76931428e3d6 100755 --- a/metadata-integration/java/datahub-client/scripts/check_jar.sh +++ b/metadata-integration/java/datahub-client/scripts/check_jar.sh @@ -37,6 +37,10 @@ jar -tvf $jarFile |\ grep -v "MetadataChangeProposal.avsc" |\ grep -v "aix" |\ grep -v "com/sun/" + grep -v "mozilla" + grep -v "VersionInfo.java" + grep -v "mime.types" + if [ $? 
-ne 0 ]; then echo "✅ No unexpected class paths found in ${jarFile}" diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/file/FileEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/file/FileEmitter.java index ab866f060b354..a4f6aa367d808 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/file/FileEmitter.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/file/FileEmitter.java @@ -88,7 +88,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { public MetadataWriteResponse get() throws InterruptedException, ExecutionException { return MetadataWriteResponse.builder() .success(true) - .responseContent("MCP witten to File") + .responseContent("MCP witten to File " + config.getFileName()) .build(); } diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java index ba310de14813e..a9340d18749ad 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java @@ -51,6 +51,7 @@ public KafkaEmitter(KafkaEmitterConfig config) throws IOException { kafkaConfigProperties.put("schema.registry.url", this.config.getSchemaRegistryUrl()); kafkaConfigProperties.putAll(config.getSchemaRegistryConfig()); kafkaConfigProperties.putAll(config.getProducerConfig()); + producer = new KafkaProducer<>(kafkaConfigProperties); _avroSerializer = new AvroSerializer(); } diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/s3/S3Emitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/s3/S3Emitter.java new file mode 100644 index 0000000000000..980777d79c2c5 --- /dev/null +++ 
b/metadata-integration/java/datahub-client/src/main/java/datahub/client/s3/S3Emitter.java @@ -0,0 +1,190 @@ +package datahub.client.s3; + +import com.linkedin.mxe.MetadataChangeProposal; +import datahub.client.Callback; +import datahub.client.Emitter; +import datahub.client.MetadataWriteResponse; +import datahub.client.file.FileEmitter; +import datahub.client.file.FileEmitterConfig; +import datahub.event.MetadataChangeProposalWrapper; +import datahub.event.UpsertAspectRequest; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.profiles.ProfileFile; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; + +@Slf4j +public class S3Emitter implements Emitter { + private final Path temporaryFile; + private final FileEmitter fileEmitter; + private final S3Client client; + private final S3EmitterConfig config; + + /** + * The default constructor + * + * @param config + */ + public S3Emitter(S3EmitterConfig config) throws IOException { + temporaryFile = Files.createTempFile("datahub_ingest_", "_mcps.json"); + log.info("Emitter created temporary file: {}", this.temporaryFile.toFile()); + FileEmitterConfig fileEmitterConfig = + 
FileEmitterConfig.builder() + .fileName(temporaryFile.toString()) + .eventFormatter(config.getEventFormatter()) + .build(); + fileEmitter = new FileEmitter(fileEmitterConfig); + S3ClientBuilder s3ClientBuilder = S3Client.builder(); + + if (config.getRegion() != null) { + s3ClientBuilder.region(Region.of(config.getRegion())); + } + + if (config.getEndpoint() != null) { + try { + s3ClientBuilder.endpointOverride(new URI(config.getEndpoint())); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + if (config.getAccessKey() != null && config.getSecretKey() != null) { + s3ClientBuilder.credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(config.getAccessKey(), config.getSecretKey()))); + } else { + DefaultCredentialsProvider.Builder credentialsProviderBuilder = + DefaultCredentialsProvider.builder(); + if (config.getProfileName() != null) { + credentialsProviderBuilder.profileName(config.getProfileName()); + } + + if (config.getProfileFile() != null) { + credentialsProviderBuilder.profileFile( + ProfileFile.builder().content(Paths.get(config.getProfileFile())).build()); + } + s3ClientBuilder.credentialsProvider(credentialsProviderBuilder.build()); + } + + this.client = s3ClientBuilder.build(); + this.config = config; + } + + private void deleteTemporaryFile() { + try { + Files.delete(temporaryFile); + log.debug("Emitter deleted temporary file: {}", this.temporaryFile.toFile()); + } catch (IOException e) { + log.warn("Failed to delete temporary file {}", temporaryFile); + } + } + + @Override + public void close() throws IOException { + log.debug("Closing file {}", this.temporaryFile.toFile()); + fileEmitter.close(); + String key = this.temporaryFile.getFileName().toString(); + + // If the target filename is set, use that as the key + if (config.getFileName() != null) { + key = config.getFileName(); + } + if (config.getPathPrefix().endsWith("/")) { + key = config.getPathPrefix() + key; + } else { + key = 
config.getPathPrefix() + "/" + key; + } + + if (key.startsWith("/")) { + key = key.substring(1); + } + + PutObjectRequest objectRequest = + PutObjectRequest.builder().bucket(config.getBucketName()).key(key).build(); + + log.info( + "Uploading file {} to S3 with bucket {} and key: {}", + this.temporaryFile, + config.getBucketName(), + key); + + PutObjectResponse response = client.putObject(objectRequest, this.temporaryFile); + deleteTemporaryFile(); + if (!response.sdkHttpResponse().isSuccessful()) { + log.error("Failed to upload file to S3. Response: {}", response); + throw new IOException("Failed to upload file to S3. Response: " + response); + } + } + + @Override + public Future emit( + @SuppressWarnings("rawtypes") MetadataChangeProposalWrapper mcpw, Callback callback) + throws IOException { + return fileEmitter.emit(mcpw, callback); + } + + @Override + public Future emit(MetadataChangeProposal mcp, Callback callback) + throws IOException { + return fileEmitter.emit(mcp, callback); + } + + @Override + public boolean testConnection() throws IOException, ExecutionException, InterruptedException { + throw new UnsupportedOperationException("testConnection not relevant for File Emitter"); + } + + @Override + public Future emit(List request, Callback callback) + throws IOException { + throw new UnsupportedOperationException("UpsertAspectRequest not relevant for File Emitter"); + } + + private Future createFailureFuture(String message) { + return new Future() { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public MetadataWriteResponse get() throws InterruptedException, ExecutionException { + return MetadataWriteResponse.builder().success(false).responseContent(message).build(); + } + + @Override + public MetadataWriteResponse get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return this.get(); + } + + @Override + public boolean isCancelled() { + return 
false; + } + + @Override + public boolean isDone() { + return true; + } + }; + } +} diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/s3/S3EmitterConfig.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/s3/S3EmitterConfig.java new file mode 100644 index 0000000000000..48a38ee41d43f --- /dev/null +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/s3/S3EmitterConfig.java @@ -0,0 +1,24 @@ +package datahub.client.s3; + +import datahub.event.EventFormatter; +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +public class S3EmitterConfig { + @Builder.Default @lombok.NonNull String bucketName = null; + @Builder.Default String pathPrefix = null; + @Builder.Default String fileName = null; + + @Builder.Default + EventFormatter eventFormatter = new EventFormatter(EventFormatter.Format.PEGASUS_JSON); + + @Builder.Default String region = null; + @Builder.Default String endpoint = null; + @Builder.Default String accessKey = null; + @Builder.Default String secretKey = null; + @Builder.Default String sessionToken = null; + @Builder.Default String profileFile = null; + @Builder.Default String profileName = null; +} diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java index 1568451beff10..9237ee60f473b 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java @@ -113,12 +113,19 @@ public static Optional convertOpenlineageDatasetToDatasetUrn( for (OpenLineage.SymlinksDatasetFacetIdentifiers symlink : dataset.getFacets().getSymlinks().getIdentifiers()) { if 
(symlink.getType().equals("TABLE")) { - if (symlink.getNamespace().startsWith("aws:glue:")) { + // Before OpenLineage 0.17.1 the namespace started with "aws:glue:" and after that it was + // changed to :arn:aws:glue:" + if (symlink.getNamespace().startsWith("aws:glue:") + || symlink.getNamespace().startsWith("arn:aws:glue:")) { namespace = "glue"; } else { namespace = mappingConfig.getHivePlatformAlias(); } - datasetName = symlink.getName(); + if (symlink.getName().startsWith("table/")) { + datasetName = symlink.getName().replaceFirst("table/", "").replace("/", "."); + } else { + datasetName = symlink.getName(); + } } } Optional symlinkedUrn = @@ -761,10 +768,7 @@ private static void processJobInputs( } for (OpenLineage.InputDataset input : - event.getInputs().stream() - .filter(input -> input.getFacets() != null) - .distinct() - .collect(Collectors.toList())) { + event.getInputs().stream().distinct().collect(Collectors.toList())) { Optional datasetUrn = convertOpenlineageDatasetToDatasetUrn(input, datahubConf); if (datasetUrn.isPresent()) { DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder(); @@ -791,10 +795,7 @@ private static void processJobOutputs( } for (OpenLineage.OutputDataset output : - event.getOutputs().stream() - .filter(input -> input.getFacets() != null) - .distinct() - .collect(Collectors.toList())) { + event.getOutputs().stream().distinct().collect(Collectors.toList())) { Optional datasetUrn = convertOpenlineageDatasetToDatasetUrn(output, datahubConf); if (datasetUrn.isPresent()) { DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder(); diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/plan/InsertIntoHadoopFsRelationVisitor.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/plan/InsertIntoHadoopFsRelationVisitor.java deleted file mode 100644 index 1e5bc72967e68..0000000000000 --- 
a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/plan/InsertIntoHadoopFsRelationVisitor.java +++ /dev/null @@ -1,79 +0,0 @@ -/* -/* Copyright 2018-2024 contributors to the OpenLineage project -/* SPDX-License-Identifier: Apache-2.0 -*/ - -package io.openlineage.spark.agent.lifecycle.plan; - -import io.openlineage.client.OpenLineage; -import io.openlineage.client.utils.DatasetIdentifier; -import io.openlineage.spark.agent.util.PathUtils; -import io.openlineage.spark.api.OpenLineageContext; -import io.openlineage.spark.api.QueryPlanVisitor; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.catalyst.catalog.CatalogTable; -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; -import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand; -import scala.Option; - -/** - * {@link LogicalPlan} visitor that matches an {@link InsertIntoHadoopFsRelationCommand} and - * extracts the output {@link OpenLineage.Dataset} being written. 
- */ -public class InsertIntoHadoopFsRelationVisitor - extends QueryPlanVisitor { - - public InsertIntoHadoopFsRelationVisitor(OpenLineageContext context) { - super(context); - } - - @Override - public List apply(LogicalPlan x) { - InsertIntoHadoopFsRelationCommand command = (InsertIntoHadoopFsRelationCommand) x; - - Option catalogTable = command.catalogTable(); - OpenLineage.OutputDataset outputDataset; - - if (catalogTable.isEmpty()) { - DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file"); - if (SaveMode.Overwrite == command.mode()) { - outputDataset = - outputDataset() - .getDataset( - di, - command.query().schema(), - OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.OVERWRITE); - } else { - outputDataset = outputDataset().getDataset(di, command.query().schema()); - } - return Collections.singletonList(outputDataset); - } else { - if (SaveMode.Overwrite == command.mode()) { - return Collections.singletonList( - outputDataset() - .getDataset( - PathUtils.fromCatalogTable(catalogTable.get()), - catalogTable.get().schema(), - OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.CREATE)); - } else { - return Collections.singletonList( - outputDataset() - .getDataset( - PathUtils.fromCatalogTable(catalogTable.get()), catalogTable.get().schema())); - } - } - } - - @Override - public Optional jobNameSuffix(InsertIntoHadoopFsRelationCommand command) { - if (command.catalogTable().isEmpty()) { - DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file"); - return Optional.of(trimPath(di.getName())); - } - return Optional.of( - trimPath(PathUtils.fromCatalogTable(command.catalogTable().get()).getName())); - } -} diff --git a/metadata-integration/java/spark-lineage/README.md b/metadata-integration/java/spark-lineage-legacy/README.md similarity index 99% rename from metadata-integration/java/spark-lineage/README.md rename to metadata-integration/java/spark-lineage-legacy/README.md index 
041408aac6d6d..d0120d4c6a3f1 100644 --- a/metadata-integration/java/spark-lineage/README.md +++ b/metadata-integration/java/spark-lineage-legacy/README.md @@ -2,7 +2,7 @@ :::note -This is our legacy Spark Integration which is replaced by [Acryl Spark Lineage](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta) +This is our legacy Spark Integration which is replaced by [Acryl Spark Lineage](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage) ::: diff --git a/metadata-integration/java/spark-lineage/build.gradle b/metadata-integration/java/spark-lineage-legacy/build.gradle similarity index 100% rename from metadata-integration/java/spark-lineage/build.gradle rename to metadata-integration/java/spark-lineage-legacy/build.gradle diff --git a/metadata-integration/java/spark-lineage/scripts/check_jar.sh b/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh similarity index 94% rename from metadata-integration/java/spark-lineage/scripts/check_jar.sh rename to metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh index b1c11494c2c80..2dd3743ae2ced 100755 --- a/metadata-integration/java/spark-lineage/scripts/check_jar.sh +++ b/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh @@ -37,7 +37,10 @@ jar -tvf $jarFile |\ grep -v "MetadataChangeProposal.avsc" |\ grep -v "aix" |\ grep -v "library.properties" |\ - grep -v "rootdoc.txt" + grep -v "rootdoc.txt" |\ + grep -v "VersionInfo.java" |\ + grep -v "mime.types" + if [ $? 
-ne 0 ]; then echo "✅ No unexpected class paths found in ${jarFile}" diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/docker/.dockerignore b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/.dockerignore similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/docker/.dockerignore rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/.dockerignore diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/docker/SparkBase.Dockerfile b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/SparkBase.Dockerfile similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/docker/SparkBase.Dockerfile rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/SparkBase.Dockerfile diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/docker/SparkMaster.Dockerfile b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/SparkMaster.Dockerfile similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/docker/SparkMaster.Dockerfile rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/SparkMaster.Dockerfile diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/docker/SparkSlave.Dockerfile b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/SparkSlave.Dockerfile similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/docker/SparkSlave.Dockerfile rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/SparkSlave.Dockerfile diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/docker/SparkSubmit.Dockerfile b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/SparkSubmit.Dockerfile similarity index 100% rename from 
metadata-integration/java/spark-lineage/spark-smoke-test/docker/SparkSubmit.Dockerfile rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/SparkSubmit.Dockerfile diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/docker/build_images.sh b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/build_images.sh similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/docker/build_images.sh rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/build_images.sh diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/docker/spark-docker-compose.yml b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/spark-docker-compose.yml similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/docker/spark-docker-compose.yml rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/docker/spark-docker-compose.yml diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/JavaHdfsIn2HdfsOut1.json b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/JavaHdfsIn2HdfsOut1.json similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/JavaHdfsIn2HdfsOut1.json rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/JavaHdfsIn2HdfsOut1.json diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/JavaHdfsIn2HdfsOut2.json b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/JavaHdfsIn2HdfsOut2.json similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/JavaHdfsIn2HdfsOut2.json rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/JavaHdfsIn2HdfsOut2.json diff --git 
a/metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/JavaHdfsIn2HiveCreateInsertTable.json b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/JavaHdfsIn2HiveCreateInsertTable.json similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/JavaHdfsIn2HiveCreateInsertTable.json rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/JavaHdfsIn2HiveCreateInsertTable.json diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/JavaHdfsIn2HiveCreateTable.json b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/JavaHdfsIn2HiveCreateTable.json similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/JavaHdfsIn2HiveCreateTable.json rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/JavaHdfsIn2HiveCreateTable.json diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/JavaHiveInHiveOut.json b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/JavaHiveInHiveOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/JavaHiveInHiveOut.json rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/JavaHiveInHiveOut.json diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/PythonHdfsIn2HdfsOut1.json b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/PythonHdfsIn2HdfsOut1.json similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/PythonHdfsIn2HdfsOut1.json rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/PythonHdfsIn2HdfsOut1.json diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/PythonHdfsIn2HdfsOut2.json 
b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/PythonHdfsIn2HdfsOut2.json similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/PythonHdfsIn2HdfsOut2.json rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/PythonHdfsIn2HdfsOut2.json diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/PythonHdfsIn2HiveCreateInsertTable.json b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/PythonHdfsIn2HiveCreateInsertTable.json similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/PythonHdfsIn2HiveCreateInsertTable.json rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/PythonHdfsIn2HiveCreateInsertTable.json diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/PythonHdfsIn2HiveCreateTable.json b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/PythonHdfsIn2HiveCreateTable.json similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/PythonHdfsIn2HiveCreateTable.json rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/PythonHdfsIn2HiveCreateTable.json diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/PythonHiveInHiveOut.json b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/PythonHiveInHiveOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/golden_json/PythonHiveInHiveOut.json rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/golden_json/PythonHiveInHiveOut.json diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut1.py 
b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut1.py similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut1.py rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut1.py diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut2.py b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut2.py similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut2.py rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut2.py diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HiveCreateInsertTable.py b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/HdfsIn2HiveCreateInsertTable.py similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HiveCreateInsertTable.py rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/HdfsIn2HiveCreateInsertTable.py diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HiveCreateTable.py b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/HdfsIn2HiveCreateTable.py similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HiveCreateTable.py rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/HdfsIn2HiveCreateTable.py diff --git 
a/metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HiveInHiveOut.py b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/HiveInHiveOut.py similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HiveInHiveOut.py rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/HiveInHiveOut.py diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/python_test_run.sh b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/python_test_run.sh similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/python_test_run.sh rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/python-spark-lineage-test/python_test_run.sh diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/requirements.txt b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/requirements.txt similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/requirements.txt rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/requirements.txt diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/resources/data/in1.csv/part1.csv b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/resources/data/in1.csv/part1.csv similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/resources/data/in1.csv/part1.csv rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/resources/data/in1.csv/part1.csv diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/resources/data/in2.csv/part1.csv b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/resources/data/in2.csv/part1.csv similarity index 100% rename from 
metadata-integration/java/spark-lineage/spark-smoke-test/resources/data/in2.csv/part1.csv rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/resources/data/in2.csv/part1.csv diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/setup_spark_smoke_test.sh b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/setup_spark_smoke_test.sh similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/setup_spark_smoke_test.sh rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/setup_spark_smoke_test.sh diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/smoke.sh b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/smoke.sh similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/smoke.sh rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/smoke.sh diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/spark-docker.conf b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/spark-docker.conf similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/spark-docker.conf rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/spark-docker.conf diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/.gitignore b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/.gitignore similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/.gitignore rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/.gitignore diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/build.gradle b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/build.gradle similarity index 100% rename from 
metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/build.gradle rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/build.gradle diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/gradle/wrapper/gradle-wrapper.jar b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/gradle/wrapper/gradle-wrapper.jar similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/gradle/wrapper/gradle-wrapper.jar rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/gradle/wrapper/gradle-wrapper.jar diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/gradle/wrapper/gradle-wrapper.properties b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/gradle/wrapper/gradle-wrapper.properties similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/gradle/wrapper/gradle-wrapper.properties rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/gradle/wrapper/gradle-wrapper.properties diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/gradlew b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/gradlew similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/gradlew rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/gradlew diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/gradlew.bat b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/gradlew.bat similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/gradlew.bat rename to 
metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/gradlew.bat diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/java_test_run.sh b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/java_test_run.sh similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/java_test_run.sh rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/java_test_run.sh diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/settings.gradle b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/settings.gradle similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/settings.gradle rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/settings.gradle diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HdfsOut1.java b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HdfsOut1.java similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HdfsOut1.java rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HdfsOut1.java diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HdfsOut2.java b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HdfsOut2.java similarity index 100% rename from 
metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HdfsOut2.java rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HdfsOut2.java diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HiveCreateInsertTable.java b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HiveCreateInsertTable.java similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HiveCreateInsertTable.java rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HiveCreateInsertTable.java diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HiveCreateTable.java b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HiveCreateTable.java similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HiveCreateTable.java rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HdfsIn2HiveCreateTable.java diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HiveInHiveOut.java b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HiveInHiveOut.java similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HiveInHiveOut.java rename to 
metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HiveInHiveOut.java diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HiveInHiveOut_test1.java b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HiveInHiveOut_test1.java similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HiveInHiveOut_test1.java rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/HiveInHiveOut_test1.java diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/Utils.java b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/Utils.java similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/Utils.java rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test-spark-lineage/src/main/java/test/spark/lineage/Utils.java diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test_e2e.py b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test_e2e.py similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/test_e2e.py rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/test_e2e.py diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/url_list.txt b/metadata-integration/java/spark-lineage-legacy/spark-smoke-test/url_list.txt similarity index 100% rename from metadata-integration/java/spark-lineage/spark-smoke-test/url_list.txt rename to metadata-integration/java/spark-lineage-legacy/spark-smoke-test/url_list.txt 
diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/DatahubSparkListener.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/DatahubSparkListener.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/DatasetExtractor.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/DatasetExtractor.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/CoalesceJobsEmitter.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/consumer/impl/CoalesceJobsEmitter.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/CoalesceJobsEmitter.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/consumer/impl/CoalesceJobsEmitter.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/consumer/impl/McpEmitter.java similarity index 60% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/consumer/impl/McpEmitter.java index 918ce48d1cf42..d6ee41563724b 100644 --- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java +++ 
b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/consumer/impl/McpEmitter.java @@ -2,6 +2,8 @@ import com.typesafe.config.Config; import datahub.client.Emitter; +import datahub.client.kafka.KafkaEmitter; +import datahub.client.kafka.KafkaEmitterConfig; import datahub.client.rest.RestEmitter; import datahub.client.rest.RestEmitterConfig; import datahub.event.MetadataChangeProposalWrapper; @@ -9,6 +11,7 @@ import datahub.spark.model.LineageEvent; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -18,12 +21,19 @@ @Slf4j public class McpEmitter implements LineageConsumer { - private String emitterType; + private final String emitterType; private Optional restEmitterConfig; + private Optional kafkaEmitterConfig; + private static final String TRANSPORT_KEY = "transport"; private static final String GMS_URL_KEY = "rest.server"; private static final String GMS_AUTH_TOKEN = "rest.token"; private static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification"; + private static final String KAFKA_BOOTSTRAP = "kafka.connection.bootstrap"; + private static final String KAFKA_PRODUCER_CONFIG = "kafka.connection.producer_config"; + private static final String KAFKA_SCHEMA_REGISTRY_URL = "kafka.connection.schema_registry_url"; + private static final String KAFKA_SCHEMA_REGISTRY_CONFIG = + "kafka.connection.schema_registry_config"; private Optional getEmitter() { Optional emitter = Optional.empty(); @@ -33,6 +43,15 @@ private Optional getEmitter() { emitter = Optional.of(new RestEmitter(restEmitterConfig.get())); } break; + case "kafka": + if (kafkaEmitterConfig.isPresent()) { + try { + emitter = Optional.of(new KafkaEmitter(kafkaEmitterConfig.get())); + } catch (IOException e) { + log.error("Failed to create KafkaEmitter", e); + } + } + break; default: log.error( @@ -89,14 +108,13 @@ public McpEmitter(Config 
datahubConf) { datahubConf.hasPath(GMS_AUTH_TOKEN) ? datahubConf.getString(GMS_AUTH_TOKEN) : null; boolean disableSslVerification = datahubConf.hasPath(DISABLE_SSL_VERIFICATION_KEY) - ? datahubConf.getBoolean(DISABLE_SSL_VERIFICATION_KEY) - : false; + && datahubConf.getBoolean(DISABLE_SSL_VERIFICATION_KEY); log.info( "REST Emitter Configuration: GMS url {}{}", gmsUrl, (datahubConf.hasPath(GMS_URL_KEY) ? "" : "(default)")); if (token != null) { - log.info("REST Emitter Configuration: Token {}", (token != null) ? "XXXXX" : "(empty)"); + log.info("REST Emitter Configuration: Token {}", "XXXXX"); } if (disableSslVerification) { log.warn("REST Emitter Configuration: ssl verification will be disabled."); @@ -110,6 +128,41 @@ public McpEmitter(Config datahubConf) { .build()); break; + case "kafka": + String kafkaBootstrap = + datahubConf.hasPath(KAFKA_BOOTSTRAP) + ? datahubConf.getString(KAFKA_BOOTSTRAP) + : "localhost:9092"; + Map producerConfig = + datahubConf.hasPath(KAFKA_PRODUCER_CONFIG) + ? datahubConf.getConfig(KAFKA_PRODUCER_CONFIG).entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, entry -> entry.getValue().unwrapped().toString())) + : null; + String schemaRegistryUrl = + datahubConf.hasPath(KAFKA_SCHEMA_REGISTRY_URL) + ? datahubConf.getString(KAFKA_SCHEMA_REGISTRY_URL) + : null; + Map schemaRegistryConfig = + datahubConf.hasPath(KAFKA_SCHEMA_REGISTRY_CONFIG) + ? datahubConf.getConfig(KAFKA_SCHEMA_REGISTRY_CONFIG).entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, entry -> entry.getValue().unwrapped().toString())) + : null; + log.info( + "Kafka Emitter Configuration: Kafka bootstrap {}{}", + kafkaBootstrap, + (datahubConf.hasPath(KAFKA_BOOTSTRAP) ? 
"" : "(default)")); + kafkaEmitterConfig = + Optional.of( + KafkaEmitterConfig.builder() + .bootstrap(kafkaBootstrap) + .producerConfig(producerConfig) + .schemaRegistryUrl(schemaRegistryUrl) + .schemaRegistryConfig(schemaRegistryConfig) + .build()); default: log.error( "DataHub Transport {} not recognized. DataHub Lineage emission will not work", diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppEndEvent.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/AppEndEvent.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppEndEvent.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/AppEndEvent.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppStartEvent.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/AppStartEvent.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppStartEvent.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/AppStartEvent.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/DatasetLineage.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/DatasetLineage.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/DatasetLineage.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/DatasetLineage.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageConsumer.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/LineageConsumer.java similarity index 100% rename from 
metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageConsumer.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/LineageConsumer.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageEvent.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/LineageEvent.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageEvent.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/LineageEvent.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageUtils.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/LineageUtils.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageUtils.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/LineageUtils.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecEndEvent.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/SQLQueryExecEndEvent.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecEndEvent.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/SQLQueryExecEndEvent.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java rename to 
metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/CatalogTableDataset.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/dataset/CatalogTableDataset.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/CatalogTableDataset.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/dataset/CatalogTableDataset.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/JdbcDataset.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/dataset/JdbcDataset.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/JdbcDataset.java rename to metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/dataset/JdbcDataset.java diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/SparkDataset.java b/metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/dataset/SparkDataset.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/SparkDataset.java rename to 
metadata-integration/java/spark-lineage-legacy/src/main/java/datahub/spark/model/dataset/SparkDataset.java diff --git a/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestCoalesceJobLineage.java b/metadata-integration/java/spark-lineage-legacy/src/test/java/datahub/spark/TestCoalesceJobLineage.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestCoalesceJobLineage.java rename to metadata-integration/java/spark-lineage-legacy/src/test/java/datahub/spark/TestCoalesceJobLineage.java diff --git a/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java b/metadata-integration/java/spark-lineage-legacy/src/test/java/datahub/spark/TestSparkJobsLineage.java similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java rename to metadata-integration/java/spark-lineage-legacy/src/test/java/datahub/spark/TestSparkJobsLineage.java diff --git a/metadata-integration/java/spark-lineage/src/test/resources/data/in1.csv/part1.csv b/metadata-integration/java/spark-lineage-legacy/src/test/resources/data/in1.csv/part1.csv similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/data/in1.csv/part1.csv rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/data/in1.csv/part1.csv diff --git a/metadata-integration/java/spark-lineage/src/test/resources/data/in2.csv/part1.csv b/metadata-integration/java/spark-lineage-legacy/src/test/resources/data/in2.csv/part1.csv similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/data/in2.csv/part1.csv rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/data/in2.csv/part1.csv diff --git a/metadata-integration/java/spark-lineage/src/test/resources/data/in3.csv/part1.csv 
b/metadata-integration/java/spark-lineage-legacy/src/test/resources/data/in3.csv/part1.csv similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/data/in3.csv/part1.csv rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/data/in3.csv/part1.csv diff --git a/metadata-integration/java/spark-lineage/src/test/resources/data/in4.csv/part1.csv b/metadata-integration/java/spark-lineage-legacy/src/test/resources/data/in4.csv/part1.csv similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/data/in4.csv/part1.csv rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/data/in4.csv/part1.csv diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test1HdfsInOut.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test1HdfsInOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/test1HdfsInOut.json rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test1HdfsInOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test2HdfsInHiveOut.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test2HdfsInHiveOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/test2HdfsInHiveOut.json rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test2HdfsInHiveOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test3HdfsJdbcInJdbcOut.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test3HdfsJdbcInJdbcOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/test3HdfsJdbcInJdbcOut.json rename to 
metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test3HdfsJdbcInJdbcOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test4HiveInHiveOut.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test4HiveInHiveOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/test4HiveInHiveOut.json rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test4HiveInHiveOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test5HdfsInJdbcOut.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test5HdfsInJdbcOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/test5HdfsInJdbcOut.json rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test5HdfsInJdbcOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test6HdfsJdbcInJdbcOutTwoLevel.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test6HdfsJdbcInJdbcOutTwoLevel.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/test6HdfsJdbcInJdbcOutTwoLevel.json rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test6HdfsJdbcInJdbcOutTwoLevel.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test7HdfsInPersistHdfsOut.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test7HdfsInPersistHdfsOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/test7HdfsInPersistHdfsOut.json rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test7HdfsInPersistHdfsOut.json diff --git 
a/metadata-integration/java/spark-lineage/src/test/resources/expected/test8PersistHdfsJdbcInJdbcOut.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test8PersistHdfsJdbcInJdbcOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/test8PersistHdfsJdbcInJdbcOut.json rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test8PersistHdfsJdbcInJdbcOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test9PersistJdbcInHdfsOut.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test9PersistJdbcInHdfsOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/test9PersistJdbcInHdfsOut.json rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/test9PersistJdbcInHdfsOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsInOutCoalesce.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/testHdfsInOutCoalesce.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsInOutCoalesce.json rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/testHdfsInOutCoalesce.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/testHiveInHiveOutCoalesce.json b/metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/testHiveInHiveOutCoalesce.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/testHiveInHiveOutCoalesce.json rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/expected/testHiveInHiveOutCoalesce.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/org/apache/spark/log4j-defaults.properties 
b/metadata-integration/java/spark-lineage-legacy/src/test/resources/org/apache/spark/log4j-defaults.properties similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/org/apache/spark/log4j-defaults.properties rename to metadata-integration/java/spark-lineage-legacy/src/test/resources/org/apache/spark/log4j-defaults.properties diff --git a/metadata-integration/java/spark-lineage/bin/.gitignore b/metadata-integration/java/spark-lineage/bin/.gitignore deleted file mode 100644 index 7eed456bec8db..0000000000000 --- a/metadata-integration/java/spark-lineage/bin/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/main/ -/test/ diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/bin/.gitignore b/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/bin/.gitignore deleted file mode 100644 index 7eed456bec8db..0000000000000 --- a/metadata-integration/java/spark-lineage/spark-smoke-test/test-spark-lineage/bin/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/main/ -/test/ diff --git a/settings.gradle b/settings.gradle index a09e9a650803f..b850816ab5e6b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -52,13 +52,13 @@ include 'metadata-perf' include 'docs-website' include 'metadata-models-custom' include 'entity-registry:custom-test-model' -include 'metadata-integration:java:spark-lineage' +include 'metadata-integration:java:spark-lineage-legacy' include 'metadata-integration:java:datahub-client' include 'metadata-integration:java:custom-plugin-lib' include 'metadata-integration:java:datahub-event' include 'metadata-integration:java:datahub-protobuf' include 'metadata-integration:java:openlineage-converter' -include 'metadata-integration:java:spark-lineage-beta' +include 'metadata-integration:java:acryl-spark-lineage' include 'ingestion-scheduler' include 'metadata-ingestion-modules:airflow-plugin' include 'metadata-ingestion-modules:dagster-plugin'