Skip to content

Commit

Permalink
cursor based pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
sid-acryl committed Aug 23, 2024
1 parent 88b3893 commit fcc2450
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 41 deletions.
95 changes: 55 additions & 40 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
make_fine_grained_lineage_class,
make_upstream_class,
published_datasource_graphql_query,
query_metadata,
query_metadata_cursor_based_pagination,
sheet_graphql_query,
tableau_field_to_schema_field,
workbook_graphql_query,
Expand Down Expand Up @@ -972,22 +972,31 @@ def get_connection_object_page(
query: str,
connection_type: str,
query_filter: str,
current_cursor: Optional[str], # initial value is None
count: int = 0,
offset: int = 0,
retry_on_auth_error: bool = True,
retries_remaining: Optional[int] = None,
) -> Tuple[dict, int, int]:
retries_remaining = retries_remaining or self.config.max_retries

logger.debug(
f"Query {connection_type} to get {count} objects with offset {offset}"
f"Query {connection_type} to get {count} objects with offset {current_cursor}"
f" and filter {query_filter}"
)
try:
assert self.server is not None
query_data = query_metadata(
self.server, query, connection_type, count, offset, query_filter
)
query_data = query_metadata_cursor_based_pagination(
server=self.server,
main_query=query,
connection_name=connection_type,
first=count,
after=current_cursor,
qry_filter=query_filter,
)

# query_data = query_metadata(
# self.server, query, connection_type, count, offset, query_filter
# )
except REAUTHENTICATE_ERRORS:
if not retry_on_auth_error:
raise
Expand All @@ -997,11 +1006,11 @@ def get_connection_object_page(
# will be thrown and we need to re-authenticate and retry.
self._re_authenticate()
return self.get_connection_object_page(
query,
connection_type,
query_filter,
count,
offset,
query=query,
connection_type=connection_type,
query_filter=query_filter,
count=count,
current_cursor=current_cursor,
retry_on_auth_error=False,
retries_remaining=retries_remaining,
)
Expand All @@ -1015,13 +1024,13 @@ def get_connection_object_page(
if retries_remaining <= 0:
raise
return self.get_connection_object_page(
query,
connection_type,
query_filter,
count,
offset,
query=query,
connection_type=connection_type,
query_filter=query_filter,
count=count,
current_cursor=current_cursor,
retry_on_auth_error=False,
retries_remaining=retries_remaining - 1,
retries_remaining=retries_remaining,
)

if c.ERRORS in query_data:
Expand Down Expand Up @@ -1082,23 +1091,28 @@ def get_connection_object_page(
)
time.sleep(backoff_time)
return self.get_connection_object_page(
query,
connection_type,
query_filter,
count,
offset,
query=query,
connection_type=connection_type,
query_filter=query_filter,
count=count,
current_cursor=current_cursor,
retry_on_auth_error=False,
retries_remaining=retries_remaining - 1,
retries_remaining=retries_remaining,
)
raise RuntimeError(f"Query {connection_type} error: {errors}")

connection_object = query_data.get(c.DATA, {}).get(connection_type, {})

total_count = connection_object.get(c.TOTAL_COUNT, 0)
has_next_page = connection_object.get(c.PAGE_INFO, {}).get(
c.HAS_NEXT_PAGE, False
)
return connection_object, total_count, has_next_page

next_cursor = connection_object.get(c.PAGE_INFO, {}).get(
"endCursor",
None,
)

return connection_object, next_cursor, has_next_page

def get_connection_objects(
self,
Expand All @@ -1114,29 +1128,30 @@ def get_connection_objects(
filter_pages = get_filter_pages(query_filter, page_size)

for filter_page in filter_pages:
total_count = page_size
has_next_page = 1
offset = 0
# offset = 0
current_cursor: Optional[str] = None
while has_next_page:
count = (
page_size
if offset + page_size < total_count
else total_count - offset
)
# count = (
# page_size
# if offset + page_size < total_count
# else total_count - offset
# )

filter_: str = make_filter(filter_page)

(
connection_objects,
total_count,
current_cursor,
has_next_page,
) = self.get_connection_object_page(
query,
connection_type,
make_filter(filter_page),
count,
offset,
query=query,
connection_type=connection_type,
query_filter=filter_,
count=page_size,
current_cursor=current_cursor,
)

offset += count

yield from connection_objects.get(c.NODES) or []

def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,41 @@ def make_filter(filter_dict: dict) -> str:
return filter


def query_metadata_cursor_based_pagination(
server: Server,
main_query: str,
connection_name: str,
first: int,
after: Optional[str],
qry_filter: str = "",
) -> dict:

query = f"""
query GetItems(
$first: Int,
$after: String
) {{
{connection_name} ( first: $first, after: $after, filter:{{ {qry_filter} }})
{{
nodes {main_query}
pageInfo {{
hasNextPage
endCursor
}}
}}
}}"""

result = server.metadata.query(
query=query,
variables={
"first": first,
"after": after,
},
)

return result


def query_metadata(
server: Server,
main_query: str,
Expand Down Expand Up @@ -961,7 +996,10 @@ def get_filter_pages(query_filter: dict, page_size: int) -> List[dict]:
)
]
}
for start in range(0, len(ids), page_size)
for start in range(
0,
len(ids),
)
]

return filter_pages

0 comments on commit fcc2450

Please sign in to comment.