diff --git a/source-asana/source_asana/streams.py b/source-asana/source_asana/streams.py index 625c432324..c87abe9c78 100644 --- a/source-asana/source_asana/streams.py +++ b/source-asana/source_asana/streams.py @@ -235,7 +235,8 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: class Events(AsanaStream): primary_key = "created_at" - sync_token = None + sync_token: Optional[str] = None + raise_on_http_errors = False def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: return "events" @@ -248,49 +249,46 @@ def read_records(self, *args, **kwargs): yield from super().read_records(*args, **kwargs) - # After reading records, update the sync token - self.sync_token = self.get_latest_sync_token() - - def get_latest_sync_token(self) -> str: - latest_sync_token = self.state.get("last_sync_token") # Get the previous sync token - - if latest_sync_token is None: - return None - - return latest_sync_token["sync"] # Extract the sync token value - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - if response.status_code == 412: # Check if response is a 412 error - response_json = response.json() - if "sync" in response_json: # Check if new sync token is available - self.sync_token = response_json["sync"] - else: - self.sync_token = None - self.logger.warning("Sync token expired. Fetch the full dataset for this query now.") - else: - response_json = response.json() + def parse_response( + self, response: requests.Response, **kwargs + ) -> Iterable[Mapping]: + response_json: dict = response.json() + self.sync_token = response_json.get("sync") - # Check if response has new sync token - if "sync" in response_json: - self.sync_token = response_json["sync"] + if ( # Check if response is a 412 error + response.status_code == HTTPStatus.PRECONDITION_FAILED + or not self.sync_token + ): + self.logger.warning( + "Sync token expired. Fetch the full dataset for this query now." + ) + data = response_json.get("data", []) - yield from response_json.get("data", []) + yield from data - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + def next_page_token( + self, response: requests.Response + ) -> Optional[Mapping[str, Any]]: decoded_response = response.json() last_sync = decoded_response.get("sync") if last_sync: return {"sync": last_sync} - def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: + def request_params( + self, stream_slice: Mapping[str, Any] = None, **kwargs + ) -> MutableMapping[str, Any]: params = super().request_params(**kwargs) params["resource"] = stream_slice["resource_gid"] + params["sync"] = self.sync_token return params def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - yield from self.read_slices_from_records(stream_class=Projects, slice_field="resource_gid") - yield from self.read_slices_from_records(stream_class=Tasks, slice_field="resource_gid") - + yield from self.read_slices_from_records( + stream_class=Projects, slice_field="resource_gid" + ) + yield from self.read_slices_from_records( + stream_class=Tasks, slice_field="resource_gid" + ) class OrganizationExports(AsanaStream):