diff --git a/package-lock.json b/package-lock.json index eb6314b6..3ebf3f79 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14500,8 +14500,7 @@ "it-drain": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/it-drain/-/it-drain-1.0.1.tgz", - "integrity": "sha512-4aX8AsJWjRh0inNXGLa90fvxuB7vQY70WFasvskUMtpXXz8+MUH8R7PODBtn4yXCJ25ud2iRwWwa1g8DRDbrlA==", - "dev": true + "integrity": "sha512-4aX8AsJWjRh0inNXGLa90fvxuB7vQY70WFasvskUMtpXXz8+MUH8R7PODBtn4yXCJ25ud2iRwWwa1g8DRDbrlA==" }, "it-first": { "version": "1.0.2", diff --git a/package.json b/package.json index 4dbfc6a1..ccf85129 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,7 @@ "is-os": "^1.0.1", "it-all": "^1.0.1", "it-concat": "^1.0.0", + "it-drain": "^1.0.1", "it-last": "^1.0.1", "libp2p-webrtc-star": "^0.17.9", "mocha": "^7.1.1", diff --git a/test/ipns-pubsub.js b/test/ipns-pubsub.js index 2501e4d0..f553190b 100644 --- a/test/ipns-pubsub.js +++ b/test/ipns-pubsub.js @@ -6,6 +6,7 @@ const base64url = require('base64url') const ipns = require('ipns') const delay = require('delay') const last = require('it-last') +const drain = require('it-drain') const pRetry = require('p-retry') const waitFor = require('./utils/wait-for') const { expect } = require('./utils/chai') @@ -15,21 +16,23 @@ const daemonsOptions = { args: ['--enable-namesys-pubsub'] // enable ipns over pubsub } -const retryOptions = { - retries: 5 -} - const namespace = '/record/' const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU' describe('ipns-pubsub', function () { this.timeout(350 * 1000) - let nodes = [] + let go + let js + let otherGo // Spawn daemons before(async function () { - nodes = await Promise.all([ + [ + go, + js, + otherGo + ] = await Promise.all([ daemonFactory.spawn({ type: 'go', test: true, @@ -52,11 +55,11 @@ describe('ipns-pubsub', function () { // Connect nodes and wait for republish before(async function () { - await nodes[0].api.swarm.connect(nodes[1].api.peerId.addresses[0]) + await go.api.swarm.connect(js.api.peerId.addresses[0]) // TODO: go-ipfs needs two nodes in the DHT to be able to publish a record // Remove this when js-ipfs has a DHT - await nodes[0].api.swarm.connect(nodes[2].api.peerId.addresses[0]) + await go.api.swarm.connect(otherGo.api.peerId.addresses[0]) console.log('wait for republish as we can receive the republish message first') // eslint-disable-line await delay(60000) @@ -65,7 +68,7 @@ describe('ipns-pubsub', function () { after(() => daemonFactory.clean()) it('should get enabled state of pubsub', async function () { - for (const node of nodes) { + for (const node of [js, go]) { const state = await node.api.name.pubsub.state() expect(state).to.exist() expect(state.enabled).to.equal(true) @@ -75,14 +78,14 @@ describe('ipns-pubsub', function () { it('should publish the received record to a go node and a js subscriber should receive it', async function () { this.timeout(300 * 1000) // TODO find out why JS doesn't resolve, might be just missing a DHT - await expect(last(nodes[1].api.name.resolve(nodes[0].api.peerId.id, { stream: false }))).to.eventually.be.rejected.with(/was not found in the network/) - await subscribeToReceiveByPubsub(nodes[0], nodes[1], nodes[0].api.peerId.id, nodes[1].api.peerId.id) + await expect(last(js.api.name.resolve(go.api.peerId.id, { stream: false }))).to.eventually.be.rejected.with(/was not found in the network/) + await subscribeToReceiveByPubsub(go, js, go.api.peerId.id, js.api.peerId.id) }) it('should publish the received record to a js node and a go subscriber should receive it', async function () { this.timeout(350 * 1000) - await last(nodes[0].api.name.resolve(nodes[1].api.peerId.id, { stream: false })) - await subscribeToReceiveByPubsub(nodes[1], nodes[0], nodes[1].api.peerId.id, nodes[0].api.peerId.id) + await drain(go.api.name.resolve(js.api.peerId.id, { stream: false })) + await subscribeToReceiveByPubsub(js, go, js.api.peerId.id, go.api.peerId.id) }) }) @@ -115,23 +118,29 @@ const subscribeToReceiveByPubsub = async (nodeA, nodeB, idA, idB) => { } // wait until a peer know about other peer to subscribe a topic -const waitForNotificationOfSubscription = (daemon, topic, peerId) => pRetry(async () => { - const res = await daemon.pubsub.peers(topic) +const waitForNotificationOfSubscription = async (daemon, topic, peerId) => { + const start = Date.now() - if (!res || !res.length || !res.includes(peerId)) { - throw new Error('Could not find peer subscribing') - } -}, retryOptions) + await pRetry(async (attempt) => { + const res = await daemon.pubsub.peers(topic) + + if (!res.includes(peerId)) { + throw new Error(`Could not find peer ${peerId} subscription in list ${res} after ${attempt} retries and ${Date.now() - start}ms`) + } + }) +} // Wait until a peer subscribes a topic const waitForPeerToSubscribe = async (daemon, topic) => { - await pRetry(async () => { + const start = Date.now() + + await pRetry(async (attempt) => { const res = await daemon.pubsub.ls() - if (!res || !res.length || !res.includes(topic)) { - throw new Error('Could not find subscription') + if (!res.includes(topic)) { + throw new Error(`Could not find subscription to ${topic} in ${res} after ${attempt} retries and ${Date.now() - start}ms`) } return res[0] - }, retryOptions) + }) } diff --git a/test/pubsub.js b/test/pubsub.js index 1ced647a..5955b077 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -6,28 +6,26 @@ const pRetry = require('p-retry') const { expect } = require('./utils/chai') const daemonFactory = require('./utils/daemon-factory') -const retryOptions = { - retries: 5 -} - const waitForTopicPeer = (topic, peer, daemon) => { - return pRetry(async () => { + const start = Date.now() + + return pRetry(async (attempt) => { const peers = await daemon.api.pubsub.peers(topic) if (!peers.includes(peer.id)) { - throw new Error(`Could not find peer ${peer.id}`) + throw new Error(`Could not find peer ${peer.id} after ${attempt} retries and ${Date.now() - start}ms`) } - }, retryOptions) + }) } const daemonOptions = { args: ['--enable-pubsub-experiment'] } -const timeout = 20e3 +const timeout = 60 * 1000 describe('pubsub', function () { - this.timeout(60 * 1000) + this.timeout(timeout) const tests = { 'publish from Go, subscribe on Go': [() => daemonFactory.spawn({ ...daemonOptions, type: 'go' }), () => daemonFactory.spawn({ ...daemonOptions, type: 'go' })], @@ -42,13 +40,10 @@ describe('pubsub', function () { let daemon2 before('spawn nodes', async function () { - this.timeout(timeout) - ;[daemon1, daemon2] = await Promise.all(tests[name].map(fn => fn())) + [daemon1, daemon2] = await Promise.all(tests[name].map(fn => fn())) }) before('connect', async function () { - this.timeout(timeout) - await daemon1.api.swarm.connect(daemon2.api.peerId.addresses[0]) await daemon2.api.swarm.connect(daemon1.api.peerId.addresses[0]) @@ -63,22 +58,28 @@ describe('pubsub', function () { after(() => daemonFactory.clean()) - it('should exchange ascii data', function () { - const data = Buffer.from('hello world') - const topic = 'pubsub-ascii' + function testPubsub (data) { + const topic = 'pubsub-' + Math.random() - const subscriber = () => new Promise((resolve) => { + const subscriber = () => new Promise((resolve, reject) => { + daemon1.api.pubsub.subscribe(topic, () => {}) daemon2.api.pubsub.subscribe(topic, (msg) => { - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').and.to.include(topic) - expect(msg).to.have.property('from', daemon1.api.peerId.id) - resolve() + try { + expect(msg.data).to.deep.equal(data) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').and.to.include(topic) + expect(msg).to.have.property('from', daemon1.api.peerId.id) + + resolve() + } catch (err) { + reject(err) + } }) }) const publisher = async () => { + await waitForTopicPeer(topic, daemon1.api.peerId, daemon2) await waitForTopicPeer(topic, daemon2.api.peerId, daemon1) await daemon1.api.pubsub.publish(topic, data) } @@ -87,58 +88,18 @@ describe('pubsub', function () { subscriber(), publisher() ]) + } + + it('should exchange ascii data', function () { + return testPubsub(Buffer.from('hello world')) }) it('should exchange non ascii data', function () { - const data = Buffer.from('你好世界') - const topic = 'pubsub-non-ascii' - - const subscriber = () => new Promise((resolve) => { - daemon2.api.pubsub.subscribe(topic, (msg) => { - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').and.to.include(topic) - expect(msg).to.have.property('from', daemon1.api.peerId.id) - resolve() - }) - }) - - const publisher = async () => { - await waitForTopicPeer(topic, daemon2.api.peerId, daemon1) - await daemon1.api.pubsub.publish(topic, data) - } - - return Promise.all([ - subscriber(), - publisher() - ]) + return testPubsub(Buffer.from('你好世界')) }) it('should exchange binary data', function () { - const data = Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex') - const topic = 'pubsub-binary' - - const subscriber = () => new Promise((resolve) => { - daemon2.api.pubsub.subscribe(topic, (msg) => { - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').and.to.include(topic) - expect(msg).to.have.property('from', daemon1.api.peerId.id) - resolve() - }) - }) - - const publisher = async () => { - await waitForTopicPeer(topic, daemon2.api.peerId, daemon1) - await daemon1.api.pubsub.publish(topic, data) - } - - return Promise.all([ - subscriber(), - publisher() - ]) + return testPubsub(Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex')) }) }) })