diff --git a/hironx_ros_bridge/scripts/hironx.py b/hironx_ros_bridge/scripts/hironx.py index 575cea69..a91bf70f 100755 --- a/hironx_ros_bridge/scripts/hironx.py +++ b/hironx_ros_bridge/scripts/hironx.py @@ -56,15 +56,8 @@ ' this script, but can use RTM. To use ROS, do not forget' \ ' to run rosbridge. How to do so? --> http://wiki.ros.org/rtmros_nextage/Tutorials/Operating%20Hiro%2C%20NEXTAGE%20OPEN' -RTC_LIST = [ - ['seq', "SequencePlayer"], - ['sh', "StateHolder"], - ['fk', "ForwardKinematics"], - ['ic', "ImpedanceController"], - ['el', "SoftErrorLimiter"], - # ['co', "CollisionDetector"], - ['sc', "ServoController"], - ['log', "DataLogger"],] +# The default RTCs for Hironx +RTC_LIST = 'seq, sh, fk, ic, el, sc, log' if __name__ == '__main__': parser = argparse.ArgumentParser(description='hiro command line interpreters') @@ -72,7 +65,7 @@ parser.add_argument('--port', help='corba name server port number') parser.add_argument('--modelfile', help='robot model file nmae') parser.add_argument('--robot', help='robot modlule name (RobotHardware0 for real robot, Robot()') - parser.add_argument('--rtcs', help='RT components to activate. If nothing passed then default value will be used.') + parser.add_argument('--rtcs', help="RT components to activate. If nothing passed then default value will be used. Example: '{}'".format(RTC_LIST)) args, unknown = parser.parse_known_args() unknown = [u for u in unknown if u[:2] != '__'] # filter out ros arguments diff --git a/hironx_ros_bridge/src/hironx_ros_bridge/hironx_client.py b/hironx_ros_bridge/src/hironx_ros_bridge/hironx_client.py index 46446d9d..33edfe2d 100644 --- a/hironx_ros_bridge/src/hironx_ros_bridge/hironx_client.py +++ b/hironx_ros_bridge/src/hironx_ros_bridge/hironx_client.py @@ -307,7 +307,10 @@ class via the link above; nicely formatted api doc web page HandGroups = {'rhand': [2, 3, 4, 5], 'lhand': [6, 7, 8, 9]} - _RTClist = [ + # This shouldn't be accessed once turned to True during `init` method. + is_rtc_activated = False + + _RTC_list = [ ['seq', "SequencePlayer"], ['sh', "StateHolder"], ['fk', "ForwardKinematics"], @@ -318,6 +321,9 @@ class via the link above; nicely formatted api doc web page ['log', "DataLogger"], ] + # List of the name of RT Components that hrpsys requires at minimum. + _RTC_NAME_MINREQ = ['seq', 'sh', 'fk'] + # servo controller (grasper) sc = None sc_svc = None @@ -328,7 +334,7 @@ class via the link above; nicely formatted api doc web page "the function call was successful, since not " + "all methods internally called return status") - def init(self, robotname="HiroNX(Robot)0", url="", rtcs=_RTClist): + def init(self, robotname="HiroNX(Robot)0", url="", rtcs=None): ''' Calls init from its superclass, which tries to connect RTCManager, looks for ModelLoader, and starts necessary RTC components. Also runs @@ -337,11 +343,10 @@ def init(self, robotname="HiroNX(Robot)0", url="", rtcs=_RTClist): @type robotname: str @type url: str - @type rtcs: [[str, str]] - @param rtcs: List of list of RTC names. Each inner list consists of - 'SHORTENED' name and the 'FULLNAME'. + @type rtcs: [str] + @param rtcs: List of abbreviated RTC names. - example: [['seq', "SequencePlayer"], ['sh', "StateHolder"],,,] + example: ['seq', 'sh',,,] ''' # reload for hrpsys 315.1.8 print(self.configurator_name + "waiting ModelLoader") @@ -366,10 +371,8 @@ def init(self, robotname="HiroNX(Robot)0", url="", rtcs=_RTClist): # HrpsysConfigurator.init(self, robotname=robotname, url=url) self.sensors = self.getSensors(url) - if rtcs: - self._RTClist = rtcs # all([rtm.findRTC(rn[0], rtm.rootnc) for rn in self.getRTCList()]) # not working somehow... - if set([rn[0] for rn in self.getRTCList()]).issubset(set([x.name() for x in self.ms.get_components()])) : + if set([rn[0] for rn in self.getRTCList(rtcs)]).issubset(set([x.name() for x in self.ms.get_components()])) : print(self.configurator_name + "hrpsys components are already created and running") self.findComps(max_timeout_count=0, verbose=True) else: @@ -450,24 +453,68 @@ def goInitial(self, tm=7, wait=True, init_pose_type=0): self.seq_svc.waitInterpolationOfGroup(self.Groups[i][0]) return ret - def getRTCList(self): + def getRTCList(self, rtcs_str=None): ''' + @summary: Return the list of activated RT components. As opposed to + its naming, this also: + 1) activate an rmfo (stands for "remove force offset") + RTC. + 2) selectively activate RTCs passed by rtcs_str. This is + possible ONLY during the initialization process done + by `init` method. @see: HrpsysConfigurator.getRTCList + @type rtcs_str: str + @param rtcs_str: A single str for a set of abbreviated names of RTCs, + each of which is comma-separated. This is possible + ONLY during the initialization process done by + `init` method. + example: "seq, sh, fk, ic, el, sc, log" @rtype [[str]] - @rerutrn List of available components. Each element consists of a list + @return List of available components. Each element consists of a list of abbreviated and full names of the component. + @raise TypeError: When rtcs_str isn't a string. + @raise ValueError: When rtcs_str does not contain minimum + required RTCs. ''' - if hasattr(self, 'rmfo'): + if rtcs_str: + if self.is_rtc_activated: + print('RTCs are already activated. Skipping the passed request: {}'.format(rtcs_str)) + else: + if not isinstance(rtcs_str, basestring): + raise TypeError('rtcs_str needs to be string.') + # Set a new list of RTCs + new_rtcs = [] + # Separate by comma and remove whitespace. + rtcs_req_list = [x.strip() for x in rtcs_str.split(",")] + # Check if minimum required RTCs are passed. + if not all(x in rtcs_req_list for x in self._RTC_NAME_MINREQ): + raise ValueError('{} are required at minimum'.format( + self._RTC_NAME_MINREQ)) + for rtc_requested in rtcs_req_list: + for elem in self._RTC_list: + if elem[0] == rtc_requested: + new_rtcs.append(elem) + break + self._RTC_list = new_rtcs + self.is_rtc_activated = True + + is_rmfo_initiated = False + # For some reason using built-in "any" method yields + # `TypeError: 'module' object is not callable`, so do the iteration. + for rtc_list in self._RTC_list: + if 'rmfo' in rtc_list: + is_rmfo_initiated = True + if hasattr(self, 'rmfo') and not is_rmfo_initiated: self.ms.load("RemoveForceSensorLinkOffset") self.ms.load("AbsoluteForceSensor") if "RemoveForceSensorLinkOffset" in self.ms.get_factory_names(): - self._RTClist.append(['rmfo', "RemoveForceSensorLinkOffset"]) + self._RTC_list.append(['rmfo', "RemoveForceSensorLinkOffset"]) elif "AbsoluteForceSensor" in self.ms.get_factory_names(): - self._RTClist.append(['rmfo', "AbsoluteForceSensor"]) + self._RTC_list.append(['rmfo', "AbsoluteForceSensor"]) else: print "Component rmfo is not loadable." - return self._RTClist + return self._RTC_list # hand interface # effort: 1~100[%] diff --git a/hironx_ros_bridge/test/test_hironx_client.py b/hironx_ros_bridge/test/test_hironx_client.py index c129c455..0106957e 100755 --- a/hironx_ros_bridge/test/test_hironx_client.py +++ b/hironx_ros_bridge/test/test_hironx_client.py @@ -47,24 +47,42 @@ class TestHiroClient(TestHiro): ['fk', "ForwardKinematics"], ['ic', "ImpedanceController"], ['el', "SoftErrorLimiter"], - # ['co', "CollisionDetector"], ['sc', "ServoController"], ['log', "DataLogger"], + # rmfo will be automatically added in getRTCList. + ['rmfo', 'RemoveForceSensorLinkOffset'] ] _RTC_LIST_CUSTOM = [ ['seq', "SequencePlayer"], ['sh', "StateHolder"], ['fk', "ForwardKinematics"], - ['el', "SoftErrorLimiter"], - ['co', "CollisionDetector"], - ['log', "DataLogger"], + ['rmfo', 'RemoveForceSensorLinkOffset'] ] + def _compare_2dlist(self, twodim_list_a, twodim_list_b): + ''' + Compare the first element in all elements of the 2nd list. + E.g. For [['a0', 'a1'], ['b0', 'b1'],..., ['n0', 'n1']], this method + checks a0, b0, n0 + @rtype bool + ''' + return set([a[0] for a in twodim_list_a]) == set( + [b[0] for b in twodim_list_b]) + def test_getRTCList(self): - self.assertListEqual(self.robot.getRTCList(), self._RTC_LIST) + ''' + Depending on the hrpsys version, different RTC implementation can be + returned, e.g. for "rmfo", older returns AbsoluteForceSensor while + newer does RemoveForceSensorLinkOffset. So in this testcase we only + check the first element of the returned list (e.g. "rmfo" instead of + its implementation). + ''' + self.assertTrue( + self._compare_2dlist( + self.robot.getRTCList(), self._RTC_LIST)) - def test_getRTCList_customrtcs(self): + def test_getRTCList_customrtcs_args_correct(self): ''' Test when the RTC list was passed from the client. @@ -73,9 +91,27 @@ def test_getRTCList_customrtcs(self): which is not elegant but as of now I can't think of a better way. ''' self.robot = HIRONX() - self.robot.init(rtcs=self._RTC_LIST_CUSTOM) + # Passing 1st elems from _RTC_LIST_CUSTOM, to init method that calls + # internally getRTCList. + self.robot.init(rtcs='seq, sh, fk') + self.assertTrue( + self._compare_2dlist( + self.robot.getRTCList(), self._RTC_LIST_CUSTOM)) + + def test_getRTCList_customrtcs_args_wrong(self): + ''' + Test when the RTC list was passed from the client, in wrong format. + ''' + # Passing the list of RTCs falling short of requirement. + self.assertRaises( + ValueError, self.robot.getRTCList, rtcs_str='seq, sh') - self.assertListEqual(self.robot.getRTCList(), self._RTC_LIST_CUSTOM) + # Passing 1st elems from _RTC_LIST_CUSTOM, + # but list is not the right type of arg. + ## http://stackoverflow.com/a/6103930/577001 + self.assertRaises( + TypeError, lambda: self.robot.getRTCList, + rtcs_str=['seq', 'sh', 'fk', 'el', 'co', 'log']) if __name__ == '__main__': import rostest