Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: invoke onProgress callback if passed as an option #472

Merged
merged 2 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@
"p-defer": "^4.0.0",
"p-queue": "^7.3.4",
"private-ip": "^3.0.0",
"progress-events": "^1.0.0",
"protons-runtime": "^5.0.0",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^4.0.2",
Expand Down
8 changes: 4 additions & 4 deletions src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export class ContentFetching {
}

if (!sentCorrection) {
yield queryErrorEvent({ from, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') })
yield queryErrorEvent({ from, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') }, options)
}

this.log.error('Failed error correcting entry')
Expand Down Expand Up @@ -165,7 +165,7 @@ export class ContentFetching {
}

if (!(putEvent.record != null && uint8ArrayEquals(putEvent.record.value, Libp2pRecord.deserialize(record).value))) {
events.push(queryErrorEvent({ from: event.peer.id, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') }))
events.push(queryErrorEvent({ from: event.peer.id, error: new CodeError('value not put correctly', 'ERR_PUT_VALUE_INVALID') }, options))
}
}

Expand Down Expand Up @@ -240,7 +240,7 @@ export class ContentFetching {
yield valueEvent({
value: localRec.value,
from: this.components.peerId
})
}, options)
} catch (err: any) {
this.log('error getting local value for %b', key, err)
}
Expand All @@ -252,7 +252,7 @@ export class ContentFetching {
yield event

if (event.name === 'PEER_RESPONSE' && (event.record != null)) {
yield valueEvent({ from: peer, value: event.record.value })
yield valueEvent({ from: peer, value: event.record.value }, options)
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions src/content-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import type { QueryManager } from '../query/manager.js'
import type { QueryFunc } from '../query/types.js'
import type { RoutingTable } from '../routing-table/index.js'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Logger } from '@libp2p/logger'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { CID } from 'multiformats/cid'
Expand Down Expand Up @@ -56,7 +55,7 @@ export class ContentRouting {
* Announce to the network that we can provide the value for a given key and
* are contactable on the given multiaddrs
*/
async * provide (key: CID, multiaddrs: Multiaddr[], options: AbortOptions = {}): AsyncGenerator<QueryEvent, void, undefined> {
async * provide (key: CID, multiaddrs: Multiaddr[], options: QueryOptions = {}): AsyncGenerator<QueryEvent, void, undefined> {
this.log('provide %s', key)

// Add peer as provider
Expand Down Expand Up @@ -94,7 +93,7 @@ export class ContentRouting {
}
} catch (err: any) {
this.log.error('error sending provide record to peer %p', event.peer.id, err)
events.push(queryErrorEvent({ from: event.peer.id, error: err }))
events.push(queryErrorEvent({ from: event.peer.id, error: err }, options))
}

return events
Expand Down Expand Up @@ -153,8 +152,8 @@ export class ContentRouting {
}
}

yield peerResponseEvent({ from: this.components.peerId, messageType: MESSAGE_TYPE.GET_PROVIDERS, providers })
yield providerEvent({ from: this.components.peerId, providers })
yield peerResponseEvent({ from: this.components.peerId, messageType: MESSAGE_TYPE.GET_PROVIDERS, providers }, options)
yield providerEvent({ from: this.components.peerId, providers }, options)
}

// All done
Expand All @@ -168,7 +167,10 @@ export class ContentRouting {
const findProvidersQuery: QueryFunc = async function * ({ peer, signal }) {
const request = new Message(MESSAGE_TYPE.GET_PROVIDERS, target, 0)

yield * self.network.sendRequest(peer, request, { signal })
yield * self.network.sendRequest(peer, request, {
...options,
signal
})
}

const providers = new Set(provs.map(p => p.toString()))
Expand All @@ -191,7 +193,7 @@ export class ContentRouting {
}

if (newProviders.length > 0) {
yield providerEvent({ from: event.from, providers: newProviders })
yield providerEvent({ from: event.from, providers: newProviders }, options)
}

if (providers.size === toFind) {
Expand Down
27 changes: 13 additions & 14 deletions src/dual-kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { queryErrorEvent } from './query/events.js'
import type { DualKadDHT, KadDHT, KadDHTComponents, KadDHTInit, QueryEvent, QueryOptions } from './index.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import type { AbortOptions } from '@libp2p/interfaces'
import type { CID } from 'multiformats/cid'

const log = logger('libp2p:kad-dht')
Expand All @@ -26,23 +25,23 @@ class DHTContentRouting implements ContentRouting {
this.dht = dht
}

async provide (cid: CID): Promise<void> {
await drain(this.dht.provide(cid))
async provide (cid: CID, options: QueryOptions = {}): Promise<void> {
await drain(this.dht.provide(cid, options))
}

async * findProviders (cid: CID, options: AbortOptions = {}): AsyncGenerator<PeerInfo, void, undefined> {
async * findProviders (cid: CID, options: QueryOptions = {}): AsyncGenerator<PeerInfo, void, undefined> {
for await (const event of this.dht.findProviders(cid, options)) {
if (event.name === 'PROVIDER') {
yield * event.providers
}
}
}

async put (key: Uint8Array, value: Uint8Array, options?: AbortOptions): Promise<void> {
async put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): Promise<void> {
await drain(this.dht.put(key, value, options))
}

async get (key: Uint8Array, options?: AbortOptions): Promise<Uint8Array> {
async get (key: Uint8Array, options?: QueryOptions): Promise<Uint8Array> {
for await (const event of this.dht.get(key, options)) {
if (event.name === 'VALUE') {
return event.value
Expand All @@ -63,7 +62,7 @@ class DHTPeerRouting implements PeerRouting {
this.dht = dht
}

async findPeer (peerId: PeerId, options: AbortOptions = {}): Promise<PeerInfo> {
async findPeer (peerId: PeerId, options: QueryOptions = {}): Promise<PeerInfo> {
for await (const event of this.dht.findPeer(peerId, options)) {
if (event.name === 'FINAL_PEER') {
return event.peer
Expand All @@ -73,7 +72,7 @@ class DHTPeerRouting implements PeerRouting {
throw new CodeError('Not found', 'ERR_NOT_FOUND')
}

async * getClosestPeers (key: Uint8Array, options: AbortOptions = {}): AsyncIterable<PeerInfo> {
async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncIterable<PeerInfo> {
for await (const event of this.dht.getClosestPeers(key, options)) {
if (event.name === 'FINAL_PEER') {
yield event.peer
Expand Down Expand Up @@ -207,7 +206,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
)) {
yield event

if (event.name === 'DIALING_PEER') {
if (event.name === 'DIAL_PEER') {
queriedPeers = true
}

Expand All @@ -219,7 +218,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
}
}

if (event.name === 'SENDING_QUERY') {
if (event.name === 'SEND_QUERY') {
queriedPeers = true
}
}
Expand All @@ -232,7 +231,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
yield queryErrorEvent({
from: this.components.peerId,
error: new CodeError('Not found', 'ERR_NOT_FOUND')
})
}, options)
}
}

Expand All @@ -241,7 +240,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
/**
* Announce to the network that we can provide given key's value
*/
async * provide (key: CID, options: AbortOptions = {}): AsyncGenerator<QueryEvent> {
async * provide (key: CID, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
let sent = 0
let success = 0
const errors = []
Expand All @@ -256,7 +255,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
for await (const event of merge(...dhts.map(dht => dht.provide(key, options)))) {
yield event

if (event.name === 'SENDING_QUERY') {
if (event.name === 'SEND_QUERY') {
sent++
}

Expand Down Expand Up @@ -304,7 +303,7 @@ export class DefaultDualKadDHT extends EventEmitter<PeerDiscoveryEvents> impleme
)) {
yield event

if (event.name === 'SENDING_QUERY' || event.name === 'FINAL_PEER') {
if (event.name === 'SEND_QUERY' || event.name === 'FINAL_PEER') {
queriedPeers = true
}
}
Expand Down
39 changes: 25 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ import type { Registrar } from '@libp2p/interface-registrar'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

/**
* The types of events emitted during DHT queries
*/
export enum EventTypes {
SENDING_QUERY = 0,
SEND_QUERY = 0,
PEER_RESPONSE,
FINAL_PEER,
QUERY_ERROR,
PROVIDER,
VALUE,
ADDING_PEER,
DIALING_PEER
ADD_PEER,
DIAL_PEER
}

/**
Expand All @@ -45,17 +46,27 @@ export interface DHTRecord {
timeReceived?: Date
}

export interface QueryOptions extends AbortOptions {
export type DHTProgressEvents =
ProgressEvent<'kad-dht:query:send-query', SendQueryEvent> |
ProgressEvent<'kad-dht:query:peer-response', PeerResponseEvent> |
ProgressEvent<'kad-dht:query:final-peer', FinalPeerEvent> |
ProgressEvent<'kad-dht:query:query-error', QueryErrorEvent> |
ProgressEvent<'kad-dht:query:provider', ProviderEvent> |
ProgressEvent<'kad-dht:query:value', ValueEvent> |
ProgressEvent<'kad-dht:query:add-peer', AddPeerEvent> |
ProgressEvent<'kad-dht:query:dial-peer', DialPeerEvent>

export interface QueryOptions extends AbortOptions, ProgressOptions {
queryFuncTimeout?: number
}

/**
* Emitted when sending queries to remote peers
*/
export interface SendingQueryEvent {
export interface SendQueryEvent {
to: PeerId
type: EventTypes.SENDING_QUERY
name: 'SENDING_QUERY'
type: EventTypes.SEND_QUERY
name: 'SEND_QUERY'
messageName: keyof typeof MessageType
messageType: MessageType
}
Expand Down Expand Up @@ -118,22 +129,22 @@ export interface ValueEvent {
/**
* Emitted when peers are added to a query
*/
export interface AddingPeerEvent {
type: EventTypes.ADDING_PEER
name: 'ADDING_PEER'
export interface AddPeerEvent {
type: EventTypes.ADD_PEER
name: 'ADD_PEER'
peer: PeerId
}

/**
* Emitted when peers are dialled as part of a query
*/
export interface DialingPeerEvent {
export interface DialPeerEvent {
peer: PeerId
type: EventTypes.DIALING_PEER
name: 'DIALING_PEER'
type: EventTypes.DIAL_PEER
name: 'DIAL_PEER'
}

export type QueryEvent = SendingQueryEvent | PeerResponseEvent | FinalPeerEvent | QueryErrorEvent | ProviderEvent | ValueEvent | AddingPeerEvent | DialingPeerEvent
export type QueryEvent = SendQueryEvent | PeerResponseEvent | FinalPeerEvent | QueryErrorEvent | ProviderEvent | ValueEvent | AddPeerEvent | DialPeerEvent

export interface RoutingTable {
size: number
Expand Down
26 changes: 13 additions & 13 deletions src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import * as lp from 'it-length-prefixed'
import { pipe } from 'it-pipe'
import { Message } from './message/index.js'
import {
dialingPeerEvent,
sendingQueryEvent,
dialPeerEvent,
sendQueryEvent,
peerResponseEvent,
queryErrorEvent
} from './query/events.js'
import type { KadDHTComponents, QueryEvent } from './index.js'
import type { KadDHTComponents, QueryEvent, QueryOptions } from './index.js'
import type { Stream } from '@libp2p/interface-connection'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerInfo } from '@libp2p/interface-peer-info'
Expand Down Expand Up @@ -82,14 +82,14 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable {
/**
* Send a request and record RTT for latency measurements
*/
async * sendRequest (to: PeerId, msg: Message, options: AbortOptions = {}): AsyncGenerator<QueryEvent> {
async * sendRequest (to: PeerId, msg: Message, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
if (!this.running) {
return
}

this.log('sending %s to %p', msg.type, to)
yield dialingPeerEvent({ peer: to })
yield sendingQueryEvent({ to, type: msg.type })
yield dialPeerEvent({ peer: to }, options)
yield sendQueryEvent({ to, type: msg.type }, options)

let stream: Stream | undefined

Expand All @@ -105,9 +105,9 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable {
closer: response.closerPeers,
providers: response.providerPeers,
record: response.record
})
}, options)
} catch (err: any) {
yield queryErrorEvent({ from: to, error: err })
yield queryErrorEvent({ from: to, error: err }, options)
} finally {
if (stream != null) {
stream.close()
Expand All @@ -118,14 +118,14 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable {
/**
* Sends a message without expecting an answer
*/
async * sendMessage (to: PeerId, msg: Message, options: AbortOptions = {}): AsyncGenerator<QueryEvent> {
async * sendMessage (to: PeerId, msg: Message, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
if (!this.running) {
return
}

this.log('sending %s to %p', msg.type, to)
yield dialingPeerEvent({ peer: to })
yield sendingQueryEvent({ to, type: msg.type })
yield dialPeerEvent({ peer: to }, options)
yield sendQueryEvent({ to, type: msg.type }, options)

let stream: Stream | undefined

Expand All @@ -135,9 +135,9 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable {

await this._writeMessage(stream, msg.serialize(), options)

yield peerResponseEvent({ from: to, messageType: msg.type })
yield peerResponseEvent({ from: to, messageType: msg.type }, options)
} catch (err: any) {
yield queryErrorEvent({ from: to, error: err })
yield queryErrorEvent({ from: to, error: err }, options)
} finally {
if (stream != null) {
stream.close()
Expand Down
Loading