diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index 510cb6c96d1f2..0eae1967dddf8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -101,7 +101,7 @@ make_fine_grained_lineage_class, make_upstream_class, published_datasource_graphql_query, - query_metadata, + query_metadata_cursor_based_pagination, sheet_graphql_query, tableau_field_to_schema_field, workbook_graphql_query, @@ -972,22 +972,31 @@ def get_connection_object_page( query: str, connection_type: str, query_filter: str, + current_cursor: Optional[str], # initial value is None count: int = 0, - offset: int = 0, retry_on_auth_error: bool = True, retries_remaining: Optional[int] = None, ) -> Tuple[dict, int, int]: retries_remaining = retries_remaining or self.config.max_retries logger.debug( - f"Query {connection_type} to get {count} objects with offset {offset}" + f"Query {connection_type} to get {count} objects with offset {current_cursor}" f" and filter {query_filter}" ) try: assert self.server is not None - query_data = query_metadata( - self.server, query, connection_type, count, offset, query_filter - ) + query_data = query_metadata_cursor_based_pagination( + server=self.server, + main_query=query, + connection_name=connection_type, + first=count, + after=current_cursor, + qry_filter=query_filter, + ) + + # query_data = query_metadata( + # self.server, query, connection_type, count, offset, query_filter + # ) except REAUTHENTICATE_ERRORS: if not retry_on_auth_error: raise @@ -997,11 +1006,11 @@ def get_connection_object_page( # will be thrown and we need to re-authenticate and retry. self._re_authenticate() return self.get_connection_object_page( - query, - connection_type, - query_filter, - count, - offset, + query=query, + connection_type=connection_type, + query_filter=query_filter, + count=count, + current_cursor=current_cursor, retry_on_auth_error=False, retries_remaining=retries_remaining, ) @@ -1015,13 +1024,13 @@ def get_connection_object_page( if retries_remaining <= 0: raise return self.get_connection_object_page( - query, - connection_type, - query_filter, - count, - offset, + query=query, + connection_type=connection_type, + query_filter=query_filter, + count=count, + current_cursor=current_cursor, retry_on_auth_error=False, - retries_remaining=retries_remaining - 1, + retries_remaining=retries_remaining, ) if c.ERRORS in query_data: @@ -1082,23 +1091,28 @@ def get_connection_object_page( ) time.sleep(backoff_time) return self.get_connection_object_page( - query, - connection_type, - query_filter, - count, - offset, + query=query, + connection_type=connection_type, + query_filter=query_filter, + count=count, + current_cursor=current_cursor, retry_on_auth_error=False, - retries_remaining=retries_remaining - 1, + retries_remaining=retries_remaining, ) raise RuntimeError(f"Query {connection_type} error: {errors}") connection_object = query_data.get(c.DATA, {}).get(connection_type, {}) - total_count = connection_object.get(c.TOTAL_COUNT, 0) has_next_page = connection_object.get(c.PAGE_INFO, {}).get( c.HAS_NEXT_PAGE, False ) - return connection_object, total_count, has_next_page + + next_cursor = connection_object.get(c.PAGE_INFO, {}).get( + "endCursor", + None, + ) + + return connection_object, next_cursor, has_next_page def get_connection_objects( self, @@ -1114,29 +1128,30 @@ def get_connection_objects( filter_pages = get_filter_pages(query_filter, page_size) for filter_page in filter_pages: - total_count = page_size has_next_page = 1 - offset = 0 + # offset = 0 + current_cursor: Optional[str] = None while has_next_page: - count = ( - page_size - if offset + page_size < total_count - else total_count - offset - ) + # count = ( + # page_size + # if offset + page_size < total_count + # else total_count - offset + # ) + + filter_: str = make_filter(filter_page) + ( connection_objects, - total_count, + current_cursor, has_next_page, ) = self.get_connection_object_page( - query, - connection_type, - make_filter(filter_page), - count, - offset, + query=query, + connection_type=connection_type, + query_filter=filter_, + count=page_size, + current_cursor=current_cursor, ) - offset += count - yield from connection_objects.get(c.NODES) or [] def emit_workbooks(self) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py index f3a9c4a5aa201..dfa325066d8db 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py @@ -910,6 +910,41 @@ def make_filter(filter_dict: dict) -> str: return filter +def query_metadata_cursor_based_pagination( + server: Server, + main_query: str, + connection_name: str, + first: int, + after: Optional[str], + qry_filter: str = "", +) -> dict: + + query = f""" + query GetItems( + $first: Int, + $after: String + ) {{ + {connection_name} ( first: $first, after: $after, filter:{{ {qry_filter} }}) + {{ + nodes {main_query} + pageInfo {{ + hasNextPage + endCursor + }} + }} + }}""" + + result = server.metadata.query( + query=query, + variables={ + "first": first, + "after": after, + }, + ) + + return result + + def query_metadata( server: Server, main_query: str, @@ -961,7 +996,10 @@ def get_filter_pages(query_filter: dict, page_size: int) -> List[dict]: ) ] } - for start in range(0, len(ids), page_size) + for start in range( + 0, + len(ids), + ) ] return filter_pages