From b8643dc3815e7f1a6467b867fa2458527f88afb3 Mon Sep 17 00:00:00 2001 From: John0n1 Date: Sun, 2 Feb 2025 02:17:57 +0100 Subject: [PATCH] refactor: enhance main core functionality with improved memory monitoring and component health checks --- python/main_core.py | 178 +++++++++++++++++++++++++------------------- 1 file changed, 102 insertions(+), 76 deletions(-) diff --git a/python/main_core.py b/python/main_core.py index 98e5b49..aa49573 100644 --- a/python/main_core.py +++ b/python/main_core.py @@ -66,7 +66,23 @@ def format(self, record): logger = logger.getLogger(__name__) -MEMORY_CHECK_INTERVAL = 300 +# Constants +MEMORY_CHECK_INTERVAL = 300 # seconds +MIN_ETH_BALANCE = 0.01 # minimum ETH balance threshold +PROVIDER_TIMEOUT = 10 # seconds +WEB3_MAX_RETRIES = 3 +WEB3_RETRY_DELAY = 2 + +# Custom types +ComponentType = Union[ + API_Config, + Nonce_Core, + Safety_Net, + Market_Monitor, + Mempool_Monitor, + Transaction_Core, + Strategy_Net +] class Main_Core: """ @@ -74,9 +90,6 @@ class Main_Core: managing connections, and orchestrating the main execution loop. """ - WEB3_MAX_RETRIES: int = 3 - WEB3_RETRY_DELAY: int = 2 - def __init__(self, configuration: "Configuration") -> None: """ Initialize the main application core. @@ -84,13 +97,14 @@ def __init__(self, configuration: "Configuration") -> None: Args: configuration: Configuration object containing settings. """ - # Take first memory snapshot after initialization - self.memory_snapshot: tracemalloc.Snapshot = tracemalloc.take_snapshot() - self.configuration: "Configuration" = configuration + tracemalloc.start() + self.memory_snapshot = tracemalloc.take_snapshot() + self.configuration = configuration self.web3: Optional[AsyncWeb3] = None self.account: Optional[Account] = None self.running: bool = False - self.components: Dict[str, Any] = { + self._shutdown_event: asyncio.Event = asyncio.Event() + self.components: Dict[str, Optional[ComponentType]] = { 'api_config': None, 'nonce_core': None, 'safety_net': None, @@ -99,7 +113,8 @@ def __init__(self, configuration: "Configuration") -> None: 'transaction_core': None, 'strategy_net': None, } - logger.info("initializing 0xBuilder...") + self._component_health: Dict[str, bool] = {name: False for name in self.components} + logger.info("Initializing 0xBuilder...") async def _initialize_components(self) -> None: """Initialize all components in the correct dependency order.""" @@ -346,19 +361,20 @@ async def _add_middleware(self, web3: AsyncWeb3) -> None: raise async def _check_account_balance(self) -> None: - """Check the Ethereum account balance.""" + """Check account balance with improved error handling.""" + if not self.account or not self.web3: + raise ValueError("Account or Web3 not initialized") + try: - if not self.account: - raise ValueError("Account not initialized") - balance = await self.web3.eth.get_balance(self.account.address) balance_eth = self.web3.from_wei(balance, 'ether') - logger.debug(f"Account {self.account.address} initialized ") + logger.info(f"Account {self.account.address[:8]}...{self.account.address[-6:]}") logger.info(f"Balance: {balance_eth:.4f} ETH") - if balance_eth < 0.01: - logger.warning(f"Low account balance (<0.01 ETH)") + if balance_eth < MIN_ETH_BALANCE: + logger.warning(f"Critical: Low account balance ({balance_eth:.4f} ETH)") + # Could add automatic safety measures here except Exception as e: logger.error(f"Balance check failed: {e}") @@ -387,104 +403,114 @@ async def _load_abi(self, abi_path: str, abi_registry: "ABI_Registry") -> List[D raise async def run(self) -> None: - """Main execution loop with task management.""" + """Enhanced main execution loop with component health monitoring.""" logger.info("Starting 0xBuilder...") self.running = True + initial_snapshot = tracemalloc.take_snapshot() try: - if not self.components['mempool_monitor']: - raise RuntimeError("Mempool monitor not properly initialized") - - # Take initial memory snapshot - initial_snapshot = tracemalloc.take_snapshot() - last_memory_check = time.time() - MEMORY_CHECK_INTERVAL = 300 - - # Create task groups for different operations async with asyncio.TaskGroup() as tg: - # Start monitoring task - monitoring_task = tg.create_task( - self.components['mempool_monitor'].start_monitoring() - ) - - # Start processing task - processing_task = tg.create_task( - self._process_profitable_transactions() - ) - - # Start memory monitoring task - memory_task = tg.create_task( - self._monitor_memory(initial_snapshot) - ) + tg.create_task(self.components['mempool_monitor'].start_monitoring()) + tg.create_task(self._process_profitable_transactions()) + tg.create_task(self._monitor_memory(initial_snapshot)) + tg.create_task(self._check_component_health()) except* asyncio.CancelledError: - logger.info("Tasks cancelled during shutdown") + logger.info("Graceful shutdown initiated") except* Exception as e: logger.error(f"Fatal error in run loop: {e}") finally: await self.stop() async def _monitor_memory(self, initial_snapshot: tracemalloc.Snapshot) -> None: - """Separate task for memory monitoring.""" + """Enhanced memory monitoring with leak detection.""" + last_snapshot = initial_snapshot while self.running: try: current_snapshot = tracemalloc.take_snapshot() - top_stats = current_snapshot.compare_to(initial_snapshot, 'lineno') - - logger.debug("Memory allocation changes:") - for stat in top_stats[:3]: - logger.debug(str(stat)) + diff_stats = current_snapshot.compare_to(last_snapshot, 'lineno') - await asyncio.sleep(MEMORY_CHECK_INTERVAL) # Check every 5 minutes + # Log significant memory changes + significant_changes = [stat for stat in diff_stats if abs(stat.size_diff) > 1024 * 1024] # > 1MB + if significant_changes: + logger.warning("Significant memory changes detected:") + for stat in significant_changes[:3]: + logger.warning(str(stat)) + + last_snapshot = current_snapshot + await asyncio.sleep(MEMORY_CHECK_INTERVAL) except asyncio.CancelledError: break except Exception as e: - logger.error(f"Error in memory monitoring: {e}") - + logger.error(f"Memory monitoring error: {e}") + + async def _check_component_health(self) -> None: + """Periodic health check of all components.""" + while self.running: + try: + for name, component in self.components.items(): + if component and hasattr(component, 'is_healthy'): + self._component_health[name] = await component.is_healthy() + else: + self._component_health[name] = component is not None + + if not all(self._component_health.values()): + unhealthy = [name for name, healthy in self._component_health.items() if not healthy] + logger.warning(f"Unhealthy components detected: {unhealthy}") + + await asyncio.sleep(60) # Check every minute + except Exception as e: + logger.error(f"Health check error: {e}") + await asyncio.sleep(5) # Back off on error + async def stop(self) -> None: - """Gracefully stop all components in the correct order.""" - logger.warning("Shutting down Core...") + """Enhanced graceful shutdown.""" + if not self.running: + return + + logger.warning("Initiating graceful shutdown...") self.running = False + self._shutdown_event.set() try: - shutdown_order = [ - 'mempool_monitor', # Stop monitoring first - 'strategy_net', # Stop strategies - 'transaction_core', # Stop transactions - 'market_monitor', # Stop market monitoring - 'safety_net', # Stop safety checks - 'nonce_core', # Stop nonce management - 'api_config' # Stop API connections last - ] - - # Stop components in parallel where possible + # Stop components in parallel with timeout stop_tasks = [] - for component_name in shutdown_order: - component = self.components.get(component_name) + for name, component in self.components.items(): if component and hasattr(component, 'stop'): - stop_tasks.append(self._stop_component(component_name, component)) + task = asyncio.create_task(self._stop_component(name, component)) + stop_tasks.append(task) if stop_tasks: - await asyncio.gather(*stop_tasks, return_exceptions=True) + done, pending = await asyncio.wait(stop_tasks, timeout=10) + if pending: + logger.warning(f"Forcing {len(pending)} component(s) to stop") + for task in pending: + task.cancel() - # Clean up web3 connection + # Cleanup web3 if self.web3 and hasattr(self.web3.provider, 'disconnect'): await self.web3.provider.disconnect() - - # Final memory snapshot + + # Final memory report + self._log_final_memory_stats() + + except Exception as e: + logger.error(f"Error during shutdown: {e}") + finally: + tracemalloc.stop() + logger.info("Shutdown complete") + + def _log_final_memory_stats(self) -> None: + """Log final memory statistics.""" + try: final_snapshot = tracemalloc.take_snapshot() top_stats = final_snapshot.compare_to(self.memory_snapshot, 'lineno') logger.debug("Final memory allocation changes:") for stat in top_stats[:5]: logger.debug(str(stat)) - - logger.debug("Core shutdown complete.") - except Exception as e: - logger.error(f"Error during shutdown: {e}") - finally: - tracemalloc.stop() + logger.error(f"Error logging final memory stats: {e}") async def _stop_component(self, name: str, component: Any) -> None: """Stop a single component with error handling."""