diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java
index 43a8b2b5feec0..03132cfdcda72 100644
--- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java
+++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java
@@ -726,7 +726,7 @@ private static void processJobInputs(
       DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder();
       builder.urn(datasetUrn.get());
       if (datahubConf.isMaterializeDataset()) {
-        builder.schemaMetadata(getSchemaMetadata(input));
+        builder.schemaMetadata(getSchemaMetadata(input, datahubConf));
       }
       if (datahubConf.isCaptureColumnLevelLineage()) {
         UpstreamLineage upstreamLineage = getFineGrainedLineage(input, datahubConf);
@@ -756,7 +756,7 @@ private static void processJobOutputs(
       DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder();
       builder.urn(datasetUrn.get());
       if (datahubConf.isMaterializeDataset()) {
-        builder.schemaMetadata(getSchemaMetadata(output));
+        builder.schemaMetadata(getSchemaMetadata(output, datahubConf));
       }
       if (datahubConf.isCaptureColumnLevelLineage()) {
         UpstreamLineage upstreamLineage = getFineGrainedLineage(output, datahubConf);
@@ -887,7 +887,8 @@ public static SchemaFieldDataType.Type convertOlFieldTypeToDHFieldType(
     }
   }

-  public static SchemaMetadata getSchemaMetadata(OpenLineage.Dataset dataset) {
+  public static SchemaMetadata getSchemaMetadata(
+      OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) {
     SchemaFieldArray schemaFieldArray = new SchemaFieldArray();
     if ((dataset.getFacets() == null) || (dataset.getFacets().getSchema() == null)) {
       return null;
@@ -916,9 +917,16 @@ public static SchemaMetadata getSchemaMetadata(OpenLineage.Dataset dataset) {
     ddl.setTableSchema(OpenLineageClientUtils.toJson(dataset.getFacets().getSchema().getFields()));
     SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
     platformSchema.setMySqlDDL(ddl);
+    Optional<DatasetUrn> datasetUrn =
+        getDatasetUrnFromOlDataset(dataset.getNamespace(), dataset.getName(), mappingConfig);
+
+    if (!datasetUrn.isPresent()) {
+      return null;
+    }
+
     schemaMetadata.setPlatformSchema(platformSchema);
-    schemaMetadata.setPlatform(new DataPlatformUrn(dataset.getNamespace()));
+    schemaMetadata.setPlatform(datasetUrn.get().getPlatformEntity());
     schemaMetadata.setFields(schemaFieldArray);

     return schemaMetadata;
diff --git a/metadata-integration/java/spark-lineage-beta/build.gradle b/metadata-integration/java/spark-lineage-beta/build.gradle
index d83753028d0b4..99b87b9b89bf4 100644
--- a/metadata-integration/java/spark-lineage-beta/build.gradle
+++ b/metadata-integration/java/spark-lineage-beta/build.gradle
@@ -56,6 +56,7 @@ dependencies {
   implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion"
   compileOnly "org.apache.iceberg:iceberg-spark3-runtime:0.12.1"
   compileOnly "org.apache.spark:spark-sql_2.12:3.1.3"
+  compileOnly "io.github.spark-redshift-community:spark-redshift_2.12:6.2.0-spark_3.5"

  testCompileOnly externalDependency.lombok
  testAnnotationProcessor externalDependency.lombok
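
The converter change above matters because an OpenLineage namespace is a connection-style URI (host and port included), not a DataHub platform name, so building the platform URN directly from the namespace produced invalid platform URNs. A minimal sketch of the before/after behaviour, using plain strings and values taken from the test event added later in this patch (not part of the patch itself):

// Sketch only: why the platform must come from the resolved dataset URN.
public class PlatformFromUrnSketch {
  public static void main(String[] args) {
    String namespace = "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439";
    // Before: new DataPlatformUrn(dataset.getNamespace()) stringified to an invalid
    // platform URN that embeds host and port:
    String before = "urn:li:dataPlatform:" + namespace;
    // After: getPlatformEntity() on the resolved dataset URN, e.g. for
    // urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.spark_redshift_load_test,DEV),
    // yields the proper platform URN:
    String after = "urn:li:dataPlatform:redshift";
    System.out.println(before + " -> " + after);
  }
}
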
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java
new file mode 100644
index 0000000000000..717525e4a9aaf
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/Constants.java
@@ -0,0 +1,9 @@
+package io.openlineage.spark.agent.vendor.redshift;
+
+public class Constants {
+  public static final String REDSHIFT_CLASS_NAME =
+      "io.github.spark_redshift_community.spark.redshift.RedshiftRelation";
+
+  public static final String REDSHIFT_PROVIDER_CLASS_NAME =
+      "io.github.spark_redshift_community.spark.redshift.DefaultSource";
+}
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java
new file mode 100644
index 0000000000000..6f0fceb9c4c4a
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/RedshiftVendor.java
@@ -0,0 +1,56 @@
+package io.openlineage.spark.agent.vendor.redshift;
+
+import static io.openlineage.spark.agent.vendor.redshift.Constants.*;
+
+import io.openlineage.spark.agent.lifecycle.VisitorFactory;
+import io.openlineage.spark.agent.vendor.redshift.lifecycle.RedshiftRelationVisitor;
+import io.openlineage.spark.agent.vendor.redshift.lifecycle.RedshiftVisitorFactory;
+import io.openlineage.spark.agent.vendor.redshift.lifecycle.plan.RedshiftEventHandlerFactory;
+import io.openlineage.spark.api.OpenLineageEventHandlerFactory;
+import io.openlineage.spark.api.Vendor;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class RedshiftVendor implements Vendor {
+
+  public static boolean hasRedshiftClasses() {
+    /*
+     Checking for the Redshift provider class with both
+     RedshiftRelationVisitor.class.getClassLoader().loadClass and
+     Thread.currentThread().getContextClassLoader().loadClass. The first checks whether the
+     class is present on the classpath; the second is a catch-all which captures whether the
+     class has been installed. This is relevant for Azure Databricks, where jars can be
+     installed and accessible to the user even if they are not present on the classpath.
+    */
+    try {
+      RedshiftRelationVisitor.class.getClassLoader().loadClass(REDSHIFT_PROVIDER_CLASS_NAME);
+      return true;
+    } catch (Exception e) {
+      // swallow - we don't care
+    }
+    try {
+      Thread.currentThread().getContextClassLoader().loadClass(REDSHIFT_PROVIDER_CLASS_NAME);
+      return true;
+    } catch (Exception e) {
+      // swallow - we don't care
+    }
+    return false;
+  }
+
+  @Override
+  public boolean isVendorAvailable() {
+    log.info("Checking if Redshift classes are available");
+    return hasRedshiftClasses();
+  }
+
+  @Override
+  public Optional<VisitorFactory> getVisitorFactory() {
+    return Optional.of(new RedshiftVisitorFactory());
+  }
+
+  @Override
+  public Optional<OpenLineageEventHandlerFactory> getEventHandlerFactory() {
+    return Optional.of(new RedshiftEventHandlerFactory());
+  }
+}
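
The double lookup in hasRedshiftClasses() generalizes to any optional dependency: the declaring class's loader sees the application classpath, while the thread context classloader additionally sees jars installed at runtime. A self-contained sketch of the same pattern (the class name below is hypothetical, not part of the patch):

import java.lang.Thread;

// Sketch only: two-step presence check for an optional dependency.
public class ClassPresenceExample {
  public static boolean isPresent(String className) {
    try {
      ClassPresenceExample.class.getClassLoader().loadClass(className);
      return true; // found on the application classpath
    } catch (ReflectiveOperationException ignored) {
      // fall through to the context classloader
    }
    try {
      Thread.currentThread().getContextClassLoader().loadClass(className);
      return true; // found via runtime-installed jars (e.g. Databricks)
    } catch (ReflectiveOperationException ignored) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isPresent("com.example.SomeOptionalDependency")); // false here
  }
}
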
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java
new file mode 100644
index 0000000000000..0e7fd4ce942bd
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftDataset.java
@@ -0,0 +1,72 @@
+package io.openlineage.spark.agent.vendor.redshift.lifecycle;
+
+import io.openlineage.client.OpenLineage;
+import io.openlineage.spark.agent.util.SqlUtils;
+import io.openlineage.spark.api.DatasetFactory;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Slf4j
+public class RedshiftDataset {
+  public static final String REDSHIFT_PREFIX = "redshift://";
+
+  private static final Logger logger = LoggerFactory.getLogger(RedshiftDataset.class);
+  public static final String DEFAULT_SCHEMA = "public";
+
+  public static <D extends OpenLineage.Dataset> List<D> getDatasets(
+      DatasetFactory<D> factory,
+      String url,
+      Optional<String> dbtable,
+      Optional<String> query,
+      StructType schema)
+      throws URISyntaxException {
+
+    URI jdbcUrl =
+        new URI(
+            REDSHIFT_PREFIX
+                + url.replace("jdbc:redshift:iam://", "").replace("jdbc:redshift://", ""));
+    String db = jdbcUrl.getPath().substring(1); // remove leading slash
+    final String namespace =
+        jdbcUrl.getScheme() + "://" + jdbcUrl.getHost() + ":" + jdbcUrl.getPort();
+
+    final String tableName;
+    // https://github.com/databricks/spark-redshift?tab=readme-ov-file
+    // > Specify one of the following options for the table data to be read:
+    // > - `dbtable`: The name of the table to be read. All columns and records are retrieved
+    // >   (i.e. it is equivalent to SELECT * FROM db_table).
+    // > - `query`: The exact query (SELECT statement) to run.
+    // If dbtable is null, it has been replaced with the string `complex`, which means the
+    // query option was used.
+    // An improvement could be to put the query string in the `DatasetFacets`.
+    if (dbtable.isPresent()) {
+      tableName = dbtable.get();
+      String[] splits = tableName.split("\\.");
+      String table = tableName;
+      if (splits.length == 1) {
+        table = String.format("%s.%s.%s", db, DEFAULT_SCHEMA, tableName);
+      } else if (splits.length == 2) {
+        table = String.format("%s.%s", db, tableName);
+      } else if (splits.length == 3) {
+        table = tableName;
+      } else {
+        logger.warn("Redshift getDataset: tableName: {} is not in the expected format", tableName);
+        return Collections.emptyList();
+      }
+
+      return Collections.singletonList(factory.getDataset(table, namespace, schema));
+    } else if (query.isPresent()) {
+      return SqlUtils.getDatasets(factory, query.get(), "redshift", namespace, db, DEFAULT_SCHEMA);
+    } else {
+      logger.warn(
+          "Unable to discover Redshift table property - neither \"dbtable\" nor \"query\" option present");
+    }
+    return Collections.emptyList();
+  }
+}
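
The dbtable branch above qualifies partial table names against the database parsed from the JDBC URL and the `public` default schema. A standalone sketch of just that rule, with hypothetical inputs (not part of the patch):

// Sketch only: the name-qualification rule from getDatasets, in isolation.
public class RedshiftNameExample {
  static String qualify(String db, String dbtable) {
    String[] splits = dbtable.split("\\.");
    if (splits.length == 1) return String.format("%s.%s.%s", db, "public", dbtable);
    if (splits.length == 2) return String.format("%s.%s", db, dbtable);
    if (splits.length == 3) return dbtable;
    return null; // unexpected format; getDatasets logs a warning and emits no dataset
  }

  public static void main(String[] args) {
    System.out.println(qualify("dev", "my_table"));           // dev.public.my_table
    System.out.println(qualify("dev", "sales.my_table"));     // dev.sales.my_table
    System.out.println(qualify("dev", "dev.sales.my_table")); // dev.sales.my_table
  }
}
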
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java
new file mode 100644
index 0000000000000..02eb2d1745558
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftRelationVisitor.java
@@ -0,0 +1,66 @@
+package io.openlineage.spark.agent.vendor.redshift.lifecycle;
+
+import io.github.spark_redshift_community.spark.redshift.Parameters;
+import io.github.spark_redshift_community.spark.redshift.RedshiftRelation;
+import io.github.spark_redshift_community.spark.redshift.TableName;
+import io.openlineage.client.OpenLineage;
+import io.openlineage.spark.agent.util.ScalaConversionUtils;
+import io.openlineage.spark.api.DatasetFactory;
+import io.openlineage.spark.api.OpenLineageContext;
+import io.openlineage.spark.api.QueryPlanVisitor;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.LogicalRelation;
+
+/**
+ * {@link LogicalPlan} visitor that matches {@link LogicalRelation}s that wrap a {@link
+ * RedshiftRelation}. This visitor extracts an {@link OpenLineage.Dataset} from the Redshift table
+ * referenced by the relation.
+ */
+@Slf4j
+public class RedshiftRelationVisitor<D extends OpenLineage.Dataset>
+    extends QueryPlanVisitor<LogicalRelation, D> {
+  private static final String REDSHIFT_NAMESPACE = "redshift";
+  private static final String REDSHIFT_CLASS_NAME =
+      "io.github.spark_redshift_community.spark.redshift.RedshiftRelation";
+  private final DatasetFactory<D> factory;
+
+  public RedshiftRelationVisitor(@NonNull OpenLineageContext context, DatasetFactory<D> factory) {
+    super(context);
+    this.factory = factory;
+    log.info("RedshiftRelationVisitor created");
+  }
+
+  @Override
+  public List<D> apply(LogicalPlan x) {
+    RedshiftRelation relation = (RedshiftRelation) ((LogicalRelation) x).relation();
+    Parameters.MergedParameters params = relation.params();
+    Optional<String> dbtable =
+        (Optional<String>)
+            ScalaConversionUtils.asJavaOptional(params.table().map(TableName::toString));
+    Optional<String> query = ScalaConversionUtils.asJavaOptional(params.query());
+    return Collections.singletonList(
+        factory.getDataset(dbtable.orElse(""), REDSHIFT_NAMESPACE, relation.schema()));
+  }
+
+  protected boolean isRedshiftClass(LogicalPlan plan) {
+    try {
+      Class<?> c = Thread.currentThread().getContextClassLoader().loadClass(REDSHIFT_CLASS_NAME);
+      return (plan instanceof LogicalRelation
+          && c.isAssignableFrom(((LogicalRelation) plan).relation().getClass()));
+    } catch (Exception e) {
+      // swallow - not a Redshift class
+    }
+    return false;
+  }
+
+  @Override
+  public boolean isDefinedAt(LogicalPlan plan) {
+    return isRedshiftClass(plan);
+  }
+}
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java
new file mode 100644
index 0000000000000..1003c863227b8
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/RedshiftVisitorFactory.java
@@ -0,0 +1,26 @@
+package io.openlineage.spark.agent.vendor.redshift.lifecycle;
+
+import io.openlineage.client.OpenLineage;
+import io.openlineage.spark.agent.lifecycle.VisitorFactory;
+import io.openlineage.spark.api.DatasetFactory;
+import io.openlineage.spark.api.OpenLineageContext;
+import java.util.Collections;
+import java.util.List;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import scala.PartialFunction;
+
+public class RedshiftVisitorFactory implements VisitorFactory {
+  @Override
+  public List<PartialFunction<LogicalPlan, List<OpenLineage.InputDataset>>> getInputVisitors(
+      OpenLineageContext context) {
+    DatasetFactory<OpenLineage.InputDataset> factory = DatasetFactory.input(context);
+    return Collections.singletonList(new RedshiftRelationVisitor<>(context, factory));
+  }
+
+  @Override
+  public List<PartialFunction<LogicalPlan, List<OpenLineage.OutputDataset>>> getOutputVisitors(
+      OpenLineageContext context) {
+    DatasetFactory<OpenLineage.OutputDataset> factory = DatasetFactory.output(context);
+    return Collections.singletonList(new RedshiftRelationVisitor<>(context, factory));
+  }
+}
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java
new file mode 100644
index 0000000000000..4cd1ba996fe88
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftEventHandlerFactory.java
@@ -0,0 +1,20 @@
+package io.openlineage.spark.agent.vendor.redshift.lifecycle.plan;
+
+import io.openlineage.client.OpenLineage;
+import io.openlineage.spark.api.OpenLineageContext;
+import io.openlineage.spark.api.OpenLineageEventHandlerFactory;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import scala.PartialFunction;
+
+public class RedshiftEventHandlerFactory implements OpenLineageEventHandlerFactory {
+  @Override
+  public Collection<PartialFunction<Object, List<OpenLineage.OutputDataset>>>
+      createOutputDatasetBuilder(OpenLineageContext context) {
+    // The right builder is selected at runtime via type checking against the matching
+    // Spark LogicalPlan node.
+    return Collections.singleton(
+        (PartialFunction) new RedshiftSaveIntoDataSourceCommandBuilder(context));
+  }
+}
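
The comment in createOutputDatasetBuilder refers to OpenLineage's PartialFunction-based dispatch: the framework only calls apply() on plan nodes for which isDefinedAt() returns true (here inherited from AbstractQueryPlanDatasetBuilder). A simplified, hypothetical driver loop illustrating that contract (not OpenLineage code):

import java.util.ArrayList;
import java.util.List;
import scala.PartialFunction;

// Sketch only: how a registered PartialFunction is consulted during plan traversal.
public class DispatchExample {
  public static <T, R> List<R> collect(List<T> planNodes, PartialFunction<T, List<R>> fn) {
    List<R> out = new ArrayList<>();
    for (T node : planNodes) {
      if (fn.isDefinedAt(node)) { // type check: only matching nodes reach apply()
        out.addAll(fn.apply(node));
      }
    }
    return out;
  }
}
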
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java
new file mode 100644
index 0000000000000..e484458d40aeb
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/vendor/redshift/lifecycle/plan/RedshiftSaveIntoDataSourceCommandBuilder.java
@@ -0,0 +1,80 @@
+package io.openlineage.spark.agent.vendor.redshift.lifecycle.plan;
+
+import static io.openlineage.spark.agent.vendor.redshift.RedshiftVendor.hasRedshiftClasses;
+
+import io.openlineage.client.OpenLineage;
+import io.openlineage.spark.agent.util.PlanUtils;
+import io.openlineage.spark.agent.util.ScalaConversionUtils;
+import io.openlineage.spark.agent.vendor.redshift.lifecycle.RedshiftDataset;
+import io.openlineage.spark.api.AbstractQueryPlanDatasetBuilder;
+import io.openlineage.spark.api.OpenLineageContext;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.scheduler.SparkListenerEvent;
+import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand;
+import org.apache.spark.sql.sources.CreatableRelationProvider;
+import org.apache.spark.sql.types.StructType;
+
+@Slf4j
+public class RedshiftSaveIntoDataSourceCommandBuilder
+    extends AbstractQueryPlanDatasetBuilder<
+        SparkListenerEvent, SaveIntoDataSourceCommand, OpenLineage.OutputDataset> {
+
+  public RedshiftSaveIntoDataSourceCommandBuilder(OpenLineageContext context) {
+    super(context, false);
+  }
+
+  @Override
+  public List<OpenLineage.OutputDataset> apply(SaveIntoDataSourceCommand command) {
+    if (isRedshiftSource(command.dataSource())) {
+      // Called from SaveIntoDataSourceCommandVisitor on Redshift write operations.
+      Map<String, String> options = ScalaConversionUtils.fromMap(command.options());
+      log.info("Redshift SaveIntoDataSourceCommand options: {}", options);
+      Optional<String> dbtable = Optional.ofNullable(options.get("dbtable"));
+      Optional<String> query = Optional.ofNullable(options.get("query"));
+      String url = options.get("url");
+
+      try {
+        return
+        // Similar to Kafka, Redshift also has some special handling, so we use the method
+        // below for extracting the dataset from Redshift write operations.
+        RedshiftDataset.getDatasets(
+            outputDataset(),
+            url,
+            dbtable,
+            query,
+            // command.schema() doesn't always contain the schema, so we use the helper to
+            // extract it from the logical plan instead.
+            getSchema(command));
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  public static boolean isRedshiftSource(CreatableRelationProvider provider) {
+    return hasRedshiftClasses(); // && provider instanceof DefaultSource;
+  }
+
+  /**
+   * Taken from {@link
+   * io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor#getSchema(org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand)}
+   *
+   * @param command the save command to extract the schema from
+   * @return the command's schema, falling back to the logical plan's output when empty
+   */
+  private StructType getSchema(SaveIntoDataSourceCommand command) {
+    StructType schema = command.schema();
+    if ((schema == null || schema.fields() == null || schema.fields().length == 0)
+        && command.query() != null
+        && command.query().output() != null) {
+      // get the schema from the logical plan's output
+      schema = PlanUtils.toStructType(ScalaConversionUtils.fromSeq(command.query().output()));
+    }
+    return schema;
+  }
+}
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/Vendors.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/Vendors.java
new file mode 100644
index 0000000000000..967935cb40468
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/Vendors.java
@@ -0,0 +1,80 @@
+/*
+/* Copyright 2018-2024 contributors to the OpenLineage project
+/* SPDX-License-Identifier: Apache-2.0
+*/
+
+package io.openlineage.spark.api;
+
+import io.openlineage.spark.agent.lifecycle.VisitorFactory;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public interface Vendors {
+
+  @SuppressWarnings("PMD.AvoidFieldNameMatchingTypeName")
+  List<String> VENDORS =
+      Arrays.asList(
+          // Add vendor classes here
+          "io.openlineage.spark.agent.vendor.snowflake.SnowflakeVendor",
+          // This is the only place where the RedshiftVendor is registered
+          "io.openlineage.spark.agent.vendor.redshift.RedshiftVendor");
+
+  static Vendors getVendors() {
+    return getVendors(Collections.emptyList());
+  }
+
+  static Vendors getVendors(List<String> additionalVendors) {
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+    List<Vendor> vendors =
+        Stream.concat(VENDORS.stream(), additionalVendors.stream())
+            .map(
+                vendorClassName -> {
+                  try {
+                    Class<?> vendor = cl.loadClass(vendorClassName);
+                    return (Vendor) vendor.newInstance();
+                  } catch (ClassNotFoundException
+                      | InstantiationException
+                      | IllegalAccessException e) {
+                    return null;
+                  }
+                })
+            .filter(Objects::nonNull)
+            .filter(Vendor::isVendorAvailable)
+            .collect(Collectors.toList());
+    // The main reason to avoid the service loader and use static loading with class names
+    // is to prevent vendors from silently not being loaded because of missing
+    // META-INF/services files.
+    // This can happen if the user packages the OpenLineage dependency in an uber-jar
+    // without proper merging of the services files. The trade-off is that the hard-coded
+    // list of vendor class names increases the coupling between the vendors and the app.
+    // https://github.com/OpenLineage/OpenLineage/issues/1860
+    // ServiceLoader<Vendor> serviceLoader = ServiceLoader.load(Vendor.class);
+    return new VendorsImpl(vendors);
+  }
+
+  static Vendors empty() {
+    return new Vendors() {
+
+      @Override
+      public Collection<VisitorFactory> getVisitorFactories() {
+        return Collections.emptyList();
+      }
+
+      @Override
+      public Collection<OpenLineageEventHandlerFactory> getEventHandlerFactories() {
+        return Collections.emptyList();
+      }
+    };
+  }
+
+  Collection<VisitorFactory> getVisitorFactories();
+
+  Collection<OpenLineageEventHandlerFactory> getEventHandlerFactories();
+}
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/VendorsImpl.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/VendorsImpl.java
new file mode 100644
index 0000000000000..66db4cf4f4e43
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/api/VendorsImpl.java
@@ -0,0 +1,40 @@
+/*
+/* Copyright 2018-2024 contributors to the OpenLineage project
+/* SPDX-License-Identifier: Apache-2.0
+*/
+
+package io.openlineage.spark.api;
+
+import io.openlineage.spark.agent.lifecycle.VisitorFactory;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class VendorsImpl implements Vendors {
+  private final List<Vendor> vendors;
+
+  public VendorsImpl(List<Vendor> vendors) {
+    this.vendors = vendors;
+  }
+
+  @Override
+  public Collection<VisitorFactory> getVisitorFactories() {
+    return vendors.stream()
+        .map(Vendor::getVisitorFactory)
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public Collection<OpenLineageEventHandlerFactory> getEventHandlerFactories() {
+    return vendors.stream()
+        .map(Vendor::getEventHandlerFactory)
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(Collectors.toList());
+  }
+}
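
Vendors.getVendors also accepts extra vendor class names, which is how a deployment could register a vendor outside the hard-coded VENDORS list; classes that are absent or whose isVendorAvailable() returns false are skipped silently. A usage sketch with a hypothetical class name (not part of the patch):

import io.openlineage.spark.api.Vendors;
import java.util.Collections;

// Sketch only: static vendor loading as implemented in Vendors.getVendors above.
public class VendorLoadingExample {
  public static void main(String[] args) {
    // "com.example.MyCustomVendor" is hypothetical; if it is not on the classpath,
    // it is filtered out and only the available built-in vendors remain.
    Vendors vendors =
        Vendors.getVendors(Collections.singletonList("com.example.MyCustomVendor"));
    System.out.println(vendors.getVisitorFactories().size());
  }
}
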
diff --git a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/SparkStreamingEventToDatahubTest.java b/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java
similarity index 94%
rename from metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/SparkStreamingEventToDatahubTest.java
rename to metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java
index 6d4c57228a05b..1f6f1f2f28a43 100644
--- a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/SparkStreamingEventToDatahubTest.java
+++ b/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java
@@ -26,7 +26,7 @@
 import org.apache.commons.lang3.tuple.Triple;
 import org.junit.Assert;

-public class SparkStreamingEventToDatahubTest extends TestCase {
+public class OpenLineageEventToDatahubTest extends TestCase {
   public void testGenerateUrnFromStreamingDescriptionFile() throws URISyntaxException {
     Config datahubConfig =
         ConfigFactory.parseMap(
@@ -538,4 +538,36 @@ public void testProcessGlueOlEventWithHiveAlias() throws URISyntaxException, IOException {
           dataset.getUrn().toString());
     }
   }
+
+  public void testProcessRedshiftOutput() throws URISyntaxException, IOException {
+    DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
+        DatahubOpenlineageConfig.builder();
+    builder.fabricType(FabricType.DEV);
+    builder.hivePlatformAlias("glue");
+    builder.materializeDataset(true);
+    builder.includeSchemaMetadata(true);
+
+    String olEvent =
+        IOUtils.toString(
+            this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"),
+            StandardCharsets.UTF_8);
+
+    OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
+    DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());
+
+    assertNotNull(datahubJob);
+
+    for (DatahubDataset dataset : datahubJob.getInSet()) {
+      assertEquals(
+          "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)",
+          dataset.getUrn().toString());
+    }
+    for (DatahubDataset dataset : datahubJob.getOutSet()) {
+      assertEquals(
+          "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.spark_redshift_load_test,DEV)",
+          dataset.getUrn().toString());
+      assertEquals(
+          "urn:li:dataPlatform:redshift", dataset.getSchemaMetadata().getPlatform().toString());
+    }
+  }
 }
diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_lineage_spark.json b/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_lineage_spark.json
new file mode 100644
index 0000000000000..3b11b28207636
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_lineage_spark.json
@@ -0,0 +1,147 @@
"mysql://localhost:3306", + "name": "datahub.metadata_aspect_v2", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "mysql://localhost:3306", + "uri": "mysql://localhost:3306" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "urn", + "type": "string" + }, + { + "name": "aspect", + "type": "string" + }, + { + "name": "version", + "type": "long" + }, + { + "name": "metadata", + "type": "string" + }, + { + "name": "systemmetadata", + "type": "string" + }, + { + "name": "createdon", + "type": "timestamp" + }, + { + "name": "createdby", + "type": "string" + }, + { + "name": "createdfor", + "type": "string" + } + ] + } + }, + "inputFacets": {} + } + ], + "outputs": [ + { + "namespace": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439", + "name": "dev.public.spark_redshift_load_test", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439", + "uri": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "urn", + "type": "string" + } + ] + }, + "columnLineage": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet", + "fields": { + "urn": { + "inputFields": [ + { + "namespace": "mysql://localhost:3306", + "name": "datahub.metadata_aspect_v2", + "field": "urn" + } + ] + } + } + } + }, + "outputFacets": {} + } + ] +} \ No newline at end of file