Skip to content

Commit

Permalink
implement incremental streams and use where possible (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 authored Jan 23, 2024
1 parent 0c10714 commit 73ac2e5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 5 deletions.
37 changes: 37 additions & 0 deletions tap_facebook/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

from __future__ import annotations

import json
import typing as t
from http import HTTPStatus
from urllib.parse import urlparse

import pendulum
from singer_sdk.authenticators import BearerTokenAuthenticator
from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
from singer_sdk.helpers.jsonpath import extract_jsonpath
Expand Down Expand Up @@ -139,3 +141,38 @@ def backoff_max_tries(self) -> int:
int: limit
"""
return 20


class IncrementalFacebookStream(FacebookStream):
def get_url_params(
self,
context: dict | None,
next_page_token: t.Any | None, # noqa: ANN401
) -> dict[str, t.Any]:
"""Return a dictionary of values to be used in URL parameterization.
Args:
context: The stream context.
next_page_token: The next page index or value.
Returns:
A dictionary of URL query parameters.
"""
params: dict = {"limit": 25}
if next_page_token is not None:
params["after"] = next_page_token
if self.replication_key:
params["sort"] = "asc"
params["order_by"] = self.replication_key
ts = pendulum.parse(self.get_starting_replication_key_value(context))
params["filtering"] = json.dumps(
[
{
"field": f"{self.filter_entity}.{self.replication_key}",
"operator": "GREATER_THAN",
"value": int(ts.timestamp()),
},
],
)

return params
11 changes: 7 additions & 4 deletions tap_facebook/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
StringType,
)

from tap_facebook.client import FacebookStream
from tap_facebook.client import FacebookStream, IncrementalFacebookStream


# ads insights stream
Expand Down Expand Up @@ -210,7 +210,7 @@ def post_process(


# ads stream
class AdsStream(FacebookStream):
class AdsStream(IncrementalFacebookStream):
"""Ads stream class.
columns: columns which will be added to fields parameter in api
Expand Down Expand Up @@ -247,6 +247,7 @@ class AdsStream(FacebookStream):
columns_remaining = ["adlabels", "recommendations"] # noqa: RUF012

name = "ads"
filter_entity = "ad"

path = f"/ads?fields={columns}"

Expand Down Expand Up @@ -475,7 +476,7 @@ class AdsStream(FacebookStream):


# adsets stream
class AdsetsStream(FacebookStream):
class AdsetsStream(IncrementalFacebookStream):
"""https://developers.facebook.com/docs/marketing-api/reference/ad-campaign/."""

"""
Expand Down Expand Up @@ -549,6 +550,7 @@ class AdsetsStream(FacebookStream):
]

name = "adsets"
filter_entity = "adset"

path = f"/adsets?fields={columns}"
primary_keys = ["id", "updated_time"] # noqa: RUF012
Expand Down Expand Up @@ -753,7 +755,7 @@ class AdsetsStream(FacebookStream):


# campaigns stream
class CampaignStream(FacebookStream):
class CampaignStream(IncrementalFacebookStream):
"""https://developers.facebook.com/docs/marketing-api/reference/ad-campaign-group."""

"""
Expand Down Expand Up @@ -808,6 +810,7 @@ class CampaignStream(FacebookStream):
]

name = "campaigns"
filter_entity = "campaign"

path = f"/campaigns?fields={columns}"
primary_keys = ["id", "updated_time"] # noqa: RUF012
Expand Down
2 changes: 1 addition & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from tap_facebook.tap import TapFacebook

SAMPLE_CONFIG = {
"start_date": "2023-03-01T00:00:00Z",
"start_date": "2021-03-01T00:00:00Z",
"access_token": os.environ["TAP_FACEBOOK_ACCESS_TOKEN"],
"account_id": os.environ["TAP_FACEBOOK_ACCOUNT_ID"],
}
Expand Down

0 comments on commit 73ac2e5

Please sign in to comment.