From acf866ff8295e3e4705299beb0cb83662a44cdfb Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Tue, 2 Apr 2024 19:22:46 -0400 Subject: [PATCH 01/28] refactor!: add data acquisition models to recorder --- silverback/recorder.py | 71 +++++++++++++++++++++++++++++++----------- silverback/runner.py | 3 +- silverback/types.py | 29 +++++++++++++++-- 3 files changed, 82 insertions(+), 21 deletions(-) diff --git a/silverback/recorder.py b/silverback/recorder.py index 6ce0a2ce..3744eba0 100644 --- a/silverback/recorder.py +++ b/silverback/recorder.py @@ -3,13 +3,14 @@ import sqlite3 from abc import ABC, abstractmethod from datetime import datetime, timezone -from typing import TypeVar +from typing import Any, TypeVar -from pydantic import BaseModel +from ape.logging import get_logger +from pydantic import BaseModel, Field from taskiq import TaskiqResult from typing_extensions import Self # Introduced 3.11 -from .types import SilverbackID +from .types import BaseDatapoint, Metrics, ScalarDatapoint, SilverbackID, scalar_types _HandlerReturnType = TypeVar("_HandlerReturnType") @@ -24,13 +25,44 @@ class SilverbackState(BaseModel): updated: datetime -class HandlerResult(TaskiqResult): +class HandlerResult(BaseModel): instance: str network: str handler_id: str block_number: int | None log_index: int | None created: datetime + labels: dict[str, Any] = Field(default_factory=dict) + execution_time: float + metrics: Metrics + error: Optional[str] = None + + @classmethod + def _extract_metrics(cls, result: Any, handler_id: str) -> Metrics: + if isinstance(result, BaseDatapoint): + return {f"{handler_id}_result": result} + + elif isinstance(result, scalar_types): + return {f"{handler_id}_result": ScalarDatapoint(data=result)} + + elif isinstance(result, dict): + converted_result = { + k: ScalarDatapoint(data=v) if not isinstance(v, BaseDatapoint) else v + for k, v in result.items() + if isinstance(v, (BaseDatapoint, *scalar_types)) + } + if len(converted_result) < len(result): + logger = get_logger(handler_id) + logger.warning(f"Unhandled results: {len(result)-len(converted_result)}") + + return converted_result + + elif result is not None: + logger = get_logger(handler_id) + logger.warning(f"Cannot handle return type '{type(result.metrics)}'.") + + # else: + return {} @classmethod def from_taskiq( @@ -48,7 +80,10 @@ def from_taskiq( block_number=block_number, log_index=log_index, created=datetime.now(timezone.utc), - **result.dict(), + labels=result.labels, + execution_time=result.execution_time, + error=str(result.error), + metrics=cls._extract_metrics(result.return_value, handler_id), ) @@ -113,16 +148,16 @@ class SQLiteRecorder(BaseRecorder): WHERE instance = ? AND network = ?; """ SQL_GET_RESULT_LATEST = """ - SELECT handler_id, block_number, log_index, execution_time, is_err, created, - return_value_blob + SELECT handler_id, block_number, log_index, execution_time, error, created, + metrics_blob FROM silverback_result WHERE instance = ? AND network = ? ORDER BY created DESC LIMIT 1; """ SQL_GET_HANDLER_LATEST = """ - SELECT handler_id, block_number, log_index, execution_time, is_err, created, - return_value_blob + SELECT handler_id, block_number, log_index, execution_time, error, created, + metrics_blob FROM silverback_result WHERE instance = ? AND network = ? AND handler_id = ? ORDER BY created DESC @@ -131,7 +166,7 @@ class SQLiteRecorder(BaseRecorder): SQL_INSERT_RESULT = """ INSERT INTO silverback_result ( instance, network, handler_id, block_number, log_index, execution_time, - is_err, created, return_value_blob + error, created, metrics_blob ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); """ @@ -160,9 +195,9 @@ async def init(self): block_number int, log_index int, execution_time real, - is_err bool, + error text, created int, - return_value_blob blob + metrics_blob blob ); CREATE UNIQUE INDEX IF NOT EXISTS silverback_state__instance ON silverback_state(instance, network); @@ -170,8 +205,8 @@ async def init(self): ON silverback_result (instance, network); CREATE INDEX IF NOT EXISTS silverback_result__handler ON silverback_result (instance, network, handler_id); - CREATE INDEX IF NOT EXISTS silverback_result__is_err - ON silverback_result (is_err); + CREATE INDEX IF NOT EXISTS silverback_result__error + ON silverback_result (error); COMMIT; """ ) @@ -295,9 +330,9 @@ async def get_latest_result( block_number=row[1], log_index=row[2], execution_time=row[3], - is_err=row[4], + error=row[4], created=datetime.fromtimestamp(row[5], timezone.utc), - return_value=json.loads(row[6]), + metrics=json.loads(row[6]), ) async def add_result(self, v: HandlerResult): @@ -317,9 +352,9 @@ async def add_result(self, v: HandlerResult): v.block_number, v.log_index, v.execution_time, - v.is_err, + v.error, v.created, - json.dumps(v.return_value), + json.dumps({n: m.model_dump_json() for n, m in v.metrics.items()}), ), ) diff --git a/silverback/runner.py b/silverback/runner.py index ed67f86e..493c4708 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -35,10 +35,11 @@ def _handle_result(self, result: TaskiqResult): self.exceptions += 1 else: + # NOTE: Reset exception counter self.exceptions = 0 if self.exceptions > self.max_exceptions: - raise Halt() + raise Halt() from result.error async def _checkpoint( self, last_block_seen: int = 0, last_block_processed: int = 0 diff --git a/silverback/types.py b/silverback/types.py index 49529f5e..2eb0cef3 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,7 +1,9 @@ +from datetime import datetime, timezone +from decimal import Decimal from enum import Enum # NOTE: `enum.StrEnum` only in Python 3.11+ -from typing import Protocol +from typing import Literal, Protocol, get_args -from pydantic import BaseModel +from pydantic import BaseModel, Field from typing_extensions import Self # Introduced 3.11 @@ -38,3 +40,26 @@ def from_settings(cls, settings_: ISilverbackSettings) -> Self: class SilverbackStartupState(BaseModel): last_block_seen: int last_block_processed: int + + +class BaseDatapoint(BaseModel): + type: str # discriminator + + # NOTE: default value ensures we don't have to set this manually + time: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +ScalarType = bool | int | float | Decimal +scalar_types = get_args(ScalarType) + + +class ScalarDatapoint(BaseDatapoint): + type: Literal["scalar"] = "scalar" + + # NOTE: app-supported scalar value types: + data: ScalarType + + +# This is what a Silverback app task must return to integrate properly with our data acq system +Metrics = dict[str, BaseDatapoint] +# Otherwise, log a warning and ignore any unconverted return value(s) From baf6f3cb41cfe322a622616744be719b01d6d868 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Tue, 16 Apr 2024 22:16:49 -0400 Subject: [PATCH 02/28] refactor: reformat metrics a bit --- silverback/recorder.py | 110 ++++++++++++++++++++++++----------------- silverback/types.py | 41 ++++++++++----- 2 files changed, 95 insertions(+), 56 deletions(-) diff --git a/silverback/recorder.py b/silverback/recorder.py index 3744eba0..97be6777 100644 --- a/silverback/recorder.py +++ b/silverback/recorder.py @@ -10,10 +10,20 @@ from taskiq import TaskiqResult from typing_extensions import Self # Introduced 3.11 -from .types import BaseDatapoint, Metrics, ScalarDatapoint, SilverbackID, scalar_types +from .types import ( + Datapoint, + ScalarDatapoint, + ScalarType, + SilverbackID, + UTCTimestamp, + iso_format, + utc_now, +) _HandlerReturnType = TypeVar("_HandlerReturnType") +logger = get_logger(__name__) + class SilverbackState(BaseModel): instance: str @@ -25,65 +35,77 @@ class SilverbackState(BaseModel): updated: datetime -class HandlerResult(BaseModel): - instance: str - network: str - handler_id: str - block_number: int | None - log_index: int | None - created: datetime - labels: dict[str, Any] = Field(default_factory=dict) +class TaskResult(BaseModel): + # NOTE: Model must eventually serialize using PyArrow/Parquet for long-term storage + + # Task Info + task_name: str execution_time: float - metrics: Metrics - error: Optional[str] = None + error: str | None = None - @classmethod - def _extract_metrics(cls, result: Any, handler_id: str) -> Metrics: - if isinstance(result, BaseDatapoint): - return {f"{handler_id}_result": result} + # NOTE: intended to use default when creating a model with this type + completed: UTCTimestamp = Field(default_factory=utc_now) - elif isinstance(result, scalar_types): - return {f"{handler_id}_result": ScalarDatapoint(data=result)} + # System Metrics here (must default to None in case they are missing) + block_number: int | None = None - elif isinstance(result, dict): - converted_result = { - k: ScalarDatapoint(data=v) if not isinstance(v, BaseDatapoint) else v - for k, v in result.items() - if isinstance(v, (BaseDatapoint, *scalar_types)) - } - if len(converted_result) < len(result): - logger = get_logger(handler_id) - logger.warning(f"Unhandled results: {len(result)-len(converted_result)}") + # Custom user metrics here + metrics: dict[str, Datapoint] = {} - return converted_result + @classmethod + def _extract_custom_metrics(cls, result: Any, task_name: str) -> dict[str, Datapoint]: + if isinstance(result, Datapoint): # type: ignore[arg-type,misc] + return {"result": result} + + elif isinstance(result, ScalarType): # type: ignore[arg-type,misc] + return {"result": ScalarDatapoint(data=result)} - elif result is not None: - logger = get_logger(handler_id) - logger.warning(f"Cannot handle return type '{type(result.metrics)}'.") + elif result is None: + return {} + + elif not isinstance(result, dict): + logger.warning(f"Cannot handle return type of '{task_name}': '{type(result)}'.") + return {} # else: - return {} + converted_result = {} + + for metric_name, metric_value in result.items(): + if isinstance(metric_value, Datapoint): # type: ignore[arg-type,misc] + converted_result[metric_name] = metric_value + + elif isinstance(metric_value, ScalarType): # type: ignore[arg-type,misc] + converted_result[metric_name] = ScalarDatapoint(data=metric_value) + + else: + logger.warning( + f"Cannot handle type of metric '{task_name}.{metric_name}':" + f" '{type(metric_value)}'." + ) + + return converted_result + + @classmethod + def _extract_system_metrics(cls, labels: dict) -> dict: + metrics = {} + + if block_number := labels.get("number") or labels.get("block"): + metrics["block_number"] = int(block_number) + + return metrics @classmethod def from_taskiq( cls, - ident: SilverbackID, - handler_id: str, - block_number: int | None, - log_index: int | None, result: TaskiqResult, ) -> Self: + task_name = result.labels.pop("task_name", "") return cls( - instance=ident.identifier, - network=ident.network_choice, - handler_id=handler_id, - block_number=block_number, - log_index=log_index, - created=datetime.now(timezone.utc), - labels=result.labels, + task_name=task_name, execution_time=result.execution_time, - error=str(result.error), - metrics=cls._extract_metrics(result.return_value, handler_id), + error=str(result.error) if result.error else None, + metrics=cls._extract_custom_metrics(result.return_value, task_name), + **cls._extract_system_metrics(result.labels), ) diff --git a/silverback/types.py b/silverback/types.py index 2eb0cef3..2cb7a3a5 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,12 +1,29 @@ from datetime import datetime, timezone from decimal import Decimal from enum import Enum # NOTE: `enum.StrEnum` only in Python 3.11+ -from typing import Literal, Protocol, get_args +from typing import Annotated, Literal, Protocol from pydantic import BaseModel, Field +from pydantic.functional_serializers import PlainSerializer +from taskiq import Context, TaskiqDepends, TaskiqState from typing_extensions import Self # Introduced 3.11 +def iso_format(dt: datetime) -> str: + return dt.isoformat() + + +def utc_now() -> datetime: + return datetime.now(timezone.utc) + + +UTCTimestamp = Annotated[ + datetime, + # TODO: Bug in TaskIQ can't serialize `datetime` + PlainSerializer(iso_format, return_type=str), +] + + class TaskType(str, Enum): STARTUP = "silverback_startup" # TODO: Shorten in 0.4.0 NEW_BLOCKS = "block" @@ -42,24 +59,24 @@ class SilverbackStartupState(BaseModel): last_block_processed: int -class BaseDatapoint(BaseModel): - type: str # discriminator - # NOTE: default value ensures we don't have to set this manually - time: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + +class _BaseDatapoint(BaseModel): + type: str # discriminator + + +# NOTE: only these types of data are implicitly converted e.g. `{"something": 1, "else": 0.001}` ScalarType = bool | int | float | Decimal -scalar_types = get_args(ScalarType) -class ScalarDatapoint(BaseDatapoint): +class ScalarDatapoint(_BaseDatapoint): type: Literal["scalar"] = "scalar" - - # NOTE: app-supported scalar value types: data: ScalarType -# This is what a Silverback app task must return to integrate properly with our data acq system -Metrics = dict[str, BaseDatapoint] -# Otherwise, log a warning and ignore any unconverted return value(s) +# NOTE: Other datapoint types must be explicitly used + +# TODO: Other datapoint types added to union here... +Datapoint = ScalarDatapoint From 55d23e279db2df764397a264aa8ef171ab9b4140 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Tue, 16 Apr 2024 22:24:28 -0400 Subject: [PATCH 03/28] refactor: change how state annotations work a bit --- example.py | 68 +++++++++++++++++++++++---------------- silverback/__init__.py | 4 +-- silverback/application.py | 14 +++++--- silverback/settings.py | 2 +- silverback/types.py | 57 +++++++++++++++----------------- 5 files changed, 78 insertions(+), 67 deletions(-) diff --git a/example.py b/example.py index 60d13456..b925343c 100644 --- a/example.py +++ b/example.py @@ -1,14 +1,11 @@ -from typing import Annotated - from ape import chain from ape.api import BlockAPI from ape.types import ContractLog from ape_tokens import tokens # type: ignore[import] -from taskiq import Context, TaskiqDepends, TaskiqState -from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState +from silverback import CircuitBreaker, SilverbackApp, WorkerState -# Do this to initialize your app +# Do this first to initialize your app app = SilverbackApp() # NOTE: Don't do any networking until after initializing app @@ -17,53 +14,68 @@ @app.on_startup() -def app_startup(startup_state: SilverbackStartupState): - return {"message": "Starting...", "block_number": startup_state.last_block_seen} +def app_startup(): + # NOTE: This is called just as the app is put into "run" state, + # and handled by the first available worker + # raise Exception # NOTE: Any exception raised on startup aborts immediately + return {"block_number": app.state.last_block_seen} + + +# Can handle some resource initialization for each worker, like LLMs or database connections +class MyDB: + def execute(self, query: str): + pass -# Can handle some initialization on startup, like models or network connections @app.on_worker_startup() -def worker_startup(state: TaskiqState): +def worker_startup(state: WorkerState): # NOTE: You need the type hint here + # NOTE: Can put anything here, any python object works + state.db = MyDB() state.block_count = 0 - # state.db = MyDB() - return {"message": "Worker started."} + # raise Exception # NOTE: Any exception raised on worker startup aborts immediately # This is how we trigger off of new blocks @app.on_(chain.blocks) -# context must be a type annotated kwarg to be provided to the task -def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): - context.state.block_count += 1 +# NOTE: The type hint for block is `BlockAPI`, but we parse it using `EcosystemAPI` +# NOTE: If you need something from worker state, you have to use the type hint +def exec_block(block: BlockAPI, state: WorkerState): + state.db.execute(f"some query {block.number}") return len(block.transactions) # This is how we trigger off of events # Set new_block_timeout to adjust the expected block time. @app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25) -# NOTE: Typing isn't required +# NOTE: Typing isn't required, it will still be an Ape `ContractLog` type def exec_event1(log): if log.log_index % 7 == 3: - # If you ever want the app to shutdown under some scenario, call this exception - raise CircuitBreaker("Oopsie!") + # If you raise any exception, Silverback will track the failure and keep running + # NOTE: By default, if you have 3 tasks fail in a row, the app will shutdown itself + raise ValueError("I don't like the number 3.") + return {"amount": log.amount} @app.on_(YFI.Approval) # Any handler function can be async too async def exec_event2(log: ContractLog): - return log.amount - + if log.log_index % 7 == 6: + # If you ever want the app to immediately shutdown under some scenario, raise this exception + raise CircuitBreaker("Oopsie!") -# Just in case you need to release some resources or something -@app.on_worker_shutdown() -def worker_shutdown(state): - return { - "message": f"Worker stopped after handling {state.block_count} blocks.", - "block_count": state.block_count, - } + return log.amount # A final job to execute on Silverback shutdown @app.on_shutdown() -def app_shutdown(state): - return {"message": "Stopping..."} +def app_shutdown(): + # raise Exception # NOTE: Any exception raised on shutdown is ignored + return {"block_number": app.state.last_block_processed} + + +# Just in case you need to release some resources or something inside each worker +@app.on_worker_shutdown() +def worker_shutdown(state): + state.db = None + # raise Exception # NOTE: Any exception raised on worker shutdown is ignored diff --git a/silverback/__init__.py b/silverback/__init__.py index dd26b077..5ee44f1c 100644 --- a/silverback/__init__.py +++ b/silverback/__init__.py @@ -1,10 +1,10 @@ from .application import SilverbackApp from .exceptions import CircuitBreaker, SilverbackException -from .types import SilverbackStartupState +from .types import WorkerState __all__ = [ "CircuitBreaker", "SilverbackApp", "SilverbackException", - "SilverbackStartupState", + "WorkerState", ] diff --git a/silverback/application.py b/silverback/application.py index d1373cb2..87bed0fa 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -46,9 +46,11 @@ def __init__(self, settings: Settings | None = None): if not settings: settings = Settings() - self.network = settings.get_provider_context() + self.name = settings.APP_NAME + + network = settings.get_provider_context() # NOTE: This allows using connected ape methods e.g. `Contract` - provider = self.network.__enter__() + provider = network.__enter__() # Adjust defaults from connection if settings.NEW_BLOCK_TIMEOUT is None and ( @@ -64,23 +66,25 @@ def __init__(self, settings: Settings | None = None): self.tasks: defaultdict[TaskType, list[TaskData]] = defaultdict(list) self.poll_settings: dict[str, dict] = {} - atexit.register(self.network.__exit__, None, None, None) + atexit.register(network.__exit__, None, None, None) self.signer = settings.get_signer() self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT self.start_block = settings.START_BLOCK - network_str = f'\n NETWORK="{provider.network.ecosystem.name}:{provider.network.name}"' + self.network_choice = f"{provider.network.ecosystem.name}:{provider.network.name}" signer_str = f"\n SIGNER={repr(self.signer)}" start_block_str = f"\n START_BLOCK={self.start_block}" if self.start_block else "" new_block_timeout_str = ( f"\n NEW_BLOCK_TIMEOUT={self.new_block_timeout}" if self.new_block_timeout else "" ) logger.info( - f"Loaded Silverback App:{network_str}" + f'Loaded Silverback App:\n NETWORK="{self.network_choice}"' f"{signer_str}{start_block_str}{new_block_timeout_str}" ) + self.state = None # Runner manages this + def broker_task_decorator( self, task_type: TaskType, diff --git a/silverback/settings.py b/silverback/settings.py index d9f4d65e..bdefc2ed 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -23,7 +23,7 @@ class Settings(BaseSettings, ManagerAccessMixin): """ # A unique identifier for this silverback instance - INSTANCE: str = "default" + APP_NAME: str = "bot" BROKER_CLASS: str = "taskiq:InMemoryBroker" BROKER_URI: str = "" diff --git a/silverback/types.py b/silverback/types.py index 2cb7a3a5..6d87393d 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,27 +1,11 @@ from datetime import datetime, timezone from decimal import Decimal from enum import Enum # NOTE: `enum.StrEnum` only in Python 3.11+ -from typing import Annotated, Literal, Protocol +from typing import Annotated, Literal from pydantic import BaseModel, Field from pydantic.functional_serializers import PlainSerializer from taskiq import Context, TaskiqDepends, TaskiqState -from typing_extensions import Self # Introduced 3.11 - - -def iso_format(dt: datetime) -> str: - return dt.isoformat() - - -def utc_now() -> datetime: - return datetime.now(timezone.utc) - - -UTCTimestamp = Annotated[ - datetime, - # TODO: Bug in TaskIQ can't serialize `datetime` - PlainSerializer(iso_format, return_type=str), -] class TaskType(str, Enum): @@ -34,33 +18,44 @@ def __str__(self) -> str: return self.value -class ISilverbackSettings(Protocol): - """Loose approximation of silverback.settings.Settings. If you can, use the class as - a type reference.""" +class SilverbackID(BaseModel): + name: str + ecosystem: str + network: str - INSTANCE: str - RECORDER_CLASS: str | None - def get_network_choice(self) -> str: - ... +def iso_format(dt: datetime) -> str: + return dt.isoformat() -class SilverbackID(BaseModel): - identifier: str - network_choice: str +def utc_now() -> datetime: + return datetime.now(timezone.utc) + - @classmethod - def from_settings(cls, settings_: ISilverbackSettings) -> Self: - return cls(identifier=settings_.INSTANCE, network_choice=settings_.get_network_choice()) +UTCTimestamp = Annotated[ + datetime, + # TODO: Bug in TaskIQ can't serialize `datetime` + PlainSerializer(iso_format, return_type=str), +] -class SilverbackStartupState(BaseModel): +class AppState(BaseModel): + # Last block number seen by runner last_block_seen: int + + # Last block number processed by a worker last_block_processed: int + # Last time the state was updated + # NOTE: intended to use default when creating a model with this type + last_updated: UTCTimestamp = Field(default_factory=utc_now) + +def get_worker_state(context: Annotated[Context, TaskiqDepends()]) -> TaskiqState: + return context.state +WorkerState = Annotated[TaskiqState, TaskiqDepends(get_worker_state)] class _BaseDatapoint(BaseModel): From 0980bd623ab80e5b119b42fd87ceaa91562f9c4b Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Tue, 16 Apr 2024 22:26:34 -0400 Subject: [PATCH 04/28] refactor!: remove SQLRecorder, replace with JSONLineRecorder --- .gitignore | 1 + silverback/recorder.py | 339 ++++++++++------------------------------- 2 files changed, 78 insertions(+), 262 deletions(-) diff --git a/.gitignore b/.gitignore index 7881af65..404f2291 100644 --- a/.gitignore +++ b/.gitignore @@ -119,6 +119,7 @@ version.py # Ape stuff .build/ +.silverback-sessions/ **/.DS_Store *.swp diff --git a/silverback/recorder.py b/silverback/recorder.py index 97be6777..730df1b8 100644 --- a/silverback/recorder.py +++ b/silverback/recorder.py @@ -1,9 +1,6 @@ -import json -import os -import sqlite3 from abc import ABC, abstractmethod -from datetime import datetime, timezone -from typing import Any, TypeVar +from pathlib import Path +from typing import Any, Iterator from ape.logging import get_logger from pydantic import BaseModel, Field @@ -11,6 +8,7 @@ from typing_extensions import Self # Introduced 3.11 from .types import ( + AppState, Datapoint, ScalarDatapoint, ScalarType, @@ -20,21 +18,9 @@ utc_now, ) -_HandlerReturnType = TypeVar("_HandlerReturnType") - logger = get_logger(__name__) -class SilverbackState(BaseModel): - instance: str - network: str - # Last block number seen by runner - last_block_seen: int - # Last block number processed by a worker - last_block_processed: int - updated: datetime - - class TaskResult(BaseModel): # NOTE: Model must eventually serialize using PyArrow/Parquet for long-term storage @@ -110,275 +96,104 @@ def from_taskiq( class BaseRecorder(ABC): - @abstractmethod - async def init(self): - """Handle any async initialization from Silverback settings (e.g. migrations).""" - ... + """ + Base class used for managing persistent application state, and serializing task results + to an external data recording process. - @abstractmethod - async def get_state(self, ident: SilverbackID) -> SilverbackState | None: - """Return the stored state for a Silverback instance""" - ... + NOTE: Persistent state and task results can be managed using two different solutions - @abstractmethod - async def set_state( - self, ident: SilverbackID, last_block_seen: int, last_block_processed: int - ) -> SilverbackState | None: - """Set the stored state for a Silverback instance""" - ... + Recorders are configured using the following environment variable: - @abstractmethod - async def get_latest_result( - self, ident: SilverbackID, handler: str | None = None - ) -> HandlerResult | None: - """Return the latest result for a Silverback instance's handler""" - ... + - `SILVERBACK_RECORDER_CLASS`: Any fully qualified subclass of `BaseRecorder` as a string + """ @abstractmethod - async def add_result(self, v: HandlerResult): - """Store a result for a Silverback instance's handler""" - ... - + async def init(self, app_id: SilverbackID) -> AppState | None: + """ + Handle any async initialization from Silverback settings (e.g. migrations). -class SQLiteRecorder(BaseRecorder): - """ - SQLite implementation of BaseRecorder used to store application state and handler - result data. + Returns startup state, if available. + """ - Usage: + @abstractmethod + async def set_state(self, app_state: AppState): + """Set the stored state for a Silverback instance""" - To use SQLite recorder, you must configure the following env vars: + @abstractmethod + async def add_result(self, result: TaskResult): + """Store a result for a Silverback instance's handler""" - - `RECORDER_CLASS`: `silverback.recorder.SQLiteRecorder` - - `SQLITE_PATH` (optional): A system file path or if blank it will be stored in-memory. - """ - SQL_GET_STATE = """ - SELECT last_block_seen, last_block_processed, updated - FROM silverback_state - WHERE instance = ? AND network = ?; - """ - SQL_INSERT_STATE = """ - INSERT INTO silverback_state ( - instance, network, last_block_seen, last_block_processed, updated - ) - VALUES (?, ?, ?, ?, ?); - """ - SQL_UPDATE_STATE = """ - UPDATE silverback_state - SET last_block_seen = ?, last_block_processed = ?, updated = ? - WHERE instance = ? AND network = ?; - """ - SQL_GET_RESULT_LATEST = """ - SELECT handler_id, block_number, log_index, execution_time, error, created, - metrics_blob - FROM silverback_result - WHERE instance = ? AND network = ? - ORDER BY created DESC - LIMIT 1; - """ - SQL_GET_HANDLER_LATEST = """ - SELECT handler_id, block_number, log_index, execution_time, error, created, - metrics_blob - FROM silverback_result - WHERE instance = ? AND network = ? AND handler_id = ? - ORDER BY created DESC - LIMIT 1; - """ - SQL_INSERT_RESULT = """ - INSERT INTO silverback_result ( - instance, network, handler_id, block_number, log_index, execution_time, - error, created, metrics_blob - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); +class JSONLineRecorder(BaseRecorder): """ + Very basic implementation of BaseRecorder used to store application state and handler + result data by storing/retreiving state from a JSON-encoded file, and appending task + results to a file containing newline-separated JSON entries (https://jsonlines.org/). - con: sqlite3.Connection | None - initialized: bool = False - - async def init(self): - self.con = sqlite3.connect(os.environ.get("SQLITE_PATH", ":memory:")) - - cur = self.con.cursor() - cur.executescript( - """ - BEGIN; - CREATE TABLE IF NOT EXISTS silverback_state ( - instance text, - network text, - last_block_seen int, - last_block_processed int, - updated int - ); - CREATE TABLE IF NOT EXISTS silverback_result ( - instance text, - network text, - handler_id text, - block_number int, - log_index int, - execution_time real, - error text, - created int, - metrics_blob blob - ); - CREATE UNIQUE INDEX IF NOT EXISTS silverback_state__instance - ON silverback_state(instance, network); - CREATE INDEX IF NOT EXISTS silverback_result__instance - ON silverback_result (instance, network); - CREATE INDEX IF NOT EXISTS silverback_result__handler - ON silverback_result (instance, network, handler_id); - CREATE INDEX IF NOT EXISTS silverback_result__error - ON silverback_result (error); - COMMIT; - """ - ) - cur.close() + The file structure that this Recorder uses leverages the value of `SILVERBACK_APP_NAME` + as well as the configured network to determine the location where files get saved: - if not self.con: - raise Exception("Failed to setup SQLite connection") + ./.silverback-sessions/ + / + / + state.json # always write here + session-.json # start time of each app session - self.initialized = True + Each app "session" (everytime the Runner is started up via `silverback run`) is recorded + in a separate file with the timestamp of the first handled task in its filename. - async def get_state(self, ident: SilverbackID) -> SilverbackState | None: - if not self.initialized: - await self.init() + Note that this format can be read by basic means (even in a JS frontend), or read + efficiently via Apache Arrow for more efficient big data processing: - assert self.con is not None + https://arrow.apache.org/docs/python/json.html - cur = self.con.cursor() - res = cur.execute( - self.SQL_GET_STATE, - (ident.identifier, ident.network_choice), - ) - row = res.fetchone() + Usage: - cur.close() + To use this recorder, you must configure the following environment variable: - if row is None: - return None + - `SILVERBACK_RECORDER_CLASS`: `"silverback.recorder.JSONLineRecorder"` - return SilverbackState( - instance=ident.identifier, - network=ident.network_choice, - last_block_seen=row[0], - last_block_processed=row[1], - updated=datetime.fromtimestamp(row[2], timezone.utc), - ) + You may also want to give your app a unique name so the data does not get overwritten, + if you are using multiple apps from the same directory: - async def set_state( - self, ident: SilverbackID, last_block_seen: int, last_block_processed: int - ) -> SilverbackState | None: - if not self.initialized: - await self.init() - - assert self.con is not None + - `SILVERBACK_APP_NAME`: Any alphabetical string valid as a folder name + """ - cur = self.con.cursor() - res = cur.execute( - self.SQL_GET_STATE, - (ident.identifier, ident.network_choice), - ) - row = res.fetchone() - - now = datetime.now(timezone.utc) - now_stamp = int(now.timestamp()) - - if row is None: - cur.execute( - self.SQL_INSERT_STATE, - ( - ident.identifier, - ident.network_choice, - last_block_seen, - last_block_processed, - now_stamp, - ), - ) - else: - cur.execute( - self.SQL_UPDATE_STATE, - ( - last_block_seen, - last_block_processed, - now_stamp, - ident.identifier, - ident.network_choice, - ), - ) - - cur.close() - self.con.commit() - - return SilverbackState( - instance=ident.identifier, - network=ident.network_choice, - last_block_seen=last_block_seen, - last_block_processed=last_block_processed, - updated=now, + async def init(self, app_id: SilverbackID) -> AppState | None: + data_folder = ( + Path.cwd() / ".silverback-sessions" / app_id.name / app_id.ecosystem / app_id.network ) + data_folder.mkdir(parents=True, exist_ok=True) - async def get_latest_result( - self, ident: SilverbackID, handler: str | None = None - ) -> HandlerResult | None: - if not self.initialized: - await self.init() - - assert self.con is not None - - cur = self.con.cursor() - - if handler is not None: - res = cur.execute( - self.SQL_GET_HANDLER_LATEST, - (ident.identifier, ident.network_choice, handler), - ) - else: - res = cur.execute( - self.SQL_GET_RESULT_LATEST, - (ident.identifier, ident.network_choice), - ) - - row = res.fetchone() - - cur.close() - - if row is None: - return None - - return HandlerResult( - instance=ident.identifier, - network=ident.network_choice, - handler_id=row[0], - block_number=row[1], - log_index=row[2], - execution_time=row[3], - error=row[4], - created=datetime.fromtimestamp(row[5], timezone.utc), - metrics=json.loads(row[6]), - ) + self.state_backup_file = data_folder / "state.json" + self.session_results_file = data_folder / f"session-{iso_format(utc_now())}.jsonl" - async def add_result(self, v: HandlerResult): - if not self.initialized: - await self.init() - - assert self.con is not None - - cur = self.con.cursor() - - cur.execute( - self.SQL_INSERT_RESULT, - ( - v.instance, - v.network, - v.handler_id, - v.block_number, - v.log_index, - v.execution_time, - v.error, - v.created, - json.dumps({n: m.model_dump_json() for n, m in v.metrics.items()}), - ), + return ( + AppState.parse_file(self.state_backup_file) if self.state_backup_file.exists() else None ) - cur.close() - self.con.commit() + async def set_state(self, state: AppState): + self.state_backup_file.write_text(state.model_dump_json()) + + async def add_result(self, result: TaskResult): + # NOTE: mode `a` means "append to file if exists" + # NOTE: JSONNL convention requires the use of `\n` as newline char + with self.session_results_file.open("a") as writer: + writer.write(result.model_dump_json()) + writer.write("\n") + + +def get_metrics(session: Path | str, task_name: str) -> Iterator[dict]: + with open(session, "r") as file: + for line in file: + if ( + (result := TaskResult.model_validate_json(line)) + and result.task_name == task_name + and not result.error + ): + yield { + "block_number": result.block_number, + "execution_time": result.execution_time, + "completed": result.completed, + **{name: datapoint.data for name, datapoint in result.metrics.items()}, + } From 92338397df9478d48c5a8ef4b06f6cb892c06e8e Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Tue, 16 Apr 2024 22:28:59 -0400 Subject: [PATCH 05/28] refactor: migrate CircuitBreaker exception to subclass of Halt --- silverback/exceptions.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/silverback/exceptions.py b/silverback/exceptions.py index 125e85a0..aa262e75 100644 --- a/silverback/exceptions.py +++ b/silverback/exceptions.py @@ -1,7 +1,6 @@ from typing import Any from ape.exceptions import ApeException -from ape.logging import logger from .types import TaskType @@ -36,9 +35,8 @@ def __init__(self): super().__init__("App halted, must restart manually") -class CircuitBreaker(SilverbackException): +class CircuitBreaker(Halt): """Custom exception (created by user) that will trigger an application shutdown.""" def __init__(self, message: str): - logger.error(message) - super().__init__(message) + super(SilverbackException, self).__init__(message) From 5cc0d0d8137908fdf3a959fdbc9c0319a541cb49 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Tue, 16 Apr 2024 22:37:38 -0400 Subject: [PATCH 06/28] refactor!: migrate recorder config to CLI callback also refactor result handling --- silverback/_cli.py | 19 ++++- silverback/middlewares.py | 24 +----- silverback/runner.py | 157 ++++++++++++++++++-------------------- 3 files changed, 92 insertions(+), 108 deletions(-) diff --git a/silverback/_cli.py b/silverback/_cli.py index 4a604690..f3a48f1d 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -34,6 +34,16 @@ def _runner_callback(ctx, param, val): raise ValueError(f"Failed to import runner '{val}'.") +def _recorder_callback(ctx, param, val): + if not val: + return None + + elif recorder := import_from_string(val): + return recorder() + + raise ValueError(f"Failed to import recorder '{val}'.") + + def _account_callback(ctx, param, val): if val: val = val.alias.replace("dev_", "TEST::") @@ -92,11 +102,16 @@ async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90): help="An import str in format ':'", callback=_runner_callback, ) +@click.option( + "--recorder", + help="An import string in format ':'", + callback=_recorder_callback, +) @click.option("-x", "--max-exceptions", type=int, default=3) @click.argument("path") -def run(cli_ctx, account, runner, max_exceptions, path): +def run(cli_ctx, account, runner, recorder, max_exceptions, path): app = import_from_string(path) - runner = runner(app, max_exceptions=max_exceptions) + runner = runner(app, recorder=recorder, max_exceptions=max_exceptions) asyncio.run(runner.run()) diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 3f50ecd5..a57cd6bc 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -6,8 +6,7 @@ from eth_utils.conversions import to_hex from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult -from silverback.recorder import HandlerResult -from silverback.types import SilverbackID, TaskType +from silverback.types import TaskType from silverback.utils import hexbytes_dict @@ -22,11 +21,7 @@ def compute_block_time() -> int: return int((head.timestamp - genesis.timestamp) / head.number) - settings = kwargs.pop("silverback_settings") - self.block_time = self.chain_manager.provider.network.block_time or compute_block_time() - self.ident = SilverbackID.from_settings(settings) - self.recorder = settings.get_recorder() def pre_send(self, message: TaskiqMessage) -> TaskiqMessage: # TODO: Necessary because bytes/HexBytes doesn't encode/deocde well for some reason @@ -97,21 +92,4 @@ def post_execute(self, message: TaskiqMessage, result: TaskiqResult): f"{self._create_label(message)} " f"- {result.execution_time:.3f}s{percent_display}" ) - async def post_save(self, message: TaskiqMessage, result: TaskiqResult): - if not self.recorder: - return - - handler_result = HandlerResult.from_taskiq( - self.ident, - message.task_name, - message.labels.get("block_number"), - message.labels.get("log_index"), - result, - ) - - try: - await self.recorder.add_result(handler_result) - except Exception as err: - logger.error(f"Error storing result: {err}") - # NOTE: Unless stdout is ignored, error traceback appears in stdout, no need for `on_error` diff --git a/silverback/runner.py b/silverback/runner.py index 493c4708..af45d516 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -6,67 +6,82 @@ from ape.logging import logger from ape.utils import ManagerAccessMixin from ape_ethereum.ecosystem import keccak -from taskiq import AsyncTaskiqDecoratedTask, TaskiqResult +from taskiq import AsyncTaskiqDecoratedTask, AsyncTaskiqTask from .application import SilverbackApp from .exceptions import Halt, NoWebsocketAvailableError -from .recorder import BaseRecorder -from .settings import Settings +from .recorder import BaseRecorder, TaskResult from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .types import SilverbackID, SilverbackStartupState, TaskType +from .types import AppState, SilverbackID, TaskType from .utils import async_wrap_iter, hexbytes_dict -settings = Settings() - class BaseRunner(ABC): - def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs): + def __init__( + self, + app: SilverbackApp, + *args, + max_exceptions: int = 3, + recorder: Optional[BaseRecorder] = None, + **kwargs, + ): self.app = app + self.recorder = recorder self.max_exceptions = max_exceptions self.exceptions = 0 - self.last_block_seen = 0 - self.last_block_processed = 0 - self.recorder: BaseRecorder | None = None - self.ident = SilverbackID.from_settings(settings) - def _handle_result(self, result: TaskiqResult): - if result.is_err: - self.exceptions += 1 + ecosystem_name, network_name = app.network_choice.split(":") + self.identifier = SilverbackID( + name=app.name, + ecosystem=ecosystem_name, + network=network_name, + ) + + logger.info(f"Using {self.__class__.__name__}: max_exceptions={self.max_exceptions}") + + async def _handle_task(self, task: AsyncTaskiqTask): + result = await task.wait_result() + + if self.recorder: + await self.recorder.add_result(TaskResult.from_taskiq(result)) - else: + if not result.is_err: # NOTE: Reset exception counter self.exceptions = 0 + return - if self.exceptions > self.max_exceptions: - raise Halt() from result.error + self.exceptions += 1 + + if self.exceptions > self.max_exceptions or isinstance(result.error, Halt): + result.raise_for_error() async def _checkpoint( - self, last_block_seen: int = 0, last_block_processed: int = 0 - ) -> tuple[int, int]: + self, + last_block_seen: int | None = None, + last_block_processed: int | None = None, + ): """Set latest checkpoint block number""" - if ( - last_block_seen > self.last_block_seen - or last_block_processed > self.last_block_processed - ): - logger.debug( - ( - f"Checkpoint block [seen={self.last_block_seen}, " - f"procssed={self.last_block_processed}]" - ) + assert self.app.state, f"{self.__class__.__name__}.run() not triggered." + + logger.debug( + ( + f"Checkpoint block [seen={self.app.state.last_block_seen}, " + f"procssed={self.app.state.last_block_processed}]" ) - self.last_block_seen = max(last_block_seen, self.last_block_seen) - self.last_block_processed = max(last_block_processed, self.last_block_processed) + ) - if self.recorder: - try: - await self.recorder.set_state( - self.ident, self.last_block_seen, self.last_block_processed - ) - except Exception as err: - logger.error(f"Error settings state: {err}") + if last_block_seen: + self.app.state.last_block_seen = last_block_seen + if last_block_processed: + self.app.state.last_block_processed = last_block_processed + + if self.recorder: + try: + await self.recorder.set_state(self.app.state) - return self.last_block_seen, self.last_block_processed + except Exception as err: + logger.error(f"Error setting state: {err}") @abstractmethod async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): @@ -92,14 +107,14 @@ async def run(self): Raises: :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. """ - self.recorder = settings.get_recorder() + # Initialize recorder (if available) and fetch state if app has been run previously + if self.recorder and (startup_state := (await self.recorder.init(app_id=self.identifier))): + self.app.state = startup_state - if self.recorder: - boot_state = await self.recorder.get_state(self.ident) - if boot_state: - self.last_block_seen = boot_state.last_block_seen - self.last_block_processed = boot_state.last_block_processed + else: # use empty state + self.app.state = AppState(last_block_seen=-1, last_block_processed=-1) + # Initialize broker (run worker startup events) await self.app.broker.startup() # Execute Silverback startup task before we init the rest @@ -143,7 +158,6 @@ class WebsocketRunner(BaseRunner, ManagerAccessMixin): def __init__(self, app: SilverbackApp, *args, **kwargs): super().__init__(app, *args, **kwargs) - logger.info(f"Using {self.__class__.__name__}: max_exceptions={self.max_exceptions}") # Check for websocket support if not (ws_uri := app.chain_manager.provider.ws_uri): @@ -158,16 +172,9 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): async for raw_block in self.subscriptions.get_subscription_data(sub_id): block = self.provider.network.ecosystem.decode_block(hexbytes_dict(raw_block)) - if block.number is not None: - await self._checkpoint(last_block_seen=block.number) - - block_task = await block_handler.kiq(raw_block) - result = await block_task.wait_result() - - self._handle_result(result) - - if block.number is not None: - await self._checkpoint(last_block_processed=block.number) + await self._checkpoint(last_block_seen=block.number) + await self._handle_task(await block_handler.kiq(raw_block)) + await self._checkpoint(last_block_processed=block.number) async def _event_task( self, contract_event: ContractEvent, event_handler: AsyncTaskiqDecoratedTask @@ -181,7 +188,9 @@ async def _event_task( address=contract_event.contract.address, topics=["0x" + keccak(text=contract_event.abi.selector).hex()], ) - logger.debug(f"Handling '{contract_event.name}' events via {sub_id}") + logger.debug( + f"Handling '{contract_event.contract.address}:{contract_event.name}' logs via {sub_id}" + ) async for raw_event in self.subscriptions.get_subscription_data(sub_id): event = next( # NOTE: `next` is okay since it only has one item @@ -191,15 +200,9 @@ async def _event_task( ) ) - if event.block_number is not None: - await self._checkpoint(last_block_seen=event.block_number) - - event_task = await event_handler.kiq(event) - result = await event_task.wait_result() - self._handle_result(result) - - if event.block_number is not None: - await self._checkpoint(last_block_processed=event.block_number) + await self._checkpoint(last_block_seen=event.block_number) + await self._handle_task(await event_handler.kiq(event)) + await self._checkpoint(last_block_processed=event.block_number) async def run(self): async with Web3SubscriptionsManager(self.ws_uri) as subscriptions: @@ -234,15 +237,9 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): async for block in async_wrap_iter( chain.blocks.poll_blocks(start_block=start_block, new_block_timeout=new_block_timeout) ): - if block.number is not None: - await self._checkpoint(last_block_seen=block.number) - - block_task = await block_handler.kiq(block) - result = await block_task.wait_result() - self._handle_result(result) - - if block.number is not None: - await self._checkpoint(last_block_processed=block.number) + await self._checkpoint(last_block_seen=block.number) + await self._handle_task(await block_handler.kiq(block)) + await self._checkpoint(last_block_processed=block.number) async def _event_task( self, contract_event: ContractEvent, event_handler: AsyncTaskiqDecoratedTask @@ -264,12 +261,6 @@ async def _event_task( async for event in async_wrap_iter( contract_event.poll_logs(start_block=start_block, new_block_timeout=new_block_timeout) ): - if event.block_number is not None: - await self._checkpoint(last_block_seen=event.block_number) - - event_task = await event_handler.kiq(event) - result = await event_task.wait_result() - self._handle_result(result) - - if event.block_number is not None: - await self._checkpoint(last_block_processed=event.block_number) + await self._checkpoint(last_block_seen=event.block_number) + await self._handle_task(await event_handler.kiq(event)) + await self._checkpoint(last_block_processed=event.block_number) From cc3cfffd12f4c64b3e237f6d818eb17bd03f35c4 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Tue, 16 Apr 2024 22:39:01 -0400 Subject: [PATCH 07/28] refactor!: clean up startup process significantly --- silverback/exceptions.py | 16 ++++++++- silverback/runner.py | 75 ++++++++++++++++++++++++++-------------- 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/silverback/exceptions.py b/silverback/exceptions.py index aa262e75..7bde82b0 100644 --- a/silverback/exceptions.py +++ b/silverback/exceptions.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Sequence from ape.exceptions import ApeException @@ -30,6 +30,20 @@ class SilverbackException(ApeException): """Base Exception for any Silverback runtime faults.""" +# TODO: `ExceptionGroup` added in Python 3.11 +class StartupFailure(SilverbackException): + def __init__(self, *exceptions: Sequence[Exception]): + if error_str := "\n".join(str(e) for e in exceptions): + super().__init__(f"Startup failure(s):\n{error_str}") + else: + super().__init__("Startup failure(s) detected. See logs for details.") + + +class NoTasksAvailableError(SilverbackException): + def __init__(self): + super().__init__("No tasks to execute") + + class Halt(SilverbackException): def __init__(self): super().__init__("App halted, must restart manually") diff --git a/silverback/runner.py b/silverback/runner.py index af45d516..3f318754 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -9,7 +9,7 @@ from taskiq import AsyncTaskiqDecoratedTask, AsyncTaskiqTask from .application import SilverbackApp -from .exceptions import Halt, NoWebsocketAvailableError +from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure from .recorder import BaseRecorder, TaskResult from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .types import AppState, SilverbackID, TaskType @@ -105,7 +105,10 @@ async def run(self): and process them by kicking events over to the configured broker. Raises: - :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. + :class:`~silverback.exceptions.StartupFailure`: + If there was an exception during startup. + :class:`~silverback.exceptions.NoTasksAvailableError`: + If there are no configured tasks to execute. """ # Initialize recorder (if available) and fetch state if app has been run previously if self.recorder and (startup_state := (await self.recorder.init(app_id=self.identifier))): @@ -118,35 +121,57 @@ async def run(self): await self.app.broker.startup() # Execute Silverback startup task before we init the rest - for startup_task in self.app.tasks[TaskType.STARTUP]: - task = await startup_task.handler.kiq( - SilverbackStartupState( - last_block_seen=self.last_block_seen, - last_block_processed=self.last_block_processed, - ) + if startup_tasks := await asyncio.gather( + *(task_def.handler.kiq() for task_def in self.app.tasks[TaskType.STARTUP]) + ): + results = await asyncio.gather( + *(startup_task.wait_result() for startup_task in startup_tasks) ) - result = await task.wait_result() - self._handle_result(result) - tasks = [] - for task in self.app.tasks[TaskType.NEW_BLOCKS]: - tasks.append(self._block_task(task.handler)) + if any(result.is_err for result in results): + # NOTE: Abort before even starting to run + raise StartupFailure(*(result.error for result in results if result.is_err)) + # NOTE: No need to handle results otherwise + + # Create our long-running event listeners + # NOTE: Any propagated failure in here should be handled such that shutdown tasks also run + # TODO: `asyncio.TaskGroup` added in Python 3.11 + listener_tasks = ( + *( + asyncio.create_task(self._block_task(task_def.handler)) + for task_def in self.app.tasks[TaskType.NEW_BLOCKS] + ), + *( + asyncio.create_task(self._event_task(task_def.container, task_def.handler)) + for task_def in self.app.tasks[TaskType.EVENT_LOG] + ), + ) - for task in self.app.tasks[TaskType.EVENT_LOG]: - tasks.append(self._event_task(task.container, task.handler)) + # NOTE: Safe to do this because no tasks have been scheduled to run yet + if len(listener_tasks) == 0: + raise NoTasksAvailableError() - if len(tasks) == 0: - raise Halt("No tasks to execute") + # Run until one task bubbles up an exception that should stop execution + tasks_with_errors, tasks_running = await asyncio.wait( + listener_tasks, return_when=asyncio.FIRST_EXCEPTION + ) + if runtime_errors := "\n".join(str(task.exception()) for task in tasks_with_errors): + # NOTE: In case we are somehow not displaying the error correctly with task status + logger.debug(f"Runtime error(s) detected, shutting down:\n{runtime_errors}") - try: - await asyncio.gather(*tasks) - except Exception as e: - logger.error(f"Fatal error detected, shutting down: '{e}'") + # Cancel any still running + (task.cancel() for task in tasks_running) + # NOTE: All listener tasks are shut down now - # Execute Silverback shutdown task before shutting down the broker - for shutdown_task in self.app.tasks[TaskType.SHUTDOWN]: - task = await shutdown_task.handler.kiq() - result = self._handle_result(await task.wait_result()) + # Execute Silverback shutdown task(s) before shutting down the broker and app + if shutdown_tasks := await asyncio.gather( + *(task_def.handler.kiq() for task_def in self.app.tasks[TaskType.SHUTDOWN]) + ): + asyncio.gather(*(shutdown_task.is_ready() for shutdown_task in shutdown_tasks)) + if any(result.is_err for result in results): + errors_str = "\n".join(str(result.error) for result in results if result.is_err) + logger.error(f"Errors while shutting down:\n{errors_str}") + # NOTE: No need to handle results otherwise await self.app.broker.shutdown() From 1f86d981c8405bc93a6e1674e7a911260f02873b Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Tue, 16 Apr 2024 22:44:36 -0400 Subject: [PATCH 08/28] fix: don't use py 3.10 unions yet --- silverback/recorder.py | 2 +- silverback/types.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/silverback/recorder.py b/silverback/recorder.py index 730df1b8..7813b06e 100644 --- a/silverback/recorder.py +++ b/silverback/recorder.py @@ -183,7 +183,7 @@ async def add_result(self, result: TaskResult): writer.write("\n") -def get_metrics(session: Path | str, task_name: str) -> Iterator[dict]: +def get_metrics(session: Path, task_name: str) -> Iterator[dict]: with open(session, "r") as file: for line in file: if ( diff --git a/silverback/types.py b/silverback/types.py index 6d87393d..e53d8d38 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,7 +1,7 @@ from datetime import datetime, timezone from decimal import Decimal from enum import Enum # NOTE: `enum.StrEnum` only in Python 3.11+ -from typing import Annotated, Literal +from typing import Annotated, Literal, Union from pydantic import BaseModel, Field from pydantic.functional_serializers import PlainSerializer @@ -63,7 +63,7 @@ class _BaseDatapoint(BaseModel): # NOTE: only these types of data are implicitly converted e.g. `{"something": 1, "else": 0.001}` -ScalarType = bool | int | float | Decimal +ScalarType = Union[bool, int, float, Decimal] class ScalarDatapoint(_BaseDatapoint): From acccdbb4c25aa6e08c469c5fd8bdabf9f5e15316 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Tue, 16 Apr 2024 22:49:22 -0400 Subject: [PATCH 09/28] fix: missing Annotated from py 3.8 --- silverback/types.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/silverback/types.py b/silverback/types.py index e53d8d38..98aab824 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,11 +1,12 @@ from datetime import datetime, timezone from decimal import Decimal from enum import Enum # NOTE: `enum.StrEnum` only in Python 3.11+ -from typing import Annotated, Literal, Union +from typing import Literal, Union from pydantic import BaseModel, Field from pydantic.functional_serializers import PlainSerializer from taskiq import Context, TaskiqDepends, TaskiqState +from typing_extensions import Annotated # Introduced 3.9 class TaskType(str, Enum): From 086df4ee83a53d3d44d4826a79d16313b1edd7a5 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 14:40:59 -0400 Subject: [PATCH 10/28] refactor: fix rebase misses --- silverback/recorder.py | 2 +- silverback/runner.py | 2 +- silverback/types.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/silverback/recorder.py b/silverback/recorder.py index 7813b06e..1931e54e 100644 --- a/silverback/recorder.py +++ b/silverback/recorder.py @@ -151,7 +151,7 @@ class JSONLineRecorder(BaseRecorder): To use this recorder, you must configure the following environment variable: - - `SILVERBACK_RECORDER_CLASS`: `"silverback.recorder.JSONLineRecorder"` + - `SILVERBACK_RECORDER_CLASS`: `"silverback.recorder:JSONLineRecorder"` You may also want to give your app a unique name so the data does not get overwritten, if you are using multiple apps from the same directory: diff --git a/silverback/runner.py b/silverback/runner.py index 3f318754..d0612387 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -22,7 +22,7 @@ def __init__( app: SilverbackApp, *args, max_exceptions: int = 3, - recorder: Optional[BaseRecorder] = None, + recorder: BaseRecorder | None = None, **kwargs, ): self.app = app diff --git a/silverback/types.py b/silverback/types.py index 98aab824..1bd870d6 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,7 +1,7 @@ from datetime import datetime, timezone from decimal import Decimal from enum import Enum # NOTE: `enum.StrEnum` only in Python 3.11+ -from typing import Literal, Union +from typing import Literal from pydantic import BaseModel, Field from pydantic.functional_serializers import PlainSerializer @@ -64,7 +64,7 @@ class _BaseDatapoint(BaseModel): # NOTE: only these types of data are implicitly converted e.g. `{"something": 1, "else": 0.001}` -ScalarType = Union[bool, int, float, Decimal] +ScalarType = bool | int | float | Decimal class ScalarDatapoint(_BaseDatapoint): From a0ac64534bda0cc4bb47ec5d1ea05d51401a616b Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 15:04:40 -0400 Subject: [PATCH 11/28] refactor: use a more recent block number --- example.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example.py b/example.py index b925343c..0d814591 100644 --- a/example.py +++ b/example.py @@ -46,7 +46,7 @@ def exec_block(block: BlockAPI, state: WorkerState): # This is how we trigger off of events # Set new_block_timeout to adjust the expected block time. -@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25) +@app.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25) # NOTE: Typing isn't required, it will still be an Ape `ContractLog` type def exec_event1(log): if log.log_index % 7 == 3: From 2acfcc031b34130286325ad3bd70c09c0c48bf83 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 15:14:50 -0400 Subject: [PATCH 12/28] fix: do not check `len(ContractEvent)` for performance reasons --- silverback/application.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/silverback/application.py b/silverback/application.py index 87bed0fa..9eaee7f4 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -124,7 +124,10 @@ def broker_task_decorator( def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: labels = {"task_type": str(task_type)} - if container and isinstance(container, ContractEvent): + # NOTE: Do *not* do `if container` because that does a `len(container)` call, + # which for ContractEvent queries *every single log* ever emitted, and really + # we only want to determine if it is not None + if container is not None and isinstance(container, ContractEvent): # Address is almost a certainty if the container is being used as a filter here. if contract_address := getattr(container.contract, "address", None): labels["contract_address"] = contract_address From 1416b3dd53daaa87e1b004ccb4875872060effd9 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 15:15:15 -0400 Subject: [PATCH 13/28] fix: display event signatures as strings --- silverback/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/silverback/application.py b/silverback/application.py index 9eaee7f4..0010e99b 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -131,7 +131,7 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: # Address is almost a certainty if the container is being used as a filter here. if contract_address := getattr(container.contract, "address", None): labels["contract_address"] = contract_address - labels["event_signature"] = container.abi.signature + labels["event_signature"] = f'"{container.abi.signature}"' broker_task = self.broker.register_task( handler, From 152876daa7c75fa7e4563b076c2dd1cfb2ae3edf Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 15:23:24 -0400 Subject: [PATCH 14/28] fix: ensure that task name ends up in labels --- silverback/middlewares.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/silverback/middlewares.py b/silverback/middlewares.py index a57cd6bc..b87f9f0f 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -44,7 +44,9 @@ def fix_dict(data: dict, recurse_count: int = 0) -> dict: return message def _create_label(self, message: TaskiqMessage) -> str: - if labels_str := ",".join(f"{k}={v}" for k, v in message.labels.items()): + if labels_str := ",".join( + f"{k}={v}" for k, v in message.labels.items() if k != "task_name" + ): return f"{message.task_name}[{labels_str}]" else: @@ -55,6 +57,7 @@ def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage: return message # Not a silverback task task_type = message.labels.pop("task_type") + message.labels["task_name"] = message.task_name try: task_type = TaskType(task_type) From f5e5d2faaf58885a99a8534a8fa935f1e19f402e Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 15:45:07 -0400 Subject: [PATCH 15/28] fix: feedback from peer review --- silverback/application.py | 1 + silverback/recorder.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/silverback/application.py b/silverback/application.py index 0010e99b..90dfbc8e 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -131,6 +131,7 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: # Address is almost a certainty if the container is being used as a filter here. if contract_address := getattr(container.contract, "address", None): labels["contract_address"] = contract_address + # NOTE: event signature is a string with spaces/commas, so encapsulate it in quotes labels["event_signature"] = f'"{container.abi.signature}"' broker_task = self.broker.register_task( diff --git a/silverback/recorder.py b/silverback/recorder.py index 1931e54e..7db19b8c 100644 --- a/silverback/recorder.py +++ b/silverback/recorder.py @@ -53,7 +53,6 @@ def _extract_custom_metrics(cls, result: Any, task_name: str) -> dict[str, Datap logger.warning(f"Cannot handle return type of '{task_name}': '{type(result)}'.") return {} - # else: converted_result = {} for metric_name, metric_value in result.items(): From bc9146b92e5b8579dfac5c8fc6a30ad9b173ac8e Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 16:18:03 -0400 Subject: [PATCH 16/28] refactor: shorten name of startup/shutdown tags --- silverback/types.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/silverback/types.py b/silverback/types.py index 1bd870d6..666c84f2 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -6,14 +6,14 @@ from pydantic import BaseModel, Field from pydantic.functional_serializers import PlainSerializer from taskiq import Context, TaskiqDepends, TaskiqState -from typing_extensions import Annotated # Introduced 3.9 +from typing_extensions import Annotated class TaskType(str, Enum): - STARTUP = "silverback_startup" # TODO: Shorten in 0.4.0 + STARTUP = "startup" NEW_BLOCKS = "block" EVENT_LOG = "event" - SHUTDOWN = "silverback_shutdown" # TODO: Shorten in 0.4.0 + SHUTDOWN = "shutdown" def __str__(self) -> str: return self.value From 8ef802e71a9749eacfb88945227f716668581614 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 16:18:28 -0400 Subject: [PATCH 17/28] fix: wrong label selected to pull block number --- silverback/recorder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/silverback/recorder.py b/silverback/recorder.py index 7db19b8c..78d21e93 100644 --- a/silverback/recorder.py +++ b/silverback/recorder.py @@ -74,7 +74,7 @@ def _extract_custom_metrics(cls, result: Any, task_name: str) -> dict[str, Datap def _extract_system_metrics(cls, labels: dict) -> dict: metrics = {} - if block_number := labels.get("number") or labels.get("block"): + if block_number := labels.get("block_number"): metrics["block_number"] = int(block_number) return metrics From 71eda731ae699f4c1dc12e56fa8e271e7fa43ce2 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 16:18:50 -0400 Subject: [PATCH 18/28] refactor: ensure all tasks have task name (not just silverback) --- silverback/middlewares.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/silverback/middlewares.py b/silverback/middlewares.py index b87f9f0f..4b2348e0 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -53,14 +53,14 @@ def _create_label(self, message: TaskiqMessage) -> str: return message.task_name def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage: - if "task_type" not in message.labels: - return message # Not a silverback task - - task_type = message.labels.pop("task_type") + # NOTE: Ensure we always have this, no matter what message.labels["task_name"] = message.task_name + if not (task_type_str := message.labels.pop("task_type")): + return message # Not a silverback task + try: - task_type = TaskType(task_type) + task_type = TaskType(task_type_str) except ValueError: return message # Not a silverback task From 029df436841597cacab8c5cb769e7cc463dc9aca Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 16:19:19 -0400 Subject: [PATCH 19/28] feat: store startup and shutdown result via recorder --- silverback/runner.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/silverback/runner.py b/silverback/runner.py index d0612387..e0a5fe88 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -131,6 +131,11 @@ async def run(self): if any(result.is_err for result in results): # NOTE: Abort before even starting to run raise StartupFailure(*(result.error for result in results if result.is_err)) + + elif self.recorder: + converted_results = map(TaskResult.from_taskiq, results) + await asyncio.gather(*(self.recorder.add_result(r) for r in converted_results)) + # NOTE: No need to handle results otherwise # Create our long-running event listeners @@ -171,6 +176,11 @@ async def run(self): if any(result.is_err for result in results): errors_str = "\n".join(str(result.error) for result in results if result.is_err) logger.error(f"Errors while shutting down:\n{errors_str}") + + elif self.recorder: + converted_results = map(TaskResult.from_taskiq, results) + await asyncio.gather(*(self.recorder.add_result(r) for r in converted_results)) + # NOTE: No need to handle results otherwise await self.app.broker.shutdown() From 8cd2771fbe489604e1314884b207d95688afb583 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 17:03:47 -0400 Subject: [PATCH 20/28] refactor!: move `.identifier` from runner to app --- silverback/application.py | 17 +++++++++++------ silverback/runner.py | 9 +-------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/silverback/application.py b/silverback/application.py index 90dfbc8e..e5ac3c06 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -13,7 +13,7 @@ from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError from .settings import Settings -from .types import TaskType +from .types import SilverbackID, TaskType @dataclass @@ -46,12 +46,16 @@ def __init__(self, settings: Settings | None = None): if not settings: settings = Settings() - self.name = settings.APP_NAME - network = settings.get_provider_context() # NOTE: This allows using connected ape methods e.g. `Contract` provider = network.__enter__() + self.identifier = SilverbackID( + name=settings.APP_NAME, + network=provider.network.name, + ecosystem=provider.network.ecosystem.name, + ) + # Adjust defaults from connection if settings.NEW_BLOCK_TIMEOUT is None and ( provider.network.name.endswith("-fork") or provider.network.name == LOCAL_NETWORK_NAME @@ -72,14 +76,15 @@ def __init__(self, settings: Settings | None = None): self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT self.start_block = settings.START_BLOCK - self.network_choice = f"{provider.network.ecosystem.name}:{provider.network.name}" signer_str = f"\n SIGNER={repr(self.signer)}" start_block_str = f"\n START_BLOCK={self.start_block}" if self.start_block else "" new_block_timeout_str = ( f"\n NEW_BLOCK_TIMEOUT={self.new_block_timeout}" if self.new_block_timeout else "" ) - logger.info( - f'Loaded Silverback App:\n NETWORK="{self.network_choice}"' + + network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}" + logger.success( + f'Loaded Silverback App:\n NETWORK="{network_choice}"' f"{signer_str}{start_block_str}{new_block_timeout_str}" ) diff --git a/silverback/runner.py b/silverback/runner.py index e0a5fe88..0a1c700a 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -31,13 +31,6 @@ def __init__( self.max_exceptions = max_exceptions self.exceptions = 0 - ecosystem_name, network_name = app.network_choice.split(":") - self.identifier = SilverbackID( - name=app.name, - ecosystem=ecosystem_name, - network=network_name, - ) - logger.info(f"Using {self.__class__.__name__}: max_exceptions={self.max_exceptions}") async def _handle_task(self, task: AsyncTaskiqTask): @@ -111,7 +104,7 @@ async def run(self): If there are no configured tasks to execute. """ # Initialize recorder (if available) and fetch state if app has been run previously - if self.recorder and (startup_state := (await self.recorder.init(app_id=self.identifier))): + if self.recorder and (startup_state := (await self.recorder.init(app_id=self.app.identifier))): self.app.state = startup_state else: # use empty state From 9173f535d54520b894c77d7f2a20c1c2d905c27d Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 17:09:15 -0400 Subject: [PATCH 21/28] refactor!: remove WorkerState, suggest using TaskiqState --- example.py | 15 +++++++++------ silverback/__init__.py | 2 -- silverback/types.py | 8 -------- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/example.py b/example.py index 0d814591..ce4b6bee 100644 --- a/example.py +++ b/example.py @@ -1,9 +1,12 @@ +from typing import Annotated + from ape import chain from ape.api import BlockAPI from ape.types import ContractLog from ape_tokens import tokens # type: ignore[import] +from taskiq import Context, TaskiqDepends, TaskiqState -from silverback import CircuitBreaker, SilverbackApp, WorkerState +from silverback import CircuitBreaker, SilverbackApp # Do this first to initialize your app app = SilverbackApp() @@ -28,7 +31,7 @@ def execute(self, query: str): @app.on_worker_startup() -def worker_startup(state: WorkerState): # NOTE: You need the type hint here +def worker_startup(state: TaskiqState): # NOTE: You need the type hint here # NOTE: Can put anything here, any python object works state.db = MyDB() state.block_count = 0 @@ -38,9 +41,9 @@ def worker_startup(state: WorkerState): # NOTE: You need the type hint here # This is how we trigger off of new blocks @app.on_(chain.blocks) # NOTE: The type hint for block is `BlockAPI`, but we parse it using `EcosystemAPI` -# NOTE: If you need something from worker state, you have to use the type hint -def exec_block(block: BlockAPI, state: WorkerState): - state.db.execute(f"some query {block.number}") +# NOTE: If you need something from worker state, you have to use taskiq context +def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): + context.state.db.execute(f"some query {block.number}") return len(block.transactions) @@ -76,6 +79,6 @@ def app_shutdown(): # Just in case you need to release some resources or something inside each worker @app.on_worker_shutdown() -def worker_shutdown(state): +def worker_shutdown(state: TaskiqState): # NOTE: You need the type hint here state.db = None # raise Exception # NOTE: Any exception raised on worker shutdown is ignored diff --git a/silverback/__init__.py b/silverback/__init__.py index 5ee44f1c..56ddad86 100644 --- a/silverback/__init__.py +++ b/silverback/__init__.py @@ -1,10 +1,8 @@ from .application import SilverbackApp from .exceptions import CircuitBreaker, SilverbackException -from .types import WorkerState __all__ = [ "CircuitBreaker", "SilverbackApp", "SilverbackException", - "WorkerState", ] diff --git a/silverback/types.py b/silverback/types.py index 666c84f2..e58afca7 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -5,7 +5,6 @@ from pydantic import BaseModel, Field from pydantic.functional_serializers import PlainSerializer -from taskiq import Context, TaskiqDepends, TaskiqState from typing_extensions import Annotated @@ -52,13 +51,6 @@ class AppState(BaseModel): last_updated: UTCTimestamp = Field(default_factory=utc_now) -def get_worker_state(context: Annotated[Context, TaskiqDepends()]) -> TaskiqState: - return context.state - - -WorkerState = Annotated[TaskiqState, TaskiqDepends(get_worker_state)] - - class _BaseDatapoint(BaseModel): type: str # discriminator From a0e2a3eed1a0722ddef30469bde78be5df174d2c Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 17:13:12 -0400 Subject: [PATCH 22/28] refactor!: move application state from Recorder to new state datastore --- example.py | 6 ++-- silverback/__init__.py | 2 ++ silverback/application.py | 2 -- silverback/recorder.py | 33 +++++---------------- silverback/runner.py | 28 +++++++++++------- silverback/state.py | 62 +++++++++++++++++++++++++++++++++++++++ silverback/types.py | 14 +-------- 7 files changed, 93 insertions(+), 54 deletions(-) create mode 100644 silverback/state.py diff --git a/example.py b/example.py index ce4b6bee..f772e519 100644 --- a/example.py +++ b/example.py @@ -6,7 +6,7 @@ from ape_tokens import tokens # type: ignore[import] from taskiq import Context, TaskiqDepends, TaskiqState -from silverback import CircuitBreaker, SilverbackApp +from silverback import AppState, CircuitBreaker, SilverbackApp # Do this first to initialize your app app = SilverbackApp() @@ -17,11 +17,11 @@ @app.on_startup() -def app_startup(): +def app_startup(startup_state: AppState): # NOTE: This is called just as the app is put into "run" state, # and handled by the first available worker # raise Exception # NOTE: Any exception raised on startup aborts immediately - return {"block_number": app.state.last_block_seen} + return {"block_number": startup_state.last_block_seen} # Can handle some resource initialization for each worker, like LLMs or database connections diff --git a/silverback/__init__.py b/silverback/__init__.py index 56ddad86..43b3c961 100644 --- a/silverback/__init__.py +++ b/silverback/__init__.py @@ -1,7 +1,9 @@ from .application import SilverbackApp from .exceptions import CircuitBreaker, SilverbackException +from .state import AppState __all__ = [ + "AppState", "CircuitBreaker", "SilverbackApp", "SilverbackException", diff --git a/silverback/application.py b/silverback/application.py index e5ac3c06..e80c45a2 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -88,8 +88,6 @@ def __init__(self, settings: Settings | None = None): f"{signer_str}{start_block_str}{new_block_timeout_str}" ) - self.state = None # Runner manages this - def broker_task_decorator( self, task_type: TaskType, diff --git a/silverback/recorder.py b/silverback/recorder.py index 78d21e93..0b8d2018 100644 --- a/silverback/recorder.py +++ b/silverback/recorder.py @@ -8,7 +8,6 @@ from typing_extensions import Self # Introduced 3.11 from .types import ( - AppState, Datapoint, ScalarDatapoint, ScalarType, @@ -96,10 +95,7 @@ def from_taskiq( class BaseRecorder(ABC): """ - Base class used for managing persistent application state, and serializing task results - to an external data recording process. - - NOTE: Persistent state and task results can be managed using two different solutions + Base class used for serializing task results to an external data recording process. Recorders are configured using the following environment variable: @@ -107,17 +103,11 @@ class BaseRecorder(ABC): """ @abstractmethod - async def init(self, app_id: SilverbackID) -> AppState | None: + async def init(self, app_id: SilverbackID): """ Handle any async initialization from Silverback settings (e.g. migrations). - - Returns startup state, if available. """ - @abstractmethod - async def set_state(self, app_state: AppState): - """Set the stored state for a Silverback instance""" - @abstractmethod async def add_result(self, result: TaskResult): """Store a result for a Silverback instance's handler""" @@ -125,9 +115,8 @@ async def add_result(self, result: TaskResult): class JSONLineRecorder(BaseRecorder): """ - Very basic implementation of BaseRecorder used to store application state and handler - result data by storing/retreiving state from a JSON-encoded file, and appending task - results to a file containing newline-separated JSON entries (https://jsonlines.org/). + Very basic implementation of BaseRecorder used to handle results by appending to a file + containing newline-separated JSON entries (https://jsonlines.org/). The file structure that this Recorder uses leverages the value of `SILVERBACK_APP_NAME` as well as the configured network to determine the location where files get saved: @@ -135,7 +124,6 @@ class JSONLineRecorder(BaseRecorder): ./.silverback-sessions/ / / - state.json # always write here session-.json # start time of each app session Each app "session" (everytime the Runner is started up via `silverback run`) is recorded @@ -158,22 +146,14 @@ class JSONLineRecorder(BaseRecorder): - `SILVERBACK_APP_NAME`: Any alphabetical string valid as a folder name """ - async def init(self, app_id: SilverbackID) -> AppState | None: + async def init(self, app_id: SilverbackID): data_folder = ( Path.cwd() / ".silverback-sessions" / app_id.name / app_id.ecosystem / app_id.network ) data_folder.mkdir(parents=True, exist_ok=True) - self.state_backup_file = data_folder / "state.json" self.session_results_file = data_folder / f"session-{iso_format(utc_now())}.jsonl" - return ( - AppState.parse_file(self.state_backup_file) if self.state_backup_file.exists() else None - ) - - async def set_state(self, state: AppState): - self.state_backup_file.write_text(state.model_dump_json()) - async def add_result(self, result: TaskResult): # NOTE: mode `a` means "append to file if exists" # NOTE: JSONNL convention requires the use of `\n` as newline char @@ -183,6 +163,9 @@ async def add_result(self, result: TaskResult): def get_metrics(session: Path, task_name: str) -> Iterator[dict]: + """ + Useful function for fetching results and loading them for display. + """ with open(session, "r") as file: for line in file: if ( diff --git a/silverback/runner.py b/silverback/runner.py index 0a1c700a..72bf9920 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -11,8 +11,9 @@ from .application import SilverbackApp from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure from .recorder import BaseRecorder, TaskResult +from .state import AppDatastore, AppState from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .types import AppState, SilverbackID, TaskType +from .types import SilverbackID, TaskType from .utils import async_wrap_iter, hexbytes_dict @@ -27,6 +28,8 @@ def __init__( ): self.app = app self.recorder = recorder + self.state = None + self.datastore = AppDatastore() self.max_exceptions = max_exceptions self.exceptions = 0 @@ -55,23 +58,23 @@ async def _checkpoint( last_block_processed: int | None = None, ): """Set latest checkpoint block number""" - assert self.app.state, f"{self.__class__.__name__}.run() not triggered." + assert self.state, f"{self.__class__.__name__}.run() not triggered." logger.debug( ( - f"Checkpoint block [seen={self.app.state.last_block_seen}, " - f"procssed={self.app.state.last_block_processed}]" + f"Checkpoint block [seen={self.state.last_block_seen}, " + f"procssed={self.state.last_block_processed}]" ) ) if last_block_seen: - self.app.state.last_block_seen = last_block_seen + self.state.last_block_seen = last_block_seen if last_block_processed: - self.app.state.last_block_processed = last_block_processed + self.state.last_block_processed = last_block_processed if self.recorder: try: - await self.recorder.set_state(self.app.state) + await self.datastore.set_state(self.state) except Exception as err: logger.error(f"Error setting state: {err}") @@ -104,18 +107,21 @@ async def run(self): If there are no configured tasks to execute. """ # Initialize recorder (if available) and fetch state if app has been run previously - if self.recorder and (startup_state := (await self.recorder.init(app_id=self.app.identifier))): - self.app.state = startup_state + if self.recorder: + await self.recorder.init(app_id=self.app.identifier) + + if startup_state := (await self.datastore.init(app_id=self.app.identifier)): + self.state = startup_state else: # use empty state - self.app.state = AppState(last_block_seen=-1, last_block_processed=-1) + self.state = AppState(last_block_seen=-1, last_block_processed=-1) # Initialize broker (run worker startup events) await self.app.broker.startup() # Execute Silverback startup task before we init the rest if startup_tasks := await asyncio.gather( - *(task_def.handler.kiq() for task_def in self.app.tasks[TaskType.STARTUP]) + *(task_def.handler.kiq(self.state) for task_def in self.app.tasks[TaskType.STARTUP]) ): results = await asyncio.gather( *(startup_task.wait_result() for startup_task in startup_tasks) diff --git a/silverback/state.py b/silverback/state.py new file mode 100644 index 00000000..36e059a2 --- /dev/null +++ b/silverback/state.py @@ -0,0 +1,62 @@ +from pathlib import Path + +from pydantic import BaseModel, Field + +from .types import SilverbackID, UTCTimestamp, utc_now + + +class AppState(BaseModel): + # Last block number seen by runner + last_block_seen: int + + # Last block number processed by a worker + last_block_processed: int + + # Last time the state was updated + # NOTE: intended to use default when creating a model with this type + last_updated: UTCTimestamp = Field(default_factory=utc_now) + + +class AppDatastore: + """ + Very basic implementation used to store application state and handler result data by + storing/retreiving state from a JSON-encoded file. + + The file structure that this Recorder uses leverages the value of `SILVERBACK_APP_NAME` + as well as the configured network to determine the location where files get saved: + + ./.silverback-sessions/ + / + / + state.json # always write here + + Note that this format can be read by basic means (even in a JS frontend): + + You may also want to give your app a unique name so the data does not get overwritten, + if you are using multiple apps from the same directory: + + - `SILVERBACK_APP_NAME`: Any alphabetical string valid as a folder name + """ + + async def init(self, app_id: SilverbackID) -> AppState | None: + data_folder = ( + Path.cwd() / ".silverback-sessions" / app_id.name / app_id.ecosystem / app_id.network + ) + data_folder.mkdir(parents=True, exist_ok=True) + + self.state_backup_file = data_folder / "state.json" + + return ( + AppState.parse_file(self.state_backup_file) if self.state_backup_file.exists() else None + ) + + async def set_state(self, state: AppState): + if self.state_backup_file.exists(): + old_state = AppState.parse_file(self.state_backup_file) + if old_state.last_block_seen > state.last_block_seen: + state.last_block_seen = old_state.last_block_seen + if old_state.last_block_processed > state.last_block_processed: + state.last_block_processed = old_state.last_block_processed + + state.last_updated = utc_now() + self.state_backup_file.write_text(state.model_dump_json()) diff --git a/silverback/types.py b/silverback/types.py index e58afca7..0844da6c 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -3,7 +3,7 @@ from enum import Enum # NOTE: `enum.StrEnum` only in Python 3.11+ from typing import Literal -from pydantic import BaseModel, Field +from pydantic import BaseModel from pydantic.functional_serializers import PlainSerializer from typing_extensions import Annotated @@ -39,18 +39,6 @@ def utc_now() -> datetime: ] -class AppState(BaseModel): - # Last block number seen by runner - last_block_seen: int - - # Last block number processed by a worker - last_block_processed: int - - # Last time the state was updated - # NOTE: intended to use default when creating a model with this type - last_updated: UTCTimestamp = Field(default_factory=utc_now) - - class _BaseDatapoint(BaseModel): type: str # discriminator From 143fad2ffdee44441f3ea5993b2c521b0e9a91df Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 17:16:08 -0400 Subject: [PATCH 23/28] fix: revert back to .pop bug fix --- silverback/middlewares.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 4b2348e0..6a755e50 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -56,9 +56,11 @@ def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage: # NOTE: Ensure we always have this, no matter what message.labels["task_name"] = message.task_name - if not (task_type_str := message.labels.pop("task_type")): + if "task_type" not in message.labels: return message # Not a silverback task + task_type_str = message.labels.pop("task_type") + try: task_type = TaskType(task_type_str) except ValueError: From 8ae71df0cd2f9f372166bd0582f36dc9d6cea862 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 17:20:23 -0400 Subject: [PATCH 24/28] fix: move extra quotes for event signatures to middleware --- silverback/application.py | 3 +-- silverback/middlewares.py | 5 ++++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/silverback/application.py b/silverback/application.py index e80c45a2..aeebfd62 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -134,8 +134,7 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: # Address is almost a certainty if the container is being used as a filter here. if contract_address := getattr(container.contract, "address", None): labels["contract_address"] = contract_address - # NOTE: event signature is a string with spaces/commas, so encapsulate it in quotes - labels["event_signature"] = f'"{container.abi.signature}"' + labels["event_signature"] = f"{container.abi.signature}" broker_task = self.broker.register_task( handler, diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 6a755e50..7db17ab5 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -45,7 +45,10 @@ def fix_dict(data: dict, recurse_count: int = 0) -> dict: def _create_label(self, message: TaskiqMessage) -> str: if labels_str := ",".join( - f"{k}={v}" for k, v in message.labels.items() if k != "task_name" + # NOTE: Have to add extra quotes around event signatures so they display as a string + f"{k}={v}" if k != "event_signature" else f'{k}="{v}"' + for k, v in message.labels.items() + if k != "task_name" ): return f"{message.task_name}[{labels_str}]" From b4d7970b784cf212785b5e8d90e261f86fc4931a Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 17:22:37 -0400 Subject: [PATCH 25/28] refactor: rename variable for clarity --- silverback/application.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/silverback/application.py b/silverback/application.py index aeebfd62..81167ff3 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -46,9 +46,9 @@ def __init__(self, settings: Settings | None = None): if not settings: settings = Settings() - network = settings.get_provider_context() + provider_context = settings.get_provider_context() # NOTE: This allows using connected ape methods e.g. `Contract` - provider = network.__enter__() + provider = provider_context.__enter__() self.identifier = SilverbackID( name=settings.APP_NAME, @@ -70,7 +70,7 @@ def __init__(self, settings: Settings | None = None): self.tasks: defaultdict[TaskType, list[TaskData]] = defaultdict(list) self.poll_settings: dict[str, dict] = {} - atexit.register(network.__exit__, None, None, None) + atexit.register(provider_context.__exit__, None, None, None) self.signer = settings.get_signer() self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT From b6468bfd2d3c463a9946d03fcbb02f382edb1a09 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 17:26:46 -0400 Subject: [PATCH 26/28] fix: unused import --- silverback/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/silverback/runner.py b/silverback/runner.py index 72bf9920..d571a574 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -13,7 +13,7 @@ from .recorder import BaseRecorder, TaskResult from .state import AppDatastore, AppState from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .types import SilverbackID, TaskType +from .types import TaskType from .utils import async_wrap_iter, hexbytes_dict From f8d37139dee7c3c860f19792f3caa52acdba6e5a Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Thu, 2 May 2024 17:29:55 -0400 Subject: [PATCH 27/28] fix: was using app.state on shutdown --- example.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example.py b/example.py index f772e519..23039a7b 100644 --- a/example.py +++ b/example.py @@ -74,7 +74,7 @@ async def exec_event2(log: ContractLog): @app.on_shutdown() def app_shutdown(): # raise Exception # NOTE: Any exception raised on shutdown is ignored - return {"block_number": app.state.last_block_processed} + return {"some_metric": 123} # Just in case you need to release some resources or something inside each worker From 17302cb8a4e09b1232c41e7142fada0f1bf8f301 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Tue, 16 Apr 2024 23:28:18 -0400 Subject: [PATCH 28/28] fix: constrain integer values to support maximum parquet type --- silverback/types.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/silverback/types.py b/silverback/types.py index 0844da6c..6448b72c 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -3,7 +3,7 @@ from enum import Enum # NOTE: `enum.StrEnum` only in Python 3.11+ from typing import Literal -from pydantic import BaseModel +from pydantic import BaseModel, Field from pydantic.functional_serializers import PlainSerializer from typing_extensions import Annotated @@ -43,8 +43,10 @@ class _BaseDatapoint(BaseModel): type: str # discriminator +# NOTE: Maximum supported parquet integer type: https://parquet.apache.org/docs/file-format/types +Int96 = Annotated[int, Field(ge=-(2**95), le=2**95 - 1)] # NOTE: only these types of data are implicitly converted e.g. `{"something": 1, "else": 0.001}` -ScalarType = bool | int | float | Decimal +ScalarType = bool | Int96 | float | Decimal class ScalarDatapoint(_BaseDatapoint):