From c60eedee63616e5d0babcf72dd4da7ffa84fcff8 Mon Sep 17 00:00:00 2001 From: mahmeahmed Date: Sun, 28 Jun 2020 09:34:02 +0200 Subject: [PATCH 1/4] build with scala-2.12.11 --- pom.xml | 10 +++++----- spark-atlas-connector-assembly/pom.xml | 4 ++-- spark-atlas-connector/pom.xml | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 054d18f0..feff8ebf 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ 4.0.0 com.hortonworks.spark - spark-atlas-connector-main_2.11 + spark-atlas-connector-main 0.1.0-SNAPSHOT pom @@ -29,11 +29,11 @@ UTF-8 UTF-8 1.8 - 2.4.0 + 2.4.5 2.0.0 3.5.4 - 2.11.12 - 2.11 + 2.12.11 + 2.12 2.0.1 512m 512m @@ -239,7 +239,7 @@ org.scalacheck - scalacheck_${scala.binary.version} + scalacheck_2.11 1.12.5 diff --git a/spark-atlas-connector-assembly/pom.xml b/spark-atlas-connector-assembly/pom.xml index e366ca2a..2eaa82c0 100644 --- a/spark-atlas-connector-assembly/pom.xml +++ b/spark-atlas-connector-assembly/pom.xml @@ -22,7 +22,7 @@ 4.0.0 com.hortonworks.spark - spark-atlas-connector-main_2.11 + spark-atlas-connector-main 0.1.0-SNAPSHOT ../pom.xml @@ -33,7 +33,7 @@ com.hortonworks.spark - spark-atlas-connector_${scala.binary.version} + spark-atlas-connector ${project.version} diff --git a/spark-atlas-connector/pom.xml b/spark-atlas-connector/pom.xml index 9fe194ef..d8619739 100644 --- a/spark-atlas-connector/pom.xml +++ b/spark-atlas-connector/pom.xml @@ -22,11 +22,11 @@ com.hortonworks.spark - spark-atlas-connector-main_2.11 + spark-atlas-connector-main 0.1.0-SNAPSHOT ../pom.xml - spark-atlas-connector_2.11 + spark-atlas-connector jar From 5128e3812f119aeb436558b082074b757256d498 Mon Sep 17 00:00:00 2001 From: mahmeahmed Date: Mon, 29 Jun 2020 00:49:27 +0200 Subject: [PATCH 2/4] support delta lake output --- pom.xml | 15 +++++-- .../spark/atlas/sql/CommandsHarvester.scala | 6 +++ .../spark/atlas/types/external.scala | 41 +++++++++++++++++++ 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index feff8ebf..8e429679 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,15 @@ + + + io.delta + delta-core_${scala.binary.version} + 0.6.0 + provided + + + org.scala-lang scala-library @@ -123,7 +132,7 @@ --> com.fasterxml.jackson.module - jackson-module-scala_2.11 + jackson-module-scala_${scala.binary.version} ${jackson.version} test @@ -239,8 +248,8 @@ org.scalacheck - scalacheck_2.11 - 1.12.5 + scalacheck_${scala.binary.version} + 1.14.3 diff --git a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala index 7438ab5e..bb2a20a3 100644 --- a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala +++ b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala @@ -39,6 +39,7 @@ import com.hortonworks.spark.atlas.sql.SparkExecutionPlanProcessor.SinkDataSourc import com.hortonworks.spark.atlas.types.{AtlasEntityUtils, external, internal} import com.hortonworks.spark.atlas.utils.SparkUtils.sparkSession import com.hortonworks.spark.atlas.utils.{Logging, SparkUtils} +import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.streaming.SinkProgress @@ -165,6 +166,7 @@ object CommandsHarvester extends AtlasEntityUtils with Logging { } } + // TODO : ADD Support For Detla Table output source object SaveIntoDataSourceHarvester extends Harvester[SaveIntoDataSourceCommand] { override def harvest( node: SaveIntoDataSourceCommand, @@ -175,6 +177,10 @@ object CommandsHarvester extends AtlasEntityUtils with Logging { case SHCEntities(shcEntities) => Seq(shcEntities) case JDBCEntities(jdbcEntities) => Seq(jdbcEntities) case KafkaEntities(kafkaEntities) => kafkaEntities + case e if e.dataSource.isInstanceOf[DeltaDataSource] => + val path = node.options.getOrElse("path", "none") + val entity = external.pathToEntity(path) + Seq(entity) case e => logWarn(s"Missing output entities: $e") Seq.empty diff --git a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala index 1c205223..46b72e36 100644 --- a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala +++ b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala @@ -44,6 +44,8 @@ object external { private def isS3Schema(schema: String): Boolean = schema.matches("s3[an]?") + private def isGSSchema(schema: String): Boolean = schema.matches("gs") + private def extractS3Entity(uri: URI, fsPath: Path): SACAtlasEntityWithDependencies = { val path = Path.getPathWithoutSchemeAndAuthority(fsPath).toString @@ -81,6 +83,43 @@ object external { new SACAtlasEntityWithDependencies(objectEntity, Seq(dirEntityWithDeps)) } + private def extractGSEntity(uri: URI, fsPath: Path): SACAtlasEntityWithDependencies = { + val path = Path.getPathWithoutSchemeAndAuthority(fsPath).toString + + val bucketName = uri.getAuthority + val bucketQualifiedName = s"gs://${bucketName}" + val dirName = path.replaceFirst("[^/]*$", "") + val dirQualifiedName = bucketQualifiedName + dirName + val objectName = path.replaceFirst("^.*/", "") + val objectQualifiedName = dirQualifiedName + objectName + + // bucket + val bucketEntity = new AtlasEntity(S3_BUCKET_TYPE_STRING) + bucketEntity.setAttribute("name", bucketName) + bucketEntity.setAttribute("qualifiedName", bucketQualifiedName) + + // pseudo dir + val dirEntity = new AtlasEntity(S3_PSEUDO_DIR_TYPE_STRING) + dirEntity.setAttribute("name", dirName) + dirEntity.setAttribute("qualifiedName", dirQualifiedName) + dirEntity.setAttribute("objectPrefix", dirQualifiedName) + dirEntity.setAttribute("bucket", AtlasUtils.entityToReference(bucketEntity)) + + // object + val objectEntity = new AtlasEntity(S3_OBJECT_TYPE_STRING) + objectEntity.setAttribute("name", objectName) + objectEntity.setAttribute("path", path) + objectEntity.setAttribute("qualifiedName", objectQualifiedName) + objectEntity.setAttribute("pseudoDirectory", AtlasUtils.entityToReference(dirEntity)) + + // dir entity depends on bucket entity + val dirEntityWithDeps = new SACAtlasEntityWithDependencies(dirEntity, + Seq(SACAtlasEntityWithDependencies(bucketEntity))) + + // object entity depends on dir entity + new SACAtlasEntityWithDependencies(objectEntity, Seq(dirEntityWithDeps)) + } + def pathToEntity(path: String): SACAtlasEntityWithDependencies = { val uri = resolveURI(path) val fsPath = new Path(uri) @@ -96,6 +135,8 @@ object external { SACAtlasEntityWithDependencies(entity) } else if (isS3Schema(uri.getScheme)) { extractS3Entity(uri, fsPath) + } else if (isGSSchema(uri.getScheme)) { + extractGSEntity(uri, fsPath) } else { val entity = new AtlasEntity(FS_PATH_TYPE_STRING) entity.setAttribute("name", From 76565846e13a49cc68f6a2844683bf9131fe031f Mon Sep 17 00:00:00 2001 From: mahmeahmed Date: Mon, 29 Jun 2020 02:59:45 +0200 Subject: [PATCH 3/4] support delta lake input --- .../hortonworks/spark/atlas/sql/CommandsHarvester.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala index bb2a20a3..bb4eb9d0 100644 --- a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala +++ b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/sql/CommandsHarvester.scala @@ -39,6 +39,8 @@ import com.hortonworks.spark.atlas.sql.SparkExecutionPlanProcessor.SinkDataSourc import com.hortonworks.spark.atlas.types.{AtlasEntityUtils, external, internal} import com.hortonworks.spark.atlas.utils.SparkUtils.sparkSession import com.hortonworks.spark.atlas.utils.{Logging, SparkUtils} +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.streaming.SinkProgress @@ -245,6 +247,12 @@ object CommandsHarvester extends AtlasEntityUtils with Logging { tChildren.flatMap { case r: HiveTableRelation => Seq(tableToEntity(r.tableMeta)) case v: View => Seq(tableToEntity(v.desc)) + case LogicalRelation(fileRelation: FileRelation, _, catalogTable, _) + if fileRelation.getClass.getName.contains("org.apache.spark.sql.delta.DeltaLog") => + if (fileRelation.inputFiles.nonEmpty) { + val path = new Path(fileRelation.inputFiles.head).getParent.toString + Seq(external.pathToEntity(path)) + } else Seq.empty case LogicalRelation(fileRelation: FileRelation, _, catalogTable, _) => catalogTable.map(tbl => Seq(tableToEntity(tbl))).getOrElse( fileRelation.inputFiles.flatMap(file => Seq(external.pathToEntity(file))).toSeq) From 5be69ae622f38b1836cb48e41eb60c7fc412519e Mon Sep 17 00:00:00 2001 From: mahmeahmed Date: Tue, 30 Jun 2020 20:38:50 +0200 Subject: [PATCH 4/4] fix atlas gcp typedef --- .../com/hortonworks/spark/atlas/types/external.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala index 46b72e36..71b70cf3 100644 --- a/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala +++ b/spark-atlas-connector/src/main/scala/com/hortonworks/spark/atlas/types/external.scala @@ -41,6 +41,9 @@ object external { val S3_OBJECT_TYPE_STRING = "aws_s3_object" val S3_PSEUDO_DIR_TYPE_STRING = "aws_s3_pseudo_dir" val S3_BUCKET_TYPE_STRING = "aws_s3_bucket" + val GS_OBJECT_TYPE_STRING = "gcp_gs_object" + val GS_PSEUDO_DIR_TYPE_STRING = "gcp_gs_pseudo_dir" + val GS_BUCKET_TYPE_STRING = "gcp_gs_bucket" private def isS3Schema(schema: String): Boolean = schema.matches("s3[an]?") @@ -94,19 +97,19 @@ object external { val objectQualifiedName = dirQualifiedName + objectName // bucket - val bucketEntity = new AtlasEntity(S3_BUCKET_TYPE_STRING) + val bucketEntity = new AtlasEntity(GS_BUCKET_TYPE_STRING) bucketEntity.setAttribute("name", bucketName) bucketEntity.setAttribute("qualifiedName", bucketQualifiedName) // pseudo dir - val dirEntity = new AtlasEntity(S3_PSEUDO_DIR_TYPE_STRING) + val dirEntity = new AtlasEntity(GS_PSEUDO_DIR_TYPE_STRING) dirEntity.setAttribute("name", dirName) dirEntity.setAttribute("qualifiedName", dirQualifiedName) dirEntity.setAttribute("objectPrefix", dirQualifiedName) dirEntity.setAttribute("bucket", AtlasUtils.entityToReference(bucketEntity)) // object - val objectEntity = new AtlasEntity(S3_OBJECT_TYPE_STRING) + val objectEntity = new AtlasEntity(GS_OBJECT_TYPE_STRING) objectEntity.setAttribute("name", objectName) objectEntity.setAttribute("path", path) objectEntity.setAttribute("qualifiedName", objectQualifiedName)