Skip to content

Commit

Permalink
Merge pull request #39 from sebastianswms/logs-threading-updates
Browse files Browse the repository at this point in the history
feat: Updates to logging and threading
  • Loading branch information
sebastianswms authored Sep 6, 2023
2 parents 1e0f0bc + f05ce54 commit 78d619a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 71 deletions.
6 changes: 2 additions & 4 deletions tap_indeedsponsoredjobs/auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""IndeedSponsoredJobs Authentication."""

import logging
from typing import Callable, Generator
from urllib.parse import urlparse

Expand Down Expand Up @@ -172,10 +171,9 @@ def backoff_handler(self, details) -> None:
details: backoff invocation details
https://github.com/litl/backoff#event-handlers
"""
logging.error(
self.logger.error(
"Backing off {wait:0.1f} seconds after {tries} tries "
"calling function {target} with args {args} and kwargs "
"{kwargs}".format(**details)
"calling function {target}".format(**details)
)

def validate_response(self, response: requests.Response) -> None:
Expand Down
112 changes: 45 additions & 67 deletions tap_indeedsponsoredjobs/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@
SCHEMAS_DIR = Path(__file__).parent / Path("./schemas")


class ThreadableIndeedSponsoredJobsStream(IndeedSponsoredJobsStream):
    """Child stream whose records may be served from a thread-populated cache.

    When the ``threading_enable`` config option is true, the parent stream's
    ``manage_threads()`` has already fetched this stream's raw responses and
    stored them in ``campaign_threaded_data`` keyed by request URL.  In that
    case ``request_records`` drains the cache instead of issuing new HTTP
    requests; otherwise it falls back to the SDK's normal request loop.
    """

    def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None:
        """Initialize the stream with a variable for storing threaded data.

        Args:
            campaign_threaded_data: Tap-level dict mapping request URLs to the
                list of response payloads pre-fetched by the parent's threads.
        """
        self.campaign_threaded_data = campaign_threaded_data
        super().__init__(*args, **kwargs)

    def request_records(self, context: dict | None) -> Iterable[dict]:
        """Yield records for this stream, using cached data when threading is on.

        Args:
            context: Stream partition or context dictionary, if any.

        Yields:
            One record dict per extracted JSON object.
        """
        if self.config["threading_enable"]:
            url = self.get_url(context)
            # pop() retrieves AND frees the cached responses in one step, so the
            # tap-level dict is emptied even if this generator is not fully
            # consumed, and the dict is only looked up once.  (`response_json`
            # avoids shadowing the `input` builtin.)
            for response_json in self.campaign_threaded_data.pop(url):
                yield from extract_jsonpath(self.records_jsonpath, input=response_json)
        else:
            yield from super().request_records(context=context)


class Employers(IndeedSponsoredJobsStream):
"""List of all employers we have access to"""

Expand Down Expand Up @@ -273,23 +290,24 @@ def get_url_params(
def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""

# The name of each threaded child stream and its associated url, for use in the
# manage_threads() function. manage_threads() will store each piece of data in a
# dictionary with the url as a key. Adding url base and record["Id"] could be
# done inside manage_threads(), but it seemed cleaner to do it all here.
endpoints = [
f"{self.url_base}/v1/campaigns/{record['Id']}/budget",
f"{self.url_base}/v1/campaigns/{record['Id']}",
f"{self.url_base}/v1/campaigns/{record['Id']}/properties",
f"{self.url_base}/v1/campaigns/{record['Id']}/jobDetails",
]

# Data is taken from the manage_threads() function and stored in a tap-level
# dictionary for reference from the child streams. This implementation was
# chosen over storing in context because of the dangers of writing state while
# context contains large amounts of data.
self.logger.info(f"Threading to call all of these endpoints at the same time {endpoints=}")
self.manage_threads(endpoints=endpoints, context=context, campaign_threaded_data=self.campaign_threaded_data)
if self.config["threading_enable"]:
# The name of each threaded child stream and its associated url, for use in the
# manage_threads() function. manage_threads() will store each piece of data in a
# dictionary with the url as a key. Adding url base and record["Id"] could be
# done inside manage_threads(), but it seemed cleaner to do it all here.
endpoints = [
f"{self.url_base}/v1/campaigns/{record['Id']}/budget",
f"{self.url_base}/v1/campaigns/{record['Id']}",
f"{self.url_base}/v1/campaigns/{record['Id']}/properties",
f"{self.url_base}/v1/campaigns/{record['Id']}/jobDetails",
]

# Data is taken from the manage_threads() function and stored in a tap-level
# dictionary for reference from the child streams. This implementation was
# chosen over storing in context because of the dangers of writing state while
# context contains large amounts of data.
self.logger.info(f"Threading to call all of these endpoints at the same time {endpoints=}")
self.manage_threads(endpoints=endpoints, context=context, campaign_threaded_data=self.campaign_threaded_data)

return {
"_sdc_employer_id": context["_sdc_employer_id"],
Expand All @@ -313,10 +331,12 @@ def manage_threads(self, endpoints: list[str], context: dict, campaign_threaded_
args=[endpoint, campaign_threaded_data, context, stream_lock],
)
threads.append(new_thread)
self.logger.info("Starting a thread %s for endpoint '%s'", new_thread, endpoint)
new_thread.start()

# Wait for the completion of all threads before continuing.
for thread in threads:
self.logger.info("Joining thread %s", thread)
thread.join()

def thread_stream(self, endpoint: str, campaign_threaded_data: dict, context: dict, stream_lock: threading.Lock) -> None:
Expand All @@ -329,7 +349,10 @@ def thread_stream(self, endpoint: str, campaign_threaded_data: dict, context: di
context: Relevant context for the stream.
"""

# First critical section setting up an empty array.
stream_lock.acquire()
campaign_threaded_data[endpoint] = []
stream_lock.release()

paginator = self.get_new_paginator()
decorated_request = self.request_decorator(self._request)
Expand All @@ -351,7 +374,7 @@ def thread_stream(self, endpoint: str, campaign_threaded_data: dict, context: di
resp = decorated_request(prepared_request, context)
response_json = resp.json()

# Critical section.
# Second critical section adding to an existing array.
stream_lock.acquire()
campaign_threaded_data[endpoint].append(response_json)
stream_lock.release()
Expand Down Expand Up @@ -410,7 +433,7 @@ def get_url_params(
return params


class CampaignBudget(IndeedSponsoredJobsStream):
class CampaignBudget(ThreadableIndeedSponsoredJobsStream):
"""Campaign Budget per Campaign"""

name = "campaign_budget"
Expand All @@ -425,19 +448,8 @@ class CampaignBudget(IndeedSponsoredJobsStream):
th.Property("_sdc_campaign_id", th.StringType),
).to_dict()

def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None:
"""Initialize the stream with a variable for storing threaded data."""
self.campaign_threaded_data = campaign_threaded_data
super().__init__(*args, **kwargs)

def request_records(self, context: dict | None) -> Iterable[dict]:
url = self.get_url(context)
for input in self.campaign_threaded_data[url]:
yield from extract_jsonpath(self.records_jsonpath, input=input)
self.campaign_threaded_data.pop(url)


class CampaignInfo(IndeedSponsoredJobsStream):
class CampaignInfo(ThreadableIndeedSponsoredJobsStream):
"""Campaign Info per Campaign"""

name = "campaign_info"
Expand Down Expand Up @@ -474,19 +486,8 @@ class CampaignInfo(IndeedSponsoredJobsStream):
th.Property("_sdc_campaign_id", th.StringType),
).to_dict()

def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None:
"""Initialize the stream with a variable for storing threaded data."""
self.campaign_threaded_data = campaign_threaded_data
super().__init__(*args, **kwargs)

def request_records(self, context: dict | None) -> Iterable[dict]:
url = self.get_url(context)
for input in self.campaign_threaded_data[url]:
yield from extract_jsonpath(self.records_jsonpath, input=input)
self.campaign_threaded_data.pop(url)


class CampaignProperties(IndeedSponsoredJobsStream):
class CampaignProperties(ThreadableIndeedSponsoredJobsStream):
"""Campaign Properties per Campaign"""

name = "campaign_property"
Expand All @@ -503,19 +504,8 @@ class CampaignProperties(IndeedSponsoredJobsStream):
th.Property("_sdc_employer_id", th.StringType),
).to_dict()

def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None:
"""Initialize the stream with a variable for storing threaded data."""
self.campaign_threaded_data = campaign_threaded_data
super().__init__(*args, **kwargs)

def request_records(self, context: dict | None) -> Iterable[dict]:
url = self.get_url(context)
for input in self.campaign_threaded_data[url]:
yield from extract_jsonpath(self.records_jsonpath, input=input)
self.campaign_threaded_data.pop(url)


class CampaignJobDetails(IndeedSponsoredJobsStream):
class CampaignJobDetails(ThreadableIndeedSponsoredJobsStream):
"""Job Details per Campaign"""

name = "campaign_job_detail"
Expand All @@ -532,15 +522,3 @@ class CampaignJobDetails(IndeedSponsoredJobsStream):
th.Property("_sdc_campaign_id", th.StringType),
th.Property("_sdc_employer_id", th.StringType),
).to_dict()

def __init__(self, campaign_threaded_data: dict, *args, **kwargs) -> None:
"""Initialize the stream with a variable for storing threaded data."""
self.campaign_threaded_data = campaign_threaded_data
super().__init__(*args, **kwargs)

def request_records(self, context: dict | None) -> Iterable[dict]:
url = self.get_url(context)
for input in self.campaign_threaded_data[url]:
self.logger.info(f"{self.campaign_threaded_data=}")
yield from extract_jsonpath(self.records_jsonpath, input=input)
self.campaign_threaded_data.pop(url)
9 changes: 9 additions & 0 deletions tap_indeedsponsoredjobs/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ class TapIndeedSponsoredJobs(Tap):
"left unset, will pull all employer ids."
),
),
th.Property(
"threading_enable",
th.BooleanType,
required=True,
default=False,
description=(
"Whether to use threading to run some streams in parallel."
),
),
th.Property(
"start_date",
th.StringType,
Expand Down

0 comments on commit 78d619a

Please sign in to comment.