Skip to content

Commit

Permalink
feat(ingest): detect source table for cards sourced from other cards
Browse files Browse the repository at this point in the history
A Metabase question (a DataHub Card) may not query a database table directly
but may instead use another question as its source. This change makes ingestion
attempt to find the source database table from the source question,
recursively (up to a fixed depth limit).
  • Loading branch information
k-popov committed Aug 25, 2023
1 parent bf5499e commit 8d72dc6
Show file tree
Hide file tree
Showing 5 changed files with 531 additions and 14 deletions.
87 changes: 74 additions & 13 deletions metadata-ingestion/src/datahub/ingestion/source/metabase.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime, timezone
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Union
from typing import Dict, Iterable, List, Optional, Tuple, Union

import dateutil.parser as dp
import pydantic
Expand Down Expand Up @@ -43,6 +43,8 @@
)
from datahub.utilities import config_clean

DATASOURCE_URN_RECURSION_LIMIT = 5


class MetabaseConfig(DatasetLineageProviderConfigBase):
# See the Metabase /api/session endpoint for details
Expand Down Expand Up @@ -327,18 +329,43 @@ def emit_card_mces(self) -> Iterable[MetadataWorkUnit]:
)
return None

def construct_card_from_api_data(self, card_data: dict) -> Optional[ChartSnapshot]:
card_id = card_data.get("id", "")
def get_card_details_by_id(self, card_id: Union[int, str]) -> dict:
    """
    Fetch detailed information about a card (question) from the Metabase API.

    The card is requested from `{connect_uri}/api/card/{card_id}`. If the
    request fails with an HTTP error, the failure is recorded in the report
    and an empty dict is returned, so callers receive a dict on both the
    successful and the failed path.

    :param Union[int, str] card_id: ID of card (question) in Metabase
    :return: dict with card info, or empty dict if it can't be retrieved
    """
    card_url = f"{self.config.connect_uri}/api/card/{card_id}"
    try:
        card_response = self.session.get(card_url)
        card_response.raise_for_status()
        # Parse the body once and hand it straight back to the caller.
        return card_response.json()
    except HTTPError as http_error:
        self.report.report_failure(
            key=f"metabase-card-{card_id}",
            reason=f"Unable to retrieve Card info. " f"Reason: {str(http_error)}",
        )
        return {}

def construct_card_from_api_data(self, card_data: dict) -> Optional[ChartSnapshot]:
card_id = card_data.get("id")
if card_id is None:
self.report.report_failure(
key="metabase-card",
reason=f"Unable to get Card id from card data {str(card_data)}",
)
return None

card_details = self.get_card_details_by_id(card_id)
if not card_details:
self.report.report_failure(
key=f"metabase-card-{card_id}",
reason="Unable to construct Card due to empty card details",
)
return None

chart_urn = builder.make_chart_urn(self.platform, card_id)
Expand All @@ -357,7 +384,7 @@ def construct_card_from_api_data(self, card_data: dict) -> Optional[ChartSnapsho
lastModified=AuditStamp(time=modified_ts, actor=modified_actor),
)

chart_type = self._get_chart_type(card_id, card_details.get("display"))
chart_type = self._get_chart_type(card_id, card_details.get("display") or "")
description = card_details.get("description") or ""
title = card_details.get("name") or ""
datasource_urn = self.get_datasource_urn(card_details)
Expand Down Expand Up @@ -448,13 +475,30 @@ def construct_card_custom_properties(self, card_details: dict) -> Dict:

return custom_properties

def get_datasource_urn(self, card_details: dict) -> Optional[List]:
def get_datasource_urn(
self, card_details: dict, recursion_depth: int = 0
) -> Optional[List]:
if recursion_depth > DATASOURCE_URN_RECURSION_LIMIT:
self.report.report_failure(
key=f"metabase-card-{card_details.get('id')}",
reason="Unable to retrieve Card info. Reason: source table recursion depth exceeded",
)
return None

datasource_id = card_details.get("database_id") or ""
(
platform,
database_name,
database_schema,
platform_instance,
) = self.get_datasource_from_id(card_details.get("database_id", ""))
) = self.get_datasource_from_id(datasource_id)
if not platform:
self.report.report_failure(
key=f"metabase-datasource-{datasource_id}",
reason=f"Unable to detect platform for database id {datasource_id}"
)
return None

