diff --git a/packages/indexer-common/src/__tests__/ipfs.test.ts b/packages/indexer-common/src/__tests__/ipfs.test.ts index 4b5afd708..3345cb6be 100644 --- a/packages/indexer-common/src/__tests__/ipfs.test.ts +++ b/packages/indexer-common/src/__tests__/ipfs.test.ts @@ -167,7 +167,7 @@ describe(SubgraphManifestResolver, () => { }) it('should resolve dependencies', async () => { - const manifest: SubgraphDependencies = await ipfs.resolveWithDependencies( + const manifest: SubgraphDependencies = await ipfs.resolveGraftDependencies( new SubgraphDeploymentID(DEP_ROOT_HASH), ) expect(manifest).toEqual({ diff --git a/packages/indexer-common/src/graph-node.ts b/packages/indexer-common/src/graph-node.ts index 67687aa24..1b7edb300 100644 --- a/packages/indexer-common/src/graph-node.ts +++ b/packages/indexer-common/src/graph-node.ts @@ -108,13 +108,42 @@ export class SubgraphManifestResolver { return yaml.parse(response.data) } + /** + * resolveCompositionDependencies + * resolves the composed dependencies in datasource[].source.address that is a Qm hash (starts with Qm) + * @param subgraphDeploymentId + * @returns Promise + */ + public async resolveCompositionDependencies( + subgraphDeploymentId: SubgraphDeploymentID, + ): Promise { + const manifest = await this.resolve(subgraphDeploymentId) + const dependencies: SubgraphDependency[] = [] + if (manifest['dataSources']) { + for (const dataSource of manifest['dataSources']) { + if ( + dataSource['source'] && + dataSource['source']['address'] && + dataSource['source']['address'].startsWith('Qm') + ) { + const dep = { + block: dataSource['source']['startBlock'], + base: new SubgraphDeploymentID(dataSource['source']['address']), + } + dependencies.push(dep) + } + } + } + return dependencies + } + /** * Resolves a subgraph's manifest and its dependencies in the order that they need to be resolved. * * @param subgraphDeploymentId * @returns Promise */ - public async resolveWithDependencies( + public async resolveGraftDependencies( subgraphDeploymentId: SubgraphDeploymentID, ): Promise { const deps: SubgraphDependencies = { @@ -694,6 +723,7 @@ export class GraphNode { } else { // Subgraph deployment not found await this.autoGraftDeployDependencies(deployment, deploymentAssignments, name) + await this.autoCompositionDependencies(deployment, deploymentAssignments, name) // Create and deploy the subgraph this.logger.debug( @@ -718,6 +748,36 @@ export class GraphNode { } } + /** + * Automatically deploy composition dependencies + * @param deployment + * @param deploymentAssignments + * @param name + * @returns void + */ + private async autoCompositionDependencies( + deployment: SubgraphDeploymentID, + deploymentAssignments: SubgraphDeploymentAssignment[], + name: string, + ) { + this.logger.debug('Auto composition deploy subgraph dependencies') + const { network: subgraphChainName } = await this.subgraphFeatures(deployment) + const dependencies = + await this.manifestResolver.resolveCompositionDependencies(deployment) + // these dependencies should be deployed in parallel + await Promise.all( + dependencies.map(async (dependency) => { + await this.ensureDependency(dependency, name) + await this.syncToBlock( + dependency.block, + dependency.base, + subgraphChainName, + false, // dependencies in composition should continue indexing + ) + }), + ) + } + /** * Automatically deploy any dependencies of the subgraph, returning only when they are sync'd to the specified block. * @@ -735,7 +795,7 @@ export class GraphNode { ) { this.logger.debug('Auto graft deploy subgraph dependencies') const { network: subgraphChainName } = await this.subgraphFeatures(deployment) - const dependencies = await this.manifestResolver.resolveWithDependencies(deployment) + const dependencies = await this.manifestResolver.resolveGraftDependencies(deployment) if (dependencies.dependencies.length == 0) { this.logger.debug('No subgraph dependencies found', { name, @@ -748,65 +808,68 @@ export class GraphNode { ) for (const dependency of dependencies.dependencies) { - const queriedAssignments = await this.subgraphDeploymentAssignmentsByDeploymentID( - SubgraphStatus.ACTIVE, - [dependency.base.ipfsHash], - ) - this.logger.debug( - 'queried graph-node for assignment', - queriedAssignments.map((a: SubgraphDeploymentAssignment) => { - return { ipfsHash: a.id.ipfsHash, ...a } - }), - ) - const dependencyAssignment = queriedAssignments.find( - (assignment) => assignment.id.ipfsHash == dependency.base.ipfsHash, - ) + await this.ensureDependency(dependency, name) + await this.syncToBlock(dependency.block, dependency.base, subgraphChainName, true) + } + } + } - if (dependencyAssignment) { - this.logger.info("Dependency subgraph found, checking if it's healthy", { - name, - deployment: dependency.base.display, - block_required: dependency.block, - }) + /** + * Ensure a dependency is deployed and healthy + * @param dependency + * @param name + */ + private async ensureDependency(dependency: SubgraphDependency, name: string) { + const queriedAssignments = await this.subgraphDeploymentAssignmentsByDeploymentID( + SubgraphStatus.ACTIVE, + [dependency.base.ipfsHash], + ) + this.logger.debug( + 'queried graph-node for assignment', + queriedAssignments.map((a: SubgraphDeploymentAssignment) => { + return { ipfsHash: a.id.ipfsHash, ...a } + }), + ) + const dependencyAssignment = queriedAssignments.find( + (assignment) => assignment.id.ipfsHash == dependency.base.ipfsHash, + ) - const indexingStatus = await this.indexingStatus([dependency.base]) - const deploymentStatus = indexingStatus.find( - (status) => status.subgraphDeployment.ipfsHash === dependency.base.ipfsHash, - ) - if (!deploymentStatus) { - this.logger.error(`Subgraph not found in indexing status`, { - subgraph: dependency.base.ipfsHash, - indexingStatus, - }) - throw new Error(`Subgraph not found in indexing status`) - } else { - this.logger.info( - 'Dependency subgraph found, will try to sync it to the block required', - { - deploymentStatus, - }, - ) - } - } else if (!dependencyAssignment) { - this.logger.debug( - 'Dependency subgraph not found, creating, deploying and pausing...', - { - name, - deployment: dependency.base.display, - block_required: dependency.block, - }, - ) - // are we paused at the block we wanted? + if (dependencyAssignment) { + this.logger.info("Dependency subgraph found, checking if it's healthy", { + name, + deployment: dependency.base.display, + block_required: dependency.block, + }) - await this.create(name) - await this.deploy(name, dependency.base) - } - await this.syncToBlockAndPause( - dependency.block, - dependency.base, - subgraphChainName, + const indexingStatus = await this.indexingStatus([dependency.base]) + const deploymentStatus = indexingStatus.find( + (status) => status.subgraphDeployment.ipfsHash === dependency.base.ipfsHash, + ) + if (!deploymentStatus) { + this.logger.error(`Subgraph not found in indexing status`, { + subgraph: dependency.base.ipfsHash, + indexingStatus, + }) + throw new Error(`Subgraph not found in indexing status`) + } else { + this.logger.info( + 'Dependency subgraph found, will try to sync it to the block required', + { + deploymentStatus, + }, ) } + } else { + this.logger.debug( + 'Dependency subgraph not found, creating, deploying and pausing...', + { + name, + deployment: dependency.base.display, + block_required: dependency.block, + }, + ) + await this.create(name) + await this.deploy(name, dependency.base) } } @@ -816,10 +879,11 @@ export class GraphNode { * This will resume a paused subgraph if the block height target is higher than the * current block height */ - public async syncToBlockAndPause( + public async syncToBlock( blockHeight: number, subgraphDeployment: SubgraphDeploymentID, chainName: string | null, + shouldPauseWhenComplete: boolean, ): Promise { async function waitForMs(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)) @@ -916,8 +980,24 @@ export class GraphNode { subgraph: subgraphDeployment.ipfsHash, indexingStatus, }) - // pause the subgraph to prevent further indexing - await this.pause(subgraphDeployment) + // pause the subgraph to prevent further indexing (grafting path) + if (shouldPauseWhenComplete) { + this.logger.debug("Pausing subgraph deployment as it's synced to block", { + subgraph: subgraphDeployment.ipfsHash, + blockHeight, + }) + await this.pause(subgraphDeployment) + } // Composition path + else { + this.logger.debug( + `Subgraph is syncd to block ${chain.latestBlock.number} not pausing.`, + { + subgraph: subgraphDeployment.ipfsHash, + indexingStatus, + }, + ) + break + } } else { this.logger.debug(`Subgraph already paused and synced to block.`, { subgraph: subgraphDeployment.ipfsHash,