Skip to content
This repository was archived by the owner on Oct 5, 2023. It is now read-only.

refactor: Replace EventEmitter with EventTarget #63

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@
"build:docs/toc": "echo 'TODO'",
"build:tests": "rm -f test/browser/bundle.js* && webpack --config ./conf/webpack.tests.config.js",
"prepublishOnly": "npm run build",
"lint": "standard --env=mocha",
"lint:fix": "standard --fix",
"lint": "standard --env=mocha --globals=CustomEvent",
"lint:fix": "standard --fix --globals=CustomEvent",
"webrtc": "webrtc-star --port=12345",
"webrtc:background": "webrtc-star --port=12345 &"
},
Expand Down
11 changes: 4 additions & 7 deletions src/access-controllers/orbitdb.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { EventEmitter } from 'events'
import ensureACAddress from '../utils/ensure-ac-address.js'
import IPFSAccessController from './ipfs.js'

const type = 'orbitdb'

const OrbitDBAccessController = ({ write } = {}) => async ({ orbitdb, identities, address }) => {
const events = new EventEmitter()
const events = new EventTarget()

address = address || 'default-access-controller'
write = write || [orbitdb.identity.id]
Expand All @@ -14,11 +13,9 @@ const OrbitDBAccessController = ({ write } = {}) => async ({ orbitdb, identities
const db = await orbitdb.open(ensureACAddress(address), { type: 'keyvalue', AccessController: IPFSAccessController({ write }) })
address = db.address

const onUpdate = (entry) => {
events.emit('update', entry)
}

db.events.on('update', onUpdate)
db.events.addEventListener('update', event => {
events.dispatchEvent(new CustomEvent('update', { detail: event.detail }))
})

// Return true if entry is allowed to be added to the database
const canAppend = async (entry) => {
Expand Down
11 changes: 5 additions & 6 deletions src/database.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { EventEmitter } from 'events'
import PQueue from 'p-queue'
import Sync from './sync.js'
import { Log, Entry } from './oplog/index.js'
Expand Down Expand Up @@ -30,7 +29,7 @@ const Database = async ({ ipfs, identity, address, name, access, directory, meta

const log = await Log(identity, { logId: address, access, entryStorage, headsStorage, indexStorage })

const events = new EventEmitter()
const events = new EventTarget()
const queue = new PQueue({ concurrency: 1 })

const addOperation = async (op) => {
Expand All @@ -40,7 +39,7 @@ const Database = async ({ ipfs, identity, address, name, access, directory, meta
if (onUpdate) {
await onUpdate(log, entry)
}
events.emit('update', entry)
events.dispatchEvent(new CustomEvent('update', { detail: entry }))
return entry.hash
}
const hash = await queue.add(task)
Expand All @@ -57,7 +56,7 @@ const Database = async ({ ipfs, identity, address, name, access, directory, meta
if (onUpdate) {
await onUpdate(log, entry)
}
events.emit('update', entry)
events.dispatchEvent(new CustomEvent('update', { detail: entry }))
}
}
}
Expand All @@ -68,13 +67,13 @@ const Database = async ({ ipfs, identity, address, name, access, directory, meta
await sync.stop()
await queue.onIdle()
await log.close()
events.emit('close')
events.dispatchEvent(new Event('close'))
}

const drop = async () => {
await queue.onIdle()
await log.clear()
events.emit('drop')
events.dispatchEvent(new Event('drop'))
}

// Start the Sync protocol
Expand Down
83 changes: 83 additions & 0 deletions src/db/keyvalue-persisted.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import LevelStorage from '../storage/level.js'
import { KeyValue } from './index.js'
import pathJoin from '../utils/path-join.js'
import PQueue from 'p-queue'

const valueEncoding = 'json'

const KeyValuePersisted = async ({ OpLog, Database, ipfs, identity, address, name, access, directory, storage, meta }) => {
const keyValueStore = await KeyValue({ OpLog, Database, ipfs, identity, address, name, access, directory, storage, meta })
const { events, log } = keyValueStore

const queue = new PQueue({ concurrency: 1 })

directory = pathJoin(directory || './orbitdb', `./${address}/_index/`)
const index = await LevelStorage({ path: directory, valueEncoding })

let latestOplogHash

const updateIndex = (index) => async (entry) => {
const keys = {}

for await (const entry of log.iterator({ gt: latestOplogHash })) {
const { op, key, value } = entry.payload

if (op === 'PUT' && !keys[key]) {
keys[key] = true
await index.put(key, value)
} else if (op === 'DEL' && !keys[key]) {
keys[key] = true
await index.del(key)
}
}
latestOplogHash = entry.hash
}

const get = async (key) => {
await queue.onIdle()
const value = await index.get(key)
if (value) {
return value
}
return keyValueStore.get(key)
}

const iterator = async function * ({ amount } = {}) {
await queue.onIdle()
for await (const { hash, key, value } of keyValueStore.iterator({ amount })) {
yield { hash, key, value }
}
}

const task = async () => {
await queue.add(updateIndex(index))
}

const close = async () => {
events.removeEventListener('update', task)
await queue.onIdle()
await index.close()
await keyValueStore.close()
}

// TOD: rename to clear()
const drop = async () => {
events.removeEventListener('update', task)
await queue.onIdle()
await index.clear()
await keyValueStore.drop()
}

// Listen for update events from the database and update the index on every update
events.addEventListener('update', task)

return {
...keyValueStore,
get,
iterator,
close,
drop
}
}

export default KeyValuePersisted
2 changes: 1 addition & 1 deletion src/orbitdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ const OrbitDB = async ({ ipfs, id, identity, keystore, directory } = {}) => {
}
const db = await Database({ ipfs, identity, address: address.toString(), name, access: accessController, directory, meta, syncAutomatically: sync != null ? sync : true, headsStorage, entryStorage, indexStorage, referencesCount })

db.events.on('close', onDatabaseClosed(address.toString()))
db.events.addEventListener('close', onDatabaseClosed(address.toString()), { once: true })

databases[address.toString()] = db

Expand Down
17 changes: 8 additions & 9 deletions src/sync.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { pipe } from 'it-pipe'
import PQueue from 'p-queue'
import { EventEmitter } from 'events'
import { TimeoutController } from 'timeout-abort-controller'
import pathJoin from './utils/path-join.js'

Expand Down Expand Up @@ -33,8 +32,8 @@ const DefaultTimeout = 30000 // 30 seconds
* @param {Object} params One or more parameters for configuring Sync.
* @param {IPFS} params.ipfs An IPFS instance. Used for synchronizing peers.
* @param {Log} params.log The Log instance to sync.
* @param {Object} params.events An event emitter. Defaults to an instance of
* EventEmitter. Events emitted are 'join', 'error' and 'leave'.
* @param {Object} params.events An event target. Defaults to an instance of
* EventTarget. Events emitted are 'join', 'error' and 'leave'.
* @param {Function} params.onSynced A function that is called after the peer
* has received heads from another peer.
* @param {Boolean} params.start True if sync should start automatically, false
Expand All @@ -51,14 +50,14 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
const queue = new PQueue({ concurrency: 1 })
const peers = new Set()

events = events || new EventEmitter()
events = events || new EventTarget()
timeout = timeout || DefaultTimeout

let started = false

const onPeerJoined = async (peerId) => {
const heads = await log.heads()
events.emit('join', peerId, heads)
events.dispatchEvent(new CustomEvent('join', { detail: { peerId, heads } }))
}

const sendHeads = async (source) => {
Expand Down Expand Up @@ -87,7 +86,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
await pipe(stream, receiveHeads(peerId), sendHeads, stream)
} catch (e) {
peers.delete(peerId)
events.emit('error', e)
events.dispatchEvent(new CustomEvent('error', { detail: e }))
}
}

Expand All @@ -114,7 +113,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
// Skip peer, they don't have this database currently
} else {
peers.delete(peerId)
events.emit('error', e)
events.dispatchEvent(new CustomEvent('error', { detail: e }))
}
} finally {
if (timeoutController) {
Expand All @@ -123,7 +122,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
}
} else {
peers.delete(peerId)
events.emit('leave', peerId)
events.dispatchEvent(new CustomEvent('leave', { detail: peerId }))
}
}
queue.add(task)
Expand All @@ -139,7 +138,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
await onSynced(message.data)
}
} catch (e) {
events.emit('error', e)
events.dispatchEvent(new CustomEvent('error', { detail: e }))
}
}
queue.add(task)
Expand Down
5 changes: 3 additions & 2 deletions test/.mocharc.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
"recursive": true,
"exit": true,
"slow": 1000,
"exclude": ["test/browser/**/*.js"]
}
"exclude": ["test/browser/**/*.js"],
"node-option": ["experimental-global-customevent"]
}
8 changes: 4 additions & 4 deletions test/access-controllers/orbit-db-access-controller.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ describe('OrbitDBAccessController', function () {

it('emit \'update\' event when a capability was added', async () => {
let update = false
const onUpdate = (entry) => {
const onUpdate = () => {
update = true
}

accessController.events.on('update', onUpdate)
accessController.events.addEventListener('update', onUpdate)

await accessController.grant('read', 'AXES')

Expand Down Expand Up @@ -261,11 +261,11 @@ describe('OrbitDBAccessController', function () {
await accessController.grant('admin', 'dogs')

let update = false
const onUpdate = (entry) => {
const onUpdate = () => {
update = true
}

accessController.events.on('update', onUpdate)
accessController.events.addEventListener('update', onUpdate)

await accessController.revoke('admin', 'cats')

Expand Down
Loading