From 975dcd286f2aabea5934529fc108c5e95a96f56d Mon Sep 17 00:00:00 2001 From: altescy Date: Tue, 24 Oct 2023 19:06:22 +0900 Subject: [PATCH 1/3] remove unused attribute --- queuery_client/response.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/queuery_client/response.py b/queuery_client/response.py index c2f8fcb..4556ff5 100644 --- a/queuery_client/response.py +++ b/queuery_client/response.py @@ -38,7 +38,6 @@ def __init__( ): self._response = response self._data_file_urls = response.data_file_urls - self._cursor = 0 self._parser = csv.reader self._session = Session() self._enable_cast = enable_cast @@ -56,8 +55,6 @@ def _open(self, url: str) -> List[List[str]]: data = self._session.get(url).content response = gzip.decompress(data).decode() reader = csv.reader(StringIO(response), escapechar="\\") - - self._cursor += 1 return list(reader) def fetch_manifest(self, force: bool = False) -> Dict[str, Any]: From 596e4737510cea00e30978fd4a2479bfaf335599 Mon Sep 17 00:00:00 2001 From: altescy Date: Wed, 25 Oct 2023 19:09:55 +0900 Subject: [PATCH 2/3] add use_manifest option --- queuery_client/client.py | 3 +++ queuery_client/queuery_client.py | 2 ++ queuery_client/response.py | 43 ++++++++++++++++++++++++++------ queuery_client/util.py | 26 +++++++++++++++++++ tests/test_response.py | 15 ++++++----- 5 files changed, 76 insertions(+), 13 deletions(-) create mode 100644 queuery_client/util.py diff --git a/queuery_client/client.py b/queuery_client/client.py index df56925..09ee5af 100644 --- a/queuery_client/client.py +++ b/queuery_client/client.py @@ -23,6 +23,7 @@ def __init__( timeout: int = 300, enable_cast: bool = False, session: Optional[Session] = None, + use_manifest: Optional[bool] = None, ) -> None: endpoint = endpoint or os.getenv("QUEUERY_ENDPOINT") if endpoint is None: @@ -33,6 +34,7 @@ def __init__( self._timeout = timeout self._enable_cast = enable_cast self._session = session or Session() + self._use_manifest = use_manifest @property def _auth(self) -> Optional[Tuple[str, str]]: @@ -78,6 +80,7 @@ def get_body(self, qid: int) -> Response: response=body, enable_cast=self._enable_cast, session=self._session, + use_manifest=self._use_manifest, ) def wait_for(self, qid: int) -> Response: diff --git a/queuery_client/queuery_client.py b/queuery_client/queuery_client.py index f3bbddf..99c9edf 100644 --- a/queuery_client/queuery_client.py +++ b/queuery_client/queuery_client.py @@ -14,12 +14,14 @@ def __init__( timeout: int = 300, enable_cast: bool = False, session: Optional[Session] = None, + use_manifest: Optional[bool] = None, ) -> None: self._client = Client( endpoint=endpoint, timeout=timeout, enable_cast=enable_cast, session=session, + use_manifest=use_manifest, ) def run(self, sql: str) -> Response: diff --git a/queuery_client/response.py b/queuery_client/response.py index 4556ff5..35f0f52 100644 --- a/queuery_client/response.py +++ b/queuery_client/response.py @@ -7,6 +7,7 @@ from requests import Session from queuery_client.cast import cast_row +from queuery_client.util import SizedIterator try: import pandas @@ -35,21 +36,33 @@ def __init__( response: ResponseBody, enable_cast: bool = False, session: Optional[Session] = None, + use_manifest: Optional[bool] = None, ): + if enable_cast and use_manifest is False: + raise ValueError("enable_cast is not available when use_manifest is False.") + self._response = response self._data_file_urls = response.data_file_urls self._parser = csv.reader self._session = Session() self._enable_cast = enable_cast + self._use_manifest = use_manifest or enable_cast self._manifest: Optional[Dict[str, Any]] = None def __iter__(self) -> Iterator[List[Any]]: - for url in self._data_file_urls: - for row in self._open(url): - if self._enable_cast: - yield cast_row(row, self.fetch_manifest()) - else: - yield row + def get_iterator() -> Iterator[List[Any]]: + for url in self._data_file_urls: + for row in self._open(url): + if self._enable_cast: + yield cast_row(row, self.fetch_manifest()) + else: + yield row + + if self._use_manifest: + record_count = self.fetch_record_count() + return SizedIterator(get_iterator(), record_count) + + return get_iterator() def _open(self, url: str) -> List[List[str]]: data = self._session.get(url).content @@ -58,15 +71,27 @@ def _open(self, url: str) -> List[List[str]]: return list(reader) def fetch_manifest(self, force: bool = False) -> Dict[str, Any]: + if not self._use_manifest: + raise RuntimeError("Manifest file is not available.") if self._manifest is None or force: if not self._response.manifest_file_url: - raise RuntimeError("Response does not contain manifest_file_url.") + raise RuntimeError( + "Manifest is not available because response does not contain manifest_file_url." + ) manifest = self._session.get(self._response.manifest_file_url).json() assert isinstance(manifest, dict) self._manifest = manifest return self._manifest + def fetch_record_count(self) -> int: + manifest = self.fetch_manifest() + return int(manifest["meta"]["record_count"]) + + def fetch_column_names(self) -> List[str]: + manifest = self.fetch_manifest() + return [x["name"] for x in manifest["schema"]["elements"]] + @overload def read(self) -> List[List[Any]]: ... @@ -91,6 +116,10 @@ def read( "pandas is not availabe. Please make sure that " "pandas is successfully installed to use use_pandas option." ) + + if self._use_manifest: + return pandas.DataFrame(elems, columns=self.fetch_column_names()) + return pandas.DataFrame(elems) return elems diff --git a/queuery_client/util.py b/queuery_client/util.py new file mode 100644 index 0000000..45292ea --- /dev/null +++ b/queuery_client/util.py @@ -0,0 +1,26 @@ +from typing import Generic, Iterator, TypeVar + +T = TypeVar("T") + + +class SizedIterator(Generic[T]): + """ + A wrapper for an iterator that knows its size. + + Args: + iterator: The iterator. + size: The size of the iterator. + """ + + def __init__(self, iterator: Iterator[T], size: int): + self.iterator = iterator + self.size = size + + def __iter__(self) -> Iterator[T]: + return self.iterator + + def __next__(self) -> T: + return next(self.iterator) + + def __len__(self) -> int: + return self.size diff --git a/tests/test_response.py b/tests/test_response.py index 2a1af3f..bc26ca2 100644 --- a/tests/test_response.py +++ b/tests/test_response.py @@ -44,12 +44,15 @@ def test_response_with_type_cast() -> None: manifest_response = MockResponse( b""" - {"schema": { - "elements": [ - {"name": "id", "type": {"base": "integer"}}, - {"name": "title", "type": {"base": "character varying"}} - ] - }} + { + "schema": { + "elements": [ + {"name": "id", "type": {"base": "integer"}}, + {"name": "title", "type": {"base": "character varying"}} + ] + }, + "meta": {"record_count": 2} + } """, 200, ) From e20fbd4a0a96426a19ca2d353985fa0952555623 Mon Sep 17 00:00:00 2001 From: altescy Date: Wed, 25 Oct 2023 19:30:47 +0900 Subject: [PATCH 3/3] add map method --- queuery_client/response.py | 25 +++++++++++++++++++++++- tests/test_response.py | 39 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/queuery_client/response.py b/queuery_client/response.py index 35f0f52..f0bad63 100644 --- a/queuery_client/response.py +++ b/queuery_client/response.py @@ -2,7 +2,7 @@ import dataclasses import gzip from io import StringIO -from typing import Any, Dict, Iterator, List, Literal, Optional, Union, overload +from typing import Any, Callable, Dict, Iterator, List, Literal, Optional, Tuple, Type, TypeVar, Union, overload from requests import Session @@ -15,6 +15,9 @@ pandas = None +T = TypeVar("T") + + @dataclasses.dataclass class ResponseBody: id: int @@ -123,3 +126,23 @@ def read( return pandas.DataFrame(elems) return elems + + def map(self, target: Union[Type[T], Callable[..., T]]) -> Iterator[T]: + column_names = self.fetch_column_names() if self._use_manifest else None + + def convert_to_args(row: List[Any]) -> Tuple[List[Any], Dict[str, Any]]: + if column_names is None: + return row, {} + return [], {name: value for name, value in zip(column_names, row)} + + def map_to_target(row: List[Any]) -> T: + args, kwargs = convert_to_args(row) + return target(*args, **kwargs) + + iterator = map(map_to_target, self) + + if self._use_manifest: + record_count = self.fetch_record_count() + return SizedIterator(iterator, record_count) + + return iterator diff --git a/tests/test_response.py b/tests/test_response.py index bc26ca2..17d8216 100644 --- a/tests/test_response.py +++ b/tests/test_response.py @@ -1,3 +1,4 @@ +import dataclasses import gzip import json from typing import Any, Dict @@ -63,3 +64,41 @@ def test_response_with_type_cast() -> None: with mock.patch("requests.Session.get", return_value=data_response): data = response.read() assert data == [[1, "test_recipe1"], [2, "test_recipe2"]] + + +def test_response_with_map() -> None: + @dataclasses.dataclass + class Item: + id: int + title: str + + response_body = ResponseBody( + id=1, + data_file_urls=["https://queuery.example.com/data"], + error=None, + status="success", + manifest_file_url="https://queuery.example.com/manifest", + ) + response = Response(response_body, enable_cast=True) + + manifest_response = MockResponse( + b""" + { + "schema": { + "elements": [ + {"name": "id", "type": {"base": "integer"}}, + {"name": "title", "type": {"base": "character varying"}} + ] + }, + "meta": {"record_count": 2} + } + """, + 200, + ) + with mock.patch("requests.Session.get", return_value=manifest_response): + response.fetch_manifest() + + data_response = MockResponse(gzip.compress(b'"1","test_recipe1"\n"2","test_recipe2"'), 200) + with mock.patch("requests.Session.get", return_value=data_response): + data = list(response.map(Item)) + assert data == [Item(id=1, title="test_recipe1"), Item(id=2, title="test_recipe2")]