Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/visitor activity 504 #8

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions tap_pardot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Client:
creds = None

get_url = "{}/version/{}/do/query"
get_id_url = "{}/version/{}/do/read/id/{}"
describe_url = "{}/version/{}/do/describe"

def __init__(self, creds):
Expand Down Expand Up @@ -144,11 +145,14 @@ def describe(self, endpoint, **kwargs):
giveup=is_not_retryable_pardot_exception,
jitter=None,
)
def _fetch(self, method, endpoint, format_params, **kwargs):
def _fetch(self, method, endpoint, format_params, object_id=None, **kwargs):
base_formatting = [endpoint, self.api_version]
if format_params:
base_formatting.extend(format_params)
url = (ENDPOINT_BASE + self.get_url).format(*base_formatting)
if object_id is not None:
url = (ENDPOINT_BASE + self.get_id_url).format(*base_formatting, object_id)
else:
url = (ENDPOINT_BASE + self.get_url).format(*base_formatting)

params = {"format": "json", "output": "bulk", **kwargs}

Expand All @@ -161,5 +165,8 @@ def _fetch(self, method, endpoint, format_params, **kwargs):
def get(self, endpoint, format_params=None, **kwargs):
    """Run a GET request for ``endpoint`` via ``_fetch`` and return its result.

    ``format_params`` is forwarded positionally and any extra keyword
    arguments are passed straight through to ``_fetch``.
    """
    http_method = "get"
    response = self._fetch(http_method, endpoint, format_params, **kwargs)
    return response

def get_specific(self, endpoint, object_id, format_params=None, **kwargs):
    """Run a GET request for one object of ``endpoint``, selected by ``object_id``.

    Delegates to ``_fetch``, passing ``object_id`` as the fourth positional
    argument; extra keyword arguments are forwarded unchanged.
    """
    http_method = "get"
    response = self._fetch(http_method, endpoint, format_params, object_id, **kwargs)
    return response

def post(self, endpoint, format_params=None, **kwargs):
    """Run a POST request for ``endpoint`` via ``_fetch`` and return its result.

    ``format_params`` is forwarded positionally and any extra keyword
    arguments are passed straight through to ``_fetch``.
    """
    http_method = "post"
    response = self._fetch(http_method, endpoint, format_params, **kwargs)
    return response
115 changes: 114 additions & 1 deletion tap_pardot/streams.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import inspect

import singer
from singer import utils
from datetime import timedelta


class Stream:
Expand Down Expand Up @@ -143,6 +145,117 @@ def get_params(self):
"sort_order": "ascending",
}

