From b41668cb8d4287ef445521ed5e68090234a5f893 Mon Sep 17 00:00:00 2001 From: Sebastian Smiley Date: Mon, 4 Sep 2023 12:50:34 -0400 Subject: [PATCH 1/3] Remove secrets from logs Config option to disable threading. --- tap_indeedsponsoredjobs/auth.py | 6 +- tap_indeedsponsoredjobs/streams.py | 112 ++++++++++++----------------- tap_indeedsponsoredjobs/tap.py | 13 +++- 3 files changed, 58 insertions(+), 73 deletions(-) diff --git a/tap_indeedsponsoredjobs/auth.py b/tap_indeedsponsoredjobs/auth.py index db62f78..aceee45 100644 --- a/tap_indeedsponsoredjobs/auth.py +++ b/tap_indeedsponsoredjobs/auth.py @@ -1,6 +1,5 @@ """IndeedSponsoredJobs Authentication.""" -import logging from typing import Callable, Generator from urllib.parse import urlparse @@ -172,10 +171,9 @@ def backoff_handler(self, details) -> None: details: backoff invocation details https://github.com/litl/backoff#event-handlers """ - logging.error( + self.logger.error( "Backing off {wait:0.1f} seconds after {tries} tries " - "calling function {target} with args {args} and kwargs " - "{kwargs}".format(**details) + "calling function {target}".format(**details) ) def validate_response(self, response: requests.Response) -> None: diff --git a/tap_indeedsponsoredjobs/streams.py b/tap_indeedsponsoredjobs/streams.py index 6c4540e..da09662 100644 --- a/tap_indeedsponsoredjobs/streams.py +++ b/tap_indeedsponsoredjobs/streams.py @@ -26,6 +26,23 @@ SCHEMAS_DIR = Path(__file__).parent / Path("./schemas") +class ThreadedIndeedSponsoredJobsStream(IndeedSponsoredJobsStream): + + def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None: + """Initialize the stream with a variable for storing threaded data.""" + self.campaign_threaded_data = campaign_threaded_data + super().__init__(*args, **kwargs) + + def request_records(self, context: dict | None) -> Iterable[dict]: + if self.config["threading_enable"]: + url = self.get_url(context) + for input in self.campaign_threaded_data[url]: + yield from extract_jsonpath(self.records_jsonpath, input=input) + self.campaign_threaded_data.pop(url) + else: + yield from super().request_records(context=context) + + class Employers(IndeedSponsoredJobsStream): """List of all employers we have access to""" @@ -273,23 +290,24 @@ def get_url_params( def get_child_context(self, record: dict, context: Optional[dict]) -> dict: """Return a context dictionary for child streams.""" - # The name of each threaded child stream and its associated url, for use in the - # manage_threads() function. manage_threads() will store each piece of data in a - # dictionary with the url as a key. Adding url base and record["Id"] could be - # done inside manage_threads(), but it seemed cleaner to do it all here. - endpoints = [ - f"{self.url_base}/v1/campaigns/{record['Id']}/budget", - f"{self.url_base}/v1/campaigns/{record['Id']}", - f"{self.url_base}/v1/campaigns/{record['Id']}/properties", - f"{self.url_base}/v1/campaigns/{record['Id']}/jobDetails", - ] - - # Data is taken from the manage_threads() function and stored in a tap-level - # dictionary for reference from the child streams. This implementation was - # chosen over storing in context because of the dangers of writing state while - # context contains large amounts of data. - self.logger.info(f"Threading to call all of these endpoints at the same time {endpoints=}") - self.manage_threads(endpoints=endpoints, context=context, campaign_threaded_data=self.campaign_threaded_data) + if self.config["threading_enable"]: + # The name of each threaded child stream and its associated url, for use in the + # manage_threads() function. manage_threads() will store each piece of data in a + # dictionary with the url as a key. Adding url base and record["Id"] could be + # done inside manage_threads(), but it seemed cleaner to do it all here. + endpoints = [ + f"{self.url_base}/v1/campaigns/{record['Id']}/budget", + f"{self.url_base}/v1/campaigns/{record['Id']}", + f"{self.url_base}/v1/campaigns/{record['Id']}/properties", + f"{self.url_base}/v1/campaigns/{record['Id']}/jobDetails", + ] + + # Data is taken from the manage_threads() function and stored in a tap-level + # dictionary for reference from the child streams. This implementation was + # chosen over storing in context because of the dangers of writing state while + # context contains large amounts of data. + self.logger.info(f"Threading to call all of these endpoints at the same time {endpoints=}") + self.manage_threads(endpoints=endpoints, context=context, campaign_threaded_data=self.campaign_threaded_data) return { "_sdc_employer_id": context["_sdc_employer_id"], @@ -313,10 +331,12 @@ def manage_threads(self, endpoints: list[str], context: dict, campaign_threaded_ args=[endpoint, campaign_threaded_data, context, stream_lock], ) threads.append(new_thread) + self.logger.info("Starting a thread %s for endpoint '%s'", new_thread, endpoint) new_thread.start() # Wait for the completion of all threads before continuing. for thread in threads: + self.logger.info("Joining thread %s", thread) thread.join() def thread_stream(self, endpoint: str, campaign_threaded_data: dict, context: dict, stream_lock: threading.Lock) -> None: @@ -329,7 +349,10 @@ def thread_stream(self, endpoint: str, campaign_threaded_data: dict, context: di context: Relevant context for the stream. """ + # First critical section setting up an empty array. + stream_lock.acquire() campaign_threaded_data[endpoint] = [] + stream_lock.release() paginator = self.get_new_paginator() decorated_request = self.request_decorator(self._request) @@ -351,7 +374,7 @@ def thread_stream(self, endpoint: str, campaign_threaded_data: dict, context: di resp = decorated_request(prepared_request, context) response_json = resp.json() - # Critical section. + # Second critical section adding to an existing array. stream_lock.acquire() campaign_threaded_data[endpoint].append(response_json) stream_lock.release() @@ -410,7 +433,7 @@ def get_url_params( return params -class CampaignBudget(IndeedSponsoredJobsStream): +class CampaignBudget(ThreadedIndeedSponsoredJobsStream): """Campaign Budget per Campaign""" name = "campaign_budget" @@ -425,19 +448,8 @@ class CampaignBudget(IndeedSponsoredJobsStream): th.Property("_sdc_campaign_id", th.StringType), ).to_dict() - def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None: - """Initialize the stream with a variable for storing threaded data.""" - self.campaign_threaded_data = campaign_threaded_data - super().__init__(*args, **kwargs) - def request_records(self, context: dict | None) -> Iterable[dict]: - url = self.get_url(context) - for input in self.campaign_threaded_data[url]: - yield from extract_jsonpath(self.records_jsonpath, input=input) - self.campaign_threaded_data.pop(url) - - -class CampaignInfo(IndeedSponsoredJobsStream): +class CampaignInfo(ThreadedIndeedSponsoredJobsStream): """Campaign Info per Campaign""" name = "campaign_info" @@ -474,19 +486,8 @@ class CampaignInfo(IndeedSponsoredJobsStream): th.Property("_sdc_campaign_id", th.StringType), ).to_dict() - def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None: - """Initialize the stream with a variable for storing threaded data.""" - self.campaign_threaded_data = campaign_threaded_data - super().__init__(*args, **kwargs) - - def request_records(self, context: dict | None) -> Iterable[dict]: - url = self.get_url(context) - for input in self.campaign_threaded_data[url]: - yield from extract_jsonpath(self.records_jsonpath, input=input) - self.campaign_threaded_data.pop(url) - -class CampaignProperties(IndeedSponsoredJobsStream): +class CampaignProperties(ThreadedIndeedSponsoredJobsStream): """Campaign Properties per Campaign""" name = "campaign_property" @@ -503,19 +504,8 @@ class CampaignProperties(IndeedSponsoredJobsStream): th.Property("_sdc_employer_id", th.StringType), ).to_dict() - def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None: - """Initialize the stream with a variable for storing threaded data.""" - self.campaign_threaded_data = campaign_threaded_data - super().__init__(*args, **kwargs) - def request_records(self, context: dict | None) -> Iterable[dict]: - url = self.get_url(context) - for input in self.campaign_threaded_data[url]: - yield from extract_jsonpath(self.records_jsonpath, input=input) - self.campaign_threaded_data.pop(url) - - -class CampaignJobDetails(IndeedSponsoredJobsStream): +class CampaignJobDetails(ThreadedIndeedSponsoredJobsStream): """Job Details per Campaign""" name = "campaign_job_detail" @@ -532,15 +522,3 @@ class CampaignJobDetails(IndeedSponsoredJobsStream): th.Property("_sdc_campaign_id", th.StringType), th.Property("_sdc_employer_id", th.StringType), ).to_dict() - - def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None: - """Initialize the stream with a variable for storing threaded data.""" - self.campaign_threaded_data = campaign_threaded_data - super().__init__(*args, **kwargs) - - def request_records(self, context: dict | None) -> Iterable[dict]: - url = self.get_url(context) - for input in self.campaign_threaded_data[url]: - self.logger.info(f"{self.campaign_threaded_data=}") - yield from extract_jsonpath(self.records_jsonpath, input=input) - self.campaign_threaded_data.pop(url) diff --git a/tap_indeedsponsoredjobs/tap.py b/tap_indeedsponsoredjobs/tap.py index ad7d76d..3205ca9 100644 --- a/tap_indeedsponsoredjobs/tap.py +++ b/tap_indeedsponsoredjobs/tap.py @@ -52,6 +52,15 @@ class TapIndeedSponsoredJobs(Tap): "left unset, will pull all employer ids." ), ), + th.Property( + "threading_enable", + th.BooleanType, + required=True, + default=False, + description=( + "Whether to use threading to run some streams in parallel." + ), + ), th.Property( "start_date", th.StringType, @@ -77,8 +86,8 @@ def discover_streams(self) -> List[Stream]: CampaignJobDetails(campaign_threaded_data=campaign_threaded_data, tap=self), CampaignBudget(campaign_threaded_data=campaign_threaded_data, tap=self), CampaignInfo(campaign_threaded_data=campaign_threaded_data, tap=self), - CampaignPerformanceStats(tap=self), - EmployerStatsReport(tap=self), + # CampaignPerformanceStats(tap=self), + # EmployerStatsReport(tap=self), ] if __name__ == "__main__": From d6e208d73dd031d176ff2aa62212a160b112d226 Mon Sep 17 00:00:00 2001 From: Sebastian Smiley Date: Wed, 6 Sep 2023 12:09:50 -0400 Subject: [PATCH 2/3] Fix testing code --- tap_indeedsponsoredjobs/tap.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_indeedsponsoredjobs/tap.py b/tap_indeedsponsoredjobs/tap.py index 3205ca9..307b6f2 100644 --- a/tap_indeedsponsoredjobs/tap.py +++ b/tap_indeedsponsoredjobs/tap.py @@ -86,8 +86,8 @@ def discover_streams(self) -> List[Stream]: CampaignJobDetails(campaign_threaded_data=campaign_threaded_data, tap=self), CampaignBudget(campaign_threaded_data=campaign_threaded_data, tap=self), CampaignInfo(campaign_threaded_data=campaign_threaded_data, tap=self), - # CampaignPerformanceStats(tap=self), - # EmployerStatsReport(tap=self), + CampaignPerformanceStats(tap=self), + EmployerStatsReport(tap=self), ] if __name__ == "__main__": From f05ce54e50ba9f824960d75635b996c58db1dcbe Mon Sep 17 00:00:00 2001 From: Sebastian Smiley Date: Wed, 6 Sep 2023 12:12:34 -0400 Subject: [PATCH 3/3] Update name of threadable streams superclass --- tap_indeedsponsoredjobs/streams.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tap_indeedsponsoredjobs/streams.py b/tap_indeedsponsoredjobs/streams.py index da09662..9b0ecc8 100644 --- a/tap_indeedsponsoredjobs/streams.py +++ b/tap_indeedsponsoredjobs/streams.py @@ -26,7 +26,7 @@ SCHEMAS_DIR = Path(__file__).parent / Path("./schemas") -class ThreadedIndeedSponsoredJobsStream(IndeedSponsoredJobsStream): +class ThreadableIndeedSponsoredJobsStream(IndeedSponsoredJobsStream): def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None: """Initialize the stream with a variable for storing threaded data.""" @@ -433,7 +433,7 @@ def get_url_params( return params -class CampaignBudget(ThreadedIndeedSponsoredJobsStream): +class CampaignBudget(ThreadableIndeedSponsoredJobsStream): """Campaign Budget per Campaign""" name = "campaign_budget" @@ -449,7 +449,7 @@ class CampaignBudget(ThreadedIndeedSponsoredJobsStream): ).to_dict() -class CampaignInfo(ThreadedIndeedSponsoredJobsStream): +class CampaignInfo(ThreadableIndeedSponsoredJobsStream): """Campaign Info per Campaign""" name = "campaign_info" @@ -487,7 +487,7 @@ class CampaignInfo(ThreadedIndeedSponsoredJobsStream): ).to_dict() -class CampaignProperties(ThreadedIndeedSponsoredJobsStream): +class CampaignProperties(ThreadableIndeedSponsoredJobsStream): """Campaign Properties per Campaign""" name = "campaign_property" @@ -505,7 +505,7 @@ class CampaignProperties(ThreadedIndeedSponsoredJobsStream): ).to_dict() -class CampaignJobDetails(ThreadedIndeedSponsoredJobsStream): +class CampaignJobDetails(ThreadableIndeedSponsoredJobsStream): """Job Details per Campaign""" name = "campaign_job_detail"