diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts
index 1862daa6bea..b4d79ec20a3 100644
--- a/yarn-project/foundation/src/config/env_var.ts
+++ b/yarn-project/foundation/src/config/env_var.ts
@@ -100,6 +100,7 @@ export type EnvVar =
   | 'P2P_TX_PROTOCOL'
   | 'P2P_UDP_ANNOUNCE_ADDR'
   | 'P2P_UDP_LISTEN_ADDR'
+  | 'P2P_ARCHIVED_TX_LIMIT'
   | 'PEER_ID_PRIVATE_KEY'
   | 'PROVER_BLOB_SINK_URL'
   | 'PROOF_VERIFIER_L1_START_BLOCK'
diff --git a/yarn-project/p2p/src/client/factory.ts b/yarn-project/p2p/src/client/factory.ts
index e29c4b95206..954b584a91e 100644
--- a/yarn-project/p2p/src/client/factory.ts
+++ b/yarn-project/p2p/src/client/factory.ts
@@ -44,9 +44,10 @@ export const createP2PClient = async (
   let config = { ..._config };
   const logger = createLogger('p2p');
   const store = deps.store ?? (await createStore('p2p', config, createLogger('p2p:lmdb')));
+  const archive = await createStore('p2p-archive', config, createLogger('p2p-archive:lmdb'));
 
   const mempools: MemPools = {
-    txPool: deps.txPool ?? new AztecKVTxPool(store, telemetry),
+    txPool: deps.txPool ?? new AztecKVTxPool(store, archive, telemetry, config.archivedTxLimit),
     epochProofQuotePool: deps.epochProofQuotePool ?? new MemoryEpochProofQuotePool(telemetry),
     attestationPool:
       clientType === P2PClientType.Full
diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts
index 44010f75b07..de8b85cb2b8 100644
--- a/yarn-project/p2p/src/client/p2p_client.ts
+++ b/yarn-project/p2p/src/client/p2p_client.ts
@@ -135,6 +135,13 @@ export type P2P = P2PApi & {
    */
   getTxByHash(txHash: TxHash): Promise<Tx | undefined>;
 
+  /**
+   * Returns an archived transaction from the transaction pool by its hash.
+   * @param txHash - Hash of tx to return.
+   * @returns A single tx or undefined.
+   */
+  getArchivedTxByHash(txHash: TxHash): Promise<Tx | undefined>;
+
   /**
    * Returns whether the given tx hash is flagged as pending or mined.
    * @param txHash - Hash of the tx to query.
@@ -523,6 +530,15 @@ export class P2PClient
     return this.requestTxByHash(txHash);
   }
 
+  /**
+   * Returns an archived transaction in the transaction pool by its hash.
+   * @param txHash - Hash of the archived transaction to look for.
+   * @returns A single tx or undefined.
+   */
+  getArchivedTxByHash(txHash: TxHash): Promise<Tx | undefined> {
+    return Promise.resolve(this.txPool.getArchivedTxByHash(txHash));
+  }
+
   /**
    * Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers.
    * @param tx - The tx to verify.
diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts
index 5f5ea19258a..1835dc946a2 100644
--- a/yarn-project/p2p/src/config.ts
+++ b/yarn-project/p2p/src/config.ts
@@ -154,6 +154,9 @@ export interface P2PConfig extends P2PReqRespConfig {
    * The chain id of the L1 chain.
    */
   l1ChainId: number;
+
+  /** Limit of transactions to archive in the tx pool. Once the archived tx limit is reached, the oldest archived txs will be purged. */
+  archivedTxLimit: number;
 }
 
 export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
@@ -305,6 +308,12 @@ export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
     description: 'The number of blocks to fetch in a single batch.',
     ...numberConfigHelper(20),
   },
+  archivedTxLimit: {
+    env: 'P2P_ARCHIVED_TX_LIMIT',
+    description:
+      'The number of transactions that will be archived. If the limit is set to 0 then archiving will be disabled.',
+    ...numberConfigHelper(0),
+  },
   ...p2pReqRespConfigMappings,
 };
 
diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts
index bc9f91329e3..ea584401412 100644
--- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts
+++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts
@@ -1,3 +1,4 @@
+import { mockTx } from '@aztec/circuit-types';
 import { openTmpStore } from '@aztec/kv-store/lmdb';
 
 import { AztecKVTxPool } from './aztec_kv_tx_pool.js';
@@ -6,8 +7,39 @@ import { describeTxPool } from './tx_pool_test_suite.js';
 describe('KV TX pool', () => {
   let txPool: AztecKVTxPool;
   beforeEach(() => {
-    txPool = new AztecKVTxPool(openTmpStore());
+    txPool = new AztecKVTxPool(openTmpStore(), openTmpStore());
   });
 
   describeTxPool(() => txPool);
+
+  it('Returns archived txs and purges archived txs once the archived tx limit is reached', async () => {
+    // set the archived tx limit to 2
+    txPool = new AztecKVTxPool(openTmpStore(), openTmpStore(), undefined, 2);
+
+    const tx1 = mockTx(1);
+    const tx2 = mockTx(2);
+    const tx3 = mockTx(3);
+    const tx4 = mockTx(4);
+    const tx5 = mockTx(5);
+    await txPool.addTxs([tx1, tx2, tx3, tx4, tx5]);
+
+    // delete two txs and assert that they are properly archived
+    await txPool.deleteTxs([tx1.getTxHash(), tx2.getTxHash()]);
+    expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toEqual(tx1);
+    expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2);
+
+    // delete a single tx and assert that the first tx is purged and the new tx is archived
+    await txPool.deleteTxs([tx3.getTxHash()]);
+    expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined();
+    expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2);
+    expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3);
+
+    // delete multiple txs and assert that the old txs are purged and the new txs are archived
+    await txPool.deleteTxs([tx4.getTxHash(), tx5.getTxHash()]);
+    expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined();
+    expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toBeUndefined();
+    expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toBeUndefined();
+    expect(txPool.getArchivedTxByHash(tx4.getTxHash())).toEqual(tx4);
+    expect(txPool.getArchivedTxByHash(tx5.getTxHash())).toEqual(tx5);
+  });
 });
diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
index da6ca6d526b..ad7e4981fcb 100644
--- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
+++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
@@ -1,5 +1,6 @@
 import { Tx, TxHash } from '@aztec/circuit-types';
 import { type TxAddedToPoolStats } from '@aztec/circuit-types/stats';
+import { ClientIvcProof } from '@aztec/circuits.js';
 import { type Logger, createLogger } from '@aztec/foundation/log';
 import { type AztecKVStore, type AztecMap, type AztecMultiMap } from '@aztec/kv-store';
 import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';
@@ -9,7 +10,7 @@ import { getPendingTxPriority } from './priority.js';
 import { type TxPool } from './tx_pool.js';
 
 /**
- * In-memory implementation of the Transaction Pool.
+ * KV implementation of the Transaction Pool.
  */
 export class AztecKVTxPool implements TxPool {
   #store: AztecKVStore;
@@ -23,25 +24,47 @@ export class AztecKVTxPool implements TxPool {
   /** Index from tx priority (stored as hex) to its tx hash, filtered by pending txs. */
   #pendingTxPriorityToHash: AztecMultiMap<string, string>;
 
+  /** KV store for archived txs. */
+  #archive: AztecKVStore;
+
+  /** Archived txs map for future lookup. */
+  #archivedTxs: AztecMap<string, Buffer>;
+
+  /** Indexes of the archived txs by insertion order. */
+  #archivedTxIndices: AztecMap<number, string>;
+
+  /** Number of txs to archive. */
+  #archivedTxLimit: number;
+
   #log: Logger;
 
   #metrics: PoolInstrumentation<Tx>;
 
   /**
-   * Class constructor for in-memory TxPool. Initiates our transaction pool as a JS Map.
-   * @param store - A KV store.
+   * Class constructor for KV TxPool. Initiates our transaction pool as an AztecMap.
+   * @param store - A KV store for live txs in the pool.
+   * @param archive - A KV store for archived txs.
+   * @param telemetry - A telemetry client.
+   * @param archivedTxLimit - The number of txs to archive.
    * @param log - A logger.
    */
   constructor(
     store: AztecKVStore,
+    archive: AztecKVStore,
     telemetry: TelemetryClient = getTelemetryClient(),
+    archivedTxLimit: number = 0,
     log = createLogger('p2p:tx_pool'),
   ) {
     this.#txs = store.openMap('txs');
     this.#minedTxHashToBlock = store.openMap('txHashToBlockMined');
     this.#pendingTxPriorityToHash = store.openMultiMap('pendingTxFeeToHash');
 
+    this.#archivedTxs = archive.openMap('archivedTxs');
+    this.#archivedTxIndices = archive.openMap('archivedTxIndices');
+    this.#archivedTxLimit = archivedTxLimit;
+
     this.#store = store;
+    this.#archive = archive;
     this.#log = log;
     this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize());
   }
@@ -129,6 +152,21 @@ export class AztecKVTxPool implements TxPool {
     return undefined;
   }
 
+  /**
+   * Checks if an archived tx exists and returns it.
+   * @param txHash - The tx hash.
+   * @returns The transaction metadata, if found, 'undefined' otherwise.
+   */
+  public getArchivedTxByHash(txHash: TxHash): Tx | undefined {
+    const buffer = this.#archivedTxs.get(txHash.toString());
+    if (buffer) {
+      const tx = Tx.fromBuffer(buffer);
+      tx.setTxHash(txHash);
+      return tx;
+    }
+    return undefined;
+  }
+
   /**
    * Adds a list of transactions to the pool. Duplicates are ignored.
    * @param txs - An array of txs to be added to the pool.
@@ -162,13 +200,14 @@ export class AztecKVTxPool implements TxPool {
   /**
    * Deletes transactions from the pool. Tx hashes that are not present are ignored.
    * @param txHashes - An array of tx hashes to be removed from the tx pool.
-   * @returns The number of transactions that was deleted from the pool.
+   * @returns Empty promise.
    */
   public deleteTxs(txHashes: TxHash[]): Promise<void> {
     let pendingDeleted = 0;
     let minedDeleted = 0;
 
-    return this.#store.transaction(() => {
+    const deletedTxs: Tx[] = [];
+    const poolDbTx = this.#store.transaction(() => {
       for (const hash of txHashes) {
         const key = hash.toString();
         const tx = this.getTxByHash(hash);
@@ -184,6 +223,10 @@ export class AztecKVTxPool implements TxPool {
           pendingDeleted++;
         }
 
+        if (this.#archivedTxLimit) {
+          deletedTxs.push(tx);
+        }
+
         void this.#txs.delete(key);
         void this.#minedTxHashToBlock.delete(key);
       }
@@ -192,6 +235,8 @@ export class AztecKVTxPool implements TxPool {
       this.#metrics.recordRemovedObjects(pendingDeleted, 'pending');
       this.#metrics.recordRemovedObjects(minedDeleted, 'mined');
     });
+
+    return this.#archivedTxLimit ? poolDbTx.then(() => this.archiveTxs(deletedTxs)) : poolDbTx;
   }
 
   /**
@@ -213,4 +258,41 @@ export class AztecKVTxPool implements TxPool {
   public getAllTxHashes(): TxHash[] {
     return Array.from(this.#txs.keys()).map(x => TxHash.fromString(x));
   }
+
+  /**
+   * Archives a list of txs for future reference. The number of archived txs is limited by the specified archivedTxLimit.
+   * @param txs - The list of transactions to archive.
+   * @returns Empty promise.
+   */
+  private archiveTxs(txs: Tx[]): Promise<void> {
+    return this.#archive.transaction(() => {
+      // calculate the head and tail indices of the archived txs by insertion order.
+      let headIdx = (this.#archivedTxIndices.entries({ limit: 1, reverse: true }).next().value?.[0] ?? -1) + 1;
+      let tailIdx = this.#archivedTxIndices.entries({ limit: 1 }).next().value?.[0] ?? 0;
+
+      for (const tx of txs) {
+        while (headIdx - tailIdx >= this.#archivedTxLimit) {
+          const txHash = this.#archivedTxIndices.get(tailIdx);
+          if (txHash) {
+            void this.#archivedTxs.delete(txHash);
+            void this.#archivedTxIndices.delete(tailIdx);
+          }
+          tailIdx++;
+        }
+
+        const archivedTx: Tx = new Tx(
+          tx.data,
+          ClientIvcProof.empty(),
+          tx.unencryptedLogs,
+          tx.contractClassLogs,
+          tx.enqueuedPublicFunctionCalls,
+          tx.publicTeardownFunctionCall,
+        );
+        const txHash = tx.getTxHash().toString();
+        void this.#archivedTxs.set(txHash, archivedTx.toBuffer());
+        void this.#archivedTxIndices.set(headIdx, txHash);
+        headIdx++;
+      }
+    });
+  }
 }
diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts
index 7d0d87df675..2fc985a6a1f 100644
--- a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts
+++ b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts
@@ -100,6 +100,10 @@ export class InMemoryTxPool implements TxPool {
     return result === undefined ? undefined : Tx.clone(result);
   }
 
+  public getArchivedTxByHash(): Tx | undefined {
+    return undefined;
+  }
+
   /**
    * Adds a list of transactions to the pool. Duplicates are ignored.
    * @param txs - An array of txs to be added to the pool.
diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts
index 173565c8293..6ad69b3c7de 100644
--- a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts
+++ b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts
@@ -17,6 +17,13 @@ export interface TxPool {
    */
   getTxByHash(txHash: TxHash): Tx | undefined;
 
+  /**
+   * Checks if an archived transaction exists in the pool and returns it.
+   * @param txHash - The hash of the transaction, used as an ID.
+   * @returns The transaction, if found, 'undefined' otherwise.
+   */
+  getArchivedTxByHash(txHash: TxHash): Tx | undefined;
+
   /**
    * Marks the set of txs as mined, as opposed to pending.
    * @param txHashes - Hashes of the txs to flag as mined.
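
Usage sketch (not part of the patch): the archive acts as a bounded FIFO keyed by insertion order, and archived txs are stored with an empty ClientIvcProof. A minimal illustration of the new constructor and lookup, mirroring the test above and reusing its openTmpStore/mockTx helpers; on a running node the limit instead comes from the P2P_ARCHIVED_TX_LIMIT env var (default 0, which disables archiving).

import { mockTx } from '@aztec/circuit-types';
import { openTmpStore } from '@aztec/kv-store/lmdb';

import { AztecKVTxPool } from './aztec_kv_tx_pool.js';

// Live txs and archived txs live in separate KV stores; keep at most 2 archived txs.
const pool = new AztecKVTxPool(openTmpStore(), openTmpStore(), undefined, 2);

const tx = mockTx(1);
await pool.addTxs([tx]);

// Deleting a tx copies it (proof stripped) into the archive, where it can still be looked up.
await pool.deleteTxs([tx.getTxHash()]);
const archived = pool.getArchivedTxByHash(tx.getTxHash()); // Tx | undefined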