Skip to content

Commit

Permalink
fix: make indexing transactional
Browse files Browse the repository at this point in the history
Signed-off-by: chaitika_ <[email protected]>
  • Loading branch information
chaitika committed Nov 20, 2024
1 parent 57f1b07 commit 11d060a
Show file tree
Hide file tree
Showing 16 changed files with 301 additions and 105 deletions.
2 changes: 1 addition & 1 deletion e2e/indexer.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('Indexer', () => {
beforeAll(async () => {
const walletHelper = new WalletHelper();
const bitcoinRPCUtil = new BitcoinRPCUtil();
const indexerService = new IndexerService({} as any);
const indexerService = new IndexerService();
apiHelper = new ApiHelper();

await bitcoinRPCUtil.createWallet('test_wallet');
Expand Down
2 changes: 2 additions & 0 deletions jest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const config: Config.InitialOptions = {
'.*.entity.ts',
'.*.dto.ts',
'.*.spec.ts',
'.*.mock.ts',
'.*.fixture.ts',
'.*.module-definition.ts',
'.*.configuration.ts',
'.*.configuration.model.ts',
Expand Down
15 changes: 10 additions & 5 deletions src/block-data-providers/base-block-data-provider.abstract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
import { ConfigService } from '@nestjs/config';
import { BlockStateService } from '@/block-state/block-state.service';
import { BlockState } from '@/block-state/block-state.entity';
import { EntityManager } from 'typeorm';
import { OperationState } from '@/operation-state/operation-state.entity';

export abstract class BaseBlockDataProvider<OperationState> {
protected abstract readonly logger: Logger;
Expand All @@ -26,13 +28,15 @@ export abstract class BaseBlockDataProvider<OperationState> {
vout: TransactionOutput[],
blockHeight: number,
blockHash: string,
manager: EntityManager,
): Promise<void> {
await this.indexerService.index(
txid,
vin,
vout,
blockHeight,
blockHash,
manager,
);
}

Expand All @@ -47,13 +51,14 @@ export abstract class BaseBlockDataProvider<OperationState> {
async setState(
state: OperationState,
blockState: BlockState,
manager: EntityManager,
): Promise<void> {
await this.operationStateService.setOperationState(
this.operationStateKey,
state,
);
const operationState = new OperationState();
operationState.id = this.operationStateKey;
operationState.state = state;

await this.blockStateService.addBlockState(blockState);
await manager.save(OperationState, operationState);
await manager.save(BlockState, blockState);
}

abstract getBlockHash(height: number): Promise<string>;
Expand Down
7 changes: 7 additions & 0 deletions src/block-data-providers/bitcoin-core/provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from '@/block-data-providers/bitcoin-core/provider-fixtures';
import { Test, TestingModule } from '@nestjs/testing';
import { BlockStateService } from '@/block-state/block-state.service';
import { DbTransactionService } from '@/db-transaction/db-transaction.service';

describe('Bitcoin Core Provider', () => {
let provider: BitcoinCoreProvider;
Expand Down Expand Up @@ -51,6 +52,12 @@ describe('Bitcoin Core Provider', () => {
provide: BlockStateService,
useClass: jest.fn(),
},
{
provide: DbTransactionService,
useValue: {
execute: jest.fn(),
},
},
],
}).compile();

Expand Down
60 changes: 36 additions & 24 deletions src/block-data-providers/bitcoin-core/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { AxiosRequestConfig } from 'axios';
import * as currency from 'currency.js';
import { AxiosRetryConfig, makeRequest } from '@/common/request';
import { BlockStateService } from '@/block-state/block-state.service';
import { DbTransactionService } from '@/db-transaction/db-transaction.service';

@Injectable()
export class BitcoinCoreProvider
Expand All @@ -46,6 +47,7 @@ export class BitcoinCoreProvider
indexerService: IndexerService,
operationStateService: OperationStateService,
blockStateService: BlockStateService,
private readonly dbTransactionService: DbTransactionService,
) {
super(
configService,
Expand Down Expand Up @@ -81,15 +83,18 @@ export class BitcoinCoreProvider
: 0;
const blockHash = await this.getBlockHash(blockHeight);

await this.setState(
{
indexedBlockHeight: blockHeight,
},
{
blockHash,
blockHeight,
},
);
await this.dbTransactionService.execute(async (manager) => {
await this.setState(
{
indexedBlockHeight: blockHeight,
},
{
blockHash,
blockHeight,
},
manager,
);
});
}
}

Expand Down Expand Up @@ -127,22 +132,29 @@ export class BitcoinCoreProvider
verbosityLevel,
);

for (const transaction of transactions) {
const { txid, vin, vout, blockHeight, blockHash } =
transaction;
await this.indexTransaction(
txid,
vin,
vout,
blockHeight,
blockHash,
await this.dbTransactionService.execute(async (manager) => {
for (const transaction of transactions) {
const { txid, vin, vout, blockHeight, blockHash } =
transaction;
await this.indexTransaction(
txid,
vin,
vout,
blockHeight,
blockHash,
manager,
);
}

state.indexedBlockHeight = height;
await this.setState(
state,
{
blockHash: blockHash,
blockHeight: height,
},
manager,
);
}

state.indexedBlockHeight = height;
await this.setState(state, {
blockHash: blockHash,
blockHeight: height,
});
}
} finally {
Expand Down
7 changes: 7 additions & 0 deletions src/block-data-providers/block-provider.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import { BitcoinCoreProvider } from '@/block-data-providers/bitcoin-core/provide
import { EsploraProvider } from '@/block-data-providers/esplora/provider';
import { BlockStateService } from '@/block-state/block-state.service';
import { BlockStateModule } from '@/block-state/block-state.module';
import { DbTransactionModule } from '@/db-transaction/db-transaction.module';
import { DbTransactionService } from '@/db-transaction/db-transaction.service';

@Module({
imports: [
OperationStateModule,
IndexerModule,
ConfigModule,
BlockStateModule,
DbTransactionModule,
],
controllers: [],
providers: [
Expand All @@ -26,12 +29,14 @@ import { BlockStateModule } from '@/block-state/block-state.module';
IndexerService,
OperationStateService,
BlockStateService,
DbTransactionService,
],
useFactory: (
configService: ConfigService,
indexerService: IndexerService,
operationStateService: OperationStateService,
blockStateService: BlockStateService,
dbTransactionService: DbTransactionService,
) => {
switch (configService.get<ProviderType>('providerType')) {
case ProviderType.ESPLORA:
Expand All @@ -40,13 +45,15 @@ import { BlockStateModule } from '@/block-state/block-state.module';
indexerService,
operationStateService,
blockStateService,
dbTransactionService,
);
case ProviderType.BITCOIN_CORE_RPC:
return new BitcoinCoreProvider(
configService,
indexerService,
operationStateService,
blockStateService,
dbTransactionService,
);
default:
throw Error('unrecognised provider type in config');
Expand Down
94 changes: 54 additions & 40 deletions src/block-data-providers/esplora/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import { TAPROOT_ACTIVATION_HEIGHT } from '@/common/constants';
import { BlockStateService } from '@/block-state/block-state.service';
import { Cron, CronExpression } from '@nestjs/schedule';
import { DbTransactionService } from '@/db-transaction/db-transaction.service';

@Injectable()
export class EsploraProvider
Expand All @@ -31,6 +32,7 @@ export class EsploraProvider
indexerService: IndexerService,
operationStateService: OperationStateService,
blockStateService: BlockStateService,
private readonly dbTransactionService: DbTransactionService,
) {
super(
configService,
Expand Down Expand Up @@ -79,17 +81,20 @@ export class EsploraProvider
: 0;
const blockHash = await this.getBlockHash(blockHeight);

await this.setState(
{
currentBlockHeight: 0,
indexedBlockHeight: blockHeight,
lastProcessedTxIndex: 0, // we don't take coinbase txn into account
},
{
blockHash,
blockHeight,
},
);
await this.dbTransactionService.execute(async (manager) => {
await this.setState(
{
currentBlockHeight: 0,
indexedBlockHeight: blockHeight,
lastProcessedTxIndex: 0, // we don't take coinbase txn into account
},
{
blockHash,
blockHeight,
},
manager,
);
});
}
}

Expand Down Expand Up @@ -144,36 +149,45 @@ export class EsploraProvider
);

try {
await Promise.all(
batch.map(async (txid) => {
const tx = await this.getTx(txid);
const vin: TransactionInput[] = tx.vin.map((input) => ({
txid: input.txid,
vout: input.vout,
scriptSig: input.scriptsig,
prevOutScript: input.prevout.scriptpubkey,
witness: input.witness,
}));
const vout = tx.vout.map((output) => ({
scriptPubKey: output.scriptpubkey,
value: output.value,
}));

await this.indexTransaction(
txid,
vin,
vout,
height,
hash,
);
}, this),
);
await this.dbTransactionService.execute(async (manager) => {
await Promise.all(
batch.map(async (txid) => {
const tx = await this.getTx(txid);
const vin: TransactionInput[] = tx.vin.map(
(input) => ({
txid: input.txid,
vout: input.vout,
scriptSig: input.scriptsig,
prevOutScript: input.prevout.scriptpubkey,
witness: input.witness,
}),
);
const vout = tx.vout.map((output) => ({
scriptPubKey: output.scriptpubkey,
value: output.value,
}));

await this.indexTransaction(
txid,
vin,
vout,
height,
hash,
manager,
);
}, this),
);

state.indexedBlockHeight = height;
state.lastProcessedTxIndex = i + this.batchSize - 1;
await this.setState(state, {
blockHeight: height,
blockHash: hash,
state.indexedBlockHeight = height;
state.lastProcessedTxIndex = i + this.batchSize - 1;
await this.setState(
state,
{
blockHeight: height,
blockHash: hash,
},
manager,
);
});
} catch (error) {
this.logger.error(
Expand Down
4 changes: 0 additions & 4 deletions src/block-state/block-state.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ export class BlockStateService {
)[0];
}

async addBlockState(state: BlockState): Promise<void> {
await this.blockStateRepository.save(state);
}

async removeState(state: BlockState): Promise<void> {
await this.blockStateRepository.delete(state.blockHeight);
await this.transactionService.deleteTransactionByBlockHash(
Expand Down
43 changes: 43 additions & 0 deletions src/db-transaction/db-transaction.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import 'jest';

export const mockQueryBuilder = {
select: jest.fn().mockReturnThis(),
addSelect: jest.fn().mockReturnThis(),
where: jest.fn().mockReturnThis(),
groupBy: jest.fn().mockReturnThis(),
createQueryBuilder: jest.fn().mockReturnThis(),
leftJoin: jest.fn().mockReturnThis(),
leftJoinAndSelect: jest.fn().mockReturnThis(),
andWhere: jest.fn().mockReturnThis(),
getOne: jest.fn(),
getQuery: jest.fn(),
getRawOne: jest.fn(),
from: jest.fn().mockReturnThis(),
};

export const mockEntityManager = {
createQueryBuilder: jest.fn().mockReturnValue(mockQueryBuilder),
query: jest.fn(),
save: jest.fn(),
update: jest.fn(),
upsert: jest.fn(),
find: jest.fn(),
findOne: jest.fn(),
findOneBy: jest.fn(),
increment: jest.fn(),
findBy: jest.fn(),
exists: jest.fn(),
};

export const queryRunnerMock = {
connect: jest.fn(),
startTransaction: jest.fn(),
commitTransaction: jest.fn(),
rollbackTransaction: jest.fn(),
release: jest.fn(),
manager: mockEntityManager,
};

export class MockDataSource {
createQueryRunner = jest.fn().mockReturnValue(queryRunnerMock);
}
Loading

0 comments on commit 11d060a

Please sign in to comment.