Skip to content

Commit

Permalink
DOCS-1877: Add motion, nav, slam method examples (viamrobotics#550)
Browse files Browse the repository at this point in the history
  • Loading branch information
skyleilani authored Mar 8, 2024
1 parent a111146 commit 1125f05
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 3 deletions.
101 changes: 100 additions & 1 deletion src/viam/services/motion/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Transform,
WorldState,
)

from viam.proto.service.motion import (
Constraints,
GetPlanRequest,
Expand All @@ -33,6 +34,7 @@
StopPlanRequest,
StopPlanResponse,
)

from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.resource.types import RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_SERVICE, Subtype
from viam.services.service_client_base import ServiceClientBase
Expand Down Expand Up @@ -74,6 +76,24 @@ async def move(
resource_name = Arm.get_resource_name("externalFrame")
success = await MotionServiceClient.move(resource_name, ...)
::
motion = MotionClient.from_robot(robot=robot, name="builtin")
# Assumes a gripper configured with name "my_gripper" on the machine
gripper_name = Gripper.get_resource_name("my_gripper")
my_frame = "my_gripper_offset"
goal_pose = Pose(x=0, y=0, z=300, o_x=0, o_y=0, o_z=1, theta=0)
# Move the gripper
moved = await motion.move(component_name=gripper_name,
destination=PoseInFrame(reference_frame="myFrame",
pose=goal_pose),
world_state=worldState,
constraints={},
extra={})
Args:
component_name (viam.proto.common.ResourceName): Name of a component on a given robot.
destination (viam.proto.common.PoseInFrame): The destination to move to, expressed as a ``Pose`` and the frame in which it was
Expand Down Expand Up @@ -121,6 +141,23 @@ async def move_on_globe(
You can monitor the progress of the ``move_on_globe()`` call by querying ``get_plan()`` and ``list_plan_statuses()``.
::
motion = MotionClient.from_robot(robot=robot, name="builtin")
# Get the ResourceNames of the base and movement sensor
my_base_resource_name = Base.get_resource_name("my_base")
mvmnt_sensor_resource_name = MovementSensor.get_resource_name(
"my_movement_sensor")
# Define a destination GeoPoint at the GPS coordinates [0, 0]
my_destination = movement_sensor.GeoPoint(latitude=0, longitude=0)
# Move the base component to the designated geographic location, as reported by the movement sensor
execution_id = await motion.move_on_globe(
component_name=my_base_resource_name,
destination=my_destination,
movement_sensor_name=mvmnt_sensor_resource_name)
Args:
component_name (ResourceName): The ResourceName of the base to move.
destination (GeoPoint): The location of the component’s destination, represented in geographic notation as a
Expand Down Expand Up @@ -188,6 +225,23 @@ async def move_on_map(
You can monitor the progress of the ``move_on_map()`` call by querying ``get_plan()`` and ``list_plan_statuses()``.
::
motion = MotionClient.from_robot(robot=robot, name="builtin")
# Get the ResourceNames of the base component and SLAM service
my_base_resource_name = Base.get_resource_name("my_base")
my_slam_service_name = SLAMClient.get_resource_name("my_slam_service")
# Define a destination pose with respect to the origin of the map from the SLAM service "my_slam_service"
my_pose = Pose(y=10)
# Move the base component to the destination pose of Y=10, a location of
# (0, 10, 0) in respect to the origin of the map
execution_id = await motion.move_on_map(component_name=my_base_resource_name,
destination=my_pose,
slam_service_name=my_slam_service_name)
Args:
component_name (ResourceName): The ResourceName of the base to move.
destination (Pose): The destination, which can be any Pose with respect to the SLAM map’s origin.
Expand Down Expand Up @@ -232,6 +286,14 @@ async def stop_plan(
):
"""Stop a component being moved by an in progress ``move_on_globe()`` or ``move_on_map()`` call.
::
# Assuming a `move_on_globe()` started the execution
# Stop the base component which was instructed to move by `move_on_globe()`
# or `move_on_map()`
my_base_resource_name = Base.get_resource_name("my_base")
await motion.stop_plan(component_name=mvmnt_sensor)
Args:
component_name (ResourceName): The component to stop
Expand All @@ -258,7 +320,7 @@ async def get_plan(
extra: Optional[Mapping[str, ValueTypes]] = None,
timeout: Optional[float] = None,
) -> GetPlanResponse:
"""By default: returns the plan history of the most recent ``move_on_globe()`` or ``move_on_map()``call to move a component.
"""By default: returns the plan history of the most recent ``move_on_globe()`` or ``move_on_map()`` call to move a component.
The plan history for executions before the most recent can be requested by providing an ExecutionID in the request.
Expand All @@ -275,6 +337,13 @@ async def get_plan(
All repeated fields are in time ascending order.
::
motion = MotionClient.from_robot(robot=robot, name="builtin")
my_base_resource_name = Base.get_resource_name("my_base")
# Get the plan(s) of the base component which was instructed to move by `MoveOnGlobe()` or `MoveOnMap()`
resp = await motion.get_plan(component_name=my_base_resource_name)
Args:
component_name (ResourceName): The component to stop
last_plan_only (Optional[bool]): If supplied, the response will only return the last plan for the component / execution
Expand Down Expand Up @@ -311,6 +380,12 @@ async def list_plan_statuses(
All repeated fields are in chronological order.
::
motion = MotionClient.from_robot(robot=robot, name="builtin")
# List the plan statuses of the motion service within the TTL
resp = await motion.list_plan_statuses()
Args:
only_active_plans (Optional[bool]): If supplied, the response will filter out any plans that are not executing
Expand Down Expand Up @@ -346,6 +421,18 @@ async def get_pose(
Note that the example uses the ``Arm`` class, but any component class that inherits from ``ComponentBase`` will work
(``Base``, ``Gripper``, etc).
::
from viam.components.gripper import Gripper
from viam.services.motion import MotionClient
# Assume that the connect function is written and will return a valid machine.
robot = await connect()
motion = MotionClient.from_robot(robot=robot, name="builtin")
gripperName = Gripper.get_resource_name("my_gripper")
gripperPoseInWorld = await motion.get_pose(component_name=gripperName,
destination_frame="world")
Args:
component_name (viam.proto.common.ResourceName): Name of a component on a robot.
Expand All @@ -371,6 +458,18 @@ async def get_pose(
async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **__) -> Mapping[str, ValueTypes]:
"""Send/receive arbitrary commands
::
# Access the motion service
motion = MotionClient.from_robot(robot=robot, name="builtin")
my_command = {
"command": "dosomething",
"someparameter": 52
}
await motion.do_command(my_command)
Args:
command (Dict[str, ValueTypes]): The command to execute
Expand Down
173 changes: 173 additions & 0 deletions src/viam/services/navigation/navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,209 @@ class Navigation(ServiceBase):

@abc.abstractmethod
async def get_paths(self, *, timeout: Optional[float]) -> List[Path]:
"""
Get each path, the series of geo points the robot plans to travel through
to get to a destination waypoint, in the machine’s motion planning.
::
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Get a list containing each path stored by the navigation service
paths = await my_nav.get_paths()
Args:
timeout (Optional[float]): An option to set how long to wait (in seconds)
before calling a time-out and closing the underlying RPC call.
Returns:
(List[navigation.Path]): An array comprised of Paths, where each path is either a user-provided destination or
a Waypoint, along with the corresponding set of geopoints. This outlines the route the machine is expected to take to
reach the specified destination or Waypoint.
"""
...

@abc.abstractmethod
async def get_location(self, *, timeout: Optional[float]) -> GeoPoint:
"""
Get the current location of the robot in the navigation service.
::
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Get the current location of the robot in the navigation service
location = await my_nav.get_location()
Args:
timeout (Optional[float]): An option to set how long to wait (in seconds)
before calling a time-out and closing the underlying RPC call.
Returns:
(navigation.GeoPoint): The current location of the robot in the navigation service,
represented in a GeoPoint with latitude and longitude values.
"""
...

@abc.abstractmethod
async def get_obstacles(self, *, timeout: Optional[float]) -> List[GeoObstacle]:
"""
Get an array or list of the obstacles currently in the service’s data storage.
These are objects designated for the robot to avoid when navigating.
These include all transient obstacles which are discovered by the vision services configured for the navigation service,
in addition to the obstacles that are configured as a part of the service.
::
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Get a list containing each obstacle stored by the navigation service
obstacles = await my_nav.get_obstacles()
Args:
timeout (Optional[float]): An option to set how long to wait (in seconds)
before calling a time-out and closing the underlying RPC call.
Returns:
(List[navigation.GeoObstacle]): A list comprised of each GeoObstacle in the service’s data storage.
These are objects designated for the robot to avoid when navigating.
"""
...

@abc.abstractmethod
async def get_waypoints(self, *, timeout: Optional[float]) -> List[Waypoint]:
"""
Get an array of waypoints currently in the service’s data storage.
These are locations designated within a path for the robot to navigate to.
::
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Get a list containing each waypoint stored by the navigation service
waypoints = await my_nav.get_waypoints()
Args:
timeout (Optional[float]): An option to set how long to wait (in seconds)
before calling a time-out and closing the underlying RPC call.
Returns:
(List[navigation.Waypoint]): An array comprised of each Waypoint in the service’s data storage.
These are locations designated within a path for the robot to navigate to.
"""
...

@abc.abstractmethod
async def add_waypoint(self, point: GeoPoint, *, timeout: Optional[float]):
"""
Add a waypoint to the service’s data storage.
::
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Create a new waypoint with latitude and longitude values of 0 degrees
location = GeoPoint(latitude=0, longitude=0)
# Add your waypoint to the service's data storage
await my_nav.add_waypoint(point=location)
Args:
point(navigation.GeoPoint): The current location of the robot in the navigation service,
represented in a GeoPoint with latitude and longitude values.
timeout (Optional[float]): An option to set how long to wait (in seconds)
before calling a time-out and closing the underlying RPC call.
"""
...

@abc.abstractmethod
async def remove_waypoint(self, id: str, *, timeout: Optional[float]):
"""
Remove a waypoint from the service’s data storage. If the robot is currently navigating to this waypoint,
the motion will be canceled, and the robot will proceed to the next waypoint.
::
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Remove the waypoint matching that ObjectID from the service's data storage
await my_nav.remove_waypoint(waypoint_id)
Args:
id(str): The MongoDB ObjectID of the Waypoint to remove from the service’s data storage.
timeout (Optional[float]): An option to set how long to wait (in seconds)
before calling a time-out and closing the underlying RPC call.
"""
...

@abc.abstractmethod
async def get_mode(self, *, timeout: Optional[float]) -> Mode.ValueType:
"""
Get the Mode the service is operating in.
There are two options for modes: MODE_MANUAL or MODE_WAYPOINT.
MODE_WAYPOINT: Start to look for added waypoints and begin autonomous navigation.
MODE_MANUAL: Stop autonomous navigation between waypoints and allow the base to be controlled manually.
::
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Get the Mode the service is operating in
await my_nav.get_mode()
Args:
timeout (Optional[float]): An option to set how long to wait (in seconds)
before calling a time-out and closing the underlying RPC call.
Returns:
navigation.Mode.ValueType: The Mode the service is operating in.
"""
...

@abc.abstractmethod
async def set_mode(self, mode: Mode.ValueType, *, timeout: Optional[float]):
"""
Set the Mode the service is operating in.
There are two options for modes: MODE_MANUAL or MODE_WAYPOINT.
MODE_WAYPOINT: Start to look for added waypoints and begin autonomous navigation.
MODE_MANUAL: Stop autonomous navigation between waypoints and allow the base to be controlled manually.
::
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Set the Mode the service is operating in to MODE_WAYPOINT and begin navigation
await my_nav.set_mode(Mode.ValueType.MODE_WAYPOINT)
Args:
timeout (Optional[float]): An option to set how long to wait (in seconds)
before calling a time-out and closing the underlying RPC call.
mode (navigation.Mode.ValueType): The Mode for the service to operate in.
"""
...

@abc.abstractmethod
async def get_properties(self, *, timeout: Optional[float]) -> MapType.ValueType:
"""
Get information about the navigation service.
::
my_nav = NavigationClient.from_robot(robot=robot, name="my_nav_service")
# Get the properties of the current navigation service.
nav_properties = await my_nav.get_properties()
Args:
timeout (Optional[float]): An option to set how long to wait (in seconds)
before calling a time-out and closing the underlying RPC call.
Returns:
MapType.ValueType: Information about the type of map the service is using.
"""
...
Loading

0 comments on commit 1125f05

Please sign in to comment.