Skip to content
This repository has been archived by the owner on Aug 1, 2023. It is now read-only.

chore: add gossipsub grafting delay #116

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"is-os": "^1.0.1",
"it-all": "^1.0.1",
"it-concat": "^1.0.0",
"it-drain": "^1.0.1",
"it-last": "^1.0.1",
"libp2p-webrtc-star": "^0.17.9",
"mocha": "^7.1.1",
Expand Down
55 changes: 32 additions & 23 deletions test/ipns-pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const base64url = require('base64url')
const ipns = require('ipns')
const delay = require('delay')
const last = require('it-last')
const drain = require('it-drain')
const pRetry = require('p-retry')
const waitFor = require('./utils/wait-for')
const { expect } = require('./utils/chai')
Expand All @@ -15,21 +16,23 @@ const daemonsOptions = {
args: ['--enable-namesys-pubsub'] // enable ipns over pubsub
}

const retryOptions = {
retries: 5
}

const namespace = '/record/'

const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU'

describe('ipns-pubsub', function () {
this.timeout(350 * 1000)
let nodes = []
let go
let js
let otherGo

// Spawn daemons
before(async function () {
nodes = await Promise.all([
[
go,
js,
otherGo
] = await Promise.all([
daemonFactory.spawn({
type: 'go',
test: true,
Expand All @@ -52,11 +55,11 @@ describe('ipns-pubsub', function () {

// Connect nodes and wait for republish
before(async function () {
await nodes[0].api.swarm.connect(nodes[1].api.peerId.addresses[0])
await go.api.swarm.connect(js.api.peerId.addresses[0])

// TODO: go-ipfs needs two nodes in the DHT to be able to publish a record
// Remove this when js-ipfs has a DHT
await nodes[0].api.swarm.connect(nodes[2].api.peerId.addresses[0])
await go.api.swarm.connect(otherGo.api.peerId.addresses[0])

console.log('wait for republish as we can receive the republish message first') // eslint-disable-line
await delay(60000)
Expand All @@ -65,7 +68,7 @@ describe('ipns-pubsub', function () {
after(() => daemonFactory.clean())

it('should get enabled state of pubsub', async function () {
for (const node of nodes) {
for (const node of [js, go]) {
const state = await node.api.name.pubsub.state()
expect(state).to.exist()
expect(state.enabled).to.equal(true)
Expand All @@ -75,14 +78,14 @@ describe('ipns-pubsub', function () {
it('should publish the received record to a go node and a js subscriber should receive it', async function () {
this.timeout(300 * 1000)
// TODO find out why JS doesn't resolve, might be just missing a DHT
await expect(last(nodes[1].api.name.resolve(nodes[0].api.peerId.id, { stream: false }))).to.eventually.be.rejected.with(/was not found in the network/)
await subscribeToReceiveByPubsub(nodes[0], nodes[1], nodes[0].api.peerId.id, nodes[1].api.peerId.id)
await expect(last(js.api.name.resolve(go.api.peerId.id, { stream: false }))).to.eventually.be.rejected.with(/was not found in the network/)
await subscribeToReceiveByPubsub(go, js, go.api.peerId.id, js.api.peerId.id)
})

it('should publish the received record to a js node and a go subscriber should receive it', async function () {
this.timeout(350 * 1000)
await last(nodes[0].api.name.resolve(nodes[1].api.peerId.id, { stream: false }))
await subscribeToReceiveByPubsub(nodes[1], nodes[0], nodes[1].api.peerId.id, nodes[0].api.peerId.id)
await drain(go.api.name.resolve(js.api.peerId.id, { stream: false }))
await subscribeToReceiveByPubsub(js, go, js.api.peerId.id, go.api.peerId.id)
})
})

Expand Down Expand Up @@ -115,23 +118,29 @@ const subscribeToReceiveByPubsub = async (nodeA, nodeB, idA, idB) => {
}

// wait until a peer know about other peer to subscribe a topic
const waitForNotificationOfSubscription = (daemon, topic, peerId) => pRetry(async () => {
const res = await daemon.pubsub.peers(topic)
const waitForNotificationOfSubscription = async (daemon, topic, peerId) => {
const start = Date.now()

if (!res || !res.length || !res.includes(peerId)) {
throw new Error('Could not find peer subscribing')
}
}, retryOptions)
await pRetry(async (attempt) => {
const res = await daemon.pubsub.peers(topic)

if (!res.includes(peerId)) {
throw new Error(`Could not find peer ${peerId} subscription in list ${res} after ${attempt} retries and ${Date.now() - start}ms`)
}
})
}

// Wait until a peer subscribes a topic
const waitForPeerToSubscribe = async (daemon, topic) => {
await pRetry(async () => {
const start = Date.now()

await pRetry(async (attempt) => {
const res = await daemon.pubsub.ls()

if (!res || !res.length || !res.includes(topic)) {
throw new Error('Could not find subscription')
if (!res.includes(topic)) {
throw new Error(`Could not find subscription to ${topic} in ${res} after ${attempt} retries and ${Date.now() - start}ms`)
}

return res[0]
}, retryOptions)
})
}
99 changes: 30 additions & 69 deletions test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,26 @@ const pRetry = require('p-retry')
const { expect } = require('./utils/chai')
const daemonFactory = require('./utils/daemon-factory')

const retryOptions = {
retries: 5
}

const waitForTopicPeer = (topic, peer, daemon) => {
return pRetry(async () => {
const start = Date.now()

return pRetry(async (attempt) => {
const peers = await daemon.api.pubsub.peers(topic)

if (!peers.includes(peer.id)) {
throw new Error(`Could not find peer ${peer.id}`)
throw new Error(`Could not find peer ${peer.id} after ${attempt} retries and ${Date.now() - start}ms`)
}
}, retryOptions)
})
}

const daemonOptions = {
args: ['--enable-pubsub-experiment']
}

const timeout = 20e3
const timeout = 60 * 1000

describe('pubsub', function () {
this.timeout(60 * 1000)
this.timeout(timeout)

const tests = {
'publish from Go, subscribe on Go': [() => daemonFactory.spawn({ ...daemonOptions, type: 'go' }), () => daemonFactory.spawn({ ...daemonOptions, type: 'go' })],
Expand All @@ -42,13 +40,10 @@ describe('pubsub', function () {
let daemon2

before('spawn nodes', async function () {
this.timeout(timeout)
;[daemon1, daemon2] = await Promise.all(tests[name].map(fn => fn()))
[daemon1, daemon2] = await Promise.all(tests[name].map(fn => fn()))
})

before('connect', async function () {
this.timeout(timeout)

await daemon1.api.swarm.connect(daemon2.api.peerId.addresses[0])
await daemon2.api.swarm.connect(daemon1.api.peerId.addresses[0])

Expand All @@ -63,22 +58,28 @@ describe('pubsub', function () {

after(() => daemonFactory.clean())

it('should exchange ascii data', function () {
const data = Buffer.from('hello world')
const topic = 'pubsub-ascii'
function testPubsub (data) {
const topic = 'pubsub-' + Math.random()

const subscriber = () => new Promise((resolve) => {
const subscriber = () => new Promise((resolve, reject) => {
daemon1.api.pubsub.subscribe(topic, () => {})
daemon2.api.pubsub.subscribe(topic, (msg) => {
expect(msg.data.toString()).to.equal(data.toString())
expect(msg).to.have.property('seqno')
expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true)
expect(msg).to.have.property('topicIDs').and.to.include(topic)
expect(msg).to.have.property('from', daemon1.api.peerId.id)
resolve()
try {
expect(msg.data).to.deep.equal(data)
expect(msg).to.have.property('seqno')
expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true)
expect(msg).to.have.property('topicIDs').and.to.include(topic)
expect(msg).to.have.property('from', daemon1.api.peerId.id)

resolve()
} catch (err) {
reject(err)
}
})
})

const publisher = async () => {
await waitForTopicPeer(topic, daemon1.api.peerId, daemon2)
await waitForTopicPeer(topic, daemon2.api.peerId, daemon1)
await daemon1.api.pubsub.publish(topic, data)
}
Expand All @@ -87,58 +88,18 @@ describe('pubsub', function () {
subscriber(),
publisher()
])
}

it('should exchange ascii data', function () {
return testPubsub(Buffer.from('hello world'))
})

it('should exchange non ascii data', function () {
const data = Buffer.from('你好世界')
const topic = 'pubsub-non-ascii'

const subscriber = () => new Promise((resolve) => {
daemon2.api.pubsub.subscribe(topic, (msg) => {
expect(msg.data.toString()).to.equal(data.toString())
expect(msg).to.have.property('seqno')
expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true)
expect(msg).to.have.property('topicIDs').and.to.include(topic)
expect(msg).to.have.property('from', daemon1.api.peerId.id)
resolve()
})
})

const publisher = async () => {
await waitForTopicPeer(topic, daemon2.api.peerId, daemon1)
await daemon1.api.pubsub.publish(topic, data)
}

return Promise.all([
subscriber(),
publisher()
])
return testPubsub(Buffer.from('你好世界'))
})

it('should exchange binary data', function () {
const data = Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex')
const topic = 'pubsub-binary'

const subscriber = () => new Promise((resolve) => {
daemon2.api.pubsub.subscribe(topic, (msg) => {
expect(msg.data.toString()).to.equal(data.toString())
expect(msg).to.have.property('seqno')
expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true)
expect(msg).to.have.property('topicIDs').and.to.include(topic)
expect(msg).to.have.property('from', daemon1.api.peerId.id)
resolve()
})
})

const publisher = async () => {
await waitForTopicPeer(topic, daemon2.api.peerId, daemon1)
await daemon1.api.pubsub.publish(topic, data)
}

return Promise.all([
subscriber(),
publisher()
])
return testPubsub(Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex'))
})
})
})
Expand Down