Skip to content

Commit

Permalink
refactor: enhance main core functionality with improved memory monitoring and component health checks
Browse files Browse the repository at this point in the history
  • Loading branch information
John0n1 committed Feb 2, 2025
1 parent f494767 commit b8643dc
Showing 1 changed file with 102 additions and 76 deletions.
178 changes: 102 additions & 76 deletions python/main_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,31 +66,45 @@ def format(self, record):

logger = logger.getLogger(__name__)

MEMORY_CHECK_INTERVAL = 300
# Constants
MEMORY_CHECK_INTERVAL = 300 # seconds
MIN_ETH_BALANCE = 0.01 # minimum ETH balance threshold
PROVIDER_TIMEOUT = 10 # seconds
WEB3_MAX_RETRIES = 3
WEB3_RETRY_DELAY = 2

# Custom types
ComponentType = Union[
API_Config,
Nonce_Core,
Safety_Net,
Market_Monitor,
Mempool_Monitor,
Transaction_Core,
Strategy_Net
]

class Main_Core:
"""
Builds and manages the entire MEV bot, initializing all components,
managing connections, and orchestrating the main execution loop.
"""

WEB3_MAX_RETRIES: int = 3
WEB3_RETRY_DELAY: int = 2

def __init__(self, configuration: "Configuration") -> None:
"""
Initialize the main application core.
Args:
configuration: Configuration object containing settings.
"""
# Take first memory snapshot after initialization
self.memory_snapshot: tracemalloc.Snapshot = tracemalloc.take_snapshot()
self.configuration: "Configuration" = configuration
tracemalloc.start()
self.memory_snapshot = tracemalloc.take_snapshot()
self.configuration = configuration
self.web3: Optional[AsyncWeb3] = None
self.account: Optional[Account] = None
self.running: bool = False
self.components: Dict[str, Any] = {
self._shutdown_event: asyncio.Event = asyncio.Event()
self.components: Dict[str, Optional[ComponentType]] = {
'api_config': None,
'nonce_core': None,
'safety_net': None,
Expand All @@ -99,7 +113,8 @@ def __init__(self, configuration: "Configuration") -> None:
'transaction_core': None,
'strategy_net': None,
}
logger.info("initializing 0xBuilder...")
self._component_health: Dict[str, bool] = {name: False for name in self.components}
logger.info("Initializing 0xBuilder...")

async def _initialize_components(self) -> None:
"""Initialize all components in the correct dependency order."""
Expand Down Expand Up @@ -346,19 +361,20 @@ async def _add_middleware(self, web3: AsyncWeb3) -> None:
raise

async def _check_account_balance(self) -> None:
"""Check the Ethereum account balance."""
"""Check account balance with improved error handling."""
if not self.account or not self.web3:
raise ValueError("Account or Web3 not initialized")

try:
if not self.account:
raise ValueError("Account not initialized")

balance = await self.web3.eth.get_balance(self.account.address)
balance_eth = self.web3.from_wei(balance, 'ether')

logger.debug(f"Account {self.account.address} initialized ")
logger.info(f"Account {self.account.address[:8]}...{self.account.address[-6:]}")
logger.info(f"Balance: {balance_eth:.4f} ETH")

if balance_eth < 0.01:
logger.warning(f"Low account balance (<0.01 ETH)")
if balance_eth < MIN_ETH_BALANCE:
logger.warning(f"Critical: Low account balance ({balance_eth:.4f} ETH)")
# Could add automatic safety measures here

except Exception as e:
logger.error(f"Balance check failed: {e}")
Expand Down Expand Up @@ -387,104 +403,114 @@ async def _load_abi(self, abi_path: str, abi_registry: "ABI_Registry") -> List[D
raise

async def run(self) -> None:
"""Main execution loop with task management."""
"""Enhanced main execution loop with component health monitoring."""
logger.info("Starting 0xBuilder...")
self.running = True
initial_snapshot = tracemalloc.take_snapshot()

try:
if not self.components['mempool_monitor']:
raise RuntimeError("Mempool monitor not properly initialized")

# Take initial memory snapshot
initial_snapshot = tracemalloc.take_snapshot()
last_memory_check = time.time()
MEMORY_CHECK_INTERVAL = 300

# Create task groups for different operations
async with asyncio.TaskGroup() as tg:
# Start monitoring task
monitoring_task = tg.create_task(
self.components['mempool_monitor'].start_monitoring()
)

# Start processing task
processing_task = tg.create_task(
self._process_profitable_transactions()
)

# Start memory monitoring task
memory_task = tg.create_task(
self._monitor_memory(initial_snapshot)
)
tg.create_task(self.components['mempool_monitor'].start_monitoring())
tg.create_task(self._process_profitable_transactions())
tg.create_task(self._monitor_memory(initial_snapshot))
tg.create_task(self._check_component_health())

except* asyncio.CancelledError:
logger.info("Tasks cancelled during shutdown")
logger.info("Graceful shutdown initiated")
except* Exception as e:
logger.error(f"Fatal error in run loop: {e}")
finally:
await self.stop()

async def _monitor_memory(self, initial_snapshot: tracemalloc.Snapshot) -> None:
"""Separate task for memory monitoring."""
"""Enhanced memory monitoring with leak detection."""
last_snapshot = initial_snapshot
while self.running:
try:
current_snapshot = tracemalloc.take_snapshot()
top_stats = current_snapshot.compare_to(initial_snapshot, 'lineno')

logger.debug("Memory allocation changes:")
for stat in top_stats[:3]:
logger.debug(str(stat))
diff_stats = current_snapshot.compare_to(last_snapshot, 'lineno')

await asyncio.sleep(MEMORY_CHECK_INTERVAL) # Check every 5 minutes
# Log significant memory changes
significant_changes = [stat for stat in diff_stats if abs(stat.size_diff) > 1024 * 1024] # > 1MB
if significant_changes:
logger.warning("Significant memory changes detected:")
for stat in significant_changes[:3]:
logger.warning(str(stat))

last_snapshot = current_snapshot
await asyncio.sleep(MEMORY_CHECK_INTERVAL)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in memory monitoring: {e}")

logger.error(f"Memory monitoring error: {e}")

async def _check_component_health(self) -> None:
    """Periodically probe every registered component and record its health.

    Runs for as long as ``self.running`` is true. On each sweep, every entry
    in ``self.components`` is evaluated and the outcome is stored in
    ``self._component_health`` under the same key:

    * a truthy component exposing ``is_healthy`` is probed by calling it;
      the result may be a plain value or an awaitable and is coerced to bool;
    * any other entry is considered healthy iff it is not ``None``.

    A probe that raises marks only that component as unhealthy and is
    logged; the remaining components are still checked in the same sweep.
    Task cancellation is not intercepted and propagates to the caller.
    """
    # Local import: only this method needs it, and it keeps the block
    # self-contained without touching the file's import section.
    import inspect

    check_interval = 60  # seconds between regular sweeps
    error_backoff = 5    # seconds to wait after an unexpected sweep failure

    while self.running:
        try:
            for name, component in self.components.items():
                if component and hasattr(component, 'is_healthy'):
                    try:
                        result = component.is_healthy()
                        # Accept both sync and async health probes.
                        if inspect.isawaitable(result):
                            result = await result
                        self._component_health[name] = bool(result)
                    except Exception as e:
                        # A failing probe must not leave a stale "healthy"
                        # flag behind, nor block the other components.
                        self._component_health[name] = False
                        logger.error(f"Health check failed for {name}: {e}")
                else:
                    self._component_health[name] = component is not None

            unhealthy = [
                name
                for name, healthy in self._component_health.items()
                if not healthy
            ]
            if unhealthy:
                logger.warning(f"Unhealthy components detected: {unhealthy}")

            await asyncio.sleep(check_interval)
        except Exception as e:
            logger.error(f"Health check error: {e}")
            await asyncio.sleep(error_backoff)  # Back off on error

async def stop(self) -> None:
"""Gracefully stop all components in the correct order."""
logger.warning("Shutting down Core...")
"""Enhanced graceful shutdown."""
if not self.running:
return

logger.warning("Initiating graceful shutdown...")
self.running = False
self._shutdown_event.set()

try:
shutdown_order = [
'mempool_monitor', # Stop monitoring first
'strategy_net', # Stop strategies
'transaction_core', # Stop transactions
'market_monitor', # Stop market monitoring
'safety_net', # Stop safety checks
'nonce_core', # Stop nonce management
'api_config' # Stop API connections last
]

# Stop components in parallel where possible
# Stop components in parallel with timeout
stop_tasks = []
for component_name in shutdown_order:
component = self.components.get(component_name)
for name, component in self.components.items():
if component and hasattr(component, 'stop'):
stop_tasks.append(self._stop_component(component_name, component))
task = asyncio.create_task(self._stop_component(name, component))
stop_tasks.append(task)

if stop_tasks:
await asyncio.gather(*stop_tasks, return_exceptions=True)
done, pending = await asyncio.wait(stop_tasks, timeout=10)
if pending:
logger.warning(f"Forcing {len(pending)} component(s) to stop")
for task in pending:
task.cancel()

# Clean up web3 connection
# Cleanup web3
if self.web3 and hasattr(self.web3.provider, 'disconnect'):
await self.web3.provider.disconnect()

# Final memory snapshot

# Final memory report
self._log_final_memory_stats()

except Exception as e:
logger.error(f"Error during shutdown: {e}")
finally:
tracemalloc.stop()
logger.info("Shutdown complete")

def _log_final_memory_stats(self) -> None:
"""Log final memory statistics."""
try:
final_snapshot = tracemalloc.take_snapshot()
top_stats = final_snapshot.compare_to(self.memory_snapshot, 'lineno')

logger.debug("Final memory allocation changes:")
for stat in top_stats[:5]:
logger.debug(str(stat))

logger.debug("Core shutdown complete.")

except Exception as e:
logger.error(f"Error during shutdown: {e}")
finally:
tracemalloc.stop()
logger.error(f"Error logging final memory stats: {e}")

async def _stop_component(self, name: str, component: Any) -> None:
"""Stop a single component with error handling."""
Expand Down

0 comments on commit b8643dc

Please sign in to comment.