From 3c68a31ad73b579474503c414cea07da6dfd0bff Mon Sep 17 00:00:00 2001 From: Kostiantyn Smyrnov Date: Fri, 8 Mar 2024 12:06:04 +0100 Subject: [PATCH] =?UTF-8?q?fix:=20=F0=9F=90=9B=20Fixed=20contract=20events?= =?UTF-8?q?=20subscription=20fromBlock=20management?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/contracts-manger/src/index.ts | 61 +++++++++++++++----------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/packages/contracts-manger/src/index.ts b/packages/contracts-manger/src/index.ts index 95d75f7..4e71283 100644 --- a/packages/contracts-manger/src/index.ts +++ b/packages/contracts-manger/src/index.ts @@ -705,10 +705,10 @@ export class ProtocolContracts< * @param {(logs: GetFilterLogsReturnType) => void} onLogs Callback to execute when logs are received. * @param {bigint} [fromBlock] The block number from which to start listening for events. * @param {number} [pollInterval=1000] The interval in milliseconds at which to poll for new events. - * @returns {Promise<() => void>} A promise that resolves to an unsubscribe function. + * @returns {() => void} A promise that resolves to an unsubscribe function. * @private */ - private async subscribeToEvents< + private subscribeToEvents< // eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents const TAbi extends Abi | readonly unknown[] = Abi, TEventName extends string | undefined = undefined, @@ -717,27 +717,28 @@ export class ProtocolContracts< address: Address, eventName: InferEventName, // eslint-disable-next-line @typescript-eslint/no-explicit-any - onLogs: (logs: any) => void, - fromBlock?: bigint, + onLogs: (logs: any, maxBlockNumber: bigint) => void, + fromBlock: bigint = 0n, pollInterval: number = 1000, - ): Promise<() => void> { - let blockNumber = await this.publicClient.getBlockNumber(); - let isUnsubscribed = false; + ): () => void { let timeoutId: NodeJS.Timeout; - - // Adjust starting block number if fromBlock is provided and valid - if (fromBlock && fromBlock < blockNumber) { - blockNumber = fromBlock; - } + let isUnsubscribed = false; // Function to fetch and process logs const getLogs = async () => { if (isUnsubscribed) return; + const blockNumber = await this.publicClient.getBlockNumber(); + + // Adjust starting block number if fromBlock is provided + if (fromBlock > blockNumber) { + fromBlock = blockNumber; + } + const filter = await this.publicClient.createContractEventFilter({ abi, address, - fromBlock: blockNumber, + fromBlock, strict: true, eventName: eventName, } as unknown as FilterOptions); @@ -745,10 +746,18 @@ export class ProtocolContracts< const logs = await this.publicClient.getFilterLogs({ filter }); if (logs.length > 0) { - onLogs(logs); - // Update the block number to the next after the last log's block - const bn = logs[logs.length - 1].blockNumber; - blockNumber = (bn !== null ? bn : BigInt(0)) + BigInt(1); + const maxBlockNumber = logs.reduce( + (max, log) => + log.blockNumber !== null && log.blockNumber > max + ? log.blockNumber + : max, + 0n, + ); + + if (maxBlockNumber > fromBlock) { + fromBlock = maxBlockNumber + 1n; + onLogs(logs, maxBlockNumber); + } } if (!isUnsubscribed) { @@ -776,14 +785,14 @@ export class ProtocolContracts< * @param {(logs: GetFilterLogsReturnType) => void} onLogs Callback for when logs are received. * @param {bigint} [fromBlock] Starting block number for listening for events. * @param {number} [pollInterval=1000] Polling interval in milliseconds. - * @returns {Promise<() => void>} Unsubscribe function. + * @returns {() => void} Unsubscribe function. */ - async subscribeMarket( + subscribeMarket( eventName: InferEventName, onLogs: (logs: GetFilterLogsReturnType) => void, fromBlock?: bigint, pollInterval: number = 1000, - ): Promise<() => void> { + ): () => void { return this.subscribeToEvents( marketABI, this.contracts['market'].address, @@ -802,14 +811,14 @@ export class ProtocolContracts< * @param {(logs: GetFilterLogsReturnType) => void} onLogs Callback for when logs are received. * @param {bigint} [fromBlock] Starting block number for listening for events. * @param {number} [pollInterval=1000] Polling interval in milliseconds. - * @returns {Promise<() => void>} Unsubscribe function. + * @returns {() => void} Unsubscribe function. */ - async subscribeEntities( + subscribeEntities( eventName: InferEventName, onLogs: (logs: GetFilterLogsReturnType) => void, fromBlock?: bigint, pollInterval: number = 1000, - ): Promise<() => void> { + ): () => void { return this.subscribeToEvents( entitiesRegistryABI, this.contracts['entities'].address, @@ -828,14 +837,14 @@ export class ProtocolContracts< * @param {(logs: GetFilterLogsReturnType) => void} onLogs Callback for when logs are received. * @param {bigint} [fromBlock] Starting block number for listening for events. * @param {number} [pollInterval=1000] Polling interval in milliseconds. - * @returns {Promise<() => void>} Unsubscribe function. + * @returns {() => void} Unsubscribe function. */ - async subscribeConfig( + subscribeConfig( eventName: InferEventName, onLogs: (logs: GetFilterLogsReturnType) => void, fromBlock?: bigint, pollInterval: number = 1000, - ): Promise<() => void> { + ): () => void { return this.subscribeToEvents( configABI, this.contracts['config'].address,