From aa851f4e7ea77aa38cb1e4161cb16d78acf83976 Mon Sep 17 00:00:00 2001 From: Tate Date: Wed, 12 Feb 2025 09:17:59 +0000 Subject: [PATCH 1/9] multi chain rewind service --- packages/node-core/src/db/db.module.ts | 14 + packages/node-core/src/db/sync-helper.test.ts | 4 + packages/node-core/src/db/sync-helper.ts | 40 +++ packages/node-core/src/events.ts | 10 + packages/node-core/src/indexer/core.module.ts | 2 + .../src/indexer/entities/GlobalData.entity.ts | 53 +++ .../node-core/src/indexer/entities/index.ts | 1 + .../node-core/src/indexer/fetch.service.ts | 25 +- packages/node-core/src/indexer/index.ts | 1 + .../src/indexer/multiChainRewind.service.ts | 339 ++++++++++++++++++ .../node-core/src/indexer/project.service.ts | 86 +++-- .../node-core/src/indexer/store.service.ts | 50 ++- .../src/subcommands/reindex.service.ts | 34 +- packages/node-core/src/utils/reindex.ts | 84 +++-- .../node/src/subcommands/reindex.module.ts | 2 + 15 files changed, 649 insertions(+), 96 deletions(-) create mode 100644 packages/node-core/src/indexer/entities/GlobalData.entity.ts create mode 100644 packages/node-core/src/indexer/multiChainRewind.service.ts diff --git a/packages/node-core/src/db/db.module.ts b/packages/node-core/src/db/db.module.ts index 4c91a071fa..acdaeecb1c 100644 --- a/packages/node-core/src/db/db.module.ts +++ b/packages/node-core/src/db/db.module.ts @@ -3,6 +3,7 @@ import {DynamicModule, Global} from '@nestjs/common'; import {Sequelize, Options as SequelizeOption} from '@subql/x-sequelize'; +import {PoolConfig} from 'pg'; import {NodeConfig} from '../configure/NodeConfig'; import {getLogger} from '../logger'; import {exitWithError} from '../process'; @@ -90,6 +91,19 @@ export async function establishNewSequelize(nodeConfig: NodeConfig): Promise { ); }, 10_000); }); + + describe('rewind lock', () => { + // TODO + }); }); diff --git a/packages/node-core/src/db/sync-helper.ts b/packages/node-core/src/db/sync-helper.ts index bc3b6ac744..b40ded6a2a 100644 --- a/packages/node-core/src/db/sync-helper.ts +++ b/packages/node-core/src/db/sync-helper.ts @@ -16,6 +16,8 @@ import { Utils, } from '@subql/x-sequelize'; import {ModelAttributeColumnReferencesOptions, ModelIndexesOptions} from '@subql/x-sequelize/types/model'; +import {MultiChainRewindEvent} from '../events'; +import {RewindLockKey} from '../indexer'; import {EnumType} from '../utils'; import {formatAttributes, generateIndexName, modelToTableName} from './sequelizeUtil'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -297,6 +299,44 @@ export function createSchemaTriggerFunction(schema: string): string { $$ LANGUAGE plpgsql;`; } +export function createRewindTrigger(schema: string): string { + const triggerName = hashName(schema, 'rewind_trigger', '_global'); + + return ` + CREATE TRIGGER "${triggerName}" + AFTER INSERT OR UPDATE OR DELETE + ON "${schema}"."_global" + FOR EACH ROW + WHEN ( new.key = '${RewindLockKey}') + EXECUTE FUNCTION "${schema}".rewind_notification();`; +} + +export function createRewindTriggerFunction(schema: string): string { + const triggerName = hashName(schema, 'rewind_trigger', '_global'); + + return ` + CREATE OR REPLACE FUNCTION "${schema}".rewind_notification() + RETURNS trigger AS $$ + BEGIN + IF TG_OP = 'INSERT' THEN + PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.Rewind}'); + END IF; + + -- During a rollback, there is a chain that needs to be rolled back to an earlier height. + IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::int < (OLD.value ->> 'timestamp')::int THEN + PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindTimestampDecreased}'); + END IF; + + IF TG_OP = 'DELETE' THEN + PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindComplete}'); + END IF; + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + `; +} + export function getExistedIndexesQuery(schema: string): string { return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`; } diff --git a/packages/node-core/src/events.ts b/packages/node-core/src/events.ts index 63900a0de4..ce02e4a87a 100644 --- a/packages/node-core/src/events.ts +++ b/packages/node-core/src/events.ts @@ -29,6 +29,12 @@ export enum PoiEvent { PoiTarget = 'poi_target', } +export enum MultiChainRewindEvent { + Rewind = 'rewind', + RewindComplete = 'rewind_complete', + RewindTimestampDecreased = 'timestamp_decreased', +} + export interface RewindPayload { success: boolean; height: number; @@ -61,3 +67,7 @@ export interface NetworkMetadataPayload { specName: string; genesisHash: string; } + +export interface MultiChainRewindPayload { + height: number; +} diff --git a/packages/node-core/src/indexer/core.module.ts b/packages/node-core/src/indexer/core.module.ts index 75cc9e1fcc..3620e08d84 100644 --- a/packages/node-core/src/indexer/core.module.ts +++ b/packages/node-core/src/indexer/core.module.ts @@ -11,6 +11,7 @@ import {ConnectionPoolService} from './connectionPool.service'; import {ConnectionPoolStateManager} from './connectionPoolState.manager'; import {InMemoryCacheService} from './inMemoryCache.service'; import {MonitorService} from './monitor.service'; +import {MultiChainRewindService} from './multiChainRewind.service'; import {PoiService, PoiSyncService} from './poi'; import {SandboxService} from './sandbox.service'; import {StoreService} from './store.service'; @@ -33,6 +34,7 @@ import {storeModelFactory} from './storeModelProvider'; useFactory: storeModelFactory, inject: [NodeConfig, EventEmitter2, Sequelize], }, + MultiChainRewindService, AdminListener, ], controllers: [AdminController], diff --git a/packages/node-core/src/indexer/entities/GlobalData.entity.ts b/packages/node-core/src/indexer/entities/GlobalData.entity.ts new file mode 100644 index 0000000000..c7c7456de4 --- /dev/null +++ b/packages/node-core/src/indexer/entities/GlobalData.entity.ts @@ -0,0 +1,53 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {blake2AsHex} from '@subql/utils'; +import {BuildOptions, DataTypes, Model, Sequelize} from '@subql/x-sequelize'; + +export const RewindTimestampKeyPrefix = 'rewindTimestamp'; +export const RewindLockKey = 'rewindLock'; +export type RewindTimestampKey = `${typeof RewindTimestampKeyPrefix}_${string}`; + +export type RewindLockInfo = { + /** Timestamp to rewind to. */ + timestamp: number; + /** Number of additional chains to rewind. */ + rewindNum: number; +}; +export interface GlobalDataKeys { + rewindLock: RewindLockInfo; + [key: RewindTimestampKey]: number; +} + +export interface GlobalData { + key: k; + value: GlobalDataKeys[k]; +} + +interface GlobalDataEntity extends Model, GlobalData {} + +export type GlobalDataRepo = typeof Model & { + new (values?: unknown, options?: BuildOptions): GlobalDataEntity; +}; + +export function GlobalDataFactory(sequelize: Sequelize, schema: string): GlobalDataRepo { + const tableName = '_global'; + + return sequelize.define( + tableName, + { + key: { + type: DataTypes.STRING, + primaryKey: true, + }, + value: { + type: DataTypes.JSONB, + }, + }, + {freezeTableName: true, schema: schema} + ); +} + +export function generateRewindTimestampKey(chainId: string): RewindTimestampKey { + return `${RewindTimestampKeyPrefix}_${blake2AsHex(chainId)})`.substring(0, 63) as RewindTimestampKey; +} diff --git a/packages/node-core/src/indexer/entities/index.ts b/packages/node-core/src/indexer/entities/index.ts index cf880d9f41..ef1988522a 100644 --- a/packages/node-core/src/indexer/entities/index.ts +++ b/packages/node-core/src/indexer/entities/index.ts @@ -3,3 +3,4 @@ export * from './Poi.entity'; export * from './Metadata.entity'; +export * from './GlobalData.entity'; diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index e4982b8f16..ffb480eb83 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -3,19 +3,20 @@ import assert from 'assert'; import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common'; -import {EventEmitter2} from '@nestjs/event-emitter'; +import {EventEmitter2, OnEvent} from '@nestjs/event-emitter'; import {SchedulerRegistry} from '@nestjs/schedule'; import {BaseDataSource} from '@subql/types-core'; import {range} from 'lodash'; import {IBlockchainService} from '../blockchain.service'; import {NodeConfig} from '../configure'; -import {IndexerEvent} from '../events'; +import {EventPayload, IndexerEvent, MultiChainRewindEvent, MultiChainRewindPayload} from '../events'; import {getLogger} from '../logger'; import {delay, filterBypassBlocks, getModulos} from '../utils'; import {IBlockDispatcher} from './blockDispatcher'; import {mergeNumAndBlocksToNums} from './dictionary'; import {DictionaryService} from './dictionary/dictionary.service'; import {mergeNumAndBlocks} from './dictionary/utils'; +import {IMultiChainHandler, MultiChainRewindService, RewindStatus} from './multiChainRewind.service'; import {IStoreModelProvider} from './storeModelProvider'; import {BypassBlocks, IBlock, IProjectService} from './types'; import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service'; @@ -24,7 +25,7 @@ const logger = getLogger('FetchService'); @Injectable() export class FetchService, FB> - implements OnApplicationShutdown + implements OnApplicationShutdown, IMultiChainHandler { private _latestBestHeight?: number; private _latestFinalizedHeight?: number; @@ -39,7 +40,8 @@ export class FetchService + @Inject('IBlockchainService') private blockchainSevice: IBlockchainService, + private multiChainRewindService: MultiChainRewindService ) {} private get latestBestHeight(): number { @@ -196,6 +198,14 @@ export class FetchService; + setGlobalRewindLock(rewindTimestamp: number): Promise; + /** + * Check if the height is consistent before unlocking. + * @param tx + * @param rewindTimestamp The timestamp to roll back to, in milliseconds. + * @returns the number of remaining rewind chains + */ + releaseChainRewindLock(tx: Transaction, rewindTimestamp: number): Promise; +} + +export interface IMultiChainHandler { + handleMultiChainRewindEvent(rewindBlockPayload: MultiChainRewindPayload): void; +} + +@Injectable() +export class MultiChainRewindService implements IMultiChainRewindService, OnApplicationShutdown { + private _status: RewindStatus = RewindStatus.Normal; + private _chainId?: string; + private _dbSchema?: string; + waitRewindHeader?: Header; + private pgListener?: PoolClient; + constructor( + private nodeConfig: NodeConfig, + private eventEmitter: EventEmitter2, + private sequelize: Sequelize, + private storeService: StoreService, + @Inject('IBlockchainService') private readonly blockchainService: IBlockchainService + ) {} + + private set chainId(chainId: string) { + this._chainId = chainId; + } + + get chainId(): string { + assert(this._chainId, 'chainId is not set'); + return this._chainId; + } + + get dbSchema(): string { + assert(this._dbSchema, 'dbSchema is not set'); + return this._dbSchema; + } + + private set dbSchema(dbSchema: string) { + this._dbSchema = dbSchema; + } + + private set status(status: RewindStatus) { + this._status = status; + } + + getStatus(): RewindStatus { + return this._status; + } + + onApplicationShutdown() { + this.pgListener?.release(); + } + + async init(chainId: string, dbSchema: string, reindex: (targetHeader: Header) => Promise) { + this.chainId = chainId; + this.dbSchema = dbSchema; + + if (this.storeService.historical === 'timestamp') { + // Register a listener and create a schema notification sending function. + await this.registerPgListener(); + + if (this.waitRewindHeader) { + const rewindHeader = {...this.waitRewindHeader}; + await reindex(rewindHeader); + return rewindHeader; + } + } + } + + private async registerPgListener() { + if (this.pgListener) return; + + // Creating a new pgClient is to avoid using the same database connection as the block scheduler, + // which may prevent real-time listening to rollback events. + const pgPool = new Pool(getPgPoolConfig(this.nodeConfig)); + this.pgListener = await pgPool.connect(); + + this.pgListener.on('notification', (msg) => { + Promise.resolve().then(async () => { + const eventType = msg.payload; + logger.info(`Received rewind event: ${eventType}, chainId: ${this.chainId}`); + switch (eventType) { + case MultiChainRewindEvent.Rewind: + case MultiChainRewindEvent.RewindTimestampDecreased: { + this.status = RewindStatus.Rewinding; + const {rewindTimestamp} = await this.getGlobalRewindStatus(); + this.waitRewindHeader = await this.getHeaderByBinarySearch(dayjs(rewindTimestamp).toDate()); + + // Trigger the rewind event, and let the fetchService listen to the message and handle the queueFlush. + this.eventEmitter.emit(eventType, { + height: this.waitRewindHeader.blockHeight, + } satisfies MultiChainRewindPayload); + break; + } + case MultiChainRewindEvent.RewindComplete: + // recover indexing status + this.status = RewindStatus.Normal; + this.waitRewindHeader = undefined; + break; + default: + throw new Error(`Unknown rewind event: ${eventType}`); + } + logger.info(`Handle success rewind event: ${eventType}, chainId: ${this.chainId}`); + }); + }); + + await this.pgListener.query(`LISTEN "${hashName(this.dbSchema, 'rewind_trigger', '_global')}"`); + + // Check whether the current state is in rollback. + const {rewindLock, rewindTimestamp} = await this.getGlobalRewindStatus(); + if (rewindLock) { + this.status = RewindStatus.WaitOtherChain; + } + if (rewindTimestamp) { + this.status = RewindStatus.Rewinding; + this.waitRewindHeader = await this.getHeaderByBinarySearch(dayjs(rewindTimestamp).toDate()); + } + } + + /** + * Serialize the rewind lock + * @param rewindTimestamp ms + * @param chainTotal The total number of registered chains. + * @returns + */ + private serializeRewindLock(rewindTimestamp: number, chainTotal: number): string { + return JSON.stringify({timestamp: rewindTimestamp, chainNum: chainTotal}); + } + + async getGlobalRewindStatus() { + const rewindTimestampKey = generateRewindTimestampKey(this.chainId); + + const records = await this.storeService.globalDataRepo.findAll({ + where: {key: {[Op.in]: [rewindTimestampKey, RewindLockKey]}}, + }); + const rewindLockInfo: GlobalData | undefined = records + .find((r) => r.key === RewindLockKey) + ?.toJSON(); + const rewindTimestampInfo: GlobalData | undefined = records + .find((r) => r.key === rewindTimestampKey) + ?.toJSON(); + + assert( + rewindTimestampInfo !== undefined, + `Not registered rewind timestamp key in global data, chainId: ${this.chainId}` + ); + return {rewindTimestamp: rewindTimestampInfo.value, rewindLock: rewindLockInfo?.value}; + } + + /** + * If the set rewindTimestamp is greater than or equal to the current blockHeight, we do nothing because we will roll back to an earlier time. + * If the set rewindTimestamp is less than the current blockHeight, we should roll back to the earlier rewindTimestamp. + * @param rewindTimestamp rewindTimestamp in milliseconds + */ + async setGlobalRewindLock(rewindTimestamp: number) { + const globalTable = this.storeService.globalDataRepo.tableName; + const chainTotal = await this.storeService.globalDataRepo.count({ + where: { + key: {[Op.like]: `${RewindTimestampKeyPrefix}_%`}, + }, + }); + + const tx = await this.sequelize.transaction(); + try { + const [_, updateRows] = await this.sequelize.query( + `INSERT INTO "${this.dbSchema}"."${globalTable}" ( "key", "value", "createdAt", "updatedAt" ) + VALUES + ( '${RewindLockKey}', '${this.serializeRewindLock(rewindTimestamp, chainTotal)}', now(), now()) + ON CONFLICT ( "key" ) + DO UPDATE + SET "key" = EXCLUDED."key", + "value" = EXCLUDED."value", + "updatedAt" = EXCLUDED."updatedAt" + WHERE "key" = '${RewindLockKey}' AND ("value"->>'timestamp')::int > ${rewindTimestamp}`, + { + type: QueryTypes.INSERT, + transaction: tx, + } + ); + + // If there is a rewind lock that is greater than the current rewind timestamp, we should not update the rewind timestamp + if (updateRows === 1) { + logger.info(`setGlobalRewindLock success chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}`); + await this.storeService.globalDataRepo.update( + {value: rewindTimestamp}, + { + where: {key: {[Op.like]: 'rewindTimestamp_%'}}, + transaction: tx, + } + ); + + // The current chain is in REWINDING state + this.status = RewindStatus.Rewinding; + } + await tx.commit(); + } catch (e: any) { + logger.error( + `setGlobalRewindLock failed chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}, errorMsg: ${e.message}` + ); + await tx.rollback(); + throw e; + } + } + + async releaseChainRewindLock(tx: Transaction, rewindTimestamp: number): Promise { + const globalTable = this.storeService.globalDataRepo.tableName; + + // Ensure the first write occurs and prevent deadlock, only update the rewindNum - 1 + const results = await this.sequelize.query<{value: RewindLockInfo}>( + `UPDATE "${this.dbSchema}"."${globalTable}" + SET value = jsonb_set( + value, + '{chainNum}', + to_jsonb(COALESCE((value ->> 'chainNum')::int, 0) - 1), + false + ) + WHERE "key" = '${RewindLockKey}' AND ("value"->>'timestamp')::int = ${rewindTimestamp} + RETURNING value`, + { + type: QueryTypes.SELECT, + transaction: tx, + } + ); + + // not exist rewind lock in current timestamp + if (results.length === 0) { + logger.warn( + `Release rewind lock failed chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}, the rewind lock does not exist` + ); + return 0; + } + const rewindNum = results[0].value.rewindNum; + + const rewindTimestampKey = generateRewindTimestampKey(this.chainId); + const [affectedCount] = await this.storeService.globalDataRepo.update( + {value: 0}, + { + where: { + key: rewindTimestampKey, + value: rewindTimestamp, + }, + transaction: tx, + } + ); + assert( + affectedCount === 1, + `not found rewind timestamp key in global data, chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}` + ); + + if (rewindNum === 0) { + await this.storeService.globalDataRepo.destroy({where: {key: RewindLockKey}, transaction: tx}); + } + + // The current chain has completed the rewind, and we still need to wait for other chains to finish. + // When fully synchronized, set the status back to normal by pgListener. + this.status = RewindStatus.WaitOtherChain; + logger.info(`Rewind success chainId: ${JSON.stringify({rewindNum, chainId: this.chainId, rewindTimestamp})}`); + return rewindNum; + } + + /** + * Get the block header closest to the given timestamp + * @param timestamp To find the block closest to a given timestamp + * @returns undefined if the timestamp is less than the first block timestamp + */ + async getHeaderByBinarySearch(timestamp: Header['timestamp']): Promise
{ + assert(timestamp, 'getHeaderByBinarySearch `timestamp` is required'); + + let left = 0; + let {height: right} = await this.storeService.getLastProcessedBlock(); + + while (left < right) { + const mid = Math.floor((left + right) / 2); + const header = await this.blockchainService.getHeaderForHeight(mid); + assert(header.timestamp, 'getHeader return `timestamp` is undfined'); + + if (header.timestamp === timestamp) { + return header; + } else if (header.timestamp < timestamp) { + left = mid + 1; + } else { + right = mid; + } + } + + return left ? this.blockchainService.getHeaderForHeight(left) : ({blockHeight: 0} as Header); + } +} diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index c8736864ee..8867f5c7f0 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -2,28 +2,29 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { isMainThread } from 'worker_threads'; -import { Inject } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { BaseDataSource, IProjectNetworkConfig } from '@subql/types-core'; -import { Sequelize } from '@subql/x-sequelize'; -import { IApi } from '../api.service'; -import { ICoreBlockchainService } from '../blockchain.service'; -import { IProjectUpgradeService, NodeConfig } from '../configure'; -import { IndexerEvent } from '../events'; -import { getLogger } from '../logger'; -import { exitWithError, monitorWrite } from '../process'; -import { getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex } from '../utils'; -import { BlockHeightMap } from '../utils/blockHeightMap'; -import { DsProcessorService } from './ds-processor.service'; -import { DynamicDsService } from './dynamic-ds.service'; -import { MetadataKeys } from './entities'; -import { PoiSyncService } from './poi'; -import { PoiService } from './poi/poi.service'; -import { StoreService } from './store.service'; -import { cacheProviderFlushData } from './storeModelProvider'; -import { ISubqueryProject, IProjectService, BypassBlocks, HistoricalMode, Header } from './types'; -import { IUnfinalizedBlocksService } from './unfinalizedBlocks.service'; +import {isMainThread} from 'worker_threads'; +import {Inject} from '@nestjs/common'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {BaseDataSource, IProjectNetworkConfig} from '@subql/types-core'; +import {Sequelize} from '@subql/x-sequelize'; +import {IApi} from '../api.service'; +import {ICoreBlockchainService} from '../blockchain.service'; +import {IProjectUpgradeService, NodeConfig} from '../configure'; +import {IndexerEvent} from '../events'; +import {getLogger} from '../logger'; +import {exitWithError, monitorWrite} from '../process'; +import {getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex} from '../utils'; +import {BlockHeightMap} from '../utils/blockHeightMap'; +import {DsProcessorService} from './ds-processor.service'; +import {DynamicDsService} from './dynamic-ds.service'; +import {MetadataKeys} from './entities'; +import {MultiChainRewindService} from './multiChainRewind.service'; +import {PoiSyncService} from './poi'; +import {PoiService} from './poi/poi.service'; +import {StoreService} from './store.service'; +import {cacheProviderFlushData} from './storeModelProvider'; +import {ISubqueryProject, IProjectService, BypassBlocks, HistoricalMode, Header} from './types'; +import {IUnfinalizedBlocksService} from './unfinalizedBlocks.service'; const logger = getLogger('Project'); @@ -36,8 +37,9 @@ class NotInitError extends Error { export class ProjectService< DS extends BaseDataSource = BaseDataSource, API extends IApi = IApi, - UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService -> implements IProjectService { + UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService, +> implements IProjectService +{ private _schema?: string; private _startHeight?: number; private _blockOffset?: number; @@ -55,7 +57,8 @@ export class ProjectService< private readonly dynamicDsService: DynamicDsService, private eventEmitter: EventEmitter2, @Inject('IUnfinalizedBlocksService') private readonly unfinalizedBlockService: UnfinalizedBlocksService, - @Inject('IBlockchainService') private blockchainService: ICoreBlockchainService + @Inject('IBlockchainService') private blockchainService: ICoreBlockchainService, + private multiChainRewindService: MultiChainRewindService ) { if (this.nodeConfig.unfinalizedBlocks && this.nodeConfig.allowSchemaMigration) { throw new Error('Unfinalized Blocks and Schema Migration cannot be enabled at the same time'); @@ -106,6 +109,7 @@ export class ProjectService< // Init metadata before rest of schema so we can determine the correct project version to create the schema await this.storeService.initCoreTables(this._schema); + await this.ensureMetadata(); // DynamicDsService is dependent on metadata so we need to ensure it exists first await this.dynamicDsService.init(this.storeService.modelProvider.metadata); @@ -134,10 +138,16 @@ export class ProjectService< void this.poiSyncService.syncPoi(undefined); } + const reindexMultiChain = await this.initMultiChainRewindService(); + const reindexedUpgrade = await this.initUpgradeService(this.startHeight); // Unfinalized is dependent on POI in some cases, it needs to be init after POI is init const reindexedUnfinalized = await this.initUnfinalizedInternal(); + if (reindexMultiChain !== undefined) { + this._startHeight = reindexMultiChain.blockHeight; + } + if (reindexedUnfinalized !== undefined) { this._startHeight = reindexedUnfinalized.blockHeight; } @@ -216,16 +226,16 @@ export class ProjectService< const existing = await metadata.findMany(keys); - const { chain, genesisHash, specName } = this.apiService.networkMeta; + const {chain, genesisHash, specName} = this.apiService.networkMeta; if (this.project.runner) { - const { node, query } = this.project.runner; + const {node, query} = this.project.runner; await metadata.setBulk([ - { key: 'runnerNode', value: node.name }, - { key: 'runnerNodeVersion', value: node.version }, - { key: 'runnerQuery', value: query.name }, - { key: 'runnerQueryVersion', value: query.version }, + {key: 'runnerNode', value: node.name}, + {key: 'runnerNodeVersion', value: node.version}, + {key: 'runnerQuery', value: query.name}, + {key: 'runnerQueryVersion', value: query.version}, ]); } if (!existing.genesisHash) { @@ -337,7 +347,7 @@ export class ProjectService< const nextProject = projects[i + 1][1]; nextMinStartHeight = Math.max( nextProject.dataSources - .filter((ds): ds is DS & { startBlock: number } => !!ds.startBlock) + .filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock) .sort((a, b) => a.startBlock - b.startBlock)[0].startBlock, projects[i + 1][0] ); @@ -352,12 +362,12 @@ export class ProjectService< }[] = []; [...project.dataSources, ...dynamicDs] - .filter((ds): ds is DS & { startBlock: number } => { + .filter((ds): ds is DS & {startBlock: number} => { return !!ds.startBlock && (!nextMinStartHeight || nextMinStartHeight > ds.startBlock); }) .forEach((ds) => { - events.push({ block: Math.max(height, ds.startBlock), start: true, ds }); - if (ds.endBlock) events.push({ block: ds.endBlock + 1, start: false, ds }); + events.push({block: Math.max(height, ds.startBlock), start: true, ds}); + if (ds.endBlock) events.push({block: ds.endBlock + 1, start: false, ds}); }); // sort events by block in ascending order, start events come before end events @@ -439,6 +449,9 @@ export class ProjectService< } return undefined; } + private async initMultiChainRewindService(): Promise
{ + return this.multiChainRewindService.init(this.project.network.chainId, this.schema, this.reindex.bind(this)); + } private async handleProjectChange(): Promise { if (isMainThread && !this.nodeConfig.allowSchemaMigration) { @@ -464,12 +477,13 @@ export class ProjectService< return reindex( this.getStartBlockFromDataSources(), targetBlockHeader, - { height, timestamp }, + {height, timestamp}, this.storeService, this.unfinalizedBlockService, this.dynamicDsService, this.sequelize, this.projectUpgradeService, + this.multiChainRewindService, this.nodeConfig.proofOfIndex ? this.poiService : undefined /* Not providing force clean service, it should never be needed */ ); diff --git a/packages/node-core/src/indexer/store.service.ts b/packages/node-core/src/indexer/store.service.ts index a2307b0a82..79bb9203a0 100644 --- a/packages/node-core/src/indexer/store.service.ts +++ b/packages/node-core/src/indexer/store.service.ts @@ -34,8 +34,17 @@ import { } from '../db'; import {getLogger} from '../logger'; import {exitWithError} from '../process'; -import {camelCaseObjectKey, customCamelCaseGraphqlKey, getHistoricalUnit} from '../utils'; -import {MetadataFactory, MetadataRepo, PoiFactory, PoiFactoryDeprecate, PoiRepo} from './entities'; +import {camelCaseObjectKey, customCamelCaseGraphqlKey, getHistoricalUnit, hasValue} from '../utils'; +import { + generateRewindTimestampKey, + GlobalDataFactory, + GlobalDataRepo, + MetadataFactory, + MetadataRepo, + PoiFactory, + PoiFactoryDeprecate, + PoiRepo, +} from './entities'; import {Store} from './store'; import {IMetadata, IStoreModelProvider, PlainStoreModelService} from './storeModelProvider'; import {StoreOperations} from './StoreOperations'; @@ -63,6 +72,7 @@ export class StoreService { poiRepo?: PoiRepo; private _modelIndexedFields?: IndexField[]; private _modelsRelations?: GraphQLModelsRelationsEnums; + private _globalDataRepo?: GlobalDataRepo; private _metaDataRepo?: MetadataRepo; private _historical?: HistoricalMode; private _metadataModel?: IMetadata; @@ -104,6 +114,11 @@ export class StoreService { return this._operationStack; } + get globalDataRepo(): GlobalDataRepo { + assert(this._globalDataRepo, new NoInitError()); + return this._globalDataRepo; + } + get blockHeader(): Header { assert(this._blockHeader, new Error('StoreService.setBlockHeader has not been called')); return this._blockHeader; @@ -158,6 +173,10 @@ export class StoreService { this.subqueryProject.network.chainId ); + if (this.historical === 'timestamp') { + this._globalDataRepo = GlobalDataFactory(this.sequelize, schema); + } + this._schema = schema; await this.sequelize.sync(); @@ -172,6 +191,8 @@ export class StoreService { await this.initHotSchemaReloadQueries(schema); await this.metadataModel.set('historicalStateEnabled', this.historical); + + await this.initChainRewindTimestamp(); } async init(schema: string): Promise { @@ -477,6 +498,31 @@ group by // Cant throw here because even with historical disabled the current height is used by the store return getHistoricalUnit(this.historical, this.blockHeader); } + + private async getRewindTimestamp(): Promise { + const rewindTimestampKey = generateRewindTimestampKey(this.subqueryProject.network.chainId); + const record = await this.globalDataRepo.findByPk(rewindTimestampKey); + if (hasValue(record)) { + return record.toJSON().value as number; + } + } + + private async initChainRewindTimestamp() { + if (this.historical !== 'timestamp') return; + if (await this.getRewindTimestamp()) return; + + const rewindTimestampKey = generateRewindTimestampKey(this.subqueryProject.network.chainId); + await this.globalDataRepo.create({key: rewindTimestampKey, value: 0}); + } + + async getLastProcessedBlock(): Promise<{height: number; timestamp?: number}> { + const {lastProcessedBlockTimestamp: timestamp, lastProcessedHeight: height} = await this.metadataModel.findMany([ + 'lastProcessedHeight', + 'lastProcessedBlockTimestamp', + ]); + + return {height: height || 0, timestamp}; + } } // REMOVE 10,000 record per batch diff --git a/packages/node-core/src/subcommands/reindex.service.ts b/packages/node-core/src/subcommands/reindex.service.ts index 4899f3c2a8..126e384d6c 100644 --- a/packages/node-core/src/subcommands/reindex.service.ts +++ b/packages/node-core/src/subcommands/reindex.service.ts @@ -2,11 +2,11 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { Inject, Injectable } from '@nestjs/common'; -import { BaseDataSource } from '@subql/types-core'; -import { Sequelize } from '@subql/x-sequelize'; -import { IBlockchainService } from '../blockchain.service'; -import { NodeConfig, ProjectUpgradeService } from '../configure'; +import {Inject, Injectable} from '@nestjs/common'; +import {BaseDataSource} from '@subql/types-core'; +import {Sequelize} from '@subql/x-sequelize'; +import {IBlockchainService} from '../blockchain.service'; +import {NodeConfig, ProjectUpgradeService} from '../configure'; import { IUnfinalizedBlocksService, StoreService, @@ -15,19 +15,20 @@ import { IMetadata, cacheProviderFlushData, Header, + MultiChainRewindService, } from '../indexer'; -import { DynamicDsService } from '../indexer/dynamic-ds.service'; -import { getLogger } from '../logger'; -import { exitWithError, monitorWrite } from '../process'; -import { getExistingProjectSchema, initDbSchema, reindex } from '../utils'; -import { ForceCleanService } from './forceClean.service'; +import {DynamicDsService} from '../indexer/dynamic-ds.service'; +import {getLogger} from '../logger'; +import {exitWithError, monitorWrite} from '../process'; +import {getExistingProjectSchema, initDbSchema, reindex} from '../utils'; +import {ForceCleanService} from './forceClean.service'; const logger = getLogger('Reindex'); @Injectable() export class ReindexService

{ private _metadataRepo?: IMetadata; - private _lastProcessedHeader?: { height: number; timestamp?: number }; + private _lastProcessedHeader?: {height: number; timestamp?: number}; constructor( private readonly sequelize: Sequelize, @@ -40,6 +41,7 @@ export class ReindexService

, @Inject('DynamicDsService') private readonly dynamicDsService: DynamicDsService, @Inject('IBlockchainService') private readonly blockchainService: IBlockchainService, + private readonly multiChainRewindService: MultiChainRewindService ) {} private get metadataRepo(): IMetadata { @@ -65,10 +67,7 @@ export class ReindexService

blockHeight <= inputHeight); + const bestBlocks = unfinalizedBlocks.filter(({blockHeight}) => blockHeight <= inputHeight); if (bestBlocks.length && inputHeight >= bestBlocks[0].blockHeight) { return bestBlocks[0]; } @@ -103,7 +102,7 @@ export class ReindexService

, sequelize: Sequelize, projectUpgradeService: IProjectUpgradeService, + multichainRewindService: MultiChainRewindService, poiService?: PoiService, forceCleanService?: ForceCleanService ): Promise { - const lastUnit = storeService.historical === 'timestamp' ? lastProcessed.timestamp : lastProcessed.height; + const isMultiChain = storeService.historical === 'timestamp'; + const lastUnit = isMultiChain ? lastProcessed.timestamp : lastProcessed.height; const targetUnit = getHistoricalUnit(storeService.historical, targetBlockHeader); if (!lastUnit || lastUnit < targetUnit) { @@ -63,7 +67,8 @@ export async function reindex( } // if startHeight is greater than the targetHeight, just force clean - if (targetBlockHeader.blockHeight < startHeight) { + // We prevent the entire data from being cleared due to multiple chains because the startblock is uncertain in multi-chain projects. + if (targetBlockHeader.blockHeight < startHeight && !isMultiChain) { logger.info( `targetHeight: ${targetBlockHeader.blockHeight} is less than startHeight: ${startHeight}. Hence executing force-clean` ); @@ -73,45 +78,50 @@ export async function reindex( // if DB need rollback? no, because forceCleanService will take care of it await cacheProviderResetData(storeService.modelProvider); await forceCleanService?.forceClean(); - } else { - logger.info(`Reindexing to ${storeService.historical}: ${targetUnit}`); - await cacheProviderFlushData(storeService.modelProvider, true); - await cacheProviderResetData(storeService.modelProvider); - if (storeService.modelProvider instanceof StoreCacheService) { - await storeService.modelProvider.flushData(true); - await storeService.modelProvider.resetData(); - } - const transaction = await sequelize.transaction(); - try { - /* + return; + } + + logger.info(`Reindexing to ${storeService.historical}: ${targetUnit}`); + if (isMultiChain) { + await multichainRewindService.setGlobalRewindLock(targetUnit); + } + + await cacheProviderFlushData(storeService.modelProvider, true); + await cacheProviderResetData(storeService.modelProvider); + if (storeService.modelProvider instanceof StoreCacheService) { + await storeService.modelProvider.flushData(true); + await storeService.modelProvider.resetData(); + } + const transaction = await sequelize.transaction(); + try { + /* Must initialize storeService, to ensure all models are loaded, as storeService.init has not been called at this point - 1. During runtime, model should be already been init - 2.1 On start, projectUpgrade rewind will sync the sequelize models - 2.2 On start, without projectUpgrade or upgradablePoint, sequelize will sync models through project.service + 1. During runtime, model should be already been init + 2.1 On start, projectUpgrade rewind will sync the sequelize models + 2.2 On start, without projectUpgrade or upgradablePoint, sequelize will sync models through project.service */ - await projectUpgradeService.rewind( - targetBlockHeader.blockHeight, - lastProcessed.height, - transaction, - storeService - ); + await projectUpgradeService.rewind(targetBlockHeader.blockHeight, lastProcessed.height, transaction, storeService); - await Promise.all([ - storeService.rewind(targetBlockHeader, transaction), - unfinalizedBlockService.resetUnfinalizedBlocks(), // TODO: may not needed for nonfinalized chains - unfinalizedBlockService.resetLastFinalizedVerifiedHeight(), // TODO: may not needed for nonfinalized chains - dynamicDsService.resetDynamicDatasource(targetBlockHeader.blockHeight, transaction), - poiService?.rewind(targetBlockHeader.blockHeight, transaction), - ]); - // Flush metadata changes from above Promise.all - await storeService.modelProvider.metadata.flush?.(transaction, targetUnit); + await Promise.all([ + storeService.rewind(targetBlockHeader, transaction), + unfinalizedBlockService.resetUnfinalizedBlocks(), // TODO: may not needed for nonfinalized chains + unfinalizedBlockService.resetLastFinalizedVerifiedHeight(), // TODO: may not needed for nonfinalized chains + dynamicDsService.resetDynamicDatasource(targetBlockHeader.blockHeight, transaction), + poiService?.rewind(targetBlockHeader.blockHeight, transaction), + ]); + // Flush metadata changes from above Promise.all + await storeService.modelProvider.metadata.flush?.(transaction, targetUnit); - await transaction.commit(); - logger.info('Reindex Success'); - } catch (err: any) { - logger.error(err, 'Reindexing failed'); - await transaction.rollback(); - throw err; + // release rewind lock + if (isMultiChain) { + await multichainRewindService.releaseChainRewindLock(transaction, targetUnit); } + + await transaction.commit(); + logger.info('Reindex Success'); + } catch (err: any) { + logger.error(err, 'Reindexing failed'); + await transaction.rollback(); + throw err; } } diff --git a/packages/node/src/subcommands/reindex.module.ts b/packages/node/src/subcommands/reindex.module.ts index faf76fe32a..917041daaa 100644 --- a/packages/node/src/subcommands/reindex.module.ts +++ b/packages/node/src/subcommands/reindex.module.ts @@ -17,6 +17,7 @@ import { DsProcessorService, UnfinalizedBlocksService, DynamicDsService, + MultiChainRewindService, } from '@subql/node-core'; import { Sequelize } from '@subql/x-sequelize'; import { BlockchainService } from '../blockchain.service'; @@ -65,6 +66,7 @@ import { RuntimeService } from '../indexer/runtime/runtimeService'; provide: 'IBlockchainService', useClass: BlockchainService, }, + MultiChainRewindService, SchedulerRegistry, ], controllers: [], From 419045151e099fe712ce707a09883ffc7093fdb8 Mon Sep 17 00:00:00 2001 From: Tate Date: Wed, 12 Feb 2025 15:08:47 +0000 Subject: [PATCH 2/9] fix some bug --- packages/node-core/src/indexer/core.module.ts | 2 -- packages/node-core/src/indexer/store.service.ts | 5 ++--- packages/node/src/indexer/fetch.module.ts | 2 ++ packages/node/src/indexer/worker/worker-fetch.module.ts | 2 ++ 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/node-core/src/indexer/core.module.ts b/packages/node-core/src/indexer/core.module.ts index 3620e08d84..75cc9e1fcc 100644 --- a/packages/node-core/src/indexer/core.module.ts +++ b/packages/node-core/src/indexer/core.module.ts @@ -11,7 +11,6 @@ import {ConnectionPoolService} from './connectionPool.service'; import {ConnectionPoolStateManager} from './connectionPoolState.manager'; import {InMemoryCacheService} from './inMemoryCache.service'; import {MonitorService} from './monitor.service'; -import {MultiChainRewindService} from './multiChainRewind.service'; import {PoiService, PoiSyncService} from './poi'; import {SandboxService} from './sandbox.service'; import {StoreService} from './store.service'; @@ -34,7 +33,6 @@ import {storeModelFactory} from './storeModelProvider'; useFactory: storeModelFactory, inject: [NodeConfig, EventEmitter2, Sequelize], }, - MultiChainRewindService, AdminListener, ], controllers: [AdminController], diff --git a/packages/node-core/src/indexer/store.service.ts b/packages/node-core/src/indexer/store.service.ts index 79bb9203a0..80dda84231 100644 --- a/packages/node-core/src/indexer/store.service.ts +++ b/packages/node-core/src/indexer/store.service.ts @@ -173,7 +173,7 @@ export class StoreService { this.subqueryProject.network.chainId ); - if (this.historical === 'timestamp') { + if (this.config.historical === 'timestamp') { this._globalDataRepo = GlobalDataFactory(this.sequelize, schema); } @@ -509,8 +509,7 @@ group by private async initChainRewindTimestamp() { if (this.historical !== 'timestamp') return; - if (await this.getRewindTimestamp()) return; - + if ((await this.getRewindTimestamp()) !== undefined) return; const rewindTimestampKey = generateRewindTimestampKey(this.subqueryProject.network.chainId); await this.globalDataRepo.create({key: rewindTimestampKey, value: 0}); } diff --git a/packages/node/src/indexer/fetch.module.ts b/packages/node/src/indexer/fetch.module.ts index e64f3a80ee..057657cb26 100644 --- a/packages/node/src/indexer/fetch.module.ts +++ b/packages/node/src/indexer/fetch.module.ts @@ -23,6 +23,7 @@ import { WorkerBlockDispatcher, FetchService, DictionaryService, + MultiChainRewindService, } from '@subql/node-core'; import { SubstrateDatasource } from '@subql/types'; import { BlockchainService } from '../blockchain.service'; @@ -67,6 +68,7 @@ import { IIndexerWorker } from './worker/worker'; useClass: ProjectService, provide: 'IProjectService', }, + MultiChainRewindService, /* END: Move to node core */ IndexerManager, { diff --git a/packages/node/src/indexer/worker/worker-fetch.module.ts b/packages/node/src/indexer/worker/worker-fetch.module.ts index 8c4bc8ee29..8cbe249478 100644 --- a/packages/node/src/indexer/worker/worker-fetch.module.ts +++ b/packages/node/src/indexer/worker/worker-fetch.module.ts @@ -9,6 +9,7 @@ import { WorkerCoreModule, ProjectService, DsProcessorService, + MultiChainRewindService, } from '@subql/node-core'; import { BlockchainService } from '../../blockchain.service'; import { ApiService } from '../api.service'; @@ -48,6 +49,7 @@ import { WorkerService } from './worker.service'; provide: 'IBlockchainService', useClass: BlockchainService, }, + MultiChainRewindService, WorkerService, ], exports: [], From 56ea202968f33fb37db4ae186704e2a25343806c Mon Sep 17 00:00:00 2001 From: Tate Date: Fri, 14 Feb 2025 07:49:09 +0000 Subject: [PATCH 3/9] fix some bug --- packages/node-core/src/blockchain.service.ts | 1 + packages/node-core/src/db/sync-helper.ts | 7 +-- .../src/indexer/entities/GlobalData.entity.ts | 3 +- .../src/indexer/multiChainRewind.service.ts | 60 ++++++++++++++----- .../node-core/src/indexer/project.service.ts | 10 ++-- packages/node/src/blockchain.service.spec.ts | 6 ++ packages/node/src/blockchain.service.ts | 37 ++++++++++++ 7 files changed, 97 insertions(+), 27 deletions(-) diff --git a/packages/node-core/src/blockchain.service.ts b/packages/node-core/src/blockchain.service.ts index 419e431801..23f6b9614a 100644 --- a/packages/node-core/src/blockchain.service.ts +++ b/packages/node-core/src/blockchain.service.ts @@ -58,6 +58,7 @@ export interface IBlockchainService< // Unfinalized blocks getHeaderForHash(hash: string): Promise

; getHeaderForHeight(height: number): Promise
; + getRequiredHeaderForHeight(height: number): Promise>; // Dynamic Ds sevice /** diff --git a/packages/node-core/src/db/sync-helper.ts b/packages/node-core/src/db/sync-helper.ts index b40ded6a2a..6e93bb1de0 100644 --- a/packages/node-core/src/db/sync-helper.ts +++ b/packages/node-core/src/db/sync-helper.ts @@ -17,7 +17,6 @@ import { } from '@subql/x-sequelize'; import {ModelAttributeColumnReferencesOptions, ModelIndexesOptions} from '@subql/x-sequelize/types/model'; import {MultiChainRewindEvent} from '../events'; -import {RewindLockKey} from '../indexer'; import {EnumType} from '../utils'; import {formatAttributes, generateIndexName, modelToTableName} from './sequelizeUtil'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -307,8 +306,8 @@ export function createRewindTrigger(schema: string): string { AFTER INSERT OR UPDATE OR DELETE ON "${schema}"."_global" FOR EACH ROW - WHEN ( new.key = '${RewindLockKey}') - EXECUTE FUNCTION "${schema}".rewind_notification();`; + EXECUTE FUNCTION "${schema}".rewind_notification(); + `; } export function createRewindTriggerFunction(schema: string): string { @@ -323,7 +322,7 @@ export function createRewindTriggerFunction(schema: string): string { END IF; -- During a rollback, there is a chain that needs to be rolled back to an earlier height. - IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::int < (OLD.value ->> 'timestamp')::int THEN + IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::BIGINT < (OLD.value ->> 'timestamp')::BIGINT THEN PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindTimestampDecreased}'); END IF; diff --git a/packages/node-core/src/indexer/entities/GlobalData.entity.ts b/packages/node-core/src/indexer/entities/GlobalData.entity.ts index c7c7456de4..1288751ae5 100644 --- a/packages/node-core/src/indexer/entities/GlobalData.entity.ts +++ b/packages/node-core/src/indexer/entities/GlobalData.entity.ts @@ -11,8 +11,7 @@ export type RewindTimestampKey = `${typeof RewindTimestampKeyPrefix}_${string}`; export type RewindLockInfo = { /** Timestamp to rewind to. */ timestamp: number; - /** Number of additional chains to rewind. */ - rewindNum: number; + chainNum: number; }; export interface GlobalDataKeys { rewindLock: RewindLockInfo; diff --git a/packages/node-core/src/indexer/multiChainRewind.service.ts b/packages/node-core/src/indexer/multiChainRewind.service.ts index 085b5aeda7..e918b49028 100644 --- a/packages/node-core/src/indexer/multiChainRewind.service.ts +++ b/packages/node-core/src/indexer/multiChainRewind.service.ts @@ -10,7 +10,7 @@ import dayjs from 'dayjs'; import {Pool, PoolClient} from 'pg'; import {IBlockchainService} from '../blockchain.service'; import {NodeConfig} from '../configure'; -import {getPgPoolConfig} from '../db'; +import {createRewindTrigger, createRewindTriggerFunction, getPgPoolConfig, getTriggers} from '../db'; import {MultiChainRewindEvent, MultiChainRewindPayload} from '../events'; import {getLogger} from '../logger'; import { @@ -62,8 +62,9 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl private _status: RewindStatus = RewindStatus.Normal; private _chainId?: string; private _dbSchema?: string; - waitRewindHeader?: Header; + private _rewindTriggerName?: string; private pgListener?: PoolClient; + waitRewindHeader?: Required
; constructor( private nodeConfig: NodeConfig, private eventEmitter: EventEmitter2, @@ -89,6 +90,14 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl private set dbSchema(dbSchema: string) { this._dbSchema = dbSchema; } + private set rewindTriggerName(rewindTriggerName: string) { + this._rewindTriggerName = rewindTriggerName; + } + + get rewindTriggerName(): string { + assert(this._rewindTriggerName, 'rewindTriggerName is not set'); + return this._rewindTriggerName; + } private set status(status: RewindStatus) { this._status = status; @@ -105,8 +114,16 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl async init(chainId: string, dbSchema: string, reindex: (targetHeader: Header) => Promise) { this.chainId = chainId; this.dbSchema = dbSchema; + this.rewindTriggerName = hashName(this.dbSchema, 'rewind_trigger', '_global'); if (this.storeService.historical === 'timestamp') { + await this.sequelize.query(`${createRewindTriggerFunction(this.dbSchema)}`); + + const rewindTriggers = await getTriggers(this.sequelize, this.rewindTriggerName); + if (rewindTriggers.length === 0) { + await this.sequelize.query(`${createRewindTrigger(this.dbSchema)}`); + } + // Register a listener and create a schema notification sending function. await this.registerPgListener(); @@ -155,7 +172,8 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl }); }); - await this.pgListener.query(`LISTEN "${hashName(this.dbSchema, 'rewind_trigger', '_global')}"`); + await this.pgListener.query(`LISTEN "${this.rewindTriggerName}"`); + logger.info(`Register rewind listener success, chainId: ${this.chainId}`); // Check whether the current state is in rollback. const {rewindLock, rewindTimestamp} = await this.getGlobalRewindStatus(); @@ -222,7 +240,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl SET "key" = EXCLUDED."key", "value" = EXCLUDED."value", "updatedAt" = EXCLUDED."updatedAt" - WHERE "key" = '${RewindLockKey}' AND ("value"->>'timestamp')::int > ${rewindTimestamp}`, + WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT > ${rewindTimestamp}`, { type: QueryTypes.INSERT, transaction: tx, @@ -262,10 +280,10 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl SET value = jsonb_set( value, '{chainNum}', - to_jsonb(COALESCE((value ->> 'chainNum')::int, 0) - 1), + to_jsonb(COALESCE(("${globalTable}"."value" ->> 'chainNum')::BIGINT, 0) - 1), false ) - WHERE "key" = '${RewindLockKey}' AND ("value"->>'timestamp')::int = ${rewindTimestamp} + WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT = ${rewindTimestamp} RETURNING value`, { type: QueryTypes.SELECT, @@ -280,7 +298,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl ); return 0; } - const rewindNum = results[0].value.rewindNum; + const chainNum = results[0].value.chainNum; const rewindTimestampKey = generateRewindTimestampKey(this.chainId); const [affectedCount] = await this.storeService.globalDataRepo.update( @@ -298,32 +316,32 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl `not found rewind timestamp key in global data, chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}` ); - if (rewindNum === 0) { + if (chainNum === 0) { await this.storeService.globalDataRepo.destroy({where: {key: RewindLockKey}, transaction: tx}); } // The current chain has completed the rewind, and we still need to wait for other chains to finish. // When fully synchronized, set the status back to normal by pgListener. this.status = RewindStatus.WaitOtherChain; - logger.info(`Rewind success chainId: ${JSON.stringify({rewindNum, chainId: this.chainId, rewindTimestamp})}`); - return rewindNum; + logger.info(`Rewind success chainId: ${JSON.stringify({chainNum, chainId: this.chainId, rewindTimestamp})}`); + return chainNum; } /** * Get the block header closest to the given timestamp * @param timestamp To find the block closest to a given timestamp - * @returns undefined if the timestamp is less than the first block timestamp + * @returns */ - async getHeaderByBinarySearch(timestamp: Header['timestamp']): Promise
{ + async getHeaderByBinarySearch(timestamp: Header['timestamp']): Promise> { assert(timestamp, 'getHeaderByBinarySearch `timestamp` is required'); let left = 0; let {height: right} = await this.storeService.getLastProcessedBlock(); - + let searchNum = 0; while (left < right) { + searchNum++; const mid = Math.floor((left + right) / 2); - const header = await this.blockchainService.getHeaderForHeight(mid); - assert(header.timestamp, 'getHeader return `timestamp` is undfined'); + const header = await this.blockchainService.getRequiredHeaderForHeight(mid); if (header.timestamp === timestamp) { return header; @@ -334,6 +352,16 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl } } - return left ? this.blockchainService.getHeaderForHeight(left) : ({blockHeight: 0} as Header); + const targetHeader = left + ? await this.blockchainService.getRequiredHeaderForHeight(left) + : { + blockHash: '', + blockHeight: 0, + parentHash: '', + timestamp, + }; + logger.info(`Binary search times: ${searchNum}, target Header: ${JSON.stringify(targetHeader)}`); + + return targetHeader; } } diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index 8867f5c7f0..9be824f0e1 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -138,15 +138,11 @@ export class ProjectService< void this.poiSyncService.syncPoi(undefined); } - const reindexMultiChain = await this.initMultiChainRewindService(); - const reindexedUpgrade = await this.initUpgradeService(this.startHeight); // Unfinalized is dependent on POI in some cases, it needs to be init after POI is init const reindexedUnfinalized = await this.initUnfinalizedInternal(); - if (reindexMultiChain !== undefined) { - this._startHeight = reindexMultiChain.blockHeight; - } + const reindexMultiChain = await this.initMultiChainRewindService(); if (reindexedUnfinalized !== undefined) { this._startHeight = reindexedUnfinalized.blockHeight; @@ -156,6 +152,10 @@ export class ProjectService< this._startHeight = reindexedUpgrade; } + if (reindexMultiChain !== undefined) { + this._startHeight = reindexMultiChain.blockHeight; + } + // Flush any pending operations to set up DB await cacheProviderFlushData(this.storeService.modelProvider, true); } else { diff --git a/packages/node/src/blockchain.service.spec.ts b/packages/node/src/blockchain.service.spec.ts index 9e8652d900..57589b9ba6 100644 --- a/packages/node/src/blockchain.service.spec.ts +++ b/packages/node/src/blockchain.service.spec.ts @@ -55,4 +55,10 @@ describe('BlockchainService', () => { const interval = await blockchainService.getChainInterval(); expect(interval).toEqual(5000); }); + + it('can get the chain create time', async () => { + const requiredHeader = + await blockchainService.getRequiredHeaderForHeight(24723095); + expect(requiredHeader.timestamp.getTime()).toEqual(1739501268001); + }); }); diff --git a/packages/node/src/blockchain.service.ts b/packages/node/src/blockchain.service.ts index 374bdac380..405133c491 100644 --- a/packages/node/src/blockchain.service.ts +++ b/packages/node/src/blockchain.service.ts @@ -132,6 +132,23 @@ export class BlockchainService return substrateHeaderToHeader(finalizedHeader); } + // async test(blockHeight: number): Promise { + // // 连接到 Polkadot 节点 + // const wsProvider = new WsProvider('wss://rpc.polkadot.io'); + // const api = await ApiPromise.create({ provider: wsProvider }); + + // // 获取区块 + // const blockHash = await api.rpc.chain.getBlockHash(blockHeight); + + // // 获取区块的时间戳 + // const block = await api.rpc.chain.getBlock(blockHash); + // const timestamp = await api.at(blockHash); + + // console.log(`Block #${blockHeight} timestamp: ${timestamp.toString()}`); + // api.disconnect(); + // return; + // } + async getBestHeight(): Promise { const bestHeader = await this.apiService.unsafeApi.rpc.chain.getHeader(); return bestHeader.number.toNumber(); @@ -160,6 +177,26 @@ export class BlockchainService return this.getHeaderForHash(hash.toHex()); } + @mainThreadOnly() + async getRequiredHeaderForHeight(height: number): Promise> { + const blockHeader = await this.getHeaderForHeight(height); + + let timestamp: Date | undefined = blockHeader.timestamp; + + if (!timestamp) { + const blockTimestamp = await ( + await this.apiService.unsafeApi.at(blockHeader.blockHash) + ).query.timestamp.now(); + + timestamp = new Date(blockTimestamp.toNumber()); + } + + return { + ...blockHeader, + timestamp, + }; + } + // eslint-disable-next-line @typescript-eslint/require-await async updateDynamicDs( params: DatasourceParams, From 2694b55f23a8dd06e6f675b1ec0ea775843ad332 Mon Sep 17 00:00:00 2001 From: Tate Date: Sat, 1 Mar 2025 06:24:44 +0000 Subject: [PATCH 4/9] multiChain judgment logic --- packages/node-core/src/indexer/store.service.ts | 4 ++-- packages/node/src/blockchain.service.ts | 17 ----------------- 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/packages/node-core/src/indexer/store.service.ts b/packages/node-core/src/indexer/store.service.ts index 80dda84231..b4be1580a8 100644 --- a/packages/node-core/src/indexer/store.service.ts +++ b/packages/node-core/src/indexer/store.service.ts @@ -173,7 +173,7 @@ export class StoreService { this.subqueryProject.network.chainId ); - if (this.config.historical === 'timestamp') { + if (this.config.multiChain) { this._globalDataRepo = GlobalDataFactory(this.sequelize, schema); } @@ -508,7 +508,7 @@ group by } private async initChainRewindTimestamp() { - if (this.historical !== 'timestamp') return; + if (!this.config.multiChain) return; if ((await this.getRewindTimestamp()) !== undefined) return; const rewindTimestampKey = generateRewindTimestampKey(this.subqueryProject.network.chainId); await this.globalDataRepo.create({key: rewindTimestampKey, value: 0}); diff --git a/packages/node/src/blockchain.service.ts b/packages/node/src/blockchain.service.ts index 405133c491..5f632e1a30 100644 --- a/packages/node/src/blockchain.service.ts +++ b/packages/node/src/blockchain.service.ts @@ -132,23 +132,6 @@ export class BlockchainService return substrateHeaderToHeader(finalizedHeader); } - // async test(blockHeight: number): Promise { - // // 连接到 Polkadot 节点 - // const wsProvider = new WsProvider('wss://rpc.polkadot.io'); - // const api = await ApiPromise.create({ provider: wsProvider }); - - // // 获取区块 - // const blockHash = await api.rpc.chain.getBlockHash(blockHeight); - - // // 获取区块的时间戳 - // const block = await api.rpc.chain.getBlock(blockHash); - // const timestamp = await api.at(blockHash); - - // console.log(`Block #${blockHeight} timestamp: ${timestamp.toString()}`); - // api.disconnect(); - // return; - // } - async getBestHeight(): Promise { const bestHeader = await this.apiService.unsafeApi.rpc.chain.getHeader(); return bestHeader.number.toNumber(); From c5e798a6fcc97c723a885ccd9b72b27379c6068e Mon Sep 17 00:00:00 2001 From: Tate Date: Sat, 1 Mar 2025 06:36:34 +0000 Subject: [PATCH 5/9] fix rewind tirrger --- packages/node-core/src/db/sync-helper.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/packages/node-core/src/db/sync-helper.ts b/packages/node-core/src/db/sync-helper.ts index 6e93bb1de0..d8e5fb8764 100644 --- a/packages/node-core/src/db/sync-helper.ts +++ b/packages/node-core/src/db/sync-helper.ts @@ -17,6 +17,7 @@ import { } from '@subql/x-sequelize'; import {ModelAttributeColumnReferencesOptions, ModelIndexesOptions} from '@subql/x-sequelize/types/model'; import {MultiChainRewindEvent} from '../events'; +import {RewindLockKey} from '../indexer'; import {EnumType} from '../utils'; import {formatAttributes, generateIndexName, modelToTableName} from './sequelizeUtil'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -316,7 +317,20 @@ export function createRewindTriggerFunction(schema: string): string { return ` CREATE OR REPLACE FUNCTION "${schema}".rewind_notification() RETURNS trigger AS $$ + DECLARE + key_value TEXT; BEGIN + IF TG_OP = 'DELETE' THEN + key_value := OLD.value ->> 'key'; + ELSE + key_value := NEW.value ->> 'key'; + END IF; + + -- Make sure it’s RewindLockKey + IF key_value <> '${RewindLockKey}' THEN + RETURN NULL; + END IF; + IF TG_OP = 'INSERT' THEN PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.Rewind}'); END IF; From 8be26fa68fa2ba1292094a3eac8769ca4fc83fa3 Mon Sep 17 00:00:00 2001 From: Tate Date: Sun, 2 Mar 2025 14:40:49 +0000 Subject: [PATCH 6/9] fixed some bug --- packages/node-core/src/blockchain.service.ts | 6 +- .../src/configure/SubqueryProject.ts | 2 +- packages/node-core/src/db/sync-helper.test.ts | 82 ++++++++++++++++++- .../blockDispatcher/base-block-dispatcher.ts | 1 + .../node-core/src/indexer/fetch.service.ts | 25 +++--- .../src/indexer/multiChainRewind.service.ts | 47 +++++++---- .../src/indexer/project.service.spec.ts | 19 ++++- packages/node-core/src/indexer/types.ts | 2 +- packages/node-core/src/utils/project.ts | 10 +-- packages/node/src/blockchain.service.ts | 19 ++++- .../node/src/indexer/project.service.spec.ts | 18 +++- 11 files changed, 181 insertions(+), 50 deletions(-) diff --git a/packages/node-core/src/blockchain.service.ts b/packages/node-core/src/blockchain.service.ts index 064e3367b3..ac756ac4d7 100644 --- a/packages/node-core/src/blockchain.service.ts +++ b/packages/node-core/src/blockchain.service.ts @@ -13,8 +13,8 @@ export interface ICoreBlockchainService< // Project service onProjectChange(project: SubQueryProject): Promise | void; - /* Not all networks have a block timestamp, e.g. Shiden */ - getBlockTimestamp(height: number): Promise; + /* Not all networks have a block timestamp, e.g. Shiden need to request one more get */ + getBlockTimestamp(height: number): Promise; } export interface IBlockchainService< @@ -58,7 +58,7 @@ export interface IBlockchainService< // Unfinalized blocks getHeaderForHash(hash: string): Promise
; getHeaderForHeight(height: number): Promise
; - getRequiredHeaderForHeight(height: number): Promise>; + getRequiredHeaderForHeight(height: number): Promise
; // Dynamic Ds sevice /** diff --git a/packages/node-core/src/configure/SubqueryProject.ts b/packages/node-core/src/configure/SubqueryProject.ts index e328a6d66d..e23b53c05d 100644 --- a/packages/node-core/src/configure/SubqueryProject.ts +++ b/packages/node-core/src/configure/SubqueryProject.ts @@ -124,7 +124,7 @@ export class BaseSubqueryProject< return this.#dataSources; } - async applyCronTimestamps(getTimestamp: (height: number) => Promise): Promise { + async applyCronTimestamps(getTimestamp: (height: number) => Promise): Promise { this.#dataSources = await insertBlockFiltersCronSchedules( this.dataSources, getTimestamp, diff --git a/packages/node-core/src/db/sync-helper.test.ts b/packages/node-core/src/db/sync-helper.test.ts index 651356ddc7..84b55eb66b 100644 --- a/packages/node-core/src/db/sync-helper.test.ts +++ b/packages/node-core/src/db/sync-helper.test.ts @@ -4,10 +4,19 @@ import {INestApplication} from '@nestjs/common'; import {Test} from '@nestjs/testing'; import {delay} from '@subql/common'; +import {hashName} from '@subql/utils'; import {Sequelize} from '@subql/x-sequelize'; import {NodeConfig} from '../configure/NodeConfig'; +import {MultiChainRewindEvent} from '../events'; +import {RewindLockKey} from '../indexer'; import {DbModule} from './db.module'; -import {createSendNotificationTriggerFunction, createNotifyTrigger, getDbSizeAndUpdateMetadata} from './sync-helper'; +import { + createSendNotificationTriggerFunction, + createNotifyTrigger, + getDbSizeAndUpdateMetadata, + createRewindTriggerFunction, + createRewindTrigger, +} from './sync-helper'; const nodeConfig = new NodeConfig({subquery: 'packages/node-core/test/v1.0.0', subqueryName: 'test'}); @@ -185,7 +194,74 @@ describe('sync helper test', () => { }, 10_000); }); - describe('rewind lock', () => { - // TODO + describe('Multi-chain notification', () => { + let client: unknown; + schema = 'multi-chain-test'; + const listenerHash = hashName(schema, 'rewind_trigger', '_global'); + + afterEach(async () => { + if (client) { + await (client as any).query(`UNLISTEN "${listenerHash}"`); + (client as any).removeAllListeners('notification'); + sequelize.connectionManager.releaseConnection(client); + } + }); + + it('can handle multiple rows in one transaction', async () => { + const module = await Test.createTestingModule({ + imports: [DbModule.forRootWithConfig(nodeConfig)], + }).compile(); + app = module.createNestApplication(); + await app.init(); + sequelize = app.get(Sequelize); + await sequelize.createSchema(schema, {}); + // mock create global table + await sequelize.query(` + CREATE TABLE IF NOT EXISTS "${schema}"._global ( + key VARCHAR(255) NOT NULL PRIMARY KEY, + value JSONB, + "createdAt" TIMESTAMP WITH TIME ZONE NOT NULL, + "updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL + )`); + + await sequelize.query(createRewindTriggerFunction(schema)); + await sequelize.query(createRewindTrigger(schema)); + + client = await sequelize.connectionManager.getConnection({ + type: 'read', + }); + await (client as any).query(`LISTEN "${listenerHash}"`); + + const listener = jest.fn(); + (client as any).on('notification', (msg: any) => { + console.log('Payload:', msg.payload); + listener(msg.payload); + }); + + const rewindSqlFromTimestamp = ( + timestamp: number + ) => `INSERT INTO "${schema}"."_global" ( "key", "value", "createdAt", "updatedAt" ) + VALUES + ( 'rewindLock', '{"timestamp":${timestamp},"chainNum":1}', now(), now()) + ON CONFLICT ( "key" ) + DO UPDATE + SET "key" = EXCLUDED."key", + "value" = EXCLUDED."value", + "updatedAt" = EXCLUDED."updatedAt" + WHERE "_global"."key" = '${RewindLockKey}' AND ("_global"."value"->>'timestamp')::BIGINT > ${timestamp}`; + const rewindTimestamp = 1597669506000; + await sequelize.query(rewindSqlFromTimestamp(rewindTimestamp)); + await delay(1); + + await sequelize.query(rewindSqlFromTimestamp(rewindTimestamp - 1)); + await delay(1); + + await sequelize.query(`DELETE FROM "${schema}"."_global" WHERE "key" = '${RewindLockKey}'`); + await delay(1); + expect(listener).toHaveBeenCalledTimes(3); + expect(listener).toHaveBeenNthCalledWith(1, MultiChainRewindEvent.Rewind); + expect(listener).toHaveBeenNthCalledWith(2, MultiChainRewindEvent.RewindTimestampDecreased); + expect(listener).toHaveBeenNthCalledWith(3, MultiChainRewindEvent.RewindComplete); + }, 20_000); }); }); diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index 6a7030b062..660e259da2 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -34,6 +34,7 @@ export interface IBlockDispatcher { latestBufferedHeight: number; batchSize: number; + setLatestProcessedHeight(height: number): void; // Remove all enqueued blocks, used when a dynamic ds is created flushQueue(height: number): void; } diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index ffb480eb83..c180c1884f 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -16,7 +16,7 @@ import {IBlockDispatcher} from './blockDispatcher'; import {mergeNumAndBlocksToNums} from './dictionary'; import {DictionaryService} from './dictionary/dictionary.service'; import {mergeNumAndBlocks} from './dictionary/utils'; -import {IMultiChainHandler, MultiChainRewindService, RewindStatus} from './multiChainRewind.service'; +import {IMultiChainHandler, IMultiChainRewindService, RewindStatus} from './multiChainRewind.service'; import {IStoreModelProvider} from './storeModelProvider'; import {BypassBlocks, IBlock, IProjectService} from './types'; import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service'; @@ -41,7 +41,7 @@ export class FetchService, - private multiChainRewindService: MultiChainRewindService + private multiChainRewindService: IMultiChainRewindService ) {} private get latestBestHeight(): number { @@ -199,9 +199,18 @@ export class FetchService; + waitRewindHeader?: Header; constructor( private nodeConfig: NodeConfig, private eventEmitter: EventEmitter2, @@ -82,7 +89,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl return this._chainId; } - get dbSchema(): string { + private get dbSchema(): string { assert(this._dbSchema, 'dbSchema is not set'); return this._dbSchema; } @@ -94,7 +101,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl this._rewindTriggerName = rewindTriggerName; } - get rewindTriggerName(): string { + private get rewindTriggerName(): string { assert(this._rewindTriggerName, 'rewindTriggerName is not set'); return this._rewindTriggerName; } @@ -103,7 +110,8 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl this._status = status; } - getStatus(): RewindStatus { + get status() { + assert(this._status, 'status is not set'); return this._status; } @@ -116,7 +124,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl this.dbSchema = dbSchema; this.rewindTriggerName = hashName(this.dbSchema, 'rewind_trigger', '_global'); - if (this.storeService.historical === 'timestamp') { + if (this.nodeConfig.multiChain) { await this.sequelize.query(`${createRewindTriggerFunction(this.dbSchema)}`); const rewindTriggers = await getTriggers(this.sequelize, this.rewindTriggerName); @@ -140,8 +148,9 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl // Creating a new pgClient is to avoid using the same database connection as the block scheduler, // which may prevent real-time listening to rollback events. - const pgPool = new Pool(getPgPoolConfig(this.nodeConfig)); - this.pgListener = await pgPool.connect(); + this.pgListener = (await this.sequelize.connectionManager.getConnection({ + type: 'read', + })) as PoolClient; this.pgListener.on('notification', (msg) => { Promise.resolve().then(async () => { @@ -150,9 +159,9 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl switch (eventType) { case MultiChainRewindEvent.Rewind: case MultiChainRewindEvent.RewindTimestampDecreased: { - this.status = RewindStatus.Rewinding; const {rewindTimestamp} = await this.getGlobalRewindStatus(); - this.waitRewindHeader = await this.getHeaderByBinarySearch(dayjs(rewindTimestamp).toDate()); + this.waitRewindHeader = await this.searchWaitRewindHeader(rewindTimestamp); + this.status = RewindStatus.Rewinding; // Trigger the rewind event, and let the fetchService listen to the message and handle the queueFlush. this.eventEmitter.emit(eventType, { @@ -162,8 +171,8 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl } case MultiChainRewindEvent.RewindComplete: // recover indexing status - this.status = RewindStatus.Normal; this.waitRewindHeader = undefined; + this.status = RewindStatus.Normal; break; default: throw new Error(`Unknown rewind event: ${eventType}`); @@ -182,10 +191,18 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl } if (rewindTimestamp) { this.status = RewindStatus.Rewinding; - this.waitRewindHeader = await this.getHeaderByBinarySearch(dayjs(rewindTimestamp).toDate()); + this.waitRewindHeader = await this.searchWaitRewindHeader(rewindTimestamp); } } + private async searchWaitRewindHeader(rewindTimestamp: number): Promise
{ + const rewindDate = dayjs(rewindTimestamp).toDate(); + const rewindBlockHeader = await this.getHeaderByBinarySearch(rewindDate); + // The blockHeader.timestamp obtained from the query cannot be used directly, as it will cause an infinite loop. + // Different chains have timestamp discrepancies, which will result in infinite backward tracing. + return {...rewindBlockHeader, timestamp: rewindDate}; + } + /** * Serialize the rewind lock * @param rewindTimestamp ms diff --git a/packages/node-core/src/indexer/project.service.spec.ts b/packages/node-core/src/indexer/project.service.spec.ts index 17212b79d7..595839da2c 100644 --- a/packages/node-core/src/indexer/project.service.spec.ts +++ b/packages/node-core/src/indexer/project.service.spec.ts @@ -54,8 +54,8 @@ class TestBlockchainService implements IBlockchainService { // throw new Error('Method onProjectChange not implemented.'); } // eslint-disable-next-line @typescript-eslint/promise-function-async - getBlockTimestamp(height: number): Promise { - return Promise.resolve(undefined); + getBlockTimestamp(height: number): Promise { + return Promise.resolve(new Date()); } getBlockSize(block: IBlock): number { return 0; @@ -112,6 +112,15 @@ class TestBlockchainService implements IBlockchainService { timestamp: new Date(), }; } + // eslint-disable-next-line @typescript-eslint/require-await + async getRequiredHeaderForHeight(height: number): Promise
{ + return { + blockHeight: height, + blockHash: `b${height}`, + parentHash: `b${height - 1}`, + timestamp: new Date(), + }; + } } describe('BaseProjectService', () => { @@ -131,7 +140,8 @@ describe('BaseProjectService', () => { {getDynamicDatasources: jest.fn()} as unknown as DynamicDsService, null as unknown as any, null as unknown as any, - new TestBlockchainService() + new TestBlockchainService(), + null as unknown as any ); }); @@ -424,7 +434,8 @@ describe('BaseProjectService', () => { } as unknown as DynamicDsService, // dynamicDsService new EventEmitter2(), // eventEmitter new UnfinalizedBlocksService(nodeConfig, storeService.modelProvider, blockchainService), // unfinalizedBlocksService - blockchainService + blockchainService, + {init: jest.fn()} as any // MultiChainRewindService ); }; diff --git a/packages/node-core/src/indexer/types.ts b/packages/node-core/src/indexer/types.ts index c3b3cd420b..4af38c2642 100644 --- a/packages/node-core/src/indexer/types.ts +++ b/packages/node-core/src/indexer/types.ts @@ -27,7 +27,7 @@ export interface ISubqueryProject< C = unknown, > extends Omit, 'schema' | 'version' | 'name' | 'specVersion' | 'description'> { readonly schema: GraphQLSchema; - applyCronTimestamps: (getBlockTimestamp: (height: number) => Promise) => Promise; + applyCronTimestamps: (getBlockTimestamp: (height: number) => Promise) => Promise; readonly id: string; chainTypes?: C; // The chainTypes after loaded readonly root: string; diff --git a/packages/node-core/src/utils/project.ts b/packages/node-core/src/utils/project.ts index f5fbd86bef..b724978daa 100644 --- a/packages/node-core/src/utils/project.ts +++ b/packages/node-core/src/utils/project.ts @@ -232,7 +232,7 @@ export type IsRuntimeDs = (ds: DS) => ds is DS; // eslint-disable-next-line @typescript-eslint/require-await export async function insertBlockFiltersCronSchedules( dataSources: DS[], - getBlockTimestamp: (height: number) => Promise, + getBlockTimestamp: (height: number) => Promise, isRuntimeDs: IsRuntimeDs, blockHandlerKind: string ): Promise { @@ -248,13 +248,7 @@ export async function insertBlockFiltersCronSchedules { + async getBlockTimestamp(height: number): Promise { const block = await getBlockByHeight(this.apiService.api, height); - return getTimestamp(block); + + let timestamp = getTimestamp(block); + if (!timestamp) { + // Not all networks have a block timestamp, e.g. Shiden + const blockTimestamp = await ( + await this.apiService.unsafeApi.at(block.hash) + ).query.timestamp.now(); + + timestamp = new Date(blockTimestamp.toNumber()); + } + + return timestamp; } getBlockSize(block: IBlock): number { @@ -161,7 +172,9 @@ export class BlockchainService } @mainThreadOnly() - async getRequiredHeaderForHeight(height: number): Promise> { + async getRequiredHeaderForHeight( + height: number, + ): Promise
{ const blockHeader = await this.getHeaderForHeight(height); let timestamp: Date | undefined = blockHeader.timestamp; diff --git a/packages/node/src/indexer/project.service.spec.ts b/packages/node/src/indexer/project.service.spec.ts index 36c2e7f3dd..98787a5737 100644 --- a/packages/node/src/indexer/project.service.spec.ts +++ b/packages/node/src/indexer/project.service.spec.ts @@ -12,6 +12,7 @@ import { upgradableSubqueryProject, DsProcessorService, DynamicDsService, + MultiChainRewindService, } from '@subql/node-core'; import { SubstrateDatasourceKind, SubstrateHandlerKind } from '@subql/types'; import { GraphQLSchema } from 'graphql'; @@ -145,6 +146,7 @@ describe('ProjectService', () => { apiService: ApiService, project: SubqueryProject, blockchainService: BlockchainService, + multiChainRewindService: MultiChainRewindService, ) => new TestProjectService( { @@ -172,14 +174,25 @@ describe('ProjectService', () => { null as unknown as any, null as unknown as any, blockchainService, + multiChainRewindService, ), - inject: ['APIService', 'ISubqueryProject', 'IBlockchainService'], + inject: [ + 'APIService', + 'ISubqueryProject', + 'IBlockchainService', + MultiChainRewindService, + ], }, EventEmitter2, { provide: 'APIService', useFactory: ApiService.init, - inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2, NodeConfig] + inject: [ + 'ISubqueryProject', + ConnectionPoolService, + EventEmitter2, + NodeConfig, + ], }, { provide: ProjectUpgradeService, @@ -194,6 +207,7 @@ describe('ProjectService', () => { provide: 'IBlockchainService', useClass: BlockchainService, }, + MultiChainRewindService, ], imports: [EventEmitterModule.forRoot()], }).compile(); From 3a1bd7cc843ee6ea8cb1497c69aed01cb342aabf Mon Sep 17 00:00:00 2001 From: Tate Date: Mon, 3 Mar 2025 08:14:35 +0000 Subject: [PATCH 7/9] inject issues --- packages/node-core/src/indexer/fetch.service.ts | 4 ++-- packages/node-core/src/indexer/multiChainRewind.service.ts | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index c180c1884f..9da787f57d 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -16,7 +16,7 @@ import {IBlockDispatcher} from './blockDispatcher'; import {mergeNumAndBlocksToNums} from './dictionary'; import {DictionaryService} from './dictionary/dictionary.service'; import {mergeNumAndBlocks} from './dictionary/utils'; -import {IMultiChainHandler, IMultiChainRewindService, RewindStatus} from './multiChainRewind.service'; +import {IMultiChainHandler, MultiChainRewindService, RewindStatus} from './multiChainRewind.service'; import {IStoreModelProvider} from './storeModelProvider'; import {BypassBlocks, IBlock, IProjectService} from './types'; import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service'; @@ -41,7 +41,7 @@ export class FetchService, - private multiChainRewindService: IMultiChainRewindService + private multiChainRewindService: MultiChainRewindService ) {} private get latestBestHeight(): number { diff --git a/packages/node-core/src/indexer/multiChainRewind.service.ts b/packages/node-core/src/indexer/multiChainRewind.service.ts index 946a67d1d9..a2ba04274b 100644 --- a/packages/node-core/src/indexer/multiChainRewind.service.ts +++ b/packages/node-core/src/indexer/multiChainRewind.service.ts @@ -185,6 +185,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl logger.info(`Register rewind listener success, chainId: ${this.chainId}`); // Check whether the current state is in rollback. + // If a global lock situation occurs, prioritize setting it to the WaitOtherChain state. If a rollback is still required, then set it to the rewinding state. const {rewindLock, rewindTimestamp} = await this.getGlobalRewindStatus(); if (rewindLock) { this.status = RewindStatus.WaitOtherChain; From be995ac513f4770abe568e295f6f387b39801e13 Mon Sep 17 00:00:00 2001 From: Tate Date: Mon, 3 Mar 2025 08:24:50 +0000 Subject: [PATCH 8/9] fix build --- .../src/indexer/fetch.service.spec.ts | 71 ++++++++++--------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index 247333d6fd..c7b8fe17b4 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -1,9 +1,9 @@ // Copyright 2020-2025 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { SchedulerRegistry } from '@nestjs/schedule'; -import { BaseCustomDataSource, BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry } from '@subql/types-core'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {SchedulerRegistry} from '@nestjs/schedule'; +import {BaseCustomDataSource, BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry} from '@subql/types-core'; import { UnfinalizedBlocksService, BlockDispatcher, @@ -19,9 +19,9 @@ import { IBaseIndexerWorker, BypassBlocks, } from '../'; -import { BlockHeightMap } from '../utils/blockHeightMap'; -import { DictionaryService } from './dictionary/dictionary.service'; -import { FetchService } from './fetch.service'; +import {BlockHeightMap} from '../utils/blockHeightMap'; +import {DictionaryService} from './dictionary/dictionary.service'; +import {FetchService} from './fetch.service'; const CHAIN_INTERVAL = 100; // 100ms @@ -73,7 +73,7 @@ class TestBlockchainService implements IBlockchainService { fetchBlockWorker( worker: IBaseIndexerWorker, blockNum: number, - context: { workers: IBaseIndexerWorker[] } + context: {workers: IBaseIndexerWorker[]} ): Promise
{ throw new Error('Method not implemented.'); } @@ -82,7 +82,7 @@ class TestBlockchainService implements IBlockchainService { blockHeight: this.finalizedHeight, blockHash: '0xxx', parentHash: '0xxx', - timestamp: new Date() + timestamp: new Date(), }); } async getBestHeight(): Promise { @@ -124,9 +124,13 @@ class TestBlockchainService implements IBlockchainService { throw new Error('Method not implemented.'); } // eslint-disable-next-line @typescript-eslint/promise-function-async - getBlockTimestamp(height: number): Promise { + getBlockTimestamp(height: number): Promise { throw new Error('Method not implemented.'); } + + async getRequiredHeaderForHeight(height: number): Promise
{ + return (await this.getHeaderForHeight(height)) as any; + } } const nodeConfig = new NodeConfig({ @@ -161,7 +165,7 @@ function mockModuloDs(startBlock: number, endBlock: number, modulo: number): Bas { kind: 'mock/Handler', handler: 'mockFunction', - filter: { modulo: modulo }, + filter: {modulo: modulo}, }, ], }, @@ -268,7 +272,8 @@ describe('Fetch Service', () => { set: jest.fn(), }, } as any, - blockchainService + blockchainService, + {} as any ); spyOnEnqueueSequential = jest.spyOn(fetchService as any, 'enqueueSequential') as any; @@ -302,20 +307,20 @@ describe('Fetch Service', () => { const moduloBlockHeightMap = new BlockHeightMap( new Map([ - [1, [{ ...mockModuloDs(1, 100, 20), startBlock: 1, endBlock: 100 }]], + [1, [{...mockModuloDs(1, 100, 20), startBlock: 1, endBlock: 100}]], [ 101, // empty gap for discontinuous block [], ], - [201, [{ ...mockModuloDs(201, 500, 30), startBlock: 201, endBlock: 500 }]], + [201, [{...mockModuloDs(201, 500, 30), startBlock: 201, endBlock: 500}]], // to infinite - [500, [{ ...mockModuloDs(500, Number.MAX_SAFE_INTEGER, 99), startBlock: 500 }]], + [500, [{...mockModuloDs(500, Number.MAX_SAFE_INTEGER, 99), startBlock: 500}]], // multiple ds [ 600, [ - { ...mockModuloDs(500, 800, 99), startBlock: 600, endBlock: 800 }, - { ...mockModuloDs(700, Number.MAX_SAFE_INTEGER, 101), startBlock: 700 }, + {...mockModuloDs(500, 800, 99), startBlock: 600, endBlock: 800}, + {...mockModuloDs(700, Number.MAX_SAFE_INTEGER, 101), startBlock: 700}, ], ], ]) @@ -333,43 +338,43 @@ describe('Fetch Service', () => { [ 1, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, ], ], [ 10, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, - { ...mockDs, startBlock: 10, endBlock: 20 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, + {...mockDs, startBlock: 10, endBlock: 20}, ], ], [ 21, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, ], ], [ 50, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, - { ...mockDs, startBlock: 50, endBlock: 200 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, + {...mockDs, startBlock: 50, endBlock: 200}, ], ], [ 101, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 50, endBlock: 200 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 50, endBlock: 200}, ], ], - [201, [{ ...mockDs, startBlock: 1, endBlock: 300 }]], + [201, [{...mockDs, startBlock: 1, endBlock: 300}]], [301, []], - [500, [{ ...mockDs, startBlock: 500 }]], + [500, [{...mockDs, startBlock: 500}]], ]) ) ); @@ -505,7 +510,7 @@ describe('Fetch Service', () => { { kind: 'mock/BlockHandler', handler: 'mockFunction', - filter: { modulo: 3 }, + filter: {modulo: 3}, }, { kind: 'mock/CallHandler', @@ -638,7 +643,7 @@ describe('Fetch Service', () => { it('enqueues modulo blocks with furture dataSources', async () => { fetchService.mockGetModulos([3]); - dataSources.push({ ...mockDs, startBlock: 20 }); + dataSources.push({...mockDs, startBlock: 20}); await fetchService.init(1); @@ -651,7 +656,7 @@ describe('Fetch Service', () => { it('at the end of modulo block filter, enqueue END should be min of data source range end height and api last height', async () => { // So this will skip next data source fetchService.mockGetModulos([10]); - dataSources.push({ ...mockDs, startBlock: 200 }); + dataSources.push({...mockDs, startBlock: 200}); await fetchService.init(191); expect((fetchService as any).useDictionary).toBeFalsy(); From c05acd8905369dfffa7560463ef1276a1b5d716c Mon Sep 17 00:00:00 2001 From: Tate Date: Mon, 3 Mar 2025 14:28:56 +0000 Subject: [PATCH 9/9] global model --- .../src/indexer/multiChainRewind.service.ts | 145 ++------------ .../storeModelProvider/global/global.ts | 187 ++++++++++++++++++ .../storeModelProvider/global/index.ts | 4 + 3 files changed, 207 insertions(+), 129 deletions(-) create mode 100644 packages/node-core/src/indexer/storeModelProvider/global/global.ts create mode 100644 packages/node-core/src/indexer/storeModelProvider/global/index.ts diff --git a/packages/node-core/src/indexer/multiChainRewind.service.ts b/packages/node-core/src/indexer/multiChainRewind.service.ts index a2ba04274b..ce8537213d 100644 --- a/packages/node-core/src/indexer/multiChainRewind.service.ts +++ b/packages/node-core/src/indexer/multiChainRewind.service.ts @@ -13,16 +13,8 @@ import {NodeConfig} from '../configure'; import {createRewindTrigger, createRewindTriggerFunction, getPgPoolConfig, getTriggers} from '../db'; import {MultiChainRewindEvent, MultiChainRewindPayload} from '../events'; import {getLogger} from '../logger'; -import { - generateRewindTimestampKey, - GlobalData, - GlobalDataKeys, - RewindLockInfo, - RewindLockKey, - RewindTimestampKey, - RewindTimestampKeyPrefix, -} from './entities'; import {StoreService} from './store.service'; +import {PlainGlobalModel} from './storeModelProvider/global/global'; import {Header} from './types'; const logger = getLogger('MultiChainRewindService'); @@ -39,10 +31,6 @@ export interface IMultiChainRewindService { chainId: string; status: RewindStatus; waitRewindHeader?: Header; - getGlobalRewindStatus(): Promise<{ - rewindTimestamp: GlobalDataKeys[RewindTimestampKey]; - rewindLock?: GlobalDataKeys[typeof RewindLockKey]; - }>; setGlobalRewindLock(rewindTimestamp: number): Promise; /** * Check if the height is consistent before unlocking. @@ -71,6 +59,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl private _dbSchema?: string; private _rewindTriggerName?: string; private pgListener?: PoolClient; + private _globalModel?: PlainGlobalModel = undefined; waitRewindHeader?: Header; constructor( private nodeConfig: NodeConfig, @@ -115,6 +104,13 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl return this._status; } + get globalModel() { + if (!this._globalModel) { + this._globalModel = new PlainGlobalModel(this.dbSchema, this.chainId, this.storeService.globalDataRepo); + } + return this._globalModel; + } + onApplicationShutdown() { this.pgListener?.release(); } @@ -159,7 +155,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl switch (eventType) { case MultiChainRewindEvent.Rewind: case MultiChainRewindEvent.RewindTimestampDecreased: { - const {rewindTimestamp} = await this.getGlobalRewindStatus(); + const {rewindTimestamp} = await this.globalModel.getGlobalRewindStatus(); this.waitRewindHeader = await this.searchWaitRewindHeader(rewindTimestamp); this.status = RewindStatus.Rewinding; @@ -186,7 +182,7 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl // Check whether the current state is in rollback. // If a global lock situation occurs, prioritize setting it to the WaitOtherChain state. If a rollback is still required, then set it to the rewinding state. - const {rewindLock, rewindTimestamp} = await this.getGlobalRewindStatus(); + const {rewindLock, rewindTimestamp} = await this.globalModel.getGlobalRewindStatus(); if (rewindLock) { this.status = RewindStatus.WaitOtherChain; } @@ -214,130 +210,21 @@ export class MultiChainRewindService implements IMultiChainRewindService, OnAppl return JSON.stringify({timestamp: rewindTimestamp, chainNum: chainTotal}); } - async getGlobalRewindStatus() { - const rewindTimestampKey = generateRewindTimestampKey(this.chainId); - - const records = await this.storeService.globalDataRepo.findAll({ - where: {key: {[Op.in]: [rewindTimestampKey, RewindLockKey]}}, - }); - const rewindLockInfo: GlobalData | undefined = records - .find((r) => r.key === RewindLockKey) - ?.toJSON(); - const rewindTimestampInfo: GlobalData | undefined = records - .find((r) => r.key === rewindTimestampKey) - ?.toJSON(); - - assert( - rewindTimestampInfo !== undefined, - `Not registered rewind timestamp key in global data, chainId: ${this.chainId}` - ); - return {rewindTimestamp: rewindTimestampInfo.value, rewindLock: rewindLockInfo?.value}; - } - /** * If the set rewindTimestamp is greater than or equal to the current blockHeight, we do nothing because we will roll back to an earlier time. * If the set rewindTimestamp is less than the current blockHeight, we should roll back to the earlier rewindTimestamp. * @param rewindTimestamp rewindTimestamp in milliseconds */ async setGlobalRewindLock(rewindTimestamp: number) { - const globalTable = this.storeService.globalDataRepo.tableName; - const chainTotal = await this.storeService.globalDataRepo.count({ - where: { - key: {[Op.like]: `${RewindTimestampKeyPrefix}_%`}, - }, - }); - - const tx = await this.sequelize.transaction(); - try { - const [_, updateRows] = await this.sequelize.query( - `INSERT INTO "${this.dbSchema}"."${globalTable}" ( "key", "value", "createdAt", "updatedAt" ) - VALUES - ( '${RewindLockKey}', '${this.serializeRewindLock(rewindTimestamp, chainTotal)}', now(), now()) - ON CONFLICT ( "key" ) - DO UPDATE - SET "key" = EXCLUDED."key", - "value" = EXCLUDED."value", - "updatedAt" = EXCLUDED."updatedAt" - WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT > ${rewindTimestamp}`, - { - type: QueryTypes.INSERT, - transaction: tx, - } - ); - - // If there is a rewind lock that is greater than the current rewind timestamp, we should not update the rewind timestamp - if (updateRows === 1) { - logger.info(`setGlobalRewindLock success chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}`); - await this.storeService.globalDataRepo.update( - {value: rewindTimestamp}, - { - where: {key: {[Op.like]: 'rewindTimestamp_%'}}, - transaction: tx, - } - ); - - // The current chain is in REWINDING state - this.status = RewindStatus.Rewinding; - } - await tx.commit(); - } catch (e: any) { - logger.error( - `setGlobalRewindLock failed chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}, errorMsg: ${e.message}` - ); - await tx.rollback(); - throw e; + const {needRewind} = await this.globalModel.setGlobalRewindLock(rewindTimestamp); + if (needRewind) { + logger.info(`setGlobalRewindLock success chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}`); + this.status = RewindStatus.Rewinding; } } async releaseChainRewindLock(tx: Transaction, rewindTimestamp: number): Promise { - const globalTable = this.storeService.globalDataRepo.tableName; - - // Ensure the first write occurs and prevent deadlock, only update the rewindNum - 1 - const results = await this.sequelize.query<{value: RewindLockInfo}>( - `UPDATE "${this.dbSchema}"."${globalTable}" - SET value = jsonb_set( - value, - '{chainNum}', - to_jsonb(COALESCE(("${globalTable}"."value" ->> 'chainNum')::BIGINT, 0) - 1), - false - ) - WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT = ${rewindTimestamp} - RETURNING value`, - { - type: QueryTypes.SELECT, - transaction: tx, - } - ); - - // not exist rewind lock in current timestamp - if (results.length === 0) { - logger.warn( - `Release rewind lock failed chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}, the rewind lock does not exist` - ); - return 0; - } - const chainNum = results[0].value.chainNum; - - const rewindTimestampKey = generateRewindTimestampKey(this.chainId); - const [affectedCount] = await this.storeService.globalDataRepo.update( - {value: 0}, - { - where: { - key: rewindTimestampKey, - value: rewindTimestamp, - }, - transaction: tx, - } - ); - assert( - affectedCount === 1, - `not found rewind timestamp key in global data, chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}` - ); - - if (chainNum === 0) { - await this.storeService.globalDataRepo.destroy({where: {key: RewindLockKey}, transaction: tx}); - } - + const chainNum = await this.globalModel.releaseChainRewindLock(tx, rewindTimestamp); // The current chain has completed the rewind, and we still need to wait for other chains to finish. // When fully synchronized, set the status back to normal by pgListener. this.status = RewindStatus.WaitOtherChain; diff --git a/packages/node-core/src/indexer/storeModelProvider/global/global.ts b/packages/node-core/src/indexer/storeModelProvider/global/global.ts new file mode 100644 index 0000000000..795b676e6b --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/global/global.ts @@ -0,0 +1,187 @@ +// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import assert from 'assert'; +import {getLogger} from '@subql/node-core/logger'; +import {Op, QueryTypes, Sequelize, Transaction} from '@subql/x-sequelize'; +import { + generateRewindTimestampKey, + GlobalData, + GlobalDataKeys, + GlobalDataRepo, + RewindLockInfo, + RewindLockKey, + RewindTimestampKey, + RewindTimestampKeyPrefix, +} from '../../entities'; + +export interface IGlobalData { + getGlobalRewindStatus(): Promise<{ + rewindTimestamp: GlobalDataKeys[RewindTimestampKey]; + rewindLock?: GlobalDataKeys[typeof RewindLockKey]; + }>; + + setGlobalRewindLock(rewindTimestamp: number): Promise<{needRewind: boolean}>; + /** + * Check if the height is consistent before unlocking. + * @param tx + * @param rewindTimestamp The timestamp to roll back to, in milliseconds. + * @returns the number of remaining rewind chains + */ + releaseChainRewindLock(tx: Transaction, rewindTimestamp: number): Promise; +} + +const logger = getLogger('PlainGlobalModel'); + +export class PlainGlobalModel implements IGlobalData { + constructor( + private readonly dbSchema: string, + private readonly chainId: string, + private readonly model: GlobalDataRepo + ) {} + + private get sequelize(): Sequelize { + const sequelize = this.model.sequelize; + + if (!sequelize) { + throw new Error(`Sequelize is not available on ${this.model.name}`); + } + + return sequelize; + } + + /** + * Serialize the rewind lock + * @param rewindTimestamp ms + * @param chainTotal The total number of registered chains. + * @returns + */ + private serializeRewindLock(rewindTimestamp: number, chainTotal: number): string { + return JSON.stringify({timestamp: rewindTimestamp, chainNum: chainTotal}); + } + + async getGlobalRewindStatus() { + const rewindTimestampKey = generateRewindTimestampKey(this.chainId); + + const records = await this.model.findAll({ + where: {key: {[Op.in]: [rewindTimestampKey, RewindLockKey]}}, + }); + const rewindLockInfo: GlobalData | undefined = records + .find((r) => r.key === RewindLockKey) + ?.toJSON(); + const rewindTimestampInfo: GlobalData | undefined = records + .find((r) => r.key === rewindTimestampKey) + ?.toJSON(); + + assert( + rewindTimestampInfo !== undefined, + `Not registered rewind timestamp key in global data, chainId: ${this.chainId}` + ); + return {rewindTimestamp: rewindTimestampInfo.value, rewindLock: rewindLockInfo?.value}; + } + + /** + * If the set rewindTimestamp is greater than or equal to the current blockHeight, we do nothing because we will roll back to an earlier time. + * If the set rewindTimestamp is less than the current blockHeight, we should roll back to the earlier rewindTimestamp. + * @param rewindTimestamp rewindTimestamp in milliseconds + */ + async setGlobalRewindLock(rewindTimestamp: number): Promise<{needRewind: boolean}> { + const globalTable = this.model.tableName; + const chainTotal = await this.model.count({ + where: { + key: {[Op.like]: `${RewindTimestampKeyPrefix}_%`}, + }, + }); + + let needRewind = false; + const tx = await this.sequelize.transaction(); + try { + const [_, updateRows] = await this.sequelize.query( + `INSERT INTO "${this.dbSchema}"."${globalTable}" ( "key", "value", "createdAt", "updatedAt" ) + VALUES + ( '${RewindLockKey}', '${this.serializeRewindLock(rewindTimestamp, chainTotal)}', now(), now()) + ON CONFLICT ( "key" ) + DO UPDATE + SET "key" = EXCLUDED."key", + "value" = EXCLUDED."value", + "updatedAt" = EXCLUDED."updatedAt" + WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT > ${rewindTimestamp}`, + { + type: QueryTypes.INSERT, + transaction: tx, + } + ); + + // If there is a rewind lock that is greater than the current rewind timestamp, we should not update the rewind timestamp + if (updateRows === 1) { + await this.model.update( + {value: rewindTimestamp}, + { + where: {key: {[Op.like]: 'rewindTimestamp_%'}}, + transaction: tx, + } + ); + + // The current chain is in REWINDING state + needRewind = true; + } + await tx.commit(); + return {needRewind}; + } catch (e: any) { + logger.error( + `setGlobalRewindLock failed chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}, errorMsg: ${e.message}` + ); + await tx.rollback(); + throw e; + } + } + + async releaseChainRewindLock(tx: Transaction, rewindTimestamp: number): Promise { + const globalTable = this.model.tableName; + + // Ensure the first write occurs and prevent deadlock, only update the rewindNum - 1 + const results = await this.sequelize.query<{value: RewindLockInfo}>( + `UPDATE "${this.dbSchema}"."${globalTable}" + SET value = jsonb_set( + value, + '{chainNum}', + to_jsonb(COALESCE(("${globalTable}"."value" ->> 'chainNum')::BIGINT, 0) - 1), + false + ) + WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT = ${rewindTimestamp} + RETURNING value`, + { + type: QueryTypes.SELECT, + transaction: tx, + } + ); + + // not exist rewind lock in current timestamp + if (results.length === 0) { + return 0; + } + const chainNum = results[0].value.chainNum; + + const rewindTimestampKey = generateRewindTimestampKey(this.chainId); + const [affectedCount] = await this.model.update( + {value: 0}, + { + where: { + key: rewindTimestampKey, + value: rewindTimestamp, + }, + transaction: tx, + } + ); + assert( + affectedCount === 1, + `not found rewind timestamp key in global data, chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}` + ); + + if (chainNum === 0) { + await this.model.destroy({where: {key: RewindLockKey}, transaction: tx}); + } + + return chainNum; + } +} diff --git a/packages/node-core/src/indexer/storeModelProvider/global/index.ts b/packages/node-core/src/indexer/storeModelProvider/global/index.ts new file mode 100644 index 0000000000..6e4c701873 --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/global/index.ts @@ -0,0 +1,4 @@ +// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +export {IGlobalData} from './global';