Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi chain rewind service #2673

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/node-core/src/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export interface IBlockchainService<
// Unfinalized blocks
getHeaderForHash(hash: string): Promise<Header>;
getHeaderForHeight(height: number): Promise<Header>;
getRequiredHeaderForHeight(height: number): Promise<Required<Header>>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this? getHeaderForHeight should always return a timestamp but there is one substrate chain that doesn't have timestamps in blocks so that is why it is optional


// Dynamic Ds sevice
/**
Expand Down
14 changes: 14 additions & 0 deletions packages/node-core/src/db/db.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import {DynamicModule, Global} from '@nestjs/common';
import {Sequelize, Options as SequelizeOption} from '@subql/x-sequelize';
import {PoolConfig} from 'pg';
import {NodeConfig} from '../configure/NodeConfig';
import {getLogger} from '../logger';
import {exitWithError} from '../process';
Expand Down Expand Up @@ -90,6 +91,19 @@ export async function establishNewSequelize(nodeConfig: NodeConfig): Promise<Seq
return sequelizeFactory(buildSequelizeOptions(nodeConfig, DEFAULT_DB_OPTION))();
}

export function getPgPoolConfig(nodeConfig: NodeConfig): PoolConfig {
const sequelizeOptions = buildSequelizeOptions(nodeConfig, DEFAULT_DB_OPTION);
return {
user: sequelizeOptions.username,
password: sequelizeOptions.password,
host: sequelizeOptions.host,
port: sequelizeOptions.port,
database: sequelizeOptions.database,
max: 1,
ssl: sequelizeOptions.ssl,
};
}

@Global()
export class DbModule {
static forRootWithConfig(nodeConfig: NodeConfig, option: DbOption = DEFAULT_DB_OPTION): DynamicModule {
Expand Down
4 changes: 4 additions & 0 deletions packages/node-core/src/db/sync-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,8 @@ describe('sync helper test', () => {
);
}, 10_000);
});

describe('rewind lock', () => {
// TODO
});
});
39 changes: 39 additions & 0 deletions packages/node-core/src/db/sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
Utils,
} from '@subql/x-sequelize';
import {ModelAttributeColumnReferencesOptions, ModelIndexesOptions} from '@subql/x-sequelize/types/model';
import {MultiChainRewindEvent} from '../events';
import {EnumType} from '../utils';
import {formatAttributes, generateIndexName, modelToTableName} from './sequelizeUtil';
// eslint-disable-next-line @typescript-eslint/no-var-requires
Expand Down Expand Up @@ -297,6 +298,44 @@ export function createSchemaTriggerFunction(schema: string): string {
$$ LANGUAGE plpgsql;`;
}

export function createRewindTrigger(schema: string): string {
const triggerName = hashName(schema, 'rewind_trigger', '_global');

return `
CREATE TRIGGER "${triggerName}"
AFTER INSERT OR UPDATE OR DELETE
ON "${schema}"."_global"
FOR EACH ROW
EXECUTE FUNCTION "${schema}".rewind_notification();
`;
}

export function createRewindTriggerFunction(schema: string): string {
const triggerName = hashName(schema, 'rewind_trigger', '_global');

return `
CREATE OR REPLACE FUNCTION "${schema}".rewind_notification()
RETURNS trigger AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.Rewind}');
END IF;

-- During a rollback, there is a chain that needs to be rolled back to an earlier height.
IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::BIGINT < (OLD.value ->> 'timestamp')::BIGINT THEN
PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindTimestampDecreased}');
END IF;

IF TG_OP = 'DELETE' THEN
PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindComplete}');
END IF;

RETURN NULL;
END;
$$ LANGUAGE plpgsql;
`;
}

export function getExistedIndexesQuery(schema: string): string {
return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`;
}
Expand Down
10 changes: 10 additions & 0 deletions packages/node-core/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ export enum PoiEvent {
PoiTarget = 'poi_target',
}

export enum MultiChainRewindEvent {
Rewind = 'rewind',
RewindComplete = 'rewind_complete',
RewindTimestampDecreased = 'timestamp_decreased',
}

export interface RewindPayload {
success: boolean;
height: number;
Expand Down Expand Up @@ -61,3 +67,7 @@ export interface NetworkMetadataPayload {
specName: string;
genesisHash: string;
}

export interface MultiChainRewindPayload {
height: number;
}
52 changes: 52 additions & 0 deletions packages/node-core/src/indexer/entities/GlobalData.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {blake2AsHex} from '@subql/utils';
import {BuildOptions, DataTypes, Model, Sequelize} from '@subql/x-sequelize';

export const RewindTimestampKeyPrefix = 'rewindTimestamp';
export const RewindLockKey = 'rewindLock';
export type RewindTimestampKey = `${typeof RewindTimestampKeyPrefix}_${string}`;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the string portion of this a chainId? Can you comment what sort of values are expected please

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


export type RewindLockInfo = {
/** Timestamp to rewind to. */
timestamp: number;
chainNum: number;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does chainNum mean here? Should it be chainId?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it means the number of other chains that need to wait for rollback. Maybe we need to rename it.

};
export interface GlobalDataKeys {
rewindLock: RewindLockInfo;
[key: RewindTimestampKey]: number;
}

