Skip to content

Commit

Permalink
Fix not injecting cache into workers (#81)
Browse files Browse the repository at this point in the history
* Fix not injecting cache into workers

* Update changelog
  • Loading branch information
stwiname authored Oct 26, 2023
1 parent de92a63 commit 5d38d5a
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 1 deletion.
2 changes: 2 additions & 0 deletions packages/node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Fixed
- Not injecting cache into workers (#81)

## [3.1.0] - 2023-10-26
### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import {
dynamicDsHostFunctions,
IProjectUpgradeService,
HostUnfinalizedBlocks,
cacheHostFunctions,
HostCache,
InMemoryCacheService,
} from '@subql/node-core';
import { Store } from '@subql/types-core';
import { Store, Cache } from '@subql/types-core';
import { AlgorandApiConnection } from '../../algorand';
import {
AlgorandProjectDs,
Expand All @@ -44,6 +47,7 @@ type IndexerWorker = IIndexerWorker & {

async function createIndexerWorker(
store: Store,
cache: Cache,
dynamicDsService: IDynamicDsService<AlgorandProjectDs>,
unfinalizedBlocksService: IUnfinalizedBlocksService<BlockContent>,
connectionPoolState: ConnectionPoolStateManager<AlgorandApiConnection>,
Expand All @@ -55,13 +59,15 @@ async function createIndexerWorker(
// HostDynamicDS<AlgorandProjectDs> & HostStore & HostUnfinalizedBlocks
HostDynamicDS<AlgorandProjectDs> &
HostStore &
HostCache &
HostUnfinalizedBlocks &
HostConnectionPoolState<AlgorandApiConnection>
>(
path.resolve(__dirname, '../../../dist/indexer/worker/worker.js'),
[...baseWorkerFunctions, 'initWorker'],
{
...storeHostFunctions(store),
...cacheHostFunctions(cache),
...dynamicDsHostFunctions(dynamicDsService),
unfinalizedBlocksProcess:
unfinalizedBlocksService.processUnfinalizedBlockHeader.bind(
Expand Down Expand Up @@ -90,6 +96,7 @@ export class WorkerBlockDispatcherService
@Inject('IProjectUpgradeService')
projectUpgadeService: IProjectUpgradeService,
smartBatchService: SmartBatchService,
cacheService: InMemoryCacheService,
storeService: StoreService,
storeCacheService: StoreCacheService,
poiService: PoiService,
Expand All @@ -114,6 +121,7 @@ export class WorkerBlockDispatcherService
() =>
createIndexerWorker(
storeService.getStore(),
cacheService.getCache(),
dynamicDsService,
unfinalizedBlocksService,
connectionPoolState,
Expand Down
5 changes: 5 additions & 0 deletions packages/node/src/indexer/fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
StoreCacheService,
ConnectionPoolStateManager,
IProjectUpgradeService,
InMemoryCacheService,
} from '@subql/node-core';
import { AlgorandApiConnection, AlgorandApiService } from '../algorand';
import { SubqueryProject } from '../configure/SubqueryProject';
Expand All @@ -33,6 +34,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';

@Module({
providers: [
InMemoryCacheService,
StoreService,
StoreCacheService,
ConnectionPoolService,
Expand Down Expand Up @@ -73,6 +75,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
apiService: AlgorandApiService,
indexerManager: IndexerManager,
smartBatchService: SmartBatchService,
cacheService: InMemoryCacheService,
storeService: StoreService,
storeCacheService: StoreCacheService,
poiService: PoiService,
Expand All @@ -89,6 +92,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
projectService,
projectUpgradeService,
smartBatchService,
cacheService,
storeService,
storeCacheService,
poiService,
Expand Down Expand Up @@ -121,6 +125,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
AlgorandApiService,
IndexerManager,
SmartBatchService,
InMemoryCacheService,
StoreService,
StoreCacheService,
PoiService,
Expand Down
4 changes: 4 additions & 0 deletions packages/node/src/indexer/sandbox.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Inject, Injectable } from '@nestjs/common';
import {
hostStoreToStore,
IndexerSandbox,
InMemoryCacheService,
ISubqueryProject,
NodeConfig,
StoreService,
Expand All @@ -22,6 +23,7 @@ export class SandboxService<Api> {
private readonly apiService: AlgorandApiService,
@Inject(isMainThread ? StoreService : 'Null')
private readonly storeService: StoreService,
private readonly cacheService: InMemoryCacheService,
private readonly nodeConfig: NodeConfig,
@Inject('ISubqueryProject') private readonly project: ISubqueryProject,
) {}
Expand All @@ -31,11 +33,13 @@ export class SandboxService<Api> {
? this.storeService.getStore()
: hostStoreToStore((global as any).host); // Provided in worker.ts

const cache = this.cacheService.getCache();
const entry = this.getDataSourceEntry(ds);
let processor = this.processorCache[entry];
if (!processor) {
processor = new IndexerSandbox(
{
cache,
store,
root: this.project.root,
entry,
Expand Down
6 changes: 6 additions & 0 deletions packages/node/src/indexer/worker/worker-fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
WorkerDynamicDsService,
ConnectionPoolStateManager,
WorkerConnectionPoolStateManager,
InMemoryCacheService,
} from '@subql/node-core';
import { AlgorandApiService, AlgorandApiConnection } from '../../algorand';
import { SubqueryProject } from '../../configure/SubqueryProject';
Expand All @@ -17,6 +18,7 @@ import { IndexerManager } from '../indexer.manager';
import { ProjectService } from '../project.service';
import { SandboxService } from '../sandbox.service';
import { UnfinalizedBlocksService } from '../unfinalizedBlocks.service';
import { WorkerInMemoryCacheService } from './worker.cache.service';
import { WorkerService } from './worker.service';
import { WorkerUnfinalizedBlocksService } from './worker.unfinalizedBlocks.service';

Expand Down Expand Up @@ -61,6 +63,10 @@ import { WorkerUnfinalizedBlocksService } from './worker.unfinalizedBlocks.servi
useFactory: () =>
new WorkerUnfinalizedBlocksService((global as any).host),
},
{
provide: InMemoryCacheService,
useFactory: () => new WorkerInMemoryCacheService((global as any).host),
},
WorkerService,
],
exports: [],
Expand Down
22 changes: 22 additions & 0 deletions packages/node/src/indexer/worker/worker.cache.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

// TODO remove and use version in node-core

import { isMainThread } from 'worker_threads';
import { Injectable } from '@nestjs/common';
import { HostCache, hostCacheToCache } from '@subql/node-core';
import { Cache } from '@subql/types-core';

@Injectable()
export class WorkerInMemoryCacheService {
constructor(private host: HostCache) {
if (isMainThread) {
throw new Error('Expected to be worker thread');
}
}

getCache(): Cache {
return hostCacheToCache(this.host);
}
}

0 comments on commit 5d38d5a

Please sign in to comment.