diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index f71afac737ca6..70786efff79a4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -95,6 +95,7 @@ class Constant: TITLE = "title" EMBED_URL = "embedUrl" ACCESS_TOKEN = "access_token" + ACCESS_TOKEN_EXPIRY = "expires_in" IS_READ_ONLY = "isReadOnly" WEB_URL = "webUrl" ODATA_COUNT = "@odata.count" diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py index c6314c212d104..3aeffa60bc28e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py @@ -1,6 +1,7 @@ import logging import math from abc import ABC, abstractmethod +from datetime import datetime, timedelta from time import sleep from typing import Any, Dict, List, Optional @@ -59,6 +60,7 @@ def __init__( tenant_id: str, ): self.__access_token: Optional[str] = None + self.__access_token_expiry_time: Optional[datetime] = None self.__tenant_id = tenant_id # Test connection by generating access token logger.info("Trying to connect to {}".format(self._get_authority_url())) @@ -128,7 +130,7 @@ def get_authorization_header(self): return {Constant.Authorization: self.get_access_token()} def get_access_token(self): - if self.__access_token is not None: + if self.__access_token is not None and not self._is_access_token_expired(): return self.__access_token logger.info("Generating PowerBi access token") @@ -150,11 +152,22 @@ def get_access_token(self): self.__access_token = "Bearer {}".format( auth_response.get(Constant.ACCESS_TOKEN) ) + safety_gap = 300 + self.__access_token_expiry_time = datetime.now() + timedelta( + seconds=( + max(auth_response.get(Constant.ACCESS_TOKEN_EXPIRY, 0) - safety_gap, 0) + ) + ) logger.debug(f"{Constant.PBIAccessToken}={self.__access_token}") return self.__access_token + def _is_access_token_expired(self) -> bool: + if not self.__access_token_expiry_time: + return True + return self.__access_token_expiry_time < datetime.now() + def get_dashboards(self, workspace: Workspace) -> List[Dashboard]: """ Get the list of dashboard from PowerBi for the given workspace identifier diff --git a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py index c9b0ded433749..b2cbccf983eb0 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py +++ b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py @@ -1,8 +1,10 @@ +import datetime import logging import re import sys from typing import Any, Dict, List, cast from unittest import mock +from unittest.mock import MagicMock import pytest from freezegun import freeze_time @@ -31,13 +33,23 @@ def enable_logging(): logging.getLogger().setLevel(logging.DEBUG) -def mock_msal_cca(*args, **kwargs): - class MsalClient: - def acquire_token_for_client(self, *args, **kwargs): - return { - "access_token": "dummy", - } +class MsalClient: + call_num = 0 + token: Dict[str, Any] = { + "access_token": "dummy", + } + + @staticmethod + def acquire_token_for_client(*args, **kwargs): + MsalClient.call_num += 1 + return MsalClient.token + + @staticmethod + def reset(): + MsalClient.call_num = 0 + +def mock_msal_cca(*args, **kwargs): return MsalClient() @@ -627,7 +639,13 @@ def default_source_config(): @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration -def test_powerbi_ingest(mock_msal, pytestconfig, tmp_path, mock_time, requests_mock): +def test_powerbi_ingest( + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: enable_logging() test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" @@ -658,7 +676,7 @@ def test_powerbi_ingest(mock_msal, pytestconfig, tmp_path, mock_time, requests_m mce_helpers.check_golden_file( pytestconfig, - output_path=tmp_path / "powerbi_mces.json", + output_path=f"{tmp_path}/powerbi_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @@ -667,8 +685,12 @@ def test_powerbi_ingest(mock_msal, pytestconfig, tmp_path, mock_time, requests_m @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_platform_instance_ingest( - mock_msal, pytestconfig, tmp_path, mock_time, requests_mock -): + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: enable_logging() test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" @@ -711,8 +733,12 @@ def test_powerbi_platform_instance_ingest( @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_ingest_urn_lower_case( - mock_msal, pytestconfig, tmp_path, mock_time, requests_mock -): + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(request_mock=requests_mock) @@ -752,8 +778,12 @@ def test_powerbi_ingest_urn_lower_case( @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_override_ownership( - mock_msal, pytestconfig, tmp_path, mock_time, requests_mock -): + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(request_mock=requests_mock) @@ -783,7 +813,7 @@ def test_override_ownership( mce_helpers.check_golden_file( pytestconfig, - output_path=tmp_path / "powerbi_mces_disabled_ownership.json", + output_path=f"{tmp_path}/powerbi_mces_disabled_ownership.json", golden_path=f"{test_resources_dir}/{mce_out_file}", ) @@ -792,8 +822,13 @@ def test_override_ownership( @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_scan_all_workspaces( - mock_msal, pytestconfig, tmp_path, mock_time, requests_mock -): + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: + test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(request_mock=requests_mock) @@ -828,7 +863,7 @@ def test_scan_all_workspaces( mce_helpers.check_golden_file( pytestconfig, - output_path=tmp_path / "powerbi_mces_scan_all_workspaces.json", + output_path=f"{tmp_path}/powerbi_mces_scan_all_workspaces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @@ -836,7 +871,14 @@ def test_scan_all_workspaces( @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration -def test_extract_reports(mock_msal, pytestconfig, tmp_path, mock_time, requests_mock): +def test_extract_reports( + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: + enable_logging() test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" @@ -868,7 +910,7 @@ def test_extract_reports(mock_msal, pytestconfig, tmp_path, mock_time, requests_ mce_helpers.check_golden_file( pytestconfig, - output_path=tmp_path / "powerbi_report_mces.json", + output_path=f"{tmp_path}/powerbi_report_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @@ -876,7 +918,13 @@ def test_extract_reports(mock_msal, pytestconfig, tmp_path, mock_time, requests_ @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration -def test_extract_lineage(mock_msal, pytestconfig, tmp_path, mock_time, requests_mock): +def test_extract_lineage( + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: enable_logging() test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" @@ -925,8 +973,12 @@ def test_extract_lineage(mock_msal, pytestconfig, tmp_path, mock_time, requests_ @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_extract_endorsements( - mock_msal, pytestconfig, tmp_path, mock_time, requests_mock -): + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(request_mock=requests_mock) @@ -957,7 +1009,7 @@ def test_extract_endorsements( mce_helpers.check_golden_file( pytestconfig, - output_path=tmp_path / "powerbi_endorsement_mces.json", + output_path=f"{tmp_path}/powerbi_endorsement_mces.json", golden_path=f"{test_resources_dir}/{mce_out_file}", ) @@ -966,8 +1018,12 @@ def test_extract_endorsements( @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_admin_access_is_not_allowed( - mock_msal, pytestconfig, tmp_path, mock_time, requests_mock -): + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: enable_logging() test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" @@ -1024,8 +1080,12 @@ def test_admin_access_is_not_allowed( @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) def test_workspace_container( - mock_msal, pytestconfig, tmp_path, mock_time, requests_mock -): + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: enable_logging() test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" @@ -1062,11 +1122,92 @@ def test_workspace_container( mce_helpers.check_golden_file( pytestconfig, - output_path=tmp_path / "powerbi_container_mces.json", + output_path=f"{tmp_path}/powerbi_container_mces.json", golden_path=f"{test_resources_dir}/{mce_out_file}", ) +@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) +def test_access_token_expiry_with_long_expiry( + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: + enable_logging() + + register_mock_api(request_mock=requests_mock) + + pipeline = Pipeline.create( + { + "run_id": "powerbi-test", + "source": { + "type": "powerbi", + "config": { + **default_source_config(), + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/powerbi_access_token_mces.json", + }, + }, + } + ) + + # for long expiry, the token should only be requested once. + MsalClient.token = { + "access_token": "dummy2", + "expires_in": 3600, + } + + MsalClient.reset() + pipeline.run() + # We expect the token to be requested twice (once for AdminApiResolver and one for RegularApiResolver) + assert MsalClient.call_num == 2 + + +@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) +def test_access_token_expiry_with_short_expiry( + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: + enable_logging() + + register_mock_api(request_mock=requests_mock) + + pipeline = Pipeline.create( + { + "run_id": "powerbi-test", + "source": { + "type": "powerbi", + "config": { + **default_source_config(), + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/powerbi_access_token_mces.json", + }, + }, + } + ) + + # for short expiry, the token should be requested when expires. + MsalClient.token = { + "access_token": "dummy", + "expires_in": 0, + } + pipeline.run() + assert MsalClient.call_num > 2 + + def dataset_type_mapping_set_to_all_platform(pipeline: Pipeline) -> None: source_config: PowerBiDashboardSourceConfig = cast( PowerBiDashboardSource, pipeline.source @@ -1306,8 +1447,12 @@ def validate_pipeline(pipeline: Pipeline) -> None: @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_reports_with_failed_page_request( - mock_msal, pytestconfig, tmp_path, mock_time, requests_mock -): + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: """ Test that all reports are fetched even if a single page request fails """ @@ -1419,8 +1564,12 @@ def test_reports_with_failed_page_request( @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) def test_independent_datasets_extraction( - mock_msal, pytestconfig, tmp_path, mock_time, requests_mock -): + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" @@ -1503,14 +1652,20 @@ def test_independent_datasets_extraction( mce_helpers.check_golden_file( pytestconfig, - output_path=tmp_path / "powerbi_independent_mces.json", + output_path=f"{tmp_path}/powerbi_independent_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) -def test_cll_extraction(mock_msal, pytestconfig, tmp_path, mock_time, requests_mock): +def test_cll_extraction( + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" @@ -1553,7 +1708,7 @@ def test_cll_extraction(mock_msal, pytestconfig, tmp_path, mock_time, requests_m mce_helpers.check_golden_file( pytestconfig, - output_path=tmp_path / "powerbi_cll_mces.json", + output_path=f"{tmp_path}/powerbi_cll_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @@ -1561,8 +1716,12 @@ def test_cll_extraction(mock_msal, pytestconfig, tmp_path, mock_time, requests_m @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) def test_cll_extraction_flags( - mock_msal, pytestconfig, tmp_path, mock_time, requests_mock -): + mock_msal: MagicMock, + pytestconfig: pytest.Config, + tmp_path: str, + mock_time: datetime.datetime, + requests_mock: Any, +) -> None: register_mock_api( request_mock=requests_mock,