From 5d38d5abb978e58a29e9c10cd1d2db60bd82a97c Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Fri, 27 Oct 2023 09:02:29 +1300 Subject: [PATCH] Fix not injecting cache into workers (#81) * Fix not injecting cache into workers * Update changelog --- packages/node/CHANGELOG.md | 2 ++ .../worker-block-dispatcher.service.ts | 10 ++++++++- packages/node/src/indexer/fetch.module.ts | 5 +++++ packages/node/src/indexer/sandbox.service.ts | 4 ++++ .../src/indexer/worker/worker-fetch.module.ts | 6 +++++ .../indexer/worker/worker.cache.service.ts | 22 +++++++++++++++++++ 6 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 packages/node/src/indexer/worker/worker.cache.service.ts diff --git a/packages/node/CHANGELOG.md b/packages/node/CHANGELOG.md index 14ce34fe..f24c492c 100644 --- a/packages/node/CHANGELOG.md +++ b/packages/node/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- Not injecting cache into workers (#81) ## [3.1.0] - 2023-10-26 ### Fixed diff --git a/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts b/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts index bbacd652..76a64fdd 100644 --- a/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts +++ b/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts @@ -26,8 +26,11 @@ import { dynamicDsHostFunctions, IProjectUpgradeService, HostUnfinalizedBlocks, + cacheHostFunctions, + HostCache, + InMemoryCacheService, } from '@subql/node-core'; -import { Store } from '@subql/types-core'; +import { Store, Cache } from '@subql/types-core'; import { AlgorandApiConnection } from '../../algorand'; import { AlgorandProjectDs, @@ -44,6 +47,7 @@ type IndexerWorker = IIndexerWorker & { async function createIndexerWorker( store: Store, + cache: Cache, dynamicDsService: IDynamicDsService, unfinalizedBlocksService: IUnfinalizedBlocksService, connectionPoolState: ConnectionPoolStateManager, @@ -55,6 +59,7 @@ async function createIndexerWorker( // HostDynamicDS & HostStore & HostUnfinalizedBlocks HostDynamicDS & HostStore & + HostCache & HostUnfinalizedBlocks & HostConnectionPoolState >( @@ -62,6 +67,7 @@ async function createIndexerWorker( [...baseWorkerFunctions, 'initWorker'], { ...storeHostFunctions(store), + ...cacheHostFunctions(cache), ...dynamicDsHostFunctions(dynamicDsService), unfinalizedBlocksProcess: unfinalizedBlocksService.processUnfinalizedBlockHeader.bind( @@ -90,6 +96,7 @@ export class WorkerBlockDispatcherService @Inject('IProjectUpgradeService') projectUpgadeService: IProjectUpgradeService, smartBatchService: SmartBatchService, + cacheService: InMemoryCacheService, storeService: StoreService, storeCacheService: StoreCacheService, poiService: PoiService, @@ -114,6 +121,7 @@ export class WorkerBlockDispatcherService () => createIndexerWorker( storeService.getStore(), + cacheService.getCache(), dynamicDsService, unfinalizedBlocksService, connectionPoolState, diff --git a/packages/node/src/indexer/fetch.module.ts b/packages/node/src/indexer/fetch.module.ts index a5f8e5ae..555c997b 100644 --- a/packages/node/src/indexer/fetch.module.ts +++ b/packages/node/src/indexer/fetch.module.ts @@ -15,6 +15,7 @@ import { StoreCacheService, ConnectionPoolStateManager, IProjectUpgradeService, + InMemoryCacheService, } from '@subql/node-core'; import { AlgorandApiConnection, AlgorandApiService } from '../algorand'; import { SubqueryProject } from '../configure/SubqueryProject'; @@ -33,6 +34,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; @Module({ providers: [ + InMemoryCacheService, StoreService, StoreCacheService, ConnectionPoolService, @@ -73,6 +75,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; apiService: AlgorandApiService, indexerManager: IndexerManager, smartBatchService: SmartBatchService, + cacheService: InMemoryCacheService, storeService: StoreService, storeCacheService: StoreCacheService, poiService: PoiService, @@ -89,6 +92,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; projectService, projectUpgradeService, smartBatchService, + cacheService, storeService, storeCacheService, poiService, @@ -121,6 +125,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; AlgorandApiService, IndexerManager, SmartBatchService, + InMemoryCacheService, StoreService, StoreCacheService, PoiService, diff --git a/packages/node/src/indexer/sandbox.service.ts b/packages/node/src/indexer/sandbox.service.ts index 74c6dcb2..cc79cf37 100644 --- a/packages/node/src/indexer/sandbox.service.ts +++ b/packages/node/src/indexer/sandbox.service.ts @@ -6,6 +6,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { hostStoreToStore, IndexerSandbox, + InMemoryCacheService, ISubqueryProject, NodeConfig, StoreService, @@ -22,6 +23,7 @@ export class SandboxService { private readonly apiService: AlgorandApiService, @Inject(isMainThread ? StoreService : 'Null') private readonly storeService: StoreService, + private readonly cacheService: InMemoryCacheService, private readonly nodeConfig: NodeConfig, @Inject('ISubqueryProject') private readonly project: ISubqueryProject, ) {} @@ -31,11 +33,13 @@ export class SandboxService { ? this.storeService.getStore() : hostStoreToStore((global as any).host); // Provided in worker.ts + const cache = this.cacheService.getCache(); const entry = this.getDataSourceEntry(ds); let processor = this.processorCache[entry]; if (!processor) { processor = new IndexerSandbox( { + cache, store, root: this.project.root, entry, diff --git a/packages/node/src/indexer/worker/worker-fetch.module.ts b/packages/node/src/indexer/worker/worker-fetch.module.ts index be697da2..26dc848b 100644 --- a/packages/node/src/indexer/worker/worker-fetch.module.ts +++ b/packages/node/src/indexer/worker/worker-fetch.module.ts @@ -8,6 +8,7 @@ import { WorkerDynamicDsService, ConnectionPoolStateManager, WorkerConnectionPoolStateManager, + InMemoryCacheService, } from '@subql/node-core'; import { AlgorandApiService, AlgorandApiConnection } from '../../algorand'; import { SubqueryProject } from '../../configure/SubqueryProject'; @@ -17,6 +18,7 @@ import { IndexerManager } from '../indexer.manager'; import { ProjectService } from '../project.service'; import { SandboxService } from '../sandbox.service'; import { UnfinalizedBlocksService } from '../unfinalizedBlocks.service'; +import { WorkerInMemoryCacheService } from './worker.cache.service'; import { WorkerService } from './worker.service'; import { WorkerUnfinalizedBlocksService } from './worker.unfinalizedBlocks.service'; @@ -61,6 +63,10 @@ import { WorkerUnfinalizedBlocksService } from './worker.unfinalizedBlocks.servi useFactory: () => new WorkerUnfinalizedBlocksService((global as any).host), }, + { + provide: InMemoryCacheService, + useFactory: () => new WorkerInMemoryCacheService((global as any).host), + }, WorkerService, ], exports: [], diff --git a/packages/node/src/indexer/worker/worker.cache.service.ts b/packages/node/src/indexer/worker/worker.cache.service.ts new file mode 100644 index 00000000..b8bb16ec --- /dev/null +++ b/packages/node/src/indexer/worker/worker.cache.service.ts @@ -0,0 +1,22 @@ +// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +// TODO remove and use version in node-core + +import { isMainThread } from 'worker_threads'; +import { Injectable } from '@nestjs/common'; +import { HostCache, hostCacheToCache } from '@subql/node-core'; +import { Cache } from '@subql/types-core'; + +@Injectable() +export class WorkerInMemoryCacheService { + constructor(private host: HostCache) { + if (isMainThread) { + throw new Error('Expected to be worker thread'); + } + } + + getCache(): Cache { + return hostCacheToCache(this.host); + } +}