diff --git a/.gitignore b/.gitignore index 5cba756..0a59533 100644 --- a/.gitignore +++ b/.gitignore @@ -29,7 +29,6 @@ lerna-debug.log* # IDE - VSCode .vscode/* -!.vscode/settings.json !.vscode/tasks.json !.vscode/launch.json !.vscode/extensions.json diff --git a/config/config.yaml b/config/config.yaml index 81fd494..01850cd 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -3,6 +3,8 @@ db: synchronize: false app: port: + verbose: + debug: network: requestRetry: delay: # delay in Milliseconds diff --git a/config/dev.config.yaml b/config/dev.config.yaml index ccc87f8..b304d4a 100644 --- a/config/dev.config.yaml +++ b/config/dev.config.yaml @@ -3,18 +3,19 @@ db: synchronize: true app: port: 3000 + verbose: false + debug: true network: regtest requestRetry: delay: 3000 count: 3 -providerType: ESPLORA +providerType: BITCOIN_CORE_RPC esplora: url: https://blockstream.info batchSize: 5 bitcoinCore: protocol: http rpcHost: 127.0.0.1 - rpcPass: polarpass - rpcUser: polaruser - rpcPort: 18445 - + rpcPass: password + rpcUser: admin + rpcPort: 18443 diff --git a/config/e2e.config.yaml b/config/e2e.config.yaml index f69406e..89d36d2 100644 --- a/config/e2e.config.yaml +++ b/config/e2e.config.yaml @@ -3,6 +3,8 @@ db: synchronize: true app: port: 3000 + verbose: false + debug: true network: regtest requestRetry: delay: 500 diff --git a/src/block-data-providers/base-block-data-provider.abstract.ts b/src/block-data-providers/base-block-data-provider.abstract.ts index 714d440..f19be1b 100644 --- a/src/block-data-providers/base-block-data-provider.abstract.ts +++ b/src/block-data-providers/base-block-data-provider.abstract.ts @@ -5,14 +5,19 @@ import { TransactionInput, TransactionOutput, } from '@/indexer/indexer.service'; +import { ConfigService } from '@nestjs/config'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { BlockState } from '@/block-state/block-state.entity'; export abstract class BaseBlockDataProvider { protected abstract readonly logger: Logger; protected abstract readonly operationStateKey: string; protected constructor( + protected readonly configService: ConfigService, private readonly indexerService: IndexerService, private readonly operationStateService: OperationStateService, + protected readonly blockStateService: BlockStateService, ) {} async indexTransaction( @@ -39,10 +44,38 @@ export abstract class BaseBlockDataProvider { )?.state; } - async setState(state: OperationState): Promise { + async setState( + state: OperationState, + blockState: BlockState, + ): Promise { await this.operationStateService.setOperationState( this.operationStateKey, state, ); + + await this.blockStateService.addBlockState(blockState); + } + + abstract getBlockHash(height: number): Promise; + + async traceReorg(): Promise { + let state = await this.blockStateService.getCurrentBlockState(); + + if (!state) return null; + + while (state) { + const fetchedBlockHash = await this.getBlockHash(state.blockHeight); + + if (state.blockHash === fetchedBlockHash) return state.blockHeight; + + await this.blockStateService.removeState(state); + + this.logger.log( + `Reorg found at height: ${state.blockHeight}, Wrong hash: ${state.blockHash}, Correct hash: ${fetchedBlockHash}`, + ); + state = await this.blockStateService.getCurrentBlockState(); + } + + throw new Error('Cannot Reorgs, blockchain state exhausted'); } } diff --git a/src/block-data-providers/bitcoin-core/interfaces.ts b/src/block-data-providers/bitcoin-core/interfaces.ts index a4411d8..e219fa4 100644 --- a/src/block-data-providers/bitcoin-core/interfaces.ts +++ b/src/block-data-providers/bitcoin-core/interfaces.ts @@ -40,7 +40,6 @@ export interface Output { } export type BitcoinCoreOperationState = { - currentBlockHeight: number; indexedBlockHeight: number; }; diff --git a/src/block-data-providers/bitcoin-core/provider.spec.ts b/src/block-data-providers/bitcoin-core/provider.spec.ts index 7474a52..fa9ab7e 100644 --- a/src/block-data-providers/bitcoin-core/provider.spec.ts +++ b/src/block-data-providers/bitcoin-core/provider.spec.ts @@ -12,6 +12,7 @@ import { rawTransactions, } from '@/block-data-providers/bitcoin-core/provider-fixtures'; import { Test, TestingModule } from '@nestjs/testing'; +import { BlockStateService } from '@/block-state/block-state.service'; describe('Bitcoin Core Provider', () => { let provider: BitcoinCoreProvider; @@ -46,6 +47,10 @@ describe('Bitcoin Core Provider', () => { getOperationState: jest.fn(), }, }, + { + provide: BlockStateService, + useClass: jest.fn(), + }, ], }).compile(); @@ -72,8 +77,8 @@ describe('Bitcoin Core Provider', () => { it('should process each transaction of a block appropriately', async () => { const result = await provider.processBlock(3, 2); - expect(result).toHaveLength(1); - expect(result).toEqual( + expect(result[0]).toHaveLength(1); + expect(result[0]).toEqual( expect.arrayContaining([...parsedTransactions.values()]), ); }); diff --git a/src/block-data-providers/bitcoin-core/provider.ts b/src/block-data-providers/bitcoin-core/provider.ts index 3ca101e..b44bd16 100644 --- a/src/block-data-providers/bitcoin-core/provider.ts +++ b/src/block-data-providers/bitcoin-core/provider.ts @@ -28,6 +28,7 @@ import { import { AxiosRequestConfig } from 'axios'; import * as currency from 'currency.js'; import { AxiosRetryConfig, makeRequest } from '@/common/request'; +import { BlockStateService } from '@/block-state/block-state.service'; @Injectable() export class BitcoinCoreProvider @@ -41,11 +42,17 @@ export class BitcoinCoreProvider private retryConfig: AxiosRetryConfig; public constructor( - private readonly configService: ConfigService, + configService: ConfigService, indexerService: IndexerService, operationStateService: OperationStateService, + blockStateService: BlockStateService, ) { - super(indexerService, operationStateService); + super( + configService, + indexerService, + operationStateService, + blockStateService, + ); const { protocol, rpcPort, rpcHost } = configService.get('bitcoinCore'); @@ -57,25 +64,32 @@ export class BitcoinCoreProvider } async onApplicationBootstrap() { - const getState = await this.getState(); - if (getState) { + const currentState = await this.getState(); + if (currentState) { this.logger.log( `Restoring state from previous run: ${JSON.stringify( - getState, + currentState, )}`, ); } else { this.logger.log('No previous state found. Starting from scratch.'); - const state: BitcoinCoreOperationState = { - currentBlockHeight: 0, - indexedBlockHeight: - this.configService.get('app.network') === - BitcoinNetwork.MAINNET - ? TAPROOT_ACTIVATION_HEIGHT - 1 - : 0, - }; - - await this.setState(state); + + const blockHeight = + this.configService.get('app.network') === + BitcoinNetwork.MAINNET + ? TAPROOT_ACTIVATION_HEIGHT - 1 + : 0; + const blockHash = await this.getBlockHash(blockHeight); + + await this.setState( + { + indexedBlockHeight: blockHeight, + }, + { + blockHash, + blockHeight, + }, + ); } } @@ -85,12 +99,14 @@ export class BitcoinCoreProvider this.isSyncing = true; const state = await this.getState(); + if (!state) { throw new Error('State not found'); } try { const tipHeight = await this.getTipHeight(); + if (tipHeight <= state.indexedBlockHeight) { this.logger.debug( `No new blocks found. Current tip height: ${tipHeight}`, @@ -102,9 +118,11 @@ export class BitcoinCoreProvider const networkInfo = await this.getNetworkInfo(); const verbosityLevel = this.versionToVerbosity(networkInfo.version); - let height = state.indexedBlockHeight + 1; + let height = + ((await this.traceReorg()) ?? state.indexedBlockHeight) + 1; + for (height; height <= tipHeight; height++) { - const transactions = await this.processBlock( + const [transactions, blockHash] = await this.processBlock( height, verbosityLevel, ); @@ -122,7 +140,10 @@ export class BitcoinCoreProvider } state.indexedBlockHeight = height; - await this.setState(state); + await this.setState(state, { + blockHash: blockHash, + blockHeight: height, + }); } } finally { this.isSyncing = false; @@ -143,7 +164,7 @@ export class BitcoinCoreProvider }); } - private async getBlockHash(height: number): Promise { + async getBlockHash(height: number): Promise { return this.request({ method: 'getblockhash', params: [height], @@ -170,7 +191,7 @@ export class BitcoinCoreProvider public async processBlock( height: number, verbosityLevel: number, - ): Promise { + ): Promise<[Transaction[], string]> { const parsedTransactionList: Transaction[] = []; const blockHash = await this.getBlockHash(height); this.logger.debug( @@ -188,7 +209,7 @@ export class BitcoinCoreProvider parsedTransactionList.push(parsedTransaction); } - return parsedTransactionList; + return [parsedTransactionList, blockHash]; } private async parseTransaction( diff --git a/src/block-data-providers/block-provider.module.ts b/src/block-data-providers/block-provider.module.ts index 24142f5..771ae1c 100644 --- a/src/block-data-providers/block-provider.module.ts +++ b/src/block-data-providers/block-provider.module.ts @@ -7,18 +7,31 @@ import { IndexerService } from '@/indexer/indexer.service'; import { ProviderType } from '@/common/enum'; import { BitcoinCoreProvider } from '@/block-data-providers/bitcoin-core/provider'; import { EsploraProvider } from '@/block-data-providers/esplora/provider'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { BlockStateModule } from '@/block-state/block-state.module'; @Module({ - imports: [OperationStateModule, IndexerModule, ConfigModule], + imports: [ + OperationStateModule, + IndexerModule, + ConfigModule, + BlockStateModule, + ], controllers: [], providers: [ { provide: 'BlockDataProvider', - inject: [ConfigService, IndexerService, OperationStateService], + inject: [ + ConfigService, + IndexerService, + OperationStateService, + BlockStateService, + ], useFactory: ( configService: ConfigService, indexerService: IndexerService, operationStateService: OperationStateService, + blockStateService: BlockStateService, ) => { switch (configService.get('providerType')) { case ProviderType.ESPLORA: @@ -26,12 +39,14 @@ import { EsploraProvider } from '@/block-data-providers/esplora/provider'; configService, indexerService, operationStateService, + blockStateService, ); case ProviderType.BITCOIN_CORE_RPC: return new BitcoinCoreProvider( configService, indexerService, operationStateService, + blockStateService, ); default: throw Error('unrecognised provider type in config'); diff --git a/src/block-data-providers/esplora/interface.ts b/src/block-data-providers/esplora/interface.ts index b62c1a9..438ef4d 100644 --- a/src/block-data-providers/esplora/interface.ts +++ b/src/block-data-providers/esplora/interface.ts @@ -1,8 +1,8 @@ -export type EsploraOperationState = { +export interface EsploraOperationState { currentBlockHeight: number; indexedBlockHeight: number; lastProcessedTxIndex: number; -}; +} type EsploraTransactionInput = { txid: string; diff --git a/src/block-data-providers/esplora/provider.ts b/src/block-data-providers/esplora/provider.ts index a27021d..5dc36f5 100644 --- a/src/block-data-providers/esplora/provider.ts +++ b/src/block-data-providers/esplora/provider.ts @@ -11,6 +11,7 @@ import { EsploraTransaction, } from '@/block-data-providers/esplora/interface'; import { TAPROOT_ACTIVATION_HEIGHT } from '@/common/constants'; +import { BlockStateService } from '@/block-state/block-state.service'; import { Cron, CronExpression } from '@nestjs/schedule'; @Injectable() @@ -26,11 +27,17 @@ export class EsploraProvider private readonly batchSize: number; constructor( - private readonly configService: ConfigService, + configService: ConfigService, indexerService: IndexerService, operationStateService: OperationStateService, + blockStateService: BlockStateService, ) { - super(indexerService, operationStateService); + super( + configService, + indexerService, + operationStateService, + blockStateService, + ); this.batchSize = this.configService.get('esplora.batchSize'); @@ -55,25 +62,34 @@ export class EsploraProvider } async onApplicationBootstrap() { - const getState = await this.getState(); - if (getState) { + const currentState = await this.getState(); + if (currentState) { this.logger.log( `Restoring state from previous run: ${JSON.stringify( - getState, + currentState, )}`, ); } else { this.logger.log('No previous state found. Starting from scratch.'); - const state: EsploraOperationState = { - currentBlockHeight: 0, - indexedBlockHeight: - this.configService.get('app.network') === - BitcoinNetwork.MAINNET - ? TAPROOT_ACTIVATION_HEIGHT - 1 - : 0, - lastProcessedTxIndex: 0, // we dont take coinbase txn in account - }; - await this.setState(state); + + const blockHeight = + this.configService.get('app.network') === + BitcoinNetwork.MAINNET + ? TAPROOT_ACTIVATION_HEIGHT - 1 + : 0; + const blockHash = await this.getBlockHash(blockHeight); + + await this.setState( + { + currentBlockHeight: 0, + indexedBlockHeight: blockHeight, + lastProcessedTxIndex: 0, // we don't take coinbase txn into account + }, + { + blockHash, + blockHeight, + }, + ); } } @@ -97,11 +113,10 @@ export class EsploraProvider return; } - for ( - let height = state.indexedBlockHeight; - height <= tipHeight; - height++ - ) { + let height = + ((await this.traceReorg()) ?? state.indexedBlockHeight) + 1; + + for (height; height <= tipHeight; height++) { const blockHash = await this.getBlockHash(height); this.logger.log( `Processing block at height ${height}, hash ${blockHash}`, @@ -156,7 +171,10 @@ export class EsploraProvider state.indexedBlockHeight = height; state.lastProcessedTxIndex = i + this.batchSize - 1; - await this.setState(state); + await this.setState(state, { + blockHeight: height, + blockHash: hash, + }); } catch (error) { this.logger.error( `Error processing transactions in block at height ${height}, hash ${hash}: ${error.message}`, @@ -189,7 +207,7 @@ export class EsploraProvider ); } - private async getBlockHash(height: number): Promise { + async getBlockHash(height: number): Promise { return makeRequest( { method: 'GET', diff --git a/src/block-state/block-state.entity.ts b/src/block-state/block-state.entity.ts new file mode 100644 index 0000000..f31a7d7 --- /dev/null +++ b/src/block-state/block-state.entity.ts @@ -0,0 +1,10 @@ +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +@Entity() +export class BlockState { + @PrimaryColumn('integer') + blockHeight: number; + + @Column('text') + blockHash: string; +} diff --git a/src/block-state/block-state.module.ts b/src/block-state/block-state.module.ts new file mode 100644 index 0000000..84baabe --- /dev/null +++ b/src/block-state/block-state.module.ts @@ -0,0 +1,13 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { BlockState } from '@/block-state/block-state.entity'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { TransactionsModule } from '@/transactions/transactions.module'; + +@Module({ + imports: [TypeOrmModule.forFeature([BlockState]), TransactionsModule], + controllers: [], + providers: [BlockStateService], + exports: [BlockStateService], +}) +export class BlockStateModule {} diff --git a/src/block-state/block-state.service.ts b/src/block-state/block-state.service.ts new file mode 100644 index 0000000..5874649 --- /dev/null +++ b/src/block-state/block-state.service.ts @@ -0,0 +1,36 @@ +import { Injectable } from '@nestjs/common'; +import { Repository } from 'typeorm'; +import { BlockState } from '@/block-state/block-state.entity'; +import { InjectRepository } from '@nestjs/typeorm'; +import { TransactionsService } from '@/transactions/transactions.service'; + +@Injectable() +export class BlockStateService { + constructor( + @InjectRepository(BlockState) + private readonly blockStateRepository: Repository, + private readonly transactionService: TransactionsService, + ) {} + + async getCurrentBlockState(): Promise { + return ( + await this.blockStateRepository.find({ + order: { + blockHeight: 'DESC', + }, + take: 1, + }) + )[0]; + } + + async addBlockState(state: BlockState): Promise { + await this.blockStateRepository.save(state); + } + + async removeState(state: BlockState): Promise { + await this.blockStateRepository.delete(state.blockHeight); + await this.transactionService.deleteTransactionByBlockHash( + state.blockHash, + ); + } +} diff --git a/src/common/request.ts b/src/common/request.ts index e3f1746..595b8a4 100644 --- a/src/common/request.ts +++ b/src/common/request.ts @@ -27,7 +27,7 @@ export const makeRequest = async ( try { const response = await axios.request(requestConfig); - logger.debug( + logger.verbose( `Request to Provider succeeded:\nRequest:\n${JSON.stringify( requestConfig, null, diff --git a/src/configuration.model.ts b/src/configuration.model.ts index d65015b..c51a67d 100644 --- a/src/configuration.model.ts +++ b/src/configuration.model.ts @@ -5,6 +5,7 @@ import { IsIn, IsInt, IsNotEmpty, + IsOptional, IsString, IsUrl, Max, @@ -47,6 +48,14 @@ class AppConfig { @ValidateNested() @Type(() => AxiosRetryConfig) requestRetry: AxiosRetryConfig; + + @IsOptional() + @IsBoolean() + verbose?: boolean; + + @IsOptional() + @IsBoolean() + debug?: boolean; } class EsploraConfig { diff --git a/src/main.ts b/src/main.ts index 874a2ad..6488ce1 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,6 +1,7 @@ import { NestFactory } from '@nestjs/core'; import { AppModule } from '@/app.module'; import { ConfigService } from '@nestjs/config'; +import { LogLevel } from '@nestjs/common'; declare const module: any; @@ -10,6 +11,16 @@ async function bootstrap() { const configService = app.get(ConfigService); const port = configService.get('app.port'); + const isVerbose = configService.get('app.verbose') ?? false; + const isDebug = configService.get('app.debug') ?? false; + + const loggerLevels: LogLevel[] = ['error', 'warn', 'log']; + + if (isVerbose) loggerLevels.push('verbose'); + if (isDebug) loggerLevels.push('debug'); + + app.useLogger(loggerLevels); + await app.listen(port); if (module.hot) { diff --git a/src/transactions/transactions.service.ts b/src/transactions/transactions.service.ts index 28b5bfa..ba57ca7 100644 --- a/src/transactions/transactions.service.ts +++ b/src/transactions/transactions.service.ts @@ -1,7 +1,7 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Transaction } from '@/transactions/transaction.entity'; -import { Repository } from 'typeorm'; +import { DeleteResult, Repository } from 'typeorm'; @Injectable() export class TransactionsService { @@ -23,4 +23,10 @@ export class TransactionsService { async saveTransaction(transaction: Transaction): Promise { return this.transactionRepository.save(transaction); } + + async deleteTransactionByBlockHash( + blockHash: string, + ): Promise { + return this.transactionRepository.delete({ blockHash }); + } }