Skip to content

Commit

Permalink
Merge pull request #2 from waterdipai/test_update
Browse files Browse the repository at this point in the history
[Subhankar] add pytest for configuration
  • Loading branch information
subhankarb authored Jul 22, 2023
2 parents b0dd689 + 7ae0310 commit 47db070
Show file tree
Hide file tree
Showing 28 changed files with 811 additions and 283 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
run: |
source .venv/bin/activate
pytest --cov=datachecks --cov-report=xml -p no:warnings ./tests/*
#-----------------------------------------
# Run Pytest
#---------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ datachecks inspect -C config.yaml

### Data Source Configuration

Declare the data sources in the `data_sources` section of the config file.
Declare the data sources in the `data_sources` section of the config file.
The data sources can be of type `postgres` or `opensearch`.
```yaml
data_sources:
Expand All @@ -67,7 +67,7 @@ data_sources:
### Metric Configuration
Metrics are defined in the `metrics` section of the config file.
Metrics are defined in the `metrics` section of the config file.

```yaml
metrics:
Expand Down
3 changes: 2 additions & 1 deletion datachecks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from datachecks.core.configuration.configuration import (Configuration,
load_configuration)
from datachecks.core.inspect import Inspect
from datachecks.core.configuration.configuration import Configuration, load_configuration
1 change: 0 additions & 1 deletion datachecks/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,5 @@
# limitations under the License.
from datachecks.cli.cli import main


if __name__ == "__main__":
main()
13 changes: 13 additions & 0 deletions datachecks/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2022-present, the Waterdip Labs Pvt. Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15 changes: 9 additions & 6 deletions datachecks/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
import click

from datachecks.__version__ import __version__
from datachecks.core.configuration.configuration import Configuration, load_configuration
from datachecks.core.configuration.configuration import (Configuration,
load_configuration)
from datachecks.core.datasource.data_source import DataSourceManager
from datachecks.core.metric.metric import MetricManager

Expand Down Expand Up @@ -44,17 +45,19 @@ def inspect(

data_source_manager = DataSourceManager(configuration.data_sources)
for data_source_name in data_source_manager.get_data_source_names():
data_source = data_source_manager.get_data_source(data_source_name=data_source_name)
print(f"Data source: {data_source.data_source_name} is {data_source.is_connected()}")
data_source = data_source_manager.get_data_source(
data_source_name=data_source_name
)
print(
f"Data source: {data_source.data_source_name} is {data_source.is_connected()}"
)
metric_manager = MetricManager(
metric_config=configuration.metrics,
data_source_manager=data_source_manager
metric_config=configuration.metrics, data_source_manager=data_source_manager
)
for metric_name, metric in metric_manager.metrics.items():
metric_value = metric.get_value()
print(f"{metric_name} : {metric_value}")


"""
1. Read config
2. Create data sources
Expand Down
2 changes: 1 addition & 1 deletion datachecks/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# limitations under the License.
94 changes: 59 additions & 35 deletions datachecks/core/configuration/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,24 @@
# limitations under the License.

from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from enum import Enum
from typing import Any, Dict, List, Optional

import yaml
from yaml import SafeLoader


class DatasourceType(Enum):
OPENSEARCH = "opensearch"
POSTGRES = "postgres"


@dataclass
class DataSourceConnectionConfiguration:
"""
Connection configuration for a data source
"""

host: str
port: int
username: Optional[str]
Expand All @@ -37,8 +44,9 @@ class DataSourceConfiguration:
"""
Data source configuration
"""

name: str
type: str
type: DatasourceType
connection_config: DataSourceConnectionConfiguration


Expand All @@ -47,6 +55,7 @@ class MetricsFilterConfiguration:
"""
Filter configuration for a metric
"""

sql_query: Optional[list]
search_query: Optional[list]

Expand All @@ -56,6 +65,7 @@ class MetricConfiguration:
"""
Metric configuration
"""

name: str
metric_type: str
index: Optional[str] = None
Expand All @@ -68,51 +78,65 @@ class Configuration:
"""
Configuration for the data checks
"""

data_sources: List[DataSourceConfiguration]
metrics: Dict[str, List[MetricConfiguration]]


def load_configuration(file_path: str) -> Configuration:
"""
Load the configuration from a YAML file
Load configuration from a yaml file
:param file_path:
:return:
"""
with open(file_path) as config_yaml_file:
yaml_string = config_yaml_file.read()
config_dict: Dict = yaml.safe_load(yaml_string)

data_source_configurations = [
DataSourceConfiguration(
name=data_source["name"],
type=data_source["type"],
connection_config=DataSourceConnectionConfiguration(
host=data_source["connection"]["host"],
port=data_source["connection"]["port"],
username=data_source["connection"].get("username"),
password=data_source["connection"].get("password"),
database=data_source["connection"].get("database"),
schema=data_source["connection"].get("schema"),

return load_configuration_from_yaml_str(yaml_string)


def load_configuration_from_yaml_str(yaml_string: str) -> Configuration:
"""
Load configuration from a yaml string
"""

config_dict: Dict = yaml.safe_load(yaml_string)

data_source_configurations = [
DataSourceConfiguration(
name=data_source["name"],
type=DatasourceType(data_source["type"]),
connection_config=DataSourceConnectionConfiguration(
host=data_source["connection"]["host"],
port=data_source["connection"]["port"],
username=data_source["connection"].get("username"),
password=data_source["connection"].get("password"),
database=data_source["connection"].get("database"),
schema=data_source["connection"].get("schema"),
),
)
for data_source in config_dict["data_sources"]
]

metric_configurations = {
data_source_name: [
MetricConfiguration(
name=metric_name,
metric_type=metric_value["metric_type"],
index=metric_value.get("index"),
table=metric_value.get("table"),
filter=MetricsFilterConfiguration(
sql_query=metric_value.get("filter", {}).get("sql_query", None),
search_query=metric_value.get("filter", {}).get(
"search_query", None
),
),
)
for data_source in config_dict["data_sources"]
for metric_name, metric_value in metric_list.items()
]
for data_source_name, metric_list in config_dict["metrics"].items()
}

metric_configurations = {
data_source_name: [
MetricConfiguration(
name=metric_name,
metric_type=metric_value["metric_type"],
index=metric_value.get("index"),
table=metric_value.get("table"),
filter=MetricsFilterConfiguration(
sql_query=metric_value.get("filter", {}).get("sql_query", None),
search_query=metric_value.get("filter", {}).get("search_query", None)
)
)
for metric_name, metric_value in metric_list.items()
]
for data_source_name, metric_list in config_dict["metrics"].items()
}

return Configuration(data_sources=data_source_configurations, metrics=metric_configurations)
return Configuration(
data_sources=data_source_configurations, metrics=metric_configurations
)
2 changes: 1 addition & 1 deletion datachecks/core/datasource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# limitations under the License.
59 changes: 35 additions & 24 deletions datachecks/core/datasource/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from dataclasses import asdict
from enum import Enum
from abc import ABC
from sqlite3 import Connection
from typing import Dict, Any, List, Union
from typing import Any, Dict, Union

from opensearchpy import OpenSearch
from sqlalchemy import URL, create_engine, text
from sqlalchemy import text


class DataSource(ABC):
"""
Abstract class for data sources
"""
def __init__(
self,
data_source_name: str,
data_connection: Dict
):

def __init__(self, data_source_name: str, data_connection: Dict):
self.data_source_name: str = data_source_name
self.data_connection: Dict = data_connection

Expand All @@ -51,11 +45,8 @@ class SearchIndexDataSource(DataSource):
"""
Abstract class for search index data sources
"""
def __init__(
self,
data_source_name: str,
data_connection: Dict
):

def __init__(self, data_source_name: str, data_connection: Dict):
super().__init__(data_source_name, data_connection)

self.client = None
Expand All @@ -69,16 +60,23 @@ def query_get_document_count(self, index_name: str, filter: str = None) -> int:
"""
raise NotImplementedError("query_get_document_count method is not implemented")

def query_get_max(self, index_name: str, field: str, filter: str = None) -> int:
"""
Get the max value
:param index_name: name of the index
:param field: field name
:param filter: optional filter
:return: max value
"""
raise NotImplementedError("query_get_max method is not implemented")


class SQLDatasource(DataSource):
"""
Abstract class for SQL data sources
"""
def __init__(
self,
data_source_name: str,
data_source_properties: Dict
):

def __init__(self, data_source_name: str, data_source_properties: Dict):
super().__init__(data_source_name, data_source_properties)

self.connection: Union[Connection, None] = None
Expand All @@ -91,13 +89,26 @@ def is_connected(self) -> bool:

def query_get_row_count(self, table: str, filter: str = None) -> int:
"""
Get the document count
:param index_name: name of the index
Get the row count
:param table: name of the table
:param filter: optional filter
:return: count of documents
"""
query = "SELECT COUNT(*) FROM {}".format(table)
if filter:
query += " WHERE {}".format(filter)

return self.connection.execute(text(query)).fetchone()[0]

def query_get_max(self, table: str, field: str, filter: str = None) -> int:
"""
Get the max value
:param table: table name
:param field: column name
:param filter: filter condition
:return:
"""
query = "SELECT MAX({}) FROM {}".format(field, table)
if filter:
query += " WHERE {}".format(filter)

return self.connection.execute(text(query)).fetchone()[0]
Loading

0 comments on commit 47db070

Please sign in to comment.