diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0f9efb98..826433a3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -63,7 +63,7 @@ jobs: run: | source .venv/bin/activate pytest --cov=datachecks --cov-report=xml -p no:warnings ./tests/* - + #----------------------------------------- # Run Pytest #--------------------------------------- diff --git a/README.md b/README.md index 70519e84..91dac241 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ datachecks inspect -C config.yaml ### Data Source Configuration -Declare the data sources in the `data_sources` section of the config file. +Declare the data sources in the `data_sources` section of the config file. The data sources can be of type `postgres` or `opensearch`. ```yaml data_sources: @@ -67,7 +67,7 @@ data_sources: ### Metric Configuration -Metrics are defined in the `metrics` section of the config file. +Metrics are defined in the `metrics` section of the config file. ```yaml metrics: diff --git a/datachecks/__init__.py b/datachecks/__init__.py index 8f6a6824..1cdc2a1d 100644 --- a/datachecks/__init__.py +++ b/datachecks/__init__.py @@ -12,5 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datachecks.core.configuration.configuration import (Configuration, + load_configuration) from datachecks.core.inspect import Inspect -from datachecks.core.configuration.configuration import Configuration, load_configuration diff --git a/datachecks/__main__.py b/datachecks/__main__.py index c3c52b05..96ae880d 100644 --- a/datachecks/__main__.py +++ b/datachecks/__main__.py @@ -13,6 +13,5 @@ # limitations under the License. from datachecks.cli.cli import main - if __name__ == "__main__": main() diff --git a/datachecks/cli/__init__.py b/datachecks/cli/__init__.py index e69de29b..95dc1b8e 100644 --- a/datachecks/cli/__init__.py +++ b/datachecks/cli/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/datachecks/cli/cli.py b/datachecks/cli/cli.py index 9980d70c..1ee97f53 100644 --- a/datachecks/cli/cli.py +++ b/datachecks/cli/cli.py @@ -16,7 +16,8 @@ import click from datachecks.__version__ import __version__ -from datachecks.core.configuration.configuration import Configuration, load_configuration +from datachecks.core.configuration.configuration import (Configuration, + load_configuration) from datachecks.core.datasource.data_source import DataSourceManager from datachecks.core.metric.metric import MetricManager @@ -44,17 +45,19 @@ def inspect( data_source_manager = DataSourceManager(configuration.data_sources) for data_source_name in data_source_manager.get_data_source_names(): - data_source = data_source_manager.get_data_source(data_source_name=data_source_name) - print(f"Data source: {data_source.data_source_name} is {data_source.is_connected()}") + data_source = data_source_manager.get_data_source( + data_source_name=data_source_name + ) + print( + f"Data source: {data_source.data_source_name} is {data_source.is_connected()}" + ) metric_manager = MetricManager( - metric_config=configuration.metrics, - data_source_manager=data_source_manager + metric_config=configuration.metrics, data_source_manager=data_source_manager ) for metric_name, metric in metric_manager.metrics.items(): metric_value = metric.get_value() print(f"{metric_name} : {metric_value}") - """ 1. Read config 2. Create data sources diff --git a/datachecks/core/__init__.py b/datachecks/core/__init__.py index 55403345..95dc1b8e 100644 --- a/datachecks/core/__init__.py +++ b/datachecks/core/__init__.py @@ -10,4 +10,4 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file +# limitations under the License. diff --git a/datachecks/core/configuration/configuration.py b/datachecks/core/configuration/configuration.py index ec7481c8..7e2b7779 100644 --- a/datachecks/core/configuration/configuration.py +++ b/datachecks/core/configuration/configuration.py @@ -13,17 +13,24 @@ # limitations under the License. from dataclasses import dataclass -from typing import Optional, List, Dict, Any +from enum import Enum +from typing import Any, Dict, List, Optional import yaml from yaml import SafeLoader +class DatasourceType(Enum): + OPENSEARCH = "opensearch" + POSTGRES = "postgres" + + @dataclass class DataSourceConnectionConfiguration: """ Connection configuration for a data source """ + host: str port: int username: Optional[str] @@ -37,8 +44,9 @@ class DataSourceConfiguration: """ Data source configuration """ + name: str - type: str + type: DatasourceType connection_config: DataSourceConnectionConfiguration @@ -47,6 +55,7 @@ class MetricsFilterConfiguration: """ Filter configuration for a metric """ + sql_query: Optional[list] search_query: Optional[list] @@ -56,6 +65,7 @@ class MetricConfiguration: """ Metric configuration """ + name: str metric_type: str index: Optional[str] = None @@ -68,51 +78,65 @@ class Configuration: """ Configuration for the data checks """ + data_sources: List[DataSourceConfiguration] metrics: Dict[str, List[MetricConfiguration]] def load_configuration(file_path: str) -> Configuration: """ - Load the configuration from a YAML file + Load configuration from a yaml file :param file_path: :return: """ with open(file_path) as config_yaml_file: yaml_string = config_yaml_file.read() - config_dict: Dict = yaml.safe_load(yaml_string) - - data_source_configurations = [ - DataSourceConfiguration( - name=data_source["name"], - type=data_source["type"], - connection_config=DataSourceConnectionConfiguration( - host=data_source["connection"]["host"], - port=data_source["connection"]["port"], - username=data_source["connection"].get("username"), - password=data_source["connection"].get("password"), - database=data_source["connection"].get("database"), - schema=data_source["connection"].get("schema"), + + return load_configuration_from_yaml_str(yaml_string) + + +def load_configuration_from_yaml_str(yaml_string: str) -> Configuration: + """ + Load configuration from a yaml string + """ + + config_dict: Dict = yaml.safe_load(yaml_string) + + data_source_configurations = [ + DataSourceConfiguration( + name=data_source["name"], + type=DatasourceType(data_source["type"]), + connection_config=DataSourceConnectionConfiguration( + host=data_source["connection"]["host"], + port=data_source["connection"]["port"], + username=data_source["connection"].get("username"), + password=data_source["connection"].get("password"), + database=data_source["connection"].get("database"), + schema=data_source["connection"].get("schema"), + ), + ) + for data_source in config_dict["data_sources"] + ] + + metric_configurations = { + data_source_name: [ + MetricConfiguration( + name=metric_name, + metric_type=metric_value["metric_type"], + index=metric_value.get("index"), + table=metric_value.get("table"), + filter=MetricsFilterConfiguration( + sql_query=metric_value.get("filter", {}).get("sql_query", None), + search_query=metric_value.get("filter", {}).get( + "search_query", None + ), ), ) - for data_source in config_dict["data_sources"] + for metric_name, metric_value in metric_list.items() ] + for data_source_name, metric_list in config_dict["metrics"].items() + } - metric_configurations = { - data_source_name: [ - MetricConfiguration( - name=metric_name, - metric_type=metric_value["metric_type"], - index=metric_value.get("index"), - table=metric_value.get("table"), - filter=MetricsFilterConfiguration( - sql_query=metric_value.get("filter", {}).get("sql_query", None), - search_query=metric_value.get("filter", {}).get("search_query", None) - ) - ) - for metric_name, metric_value in metric_list.items() - ] - for data_source_name, metric_list in config_dict["metrics"].items() - } - - return Configuration(data_sources=data_source_configurations, metrics=metric_configurations) + return Configuration( + data_sources=data_source_configurations, metrics=metric_configurations + ) diff --git a/datachecks/core/datasource/__init__.py b/datachecks/core/datasource/__init__.py index 55403345..95dc1b8e 100644 --- a/datachecks/core/datasource/__init__.py +++ b/datachecks/core/datasource/__init__.py @@ -10,4 +10,4 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file +# limitations under the License. diff --git a/datachecks/core/datasource/base.py b/datachecks/core/datasource/base.py index c7b5935a..13e091d4 100644 --- a/datachecks/core/datasource/base.py +++ b/datachecks/core/datasource/base.py @@ -12,25 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import ABC, abstractmethod -from dataclasses import asdict -from enum import Enum +from abc import ABC from sqlite3 import Connection -from typing import Dict, Any, List, Union +from typing import Any, Dict, Union -from opensearchpy import OpenSearch -from sqlalchemy import URL, create_engine, text +from sqlalchemy import text class DataSource(ABC): """ Abstract class for data sources """ - def __init__( - self, - data_source_name: str, - data_connection: Dict - ): + + def __init__(self, data_source_name: str, data_connection: Dict): self.data_source_name: str = data_source_name self.data_connection: Dict = data_connection @@ -51,11 +45,8 @@ class SearchIndexDataSource(DataSource): """ Abstract class for search index data sources """ - def __init__( - self, - data_source_name: str, - data_connection: Dict - ): + + def __init__(self, data_source_name: str, data_connection: Dict): super().__init__(data_source_name, data_connection) self.client = None @@ -69,16 +60,23 @@ def query_get_document_count(self, index_name: str, filter: str = None) -> int: """ raise NotImplementedError("query_get_document_count method is not implemented") + def query_get_max(self, index_name: str, field: str, filter: str = None) -> int: + """ + Get the max value + :param index_name: name of the index + :param field: field name + :param filter: optional filter + :return: max value + """ + raise NotImplementedError("query_get_max method is not implemented") + class SQLDatasource(DataSource): """ Abstract class for SQL data sources """ - def __init__( - self, - data_source_name: str, - data_source_properties: Dict - ): + + def __init__(self, data_source_name: str, data_source_properties: Dict): super().__init__(data_source_name, data_source_properties) self.connection: Union[Connection, None] = None @@ -91,13 +89,26 @@ def is_connected(self) -> bool: def query_get_row_count(self, table: str, filter: str = None) -> int: """ - Get the document count - :param index_name: name of the index + Get the row count + :param table: name of the table :param filter: optional filter - :return: count of documents """ query = "SELECT COUNT(*) FROM {}".format(table) if filter: query += " WHERE {}".format(filter) return self.connection.execute(text(query)).fetchone()[0] + + def query_get_max(self, table: str, field: str, filter: str = None) -> int: + """ + Get the max value + :param table: table name + :param field: column name + :param filter: filter condition + :return: + """ + query = "SELECT MAX({}) FROM {}".format(field, table) + if filter: + query += " WHERE {}".format(filter) + + return self.connection.execute(text(query)).fetchone()[0] diff --git a/datachecks/core/datasource/manager.py b/datachecks/core/datasource/manager.py index e43d3c2f..d8e45791 100644 --- a/datachecks/core/datasource/manager.py +++ b/datachecks/core/datasource/manager.py @@ -1,9 +1,25 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from dataclasses import asdict -from typing import List, Dict +from typing import Dict, List -from datachecks.core.configuration.configuration import DataSourceConfiguration +from datachecks.core.configuration.configuration import ( + DataSourceConfiguration, DatasourceType) from datachecks.core.datasource.base import DataSource -from datachecks.core.datasource.opensearch import OpenSearchSearchIndexDataSource +from datachecks.core.datasource.opensearch import \ + OpenSearchSearchIndexDataSource from datachecks.core.datasource.postgres import PostgresSQLDatasource @@ -13,6 +29,7 @@ class DataSourceManager: This class is responsible for managing the data sources. """ + def __init__(self, config: List[DataSourceConfiguration]): self.data_source_configs: List[DataSourceConfiguration] = config self.data_sources: Dict[str, DataSource] = {} @@ -24,7 +41,9 @@ def _initialize_data_sources(self): :return: """ for data_source_config in self.data_source_configs: - self.data_sources[data_source_config.name] = self.create_data_source(data_source_config=data_source_config) + self.data_sources[data_source_config.name] = self.create_data_source( + data_source_config=data_source_config + ) self.data_sources[data_source_config.name].connect() @staticmethod @@ -34,15 +53,15 @@ def create_data_source(data_source_config: DataSourceConfiguration) -> DataSourc :param data_source_config: data source configuration :return: data source """ - if data_source_config.type == 'opensearch': + if data_source_config.type == DatasourceType.OPENSEARCH: return OpenSearchSearchIndexDataSource( data_source_name=data_source_config.name, data_connection=asdict(data_source_config.connection_config), ) - elif data_source_config.type == 'postgres': + elif data_source_config.type == DatasourceType.POSTGRES: return PostgresSQLDatasource( data_source_name=data_source_config.name, - data_source_properties=asdict(data_source_config.connection_config) + data_source_properties=asdict(data_source_config.connection_config), ) else: raise ValueError(f"Unsupported data source type: {data_source_config.type}") diff --git a/datachecks/core/datasource/opensearch.py b/datachecks/core/datasource/opensearch.py index f3d9f33d..bf20d31b 100644 --- a/datachecks/core/datasource/opensearch.py +++ b/datachecks/core/datasource/opensearch.py @@ -1,3 +1,17 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from typing import Dict from opensearchpy import OpenSearch @@ -9,11 +23,8 @@ class OpenSearchSearchIndexDataSource(SearchIndexDataSource): """ OpenSearch data source """ - def __init__( - self, - data_source_name: str, - data_connection: Dict - ): + + def __init__(self, data_source_name: str, data_connection: Dict): super().__init__(data_source_name, data_connection) def connect(self) -> OpenSearch: @@ -21,15 +32,18 @@ def connect(self) -> OpenSearch: Connect to the data source """ - auth = (self.data_connection.get('username'), self.data_connection.get('password')) - host = self.data_connection.get('host') - port = int(self.data_connection.get('port')) + auth = ( + self.data_connection.get("username"), + self.data_connection.get("password"), + ) + host = self.data_connection.get("host") + port = int(self.data_connection.get("port")) self.client = OpenSearch( - hosts=[{'host': host, 'port': port}], + hosts=[{"host": host, "port": port}], http_auth=auth, use_ssl=True, verify_certs=False, - ca_certs=False + ca_certs=False, ) return self.client @@ -45,6 +59,21 @@ def query_get_document_count(self, index_name: str, filter: str = None) -> int: :param index_name: name of the index :param filter: optional filter """ - response = self.client.count(index=index_name, body=filter) + response = self.client.count(index=index_name, body=filter if filter else {}) + print(response) + return response["count"] + + def query_get_max(self, index_name: str, field: str, filter: str = None) -> int: + """ + Get the max value of a field + :param index_name: + :param field: + :param filter: + :return: + """ + query = {"aggs": {"max_value": {"max": {"field": field}}}} + if filter: + query["query"] = filter - return response['count'] \ No newline at end of file + response = self.client.search(index=index_name, body=query) + return response["aggregations"]["max_value"]["value"] diff --git a/datachecks/core/datasource/postgres.py b/datachecks/core/datasource/postgres.py index 0f02048f..46684f90 100644 --- a/datachecks/core/datasource/postgres.py +++ b/datachecks/core/datasource/postgres.py @@ -1,17 +1,26 @@ -from typing import Dict, Any +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -from sqlalchemy import create_engine, URL +from typing import Any, Dict + +from sqlalchemy import URL, create_engine from datachecks.core.datasource.base import SQLDatasource class PostgresSQLDatasource(SQLDatasource): - - def __init__( - self, - data_source_name: str, - data_source_properties: Dict - ): + def __init__(self, data_source_name: str, data_source_properties: Dict): super().__init__(data_source_name, data_source_properties) def connect(self) -> Any: @@ -20,11 +29,11 @@ def connect(self) -> Any: """ url = URL.create( drivername="postgresql", - username=self.data_connection.get('username'), - password=self.data_connection.get('password'), - host=self.data_connection.get('host'), - port=self.data_connection.get('port'), - database=self.data_connection.get('database') + username=self.data_connection.get("username"), + password=self.data_connection.get("password"), + host=self.data_connection.get("host"), + port=self.data_connection.get("port"), + database=self.data_connection.get("database"), ) engine = create_engine(url) self.connection = engine.connect() diff --git a/datachecks/core/inspect.py b/datachecks/core/inspect.py index 3228afe7..18f3f1ce 100644 --- a/datachecks/core/inspect.py +++ b/datachecks/core/inspect.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Dict +from typing import Dict, List from datachecks.core.configuration.configuration import Configuration from datachecks.core.datasource.manager import DataSourceManager @@ -19,13 +19,12 @@ class Inspect: - def __init__(self, configuration: Configuration): self.configuration = configuration self.data_source_manager = DataSourceManager(configuration.data_sources) self.metric_manager = MetricManager( metric_config=configuration.metrics, - data_source_manager=self.data_source_manager + data_source_manager=self.data_source_manager, ) def start(self): diff --git a/datachecks/core/metric/__init__.py b/datachecks/core/metric/__init__.py index 55403345..95dc1b8e 100644 --- a/datachecks/core/metric/__init__.py +++ b/datachecks/core/metric/__init__.py @@ -10,4 +10,4 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file +# limitations under the License. diff --git a/datachecks/core/metric/base.py b/datachecks/core/metric/base.py index 967c2d41..982b1d5e 100644 --- a/datachecks/core/metric/base.py +++ b/datachecks/core/metric/base.py @@ -14,30 +14,28 @@ import datetime from abc import ABC, abstractmethod -from dataclasses import asdict from enum import Enum -from typing import Dict, List +from typing import Dict, Optional -from datachecks.core.configuration.configuration import MetricConfiguration -from datachecks.core.datasource.base import DataSource, SearchIndexDataSource, SQLDatasource -from datachecks.core.datasource.manager import DataSourceManager +from datachecks.core.datasource.base import (DataSource, SearchIndexDataSource, + SQLDatasource) class MetricsType(str, Enum): - ROW_COUNT = 'row_count' - DOCUMENT_COUNT = 'document_count' + ROW_COUNT = "row_count" + DOCUMENT_COUNT = "document_count" + MAX = "max" class MetricIdentity: - @staticmethod def generate_identity( - metric_type: MetricsType, - metric_name: str, - data_source: DataSource = None, - index_name: str = None, - table_name: str = None, - row_name: str = None + metric_type: MetricsType, + metric_name: str, + data_source: DataSource = None, + index_name: str = None, + table_name: str = None, + field_name: str = None, ): identifiers = [] if data_source: @@ -48,8 +46,8 @@ def generate_identity( identifiers.append(index_name) if table_name: identifiers.append(table_name) - if row_name: - identifiers.append(row_name) + if field_name: + identifiers.append(field_name) return ".".join([str(p) for p in identifiers]) @@ -58,21 +56,48 @@ class Metric(ABC): """ Metric is a class that represents a metric that is generated by a data source. """ + def __init__( - self, - name: str, - data_source: DataSource, - metric_type: MetricsType, - filter: Dict = None + self, + name: str, + data_source: DataSource, + metric_type: MetricsType, + table_name: Optional[str] = None, + index_name: Optional[str] = None, + filter: Dict = None, ): + if index_name is not None and table_name is not None: + raise ValueError( + "Please give a value for table_name or index_name (but not both)" + ) + if index_name is None and table_name is None: + raise ValueError("Please give a value for table_name or index_name") + + if index_name: + self.index_name = index_name + if table_name: + self.table_name = table_name + self.name: str = name self.data_source = data_source self.metric_type = metric_type - self.data_filter = filter - self.metric_identity = MetricIdentity.generate_identity( - metric_type=metric_type, - metric_name=name, - data_source=data_source + self.filter_query = None + if filter is not None: + if "search_query" in filter and "sql_query" in filter: + raise ValueError( + "Please give a value for search_query or sql_query (but not both)" + ) + + if "search_query" in filter: + self.filter_query = filter["search_query"] + elif "sql_query" in filter: + self.filter_query = filter["sql_query"] + + def get_metric_identity(self): + return MetricIdentity.generate_identity( + metric_type=self.metric_type, + metric_name=self.name, + data_source=self.data_source, ) @abstractmethod @@ -81,124 +106,55 @@ def _generate_metric_value(self): def get_value(self): value = { - "identity": self.metric_identity, - "metricName": self.name, - "metricType": self.metric_type.value, - "value": self._generate_metric_value(), - "dataSourceName": self.data_source.data_source_name, - "@timestamp": datetime.datetime.utcnow().isoformat() - } + "identity": self.get_metric_identity(), + "metricName": self.name, + "metricType": self.metric_type.value, + "value": self._generate_metric_value(), + "dataSourceName": self.data_source.data_source_name, + "@timestamp": datetime.datetime.utcnow().isoformat(), + } if "index_name" in self.__dict__: value["index_name"] = self.__dict__["index_name"] elif "table_name" in self.__dict__: value["table_name"] = self.__dict__["table_name"] - if "row_name" in self.__dict__: - value["row_name"] = self.__dict__["row_name"] + if "field_name" in self.__dict__: + value["field_name"] = self.__dict__["field_name"] return value -class RowMetrics(Metric, ABC): - +class FieldMetrics(Metric, ABC): def __init__( - self, - name: str, - data_source: DataSource, - table_name: str, - row_name: str, - metric_type=MetricsType, - filter: Dict = None + self, + name: str, + data_source: DataSource, + table_name: Optional[str], + index_name: Optional[str], + field_name: str, + metric_type=MetricsType, + filter: Dict = None, ): super().__init__( name=name, data_source=data_source, + table_name=table_name, + index_name=index_name, metric_type=metric_type, - filter=filter + filter=filter, ) - self.table_name = table_name - self.row_name = row_name - self.metric_identity = MetricIdentity.generate_identity( - metric_type=MetricsType.DOCUMENT_COUNT, - metric_name=name, - data_source=data_source, - table_name=table_name, - row_name=row_name - ) + self.field_name = field_name - @property - def get_row_name(self): - return self.row_name - - -class DocumentCountMetrics(Metric): - - def __init__( - self, - name: str, - data_source: DataSource, - index_name: str, - filter: Dict = None - ): - super().__init__( - name=name, - data_source=data_source, + def get_metric_identity(self): + return MetricIdentity.generate_identity( metric_type=MetricsType.DOCUMENT_COUNT, - filter=filter + metric_name=self.name, + data_source=self.data_source, + table_name=self.table_name, + field_name=self.field_name, ) - self.index_name = index_name - self.metric_identity = MetricIdentity.generate_identity( - metric_type=MetricsType.DOCUMENT_COUNT, - metric_name=name, - data_source=data_source, - index_name=index_name - ) - - def validate_data_source(self): - return isinstance(self.data_source, SearchIndexDataSource) - - def _generate_metric_value(self): - if isinstance(self.data_source, SearchIndexDataSource): - return self.data_source.query_get_document_count( - index_name=self.index_name, - filter=self.data_filter['search_query'] - ) - else: - raise ValueError("Invalid data source type") - - -class RowCountMetrics(Metric): - def __init__( - self, - name: str, - data_source: DataSource, - table_name: str, - filter: Dict = None - ): - super().__init__( - name=name, - data_source=data_source, - metric_type=MetricsType.ROW_COUNT, - filter=filter - ) - self.table_name = table_name - self.metric_identity = MetricIdentity.generate_identity( - metric_type=MetricsType.ROW_COUNT, - metric_name=name, - data_source=data_source, - table_name=table_name - ) - - def validate_data_source(self): - return isinstance(self.data_source, SQLDatasource) - - def _generate_metric_value(self): - if isinstance(self.data_source, SQLDatasource): - return self.data_source.query_get_row_count( - table=self.table_name, - filter=self.data_filter['sql_query'] - ) - else: - raise ValueError("Invalid data source type") + @property + def get_field_name(self): + return self.field_name diff --git a/datachecks/core/metric/manager.py b/datachecks/core/metric/manager.py index 10858caf..559ed10a 100644 --- a/datachecks/core/metric/manager.py +++ b/datachecks/core/metric/manager.py @@ -1,17 +1,32 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from dataclasses import asdict from typing import Dict, List from datachecks.core.configuration.configuration import MetricConfiguration from datachecks.core.datasource.manager import DataSourceManager -from datachecks.core.metric.base import MetricsType, DocumentCountMetrics, RowCountMetrics +from datachecks.core.metric.base import MetricsType +from datachecks.core.metric.numeric_metric import (DocumentCountMetric, + RowCountMetric) class MetricManager: - def __init__( - self, - metric_config: Dict[str, List[MetricConfiguration]], - data_source_manager: DataSourceManager + self, + metric_config: Dict[str, List[MetricConfiguration]], + data_source_manager: DataSourceManager, ): self.data_source_manager = data_source_manager self.metrics = {} @@ -21,20 +36,23 @@ def _build_metrics(self, config: Dict[str, List[MetricConfiguration]]): for data_source, metric_list in config.items(): for metric_config in metric_list: if metric_config.metric_type == MetricsType.DOCUMENT_COUNT: - - metric = DocumentCountMetrics( + metric = DocumentCountMetric( name=metric_config.name, - data_source=self.data_source_manager.get_data_source(data_source), + data_source=self.data_source_manager.get_data_source( + data_source + ), filter=asdict(metric_config.filter), - index_name=metric_config.index + index_name=metric_config.index, ) self.metrics[metric.metric_identity] = metric elif metric_config.metric_type == MetricsType.ROW_COUNT: - metric = RowCountMetrics( + metric = RowCountMetric( name=metric_config.name, - data_source=self.data_source_manager.get_data_source(data_source), + data_source=self.data_source_manager.get_data_source( + data_source + ), filter=asdict(metric_config.filter), - table_name=metric_config.table + table_name=metric_config.table, ) self.metrics[metric.metric_identity] = metric else: diff --git a/datachecks/core/metric/numeric_metric.py b/datachecks/core/metric/numeric_metric.py new file mode 100644 index 00000000..8286a7a4 --- /dev/null +++ b/datachecks/core/metric/numeric_metric.py @@ -0,0 +1,96 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, Optional + +from datachecks.core.datasource.base import (DataSource, SearchIndexDataSource, + SQLDatasource) +from datachecks.core.metric.base import (FieldMetrics, Metric, MetricIdentity, + MetricsType) + + +class DocumentCountMetric(Metric): + """ + DocumentCountMetrics is a class that represents a metric that is generated by a data source. + """ + + def validate_data_source(self): + return isinstance(self.data_source, SearchIndexDataSource) + + def _generate_metric_value(self): + if isinstance(self.data_source, SearchIndexDataSource): + return self.data_source.query_get_document_count( + index_name=self.index_name, + filter=self.filter_query if self.filter_query else None, + ) + else: + raise ValueError("Invalid data source type") + + +class RowCountMetric(Metric): + + """ + RowCountMetrics is a class that represents a metric that is generated by a data source. + """ + + def get_metric_identity(self): + return MetricIdentity.generate_identity( + metric_type=MetricsType.ROW_COUNT, + metric_name=self.name, + data_source=self.data_source, + table_name=self.table_name, + ) + + def validate_data_source(self): + return isinstance(self.data_source, SQLDatasource) + + def _generate_metric_value(self): + if isinstance(self.data_source, SQLDatasource): + return self.data_source.query_get_row_count( + table=self.table_name, + filter=self.filter_query if self.filter_query else None, + ) + else: + raise ValueError("Invalid data source type") + + +class MaxMetric(FieldMetrics): + + """ + MaxMetric is a class that represents a metric that is generated by a data source. + """ + + def get_metric_identity(self): + return MetricIdentity.generate_identity( + metric_type=MetricsType.MAX, + metric_name=self.name, + data_source=self.data_source, + field_name=self.field_name, + ) + + def _generate_metric_value(self): + if isinstance(self.data_source, SQLDatasource): + return self.data_source.query_get_max( + table=self.table_name, + field=self.field_name, + filter=self.filter_query if self.filter_query else None, + ) + elif isinstance(self.data_source, SearchIndexDataSource): + return self.data_source.query_get_max( + index_name=self.index_name, + field=self.field_name, + filter=self.filter_query if self.filter_query else None, + ) + else: + raise ValueError("Invalid data source type") diff --git a/example/data_generator.py b/example/data_generator.py index 72633e5c..cdaccb0f 100644 --- a/example/data_generator.py +++ b/example/data_generator.py @@ -1,5 +1,19 @@ -from dataclasses import dataclass +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import random +from dataclasses import dataclass from typing import List from opensearchpy import OpenSearch @@ -24,7 +38,7 @@ class TestData: password="changeme", host="127.0.0.1", port=5431, - database="postgres" + database="postgres", ) url_staging = URL.create( drivername="postgresql", @@ -32,15 +46,15 @@ class TestData: password="changeme", host="127.0.0.1", port=5430, - database="postgres" + database="postgres", ) client = OpenSearch( - hosts=[{'host': "127.0.0.1", 'port': 9201}], - http_auth=("admin", "admin"), - use_ssl=True, - verify_certs=False, - ca_certs=False - ) + hosts=[{"host": "127.0.0.1", "port": 9201}], + http_auth=("admin", "admin"), + use_ssl=True, + verify_certs=False, + ca_certs=False, +) engine_content = create_engine(url_content) engine_staging = create_engine(url_staging) @@ -54,13 +68,22 @@ def generate_data_content(number_of_data: int): id=i + 2000, name=f"name_{i}", category=random.choice(CATEGORIES), - is_valid=bool(random.getrandbits(1)) + is_valid=bool(random.getrandbits(1)), ) try: - content_connection \ - .execute(text(""" + content_connection.execute( + text( + """ INSERT INTO table_1 (id, name, category, is_valid) VALUES (:id, :name, :category, :is_valid) - """), {"id": d.id, "name": d.name, "category": d.category, "is_valid": d.is_valid}) + """ + ), + { + "id": d.id, + "name": d.name, + "category": d.category, + "is_valid": d.is_valid, + }, + ) content_connection.commit() except Exception as e: raise e @@ -72,13 +95,22 @@ def generate_data_staging(number_of_data: int): id=i + 2000, name=f"name_{i}", category=random.choice(CATEGORIES), - is_valid=bool(random.getrandbits(1)) + is_valid=bool(random.getrandbits(1)), ) try: - staging_connection \ - .execute(text(""" + staging_connection.execute( + text( + """ INSERT INTO table_2 (id, name, category, is_valid) VALUES (:id, :name, :category, :is_valid) - """), {"id": d.id, "name": d.name, "category": d.category, "is_valid": d.is_valid}) + """ + ), + { + "id": d.id, + "name": d.name, + "category": d.category, + "is_valid": d.is_valid, + }, + ) staging_connection.commit() except Exception as e: print(e) @@ -90,9 +122,12 @@ def generate_open_search(number_of_data: int): id=i, name=f"name_{i}", category=random.choice(CATEGORIES), - is_valid=bool(random.getrandbits(1)) + is_valid=bool(random.getrandbits(1)), + ) + client.index( + index="category_tabel", + body={"name": d.name, "category": d.category, "is_valid": d.is_valid}, ) - client.index(index="category_tabel", body={"name": d.name, "category": d.category, "is_valid": d.is_valid}) if __name__ == "__main__": diff --git a/example/run_datachecks.py b/example/run_datachecks.py index e8803e5e..516be010 100644 --- a/example/run_datachecks.py +++ b/example/run_datachecks.py @@ -1,12 +1,24 @@ -from datachecks import Inspect, load_configuration -from datachecks import Configuration +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import requests +from datachecks import Configuration, Inspect, load_configuration if __name__ == "__main__": - - #Reding Config File - configuration: Configuration = load_configuration('config.yaml') + # Reding Config File + configuration: Configuration = load_configuration("config.yaml") # Generating Metrics metrics = Inspect(configuration).start() @@ -14,4 +26,9 @@ # Sending to ELK for metric in metrics: print(metric) - requests.post('https://localhost:9201/example_indices/_doc', json=metric, auth=("admin", "admin"), verify=False) + requests.post( + "https://localhost:9201/example_indices/_doc", + json=metric, + auth=("admin", "admin"), + verify=False, + ) diff --git a/tests/__init__.py b/tests/__init__.py index e69de29b..95dc1b8e 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/conftest.py b/tests/conftest.py index 5e055f03..810a22c3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,18 +1,38 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import os +from dataclasses import asdict import pytest - from opensearchpy import OpenSearch +from sqlalchemy import Connection, create_engine +from datachecks.core.configuration.configuration import \ + DataSourceConnectionConfiguration +from datachecks.core.datasource.opensearch import \ + OpenSearchSearchIndexDataSource -def is_opensearch__responsive(host, port): + +def is_opensearch_responsive(host, port): try: client = OpenSearch( - hosts=[{'host': host, 'port': port}], + hosts=[{"host": host, "port": port}], http_auth=("admin", "admin"), use_ssl=True, verify_certs=False, - ca_certs=False + ca_certs=False, ) status = client.ping() client.close() @@ -21,6 +41,19 @@ def is_opensearch__responsive(host, port): return status +def is_pgsql_responsive(host, port): + try: + engine = create_engine( + f"postgresql+psycopg2://dbuser:dbpass@{host}:{port}/postgres" + ) + connection = engine.connect() + status = True + connection.close() + except ConnectionError: + status = False + return status + + @pytest.fixture(scope="session") def docker_compose_file(pytestconfig): base_directory = os.path.dirname(os.path.abspath(__file__)).replace("tests", "") @@ -28,15 +61,77 @@ def docker_compose_file(pytestconfig): @pytest.fixture(scope="session") -def opensearch_client(docker_ip, docker_services) -> OpenSearch: +def opensearch_client_config( + docker_ip, docker_services +) -> DataSourceConnectionConfiguration: port = docker_services.port_for("dc-opensearch", 9200) docker_services.wait_until_responsive( - timeout=60.0, pause=20, check=lambda: is_opensearch__responsive(host=docker_ip, port=port) + timeout=60.0, + pause=20, + check=lambda: is_opensearch_responsive(host=docker_ip, port=port), + ) + + return DataSourceConnectionConfiguration( + host=docker_ip, + port=port, + username="admin", + password="admin", + database=None, + schema=None, + ) + + +@pytest.fixture(scope="session") +def pgsql_connection_configuration( + docker_ip, docker_services +) -> DataSourceConnectionConfiguration: + port = docker_services.port_for("dc-postgres", 5432) + docker_services.wait_until_responsive( + timeout=60.0, + pause=20, + check=lambda: is_pgsql_responsive(host=docker_ip, port=port), ) - return OpenSearch( - hosts=[{'host': docker_ip, 'port': port}], - http_auth=("admin", "admin"), + + return DataSourceConnectionConfiguration( + host=docker_ip, + port=port, + username="dbuser", + password="dbpass", + database="postgres", + schema="public", + ) + + +@pytest.mark.usefixtures("opensearch_client_config") +@pytest.fixture(scope="session") +def opensearch_client( + opensearch_client_config: DataSourceConnectionConfiguration, +) -> OpenSearch: + client = OpenSearch( + hosts=[ + { + "host": opensearch_client_config.host, + "port": opensearch_client_config.port, + } + ], + http_auth=( + opensearch_client_config.username, + opensearch_client_config.password, + ), use_ssl=True, verify_certs=False, - ca_certs=False + ca_certs=False, + ) + return client + + +@pytest.mark.usefixtures("opensearch_client_config") +@pytest.fixture(scope="session") +def opensearch_datasource( + opensearch_client_config: DataSourceConnectionConfiguration, +) -> OpenSearchSearchIndexDataSource: + source = OpenSearchSearchIndexDataSource( + data_source_name="opensearch", data_connection=asdict(opensearch_client_config) ) + source.connect() + return source diff --git a/tests/core/__init__.py b/tests/core/__init__.py new file mode 100644 index 00000000..95dc1b8e --- /dev/null +++ b/tests/core/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/core/configuration/__init__.py b/tests/core/configuration/__init__.py new file mode 100644 index 00000000..95dc1b8e --- /dev/null +++ b/tests/core/configuration/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/core/configuration/test_configuration.py b/tests/core/configuration/test_configuration.py new file mode 100644 index 00000000..1efb5159 --- /dev/null +++ b/tests/core/configuration/test_configuration.py @@ -0,0 +1,52 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datachecks.core.configuration.configuration import ( + DatasourceType, load_configuration_from_yaml_str) + + +def test_should_read_datasource_config_for_opensearch(): + yaml_string = """ + data_sources: + - name: "test" + type: "opensearch" + connection: + host: "localhost" + port: 9200 + metrics: + test: + "test_metric": + "metric_type": "document_count" + "index": "test" + """ + configuration = load_configuration_from_yaml_str(yaml_string) + assert configuration.data_sources[0].type == DatasourceType.OPENSEARCH + + +def test_should_read_datasource_config_for_postgres(): + yaml_string = """ + data_sources: + - name: "test" + type: "postgres" + connection: + host: "localhost" + port: 5432 + metrics: + test: + "test_metric": + "metric_type": "row_count" + "table": "test" + """ + configuration = load_configuration_from_yaml_str(yaml_string) + assert configuration.data_sources[0].type == DatasourceType.POSTGRES diff --git a/tests/core/metric/__init__.py b/tests/core/metric/__init__.py new file mode 100644 index 00000000..95dc1b8e --- /dev/null +++ b/tests/core/metric/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/core/metric/test_numeric_metric.py b/tests/core/metric/test_numeric_metric.py new file mode 100644 index 00000000..8f4bd889 --- /dev/null +++ b/tests/core/metric/test_numeric_metric.py @@ -0,0 +1,45 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from opensearchpy import OpenSearch + +from datachecks.core.datasource.opensearch import \ + OpenSearchSearchIndexDataSource +from datachecks.core.metric.base import MetricsType +from datachecks.core.metric.numeric_metric import DocumentCountMetric + + +@pytest.mark.usefixtures("opensearch_client") +@pytest.fixture(scope="module") +def setup_data(opensearch_client: OpenSearch): + opensearch_client.indices.delete(index="test", ignore=[400, 404]) + opensearch_client.indices.create(index="test") + opensearch_client.index(index="test", body={"test": "test"}) + opensearch_client.indices.refresh(index="test") + + +@pytest.mark.usefixtures("opensearch_datasource", "setup_data") +class TestDocumentCountMetric: + def test_document_count_metric( + self, opensearch_datasource: OpenSearchSearchIndexDataSource + ): + doc = DocumentCountMetric( + name="test", + data_source=opensearch_datasource, + index_name="test", + metric_type=MetricsType.DOCUMENT_COUNT, + ) + doc_value = doc.get_value() + assert doc_value["value"] == 1 diff --git a/tests/test_databases_availability.py b/tests/test_databases_availability.py index 26dcb342..ccb4b9d5 100644 --- a/tests/test_databases_availability.py +++ b/tests/test_databases_availability.py @@ -1,5 +1,60 @@ +# Copyright 2022-present, the Waterdip Labs Pvt. Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest from opensearchpy import OpenSearch +from sqlalchemy import create_engine, text +from datachecks.core.configuration.configuration import \ + DataSourceConnectionConfiguration -def test_opensearch_available(opensearch_client: OpenSearch): - assert opensearch_client.ping() + +@pytest.mark.usefixtures("opensearch_client_config") +def test_opensearch_available( + opensearch_client_config: DataSourceConnectionConfiguration, +): + client = OpenSearch( + hosts=[ + { + "host": opensearch_client_config.host, + "port": opensearch_client_config.port, + } + ], + http_auth=( + opensearch_client_config.username, + opensearch_client_config.password, + ), + use_ssl=True, + verify_certs=False, + ca_certs=False, + ) + assert client.ping() + client.close() + + +@pytest.mark.usefixtures("pgsql_connection_configuration") +def test_pgsql_available( + pgsql_connection_configuration: DataSourceConnectionConfiguration, +): + host = pgsql_connection_configuration.host + port = pgsql_connection_configuration.port + user = pgsql_connection_configuration.username + password = pgsql_connection_configuration.password + database = pgsql_connection_configuration.database + engine = create_engine( + f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}" + ) + + connect = engine.connect() + assert connect.execute(text("SELECT 1")).fetchone()[0] == 1 + connect.close()