Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): add message endpoint for inspector #490

Merged
merged 33 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
381cdb9
feat(core): add message endpoint for inspector
Alejandro-Morales Aug 7, 2024
4e6ffd2
fix: ruff check
Alejandro-Morales Aug 7, 2024
8f53362
fix: ruff check
Alejandro-Morales Aug 7, 2024
f4acd8c
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 15, 2024
bd8d4cc
feat: move file storage to memory storage
Alejandro-Morales Aug 15, 2024
f13f0c4
fix: ruff check
Alejandro-Morales Aug 16, 2024
c899ca5
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 16, 2024
2a1e27d
updates on Florians comments
Alejandro-Morales Aug 16, 2024
613c257
ruff format
Alejandro-Morales Aug 16, 2024
7a78622
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 16, 2024
7dd0098
feat: updates on florians comments and agent_info endpoint
Alejandro-Morales Aug 19, 2024
b40656a
fix: delete unused imports
Alejandro-Morales Aug 19, 2024
474d5bd
fix: ruff format check
Alejandro-Morales Aug 19, 2024
d244919
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 19, 2024
c6e5e62
feat: add retention mechanism
Alejandro-Morales Aug 19, 2024
5dc4996
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 21, 2024
f43ab1c
feat: Bureau integration
Alejandro-Morales Aug 22, 2024
46aea4c
try adding server headers
Archento Aug 22, 2024
e37ee6a
fix: format
Alejandro-Morales Aug 22, 2024
05f08e8
fix: model dump agent endpoint list
jrriehl Aug 23, 2024
0e3c4d1
loosen access control rules
Archento Aug 23, 2024
724ede6
Merge branch 'main' into feat/message-inspector
Alejandro-Morales Aug 28, 2024
e65f152
feat: Refactoring based on new REST updates
Alejandro-Morales Aug 28, 2024
39f87b5
fix: test and ruff check
Alejandro-Morales Aug 28, 2024
ead3242
minor corrections
Alejandro-Morales Aug 28, 2024
01b075f
fix rest example, make inspector opt-out, enforce reserved routes, wr…
Archento Aug 29, 2024
887c3f7
fix test
Archento Aug 29, 2024
2999472
simplify AgentEndpoint model
Archento Aug 29, 2024
bec7141
relocate cache ref to dispenser
Archento Aug 30, 2024
a593e94
update docs
Archento Aug 30, 2024
4bcf5e8
fix: small context inconsistency
Archento Aug 30, 2024
0c72136
another small revert in context
Archento Aug 30, 2024
a28162e
refactor retention mechanism
Archento Aug 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion python/src/uagents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import functools
import logging
import uuid
from time import time
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -43,6 +44,7 @@
InternalContext,
IntervalCallback,
MessageCallback,
MessageStore,
)
from uagents.crypto import Identity, derive_key_from_seed, is_user_address
from uagents.dispatch import JsonStr, Sink, dispatcher
Expand Down Expand Up @@ -331,6 +333,7 @@ def __init__(
self._ledger = get_ledger(test)
self._almanac_contract = get_almanac_contract(test)
self._storage = KeyValueStore(self.address[0:16])
self._message_store = MessageStore(self._storage)
self._interval_handlers: List[Tuple[IntervalCallback, float]] = []
self._interval_messages: Set[str] = set()
self._signed_message_handlers: Dict[str, MessageCallback] = {}
Expand Down Expand Up @@ -375,14 +378,15 @@ def __init__(
interval_messages=self._interval_messages,
wallet_messaging_client=self._wallet_messaging_client,
logger=self._logger,
message_store=self._message_store,
)

# register with the dispatcher
self._dispatcher.register(self.address, self)

if not self._use_mailbox:
self._server = ASGIServer(
self._port, self._loop, self._queries, logger=self._logger
self._port, self._ctx, self._loop, self._queries, logger=self._logger
)

# define default error message handler
Expand Down Expand Up @@ -907,6 +911,16 @@ async def handle_message(
session (uuid.UUID): The session UUID.

"""
self._message_store.add_message(
self.address,
{
"type": "received",
"sender": sender,
"schema_digest": schema_digest,
"message": message,
"timestamp": time(),
},
)
await self._message_queue.put((schema_digest, sender, message, session))

async def _startup(self):
Expand Down Expand Up @@ -1052,6 +1066,7 @@ async def _process_message_queue(self):
message=message, schema_digest=schema_digest
),
protocol=self.get_message_protocol(schema_digest),
message_store=self._message_store,
)

# parse the received message
Expand Down
31 changes: 29 additions & 2 deletions python/src/uagents/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from requests.structures import CaseInsensitiveDict
from uagents.communication import enclose_response_raw
from uagents.config import RESPONSE_TIME_HINT_SECONDS
from uagents.context import ERROR_MESSAGE_DIGEST
from uagents.context import ERROR_MESSAGE_DIGEST, Context
from uagents.crypto import is_user_address
from uagents.dispatch import dispatcher
from uagents.envelope import Envelope
Expand Down Expand Up @@ -42,6 +42,7 @@ class ASGIServer:
def __init__(
self,
port: int,
ctx: Context,
loop: asyncio.AbstractEventLoop,
queries: Dict[str, asyncio.Future],
logger: Optional[Logger] = None,
Expand All @@ -60,6 +61,7 @@ def __init__(
self._queries = queries
self._logger = logger or get_logger("server")
self._server = None
self._ctx = ctx

@property
def server(self):
Expand Down Expand Up @@ -149,6 +151,25 @@ async def handle_missing_content_type(self, headers: CaseInsensitiveDict, send):
}
)

async def handle_get_messages(self, ctx, send):
"""
Handle retrieval of stored messages.
"""
messages = ctx._message_store.get_messages(ctx.address)
response = {"messages": messages}
await send(
{
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"application/json"],
],
}
)
await send(
{"type": "http.response.body", "body": json.dumps(response).encode()}
Alejandro-Morales marked this conversation as resolved.
Show resolved Hide resolved
)

async def serve(self):
"""
Start the server.
Expand All @@ -173,6 +194,13 @@ async def __call__(self, scope, receive, send): # pylint: disable=too-many-bra

assert scope["type"] == "http"

request_method = scope["method"]
path = scope["path"]

if request_method == "GET" and path == "/messages":
await self.handle_get_messages(self._ctx, send)
return

if scope["path"] != "/submit":
await send(
{
Expand All @@ -190,7 +218,6 @@ async def __call__(self, scope, receive, send): # pylint: disable=too-many-bra

headers = CaseInsensitiveDict(scope.get("headers", {}))

request_method = scope["method"]
if request_method == "HEAD":
await self.handle_readiness_probe(headers, send)
return
Expand Down
65 changes: 63 additions & 2 deletions python/src/uagents/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,35 @@
ERROR_MESSAGE_DIGEST = Model.build_schema_digest(ErrorMessage)


class MessageStore:
def __init__(self, storage: KeyValueStore):
self._storage = storage

def add_message(self, agent_id: str, message: dict):
"""
Store a message for an agent.

Args:
agent_id (str): The identifier of the agent.
message (dict): The message details to store.
"""
messages = self._storage.get(agent_id) or []
messages.append(message)
self._storage.set(agent_id, messages)

def get_messages(self, agent_id: str) -> List[dict]:
"""
Retrieve messages for an agent.

Args:
agent_id (str): The identifier of the agent.

Returns:
List[dict]: A list of messages.
"""
return self._storage.get(agent_id) or []


class Context(ABC):
# pylint: disable=unnecessary-pass
"""
Expand Down Expand Up @@ -267,6 +296,7 @@ def __init__(
interval_messages: Optional[Set[str]] = None,
wallet_messaging_client: Optional[Any] = None,
logger: Optional[logging.Logger] = None,
message_store: Optional[MessageStore] = None,
):
self._agent = agent
self._storage = storage
Expand All @@ -278,6 +308,7 @@ def __init__(
self._interval_messages = interval_messages
self._wallet_messaging_client = wallet_messaging_client
self._outbound_messages: Dict[str, Tuple[JsonStr, str]] = {}
self._message_store = message_store

@property
def agent(self) -> AgentRepresentation:
Expand Down Expand Up @@ -430,14 +461,28 @@ async def send(
session=self._session,
)

return await self.send_raw(
msg_status = await self.send_raw(
destination,
schema_digest,
message_body,
sync=sync,
timeout=timeout,
)

if self._message_store is not None:
self._message_store.add_message(
self.address,
{
"type": "sent",
"destination": destination,
"message": message.json(),
"timestamp": time(),
"status": msg_status.status,
},
)

return msg_status

async def send_raw(
self,
destination: str,
Expand Down Expand Up @@ -587,6 +632,7 @@ def __init__(
queries: Optional[Dict[str, asyncio.Future]] = None,
replies: Optional[Dict[str, Dict[str, Type[Model]]]] = None,
protocol: Optional[Tuple[str, Protocol]] = None,
message_store: Optional[MessageStore] = None,
**kwargs,
):
"""
Expand All @@ -607,6 +653,7 @@ def __init__(
self._replies = replies
self._message_received = message_received
self._protocol = protocol or ("", None)
self._message_store = message_store

def _is_valid_reply(self, message_schema_digest: str) -> bool:
"""
Expand Down Expand Up @@ -673,7 +720,7 @@ async def send(
session=self._session,
)

return await self.send_raw(
msg_status = await self.send_raw(
destination,
schema_digest,
message.model_dump_json(),
Expand All @@ -682,3 +729,17 @@ async def send(
protocol_digest=self._protocol[0],
queries=self._queries,
)

if self._message_store is not None:
self._message_store.add_message(
self.agent.address,
{
"type": "sent",
"destination": destination,
"message": message.model_dump_json(),
"timestamp": time(),
"status": msg_status.status,
},
)

return msg_status
Loading