Skip to content

Commit

Permalink
Merge pull request #432 from VorTECHsa/feat-search-after
Browse files Browse the repository at this point in the history
Search After pagination for Voyages and Cargoes
  • Loading branch information
migueluzcategui authored Jul 18, 2023
2 parents 12c793a + 097f1e8 commit 49a7242
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 22 deletions.
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{
"python.linting.mypyEnabled": true,
"python.linting.enabled": true
"python.linting.enabled": true,
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
2 changes: 1 addition & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ filterwarnings =
error
ignore::DeprecationWarning:

doctest_optionflags= ELLIPSIS
doctest_optionflags= ELLIPSIS
7 changes: 7 additions & 0 deletions tests/endpoints/test_cargo_movements_real.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ class TestCargoMovementsReal(TestCaseUsingRealAPI):
def test_default_search(self):
CargoMovements().search(filter_activity="loading_state")

def test_full_search_with_pagination(self):
CargoMovements().search(
filter_activity="loading_state",
filter_time_min=datetime(2023, 4, 30),
filter_time_max=datetime(2023, 5, 1),
)

def test_search_returns_unique_results(self):
result = CargoMovements().search(
filter_activity="loading_state",
Expand Down
26 changes: 26 additions & 0 deletions tests/endpoints/test_voyages_search_enriched.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,32 @@ def test_search_returns_dataframe(self):

assert len(df) == 2

def test_search_pagination_on_df(self):
start = datetime(2021, 1, 1)
end = datetime(2021, 2, 28)

res = (
VoyagesSearchEnriched()
.search(
time_min=start, time_max=end, origins=rotterdam, columns="all"
)
.to_df()
)

assert len(res) > 1000

def test_search_pagination_on_lists(self):
start = datetime(2021, 1, 1)
end = datetime(2021, 2, 28)

res = (
VoyagesSearchEnriched()
.search(time_min=start, time_max=end, origins=rotterdam)
.to_list()
)

assert len(res) > 1000

def test_search_returns_some_cols(self):
start = datetime(2021, 6, 17)
end = datetime(2021, 6, 21)
Expand Down
10 changes: 10 additions & 0 deletions tests/mock_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
VESSELS_REFERENCE,
)
from vortexasdk.search_response import SearchResponse
from vortexasdk.utils import PAGINATION_STRATEGIES


def _read(example_file) -> List[Dict]:
Expand Down Expand Up @@ -49,3 +50,12 @@ def search(
self, resource: str, response_type=None, **data
) -> SearchResponse:
return {"data": MockVortexaClient._results[resource], "reference": {}}

def search_base(
self,
resource: str,
response_type=None,
pagination_strategy: PAGINATION_STRATEGIES = None,
**data,
) -> SearchResponse:
return {"data": MockVortexaClient._results[resource], "reference": {}}
93 changes: 82 additions & 11 deletions vortexasdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from urllib.parse import urlencode
import uuid

import json

from requests import Response
from tqdm import tqdm
from warnings import warn
Expand All @@ -23,7 +25,7 @@
retry_get,
retry_post,
)
from vortexasdk.utils import filter_empty_values
from vortexasdk.utils import filter_empty_values, PAGINATION_STRATEGIES
from vortexasdk.version_utils import is_sdk_version_outdated
from vortexasdk.version import __version__
from vortexasdk import __name__ as sdk_pkg_name
Expand Down Expand Up @@ -61,8 +63,12 @@ def get_record_with_params(
response = retry_get(url)
return _handle_response(response)["data"]

def search(
self, resource: str, response_type: Optional[str], **data
def search_base(
self,
resource: str,
response_type: Optional[str],
pagination_strategy: Optional[PAGINATION_STRATEGIES] = None,
**data,
) -> SearchResponse:
"""Search using `resource` using `**data` as filter params."""
url = self._create_url(resource)
Expand Down Expand Up @@ -96,14 +102,27 @@ def search(
# Only one page response, no need to send another request, so return flattened response
return {"reference": {}, "data": probe_response["data"]}
else:
# Multiple pages available, create offsets and fetch all responses
responses = self._process_multiple_pages(
total=total,
url=url,
payload=payload,
data=data,
headers=headers,
logger.debug(
f"Sending post request with pagination: {pagination_strategy}"
)
if pagination_strategy == PAGINATION_STRATEGIES.SEARCH_AFTER:
# Wait for the response to retrieve new request
responses = self._process_multiple_pages_with_search_after(
total=total,
url=url,
payload=payload,
data=data,
headers=headers,
)
else:
# Multiple pages available, create offsets and fetch all responses
responses = self._process_multiple_pages(
total=total,
url=url,
payload=payload,
data=data,
headers=headers,
)

flattened = self._flatten_response(responses)

Expand All @@ -113,8 +132,24 @@ def search(
)
warn(f"Actual: {len(flattened)}, expected: {total}")

logger.info(f"Total records returned: {total}")

return {"reference": {}, "data": flattened}

def search(
self, resource: str, response_type: Optional[str], **data
) -> SearchResponse:
return self.search_base(
resource, response_type, PAGINATION_STRATEGIES.OFFSET, **data
)

def searchWithSearchAfter(
self, resource: str, response_type: Optional[str], **data
) -> SearchResponse:
return self.search_base(
resource, response_type, PAGINATION_STRATEGIES.SEARCH_AFTER, **data
)

def _create_url(self, path: str) -> str:
return (
f"{API_URL}{path}?_sdk=python_v{__version__}&apikey={self.api_key}"
Expand Down Expand Up @@ -155,6 +190,36 @@ def _process_multiple_pages(

return pool.map(func, offsets)

def _process_multiple_pages_with_search_after(
self, total: int, url: str, payload: Dict, data: Dict, headers
) -> List:
responses = []
size = data.get("size", 500)

first_response = _send_post_request(url, payload, size, 0, headers)

responses.append(first_response.get("data", []))
next_request = first_response.get("next_request")
search_after = first_response.get("search_after")

if not next_request and search_after:
next_request = dict(payload)
next_request["search_after"] = search_after

while next_request:
logger.warn(f"Sending post request with search_after")
dict_response = _send_post_request(
url, next_request, size, 0, headers
)
responses.append(dict_response.get("data", []))
next_request = dict_response.get("next_request")
search_after = dict_response.get("search_after")
if not next_request and search_after:
next_request = dict(payload)
next_request["search_after"] = search_after

return responses

@staticmethod
def _cleanse_payload(payload: Dict) -> Dict:
exclude_params = payload.get("exclude", {})
Expand Down Expand Up @@ -242,7 +307,13 @@ def _handle_response(
"data": data,
"total": int(response.headers["x-total"]),
}

if response.headers["x-next-request"] != "undefined":
try:
decoded["search_after"] = json.loads(
response.headers["x-next-request"]
)
except Exception as e:
logger.error(f"error parsing search_after: {e}")
else:
decoded = response.json()
except JSONDecodeError:
Expand Down
2 changes: 1 addition & 1 deletion vortexasdk/endpoints/cargo_movements.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def search(
"size": self._MAX_PAGE_RESULT_SIZE,
}

response = super().search_with_client(**api_params)
response = super().search_with_client_with_search_after(**api_params)

return CargoMovementsResult(
records=response["data"], reference=response["reference"]
Expand Down
6 changes: 4 additions & 2 deletions vortexasdk/endpoints/voyages_search_enriched.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,14 @@ def search(
}

if columns is None:
response = super().search_with_client(**api_params)
response = super().search_with_client_with_search_after(
**api_params
)
return VoyagesSearchEnrichedListResult(
records=response["data"], reference=response["reference"]
)
else:
response = super().search_with_client(
response = super().search_with_client_with_search_after(
headers=self._CSV_HEADERS, **api_params
)
return VoyagesSearchEnrichedFlattenedResult(
Expand Down
42 changes: 37 additions & 5 deletions vortexasdk/operations.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from typing import Dict

from typing import Dict, Optional
from vortexasdk.api.id import ID
from vortexasdk.client import default_client
from vortexasdk.exceptions import InvalidAPIDataResponseException
from vortexasdk.logger import get_logger
from vortexasdk.search_response import SearchResponse
from vortexasdk.utils import filter_exact_match
from vortexasdk.utils import filter_exact_match, PAGINATION_STRATEGIES

logger = get_logger(__name__)

Expand Down Expand Up @@ -68,11 +67,12 @@ def __init__(self, resource):

# This method has been renamed from `search` to `search_with_client` to avoid type signature
# issues with the `search` method in each endpoint class.
def search_with_client(
def search_with_client_base(
self,
exact_term_match: bool = None,
response_type: str = None,
headers: dict = None,
pagination_strategy: Optional[PAGINATION_STRATEGIES] = None,
**api_params,
) -> SearchResponse:
"""
Expand All @@ -92,9 +92,11 @@ def search_with_client(
"""
logger.info(f"Searching {self.__class__.__name__}")
api_result = default_client().search(

api_result = default_client().search_base(
self._resource,
response_type=response_type,
pagination_strategy=pagination_strategy,
headers=headers,
**api_params,
)
Expand All @@ -114,6 +116,36 @@ def search_with_client(
else:
return api_result

def search_with_client(
self,
exact_term_match: bool = None,
response_type: str = None,
headers: dict = None,
**api_params,
) -> SearchResponse:
return self.search_with_client_base(
exact_term_match,
response_type,
headers,
PAGINATION_STRATEGIES.OFFSET,
**api_params,
)

def search_with_client_with_search_after(
self,
exact_term_match: bool = None,
response_type: str = None,
headers: dict = None,
**api_params,
) -> SearchResponse:
return self.search_with_client_base(
exact_term_match,
response_type,
headers,
PAGINATION_STRATEGIES.SEARCH_AFTER,
**api_params,
)


class Record:
"""Lookup Vortexa Data using an record ID."""
Expand Down
6 changes: 6 additions & 0 deletions vortexasdk/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict, List, Union
from enum import Enum


def convert_to_list(a) -> List:
Expand Down Expand Up @@ -51,3 +52,8 @@ def sts_param_value(param):
return {"exclude": True, "x_filter": False}
else:
return {"exclude": False, "x_filter": False}


class PAGINATION_STRATEGIES(Enum):
OFFSET = "OFFSET"
SEARCH_AFTER = "SEARCH_AFTER"
2 changes: 1 addition & 1 deletion vortexasdk/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.52"
__version__ = "0.53"

0 comments on commit 49a7242

Please sign in to comment.