Skip to content

Commit

Permalink
feat(pagination): Refactor FHIRSearch with iterators, added iter and …
Browse files Browse the repository at this point in the history
…next to Bundle, and moved pagination logic to _utils.py
  • Loading branch information
LanaNYC committed Sep 20, 2024
1 parent 9967711 commit 08a8535
Show file tree
Hide file tree
Showing 10 changed files with 611 additions and 312 deletions.
2 changes: 1 addition & 1 deletion fhir-parser
127 changes: 127 additions & 0 deletions fhirclient/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import urllib
from typing import Optional, Iterable

import requests


# Use forward references to avoid circular imports
def _fetch_next_page(bundle: 'Bundle') -> Optional['Bundle']:
"""
Fetch the next page of results using the `next` link provided in the bundle.
Args:
bundle (Bundle): The FHIR Bundle containing the `next` link.
Returns:
Optional[Bundle]: The next page of results as a FHIR Bundle, or None if no "next" link is found.
"""
next_link = _get_next_link(bundle)
if next_link:
sanitized_next_link = _sanitize_next_link(next_link)
return _execute_pagination_request(sanitized_next_link)
return None


def _get_next_link(bundle: 'Bundle') -> Optional[str]:
"""
Extract the `next` link from the Bundle's links.
Args:
bundle (Bundle): The FHIR Bundle containing pagination links.
Returns:
Optional[str]: The URL of the next page if available, None otherwise.
"""
if not bundle.link:
return None

for link in bundle.link:
if link.relation == "next":
return link.url
return None


def _sanitize_next_link(next_link: str) -> str:
"""
Sanitize the `next` link to ensure it is safe to use.
Args:
next_link (str): The raw `next` link URL.
Returns:
str: The sanitized URL.
Raises:
ValueError: If the URL scheme or domain is invalid.
"""
parsed_url = urllib.parse.urlparse(next_link)

# Validate scheme and netloc (domain)
if parsed_url.scheme not in ["http", "https"]:
raise ValueError("Invalid URL scheme in `next` link.")
if not parsed_url.netloc:
raise ValueError("Invalid URL domain in `next` link.")

# Additional sanitization if necessary, e.g., removing dangerous query parameters
query_params = urllib.parse.parse_qs(parsed_url.query)
sanitized_query = {k: v for k, v in query_params.items()}

# Rebuild the sanitized URL
sanitized_url = urllib.parse.urlunparse(
(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
urllib.parse.urlencode(sanitized_query, doseq=True),
parsed_url.fragment,
)
)

return sanitized_url


def _execute_pagination_request(sanitized_url: str) -> Optional['Bundle']:
"""
Execute the request to retrieve the next page using the sanitized URL.
Args:
sanitized_url (str): The sanitized URL to fetch the next page.
Returns:
Optional[Bundle]: The next page of results as a FHIR Bundle, or None.
Raises:
HTTPError: If the request fails due to network issues or server errors.
"""
from fhirclient.models.bundle import Bundle # Import here to avoid circular import

try:
# Use requests.get directly to make the HTTP request
response = requests.get(sanitized_url)
response.raise_for_status()
next_bundle_data = response.json()
next_bundle = Bundle(next_bundle_data)

return next_bundle

except requests.exceptions.HTTPError as e:
# Handle specific HTTP errors as needed, possibly including retry logic
raise e


def iter_pages(first_bundle: 'Bundle') -> Iterable['Bundle']:
"""
Iterator that yields each page of results as a FHIR Bundle.
Args:
first_bundle (Bundle): The first Bundle to start pagination.
Yields:
Bundle: Each page of results as a FHIR Bundle.
"""
bundle = first_bundle
while bundle:
yield bundle
bundle = _fetch_next_page(bundle)

122 changes: 0 additions & 122 deletions fhirclient/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
import logging
import urllib
from typing import Optional, Iterable

import requests
from .models.bundle import Bundle

from .server import FHIRServer, FHIRUnauthorizedException, FHIRNotFoundException

__version__ = '4.2.0'
Expand Down Expand Up @@ -244,119 +238,3 @@ def from_state(self, state):

def save_state (self):
self._save_func(self.state)

# MARK: Pagination
def _fetch_next_page(self, bundle: Bundle) -> Optional[Bundle]:
"""
Fetch the next page of results using the `next` link provided in the bundle.
Args:
bundle (Bundle): The FHIR Bundle containing the `next` link.
Returns:
Optional[Bundle]: The next page of results as a FHIR Bundle, or None if no "next" link is found.
"""
next_link = self._get_next_link(bundle)
if next_link:
sanitized_next_link = self._sanitize_next_link(next_link)
return self._execute_pagination_request(sanitized_next_link)
return None

def _get_next_link(self, bundle: Bundle) -> Optional[str]:
"""
Extract the `next` link from the Bundle's links.
Args:
bundle (Bundle): The FHIR Bundle containing pagination links.
Returns:
Optional[str]: The URL of the next page if available, None otherwise.
"""
if not bundle.link:
return None

for link in bundle.link:
if link.relation == "next":
return link.url
return None

def _sanitize_next_link(self, next_link: str) -> str:
"""
Sanitize the `next` link to ensure it is safe to use.
Args:
next_link (str): The raw `next` link URL.
Returns:
str: The sanitized URL.
Raises:
ValueError: If the URL scheme or domain is invalid.
"""
parsed_url = urllib.parse.urlparse(next_link)

