DATA-3443: Add export_tabular_data to data client (viamrobotics#800)
katiepeters authored Dec 17, 2024
1 parent fe27252 commit 0128ba8
Showing 4 changed files with 190 additions and 12 deletions.
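In short, the commit adds a `DataClient.export_tabular_data` method that returns the new `TabularDataPoint` objects defined below. A minimal, hypothetical usage sketch based on the docstrings added in this diff — the connected `data_client`, the part ID, resource name, and time range are placeholders, and the subtype and method names reuse the examples from the new docstrings:

from datetime import datetime

# Assumes `data_client` is a DataClient obtained from an already-connected ViamClient,
# and that the IDs below are filled in from the Viam app.
tabular_data = await data_client.export_tabular_data(
    part_id="<PART-ID>",
    resource_name="<RESOURCE-NAME>",
    resource_subtype="rdk:component:sensor",
    method_name="Readings",
    start_time=datetime(2024, 12, 1),
    end_time=datetime(2024, 12, 16),
)
for point in tabular_data:
    # Each point carries capture metadata alongside the captured payload.
    print(point.time_captured, point.payload)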
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -61,7 +61,7 @@ We use [`uv`](https://docs.astral.sh/uv/) to manage our environments and depende

4. When you're done making changes, check that your changes conform to any code formatting requirements and pass any tests.

- When testing, make sure you use the correct virtual environment by running either `uv make test` or `source .venv/bin/activate; make test`
- When testing, make sure you use the correct virtual environment by running either `uv run make test` or `source .venv/bin/activate; make test`

5. Commit your changes and open a pull request.

140 changes: 131 additions & 9 deletions src/viam/app/data_client.py
@@ -23,6 +23,7 @@
BinaryID,
BoundingBoxLabelsByFilterRequest,
BoundingBoxLabelsByFilterResponse,
CaptureInterval,
CaptureMetadata,
ConfigureDatabaseUserRequest,
DataRequest,
@@ -33,6 +34,8 @@
DeleteBinaryDataByIDsResponse,
DeleteTabularDataRequest,
DeleteTabularDataResponse,
ExportTabularDataRequest,
ExportTabularDataResponse,
Filter,
GetDatabaseConnectionRequest,
GetDatabaseConnectionResponse,
@@ -145,6 +148,69 @@ def __eq__(self, other: object) -> bool:
return str(self) == str(other)
return False

@dataclass
class TabularDataPoint:
"""Represents a tabular data point and its associated metadata."""

part_id: str
"""The robot part ID"""

resource_name: str
"""The resource name"""

resource_subtype: str
"""The resource subtype. Ex: `rdk:component:sensor`"""

method_name: str
"""The method used for data capture. Ex" `Readings`"""

time_captured: datetime
"""The time at which the data point was captured"""

organization_id: str
"""The organization ID"""

location_id: str
"""The location ID"""

robot_name: str
"""The robot name"""

robot_id: str
"""The robot ID"""

part_name: str
"""The robot part name"""

method_parameters: Mapping[str, ValueTypes]
"""Additional parameters associated with the data capture method"""

tags: List[str]
"""A list of tags associated with the data point"""

payload: Mapping[str, ValueTypes]
"""The captured data"""

def __str__(self) -> str:
return (
f"TabularDataPoint("
f"robot='{self.robot_name}' (id={self.robot_id}), "
f"part='{self.part_name}' (id={self.part_id}), "
f"resource='{self.resource_name}' ({self.resource_subtype}), "
f"method='{self.method_name}', "
f"org={self.organization_id}, "
f"location={self.location_id}, "
f"time='{self.time_captured.isoformat()}', "
f"params={self.method_parameters}, "
f"tags={self.tags}, "
f"payload={self.payload})"
)

def __eq__(self, other: object) -> bool:
if isinstance(other, DataClient.TabularDataPoint):
return str(self) == str(other)
return False

def __init__(self, channel: Channel, metadata: Mapping[str, str]):
"""Create a `DataClient` that maintains a connection to app.
@@ -254,7 +320,6 @@ async def tabular_data_by_sql(self, organization_id: str, sql_query: str) -> Lis
sql_query="SELECT * FROM readings LIMIT 5"
)
Args:
organization_id (str): The ID of the organization that owns the data.
You can obtain your organization ID from the Viam app's organization settings page.
@@ -284,7 +349,6 @@ async def tabular_data_by_mql(self, organization_id: str, mql_binary: List[bytes
print(f"Tabular Data: {tabular_data}")
Args:
organization_id (str): The ID of the organization that owns the data.
You can obtain your organization ID from the Viam app's organization settings page.
@@ -307,13 +371,12 @@ async def get_latest_tabular_data(
::
time_captured, time_synced, payload = await data_client.get_latest_tabular_data(
part_id="<PART-ID>",
resource_name="<RESOURCE-NAME>",
resource_subtype="<RESOURCE-SUBTYPE>",
method_name="<METHOD-NAME>"
)
Args:
part_id (str): The ID of the part that owns the data.
@@ -327,6 +390,7 @@ async def get_latest_tabular_data(
datetime: The time captured,
datetime: The time synced,
Dict[str, ValueTypes]: The latest tabular data captured from the specified data source.
For more information, see `Data Client API <https://docs.viam.com/appendix/apis/data-client/>`_.
"""

@@ -338,6 +402,64 @@ async def get_latest_tabular_data(
return None
return response.time_captured.ToDatetime(), response.time_synced.ToDatetime(), struct_to_dict(response.payload)

async def export_tabular_data(
        self,
        part_id: str,
        resource_name: str,
        resource_subtype: str,
        method_name: str,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
) -> List[TabularDataPoint]:
"""Obtain unified tabular data and metadata from the specified data source.
::
tabular_data = await data_client.export_tabular_data(
part_id="<PART-ID>",
resource_name="<RESOURCE-NAME>",
resource_subtype="<RESOURCE-SUBTYPE>",
method_name="<METHOD-NAME>",
start_time="<START_TIME>",
end_time="<END_TIME>"
)
print(f"My data: {tabular_data}")
Args:
part_id (str): The ID of the part that owns the data.
resource_name (str): The name of the requested resource that captured the data.
resource_subtype (str): The subtype of the requested resource that captured the data.
method_name (str): The data capture method name.
start_time (datetime): Optional start time for requesting a specific range of data.
end_time (datetime): Optional end time for requesting a specific range of data.
Returns:
List[TabularDataPoint]: The unified tabular data and metadata.
For more information, see `Data Client API <https://docs.viam.com/appendix/apis/data-client/>`_.
"""

interval = CaptureInterval(start=datetime_to_timestamp(start_time), end=datetime_to_timestamp(end_time))
request = ExportTabularDataRequest(
part_id=part_id, resource_name=resource_name, resource_subtype=resource_subtype, method_name=method_name, interval=interval
)
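# ExportTabularData is a server-streaming RPC: awaiting the generated stub call here
# gathers all streamed ExportTabularDataResponse messages into a single list.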
response: List[ExportTabularDataResponse] = await self._data_client.ExportTabularData(request, metadata=self._metadata)

return [
DataClient.TabularDataPoint(
part_id=resp.part_id,
resource_name=resp.resource_name,
resource_subtype=resp.resource_subtype,
method_name=resp.method_name,
time_captured=resp.time_captured.ToDatetime(),
organization_id=resp.organization_id,
location_id=resp.location_id,
robot_name=resp.robot_name,
robot_id=resp.robot_id,
part_name=resp.part_name,
method_parameters=struct_to_dict(resp.method_parameters),
tags=list(resp.tags),
payload=struct_to_dict(resp.payload)
)
for resp in response
]


async def binary_data_by_filter(
self,
filter: Optional[Filter] = None,
14 changes: 14 additions & 0 deletions tests/mocks/services.py
@@ -201,6 +201,8 @@
DeleteBinaryDataByIDsResponse,
DeleteTabularDataRequest,
DeleteTabularDataResponse,
ExportTabularDataRequest,
ExportTabularDataResponse,
GetDatabaseConnectionRequest,
GetDatabaseConnectionResponse,
GetLatestTabularDataRequest,
@@ -767,6 +769,7 @@ class MockData(UnimplementedDataServiceBase):
def __init__(
self,
tabular_response: List[DataClient.TabularData],
tabular_export_response: List[ExportTabularDataResponse],
tabular_query_response: List[Dict[str, Union[ValueTypes, datetime]]],
binary_response: List[BinaryData],
delete_remove_response: int,
@@ -775,6 +778,7 @@ def __init__(
hostname_response: str,
):
self.tabular_response = tabular_response
self.tabular_export_response = tabular_export_response
self.tabular_query_response = tabular_query_response
self.binary_response = binary_response
self.delete_remove_response = delete_remove_response
@@ -975,6 +979,16 @@ async def GetLatestTabularData(self, stream: Stream[GetLatestTabularDataRequest,
data = dict_to_struct(self.tabular_response[0].data)
await stream.send_message(GetLatestTabularDataResponse(time_captured=timestamp, time_synced=timestamp, payload=data))

async def ExportTabularData(self, stream: Stream[ExportTabularDataRequest, ExportTabularDataResponse]) -> None:
request = await stream.recv_message()
assert request is not None
self.part_id = request.part_id
self.resource_name = request.resource_name
self.resource_subtype = request.resource_subtype
self.method_name = request.method_name
self.interval = request.interval
for tabular_data in self.tabular_export_response:
await stream.send_message(tabular_data)

class MockDataset(DatasetServiceBase):
def __init__(self, create_response: str, datasets_response: Sequence[Dataset]):
46 changes: 44 additions & 2 deletions tests/test_data_client.py
@@ -2,12 +2,13 @@
from typing import List

import pytest
from google.protobuf.struct_pb2 import Struct
from google.protobuf.timestamp_pb2 import Timestamp
from grpclib.testing import ChannelFor

from viam.app.data_client import DataClient
from viam.proto.app.data import Annotations, BinaryData, BinaryID, BinaryMetadata, BoundingBox, CaptureMetadata, Filter, Order
from viam.utils import create_filter
from viam.proto.app.data import Annotations, BinaryData, BinaryID, BinaryMetadata, BoundingBox, CaptureInterval, CaptureMetadata, ExportTabularDataResponse, Filter, Order
from viam.utils import create_filter, dict_to_struct

from .mocks.services import MockData

@@ -56,6 +57,7 @@
bbox_labels=BBOX_LABELS,
dataset_id=DATASET_ID,
)
INTERVAL = CaptureInterval(start=START_TS, end=END_TS)

FILE_ID = "file_id"
BINARY_ID = BinaryID(file_id=FILE_ID, organization_id=ORG_ID, location_id=LOCATION_ID)
@@ -101,6 +103,20 @@
)

TABULAR_RESPONSE = [DataClient.TabularData(TABULAR_DATA, TABULAR_METADATA, START_DATETIME, END_DATETIME)]
TABULAR_EXPORT_RESPONSE = [ExportTabularDataResponse(
    part_id=TABULAR_METADATA.part_id,
    resource_name=TABULAR_METADATA.component_name,
    resource_subtype=TABULAR_METADATA.component_type,
    time_captured=END_TS,
    organization_id=TABULAR_METADATA.organization_id,
    location_id=TABULAR_METADATA.location_id,
    robot_name=TABULAR_METADATA.robot_name,
    robot_id=TABULAR_METADATA.robot_id,
    part_name=TABULAR_METADATA.part_name,
    method_parameters=Struct(),
    tags=TABULAR_METADATA.tags,
    payload=dict_to_struct(TABULAR_DATA),
)]
TABULAR_QUERY_RESPONSE = [
{"key1": START_DATETIME, "key2": "2", "key3": [1, 2, 3], "key4": {"key4sub1": END_DATETIME}},
]
@@ -117,6 +133,7 @@
def service() -> MockData:
return MockData(
tabular_response=TABULAR_RESPONSE,
tabular_export_response=TABULAR_EXPORT_RESPONSE,
tabular_query_response=TABULAR_QUERY_RESPONSE,
binary_response=BINARY_RESPONSE,
delete_remove_response=DELETE_REMOVE_RESPONSE,
@@ -179,6 +196,31 @@ async def test_get_latest_tabular_data(self, service: MockData):
assert time_captured == time
assert time_synced == time

async def test_export_tabular_data(self, service: MockData):
async with ChannelFor([service]) as channel:
client = DataClient(channel, DATA_SERVICE_METADATA)
tabular_data = await client.export_tabular_data(PART_ID, COMPONENT_NAME, COMPONENT_TYPE, METHOD, START_DATETIME, END_DATETIME)
assert tabular_data is not None
for tabular_datum in tabular_data:
assert tabular_datum is not None
assert tabular_datum.part_id == TABULAR_METADATA.part_id
assert tabular_datum.resource_name == TABULAR_METADATA.component_name
assert tabular_datum.resource_subtype == TABULAR_METADATA.component_type
assert tabular_datum.time_captured == END_DATETIME
assert tabular_datum.organization_id == TABULAR_METADATA.organization_id
assert tabular_datum.location_id == TABULAR_METADATA.location_id
assert tabular_datum.robot_name == TABULAR_METADATA.robot_name
assert tabular_datum.robot_id == TABULAR_METADATA.robot_id
assert tabular_datum.part_name == TABULAR_METADATA.part_name
assert tabular_datum.method_parameters == TABULAR_METADATA.method_parameters
assert tabular_datum.tags == TABULAR_METADATA.tags
assert tabular_datum.payload == TABULAR_DATA
assert service.part_id == PART_ID
assert service.resource_name == COMPONENT_NAME
assert service.resource_subtype == COMPONENT_TYPE
assert service.method_name == METHOD
assert service.interval == INTERVAL

async def test_binary_data_by_filter(self, service: MockData):
async with ChannelFor([service]) as channel:
client = DataClient(channel, DATA_SERVICE_METADATA)

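As documented in the CONTRIBUTING.md change above, the suite (including the new `test_export_tabular_data`) can be run with `uv run make test`, or with `make test` after activating the virtual environment via `source .venv/bin/activate`.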