Skip to content

Commit

Permalink
common: auto resolve composition dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
dwerner committed Feb 21, 2025
1 parent d1f9408 commit ab45df7
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 60 deletions.
2 changes: 1 addition & 1 deletion packages/indexer-common/src/__tests__/ipfs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ describe(SubgraphManifestResolver, () => {
})

it('should resolve dependencies', async () => {
const manifest: SubgraphDependencies = await ipfs.resolveWithDependencies(
const manifest: SubgraphDependencies = await ipfs.resolveGraftDependencies(
new SubgraphDeploymentID(DEP_ROOT_HASH),
)
expect(manifest).toEqual({
Expand Down
198 changes: 139 additions & 59 deletions packages/indexer-common/src/graph-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,42 @@ export class SubgraphManifestResolver {
return yaml.parse(response.data)
}

/**
* resolveCompositionDependencies
* resolves the composed dependencies in datasource[].source.address that is a Qm hash (starts with Qm)
* @param subgraphDeploymentId
* @returns Promise<SubgraphDependency[]>
*/
public async resolveCompositionDependencies(
subgraphDeploymentId: SubgraphDeploymentID,
): Promise<SubgraphDependency[]> {
const manifest = await this.resolve(subgraphDeploymentId)
const dependencies: SubgraphDependency[] = []
if (manifest['dataSources']) {
for (const dataSource of manifest['dataSources']) {
if (
dataSource['source'] &&
dataSource['source']['address'] &&
dataSource['source']['address'].startsWith('Qm')
) {
const dep = {
block: dataSource['source']['startBlock'],
base: new SubgraphDeploymentID(dataSource['source']['address']),
}
dependencies.push(dep)
}
}
}
return dependencies
}

/**
* Resolves a subgraph's manifest and its dependencies in the order that they need to be resolved.
*
* @param subgraphDeploymentId
* @returns Promise<SubgraphDependencies>
*/
public async resolveWithDependencies(
public async resolveGraftDependencies(
subgraphDeploymentId: SubgraphDeploymentID,
): Promise<SubgraphDependencies> {
const deps: SubgraphDependencies = {
Expand Down Expand Up @@ -694,6 +723,7 @@ export class GraphNode {
} else {
// Subgraph deployment not found
await this.autoGraftDeployDependencies(deployment, deploymentAssignments, name)
await this.autoCompositionDependencies(deployment, deploymentAssignments, name)

// Create and deploy the subgraph
this.logger.debug(
Expand All @@ -718,6 +748,36 @@ export class GraphNode {
}
}

/**
* Automatically deploy composition dependencies
* @param deployment
* @param deploymentAssignments
* @param name
* @returns void
*/
private async autoCompositionDependencies(
deployment: SubgraphDeploymentID,
deploymentAssignments: SubgraphDeploymentAssignment[],
name: string,
) {
this.logger.debug('Auto composition deploy subgraph dependencies')
const { network: subgraphChainName } = await this.subgraphFeatures(deployment)
const dependencies =
await this.manifestResolver.resolveCompositionDependencies(deployment)
// these dependencies should be deployed in parallel
await Promise.all(
dependencies.map(async (dependency) => {
await this.ensureDependency(dependency, name)
await this.syncToBlock(
dependency.block,
dependency.base,
subgraphChainName,
false, // dependencies in composition should continue indexing
)
}),
)
}

/**
* Automatically deploy any dependencies of the subgraph, returning only when they are sync'd to the specified block.
*
Expand All @@ -735,7 +795,7 @@ export class GraphNode {
) {
this.logger.debug('Auto graft deploy subgraph dependencies')
const { network: subgraphChainName } = await this.subgraphFeatures(deployment)
const dependencies = await this.manifestResolver.resolveWithDependencies(deployment)
const dependencies = await this.manifestResolver.resolveGraftDependencies(deployment)
if (dependencies.dependencies.length == 0) {
this.logger.debug('No subgraph dependencies found', {
name,
Expand All @@ -748,65 +808,68 @@ export class GraphNode {
)

for (const dependency of dependencies.dependencies) {
const queriedAssignments = await this.subgraphDeploymentAssignmentsByDeploymentID(
SubgraphStatus.ACTIVE,
[dependency.base.ipfsHash],
)
this.logger.debug(
'queried graph-node for assignment',
queriedAssignments.map((a: SubgraphDeploymentAssignment) => {
return { ipfsHash: a.id.ipfsHash, ...a }
}),
)
const dependencyAssignment = queriedAssignments.find(
(assignment) => assignment.id.ipfsHash == dependency.base.ipfsHash,
)
await this.ensureDependency(dependency, name)
await this.syncToBlock(dependency.block, dependency.base, subgraphChainName, true)
}
}
}

if (dependencyAssignment) {
this.logger.info("Dependency subgraph found, checking if it's healthy", {
name,
deployment: dependency.base.display,
block_required: dependency.block,
})
/**
* Ensure a dependency is deployed and healthy
* @param dependency
* @param name
*/
private async ensureDependency(dependency: SubgraphDependency, name: string) {
const queriedAssignments = await this.subgraphDeploymentAssignmentsByDeploymentID(
SubgraphStatus.ACTIVE,
[dependency.base.ipfsHash],
)
this.logger.debug(
'queried graph-node for assignment',
queriedAssignments.map((a: SubgraphDeploymentAssignment) => {
return { ipfsHash: a.id.ipfsHash, ...a }
}),
)
const dependencyAssignment = queriedAssignments.find(
(assignment) => assignment.id.ipfsHash == dependency.base.ipfsHash,
)

const indexingStatus = await this.indexingStatus([dependency.base])
const deploymentStatus = indexingStatus.find(
(status) => status.subgraphDeployment.ipfsHash === dependency.base.ipfsHash,
)
if (!deploymentStatus) {
this.logger.error(`Subgraph not found in indexing status`, {
subgraph: dependency.base.ipfsHash,
indexingStatus,
})
throw new Error(`Subgraph not found in indexing status`)
} else {
this.logger.info(
'Dependency subgraph found, will try to sync it to the block required',
{
deploymentStatus,
},
)
}
} else if (!dependencyAssignment) {
this.logger.debug(
'Dependency subgraph not found, creating, deploying and pausing...',
{
name,
deployment: dependency.base.display,
block_required: dependency.block,
},
)
// are we paused at the block we wanted?
if (dependencyAssignment) {
this.logger.info("Dependency subgraph found, checking if it's healthy", {
name,
deployment: dependency.base.display,
block_required: dependency.block,
})

await this.create(name)
await this.deploy(name, dependency.base)
}
await this.syncToBlockAndPause(
dependency.block,
dependency.base,
subgraphChainName,
const indexingStatus = await this.indexingStatus([dependency.base])
const deploymentStatus = indexingStatus.find(
(status) => status.subgraphDeployment.ipfsHash === dependency.base.ipfsHash,
)
if (!deploymentStatus) {
this.logger.error(`Subgraph not found in indexing status`, {
subgraph: dependency.base.ipfsHash,
indexingStatus,
})
throw new Error(`Subgraph not found in indexing status`)
} else {
this.logger.info(
'Dependency subgraph found, will try to sync it to the block required',
{
deploymentStatus,
},
)
}
} else {
this.logger.debug(
'Dependency subgraph not found, creating, deploying and pausing...',
{
name,
deployment: dependency.base.display,
block_required: dependency.block,
},
)
await this.create(name)
await this.deploy(name, dependency.base)
}
}

Expand All @@ -816,10 +879,11 @@ export class GraphNode {
* This will resume a paused subgraph if the block height target is higher than the
* current block height
*/
public async syncToBlockAndPause(
public async syncToBlock(
blockHeight: number,
subgraphDeployment: SubgraphDeploymentID,
chainName: string | null,
shouldPauseWhenComplete: boolean,
): Promise<void> {
async function waitForMs(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
Expand Down Expand Up @@ -916,8 +980,24 @@ export class GraphNode {
subgraph: subgraphDeployment.ipfsHash,
indexingStatus,
})
// pause the subgraph to prevent further indexing
await this.pause(subgraphDeployment)
// pause the subgraph to prevent further indexing (grafting path)
if (shouldPauseWhenComplete) {
this.logger.debug("Pausing subgraph deployment as it's synced to block", {
subgraph: subgraphDeployment.ipfsHash,
blockHeight,
})
await this.pause(subgraphDeployment)
} // Composition path
else {
this.logger.debug(
`Subgraph is syncd to block ${chain.latestBlock.number} not pausing.`,
{
subgraph: subgraphDeployment.ipfsHash,
indexingStatus,
},
)
break
}
} else {
this.logger.debug(`Subgraph already paused and synced to block.`, {
subgraph: subgraphDeployment.ipfsHash,
Expand Down

0 comments on commit ab45df7

Please sign in to comment.