Skip to content

Commit

Permalink
feat: add hub pool events indexing (#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
amateima authored Sep 3, 2024
1 parent 630cde5 commit ac2c6c3
Show file tree
Hide file tree
Showing 16 changed files with 1,638 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ REFERRAL_DELIMITER_START_TIMESTAMP=1657290720
ENABLE_SPOKE_POOLS_EVENTS_PROCESSING=false
# enable fetching MerkleDistributor events from the contracts
ENABLE_MERKLE_DISTRIBUTOR_EVENTS_PROCESSING=false
# enable fetching HubPool events from the contract
ENABLE_HUB_POOL_EVENTS_PROCESSING=false
# enable the refresh of the referrals materialized view.
ENABLE_REFERRALS_MATERIALIZED_VIEW_REFRESH=false
# specify the strategy used for updating sticky referrals. Valid options: queue | cron | disable
Expand Down
14 changes: 14 additions & 0 deletions migrations/1720909029359-HubPoolProcessedBlock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class HubPoolProcessedBlock1720909029359 implements MigrationInterface {
name = 'HubPoolProcessedBlock1720909029359';

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE TABLE "hub_pool_processed_block" ("id" SERIAL NOT NULL, "chainId" integer NOT NULL, "latestBlock" integer NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "PK_5ee19e9910bd8cac851b865d51a" PRIMARY KEY ("id"))`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE "hub_pool_processed_block"`);
}

}
32 changes: 32 additions & 0 deletions migrations/1721045885663-SetPoolRebalanceRouteEvent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class SetPoolRebalanceRouteEvent1721045885663 implements MigrationInterface {
name = "SetPoolRebalanceRouteEvent1721045885663";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE "events"."set_pool_rebalance_route_event" (
"id" SERIAL NOT NULL,
"blockNumber" integer NOT NULL,
"blockHash" character varying NOT NULL,
"transactionIndex" integer NOT NULL,
"address" character varying NOT NULL,
"chainId" integer NOT NULL,
"transactionHash" character varying NOT NULL,
"logIndex" integer NOT NULL,
"destinationChainId" integer NOT NULL,
"l1Token" character varying NOT NULL,
"destinationToken" character varying NOT NULL,
"createdAt" TIMESTAMP NOT NULL DEFAULT now(),
"updatedAt" TIMESTAMP NOT NULL DEFAULT now(),
CONSTRAINT "UK_sprre_transactionHash_logIndex" UNIQUE ("transactionHash", "logIndex"),
CONSTRAINT "PK_1acd4d3e93afc21bfedc3d99892" PRIMARY KEY ("id"))
`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`DROP TABLE "events"."set_pool_rebalance_route_event"`,
);
}
}
10 changes: 10 additions & 0 deletions src/modules/configuration/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import EthereumSpokePool2_5Abi from "../web3/services/abi/EthereumSpokePool2_5.j
import ArbitrumSpokePool2_5Abi from "../web3/services/abi/ArbitrumSpokePool2_5.json";
import PolygonSpokePool2_5Abi from "../web3/services/abi/PolygonSpokePool2_5.json";
import OptimismSpokePool2_5Abi from "../web3/services/abi/OptimismSpokePool2_5.json";
import HubPoolAbi from "../web3/services/abi/HubPool.json";

import { AcrossContractsVersion } from "../web3/model/across-version";

export enum StickyReferralAddressesMechanism {
Expand Down Expand Up @@ -82,6 +84,13 @@ export const configValues = () => ({
690: process.env.WEB3_NODE_URL_690,
7777777: process.env.WEB3_NODE_URL_7777777,
},
hubPoolContracts: {
[ChainIds.mainnet]: {
address: "0xc186fA914353c44b2E33eBE05f21846F1048bEda",
startBlockNumber: 14819537,
abi: JSON.stringify(HubPoolAbi),
},
},
spokePoolContracts: {
[ChainIds.mainnet]: [
{
Expand Down Expand Up @@ -426,6 +435,7 @@ export const configValues = () => ({
? process.env.SPOKE_POOLS_EVENTS_PROCESSING_CHAIN_IDS.split(",").map((chainId) => parseInt(chainId))
: [ChainIds.mainnet, ChainIds.optimism, ChainIds.arbitrum, ChainIds.polygon],
enableMerkleDistributorEventsProcessing: process.env.ENABLE_MERKLE_DISTRIBUTOR_EVENTS_PROCESSING === "true",
enableHubPoolEventsProcessing: process.env.ENABLE_HUB_POOL_EVENTS_PROCESSING === "true",
enableReferralsMaterializedViewRefresh: process.env.ENABLE_REFERRALS_MATERIALIZED_VIEW_REFRESH === "true",
allowWalletRewardsEdit: process.env.ALLOW_WALLET_REWARDS_EDIT === "true",
stickyReferralAddressesMechanism: process.env.STICKY_REFERRAL_ADDRESSES_MECHANISM
Expand Down
4 changes: 4 additions & 0 deletions src/modules/database/database.providers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import { MerkleDistributorClaim } from "../airdrop/model/merkle-distributor-clai
import { DepositGapCheck } from "../scraper/model/DepositGapCheck.entity";
import { ArbReward } from "../rewards/model/arb-reward.entity";
import { FindMissedFillEventJob } from "../scraper/model/FindMissedFillEventJob.entity";
import { HubPoolProcessedBlock } from "../scraper/model/HubPoolProcessedBlock.entity";
import { SetPoolRebalanceRouteEvent } from "../web3/model/SetPoolRebalanceRouteEvent.entity";

// TODO: Add db entities here
const entities = [
Expand Down Expand Up @@ -62,6 +64,8 @@ const entities = [
DepositGapCheck,
ArbReward,
FindMissedFillEventJob,
HubPoolProcessedBlock,
SetPoolRebalanceRouteEvent,
];

@Injectable()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { OnQueueFailed, Process, Processor } from "@nestjs/bull";
import { Logger } from "@nestjs/common";
import { Job } from "bull";
import { DataSource } from "typeorm";

import { EthProvidersService } from "../../../web3/services/EthProvidersService";
import { HubPoolBlocksEventsQueueMessage, ScraperQueue } from ".";
import { AppConfig } from "../../../configuration/configuration.service";
import { SetPoolRebalanceRoute } from "../../../web3/model/hubpool-events";
import { SetPoolRebalanceRouteEvent } from "../../../web3/model/SetPoolRebalanceRouteEvent.entity";

@Processor(ScraperQueue.HubPoolBlocksEvents)
export class HubPoolBlocksEventsConsumer {
private logger = new Logger(HubPoolBlocksEventsConsumer.name);

constructor(
private providers: EthProvidersService,
private appConfig: AppConfig,
private dataSource: DataSource,
) {}

@Process({ concurrency: 1 })
private async process(job: Job<HubPoolBlocksEventsQueueMessage>) {
const { chainId, from, to } = job.data;
const { address } = this.appConfig.values.web3.hubPoolContracts[chainId];
const setPoolRebalanceRouteEvents = (await this.providers
.getHubPoolEventQuerier(chainId, address)
.getSetPoolRebalanceRouteEvents(from, to)) as SetPoolRebalanceRoute[];
this.logger.log(`(${from}, ${to}) - chainId ${chainId} - found ${setPoolRebalanceRouteEvents.length} SetPoolRebalanceRouteEvent`);

if (setPoolRebalanceRouteEvents.length > 0) {
for (const event of setPoolRebalanceRouteEvents) {
await this.dataSource
.createQueryBuilder()
.insert()
.into(SetPoolRebalanceRouteEvent)
.values({
blockNumber: event.blockNumber,
blockHash: event.blockHash,
transactionIndex: event.transactionIndex,
address: event.address,
chainId,
transactionHash: event.transactionHash,
logIndex: event.logIndex,
destinationChainId: event.args.destinationChainId.toNumber(),
l1Token: event.args.l1Token,
destinationToken: event.args.destinationToken,
})
.orIgnore()
.execute();
}
}
}

@OnQueueFailed()
private onQueueFailed(job: Job, error: Error) {
this.logger.error(`${JSON.stringify(job.data)} failed: ${error}`);
}
}
7 changes: 7 additions & 0 deletions src/modules/scraper/adapter/messaging/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export enum ScraperQueue {
ArbRebateReward = "ArbRebateReward",
MerkleDistributorClaim = "MerkleDistributorClaim",
FindMissedFillEvent = "FindMissedFillEvent",
HubPoolBlocksEvents = "HubPoolBlocksEvents",
}

export type BlocksEventsQueueMessage = {
Expand All @@ -33,6 +34,12 @@ export type MerkleDistributorBlocksEventsQueueMessage = {
to: number;
};

export type HubPoolBlocksEventsQueueMessage = {
chainId: number;
from: number;
to: number;
};

export type FillEventsQueueMessage = {
realizedLpFeePct: string;
originChainId: number;
Expand Down
16 changes: 16 additions & 0 deletions src/modules/scraper/model/HubPoolProcessedBlock.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Column, CreateDateColumn, Entity, PrimaryGeneratedColumn } from "typeorm";

@Entity()
export class HubPoolProcessedBlock {
@PrimaryGeneratedColumn()
id: number;

@Column()
chainId: number;

@Column()
latestBlock: number;

@CreateDateColumn()
createdAt: Date;
}
7 changes: 7 additions & 0 deletions src/modules/scraper/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ import { FindMissedFillEventConsumer } from "./adapter/messaging/FindMissedFillE
import { FindMissedFillEventJob } from "./model/FindMissedFillEventJob.entity";
import { Block } from "../web3/model/block.entity";
import { ArbReward } from "../rewards/model/arb-reward.entity";
import { HubPoolBlocksEventsConsumer } from "./adapter/messaging/HubPoolBlocksEventsConsumer";
import { HubPoolProcessedBlock } from "./model/HubPoolProcessedBlock.entity";

@Module({})
export class ScraperModule {
Expand All @@ -78,6 +80,7 @@ export class ScraperModule {
BlocksEventsConsumer,
MerkleDistributorBlocksEventsConsumer,
MerkleDistributorBlocksEventsConsumerV2,
HubPoolBlocksEventsConsumer,
FillEventsConsumer,
FillEventsConsumer2,
FillEventsV3Consumer,
Expand Down Expand Up @@ -121,6 +124,7 @@ export class ScraperModule {
DepositGapCheck,
FindMissedFillEventJob,
Block,
HubPoolProcessedBlock,
]),
MarketPriceModule.forRoot(),
HttpModule,
Expand Down Expand Up @@ -219,6 +223,9 @@ export class ScraperModule {
BullModule.registerQueue({
name: ScraperQueue.FindMissedFillEvent,
}),
BullModule.registerQueue({
name: ScraperQueue.HubPoolBlocksEvents,
}),
],
exports: [ScraperQueuesService],
controllers: [ScraperController],
Expand Down
36 changes: 36 additions & 0 deletions src/modules/scraper/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
BlocksEventsQueueMessage,
DepositFilledDateQueueMessage,
FeeBreakdownQueueMessage,
HubPoolBlocksEventsQueueMessage,
MerkleDistributorBlocksEventsQueueMessage,
ScraperQueue,
} from "./adapter/messaging";
Expand All @@ -24,6 +25,7 @@ import {
RetryIncompleteDepositsBody,
} from "./entry-point/http/dto";
import { Deposit } from "../deposit/model/deposit.entity";
import { HubPoolProcessedBlock } from "./model/HubPoolProcessedBlock.entity";

const SPOKE_POOL_VERIFIER_CONTRACT_ADDRESS = "0x269727F088F16E1Aea52Cf5a97B1CD41DAA3f02D";

Expand All @@ -40,6 +42,8 @@ export class ScraperService {
private depositRepository: Repository<Deposit>,
@InjectRepository(MerkleDistributorProcessedBlock)
private merkleDistributorProcessedBlockRepository: Repository<MerkleDistributorProcessedBlock>,
@InjectRepository(HubPoolProcessedBlock)
private hubPoolProcessedBlockRepository: Repository<HubPoolProcessedBlock>,
private scraperQueuesService: ScraperQueuesService,
private dataSource: DataSource,
) {
Expand All @@ -59,6 +63,10 @@ export class ScraperService {
this.publishMerkleDistributorBlocksV2(merkleDistributorConfig, 60);
}
}

if (this.appConfig.values.enableHubPoolEventsProcessing) {
this.publishHubPoolBlocks(30);
}
}

public async publishBlocks(chainId: number, secondsInterval: number) {
Expand Down Expand Up @@ -123,6 +131,34 @@ export class ScraperService {
}
}

public async publishHubPoolBlocks(interval: number) {
while (true) {
try {
const chainId = ChainIds.mainnet;
const blockNumber = await this.providers.getProvider(chainId).getBlockNumber();
const configStartBlockNumber = this.appConfig.values.web3.hubPoolContracts[chainId].startBlockNumber;
const range = await this.determineBlockRange(
chainId,
blockNumber,
configStartBlockNumber,
this.hubPoolProcessedBlockRepository,
true,
);

if (!!range) {
const queueMsg = { chainId, ...range };
await this.scraperQueuesService.publishMessage<HubPoolBlocksEventsQueueMessage>(
ScraperQueue.HubPoolBlocksEvents,
queueMsg,
);
}
} catch (error) {
this.logger.error(error);
}
await wait(interval);
}
}

/**
* Compute the start and the end of the next batch of blocks that needs to be processed.
* `from` is computed depending on the latest block saved in DB || start block number defined in config file || 1
Expand Down
2 changes: 2 additions & 0 deletions src/modules/scraper/service/ScraperQueuesService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class ScraperQueuesService {
@InjectQueue(ScraperQueue.ArbRebateReward) private arbRebateRewardQueue: Queue,
@InjectQueue(ScraperQueue.MerkleDistributorClaim) private merkleDistributorClaimQueue: Queue,
@InjectQueue(ScraperQueue.FindMissedFillEvent) private findMissedFillEventQueue: Queue,
@InjectQueue(ScraperQueue.HubPoolBlocksEvents) private hubPoolBlocksEventsQueue: Queue,
) {
this.queuesMap = {
[ScraperQueue.BlocksEvents]: this.blocksEventsQueue,
Expand All @@ -52,6 +53,7 @@ export class ScraperQueuesService {
[ScraperQueue.ArbRebateReward]: this.arbRebateRewardQueue,
[ScraperQueue.MerkleDistributorClaim]: this.merkleDistributorClaimQueue,
[ScraperQueue.FindMissedFillEvent]: this.findMissedFillEventQueue,
[ScraperQueue.HubPoolBlocksEvents]: this.hubPoolBlocksEventsQueue,
};
}

Expand Down
46 changes: 46 additions & 0 deletions src/modules/web3/model/SetPoolRebalanceRouteEvent.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Column, CreateDateColumn, Entity, PrimaryGeneratedColumn, Unique, UpdateDateColumn } from "typeorm";

@Entity({
schema: "events",
})
@Unique("UK_sprre_transactionHash_logIndex", ["transactionHash", "logIndex"])
export class SetPoolRebalanceRouteEvent {
@PrimaryGeneratedColumn()
id: number;

@Column()
blockNumber: number;

@Column()
blockHash: string;

@Column()
transactionIndex: number;

@Column()
address: string;

@Column()
chainId: number;

@Column()
transactionHash: string;

@Column()
logIndex: number;

@Column()
destinationChainId: number;

@Column()
l1Token: string;

@Column()
destinationToken: string;

@CreateDateColumn()
createdAt: Date;

@UpdateDateColumn()
updatedAt: Date;
}
13 changes: 13 additions & 0 deletions src/modules/web3/model/hubpool-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { BigNumber, Event } from "ethers";

export interface SetPoolRebalanceRoute extends Event {
args: [
BigNumber,
string,
string,
] & {
destinationChainId: BigNumber;
l1Token: string;
destinationToken: string;
}
}
Loading

0 comments on commit ac2c6c3

Please sign in to comment.