From 43ad229b0ad431b1bce389ba442821948dd5c26b Mon Sep 17 00:00:00 2001 From: elish7lapid Date: Mon, 21 Aug 2023 17:57:22 +0300 Subject: [PATCH 1/4] fix(powerbi): fix access token expiry Powerbi returns an expiry time for the access token which is currently ignored. This causes all api calls to fail after 60 minutes, which is the timeout returned by PowerBI. This commit fixes the bug by creating a new token once the old one expires. --- .../src/datahub/ingestion/source/powerbi/config.py | 1 + .../source/powerbi/rest_api_wrapper/data_resolver.py | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index 31d067f984d2d..471d458e279fa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -95,6 +95,7 @@ class Constant: TITLE = "title" EMBED_URL = "embedUrl" ACCESS_TOKEN = "access_token" + ACCESS_TOKEN_EXPIRY = "expires_in" IS_READ_ONLY = "isReadOnly" WEB_URL = "webUrl" ODATA_COUNT = "@odata.count" diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py index c6314c212d104..c840f641d235a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py @@ -1,6 +1,7 @@ import logging import math from abc import ABC, abstractmethod +from datetime import datetime, timedelta from time import sleep from typing import Any, Dict, List, Optional @@ -59,6 +60,7 @@ def __init__( tenant_id: str, ): self.__access_token: Optional[str] = None + self.__access_token_expiry_time: Optional[datetime] = None self.__tenant_id = tenant_id # Test connection by generating access token logger.info("Trying to connect to {}".format(self._get_authority_url())) @@ -128,7 +130,7 @@ def get_authorization_header(self): return {Constant.Authorization: self.get_access_token()} def get_access_token(self): - if self.__access_token is not None: + if self.__access_token is not None and self.__access_token_expiry_time > datetime.now(): return self.__access_token logger.info("Generating PowerBi access token") @@ -150,6 +152,10 @@ def get_access_token(self): self.__access_token = "Bearer {}".format( auth_response.get(Constant.ACCESS_TOKEN) ) + safety_gap = 300 + self.__access_token_expiry_time = datetime.now() + timedelta( + seconds=(max(auth_response.get(Constant.ACCESS_TOKEN_EXPIRY, 0) - safety_gap, 0)) + ) logger.debug(f"{Constant.PBIAccessToken}={self.__access_token}") From 3bfafe89162f14caeb519c6615557ee3a7a52351 Mon Sep 17 00:00:00 2001 From: elish7lapid Date: Sun, 27 Aug 2023 09:34:08 +0300 Subject: [PATCH 2/4] CR fixes. --- .../source/powerbi/rest_api_wrapper/data_resolver.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py index c840f641d235a..e9d2f7b98007b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py @@ -130,7 +130,7 @@ def get_authorization_header(self): return {Constant.Authorization: self.get_access_token()} def get_access_token(self): - if self.__access_token is not None and self.__access_token_expiry_time > datetime.now(): + if self.__access_token is not None and not self._is_access_token_expired(): return self.__access_token logger.info("Generating PowerBi access token") @@ -161,6 +161,9 @@ def get_access_token(self): return self.__access_token + def _is_access_token_expired(self) -> bool: + return self.__access_token_expiry_time < datetime.now() + def get_dashboards(self, workspace: Workspace) -> List[Dashboard]: """ Get the list of dashboard from PowerBi for the given workspace identifier From 6b0bfe5733d9311121036856b3ffb660dc4c8644 Mon Sep 17 00:00:00 2001 From: elish7lapid Date: Mon, 28 Aug 2023 16:56:36 +0300 Subject: [PATCH 3/4] CR fixes 2 - add tests --- .../tests/integration/powerbi/test_powerbi.py | 57 ++++++++++++++++--- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py index 5036f758a7de9..8ba12810208b9 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py +++ b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py @@ -23,19 +23,25 @@ FROZEN_TIME = "2022-02-03 07:00:00" +class MsalClient: + def __init__(self): + self.expiry_to_return = 3599 + self.token_request_count = 0 + + def acquire_token_for_client(self, *args, **kwargs): + self.token_request_count += 1 + return { + "access_token": "dummy", + "expires_in": self.expiry_to_return, + } + + def enable_logging(): # set logging to console logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) logging.getLogger().setLevel(logging.DEBUG) - def mock_msal_cca(*args, **kwargs): - class MsalClient: - def acquire_token_for_client(self, *args, **kwargs): - return { - "access_token": "dummy", - } - return MsalClient() @@ -1061,6 +1067,43 @@ def test_workspace_container( ) +@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) +def test_access_token_expiry( + mock_msal: MsalClient, pytestconfig, tmp_path, mock_time, requests_mock +): + enable_logging() + + register_mock_api(request_mock=requests_mock) + + pipeline = Pipeline.create( + { + "run_id": "powerbi-test", + "source": { + "type": "powerbi", + "config": { + **default_source_config(), + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/powerbi_access_token_mces.json", + }, + }, + } + ) + + # for long expiry, the token should only be requested once. + mock_msal.expiry_to_return = 3600 + pipeline.run() + assert mock_msal.token_request_count == 1 + + # for short expiry, the token should be requested when expires. + mock_msal.expiry_to_return = 0 + pipeline.run() + assert mock_msal.token_request_count > 1 + + def dataset_type_mapping_set_to_all_platform(pipeline: Pipeline) -> None: source_config: PowerBiDashboardSourceConfig = cast( PowerBiDashboardSource, pipeline.source From 52e1ad4229bcb6cbfcc9e16758be2c6dbdef125f Mon Sep 17 00:00:00 2001 From: elish7lapid Date: Wed, 13 Sep 2023 10:52:15 +0300 Subject: [PATCH 4/4] lint + fix tests --- .../powerbi/rest_api_wrapper/data_resolver.py | 4 +- .../tests/integration/powerbi/test_powerbi.py | 38 ++++++++++--------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py index e9d2f7b98007b..1b0a205b461f0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py @@ -154,7 +154,9 @@ def get_access_token(self): ) safety_gap = 300 self.__access_token_expiry_time = datetime.now() + timedelta( - seconds=(max(auth_response.get(Constant.ACCESS_TOKEN_EXPIRY, 0) - safety_gap, 0)) + seconds=( + max(auth_response.get(Constant.ACCESS_TOKEN_EXPIRY, 0) - safety_gap, 0) + ) ) logger.debug(f"{Constant.PBIAccessToken}={self.__access_token}") diff --git a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py index 8ba12810208b9..31231da18345f 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py +++ b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py @@ -2,6 +2,7 @@ import sys from typing import Any, Dict, List, cast from unittest import mock +from unittest.mock import MagicMock import pytest from freezegun import freeze_time @@ -23,25 +24,19 @@ FROZEN_TIME = "2022-02-03 07:00:00" -class MsalClient: - def __init__(self): - self.expiry_to_return = 3599 - self.token_request_count = 0 - - def acquire_token_for_client(self, *args, **kwargs): - self.token_request_count += 1 - return { - "access_token": "dummy", - "expires_in": self.expiry_to_return, - } - - def enable_logging(): # set logging to console logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) logging.getLogger().setLevel(logging.DEBUG) + def mock_msal_cca(*args, **kwargs): + class MsalClient: + def acquire_token_for_client(self, *args, **kwargs): + return { + "access_token": "dummy", + } + return MsalClient() @@ -1069,7 +1064,7 @@ def test_workspace_container( @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) def test_access_token_expiry( - mock_msal: MsalClient, pytestconfig, tmp_path, mock_time, requests_mock + mock_msal: MagicMock, pytestconfig, tmp_path, mock_time, requests_mock ): enable_logging() @@ -1094,14 +1089,21 @@ def test_access_token_expiry( ) # for long expiry, the token should only be requested once. - mock_msal.expiry_to_return = 3600 + mock_msal.acquire_token_for_client = lambda *args, **kwargs: { + "access_token": "dummy", + "expires_in": 3600, + } pipeline.run() - assert mock_msal.token_request_count == 1 + mock_msal.return_value.acquire_token_for_client.assert_called_once() # for short expiry, the token should be requested when expires. - mock_msal.expiry_to_return = 0 + mock_msal.reset_mock() + mock_msal.acquire_token_for_client = lambda *args, **kwargs: { + "access_token": "dummy", + "expires_in": 0, + } pipeline.run() - assert mock_msal.token_request_count > 1 + assert len(mock_msal.return_value.acquire_token_for_client.mock_calls) > 1 def dataset_type_mapping_set_to_all_platform(pipeline: Pipeline) -> None: