Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dht.unannounce (do not merge yet) #184

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ module.**
a single parameter that is an `Error` or null.


#### `dht.unannounce(infoHash, [port], [callback])`

Similar to `dht.announce` except it will unannounce you as a peer sharing an infoHash.

#### `arr = dht.toJSON()`

Returns the current state of the DHT, including DHT nodes and BEP44 values.
Expand Down
86 changes: 70 additions & 16 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -394,21 +394,30 @@ DHT.prototype.announce = function (infoHash, port, cb) {
if (typeof port === 'function') return this.announce(infoHash, 0, port)
infoHash = toBuffer(infoHash)
if (!cb) cb = noop
this._updatePeer(true, infoHash, port, cb)
}

DHT.prototype.unannounce = function (infoHash, port, cb) {
if (typeof port === 'function') return this.unannounce(infoHash, 0, port)
infoHash = toBuffer(infoHash)
if (!cb) cb = noop
this._updatePeer(false, infoHash, port, cb)
}

DHT.prototype._updatePeer = function (announcing, infoHash, port, cb) {
var table = this._tables.get(infoHash.toString('hex'))
if (!table) return this._preannounce(infoHash, port, cb)
if (!table) return this._preUpdatePeer(announcing, infoHash, port, cb)

if (this._host) {
var dhtPort = this.listening ? this.address().port : 0
this._addPeer(
{host: this._host, port: port || dhtPort},
infoHash,
{host: this._host, port: dhtPort}
)
var peer = {host: this._host, port: port || dhtPort}
var from = {host: this._host, port: dhtPort}
if (announcing) this._addPeer(peer, infoHash, from)
else this._removePeer(peer, infoHash, from)
}

var message = {
q: 'announce_peer',
q: announcing ? 'announce_peer' : 'unannounce_peer',
a: {
id: this._rpc.id,
token: null, // queryAll sets this
Expand All @@ -418,17 +427,17 @@ DHT.prototype.announce = function (infoHash, port, cb) {
}
}

this._debug('announce %s %d', infoHash, port)
this._debug(announcing ? 'announce %s %d' : 'unannounce %s %d', infoHash, port)
this._rpc.queryAll(table.closest(infoHash), message, null, cb)
}

DHT.prototype._preannounce = function (infoHash, port, cb) {
DHT.prototype._preUpdatePeer = function (announcing, infoHash, port, cb) {
var self = this

this.lookup(infoHash, function (err) {
if (self.destroyed) return cb(new Error('dht is destroyed'))
if (err) return cb(err)
self.announce(infoHash, port, cb)
self._updatePeer(announcing, infoHash, port, cb)
})
}

Expand Down Expand Up @@ -510,6 +519,9 @@ DHT.prototype._onquery = function (query, peer) {
case 'announce_peer':
return this._onannouncepeer(query, peer)

case 'unannounce_peer':
return this._onunannouncepeer(query, peer)

case 'get':
return this._onget(query, peer)

Expand Down Expand Up @@ -547,21 +559,51 @@ DHT.prototype._ongetpeers = function (query, peer) {
}

DHT.prototype._onannouncepeer = function (query, peer) {
var req = this._validatePeerUpdate(query, peer)

if (!req) {
return this._rpc.error(peer, query, [203, 'cannot `announce_peer` with bad token'])
}

var from = {host: req.host, port: peer.port}
var infoHash = query.a.info_hash

this.emit('announce_peer', infoHash, from)
this._addPeer(req, infoHash, from)
this._rpc.response(peer, query, {id: this._rpc.id})
}

DHT.prototype._onunannouncepeer = function (query, peer) {
var req = this._validatePeerUpdate(query, peer)

if (!req) {
return this._rpc.error(peer, query, [203, 'cannot `unannounce_peer` with bad token'])
}

var from = {host: req.host, port: peer.port}
var infoHash = query.a.info_hash

this.emit('unannounce_peer', infoHash, from)
this._removePeer(req, infoHash, from)
this._rpc.response(peer, query, {id: this._rpc.id})
}

DHT.prototype._validatePeerUpdate = function (query, peer) {
var host = peer.address || peer.host
var port = query.a.implied_port ? peer.port : query.a.port
if (!port || typeof port !== 'number' || port <= 0 || port > 65535) return
var infoHash = query.a.info_hash
var token = query.a.token
if (!infoHash || !token) return

if (!this._validateToken(host, token)) {
return this._rpc.error(peer, query, [203, 'cannot `announce_peer` with bad token'])
}
if (!this._validateToken(host, token)) return null

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop null to match code above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It still does the same. Kinda hard to see from the diff. Returning null in here triggers the .error handler in the caller


this.emit('announce_peer', infoHash, {host: host, port: peer.port})
return {host: host, port: port}
}

this._addPeer({host: host, port: port}, infoHash, {host: host, port: peer.port})
this._rpc.response(peer, query, {id: this._rpc.id})
DHT.prototype._removePeer = function (peer, infoHash, from) {
this._peers.remove(infoHash.toString('hex'), encodePeer(peer.host, peer.port))
this.emit('unannounce', peer, infoHash, from)
}

DHT.prototype._addPeer = function (peer, infoHash, from) {
Expand Down Expand Up @@ -813,6 +855,18 @@ PeerStore.prototype.add = function (key, peer) {
if (++this.used > this.max) this._evict()
}

PeerStore.prototype.remove = function (key, peer) {
var peers = this.peers.peek(key)
if (!peers) return false
var removed = peers.map.remove(peer.toString('hex'))
if (!removed) return false
this.used--
swap(peers.values, peers.values.length - 1, removed.index)
peers.values.pop()
if (!peers.values.length) this.peers.remove(key)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks similar to _evict below it, is there a chance for reuse?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up writing a new simple module that encapsulates this instead. https://github.com/mafintosh/record-cache. Wanna use this here and get rid of peer store in general

return true
}

PeerStore.prototype._evict = function () {
var a = this.peers.peek(this.peers.tail)
var b = a.map.remove(a.map.tail)
Expand Down
37 changes: 30 additions & 7 deletions test/announce.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,41 @@ test('`announce` with {host: false}', function (t) {
})
})

test('`announce` with {host: "127.0.0.1"}', function (t) {
t.plan(3)
test('`announce` and `unannounce` with {host: "127.0.0.1"}', function (t) {
t.plan(6)
var dht = new DHT({ bootstrap: false, host: '127.0.0.1' })
common.failOnWarningOrError(t, dht)

var infoHash = common.randomId()
var unannounced = false

dht.announce(infoHash, 6969, function (err) {
t.pass(err instanceof Error, 'announce should fail')
dht.lookup(infoHash, function (err) {
t.error(err)
dht.destroy()
dht.unannounce(infoHash, 6969, function (err) {
t.pass(err instanceof Error, 'unannounce should fail')
unannounced = true
dht.lookup(infoHash, function (err) {
t.error(err)
dht.destroy()
})
})
})

dht.on('unannounce', function () {
t.pass('should unannounce')
})

dht.on('peer', function (peer) {
if (unannounced) t.fail('peer should be unannounced')
t.deepEqual(peer, { host: '127.0.0.1', port: 6969 })
})
})
})

test('announce with implied port', function (t) {
t.plan(2)
test('announce and unannounce with implied port', function (t) {
t.plan(4)
var dht1 = new DHT({ bootstrap: false })
var infoHash = common.randomId()

Expand All @@ -52,8 +66,17 @@ test('announce with implied port', function (t) {
dht2.announce(infoHash, function () {
dht2.once('peer', function (peer) {
t.deepEqual(peer, {host: '127.0.0.1', port: dht2.address().port})
dht1.destroy()
dht2.destroy()
dht2.unannounce(infoHash, function (err) {
t.error(err)
dht2.on('peer', function (peer) {
t.fail('should be no peers')
})
dht2.lookup(infoHash, function (err) {
t.error(err)
dht1.destroy()
dht2.destroy()
})
})
})

dht2.lookup(infoHash)
Expand Down
30 changes: 24 additions & 6 deletions test/horde.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ var from = 2
var to = 20

for (var i = from; i <= to; i++) {
runAnnounceLookupTest(i)
runAnnounceLookupTest(i, false)
runAnnounceLookupTest(i, true)
}

function runAnnounceLookupTest (numInstances) {
test('horde: announce+lookup with ' + numInstances + ' DHTs', function (t) {
function runAnnounceLookupTest (numInstances, unannounce) {
test('horde: announce+lookup' + (unannounce ? '+unannounce' : '') + ' with ' + numInstances + ' DHTs', function (t) {
var numRunning = numInstances
findPeers(numInstances, t, function (err, dhts) {
findPeers(numInstances, t, unannounce, function (err, dhts) {
if (err) throw err

dhts.forEach(function (dht) {
Expand All @@ -40,7 +41,7 @@ function runAnnounceLookupTest (numInstances) {
* Initialize [numInstances] dhts, have one announce an infoHash, and another perform a
* lookup. Times out after a while.
*/
function findPeers (numInstances, t, cb) {
function findPeers (numInstances, t, unannounce, cb) {
cb = once(cb)
var dhts = []
var timeoutId = setTimeout(function () {
Expand Down Expand Up @@ -69,16 +70,33 @@ function findPeers (numInstances, t, cb) {

// lookup from other DHTs
dhts[0].announce(infoHash, 9998, function () {
dhts[1].lookup(infoHash)
dhts[1].lookup(infoHash, function () {
if (unannounce) runUnannounce()
})
})
})

dhts[1].on('peer', function (peer, hash) {
t.equal(hash.toString('hex'), infoHash)
t.equal(peer.port, 9998)
if (unannounce) return
clearTimeout(timeoutId)
cb(null, dhts)
})

function runUnannounce () {
dhts[0].unannounce(infoHash, 9998, function (err) {
t.error(err)
dhts[1].on('peer', function () {
t.fail('peer should be unannounced')
})
dhts[1].lookup(infoHash, function (err) {
t.error(err)
clearTimeout(timeoutId)
cb(null, dhts)
})
})
}
}

/**
Expand Down