-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathipfs-pubsub-peer-monitor.js
74 lines (61 loc) · 1.78 KB
/
ipfs-pubsub-peer-monitor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
'use strict'
const { difference} = require('./utils')
const EventEmitter = require('events')
// Defaults merged underneath the caller-supplied options in the constructor.
// Frozen because this single object is shared by every monitor instance:
// an accidental write would otherwise change the defaults for all later ones.
const DEFAULT_OPTIONS = Object.freeze({
  // Begin polling as soon as the monitor is constructed.
  start: true,
  // Delay handed to setInterval between polls (milliseconds).
  pollInterval: 1000,
})
/**
 * Periodically asks a pubsub implementation for the peers subscribed to a
 * topic and reports membership changes as events.
 *
 * Events:
 *  - 'join'  (peer): peer is in the latest poll result but was not in the previous one.
 *  - 'leave' (peer): peer was in the previous poll result but is not in the latest one.
 *  - 'error' (err):  a poll failed (or a join/leave listener threw); polling is
 *                    halted and `started` becomes false until start() is called again.
 */
class IpfsPubsubPeerMonitor extends EventEmitter {
  /**
   * @param {object} ipfsPubsub - Object exposing `peers(topic)`. Its awaited
   *   result is used as an array (`slice`, `includes`, `new Set(...)`).
   *   NOTE(review): presumably an array of peer-id strings — confirm against
   *   the pubsub implementation in use.
   * @param {*} topic - Passed through unchanged to `ipfsPubsub.peers`.
   * @param {object} [options] - Overrides for DEFAULT_OPTIONS
   *   (`start`, `pollInterval`).
   */
  constructor (ipfsPubsub, topic, options) {
    super()
    this._pubsub = ipfsPubsub
    this._topic = topic
    this._options = Object.assign({}, DEFAULT_OPTIONS, options)
    this._peers = []
    this._interval = null

    if (this._options.start) {
      this.start()
    }
  }

  /** @returns {boolean} true while a poll timer is active. */
  get started () {
    return this._interval !== null
  }

  /** `started` is derived from the timer state and cannot be assigned. */
  set started (val) {
    throw new Error("'started' is read-only")
  }

  /**
   * Starts (or restarts) polling and triggers one poll immediately.
   *
   * Restarting only replaces the timer. It deliberately does not go through
   * stop(), because stop() removes every listener and a restarted monitor
   * would then have nobody to report to.
   */
  start () {
    if (this._interval !== null) {
      clearInterval(this._interval)
    }

    this._interval = setInterval(
      this._pollPeers.bind(this),
      this._options.pollInterval
    )
    // Not awaited on purpose: failures are reported through the 'error' event.
    this._pollPeers()
  }

  /**
   * Stops polling and detaches all 'error', 'join' and 'leave' listeners.
   * Any poll still in flight is discarded when it settles.
   */
  stop () {
    clearInterval(this._interval)
    this._interval = null
    this.removeAllListeners('error')
    this.removeAllListeners('join')
    this.removeAllListeners('leave')
  }

  /**
   * Fetches the current peer list directly from the pubsub implementation.
   *
   * Side effect: the fetched list replaces the cached one without emitting
   * any events, so it also becomes the baseline the next poll is diffed
   * against.
   *
   * @returns {Promise<Array>} A copy of the freshly fetched peer list.
   */
  async getPeers () {
    this._peers = await this._pubsub.peers(this._topic)
    return this._peers.slice()
  }

  /**
   * @param {*} peer
   * @returns {boolean} Whether `peer` is in the most recently cached list.
   */
  hasPeer (peer) {
    return this._peers.includes(peer)
  }

  /**
   * Runs a single poll: fetches the peer list, emits join/leave for the
   * differences against the cached list, then caches the new list.
   */
  async _pollPeers () {
    // The timer handle doubles as a token identifying the current run. If it
    // differs once the request settles, stop() or start() happened meanwhile
    // (or another poll already failed) and this result is stale.
    const runToken = this._interval

    try {
      const peers = await this._pubsub.peers(this._topic)
      if (this._interval !== runToken) return

      IpfsPubsubPeerMonitor._emitJoinsAndLeaves(new Set(this._peers), new Set(peers), this)
      this._peers = peers
    } catch (err) {
      // A stale failure is dropped: after stop() there are no 'error'
      // listeners left, and emitting 'error' without one throws.
      if (this._interval !== runToken) return

      clearInterval(this._interval)
      // Reset the handle so `started` reflects that polling has halted and a
      // later start() begins cleanly.
      this._interval = null
      this.emit('error', err)
    }
  }

  /**
   * Emits 'join' for every value only in `newValues` and then 'leave' for
   * every value only in `oldValues`.
   *
   * @param {Set} oldValues - Previously known peers.
   * @param {Set} newValues - Peers from the latest poll.
   * @param {EventEmitter} events - Emitter the events are sent on.
   */
  static _emitJoinsAndLeaves (oldValues, newValues, events) {
    const emitJoin = addedPeer => events.emit('join', addedPeer)
    const emitLeave = removedPeer => events.emit('leave', removedPeer)
    difference(newValues, oldValues).forEach(emitJoin)
    difference(oldValues, newValues).forEach(emitLeave)
  }
}
module.exports = IpfsPubsubPeerMonitor