diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py index 9890fea990c4e..f76d145a87043 100644 --- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py +++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py @@ -12,6 +12,7 @@ from great_expectations.checkpoint.actions import ValidationAction from great_expectations.core.batch import Batch from great_expectations.core.batch_spec import ( + RuntimeDataBatchSpec, RuntimeQueryBatchSpec, SqlAlchemyDatasourceBatchSpec, ) @@ -24,6 +25,7 @@ ExpectationSuiteIdentifier, ValidationResultIdentifier, ) +from great_expectations.execution_engine import PandasExecutionEngine from great_expectations.execution_engine.sqlalchemy_execution_engine import ( SqlAlchemyExecutionEngine, ) @@ -566,10 +568,12 @@ def get_dataset_partitions(self, batch_identifier, data_asset): logger.debug("Finding datasets being validated") - # for now, we support only v3-api and sqlalchemy execution engine - if isinstance(data_asset, Validator) and isinstance( - data_asset.execution_engine, SqlAlchemyExecutionEngine - ): + # for now, we support only v3-api and sqlalchemy execution engine and Pandas engine + is_sql_alchemy = isinstance(data_asset, Validator) and ( + isinstance(data_asset.execution_engine, SqlAlchemyExecutionEngine) + ) + is_pandas = isinstance(data_asset.execution_engine, PandasExecutionEngine) + if is_sql_alchemy or is_pandas: ge_batch_spec = data_asset.active_batch_spec partitionSpec = None batchSpecProperties = { @@ -581,10 +585,14 @@ def get_dataset_partitions(self, batch_identifier, data_asset): ), } sqlalchemy_uri = None - if isinstance(data_asset.execution_engine.engine, Engine): + if is_sql_alchemy and isinstance( + data_asset.execution_engine.engine, Engine + ): sqlalchemy_uri = data_asset.execution_engine.engine.url # For snowflake sqlalchemy_execution_engine.engine is actually instance of Connection - elif isinstance(data_asset.execution_engine.engine, Connection): + elif is_sql_alchemy and isinstance( + data_asset.execution_engine.engine, Connection + ): sqlalchemy_uri = data_asset.execution_engine.engine.engine.url if isinstance(ge_batch_spec, SqlAlchemyDatasourceBatchSpec): @@ -680,6 +688,30 @@ def get_dataset_partitions(self, batch_identifier, data_asset): "batchSpec": batchSpec, } ) + elif isinstance(ge_batch_spec, RuntimeDataBatchSpec): + data_platform = self.get_platform_instance( + data_asset.active_batch_definition.datasource_name + ) + dataset_urn = builder.make_dataset_urn_with_platform_instance( + platform=data_platform + if self.platform_alias is None + else self.platform_alias, + name=data_asset.active_batch_definition.datasource_name, + platform_instance="", + env=self.env, + ) + batchSpec = BatchSpec( + nativeBatchId=batch_identifier, + query="", + customProperties=batchSpecProperties, + ) + dataset_partitions.append( + { + "dataset_urn": dataset_urn, + "partitionSpec": partitionSpec, + "batchSpec": batchSpec, + } + ) else: warn( "DataHubValidationAction does not recognize this GE batch spec type- {batch_spec_type}.".format( diff --git a/metadata-ingestion/tests/unit/test_great_expectations_action.py b/metadata-ingestion/tests/unit/test_great_expectations_action.py index 0675c28547c9e..2e23949d29689 100644 --- a/metadata-ingestion/tests/unit/test_great_expectations_action.py +++ b/metadata-ingestion/tests/unit/test_great_expectations_action.py @@ -2,9 +2,13 @@ from datetime import datetime, timezone from unittest import mock +import pandas as pd import pytest from great_expectations.core.batch import Batch, BatchDefinition, BatchRequest -from great_expectations.core.batch_spec import SqlAlchemyDatasourceBatchSpec +from great_expectations.core.batch_spec import ( + RuntimeDataBatchSpec, + SqlAlchemyDatasourceBatchSpec, +) from great_expectations.core.expectation_validation_result import ( ExpectationSuiteValidationResult, ) @@ -21,6 +25,9 @@ from great_expectations.execution_engine.pandas_execution_engine import ( PandasExecutionEngine, ) +from great_expectations.execution_engine.sparkdf_execution_engine import ( + SparkDFExecutionEngine, +) from great_expectations.execution_engine.sqlalchemy_execution_engine import ( SqlAlchemyExecutionEngine, ) @@ -87,12 +94,80 @@ def ge_validator_sqlalchemy() -> Validator: return validator +@pytest.fixture(scope="function") +def ge_validator_spark() -> Validator: + validator = Validator(execution_engine=SparkDFExecutionEngine()) + return validator + + @pytest.fixture(scope="function") def ge_validator_pandas() -> Validator: - validator = Validator(execution_engine=PandasExecutionEngine()) + validator = Validator( + execution_engine=PandasExecutionEngine(), + batches=[ + Batch( + data=pd.DataFrame({"foo": [10, 20], "bar": [100, 200]}), + batch_request=BatchRequest( + datasource_name="my_df_datasource", + data_connector_name="pandas_df", + data_asset_name="foobar", + ), + batch_definition=BatchDefinition( + datasource_name="my_df_datasource", + data_connector_name="pandas_df", + data_asset_name="foobar", + batch_identifiers=IDDict(), + ), + batch_spec=RuntimeDataBatchSpec( + { + "data_asset_name": "foobar", + "batch_identifiers": {}, + "batch_data": {}, + "type": "pandas_dataframe", + } + ), + ) + ], + ) return validator +@pytest.fixture(scope="function") +def ge_validation_result_suite_pandas() -> ExpectationSuiteValidationResult: + validation_result_suite = ExpectationSuiteValidationResult( + results=[ + { + "success": True, + "result": {}, + "expectation_config": { + "expectation_type": "expect_column_values_to_not_be_null", + "kwargs": { + "column": "column", + "batch_id": "010ef8c1cd417910b971f4468f024ec6", + }, + "meta": {}, + }, + } + ], + success=True, + statistics={ + "evaluated_expectations": 1, + "successful_expectations": 1, + "unsuccessful_expectations": 0, + "success_percent": 100, + }, + meta={ + "great_expectations_version": "v0.13.40", + "expectation_suite_name": "asset.default", + "run_id": { + "run_name": "test_200", + }, + "validation_time": "20211228T130000.000000Z", + }, + ) + return validation_result_suite + + @pytest.fixture(scope="function") def ge_validation_result_suite() -> ExpectationSuiteValidationResult: validation_result_suite = ExpectationSuiteValidationResult( @@ -144,8 +219,22 @@ def ge_validation_result_suite_id() -> ValidationResultIdentifier: return validation_result_suite_id +@pytest.fixture(scope="function") +def ge_validation_result_suite_id_pandas() -> ValidationResultIdentifier: + validation_result_suite_id = ValidationResultIdentifier( + expectation_suite_identifier=ExpectationSuiteIdentifier("asset.default"), + run_id=RunIdentifier( + run_name="test_200", + run_time=datetime.fromtimestamp(1640701702, tz=timezone.utc), + ), + batch_identifier="010ef8c1cd417910b971f4468f024ec6", + ) + + return validation_result_suite_id + + @mock.patch("datahub.emitter.rest_emitter.DatahubRestEmitter.emit_mcp", autospec=True) -def test_DataHubValidationAction_basic( +def test_DataHubValidationAction_sqlalchemy( mock_emitter: mock.MagicMock, ge_data_context: DataContext, ge_validator_sqlalchemy: Validator, @@ -248,6 +337,70 @@ def test_DataHubValidationAction_basic( ) +@mock.patch("datahub.emitter.rest_emitter.DatahubRestEmitter.emit_mcp", autospec=True) +def test_DataHubValidationAction_pandas( + mock_emitter: mock.MagicMock, + ge_data_context: DataContext, + ge_validator_pandas: Validator, + ge_validation_result_suite_pandas: ExpectationSuiteValidationResult, + ge_validation_result_suite_id_pandas: ValidationResultIdentifier, +) -> None: + server_url = "http://localhost:9999" + + datahub_action = DataHubValidationAction( + data_context=ge_data_context, + server_url=server_url, + platform_instance_map={"my_df_datasource": "custom_platefrom"}, + ) + + assert datahub_action.run( + validation_result_suite_identifier=ge_validation_result_suite_id_pandas, + validation_result_suite=ge_validation_result_suite_pandas, + data_asset=ge_validator_pandas, + ) == {"datahub_notification_result": "DataHub notification succeeded"} + + mock_emitter.assert_has_calls( + [ + mock.call( + mock.ANY, + MetadataChangeProposalWrapper( + entityType="assertion", + changeType="UPSERT", + entityUrn="urn:li:assertion:7e04bcc3b85897d6d3fef6c998db6b05", + aspectName="assertionInfo", + aspect=AssertionInfoClass( + customProperties={"expectation_suite_name": "asset.default"}, + type="DATASET", + datasetAssertion=DatasetAssertionInfoClass( + dataset="urn:li:dataset:(urn:li:dataPlatform:custom_platefrom,my_df_datasource,PROD)", + scope=DatasetAssertionScopeClass.DATASET_COLUMN, + operator="NOT_NULL", + fields=[ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:custom_platefrom,my_df_datasource,PROD),column)" + ], + aggregation="IDENTITY", + nativeType="expect_column_values_to_not_be_null", + nativeParameters={"column": "column"}, + ), + ), + ), + ), + mock.call( + mock.ANY, + MetadataChangeProposalWrapper( + entityType="assertion", + changeType="UPSERT", + entityUrn="urn:li:assertion:7e04bcc3b85897d6d3fef6c998db6b05", + aspectName="dataPlatformInstance", + aspect=DataPlatformInstanceClass( + platform="urn:li:dataPlatform:great-expectations" + ), + ), + ), + ] + ) + + def test_DataHubValidationAction_graceful_failure( ge_data_context: DataContext, ge_validator_sqlalchemy: Validator, @@ -269,7 +422,7 @@ def test_DataHubValidationAction_graceful_failure( def test_DataHubValidationAction_not_supported( ge_data_context: DataContext, - ge_validator_pandas: Validator, + ge_validator_spark: Validator, ge_validation_result_suite: ExpectationSuiteValidationResult, ge_validation_result_suite_id: ValidationResultIdentifier, ) -> None: @@ -282,5 +435,5 @@ def test_DataHubValidationAction_not_supported( assert datahub_action.run( validation_result_suite_identifier=ge_validation_result_suite_id, validation_result_suite=ge_validation_result_suite, - data_asset=ge_validator_pandas, + data_asset=ge_validator_spark, ) == {"datahub_notification_result": "none required"}