From 0128ba8e4620c47a6eca0b11b708665b2077394d Mon Sep 17 00:00:00 2001
From: Katie Peters
Date: Tue, 17 Dec 2024 17:37:13 -0500
Subject: [PATCH] DATA-3443: Add export_tabular_data to data client (#800)

---
 CONTRIBUTING.md             |   2 +-
 src/viam/app/data_client.py | 140 +++++++++++++++++++++++++++++++++---
 tests/mocks/services.py     |  14 ++++
 tests/test_data_client.py   |  46 +++++++++++-
 4 files changed, 190 insertions(+), 12 deletions(-)

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index dfe5cc4e8..5412dbe79 100755
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -61,7 +61,7 @@ We use [`uv`](https://docs.astral.sh/uv/) to manage our environments and depende
 
 4. When you're done making changes, check that your changes conform to any code formatting requirements and pass any tests.
 
-- When testing, make sure you use the correct virtual environment by running either `uv make test` or `source .venv/bin/activate; make test`
+- When testing, make sure you use the correct virtual environment by running either `uv run make test` or `source .venv/bin/activate; make test`
 
 5. Commit your changes and open a pull request.
 
diff --git a/src/viam/app/data_client.py b/src/viam/app/data_client.py
index 37d1f4f1a..7b68f59ac 100644
--- a/src/viam/app/data_client.py
+++ b/src/viam/app/data_client.py
@@ -23,6 +23,7 @@
     BinaryID,
     BoundingBoxLabelsByFilterRequest,
     BoundingBoxLabelsByFilterResponse,
+    CaptureInterval,
     CaptureMetadata,
     ConfigureDatabaseUserRequest,
     DataRequest,
@@ -33,6 +34,8 @@
     DeleteBinaryDataByIDsResponse,
     DeleteTabularDataRequest,
     DeleteTabularDataResponse,
+    ExportTabularDataRequest,
+    ExportTabularDataResponse,
     Filter,
     GetDatabaseConnectionRequest,
     GetDatabaseConnectionResponse,
@@ -145,6 +148,69 @@ def __eq__(self, other: object) -> bool:
                 return str(self) == str(other)
             return False
 
+    @dataclass
+    class TabularDataPoint:
+        """Represents a tabular data point and its associated metadata."""
+
+        part_id: str
+        """The robot part ID"""
+
+        resource_name: str
+        """The resource name"""
+
+        resource_subtype: str
+        """The resource subtype. Ex: `rdk:component:sensor`"""
+
+        method_name: str
+        """The method used for data capture. Ex: `Readings`"""
+
+        time_captured: datetime
+        """The time at which the data point was captured"""
+
+        organization_id: str
+        """The organization ID"""
+
+        location_id: str
+        """The location ID"""
+
+        robot_name: str
+        """The robot name"""
+
+        robot_id: str
+        """The robot ID"""
+
+        part_name: str
+        """The robot part name"""
+
+        method_parameters: Mapping[str, ValueTypes]
+        """Additional parameters associated with the data capture method"""
+
+        tags: List[str]
+        """A list of tags associated with the data point"""
+
+        payload: Mapping[str, ValueTypes]
+        """The captured data"""
+
+        def __str__(self) -> str:
+            return (
+                f"TabularDataPoint("
+                f"robot='{self.robot_name}' (id={self.robot_id}), "
+                f"part='{self.part_name}' (id={self.part_id}), "
+                f"resource='{self.resource_name}' ({self.resource_subtype}), "
+                f"method='{self.method_name}', "
+                f"org={self.organization_id}, "
+                f"location={self.location_id}, "
+                f"time='{self.time_captured.isoformat()}', "
+                f"params={self.method_parameters}, "
+                f"tags={self.tags}, "
+                f"payload={self.payload})"
+            )
+
+        def __eq__(self, other: object) -> bool:
+            if isinstance(other, DataClient.TabularDataPoint):
+                return str(self) == str(other)
+            return False
+
     def __init__(self, channel: Channel, metadata: Mapping[str, str]):
         """Create a `DataClient` that maintains a connection to app.
@@ -254,7 +320,6 @@ async def tabular_data_by_sql(self, organization_id: str, sql_query: str) -> Lis
                 sql_query="SELECT * FROM readings LIMIT 5"
             )
 
-
         Args:
             organization_id (str): The ID of the organization that owns the data. You can obtain your organization ID from the Viam app's
                 organization settings page.
@@ -284,7 +349,6 @@ async def tabular_data_by_mql(self, organization_id: str, mql_binary: List[bytes
 
             print(f"Tabular Data: {tabular_data}")
 
-
         Args:
             organization_id (str): The ID of the organization that owns the data. You can obtain your organization ID from the Viam app's
                 organization settings page.
@@ -307,13 +371,12 @@ async def get_latest_tabular_data(
 
         ::
 
-                time_captured, time_synced, payload = await data_client.get_latest_tabular_data(
-                    part_id="",
-                    resource_name="",
-                    resource_subtype="",
-                    method_name=""
-                )
-
+            time_captured, time_synced, payload = await data_client.get_latest_tabular_data(
+                part_id="",
+                resource_name="",
+                resource_subtype="",
+                method_name=""
+            )
 
         Args:
             part_id (str): The ID of the part that owns the data.
@@ -327,6 +390,7 @@
             datetime: The time captured,
             datetime: The time synced,
             Dict[str, ValueTypes]: The latest tabular data captured from the specified data source.
+
         For more information, see `Data Client API `_.
         """
 
@@ -338,6 +402,64 @@
             return None
         return response.time_captured.ToDatetime(), response.time_synced.ToDatetime(), struct_to_dict(response.payload)
 
+    async def export_tabular_data(
+        self, part_id: str, resource_name: str, resource_subtype: str, method_name: str, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None
+    ) -> List[TabularDataPoint]:
+        """Obtain unified tabular data and metadata from the specified data source.
+
+        ::
+
+            tabular_data = await data_client.export_tabular_data(
+                part_id="",
+                resource_name="",
+                resource_subtype="",
+                method_name="",
+                start_time="",
+                end_time=""
+            )
+
+            print(f"My data: {tabular_data}")
+
+        Args:
+            part_id (str): The ID of the part that owns the data.
+            resource_name (str): The name of the requested resource that captured the data.
+            resource_subtype (str): The subtype of the requested resource that captured the data.
+            method_name (str): The data capture method name.
+            start_time (datetime): Optional start time for requesting a specific range of data.
+            end_time (datetime): Optional end time for requesting a specific range of data.
+
+        Returns:
+            List[TabularDataPoint]: The unified tabular data and metadata.
+
+        For more information, see `Data Client API `_.
+ """ + + interval=CaptureInterval(start=datetime_to_timestamp(start_time), end=datetime_to_timestamp(end_time)) + request = ExportTabularDataRequest( + part_id=part_id, resource_name=resource_name, resource_subtype=resource_subtype, method_name=method_name, interval=interval + ) + response: List[ExportTabularDataResponse] = await self._data_client.ExportTabularData(request, metadata=self._metadata) + + return [ + DataClient.TabularDataPoint( + part_id=resp.part_id, + resource_name=resp.resource_name, + resource_subtype=resp.resource_subtype, + method_name=resp.method_name, + time_captured=resp.time_captured.ToDatetime(), + organization_id=resp.organization_id, + location_id=resp.location_id, + robot_name=resp.robot_name, + robot_id=resp.robot_id, + part_name=resp.part_name, + method_parameters=struct_to_dict(resp.method_parameters), + tags=list(resp.tags), + payload=struct_to_dict(resp.payload) + ) + for resp in response + ] + + async def binary_data_by_filter( self, filter: Optional[Filter] = None, diff --git a/tests/mocks/services.py b/tests/mocks/services.py index 4fa092496..c44a531cd 100644 --- a/tests/mocks/services.py +++ b/tests/mocks/services.py @@ -201,6 +201,8 @@ DeleteBinaryDataByIDsResponse, DeleteTabularDataRequest, DeleteTabularDataResponse, + ExportTabularDataRequest, + ExportTabularDataResponse, GetDatabaseConnectionRequest, GetDatabaseConnectionResponse, GetLatestTabularDataRequest, @@ -767,6 +769,7 @@ class MockData(UnimplementedDataServiceBase): def __init__( self, tabular_response: List[DataClient.TabularData], + tabular_export_response: List[ExportTabularDataResponse], tabular_query_response: List[Dict[str, Union[ValueTypes, datetime]]], binary_response: List[BinaryData], delete_remove_response: int, @@ -775,6 +778,7 @@ def __init__( hostname_response: str, ): self.tabular_response = tabular_response + self.tabular_export_response = tabular_export_response self.tabular_query_response = tabular_query_response self.binary_response = binary_response self.delete_remove_response = delete_remove_response @@ -975,6 +979,16 @@ async def GetLatestTabularData(self, stream: Stream[GetLatestTabularDataRequest, data = dict_to_struct(self.tabular_response[0].data) await stream.send_message(GetLatestTabularDataResponse(time_captured=timestamp, time_synced=timestamp, payload=data)) + async def ExportTabularData(self, stream: Stream[ExportTabularDataRequest, ExportTabularDataResponse]) -> None: + request = await stream.recv_message() + assert request is not None + self.part_id = request.part_id + self.resource_name = request.resource_name + self.resource_subtype = request.resource_subtype + self.method_name = request.method_name + self.interval = request.interval + for tabular_data in self.tabular_export_response: + await stream.send_message(tabular_data) class MockDataset(DatasetServiceBase): def __init__(self, create_response: str, datasets_response: Sequence[Dataset]): diff --git a/tests/test_data_client.py b/tests/test_data_client.py index 7ccd6837e..995bc4850 100644 --- a/tests/test_data_client.py +++ b/tests/test_data_client.py @@ -2,12 +2,13 @@ from typing import List import pytest +from google.protobuf.struct_pb2 import Struct from google.protobuf.timestamp_pb2 import Timestamp from grpclib.testing import ChannelFor from viam.app.data_client import DataClient -from viam.proto.app.data import Annotations, BinaryData, BinaryID, BinaryMetadata, BoundingBox, CaptureMetadata, Filter, Order -from viam.utils import create_filter +from viam.proto.app.data import Annotations, 
+from viam.proto.app.data import Annotations, BinaryData, BinaryID, BinaryMetadata, BoundingBox, CaptureInterval, CaptureMetadata, ExportTabularDataResponse, Filter, Order
+from viam.utils import create_filter, dict_to_struct
 
 from .mocks.services import MockData
 
@@ -56,6 +57,7 @@
     bbox_labels=BBOX_LABELS,
     dataset_id=DATASET_ID,
 )
+INTERVAL = CaptureInterval(start=START_TS, end=END_TS)
 
 FILE_ID = "file_id"
 BINARY_ID = BinaryID(file_id=FILE_ID, organization_id=ORG_ID, location_id=LOCATION_ID)
@@ -101,6 +103,20 @@
 )
 
 TABULAR_RESPONSE = [DataClient.TabularData(TABULAR_DATA, TABULAR_METADATA, START_DATETIME, END_DATETIME)]
+TABULAR_EXPORT_RESPONSE = [ExportTabularDataResponse(
+    part_id=TABULAR_METADATA.part_id,
+    resource_name=TABULAR_METADATA.component_name,
+    resource_subtype=TABULAR_METADATA.component_type,
+    time_captured=END_TS,
+    organization_id=TABULAR_METADATA.organization_id,
+    location_id=TABULAR_METADATA.location_id,
+    robot_name=TABULAR_METADATA.robot_name,
+    robot_id=TABULAR_METADATA.robot_id,
+    part_name=TABULAR_METADATA.part_name,
+    method_parameters=Struct(),
+    tags=TABULAR_METADATA.tags,
+    payload=dict_to_struct(TABULAR_DATA),
+)]
 TABULAR_QUERY_RESPONSE = [
     {"key1": START_DATETIME, "key2": "2", "key3": [1, 2, 3], "key4": {"key4sub1": END_DATETIME}},
 ]
@@ -117,6 +133,7 @@
 def service() -> MockData:
     return MockData(
         tabular_response=TABULAR_RESPONSE,
+        tabular_export_response=TABULAR_EXPORT_RESPONSE,
         tabular_query_response=TABULAR_QUERY_RESPONSE,
         binary_response=BINARY_RESPONSE,
         delete_remove_response=DELETE_REMOVE_RESPONSE,
@@ -179,6 +196,31 @@ async def test_get_latest_tabular_data(self, service: MockData):
             assert time_captured == time
             assert time_synced == time
 
+    async def test_export_tabular_data(self, service: MockData):
+        async with ChannelFor([service]) as channel:
+            client = DataClient(channel, DATA_SERVICE_METADATA)
+            tabular_data = await client.export_tabular_data(PART_ID, COMPONENT_NAME, COMPONENT_TYPE, METHOD, START_DATETIME, END_DATETIME)
+            assert tabular_data is not None
+            for tabular_datum in tabular_data:
+                assert tabular_datum is not None
+                assert tabular_datum.part_id == TABULAR_METADATA.part_id
+                assert tabular_datum.resource_name == TABULAR_METADATA.component_name
+                assert tabular_datum.resource_subtype == TABULAR_METADATA.component_type
+                assert tabular_datum.time_captured == END_DATETIME
+                assert tabular_datum.organization_id == TABULAR_METADATA.organization_id
+                assert tabular_datum.location_id == TABULAR_METADATA.location_id
+                assert tabular_datum.robot_name == TABULAR_METADATA.robot_name
+                assert tabular_datum.robot_id == TABULAR_METADATA.robot_id
+                assert tabular_datum.part_name == TABULAR_METADATA.part_name
+                assert tabular_datum.method_parameters == TABULAR_METADATA.method_parameters
+                assert tabular_datum.tags == TABULAR_METADATA.tags
+                assert tabular_datum.payload == TABULAR_DATA
+            assert service.part_id == PART_ID
+            assert service.resource_name == COMPONENT_NAME
+            assert service.resource_subtype == COMPONENT_TYPE
+            assert service.method_name == METHOD
+            assert service.interval == INTERVAL
+
     async def test_binary_data_by_filter(self, service: MockData):
         async with ChannelFor([service]) as channel:
             client = DataClient(channel, DATA_SERVICE_METADATA)
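
Usage sketch (not part of the patch): a minimal example of calling the new export_tabular_data method from an authenticated app client. The API key, key ID, part ID, and data source names below are hypothetical placeholders, and the connection boilerplate assumes the SDK's existing ViamClient.create_from_dial_options and DialOptions.with_api_key helpers.

    import asyncio
    from datetime import datetime, timedelta

    from viam.app.viam_client import ViamClient
    from viam.rpc.dial import DialOptions


    async def main() -> None:
        # Hypothetical credentials: substitute a real API key and API key ID.
        dial_options = DialOptions.with_api_key("<API-KEY>", "<API-KEY-ID>")
        viam_client = await ViamClient.create_from_dial_options(dial_options)
        try:
            data_client = viam_client.data_client

            # Request the last day of unified tabular data for one data source on a part.
            points = await data_client.export_tabular_data(
                part_id="<PART-ID>",
                resource_name="my-sensor",
                resource_subtype="rdk:component:sensor",
                method_name="Readings",
                start_time=datetime.now() - timedelta(days=1),
                end_time=datetime.now(),
            )
            for point in points:
                print(point.time_captured, point.payload)
        finally:
            viam_client.close()


    if __name__ == "__main__":
        asyncio.run(main())

Each returned element is a DataClient.TabularDataPoint, so the capture metadata (robot, part, resource, method, tags) travels with every payload rather than requiring a separate metadata lookup.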