From a3d60f7f39c63832832564d9a2afd9cb63e0744c Mon Sep 17 00:00:00 2001 From: Ed FitzGerald Date: Thu, 23 Nov 2023 16:49:48 +0000 Subject: [PATCH] refactor: to make exchange adapter (#199) Co-authored-by: James Riehl Co-authored-by: James Riehl <33920192+jrriehl@users.noreply.github.com> --- python/.pylintrc | 1 + python/src/uagents/agent.py | 7 +++- python/src/uagents/context.py | 69 ++++++++++++++++++++++++++--------- 3 files changed, 58 insertions(+), 19 deletions(-) diff --git a/python/.pylintrc b/python/.pylintrc index cbbc7976..886b992d 100644 --- a/python/.pylintrc +++ b/python/.pylintrc @@ -11,5 +11,6 @@ disable=missing-module-docstring, too-many-locals, too-many-return-statements, logging-fstring-interpolation, + too-many-lines, broad-exception-caught diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 2e54db5c..46e28dd2 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -159,6 +159,7 @@ def __init__( max_resolver_endpoints: Optional[int] = None, version: Optional[str] = None, test: Optional[bool] = True, + loop: Optional[asyncio.AbstractEventLoop] = None, ): """ Initialize an Agent instance. @@ -182,7 +183,11 @@ def __init__( if resolve is not None else GlobalResolver(max_endpoints=max_resolver_endpoints) ) - self._loop = asyncio.get_event_loop_policy().get_event_loop() + + if loop is not None: + self._loop = loop + else: + self._loop = asyncio.get_event_loop_policy().get_event_loop() self._initialize_wallet_and_identity(seed, name) diff --git a/python/src/uagents/context.py b/python/src/uagents/context.py index 1b302f9e..a00b5489 100644 --- a/python/src/uagents/context.py +++ b/python/src/uagents/context.py @@ -349,7 +349,8 @@ async def experimental_broadcast( agents = self.get_agents_by_protocol(destination_protocol, limit=limit) if not agents: self.logger.error(f"No active agents found for: {destination_protocol}") - return + return [] + schema_digest = Model.build_schema_digest(message) futures = await asyncio.gather( *[ @@ -457,12 +458,38 @@ async def send_raw( endpoint="", ) + return await self.send_raw_exchange_envelope( + self._identity, + destination, + self._resolver, + schema_digest, + self.get_message_protocol(schema_digest), + json_message, + logger=self._logger, + timeout=timeout, + session_id=self._session, + ) + + @staticmethod + async def send_raw_exchange_envelope( + sender: Identity, + destination: str, + resolver: Resolver, + schema_digest: str, + protocol_digest: Optional[str], + json_message: JsonStr, + logger: Optional[logging.Logger] = None, + timeout: int = 5, + session_id: Optional[uuid.UUID] = None, + ) -> MsgStatus: # Resolve the destination address and endpoint ('destination' can be a name or address) - destination_address, endpoints = await self._resolver.resolve(destination) + destination_address, endpoints = await resolver.resolve(destination) if len(endpoints) == 0: - self._logger.exception( - f"Unable to resolve destination endpoint for address {destination}" - ) + if logger: + logger.exception( + f"Unable to resolve destination endpoint for address {destination}" + ) + return MsgStatus( status=DeliveryStatus.FAILED, detail="Unable to resolve destination endpoint", @@ -476,15 +503,15 @@ async def send_raw( # Handle external dispatch of messages env = Envelope( version=1, - sender=self.address, + sender=sender.address, target=destination_address, - session=self._session or uuid.uuid4(), + session=session_id or uuid.uuid4(), schema_digest=schema_digest, - protocol_digest=self.get_message_protocol(schema_digest), + protocol_digest=protocol_digest, expires=expires, ) env.encode_payload(json_message) - env.sign(self._identity) + env.sign(sender) for endpoint in endpoints: try: @@ -502,18 +529,24 @@ async def send_raw( destination=destination, endpoint=endpoint, ) - self._logger.warning( - f"Failed to send message to {destination_address} @ {endpoint}: " - + (await resp.text()) - ) + + if logger: + logger.warning( + f"Failed to send message to {destination_address} @ {endpoint}: " + + (await resp.text()) + ) except aiohttp.ClientConnectorError as ex: - self._logger.warning(f"Failed to connect to {endpoint}: {ex}") + if logger: + logger.warning(f"Failed to connect to {endpoint}: {ex}") + except Exception as ex: - self._logger.warning( - f"Failed to send message to {destination} @ {endpoint}: {ex}" - ) + if logger: + logger.warning( + f"Failed to send message to {destination} @ {endpoint}: {ex}" + ) - self._logger.exception(f"Failed to deliver message to {destination}") + if logger: + logger.exception(f"Failed to deliver message to {destination}") return MsgStatus( status=DeliveryStatus.FAILED,