From ce9364159c5977f1ad79a457d4874ccf27849356 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 19 Apr 2023 12:26:59 +0200 Subject: [PATCH 1/3] fix: wait for self-query to have run before running queries In order to run a query we need DHT peers. The most common way to find these is to run a self-query which happens after startup. There's no way for the users to know when this has happened so they end up putting arbitrary waits into the code so their queries have a better chance of succeeding. Instead just delay all queries until the self-query has run and found some peers, then continue with the queries. --- src/kad-dht.ts | 12 +++- src/peer-routing/index.ts | 4 +- src/query-self.ts | 12 +++- src/query/manager.ts | 29 +++++++++- test/query.spec.ts | 113 +++++++++++++++++++++++++++++++++++++- 5 files changed, 162 insertions(+), 8 deletions(-) diff --git a/src/kad-dht.ts b/src/kad-dht.ts index 2660e462..401315e8 100644 --- a/src/kad-dht.ts +++ b/src/kad-dht.ts @@ -25,6 +25,7 @@ import { validators as recordValidators } from '@libp2p/record/validators' import { selectors as recordSelectors } from '@libp2p/record/selectors' import { symbol } from '@libp2p/interface-peer-discovery' import { PROTOCOL_DHT, PROTOCOL_PREFIX, LAN_PREFIX } from './constants.js' +import pDefer from 'p-defer' export const DEFAULT_MAX_INBOUND_STREAMS = 32 export const DEFAULT_MAX_OUTBOUND_STREAMS = 64 @@ -117,10 +118,16 @@ export class KadDHT extends EventEmitter implements DHT { protocol: this.protocol, lan: this.lan }) + + // all queries should wait for the intial query-self query to run so we have + // some peers and don't force consumers to use arbitrary timeouts + const initialQuerySelfHasRun = pDefer() + this.queryManager = new QueryManager(components, { // Number of disjoint query paths to use - This is set to `kBucketSize/2` per the S/Kademlia paper disjointPaths: Math.ceil(this.kBucketSize / 2), - lan + lan, + initialQuerySelfHasRun }) // DHT components @@ -167,7 +174,8 @@ export class KadDHT extends EventEmitter implements DHT { this.querySelf = new QuerySelf(components, { peerRouting: this.peerRouting, interval: querySelfInterval, - lan: this.lan + lan: this.lan, + initialQuerySelfHasRun }) // handle peers being discovered during processing of DHT messages diff --git a/src/peer-routing/index.ts b/src/peer-routing/index.ts index 2a8545ff..6df98392 100644 --- a/src/peer-routing/index.ts +++ b/src/peer-routing/index.ts @@ -13,9 +13,9 @@ import { Libp2pRecord } from '@libp2p/record' import { logger } from '@libp2p/logger' import { keys } from '@libp2p/crypto' import { peerIdFromKeys } from '@libp2p/peer-id' -import type { DHTRecord, DialingPeerEvent, FinalPeerEvent, QueryEvent, QueryOptions, Validators } from '@libp2p/interface-dht' +import type { DHTRecord, DialingPeerEvent, FinalPeerEvent, QueryEvent, Validators } from '@libp2p/interface-dht' import type { RoutingTable } from '../routing-table/index.js' -import type { QueryManager } from '../query/manager.js' +import type { QueryManager, QueryOptions } from '../query/manager.js' import type { Network } from '../network.js' import type { Logger } from '@libp2p/logger' import type { AbortOptions } from '@libp2p/interfaces' diff --git a/src/query-self.ts b/src/query-self.ts index 6a5473bb..6b4c8cab 100644 --- a/src/query-self.ts +++ b/src/query-self.ts @@ -8,6 +8,7 @@ import type { PeerRouting } from './peer-routing/index.js' import type { Startable } from '@libp2p/interfaces/startable' import { pipe } from 'it-pipe' import type { KadDHTComponents } from './index.js' +import type { DeferredPromise } from 'p-defer' export interface QuerySelfInit { lan: boolean @@ -15,6 +16,7 @@ export interface QuerySelfInit { count?: number interval?: number queryTimeout?: number + initialQuerySelfHasRun: DeferredPromise } /** @@ -30,6 +32,7 @@ export class QuerySelf implements Startable { private running: boolean private timeoutId?: NodeJS.Timer private controller?: AbortController + private initialQuerySelfHasRun?: DeferredPromise constructor (components: KadDHTComponents, init: QuerySelfInit) { const { peerRouting, lan, count, interval, queryTimeout } = init @@ -41,6 +44,7 @@ export class QuerySelf implements Startable { this.count = count ?? K this.interval = interval ?? QUERY_SELF_INTERVAL this.queryTimeout = queryTimeout ?? QUERY_SELF_TIMEOUT + this.initialQuerySelfHasRun = init.initialQuerySelfHasRun } isStarted (): boolean { @@ -83,13 +87,19 @@ export class QuerySelf implements Startable { try { const found = await pipe( this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), { - signal + signal, + isSelfQuery: true }), (source) => take(source, this.count), async (source) => await length(source) ) this.log('query ran successfully - found %d peers', found) + + if (this.initialQuerySelfHasRun != null) { + this.initialQuerySelfHasRun.resolve() + this.initialQuerySelfHasRun = undefined + } } catch (err: any) { this.log('query error', err) } finally { diff --git a/src/query/manager.ts b/src/query/manager.ts index 24f05c94..799bcbd3 100644 --- a/src/query/manager.ts +++ b/src/query/manager.ts @@ -11,9 +11,12 @@ import { logger } from '@libp2p/logger' import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' import type { QueryFunc } from './types.js' -import type { QueryEvent, QueryOptions } from '@libp2p/interface-dht' +import type { QueryEvent } from '@libp2p/interface-dht' import { PeerSet } from '@libp2p/peer-collections' import type { Metric, Metrics } from '@libp2p/interface-metrics' +import type { DeferredPromise } from 'p-defer' +import type { AbortOptions } from '@libp2p/interfaces' +import { AbortError } from '@libp2p/interfaces/errors' export interface CleanUpEvents { 'cleanup': CustomEvent @@ -23,6 +26,7 @@ export interface QueryManagerInit { lan?: boolean disjointPaths?: number alpha?: number + initialQuerySelfHasRun: DeferredPromise } export interface QueryManagerComponents { @@ -30,6 +34,11 @@ export interface QueryManagerComponents { metrics?: Metrics } +export interface QueryOptions extends AbortOptions { + queryFuncTimeout?: number + isSelfQuery?: boolean +} + /** * Keeps track of all running queries */ @@ -46,6 +55,8 @@ export class QueryManager implements Startable { queryTime: Metric } + private initialQuerySelfHasRun?: DeferredPromise + constructor (components: QueryManagerComponents, init: QueryManagerInit) { const { lan = false, disjointPaths = K, alpha = ALPHA } = init @@ -55,6 +66,7 @@ export class QueryManager implements Startable { this.alpha = alpha ?? ALPHA this.lan = lan this.queries = 0 + this.initialQuerySelfHasRun = init.initialQuerySelfHasRun // allow us to stop queries on shut down this.shutDownController = new AbortController() @@ -131,6 +143,21 @@ export class QueryManager implements Startable { const cleanUp = new EventEmitter() try { + if (options.isSelfQuery !== true && this.initialQuerySelfHasRun != null) { + log('waiting for initial query-self query before continuing') + + await Promise.race([ + new Promise((resolve, reject) => { + signal.addEventListener('abort', () => { + reject(new AbortError('Query was aborted before self-query ran')) + }) + }), + this.initialQuerySelfHasRun.promise + ]) + + this.initialQuerySelfHasRun = undefined + } + log('query:start') this.queries++ this.metrics?.runningQueries.update(this.queries) diff --git a/test/query.spec.ts b/test/query.spec.ts index 0ed51766..898f4859 100644 --- a/test/query.spec.ts +++ b/test/query.spec.ts @@ -3,7 +3,7 @@ import { expect } from 'aegir/chai' import delay from 'delay' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { QueryManager } from '../src/query/manager.js' +import { QueryManager, QueryManagerInit } from '../src/query/manager.js' import { createPeerId, createPeerIds } from './utils/create-peer-id.js' import all from 'it-all' import drain from 'it-drain' @@ -18,6 +18,7 @@ import { EventTypes, QueryEvent } from '@libp2p/interface-dht' import { MESSAGE_TYPE } from '../src/message/index.js' import type { QueryFunc } from '../src/query/types.js' import { convertBuffer } from '../src/utils.js' +import pDefer from 'p-defer' interface TopologyEntry { delay?: number @@ -32,6 +33,16 @@ type Topology = Record +const defaultInit = (): QueryManagerInit => { + const init: QueryManagerInit = { + initialQuerySelfHasRun: pDefer() + } + + init.initialQuerySelfHasRun.resolve() + + return init +} + describe('QueryManager', () => { let ourPeerId: PeerId let peers: PeerId[] @@ -104,6 +115,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1 }) @@ -115,6 +127,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1 }) @@ -129,6 +142,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1 }) await manager.start() @@ -161,6 +175,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1, alpha: 1 }) @@ -211,6 +226,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1, alpha: 1 }) @@ -251,6 +267,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 2, alpha: 1 }) @@ -295,6 +312,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 2, alpha: 2 }) @@ -344,6 +362,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 10 }) await manager.start() @@ -381,6 +400,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 10 }) await manager.start() @@ -400,6 +420,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1, alpha: 1 }) @@ -450,6 +471,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 20, alpha: 1 }) @@ -484,6 +506,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1, alpha: 1 }) @@ -514,6 +537,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 3 }) await manager.start() @@ -547,6 +571,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 1, alpha: 1 }) @@ -598,6 +623,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 2 }) await manager.start() @@ -637,6 +663,7 @@ describe('QueryManager', () => { const manager = new QueryManager({ peerId: ourPeerId }, { + ...defaultInit(), disjointPaths: 2 }) await manager.start() @@ -669,11 +696,93 @@ describe('QueryManager', () => { await manager.stop() }) + it('should allow the self-query query to run', async () => { + const manager = new QueryManager({ + peerId: ourPeerId + }, { + initialQuerySelfHasRun: pDefer() + }) + await manager.start() + + const queryFunc: QueryFunc = async function * ({ peer }) { // eslint-disable-line require-await + // yield query result + yield valueEvent({ + from: peer, + value: uint8ArrayFromString('cool') + }) + } + + const results = await all(manager.run(key, [peers[7]], queryFunc, { + // this bypasses awaiting on the initialQuerySelfHasRun deferred promise + isSelfQuery: true + })) + + // should have the result + expect(results).to.containSubset([{ + value: uint8ArrayFromString('cool') + }]) + + await manager.stop() + }) + + it('should wait for the self-query query to run before running other queries', async () => { + const initialQuerySelfHasRun = pDefer() + + const manager = new QueryManager({ + peerId: ourPeerId + }, { + initialQuerySelfHasRun, + alpha: 2 + }) + await manager.start() + + let regularQueryTimeStarted: number = 0 + let selfQueryTimeStarted: number = Infinity + + // run a regular query and the self query together + await Promise.all([ + all(manager.run(key, [peers[7]], async function * ({ peer }) { // eslint-disable-line require-await + regularQueryTimeStarted = Date.now() + + // yield query result + yield valueEvent({ + from: peer, + value: uint8ArrayFromString('cool') + }) + })), + all(manager.run(key, [peers[7]], async function * ({ peer }) { // eslint-disable-line require-await + selfQueryTimeStarted = Date.now() + + // make sure we take enough time so that the `regularQuery` time diff is big enough to measure + await delay(100) + + // yield query result + yield valueEvent({ + from: peer, + value: uint8ArrayFromString('it me') + }) + + // normally done by the QuerySelf component + initialQuerySelfHasRun.resolve() + }, { + // this bypasses awaiting on the initialQuerySelfHasRun deferred promise + isSelfQuery: true + })) + ]) + + // should have started the regular query after the self query finished + expect(regularQueryTimeStarted).to.be.greaterThan(selfQueryTimeStarted) + + await manager.stop() + }) + it.skip('should end paths when they have no closer peers to those already queried', async () => { const manager = new QueryManager({ peerId: ourPeerId }, { - disjointPaths: 1, alpha: 1 + ...defaultInit(), + disjointPaths: 1, + alpha: 1 }) await manager.start() From 8be1fa809b8ca09fa70b5549f5bf096cba468473 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 4 May 2023 14:03:58 +0100 Subject: [PATCH 2/3] chore: allow disabling self-query wait --- src/constants.ts | 5 +- src/index.ts | 8 +++ src/kad-dht.ts | 18 +++++- src/query-self.ts | 144 +++++++++++++++++++++++++++++------------ test/kad-dht.spec.ts | 20 ++++-- test/utils/test-dht.ts | 1 + 6 files changed, 145 insertions(+), 51 deletions(-) diff --git a/src/constants.ts b/src/constants.ts index e9990909..12edd7f5 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -41,8 +41,11 @@ export const ALPHA = 3 // How often we look for our closest DHT neighbours export const QUERY_SELF_INTERVAL = Number(5 * minute) +// How often we look for the first set of our closest DHT neighbours +export const QUERY_SELF_INITIAL_INTERVAL = Number(Number(second)) + // How long to look for our closest DHT neighbours for -export const QUERY_SELF_TIMEOUT = Number(30 * second) +export const QUERY_SELF_TIMEOUT = Number(5 * second) // How often we try to find new peers export const TABLE_REFRESH_INTERVAL = Number(5 * minute) diff --git a/src/index.ts b/src/index.ts index 1f61c10e..56545f17 100644 --- a/src/index.ts +++ b/src/index.ts @@ -45,6 +45,14 @@ export interface KadDHTInit { */ querySelfInterval?: number + /** + * After startup by default all queries will be paused until the initial + * self-query has run and there are some peers in the routing table. + * + * Pass true here to disable this behaviour. (default: false) + */ + allowQueryWithZeroPeers?: boolean + /** * A custom protocol prefix to use (default: '/ipfs') */ diff --git a/src/kad-dht.ts b/src/kad-dht.ts index f7ea9966..5381a831 100644 --- a/src/kad-dht.ts +++ b/src/kad-dht.ts @@ -119,10 +119,16 @@ export class KadDHT extends EventEmitter implements DHT { lan: this.lan }) - // all queries should wait for the intial query-self query to run so we have + // all queries should wait for the initial query-self query to run so we have // some peers and don't force consumers to use arbitrary timeouts const initialQuerySelfHasRun = pDefer() + // if the user doesn't want to wait for query peers, resolve the initial + // self-query promise immediately + if (init.allowQueryWithZeroPeers === true) { + initialQuerySelfHasRun.resolve() + } + this.queryManager = new QueryManager(components, { // Number of disjoint query paths to use - This is set to `kBucketSize/2` per the S/Kademlia paper disjointPaths: Math.ceil(this.kBucketSize / 2), @@ -175,7 +181,8 @@ export class KadDHT extends EventEmitter implements DHT { peerRouting: this.peerRouting, interval: querySelfInterval, lan: this.lan, - initialQuerySelfHasRun + initialQuerySelfHasRun, + routingTable: this.routingTable }) // handle peers being discovered during processing of DHT messages @@ -220,7 +227,7 @@ export class KadDHT extends EventEmitter implements DHT { } async onPeerConnect (peerData: PeerInfo): Promise { - this.log('peer %p connected with protocols %s', peerData.id, peerData.protocols) + this.log('peer %p connected with protocols', peerData.id, peerData.protocols) if (this.lan) { peerData = removePublicAddresses(peerData) @@ -235,6 +242,11 @@ export class KadDHT extends EventEmitter implements DHT { try { await this.routingTable.add(peerData.id) + + if (this.routingTable.size < this.kBucketSize) { + // not enough peers yet, run debounced self-query with new peer + this.querySelf.querySelf() + } } catch (err: any) { this.log.error('could not add %p to routing table', peerData.id, err) } diff --git a/src/query-self.ts b/src/query-self.ts index 6b4c8cab..6b2fae6a 100644 --- a/src/query-self.ts +++ b/src/query-self.ts @@ -1,7 +1,7 @@ import { setMaxListeners } from 'events' import take from 'it-take' import length from 'it-length' -import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K } from './constants.js' +import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K, QUERY_SELF_INITIAL_INTERVAL } from './constants.js' import { anySignal } from 'any-signal' import { logger, Logger } from '@libp2p/logger' import type { PeerRouting } from './peer-routing/index.js' @@ -9,16 +9,33 @@ import type { Startable } from '@libp2p/interfaces/startable' import { pipe } from 'it-pipe' import type { KadDHTComponents } from './index.js' import type { DeferredPromise } from 'p-defer' +import type { RoutingTable } from './routing-table/index.js' export interface QuerySelfInit { lan: boolean peerRouting: PeerRouting + routingTable: RoutingTable count?: number interval?: number + initialInterval?: number queryTimeout?: number initialQuerySelfHasRun: DeferredPromise } +function debounce (func: () => void, wait: number): () => void { + let timeout: ReturnType | undefined + + return function () { + const later = function (): void { + timeout = undefined + func() + } + + clearTimeout(timeout) + timeout = setTimeout(later, wait) + } +} + /** * Receives notifications of new peers joining the network that support the DHT protocol */ @@ -26,42 +43,51 @@ export class QuerySelf implements Startable { private readonly log: Logger private readonly components: KadDHTComponents private readonly peerRouting: PeerRouting + private readonly routingTable: RoutingTable private readonly count: number private readonly interval: number + private readonly initialInterval: number private readonly queryTimeout: number + private started: boolean private running: boolean private timeoutId?: NodeJS.Timer private controller?: AbortController private initialQuerySelfHasRun?: DeferredPromise constructor (components: KadDHTComponents, init: QuerySelfInit) { - const { peerRouting, lan, count, interval, queryTimeout } = init + const { peerRouting, lan, count, interval, queryTimeout, routingTable } = init this.components = components this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:query-self`) this.running = false + this.started = false this.peerRouting = peerRouting + this.routingTable = routingTable this.count = count ?? K this.interval = interval ?? QUERY_SELF_INTERVAL + this.initialInterval = init.initialInterval ?? QUERY_SELF_INITIAL_INTERVAL this.queryTimeout = queryTimeout ?? QUERY_SELF_TIMEOUT this.initialQuerySelfHasRun = init.initialQuerySelfHasRun + + this.querySelf = debounce(this.querySelf.bind(this), 100) } isStarted (): boolean { - return this.running + return this.started } async start (): Promise { - if (this.running) { + if (this.started) { return } - this.running = true - this._querySelf() + this.started = true + clearTimeout(this.timeoutId) + this.timeoutId = setTimeout(this.querySelf.bind(this), this.initialInterval) } async stop (): Promise { - this.running = false + this.started = false if (this.timeoutId != null) { clearTimeout(this.timeoutId) @@ -72,42 +98,76 @@ export class QuerySelf implements Startable { } } - _querySelf (): void { - Promise.resolve().then(async () => { - this.controller = new AbortController() - const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)]) + querySelf (): void { + if (!this.started) { + this.log('skip self-query because we are not started') + return + } - // this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged - try { - if (setMaxListeners != null) { - setMaxListeners(Infinity, signal) - } - } catch {} // fails on node < 15.4 - - try { - const found = await pipe( - this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), { - signal, - isSelfQuery: true - }), - (source) => take(source, this.count), - async (source) => await length(source) - ) - - this.log('query ran successfully - found %d peers', found) - - if (this.initialQuerySelfHasRun != null) { - this.initialQuerySelfHasRun.resolve() - this.initialQuerySelfHasRun = undefined - } - } catch (err: any) { - this.log('query error', err) - } finally { - this.timeoutId = setTimeout(this._querySelf.bind(this), this.interval) - signal.clear() + if (this.running) { + this.log('skip self-query because we are already running, will run again in %dms', this.interval) + return + } + + if (this.routingTable.size === 0) { + let nextInterval = this.interval + + if (this.initialQuerySelfHasRun != null) { + // if we've not yet run the first self query, shorten the interval until we try again + nextInterval = this.initialInterval } - }).catch(err => { - this.log('query error', err) - }) + + this.log('skip self-query because routing table is empty, will run again in %dms', nextInterval) + clearTimeout(this.timeoutId) + this.timeoutId = setTimeout(this.querySelf.bind(this), nextInterval) + return + } + + this.running = true + + Promise.resolve() + .then(async () => { + this.controller = new AbortController() + const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)]) + + // this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged + try { + if (setMaxListeners != null) { + setMaxListeners(Infinity, signal) + } + } catch {} // fails on node < 15.4 + + try { + this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout) + + const found = await pipe( + this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), { + signal, + isSelfQuery: true + }), + (source) => take(source, this.count), + async (source) => await length(source) + ) + + this.log('self-query ran successfully - found %d peers', found) + + if (this.initialQuerySelfHasRun != null) { + this.initialQuerySelfHasRun.resolve() + this.initialQuerySelfHasRun = undefined + } + } catch (err: any) { + this.log.error('self-query error', err) + } finally { + signal.clear() + } + }).catch(err => { + this.log('self-query error', err) + }).finally(() => { + this.running = false + + this.log('running self-query again in %dms', this.interval) + clearTimeout(this.timeoutId) + this.timeoutId = setTimeout(this.querySelf.bind(this), this.interval) + }) } } diff --git a/test/kad-dht.spec.ts b/test/kad-dht.spec.ts index 3be7f953..863682cb 100644 --- a/test/kad-dht.spec.ts +++ b/test/kad-dht.spec.ts @@ -343,13 +343,23 @@ describe('KadDHT', () => { expect(resA).to.have.property('value').that.equalBytes(valueA) expect(resB).to.have.property('value').that.equalBytes(valueA) - expect(dhtASpy.callCount).to.eql(2) + let foundGetValue = false + let foundPutValue = false - expect(dhtASpy.getCall(0).args[0].equals(dhtB.components.peerId)).to.be.true() // query B - expect(dhtASpy.getCall(0).args[1].type).to.equal('GET_VALUE') // query B + for (const call of dhtASpy.getCalls()) { + if (call.args[0].equals(dhtB.components.peerId) && call.args[1].type === 'GET_VALUE') { + // query B + foundGetValue = true + } + + if (call.args[0].equals(dhtB.components.peerId) && call.args[1].type === 'PUT_VALUE') { + // update B + foundPutValue = true + } + } - expect(dhtASpy.getCall(1).args[0].equals(dhtB.components.peerId)).to.be.true() // update B - expect(dhtASpy.getCall(1).args[1].type).to.equal('PUT_VALUE') // update B + expect(foundGetValue).to.be.true('did not get value from dhtB') + expect(foundPutValue).to.be.true('did not update value on dhtB') }) it('layered get', async function () { diff --git a/test/utils/test-dht.ts b/test/utils/test-dht.ts index 4466e4d6..f7f67bd5 100644 --- a/test/utils/test-dht.ts +++ b/test/utils/test-dht.ts @@ -77,6 +77,7 @@ export class TestDHT { v: () => 0 }, querySelfInterval: 600000, + allowQueryWithZeroPeers: true, ...options } From d4414dde1ae0ee0cec016ca2b1d9195bcf1dc86f Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 4 May 2023 14:23:23 +0100 Subject: [PATCH 3/3] chore: fix tests --- src/index.ts | 7 +++++++ src/kad-dht.ts | 6 +----- test/kad-dht.spec.ts | 2 +- test/utils/test-dht.ts | 1 + 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/index.ts b/src/index.ts index 56545f17..4ced200b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -45,6 +45,13 @@ export interface KadDHTInit { */ querySelfInterval?: number + /** + * During startup we run the self-query at a shorter interval to ensure + * the containing node can respond to queries quickly. Set this interval + * here in ms (default: 1000) + */ + initialQuerySelfInterval?: number + /** * After startup by default all queries will be paused until the initial * self-query has run and there are some peers in the routing table. diff --git a/src/kad-dht.ts b/src/kad-dht.ts index 5381a831..6f1410b4 100644 --- a/src/kad-dht.ts +++ b/src/kad-dht.ts @@ -180,6 +180,7 @@ export class KadDHT extends EventEmitter implements DHT { this.querySelf = new QuerySelf(components, { peerRouting: this.peerRouting, interval: querySelfInterval, + initialInterval: init.initialQuerySelfInterval, lan: this.lan, initialQuerySelfHasRun, routingTable: this.routingTable @@ -242,11 +243,6 @@ export class KadDHT extends EventEmitter implements DHT { try { await this.routingTable.add(peerData.id) - - if (this.routingTable.size < this.kBucketSize) { - // not enough peers yet, run debounced self-query with new peer - this.querySelf.querySelf() - } } catch (err: any) { this.log.error('could not add %p to routing table', peerData.id, err) } diff --git a/test/kad-dht.spec.ts b/test/kad-dht.spec.ts index 863682cb..f9a49c44 100644 --- a/test/kad-dht.spec.ts +++ b/test/kad-dht.spec.ts @@ -686,7 +686,7 @@ describe('KadDHT', () => { this.timeout(240 * 1000) // Create 101 nodes - const nDHTs = 100 + const nDHTs = 101 const dhts = await Promise.all( new Array(nDHTs).fill(0).map(async () => await tdht.spawn()) diff --git a/test/utils/test-dht.ts b/test/utils/test-dht.ts index f7f67bd5..c022b659 100644 --- a/test/utils/test-dht.ts +++ b/test/utils/test-dht.ts @@ -77,6 +77,7 @@ export class TestDHT { v: () => 0 }, querySelfInterval: 600000, + initialQuerySelfInterval: 600000, allowQueryWithZeroPeers: true, ...options }