query_type = card_details.get("dataset_query", {}).get("type", {})
source_tables = set()

Expand All @@ -463,8 +507,19 @@ def get_datasource_urn(self, card_details: dict) -> Optional[List]:
card_details.get("dataset_query", {})
.get("query", {})
.get("source-table")
or ""
)
if source_table_id is not None:
if str(source_table_id).startswith("card__"):
# question is built not directly from table in DB but from results of other question in Metabase
# trying to get source table from source question. Recursion depth is limited
return self.get_datasource_urn(
card_details=self.get_card_details_by_id(
source_table_id.replace("card__", "")
),
recursion_depth=recursion_depth + 1,
)
elif source_table_id != "":
# the question is built directly from table in DB
schema_name, table_name = self.get_source_table_from_id(source_table_id)
if table_name:
source_tables.add(
Expand Down Expand Up @@ -520,7 +575,9 @@ def get_datasource_urn(self, card_details: dict) -> Optional[List]:
return dataset_urn

@lru_cache(maxsize=None)
def get_source_table_from_id(self, table_id):
def get_source_table_from_id(
self, table_id: Union[int, str]
) -> Tuple[Optional[str], Optional[str]]:
try:
dataset_response = self.session.get(
f"{self.config.connect_uri}/api/table/{table_id}"
Expand All @@ -542,8 +599,8 @@ def get_source_table_from_id(self, table_id):

@lru_cache(maxsize=None)
def get_platform_instance(
self, platform: Union[str, None] = None, datasource_id: Union[int, None] = None
) -> Union[str, None]:
self, platform: Optional[str] = None, datasource_id: Optional[int] = None
) -> Optional[str]:
"""
Method will attempt to detect `platform_instance` by checking
`database_id_to_instance_map` and `platform_instance_map` mappings.
Expand Down Expand Up @@ -571,7 +628,9 @@ def get_platform_instance(
return platform_instance

@lru_cache(maxsize=None)
def get_datasource_from_id(self, datasource_id):
def get_datasource_from_id(
self, datasource_id: Union[int, str]
) -> Tuple[str, Optional[str], Optional[str], Optional[str]]:
try:
dataset_response = self.session.get(
f"{self.config.connect_uri}/api/database/{datasource_id}"
Expand All @@ -583,7 +642,9 @@ def get_datasource_from_id(self, datasource_id):
key=f"metabase-datasource-{datasource_id}",
reason=f"Unable to retrieve Datasource. " f"Reason: {str(http_error)}",
)
return None, None
# returning empty string as `platform` because
# `make_dataset_urn_with_platform_instance()` only accepts `str`
return "", None, None, None

# Map engine names to what datahub expects in
# https://github.com/datahub-project/datahub/blob/master/metadata-service/war/src/main/resources/boot/data_platforms.json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,61 @@
"runId": "metabase-test"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": {
"urn": "urn:li:chart:(metabase,3)",
"aspects": [
{
"com.linkedin.pegasus2avro.chart.ChartInfo": {
"customProperties": {
"Metrics": "Distinct values of order_number, Sum of nominal_total",
"Filters": "['time-interval', ['field', 'completed_at', {'base-type': 'type/DateTimeWithTZ'}], -8, 'day', {'include-current': False}]",
"Dimensions": "completed_at"
},
"title": "Question with data from other question",
"description": "",
"lastModified": {
"created": {
"time": 1685628119636,
"actor": "urn:li:corpuser:[email protected]"
},
"lastModified": {
"time": 1685628119636,
"actor": "urn:li:corpuser:[email protected]"
}
},
"chartUrl": "http://localhost:3000/card/3",
"inputs": [
{
"string": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-data.public.payment,PROD)"
}
],
"type": "TABLE"
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:[email protected]",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1636614000000,
"runId": "metabase-test"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": {
Expand Down Expand Up @@ -195,6 +250,21 @@
"runId": "metabase-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(metabase,3)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1636614000000,
"runId": "metabase-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(metabase,1)",
Expand Down
Loading

0 comments on commit 8d72dc6

Please sign in to comment.