diff --git a/packages/node/src/configure/configure.module.ts b/packages/node/src/configure/configure.module.ts index 416686e5..ab04c2ef 100644 --- a/packages/node/src/configure/configure.module.ts +++ b/packages/node/src/configure/configure.module.ts @@ -59,7 +59,10 @@ function warnDeprecations() { @Global() @Module({}) export class ConfigureModule { - static async register(): Promise { + static async getInstance(): Promise<{ + config: NodeConfig; + project: () => Promise; + }> { const { argv } = yargsOptions; let config: NodeConfig; let rawManifest: unknown; @@ -130,6 +133,10 @@ export class ConfigureModule { }); return p; }; + return { config, project }; + } + static async register(): Promise { + const { config, project } = await ConfigureModule.getInstance(); return { module: ConfigureModule, diff --git a/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts b/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts index c5aaa7d3..779fe90b 100644 --- a/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts +++ b/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts @@ -18,6 +18,9 @@ import { HostDynamicDS, WorkerBlockDispatcher, IUnfinalizedBlocksService, + HostConnectionPoolState, + ConnectionPoolStateManager, + connectionPoolStateHostFunctions, } from '@subql/node-core'; import { Store } from '@subql/types-soroban'; import chalk from 'chalk'; @@ -25,6 +28,7 @@ import { SubqlProjectDs, SubqueryProject, } from '../../configure/SubqueryProject'; +import { SorobanApiConnection } from '../../soroban/api.connection'; import { SorobanBlockWrapped } from '../../soroban/block.soroban'; import { DynamicDsService } from '../dynamic-ds.service'; import { UnfinalizedBlocksService } from '../unfinalizedBlocks.service'; @@ -41,6 +45,7 @@ async function createIndexerWorker( store: Store, dynamicDsService: IDynamicDsService, unfinalizedBlocksService: IUnfinalizedBlocksService, + connectionPoolState: ConnectionPoolStateManager, root: string, ): Promise { const indexerWorker = Worker.create< @@ -75,6 +80,7 @@ async function createIndexerWorker( unfinalizedBlocksService.processUnfinalizedBlockHeader.bind( unfinalizedBlocksService, ), + ...connectionPoolStateHostFunctions(connectionPoolState), }, root, ); @@ -100,6 +106,7 @@ export class WorkerBlockDispatcherService @Inject('ISubqueryProject') project: SubqueryProject, dynamicDsService: DynamicDsService, unfinalizedBlocksSevice: UnfinalizedBlocksService, + connectionPoolState: ConnectionPoolStateManager, ) { super( nodeConfig, @@ -116,6 +123,7 @@ export class WorkerBlockDispatcherService storeService.getStore(), dynamicDsService, unfinalizedBlocksSevice, + connectionPoolState, project.root, ), ); diff --git a/packages/node/src/indexer/fetch.module.ts b/packages/node/src/indexer/fetch.module.ts index f192f9aa..46531919 100644 --- a/packages/node/src/indexer/fetch.module.ts +++ b/packages/node/src/indexer/fetch.module.ts @@ -80,6 +80,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; project: SubqueryProject, dynamicDsService: DynamicDsService, unfinalizedBlocks: UnfinalizedBlocksService, + connectionPoolState: ConnectionPoolStateManager, ) => nodeConfig.workers !== undefined ? new WorkerBlockDispatcherService( @@ -93,6 +94,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; project, dynamicDsService, unfinalizedBlocks, + connectionPoolState, ) : new BlockDispatcherService( apiService, @@ -120,6 +122,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; 'ISubqueryProject', DynamicDsService, UnfinalizedBlocksService, + ConnectionPoolStateManager, ], }, FetchService, diff --git a/packages/node/src/indexer/indexer.module.ts b/packages/node/src/indexer/indexer.module.ts index 98b70542..6e6a9700 100644 --- a/packages/node/src/indexer/indexer.module.ts +++ b/packages/node/src/indexer/indexer.module.ts @@ -15,6 +15,8 @@ import { PgMmrCacheService, MmrQueryService, ConnectionPoolStateManager, + WorkerConnectionPoolStateManager, + NodeConfig, } from '@subql/node-core'; import { SubqueryProject } from '../configure/SubqueryProject'; import { SorobanApiService } from '../soroban'; @@ -33,8 +35,16 @@ import { WorkerUnfinalizedBlocksService } from './worker/worker.unfinalizedBlock IndexerManager, StoreCacheService, StoreService, + { + provide: ConnectionPoolStateManager, + useFactory: () => { + if (isMainThread) { + throw new Error('Expected to be worker thread'); + } + return new WorkerConnectionPoolStateManager((global as any).host); + }, + }, ConnectionPoolService, - ConnectionPoolStateManager, { provide: ApiService, useFactory: async ( diff --git a/packages/node/src/indexer/worker/worker.ts b/packages/node/src/indexer/worker/worker.ts index 2a85aeb9..c197a5c9 100644 --- a/packages/node/src/indexer/worker/worker.ts +++ b/packages/node/src/indexer/worker/worker.ts @@ -31,8 +31,11 @@ import { hostDynamicDsKeys, HostDynamicDS, ProcessBlockResponse, + HostConnectionPoolState, + hostConnectionPoolStateKeys, } from '@subql/node-core'; import { SubqlProjectDs } from '../../configure/SubqueryProject'; +import { SorobanApiConnection } from '../../soroban/api.connection'; import { IndexerManager } from '../indexer.manager'; import { WorkerModule } from './worker.module'; import { @@ -119,10 +122,18 @@ async function waitForWorkerBatchSize(heapSizeInBytes: number): Promise { // Register these functions to be exposed to worker host (global as any).host = WorkerHost.create< - HostStore & HostDynamicDS & HostUnfinalizedBlocks, + HostStore & + HostDynamicDS & + HostUnfinalizedBlocks & + HostConnectionPoolState, IInitIndexerWorker >( - [...hostStoreKeys, ...hostDynamicDsKeys, ...hostUnfinalizedBlocksKeys], + [ + ...hostStoreKeys, + ...hostDynamicDsKeys, + ...hostUnfinalizedBlocksKeys, + ...hostConnectionPoolStateKeys, + ], { initWorker, fetchBlock, diff --git a/packages/node/src/soroban/api.service.soroban.ts b/packages/node/src/soroban/api.service.soroban.ts index cd10eab1..4ab4616f 100644 --- a/packages/node/src/soroban/api.service.soroban.ts +++ b/packages/node/src/soroban/api.service.soroban.ts @@ -53,37 +53,35 @@ export class SorobanApiService extends ApiService< const endpointToApiIndex: Record = {}; - await Promise.all( - endpoints.map(async (endpoint, i) => { - const connection = await SorobanApiConnection.create( - endpoint, - this.fetchBlockBatches, - this.eventEmitter, - ); - - const api = connection.unsafeApi; - - this.eventEmitter.emit(IndexerEvent.ApiConnected, { - value: 1, - apiIndex: i, - endpoint: endpoint, - }); - - if (!this.networkMeta) { - this.networkMeta = connection.networkMeta; - } + for await (const [i, endpoint] of endpoints.entries()) { + const connection = await SorobanApiConnection.create( + endpoint, + this.fetchBlockBatches, + this.eventEmitter, + ); + + const api = connection.unsafeApi; + + this.eventEmitter.emit(IndexerEvent.ApiConnected, { + value: 1, + apiIndex: i, + endpoint: endpoint, + }); + + if (!this.networkMeta) { + this.networkMeta = connection.networkMeta; + } - if (network.chainId !== api.getChainId().toString()) { - throw this.metadataMismatchError( - 'ChainId', - network.chainId, - api.getChainId().toString(), - ); - } + if (network.chainId !== api.getChainId().toString()) { + throw this.metadataMismatchError( + 'ChainId', + network.chainId, + api.getChainId().toString(), + ); + } - endpointToApiIndex[endpoint] = connection; - }), - ); + endpointToApiIndex[endpoint] = connection; + } this.connectionPoolService.addBatchToConnections(endpointToApiIndex); diff --git a/packages/node/src/soroban/soroban.server.spec.ts b/packages/node/src/soroban/soroban.server.spec.ts index 036be7a1..5fb3366a 100644 --- a/packages/node/src/soroban/soroban.server.spec.ts +++ b/packages/node/src/soroban/soroban.server.spec.ts @@ -151,6 +151,6 @@ describe('SorobanServer', () => { ], }); expect((server as any).eventsCache[2]).toBeUndefined; - expect(spy).toHaveBeenCalledTimes(2); + expect(spy).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/node/src/soroban/soroban.server.ts b/packages/node/src/soroban/soroban.server.ts index 585a79bd..a76b54cb 100644 --- a/packages/node/src/soroban/soroban.server.ts +++ b/packages/node/src/soroban/soroban.server.ts @@ -26,20 +26,12 @@ export class SorobanServer extends Server { const groupedEvents = groupBy(response.events, (event) => parseInt(event.ledger) === sequence ? 'events' : 'eventsToCache', ); - const events = groupedEvents.events; - let eventsToCache = groupedEvents.eventsToCache; + const events = compact(groupedEvents.events); + let eventsToCache = compact(groupedEvents.eventsToCache); // Update the accumulated events with the events from the current sequence const newEvents = accEvents.concat(events); - if (eventsToCache && response.events.length === DEFAULT_PAGE_SIZE) { - // remove last sequence from eventsToCache as some of them might be paginated out - const lastSequence = last(response.events).ledger; - eventsToCache = compact( - eventsToCache.filter((event) => event.ledger !== lastSequence), - ); - } - if (eventsToCache?.length) { if (response.events.length === DEFAULT_PAGE_SIZE) { const lastSequence = last(response.events).ledger; diff --git a/packages/node/src/subcommands/testing.init.ts b/packages/node/src/subcommands/testing.init.ts index b297a501..589ebeb0 100644 --- a/packages/node/src/subcommands/testing.init.ts +++ b/packages/node/src/subcommands/testing.init.ts @@ -3,6 +3,7 @@ import { NestFactory } from '@nestjs/core'; import { ApiService, getLogger, NestLogger } from '@subql/node-core'; +import { ConfigureModule } from '../configure/configure.module'; import { ProjectService } from '../indexer/project.service'; import { SorobanApiService } from '../soroban'; import { TestingModule } from './testing.module'; @@ -11,17 +12,10 @@ import { TestingService } from './testing.service'; const logger = getLogger('Testing'); export async function testingInit(): Promise { try { - const app = await NestFactory.create(TestingModule, { - logger: new NestLogger(), - }); + const { config, project } = await ConfigureModule.getInstance(); + const subqueryProject = await project(); - await app.init(); - const projectService = app.get(ProjectService); - - // Initialise async services, we do this here rather than in factories, so we can capture one off events - await projectService.init(); - - const testingService = app.get(TestingService); + const testingService = new TestingService(config, subqueryProject); await testingService.init(); await testingService.run(); } catch (e) { diff --git a/packages/node/src/subcommands/testing.module.ts b/packages/node/src/subcommands/testing.module.ts index 6bd852d4..675a0169 100644 --- a/packages/node/src/subcommands/testing.module.ts +++ b/packages/node/src/subcommands/testing.module.ts @@ -11,6 +11,7 @@ import { NodeConfig, PoiService, StoreService, + TestRunner, } from '@subql/node-core'; import { ConfigureModule } from '../configure/configure.module'; import { SubqueryProject } from '../configure/SubqueryProject'; @@ -59,12 +60,22 @@ import { TestingService } from './testing.service'; }, inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2], }, + TestRunner, + { + provide: 'IApi', + useClass: SorobanApiService, + }, + { + provide: 'IIndexerManager', + useClass: IndexerManager, + }, IndexerManager, SchedulerRegistry, ], imports: [MetaModule, FetchModule], controllers: [], + exports: [TestRunner], }) export class TestingFeatureModule {} diff --git a/packages/node/src/subcommands/testing.service.ts b/packages/node/src/subcommands/testing.service.ts index c687a825..79ad0ab4 100644 --- a/packages/node/src/subcommands/testing.service.ts +++ b/packages/node/src/subcommands/testing.service.ts @@ -8,16 +8,13 @@ import { TestingService as BaseTestingService, TestRunner, NestLogger, - ApiService, } from '@subql/node-core'; import { SorobanBlockWrapper } from '@subql/types-soroban'; import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject'; -import { IndexerManager } from '../indexer/indexer.manager'; import { ProjectService } from '../indexer/project.service'; import { SorobanApi, SorobanApiService } from '../soroban'; import SafeSorobanProvider from '../soroban/safe-api'; import { TestingModule } from './testing.module'; - @Injectable() export class TestingService extends BaseTestingService< SorobanApi, @@ -48,6 +45,14 @@ export class TestingService extends BaseTestingService< ); await testContext.init(); + + const projectService: ProjectService = testContext.get(ProjectService); + const apiService = testContext.get(SorobanApiService); + + // Initialise async services, we do this here rather than in factories, so we can capture one off events + await apiService.init(); + await projectService.init(); + return testContext.get(TestRunner); } }