
Commit 7b8e7cc
Supporting alias in insertHadoopFs command
Fix platformInstance support
treff7es committed Jun 4, 2024
1 parent 7501b9e commit 7b8e7cc
Showing 6 changed files with 86 additions and 8 deletions.
@@ -128,7 +128,7 @@ public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
       datahubUrn = getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig);
     }

-    log.info("Dataset URN: {}, alias_list: {}", datahubUrn, mappingConfig.getUrnAliases());
+    log.debug("Dataset URN: {}, alias_list: {}", datahubUrn, mappingConfig.getUrnAliases());
     // If we have the urn in urn aliases then we should use the alias instead of the original urn
     if (datahubUrn.isPresent()
         && mappingConfig.getUrnAliases().containsKey(datahubUrn.get().toString())) {
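For illustration, a minimal sketch of the substitution this block performs, assuming the URNs are handled as plain strings; the `resolveAlias` helper and the `Map<String, String>` types are simplifications for the example, not the project's API:

```java
import java.util.Map;
import java.util.Optional;

class UrnAliasSketch {

  // Plain-map version of the lookup guarded by the containsKey check above:
  // if the resolved dataset URN appears as a key in the configured alias map,
  // emit the aliased URN instead; otherwise keep the original URN unchanged.
  static Optional<String> resolveAlias(Optional<String> datahubUrn, Map<String, String> urnAliases) {
    return datahubUrn.map(urn -> urnAliases.getOrDefault(urn, urn));
  }
}
```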
@@ -69,8 +69,7 @@ public static HdfsPathDataset create(URI path, DatahubOpenlineageConfig datahubC

     String rawName = getRawNameFromUri(pathUri, pathSpec.getPathSpecList());
     if (rawName != null) {
-      String platformInstance =
-          pathSpec.platformInstance.orElseGet(datahubConf::getCommonDatasetPlatformInstance);
+      String platformInstance = pathSpec.platformInstance.orElse(null);
       FabricType fabricType = datahubConf.getFabricType();
       return new HdfsPathDataset(
           platform, getDatasetName(rawName), platformInstance, fabricType, rawName);
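To make the behavior change concrete, a short sketch of the effect on the resulting URN, mirroring the updated test expectations below; the surrounding `datahubConfig` setup is assumed to define a common dataset platform instance but no path-spec-level one:

```java
// Path spec matches the bucket but defines no platformInstance of its own.
SparkDataset dataset =
    HdfsPathDataset.create(new URI("s3a://my-bucket/foo/tests/bar.avro"), datahubConfig);

// Before this change, the common dataset platform instance was used as a fallback:
//   urn:li:dataset:(urn:li:dataPlatform:s3,instance.my-bucket/foo/tests,PROD)
// After this change, no fallback is applied, so the URN carries no instance prefix:
//   urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests,PROD)
```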
@@ -137,7 +137,7 @@ public void testPathSpecListPlatformInstance()
     SparkDataset dataset =
         HdfsPathDataset.create(new URI("s3a://my-bucket/foo/tests/bar.avro"), datahubConfig);
     Assert.assertEquals(
-        "urn:li:dataset:(urn:li:dataPlatform:s3,instance.my-bucket/foo/tests,PROD)",
+        "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests,PROD)",
         dataset.urn().toString());
   }

metadata-integration/java/spark-lineage-beta/README.md (6 changes: 3 additions & 3 deletions)
@@ -167,7 +167,7 @@ information like tokens.
 | spark.datahub.rest.rest.max_retries | | 0 | Number of times a request retried if failed |
 | spark.datahub.rest.rest.retry_interval | | 10 | Number of seconds to wait between retries |
 | spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
-| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance |
+| spark.datahub.metadata.dataset.platformInstance | | | Dataset level platform instance (useful to set if you also use a platform instance in your Glue ingestion) |
 | spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD |
 | spark.datahub.metadata.table.hive_platform_alias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
 | spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions |
@@ -181,8 +181,8 @@ information like tokens.
 | spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
 | spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
 | spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
-| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks. You should enable this on Databricks if you want coalesced run . |
-| spark.datahub.patch.enabled | | true | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled.
+| spark.datahub.stage_metadata_coalescing | | | Normally metadata is coalesced and sent at the onApplicationEnd event, which is never called on Databricks or Glue. Enable this on Databricks or Glue if you want coalesced runs. |
+| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is disabled. |
|

## What to Expect: The Metadata Model
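As an illustration of the options listed in the README table above, here is a minimal Java sketch of a Spark session configured for lineage capture. All values are placeholders, and the DataHub listener and REST emitter settings documented elsewhere in the README are assumed to be in place; this is not a complete or authoritative setup.

```java
import org.apache.spark.sql.SparkSession;

public class DatahubLineageConfigExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("lineage-example")
            // Platform instances for the pipeline and for datasets (values are illustrative).
            .config("spark.datahub.metadata.pipeline.platformInstance", "my-pipeline-instance")
            .config("spark.datahub.metadata.dataset.platformInstance", "my-dataset-instance")
            .config("spark.datahub.metadata.dataset.env", "PROD")
            // Use Glue instead of Hive for Hive-like tables.
            .config("spark.datahub.metadata.table.hive_platform_alias", "glue")
            // Drop the URI scheme (s3://, hdfs://) from dataset URNs, as the README recommends.
            .config("spark.datahub.metadata.include_scheme", "false")
            // Coalesce metadata explicitly on Databricks/Glue, where onApplicationEnd never fires.
            .config("spark.datahub.stage_metadata_coalescing", "true")
            // Send lineage as a patch that appends to existing Dataset lineage edges.
            .config("spark.datahub.patch.enabled", "true")
            .getOrCreate();

    spark.stop();
  }
}
```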
InsertIntoHadoopFsRelationVisitor.java (new file)
@@ -0,0 +1,79 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.spark.agent.lifecycle.plan;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.utils.DatasetIdentifier;
import io.openlineage.spark.agent.util.PathUtils;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.QueryPlanVisitor;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import scala.Option;

/**
* {@link LogicalPlan} visitor that matches an {@link InsertIntoHadoopFsRelationCommand} and
* extracts the output {@link OpenLineage.Dataset} being written.
*/
public class InsertIntoHadoopFsRelationVisitor
    extends QueryPlanVisitor<InsertIntoHadoopFsRelationCommand, OpenLineage.OutputDataset> {

  public InsertIntoHadoopFsRelationVisitor(OpenLineageContext context) {
    super(context);
  }

  @Override
  public List<OpenLineage.OutputDataset> apply(LogicalPlan x) {
    InsertIntoHadoopFsRelationCommand command = (InsertIntoHadoopFsRelationCommand) x;

    Option<CatalogTable> catalogTable = command.catalogTable();
    OpenLineage.OutputDataset outputDataset;

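    // Two cases: a plain path write (no catalog table) builds the dataset identifier from the
    // output path URI, while a catalog-table write derives it from the table. In Overwrite mode
    // a lifecycle state change facet is attached: OVERWRITE for path writes, CREATE for
    // catalog-table writes.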
    if (catalogTable.isEmpty()) {
      DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file");
      if (SaveMode.Overwrite == command.mode()) {
        outputDataset =
            outputDataset()
                .getDataset(
                    di,
                    command.query().schema(),
                    OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.OVERWRITE);
      } else {
        outputDataset = outputDataset().getDataset(di, command.query().schema());
      }
      return Collections.singletonList(outputDataset);
    } else {
      if (SaveMode.Overwrite == command.mode()) {
        return Collections.singletonList(
            outputDataset()
                .getDataset(
                    PathUtils.fromCatalogTable(catalogTable.get()),
                    catalogTable.get().schema(),
                    OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.CREATE));
      } else {
        return Collections.singletonList(
            outputDataset()
                .getDataset(
                    PathUtils.fromCatalogTable(catalogTable.get()), catalogTable.get().schema()));
      }
    }
  }

  @Override
  public Optional<String> jobNameSuffix(InsertIntoHadoopFsRelationCommand command) {
    if (command.catalogTable().isEmpty()) {
      DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file");
      return Optional.of(trimPath(di.getName()));
    }
    return Optional.of(
        trimPath(PathUtils.fromCatalogTable(command.catalogTable().get()).getName()));
  }
}
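For context, a short sketch of the kind of Spark write this visitor covers; the session setup and paths are illustrative. A file-based DataFrame write such as this is planned as an `InsertIntoHadoopFsRelationCommand`, which the visitor above matches to emit the output dataset (with the OVERWRITE lifecycle facet when `SaveMode.Overwrite` is used):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HadoopFsWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("hadoop-fs-write").getOrCreate();

    // Any file-format write to a path (parquet, avro, csv, ...) that does not target a
    // catalog table goes through InsertIntoHadoopFsRelationCommand.
    Dataset<Row> df = spark.read().parquet("s3a://my-bucket/foo/input");
    df.write().mode(SaveMode.Overwrite).parquet("s3a://my-bucket/foo/tests");

    spark.stop();
  }
}
```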
@@ -205,7 +205,7 @@ public void testPathSpecListPlatformInstance()
         HdfsPathDataset.create(
             new URI("s3a://my-bucket/foo/tests/bar.avro"), sparkLineageConf.getOpenLineageConf());
     Assert.assertEquals(
-        "urn:li:dataset:(urn:li:dataPlatform:s3,instance.my-bucket/foo/tests,PROD)",
+        "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests,PROD)",
         dataset.urn().toString());
   }

