Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
guplersaxanoid committed Aug 2, 2023
1 parent c178dae commit 3e4f987
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 57 deletions.
9 changes: 8 additions & 1 deletion packages/node/src/configure/configure.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ function warnDeprecations() {
@Global()
@Module({})
export class ConfigureModule {
static async register(): Promise<DynamicModule> {
static async getInstance(): Promise<{
config: NodeConfig;
project: () => Promise<SubqueryProject>;
}> {
const { argv } = yargsOptions;
let config: NodeConfig;
let rawManifest: unknown;
Expand Down Expand Up @@ -130,6 +133,10 @@ export class ConfigureModule {
});
return p;
};
return { config, project };
}
static async register(): Promise<DynamicModule> {
const { config, project } = await ConfigureModule.getInstance();

return {
module: ConfigureModule,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ import {
HostDynamicDS,
WorkerBlockDispatcher,
IUnfinalizedBlocksService,
HostConnectionPoolState,

Check warning on line 21 in packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts

View workflow job for this annotation

GitHub Actions / code-style

'HostConnectionPoolState' is defined but never used
ConnectionPoolStateManager,
connectionPoolStateHostFunctions,
} from '@subql/node-core';
import { Store } from '@subql/types-soroban';
import chalk from 'chalk';

Check warning on line 26 in packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts

View workflow job for this annotation

GitHub Actions / code-style

'chalk' is defined but never used
import {
SubqlProjectDs,
SubqueryProject,
} from '../../configure/SubqueryProject';
import { SorobanApiConnection } from '../../soroban/api.connection';
import { SorobanBlockWrapped } from '../../soroban/block.soroban';
import { DynamicDsService } from '../dynamic-ds.service';
import { UnfinalizedBlocksService } from '../unfinalizedBlocks.service';
Expand All @@ -41,6 +45,7 @@ async function createIndexerWorker(
store: Store,
dynamicDsService: IDynamicDsService<SubqlProjectDs>,
unfinalizedBlocksService: IUnfinalizedBlocksService<SorobanBlockWrapped>,
connectionPoolState: ConnectionPoolStateManager<SorobanApiConnection>,
root: string,
): Promise<IndexerWorker> {
const indexerWorker = Worker.create<
Expand Down Expand Up @@ -75,6 +80,7 @@ async function createIndexerWorker(
unfinalizedBlocksService.processUnfinalizedBlockHeader.bind(
unfinalizedBlocksService,
),
...connectionPoolStateHostFunctions(connectionPoolState),
},
root,
);
Expand All @@ -100,6 +106,7 @@ export class WorkerBlockDispatcherService
@Inject('ISubqueryProject') project: SubqueryProject,
dynamicDsService: DynamicDsService,
unfinalizedBlocksSevice: UnfinalizedBlocksService,
connectionPoolState: ConnectionPoolStateManager<SorobanApiConnection>,
) {
super(
nodeConfig,
Expand All @@ -116,6 +123,7 @@ export class WorkerBlockDispatcherService
storeService.getStore(),
dynamicDsService,
unfinalizedBlocksSevice,
connectionPoolState,
project.root,
),
);
Expand Down
3 changes: 3 additions & 0 deletions packages/node/src/indexer/fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
project: SubqueryProject,
dynamicDsService: DynamicDsService,
unfinalizedBlocks: UnfinalizedBlocksService,
connectionPoolState: ConnectionPoolStateManager<SorobanApiConnection>,
) =>
nodeConfig.workers !== undefined
? new WorkerBlockDispatcherService(
Expand All @@ -93,6 +94,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
project,
dynamicDsService,
unfinalizedBlocks,
connectionPoolState,
)
: new BlockDispatcherService(
apiService,
Expand Down Expand Up @@ -120,6 +122,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
'ISubqueryProject',
DynamicDsService,
UnfinalizedBlocksService,
ConnectionPoolStateManager,
],
},
FetchService,
Expand Down
12 changes: 11 additions & 1 deletion packages/node/src/indexer/indexer.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {
PgMmrCacheService,
MmrQueryService,
ConnectionPoolStateManager,
WorkerConnectionPoolStateManager,
NodeConfig,
} from '@subql/node-core';
import { SubqueryProject } from '../configure/SubqueryProject';
import { SorobanApiService } from '../soroban';
Expand All @@ -33,8 +35,16 @@ import { WorkerUnfinalizedBlocksService } from './worker/worker.unfinalizedBlock
IndexerManager,
StoreCacheService,
StoreService,
{
provide: ConnectionPoolStateManager,
useFactory: () => {
if (isMainThread) {
throw new Error('Expected to be worker thread');
}
return new WorkerConnectionPoolStateManager((global as any).host);
},
},
ConnectionPoolService,
ConnectionPoolStateManager,
{
provide: ApiService,
useFactory: async (
Expand Down
15 changes: 13 additions & 2 deletions packages/node/src/indexer/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ import {
hostDynamicDsKeys,
HostDynamicDS,
ProcessBlockResponse,
HostConnectionPoolState,
hostConnectionPoolStateKeys,
} from '@subql/node-core';
import { SubqlProjectDs } from '../../configure/SubqueryProject';
import { SorobanApiConnection } from '../../soroban/api.connection';
import { IndexerManager } from '../indexer.manager';
import { WorkerModule } from './worker.module';
import {
Expand Down Expand Up @@ -119,10 +122,18 @@ async function waitForWorkerBatchSize(heapSizeInBytes: number): Promise<void> {

// Register these functions to be exposed to worker host
(global as any).host = WorkerHost.create<
HostStore & HostDynamicDS<SubqlProjectDs> & HostUnfinalizedBlocks,
HostStore &
HostDynamicDS<SubqlProjectDs> &
HostUnfinalizedBlocks &
HostConnectionPoolState<SorobanApiConnection>,
IInitIndexerWorker
>(
[...hostStoreKeys, ...hostDynamicDsKeys, ...hostUnfinalizedBlocksKeys],
[
...hostStoreKeys,
...hostDynamicDsKeys,
...hostUnfinalizedBlocksKeys,
...hostConnectionPoolStateKeys,
],
{
initWorker,
fetchBlock,
Expand Down
56 changes: 27 additions & 29 deletions packages/node/src/soroban/api.service.soroban.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,37 +53,35 @@ export class SorobanApiService extends ApiService<

const endpointToApiIndex: Record<string, SorobanApiConnection> = {};

await Promise.all(
endpoints.map(async (endpoint, i) => {
const connection = await SorobanApiConnection.create(
endpoint,
this.fetchBlockBatches,
this.eventEmitter,
);

const api = connection.unsafeApi;

this.eventEmitter.emit(IndexerEvent.ApiConnected, {
value: 1,
apiIndex: i,
endpoint: endpoint,
});

if (!this.networkMeta) {
this.networkMeta = connection.networkMeta;
}
for await (const [i, endpoint] of endpoints.entries()) {
const connection = await SorobanApiConnection.create(
endpoint,
this.fetchBlockBatches,
this.eventEmitter,
);

const api = connection.unsafeApi;

this.eventEmitter.emit(IndexerEvent.ApiConnected, {
value: 1,
apiIndex: i,
endpoint: endpoint,
});

if (!this.networkMeta) {
this.networkMeta = connection.networkMeta;
}

if (network.chainId !== api.getChainId().toString()) {
throw this.metadataMismatchError(
'ChainId',
network.chainId,
api.getChainId().toString(),
);
}
if (network.chainId !== api.getChainId().toString()) {
throw this.metadataMismatchError(
'ChainId',
network.chainId,
api.getChainId().toString(),
);
}

endpointToApiIndex[endpoint] = connection;
}),
);
endpointToApiIndex[endpoint] = connection;
}

this.connectionPoolService.addBatchToConnections(endpointToApiIndex);

Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/soroban/soroban.server.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,6 @@ describe('SorobanServer', () => {
],
});
expect((server as any).eventsCache[2]).toBeUndefined;
expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenCalledTimes(1);
});
});
12 changes: 2 additions & 10 deletions packages/node/src/soroban/soroban.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,12 @@ export class SorobanServer extends Server {
const groupedEvents = groupBy(response.events, (event) =>
parseInt(event.ledger) === sequence ? 'events' : 'eventsToCache',
);
const events = groupedEvents.events;
let eventsToCache = groupedEvents.eventsToCache;
const events = compact(groupedEvents.events);
let eventsToCache = compact(groupedEvents.eventsToCache);

// Update the accumulated events with the events from the current sequence
const newEvents = accEvents.concat(events);

if (eventsToCache && response.events.length === DEFAULT_PAGE_SIZE) {
// remove last sequence from eventsToCache as some of them might be paginated out
const lastSequence = last(response.events).ledger;
eventsToCache = compact(
eventsToCache.filter((event) => event.ledger !== lastSequence),
);
}

if (eventsToCache?.length) {
if (response.events.length === DEFAULT_PAGE_SIZE) {
const lastSequence = last(response.events).ledger;
Expand Down
14 changes: 4 additions & 10 deletions packages/node/src/subcommands/testing.init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { NestFactory } from '@nestjs/core';
import { ApiService, getLogger, NestLogger } from '@subql/node-core';
import { ConfigureModule } from '../configure/configure.module';
import { ProjectService } from '../indexer/project.service';
import { SorobanApiService } from '../soroban';
import { TestingModule } from './testing.module';
Expand All @@ -11,17 +12,10 @@ import { TestingService } from './testing.service';
const logger = getLogger('Testing');
export async function testingInit(): Promise<void> {
try {
const app = await NestFactory.create(TestingModule, {
logger: new NestLogger(),
});
const { config, project } = await ConfigureModule.getInstance();
const subqueryProject = await project();

await app.init();
const projectService = app.get(ProjectService);

// Initialise async services, we do this here rather than in factories, so we can capture one off events
await projectService.init();

const testingService = app.get(TestingService);
const testingService = new TestingService(config, subqueryProject);
await testingService.init();
await testingService.run();
} catch (e) {
Expand Down
11 changes: 11 additions & 0 deletions packages/node/src/subcommands/testing.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
NodeConfig,
PoiService,
StoreService,
TestRunner,
} from '@subql/node-core';
import { ConfigureModule } from '../configure/configure.module';
import { SubqueryProject } from '../configure/SubqueryProject';
Expand Down Expand Up @@ -59,12 +60,22 @@ import { TestingService } from './testing.service';
},
inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2],
},
TestRunner,
{
provide: 'IApi',
useClass: SorobanApiService,
},
{
provide: 'IIndexerManager',
useClass: IndexerManager,
},
IndexerManager,
SchedulerRegistry,
],

imports: [MetaModule, FetchModule],
controllers: [],
exports: [TestRunner],
})
export class TestingFeatureModule {}

Expand Down
11 changes: 8 additions & 3 deletions packages/node/src/subcommands/testing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@ import {
TestingService as BaseTestingService,
TestRunner,
NestLogger,
ApiService,
} from '@subql/node-core';
import { SorobanBlockWrapper } from '@subql/types-soroban';
import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject';
import { IndexerManager } from '../indexer/indexer.manager';
import { ProjectService } from '../indexer/project.service';
import { SorobanApi, SorobanApiService } from '../soroban';
import SafeSorobanProvider from '../soroban/safe-api';
import { TestingModule } from './testing.module';

@Injectable()
export class TestingService extends BaseTestingService<
SorobanApi,
Expand Down Expand Up @@ -48,6 +45,14 @@ export class TestingService extends BaseTestingService<
);

await testContext.init();

const projectService: ProjectService = testContext.get(ProjectService);
const apiService = testContext.get(SorobanApiService);

// Initialise async services, we do this here rather than in factories, so we can capture one off events
await apiService.init();
await projectService.init();

return testContext.get(TestRunner);
}
}

0 comments on commit 3e4f987

Please sign in to comment.