Skip to content

Commit

Permalink
[Subhankar] add freshness metric
Browse files Browse the repository at this point in the history
  • Loading branch information
subhankarb committed Jul 23, 2023
1 parent f6aa840 commit 74fff5f
Show file tree
Hide file tree
Showing 15 changed files with 471 additions and 99 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ metrics:
metric_type: row_count
table: table_1
filter:
sql_query: "category = 'HAT' AND is_valid is True"
where_clause: "category = 'HAT' AND is_valid is True"
count_content_non_valid:
metric_type: row_count
table: table_1
filter:
sql_query: "is_valid is False"
where_clause: "is_valid is False"
```
19 changes: 10 additions & 9 deletions datachecks/core/configuration/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@

from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Dict, List, Optional

import yaml
from yaml import SafeLoader


class DatasourceType(Enum):
Expand Down Expand Up @@ -56,8 +55,8 @@ class MetricsFilterConfiguration:
Filter configuration for a metric
"""

sql_query: Optional[list]
search_query: Optional[list]
where_clause: Optional[str] = None
search_query: Optional[str] = None


@dataclass
Expand All @@ -70,7 +69,8 @@ class MetricConfiguration:
metric_type: str
index: Optional[str] = None
table: Optional[str] = None
filter: Optional[MetricsFilterConfiguration] = None
field: Optional[str] = None
filters: Optional[MetricsFilterConfiguration] = None


@dataclass
Expand Down Expand Up @@ -125,11 +125,12 @@ def load_configuration_from_yaml_str(yaml_string: str) -> Configuration:
metric_type=metric_value["metric_type"],
index=metric_value.get("index"),
table=metric_value.get("table"),
filter=MetricsFilterConfiguration(
sql_query=metric_value.get("filter", {}).get("sql_query", None),
search_query=metric_value.get("filter", {}).get(
field=metric_value.get("field"),
filters=MetricsFilterConfiguration(
where_clause=metric_value.get("filters", {}).get("where_clause", None),
search_query=metric_value.get("filters", {}).get(
"search_query", None
),
)
),
)
for metric_name, metric_value in metric_list.items()
Expand Down
18 changes: 14 additions & 4 deletions datachecks/core/datasource/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ def query_get_max(self, index_name: str, field: str, filters: str = None) -> int
"""
raise NotImplementedError("query_get_max method is not implemented")

def query_get_time_diff(self, index_name: str, field: str) -> int:
"""
Get the time difference
:param index_name: name of the index
:param field: field name
:param filters: optional filter
:return: time difference in milliseconds
"""
raise NotImplementedError("query_get_time_diff method is not implemented")


class SQLDatasource(DataSource):
"""
Expand Down Expand Up @@ -109,16 +119,16 @@ def query_get_row_count(self, table: str, filters: str = None) -> int:

return self.connection.execute(text(query)).fetchone()[0]

def query_get_max(self, table: str, field: str, filter: str = None) -> int:
def query_get_max(self, table: str, field: str, filters: str = None) -> int:
"""
Get the max value
:param table: table name
:param field: column name
:param filter: filter condition
:param filters: filter condition
:return:
"""
query = "SELECT MAX({}) FROM {}".format(field, table)
if filter:
query += " WHERE {}".format(filter)
if filters:
query += " WHERE {}".format(filters)

return self.connection.execute(text(query)).fetchone()[0]
4 changes: 2 additions & 2 deletions datachecks/core/datasource/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ def _initialize_data_sources(self):
:return:
"""
for data_source_config in self.data_source_configs:
self.data_sources[data_source_config.name] = self.create_data_source(
self.data_sources[data_source_config.name] = self._create_data_source(
data_source_config=data_source_config
)
self.data_sources[data_source_config.name].connect()

@staticmethod
def create_data_source(data_source_config: DataSourceConfiguration) -> DataSource:
def _create_data_source(data_source_config: DataSourceConfiguration) -> DataSource:
"""
Create a data source
:param data_source_config: data source configuration
Expand Down
23 changes: 22 additions & 1 deletion datachecks/core/datasource/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime
from typing import Dict

from dateutil import parser
from opensearchpy import OpenSearch

from datachecks.core.datasource.base import SearchIndexDataSource
Expand Down Expand Up @@ -83,3 +84,23 @@ def query_get_max(self, index_name: str, field: str, filters: Dict = None) -> in

response = self.client.search(index=index_name, body=query)
return response["aggregations"]["max_value"]["value"]

def query_get_time_diff(self, index_name: str, field: str) -> int:
"""
Get the time difference between the latest and the now
:param index_name:
:param field:
:return:
"""
query = {"query": {"match_all": {}}, "sort": [{f"{field}": {"order": "desc"}}]}

response = self.client.search(index=index_name, body=query)

if response["hits"]["hits"]:
last_updated = response["hits"]["hits"][0]["_source"][field]

last_updated = parser.parse(timestr=last_updated).timestamp()
now = datetime.utcnow().timestamp()
return int(now - last_updated)

return 0
24 changes: 11 additions & 13 deletions datachecks/core/metric/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class MetricsType(str, Enum):
ROW_COUNT = "row_count"
DOCUMENT_COUNT = "document_count"
MAX = "max"
FRESHNESS = "freshness"


class MetricIdentity:
Expand All @@ -45,7 +46,7 @@ def generate_identity(
identifiers.append(metric_name)
if index_name:
identifiers.append(index_name)
if table_name:
elif table_name:
identifiers.append(table_name)
if field_name:
identifiers.append(field_name)
Expand Down Expand Up @@ -74,6 +75,7 @@ def __init__(
if index_name is None and table_name is None:
raise ValueError("Please give a value for table_name or index_name")

self.index_name, self.table_name = None, None
if index_name:
self.index_name = index_name
if table_name:
Expand All @@ -84,12 +86,17 @@ def __init__(
self.metric_type = metric_type
self.filter_query = None
if filters is not None:
if "search_query" in filters and "sql_query" in filters:
if (
"search_query" in filters and filters["search_query"] is not None
) and (
"where_clause" in filters and filters["where_clause"] is not None
):
raise ValueError(
"Please give a value for search_query or sql_query (but not both)"
"Please give a value for search_query or where_clause (but not both)"
)

if "search_query" in filters:
if "search_query" in filters and filters["search_query"] is not None:
print(filters)
self.filter_query = json.loads(filters["search_query"])
elif "where_clause" in filters:
self.filter_query = filters["where_clause"]
Expand Down Expand Up @@ -147,15 +154,6 @@ def __init__(

self.field_name = field_name

def get_metric_identity(self):
return MetricIdentity.generate_identity(
metric_type=MetricsType.DOCUMENT_COUNT,
metric_name=self.name,
data_source=self.data_source,
table_name=self.table_name,
field_name=self.field_name,
)

@property
def get_field_name(self):
return self.field_name
46 changes: 46 additions & 0 deletions datachecks/core/metric/freshness_metric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2022-present, the Waterdip Labs Pvt. Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datachecks.core.datasource.base import (SearchIndexDataSource,
SQLDatasource)
from datachecks.core.metric.base import (FieldMetrics, MetricIdentity,
MetricsType)


class FreshnessValueMetric(FieldMetrics):
"""
FreshnessMetric is a class that represents a metric that is generated by a data source.
"""

def get_metric_identity(self):
return MetricIdentity.generate_identity(
metric_type=MetricsType.FRESHNESS,
metric_name=self.name,
data_source=self.data_source,
field_name=self.field_name,
)

def _generate_metric_value(self):
if isinstance(self.data_source, SQLDatasource):
return self.data_source.query_get_max(
table=self.table_name,
field=self.field_name,
filters=self.filter_query if self.filter_query else None,
)
elif isinstance(self.data_source, SearchIndexDataSource):
return self.data_source.query_get_time_diff(
index_name=self.index_name, field=self.field_name
)
else:
raise ValueError("Invalid data source type")
27 changes: 22 additions & 5 deletions datachecks/core/metric/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from datachecks.core.datasource.manager import DataSourceManager
from datachecks.core.metric.base import MetricsType
from datachecks.core.metric.numeric_metric import (DocumentCountMetric,
RowCountMetric)
MaxMetric, RowCountMetric)


class MetricManager:
Expand All @@ -41,20 +41,37 @@ def _build_metrics(self, config: Dict[str, List[MetricConfiguration]]):
data_source=self.data_source_manager.get_data_source(
data_source
),
filter=asdict(metric_config.filter),
filters=asdict(metric_config.filters)
if metric_config.filters
else None,
index_name=metric_config.index,
metric_type=MetricsType.DOCUMENT_COUNT,
)
self.metrics[metric.metric_identity] = metric
self.metrics[metric.get_metric_identity()] = metric
elif metric_config.metric_type == MetricsType.ROW_COUNT:
metric = RowCountMetric(
name=metric_config.name,
data_source=self.data_source_manager.get_data_source(
data_source
),
filter=asdict(metric_config.filter),
filters=asdict(metric_config.filters) if metric_config.filters else None,
table_name=metric_config.table,
metric_type=MetricsType.ROW_COUNT,
)
self.metrics[metric.metric_identity] = metric
self.metrics[metric.get_metric_identity()] = metric
elif metric_config.metric_type == MetricsType.MAX:
metric = MaxMetric(
name=metric_config.name,
data_source=self.data_source_manager.get_data_source(
data_source
),
filters=asdict(metric_config.filters) if metric_config.filters else None,
table_name=metric_config.table,
index_name=metric_config.index,
metric_type=MetricsType.MAX,
field_name=metric_config.field,
)
self.metrics[metric.get_metric_identity()] = metric
else:
raise ValueError("Invalid metric type")

Expand Down
12 changes: 11 additions & 1 deletion datachecks/core/metric/numeric_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ class DocumentCountMetric(Metric):
def validate_data_source(self):
return isinstance(self.data_source, SearchIndexDataSource)

def get_metric_identity(self):
return MetricIdentity.generate_identity(
metric_type=MetricsType.DOCUMENT_COUNT,
metric_name=self.name,
data_source=self.data_source,
index_name=self.index_name,
)

def _generate_metric_value(self):
if isinstance(self.data_source, SearchIndexDataSource):
return self.data_source.query_get_document_count(
Expand Down Expand Up @@ -77,14 +85,16 @@ def get_metric_identity(self):
metric_name=self.name,
data_source=self.data_source,
field_name=self.field_name,
table_name=self.table_name if self.table_name else None,
index_name=self.index_name if self.index_name else None,
)

def _generate_metric_value(self):
if isinstance(self.data_source, SQLDatasource):
return self.data_source.query_get_max(
table=self.table_name,
field=self.field_name,
filter=self.filter_query if self.filter_query else None,
filters=self.filter_query if self.filter_query else None,
)
elif isinstance(self.data_source, SearchIndexDataSource):
return self.data_source.query_get_max(
Expand Down
Loading

0 comments on commit 74fff5f

Please sign in to comment.