Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Stop running queries on shutdown #95

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const privateApi = require('./private')
const Providers = require('./providers')
const Message = require('./message')
const RandomWalk = require('./random-walk')
const QueryManager = require('./query-manager')
const assert = require('assert')
const mergeOptions = require('merge-options')

Expand Down Expand Up @@ -121,7 +122,7 @@ class KadDHT extends EventEmitter {
Object.keys(pa).forEach((name) => { this[name] = pa[name] })

/**
* Provider management
* Random walk management
*
* @type {RandomWalk}
*/
Expand All @@ -134,6 +135,13 @@ class KadDHT extends EventEmitter {
this.randomWalkQueriesPerPeriod = parseInt(options.randomWalk.queriesPerPeriod)
this.randomWalkInterval = parseInt(options.randomWalk.interval)
this.randomWalkTimeout = parseInt(options.randomWalk.timeout)

/**
* Keeps track of running queries
*
* @type {QueryManager}
*/
this._queryManager = new QueryManager()
}

/**
Expand All @@ -153,6 +161,7 @@ class KadDHT extends EventEmitter {
*/
start (callback) {
this._running = true
this._queryManager.start()
this.network.start((err) => {
if (err) {
return callback(err)
Expand All @@ -177,6 +186,7 @@ class KadDHT extends EventEmitter {
this.providers.stop()
this.network.stop(callback)
})
this._queryManager.stop()
}

/**
Expand Down
52 changes: 52 additions & 0 deletions src/query-manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict'

/**
* Keeps track of all running queries.
*/
class QueryManager {
/**
* Creates a new QueryManager.
*/
constructor () {
this.queries = new Set()
this.running = false
}

/**
* Called when a query is started.
*
* @param {Query} query
*/
queryStarted (query) {
this.queries.add(query)
}

/**
* Called when a query completes.
*
* @param {Query} query
*/
queryCompleted (query) {
this.queries.delete(query)
}

/**
* Starts the query manager.
*/
start () {
this.running = true
}

/**
* Stops all queries.
*/
stop () {
this.running = false
for (const query of this.queries) {
query.stop()
}
this.queries.clear()
}
}

module.exports = QueryManager
9 changes: 9 additions & 0 deletions src/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class Query {
* @returns {void}
*/
run (peers, callback) {
if (!this.dht._queryManager.running) {
this._log.error('Attempt to run query after shutdown')
return callback()
}

const run = {
peersSeen: new Set(),
errors: [],
Expand Down Expand Up @@ -85,6 +90,9 @@ class Query {
}
})

// Register this query so we stop it if the DHT stops
this.dht._queryManager.queryStarted(this)

// Create a manager to keep track of the worker queue for each path
this.workerManager = new WorkerManager()
each(run.paths, (path, cb) => {
Expand Down Expand Up @@ -133,6 +141,7 @@ class Query {
*/
stop () {
this.workerManager && this.workerManager.stop()
this.dht._queryManager.queryCompleted(this)
}
}

Expand Down
41 changes: 21 additions & 20 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -953,29 +953,30 @@ describe('KadDHT', () => {
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw)
dht.start(() => {
const key = Buffer.from('/v/hello')
const value = Buffer.from('world')
const rec = new Record(key, value)

const stubs = [
// Simulate returning a peer id to query
sinon.stub(dht.routingTable, 'closestPeers').returns([peerInfos[1].id]),
// Simulate going out to the network and returning the record
sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => {
cb(null, rec)
})
]

const key = Buffer.from('/v/hello')
const value = Buffer.from('world')
const rec = new Record(key, value)
dht.getMany(key, 1, (err, res) => {
expect(err).to.not.exist()
expect(res.length).to.eql(1)
expect(res[0].val).to.eql(value)

const stubs = [
// Simulate returning a peer id to query
sinon.stub(dht.routingTable, 'closestPeers').returns([peerInfos[1].id]),
// Simulate going out to the network and returning the record
sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => {
cb(null, rec)
for (const stub of stubs) {
stub.restore()
}
done()
})
]

dht.getMany(key, 1, (err, res) => {
expect(err).to.not.exist()
expect(res.length).to.eql(1)
expect(res[0].val).to.eql(value)

for (const stub of stubs) {
stub.restore()
}
done()
})
})
})
Expand Down
81 changes: 75 additions & 6 deletions test/query.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ const Query = require('../src/query')
const createPeerInfo = require('./utils/create-peer-info')
const createDisjointTracks = require('./utils/create-disjoint-tracks')

const createDHT = (peerInfos, cb) => {
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const d = new DHT(sw)
d.start(() => cb(null, d))
}

describe('Query', () => {
let peerInfos
let dht
Expand All @@ -28,13 +37,14 @@ describe('Query', () => {
}

peerInfos = result
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
dht = new DHT(sw)
createDHT(peerInfos, (err, d) => {
if (err) {
return done(err)
}

done()
dht = d
done()
})
})
})

Expand Down Expand Up @@ -234,6 +244,65 @@ describe('Query', () => {
})
})

it('all queries stop after shutdown', (done) => {
createDHT(peerInfos, (err, dhtA) => {
if (err) {
return done(err)
}

const peer = peerInfos[0]

// mock this so we can dial non existing peers
dhtA.switch.dial = (peer, callback) => callback()

// 1 -> 2 -> 3 -> 4
const topology = {
[peerInfos[1].id.toB58String()]: {
closer: [peerInfos[2]]
},
[peerInfos[2].id.toB58String()]: {
closer: [peerInfos[3]]
},
// Should not reach here because query gets shut down
[peerInfos[3].id.toB58String()]: {
closer: [peerInfos[4]]
}
}

const visited = []
const query = (p, cb) => {
visited.push(p)

const invokeCb = () => {
const res = topology[p.toB58String()] || {}
cb(null, {
closerPeers: res.closer || []
})
}

// Shut down after visiting peerInfos[2]
if (p.toB58String() === peerInfos[2].id.toB58String()) {
dhtA.stop(invokeCb)
setTimeout(checkExpectations, 100)
} else {
invokeCb()
}
}

const q = new Query(dhtA, peer.id.id, () => query)
q.run([peerInfos[1].id], (err, res) => {
expect(err).to.not.exist()
})

function checkExpectations () {
// Should only visit peers up to the point where we shut down
expect(visited).to.eql([peerInfos[1].id, peerInfos[2].id])

done()
}
})
})

it('disjoint path values', (done) => {
const peer = peerInfos[0]
const values = ['v0', 'v1'].map(Buffer.from)
Expand Down