diff --git a/src/index.js b/src/index.js index 21bbd1de..675f02e4 100644 --- a/src/index.js +++ b/src/index.js @@ -22,6 +22,7 @@ const privateApi = require('./private') const Providers = require('./providers') const Message = require('./message') const RandomWalk = require('./random-walk') +const QueryManager = require('./query-manager') const assert = require('assert') const mergeOptions = require('merge-options') @@ -121,7 +122,7 @@ class KadDHT extends EventEmitter { Object.keys(pa).forEach((name) => { this[name] = pa[name] }) /** - * Provider management + * Random walk management * * @type {RandomWalk} */ @@ -134,6 +135,13 @@ class KadDHT extends EventEmitter { this.randomWalkQueriesPerPeriod = parseInt(options.randomWalk.queriesPerPeriod) this.randomWalkInterval = parseInt(options.randomWalk.interval) this.randomWalkTimeout = parseInt(options.randomWalk.timeout) + + /** + * Keeps track of running queries + * + * @type {QueryManager} + */ + this._queryManager = new QueryManager() } /** @@ -153,6 +161,7 @@ class KadDHT extends EventEmitter { */ start (callback) { this._running = true + this._queryManager.start() this.network.start((err) => { if (err) { return callback(err) @@ -177,6 +186,7 @@ class KadDHT extends EventEmitter { this.providers.stop() this.network.stop(callback) }) + this._queryManager.stop() } /** diff --git a/src/query-manager.js b/src/query-manager.js new file mode 100644 index 00000000..3ebbb907 --- /dev/null +++ b/src/query-manager.js @@ -0,0 +1,52 @@ +'use strict' + +/** + * Keeps track of all running queries. + */ +class QueryManager { + /** + * Creates a new QueryManager. + */ + constructor () { + this.queries = new Set() + this.running = false + } + + /** + * Called when a query is started. + * + * @param {Query} query + */ + queryStarted (query) { + this.queries.add(query) + } + + /** + * Called when a query completes. + * + * @param {Query} query + */ + queryCompleted (query) { + this.queries.delete(query) + } + + /** + * Starts the query manager. + */ + start () { + this.running = true + } + + /** + * Stops all queries. + */ + stop () { + this.running = false + for (const query of this.queries) { + query.stop() + } + this.queries.clear() + } +} + +module.exports = QueryManager diff --git a/src/query.js b/src/query.js index 97e044b0..0d5fe3b5 100644 --- a/src/query.js +++ b/src/query.js @@ -54,17 +54,21 @@ class Query { * @returns {void} */ run (peers, callback) { + if (!this.dht._queryManager.running) { + this._log.error('Attempt to run query after shutdown') + return callback(null, { finalSet: new Set(), paths: [] }) + } + if (peers.length === 0) { + this._log.error('Running query with no peers') + return callback(null, { finalSet: new Set(), paths: [] }) + } + const run = { peersSeen: new Set(), errors: [], paths: null // array of states per disjoint path } - if (peers.length === 0) { - this._log.error('Running query with no peers') - return callback() - } - // create correct number of paths const numPaths = Math.min(c.DISJOINT_PATHS, peers.length) const pathPeers = [] @@ -85,6 +89,9 @@ class Query { } }) + // Register this query so we stop it if the DHT stops + this.dht._queryManager.queryStarted(this) + // Create a manager to keep track of the worker queue for each path this.workerManager = new WorkerManager() each(run.paths, (path, cb) => { @@ -133,6 +140,7 @@ class Query { */ stop () { this.workerManager && this.workerManager.stop() + this.dht._queryManager.queryCompleted(this) } } diff --git a/test/kad-dht.spec.js b/test/kad-dht.spec.js index 51d15c71..fc6f7bc3 100644 --- a/test/kad-dht.spec.js +++ b/test/kad-dht.spec.js @@ -953,29 +953,30 @@ describe('KadDHT', () => { sw.connection.addStreamMuxer(Mplex) sw.connection.reuse() const dht = new KadDHT(sw) + dht.start(() => { + const key = Buffer.from('/v/hello') + const value = Buffer.from('world') + const rec = new Record(key, value) + + const stubs = [ + // Simulate returning a peer id to query + sinon.stub(dht.routingTable, 'closestPeers').returns([peerInfos[1].id]), + // Simulate going out to the network and returning the record + sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => { + cb(null, rec) + }) + ] - const key = Buffer.from('/v/hello') - const value = Buffer.from('world') - const rec = new Record(key, value) + dht.getMany(key, 1, (err, res) => { + expect(err).to.not.exist() + expect(res.length).to.eql(1) + expect(res[0].val).to.eql(value) - const stubs = [ - // Simulate returning a peer id to query - sinon.stub(dht.routingTable, 'closestPeers').returns([peerInfos[1].id]), - // Simulate going out to the network and returning the record - sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => { - cb(null, rec) + for (const stub of stubs) { + stub.restore() + } + done() }) - ] - - dht.getMany(key, 1, (err, res) => { - expect(err).to.not.exist() - expect(res.length).to.eql(1) - expect(res[0].val).to.eql(value) - - for (const stub of stubs) { - stub.restore() - } - done() }) }) }) diff --git a/test/query.spec.js b/test/query.spec.js index 22dae4d1..82e8a938 100644 --- a/test/query.spec.js +++ b/test/query.spec.js @@ -16,6 +16,15 @@ const Query = require('../src/query') const createPeerInfo = require('./utils/create-peer-info') const createDisjointTracks = require('./utils/create-disjoint-tracks') +const createDHT = (peerInfos, cb) => { + const sw = new Switch(peerInfos[0], new PeerBook()) + sw.transport.add('tcp', new TCP()) + sw.connection.addStreamMuxer(Mplex) + sw.connection.reuse() + const d = new DHT(sw) + d.start(() => cb(null, d)) +} + describe('Query', () => { let peerInfos let dht @@ -28,13 +37,14 @@ describe('Query', () => { } peerInfos = result - const sw = new Switch(peerInfos[0], new PeerBook()) - sw.transport.add('tcp', new TCP()) - sw.connection.addStreamMuxer(Mplex) - sw.connection.reuse() - dht = new DHT(sw) + createDHT(peerInfos, (err, d) => { + if (err) { + return done(err) + } - done() + dht = d + done() + }) }) }) @@ -117,6 +127,23 @@ describe('Query', () => { }) }) + it('returns empty run if initial peer list is empty', (done) => { + const peer = peerInfos[0] + + const query = (p, cb) => {} + + const q = new Query(dht, peer.id.id, () => query) + q.run([], (err, res) => { + expect(err).to.not.exist() + + // Should not visit any peers + expect(res.paths.length).to.eql(0) + expect(res.finalSet.size).to.eql(0) + + done() + }) + }) + it('only closerPeers', (done) => { const peer = peerInfos[0] @@ -234,6 +261,109 @@ describe('Query', () => { }) }) + it('all queries stop after shutdown', (done) => { + createDHT(peerInfos, (err, dhtA) => { + if (err) { + return done(err) + } + + const peer = peerInfos[0] + + // mock this so we can dial non existing peers + dhtA.switch.dial = (peer, callback) => callback() + + // 1 -> 2 -> 3 -> 4 + const topology = { + [peerInfos[1].id.toB58String()]: { + closer: [peerInfos[2]] + }, + [peerInfos[2].id.toB58String()]: { + closer: [peerInfos[3]] + }, + // Should not reach here because query gets shut down + [peerInfos[3].id.toB58String()]: { + closer: [peerInfos[4]] + } + } + + const visited = [] + const query = (p, cb) => { + visited.push(p) + + const invokeCb = () => { + const res = topology[p.toB58String()] || {} + cb(null, { + closerPeers: res.closer || [] + }) + } + + // Shut down after visiting peerInfos[2] + if (p.toB58String() === peerInfos[2].id.toB58String()) { + dhtA.stop(invokeCb) + setTimeout(checkExpectations, 100) + } else { + invokeCb() + } + } + + const q = new Query(dhtA, peer.id.id, () => query) + q.run([peerInfos[1].id], (err, res) => { + expect(err).to.not.exist() + }) + + function checkExpectations () { + // Should only visit peers up to the point where we shut down + expect(visited).to.eql([peerInfos[1].id, peerInfos[2].id]) + + done() + } + }) + }) + + it('queries run after shutdown return immediately', (done) => { + createDHT(peerInfos, (err, dhtA) => { + if (err) { + return done(err) + } + + const peer = peerInfos[0] + + // mock this so we can dial non existing peers + dhtA.switch.dial = (peer, callback) => callback() + + // 1 -> 2 -> 3 + const topology = { + [peerInfos[1].id.toB58String()]: { + closer: [peerInfos[2]] + }, + [peerInfos[2].id.toB58String()]: { + closer: [peerInfos[3]] + } + } + + const query = (p, cb) => { + const res = topology[p.toB58String()] || {} + cb(null, { + closerPeers: res.closer || [] + }) + } + + const q = new Query(dhtA, peer.id.id, () => query) + + dhtA.stop(() => { + q.run([peerInfos[1].id], (err, res) => { + expect(err).to.not.exist() + + // Should not visit any peers + expect(res.paths.length).to.eql(0) + expect(res.finalSet.size).to.eql(0) + + done() + }) + }) + }) + }) + it('disjoint path values', (done) => { const peer = peerInfos[0] const values = ['v0', 'v1'].map(Buffer.from)