diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
index 90410332c3d7a..237ace7e51fc2 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
@@ -12,7 +12,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
+// import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -49,7 +49,8 @@
 import scala.collection.JavaConversions;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.AbstractPartialFunction;
-
+import java.lang.reflect.Method;
+import org.json4s.JsonAST;
 
 @Slf4j
 public class DatahubSparkListener extends SparkListener {
@@ -60,6 +61,11 @@ public class DatahubSparkListener extends SparkListener {
   public static final String PIPELINE_KEY = "metadata.pipeline";
   public static final String PIPELINE_PLATFORM_INSTANCE_KEY = PIPELINE_KEY + ".platformInstance";
 
+  public static final String MW_PIPELINE_NAME = "mars.pipelineName";
+  public static final String MW_SPARK_IP_IS_IN_FLOW_URN = "mars.sparkIpInFlowURN";
+  public static final String MW_USE_FLOW_HASH_URN = "mars.useJobHashInFlowURN";
+  public static final String MW_JOB_NAME = "mars.jobName";
+
   public static final String COALESCE_KEY = "coalesce_jobs";
 
   private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
@@ -77,14 +83,40 @@ private class SqlStartTask {
     private final SparkContext ctx;
     private final LogicalPlan plan;
 
+    // Serializes the SQL-start event to JSON via reflection, so the listener works
+    // against both Spark >= 3.4.x and older releases whose JsonProtocol APIs differ.
+    private String grepSqlStartJsonSparkCompatible(SparkListenerEvent sqlStart) {
+      String result = null;
+      try {
+        Class<?> c = Class.forName(JsonProtocol.class.getName());
+        try {
+          // Spark >= 3.4.x: sparkEventToJsonString returns the rendered JSON directly
+          Method sparkMethod =
+              c.getDeclaredMethod("sparkEventToJsonString", org.apache.spark.scheduler.SparkListenerEvent.class);
+          result = (String) sparkMethod.invoke(null, sqlStart);
+        } catch (NoSuchMethodException e) {
+          // Spark < 3.4.x: sparkEventToJson returns a json4s JValue that still must be compacted
+          Method sparkMethod =
+              c.getDeclaredMethod("sparkEventToJson", org.apache.spark.scheduler.SparkListenerEvent.class);
+          result = JsonMethods$.MODULE$.compact((JsonAST.JValue) sparkMethod.invoke(null, sqlStart));
+        }
+      } catch (Exception e) {
+        log.error(e.toString());
+      }
+
+      return result;
+    }
+
     public SqlStartTask(SparkListenerSQLExecutionStart sqlStart, LogicalPlan plan, SparkContext ctx) {
       this.sqlStart = sqlStart;
      this.plan = plan;
       this.ctx = ctx;
 
       String jsonPlan = (plan != null) ? plan.toJSON() : null;
-      String sqlStartJson =
-          (sqlStart != null) ? JsonMethods$.MODULE$.compact(JsonProtocol.sparkEventToJson(sqlStart)) : null;
+      String sqlStartJson = (sqlStart != null) ? grepSqlStartJsonSparkCompatible(sqlStart) : null;
       log.debug("SqlStartTask with parameters: sqlStart: {}, plan: {}, ctx: {}", sqlStartJson, jsonPlan, ctx);
     }
 
@@ -106,8 +138,8 @@ public void run() {
 
       appSqlDetails.get(ctx.applicationId())
          .put(sqlStart.executionId(),
-              new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
-                  sqlStart.time(), sqlStart.executionId(), null));
+              new SQLQueryExecStartEvent(getMaster(ctx), getPipelineName(ctx), ctx.applicationId(),
+                  sqlStart.time(), sqlStart.executionId(), null, useHashUrn(ctx), jobName(ctx)));
       log.debug("PLAN for execution id: " + getPipelineName(ctx) + ":" + sqlStart.executionId() + "\n");
       log.debug(plan.toString());
 
@@ -118,8 +150,8 @@ public void run() {
         return;
       }
       // Here assumption is that there will be only single target for single sql query
-      DatasetLineage lineage =
-          new DatasetLineage(sqlStart.description(), plan.toString(), outputDS.get().iterator().next());
+      DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(),
+          outputDS.get().iterator().next());
 
       Collection<QueryPlan<?>> allInners = new ArrayList<>();
 
       plan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {
@@ -164,9 +196,8 @@ public boolean isDefinedAt(LogicalPlan x) {
         });
       }
 
-      SQLQueryExecStartEvent evt =
-          new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
-              sqlStart.time(), sqlStart.executionId(), lineage);
+      SQLQueryExecStartEvent evt = new SQLQueryExecStartEvent(getMaster(ctx), getPipelineName(ctx),
+          ctx.applicationId(), sqlStart.time(), sqlStart.executionId(), lineage, useHashUrn(ctx), jobName(ctx));
 
       appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(), evt);
 
@@ -217,7 +248,7 @@ public Void apply(SparkContext sc) {
           if (start == null) {
             log.error("Application end event received, but start event missing for appId " + sc.applicationId());
           } else {
-            AppEndEvent evt = new AppEndEvent(LineageUtils.getMaster(sc), getPipelineName(sc), sc.applicationId(),
+            AppEndEvent evt = new AppEndEvent(getMaster(sc), getPipelineName(sc), sc.applicationId(),
                 applicationEnd.time(), start);
 
             McpEmitter emitter = appEmitters.get(sc.applicationId());
@@ -286,9 +317,9 @@ public Void apply(SparkContext sc) {
                 "Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId()
                     + ":" + sqlEnd.executionId());
           } else if (start.getDatasetLineage() != null) {
-            SQLQueryExecEndEvent evt =
-                new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), sqlEnd.time(),
-                    sqlEnd.executionId(), start);
+            SQLQueryExecEndEvent evt = new SQLQueryExecEndEvent(getMaster(sc), getPipelineName(sc),
+                sc.applicationId(), sqlEnd.time(), sqlEnd.executionId(), start);
 
             McpEmitter emitter = appEmitters.get(sc.applicationId());
             if (emitter != null) {
               emitter.accept(evt);
@@ -300,7 +331,6 @@ public Void apply(SparkContext sc) {
   }
 
   private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {
-    ExecutorService pool = null;
     String appId = ctx.applicationId();
     Config datahubConfig = appConfig.get(appId);
     if (datahubConfig == null) {
@@ -308,13 +338,14 @@ private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {
       appConfig.put(appId, datahubConf);
       Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY)
          : com.typesafe.config.ConfigFactory.empty();
-      AppStartEvent evt =
-          new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(), ctx.sparkUser(),
-              pipelineConfig);
+      AppStartEvent evt = new AppStartEvent(getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(),
+          ctx.sparkUser(), pipelineConfig);
 
       appEmitters.computeIfAbsent(appId,
           s -> datahubConf.hasPath(COALESCE_KEY) && datahubConf.getBoolean(COALESCE_KEY) ? new CoalesceJobsEmitter(
-              datahubConf) : new McpEmitter(datahubConf)).accept(evt);
+              datahubConf) : new McpEmitter(datahubConf))
+          .accept(evt);
       consumers().forEach(c -> c.accept(evt));
       appDetails.put(appId, evt);
       appSqlDetails.put(appId, new ConcurrentHashMap<>());
@@ -323,19 +354,41 @@ private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {
 
   private String getPipelineName(SparkContext cx) {
     Config datahubConfig = appConfig.computeIfAbsent(cx.applicationId(), s -> LineageUtils.parseSparkConfig());
-    String name = "";
-    if (datahubConfig.hasPath(DATABRICKS_CLUSTER_KEY)) {
-      name = datahubConfig.getString(DATABRICKS_CLUSTER_KEY) + "_" + cx.applicationId();
-    }
-    name = cx.appName();
-    // TODO: appending of platform instance needs to be done at central location
-    // like adding constructor to dataflowurl
-    if (datahubConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)) {
-      name = datahubConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY) + "." + name;
+    String name = cx.appName();
+
+    // An explicitly configured pipeline name wins; otherwise fall back to the
+    // Databricks cluster name (when present) plus the platform-instance prefix.
+    if (datahubConfig.hasPath(MW_PIPELINE_NAME)) {
+      name = datahubConfig.getString(MW_PIPELINE_NAME);
+    } else {
+      if (datahubConfig.hasPath(DATABRICKS_CLUSTER_KEY)) {
+        name = datahubConfig.getString(DATABRICKS_CLUSTER_KEY) + "_" + cx.applicationId();
+      }
+      // TODO: appending of platform instance needs to be done at central location
+      // like adding constructor to dataflowurl
+      if (datahubConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)) {
+        name = datahubConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY) + "." + name;
+      }
     }
     return name;
   }
 
+  // Reports "0.0.0.0" instead of the real master address when the config asks to
+  // keep the Spark IP out of the flow URN, so the URN stays stable across clusters.
+  public String getMaster(SparkContext cx) {
+    Config datahubConfig = appConfig.computeIfAbsent(cx.applicationId(), s -> LineageUtils.parseSparkConfig());
+    return datahubConfig.hasPath(MW_SPARK_IP_IS_IN_FLOW_URN)
+        && !(datahubConfig.getBoolean(MW_SPARK_IP_IS_IN_FLOW_URN)) ? "0.0.0.0" : LineageUtils.getMaster(cx);
+  }
+
+  public boolean useHashUrn(SparkContext cx) {
+    Config datahubConfig = appConfig.computeIfAbsent(cx.applicationId(), s -> LineageUtils.parseSparkConfig());
+    return datahubConfig.hasPath(MW_USE_FLOW_HASH_URN) && datahubConfig.getBoolean(MW_USE_FLOW_HASH_URN);
+  }
+
+  // Returns the configured job name, or "" so callers can fall back to the call site.
+  public String jobName(SparkContext cx) {
+    Config datahubConfig = appConfig.computeIfAbsent(cx.applicationId(), s -> LineageUtils.parseSparkConfig());
+    return datahubConfig.hasPath(MW_JOB_NAME) ? datahubConfig.getString(MW_JOB_NAME) : "";
+  }
+
   private void processExecution(SparkListenerSQLExecutionStart sqlStart) {
     QueryExecution queryExec = SQLExecution.getQueryExecution(sqlStart.executionId());
     if (queryExec == null) {
diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageUtils.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageUtils.java
index ad837f034ad64..a8cb8f68d07bd 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageUtils.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageUtils.java
@@ -25,6 +25,8 @@
 import scala.Option;
 import scala.runtime.AbstractFunction0;
 import scala.runtime.AbstractFunction1;
+import com.google.common.hash.Hashing;
+import java.nio.charset.Charset;
 
 @Slf4j
 public class LineageUtils {
@@ -93,22 +95,25 @@ public static DatasetUrn createDatasetUrn(String platform, String platformInstan
 
   /* This is for generating urn from a hash of the plan */
-  /*
-   * public static String scrubPlan(String plan) { String s =
-   * plan.replaceAll("#[0-9]*", ""); s =
-   * s.replaceAll("JdbcRelationProvider@[0-9a-zA-Z]*,", "JdbcRelationProvider,");
-   * s = s.replaceAll("InMemoryFileIndex@[0-9a-zA-Z]*,", "InMemoryFileIndex,"); s
-   * = s.replaceAll("Created Time:[^\n]+\n", ""); s =
-   * s.replaceAll("Last Access:[^\n]+\n", ""); s = s.replaceAll("Owner:[^\n]+\n",
-   * ""); s = s.replaceAll("Statistics:[^\n]+\n", ""); s =
-   * s.replaceAll("Table Properties:[^\n]+\n", "");
-   * return s; }
-   *
-   * public static void setPathReplacer(Function<String, String> replacer) {
-   * PATH_REPLACER = replacer; }
-   *
-   * public static String hash(String s) { s = PATH_REPLACER.apply(s);
-   * log.debug("PATH REPLACED " + s); return Hashing.md5().hashString(s,
-   * Charset.forName("US-ASCII")).toString(); }
-   */
+  // Normalizes a plan string so that run-specific tokens (expression ids, object
+  // hashes, timestamps, table statistics) do not change the plan hash between runs.
+  public static String scrubPlan(String plan) {
+    String s = plan.replaceAll("#[0-9]*", "");
+    s = s.replaceAll("JdbcRelationProvider@[0-9a-zA-Z]*,", "JdbcRelationProvider,");
+    s = s.replaceAll("InMemoryFileIndex@[0-9a-zA-Z]*,", "InMemoryFileIndex,");
+    s = s.replaceAll("Created Time:[^\n]+\n", "");
+    s = s.replaceAll("Last Access:[^\n]+\n", "");
+    s = s.replaceAll("Owner:[^\n]+\n", "");
+    s = s.replaceAll("Statistics:[^\n]+\n", "");
+    s = s.replaceAll("Table Properties:[^\n]+\n", "");
+    return s;
+  }
+
+  // public static void setPathReplacer(Function<String, String> replacer) {
+  //   PATH_REPLACER = replacer;
+  // }
+
+  public static String hash(String s) {
+    return Hashing.md5().hashString(s, Charset.forName("US-ASCII")).toString();
+  }
 }
diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java
index 0919f40c7e1c9..b5afc8cff53a2 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java
@@ -16,62 +16,72 @@
 import java.util.TreeSet;
 import lombok.Getter;
 import lombok.ToString;
-
+import java.util.stream.Collectors;
+import com.google.common.base.Joiner;
 
 @ToString
 @Getter
 public class SQLQueryExecStartEvent extends LineageEvent {
   private final long sqlQueryExecId;
   private final DatasetLineage datasetLineage;
+  private final boolean useHashUrn;
+  private final String jobName;
 
   public SQLQueryExecStartEvent(String master, String appName, String appId, long time, long sqlQueryExecId,
-      DatasetLineage datasetLineage) {
+      DatasetLineage datasetLineage, boolean useHashUrn, String jobName) {
     super(master, appName, appId, time);
     this.sqlQueryExecId = sqlQueryExecId;
     this.datasetLineage = datasetLineage;
+    this.useHashUrn = useHashUrn;
+    this.jobName = jobName;
   }
 
   @Override
   public List<MetadataChangeProposalWrapper> asMetadataEvents() {
     DataJobUrn jobUrn = jobUrn();
-    MetadataChangeProposalWrapper mcpJobIO =
-        MetadataChangeProposalWrapper.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobIO()));
+    MetadataChangeProposalWrapper mcpJobIO = MetadataChangeProposalWrapper
+        .create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobIO()));
 
     DataJobInfo jobInfo = jobInfo();
     jobInfo.setCustomProperties(customProps());
     jobInfo.setStatus(JobStatus.IN_PROGRESS);
 
-    MetadataChangeProposalWrapper mcpJobInfo =
-        MetadataChangeProposalWrapper.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobInfo));
+    MetadataChangeProposalWrapper mcpJobInfo = MetadataChangeProposalWrapper
+        .create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobInfo));
 
     return Arrays.asList(mcpJobIO, mcpJobInfo);
   }
 
-  DataJobInfo jobInfo() {
-    return new DataJobInfo().setName(datasetLineage.getCallSiteShort()).setType(DataJobInfo.Type.create("sparkJob"));
+  public DataJobInfo jobInfo() {
+    // Prefer the configured job name, suffixed with the execution id for uniqueness;
+    // fall back to the short call site when no name was configured.
+    return new DataJobInfo()
+        .setName(this.jobName.equals("") ? datasetLineage.getCallSiteShort()
+            : this.jobName + "_" + Long.toString(sqlQueryExecId))
+        .setType(DataJobInfo.Type.create("sparkJob"));
   }
 
-  DataJobUrn jobUrn() {
+  public DataJobUrn jobUrn() {
     /* This is for generating urn from a hash of the plan */
-    /*
-     * Set<String> sourceUrns = datasetLineage.getSources() .parallelStream() .map(x
-     * -> x.urn().toString()) .collect(Collectors.toSet()); sourceUrns = new
-     * TreeSet<>(sourceUrns); //sort for consistency
-     *
-     * String sinkUrn = datasetLineage.getSink().urn().toString(); String plan =
-     * LineageUtils.scrubPlan(datasetLineage.getPlan()); String id =
-     * Joiner.on(",").join(sinkUrn, sourceUrns, plan);
-     *
-     * return new DataJobUrn(flowUrn(), "planHash_" + LineageUtils.hash(id));
-     */
-    return new DataJobUrn(flowUrn(), "QueryExecId_" + sqlQueryExecId);
+    if (this.useHashUrn) {
+      Set<String> sourceUrns = datasetLineage.getSources().parallelStream().map(x -> x.urn().toString())
+          .collect(Collectors.toSet());
+      sourceUrns = new TreeSet<>(sourceUrns); // sort for consistency
+
+      String sinkUrn = datasetLineage.getSink().urn().toString();
+      // String plan = LineageUtils.scrubPlan(datasetLineage.getPlan());
+      String id = Joiner.on(",").join(sinkUrn, sourceUrns /* , plan */);
+      return new DataJobUrn(flowUrn(), "planHash_" + LineageUtils.hash(id));
+    } else {
+      return new DataJobUrn(flowUrn(), "QueryExecId_" + sqlQueryExecId);
+    }
   }
 
-  DataFlowUrn flowUrn() {
+  public DataFlowUrn flowUrn() {
     return LineageUtils.flowUrn(getMaster(), getAppName());
   }
 
-  StringMap customProps() {
+  public StringMap customProps() {
     StringMap customProps = new StringMap();
     customProps.put("startedAt", timeStr());
     customProps.put("description", datasetLineage.getCallSiteShort());
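
For reviewers, a minimal usage sketch of the four new mars.* switches. It assumes the listener reads its Typesafe config from spark.datahub.* Spark properties (the prefix LineageUtils.parseSparkConfig() strips for the existing keys such as metadata.pipeline.platformInstance); the class name, app/job names, endpoint, and output path below are illustrative only.

import org.apache.spark.sql.SparkSession;

// Hypothetical driver showing where the new keys would be set; not part of this PR.
public class MarsUrnConfigExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("nightly_ingest")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")
        // MW_PIPELINE_NAME: fixed DataFlow name instead of the Spark app name
        .config("spark.datahub.mars.pipelineName", "nightly_ingest")
        // MW_SPARK_IP_IS_IN_FLOW_URN=false: getMaster() reports "0.0.0.0",
        // keeping the driver address out of the flow URN
        .config("spark.datahub.mars.sparkIpInFlowURN", "false")
        // MW_USE_FLOW_HASH_URN: job URN becomes planHash_<md5(sink,sources)>
        // instead of the per-run QueryExecId_<n>
        .config("spark.datahub.mars.useJobHashInFlowURN", "true")
        // MW_JOB_NAME: stable display name; "" falls back to the call site
        .config("spark.datahub.mars.jobName", "load_orders")
        .getOrCreate();

    spark.sql("SELECT 1 AS id").write().mode("overwrite").parquet("/tmp/mars_example");
    spark.stop();
  }
}

With mars.useJobHashInFlowURN enabled, repeated runs of the same query against the same sink and sources emit the same planHash_ DataJob URN rather than a fresh QueryExecId_ URN per run; combined with mars.sparkIpInFlowURN=false and a fixed mars.pipelineName, the flow/job URN pair stays stable across clusters and restarts.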