diff --git a/e2e/indexer.e2e-spec.ts b/e2e/indexer.e2e-spec.ts index 42a40d4..cd6f245 100644 --- a/e2e/indexer.e2e-spec.ts +++ b/e2e/indexer.e2e-spec.ts @@ -14,7 +14,7 @@ describe('Indexer', () => { beforeAll(async () => { const walletHelper = new WalletHelper(); const bitcoinRPCUtil = new BitcoinRPCUtil(); - const indexerService = new IndexerService({} as any); + const indexerService = new IndexerService(); apiHelper = new ApiHelper(); await bitcoinRPCUtil.createWallet('test_wallet'); diff --git a/jest.config.ts b/jest.config.ts index 33ea88f..2b3a097 100644 --- a/jest.config.ts +++ b/jest.config.ts @@ -17,6 +17,8 @@ const config: Config.InitialOptions = { '.*.entity.ts', '.*.dto.ts', '.*.spec.ts', + '.*.mock.ts', + '.*.fixture.ts', '.*.module-definition.ts', '.*.configuration.ts', '.*.configuration.model.ts', diff --git a/src/block-data-providers/base-block-data-provider.abstract.ts b/src/block-data-providers/base-block-data-provider.abstract.ts index f19be1b..68498cc 100644 --- a/src/block-data-providers/base-block-data-provider.abstract.ts +++ b/src/block-data-providers/base-block-data-provider.abstract.ts @@ -8,6 +8,8 @@ import { import { ConfigService } from '@nestjs/config'; import { BlockStateService } from '@/block-state/block-state.service'; import { BlockState } from '@/block-state/block-state.entity'; +import { EntityManager } from 'typeorm'; +import { OperationState } from '@/operation-state/operation-state.entity'; export abstract class BaseBlockDataProvider { protected abstract readonly logger: Logger; @@ -26,6 +28,7 @@ export abstract class BaseBlockDataProvider { vout: TransactionOutput[], blockHeight: number, blockHash: string, + manager: EntityManager, ): Promise { await this.indexerService.index( txid, @@ -33,6 +36,7 @@ export abstract class BaseBlockDataProvider { vout, blockHeight, blockHash, + manager, ); } @@ -47,13 +51,14 @@ export abstract class BaseBlockDataProvider { async setState( state: OperationState, blockState: BlockState, + manager: EntityManager, ): Promise { - await this.operationStateService.setOperationState( - this.operationStateKey, - state, - ); + const operationState = new OperationState(); + operationState.id = this.operationStateKey; + operationState.state = state; - await this.blockStateService.addBlockState(blockState); + await manager.save(OperationState, operationState); + await manager.save(BlockState, blockState); } abstract getBlockHash(height: number): Promise; diff --git a/src/block-data-providers/bitcoin-core/provider.spec.ts b/src/block-data-providers/bitcoin-core/provider.spec.ts index fa9ab7e..6275e79 100644 --- a/src/block-data-providers/bitcoin-core/provider.spec.ts +++ b/src/block-data-providers/bitcoin-core/provider.spec.ts @@ -13,6 +13,7 @@ import { } from '@/block-data-providers/bitcoin-core/provider-fixtures'; import { Test, TestingModule } from '@nestjs/testing'; import { BlockStateService } from '@/block-state/block-state.service'; +import { DbTransactionService } from '@/db-transaction/db-transaction.service'; describe('Bitcoin Core Provider', () => { let provider: BitcoinCoreProvider; @@ -51,6 +52,12 @@ describe('Bitcoin Core Provider', () => { provide: BlockStateService, useClass: jest.fn(), }, + { + provide: DbTransactionService, + useValue: { + execute: jest.fn(), + }, + }, ], }).compile(); diff --git a/src/block-data-providers/bitcoin-core/provider.ts b/src/block-data-providers/bitcoin-core/provider.ts index b44bd16..303bc60 100644 --- a/src/block-data-providers/bitcoin-core/provider.ts +++ b/src/block-data-providers/bitcoin-core/provider.ts @@ -29,6 +29,7 @@ import { AxiosRequestConfig } from 'axios'; import * as currency from 'currency.js'; import { AxiosRetryConfig, makeRequest } from '@/common/request'; import { BlockStateService } from '@/block-state/block-state.service'; +import { DbTransactionService } from '@/db-transaction/db-transaction.service'; @Injectable() export class BitcoinCoreProvider @@ -46,6 +47,7 @@ export class BitcoinCoreProvider indexerService: IndexerService, operationStateService: OperationStateService, blockStateService: BlockStateService, + private readonly dbTransactionService: DbTransactionService, ) { super( configService, @@ -81,15 +83,18 @@ export class BitcoinCoreProvider : 0; const blockHash = await this.getBlockHash(blockHeight); - await this.setState( - { - indexedBlockHeight: blockHeight, - }, - { - blockHash, - blockHeight, - }, - ); + await this.dbTransactionService.execute(async (manager) => { + await this.setState( + { + indexedBlockHeight: blockHeight, + }, + { + blockHash, + blockHeight, + }, + manager, + ); + }); } } @@ -127,22 +132,29 @@ export class BitcoinCoreProvider verbosityLevel, ); - for (const transaction of transactions) { - const { txid, vin, vout, blockHeight, blockHash } = - transaction; - await this.indexTransaction( - txid, - vin, - vout, - blockHeight, - blockHash, + await this.dbTransactionService.execute(async (manager) => { + for (const transaction of transactions) { + const { txid, vin, vout, blockHeight, blockHash } = + transaction; + await this.indexTransaction( + txid, + vin, + vout, + blockHeight, + blockHash, + manager, + ); + } + + state.indexedBlockHeight = height; + await this.setState( + state, + { + blockHash: blockHash, + blockHeight: height, + }, + manager, ); - } - - state.indexedBlockHeight = height; - await this.setState(state, { - blockHash: blockHash, - blockHeight: height, }); } } finally { diff --git a/src/block-data-providers/block-provider.module.ts b/src/block-data-providers/block-provider.module.ts index 771ae1c..93b18d1 100644 --- a/src/block-data-providers/block-provider.module.ts +++ b/src/block-data-providers/block-provider.module.ts @@ -9,6 +9,8 @@ import { BitcoinCoreProvider } from '@/block-data-providers/bitcoin-core/provide import { EsploraProvider } from '@/block-data-providers/esplora/provider'; import { BlockStateService } from '@/block-state/block-state.service'; import { BlockStateModule } from '@/block-state/block-state.module'; +import { DbTransactionModule } from '@/db-transaction/db-transaction.module'; +import { DbTransactionService } from '@/db-transaction/db-transaction.service'; @Module({ imports: [ @@ -16,6 +18,7 @@ import { BlockStateModule } from '@/block-state/block-state.module'; IndexerModule, ConfigModule, BlockStateModule, + DbTransactionModule, ], controllers: [], providers: [ @@ -26,12 +29,14 @@ import { BlockStateModule } from '@/block-state/block-state.module'; IndexerService, OperationStateService, BlockStateService, + DbTransactionService, ], useFactory: ( configService: ConfigService, indexerService: IndexerService, operationStateService: OperationStateService, blockStateService: BlockStateService, + dbTransactionService: DbTransactionService, ) => { switch (configService.get('providerType')) { case ProviderType.ESPLORA: @@ -40,6 +45,7 @@ import { BlockStateModule } from '@/block-state/block-state.module'; indexerService, operationStateService, blockStateService, + dbTransactionService, ); case ProviderType.BITCOIN_CORE_RPC: return new BitcoinCoreProvider( @@ -47,6 +53,7 @@ import { BlockStateModule } from '@/block-state/block-state.module'; indexerService, operationStateService, blockStateService, + dbTransactionService, ); default: throw Error('unrecognised provider type in config'); diff --git a/src/block-data-providers/esplora/provider.ts b/src/block-data-providers/esplora/provider.ts index 5dc36f5..b1536cb 100644 --- a/src/block-data-providers/esplora/provider.ts +++ b/src/block-data-providers/esplora/provider.ts @@ -13,6 +13,7 @@ import { import { TAPROOT_ACTIVATION_HEIGHT } from '@/common/constants'; import { BlockStateService } from '@/block-state/block-state.service'; import { Cron, CronExpression } from '@nestjs/schedule'; +import { DbTransactionService } from '@/db-transaction/db-transaction.service'; @Injectable() export class EsploraProvider @@ -31,6 +32,7 @@ export class EsploraProvider indexerService: IndexerService, operationStateService: OperationStateService, blockStateService: BlockStateService, + private readonly dbTransactionService: DbTransactionService, ) { super( configService, @@ -79,17 +81,20 @@ export class EsploraProvider : 0; const blockHash = await this.getBlockHash(blockHeight); - await this.setState( - { - currentBlockHeight: 0, - indexedBlockHeight: blockHeight, - lastProcessedTxIndex: 0, // we don't take coinbase txn into account - }, - { - blockHash, - blockHeight, - }, - ); + await this.dbTransactionService.execute(async (manager) => { + await this.setState( + { + currentBlockHeight: 0, + indexedBlockHeight: blockHeight, + lastProcessedTxIndex: 0, // we don't take coinbase txn into account + }, + { + blockHash, + blockHeight, + }, + manager, + ); + }); } } @@ -144,36 +149,45 @@ export class EsploraProvider ); try { - await Promise.all( - batch.map(async (txid) => { - const tx = await this.getTx(txid); - const vin: TransactionInput[] = tx.vin.map((input) => ({ - txid: input.txid, - vout: input.vout, - scriptSig: input.scriptsig, - prevOutScript: input.prevout.scriptpubkey, - witness: input.witness, - })); - const vout = tx.vout.map((output) => ({ - scriptPubKey: output.scriptpubkey, - value: output.value, - })); - - await this.indexTransaction( - txid, - vin, - vout, - height, - hash, - ); - }, this), - ); + await this.dbTransactionService.execute(async (manager) => { + await Promise.all( + batch.map(async (txid) => { + const tx = await this.getTx(txid); + const vin: TransactionInput[] = tx.vin.map( + (input) => ({ + txid: input.txid, + vout: input.vout, + scriptSig: input.scriptsig, + prevOutScript: input.prevout.scriptpubkey, + witness: input.witness, + }), + ); + const vout = tx.vout.map((output) => ({ + scriptPubKey: output.scriptpubkey, + value: output.value, + })); + + await this.indexTransaction( + txid, + vin, + vout, + height, + hash, + manager, + ); + }, this), + ); - state.indexedBlockHeight = height; - state.lastProcessedTxIndex = i + this.batchSize - 1; - await this.setState(state, { - blockHeight: height, - blockHash: hash, + state.indexedBlockHeight = height; + state.lastProcessedTxIndex = i + this.batchSize - 1; + await this.setState( + state, + { + blockHeight: height, + blockHash: hash, + }, + manager, + ); }); } catch (error) { this.logger.error( diff --git a/src/block-state/block-state.service.ts b/src/block-state/block-state.service.ts index 5874649..db4c32e 100644 --- a/src/block-state/block-state.service.ts +++ b/src/block-state/block-state.service.ts @@ -23,10 +23,6 @@ export class BlockStateService { )[0]; } - async addBlockState(state: BlockState): Promise { - await this.blockStateRepository.save(state); - } - async removeState(state: BlockState): Promise { await this.blockStateRepository.delete(state.blockHeight); await this.transactionService.deleteTransactionByBlockHash( diff --git a/src/db-transaction/db-transaction.mock.ts b/src/db-transaction/db-transaction.mock.ts new file mode 100644 index 0000000..aeb84dd --- /dev/null +++ b/src/db-transaction/db-transaction.mock.ts @@ -0,0 +1,43 @@ +import 'jest'; + +export const mockQueryBuilder = { + select: jest.fn().mockReturnThis(), + addSelect: jest.fn().mockReturnThis(), + where: jest.fn().mockReturnThis(), + groupBy: jest.fn().mockReturnThis(), + createQueryBuilder: jest.fn().mockReturnThis(), + leftJoin: jest.fn().mockReturnThis(), + leftJoinAndSelect: jest.fn().mockReturnThis(), + andWhere: jest.fn().mockReturnThis(), + getOne: jest.fn(), + getQuery: jest.fn(), + getRawOne: jest.fn(), + from: jest.fn().mockReturnThis(), +}; + +export const mockEntityManager = { + createQueryBuilder: jest.fn().mockReturnValue(mockQueryBuilder), + query: jest.fn(), + save: jest.fn(), + update: jest.fn(), + upsert: jest.fn(), + find: jest.fn(), + findOne: jest.fn(), + findOneBy: jest.fn(), + increment: jest.fn(), + findBy: jest.fn(), + exists: jest.fn(), +}; + +export const queryRunnerMock = { + connect: jest.fn(), + startTransaction: jest.fn(), + commitTransaction: jest.fn(), + rollbackTransaction: jest.fn(), + release: jest.fn(), + manager: mockEntityManager, +}; + +export class MockDataSource { + createQueryRunner = jest.fn().mockReturnValue(queryRunnerMock); +} diff --git a/src/db-transaction/db-transaction.module.ts b/src/db-transaction/db-transaction.module.ts new file mode 100644 index 0000000..5b13bd6 --- /dev/null +++ b/src/db-transaction/db-transaction.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { DbTransactionService } from '@/db-transaction/db-transaction.service'; + +@Module({ + imports: [], + controllers: [], + providers: [DbTransactionService], + exports: [DbTransactionService], +}) +export class DbTransactionModule {} diff --git a/src/db-transaction/db-transaction.service.spec.ts b/src/db-transaction/db-transaction.service.spec.ts new file mode 100644 index 0000000..2b8873e --- /dev/null +++ b/src/db-transaction/db-transaction.service.spec.ts @@ -0,0 +1,61 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { getDataSourceToken } from '@nestjs/typeorm'; +import { DataSource, QueryRunner } from 'typeorm'; +import { DbTransactionService } from '@/db-transaction/db-transaction.service'; +import { MockDataSource } from '@/db-transaction/db-transaction.mock'; + +describe('DbTransactionService', () => { + let service: DbTransactionService; + let mockQueryRunner: QueryRunner; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + DbTransactionService, + { + provide: getDataSourceToken('default'), + useClass: MockDataSource, + }, + ], + }).compile(); + service = module.get(DbTransactionService); + mockQueryRunner = module + .get(getDataSourceToken('default')) + .createQueryRunner(); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + it('should commit transaction and release query runner on success', async () => { + const dummyExecutable = jest.fn(); + await service.execute(dummyExecutable); + expect(mockQueryRunner.connect).toHaveBeenCalledTimes(1); + expect(mockQueryRunner.startTransaction).toHaveBeenCalledTimes(1); + expect(dummyExecutable).toHaveBeenCalledTimes(1); + expect(dummyExecutable.mock.calls[0][0]).toBe(mockQueryRunner.manager); + expect(mockQueryRunner.commitTransaction).toHaveBeenCalledTimes(1); + expect(mockQueryRunner.rollbackTransaction).toHaveBeenCalledTimes(0); + expect(mockQueryRunner.release).toHaveBeenCalledTimes(1); + }); + + it('should roll back transaction and release query runner on error', async () => { + const dummyExecutable = jest.fn(); + dummyExecutable.mockRejectedValue(new Error('mock error')); + await expect(service.execute(dummyExecutable)).rejects.toThrow( + 'mock error', + ); + expect(mockQueryRunner.connect).toHaveBeenCalledTimes(1); + expect(mockQueryRunner.startTransaction).toHaveBeenCalledTimes(1); + expect(dummyExecutable).toHaveBeenCalledTimes(1); + expect(dummyExecutable.mock.calls[0][0]).toBe(mockQueryRunner.manager); + expect(mockQueryRunner.commitTransaction).toHaveBeenCalledTimes(0); + expect(mockQueryRunner.rollbackTransaction).toHaveBeenCalledTimes(1); + expect(mockQueryRunner.release).toHaveBeenCalledTimes(1); + }); + + afterEach(() => { + jest.resetAllMocks(); + }); +}); diff --git a/src/db-transaction/db-transaction.service.ts b/src/db-transaction/db-transaction.service.ts new file mode 100644 index 0000000..77e8c93 --- /dev/null +++ b/src/db-transaction/db-transaction.service.ts @@ -0,0 +1,40 @@ +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { DataSource, EntityManager, QueryRunner } from 'typeorm'; +import { IsolationLevel } from 'typeorm/driver/types/IsolationLevel'; + +@Injectable() +export class DbTransactionService implements OnModuleDestroy { + private readonly queryRunnerSet: Set; + constructor(private readonly dataSource: DataSource) { + this.queryRunnerSet = new Set(); + } + + async execute( + executable: (manager: EntityManager) => Promise, + isolationLevel: IsolationLevel = 'SERIALIZABLE', + ): Promise { + const queryRunner = this.dataSource.createQueryRunner(); + this.queryRunnerSet.add(queryRunner); + await queryRunner.connect(); + await queryRunner.startTransaction(isolationLevel); + try { + const result = await executable(queryRunner.manager); + await queryRunner.commitTransaction(); + return result; + } catch (err) { + // since we have errors, rollback the changes we made + await queryRunner.rollbackTransaction(); + throw err; + } finally { + // we need to release a queryRunner which was manually instantiated + await queryRunner.release(); + this.queryRunnerSet.delete(queryRunner); + } + } + + async onModuleDestroy(): Promise { + for (const queryRunner of this.queryRunnerSet) { + await queryRunner.rollbackTransaction(); + } + } +} diff --git a/src/indexer/indexer.service.spec.ts b/src/indexer/indexer.service.spec.ts index 738d3a9..dea4454 100644 --- a/src/indexer/indexer.service.spec.ts +++ b/src/indexer/indexer.service.spec.ts @@ -1,14 +1,13 @@ import { Test, TestingModule } from '@nestjs/testing'; -import { TransactionsService } from '@/transactions/transactions.service'; -import { getRepositoryToken } from '@nestjs/typeorm'; import { Transaction } from '@/transactions/transaction.entity'; import { IndexerService } from '@/indexer/indexer.service'; import { testData } from '@/indexer/indexer.fixture'; -import { DataSource, Repository } from 'typeorm'; +import { DataSource } from 'typeorm'; +import { DbTransactionService } from '@/db-transaction/db-transaction.service'; describe('IndexerService', () => { let service: IndexerService; - let repository: Repository; + let dbTransactionService: DbTransactionService; let dataSource: DataSource; beforeEach(async () => { @@ -21,20 +20,21 @@ describe('IndexerService', () => { logging: false, }); await dataSource.initialize(); - repository = dataSource.getRepository(Transaction); const module: TestingModule = await Test.createTestingModule({ providers: [ IndexerService, - TransactionsService, + DbTransactionService, { - provide: getRepositoryToken(Transaction), - useValue: repository, + provide: DataSource, + useValue: dataSource, }, ], }).compile(); service = module.get(IndexerService); + dbTransactionService = + module.get(DbTransactionService); }); it('should be defined', () => { @@ -44,18 +44,25 @@ describe('IndexerService', () => { it.each(testData)( 'should validate that scanTweaks are created for only valid transactions', async (transaction) => { - await service.index( - transaction.txid, - transaction.vin, - transaction.vout, - 0, - '0000000000000000000000000000000000000000000000000000000000000000', - ); - - const transactionEntity = await repository.findOne({ - where: { id: transaction.txid }, + await dbTransactionService.execute(async (manager) => { + await service.index( + transaction.txid, + transaction.vin, + transaction.vout, + 0, + '0000000000000000000000000000000000000000000000000000000000000000', + manager, + ); }); + const transactionEntity = + await dbTransactionService.execute( + async (manager) => + manager.findOne(Transaction, { + where: { id: transaction.txid }, + }), + ); + if (transaction.scanTweak) { expect(transactionEntity.scanTweak).toBe(transaction.scanTweak); } else { diff --git a/src/indexer/indexer.service.ts b/src/indexer/indexer.service.ts index 564950e..bcaf463 100644 --- a/src/indexer/indexer.service.ts +++ b/src/indexer/indexer.service.ts @@ -1,4 +1,3 @@ -import { TransactionsService } from '@/transactions/transactions.service'; import { Transaction, TransactionOutput as TransactionOutputEntity, @@ -6,6 +5,7 @@ import { import { createTaggedHash, extractPubKeyFromScript } from '@/common/common'; import { publicKeyCombine, publicKeyTweakMul } from 'secp256k1'; import { Injectable } from '@nestjs/common'; +import { EntityManager } from 'typeorm'; export type TransactionInput = { txid: string; // transaction id @@ -22,15 +22,14 @@ export type TransactionOutput = { @Injectable() export class IndexerService { - constructor(private readonly transactionsService: TransactionsService) {} - async index( txid: string, vin: TransactionInput[], vout: TransactionOutput[], blockHeight: number, blockHash: string, - ) { + manager: EntityManager, + ): Promise { const scanResult = this.computeScanTweak(vin, vout); if (scanResult !== null) { const [scanTweak, eligibleOutputPubKeys] = scanResult; @@ -42,7 +41,7 @@ export class IndexerService { transaction.outputs = eligibleOutputPubKeys; transaction.isSpent = false; - await this.transactionsService.saveTransaction(transaction); + await manager.save(Transaction, transaction); } } diff --git a/src/operation-state/operation-state.service.ts b/src/operation-state/operation-state.service.ts index 554debb..5ba56c9 100644 --- a/src/operation-state/operation-state.service.ts +++ b/src/operation-state/operation-state.service.ts @@ -13,11 +13,4 @@ export class OperationStateService { async getOperationState(id: string): Promise { return this.operationStateRepository.findOneBy({ id: id }); } - - async setOperationState(id: string, state: any): Promise { - const operationState = new OperationState(); - operationState.id = id; - operationState.state = state; - return this.operationStateRepository.save(operationState); - } } diff --git a/tsconfig.build.json b/tsconfig.build.json index 64f86c6..3f31828 100644 --- a/tsconfig.build.json +++ b/tsconfig.build.json @@ -1,4 +1,4 @@ { "extends": "./tsconfig.json", - "exclude": ["node_modules", "test", "dist", "**/*spec.ts"] + "exclude": ["node_modules", "test", "dist", "**/*spec.ts", "**/*mock.ts", "**/*fixture.ts"] }