diff --git a/smoke-test/tests/assertions/assertions_test.py b/smoke-test/tests/assertions/assertions_test.py index 4aa64c512f684..48f3564e6cd97 100644 --- a/smoke-test/tests/assertions/assertions_test.py +++ b/smoke-test/tests/assertions/assertions_test.py @@ -2,28 +2,29 @@ import urllib import pytest -import requests_wrapper as requests import tenacity from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext, RecordEnvelope from datahub.ingestion.api.sink import NoopWriteCallback from datahub.ingestion.sink.file import FileSink, FileSinkConfig -from datahub.metadata.com.linkedin.pegasus2avro.assertion import AssertionStdAggregation -from datahub.metadata.schema_classes import ( - AssertionInfoClass, - AssertionResultClass, - AssertionResultTypeClass, - AssertionRunEventClass, - AssertionRunStatusClass, - AssertionStdOperatorClass, - AssertionTypeClass, - DatasetAssertionInfoClass, - DatasetAssertionScopeClass, - PartitionSpecClass, - PartitionTypeClass, -) -from tests.utils import delete_urns_from_file, get_gms_url, ingest_file_via_rest, wait_for_healthcheck_util, get_sleep_info +from datahub.metadata.com.linkedin.pegasus2avro.assertion import \ + AssertionStdAggregation +from datahub.metadata.schema_classes import (AssertionInfoClass, + AssertionResultClass, + AssertionResultTypeClass, + AssertionRunEventClass, + AssertionRunStatusClass, + AssertionStdOperatorClass, + AssertionTypeClass, + DatasetAssertionInfoClass, + DatasetAssertionScopeClass, + PartitionSpecClass, + PartitionTypeClass) + +import requests_wrapper as requests +from tests.utils import (delete_urns_from_file, get_gms_url, get_sleep_info, + ingest_file_via_rest, wait_for_healthcheck_util) restli_default_headers = { "X-RestLi-Protocol-Version": "2.0.0", diff --git a/smoke-test/tests/browse/browse_test.py b/smoke-test/tests/browse/browse_test.py index b9d2143d13ec7..550f0062d5a39 100644 --- a/smoke-test/tests/browse/browse_test.py +++ b/smoke-test/tests/browse/browse_test.py @@ -1,9 +1,10 @@ import time import pytest -import requests_wrapper as requests -from tests.utils import delete_urns_from_file, get_frontend_url, ingest_file_via_rest +import requests_wrapper as requests +from tests.utils import (delete_urns_from_file, get_frontend_url, + ingest_file_via_rest) TEST_DATASET_1_URN = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-browse-1,PROD)" TEST_DATASET_2_URN = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-browse-2,PROD)" @@ -51,7 +52,9 @@ def test_get_browse_paths(frontend_session, ingest_cleanup_data): # /prod -- There should be one entity get_browse_paths_json = { "query": get_browse_paths_query, - "variables": {"input": { "type": "DATASET", "path": ["prod"], "start": 0, "count": 100 } }, + "variables": { + "input": {"type": "DATASET", "path": ["prod"], "start": 0, "count": 100} + }, } response = frontend_session.post( @@ -67,12 +70,19 @@ def test_get_browse_paths(frontend_session, ingest_cleanup_data): browse = res_data["data"]["browse"] print(browse) - assert browse["entities"] == [{ "urn": TEST_DATASET_3_URN }] + assert browse["entities"] == [{"urn": TEST_DATASET_3_URN}] # /prod/kafka1 get_browse_paths_json = { "query": get_browse_paths_query, - "variables": {"input": { "type": "DATASET", "path": ["prod", "kafka1"], "start": 0, "count": 10 } }, + "variables": { + "input": { + "type": "DATASET", + "path": ["prod", "kafka1"], + "start": 0, + "count": 10, + } + }, } response = frontend_session.post( @@ -88,16 +98,27 @@ def test_get_browse_paths(frontend_session, ingest_cleanup_data): browse = res_data["data"]["browse"] assert browse == { - "total": 3, - "entities": [{ "urn": TEST_DATASET_1_URN }, { "urn": TEST_DATASET_2_URN }, { "urn": TEST_DATASET_3_URN }], - "groups": [], - "metadata": { "path": ["prod", "kafka1"], "totalNumEntities": 0 } + "total": 3, + "entities": [ + {"urn": TEST_DATASET_1_URN}, + {"urn": TEST_DATASET_2_URN}, + {"urn": TEST_DATASET_3_URN}, + ], + "groups": [], + "metadata": {"path": ["prod", "kafka1"], "totalNumEntities": 0}, } # /prod/kafka2 get_browse_paths_json = { "query": get_browse_paths_query, - "variables": {"input": { "type": "DATASET", "path": ["prod", "kafka2"], "start": 0, "count": 10 } }, + "variables": { + "input": { + "type": "DATASET", + "path": ["prod", "kafka2"], + "start": 0, + "count": 10, + } + }, } response = frontend_session.post( @@ -113,10 +134,8 @@ def test_get_browse_paths(frontend_session, ingest_cleanup_data): browse = res_data["data"]["browse"] assert browse == { - "total": 2, - "entities": [{ "urn": TEST_DATASET_1_URN }, { "urn": TEST_DATASET_2_URN }], - "groups": [], - "metadata": { "path": ["prod", "kafka2"], "totalNumEntities": 0 } + "total": 2, + "entities": [{"urn": TEST_DATASET_1_URN}, {"urn": TEST_DATASET_2_URN}], + "groups": [], + "metadata": {"path": ["prod", "kafka2"], "totalNumEntities": 0}, } - - diff --git a/smoke-test/tests/cli/datahub-cli.py b/smoke-test/tests/cli/datahub-cli.py index 1d0080bdd9d48..c3db6028efceb 100644 --- a/smoke-test/tests/cli/datahub-cli.py +++ b/smoke-test/tests/cli/datahub-cli.py @@ -1,8 +1,11 @@ import json -import pytest from time import sleep -from datahub.cli.cli_utils import guess_entity_type, post_entity, get_aspects_for_entity + +import pytest +from datahub.cli.cli_utils import (get_aspects_for_entity, guess_entity_type, + post_entity) from datahub.cli.ingest_cli import get_session_and_host, rollback + from tests.utils import ingest_file_via_rest, wait_for_writes_to_sync ingested_dataset_run_id = "" @@ -24,24 +27,46 @@ def test_setup(): session, gms_host = get_session_and_host() - assert "browsePaths" not in get_aspects_for_entity(entity_urn=dataset_urn, aspects=["browsePaths"], typed=False) - assert "editableDatasetProperties" not in get_aspects_for_entity(entity_urn=dataset_urn, aspects=["editableDatasetProperties"], typed=False) + assert "browsePaths" not in get_aspects_for_entity( + entity_urn=dataset_urn, aspects=["browsePaths"], typed=False + ) + assert "editableDatasetProperties" not in get_aspects_for_entity( + entity_urn=dataset_urn, aspects=["editableDatasetProperties"], typed=False + ) - ingested_dataset_run_id = ingest_file_via_rest("tests/cli/cli_test_data.json").config.run_id + ingested_dataset_run_id = ingest_file_via_rest( + "tests/cli/cli_test_data.json" + ).config.run_id print("Setup ingestion id: " + ingested_dataset_run_id) - assert "browsePaths" in get_aspects_for_entity(entity_urn=dataset_urn, aspects=["browsePaths"], typed=False) + assert "browsePaths" in get_aspects_for_entity( + entity_urn=dataset_urn, aspects=["browsePaths"], typed=False + ) yield # Clean up rollback_url = f"{gms_host}/runs?action=rollback" - session.post(rollback_url, data=json.dumps({"runId": ingested_editable_run_id, "dryRun": False, "hardDelete": True})) - session.post(rollback_url, data=json.dumps({"runId": ingested_dataset_run_id, "dryRun": False, "hardDelete": True})) + session.post( + rollback_url, + data=json.dumps( + {"runId": ingested_editable_run_id, "dryRun": False, "hardDelete": True} + ), + ) + session.post( + rollback_url, + data=json.dumps( + {"runId": ingested_dataset_run_id, "dryRun": False, "hardDelete": True} + ), + ) - assert "browsePaths" not in get_aspects_for_entity(entity_urn=dataset_urn, aspects=["browsePaths"], typed=False) - assert "editableDatasetProperties" not in get_aspects_for_entity(entity_urn=dataset_urn, aspects=["editableDatasetProperties"], typed=False) + assert "browsePaths" not in get_aspects_for_entity( + entity_urn=dataset_urn, aspects=["browsePaths"], typed=False + ) + assert "editableDatasetProperties" not in get_aspects_for_entity( + entity_urn=dataset_urn, aspects=["editableDatasetProperties"], typed=False + ) @pytest.mark.dependency() @@ -49,9 +74,7 @@ def test_rollback_editable(): global ingested_dataset_run_id global ingested_editable_run_id platform = "urn:li:dataPlatform:kafka" - dataset_name = ( - "test-rollback" - ) + dataset_name = "test-rollback" env = "PROD" dataset_urn = f"urn:li:dataset:({platform},{dataset_name},{env})" @@ -59,23 +82,38 @@ def test_rollback_editable(): print("Ingested dataset id:", ingested_dataset_run_id) # Assert that second data ingestion worked - assert "browsePaths" in get_aspects_for_entity(entity_urn=dataset_urn, aspects=["browsePaths"], typed=False) + assert "browsePaths" in get_aspects_for_entity( + entity_urn=dataset_urn, aspects=["browsePaths"], typed=False + ) # Make editable change - ingested_editable_run_id = ingest_file_via_rest("tests/cli/cli_editable_test_data.json").config.run_id + ingested_editable_run_id = ingest_file_via_rest( + "tests/cli/cli_editable_test_data.json" + ).config.run_id print("ingested editable id:", ingested_editable_run_id) # Assert that second data ingestion worked - assert "editableDatasetProperties" in get_aspects_for_entity(entity_urn=dataset_urn, aspects=["editableDatasetProperties"], typed=False) + assert "editableDatasetProperties" in get_aspects_for_entity( + entity_urn=dataset_urn, aspects=["editableDatasetProperties"], typed=False + ) # rollback ingestion 1 rollback_url = f"{gms_host}/runs?action=rollback" - session.post(rollback_url, data=json.dumps({"runId": ingested_dataset_run_id, "dryRun": False, "hardDelete": False})) + session.post( + rollback_url, + data=json.dumps( + {"runId": ingested_dataset_run_id, "dryRun": False, "hardDelete": False} + ), + ) # Allow async MCP processor to handle ingestions & rollbacks wait_for_writes_to_sync() # EditableDatasetProperties should still be part of the entity that was soft deleted. - assert "editableDatasetProperties" in get_aspects_for_entity(entity_urn=dataset_urn, aspects=["editableDatasetProperties"], typed=False) + assert "editableDatasetProperties" in get_aspects_for_entity( + entity_urn=dataset_urn, aspects=["editableDatasetProperties"], typed=False + ) # But first ingestion aspects should not be present - assert "browsePaths" not in get_aspects_for_entity(entity_urn=dataset_urn, typed=False) + assert "browsePaths" not in get_aspects_for_entity( + entity_urn=dataset_urn, typed=False + ) diff --git a/smoke-test/tests/cli/datahub_graph_test.py b/smoke-test/tests/cli/datahub_graph_test.py index 16925d26f6983..17c8924fb0998 100644 --- a/smoke-test/tests/cli/datahub_graph_test.py +++ b/smoke-test/tests/cli/datahub_graph_test.py @@ -1,13 +1,11 @@ import pytest import tenacity from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph -from datahub.metadata.schema_classes import KafkaSchemaClass, SchemaMetadataClass -from tests.utils import ( - delete_urns_from_file, - get_gms_url, - get_sleep_info, - ingest_file_via_rest, -) +from datahub.metadata.schema_classes import (KafkaSchemaClass, + SchemaMetadataClass) + +from tests.utils import (delete_urns_from_file, get_gms_url, get_sleep_info, + ingest_file_via_rest) sleep_sec, sleep_times = get_sleep_info() diff --git a/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py b/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py index 4288a61b7a0c1..106da7cd8d71e 100644 --- a/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py +++ b/smoke-test/tests/cli/delete_cmd/test_timeseries_delete.py @@ -1,21 +1,22 @@ import json import logging +import sys import tempfile import time -import sys from json import JSONDecodeError from typing import Any, Dict, List, Optional -from click.testing import CliRunner, Result - import datahub.emitter.mce_builder as builder +from click.testing import CliRunner, Result from datahub.emitter.serialization_helper import pre_json_transform from datahub.entrypoints import datahub from datahub.metadata.schema_classes import DatasetProfileClass + +import requests_wrapper as requests from tests.aspect_generators.timeseries.dataset_profile_gen import \ gen_dataset_profiles -from tests.utils import get_strftime_from_timestamp_millis, wait_for_writes_to_sync -import requests_wrapper as requests +from tests.utils import (get_strftime_from_timestamp_millis, + wait_for_writes_to_sync) logger = logging.getLogger(__name__) @@ -33,6 +34,7 @@ def sync_elastic() -> None: wait_for_writes_to_sync() + def datahub_put_profile(dataset_profile: DatasetProfileClass) -> None: with tempfile.NamedTemporaryFile("w+t", suffix=".json") as aspect_file: aspect_text: str = json.dumps(pre_json_transform(dataset_profile.to_obj())) diff --git a/smoke-test/tests/cli/ingest_cmd/test_timeseries_rollback.py b/smoke-test/tests/cli/ingest_cmd/test_timeseries_rollback.py index 61e7a5a65b494..e962b1a5cafd6 100644 --- a/smoke-test/tests/cli/ingest_cmd/test_timeseries_rollback.py +++ b/smoke-test/tests/cli/ingest_cmd/test_timeseries_rollback.py @@ -2,14 +2,14 @@ import time from typing import Any, Dict, List, Optional -from click.testing import CliRunner, Result - import datahub.emitter.mce_builder as builder +from click.testing import CliRunner, Result from datahub.emitter.serialization_helper import post_json_transform from datahub.entrypoints import datahub from datahub.metadata.schema_classes import DatasetProfileClass -from tests.utils import ingest_file_via_rest, wait_for_writes_to_sync + import requests_wrapper as requests +from tests.utils import ingest_file_via_rest, wait_for_writes_to_sync runner = CliRunner(mix_stderr=False) diff --git a/smoke-test/tests/cli/user_groups_cmd/test_group_cmd.py b/smoke-test/tests/cli/user_groups_cmd/test_group_cmd.py index 405e061c016f9..7b986d3be0444 100644 --- a/smoke-test/tests/cli/user_groups_cmd/test_group_cmd.py +++ b/smoke-test/tests/cli/user_groups_cmd/test_group_cmd.py @@ -1,6 +1,7 @@ import json import sys import tempfile +import time from typing import Any, Dict, Iterable, List import yaml @@ -8,7 +9,7 @@ from datahub.api.entities.corpgroup.corpgroup import CorpGroup from datahub.entrypoints import datahub from datahub.ingestion.graph.client import DataHubGraph, get_default_graph -import time + import requests_wrapper as requests from tests.utils import wait_for_writes_to_sync diff --git a/smoke-test/tests/conftest.py b/smoke-test/tests/conftest.py index eed7a983197ef..57b92a2db1c19 100644 --- a/smoke-test/tests/conftest.py +++ b/smoke-test/tests/conftest.py @@ -2,8 +2,8 @@ import pytest -from tests.utils import wait_for_healthcheck_util, get_frontend_session from tests.test_result_msg import send_message +from tests.utils import get_frontend_session, wait_for_healthcheck_util # Disable telemetry os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false" @@ -28,5 +28,5 @@ def test_healthchecks(wait_for_healthchecks): def pytest_sessionfinish(session, exitstatus): - """ whole test run finishes. """ + """whole test run finishes.""" send_message(exitstatus) diff --git a/smoke-test/tests/consistency_utils.py b/smoke-test/tests/consistency_utils.py index 15993733c592b..607835bf3649c 100644 --- a/smoke-test/tests/consistency_utils.py +++ b/smoke-test/tests/consistency_utils.py @@ -1,10 +1,16 @@ -import time +import logging import os import subprocess +import time _ELASTIC_BUFFER_WRITES_TIME_IN_SEC: int = 1 USE_STATIC_SLEEP: bool = bool(os.getenv("USE_STATIC_SLEEP", False)) -ELASTICSEARCH_REFRESH_INTERVAL_SECONDS: int = int(os.getenv("ELASTICSEARCH_REFRESH_INTERVAL_SECONDS", 5)) +ELASTICSEARCH_REFRESH_INTERVAL_SECONDS: int = int( + os.getenv("ELASTICSEARCH_REFRESH_INTERVAL_SECONDS", 5) +) + +logger = logging.getLogger(__name__) + def wait_for_writes_to_sync(max_timeout_in_sec: int = 120) -> None: if USE_STATIC_SLEEP: @@ -30,7 +36,9 @@ def wait_for_writes_to_sync(max_timeout_in_sec: int = 120) -> None: lag_zero = True if not lag_zero: - logger.warning(f"Exiting early from waiting for elastic to catch up due to a timeout. Current lag is {lag_values}") + logger.warning( + f"Exiting early from waiting for elastic to catch up due to a timeout. Current lag is {lag_values}" + ) else: # we want to sleep for an additional period of time for Elastic writes buffer to clear - time.sleep(_ELASTIC_BUFFER_WRITES_TIME_IN_SEC) \ No newline at end of file + time.sleep(_ELASTIC_BUFFER_WRITES_TIME_IN_SEC) diff --git a/smoke-test/tests/containers/containers_test.py b/smoke-test/tests/containers/containers_test.py index 575e3def6cf23..05a45239dabf8 100644 --- a/smoke-test/tests/containers/containers_test.py +++ b/smoke-test/tests/containers/containers_test.py @@ -1,5 +1,7 @@ import pytest -from tests.utils import delete_urns_from_file, get_frontend_url, ingest_file_via_rest + +from tests.utils import (delete_urns_from_file, get_frontend_url, + ingest_file_via_rest) @pytest.fixture(scope="module", autouse=False) diff --git a/smoke-test/tests/cypress/integration_test.py b/smoke-test/tests/cypress/integration_test.py index b3bacf39ac7ae..4ad2bc53fa87d 100644 --- a/smoke-test/tests/cypress/integration_test.py +++ b/smoke-test/tests/cypress/integration_test.py @@ -1,18 +1,16 @@ -from typing import Set, List - import datetime -import pytest -import subprocess import os +import subprocess +from typing import List, Set + +import pytest + +from tests.setup.lineage.ingest_time_lineage import (get_time_lineage_urns, + ingest_time_lineage) +from tests.utils import (create_datahub_step_state_aspects, delete_urns, + delete_urns_from_file, get_admin_username, + ingest_file_via_rest) -from tests.utils import ( - create_datahub_step_state_aspects, - get_admin_username, - ingest_file_via_rest, - delete_urns_from_file, - delete_urns, -) -from tests.setup.lineage.ingest_time_lineage import ingest_time_lineage, get_time_lineage_urns CYPRESS_TEST_DATA_DIR = "tests/cypress" TEST_DATA_FILENAME = "data.json" @@ -145,7 +143,6 @@ def ingest_cleanup_data(): delete_urns_from_file(f"{CYPRESS_TEST_DATA_DIR}/{TEST_ONBOARDING_DATA_FILENAME}") delete_urns(get_time_lineage_urns()) - print_now() print("deleting onboarding data file") if os.path.exists(f"{CYPRESS_TEST_DATA_DIR}/{TEST_ONBOARDING_DATA_FILENAME}"): diff --git a/smoke-test/tests/dataproduct/test_dataproduct.py b/smoke-test/tests/dataproduct/test_dataproduct.py index db198098f21fa..baef1cb1cb3ba 100644 --- a/smoke-test/tests/dataproduct/test_dataproduct.py +++ b/smoke-test/tests/dataproduct/test_dataproduct.py @@ -1,4 +1,6 @@ +import logging import os +import subprocess import tempfile import time from random import randint @@ -17,8 +19,6 @@ DomainPropertiesClass, DomainsClass) from datahub.utilities.urns.urn import Urn -import subprocess -import logging logger = logging.getLogger(__name__) diff --git a/smoke-test/tests/delete/delete_test.py b/smoke-test/tests/delete/delete_test.py index 68e001f983fbf..d920faaf3a89a 100644 --- a/smoke-test/tests/delete/delete_test.py +++ b/smoke-test/tests/delete/delete_test.py @@ -1,16 +1,14 @@ -import os import json -import pytest +import os from time import sleep + +import pytest from datahub.cli.cli_utils import get_aspects_for_entity from datahub.cli.ingest_cli import get_session_and_host -from tests.utils import ( - ingest_file_via_rest, - wait_for_healthcheck_util, - delete_urns_from_file, - wait_for_writes_to_sync, - get_datahub_graph, -) + +from tests.utils import (delete_urns_from_file, get_datahub_graph, + ingest_file_via_rest, wait_for_healthcheck_util, + wait_for_writes_to_sync) # Disable telemetry os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false" @@ -102,7 +100,7 @@ def test_delete_reference(test_setup, depends=["test_healthchecks"]): graph.delete_references_to_urn(tag_urn, dry_run=False) wait_for_writes_to_sync() - + # Validate that references no longer exist references_count, related_aspects = graph.delete_references_to_urn( tag_urn, dry_run=True diff --git a/smoke-test/tests/deprecation/deprecation_test.py b/smoke-test/tests/deprecation/deprecation_test.py index 1149a970aa8e5..a8969804d03d7 100644 --- a/smoke-test/tests/deprecation/deprecation_test.py +++ b/smoke-test/tests/deprecation/deprecation_test.py @@ -1,10 +1,7 @@ import pytest -from tests.utils import ( - delete_urns_from_file, - get_frontend_url, - ingest_file_via_rest, - get_root_urn, -) + +from tests.utils import (delete_urns_from_file, get_frontend_url, get_root_urn, + ingest_file_via_rest) @pytest.fixture(scope="module", autouse=True) diff --git a/smoke-test/tests/domains/domains_test.py b/smoke-test/tests/domains/domains_test.py index 7ffe1682cafd8..fa8c918e3cbe1 100644 --- a/smoke-test/tests/domains/domains_test.py +++ b/smoke-test/tests/domains/domains_test.py @@ -1,12 +1,8 @@ import pytest import tenacity -from tests.utils import ( - delete_urns_from_file, - get_frontend_url, - get_gms_url, - ingest_file_via_rest, - get_sleep_info, -) + +from tests.utils import (delete_urns_from_file, get_frontend_url, get_gms_url, + get_sleep_info, ingest_file_via_rest) sleep_sec, sleep_times = get_sleep_info() @@ -240,4 +236,7 @@ def test_set_unset_domain(frontend_session, ingest_cleanup_data): assert res_data assert res_data["data"]["dataset"]["domain"]["domain"]["urn"] == domain_urn - assert res_data["data"]["dataset"]["domain"]["domain"]["properties"]["name"] == "Engineering" + assert ( + res_data["data"]["dataset"]["domain"]["domain"]["properties"]["name"] + == "Engineering" + ) diff --git a/smoke-test/tests/managed-ingestion/managed_ingestion_test.py b/smoke-test/tests/managed-ingestion/managed_ingestion_test.py index 1238a1dd5730a..b5e408731334e 100644 --- a/smoke-test/tests/managed-ingestion/managed_ingestion_test.py +++ b/smoke-test/tests/managed-ingestion/managed_ingestion_test.py @@ -3,7 +3,8 @@ import pytest import tenacity -from tests.utils import get_frontend_url, get_sleep_info, wait_for_healthcheck_util +from tests.utils import (get_frontend_url, get_sleep_info, + wait_for_healthcheck_util) sleep_sec, sleep_times = get_sleep_info() diff --git a/smoke-test/tests/patch/common_patch_tests.py b/smoke-test/tests/patch/common_patch_tests.py index 574e4fd4e4c88..f1d6abf5da794 100644 --- a/smoke-test/tests/patch/common_patch_tests.py +++ b/smoke-test/tests/patch/common_patch_tests.py @@ -2,25 +2,17 @@ import uuid from typing import Dict, Optional, Type -from datahub.emitter.mce_builder import ( - make_tag_urn, - make_term_urn, - make_user_urn, -) +from datahub.emitter.mce_builder import (make_tag_urn, make_term_urn, + make_user_urn) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_patch_builder import MetadataPatchProposal from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig -from datahub.metadata.schema_classes import ( - AuditStampClass, - GlobalTagsClass, - GlossaryTermAssociationClass, - GlossaryTermsClass, - OwnerClass, - OwnershipClass, - OwnershipTypeClass, - TagAssociationClass, - _Aspect, -) +from datahub.metadata.schema_classes import (AuditStampClass, GlobalTagsClass, + GlossaryTermAssociationClass, + GlossaryTermsClass, OwnerClass, + OwnershipClass, + OwnershipTypeClass, + TagAssociationClass, _Aspect) def helper_test_entity_terms_patch( @@ -34,18 +26,14 @@ def get_terms(graph, entity_urn): term_urn = make_term_urn(term=f"testTerm-{uuid.uuid4()}") - term_association = GlossaryTermAssociationClass( - urn=term_urn, context="test" - ) + term_association = GlossaryTermAssociationClass(urn=term_urn, context="test") global_terms = GlossaryTermsClass( terms=[term_association], auditStamp=AuditStampClass( time=int(time.time() * 1000.0), actor=make_user_urn("tester") ), ) - mcpw = MetadataChangeProposalWrapper( - entityUrn=test_entity_urn, aspect=global_terms - ) + mcpw = MetadataChangeProposalWrapper(entityUrn=test_entity_urn, aspect=global_terms) with DataHubGraph(DataHubGraphConfig()) as graph: graph.emit_mcp(mcpw) @@ -88,9 +76,7 @@ def helper_test_dataset_tags_patch( tag_association = TagAssociationClass(tag=tag_urn, context="test") global_tags = GlobalTagsClass(tags=[tag_association]) - mcpw = MetadataChangeProposalWrapper( - entityUrn=test_entity_urn, aspect=global_tags - ) + mcpw = MetadataChangeProposalWrapper(entityUrn=test_entity_urn, aspect=global_tags) with DataHubGraph(DataHubGraphConfig()) as graph: graph.emit_mcp(mcpw) @@ -153,15 +139,11 @@ def helper_test_ownership_patch( assert owner.owners[0].owner == make_user_urn("jdoe") for patch_mcp in ( - patch_builder_class(test_entity_urn) - .add_owner(owner_to_add) - .build() + patch_builder_class(test_entity_urn).add_owner(owner_to_add).build() ): graph.emit_mcp(patch_mcp) - owner = graph.get_aspect( - entity_urn=test_entity_urn, aspect_type=OwnershipClass - ) + owner = graph.get_aspect(entity_urn=test_entity_urn, aspect_type=OwnershipClass) assert len(owner.owners) == 2 for patch_mcp in ( @@ -171,9 +153,7 @@ def helper_test_ownership_patch( ): graph.emit_mcp(patch_mcp) - owner = graph.get_aspect( - entity_urn=test_entity_urn, aspect_type=OwnershipClass - ) + owner = graph.get_aspect(entity_urn=test_entity_urn, aspect_type=OwnershipClass) assert len(owner.owners) == 1 assert owner.owners[0].owner == make_user_urn("jdoe") @@ -199,9 +179,7 @@ def get_custom_properties( orig_aspect = base_aspect assert hasattr(orig_aspect, "customProperties") orig_aspect.customProperties = base_property_map - mcpw = MetadataChangeProposalWrapper( - entityUrn=test_entity_urn, aspect=orig_aspect - ) + mcpw = MetadataChangeProposalWrapper(entityUrn=test_entity_urn, aspect=orig_aspect) with DataHubGraph(DataHubGraphConfig()) as graph: graph.emit(mcpw) diff --git a/smoke-test/tests/patch/test_datajob_patches.py b/smoke-test/tests/patch/test_datajob_patches.py index 407410ee89914..342d5d683228a 100644 --- a/smoke-test/tests/patch/test_datajob_patches.py +++ b/smoke-test/tests/patch/test_datajob_patches.py @@ -3,19 +3,14 @@ from datahub.emitter.mce_builder import make_data_job_urn, make_dataset_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig -from datahub.metadata.schema_classes import ( - DataJobInfoClass, - DataJobInputOutputClass, - EdgeClass, -) +from datahub.metadata.schema_classes import (DataJobInfoClass, + DataJobInputOutputClass, + EdgeClass) from datahub.specific.datajob import DataJobPatchBuilder from tests.patch.common_patch_tests import ( - helper_test_custom_properties_patch, - helper_test_dataset_tags_patch, - helper_test_entity_terms_patch, - helper_test_ownership_patch, -) + helper_test_custom_properties_patch, helper_test_dataset_tags_patch, + helper_test_entity_terms_patch, helper_test_ownership_patch) def _make_test_datajob_urn( @@ -37,16 +32,12 @@ def test_datajob_ownership_patch(wait_for_healthchecks): # Tags def test_datajob_tags_patch(wait_for_healthchecks): - helper_test_dataset_tags_patch( - _make_test_datajob_urn(), DataJobPatchBuilder - ) + helper_test_dataset_tags_patch(_make_test_datajob_urn(), DataJobPatchBuilder) # Terms def test_dataset_terms_patch(wait_for_healthchecks): - helper_test_entity_terms_patch( - _make_test_datajob_urn(), DataJobPatchBuilder - ) + helper_test_entity_terms_patch(_make_test_datajob_urn(), DataJobPatchBuilder) # Custom Properties diff --git a/smoke-test/tests/patch/test_dataset_patches.py b/smoke-test/tests/patch/test_dataset_patches.py index 239aab64675d8..6704d19760fb9 100644 --- a/smoke-test/tests/patch/test_dataset_patches.py +++ b/smoke-test/tests/patch/test_dataset_patches.py @@ -20,7 +20,10 @@ UpstreamClass, UpstreamLineageClass) from datahub.specific.dataset import DatasetPatchBuilder -from tests.patch.common_patch_tests import helper_test_entity_terms_patch, helper_test_dataset_tags_patch, helper_test_ownership_patch, helper_test_custom_properties_patch + +from tests.patch.common_patch_tests import ( + helper_test_custom_properties_patch, helper_test_dataset_tags_patch, + helper_test_entity_terms_patch, helper_test_ownership_patch) # Common Aspect Patch Tests @@ -31,6 +34,7 @@ def test_dataset_ownership_patch(wait_for_healthchecks): ) helper_test_ownership_patch(dataset_urn, DatasetPatchBuilder) + # Tags def test_dataset_tags_patch(wait_for_healthchecks): dataset_urn = make_dataset_urn( @@ -38,6 +42,7 @@ def test_dataset_tags_patch(wait_for_healthchecks): ) helper_test_dataset_tags_patch(dataset_urn, DatasetPatchBuilder) + # Terms def test_dataset_terms_patch(wait_for_healthchecks): dataset_urn = make_dataset_urn( @@ -284,8 +289,15 @@ def test_custom_properties_patch(wait_for_healthchecks): dataset_urn = make_dataset_urn( platform="hive", name=f"SampleHiveDataset-{uuid.uuid4()}", env="PROD" ) - orig_dataset_properties = DatasetPropertiesClass(name="test_name", description="test_description") - helper_test_custom_properties_patch(test_entity_urn=dataset_urn, patch_builder_class=DatasetPatchBuilder, custom_properties_aspect_class=DatasetPropertiesClass, base_aspect=orig_dataset_properties) + orig_dataset_properties = DatasetPropertiesClass( + name="test_name", description="test_description" + ) + helper_test_custom_properties_patch( + test_entity_urn=dataset_urn, + patch_builder_class=DatasetPatchBuilder, + custom_properties_aspect_class=DatasetPropertiesClass, + base_aspect=orig_dataset_properties, + ) with DataHubGraph(DataHubGraphConfig()) as graph: # Patch custom properties along with name diff --git a/smoke-test/tests/policies/test_policies.py b/smoke-test/tests/policies/test_policies.py index b7091541894dd..67142181d2b96 100644 --- a/smoke-test/tests/policies/test_policies.py +++ b/smoke-test/tests/policies/test_policies.py @@ -1,12 +1,8 @@ import pytest import tenacity -from tests.utils import ( - get_frontend_url, - wait_for_healthcheck_util, - get_frontend_session, - get_sleep_info, - get_root_urn, -) + +from tests.utils import (get_frontend_session, get_frontend_url, get_root_urn, + get_sleep_info, wait_for_healthcheck_util) TEST_POLICY_NAME = "Updated Platform Policy" diff --git a/smoke-test/tests/setup/lineage/helper_classes.py b/smoke-test/tests/setup/lineage/helper_classes.py index 53f77b08d15ed..d550f3093be85 100644 --- a/smoke-test/tests/setup/lineage/helper_classes.py +++ b/smoke-test/tests/setup/lineage/helper_classes.py @@ -1,10 +1,7 @@ from dataclasses import dataclass from typing import Any, Dict, List, Optional -from datahub.metadata.schema_classes import ( - EdgeClass, - SchemaFieldDataTypeClass, -) +from datahub.metadata.schema_classes import EdgeClass, SchemaFieldDataTypeClass @dataclass diff --git a/smoke-test/tests/setup/lineage/ingest_data_job_change.py b/smoke-test/tests/setup/lineage/ingest_data_job_change.py index 8e3e9c5352922..588a1625419bc 100644 --- a/smoke-test/tests/setup/lineage/ingest_data_job_change.py +++ b/smoke-test/tests/setup/lineage/ingest_data_job_change.py @@ -1,36 +1,20 @@ from typing import List -from datahub.emitter.mce_builder import ( - make_dataset_urn, - make_data_flow_urn, - make_data_job_urn_with_flow, -) +from datahub.emitter.mce_builder import (make_data_flow_urn, + make_data_job_urn_with_flow, + make_dataset_urn) from datahub.emitter.rest_emitter import DatahubRestEmitter -from datahub.metadata.schema_classes import ( - DateTypeClass, - NumberTypeClass, - SchemaFieldDataTypeClass, - StringTypeClass, -) +from datahub.metadata.schema_classes import (DateTypeClass, NumberTypeClass, + SchemaFieldDataTypeClass, + StringTypeClass) -from tests.setup.lineage.constants import ( - AIRFLOW_DATA_PLATFORM, - SNOWFLAKE_DATA_PLATFORM, - TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, - TIMESTAMP_MILLIS_ONE_DAY_AGO, -) -from tests.setup.lineage.helper_classes import ( - Field, - Dataset, - Task, - Pipeline, -) -from tests.setup.lineage.utils import ( - create_edge, - create_node, - create_nodes_and_edges, - emit_mcps, -) +from tests.setup.lineage.constants import (AIRFLOW_DATA_PLATFORM, + SNOWFLAKE_DATA_PLATFORM, + TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + TIMESTAMP_MILLIS_ONE_DAY_AGO) +from tests.setup.lineage.helper_classes import Dataset, Field, Pipeline, Task +from tests.setup.lineage.utils import (create_edge, create_node, + create_nodes_and_edges, emit_mcps) # Constants for Case 2 DAILY_TEMPERATURE_DATASET_ID = "climate.daily_temperature" diff --git a/smoke-test/tests/setup/lineage/ingest_dataset_join_change.py b/smoke-test/tests/setup/lineage/ingest_dataset_join_change.py index 35a8e6d5cf02e..bb9f51b6b5e9b 100644 --- a/smoke-test/tests/setup/lineage/ingest_dataset_join_change.py +++ b/smoke-test/tests/setup/lineage/ingest_dataset_join_change.py @@ -1,32 +1,18 @@ from typing import List -from datahub.emitter.mce_builder import ( - make_dataset_urn, -) +from datahub.emitter.mce_builder import make_dataset_urn from datahub.emitter.rest_emitter import DatahubRestEmitter -from datahub.metadata.schema_classes import ( - NumberTypeClass, - SchemaFieldDataTypeClass, - StringTypeClass, - UpstreamClass, -) +from datahub.metadata.schema_classes import (NumberTypeClass, + SchemaFieldDataTypeClass, + StringTypeClass, UpstreamClass) -from tests.setup.lineage.constants import ( - DATASET_ENTITY_TYPE, - SNOWFLAKE_DATA_PLATFORM, - TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, - TIMESTAMP_MILLIS_ONE_DAY_AGO, -) -from tests.setup.lineage.helper_classes import ( - Field, - Dataset, -) -from tests.setup.lineage.utils import ( - create_node, - create_upstream_edge, - create_upstream_mcp, - emit_mcps, -) +from tests.setup.lineage.constants import (DATASET_ENTITY_TYPE, + SNOWFLAKE_DATA_PLATFORM, + TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + TIMESTAMP_MILLIS_ONE_DAY_AGO) +from tests.setup.lineage.helper_classes import Dataset, Field +from tests.setup.lineage.utils import (create_node, create_upstream_edge, + create_upstream_mcp, emit_mcps) # Constants for Case 3 GDP_DATASET_ID = "economic_data.gdp" diff --git a/smoke-test/tests/setup/lineage/ingest_input_datasets_change.py b/smoke-test/tests/setup/lineage/ingest_input_datasets_change.py index f4fb795147478..6079d7a3d2b63 100644 --- a/smoke-test/tests/setup/lineage/ingest_input_datasets_change.py +++ b/smoke-test/tests/setup/lineage/ingest_input_datasets_change.py @@ -1,36 +1,20 @@ from typing import List -from datahub.emitter.mce_builder import ( - make_dataset_urn, - make_data_flow_urn, - make_data_job_urn_with_flow, -) +from datahub.emitter.mce_builder import (make_data_flow_urn, + make_data_job_urn_with_flow, + make_dataset_urn) from datahub.emitter.rest_emitter import DatahubRestEmitter -from datahub.metadata.schema_classes import ( - NumberTypeClass, - SchemaFieldDataTypeClass, - StringTypeClass, -) - -from tests.setup.lineage.constants import ( - AIRFLOW_DATA_PLATFORM, - BQ_DATA_PLATFORM, - TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, - TIMESTAMP_MILLIS_ONE_DAY_AGO, -) -from tests.setup.lineage.helper_classes import ( - Field, - Dataset, - Task, - Pipeline, -) -from tests.setup.lineage.utils import ( - create_edge, - create_node, - create_nodes_and_edges, - emit_mcps, -) +from datahub.metadata.schema_classes import (NumberTypeClass, + SchemaFieldDataTypeClass, + StringTypeClass) +from tests.setup.lineage.constants import (AIRFLOW_DATA_PLATFORM, + BQ_DATA_PLATFORM, + TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + TIMESTAMP_MILLIS_ONE_DAY_AGO) +from tests.setup.lineage.helper_classes import Dataset, Field, Pipeline, Task +from tests.setup.lineage.utils import (create_edge, create_node, + create_nodes_and_edges, emit_mcps) # Constants for Case 1 TRANSACTIONS_DATASET_ID = "transactions.transactions" diff --git a/smoke-test/tests/setup/lineage/ingest_time_lineage.py b/smoke-test/tests/setup/lineage/ingest_time_lineage.py index cae8e0124d501..3aec979707290 100644 --- a/smoke-test/tests/setup/lineage/ingest_time_lineage.py +++ b/smoke-test/tests/setup/lineage/ingest_time_lineage.py @@ -1,12 +1,14 @@ +import os from typing import List from datahub.emitter.rest_emitter import DatahubRestEmitter -from tests.setup.lineage.ingest_input_datasets_change import ingest_input_datasets_change, get_input_datasets_change_urns -from tests.setup.lineage.ingest_data_job_change import ingest_data_job_change, get_data_job_change_urns -from tests.setup.lineage.ingest_dataset_join_change import ingest_dataset_join_change, get_dataset_join_change_urns - -import os +from tests.setup.lineage.ingest_data_job_change import ( + get_data_job_change_urns, ingest_data_job_change) +from tests.setup.lineage.ingest_dataset_join_change import ( + get_dataset_join_change_urns, ingest_dataset_join_change) +from tests.setup.lineage.ingest_input_datasets_change import ( + get_input_datasets_change_urns, ingest_input_datasets_change) SERVER = os.getenv("DATAHUB_SERVER") or "http://localhost:8080" TOKEN = os.getenv("DATAHUB_TOKEN") or "" @@ -20,4 +22,8 @@ def ingest_time_lineage() -> None: def get_time_lineage_urns() -> List[str]: - return get_input_datasets_change_urns() + get_data_job_change_urns() + get_dataset_join_change_urns() + return ( + get_input_datasets_change_urns() + + get_data_job_change_urns() + + get_dataset_join_change_urns() + ) diff --git a/smoke-test/tests/setup/lineage/utils.py b/smoke-test/tests/setup/lineage/utils.py index 672f7a945a6af..c72f6ccb89b7a 100644 --- a/smoke-test/tests/setup/lineage/utils.py +++ b/smoke-test/tests/setup/lineage/utils.py @@ -1,41 +1,30 @@ import datetime -from datahub.emitter.mce_builder import ( - make_data_platform_urn, - make_dataset_urn, - make_data_job_urn_with_flow, - make_data_flow_urn, -) +from typing import List + +from datahub.emitter.mce_builder import (make_data_flow_urn, + make_data_job_urn_with_flow, + make_data_platform_urn, + make_dataset_urn) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.metadata.com.linkedin.pegasus2avro.dataset import UpstreamLineage -from datahub.metadata.schema_classes import ( - AuditStampClass, - ChangeTypeClass, - DatasetLineageTypeClass, - DatasetPropertiesClass, - DataFlowInfoClass, - DataJobInputOutputClass, - DataJobInfoClass, - EdgeClass, - MySqlDDLClass, - SchemaFieldClass, - SchemaMetadataClass, - UpstreamClass, -) -from typing import List - -from tests.setup.lineage.constants import ( - DATASET_ENTITY_TYPE, - DATA_JOB_ENTITY_TYPE, - DATA_FLOW_ENTITY_TYPE, - DATA_FLOW_INFO_ASPECT_NAME, - DATA_JOB_INFO_ASPECT_NAME, - DATA_JOB_INPUT_OUTPUT_ASPECT_NAME, -) -from tests.setup.lineage.helper_classes import ( - Dataset, - Pipeline, -) +from datahub.metadata.schema_classes import (AuditStampClass, ChangeTypeClass, + DataFlowInfoClass, + DataJobInfoClass, + DataJobInputOutputClass, + DatasetLineageTypeClass, + DatasetPropertiesClass, EdgeClass, + MySqlDDLClass, SchemaFieldClass, + SchemaMetadataClass, + UpstreamClass) + +from tests.setup.lineage.constants import (DATA_FLOW_ENTITY_TYPE, + DATA_FLOW_INFO_ASPECT_NAME, + DATA_JOB_ENTITY_TYPE, + DATA_JOB_INFO_ASPECT_NAME, + DATA_JOB_INPUT_OUTPUT_ASPECT_NAME, + DATASET_ENTITY_TYPE) +from tests.setup.lineage.helper_classes import Dataset, Pipeline def create_node(dataset: Dataset) -> List[MetadataChangeProposalWrapper]: @@ -85,10 +74,10 @@ def create_node(dataset: Dataset) -> List[MetadataChangeProposalWrapper]: def create_edge( - source_urn: str, - destination_urn: str, - created_timestamp_millis: int, - updated_timestamp_millis: int, + source_urn: str, + destination_urn: str, + created_timestamp_millis: int, + updated_timestamp_millis: int, ) -> EdgeClass: created_audit_stamp: AuditStampClass = AuditStampClass( time=created_timestamp_millis, actor="urn:li:corpuser:unknown" @@ -105,7 +94,7 @@ def create_edge( def create_nodes_and_edges( - airflow_dag: Pipeline, + airflow_dag: Pipeline, ) -> List[MetadataChangeProposalWrapper]: mcps = [] data_flow_urn = make_data_flow_urn( @@ -160,9 +149,9 @@ def create_nodes_and_edges( def create_upstream_edge( - upstream_entity_urn: str, - created_timestamp_millis: int, - updated_timestamp_millis: int, + upstream_entity_urn: str, + created_timestamp_millis: int, + updated_timestamp_millis: int, ): created_audit_stamp: AuditStampClass = AuditStampClass( time=created_timestamp_millis, actor="urn:li:corpuser:unknown" @@ -180,11 +169,11 @@ def create_upstream_edge( def create_upstream_mcp( - entity_type: str, - entity_urn: str, - upstreams: List[UpstreamClass], - timestamp_millis: int, - run_id: str = "", + entity_type: str, + entity_urn: str, + upstreams: List[UpstreamClass], + timestamp_millis: int, + run_id: str = "", ) -> MetadataChangeProposalWrapper: print(f"Creating upstreamLineage aspect for {entity_urn}") timestamp_millis: int = int(datetime.datetime.now().timestamp() * 1000) @@ -203,7 +192,7 @@ def create_upstream_mcp( def emit_mcps( - emitter: DatahubRestEmitter, mcps: List[MetadataChangeProposalWrapper] + emitter: DatahubRestEmitter, mcps: List[MetadataChangeProposalWrapper] ) -> None: for mcp in mcps: emitter.emit_mcp(mcp) diff --git a/smoke-test/tests/tags-and-terms/tags_and_terms_test.py b/smoke-test/tests/tags-and-terms/tags_and_terms_test.py index b0ca29b544cfe..6ac75765286f0 100644 --- a/smoke-test/tests/tags-and-terms/tags_and_terms_test.py +++ b/smoke-test/tests/tags-and-terms/tags_and_terms_test.py @@ -1,5 +1,7 @@ import pytest -from tests.utils import delete_urns_from_file, get_frontend_url, ingest_file_via_rest, wait_for_healthcheck_util + +from tests.utils import (delete_urns_from_file, get_frontend_url, + ingest_file_via_rest, wait_for_healthcheck_util) @pytest.fixture(scope="module", autouse=True) diff --git a/smoke-test/tests/telemetry/telemetry_test.py b/smoke-test/tests/telemetry/telemetry_test.py index 3672abcda948d..3127061c9f506 100644 --- a/smoke-test/tests/telemetry/telemetry_test.py +++ b/smoke-test/tests/telemetry/telemetry_test.py @@ -7,5 +7,7 @@ def test_no_clientID(): client_id_urn = "urn:li:telemetry:clientId" aspect = ["telemetryClientId"] - res_data = json.dumps(get_aspects_for_entity(entity_urn=client_id_urn, aspects=aspect, typed=False)) + res_data = json.dumps( + get_aspects_for_entity(entity_urn=client_id_urn, aspects=aspect, typed=False) + ) assert res_data == "{}" diff --git a/smoke-test/tests/test_result_msg.py b/smoke-test/tests/test_result_msg.py index e3b336db9d66c..b9775e8ee4acd 100644 --- a/smoke-test/tests/test_result_msg.py +++ b/smoke-test/tests/test_result_msg.py @@ -1,6 +1,6 @@ -from slack_sdk import WebClient import os +from slack_sdk import WebClient datahub_stats = {} @@ -10,10 +10,10 @@ def add_datahub_stats(stat_name, stat_val): def send_to_slack(passed: str): - slack_api_token = os.getenv('SLACK_API_TOKEN') - slack_channel = os.getenv('SLACK_CHANNEL') - slack_thread_ts = os.getenv('SLACK_THREAD_TS') - test_identifier = os.getenv('TEST_IDENTIFIER', 'LOCAL_TEST') + slack_api_token = os.getenv("SLACK_API_TOKEN") + slack_channel = os.getenv("SLACK_CHANNEL") + slack_thread_ts = os.getenv("SLACK_THREAD_TS") + test_identifier = os.getenv("TEST_IDENTIFIER", "LOCAL_TEST") if slack_api_token is None or slack_channel is None: return client = WebClient(token=slack_api_token) @@ -26,14 +26,21 @@ def send_to_slack(passed: str): message += f"Num {entity_type} is {val}\n" if slack_thread_ts is None: - client.chat_postMessage(channel=slack_channel, text=f'{test_identifier} Status - {passed}\n{message}') + client.chat_postMessage( + channel=slack_channel, + text=f"{test_identifier} Status - {passed}\n{message}", + ) else: - client.chat_postMessage(channel=slack_channel, text=f'{test_identifier} Status - {passed}\n{message}', thread_ts=slack_thread_ts) + client.chat_postMessage( + channel=slack_channel, + text=f"{test_identifier} Status - {passed}\n{message}", + thread_ts=slack_thread_ts, + ) def send_message(exitstatus): try: - send_to_slack('PASSED' if exitstatus == 0 else 'FAILED') + send_to_slack("PASSED" if exitstatus == 0 else "FAILED") except Exception as e: # We don't want to fail pytest at all print(f"Exception happened for sending msg to slack {e}") diff --git a/smoke-test/tests/test_stateful_ingestion.py b/smoke-test/tests/test_stateful_ingestion.py index a10cf13a08029..c6adb402e5d51 100644 --- a/smoke-test/tests/test_stateful_ingestion.py +++ b/smoke-test/tests/test_stateful_ingestion.py @@ -4,17 +4,15 @@ from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource from datahub.ingestion.source.state.checkpoint import Checkpoint -from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState -from datahub.ingestion.source.state.stale_entity_removal_handler import StaleEntityRemovalHandler +from datahub.ingestion.source.state.entity_removal_state import \ + GenericCheckpointState +from datahub.ingestion.source.state.stale_entity_removal_handler import \ + StaleEntityRemovalHandler from sqlalchemy import create_engine from sqlalchemy.sql import text -from tests.utils import ( - get_gms_url, - get_mysql_password, - get_mysql_url, - get_mysql_username, -) +from tests.utils import (get_gms_url, get_mysql_password, get_mysql_url, + get_mysql_username) def test_stateful_ingestion(wait_for_healthchecks): diff --git a/smoke-test/tests/tests/tests_test.py b/smoke-test/tests/tests/tests_test.py index 0b87f90a92c58..213a2ea087b7a 100644 --- a/smoke-test/tests/tests/tests_test.py +++ b/smoke-test/tests/tests/tests_test.py @@ -1,9 +1,13 @@ import pytest import tenacity -from tests.utils import delete_urns_from_file, get_frontend_url, ingest_file_via_rest, wait_for_healthcheck_util, get_sleep_info + +from tests.utils import (delete_urns_from_file, get_frontend_url, + get_sleep_info, ingest_file_via_rest, + wait_for_healthcheck_util) sleep_sec, sleep_times = get_sleep_info() + @pytest.fixture(scope="module", autouse=True) def ingest_cleanup_data(request): print("ingesting test data") @@ -18,6 +22,7 @@ def wait_for_healthchecks(): wait_for_healthcheck_util() yield + @pytest.mark.dependency() def test_healthchecks(wait_for_healthchecks): # Call to wait_for_healthchecks fixture will do the actual functionality. diff --git a/smoke-test/tests/timeline/timeline_test.py b/smoke-test/tests/timeline/timeline_test.py index a73d585c6c72d..4705343c1a2ba 100644 --- a/smoke-test/tests/timeline/timeline_test.py +++ b/smoke-test/tests/timeline/timeline_test.py @@ -3,14 +3,14 @@ from datahub.cli import timeline_cli from datahub.cli.cli_utils import guess_entity_type, post_entity -from tests.utils import ingest_file_via_rest, wait_for_writes_to_sync, get_datahub_graph + +from tests.utils import (get_datahub_graph, ingest_file_via_rest, + wait_for_writes_to_sync) def test_all(): platform = "urn:li:dataPlatform:kafka" - dataset_name = ( - "test-timeline-sample-kafka" - ) + dataset_name = "test-timeline-sample-kafka" env = "PROD" dataset_urn = f"urn:li:dataset:({platform},{dataset_name},{env})" @@ -18,8 +18,13 @@ def test_all(): ingest_file_via_rest("tests/timeline/timeline_test_datav2.json") ingest_file_via_rest("tests/timeline/timeline_test_datav3.json") - res_data = timeline_cli.get_timeline(dataset_urn, ["TAG", "DOCUMENTATION", "TECHNICAL_SCHEMA", "GLOSSARY_TERM", - "OWNER"], None, None, False) + res_data = timeline_cli.get_timeline( + dataset_urn, + ["TAG", "DOCUMENTATION", "TECHNICAL_SCHEMA", "GLOSSARY_TERM", "OWNER"], + None, + None, + False, + ) get_datahub_graph().hard_delete_entity(urn=dataset_urn) assert res_data @@ -35,9 +40,7 @@ def test_all(): def test_schema(): platform = "urn:li:dataPlatform:kafka" - dataset_name = ( - "test-timeline-sample-kafka" - ) + dataset_name = "test-timeline-sample-kafka" env = "PROD" dataset_urn = f"urn:li:dataset:({platform},{dataset_name},{env})" @@ -45,7 +48,9 @@ def test_schema(): put(dataset_urn, "schemaMetadata", "test_resources/timeline/newschemav2.json") put(dataset_urn, "schemaMetadata", "test_resources/timeline/newschemav3.json") - res_data = timeline_cli.get_timeline(dataset_urn, ["TECHNICAL_SCHEMA"], None, None, False) + res_data = timeline_cli.get_timeline( + dataset_urn, ["TECHNICAL_SCHEMA"], None, None, False + ) get_datahub_graph().hard_delete_entity(urn=dataset_urn) assert res_data @@ -61,9 +66,7 @@ def test_schema(): def test_glossary(): platform = "urn:li:dataPlatform:kafka" - dataset_name = ( - "test-timeline-sample-kafka" - ) + dataset_name = "test-timeline-sample-kafka" env = "PROD" dataset_urn = f"urn:li:dataset:({platform},{dataset_name},{env})" @@ -71,7 +74,9 @@ def test_glossary(): put(dataset_urn, "glossaryTerms", "test_resources/timeline/newglossaryv2.json") put(dataset_urn, "glossaryTerms", "test_resources/timeline/newglossaryv3.json") - res_data = timeline_cli.get_timeline(dataset_urn, ["GLOSSARY_TERM"], None, None, False) + res_data = timeline_cli.get_timeline( + dataset_urn, ["GLOSSARY_TERM"], None, None, False + ) get_datahub_graph().hard_delete_entity(urn=dataset_urn) assert res_data @@ -87,17 +92,29 @@ def test_glossary(): def test_documentation(): platform = "urn:li:dataPlatform:kafka" - dataset_name = ( - "test-timeline-sample-kafka" - ) + dataset_name = "test-timeline-sample-kafka" env = "PROD" dataset_urn = f"urn:li:dataset:({platform},{dataset_name},{env})" - put(dataset_urn, "institutionalMemory", "test_resources/timeline/newdocumentation.json") - put(dataset_urn, "institutionalMemory", "test_resources/timeline/newdocumentationv2.json") - put(dataset_urn, "institutionalMemory", "test_resources/timeline/newdocumentationv3.json") + put( + dataset_urn, + "institutionalMemory", + "test_resources/timeline/newdocumentation.json", + ) + put( + dataset_urn, + "institutionalMemory", + "test_resources/timeline/newdocumentationv2.json", + ) + put( + dataset_urn, + "institutionalMemory", + "test_resources/timeline/newdocumentationv3.json", + ) - res_data = timeline_cli.get_timeline(dataset_urn, ["DOCUMENTATION"], None, None, False) + res_data = timeline_cli.get_timeline( + dataset_urn, ["DOCUMENTATION"], None, None, False + ) get_datahub_graph().hard_delete_entity(urn=dataset_urn) assert res_data @@ -113,9 +130,7 @@ def test_documentation(): def test_tags(): platform = "urn:li:dataPlatform:kafka" - dataset_name = ( - "test-timeline-sample-kafka" - ) + dataset_name = "test-timeline-sample-kafka" env = "PROD" dataset_urn = f"urn:li:dataset:({platform},{dataset_name},{env})" @@ -139,9 +154,7 @@ def test_tags(): def test_ownership(): platform = "urn:li:dataPlatform:kafka" - dataset_name = ( - "test-timeline-sample-kafka" - ) + dataset_name = "test-timeline-sample-kafka" env = "PROD" dataset_urn = f"urn:li:dataset:({platform},{dataset_name},{env})" diff --git a/smoke-test/tests/tokens/revokable_access_token_test.py b/smoke-test/tests/tokens/revokable_access_token_test.py index b10ad3aa3fc2a..55f3de594af4e 100644 --- a/smoke-test/tests/tokens/revokable_access_token_test.py +++ b/smoke-test/tests/tokens/revokable_access_token_test.py @@ -1,15 +1,11 @@ import os -import pytest -import requests from time import sleep -from tests.utils import ( - get_frontend_url, - wait_for_healthcheck_util, - get_admin_credentials, - wait_for_writes_to_sync, -) +import pytest +import requests +from tests.utils import (get_admin_credentials, get_frontend_url, + wait_for_healthcheck_util, wait_for_writes_to_sync) # Disable telemetry os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false" diff --git a/smoke-test/tests/utils.py b/smoke-test/tests/utils.py index af03efd4f71f8..bd75b13d1910f 100644 --- a/smoke-test/tests/utils.py +++ b/smoke-test/tests/utils.py @@ -1,19 +1,20 @@ import functools import json +import logging import os -from datetime import datetime, timedelta, timezone import subprocess import time -from typing import Any, Dict, List, Tuple +from datetime import datetime, timedelta, timezone from time import sleep -from joblib import Parallel, delayed +from typing import Any, Dict, List, Tuple -import requests_wrapper as requests -import logging from datahub.cli import cli_utils from datahub.cli.cli_utils import get_system_auth -from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig +from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph from datahub.ingestion.run.pipeline import Pipeline +from joblib import Parallel, delayed + +import requests_wrapper as requests from tests.consistency_utils import wait_for_writes_to_sync TIME: int = 1581407189000 @@ -174,6 +175,7 @@ def delete(entry): wait_for_writes_to_sync() + # Fixed now value NOW: datetime = datetime.now() @@ -232,6 +234,3 @@ def create_datahub_step_state_aspects( ] with open(onboarding_filename, "w") as f: json.dump(aspects_dict, f, indent=2) - - - diff --git a/smoke-test/tests/views/views_test.py b/smoke-test/tests/views/views_test.py index 4da69750a167b..685c3bd80b04d 100644 --- a/smoke-test/tests/views/views_test.py +++ b/smoke-test/tests/views/views_test.py @@ -1,16 +1,14 @@ -import pytest import time + +import pytest import tenacity -from tests.utils import ( - delete_urns_from_file, - get_frontend_url, - get_gms_url, - ingest_file_via_rest, - get_sleep_info, -) + +from tests.utils import (delete_urns_from_file, get_frontend_url, get_gms_url, + get_sleep_info, ingest_file_via_rest) sleep_sec, sleep_times = get_sleep_info() + @pytest.mark.dependency() def test_healthchecks(wait_for_healthchecks): # Call to wait_for_healthchecks fixture will do the actual functionality. @@ -40,6 +38,7 @@ def _ensure_more_views(frontend_session, list_views_json, query_name, before_cou assert after_count == before_count + 1 return after_count + @tenacity.retry( stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec) ) @@ -111,18 +110,18 @@ def test_create_list_delete_global_view(frontend_session): new_view_name = "Test View" new_view_description = "Test Description" new_view_definition = { - "entityTypes": ["DATASET", "DASHBOARD"], - "filter": { - "operator": "AND", - "filters": [ - { - "field": "tags", - "values": ["urn:li:tag:test"], - "negated": False, - "condition": "EQUAL" - } - ] - } + "entityTypes": ["DATASET", "DASHBOARD"], + "filter": { + "operator": "AND", + "filters": [ + { + "field": "tags", + "values": ["urn:li:tag:test"], + "negated": False, + "condition": "EQUAL", + } + ], + }, } # Create new View @@ -137,7 +136,7 @@ def test_create_list_delete_global_view(frontend_session): "viewType": "GLOBAL", "name": new_view_name, "description": new_view_description, - "definition": new_view_definition + "definition": new_view_definition, } }, } @@ -169,9 +168,7 @@ def test_create_list_delete_global_view(frontend_session): "query": """mutation deleteView($urn: String!) {\n deleteView(urn: $urn) }""", - "variables": { - "urn": view_urn - }, + "variables": {"urn": view_urn}, } response = frontend_session.post( @@ -189,7 +186,9 @@ def test_create_list_delete_global_view(frontend_session): ) -@pytest.mark.dependency(depends=["test_healthchecks", "test_create_list_delete_global_view"]) +@pytest.mark.dependency( + depends=["test_healthchecks", "test_create_list_delete_global_view"] +) def test_create_list_delete_personal_view(frontend_session): # Get count of existing views @@ -237,18 +236,18 @@ def test_create_list_delete_personal_view(frontend_session): new_view_name = "Test View" new_view_description = "Test Description" new_view_definition = { - "entityTypes": ["DATASET", "DASHBOARD"], - "filter": { - "operator": "AND", - "filters": [ - { - "field": "tags", - "values": ["urn:li:tag:test"], - "negated": False, - "condition": "EQUAL" - } - ] - } + "entityTypes": ["DATASET", "DASHBOARD"], + "filter": { + "operator": "AND", + "filters": [ + { + "field": "tags", + "values": ["urn:li:tag:test"], + "negated": False, + "condition": "EQUAL", + } + ], + }, } # Create new View @@ -263,7 +262,7 @@ def test_create_list_delete_personal_view(frontend_session): "viewType": "PERSONAL", "name": new_view_name, "description": new_view_description, - "definition": new_view_definition + "definition": new_view_definition, } }, } @@ -293,9 +292,7 @@ def test_create_list_delete_personal_view(frontend_session): "query": """mutation deleteView($urn: String!) {\n deleteView(urn: $urn) }""", - "variables": { - "urn": view_urn - }, + "variables": {"urn": view_urn}, } response = frontend_session.post( @@ -312,25 +309,28 @@ def test_create_list_delete_personal_view(frontend_session): before_count=new_count, ) -@pytest.mark.dependency(depends=["test_healthchecks", "test_create_list_delete_personal_view"]) + +@pytest.mark.dependency( + depends=["test_healthchecks", "test_create_list_delete_personal_view"] +) def test_update_global_view(frontend_session): # First create a view new_view_name = "Test View" new_view_description = "Test Description" new_view_definition = { - "entityTypes": ["DATASET", "DASHBOARD"], - "filter": { - "operator": "AND", - "filters": [ - { - "field": "tags", - "values": ["urn:li:tag:test"], - "negated": False, - "condition": "EQUAL" - } - ] - } + "entityTypes": ["DATASET", "DASHBOARD"], + "filter": { + "operator": "AND", + "filters": [ + { + "field": "tags", + "values": ["urn:li:tag:test"], + "negated": False, + "condition": "EQUAL", + } + ], + }, } # Create new View @@ -345,7 +345,7 @@ def test_update_global_view(frontend_session): "viewType": "PERSONAL", "name": new_view_name, "description": new_view_description, - "definition": new_view_definition + "definition": new_view_definition, } }, } @@ -366,18 +366,18 @@ def test_update_global_view(frontend_session): new_view_name = "New Test View" new_view_description = "New Test Description" new_view_definition = { - "entityTypes": ["DATASET", "DASHBOARD", "CHART", "DATA_FLOW"], - "filter": { - "operator": "OR", - "filters": [ - { - "field": "glossaryTerms", - "values": ["urn:li:glossaryTerm:test"], - "negated": True, - "condition": "CONTAIN" - } - ] - } + "entityTypes": ["DATASET", "DASHBOARD", "CHART", "DATA_FLOW"], + "filter": { + "operator": "OR", + "filters": [ + { + "field": "glossaryTerms", + "values": ["urn:li:glossaryTerm:test"], + "negated": True, + "condition": "CONTAIN", + } + ], + }, } update_view_json = { @@ -391,8 +391,8 @@ def test_update_global_view(frontend_session): "input": { "name": new_view_name, "description": new_view_description, - "definition": new_view_definition - } + "definition": new_view_definition, + }, }, } @@ -411,9 +411,7 @@ def test_update_global_view(frontend_session): "query": """mutation deleteView($urn: String!) {\n deleteView(urn: $urn) }""", - "variables": { - "urn": view_urn - }, + "variables": {"urn": view_urn}, } response = frontend_session.post(