diff --git a/source-zendesk-support/source_zendesk_support/streams.py b/source-zendesk-support/source_zendesk_support/streams.py index 287cf8664..3deffda19 100644 --- a/source-zendesk-support/source_zendesk_support/streams.py +++ b/source-zendesk-support/source_zendesk_support/streams.py @@ -754,54 +754,34 @@ class AuditLogs(SourceZendeskSupportCursorPaginationStream): This endpoint does not respect the start_time param. It requires two query params with the same name to filter by date. See https://support.zendesk.com/hc/en-us/community/posts/4859612547866-Audit-Log-API-error. - Instead, we mimic the TicketAudits stream strategy of not paginating further when the most recent page contains results we don't need. """ response_list_name: str = "audit_logs" # audit_logs doesn't have the 'updated_by' field cursor_field = "created_at" + state_checkpoint_interval = 100 - def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: - params = {"page[size]": self.page_size} + def request_params(self, next_page_token: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None,**kwargs) -> MutableMapping[str, Any]: + cursor_value = stream_state.get(self.cursor_field, None) + start_time = cursor_value or self._start_date + # The end_time is moved a little in the past to avoid missing records that share the same "created_at" since + # records with the current cursor value are ignored. + end_time = (datetime.now(tz=UTC) - timedelta(seconds=30)).strftime(DATETIME_FORMAT) + + params = { + "page[size]": self.page_size, + # By default, results are returned in descending order. + # "sort" is required to get results returned in ascending order. + "sort": "created_at", + # "filter[created_at][]" filters the responses' results to only records created within the specified timespan. + "filter[created_at][]": [start_time, end_time] + } if next_page_token: params.update(next_page_token) return params - - def _is_before_last_cursor_date(self, response: requests.Response, stream_state: Mapping[str, Any]) -> bool: - """ - This method checks whether a response contains documents before the last cursor date (if it exists) or thg start date. - This allows us to determine when to stop paginating backwards. - """ - document = response.json().get(self.response_list_name, [{}])[0] - document_created_at = document.get(self.cursor_field, "") - cursor_date = (stream_state or {}).get(self.cursor_field) or self._start_date - return document_created_at < cursor_date - - # Same as airbyte_cdk's HttpStream._read_pages method, but adds a condition to stop paginating - # if the response contains documents created before our last cursor date / start date. - def _read_pages( - self, - records_generator_fn: Callable[ - [requests.PreparedRequest, requests.Response, Mapping[str, Any], Optional[Mapping[str, Any]]], Iterable[StreamData] - ], - stream_slice: Optional[Mapping[str, Any]] = None, - stream_state: Optional[Mapping[str, Any]] = None, - ) -> Iterable[StreamData]: - stream_state = stream_state or {} - pagination_complete = False - next_page_token = None - while not pagination_complete: - request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token) - yield from records_generator_fn(request, response, stream_state, stream_slice) - next_page_token = self.next_page_token(response) - if not next_page_token or self._is_before_last_cursor_date(response, stream_state): - pagination_complete = True - - # Return an empty generator in case no records are yielded - yield from [] class Users(SourceZendeskSupportIncrementalCursorExportStream): """Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-user-export-cursor-based"""