diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/exception/CatalogExceptionMessage.java b/openmetadata-service/src/main/java/org/openmetadata/service/exception/CatalogExceptionMessage.java index 2219c0e243cc..582302d3f0af 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/exception/CatalogExceptionMessage.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/exception/CatalogExceptionMessage.java @@ -101,6 +101,7 @@ public final class CatalogExceptionMessage { public static final String INVALID_BOT_USER = "Revoke Token can only be applied to Bot Users."; public static final String NO_MANUAL_TRIGGER_ERR = "App does not support manual trigger."; public static final String INVALID_APP_TYPE = "Application Type is not valid."; + public static final String CSV_EXPORT_FAILED = "CSV Export Failed."; private CatalogExceptionMessage() {} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index abad6c7a7d69..65d43bf76660 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -43,6 +43,7 @@ import java.util.UUID; import javax.json.JsonPatch; import javax.ws.rs.core.Response; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -258,19 +259,28 @@ public final String exportCsv( return CsvUtil.formatCsv(csvFile); } + @Getter + private static class ColumnMapping { + String fromChildFQN; + String toChildFQN; + + ColumnMapping(String from, String to) { + this.fromChildFQN = from; + this.toChildFQN = to; + } + } + public final String exportCsvAsync( String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, String entityType, - boolean deleted) - throws IOException { - Response response = - Entity.getSearchRepository() - .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); - + boolean deleted) { try { + Response response = + Entity.getSearchRepository() + .searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); String jsonResponse = JsonUtils.pojoToJson(response.getEntity()); JsonNode rootNode = JsonUtils.readTree(jsonResponse); @@ -284,11 +294,28 @@ public final String exportCsvAsync( StringWriter csvContent = new StringWriter(); CSVWriter csvWriter = new CSVWriter(csvContent); String[] headers = { - "fromEntityFQN", "fromServiceName", "fromServiceType", "fromOwners", "fromDomain", - "toEntityFQN", "toServiceName", "toServiceType", "toOwners", "toDomain", - "fromChildEntityFQN", "toChildEntityFQN" + "fromEntityFQN", + "fromServiceName", + "fromServiceType", + "fromOwners", + "fromDomain", + "toEntityFQN", + "toServiceName", + "toServiceType", + "toOwners", + "toDomain", + "fromChildEntityFQN", + "toChildEntityFQN", + "pipelineName", + "pipelineType", + "pipelineDescription", + "pipelineOwners", + "pipelineDomain", + "pipelineServiceName", + "pipelineServiceType" }; csvWriter.writeNext(headers); + JsonNode edges = rootNode.path("edges"); for (JsonNode edge : edges) { String fromEntityId = edge.path("fromEntity").path("id").asText(); @@ -302,50 +329,59 @@ public final String exportCsvAsync( baseRow.put("fromServiceName", getText(fromEntity.path("service"), "name")); baseRow.put("fromServiceType", getText(fromEntity, "serviceType")); baseRow.put("fromOwners", getOwners(fromEntity.path("owners"))); - baseRow.put("fromDomain", getText(fromEntity, "domain")); + baseRow.put("fromDomain", getDomainFQN(fromEntity.path("domain"))); baseRow.put("toEntityFQN", getText(toEntity, "fullyQualifiedName")); baseRow.put("toServiceName", getText(toEntity.path("service"), "name")); baseRow.put("toServiceType", getText(toEntity, "serviceType")); baseRow.put("toOwners", getOwners(toEntity.path("owners"))); - baseRow.put("toDomain", getText(toEntity, "domain")); - - List fromChildFQNs = new ArrayList<>(); - List toChildFQNs = new ArrayList<>(); - - extractChildEntities(fromEntity, fromChildFQNs); - extractChildEntities(toEntity, toChildFQNs); + baseRow.put("toDomain", getDomainFQN(toEntity.path("domain"))); JsonNode columns = edge.path("columns"); if (columns.isArray() && !columns.isEmpty()) { - for (JsonNode columnMapping : columns) { - JsonNode fromColumns = columnMapping.path("fromColumns"); - String toColumn = columnMapping.path("toColumn").asText(); - - for (JsonNode fromColumn : fromColumns) { - String fromChildFQN = fromColumn.asText(); - String toChildFQN = toColumn; - writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN); - } + List explicitColumnMappings = extractColumnMappingsFromEdge(columns); + for (ColumnMapping mapping : explicitColumnMappings) { + writeCsvRow( + csvWriter, + baseRow, + mapping.getFromChildFQN(), + mapping.getToChildFQN(), + "", + "", + "", + "", + "", + "", + ""); + LOG.debug( + "Exported explicit ColumnMapping: from='{}', to='{}'", + mapping.getFromChildFQN(), + mapping.getToChildFQN()); } - } else if (!fromChildFQNs.isEmpty() || !toChildFQNs.isEmpty()) { - if (!fromChildFQNs.isEmpty() && !toChildFQNs.isEmpty()) { - for (String fromChildFQN : fromChildFQNs) { - for (String toChildFQN : toChildFQNs) { - writeCsvRow(csvWriter, baseRow, fromChildFQN, toChildFQN); - } - } - } else if (!fromChildFQNs.isEmpty()) { - for (String fromChildFQN : fromChildFQNs) { - writeCsvRow(csvWriter, baseRow, fromChildFQN, ""); - } - } else { - for (String toChildFQN : toChildFQNs) { - writeCsvRow(csvWriter, baseRow, "", toChildFQN); - } - } - } else { - writeCsvRow(csvWriter, baseRow, "", ""); + } + + JsonNode pipeline = edge.path("pipeline"); + if (!pipeline.isMissingNode() && !pipeline.isNull()) { + String pipelineName = getText(pipeline, "name"); + String pipelineType = getText(pipeline, "serviceType"); + String pipelineDescription = getText(pipeline, "description"); + String pipelineOwners = getOwners(pipeline.path("owners")); + String pipelineServiceName = getText(pipeline.path("service"), "name"); + String pipelineServiceType = getText(pipeline, "serviceType"); + String pipelineDomain = getDomainFQN(pipeline.path("domain")); + writeCsvRow( + csvWriter, + baseRow, + "", + "", + pipelineName, + pipelineType, + pipelineDescription, + pipelineOwners, + pipelineDomain, + pipelineServiceName, + pipelineServiceType); + LOG.debug("Exported Pipeline Information: {}", pipelineName); } } csvWriter.close(); @@ -356,20 +392,37 @@ public final String exportCsvAsync( } private static void writeCsvRow( - CSVWriter csvWriter, Map baseRow, String fromChildFQN, String toChildFQN) { + CSVWriter csvWriter, + Map baseRow, + String fromChildFQN, + String toChildFQN, + String pipelineName, + String pipelineType, + String pipelineDescription, + String pipelineOwners, + String pipelineDomain, + String pipelineServiceName, + String pipelineServiceType) { String[] row = { - baseRow.get("fromEntityFQN"), - baseRow.get("fromServiceName"), - baseRow.get("fromServiceType"), - baseRow.get("fromOwners"), - baseRow.get("fromDomain"), - baseRow.get("toEntityFQN"), - baseRow.get("toServiceName"), - baseRow.get("toServiceType"), - baseRow.get("toOwners"), - baseRow.get("toDomain"), + baseRow.getOrDefault("fromEntityFQN", ""), + baseRow.getOrDefault("fromServiceName", ""), + baseRow.getOrDefault("fromServiceType", ""), + baseRow.getOrDefault("fromOwners", ""), + baseRow.getOrDefault("fromDomain", ""), + baseRow.getOrDefault("toEntityFQN", ""), + baseRow.getOrDefault("toServiceName", ""), + baseRow.getOrDefault("toServiceType", ""), + baseRow.getOrDefault("toOwners", ""), + baseRow.getOrDefault("toDomain", ""), fromChildFQN, - toChildFQN + toChildFQN, + pipelineName, + pipelineType, + pipelineDescription, + pipelineOwners, + pipelineDomain, + pipelineServiceName, + pipelineServiceType }; csvWriter.writeNext(row); } @@ -386,7 +439,7 @@ private static String getOwners(JsonNode ownersNode) { if (ownersNode != null && ownersNode.isArray()) { List ownersList = new ArrayList<>(); for (JsonNode owner : ownersNode) { - String ownerName = getText(owner, "name"); + String ownerName = getText(owner, "displayName"); if (!ownerName.isEmpty()) { ownersList.add(ownerName); } @@ -396,91 +449,32 @@ private static String getOwners(JsonNode ownersNode) { return ""; } - private static void extractChildEntities(JsonNode entityNode, List childFQNs) { - if (entityNode == null) { - return; - } - String entityType = getText(entityNode, "entityType"); - switch (entityType) { - case TABLE: - extractColumns(entityNode.path("columns"), childFQNs); - break; - case DASHBOARD: - extractCharts(entityNode.path("charts"), childFQNs); - break; - case SEARCH_INDEX: - extractFields(entityNode.path("fields"), childFQNs); - break; - case CONTAINER: - extractContainers(entityNode.path("children"), childFQNs); - extractColumns(entityNode.path("dataModel").path("columns"), childFQNs); - break; - case TOPIC: - extractSchemaFields(entityNode.path("messageSchema").path("schemaFields"), childFQNs); - break; - case DASHBOARD_DATA_MODEL: - extractColumns(entityNode.path("columns"), childFQNs); - break; - default: - break; + private static String getDomainFQN(JsonNode domainNode) { + if (domainNode != null && domainNode.has("fullyQualifiedName")) { + JsonNode fqnNode = domainNode.get("fullyQualifiedName"); + return fqnNode.isNull() ? "" : fqnNode.asText(); } + return ""; } - private static void extractColumns(JsonNode columnsNode, List childFQNs) { + private static List extractColumnMappingsFromEdge(JsonNode columnsNode) { + List mappings = new ArrayList<>(); if (columnsNode != null && columnsNode.isArray()) { - for (JsonNode column : columnsNode) { - if (column != null) { - String columnFQN = getText(column, "fullyQualifiedName"); - childFQNs.add(columnFQN); - extractColumns(column.path("children"), childFQNs); - } - } - } - } - - private static void extractCharts(JsonNode chartsNode, List childFQNs) { - if (chartsNode != null && chartsNode.isArray()) { - for (JsonNode chart : chartsNode) { - String chartFQN = getText(chart, "fullyQualifiedName"); - childFQNs.add(chartFQN); - } - } - } - - private static void extractFields(JsonNode fieldsNode, List childFQNs) { - if (fieldsNode != null && fieldsNode.isArray()) { - for (JsonNode field : fieldsNode) { - if (field != null) { - String fieldFQN = getText(field, "fullyQualifiedName"); - childFQNs.add(fieldFQN); - extractFields(field.path("children"), childFQNs); - } - } - } - } - - private static void extractContainers(JsonNode containersNode, List childFQNs) { - if (containersNode != null && containersNode.isArray()) { - for (JsonNode container : containersNode) { - if (container != null) { - String containerFQN = getText(container, "fullyQualifiedName"); - childFQNs.add(containerFQN); - extractContainers(container.path("children"), childFQNs); - } - } - } - } - - private static void extractSchemaFields(JsonNode schemaFieldsNode, List childFQNs) { - if (schemaFieldsNode != null && schemaFieldsNode.isArray()) { - for (JsonNode field : schemaFieldsNode) { - if (field != null) { - String fieldFQN = getText(field, "fullyQualifiedName"); - childFQNs.add(fieldFQN); - extractSchemaFields(field.path("children"), childFQNs); + for (JsonNode columnMapping : columnsNode) { + JsonNode fromColumns = columnMapping.path("fromColumns"); + String toColumn = columnMapping.path("toColumn").asText().trim(); + + if (fromColumns.isArray() && !toColumn.isEmpty()) { + for (JsonNode fromColumn : fromColumns) { + String fromChildFQN = fromColumn.asText().trim(); + if (!fromChildFQN.isEmpty()) { + mappings.add(new ColumnMapping(fromChildFQN, toColumn)); + } + } } } } + return mappings; } private String getStringOrNull(HashMap map, String key) {