Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: performance improvements (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun authored and vasco-santos committed May 8, 2019
1 parent f785c62 commit ddf80fe
Show file tree
Hide file tree
Showing 17 changed files with 1,175 additions and 693 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"release-minor": "aegir release --type minor --docs -t node",
"release-major": "aegir release --type major --docs -t node",
"coverage": "aegir coverage",
"coverage-publish": "aegir-coverage publish"
"coverage-publish": "aegir-coverage publish",
"sim": "node test/simulation/index.js"
},
"pre-push": [
"lint",
Expand Down
8 changes: 2 additions & 6 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ exports.PROVIDERS_VALIDITY = 24 * hour

exports.PROVIDERS_CLEANUP_INTERVAL = hour

exports.READ_MESSAGE_TIMEOUT = minute
exports.READ_MESSAGE_TIMEOUT = 10 * second

// The number of records that will be retrieved on a call to getMany()
exports.GET_MANY_RECORD_COUNT = 16
Expand All @@ -34,16 +34,12 @@ exports.K = 20
// Alpha is the concurrency for asynchronous requests
exports.ALPHA = 3

// Number of disjoint query paths to use
// This is set to K/2 per the S/Kademlia paper
exports.DISJOINT_PATHS = 10

exports.maxMessageSize = 2 << 22 // 4MB

exports.defaultRandomWalk = {
enabled: true,
queriesPerPeriod: 1,
interval: 5 * minute,
timeout: 30 * second,
timeout: 10 * second,
delay: 10 * second
}
37 changes: 28 additions & 9 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class KadDHT extends EventEmitter {
* @param {Switch} sw libp2p-switch instance
* @param {object} options DHT options
* @param {number} options.kBucketSize k-bucket size (default 20)
* @param {number} options.concurrency alpha concurrency of queries (default 3)
* @param {Datastore} options.datastore datastore (default MemoryDatastore)
* @param {object} options.validators validators object with namespace as keys and function(key, record, callback)
* @param {object} options.selectors selectors object with namespace as keys and function(key, records)
Expand Down Expand Up @@ -75,11 +76,17 @@ class KadDHT extends EventEmitter {
this.kBucketSize = options.kBucketSize || c.K

/**
* Number of closest peers to return on kBucket search, default 20
*
* ALPHA concurrency at which each query path with run, defaults to 3
* @type {number}
*/
this.concurrency = options.concurrency || c.ALPHA

/**
* Number of disjoint query paths to use
* This is set to `kBucketSize`/2 per the S/Kademlia paper
* @type {number}
*/
this.ncp = options.ncp || c.K
this.disjointPaths = Math.ceil(this.kBucketSize / 2)

/**
* The routing table.
Expand Down Expand Up @@ -321,7 +328,7 @@ class KadDHT extends EventEmitter {
waterfall([
(cb) => utils.convertBuffer(key, cb),
(id, cb) => {
const rtp = this.routingTable.closestPeers(id, c.ALPHA)
const rtp = this.routingTable.closestPeers(id, this.kBucketSize)

this._log('peers in rt: %d', rtp.length)
if (rtp.length === 0) {
Expand Down Expand Up @@ -412,7 +419,7 @@ class KadDHT extends EventEmitter {
return callback(err)
}

const tablePeers = this.routingTable.closestPeers(id, c.ALPHA)
const tablePeers = this.routingTable.closestPeers(id, this.kBucketSize)

const q = new Query(this, key, () => {
// There is no distinction between the disjoint paths,
Expand Down Expand Up @@ -442,7 +449,7 @@ class KadDHT extends EventEmitter {

waterfall([
(cb) => utils.sortClosestPeers(Array.from(res.finalSet), id, cb),
(sorted, cb) => cb(null, sorted.slice(0, c.K))
(sorted, cb) => cb(null, sorted.slice(0, this.kBucketSize))
], callback)
})
})
Expand Down Expand Up @@ -527,6 +534,7 @@ class KadDHT extends EventEmitter {
provide (key, callback) {
this._log('provide: %s', key.toBaseEncodedString())

const errors = []
waterfall([
(cb) => this.providers.addProvider(key, this.peerInfo.id, cb),
(cb) => this.getClosestPeers(key.buffer, cb),
Expand All @@ -536,10 +544,21 @@ class KadDHT extends EventEmitter {

each(peers, (peer, cb) => {
this._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String())
this.network.sendMessage(peer, msg, cb)
this.network.sendMessage(peer, msg, (err) => {
if (err) errors.push(err)
cb()
})
}, cb)
}
], (err) => callback(err))
], (err) => {
if (errors.length) {
// This should be infrequent. This means a peer we previously connected
// to failed to exchange the provide message. If getClosestPeers was an
// iterator, we could continue to pull until we announce to kBucketSize peers.
err = errcode(`Failed to provide to ${errors.length} of ${this.kBucketSize} peers`, 'ERR_SOME_PROVIDES_FAILED', { errors })
}
callback(err)
})
}

/**
Expand Down Expand Up @@ -613,7 +632,7 @@ class KadDHT extends EventEmitter {
waterfall([
(cb) => utils.convertPeerId(id, cb),
(key, cb) => {
const peers = this.routingTable.closestPeers(key, c.ALPHA)
const peers = this.routingTable.closestPeers(key, this.kBucketSize)

if (peers.length === 0) {
return cb(errcode(new Error('Peer lookup failed'), 'ERR_LOOKUP_FAILED'))
Expand Down
13 changes: 6 additions & 7 deletions src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ module.exports = (dht) => ({
}
let ids
try {
ids = dht.routingTable.closestPeers(key, dht.ncp)
ids = dht.routingTable.closestPeers(key, dht.kBucketSize)
} catch (err) {
return callback(err)
}
Expand Down Expand Up @@ -82,7 +82,7 @@ module.exports = (dht) => ({
* Try to fetch a given record by from the local datastore.
* Returns the record iff it is still valid, meaning
* - it was either authored by this node, or
* - it was receceived less than `MAX_RECORD_AGE` ago.
* - it was received less than `MAX_RECORD_AGE` ago.
*
* @param {Buffer} key
* @param {function(Error, Record)} callback
Expand Down Expand Up @@ -171,7 +171,7 @@ module.exports = (dht) => ({
*
* @param {Buffer} key
* @param {PeerId} peer
* @param {function(Error)} callback
* @param {function(Error, Array<PeerInfo>)} callback
* @returns {void}
*
* @private
Expand All @@ -185,13 +185,12 @@ module.exports = (dht) => ({

const out = msg.closerPeers
.filter((pInfo) => !dht._isSelf(pInfo.id))
.map((pInfo) => dht.peerBook.put(pInfo))

callback(null, out)
})
},
/**
* Is the given peer id the peer id?
* Is the given peer id our PeerId?
*
* @param {PeerId} other
* @returns {bool}
Expand All @@ -206,7 +205,7 @@ module.exports = (dht) => ({
*
* @param {PeerId} peer
* @param {PeerId} target
* @param {function(Error)} callback
* @param {function(Error, Message)} callback
* @returns {void}
*
* @private
Expand Down Expand Up @@ -522,7 +521,7 @@ module.exports = (dht) => ({
}
})

const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA)
const peers = dht.routingTable.closestPeers(key.buffer, dht.kBucketSize)

timeout((cb) => query.run(peers, cb), providerTimeout)((err) => {
query.stop()
Expand Down
Loading

0 comments on commit ddf80fe

Please # to comment.