Skip to content
This repository has been archived by the owner on Feb 8, 2025. It is now read-only.

Commit

Permalink
fix(api): correctly attach events to confirmed result
Browse files Browse the repository at this point in the history
  • Loading branch information
hbriese committed Aug 16, 2024
1 parent c56ada7 commit 85008cc
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 25 deletions.
35 changes: 27 additions & 8 deletions api/src/feat/events/events.service.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Injectable } from '@nestjs/common';
import { Chain } from 'chains';
import { Hex, UUID } from 'lib';
import { asUUID, Hex, UUID } from 'lib';
import { AbiEvent, encodeEventTopics, parseEventLogs, Log as ViemLog } from 'viem';
import { Receipt } from '../system-txs/confirmations.worker';
import { NetworksService } from '~/core/networks';
import { DatabaseService } from '~/core/database';
import e from '~/edgeql-js';

export type Log<
TAbiEvent extends AbiEvent | undefined = undefined,
Expand All @@ -27,7 +29,6 @@ export interface ProcessOptimisticParams {
export interface ProcessConfirmedParams {
logs: Log<undefined, true>[];
chain: Chain;
result?: UUID;
receipt?: Receipt;
}

Expand Down Expand Up @@ -56,7 +57,10 @@ export class EventsService {
optimisticAbi: AbiEvent[] = [];
confirmedAbi: AbiEvent[] = [];

constructor(private networks: NetworksService) {}
constructor(
private db: DatabaseService,
private networks: NetworksService,
) {}

onOptimistic<TAbiEvent extends AbiEvent>(
event: TAbiEvent,
Expand Down Expand Up @@ -105,15 +109,30 @@ export class EventsService {
);
}

async processConfirmed({ chain, logs, result, receipt }: ProcessConfirmedParams) {
async processConfirmed({ chain, logs, receipt }: ProcessConfirmedParams) {
const parsedLogs = parseEventLogs({ logs, abi: this.confirmedAbi, strict: true });

const getResult = async () => {
const id =
receipt &&
(await this.db.queryWith2(
{ hash: e.Bytes32 },
{ hash: receipt.transactionHash },
({ hash }) =>
e.select(e.SystemTx, () => ({ filter_single: { hash }, result: true })).result.id,
));

return (id && asUUID(id)) || null;
};

const network = this.networks.get(chain);
const blocks = await Promise.all(
[...new Set(logs.map((log) => log.blockNumber))].map((blockNumber) =>
const uniqueBlockNumbers = [...new Set(logs.map((log) => log.blockNumber))];
const [result, ...blocks] = await Promise.all([
getResult(),
...uniqueBlockNumbers.map((blockNumber) =>
network.getBlock({ blockNumber, includeTransactions: false }),
),
);
]);

await Promise.all(
parsedLogs
Expand All @@ -129,7 +148,7 @@ export class EventsService {
timestamp,
logIndex: log.logIndex,
log,
result: result ?? null,
result,
receipt,
}),
);
Expand Down
29 changes: 14 additions & 15 deletions api/src/feat/system-txs/confirmations.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class ConfirmationsWorker extends Worker<ConfirmationQueue> {

async process(job: TypedJob<ConfirmationQueue>) {
const { chain } = job.data;
const transaction = isHex(job.data.transaction)
const hash = isHex(job.data.transaction)
? job.data.transaction
: await (async () => {
const v =
Expand All @@ -49,24 +49,23 @@ export class ConfirmationsWorker extends Worker<ConfirmationQueue> {

return isHex(v) ? v : undefined;
})();
if (!transaction) return;
if (!hash) return;

await job.updateData({ ...job.data, transaction });
await job.updateData({ ...job.data, transaction: hash });

const network = this.networks.get(chain);
const receipt = await network.waitForTransactionReceipt({
hash: transaction,
const receipt = await this.networks.get(chain).waitForTransactionReceipt({
hash,
timeout: 60_000,
pollingInterval: 1_000,
pollingInterval: 500,
});

await Promise.all([
...this.listeners.map((listener) => listener({ chain, receipt })),
this.events.processConfirmed({
chain,
logs: receipt.logs as unknown as Log<AbiEvent, true>[],
receipt,
}),
]);
// Execute listeners prior to events to ensure result is available
await Promise.all(this.listeners.map((listener) => listener({ chain, receipt })));

await this.events.processConfirmed({
chain,
logs: receipt.logs as unknown as Log<AbiEvent, true>[],
receipt,
});
}
}
2 changes: 0 additions & 2 deletions api/src/feat/transfers/transfers.events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { EventsService, OptimisticEvent, ConfirmedEvent } from '../events/events
import { and, DatabaseService } from '~/core/database';
import e from '~/edgeql-js';
import { selectAccount } from '../accounts/accounts.util';
import { NetworksService } from '~/core/networks/networks.service';
import { uuid } from 'edgedb/dist/codecs/ifaces';
import { EventPayload, PubsubService } from '~/core/pubsub/pubsub.service';
import { getAbiItem } from 'viem';
Expand Down Expand Up @@ -34,7 +33,6 @@ export class TransfersEvents {
constructor(
private db: DatabaseService,
private events: EventsService,
private networks: NetworksService,
private pubsub: PubsubService,
private accountsCache: AccountsCacheService,
private expo: ExpoService,
Expand Down

0 comments on commit 85008cc

Please sign in to comment.