Fix DataHub Spark listener to work with new versions of Spark #8972

@@ -12,7 +12,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
// import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@@ -49,7 +49,8 @@
import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractPartialFunction;

import java.lang.reflect.Method;
import org.json4s.JsonAST;

@Slf4j
public class DatahubSparkListener extends SparkListener {
@@ -60,6 +61,11 @@ public class DatahubSparkListener extends SparkListener {
public static final String PIPELINE_KEY = "metadata.pipeline";
public static final String PIPELINE_PLATFORM_INSTANCE_KEY = PIPELINE_KEY + ".platformInstance";

public static final String MW_PIPELINE_NAME = "mars.pipelineName";
public static final String MW_SPARK_IP_IS_IN_FLOW_URN = "mars.sparkIpInFlowURN";
public static final String MW_USE_FLOW_HASH_URN = "mars.useJobHashInFlowURN";
public static final String MW_JOB_NAME = "mars.jobName";

public static final String COALESCE_KEY = "coalesce_jobs";
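
Editor's note: as a reading aid, a minimal sketch of how a job might supply the new mars.* settings. It assumes the agent's usual spark.datahub. configuration prefix read by LineageUtils.parseSparkConfig() and the datahub.spark.DatahubSparkListener registration, neither of which appears in this diff; the app name and values are invented.

import org.apache.spark.SparkConf;

public class ListenerConfSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("orders_daily_app")
        .set("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        // MW_PIPELINE_NAME: fixed pipeline name instead of the Spark app name
        .set("spark.datahub.mars.pipelineName", "orders_daily")
        // MW_SPARK_IP_IS_IN_FLOW_URN: false masks the driver/master address in the flow URN
        .set("spark.datahub.mars.sparkIpInFlowURN", "false")
        // MW_USE_FLOW_HASH_URN: derive the job URN from a hash of inputs and output
        .set("spark.datahub.mars.useJobHashInFlowURN", "true")
        // MW_JOB_NAME: explicit job name used in DataJobInfo
        .set("spark.datahub.mars.jobName", "orders_enrichment");
    System.out.println(conf.toDebugString());
  }
}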

private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
@@ -77,14 +83,40 @@ private class SqlStartTask {
private final SparkContext ctx;
private final LogicalPlan plan;

private String grepSqlStartJsonSparkCompatible(SparkListenerEvent sqlStart) {
  String result = null;
  try {
    Class<?> c = Class.forName(JsonProtocol.class.getName());
    try {
      // Spark >= 3.4.x: sparkEventToJsonString returns the rendered JSON string directly.
      Method sparkMethod = c.getDeclaredMethod("sparkEventToJsonString",
          org.apache.spark.scheduler.SparkListenerEvent.class);
      result = (String) sparkMethod.invoke(null, sqlStart);
    } catch (NoSuchMethodException e) {
      // Spark < 3.4.x: fall back to sparkEventToJson, which returns a json4s JValue
      // that still has to be rendered with JsonMethods.compact.
      Method sparkMethod = c.getDeclaredMethod("sparkEventToJson",
          org.apache.spark.scheduler.SparkListenerEvent.class);
      result = JsonMethods$.MODULE$.compact((JsonAST.JValue) sparkMethod.invoke(null, sqlStart));
    }
  } catch (Exception e) {
    log.error(e.toString());
  }

  return result;
}
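
Editor's note: before relying on the reflective fallback it can help to check which JsonProtocol entry point the Spark version on the classpath actually exposes and what it returns. A small diagnostic sketch; the class and method names are exactly the ones the helper above looks up, nothing else is assumed.

import java.lang.reflect.Method;

public class JsonProtocolProbe {
  public static void main(String[] args) throws Exception {
    Class<?> c = Class.forName("org.apache.spark.util.JsonProtocol");
    Class<?> eventClass = Class.forName("org.apache.spark.scheduler.SparkListenerEvent");
    for (String name : new String[] {"sparkEventToJsonString", "sparkEventToJson"}) {
      try {
        Method m = c.getDeclaredMethod(name, eventClass);
        // Spark >= 3.4 reports String here; older versions report JValue for sparkEventToJson.
        System.out.println(name + " -> available, returns " + m.getReturnType().getSimpleName());
      } catch (NoSuchMethodException e) {
        System.out.println(name + " -> not present in this Spark version");
      }
    }
  }
}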

public SqlStartTask(SparkListenerSQLExecutionStart sqlStart, LogicalPlan plan, SparkContext ctx) {
this.sqlStart = sqlStart;
this.plan = plan;
this.ctx = ctx;

String jsonPlan = (plan != null) ? plan.toJSON() : null;
String sqlStartJson =
(sqlStart != null) ? JsonMethods$.MODULE$.compact(JsonProtocol.sparkEventToJson(sqlStart)) : null;
String sqlStartJson = (sqlStart != null) ? grepSqlStartJsonSparkCompatible(sqlStart) : null;
log.debug("SqlStartTask with parameters: sqlStart: {}, plan: {}, ctx: {}", sqlStartJson, jsonPlan, ctx);
}

@@ -106,8 +138,8 @@ public void run() {

appSqlDetails.get(ctx.applicationId())
.put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), null));
new SQLQueryExecStartEvent(getMaster(ctx), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), null, useHashUrn(ctx), jobName(ctx)));
log.debug("PLAN for execution id: " + getPipelineName(ctx) + ":" + sqlStart.executionId() + "\n");
log.debug(plan.toString());

@@ -118,8 +150,8 @@ public void run() {
return;
}
// Here assumption is that there will be only single target for single sql query
DatasetLineage lineage =
new DatasetLineage(sqlStart.description(), plan.toString(), outputDS.get().iterator().next());
DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(),
outputDS.get().iterator().next());
Collection<QueryPlan<?>> allInners = new ArrayList<>();

plan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {
@@ -164,9 +196,8 @@ public boolean isDefinedAt(LogicalPlan x) {
});
}

SQLQueryExecStartEvent evt =
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), lineage);
SQLQueryExecStartEvent evt = new SQLQueryExecStartEvent(getMaster(ctx), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), lineage, useHashUrn(ctx), jobName(ctx));

appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(), evt);

@@ -217,7 +248,7 @@ public Void apply(SparkContext sc) {
if (start == null) {
log.error("Application end event received, but start event missing for appId " + sc.applicationId());
} else {
AppEndEvent evt = new AppEndEvent(LineageUtils.getMaster(sc), getPipelineName(sc), sc.applicationId(),
AppEndEvent evt = new AppEndEvent(getMaster(sc), getPipelineName(sc), sc.applicationId(),
applicationEnd.time(), start);

McpEmitter emitter = appEmitters.get(sc.applicationId());
@@ -286,9 +317,9 @@ public Void apply(SparkContext sc) {
"Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() + ":"
+ sqlEnd.executionId());
} else if (start.getDatasetLineage() != null) {
SQLQueryExecEndEvent evt =
new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), sqlEnd.time(),
sqlEnd.executionId(), start);
SQLQueryExecEndEvent evt = new SQLQueryExecEndEvent(getMaster(sc), getPipelineName(sc), sc.applicationId(),
sqlEnd.time(),
sqlEnd.executionId(), start);
McpEmitter emitter = appEmitters.get(sc.applicationId());
if (emitter != null) {
emitter.accept(evt);
@@ -300,21 +331,21 @@ public Void apply(SparkContext sc) {
}

private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {
ExecutorService pool = null;
String appId = ctx.applicationId();
Config datahubConfig = appConfig.get(appId);
if (datahubConfig == null) {
Config datahubConf = LineageUtils.parseSparkConfig();
appConfig.put(appId, datahubConf);
Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY)
: com.typesafe.config.ConfigFactory.empty();
AppStartEvent evt =
new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(), ctx.sparkUser(),
pipelineConfig);
AppStartEvent evt = new AppStartEvent(getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(),
ctx.sparkUser(),
pipelineConfig);

appEmitters.computeIfAbsent(appId,
s -> datahubConf.hasPath(COALESCE_KEY) && datahubConf.getBoolean(COALESCE_KEY) ? new CoalesceJobsEmitter(
datahubConf) : new McpEmitter(datahubConf)).accept(evt);
datahubConf) : new McpEmitter(datahubConf))
.accept(evt);
consumers().forEach(c -> c.accept(evt));
appDetails.put(appId, evt);
appSqlDetails.put(appId, new ConcurrentHashMap<>());
@@ -323,19 +354,41 @@ private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {

private String getPipelineName(SparkContext cx) {
Config datahubConfig = appConfig.computeIfAbsent(cx.applicationId(), s -> LineageUtils.parseSparkConfig());
String name = "";
if (datahubConfig.hasPath(DATABRICKS_CLUSTER_KEY)) {
name = datahubConfig.getString(DATABRICKS_CLUSTER_KEY) + "_" + cx.applicationId();
}
name = cx.appName();
// TODO: appending of platform instance needs to be done at central location
// like adding constructor to dataflowurl
if (datahubConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)) {
name = datahubConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY) + "." + name;
String name = cx.appName();

if (datahubConfig.hasPath(MW_PIPELINE_NAME)) {
name = datahubConfig.getString(MW_PIPELINE_NAME);
} else {
if (datahubConfig.hasPath(DATABRICKS_CLUSTER_KEY)) {
name = datahubConfig.getString(DATABRICKS_CLUSTER_KEY) + "_" + cx.applicationId();
}
name = cx.appName();
// TODO: appending of platform instance needs to be done at central location
// like adding constructor to dataflowurl
if (datahubConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)) {
name = datahubConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY) + "." + name;
}
}
return name;
}
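
Editor's note: a standalone sketch of the naming precedence that getPipelineName now implements, useful for seeing which value wins for a given config. The Databricks cluster branch is omitted for brevity; key strings mirror MW_PIPELINE_NAME and PIPELINE_PLATFORM_INSTANCE_KEY, and the sample values are invented.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class PipelineNameSketch {
  static String resolve(Config conf, String appName) {
    if (conf.hasPath("mars.pipelineName")) {
      return conf.getString("mars.pipelineName");  // explicit override wins outright
    }
    String name = appName;
    if (conf.hasPath("metadata.pipeline.platformInstance")) {
      name = conf.getString("metadata.pipeline.platformInstance") + "." + name;
    }
    return name;
  }

  public static void main(String[] args) {
    System.out.println(resolve(ConfigFactory.parseString("mars.pipelineName = orders_daily"), "spark-shell"));
    // -> orders_daily
    System.out.println(resolve(ConfigFactory.parseString("metadata.pipeline.platformInstance = prod_eu"), "spark-shell"));
    // -> prod_eu.spark-shell
    System.out.println(resolve(ConfigFactory.empty(), "spark-shell"));
    // -> spark-shell
  }
}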

public String getMaster(SparkContext cx) {
Config datahubConfig = appConfig.computeIfAbsent(cx.applicationId(), s -> LineageUtils.parseSparkConfig());
return datahubConfig.hasPath(MW_SPARK_IP_IS_IN_FLOW_URN)
&& !(datahubConfig.getBoolean(MW_SPARK_IP_IS_IN_FLOW_URN)) ? "0.0.0.0" : LineageUtils.getMaster(cx);
}

public boolean useHashUrn(SparkContext cx) {
Config datahubConfig = appConfig.computeIfAbsent(cx.applicationId(), s -> LineageUtils.parseSparkConfig());
return datahubConfig.hasPath(MW_USE_FLOW_HASH_URN) && datahubConfig.getBoolean(MW_USE_FLOW_HASH_URN);
}

public String jobName(SparkContext cx) {
Config datahubConfig = appConfig.computeIfAbsent(cx.applicationId(), s -> LineageUtils.parseSparkConfig());
return datahubConfig.hasPath(MW_JOB_NAME)
&& !(datahubConfig.getString(MW_JOB_NAME).equals("")) ? datahubConfig.getString(MW_JOB_NAME) : "";
}

private void processExecution(SparkListenerSQLExecutionStart sqlStart) {
QueryExecution queryExec = SQLExecution.getQueryExecution(sqlStart.executionId());
if (queryExec == null) {
@@ -25,6 +25,8 @@
import scala.Option;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import com.google.common.hash.Hashing;
import java.nio.charset.Charset;

@Slf4j
public class LineageUtils {
@@ -93,22 +95,25 @@ public static DatasetUrn createDatasetUrn(String platform, String platformInstan

/* This is for generating urn from a hash of the plan */

/*
* public static String scrubPlan(String plan) { String s =
* plan.replaceAll("#[0-9]*", ""); s =
* s.replaceAll("JdbcRelationProvider@[0-9a-zA-Z]*,", "JdbcRelationProvider,");
* s = s.replaceAll("InMemoryFileIndex@[0-9a-zA-Z]*,", "InMemoryFileIndex,"); s
* = s.replaceAll("Created Time:[^\n]+\n", ""); s =
* s.replaceAll("Last Access:[^\n]+\n", ""); s = s.replaceAll("Owner:[^\n]+\n",
* ""); s = s.replaceAll("Statistics:[^\n]+\n", ""); s =
* s.replaceAll("Table Properties:[^\n]+\n", ""); //
* System.out.println("CLEAN: " + s); return s; }
*
* public static void setPathReplacer(Function<String, String> replacer) {
* PATH_REPLACER = replacer; }
*
* public static String hash(String s) { s = PATH_REPLACER.apply(s);
* log.debug("PATH REPLACED " + s); return Hashing.md5().hashString(s,
* Charset.forName("US-ASCII")).toString(); }
*/
public static String scrubPlan(String plan) {
String s = plan.replaceAll("#[0-9]*", "");
s = s.replaceAll("JdbcRelationProvider@[0-9a-zA-Z]*,", "JdbcRelationProvider,");
s = s.replaceAll("InMemoryFileIndex@[0-9a-zA-Z]*,", "InMemoryFileIndex,");
s = s.replaceAll("Created Time:[^\n]+\n", "");
s = s.replaceAll("Last Access:[^\n]+\n", "");
s = s.replaceAll("Owner:[^\n]+\n", "");
s = s.replaceAll("Statistics:[^\n]+\n", "");
s = s.replaceAll("Table Properties:[^\n]+\n", "");
// System.out.println("CLEAN: " + s);
return s;
}

// public static void setPathReplacer(Function<String, String> replacer) {
// PATH_REPLACER = replacer;
// }

public static String hash(String s) {
// String replaced = s.replaceAll("\\", "").replaceAll("/", "");
return Hashing.md5().hashString(s, Charset.forName("US-ASCII")).toString();
}
}
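
Editor's note: a minimal sketch of what the re-enabled scrubPlan/hash pair is for: two textual dumps of the same logical plan differ only in Spark's expression ids and timestamps, and normalization makes them hash to the same value. The plan strings below are invented, only a subset of the scrubPlan regexes is shown, and note that jobUrn() in this PR currently hashes only the sink and source URNs, with the plan left commented out.

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

public class PlanHashSketch {
  public static void main(String[] args) {
    // The same logical plan rendered in two runs: only the expression ids (#12 vs #57)
    // and the "Created Time" line differ.
    String run1 = "Project [id#12, amount#13]\nCreated Time: Mon Jan 01 10:00:00\n";
    String run2 = "Project [id#57, amount#58]\nCreated Time: Tue Jan 02 09:30:00\n";

    // Same normalization steps as LineageUtils.scrubPlan (subset shown here).
    String norm1 = run1.replaceAll("#[0-9]*", "").replaceAll("Created Time:[^\n]+\n", "");
    String norm2 = run2.replaceAll("#[0-9]*", "").replaceAll("Created Time:[^\n]+\n", "");

    // Identical hashes, so an identifier built from this value is stable across runs.
    System.out.println(Hashing.md5().hashString(norm1, StandardCharsets.US_ASCII));
    System.out.println(Hashing.md5().hashString(norm2, StandardCharsets.US_ASCII));
  }
}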
@@ -16,62 +16,72 @@
import java.util.TreeSet;
import lombok.Getter;
import lombok.ToString;

import java.util.stream.Collectors;
import com.google.common.base.Joiner;

@ToString
@Getter
public class SQLQueryExecStartEvent extends LineageEvent {
private final long sqlQueryExecId;
private final DatasetLineage datasetLineage;
private final boolean useHashUrn;
private final String jobName;

public SQLQueryExecStartEvent(String master, String appName, String appId, long time, long sqlQueryExecId,
DatasetLineage datasetLineage) {
DatasetLineage datasetLineage, boolean useHashUrn, String jobName) {
super(master, appName, appId, time);
this.sqlQueryExecId = sqlQueryExecId;
this.datasetLineage = datasetLineage;
this.useHashUrn = useHashUrn;
this.jobName = jobName;

}

@Override
public List<MetadataChangeProposalWrapper> asMetadataEvents() {
DataJobUrn jobUrn = jobUrn();
MetadataChangeProposalWrapper mcpJobIO =
MetadataChangeProposalWrapper.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobIO()));
MetadataChangeProposalWrapper mcpJobIO = MetadataChangeProposalWrapper
.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobIO()));

DataJobInfo jobInfo = jobInfo();
jobInfo.setCustomProperties(customProps());
jobInfo.setStatus(JobStatus.IN_PROGRESS);

MetadataChangeProposalWrapper mcpJobInfo =
MetadataChangeProposalWrapper.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobInfo));
MetadataChangeProposalWrapper mcpJobInfo = MetadataChangeProposalWrapper
.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobInfo));

return Arrays.asList(mcpJobIO, mcpJobInfo);
}

DataJobInfo jobInfo() {
return new DataJobInfo().setName(datasetLineage.getCallSiteShort()).setType(DataJobInfo.Type.create("sparkJob"));
public DataJobInfo jobInfo() {
return new DataJobInfo()
.setName(
this.jobName.equals("") ? datasetLineage.getCallSiteShort()
: this.jobName + "_" + Long.toString(sqlQueryExecId))
.setType(DataJobInfo.Type.create("sparkJob"));
}

DataJobUrn jobUrn() {
public DataJobUrn jobUrn() {
/* This is for generating urn from a hash of the plan */
/*
* Set<String> sourceUrns = datasetLineage.getSources() .parallelStream() .map(x
* -> x.urn().toString()) .collect(Collectors.toSet()); sourceUrns = new
* TreeSet<>(sourceUrns); //sort for consistency
*
* String sinkUrn = datasetLineage.getSink().urn().toString(); String plan =
* LineageUtils.scrubPlan(datasetLineage.getPlan()); String id =
* Joiner.on(",").join(sinkUrn, sourceUrns, plan);
*
* return new DataJobUrn(flowUrn(), "planHash_" + LineageUtils.hash(id));
*/
return new DataJobUrn(flowUrn(), "QueryExecId_" + sqlQueryExecId);
if (this.useHashUrn) {
Set<String> sourceUrns = datasetLineage.getSources().parallelStream().map(x -> x.urn().toString())
.collect(Collectors.toSet());
sourceUrns = new TreeSet<>(sourceUrns); // sort for consistency

String sinkUrn = datasetLineage.getSink().urn().toString();
// String plan = LineageUtils.scrubPlan(datasetLineage.getPlan());
String id = Joiner.on(",").join(sinkUrn, sourceUrns/*, plan*/);
return new DataJobUrn(flowUrn(), "planHash_" + LineageUtils.hash(id));
} else {
return new DataJobUrn(flowUrn(), "QueryExecId_" + sqlQueryExecId);
}
}
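
Editor's note: a sketch of the identifier that jobUrn() builds when useHashUrn is enabled: the sink URN plus the sorted source URNs are joined and MD5-hashed, so the resulting job id stays constant across runs of the same query, unlike "QueryExecId_" + executionId. The dataset URN strings are invented for illustration.

import com.google.common.base.Joiner;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.TreeSet;

public class HashJobUrnSketch {
  public static void main(String[] args) {
    Set<String> sourceUrns = new TreeSet<>();  // TreeSet: sorted for run-to-run consistency
    sourceUrns.add("urn:li:dataset:(urn:li:dataPlatform:hive,db.orders,PROD)");
    sourceUrns.add("urn:li:dataset:(urn:li:dataPlatform:hive,db.customers,PROD)");
    String sinkUrn = "urn:li:dataset:(urn:li:dataPlatform:hive,db.orders_enriched,PROD)";

    // Same recipe as jobUrn() above when useHashUrn is enabled: sink + sorted sources, hashed.
    String id = Joiner.on(",").join(sinkUrn, sourceUrns);
    String jobId = "planHash_" + Hashing.md5().hashString(id, StandardCharsets.US_ASCII).toString();
    System.out.println(jobId);
  }
}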

DataFlowUrn flowUrn() {
public DataFlowUrn flowUrn() {
return LineageUtils.flowUrn(getMaster(), getAppName());
}

StringMap customProps() {
public StringMap customProps() {
StringMap customProps = new StringMap();
customProps.put("startedAt", timeStr());
customProps.put("description", datasetLineage.getCallSiteShort());