diff --git a/pom.xml b/pom.xml
index 054d18f0..8e429679 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hortonworks.spark</groupId>
- <artifactId>spark-atlas-connector-main_2.11</artifactId>
+ <artifactId>spark-atlas-connector-main</artifactId>
  <version>0.1.0-SNAPSHOT</version>
  <packaging>pom</packaging>
@@ -29,11 +29,11 @@
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
-   <spark.version>2.4.0</spark.version>
+   <spark.version>2.4.5</spark.version>
    <atlas.version>2.0.0</atlas.version>
    <maven.version>3.5.4</maven.version>
-   <scala.version>2.11.12</scala.version>
-   <scala.binary.version>2.11</scala.binary.version>
+   <scala.version>2.12.11</scala.version>
+   <scala.binary.version>2.12</scala.binary.version>
    <kafka.version>2.0.1</kafka.version>
    <MaxPermGen>512m</MaxPermGen>
    <CodeCacheSize>512m</CodeCacheSize>
@@ -65,6 +65,15 @@
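+
+    <!-- Delta Lake support; delta-core is provided by the runtime classpath -->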
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-core_${scala.binary.version}</artifactId>
+      <version>0.6.0</version>
+      <scope>provided</scope>
+    </dependency>
+
org.scala-lang
scala-library
@@ -123,7 +132,7 @@
-->
      <groupId>com.fasterxml.jackson.module</groupId>
-     <artifactId>jackson-module-scala_2.11</artifactId>
+     <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
      <version>${jackson.version}</version>
      <scope>test</scope>
@@ -240,7 +249,7 @@
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
-     <version>1.12.5</version>
+     <version>1.14.3</version>
diff --git a/spark-atlas-connector-assembly/pom.xml b/spark-atlas-connector-assembly/pom.xml
index e366ca2a..2eaa82c0 100644
--- a/spark-atlas-connector-assembly/pom.xml
+++ b/spark-atlas-connector-assembly/pom.xml
@@ -22,7 +22,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.hortonworks.spark</groupId>
-   <artifactId>spark-atlas-connector-main_2.11</artifactId>
+   <artifactId>spark-atlas-connector-main</artifactId>
    <version>0.1.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
@@ -33,7 +33,7 @@
    <dependency>
      <groupId>com.hortonworks.spark</groupId>
-     <artifactId>spark-atlas-connector_${scala.binary.version}</artifactId>
+     <artifactId>spark-atlas-connector</artifactId>
      <version>${project.version}</version>
    </dependency>
diff --git a/spark-atlas-connector/pom.xml b/spark-atlas-connector/pom.xml
index 9fe194ef..d8619739 100644
--- a/spark-atlas-connector/pom.xml
+++ b/spark-atlas-connector/pom.xml
@@ -22,11 +22,11 @@
  <parent>
    <groupId>com.hortonworks.spark</groupId>
-   <artifactId>spark-atlas-connector-main_2.11</artifactId>
+   <artifactId>spark-atlas-connector-main</artifactId>
    <version>0.1.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

- <artifactId>spark-atlas-connector_2.11</artifactId>
+ <artifactId>spark-atlas-connector</artifactId>
  <packaging>jar</packaging>
diff --git a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala
index 7438ab5e..bb4eb9d0 100644
--- a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala
+++ b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala
@@ -39,6 +39,9 @@ import com.hortonworks.spark.atlas.sql.SparkExecutionPlanProcessor.SinkDataSource
import com.hortonworks.spark.atlas.types.{AtlasEntityUtils, external, internal}
import com.hortonworks.spark.atlas.utils.SparkUtils.sparkSession
import com.hortonworks.spark.atlas.utils.{Logging, SparkUtils}
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.streaming.SinkProgress
@@ -165,6 +168,7 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
}
}
+ // TODO: Add support for Delta Table output sources
object SaveIntoDataSourceHarvester extends Harvester[SaveIntoDataSourceCommand] {
override def harvest(
node: SaveIntoDataSourceCommand,
@@ -175,6 +179,10 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
case SHCEntities(shcEntities) => Seq(shcEntities)
case JDBCEntities(jdbcEntities) => Seq(jdbcEntities)
case KafkaEntities(kafkaEntities) => kafkaEntities
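+ // Delta writes arrive as a SaveIntoDataSourceCommand carrying a DeltaDataSource;
+ // only the target path is harvested here, not a Delta table entity.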
+ case e if e.dataSource.isInstanceOf[DeltaDataSource] =>
+   node.options.get("path") match {
+     case Some(path) => Seq(external.pathToEntity(path))
+     case None =>
+       logWarn(s"Delta output is missing a 'path' option: $e")
+       Seq.empty
+   }
case e =>
logWarn(s"Missing output entities: $e")
Seq.empty
@@ -239,6 +247,12 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
tChildren.flatMap {
case r: HiveTableRelation => Seq(tableToEntity(r.tableMeta))
case v: View => Seq(tableToEntity(v.desc))
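+ // Delta's read relation is an anonymous HadoopFsRelation built inside DeltaLog
+ // (delta-core 0.6.0), so match on the defining class name rather than a concrete type.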
+ case LogicalRelation(fileRelation: FileRelation, _, _, _)
+     if fileRelation.getClass.getName.startsWith(classOf[DeltaLog].getName) =>
+   if (fileRelation.inputFiles.nonEmpty) {
+     // Report the table directory (parent of a data file) rather than every file.
+     val path = new Path(fileRelation.inputFiles.head).getParent.toString
+     Seq(external.pathToEntity(path))
+   } else Seq.empty
case LogicalRelation(fileRelation: FileRelation, _, catalogTable, _) =>
catalogTable.map(tbl => Seq(tableToEntity(tbl))).getOrElse(
fileRelation.inputFiles.flatMap(file => Seq(external.pathToEntity(file))).toSeq)
diff --git a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala
index 1c205223..71b70cf3 100644
--- a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala
+++ b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala
@@ -41,9 +41,14 @@ object external {
val S3_OBJECT_TYPE_STRING = "aws_s3_object"
val S3_PSEUDO_DIR_TYPE_STRING = "aws_s3_pseudo_dir"
val S3_BUCKET_TYPE_STRING = "aws_s3_bucket"
+ val GS_OBJECT_TYPE_STRING = "gcp_gs_object"
+ val GS_PSEUDO_DIR_TYPE_STRING = "gcp_gs_pseudo_dir"
+ val GS_BUCKET_TYPE_STRING = "gcp_gs_bucket"
private def isS3Schema(schema: String): Boolean = schema.matches("s3[an]?")
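+ // GCS has a single URI scheme, "gs", unlike S3's s3/s3a/s3n variants.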
+ private def isGSSchema(schema: String): Boolean = schema == "gs"
+
private def extractS3Entity(uri: URI, fsPath: Path): SACAtlasEntityWithDependencies = {
val path = Path.getPathWithoutSchemeAndAuthority(fsPath).toString
@@ -81,6 +86,43 @@ object external {
new SACAtlasEntityWithDependencies(objectEntity, Seq(dirEntityWithDeps))
}
+ private def extractGSEntity(uri: URI, fsPath: Path): SACAtlasEntityWithDependencies = {
+ val path = Path.getPathWithoutSchemeAndAuthority(fsPath).toString
+
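+   // e.g. gs://mybucket/warehouse/events/part-00000.parquet yields:
+   //   bucketQualifiedName = gs://mybucket
+   //   dirQualifiedName    = gs://mybucket/warehouse/events/
+   //   objectQualifiedName = gs://mybucket/warehouse/events/part-00000.parquet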
+ val bucketName = uri.getAuthority
+ val bucketQualifiedName = s"gs://${bucketName}"
+ val dirName = path.replaceFirst("[^/]*$", "")
+ val dirQualifiedName = bucketQualifiedName + dirName
+ val objectName = path.replaceFirst("^.*/", "")
+ val objectQualifiedName = dirQualifiedName + objectName
+
+ // bucket
+ val bucketEntity = new AtlasEntity(GS_BUCKET_TYPE_STRING)
+ bucketEntity.setAttribute("name", bucketName)
+ bucketEntity.setAttribute("qualifiedName", bucketQualifiedName)
+
+ // pseudo dir
+ val dirEntity = new AtlasEntity(GS_PSEUDO_DIR_TYPE_STRING)
+ dirEntity.setAttribute("name", dirName)
+ dirEntity.setAttribute("qualifiedName", dirQualifiedName)
+ dirEntity.setAttribute("objectPrefix", dirQualifiedName)
+ dirEntity.setAttribute("bucket", AtlasUtils.entityToReference(bucketEntity))
+
+ // object
+ val objectEntity = new AtlasEntity(GS_OBJECT_TYPE_STRING)
+ objectEntity.setAttribute("name", objectName)
+ objectEntity.setAttribute("path", path)
+ objectEntity.setAttribute("qualifiedName", objectQualifiedName)
+ objectEntity.setAttribute("pseudoDirectory", AtlasUtils.entityToReference(dirEntity))
+
+ // dir entity depends on bucket entity
+ val dirEntityWithDeps = new SACAtlasEntityWithDependencies(dirEntity,
+ Seq(SACAtlasEntityWithDependencies(bucketEntity)))
+
+ // object entity depends on dir entity
+ new SACAtlasEntityWithDependencies(objectEntity, Seq(dirEntityWithDeps))
+ }
+
def pathToEntity(path: String): SACAtlasEntityWithDependencies = {
val uri = resolveURI(path)
val fsPath = new Path(uri)
@@ -96,6 +138,8 @@ object external {
SACAtlasEntityWithDependencies(entity)
} else if (isS3Schema(uri.getScheme)) {
extractS3Entity(uri, fsPath)
+ } else if (isGSSchema(uri.getScheme)) {
+ extractGSEntity(uri, fsPath)
} else {
val entity = new AtlasEntity(FS_PATH_TYPE_STRING)
entity.setAttribute("name",