Add support for Redshift spark connector
Small fix for the OpenLineage converter to get the platform properly for SchemaField
treff7es committed Jun 18, 2024
1 parent 7b8e7cc commit d51989a
Showing 13 changed files with 644 additions and 5 deletions.
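For context on the converter fix mentioned above: previously the platform URN was built directly from the OpenLineage namespace, which for JDBC-style sources like Redshift embeds the host and port. The following is a minimal, hypothetical sketch (not part of the commit; cluster and table names are invented, and DatasetUrn.createFromString merely stands in for the URN returned by the internal getDatasetUrnFromOlDataset helper) contrasting the old and new behaviour using the DataHub URN classes referenced in the diff below.

import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;

public class PlatformFromUrnSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical Redshift OpenLineage namespace; it carries the host and port.
    String namespace = "redshift://examplecluster.abc123.us-west-2.redshift.amazonaws.com:5439";

    // Old behaviour: the namespace itself becomes the platform, producing an unusable URN.
    DataPlatformUrn fromNamespace = new DataPlatformUrn(namespace);
    System.out.println(fromNamespace); // urn:li:dataPlatform:redshift://examplecluster...:5439

    // New behaviour: the platform is read from the resolved dataset URN instead.
    DatasetUrn datasetUrn =
        DatasetUrn.createFromString(
            "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)");
    DataPlatformUrn fromUrn = datasetUrn.getPlatformEntity();
    System.out.println(fromUrn); // urn:li:dataPlatform:redshift
  }
}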
@@ -726,7 +726,7 @@ private static void processJobInputs(
DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder();
builder.urn(datasetUrn.get());
if (datahubConf.isMaterializeDataset()) {
builder.schemaMetadata(getSchemaMetadata(input));
builder.schemaMetadata(getSchemaMetadata(input, datahubConf));
}
if (datahubConf.isCaptureColumnLevelLineage()) {
UpstreamLineage upstreamLineage = getFineGrainedLineage(input, datahubConf);
@@ -756,7 +756,7 @@ private static void processJobOutputs(
DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder();
builder.urn(datasetUrn.get());
if (datahubConf.isMaterializeDataset()) {
builder.schemaMetadata(getSchemaMetadata(output));
builder.schemaMetadata(getSchemaMetadata(output, datahubConf));
}
if (datahubConf.isCaptureColumnLevelLineage()) {
UpstreamLineage upstreamLineage = getFineGrainedLineage(output, datahubConf);
@@ -887,7 +887,8 @@ public static SchemaFieldDataType.Type convertOlFieldTypeToDHFieldType(
}
}

public static SchemaMetadata getSchemaMetadata(OpenLineage.Dataset dataset) {
public static SchemaMetadata getSchemaMetadata(
OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) {
SchemaFieldArray schemaFieldArray = new SchemaFieldArray();
if ((dataset.getFacets() == null) || (dataset.getFacets().getSchema() == null)) {
return null;
@@ -916,9 +917,16 @@ public static SchemaMetadata getSchemaMetadata(OpenLineage.Dataset dataset) {
ddl.setTableSchema(OpenLineageClientUtils.toJson(dataset.getFacets().getSchema().getFields()));
SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
platformSchema.setMySqlDDL(ddl);
Optional<DatasetUrn> datasetUrn =
getDatasetUrnFromOlDataset(dataset.getNamespace(), dataset.getName(), mappingConfig);

if (!datasetUrn.isPresent()) {
return null;
}

schemaMetadata.setPlatformSchema(platformSchema);

schemaMetadata.setPlatform(new DataPlatformUrn(dataset.getNamespace()));
schemaMetadata.setPlatform(datasetUrn.get().getPlatformEntity());

schemaMetadata.setFields(schemaFieldArray);
return schemaMetadata;
1 change: 1 addition & 0 deletions metadata-integration/java/spark-lineage-beta/build.gradle
@@ -56,6 +56,7 @@ dependencies {
implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion"
compileOnly "org.apache.iceberg:iceberg-spark3-runtime:0.12.1"
compileOnly "org.apache.spark:spark-sql_2.12:3.1.3"
compileOnly "io.github.spark-redshift-community:spark-redshift_2.12:6.2.0-spark_3.5"

testCompileOnly externalDependency.lombok
testAnnotationProcessor externalDependency.lombok
@@ -0,0 +1,9 @@
package io.openlineage.spark.agent.vendor.redshift;

public class Constants {
public static final String REDSHIFT_CLASS_NAME =
"io.github.spark_redshift_community.spark.redshift.RedshiftRelation";

public static final String REDSHIFT_PROVIDER_CLASS_NAME =
"io.github.spark_redshift_community.spark.redshift.DefaultSource";
}
@@ -0,0 +1,56 @@
package io.openlineage.spark.agent.vendor.redshift;

import static io.openlineage.spark.agent.vendor.redshift.Constants.*;

import io.openlineage.spark.agent.lifecycle.VisitorFactory;
import io.openlineage.spark.agent.vendor.redshift.lifecycle.RedshiftRelationVisitor;
import io.openlineage.spark.agent.vendor.redshift.lifecycle.RedshiftVisitorFactory;
import io.openlineage.spark.agent.vendor.redshift.lifecycle.plan.RedshiftEventHandlerFactory;
import io.openlineage.spark.api.OpenLineageEventHandlerFactory;
import io.openlineage.spark.api.Vendor;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RedshiftVendor implements Vendor {

public static boolean hasRedshiftClasses() {
/*
Checking for the Redshift provider class with both
RedshiftRelationVisitor.class.getClassLoader().loadClass() and
Thread.currentThread().getContextClassLoader().loadClass(). The first checks whether the class
is present on the classpath, and the second is a catch-all that captures whether the class has
been installed. This is relevant for Azure Databricks, where jars can be installed and
accessible to the user even if they are not present on the classpath.
*/
try {
RedshiftRelationVisitor.class.getClassLoader().loadClass(REDSHIFT_PROVIDER_CLASS_NAME);
return true;
} catch (Exception e) {
// swallow - we don't care
}
try {
Thread.currentThread().getContextClassLoader().loadClass(REDSHIFT_PROVIDER_CLASS_NAME);
return true;
} catch (Exception e) {
// swallow - we don't care
}
return false;
}

@Override
public boolean isVendorAvailable() {
log.info("Checking if Redshift classes are available");
return hasRedshiftClasses();
}

@Override
public Optional<VisitorFactory> getVisitorFactory() {
return Optional.of(new RedshiftVisitorFactory());
}

@Override
public Optional<OpenLineageEventHandlerFactory> getEventHandlerFactory() {
return Optional.of(new RedshiftEventHandlerFactory());
}
}
@@ -0,0 +1,72 @@
package io.openlineage.spark.agent.vendor.redshift.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.util.SqlUtils;
import io.openlineage.spark.api.DatasetFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public class RedshiftDataset {
public static final String REDSHIFT_PREFIX = "redshift://";

private static final Logger logger = LoggerFactory.getLogger(RedshiftDataset.class);
public static final String DEFAULT_SCHEMA = "public";

public static <D extends OpenLineage.Dataset> List<D> getDatasets(
DatasetFactory<D> factory,
String url,
Optional<String> dbtable,
Optional<String> query,
StructType schema)
throws URISyntaxException {

URI jdbcUrl =
new URI(
REDSHIFT_PREFIX
+ url.replace("jdbc:redshift:iam://", "").replace("jdbc:redshift://", ""));
String db = jdbcUrl.getPath().substring(1); // remove leading slash
final String namespace =
jdbcUrl.getScheme() + "://" + jdbcUrl.getHost() + ":" + jdbcUrl.getPort();

final String tableName;
// https://github.com/databricks/spark-redshift?tab=readme-ov-file
// > Specify one of the following options for the table data to be read:
// > - `dbtable`: The name of the table to be read. All columns and records are retrieved
// > (i.e. it is equivalent to SELECT * FROM db_table).
// > - `query`: The exact query (SELECT statement) to run.
// If dbtable is null it will be replaced with the string `complex`, which means the `query`
// option was used.
// An improvement could be to put the query string in the `DatasetFacets`.
if (dbtable.isPresent()) {
tableName = dbtable.get();
String[] splits = tableName.split("\\.");
String table = tableName;
if (splits.length == 1) {
table = String.format("%s.%s.%s", db, DEFAULT_SCHEMA, tableName);
} else if (splits.length == 2) {
table = String.format("%s.%s", db, tableName);
} else if (splits.length == 3) {
table = tableName;
} else {
logger.warn("Redshift getDataset: tableName: {} is not in the expected format", tableName);
return Collections.emptyList();
}

return Collections.singletonList(factory.getDataset(table, namespace, schema));
} else if (query.isPresent()) {
return SqlUtils.getDatasets(factory, query.get(), "redshift", namespace, db, DEFAULT_SCHEMA);
} else {
logger.warn(
"Unable to discover Redshift table property - neither \"dbtable\" nor \"query\" option present");
}
return Collections.emptyList();
}
}
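To make the naming rules above concrete, here is a minimal, self-contained sketch (hypothetical cluster, database, and table names) that mirrors the URL parsing and the one/two/three-part dbtable normalization performed by RedshiftDataset.getDatasets:

import java.net.URI;
import java.net.URISyntaxException;

public class RedshiftNamingSketch {
  public static void main(String[] args) throws URISyntaxException {
    // Hypothetical value of the connector's "url" option.
    String url =
        "jdbc:redshift://examplecluster.abc123.us-west-2.redshift.amazonaws.com:5439/dev";

    URI jdbcUrl =
        new URI(
            "redshift://"
                + url.replace("jdbc:redshift:iam://", "").replace("jdbc:redshift://", ""));
    String db = jdbcUrl.getPath().substring(1); // "dev"
    String namespace = jdbcUrl.getScheme() + "://" + jdbcUrl.getHost() + ":" + jdbcUrl.getPort();
    // namespace -> "redshift://examplecluster.abc123.us-west-2.redshift.amazonaws.com:5439"

    // dbtable normalization, as in getDatasets:
    //   "orders"           -> "dev.public.orders"  (db and default schema prepended)
    //   "sales.orders"     -> "dev.sales.orders"   (db prepended)
    //   "dev.sales.orders" -> "dev.sales.orders"   (already fully qualified)
    System.out.println(namespace + " / " + db);
  }
}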
@@ -0,0 +1,66 @@
package io.openlineage.spark.agent.vendor.redshift.lifecycle;

import io.github.spark_redshift_community.spark.redshift.Parameters;
import io.github.spark_redshift_community.spark.redshift.RedshiftRelation;
import io.github.spark_redshift_community.spark.redshift.TableName;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.DatasetFactory;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.QueryPlanVisitor;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand;

/**
* {@link LogicalPlan} visitor that matches a {@link LogicalRelation} backed by a {@link
* RedshiftRelation}. This visitor extracts an {@link OpenLineage.Dataset} from the Redshift table
* referenced by the relation.
*/
@Slf4j
public class RedshiftRelationVisitor<D extends OpenLineage.Dataset>
extends QueryPlanVisitor<LogicalRelation, D> {
private static final String REDSHIFT_NAMESPACE = "redshift";
private static final String REDSHIFT_CLASS_NAME =
"io.github.spark_redshift_community.spark.redshift.RedshiftRelation";
private final DatasetFactory<D> factory;

public RedshiftRelationVisitor(@NonNull OpenLineageContext context, DatasetFactory<D> factory) {
super(context);
this.factory = factory;
log.info("RedshiftRelationVisitor created");
}

@Override
public List<D> apply(LogicalPlan x) {
RedshiftRelation relation = (RedshiftRelation) ((LogicalRelation) x).relation();
Parameters.MergedParameters params = relation.params();
Optional<String> dbtable =
(Optional<String>)
ScalaConversionUtils.asJavaOptional(params.table().map(TableName::toString));
Optional<String> query = ScalaConversionUtils.asJavaOptional(params.query());
return Collections.singletonList(
factory.getDataset(dbtable.orElse(""), REDSHIFT_NAMESPACE, relation.schema()));
}

protected boolean isRedshiftClass(LogicalPlan plan) {
try {
Class c = Thread.currentThread().getContextClassLoader().loadClass(REDSHIFT_CLASS_NAME);
return (plan instanceof LogicalRelation
&& c.isAssignableFrom(((LogicalRelation) plan).relation().getClass()));
} catch (Exception e) {
// swallow - not a Redshift class
}
return false;
}

@Override
public boolean isDefinedAt(LogicalPlan plan) {
return isRedshiftClass(plan);
}
}
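For reference, a read through the community connector produces the LogicalRelation wrapping a RedshiftRelation that this visitor matches. Below is a hedged sketch, not part of the commit: cluster, credentials, bucket, and table names are hypothetical, and the exact option set depends on your authentication setup.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RedshiftReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("redshift-read-sketch").getOrCreate();

    // The logical plan for this read contains a RedshiftRelation, so
    // RedshiftRelationVisitor.isDefinedAt returns true and an InputDataset is emitted.
    Dataset<Row> df =
        spark
            .read()
            .format("io.github.spark_redshift_community.spark.redshift")
            .option("url", "jdbc:redshift://examplecluster:5439/dev?user=USER&password=PASS")
            .option("dbtable", "public.orders")
            .option("tempdir", "s3a://example-bucket/redshift-tmp/")
            .option("forward_spark_s3_credentials", "true")
            .load();

    df.show(5);
    spark.stop();
  }
}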
@@ -0,0 +1,26 @@
package io.openlineage.spark.agent.vendor.redshift.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.lifecycle.VisitorFactory;
import io.openlineage.spark.api.DatasetFactory;
import io.openlineage.spark.api.OpenLineageContext;
import java.util.Collections;
import java.util.List;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import scala.PartialFunction;

public class RedshiftVisitorFactory implements VisitorFactory {
@Override
public List<PartialFunction<LogicalPlan, List<OpenLineage.InputDataset>>> getInputVisitors(
OpenLineageContext context) {
DatasetFactory<OpenLineage.InputDataset> factory = DatasetFactory.input(context);
return Collections.singletonList(new RedshiftRelationVisitor<>(context, factory));
}

@Override
public List<PartialFunction<LogicalPlan, List<OpenLineage.OutputDataset>>> getOutputVisitors(
OpenLineageContext context) {
DatasetFactory<OpenLineage.OutputDataset> factory = DatasetFactory.output(context);
return Collections.singletonList(new RedshiftRelationVisitor<>(context, factory));
}
}
@@ -0,0 +1,20 @@
package io.openlineage.spark.agent.vendor.redshift.lifecycle.plan;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.OpenLineageEventHandlerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import scala.PartialFunction;

public class RedshiftEventHandlerFactory implements OpenLineageEventHandlerFactory {
@Override
public Collection<PartialFunction<Object, List<OpenLineage.OutputDataset>>>
createOutputDatasetBuilder(OpenLineageContext context) {
// The right builder is determined at runtime by type-checking the Spark LogicalPlan that
// backs the event.
return Collections.singleton(
(PartialFunction) new RedshiftSaveIntoDataSourceCommandBuilder(context));
}
}
@@ -0,0 +1,80 @@
package io.openlineage.spark.agent.vendor.redshift.lifecycle.plan;

import static io.openlineage.spark.agent.vendor.redshift.RedshiftVendor.hasRedshiftClasses;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.agent.vendor.redshift.lifecycle.RedshiftDataset;
import io.openlineage.spark.api.AbstractQueryPlanDatasetBuilder;
import io.openlineage.spark.api.OpenLineageContext;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand;
import org.apache.spark.sql.sources.CreatableRelationProvider;
import org.apache.spark.sql.types.StructType;

@Slf4j
public class RedshiftSaveIntoDataSourceCommandBuilder
extends AbstractQueryPlanDatasetBuilder<
SparkListenerEvent, SaveIntoDataSourceCommand, OpenLineage.OutputDataset> {

public RedshiftSaveIntoDataSourceCommandBuilder(OpenLineageContext context) {
super(context, false);
}

@Override
public List<OpenLineage.OutputDataset> apply(SaveIntoDataSourceCommand command) {
if (isRedshiftSource(command.dataSource())) {
// Called from SaveIntoDataSourceCommandVisitor on Redshift write operations.
Map<String, String> options = ScalaConversionUtils.<String, String>fromMap(command.options());
log.info("Redshift SaveIntoDataSourceCommand options: {}", options);
Optional<String> dbtable = Optional.ofNullable(options.get("dbtable"));
Optional<String> query = Optional.ofNullable(options.get("query"));
String url = options.get("url");

try {
return
// Similar to Kafka and Snowflake, Redshift needs some special handling, so we use the
// method below for extracting the dataset from Redshift write operations.
RedshiftDataset.getDatasets(
// command.schema() doesn't always contain the schema, so getSchema() falls back to the
// logical plan's output when it is missing.
outputDataset(), url, dbtable, query, getSchema(command));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
} else {
return Collections.emptyList();
}
}

public static boolean isRedshiftSource(CreatableRelationProvider provider) {
return hasRedshiftClasses(); // && provider instanceof DefaultSource;
}

/**
* Taken from {@link
* io.openlineage.spark.agent.lifecycle.plan.SaveIntoDataSourceCommandVisitor#getSchema(org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand)}
*
* @param command the {@link SaveIntoDataSourceCommand} being handled
* @return the command's schema, falling back to the logical plan's output when it is missing
*/
private StructType getSchema(SaveIntoDataSourceCommand command) {
StructType schema = command.schema();
if ((schema == null || schema.fields() == null || schema.fields().length == 0)
&& command.query() != null
&& command.query().output() != null) {
// get schema from logical plan's output
schema = PlanUtils.toStructType(ScalaConversionUtils.fromSeq(command.query().output()));
}
return schema;
}
}
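And the write-side counterpart: a save through the same connector is planned as a SaveIntoDataSourceCommand, which is what RedshiftSaveIntoDataSourceCommandBuilder turns into an OutputDataset. Again a hedged sketch with hypothetical connection details and table names, not taken from the commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class RedshiftWriteSketch {
  // Writing df this way triggers a SaveIntoDataSourceCommand whose data source is the Redshift
  // DefaultSource, so the builder above reports "dev.public.orders_copy" as an output dataset.
  static void writeToRedshift(Dataset<Row> df) {
    df.write()
        .format("io.github.spark_redshift_community.spark.redshift")
        .option("url", "jdbc:redshift://examplecluster:5439/dev?user=USER&password=PASS")
        .option("dbtable", "public.orders_copy")
        .option("tempdir", "s3a://example-bucket/redshift-tmp/")
        .option("forward_spark_s3_credentials", "true")
        .mode(SaveMode.Append)
        .save();
  }
}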