# Validate scheme and netloc (domain)
if parsed_url.scheme not in ["http", "https"]:
raise ValueError("Invalid URL scheme in `next` link.")
if not parsed_url.netloc:
raise ValueError("Invalid URL domain in `next` link.")

# Additional sanitization if necessary, e.g., removing dangerous query parameters
query_params = urllib.parse.parse_qs(parsed_url.query)
sanitized_query = {k: v for k, v in query_params.items()}

# Rebuild the sanitized URL
sanitized_url = urllib.parse.urlunparse(
(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
urllib.parse.urlencode(sanitized_query, doseq=True),
parsed_url.fragment,
)
)

return sanitized_url

def _execute_pagination_request(self, sanitized_url: str) -> Optional[Bundle]:
"""
Execute the request to retrieve the next page using the sanitized URL.
Args:
sanitized_url (str): The sanitized URL to fetch the next page.
Returns:
Optional[Bundle]: The next page of results as a FHIR Bundle, or None.
Raises:
HTTPError: If the request fails due to network issues or server errors.
"""
try:
# Use requests.get directly to make the HTTP request
response = requests.get(sanitized_url)
response.raise_for_status()
next_bundle_data = response.json()
next_bundle = Bundle(next_bundle_data)

return next_bundle

except requests.exceptions.HTTPError as e:
# Handle specific HTTP errors as needed, possibly including retry logic
raise e

def iter_pages(self, first_bundle: Bundle) -> Iterable[Bundle]:
"""
Iterator that yields each page of results as a FHIR Bundle.
Args:
first_bundle (Bundle): The first Bundle to start pagination.
Yields:
Bundle: Each page of results as a FHIR Bundle.
"""
bundle = first_bundle
while bundle:
yield bundle
bundle = self._fetch_next_page(bundle)

40 changes: 28 additions & 12 deletions fhirclient/models/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,35 @@ def __init__(self, jsondict=None, strict=True):
Type `str`. """

super(Bundle, self).__init__(jsondict=jsondict, strict=strict)


def __iter__(self):
""" Makes the Bundle itself an iterator by returning an iterator over its entries. """
if self.entry is None:
self._entry_iter = iter([])
else:
self._entry_iter = iter(self.entry)
return self

def __next__(self):
""" Returns the next BundleEntry in the Bundle's entry list using the internal iterator. """
# return next(self._entry_iter)

if not hasattr(self, '_entry_iter'):
self.__iter__()
return next(self._entry_iter)

def elementProperties(self):
js = super(Bundle, self).elementProperties()
js.extend([
("entry", "entry", BundleEntry, True, None, False),
("identifier", "identifier", identifier.Identifier, False, None, False),
("link", "link", BundleLink, True, None, False),
("signature", "signature", signature.Signature, False, None, False),
("timestamp", "timestamp", fhirinstant.FHIRInstant, False, None, False),
("total", "total", int, False, None, False),
("type", "type", str, False, None, True),
])
return js
js = super(Bundle, self).elementProperties()
js.extend([
("entry", "entry", BundleEntry, True, None, False),
("identifier", "identifier", identifier.Identifier, False, None, False),
("link", "link", BundleLink, True, None, False),
("signature", "signature", signature.Signature, False, None, False),
("timestamp", "timestamp", fhirinstant.FHIRInstant, False, None, False),
("total", "total", int, False, None, False),
("type", "type", str, False, None, True),
])
return js


from . import backboneelement
Expand Down
44 changes: 43 additions & 1 deletion fhirclient/models/fhirsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
# 2014, SMART Health IT.

import logging
import warnings
from typing import Iterator


from . import fhirreference
from .._utils import iter_pages

try:
from urllib import quote_plus
Expand Down Expand Up @@ -124,14 +128,35 @@ def perform(self, server):
bundle = bundle.Bundle(res)
bundle.origin_server = server
return bundle


# Use forward references to avoid circular imports
def perform_iter(self, server) -> Iterator['Resource']:
""" Perform the search by calling `perform` and return an iterator that yields
Bundle instances.
:param server: The server against which to perform the search
:returns: An iterator of Bundle instances
"""

first_bundle = self.perform(server)
if not first_bundle:
return iter([])
yield first_bundle

def perform_resources(self, server):
""" Performs the search by calling `perform`, then extracts all Bundle
entries and returns a list of Resource instances.
:param server: The server against which to perform the search
:returns: A list of Resource instances
"""
# Old method with deprecation warning
warnings.warn(
"perform_resources() is deprecated and will be removed in a future release. "
"Please use perform_resources_iter() instead.",
DeprecationWarning,
)

bundle = self.perform(server)
resources = []
if bundle is not None and bundle.entry is not None:
Expand All @@ -140,6 +165,23 @@ def perform_resources(self, server):

return resources

# Use forward references to avoid circular imports
def perform_resources_iter(self, server) -> Iterator['Resource']:
""" Performs the search by calling `perform`, then extracts all Bundle
entries and returns an iterator of Resource instances.
:param server: The server against which to perform the search
:returns: An iterator of Resource instances
"""
first_bundle = self.perform(server)

if not first_bundle or not first_bundle.entry:
return iter([])

for bundle in iter_pages(first_bundle):
if bundle.entry:
yield from (entry.resource for entry in bundle.entry)


class FHIRSearchParam(object):
""" Holds one search parameter.
Expand Down
Loading

0 comments on commit 08a8535

Please sign in to comment.