class UpdatedAtDescendingSortReplicationStream(Stream):
    """
    Streams where records can only be sorted backwards by updated_at.

    Syncing mechanism:

    - use bookmark to keep track of the overall window_start and window_end
    - use bookmark to track internal sub_window_end bookmark
    - sync records from now/end_date to bookmark/start_date and adjust
      overall bookmark after full sync
    """

    replication_key = ["updated_at"]
    replication_method = "INCREMENTAL"

    # Class-level default so check_order() can read the attribute before any
    # value has been recorded on the instance.
    _last_bookmark_value = None

    def get_params(self, window_start, window_end):
        """Build the query parameters for one request over the given window.

        Records are requested newest-first, bounded by ``window_start``
        (``updated_after``) and ``window_end`` (``updated_before``).
        """
        return {
            "updated_after": window_start,
            "updated_before": window_end,
            "sort_by": "updated_at",
            "sort_order": "descending",
        }

    def update_bookmark(self, key, value):
        """Write ``value`` under bookmark ``key`` for this stream in the state."""
        singer.bookmarks.write_bookmark(
            self.state, self.stream_name, key, value
        )

    def _get_window_state(self):
        """
        Return ``(window_start, sub_window_end, window_end)`` from the state.

        Pardot's API seems to treat dates like 'YYYY-mm-DD HH:MM:SS' in the
        user's time zone, and dates like 'YYYY-mm-DDTHH:MM:SSZ in UTC.

        For consistency:
        - All datetimes should be in string format, never parsed.
        - The window_start and window_end times should all be in UTC.
        - The sub_window should be in the user's timezone.
        """
        window_start = singer.get_bookmark(self.state, self.stream_name, "window_start")
        sub_window_end = singer.get_bookmark(self.state, self.stream_name, "sub_window_end")
        window_end = singer.get_bookmark(self.state, self.stream_name, "window_end")

        start_date = self.config.get("start_date")

        # max() cannot compare None with a string, so only take the later of
        # the two when both are present; otherwise fall back to whichever
        # value exists.
        if window_start is None:
            window_start = start_date
        elif start_date is not None:
            window_start = max(window_start, start_date)

        return window_start, sub_window_end, window_end

    def pre_sync(self):
        """Seed the window bookmarks before syncing and emit the state.

        The window bounds are only initialised when no ``sub_window_end``
        bookmark exists, i.e. when there is no partially synced window to
        resume. Missing ``window_start`` defaults to the configured
        ``start_date`` and missing ``window_end`` defaults to the current time.
        """
        resuming = singer.get_bookmark(self.state, self.stream_name, "sub_window_end") is not None
        if not resuming:
            if singer.get_bookmark(self.state, self.stream_name, "window_start") is None:
                singer.write_bookmark(
                    self.state, self.stream_name, "window_start", self.config.get("start_date")
                )
            if singer.get_bookmark(self.state, self.stream_name, "window_end") is None:
                singer.write_bookmark(
                    self.state, self.stream_name, "window_end", utils.strftime(utils.now())
                )
        singer.write_state(self.state)

    def post_sync(self):
        """Advance the window after a complete sync and emit the state.

        The finished window's end becomes the next window's start, and the
        ``window_end`` / ``sub_window_end`` bookmarks are cleared.
        """
        # Set window_start to current window_end
        window_start = singer.get_bookmark(self.state, self.stream_name, "window_end")
        singer.write_bookmark(self.state, self.stream_name, "window_start", window_start)
        singer.bookmarks.clear_bookmark(self.state, self.stream_name, "window_end")
        singer.bookmarks.clear_bookmark(self.state, self.stream_name, "sub_window_end")
        singer.write_state(self.state)

    def check_order(self, current_bookmark_value):
        """Verify that records arrive in descending replication-key order.

        Raises:
            Exception: if ``current_bookmark_value`` is greater than the
                previously seen value.
        """
        if self._last_bookmark_value is None:
            self._last_bookmark_value = current_bookmark_value

        if current_bookmark_value > self._last_bookmark_value:
            raise Exception(
                "Detected out of order data. Current bookmark value {} is greater than last bookmark value {}".format(
                    current_bookmark_value, self._last_bookmark_value
                )
            )

        self._last_bookmark_value = current_bookmark_value

    def get_records(self, window_start, window_end):
        """Make one page request and update bookmarks, sync handles pagination based on result size.

        Yields each record of the page, then stores the replication-key value
        of the last (oldest) record as the ``sub_window_end`` bookmark and
        emits the state.

        Raises:
            Exception: if a record has no replication-key value, or if the
                records are not in descending order.
        """
        replication_key = self.replication_key[0]
        data = self.client.get(self.endpoint, **self.get_params(window_start, window_end))

        if data["result"] is None or data["result"].get("total_results") == 0:
            return

        records = data["result"][self.data_key]

        # A single result is returned as a bare dict rather than a list.
        if isinstance(records, dict):
            records = [records]

        # Nothing on this page: there is no last record to bookmark.
        if not records:
            return

        for rec in records:
            bookmark_value = rec.get(replication_key)
            if bookmark_value is None:
                raise Exception("visitor_activities - Visitor Activity (ID: {}) is missing updated_at value.".format(rec.get("id")))
            # check_order requires the value being checked; calling it with
            # no argument raises TypeError on the first record.
            self.check_order(bookmark_value)
            yield rec

        self.update_bookmark("sub_window_end", records[-1][replication_key])
        singer.write_state(self.state)

    def sync_page(self):
        """Yield one page of records, resuming from ``sub_window_end`` if set."""
        window_start, sub_window_end, window_end = self._get_window_state()

        # A stored sub_window_end means part of the window was already synced,
        # so continue from there instead of from the overall window_end.
        page_end = sub_window_end if sub_window_end is not None else window_end
        yield from self.get_records(window_start, page_end)

class ComplexBookmarkStream(Stream):
"""Streams that need to keep track of more than 1 bookmark."""
Expand Down Expand Up @@ -353,7 +466,7 @@ class EmailClicks(IdReplicationStream):
is_dynamic = False


class VisitorActivities(IdReplicationStream):
class VisitorActivities(UpdatedAtDescendingSortReplicationStream):
stream_name = "visitor_activities"
data_key = "visitor_activity"
endpoint = "visitorActivity"
Expand Down
18 changes: 17 additions & 1 deletion tap_pardot/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,25 @@

LOGGER = singer.get_logger()

def translate_state(client, state, selected_streams):
    """
    Translate the state for streams that have changed their patterns

    When ``visitor_activities`` is selected and the state still holds an
    ``id`` bookmark for it, the bookmarked activity is fetched through
    ``client.get_specific`` and its ``updated_at`` value is written as the
    ``window_start`` bookmark, replacing the ``id`` bookmark. ``state`` is
    modified in place.

    Raises:
        Exception: if the bookmarked activity has no ``updated_at`` value.
    """
    stream_name = "visitor_activities"

    selected_stream_names = [s.tap_stream_id for s in selected_streams]
    if stream_name not in selected_stream_names:
        return

    bookmark_id = singer.bookmarks.get_bookmark(state, stream_name, "id")
    if not bookmark_id:
        # No id bookmark left in the state, so there is nothing to translate.
        return

    bookmark_activity = client.get_specific("visitorActivity", bookmark_id)
    # `or {}` also covers a response whose "visitor_activity" is present but
    # None, so that case reaches the explicit error below.
    activity = bookmark_activity.get("visitor_activity") or {}
    new_bookmark = activity.get("updated_at")
    if not new_bookmark:
        raise Exception("Could not translate state for visitor_activities, bookmarked activity is missing `updated_at` value")

    singer.bookmarks.clear_bookmark(state, stream_name, "id")
    singer.bookmarks.write_bookmark(state, stream_name, "window_start", new_bookmark)

def sync(client, config, state, catalog):
selected_streams = catalog.get_selected_streams(state)
selected_streams = list(catalog.get_selected_streams(state))
translate_state(client, state, selected_streams)

for stream in selected_streams:
stream_id = stream.tap_stream_id
Expand Down