From 73ac2e532a0b40905717c366ff74f7253dfff0ca Mon Sep 17 00:00:00 2001 From: Pat Nadolny Date: Tue, 23 Jan 2024 13:56:38 -0500 Subject: [PATCH] implement incremental streams and use where possible (#143) --- tap_facebook/client.py | 37 +++++++++++++++++++++++++++++++++++++ tap_facebook/streams.py | 11 +++++++---- tests/test_core.py | 2 +- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/tap_facebook/client.py b/tap_facebook/client.py index c5da462..bf37e9f 100644 --- a/tap_facebook/client.py +++ b/tap_facebook/client.py @@ -2,10 +2,12 @@ from __future__ import annotations +import json import typing as t from http import HTTPStatus from urllib.parse import urlparse +import pendulum from singer_sdk.authenticators import BearerTokenAuthenticator from singer_sdk.exceptions import FatalAPIError, RetriableAPIError from singer_sdk.helpers.jsonpath import extract_jsonpath @@ -139,3 +141,38 @@ def backoff_max_tries(self) -> int: int: limit """ return 20 + + +class IncrementalFacebookStream(FacebookStream): + def get_url_params( + self, + context: dict | None, + next_page_token: t.Any | None, # noqa: ANN401 + ) -> dict[str, t.Any]: + """Return a dictionary of values to be used in URL parameterization. + + Args: + context: The stream context. + next_page_token: The next page index or value. + + Returns: + A dictionary of URL query parameters. + """ + params: dict = {"limit": 25} + if next_page_token is not None: + params["after"] = next_page_token + if self.replication_key: + params["sort"] = "asc" + params["order_by"] = self.replication_key + ts = pendulum.parse(self.get_starting_replication_key_value(context)) + params["filtering"] = json.dumps( + [ + { + "field": f"{self.filter_entity}.{self.replication_key}", + "operator": "GREATER_THAN", + "value": int(ts.timestamp()), + }, + ], + ) + + return params diff --git a/tap_facebook/streams.py b/tap_facebook/streams.py index 29cf64a..4c778b7 100644 --- a/tap_facebook/streams.py +++ b/tap_facebook/streams.py @@ -18,7 +18,7 @@ StringType, ) -from tap_facebook.client import FacebookStream +from tap_facebook.client import FacebookStream, IncrementalFacebookStream # ads insights stream @@ -210,7 +210,7 @@ def post_process( # ads stream -class AdsStream(FacebookStream): +class AdsStream(IncrementalFacebookStream): """Ads stream class. columns: columns which will be added to fields parameter in api @@ -247,6 +247,7 @@ class AdsStream(FacebookStream): columns_remaining = ["adlabels", "recommendations"] # noqa: RUF012 name = "ads" + filter_entity = "ad" path = f"/ads?fields={columns}" @@ -475,7 +476,7 @@ class AdsStream(FacebookStream): # adsets stream -class AdsetsStream(FacebookStream): +class AdsetsStream(IncrementalFacebookStream): """https://developers.facebook.com/docs/marketing-api/reference/ad-campaign/.""" """ @@ -549,6 +550,7 @@ class AdsetsStream(FacebookStream): ] name = "adsets" + filter_entity = "adset" path = f"/adsets?fields={columns}" primary_keys = ["id", "updated_time"] # noqa: RUF012 @@ -753,7 +755,7 @@ class AdsetsStream(FacebookStream): # campaigns stream -class CampaignStream(FacebookStream): +class CampaignStream(IncrementalFacebookStream): """https://developers.facebook.com/docs/marketing-api/reference/ad-campaign-group.""" """ @@ -808,6 +810,7 @@ class CampaignStream(FacebookStream): ] name = "campaigns" + filter_entity = "campaign" path = f"/campaigns?fields={columns}" primary_keys = ["id", "updated_time"] # noqa: RUF012 diff --git a/tests/test_core.py b/tests/test_core.py index 54b7a0a..e5b69b6 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -8,7 +8,7 @@ from tap_facebook.tap import TapFacebook SAMPLE_CONFIG = { - "start_date": "2023-03-01T00:00:00Z", + "start_date": "2021-03-01T00:00:00Z", "access_token": os.environ["TAP_FACEBOOK_ACCESS_TOKEN"], "account_id": os.environ["TAP_FACEBOOK_ACCOUNT_ID"], }