Skip to content

Commit

Permalink
Handle sync tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
Pedro Antonio authored and jgraettinger committed Mar 7, 2024
1 parent f5371e9 commit 78edfac
Showing 1 changed file with 29 additions and 31 deletions.
60 changes: 29 additions & 31 deletions source-asana/source_asana/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:

class Events(AsanaStream):
primary_key = "created_at"
sync_token = None
sync_token: Optional[str] = None
raise_on_http_errors = False

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return "events"
Expand All @@ -248,49 +249,46 @@ def read_records(self, *args, **kwargs):

yield from super().read_records(*args, **kwargs)

# After reading records, update the sync token
self.sync_token = self.get_latest_sync_token()

def get_latest_sync_token(self) -> str:
latest_sync_token = self.state.get("last_sync_token") # Get the previous sync token

if latest_sync_token is None:
return None

return latest_sync_token["sync"] # Extract the sync token value

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if response.status_code == 412: # Check if response is a 412 error
response_json = response.json()
if "sync" in response_json: # Check if new sync token is available
self.sync_token = response_json["sync"]
else:
self.sync_token = None
self.logger.warning("Sync token expired. Fetch the full dataset for this query now.")
else:
response_json = response.json()
def parse_response(
self, response: requests.Response, **kwargs
) -> Iterable[Mapping]:
response_json: dict = response.json()
self.sync_token = response_json.get("sync")

# Check if response has new sync token
if "sync" in response_json:
self.sync_token = response_json["sync"]
if ( # Check if response is a 412 error
response.status_code == HTTPStatus.PRECONDITION_FAILED
or not self.sync_token
):
self.logger.warning(
"Sync token expired. Fetch the full dataset for this query now."
)
data = response_json.get("data", [])

yield from response_json.get("data", [])
yield from data

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
def next_page_token(
self, response: requests.Response
) -> Optional[Mapping[str, Any]]:
decoded_response = response.json()
last_sync = decoded_response.get("sync")
if last_sync:
return {"sync": last_sync}

def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
def request_params(
self, stream_slice: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
params = super().request_params(**kwargs)
params["resource"] = stream_slice["resource_gid"]
params["sync"] = self.sync_token
return params

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
yield from self.read_slices_from_records(stream_class=Projects, slice_field="resource_gid")
yield from self.read_slices_from_records(stream_class=Tasks, slice_field="resource_gid")

yield from self.read_slices_from_records(
stream_class=Projects, slice_field="resource_gid"
)
yield from self.read_slices_from_records(
stream_class=Tasks, slice_field="resource_gid"
)


class OrganizationExports(AsanaStream):
Expand Down

0 comments on commit 78edfac

Please sign in to comment.