Skip to content

Commit

Permalink
refactor: to make exchange adapter (#199)
Browse files Browse the repository at this point in the history
Co-authored-by: James Riehl <[email protected]>
Co-authored-by: James Riehl <[email protected]>
  • Loading branch information
3 people authored Nov 23, 2023
1 parent 1ecc714 commit a3d60f7
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 19 deletions.
1 change: 1 addition & 0 deletions python/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ disable=missing-module-docstring,
too-many-locals,
too-many-return-statements,
logging-fstring-interpolation,
too-many-lines,
broad-exception-caught

7 changes: 6 additions & 1 deletion python/src/uagents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def __init__(
max_resolver_endpoints: Optional[int] = None,
version: Optional[str] = None,
test: Optional[bool] = True,
loop: Optional[asyncio.AbstractEventLoop] = None,
):
"""
Initialize an Agent instance.
Expand All @@ -182,7 +183,11 @@ def __init__(
if resolve is not None
else GlobalResolver(max_endpoints=max_resolver_endpoints)
)
self._loop = asyncio.get_event_loop_policy().get_event_loop()

if loop is not None:
self._loop = loop
else:
self._loop = asyncio.get_event_loop_policy().get_event_loop()

self._initialize_wallet_and_identity(seed, name)

Expand Down
69 changes: 51 additions & 18 deletions python/src/uagents/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ async def experimental_broadcast(
agents = self.get_agents_by_protocol(destination_protocol, limit=limit)
if not agents:
self.logger.error(f"No active agents found for: {destination_protocol}")
return
return []

schema_digest = Model.build_schema_digest(message)
futures = await asyncio.gather(
*[
Expand Down Expand Up @@ -457,12 +458,38 @@ async def send_raw(
endpoint="",
)

return await self.send_raw_exchange_envelope(
self._identity,
destination,
self._resolver,
schema_digest,
self.get_message_protocol(schema_digest),
json_message,
logger=self._logger,
timeout=timeout,
session_id=self._session,
)

@staticmethod
async def send_raw_exchange_envelope(
sender: Identity,
destination: str,
resolver: Resolver,
schema_digest: str,
protocol_digest: Optional[str],
json_message: JsonStr,
logger: Optional[logging.Logger] = None,
timeout: int = 5,
session_id: Optional[uuid.UUID] = None,
) -> MsgStatus:
# Resolve the destination address and endpoint ('destination' can be a name or address)
destination_address, endpoints = await self._resolver.resolve(destination)
destination_address, endpoints = await resolver.resolve(destination)
if len(endpoints) == 0:
self._logger.exception(
f"Unable to resolve destination endpoint for address {destination}"
)
if logger:
logger.exception(
f"Unable to resolve destination endpoint for address {destination}"
)

return MsgStatus(
status=DeliveryStatus.FAILED,
detail="Unable to resolve destination endpoint",
Expand All @@ -476,15 +503,15 @@ async def send_raw(
# Handle external dispatch of messages
env = Envelope(
version=1,
sender=self.address,
sender=sender.address,
target=destination_address,
session=self._session or uuid.uuid4(),
session=session_id or uuid.uuid4(),
schema_digest=schema_digest,
protocol_digest=self.get_message_protocol(schema_digest),
protocol_digest=protocol_digest,
expires=expires,
)
env.encode_payload(json_message)
env.sign(self._identity)
env.sign(sender)

for endpoint in endpoints:
try:
Expand All @@ -502,18 +529,24 @@ async def send_raw(
destination=destination,
endpoint=endpoint,
)
self._logger.warning(
f"Failed to send message to {destination_address} @ {endpoint}: "
+ (await resp.text())
)

if logger:
logger.warning(
f"Failed to send message to {destination_address} @ {endpoint}: "
+ (await resp.text())
)
except aiohttp.ClientConnectorError as ex:
self._logger.warning(f"Failed to connect to {endpoint}: {ex}")
if logger:
logger.warning(f"Failed to connect to {endpoint}: {ex}")

except Exception as ex:
self._logger.warning(
f"Failed to send message to {destination} @ {endpoint}: {ex}"
)
if logger:
logger.warning(
f"Failed to send message to {destination} @ {endpoint}: {ex}"
)

self._logger.exception(f"Failed to deliver message to {destination}")
if logger:
logger.exception(f"Failed to deliver message to {destination}")

return MsgStatus(
status=DeliveryStatus.FAILED,
Expand Down

0 comments on commit a3d60f7

Please sign in to comment.