diff --git a/src/constants.ts b/src/constants.ts index e9990909..12edd7f5 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -41,8 +41,11 @@ export const ALPHA = 3 // How often we look for our closest DHT neighbours export const QUERY_SELF_INTERVAL = Number(5 * minute) +// How often we look for the first set of our closest DHT neighbours +export const QUERY_SELF_INITIAL_INTERVAL = Number(Number(second)) + // How long to look for our closest DHT neighbours for -export const QUERY_SELF_TIMEOUT = Number(30 * second) +export const QUERY_SELF_TIMEOUT = Number(5 * second) // How often we try to find new peers export const TABLE_REFRESH_INTERVAL = Number(5 * minute) diff --git a/src/index.ts b/src/index.ts index 1f61c10e..4ced200b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -45,6 +45,21 @@ export interface KadDHTInit { */ querySelfInterval?: number + /** + * During startup we run the self-query at a shorter interval to ensure + * the containing node can respond to queries quickly. Set this interval + * here in ms (default: 1000) + */ + initialQuerySelfInterval?: number + + /** + * After startup by default all queries will be paused until the initial + * self-query has run and there are some peers in the routing table. + * + * Pass true here to disable this behaviour. (default: false) + */ + allowQueryWithZeroPeers?: boolean + /** * A custom protocol prefix to use (default: '/ipfs') */ diff --git a/src/kad-dht.ts b/src/kad-dht.ts index eecdac64..6f1410b4 100644 --- a/src/kad-dht.ts +++ b/src/kad-dht.ts @@ -25,6 +25,7 @@ import { validators as recordValidators } from '@libp2p/record/validators' import { selectors as recordSelectors } from '@libp2p/record/selectors' import { symbol } from '@libp2p/interface-peer-discovery' import { PROTOCOL_DHT, PROTOCOL_PREFIX, LAN_PREFIX } from './constants.js' +import pDefer from 'p-defer' export const DEFAULT_MAX_INBOUND_STREAMS = 32 export const DEFAULT_MAX_OUTBOUND_STREAMS = 64 @@ -117,10 +118,22 @@ export class KadDHT extends EventEmitter implements DHT { protocol: this.protocol, lan: this.lan }) + + // all queries should wait for the initial query-self query to run so we have + // some peers and don't force consumers to use arbitrary timeouts + const initialQuerySelfHasRun = pDefer() + + // if the user doesn't want to wait for query peers, resolve the initial + // self-query promise immediately + if (init.allowQueryWithZeroPeers === true) { + initialQuerySelfHasRun.resolve() + } + this.queryManager = new QueryManager(components, { // Number of disjoint query paths to use - This is set to `kBucketSize/2` per the S/Kademlia paper disjointPaths: Math.ceil(this.kBucketSize / 2), - lan + lan, + initialQuerySelfHasRun }) // DHT components @@ -167,7 +180,10 @@ export class KadDHT extends EventEmitter implements DHT { this.querySelf = new QuerySelf(components, { peerRouting: this.peerRouting, interval: querySelfInterval, - lan: this.lan + initialInterval: init.initialQuerySelfInterval, + lan: this.lan, + initialQuerySelfHasRun, + routingTable: this.routingTable }) // handle peers being discovered during processing of DHT messages @@ -212,7 +228,7 @@ export class KadDHT extends EventEmitter implements DHT { } async onPeerConnect (peerData: PeerInfo): Promise { - this.log('peer %p connected with protocols %s', peerData.id, peerData.protocols) + this.log('peer %p connected with protocols', peerData.id, peerData.protocols) if (this.lan) { peerData = removePublicAddresses(peerData) diff --git a/src/peer-routing/index.ts b/src/peer-routing/index.ts index c6bbcf2f..328a8956 100644 --- a/src/peer-routing/index.ts +++ b/src/peer-routing/index.ts @@ -13,9 +13,9 @@ import { Libp2pRecord } from '@libp2p/record' import { logger } from '@libp2p/logger' import { keys } from '@libp2p/crypto' import { peerIdFromKeys } from '@libp2p/peer-id' -import type { DHTRecord, DialingPeerEvent, FinalPeerEvent, QueryEvent, QueryOptions, Validators } from '@libp2p/interface-dht' +import type { DHTRecord, DialingPeerEvent, FinalPeerEvent, QueryEvent, Validators } from '@libp2p/interface-dht' import type { RoutingTable } from '../routing-table/index.js' -import type { QueryManager } from '../query/manager.js' +import type { QueryManager, QueryOptions } from '../query/manager.js' import type { Network } from '../network.js' import type { Logger } from '@libp2p/logger' import type { AbortOptions } from '@libp2p/interfaces' diff --git a/src/query-self.ts b/src/query-self.ts index 6a5473bb..6b2fae6a 100644 --- a/src/query-self.ts +++ b/src/query-self.ts @@ -1,20 +1,39 @@ import { setMaxListeners } from 'events' import take from 'it-take' import length from 'it-length' -import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K } from './constants.js' +import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K, QUERY_SELF_INITIAL_INTERVAL } from './constants.js' import { anySignal } from 'any-signal' import { logger, Logger } from '@libp2p/logger' import type { PeerRouting } from './peer-routing/index.js' import type { Startable } from '@libp2p/interfaces/startable' import { pipe } from 'it-pipe' import type { KadDHTComponents } from './index.js' +import type { DeferredPromise } from 'p-defer' +import type { RoutingTable } from './routing-table/index.js' export interface QuerySelfInit { lan: boolean peerRouting: PeerRouting + routingTable: RoutingTable count?: number interval?: number + initialInterval?: number queryTimeout?: number + initialQuerySelfHasRun: DeferredPromise +} + +function debounce (func: () => void, wait: number): () => void { + let timeout: ReturnType | undefined + + return function () { + const later = function (): void { + timeout = undefined + func() + } + + clearTimeout(timeout) + timeout = setTimeout(later, wait) + } } /** @@ -24,40 +43,51 @@ export class QuerySelf implements Startable { private readonly log: Logger private readonly components: KadDHTComponents private readonly peerRouting: PeerRouting + private readonly routingTable: RoutingTable private readonly count: number private readonly interval: number + private readonly initialInterval: number private readonly queryTimeout: number + private started: boolean private running: boolean private timeoutId?: NodeJS.Timer private controller?: AbortController + private initialQuerySelfHasRun?: DeferredPromise constructor (components: KadDHTComponents, init: QuerySelfInit) { - const { peerRouting, lan, count, interval, queryTimeout } = init + const { peerRouting, lan, count, interval, queryTimeout, routingTable } = init this.components = components this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:query-self`) this.running = false + this.started = false this.peerRouting = peerRouting + this.routingTable = routingTable this.count = count ?? K this.interval = interval ?? QUERY_SELF_INTERVAL + this.initialInterval = init.initialInterval ?? QUERY_SELF_INITIAL_INTERVAL this.queryTimeout = queryTimeout ?? QUERY_SELF_TIMEOUT + this.initialQuerySelfHasRun = init.initialQuerySelfHasRun + + this.querySelf = debounce(this.querySelf.bind(this), 100) } isStarted (): boolean { - return this.running + return this.started } async start (): Promise { - if (this.running) { + if (this.started) { return } - this.running = true - this._querySelf() + this.started = true + clearTimeout(this.timeoutId) + this.timeoutId = setTimeout(this.querySelf.bind(this), this.initialInterval) } async stop (): Promise { - this.running = false + this.started = false if (this.timeoutId != null) { clearTimeout(this.timeoutId) @@ -68,36 +98,76 @@ export class QuerySelf implements Startable { } } - _querySelf (): void { - Promise.resolve().then(async () => { - this.controller = new AbortController() - const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)]) + querySelf (): void { + if (!this.started) { + this.log('skip self-query because we are not started') + return + } - // this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged - try { - if (setMaxListeners != null) { - setMaxListeners(Infinity, signal) - } - } catch {} // fails on node < 15.4 - - try { - const found = await pipe( - this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), { - signal - }), - (source) => take(source, this.count), - async (source) => await length(source) - ) - - this.log('query ran successfully - found %d peers', found) - } catch (err: any) { - this.log('query error', err) - } finally { - this.timeoutId = setTimeout(this._querySelf.bind(this), this.interval) - signal.clear() + if (this.running) { + this.log('skip self-query because we are already running, will run again in %dms', this.interval) + return + } + + if (this.routingTable.size === 0) { + let nextInterval = this.interval + + if (this.initialQuerySelfHasRun != null) { + // if we've not yet run the first self query, shorten the interval until we try again + nextInterval = this.initialInterval } - }).catch(err => { - this.log('query error', err) - }) + + this.log('skip self-query because routing table is empty, will run again in %dms', nextInterval) + clearTimeout(this.timeoutId) + this.timeoutId = setTimeout(this.querySelf.bind(this), nextInterval) + return + } + + this.running = true + + Promise.resolve() + .then(async () => { + this.controller = new AbortController() + const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)]) + + // this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged + try { + if (setMaxListeners != null) { + setMaxListeners(Infinity, signal) + } + } catch {} // fails on node < 15.4 + + try { + this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout) + + const found = await pipe( + this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), { + signal, + isSelfQuery: true + }), + (source) => take(source, this.count), + async (source) => await length(source) + ) + + this.log('self-query ran successfully - found %d peers', found) + + if (this.initialQuerySelfHasRun != null) { + this.initialQuerySelfHasRun.resolve() + this.initialQuerySelfHasRun = undefined + } + } catch (err: any) { + this.log.error('self-query error', err) + } finally { + signal.clear() + } + }).catch(err => { + this.log('self-query error', err) + }).finally(() => { + this.running = false + + this.log('running self-query again in %dms', this.interval) + clearTimeout(this.timeoutId) + this.timeoutId = setTimeout(this.querySelf.bind(this), this.interval) + }) } } diff --git a/src/query/manager.ts b/src/query/manager.ts index 24f05c94..799bcbd3 100644 --- a/src/query/manager.ts +++ b/src/query/manager.ts @@ -11,9 +11,12 @@ import { logger } from '@libp2p/logger' import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' import type { QueryFunc } from './types.js' -import type { QueryEvent, QueryOptions } from '@libp2p/interface-dht' +import type { QueryEvent } from '@libp2p/interface-dht' import { PeerSet } from '@libp2p/peer-collections' import type { Metric, Metrics } from '@libp2p/interface-metrics' +import type { DeferredPromise } from 'p-defer' +import type { AbortOptions } from '@libp2p/interfaces' +import { AbortError } from '@libp2p/interfaces/errors' export interface CleanUpEvents { 'cleanup': CustomEvent @@ -23,6 +26,7 @@ export interface QueryManagerInit { lan?: boolean disjointPaths?: number alpha?: number + initialQuerySelfHasRun: DeferredPromise } export interface QueryManagerComponents { @@ -30,6 +34,11 @@ export interface QueryManagerComponents { metrics?: Metrics } +export interface QueryOptions extends AbortOptions { + queryFuncTimeout?: number + isSelfQuery?: boolean +} + /** * Keeps track of all running queries */ @@ -46,6 +55,8 @@ export class QueryManager implements Startable { queryTime: Metric } + private initialQuerySelfHasRun?: DeferredPromise + constructor (components: QueryManagerComponents, init: QueryManagerInit) { const { lan = false, disjointPaths = K, alpha = ALPHA } = init @@ -55,6 +66,7 @@ export class QueryManager implements Startable { this.alpha = alpha ?? ALPHA this.lan = lan this.queries = 0 + this.initialQuerySelfHasRun = init.initialQuerySelfHasRun // allow us to stop queries on shut down this.shutDownController = new AbortController() @@ -131,6 +143,21 @@ export class QueryManager implements Startable { const cleanUp = new EventEmitter() try { + if (options.isSelfQuery !== true && this.initialQuerySelfHasRun != null) { + log('waiting for initial query-self query before continuing') + + await Promise.race([ + new Promise((resolve, reject) => { + signal.addEventListener('abort', () => { + reject(new AbortError('Query was aborted before self-query ran')) + }) + }), + this.initialQuerySelfHasRun.promise + ]) + + this.initialQuerySelfHasRun = undefined + } + log('query:start') this.queries++ this.metrics?.runningQueries.update(this.queries) diff --git a/test/kad-dht.spec.ts b/test/kad-dht.spec.ts index 3be7f953..f9a49c44 100644 --- a/test/kad-dht.spec.ts +++ b/test/kad-dht.spec.ts @@ -343,13 +343,23 @@ describe('KadDHT', () => { expect(resA).to.have.property('value').that.equalBytes(valueA) expect(resB).to.have.property('value').that.equalBytes(valueA) - expect(dhtASpy.callCount).to.eql(2) + let foundGetValue = false + let foundPutValue = false - expect(dhtASpy.getCall(0).args[0].equals(dhtB.components.peerId)).to.be.true() // query B - expect(dhtASpy.getCall(0).args[1].type).to.equal('GET_VALUE') // query B + for (const call of dhtASpy.getCalls()) { + if (call.args[0].equals(dhtB.components.peerId) && call.args[1].type === 'GET_VALUE') { + // query B + foundGetValue = true + } + + if (call.args[0].equals(dhtB.components.peerId) && call.args[1].type === 'PUT_VALUE') { + // update B + foundPutValue = true + } + } - expect(dhtASpy.getCall(1).args[0].equals(dhtB.components.peerId)).to.be.true() // update B - expect(dhtASpy.getCall(1).args[1].type).to.equal('PUT_VALUE') // update B + expect(foundGetValue).to.be.true('did not get value from dhtB') + expect(foundPutValue).to.be.true('did not update value on dhtB') }) it('layered get', async function () { @@ -676,7 +686,7 @@ describe('KadDHT', () => { this.timeout(240 * 1000) // Create 101 nodes - const nDHTs = 100 + const nDHTs = 101 const dhts = await Promise.all( new Array(nDHTs).fill(0).map(async () => await tdht.spawn()) diff --git a/test/query.spec.ts b/test/query.spec.ts index 2f01700e..55468f5e 100644 --- a/test/query.spec.ts +++ b/test/query.spec.ts @@ -3,7 +3,7 @@ import { expect } from 'aegir/chai' import delay from 'delay' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { QueryManager } from '../src/query/manager.js' +import { QueryManager, QueryManagerInit } from '../src/query/manager.js' import { createPeerId, createPeerIds } from './utils/create-peer-id.js' import all from 'it-all' import drain from 'it-drain' @@ -18,6 +18,7 @@ import { EventTypes, QueryEvent } from '@libp2p/interface-dht' import { MESSAGE_TYPE } from '../src/message/index.js' import type { QueryFunc } from '../src/query/types.js' import { convertBuffer } from '../src/utils.js' +import pDefer from 'p-defer' interface TopologyEntry { delay?: number @@ -32,6 +33,16 @@ type Topology = Record +const defaultInit = (): QueryManagerInit => { + const init: QueryManagerInit = { + initialQuerySelfHasRun: pDefer() + } + + init.initialQuerySelfHasRun.resolve() + + return init +} + describe('QueryManager', () => { let ourPeerId: PeerId let peers: PeerId[] @@ -104,6 +115,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1 }) @@ -115,6 +127,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1 }) @@ -129,6 +142,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1 }) await manager.start() @@ -161,6 +175,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1, alpha: 1 }) @@ -211,6 +226,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1, alpha: 1 }) @@ -251,6 +267,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 2, alpha: 1 }) @@ -295,6 +312,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 2, alpha: 2 }) @@ -344,6 +362,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 10 }) await manager.start() @@ -381,6 +400,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 10 }) await manager.start() @@ -400,6 +420,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1, alpha: 1 }) @@ -450,6 +471,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 20, alpha: 1 }) @@ -484,6 +506,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1, alpha: 1 }) @@ -514,6 +537,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 3 }) await manager.start() @@ -547,6 +571,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1, alpha: 1 }) @@ -598,6 +623,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 2 }) await manager.start() @@ -637,6 +663,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 2 }) await manager.start() @@ -669,11 +696,93 @@ describe('QueryManager', () => { await manager.stop() }) + it('should allow the self-query query to run', async () => { + const manager = new QueryManager({ + peerId: ourPeerId + }, { + initialQuerySelfHasRun: pDefer() + }) + await manager.start() + + const queryFunc: QueryFunc = async function * ({ peer }) { // eslint-disable-line require-await + // yield query result + yield valueEvent({ + from: peer, + value: uint8ArrayFromString('cool') + }) + } + + const results = await all(manager.run(key, [peers[7]], queryFunc, { + // this bypasses awaiting on the initialQuerySelfHasRun deferred promise + isSelfQuery: true + })) + + // should have the result + expect(results).to.containSubset([{ + value: uint8ArrayFromString('cool') + }]) + + await manager.stop() + }) + + it('should wait for the self-query query to run before running other queries', async () => { + const initialQuerySelfHasRun = pDefer() + + const manager = new QueryManager({ + peerId: ourPeerId + }, { + initialQuerySelfHasRun, + alpha: 2 + }) + await manager.start() + + let regularQueryTimeStarted: number = 0 + let selfQueryTimeStarted: number = Infinity + + // run a regular query and the self query together + await Promise.all([ + all(manager.run(key, [peers[7]], async function * ({ peer }) { // eslint-disable-line require-await + regularQueryTimeStarted = Date.now() + + // yield query result + yield valueEvent({ + from: peer, + value: uint8ArrayFromString('cool') + }) + })), + all(manager.run(key, [peers[7]], async function * ({ peer }) { // eslint-disable-line require-await + selfQueryTimeStarted = Date.now() + + // make sure we take enough time so that the `regularQuery` time diff is big enough to measure + await delay(100) + + // yield query result + yield valueEvent({ + from: peer, + value: uint8ArrayFromString('it me') + }) + + // normally done by the QuerySelf component + initialQuerySelfHasRun.resolve() + }, { + // this bypasses awaiting on the initialQuerySelfHasRun deferred promise + isSelfQuery: true + })) + ]) + + // should have started the regular query after the self query finished + expect(regularQueryTimeStarted).to.be.greaterThan(selfQueryTimeStarted) + + await manager.stop() + }) + it('should end paths when they have no closer peers to those already queried', async () => { const manager = new QueryManager({ peerId: ourPeerId }, { - disjointPaths: 1, alpha: 1 + ...defaultInit(), + disjointPaths: 1, + alpha: 1 }) await manager.start() diff --git a/test/utils/test-dht.ts b/test/utils/test-dht.ts index 4466e4d6..c022b659 100644 --- a/test/utils/test-dht.ts +++ b/test/utils/test-dht.ts @@ -77,6 +77,8 @@ export class TestDHT { v: () => 0 }, querySelfInterval: 600000, + initialQuerySelfInterval: 600000, + allowQueryWithZeroPeers: true, ...options }