Skip to content

Commit

Permalink
merge with main and resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
Xm0onh committed Feb 5, 2025
2 parents d085515 + d5e140a commit d5c36c2
Show file tree
Hide file tree
Showing 48 changed files with 1,680 additions and 534 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
characters/*
!characters/character.example/
.cookies/
.google/
memories/
!src/config/
dsn-twitter-schemas.json
agent-memory-viewer/backend/src/config/agents.yaml

# Python bytecode
__pycache__/
Expand Down
1 change: 0 additions & 1 deletion agent-memory-viewer/backend/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ DATABASE_URL=postgresql://username:password@localhost:5432/
CORS_ORIGIN=http://localhost:3002,http://localhost:3000

# Blockchain Configuration
CONTRACT_ADDRESS=0x0000000000000000000000000000000000000000
RPC_URL=https://auto-evm.taurus.autonomys.xyz/ws
WS_RPC_URL=wss://auto-evm.taurus.autonomys.xyz/ws

Expand Down
1 change: 1 addition & 0 deletions agent-memory-viewer/backend/src/config/agents.example.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
contractAddress: "<contract-address>"
agents:
- username: "0xargumint"
address: "0x518EbE66287140e9378b9F8D00797291A8dfc2bc"
Expand Down
8 changes: 8 additions & 0 deletions agent-memory-viewer/backend/src/config/agents.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
contractAddress: "0xC1afEbE677baDb71FC760e61479227e43B422E48"
agents:
- username: "0xargumint"
address: "0x518EbE66287140e9378b9F8D00797291A8dfc2bc"
- username: "0xagreemint"
address: "0x53Dd4b9627eb9691D62B90dDD987a8c8DFC99a12"
- username: "hindsight2157"
address: "0xf9c4db01387701c7fA19B5bEbd6716b4F9E66aBA"
15 changes: 11 additions & 4 deletions agent-memory-viewer/backend/src/config/env.validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ const agentConfigSchema = z.object({
address: z.string().regex(/^0x[a-fA-F0-9]{40}$/),
});

const agentsYamlSchema = z.object({
agents: z.array(agentConfigSchema),
contractAddress: z.string().regex(/^0x[a-fA-F0-9]{40}$/),
});

const envSchema = z.object({
NODE_ENV: z.enum(['development', 'production', 'test']).default('development'),
PORT: z.string().transform(Number).default('3010'),
Expand All @@ -37,13 +42,14 @@ const envSchema = z.object({

export type EnvConfig = z.infer<typeof envSchema>;
export type AgentConfig = z.infer<typeof agentConfigSchema>;
export type YamlConfig = z.infer<typeof agentsYamlSchema>;

function loadAgentsConfig(): AgentConfig[] {
function loadAgentsConfig(): { agents: AgentConfig[]; contractAddress: string } {
try {
const configPath = path.join(__dirname, './agents.yaml');
const fileContents = fs.readFileSync(configPath, 'utf8');
const config = yaml.load(fileContents) as { agents: AgentConfig[] };
return config.agents;
const config = yaml.load(fileContents) as YamlConfig;
return agentsYamlSchema.parse(config);
} catch (error) {
console.error('Failed to load agents config:', error);
throw new Error('Failed to load agents configuration');
Expand All @@ -52,11 +58,12 @@ function loadAgentsConfig(): AgentConfig[] {

export function validateEnv(): EnvConfig {
try {
const agents = loadAgentsConfig();
const { agents, contractAddress } = loadAgentsConfig();

return envSchema.parse({
...process.env,
AGENTS: agents,
CONTRACT_ADDRESS: contractAddress,
});
} catch (error) {
if (error instanceof z.ZodError) {
Expand Down
261 changes: 233 additions & 28 deletions agent-memory-viewer/backend/src/utils/agentMemoryContract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,251 @@ import { createLogger } from './logger.js';
const logger = createLogger('agentMemoryContract');

const CONTRACT_ADDRESS = config.CONTRACT_ADDRESS as `0x${string}`;
const wsProvider = new ethers.WebSocketProvider(config.WS_RPC_URL);
const wallet = ethers.Wallet.createRandom(wsProvider);
const contract = new Contract(CONTRACT_ADDRESS, MEMORY_ABI, wallet);
const RECONNECT_DELAY = 5000;

function hashToCid(hash: Uint8Array): string {
const cid = cidFromBlakeHash(Buffer.from(hash));
return cid.toString();
}
class AgentWatcher {
private provider!: ethers.WebSocketProvider;
private contract!: Contract;
private isWatching: boolean = false;
private retryCount: number = 0;
private reconnectTimeout?: NodeJS.Timeout;
private healthCheckInterval?: NodeJS.Timeout;
private processingQueue: Array<{
agent: string;
hash: string;
retryCount: number;
nextRetry?: Date;
}> = [];
private isProcessing: boolean = false;
private readonly MAX_PROCESSING_RETRIES = 3;
private readonly RETRY_DELAY = 5000;
private readonly HEALTH_CHECK_INTERVAL = 30000;

export async function getLastMemoryHash(agentAddress: string): Promise<string> {
const hash = await contract.getLastMemoryHash(agentAddress);
return hashToCid(ethers.getBytes(hash));
}
constructor(
private readonly agentAddress: string,
private readonly agentName: string,
private readonly processMemory: (agent: string, cid: string) => Promise<void>,
) {
this.initializeConnection();
}

private async initializeConnection() {
this.provider = new ethers.WebSocketProvider(config.WS_RPC_URL);
const wallet = ethers.Wallet.createRandom(this.provider);
this.contract = new Contract(CONTRACT_ADDRESS, MEMORY_ABI, wallet);

this.provider.websocket.onerror = this.handleConnectionError.bind(this);
this.startHealthCheck();
}

private startHealthCheck() {
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
}

this.healthCheckInterval = setInterval(async () => {
try {
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('Health check timeout')), 5000);
});

const healthCheckPromise = this.contract.getLastMemoryHash(this.agentAddress);

await Promise.race([healthCheckPromise, timeoutPromise]);
logger.info(`Health check passed for agent ${this.agentName}`);
} catch (error) {
logger.warn(`Health check failed for agent ${this.agentName}, reconnecting...`, {
error: error instanceof Error ? error.message : String(error)
});
this.handleConnectionError(error as Error);
}
}, this.HEALTH_CHECK_INTERVAL);
}

export async function watchMemoryHashUpdates(callback: (agent: string, cid: string) => void) {
const agentAddresses = config.AGENTS.map(a => a.address.toLowerCase());
const eventName = 'LastMemoryHashSet';
private handleConnectionError(error: Error): void {
logger.error(`WebSocket error for agent ${this.agentName}:`, error);
this.reconnect();
}

logger.info('Setting up memory hash watcher with WebSocketProvider', {
agentAddresses,
contractAddress: config.CONTRACT_ADDRESS,
});
private async reconnect(): Promise<void> {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}

// Cap the retry count to prevent excessive delays while still maintaining exponential backoff
const MAX_RETRY_COUNT = 10;
const currentRetryCount = Math.min(this.retryCount, MAX_RETRY_COUNT);
const delay = Math.min(
RECONNECT_DELAY * Math.pow(2, currentRetryCount),
60000 // Max 1 minute delay
);

this.retryCount++;

logger.info(`Attempting to reconnect for agent ${this.agentName} (attempt ${this.retryCount}, delay: ${delay}ms)`);

this.reconnectTimeout = setTimeout(async () => {
try {
await this.stop();

// Clear any existing state
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
this.healthCheckInterval = undefined;
}

// Reinitialize everything
await this.initializeConnection();
await this.start();

this.retryCount = 0; // Reset retry count on successful reconnection
logger.info(`Successfully reconnected for agent ${this.agentName}`);
} catch (error) {
logger.error(`Reconnection failed for agent ${this.agentName}:`, error);
// Ensure we're in a clean state before trying again
await this.stop().catch(stopError =>
logger.error(`Error stopping during reconnection for ${this.agentName}:`, stopError)
);
this.reconnect(); // Schedule next reconnection attempt
}
}, delay);
}

private async processQueue(): Promise<void> {
if (this.isProcessing || this.processingQueue.length === 0) return;

this.isProcessing = true;
const now = new Date();

const listener = (agent: string, hash: string) => {
try {
const cid = hashToCid(ethers.getBytes(hash));
if (agentAddresses.includes(agent.toLowerCase())) {
callback(agent, cid);
for (let i = 0; i < this.processingQueue.length; i++) {
const item = this.processingQueue[i];

if (item.nextRetry && item.nextRetry > now) {
continue;
}

try {
const cid = hashToCid(ethers.getBytes(item.hash));
await this.processMemory(item.agent, cid);
this.processingQueue.splice(i, 1);
i--;
logger.info(`Processed memory for agent ${this.agentName}`, { cid });
} catch (error) {
item.retryCount++;
if (item.retryCount >= this.MAX_PROCESSING_RETRIES) {
logger.error(
`Failed to process memory after ${this.MAX_PROCESSING_RETRIES} attempts, dropping`,
{
agent: this.agentName,
error,
},
);
this.processingQueue.splice(i, 1);
i--;
} else {
item.nextRetry = new Date(
now.getTime() + this.RETRY_DELAY * Math.pow(2, item.retryCount),
);
logger.warn(`Failed to process memory, will retry later`, {
agent: this.agentName,
retryCount: item.retryCount,
nextRetry: item.nextRetry,
error,
});
}
}
}
} finally {
this.isProcessing = false;
if (this.processingQueue.length > 0) {
setTimeout(() => this.processQueue(), 1000);
}
}
}

async start(): Promise<void> {
if (this.isWatching) return;

try {
const eventName = 'LastMemoryHashSet';

const listener = (agent: string, hash: string) => {
if (agent.toLowerCase() === this.agentAddress.toLowerCase()) {
this.processingQueue.push({
agent,
hash,
retryCount: 0,
});
this.processQueue();
}
};

this.contract.on(eventName, listener);
this.isWatching = true;
logger.info(`Started watching memory updates for agent ${this.agentName}`);
} catch (error) {
logger.error('Error processing event data', { error });
logger.error(`Failed to start watcher for agent ${this.agentName}:`, error);
throw error;
}
};
}

contract.on(eventName, listener);
async stop(): Promise<void> {
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
}
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}
if (!this.isWatching) return;

logger.info(`Listening for ${eventName} events over WebSocket.`);
try {
this.contract.removeAllListeners();
await this.provider.destroy();
this.isWatching = false;
logger.info(`Stopped watching memory updates for agent ${this.agentName}`);
} catch (error) {
logger.error(`Error stopping watcher for agent ${this.agentName}:`, error);
}
}
}

function hashToCid(hash: Uint8Array): string {
const cid = cidFromBlakeHash(Buffer.from(hash));
return cid.toString();
}

export async function watchMemoryHashUpdates(
callback: (agent: string, cid: string) => Promise<void>,
): Promise<() => Promise<void>> {
const watchers = new Map<string, AgentWatcher>();

// Create a watcher for each agent
for (const agent of config.AGENTS) {
const watcher = new AgentWatcher(agent.address, agent.username, callback);
await watcher.start();
watchers.set(agent.address, watcher);
}

// Return cleanup function
return async () => {
contract.off(eventName, listener);
logger.info('Memory hash watcher stopped.');
for (const watcher of watchers.values()) {
await watcher.stop();
}
watchers.clear();
logger.info('All memory watchers stopped');
};
}

export async function getLastMemoryHash(agentAddress: string): Promise<string> {
const provider = new ethers.WebSocketProvider(config.WS_RPC_URL);
const wallet = ethers.Wallet.createRandom(provider);
const contract = new Contract(CONTRACT_ADDRESS, MEMORY_ABI, wallet);

try {
const hash = await contract.getLastMemoryHash(agentAddress);
return hashToCid(ethers.getBytes(hash));
} finally {
await provider.destroy();
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit d5c36c2

Please sign in to comment.