From 4577dc1133f5a4ddbc29b62c9bdbb225ae0f1b14 Mon Sep 17 00:00:00 2001 From: Joshua Aruokhai Date: Mon, 30 Sep 2024 14:56:21 +0100 Subject: [PATCH] chore: added option for verbosity and debug level draft: improved operation state mechanism draft: improved operation state mechanism draft: improved operation state mechanism implemented a new entity to store cached block state --- .gitignore | 1 - config/config.yaml | 3 +- config/dev.config.yaml | 4 +- config/e2e.config.yaml | 3 +- .../base-block-data-provider.abstract.ts | 101 ++++-------------- .../bitcoin-core/interfaces.ts | 6 +- .../bitcoin-core/provider.spec.ts | 5 + .../bitcoin-core/provider.ts | 57 +++++----- .../block-provider.module.ts | 19 ++-- src/block-data-providers/esplora/interface.ts | 5 +- src/block-data-providers/esplora/provider.ts | 59 +++++----- src/block-state/block-state.entity.ts | 10 ++ src/block-state/block-state.module.ts | 13 +++ src/block-state/block-state.service.ts | 36 +++++++ src/configuration.model.ts | 13 ++- src/main.ts | 11 ++ src/operation-state/operation-state.entity.ts | 1 - 17 files changed, 190 insertions(+), 157 deletions(-) create mode 100644 src/block-state/block-state.entity.ts create mode 100644 src/block-state/block-state.module.ts create mode 100644 src/block-state/block-state.service.ts diff --git a/.gitignore b/.gitignore index eadb0bb..0d1bd19 100644 --- a/.gitignore +++ b/.gitignore @@ -29,7 +29,6 @@ lerna-debug.log* # IDE - VSCode .vscode/* -!.vscode/settings.json !.vscode/tasks.json !.vscode/launch.json !.vscode/extensions.json diff --git a/config/config.yaml b/config/config.yaml index 61740ff..01850cd 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -3,7 +3,8 @@ db: synchronize: false app: port: - schedulerInterval: #interval in seconds + verbose: + debug: network: requestRetry: delay: # delay in Milliseconds diff --git a/config/dev.config.yaml b/config/dev.config.yaml index 50e16b4..b304d4a 100644 --- a/config/dev.config.yaml +++ b/config/dev.config.yaml @@ -3,7 +3,8 @@ db: synchronize: true app: port: 3000 - schedulerInterval: 10 + verbose: false + debug: true network: regtest requestRetry: delay: 3000 @@ -18,4 +19,3 @@ bitcoinCore: rpcPass: password rpcUser: admin rpcPort: 18443 - diff --git a/config/e2e.config.yaml b/config/e2e.config.yaml index 4b1df7d..89d36d2 100644 --- a/config/e2e.config.yaml +++ b/config/e2e.config.yaml @@ -3,7 +3,8 @@ db: synchronize: true app: port: 3000 - schedulerInterval: 10 + verbose: false + debug: true network: regtest requestRetry: delay: 500 diff --git a/src/block-data-providers/base-block-data-provider.abstract.ts b/src/block-data-providers/base-block-data-provider.abstract.ts index 0d0cb86..f19be1b 100644 --- a/src/block-data-providers/base-block-data-provider.abstract.ts +++ b/src/block-data-providers/base-block-data-provider.abstract.ts @@ -5,44 +5,20 @@ import { TransactionInput, TransactionOutput, } from '@/indexer/indexer.service'; -import { TransactionsService } from '@/transactions/transactions.service'; -import { SchedulerRegistry } from '@nestjs/schedule'; -import { CronJob } from 'cron'; import { ConfigService } from '@nestjs/config'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { BlockState } from '@/block-state/block-state.entity'; -export interface BaseOperationState { - indexedBlockHeight: number; - blockCache: Record; -} - -export abstract class BaseBlockDataProvider< - OperationState extends BaseOperationState, -> { +export abstract class BaseBlockDataProvider { protected abstract readonly logger: Logger; protected abstract readonly operationStateKey: string; - protected cacheSize = 6; - protected readonly CRON_JOB_NAME = 'providerSync'; protected constructor( protected readonly configService: ConfigService, private readonly indexerService: IndexerService, private readonly operationStateService: OperationStateService, - private readonly transactionService: TransactionsService, - private readonly schedulerRegistry: SchedulerRegistry, - ) { - const schedulerIntervalInSeconds = this.configService.get( - 'app.schedulerInterval', - ); - - const job = new CronJob( - `*/${schedulerIntervalInSeconds} * * * * *`, - () => this.sync(), - ); - this.schedulerRegistry.addCronJob(this.CRON_JOB_NAME, job); - job.start(); - } - - abstract sync(): void; + protected readonly blockStateService: BlockStateService, + ) {} async indexTransaction( txid: string, @@ -68,69 +44,38 @@ export abstract class BaseBlockDataProvider< )?.state; } - async setState(partialState: Partial): Promise { - const oldState = (await this.getState()) || ({} as OperationState); - - if (partialState.blockCache) { - const updatedBlockCache = { - ...oldState.blockCache, - ...partialState.blockCache, - }; - - if (this.cacheSize < Object.keys(updatedBlockCache).length) { - delete updatedBlockCache[oldState.indexedBlockHeight - 5]; - } - - partialState.blockCache = updatedBlockCache; - } - - const newState = { - ...oldState, - ...partialState, - }; - + async setState( + state: OperationState, + blockState: BlockState, + ): Promise { await this.operationStateService.setOperationState( this.operationStateKey, - newState, + state, ); + + await this.blockStateService.addBlockState(blockState); } abstract getBlockHash(height: number): Promise; async traceReorg(): Promise { - const { indexedBlockHeight, blockCache } = await this.getState(); - let counter = indexedBlockHeight; + let state = await this.blockStateService.getCurrentBlockState(); - if (Object.keys(blockCache).length === 0) { - return indexedBlockHeight; - } - - while (true) { - const storedBlockHash = blockCache[counter]; + if (!state) return null; - if (storedBlockHash === undefined) { - throw new Error('Reorgs levels deep'); - } + while (state) { + const fetchedBlockHash = await this.getBlockHash(state.blockHeight); - const fetchedBlockHash = await this.getBlockHash(counter); + if (state.blockHash === fetchedBlockHash) return state.blockHeight; - if (storedBlockHash === fetchedBlockHash) { - return counter; - } - console.log( - 'reorg found at count: ', - counter, - ' and hash: ', - storedBlockHash, - ' ', - fetchedBlockHash, - ); + await this.blockStateService.removeState(state); - await this.transactionService.deleteTransactionByBlockHash( - storedBlockHash, + this.logger.log( + `Reorg found at height: ${state.blockHeight}, Wrong hash: ${state.blockHash}, Correct hash: ${fetchedBlockHash}`, ); - - --counter; + state = await this.blockStateService.getCurrentBlockState(); } + + throw new Error('Cannot Reorgs, blockchain state exhausted'); } } diff --git a/src/block-data-providers/bitcoin-core/interfaces.ts b/src/block-data-providers/bitcoin-core/interfaces.ts index 99e4614..e219fa4 100644 --- a/src/block-data-providers/bitcoin-core/interfaces.ts +++ b/src/block-data-providers/bitcoin-core/interfaces.ts @@ -1,5 +1,4 @@ import { TransactionInput, TransactionOutput } from '@/indexer/indexer.service'; -import { BaseOperationState } from '@/block-data-providers/base-block-data-provider.abstract'; export interface Block { height: number; @@ -40,10 +39,9 @@ export interface Output { }; } -export interface BitcoinCoreOperationState extends BaseOperationState { - currentBlockHeight: number; +export type BitcoinCoreOperationState = { indexedBlockHeight: number; -} +}; export type Transaction = { txid: string; diff --git a/src/block-data-providers/bitcoin-core/provider.spec.ts b/src/block-data-providers/bitcoin-core/provider.spec.ts index 5849457..fa9ab7e 100644 --- a/src/block-data-providers/bitcoin-core/provider.spec.ts +++ b/src/block-data-providers/bitcoin-core/provider.spec.ts @@ -12,6 +12,7 @@ import { rawTransactions, } from '@/block-data-providers/bitcoin-core/provider-fixtures'; import { Test, TestingModule } from '@nestjs/testing'; +import { BlockStateService } from '@/block-state/block-state.service'; describe('Bitcoin Core Provider', () => { let provider: BitcoinCoreProvider; @@ -46,6 +47,10 @@ describe('Bitcoin Core Provider', () => { getOperationState: jest.fn(), }, }, + { + provide: BlockStateService, + useClass: jest.fn(), + }, ], }).compile(); diff --git a/src/block-data-providers/bitcoin-core/provider.ts b/src/block-data-providers/bitcoin-core/provider.ts index 552ac7b..b44bd16 100644 --- a/src/block-data-providers/bitcoin-core/provider.ts +++ b/src/block-data-providers/bitcoin-core/provider.ts @@ -7,7 +7,7 @@ import { SATS_PER_BTC, TAPROOT_ACTIVATION_HEIGHT, } from '@/common/constants'; -import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule'; +import { Cron, CronExpression } from '@nestjs/schedule'; import { IndexerService, TransactionInput, @@ -28,7 +28,7 @@ import { import { AxiosRequestConfig } from 'axios'; import * as currency from 'currency.js'; import { AxiosRetryConfig, makeRequest } from '@/common/request'; -import { TransactionsService } from '@/transactions/transactions.service'; +import { BlockStateService } from '@/block-state/block-state.service'; @Injectable() export class BitcoinCoreProvider @@ -45,15 +45,13 @@ export class BitcoinCoreProvider configService: ConfigService, indexerService: IndexerService, operationStateService: OperationStateService, - transactionService: TransactionsService, - schedulerRegistry: SchedulerRegistry, + blockStateService: BlockStateService, ) { super( configService, indexerService, operationStateService, - transactionService, - schedulerRegistry, + blockStateService, ); const { protocol, rpcPort, rpcHost } = @@ -66,35 +64,42 @@ export class BitcoinCoreProvider } async onApplicationBootstrap() { - const getState = await this.getState(); - if (getState) { + const currentState = await this.getState(); + if (currentState) { this.logger.log( `Restoring state from previous run: ${JSON.stringify( - getState, + currentState, )}`, ); } else { this.logger.log('No previous state found. Starting from scratch.'); - const state: BitcoinCoreOperationState = { - currentBlockHeight: 0, - blockCache: {}, - indexedBlockHeight: - this.configService.get('app.network') === - BitcoinNetwork.MAINNET - ? TAPROOT_ACTIVATION_HEIGHT - 1 - : 0, - }; - - await this.setState(state); + + const blockHeight = + this.configService.get('app.network') === + BitcoinNetwork.MAINNET + ? TAPROOT_ACTIVATION_HEIGHT - 1 + : 0; + const blockHash = await this.getBlockHash(blockHeight); + + await this.setState( + { + indexedBlockHeight: blockHeight, + }, + { + blockHash, + blockHeight, + }, + ); } } + @Cron(CronExpression.EVERY_10_SECONDS) async sync() { if (this.isSyncing) return; this.isSyncing = true; - console.log("sync running"); const state = await this.getState(); + if (!state) { throw new Error('State not found'); } @@ -113,7 +118,8 @@ export class BitcoinCoreProvider const networkInfo = await this.getNetworkInfo(); const verbosityLevel = this.versionToVerbosity(networkInfo.version); - let height = (await this.traceReorg()) + 1; + let height = + ((await this.traceReorg()) ?? state.indexedBlockHeight) + 1; for (height; height <= tipHeight; height++) { const [transactions, blockHash] = await this.processBlock( @@ -133,9 +139,10 @@ export class BitcoinCoreProvider ); } - await this.setState({ - indexedBlockHeight: height, - blockCache: { [height]: blockHash }, + state.indexedBlockHeight = height; + await this.setState(state, { + blockHash: blockHash, + blockHeight: height, }); } } finally { diff --git a/src/block-data-providers/block-provider.module.ts b/src/block-data-providers/block-provider.module.ts index f0b6f90..771ae1c 100644 --- a/src/block-data-providers/block-provider.module.ts +++ b/src/block-data-providers/block-provider.module.ts @@ -7,16 +7,15 @@ import { IndexerService } from '@/indexer/indexer.service'; import { ProviderType } from '@/common/enum'; import { BitcoinCoreProvider } from '@/block-data-providers/bitcoin-core/provider'; import { EsploraProvider } from '@/block-data-providers/esplora/provider'; -import { TransactionsService } from '@/transactions/transactions.service'; -import { TransactionsModule } from '@/transactions/transactions.module'; -import { SchedulerRegistry } from '@nestjs/schedule'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { BlockStateModule } from '@/block-state/block-state.module'; @Module({ imports: [ OperationStateModule, IndexerModule, ConfigModule, - TransactionsModule, + BlockStateModule, ], controllers: [], providers: [ @@ -26,15 +25,13 @@ import { SchedulerRegistry } from '@nestjs/schedule'; ConfigService, IndexerService, OperationStateService, - TransactionsService, - SchedulerRegistry, + BlockStateService, ], useFactory: ( configService: ConfigService, indexerService: IndexerService, operationStateService: OperationStateService, - transactionService: TransactionsService, - schedulerRegistry: SchedulerRegistry, + blockStateService: BlockStateService, ) => { switch (configService.get('providerType')) { case ProviderType.ESPLORA: @@ -42,16 +39,14 @@ import { SchedulerRegistry } from '@nestjs/schedule'; configService, indexerService, operationStateService, - transactionService, - schedulerRegistry, + blockStateService, ); case ProviderType.BITCOIN_CORE_RPC: return new BitcoinCoreProvider( configService, indexerService, operationStateService, - transactionService, - schedulerRegistry, + blockStateService, ); default: throw Error('unrecognised provider type in config'); diff --git a/src/block-data-providers/esplora/interface.ts b/src/block-data-providers/esplora/interface.ts index 0059fa3..438ef4d 100644 --- a/src/block-data-providers/esplora/interface.ts +++ b/src/block-data-providers/esplora/interface.ts @@ -1,7 +1,6 @@ -import { BaseOperationState } from '@/block-data-providers/base-block-data-provider.abstract'; - -export interface EsploraOperationState extends BaseOperationState { +export interface EsploraOperationState { currentBlockHeight: number; + indexedBlockHeight: number; lastProcessedTxIndex: number; } diff --git a/src/block-data-providers/esplora/provider.ts b/src/block-data-providers/esplora/provider.ts index 195083b..5dc36f5 100644 --- a/src/block-data-providers/esplora/provider.ts +++ b/src/block-data-providers/esplora/provider.ts @@ -11,8 +11,8 @@ import { EsploraTransaction, } from '@/block-data-providers/esplora/interface'; import { TAPROOT_ACTIVATION_HEIGHT } from '@/common/constants'; -import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule'; -import { TransactionsService } from '@/transactions/transactions.service'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { Cron, CronExpression } from '@nestjs/schedule'; @Injectable() export class EsploraProvider @@ -30,15 +30,13 @@ export class EsploraProvider configService: ConfigService, indexerService: IndexerService, operationStateService: OperationStateService, - transactionService: TransactionsService, - schedulerRegistry: SchedulerRegistry, + blockStateService: BlockStateService, ) { super( configService, indexerService, operationStateService, - transactionService, - schedulerRegistry, + blockStateService, ); this.batchSize = this.configService.get('esplora.batchSize'); @@ -64,29 +62,38 @@ export class EsploraProvider } async onApplicationBootstrap() { - const getState = await this.getState(); - if (getState) { + const currentState = await this.getState(); + if (currentState) { this.logger.log( `Restoring state from previous run: ${JSON.stringify( - getState, + currentState, )}`, ); } else { this.logger.log('No previous state found. Starting from scratch.'); - const state: EsploraOperationState = { - currentBlockHeight: 0, - blockCache: {}, - indexedBlockHeight: - this.configService.get('app.network') === - BitcoinNetwork.MAINNET - ? TAPROOT_ACTIVATION_HEIGHT - 1 - : 0, - lastProcessedTxIndex: 0, // we dont take coinbase txn in account - }; - await this.setState(state); + + const blockHeight = + this.configService.get('app.network') === + BitcoinNetwork.MAINNET + ? TAPROOT_ACTIVATION_HEIGHT - 1 + : 0; + const blockHash = await this.getBlockHash(blockHeight); + + await this.setState( + { + currentBlockHeight: 0, + indexedBlockHeight: blockHeight, + lastProcessedTxIndex: 0, // we don't take coinbase txn into account + }, + { + blockHash, + blockHeight, + }, + ); } } + @Cron(CronExpression.EVERY_10_SECONDS) async sync() { if (this.isSyncing) return; this.isSyncing = true; @@ -106,7 +113,8 @@ export class EsploraProvider return; } - let height = (await this.traceReorg()) + 1; + let height = + ((await this.traceReorg()) ?? state.indexedBlockHeight) + 1; for (height; height <= tipHeight; height++) { const blockHash = await this.getBlockHash(height); @@ -161,10 +169,11 @@ export class EsploraProvider }, this), ); - await this.setState({ - indexedBlockHeight: height, - lastProcessedTxIndex: i + this.batchSize - 1, - blockCache: { [height]: hash }, + state.indexedBlockHeight = height; + state.lastProcessedTxIndex = i + this.batchSize - 1; + await this.setState(state, { + blockHeight: height, + blockHash: hash, }); } catch (error) { this.logger.error( diff --git a/src/block-state/block-state.entity.ts b/src/block-state/block-state.entity.ts new file mode 100644 index 0000000..f31a7d7 --- /dev/null +++ b/src/block-state/block-state.entity.ts @@ -0,0 +1,10 @@ +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +@Entity() +export class BlockState { + @PrimaryColumn('integer') + blockHeight: number; + + @Column('text') + blockHash: string; +} diff --git a/src/block-state/block-state.module.ts b/src/block-state/block-state.module.ts new file mode 100644 index 0000000..84baabe --- /dev/null +++ b/src/block-state/block-state.module.ts @@ -0,0 +1,13 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { BlockState } from '@/block-state/block-state.entity'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { TransactionsModule } from '@/transactions/transactions.module'; + +@Module({ + imports: [TypeOrmModule.forFeature([BlockState]), TransactionsModule], + controllers: [], + providers: [BlockStateService], + exports: [BlockStateService], +}) +export class BlockStateModule {} diff --git a/src/block-state/block-state.service.ts b/src/block-state/block-state.service.ts new file mode 100644 index 0000000..5874649 --- /dev/null +++ b/src/block-state/block-state.service.ts @@ -0,0 +1,36 @@ +import { Injectable } from '@nestjs/common'; +import { Repository } from 'typeorm'; +import { BlockState } from '@/block-state/block-state.entity'; +import { InjectRepository } from '@nestjs/typeorm'; +import { TransactionsService } from '@/transactions/transactions.service'; + +@Injectable() +export class BlockStateService { + constructor( + @InjectRepository(BlockState) + private readonly blockStateRepository: Repository, + private readonly transactionService: TransactionsService, + ) {} + + async getCurrentBlockState(): Promise { + return ( + await this.blockStateRepository.find({ + order: { + blockHeight: 'DESC', + }, + take: 1, + }) + )[0]; + } + + async addBlockState(state: BlockState): Promise { + await this.blockStateRepository.save(state); + } + + async removeState(state: BlockState): Promise { + await this.blockStateRepository.delete(state.blockHeight); + await this.transactionService.deleteTransactionByBlockHash( + state.blockHash, + ); + } +} diff --git a/src/configuration.model.ts b/src/configuration.model.ts index 20a8b3b..c51a67d 100644 --- a/src/configuration.model.ts +++ b/src/configuration.model.ts @@ -5,6 +5,7 @@ import { IsIn, IsInt, IsNotEmpty, + IsOptional, IsString, IsUrl, Max, @@ -40,10 +41,6 @@ class AppConfig { @Max(65535) port: number; - @IsInt() - @Min(1) - schedulerInterval: number; - @IsEnum(BitcoinNetwork) network: BitcoinNetwork; @@ -51,6 +48,14 @@ class AppConfig { @ValidateNested() @Type(() => AxiosRetryConfig) requestRetry: AxiosRetryConfig; + + @IsOptional() + @IsBoolean() + verbose?: boolean; + + @IsOptional() + @IsBoolean() + debug?: boolean; } class EsploraConfig { diff --git a/src/main.ts b/src/main.ts index 874a2ad..6488ce1 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,6 +1,7 @@ import { NestFactory } from '@nestjs/core'; import { AppModule } from '@/app.module'; import { ConfigService } from '@nestjs/config'; +import { LogLevel } from '@nestjs/common'; declare const module: any; @@ -10,6 +11,16 @@ async function bootstrap() { const configService = app.get(ConfigService); const port = configService.get('app.port'); + const isVerbose = configService.get('app.verbose') ?? false; + const isDebug = configService.get('app.debug') ?? false; + + const loggerLevels: LogLevel[] = ['error', 'warn', 'log']; + + if (isVerbose) loggerLevels.push('verbose'); + if (isDebug) loggerLevels.push('debug'); + + app.useLogger(loggerLevels); + await app.listen(port); if (module.hot) { diff --git a/src/operation-state/operation-state.entity.ts b/src/operation-state/operation-state.entity.ts index 3db2027..837ed41 100644 --- a/src/operation-state/operation-state.entity.ts +++ b/src/operation-state/operation-state.entity.ts @@ -7,5 +7,4 @@ export class OperationState { @Column('simple-json') state: any; - }