export interface GlobalData<k extends keyof GlobalDataKeys = keyof GlobalDataKeys> {
key: k;
value: GlobalDataKeys[k];
}

interface GlobalDataEntity extends Model<GlobalData>, GlobalData {}

export type GlobalDataRepo = typeof Model & {
new (values?: unknown, options?: BuildOptions): GlobalDataEntity;
};

export function GlobalDataFactory(sequelize: Sequelize, schema: string): GlobalDataRepo {
const tableName = '_global';

return <GlobalDataRepo>sequelize.define(
tableName,
{
key: {
type: DataTypes.STRING,
primaryKey: true,
},
value: {
type: DataTypes.JSONB,
},
Comment on lines +42 to +44
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats the purpose of this being JSONB? Looking at the rest of the code it would simplify the queries to have well defined columns

},
{freezeTableName: true, schema: schema}
);
}

export function generateRewindTimestampKey(chainId: string): RewindTimestampKey {
return `${RewindTimestampKeyPrefix}_${blake2AsHex(chainId)})`.substring(0, 63) as RewindTimestampKey;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this hash and substring necessary? We use it in other places such as table or function names because of postgres limitations but in this case because its data I don't see why these steps are necessary.

}
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

export * from './Poi.entity';
export * from './Metadata.entity';
export * from './GlobalData.entity';
25 changes: 21 additions & 4 deletions packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@

import assert from 'assert';
import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {EventEmitter2, OnEvent} from '@nestjs/event-emitter';
import {SchedulerRegistry} from '@nestjs/schedule';
import {BaseDataSource} from '@subql/types-core';
import {range} from 'lodash';
import {IBlockchainService} from '../blockchain.service';
import {NodeConfig} from '../configure';
import {IndexerEvent} from '../events';
import {EventPayload, IndexerEvent, MultiChainRewindEvent, MultiChainRewindPayload} from '../events';
import {getLogger} from '../logger';
import {delay, filterBypassBlocks, getModulos} from '../utils';
import {IBlockDispatcher} from './blockDispatcher';
import {mergeNumAndBlocksToNums} from './dictionary';
import {DictionaryService} from './dictionary/dictionary.service';
import {mergeNumAndBlocks} from './dictionary/utils';
import {IMultiChainHandler, MultiChainRewindService, RewindStatus} from './multiChainRewind.service';
import {IStoreModelProvider} from './storeModelProvider';
import {BypassBlocks, IBlock, IProjectService} from './types';
import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service';
Expand All @@ -24,7 +25,7 @@ const logger = getLogger('FetchService');

@Injectable()
export class FetchService<DS extends BaseDataSource, B extends IBlockDispatcher<FB>, FB>
implements OnApplicationShutdown
implements OnApplicationShutdown, IMultiChainHandler
{
private _latestBestHeight?: number;
private _latestFinalizedHeight?: number;
Expand All @@ -39,7 +40,8 @@ export class FetchService<DS extends BaseDataSource, B extends IBlockDispatcher<
private schedulerRegistry: SchedulerRegistry,
@Inject('IUnfinalizedBlocksService') private unfinalizedBlocksService: IUnfinalizedBlocksServiceUtil,
@Inject('IStoreModelProvider') private storeModelProvider: IStoreModelProvider,
@Inject('IBlockchainService') private blockchainSevice: IBlockchainService<DS>
@Inject('IBlockchainService') private blockchainSevice: IBlockchainService<DS>,
private multiChainRewindService: MultiChainRewindService
) {}

private get latestBestHeight(): number {
Expand Down Expand Up @@ -196,6 +198,14 @@ export class FetchService<DS extends BaseDataSource, B extends IBlockDispatcher<
// Update the target height, this happens here to stay in sync with the rest of indexing
void this.storeModelProvider.metadata.set('targetHeight', latestHeight);

// If we're rewinding, we should wait until it's done
const multiChainStatus = this.multiChainRewindService.getStatus();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There needs to be more places to pause indexing to allow a multichain rewind. The fetch service just enqueues blocks, but the block dispatcher and indexer manager need to also stop fetching blocks and indexing current blocks.

if (RewindStatus.Normal !== multiChainStatus) {
logger.info(`Wait for all chains to complete rewind, current chainId: ${this.multiChainRewindService.chainId}`);
await delay(3);
continue;
}

// This could be latestBestHeight, dictionary should never include finalized blocks
// TODO add buffer so dictionary not used when project synced
if (startBlockHeight < this.latestBestHeight - scaledBatchSize) {
Expand Down Expand Up @@ -383,4 +393,11 @@ export class FetchService<DS extends BaseDataSource, B extends IBlockDispatcher<
this.updateDictionary();
this.blockDispatcher.flushQueue(blockHeight);
}

@OnEvent(MultiChainRewindEvent.Rewind)
@OnEvent(MultiChainRewindEvent.RewindTimestampDecreased)
handleMultiChainRewindEvent(payload: MultiChainRewindPayload) {
logger.info(`Received rewind event, height: ${payload.height}`);
this.resetForNewDs(payload.height);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this function needs to be renamed if it has other uses now.

}
}
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ export * from './indexer.manager';
export * from './ds-processor.service';
export * from './unfinalizedBlocks.service';
export * from './monitor.service';
export * from './multiChainRewind.service';
export * from './core.module';
Loading
Loading