Commit

Merge branch 'master' into pluginv2

vishalkSimplify authored Jul 24, 2023
2 parents db8d7bd + c0dbea8 commit 1e1d116
Showing 4 changed files with 203 additions and 44 deletions.
75 changes: 38 additions & 37 deletions metadata-ingestion/src/datahub/ingestion/source/elastic_search.py
@@ -343,7 +343,7 @@ def __init__(self, config: ElasticsearchSourceConfig, ctx: PipelineContext):
         self.report = ElasticsearchSourceReport()
         self.data_stream_partition_count: Dict[str, int] = defaultdict(int)
         self.platform: str = "elasticsearch"
-        self.profiling_info: Dict[str, DatasetProfileClass] = {}
+        self.cat_response: Optional[List[Dict[str, Any]]] = None

     @classmethod
     def create(
@@ -357,7 +357,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         indices = self.client.indices.get_alias()
-
         for index in indices:
             self.report.report_index_scanned(index)

@@ -366,12 +365,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 yield mcp.as_workunit()
             else:
                 self.report.report_dropped(index)
-        for urn, profiling_info in self.profiling_info.items():
-            yield MetadataChangeProposalWrapper(
-                entityUrn=urn,
-                aspect=profiling_info,
-            ).as_workunit()
-        self.profiling_info = {}

         for mcp in self._get_data_stream_index_count_mcps():
             yield mcp.as_workunit()
@@ -523,36 +516,44 @@ def _extract_mcps(
             )

         if self.source_config.profiling.enabled:
-            cat_response = self.client.cat.indices(
-                index=index, params={"format": "json", "bytes": "b"}
-            )
-            if len(cat_response) == 1:
-                index_res = cat_response[0]
-                docs_count = int(index_res["docs.count"])
-                size = int(index_res["store.size"])
-                if len(self.source_config.collapse_urns.urns_suffix_regex) > 0:
-                    if dataset_urn not in self.profiling_info:
-                        self.profiling_info[dataset_urn] = DatasetProfileClass(
-                            timestampMillis=int(time.time() * 1000),
-                            rowCount=docs_count,
-                            columnCount=len(schema_fields),
-                            sizeInBytes=size,
-                        )
-                    else:
-                        existing_profile = self.profiling_info[dataset_urn]
-                        if existing_profile.rowCount is not None:
-                            docs_count = docs_count + existing_profile.rowCount
-                        if existing_profile.sizeInBytes is not None:
-                            size = size + existing_profile.sizeInBytes
-                        self.profiling_info[dataset_urn] = DatasetProfileClass(
-                            timestampMillis=int(time.time() * 1000),
-                            rowCount=docs_count,
-                            columnCount=len(schema_fields),
-                            sizeInBytes=size,
-                        )
-            else:
-                logger.warning(
-                    "Unexpected response from cat response with multiple rows"
-                )
+            if self.cat_response is None:
+                self.cat_response = self.client.cat.indices(
+                    params={
+                        "format": "json",
+                        "bytes": "b",
+                        "h": "index,docs.count,store.size",
+                    }
+                )
+                if self.cat_response is None:
+                    return
+                for item in self.cat_response:
+                    item["index"] = collapse_name(
+                        name=item["index"],
+                        collapse_urns=self.source_config.collapse_urns,
+                    )
+
+            profile_info_current = list(
+                filter(lambda x: x["index"] == collapsed_index_name, self.cat_response)
+            )
+            if len(profile_info_current) > 0:
+                self.cat_response = list(
+                    filter(
+                        lambda x: x["index"] != collapsed_index_name, self.cat_response
+                    )
+                )
+                row_count = 0
+                size_in_bytes = 0
+                for profile_info in profile_info_current:
+                    row_count += int(profile_info["docs.count"])
+                    size_in_bytes += int(profile_info["store.size"])
+                yield MetadataChangeProposalWrapper(
+                    entityUrn=dataset_urn,
+                    aspect=DatasetProfileClass(
+                        timestampMillis=int(time.time() * 1000),
+                        rowCount=row_count,
+                        columnCount=len(schema_fields),
+                        sizeInBytes=size_in_bytes,
+                    ),
+                )

     def get_report(self):
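The rewrite above replaces the old per-index profiling path, which called cat.indices(index=...) once per index and accumulated partial profiles in self.profiling_info, with a single cached /_cat/indices fetch: index names in the cached response are collapsed once up front, and each dataset then sums docs.count and store.size across its matching rows and emits the profile immediately. A minimal, self-contained sketch of that aggregation step (the collapse_name suffix regex and the stubbed cat response below are illustrative assumptions, not the source's actual config):

import re
from typing import Any, Dict, List

def collapse_name(name: str, suffix_regex: str = r"-\d+$") -> str:
    # Hypothetical stand-in for the source's collapse_name helper:
    # strips a rolling-index suffix such as "-000001".
    return re.sub(suffix_regex, "", name)

def aggregate_profile(
    cat_response: List[Dict[str, Any]], collapsed_index_name: str
) -> Dict[str, int]:
    # Mirrors the new logic: select every row whose collapsed name
    # matches, then sum document counts and store sizes.
    matching = [r for r in cat_response if r["index"] == collapsed_index_name]
    return {
        "rowCount": sum(int(r["docs.count"]) for r in matching),
        "sizeInBytes": sum(int(r["store.size"]) for r in matching),
    }

# Stubbed /_cat/indices?format=json&bytes=b&h=index,docs.count,store.size output.
cat_response = [
    {"index": "logs-000001", "docs.count": "10", "store.size": "2048"},
    {"index": "logs-000002", "docs.count": "5", "store.size": "1024"},
]
for row in cat_response:
    row["index"] = collapse_name(row["index"])

print(aggregate_profile(cat_response, "logs"))
# {'rowCount': 15, 'sizeInBytes': 3072}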
@@ -360,18 +360,22 @@ def _get_column_cardinality(
     @_run_with_query_combiner
     def _get_dataset_rows(self, dataset_profile: DatasetProfileClass) -> None:
         if self.config.profile_table_row_count_estimate_only:
-            schema_name = self.dataset_name.split(".")[1]
-            table_name = self.dataset_name.split(".")[2]
-            logger.debug(
-                f"Getting estimated rowcounts for table:{self.dataset_name}, schema:{schema_name}, table:{table_name}"
-            )
-
             dialect_name = self.dataset.engine.dialect.name.lower()
             if dialect_name == "postgresql":
+                schema_name = self.dataset_name.split(".")[1]
+                table_name = self.dataset_name.split(".")[2]
+                logger.debug(
+                    f"Getting estimated rowcounts for table:{self.dataset_name}, schema:{schema_name}, table:{table_name}"
+                )
                 get_estimate_script = sa.text(
                     f"SELECT c.reltuples AS estimate FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relname = '{table_name}' AND n.nspname = '{schema_name}'"
                 )
             elif dialect_name == "mysql":
+                schema_name = self.dataset_name.split(".")[0]
+                table_name = self.dataset_name.split(".")[1]
+                logger.debug(
+                    f"Getting estimated rowcounts for table:{self.dataset_name}, schema:{schema_name}, table:{table_name}"
+                )
                 get_estimate_script = sa.text(
                     f"SELECT table_rows AS estimate FROM information_schema.tables WHERE table_schema = '{schema_name}' AND table_name = '{table_name}'"
                 )
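The row-estimate fix above moves schema/table parsing inside each dialect branch because dataset names differ in shape: Postgres names are database.schema.table (schema at index 1, table at index 2), while MySQL names are schema.table (indices 0 and 1). A rough standalone sketch of the same dispatch follows; unlike the diff, which interpolates names into the SQL via f-strings, this version binds them as parameters (a deliberate deviation, noted for clarity):

import sqlalchemy as sa

def estimated_row_count(engine: sa.engine.Engine, dataset_name: str) -> int:
    # Dispatch on dialect, mirroring the diff: MySQL names are
    # "schema.table", Postgres names are "database.schema.table".
    dialect = engine.dialect.name.lower()
    if dialect == "postgresql":
        schema_name, table_name = dataset_name.split(".")[1:3]
        query = sa.text(
            "SELECT c.reltuples AS estimate FROM pg_class c "
            "JOIN pg_namespace n ON n.oid = c.relnamespace "
            "WHERE c.relname = :table AND n.nspname = :schema"
        )
    elif dialect == "mysql":
        schema_name, table_name = dataset_name.split(".")[0:2]
        query = sa.text(
            "SELECT table_rows AS estimate FROM information_schema.tables "
            "WHERE table_schema = :schema AND table_name = :table"
        )
    else:
        raise ValueError(f"no estimate query for dialect {dialect}")
    with engine.connect() as conn:
        row = conn.execute(query, {"schema": schema_name, "table": table_name}).first()
        return int(row.estimate) if row else 0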
@@ -400,5 +400,159 @@
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProfile",
"aspect": {
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"rowCount": 0,
"columnCount": 6,
"fieldProfiles": [
{
"fieldPath": "id",
"uniqueCount": 5,
"uniqueProportion": 1,
"nullCount": 0,
"min": "1",
"max": "5",
"mean": "3.0",
"median": "3",
"stdev": "1.5811388300841898",
"sampleValues": [
"1",
"2",
"3",
"4",
"5"
]
},
{
"fieldPath": "company",
"uniqueCount": 5,
"uniqueProportion": 1,
"nullCount": 0,
"sampleValues": [
"Company A",
"Company B",
"Company C",
"Company D",
"Company E"
]
},
{
"fieldPath": "last_name",
"uniqueCount": 5,
"uniqueProportion": 1,
"nullCount": 0,
"sampleValues": [
"Axen",
"Bedecs",
"Donnell",
"Gratacos Solsona",
"Lee"
]
},
{
"fieldPath": "first_name",
"uniqueCount": 5,
"uniqueProportion": 1,
"nullCount": 0,
"sampleValues": [
"Anna",
"Antonio",
"Christina",
"Martin",
"Thomas"
]
},
{
"fieldPath": "email_address",
"uniqueCount": 0,
"nullCount": 0,
"sampleValues": []
},
{
"fieldPath": "priority",
"uniqueCount": 3,
"uniqueProportion": 0.75,
"nullCount": 0,
"min": "3.8",
"max": "4.9",
"mean": "4.175000011920929",
"median": "4.0",
"stdev": "0.49244294899530355",
"sampleValues": [
"4.0",
"4.9",
"4.0",
"3.8"
]
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProfile",
"aspect": {
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"rowCount": 0,
"columnCount": 3,
"fieldProfiles": [
{
"fieldPath": "id",
"uniqueCount": 0,
"nullCount": 0,
"min": "None",
"max": "None",
"mean": "None",
"median": "None",
"stdev": "0.0",
"sampleValues": []
},
{
"fieldPath": "description",
"uniqueCount": 0,
"nullCount": 0,
"sampleValues": []
},
{
"fieldPath": "customer_id",
"uniqueCount": 0,
"nullCount": 0,
"min": "None",
"max": "None",
"mean": "None",
"median": "None",
"stdev": "0.0",
"sampleValues": []
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-2020_04_14-07_00_00"
}
}
]
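The two new golden entries above are datasetProfile aspects for northwind.customers and northwind.orders. For reference, the same aspect shape can be built with DataHub's Python classes; this is a hedged sketch from memory of the datahub package's emitter API, not code from this commit:

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
    DatasetFieldProfileClass,
    DatasetProfileClass,
)

# Build the same shape of datasetProfile aspect as the golden file above
# (only the "id" field profile is shown, for brevity).
profile = DatasetProfileClass(
    timestampMillis=1586847600000,
    rowCount=0,
    columnCount=6,
    fieldProfiles=[
        DatasetFieldProfileClass(
            fieldPath="id",
            uniqueCount=5,
            uniqueProportion=1.0,
            nullCount=0,
            min="1",
            max="5",
            sampleValues=["1", "2", "3", "4", "5"],
        )
    ],
)

mcp = MetadataChangeProposalWrapper(
    entityUrn=make_dataset_urn("mysql", "northwind.customers", "PROD"),
    aspect=profile,
)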
@@ -427,7 +427,7 @@
     }
 },
 {
-    "urn": "urn:li:dataPlatform:presto_on_hive",
+    "urn": "urn:li:dataPlatform:presto-on-hive",
     "aspect": {
         "datasetNameDelimiter": ".",
         "name": "presto-on-hive",
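This last hunk fixes the platform urn to use the hyphenated platform id, matching the "name": "presto-on-hive" field beside it; "presto_on_hive" would not resolve to the registered platform. Assuming DataHub's mce_builder helper behaves as documented, the canonical form can be produced like this:

from datahub.emitter.mce_builder import make_data_platform_urn

# Normalizes a platform id into its urn form.
print(make_data_platform_urn("presto-on-hive"))
# urn:li:dataPlatform:presto-on-hive

# An already-formed urn is passed through unchanged.
print(make_data_platform_urn("urn:li:dataPlatform:presto-on-hive"))
# urn:li:dataPlatform:presto-on-hive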
