From 1f292cc4c7e90cba1b4bf2d7e6687681f3f74a0e Mon Sep 17 00:00:00 2001 From: "Isaac I.Y. Saito" <130s@2000.jukuin.keio.ac.jp> Date: Tue, 25 Apr 2017 17:48:47 -0700 Subject: [PATCH] [WIP][ros_bridge] Fix to https://github.com/tork-a/rtmros_nextage/issues/308. --- hironx_ros_bridge/scripts/hironx.py | 12 +--- .../src/hironx_ros_bridge/hironx_client.py | 62 +++++++++++++++---- hironx_ros_bridge/test/test_hironx_client.py | 34 +++++++--- 3 files changed, 78 insertions(+), 30 deletions(-) diff --git a/hironx_ros_bridge/scripts/hironx.py b/hironx_ros_bridge/scripts/hironx.py index 575cea69..a67375db 100755 --- a/hironx_ros_bridge/scripts/hironx.py +++ b/hironx_ros_bridge/scripts/hironx.py @@ -56,15 +56,7 @@ ' this script, but can use RTM. To use ROS, do not forget' \ ' to run rosbridge. How to do so? --> http://wiki.ros.org/rtmros_nextage/Tutorials/Operating%20Hiro%2C%20NEXTAGE%20OPEN' -RTC_LIST = [ - ['seq', "SequencePlayer"], - ['sh', "StateHolder"], - ['fk', "ForwardKinematics"], - ['ic', "ImpedanceController"], - ['el', "SoftErrorLimiter"], - # ['co', "CollisionDetector"], - ['sc', "ServoController"], - ['log', "DataLogger"],] +RTC_LIST = 'seq, sh, fk, ic, el, sc, log' if __name__ == '__main__': parser = argparse.ArgumentParser(description='hiro command line interpreters') @@ -72,7 +64,7 @@ parser.add_argument('--port', help='corba name server port number') parser.add_argument('--modelfile', help='robot model file nmae') parser.add_argument('--robot', help='robot modlule name (RobotHardware0 for real robot, Robot()') - parser.add_argument('--rtcs', help='RT components to activate. If nothing passed then default value will be used.') + parser.add_argument('--rtcs', help="RT components to activate. If nothing passed then default value will be used. Example: ['seq', 'sh', 'fk', 'ic', 'el', 'sc', 'log']") args, unknown = parser.parse_known_args() unknown = [u for u in unknown if u[:2] != '__'] # filter out ros arguments diff --git a/hironx_ros_bridge/src/hironx_ros_bridge/hironx_client.py b/hironx_ros_bridge/src/hironx_ros_bridge/hironx_client.py index 46446d9d..933bc5e4 100644 --- a/hironx_ros_bridge/src/hironx_ros_bridge/hironx_client.py +++ b/hironx_ros_bridge/src/hironx_ros_bridge/hironx_client.py @@ -307,7 +307,7 @@ class via the link above; nicely formatted api doc web page HandGroups = {'rhand': [2, 3, 4, 5], 'lhand': [6, 7, 8, 9]} - _RTClist = [ + _RTC_list = [ ['seq', "SequencePlayer"], ['sh', "StateHolder"], ['fk', "ForwardKinematics"], @@ -318,6 +318,9 @@ class via the link above; nicely formatted api doc web page ['log', "DataLogger"], ] + # List of the name of RT Components that hrpsys requires at minimum. + _RTC_NAME_MINREQ = ['seq', 'sh', 'fk'] + # servo controller (grasper) sc = None sc_svc = None @@ -328,7 +331,7 @@ class via the link above; nicely formatted api doc web page "the function call was successful, since not " + "all methods internally called return status") - def init(self, robotname="HiroNX(Robot)0", url="", rtcs=_RTClist): + def init(self, robotname="HiroNX(Robot)0", url="", rtcs=None): ''' Calls init from its superclass, which tries to connect RTCManager, looks for ModelLoader, and starts necessary RTC components. Also runs @@ -337,11 +340,10 @@ def init(self, robotname="HiroNX(Robot)0", url="", rtcs=_RTClist): @type robotname: str @type url: str - @type rtcs: [[str, str]] - @param rtcs: List of list of RTC names. Each inner list consists of - 'SHORTENED' name and the 'FULLNAME'. + @type rtcs: [str] + @param rtcs: List of abbreviated RTC names. - example: [['seq', "SequencePlayer"], ['sh', "StateHolder"],,,] + example: ['seq', 'sh',,,] ''' # reload for hrpsys 315.1.8 print(self.configurator_name + "waiting ModelLoader") @@ -367,7 +369,10 @@ def init(self, robotname="HiroNX(Robot)0", url="", rtcs=_RTClist): self.sensors = self.getSensors(url) if rtcs: - self._RTClist = rtcs + # convert the list of abbreviated RTC names to the one that + # getRTCList wants. + self.getRTCList(rtcs) + # all([rtm.findRTC(rn[0], rtm.rootnc) for rn in self.getRTCList()]) # not working somehow... if set([rn[0] for rn in self.getRTCList()]).issubset(set([x.name() for x in self.ms.get_components()])) : print(self.configurator_name + "hrpsys components are already created and running") @@ -450,24 +455,55 @@ def goInitial(self, tm=7, wait=True, init_pose_type=0): self.seq_svc.waitInterpolationOfGroup(self.Groups[i][0]) return ret - def getRTCList(self): + def getRTCList(self, rtcs_str=None): ''' @see: HrpsysConfigurator.getRTCList + @type rtcs_str: str + @param rtcs_str: A single str for a set of abbreviated names of RTCs, + each of which is comma-separated. + example: "seq, sh, fk, ic, el, sc, log" @rtype [[str]] - @rerutrn List of available components. Each element consists of a list + @return List of available components. Each element consists of a list of abbreviated and full names of the component. + @raise TypeError: When rtcs_str isn't a string. + @raise ValueError: When rtcs_str does not contain minimum + required RTCs. ''' - if hasattr(self, 'rmfo'): + if rtcs_str: + if not isinstance(rtcs_str, basestring): + raise TypeError('rtcs_str needs to be string.') + # Set a new list of RTCs + new_rtcs = [] + # Separate by comma and remove whitespace. + for rtc_requested in [x.strip() for x in rtcs_str.split(",")]: + for elem in self._RTC_list: + if elem[0] == rtc_requested: + new_rtcs.append(elem) + break + # Check if minimum required RTCs are passed. + if not all(x in self._RTC_NAME_MINREQ for x in new_rtcs): + raise ValueError('{} are required at minimum'.format( + self._RTC_NAME_MINREQ)) + self._RTC_list = new_rtcs + + is_rmfo_initiated = False + # For some reason using built-in "any" method yields + # `TypeError: 'module' object is not callable`, so do the iteration. + for rtc_list in self._RTC_list: + if 'rmfo' in rtc_list: + is_rmfo_initiated = True + if hasattr(self, 'rmfo') and not is_rmfo_initiated: self.ms.load("RemoveForceSensorLinkOffset") self.ms.load("AbsoluteForceSensor") if "RemoveForceSensorLinkOffset" in self.ms.get_factory_names(): - self._RTClist.append(['rmfo', "RemoveForceSensorLinkOffset"]) + self._RTC_list.append(['rmfo', "RemoveForceSensorLinkOffset"]) elif "AbsoluteForceSensor" in self.ms.get_factory_names(): - self._RTClist.append(['rmfo', "AbsoluteForceSensor"]) + self._RTC_list.append(['rmfo', "AbsoluteForceSensor"]) else: print "Component rmfo is not loadable." - return self._RTClist + print('[getRTCList][DEBUG] _RTC_list: {}'.format(self._RTC_list)) + return self._RTC_list # hand interface # effort: 1~100[%] diff --git a/hironx_ros_bridge/test/test_hironx_client.py b/hironx_ros_bridge/test/test_hironx_client.py index c129c455..aa28f715 100755 --- a/hironx_ros_bridge/test/test_hironx_client.py +++ b/hironx_ros_bridge/test/test_hironx_client.py @@ -47,24 +47,23 @@ class TestHiroClient(TestHiro): ['fk', "ForwardKinematics"], ['ic', "ImpedanceController"], ['el', "SoftErrorLimiter"], - # ['co', "CollisionDetector"], ['sc', "ServoController"], ['log', "DataLogger"], + # rmfo will be automatically added in getRTCList. + ['rmfo', 'RemoveForceSensorLinkOffset'] ] _RTC_LIST_CUSTOM = [ ['seq', "SequencePlayer"], ['sh', "StateHolder"], ['fk', "ForwardKinematics"], - ['el', "SoftErrorLimiter"], - ['co', "CollisionDetector"], - ['log', "DataLogger"], + ['rmfo', 'RemoveForceSensorLinkOffset'] ] def test_getRTCList(self): self.assertListEqual(self.robot.getRTCList(), self._RTC_LIST) - def test_getRTCList_customrtcs(self): + def test_getRTCList_customrtcs_args_correct(self): ''' Test when the RTC list was passed from the client. @@ -73,10 +72,31 @@ def test_getRTCList_customrtcs(self): which is not elegant but as of now I can't think of a better way. ''' self.robot = HIRONX() - self.robot.init(rtcs=self._RTC_LIST_CUSTOM) - + # Passing 1st elems from _RTC_LIST_CUSTOM, to init method that calls + # internally getRTCList. + self.robot.init(rtcs='seq, sh, fk') self.assertListEqual(self.robot.getRTCList(), self._RTC_LIST_CUSTOM) + def test_getRTCList_customrtcs_args_wrong(self): + ''' + Test when the RTC list was passed from the client, in wrong format. + ''' + # Passing the list of RTCs falling short of requirement. + self.assertRaises( + ValueError, self.robot.getRTCList, rtcs_str='seq, sh') + + # Passing 1st elems from _RTC_LIST_CUSTOM, + # but list is not the right type of arg. + # + ## http://stackoverflow.com/questions/6103825/how-to-properly-use-unit-testings-assertraises-with-nonetype-objects + ## <-- didn't work, TypeError still raised before assertion. + # with self.assertRaises(TypeError): + # self.robot.getRTCList(rtcs_str=['seq', 'sh', 'fk', 'el', 'co', 'log']) + ## http://stackoverflow.com/a/6103930/577001 + self.assertRaises( + TypeError, lambda: self.robot.getRTCList, + rtcs_str=['seq', 'sh', 'fk', 'el', 'co', 'log']) + if __name__ == '__main__': import rostest rostest.rosrun(PKG, 'test_hronx_client', TestHiroClient)