From 47a27b8ab46349aeb01d208ab3a375cbbf922f86 Mon Sep 17 00:00:00 2001 From: Dimitri GRISARD Date: Fri, 19 Jan 2024 16:34:16 +0100 Subject: [PATCH 1/7] feat(openapi): allow Bearer token --- .../src/datahub/ingestion/source/openapi.py | 7 ++++--- .../src/datahub/ingestion/source/openapi_parser.py | 8 ++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index 1b3a6dc4bee58..105e2711ca8d5 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -114,6 +114,7 @@ def get_swagger(self) -> Dict: sw_dict = get_swag_json( self.url, token=self.token, + bearer_token=self.bearer_token, swagger_file=self.swagger_file, proxies=self.proxies, ) # load the swagger file @@ -286,7 +287,7 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 if config.token: response = request_call( - tot_url, token=config.token, proxies=config.proxies + tot_url, token=config.token, bearer_token=config.bearer_token, proxies=config.proxies ) else: response = request_call( @@ -314,7 +315,7 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 tot_url = clean_url(config.url + self.url_basepath + url_guess) if config.token: response = request_call( - tot_url, token=config.token, proxies=config.proxies + tot_url, token=config.token, bearer_token=config.bearer_token, proxies=config.proxies ) else: response = request_call( @@ -342,7 +343,7 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 tot_url = clean_url(config.url + self.url_basepath + composed_url) if config.token: response = request_call( - tot_url, token=config.token, proxies=config.proxies + tot_url, token=config.token, bearer_token=config.bearer_token, proxies=config.proxies ) else: response = request_call( diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py index c1caca18fefe3..e9c4ee6a1fa29 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py @@ -49,17 +49,20 @@ def flatten2list(d: dict) -> list: def request_call( url: str, token: Optional[str] = None, + bearer_token: Optional[bool] = False, username: Optional[str] = None, password: Optional[str] = None, proxies: Optional[dict] = None, ) -> requests.Response: headers = {"accept": "application/json"} - if username is not None and password is not None: return requests.get( url, headers=headers, auth=HTTPBasicAuth(username, password) ) + elif token is not None and bearer_token is True: + headers["Authorization"] = f"Bearer {token}" + return requests.get(url, proxies=proxies, headers=headers) elif token is not None: headers["Authorization"] = f"{token}" return requests.get(url, proxies=proxies, headers=headers) @@ -70,6 +73,7 @@ def request_call( def get_swag_json( url: str, token: Optional[str] = None, + bearer_token: Optional[bool] = False, username: Optional[str] = None, password: Optional[str] = None, swagger_file: str = "", @@ -77,7 +81,7 @@ def get_swag_json( ) -> Dict: tot_url = url + swagger_file if token is not None: - response = request_call(url=tot_url, token=token, proxies=proxies) + response = request_call(url=tot_url, token=token, bearer_token=bearer_token, proxies=proxies) else: response = request_call( url=tot_url, username=username, password=password, proxies=proxies From 0a4a8b23d6783d0e94bb9a0292d73ed71df62e5b Mon Sep 17 00:00:00 2001 From: Dimitri GRISARD Date: Sat, 20 Jan 2024 14:54:28 +0100 Subject: [PATCH 2/7] feat(openapi): use pydantic --- .../src/datahub/ingestion/source/openapi.py | 39 ++++++++++++++----- .../ingestion/source/openapi_parser.py | 16 ++++---- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index 105e2711ca8d5..d13f1daefdc47 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -4,9 +4,10 @@ from abc import ABC from typing import Dict, Iterable, Optional, Tuple +from pydantic import validator from pydantic.fields import Field -from datahub.configuration.common import ConfigModel +from datahub.configuration.common import ConfigModel, ConfigurationError from datahub.emitter.mce_builder import make_tag_urn from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( @@ -78,9 +79,19 @@ class OpenApiConfig(ConfigModel): default={}, description="Retrieving a token from the endpoint." ) + @validator("bearer_token") + def ensure_only_one_token( + cls, bearer_token: Optional[str], values + ) -> Optional[str]: + if bearer_token is not None and values.get("token") is not None: + raise ConfigurationError( + "Unable to use 'token' and 'bearer_token' together." + ) + return bearer_token + def get_swagger(self) -> Dict: - if self.get_token or self.token is not None: - if self.token is not None: + if self.get_token or self.token or self.bearer_token is not None: + if self.token or self.bearer_token: ... else: assert ( @@ -284,10 +295,12 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 "{" not in endpoint_k ): # if the API does not explicitly require parameters tot_url = clean_url(config.url + self.url_basepath + endpoint_k) - - if config.token: + if config.token or config.bearer_token: response = request_call( - tot_url, token=config.token, bearer_token=config.bearer_token, proxies=config.proxies + tot_url, + token=config.token, + bearer_token=config.bearer_token, + proxies=config.proxies, ) else: response = request_call( @@ -313,9 +326,12 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 # start guessing... url_guess = try_guessing(endpoint_k, root_dataset_samples) tot_url = clean_url(config.url + self.url_basepath + url_guess) - if config.token: + if config.token or config.bearer_token: response = request_call( - tot_url, token=config.token, bearer_token=config.bearer_token, proxies=config.proxies + tot_url, + token=config.token, + bearer_token=config.bearer_token, + proxies=config.proxies, ) else: response = request_call( @@ -341,9 +357,12 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 raw_url=endpoint_k, attr_list=config.forced_examples[endpoint_k] ) tot_url = clean_url(config.url + self.url_basepath + composed_url) - if config.token: + if config.token or config.bearer_token: response = request_call( - tot_url, token=config.token, bearer_token=config.bearer_token, proxies=config.proxies + tot_url, + token=config.token, + bearer_token=config.bearer_token, + proxies=config.proxies, ) else: response = request_call( diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py index e9c4ee6a1fa29..6e136262890c4 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py @@ -49,7 +49,7 @@ def flatten2list(d: dict) -> list: def request_call( url: str, token: Optional[str] = None, - bearer_token: Optional[bool] = False, + bearer_token: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, proxies: Optional[dict] = None, @@ -60,10 +60,10 @@ def request_call( url, headers=headers, auth=HTTPBasicAuth(username, password) ) - elif token is not None and bearer_token is True: - headers["Authorization"] = f"Bearer {token}" + elif bearer_token: + headers["Authorization"] = f"Bearer {bearer_token}" return requests.get(url, proxies=proxies, headers=headers) - elif token is not None: + elif token: headers["Authorization"] = f"{token}" return requests.get(url, proxies=proxies, headers=headers) else: @@ -73,15 +73,17 @@ def request_call( def get_swag_json( url: str, token: Optional[str] = None, - bearer_token: Optional[bool] = False, + bearer_token: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, swagger_file: str = "", proxies: Optional[dict] = None, ) -> Dict: tot_url = url + swagger_file - if token is not None: - response = request_call(url=tot_url, token=token, bearer_token=bearer_token, proxies=proxies) + if token or bearer_token: + response = request_call( + url=tot_url, token=token, bearer_token=bearer_token, proxies=proxies + ) else: response = request_call( url=tot_url, username=username, password=password, proxies=proxies From 34268f777634bb924f2f3d994eae839ecf3e5f75 Mon Sep 17 00:00:00 2001 From: Dimitri <36767102+dim-ops@users.noreply.github.com> Date: Tue, 23 Jan 2024 11:29:05 +0100 Subject: [PATCH 3/7] chore(openapi): always call validator Co-authored-by: Harshal Sheth --- metadata-ingestion/src/datahub/ingestion/source/openapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index d13f1daefdc47..be3fe9fa228a9 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -79,7 +79,7 @@ class OpenApiConfig(ConfigModel): default={}, description="Retrieving a token from the endpoint." ) - @validator("bearer_token") + @validator("bearer_token", always=True) def ensure_only_one_token( cls, bearer_token: Optional[str], values ) -> Optional[str]: From 62e23adb2129875bad43f1da0855bdd5211f303e Mon Sep 17 00:00:00 2001 From: Dimitri <36767102+dim-ops@users.noreply.github.com> Date: Tue, 23 Jan 2024 11:30:03 +0100 Subject: [PATCH 4/7] chore(openapi): improve condition Co-authored-by: Harshal Sheth --- metadata-ingestion/src/datahub/ingestion/source/openapi.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index be3fe9fa228a9..8719fa290c996 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -91,7 +91,10 @@ def ensure_only_one_token( def get_swagger(self) -> Dict: if self.get_token or self.token or self.bearer_token is not None: - if self.token or self.bearer_token: + if self.token: + pass + elif self.bearer_token: + self.token = f"Bearer {self.bearer_token}" ... else: assert ( From 4e53c48bb6be8d2e22719b58c86299c18be67f40 Mon Sep 17 00:00:00 2001 From: Dimitri GRISARD Date: Fri, 26 Jan 2024 19:05:03 +0100 Subject: [PATCH 5/7] fix: add bearer_token field --- metadata-ingestion/src/datahub/ingestion/source/openapi.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index 8719fa290c996..7b12bb4bbf8b1 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -75,6 +75,9 @@ class OpenApiConfig(ConfigModel): token: Optional[str] = Field( default=None, description="Token for endpoint authentication." ) + bearer_token: Optional[str] = Field( + default=None, description="Bearer token for endpoint authentication." + ) get_token: dict = Field( default={}, description="Retrieving a token from the endpoint." ) From de45efc3785cb7f4b32344064e73ff316675d4fa Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 9 Feb 2024 16:21:55 -0500 Subject: [PATCH 6/7] simplify code + fix lint --- .../src/datahub/ingestion/source/openapi.py | 17 ++++++++--------- .../datahub/ingestion/source/openapi_parser.py | 12 ++---------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index 7b12bb4bbf8b1..54affafdcc978 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -84,7 +84,7 @@ class OpenApiConfig(ConfigModel): @validator("bearer_token", always=True) def ensure_only_one_token( - cls, bearer_token: Optional[str], values + cls, bearer_token: Optional[str], values: Dict ) -> Optional[str]: if bearer_token is not None and values.get("token") is not None: raise ConfigurationError( @@ -97,8 +97,11 @@ def get_swagger(self) -> Dict: if self.token: pass elif self.bearer_token: + # TRICKY: To avoid passing a bunch of different token types around, we set the + # token's value to the properly formatted bearer token. + # TODO: We should just create a requests.Session and set all the auth + # details there once, and then use that session for all requests. self.token = f"Bearer {self.bearer_token}" - ... else: assert ( "url_complement" in self.get_token.keys() @@ -131,7 +134,6 @@ def get_swagger(self) -> Dict: sw_dict = get_swag_json( self.url, token=self.token, - bearer_token=self.bearer_token, swagger_file=self.swagger_file, proxies=self.proxies, ) # load the swagger file @@ -301,11 +303,10 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 "{" not in endpoint_k ): # if the API does not explicitly require parameters tot_url = clean_url(config.url + self.url_basepath + endpoint_k) - if config.token or config.bearer_token: + if config.token: response = request_call( tot_url, token=config.token, - bearer_token=config.bearer_token, proxies=config.proxies, ) else: @@ -332,11 +333,10 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 # start guessing... url_guess = try_guessing(endpoint_k, root_dataset_samples) tot_url = clean_url(config.url + self.url_basepath + url_guess) - if config.token or config.bearer_token: + if config.token: response = request_call( tot_url, token=config.token, - bearer_token=config.bearer_token, proxies=config.proxies, ) else: @@ -363,11 +363,10 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 raw_url=endpoint_k, attr_list=config.forced_examples[endpoint_k] ) tot_url = clean_url(config.url + self.url_basepath + composed_url) - if config.token or config.bearer_token: + if config.token: response = request_call( tot_url, token=config.token, - bearer_token=config.bearer_token, proxies=config.proxies, ) else: diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py index 6e136262890c4..b7f9e4a90f64a 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py @@ -49,7 +49,6 @@ def flatten2list(d: dict) -> list: def request_call( url: str, token: Optional[str] = None, - bearer_token: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, proxies: Optional[dict] = None, @@ -59,10 +58,6 @@ def request_call( return requests.get( url, headers=headers, auth=HTTPBasicAuth(username, password) ) - - elif bearer_token: - headers["Authorization"] = f"Bearer {bearer_token}" - return requests.get(url, proxies=proxies, headers=headers) elif token: headers["Authorization"] = f"{token}" return requests.get(url, proxies=proxies, headers=headers) @@ -73,17 +68,14 @@ def request_call( def get_swag_json( url: str, token: Optional[str] = None, - bearer_token: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, swagger_file: str = "", proxies: Optional[dict] = None, ) -> Dict: tot_url = url + swagger_file - if token or bearer_token: - response = request_call( - url=tot_url, token=token, bearer_token=bearer_token, proxies=proxies - ) + if token: + response = request_call(url=tot_url, token=token, proxies=proxies) else: response = request_call( url=tot_url, username=username, password=password, proxies=proxies From be698dbbfdc5e4230893f95e9c8795bfeaa269b3 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 9 Feb 2024 16:24:46 -0500 Subject: [PATCH 7/7] use token is not None --- .../src/datahub/ingestion/source/openapi_parser.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py index b7f9e4a90f64a..5bacafaa3f588 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py @@ -58,7 +58,7 @@ def request_call( return requests.get( url, headers=headers, auth=HTTPBasicAuth(username, password) ) - elif token: + elif token is not None: headers["Authorization"] = f"{token}" return requests.get(url, proxies=proxies, headers=headers) else: @@ -74,12 +74,9 @@ def get_swag_json( proxies: Optional[dict] = None, ) -> Dict: tot_url = url + swagger_file - if token: - response = request_call(url=tot_url, token=token, proxies=proxies) - else: - response = request_call( - url=tot_url, username=username, password=password, proxies=proxies - ) + response = request_call( + url=tot_url, token=token, username=username, password=password, proxies=proxies + ) if response.status_code != 200: raise Exception(f"Unable to retrieve {tot_url}, error {response.status_code}")