Skip to content

Commit

Permalink
source-klaviyo: define filter comparison operator per incremental stream
Browse files Browse the repository at this point in the history
My earlier commit (dfbe8b5) was too
broad in changing all incremental streams to use `"greater-or-equal"` in
the `filter` query param. The `/profiles` endpoint used by both the
`Profiles` and `GlobalExclusions` streams do not support filtering with
`"greater-or-equal"`, so they have to use `"greater-than"`.
  • Loading branch information
Alex-Bair committed Nov 4, 2024
1 parent fe097d7 commit 2fbd9b0
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion source-klaviyo/source_klaviyo/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ def cursor_field(self) -> Union[str, List[str]]:
:return str: The name of the cursor field.
"""

# comparison_operator is used to filter what results we receive from Klaviyo. Some endpoints support "greater-or-equal" (preferred)
# and others only support "greater-than". comparison_operator must be specified for each incremental stream.
comparison_operator: str = "greater-or-equal"

def request_params(
self,
stream_state: Optional[Mapping[str, Any]],
Expand All @@ -172,11 +176,17 @@ def request_params(
latest_cursor = pendulum.parse(latest_cursor)
if stream_state_cursor_value:
latest_cursor = max(latest_cursor, pendulum.parse(stream_state_cursor_value))

# For streams that can only filter with a "greater-than" comparison, we subtract
# one second so we do not miss records updated in the same second.
if self.comparison_operator == 'greater-than':
latest_cursor = latest_cursor.subtract(seconds=1)

# Klaviyo API will throw an error if the request filter is set too close to the current time.
# Setting a minimum value of at least 3 seconds from the current time ensures this will never happen,
# and allows our 'abnormal_state' acceptance test to pass.
latest_cursor = min(latest_cursor, pendulum.now().subtract(seconds=3))
params["filter"] = f"greater-or-equal({self.cursor_field},{latest_cursor.isoformat()})"
params["filter"] = f"{self.comparison_operator}({self.cursor_field},{latest_cursor.isoformat()})"
params["sort"] = self.cursor_field
return params

Expand Down Expand Up @@ -279,6 +289,7 @@ class Profiles(IncrementalKlaviyoStream):
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

cursor_field = "updated"
comparison_operator = 'greater-than'
api_revision = "2023-02-22"
page_size = 100
state_checkpoint_interval = 100 # API can return maximum 100 records per page
Expand Down

0 comments on commit 2fbd9b0

Please sign in to comment.