Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix!: replace dag walkers with generic CID extraction from blocks #447

Merged
merged 11 commits into from
Sep 16, 2024
24 changes: 5 additions & 19 deletions packages/block-brokers/src/bitswap.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createBitswap } from '@helia/bitswap'
import type { BitswapOptions, Bitswap, BitswapWantBlockProgressEvents, BitswapNotifyProgressEvents } from '@helia/bitswap'
import type { BlockAnnounceOptions, BlockBroker, BlockRetrievalOptions, CreateSessionOptions, Routing } from '@helia/interface'
import type { BlockAnnounceOptions, BlockBroker, BlockRetrievalOptions, CreateSessionOptions, Routing, HasherLoader } from '@helia/interface'
import type { Libp2p, Startable, ComponentLogger } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'
Expand All @@ -9,9 +9,9 @@ import type { MultihashHasher } from 'multiformats/hashes/interface'
interface BitswapComponents {
libp2p: Libp2p
blockstore: Blockstore
hashers: Record<string, MultihashHasher>
routing: Routing
logger: ComponentLogger
getHasher: HasherLoader
}

export interface BitswapInit extends BitswapOptions {
Expand All @@ -23,26 +23,12 @@ class BitswapBlockBroker implements BlockBroker<BitswapWantBlockProgressEvents,
private started: boolean

constructor (components: BitswapComponents, init: BitswapInit = {}) {
const { hashers } = components
const { getHasher } = components

this.bitswap = createBitswap(components, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
let hasher: MultihashHasher | undefined

if (typeof codecOrName === 'string') {
hasher = Object.values(hashers).find(hasher => {
return hasher.name === codecOrName
})
} else {
hasher = hashers[codecOrName]
}

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
getHasher: async (codecOrName: number): Promise<MultihashHasher<number>> => {
return getHasher(codecOrName)
}
},
...init
Expand Down
20 changes: 9 additions & 11 deletions packages/car/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@
import { CarWriter } from '@ipld/car'
import drain from 'it-drain'
import map from 'it-map'
import { createUnsafe } from 'multiformats/block'
import defer from 'p-defer'
import PQueue from 'p-queue'
import type { DAGWalker } from '@helia/interface'
import type { CodecLoader } from '@helia/interface'
import type { GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks'
import type { CarReader } from '@ipld/car'
import type { AbortOptions } from '@libp2p/interface'
Expand All @@ -74,7 +75,7 @@ import type { ProgressOptions } from 'progress-events'

export interface CarComponents {
blockstore: Blockstore
dagWalkers: Record<number, DAGWalker>
getCodec: CodecLoader
}

interface ExportCarOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents> {
Expand Down Expand Up @@ -235,18 +236,15 @@ class DefaultCar implements Car {
* and update the pin count for them
*/
async #walkDag (cid: CID, queue: PQueue, withBlock: (cid: CID, block: Uint8Array) => Promise<void>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void> {
const dagWalker = this.components.dagWalkers[cid.code]
const codec = await this.components.getCodec(cid.code)
const bytes = await this.components.blockstore.get(cid, options)

if (dagWalker == null) {
throw new Error(`No dag walker found for cid codec ${cid.code}`)
}

const block = await this.components.blockstore.get(cid, options)
await withBlock(cid, bytes)

await withBlock(cid, block)
const block = createUnsafe({ bytes, cid, codec })
achingbrain marked this conversation as resolved.
Show resolved Hide resolved

// walk dag, ensure all blocks are present
for await (const cid of dagWalker.walk(block)) {
for await (const [,cid] of block.links()) {
void queue.add(async () => {
await this.#walkDag(cid, queue, withBlock, options)
})
Expand All @@ -257,6 +255,6 @@ class DefaultCar implements Car {
/**
* Create a {@link Car} instance for use with {@link https://github.com/ipfs/helia Helia}
*/
export function car (helia: { blockstore: Blockstore, dagWalkers: Record<number, DAGWalker> }, init: any = {}): Car {
export function car (helia: CarComponents, init: any = {}): Car {
return new DefaultCar(helia, init)
}
27 changes: 0 additions & 27 deletions packages/car/test/fixtures/dag-walkers.ts

This file was deleted.

17 changes: 17 additions & 0 deletions packages/car/test/fixtures/get-codec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* eslint-env mocha */

import * as dagPb from '@ipld/dag-pb'
import * as raw from 'multiformats/codecs/raw'
import type { BlockCodec } from 'multiformats'

export function getCodec (code: number): BlockCodec<any, any> {
if (code === dagPb.code) {
return dagPb
}

if (code === raw.code) {
return raw
}

throw new Error(`Unknown codec ${code}`)
}

Check warning on line 17 in packages/car/test/fixtures/get-codec.ts

View check run for this annotation

Codecov / codecov/patch

packages/car/test/fixtures/get-codec.ts#L15-L17

Added lines #L15 - L17 were not covered by tests
16 changes: 8 additions & 8 deletions packages/car/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import { MemoryDatastore } from 'datastore-core'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import toBuffer from 'it-to-buffer'
import { car, type Car } from '../src/index.js'
import { dagWalkers } from './fixtures/dag-walkers.js'
import { largeFile, smallFile } from './fixtures/files.js'
import { getCodec } from './fixtures/get-codec.js'
import { memoryCarWriter } from './fixtures/memory-car.js'
import type { Blockstore } from 'interface-blockstore'

Expand All @@ -23,14 +23,14 @@ describe('import/export car file', () => {
beforeEach(async () => {
blockstore = new MemoryBlockstore()

c = car({ blockstore, dagWalkers })
c = car({ blockstore, getCodec })
u = unixfs({ blockstore })
})

it('exports and imports a car file', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })
const cid = await otherUnixFS.addBytes(smallFile)

const writer = memoryCarWriter(cid)
Expand All @@ -50,7 +50,7 @@ describe('import/export car file', () => {

const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })
const cid1 = await otherUnixFS.addBytes(fileData1)
const cid2 = await otherUnixFS.addBytes(fileData2)
const cid3 = await otherUnixFS.addBytes(fileData3)
Expand All @@ -70,7 +70,7 @@ describe('import/export car file', () => {
it('exports and imports a multiple block car file', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })
const cid = await otherUnixFS.addBytes(largeFile)

const writer = memoryCarWriter(cid)
Expand All @@ -90,7 +90,7 @@ describe('import/export car file', () => {

const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })
const cid1 = await otherUnixFS.addBytes(fileData1, {
chunker: fixedSize({
chunkSize: 2
Expand Down Expand Up @@ -124,7 +124,7 @@ describe('import/export car file', () => {
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherDatastore = new MemoryDatastore()
const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })

await otherMFS.mkdir('/testDups')
await otherMFS.mkdir('/testDups/sub')
Expand All @@ -151,7 +151,7 @@ describe('import/export car file', () => {
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherDatastore = new MemoryDatastore()
const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })

await otherMFS.mkdir('/testDups')
await otherMFS.mkdir('/testDups/sub')
Expand Down
4 changes: 2 additions & 2 deletions packages/car/test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import toBuffer from 'it-to-buffer'
import { car, type Car } from '../src/index.js'
import { dagWalkers } from './fixtures/dag-walkers.js'
import { smallFile } from './fixtures/files.js'
import { getCodec } from './fixtures/get-codec.js'
import { memoryCarWriter } from './fixtures/memory-car.js'
import type { Blockstore } from 'interface-blockstore'

Expand All @@ -18,7 +18,7 @@ describe('stream car file', () => {
beforeEach(async () => {
blockstore = new MemoryBlockstore()

c = car({ blockstore, dagWalkers })
c = car({ blockstore, getCodec })
u = unixfs({ blockstore })
})

Expand Down
18 changes: 18 additions & 0 deletions packages/interface/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,21 @@ export class NoRoutersAvailableError extends Error {
this.name = 'NoRoutersAvailableError'
}
}

export class UnknownHashAlgorithmError extends Error {
static name = 'UnknownHashAlgorithmError'

constructor (message = 'Unknown hash algorithm') {
super(message)
this.name = 'UnknownHashAlgorithmError'
}
}

export class UnknownCodecError extends Error {
static name = 'UnknownCodecError'

constructor (message = 'Unknown codec') {
super(message)
this.name = 'UnknownCodecError'
}
}
44 changes: 23 additions & 21 deletions packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,21 @@ import type { Routing } from './routing.js'
import type { AbortOptions, ComponentLogger, Metrics } from '@libp2p/interface'
import type { DNS } from '@multiformats/dns'
import type { Datastore } from 'interface-datastore'
import type { MultihashHasher } from 'multiformats'
import type { Await } from 'interface-store'
import type { BlockCodec, MultihashHasher } from 'multiformats'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

export type { Await, AwaitIterable } from 'interface-store'

export interface CodecLoader {
<T = any, Code extends number = any>(code: Code): Await<BlockCodec<Code, T>>
}

export interface HasherLoader {
(code: number): Await<MultihashHasher>
}

/**
* The API presented by a Helia node
*/
Expand Down Expand Up @@ -56,18 +65,6 @@ export interface Helia {
*/
routing: Routing

/**
* DAGWalkers are codec-specific implementations that know how to yield all
* CIDs contained within a block that corresponds to that codec.
*/
dagWalkers: Record<number, DAGWalker>

/**
* Hashers can be used to hash a piece of data with the specified hashing
* algorithm.
*/
hashers: Record<number, MultihashHasher>

/**
* The DNS property can be used to perform lookups of various record types and
* will use a resolver appropriate to the current platform.
Expand All @@ -94,6 +91,19 @@ export interface Helia {
* Remove any unpinned blocks from the blockstore
*/
gc(options?: GCOptions): Promise<void>

/**
* Load an IPLD codec. Implementations may return a promise if, for example,
* the codec is being fetched from the network.
*/
getCodec: CodecLoader

/**
* Hashers can be used to hash a piece of data with the specified hashing
* algorithm. Implementations may return a promise if, for example,
* the hasher is being fetched from the network.
*/
getHasher: HasherLoader
}

export type GcEvents =
Expand All @@ -104,14 +114,6 @@ export interface GCOptions extends AbortOptions, ProgressOptions<GcEvents> {

}

/**
* DAGWalkers take a block and yield CIDs encoded in that block
*/
export interface DAGWalker {
codec: number
walk(block: Uint8Array): Generator<CID, void, undefined>
}

export * from './blocks.js'
export * from './errors.js'
export * from './pins.js'
Expand Down
Loading
Loading