WIP: kappa-next #14

Open
wants to merge 87 commits into
base: master
7991a5d
Move index.js to kappa-old.js
Frando Oct 22, 2019
3442666
Add new Kappa!
Frando Oct 22, 2019
9472c73
Add kappaClassic wrapper and source handlers
Frando Oct 22, 2019
8160fb5
Add thunky dependency
Frando Oct 22, 2019
2ae73cb
Fixes
Frando Oct 22, 2019
f298710
Add example as test
Frando Oct 22, 2019
404c78f
Cleanup
Frando Oct 22, 2019
e029dff
Docs
Frando Oct 22, 2019
f5aa224
Rename method
Frando Oct 22, 2019
44cdd21
Add flowchart
Frando Oct 22, 2019
87a4a46
Minor changes to wiring
Frando Oct 22, 2019
a21b44c
Add a hyperdrive source handler & test
Frando Oct 22, 2019
400d16b
Enable all tests
Frando Oct 22, 2019
f4ae334
Use svg for flow graph
Frando Oct 22, 2019
a58dffc
Added kappa-graph.svg
Frando Oct 22, 2019
03665c1
Use svg for flow graph
Frando Oct 22, 2019
6eb507c
Merge branch 'kappa5' of github.com:Frando/kappa-core into kappa5
Frando Oct 22, 2019
7203eda
Remove png graph
Frando Oct 22, 2019
8600c9d
Default opts, and cleanup
Frando Oct 23, 2019
1a281b8
Remove subflows for now
Frando Oct 24, 2019
c996fe6
Fix hyperdrive, improve tests
Frando Oct 24, 2019
c32347e
Tests & cleanup
Frando Oct 24, 2019
11cfe8c
Lint standard
Frando Oct 24, 2019
0046f80
Make methods public
Frando Oct 24, 2019
d81e5ef
Cleanup open & ready
Frando Oct 27, 2019
4f21708
Better multifeed backward compatibility
Frando Oct 29, 2019
6605098
docs
Frando Nov 7, 2019
30753f0
context for views
Frando Nov 7, 2019
e8cc5b1
bump corestore to 4.0.0
ameba23 Nov 7, 2019
ed0f44a
add corestore source
ameba23 Nov 7, 2019
967a3a3
add test for corestore source
ameba23 Nov 7, 2019
37cecec
add default error handler for hypercore source
ameba23 Nov 7, 2019
8516f33
valueencoding = json in corestore test
ameba23 Nov 7, 2019
f08b157
function for corestore constructor
ameba23 Nov 7, 2019
d075ab4
Add transform opt
Frando Nov 14, 2019
53e908a
Add stacked views.
Frando Nov 15, 2019
6a3ff63
clearIndex for stacked views
Frando Nov 15, 2019
a615575
Merge remote-tracking branch 'cobox/corestoreSource' into kappa5
Frando Nov 15, 2019
c50d72d
Rework API after discussions and feedback.
Frando Dec 5, 2019
3cea9f7
Finish API rework. Ready for review.
Frando Dec 5, 2019
e239f0e
version
Frando Dec 5, 2019
f6b7705
Use level-mem for state helper
Frando Dec 6, 2019
7fa4758
Cleanup
Frando Dec 6, 2019
093d951
Improve source utils
Frando Dec 7, 2019
f3c8d7c
Add feed api on multifeed and corestore sources
Frando Dec 7, 2019
70d8717
fix
Frando Dec 7, 2019
373023f
fix
Frando Dec 7, 2019
78f7e11
Streamline and remove unneeded opts
Frando Dec 7, 2019
3c6b8ea
docs
Frando Dec 7, 2019
567286d
docs
Frando Dec 7, 2019
7a5e14d
docs
Frando Dec 7, 2019
fea70bc
docs
Frando Dec 7, 2019
de37cfa
fix: storeversion only after open
Frando Dec 13, 2019
b903074
Use pipeline helper
Frando Dec 13, 2019
b1a6bbc
add close logic
Frando Dec 13, 2019
b033e32
if source has ready method await it in flow.ready
Frando Jan 10, 2020
1577791
fix onsourceready
Frando Jan 10, 2020
721a9e4
fix open logic
Frando Feb 22, 2020
3c574b4
support ready on source directly
Frando Mar 17, 2020
f87f740
support opts.filter for flows
Frando Mar 17, 2020
307c234
missing dev deps
Frando Mar 20, 2020
4eab8c6
filter empty messages
Frando Mar 20, 2020
f0d3def
open in nextTick
Frando Apr 27, 2020
b03cc5b
first round of fixes after review
Frando May 2, 2020
ba96eda
more fixes and doc improvements after review
Frando May 2, 2020
0e3298a
remove filter option for sources and views
Frando May 2, 2020
20eb000
optimization: ensure good version value, dont remove api props
Frando May 2, 2020
3e5ab17
support errors on open
Frando May 2, 2020
040fdae
rename view.clearIndex to view.reset
Frando May 2, 2020
bc39cf3
docs formatting
Frando May 2, 2020
eaa5a0a
Docs improvements
Frando May 2, 2020
86472bd
more fixes from nettle's review
Frando May 2, 2020
2b60ba3
remove outdated test
Frando May 2, 2020
d32bc9d
corrections
Frando May 2, 2020
462fc1a
Prepare for API changes
Frando May 4, 2020
5b04ad8
add error state
Frando May 6, 2020
33e7532
track indexing state
Frando May 6, 2020
567b0bb
mark status property private
Frando May 6, 2020
0f3bc69
move test util libs
Frando May 7, 2020
111d683
cleanup
Frando May 7, 2020
c57c658
remove unneeded deps
Frando May 7, 2020
b29f5f2
fix corestore source
Frando May 7, 2020
da208c9
Emit state-update event on kappa
Frando May 29, 2020
afa09ba
Reuse iteration code on kappa
Frando May 29, 2020
381edb3
Better state tracking
Frando May 29, 2020
def20f1
Expose top-level getState
Frando Jun 2, 2020
187c9f9
Restructure test SimpleSource
Frando Jun 2, 2020
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,2 +1,5 @@
node_modules
package-lock.json
pnpm-lock.yaml
Member
(comment unrelated to this file)

Hey do you think we ought to update example.js to show how the new API works? That or nix it and just have a good README example.

yarn.lock
SANDBOX
169 changes: 169 additions & 0 deletions README.md
@@ -1,5 +1,174 @@
# kappa-core

## **kappa-core WIP rewrite** ##

kappa-core is a database abstraction for append-only logs and add-only sets. A kappa-core is a container for pairs of sources and views, called *flows*. In each flow, data flows from a source into a view. Sources have a pull function that fetches new messages since the last pull. Views have a map function which is called for each batch of messages from the source.

kappa-core itself is dependency-free, but this module also contains stateful source handlers for [hypercores](https://github.com/mafintosh/hypercore), [multifeed](https://github.com/kappa-core/multifeed), and [corestore](https://github.com/andrewosh/corestore).

## API

`const { Kappa } = require('kappa-core')`

#### `const kappa = new Kappa()`

Create a new kappa core.

#### `kappa.use(name, source, view)`

Register a flow.

* `name` (string) the name of the flow, has to be unique per kappa core
* `source` object with properties:
  * `open: function (flow, cb)`: *(optional)* Handler to call on open. `flow` is the current flow object (see below for docs). Call `cb` when done opening.
  * `close: function (cb)`: *(optional)* Handler to call on close. Has to call `cb`.
  * `pull: function (next)`: **(required)** Handler to pull new messages from the source. Should call `next` with either nothing or an object that looks like this:
    ```javascript
    {
      error: Error,
      messages: [], // array of messages
      finished: true, // if set to false, signal that more messages are pending
      onindexed: function (cb) {
        // will be called when the view finished indexing
        // call cb after the source state is updated
        // may pass a state object with, by convention, the following keys:
        cb(null, {
          totalBlocks: Number,
          indexedBlocks: Number,
          prevIndexedBlocks: Number
        })
      }
    }
    ```
  * `reset: function (cb)`: **(required)** Handler to reset internal state. This is called when a full reindex is necessary, which means that the next pull ought to start at the beginning.
  * `storeVersion: function (version, cb)`: **(required)** Handler to store the flow version number.
  * `fetchVersion: function (cb)`: **(required)** Handler to fetch the version stored with `storeVersion`.
  * See the `SimpleState` docs below for how to easily implement the `reset`, `storeVersion` and `fetchVersion` methods.

* `view` object with properties:
  * `open: function (flow, cb)`: *(optional)* Handler to call on open. `flow` is the current flow object (see below for docs). Call `cb` when done opening.
  * `close: function (cb)`: *(optional)* Handler to call on close. Has to call `cb`.
  * `map: function (messages, next)`: **(required)** Handler for each batch of messages. Call `next` when done indexing this batch of messages.
  * `reset: function (cb)`: **(required)** Handler to delete all indexed data. This is called by the Kappa core when a complete reindex is necessary. Afterwards, the `map` function will receive messages from the start again.
  * `version: int`: The view version. If the version is increased, the Kappa core will clear and restart the indexing for this view after the next reopening of the core. Defaults to `1`.
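
To make the two handler shapes above concrete, here is a minimal sketch of an in-memory source and view. Everything in it is hypothetical and only for illustration: `createArraySource`, `createCountView`, the `type` field on messages, and the `drain` driver are not part of kappa-core, and the `(err, version)` callback signature of `fetchVersion` is an assumption based on Node conventions. In real use, kappa-core does the wiring that `drain` imitates here.

```javascript
// Hypothetical source over a plain array standing in for an append-only log.
function createArraySource (log, batchSize = 2) {
  let cursor = 0 // index of the next message to hand out
  let version = null // flow version, as stored via storeVersion
  return {
    pull (next) {
      const messages = log.slice(cursor, cursor + batchSize)
      if (!messages.length) return next() // nothing new
      const end = cursor + messages.length
      next({
        messages,
        finished: end >= log.length, // false signals that more messages are pending
        onindexed (cb) {
          // Advance the cursor only after the view has indexed the batch.
          const prev = cursor
          cursor = end
          cb(null, { totalBlocks: log.length, indexedBlocks: cursor, prevIndexedBlocks: prev })
        }
      })
    },
    reset (cb) {
      cursor = 0
      cb()
    },
    storeVersion (v, cb) {
      version = v
      cb()
    },
    fetchVersion (cb) {
      cb(null, version) // assumed signature: (err, version)
    }
  }
}

// Hypothetical view that counts messages per `type` field.
function createCountView () {
  const counts = new Map()
  return {
    version: 1,
    map (messages, next) {
      for (const msg of messages) counts.set(msg.type, (counts.get(msg.type) || 0) + 1)
      next()
    },
    reset (cb) {
      counts.clear()
      cb()
    },
    api: {
      // api functions receive the kappa instance as their first parameter
      count (kappa, type) {
        return counts.get(type) || 0
      }
    }
  }
}

// Hand-rolled driver, for illustration only: pull, map, onindexed, repeat.
function drain (source, view, done) {
  source.pull(function (result) {
    if (!result) return done()
    if (result.error) return done(result.error)
    view.map(result.messages, function () {
      result.onindexed(function (err) {
        if (err) return done(err)
        if (result.finished === false) return drain(source, view, done)
        done()
      })
    })
  })
}

const source = createArraySource([{ type: 'a' }, { type: 'b' }, { type: 'a' }])
const view = createCountView()
drain(source, view, function (err) {
  if (err) throw err
  console.log(view.api.count(null, 'a'))
})
```

The source only moves its cursor inside `onindexed`, so a batch that the view never finished indexing would be handed out again on the next pull.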

Both `source` and `view` can have an `api` property with an object of functions. The functions are exposed on `kappa.view[name]` / `kappa.source[name]`. Their `this` object refers to the flow they are part of, and their first parameter is the `kappa`. Other parameters are passed through.

The source has to track its state, so that subsequent calls to `pull()` do not return the same messages. Use the `onindexed` callback to update state. How to track its state is up to the source implementation. kappa-core provides a `SimpleState` helper to simplify this; see its documentation below.

There are several source handlers included in kappa-core (TODO: document sources). See the tests and sources directories.

#### `kappa.reset(name, cb)`
Member

What's the downstream usecase for this? I think if nobody we know needs it, we could make it hidden, and keep the API surface area down, which means less maintenance for us!

Member Author

I think it can be made hidden, yes. Instead, advertise in the API that you should increase the view version number if you want to trigger a reindex. I used it in testing and benchmarking situations (e.g. to measure how long indexing takes).

Member

Sounds good. I agree with keeping it around undocumented for tests & benchmarks.


Reset a specific flow to restart indexing. This is equivalent to reopening the kappa-core with a changed view version for this flow.
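
As a rough mental model of the version-triggered reindex mentioned above, the sketch below compares the stored version with the view's version and resets both sides when they differ. It is not kappa-core's implementation: `reindexIfVersionChanged` is a hypothetical helper, the `(err, version)` signature of `fetchVersion` is assumed, and treating a missing stored version as "first open, nothing to reset" is also an assumption.

```javascript
// Illustrative only: compare versions, then clear the view and rewind the source.
// Calls cb(err, didReset).
function reindexIfVersionChanged (source, view, cb) {
  const wanted = view.version || 1 // the view version defaults to 1
  source.fetchVersion(function (err, stored) {
    if (err) return cb(err)
    if (stored === wanted) return cb(null, false) // up to date, nothing to do
    if (stored === null || stored === undefined) {
      // Assumption: nothing was indexed before, so only record the version.
      return source.storeVersion(wanted, function (err) {
        cb(err || null, false)
      })
    }
    // Version changed: delete indexed data, restart the source, store the new version.
    view.reset(function (err) {
      if (err) return cb(err)
      source.reset(function (err) {
        if (err) return cb(err)
        source.storeVersion(wanted, function (err) {
          cb(err || null, true)
        })
      })
    })
  })
}
```

The view is cleared before the source is rewound so that a failure in between leaves stale source state rather than a half-populated index.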

#### `kappa.ready(names, cb)`

Call `cb` exactly once, after all flows with a name in the `names` array have finished processing. If `names` is empty, all flows will be awaited. If `names` is a string, the flow of that name will be awaited. If the requested flows are already ready, `cb` is called immediately.
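
The argument handling described above can be sketched with a pending counter, similar to what the previous `index.js` did for views. `awaitFlows` and the `flows` lookup object are hypothetical names used only for this illustration; the only documented call it relies on is `flow.ready(cb)`.

```javascript
// Illustrative only: call cb exactly once after all named flows are ready.
// `flows` is assumed to be an object mapping flow names to objects with a ready(cb) method.
function awaitFlows (flows, names, cb) {
  if (typeof names === 'string') names = [names] // a single flow name
  if (!names || names.length === 0) names = Object.keys(flows) // empty: await all flows
  let pending = names.length
  if (pending === 0) return cb()
  for (const name of names) {
    if (!flows[name]) throw new Error('Unknown flow: ' + name)
    flows[name].ready(function () {
      if (--pending === 0) cb()
    })
  }
}
```

With this shape, `awaitFlows(flows, 'myview', cb)` and `awaitFlows(flows, ['myview'], cb)` behave the same, and `awaitFlows(flows, [], cb)` waits for every registered flow.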

#### `kappa.pause()`

Pause processing of all flows.

#### `kappa.resume()`

Resume processing of all flows.

## Flow

When calling `kappa.use()` a new *Flow* is created. A Flow is the combination of a source and a view - where the data flows from the source into the view. The `Flow` object is passed to sources and views in their `open` handler. It has this public API:

* `flow.name`: (string) A name that uniquely identifies this flow within the Kappa core.
* `flow.update()`: Signal to the flow that the source has new data available. You want to call this from a source when the source has new data. If the Kappa core is not paused, this will cause the `pull` handler to be called.
* `flow.ready(cb)`: Calls `cb` (with no arguments) when this flow has finished processing all messages. `cb` is called immediately if the flow is already finished.
* `flow.getState()`: Get the current indexing state. Returns an object:
  ```javascript
  {
    status: 'idle' | 'running' | 'paused' | 'error',
    error: null | Error,
    // ... other keys as returned by the source
    // by convention this should include the following keys:
    totalBlocks: Number,
    indexedBlocks: Number,
    prevIndexedBlocks: Number
  }
  ```
* `flow.view`: Object with the view's API functions
* `flow.source`: Object with the source's API functions
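
A state object of the shape returned by `flow.getState()` can be turned into a short progress line. The helper below is a hypothetical sketch (`describeProgress` is not part of kappa-core) and assumes the source follows the `totalBlocks` / `indexedBlocks` convention.

```javascript
// Illustrative only: format an indexing state object as a human-readable string.
function describeProgress (state) {
  if (state.status === 'error') return 'error: ' + state.error.message
  // Sources are not required to report these keys, so fall back to 0.
  const total = state.totalBlocks || 0
  const indexed = state.indexedBlocks || 0
  const percent = total === 0 ? 100 : Math.floor((indexed / total) * 100)
  return state.status + ' ' + indexed + '/' + total + ' (' + percent + '%)'
}

console.log(describeProgress({ status: 'running', error: null, totalBlocks: 200, indexedBlocks: 50 }))
// prints "running 50/200 (25%)"
```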

## SimpleState

`kappa-core` exports a `SimpleState` class that sources can use for simple state handling. It keeps state either in memory or, when given a [LevelDB](https://github.com/Level/level) (or compatible) instance, persists it there.

Example:

```javascript
const { Kappa, SimpleState } = require('kappa-core')

function createSource (opts) {
  const state = new SimpleState({ db: opts.db })
  return {
    pull (next) {
      // get your current state
      // (named currentState so it does not shadow the SimpleState instance)
      state.get((err, currentState) => {
        // report errors to the flow instead of silently dropping them
        if (err) return next({ error: err })
        // fetch messages from your data source
        // (fetchMessages is a placeholder for your own fetching logic)
        fetchMessages(currentState, ({ messages, finished, nextState }) => {
          // call next with an onindexed handler
          next({
            messages,
            finished,
            onindexed (cb) {
              // store the new state
              state.put(nextState, cb)
            }
          })
        })
      })
    },
    fetchVersion: state.fetchVersion,
    storeVersion: state.storeVersion,
    reset (cb) {
      state.put('', cb)
    }
  }
}
```

## Sources

#### [hypercore](https://github.com/mafintosh/hypercore)

```javascript
const createHypercoreSource = require('kappa-core/sources/hypercore')
const source = createHypercoreSource({ feed, db })
```

where `feed` is a hypercore instance and `db` is a levelup instance (for persisting state).

#### [multifeed](https://github.com/kappa-core/multifeed)

```javascript
const createMultifeedSource = require('kappa-core/sources/multifeed')
const source = createMultifeedSource({ feeds, db })
```

where `feeds` is a multifeed instance and `db` is a levelup instance (for persisting state).

This source exposes an API method `feed (key)` that returns a feed by key from the underlying multifeed.

#### [corestore](https://github.com/andrewosh/corestore)

```javascript
const createCorestoreSource = require('kappa-core/sources/corestore')
const source = createCorestoreSource({ store, db })
```

where `store` is a corestore instance and `db` is a levelup instance (for persisting state).

This source exposes an API method `feed (key)` that returns a feed by key from the underlying corestore.

---

Member

(reminder for us to revise the old README, which is intact below the above additions)

> kappa-core is a minimal peer-to-peer database, based on append-only logs and materialized views.

## Introduction
131 changes: 4 additions & 127 deletions index.js
@@ -1,129 +1,6 @@
-var inherits = require('inherits')
-var EventEmitter = require('events').EventEmitter
-var hypercore = require('hypercore')
-var multifeed = require('multifeed')
-var indexer = require('multifeed-index')
+const Kappa = require('./kappa')
+const SimpleState = require('./sources/util/state')
 
 module.exports = Kappa
-
-function Kappa (storage, opts) {
-  if (!(this instanceof Kappa)) return new Kappa(storage, opts)
-  if (!opts) opts = {}
-
-  this._logs = opts.multifeed || multifeed(hypercore, storage, opts)
-  this._indexes = {}
-
-  this.api = {}
-}
-
-inherits(Kappa, EventEmitter)
-
-Kappa.prototype.use = function (name, version, view) {
-  var self = this
-  if (typeof version !== 'number') {
-    view = version
-    version = undefined
-  }
-  var idx = indexer(Object.assign({}, view, {
-    log: this._logs,
-    version: version,
-    maxBatch: view.maxBatch || 10,
-    batch: view.map
-  }))
-  idx.on('error', function (err) {
-    self.emit('error', err)
-  })
-  if (view.indexed) idx.on('indexed', view.indexed)
-  this._indexes[name] = idx
-  this.api[name] = {}
-  this.api[name].ready = idx.ready.bind(idx)
-  for (var key in view.api) {
-    if (typeof view.api[key] === 'function') this.api[name][key] = view.api[key].bind(idx, this)
-    else this.api[name][key] = view.api[key]
-  }
-}
-
-Kappa.prototype.feeds = function () {
-  return this._logs.feeds()
-}
-
-Kappa.prototype.ready = function (viewNames, cb) {
-  if (typeof viewNames === 'function') {
-    cb = viewNames
-    viewNames = []
-  }
-
-  if (typeof viewNames === 'string') viewNames = [viewNames]
-  if (viewNames.length === 0) {
-    viewNames = Object.keys(this._indexes)
-  }
-
-  var pending = viewNames.length + 1
-  var self = this
-  this._logs.ready(function () {
-    for (var i = 0; i < viewNames.length; i++) {
-      self._indexes[viewNames[i]].ready(done)
-    }
-    done()
-  })
-
-  function done () {
-    if (!--pending) cb()
-  }
-}
-
-Kappa.prototype.pause = function (viewNames, cb) {
-  if (typeof viewNames === 'function') {
-    cb = viewNames
-    viewNames = []
-  }
-  cb = cb || noop
-
-  if (!viewNames) viewNames = []
-  if (typeof viewNames === 'string') viewNames = [viewNames]
-  if (viewNames.length === 0) {
-    viewNames = Object.keys(this._indexes)
-  }
-
-  var pending = viewNames.length + 1
-  var self = this
-  this._logs.ready(function () {
-    for (var i = 0; i < viewNames.length; i++) {
-      self._indexes[viewNames[i]].pause(done)
-    }
-    done()
-  })
-
-  function done () {
-    if (!--pending) cb()
-  }
-}
-
-Kappa.prototype.resume = function (viewNames) {
-  if (!viewNames) viewNames = []
-  if (typeof viewNames === 'string') viewNames = [viewNames]
-  if (viewNames.length === 0) {
-    viewNames = Object.keys(this._indexes)
-  }
-
-  var self = this
-  this._logs.ready(function () {
-    for (var i = 0; i < viewNames.length; i++) {
-      self._indexes[viewNames[i]].resume()
-    }
-  })
-}
-
-Kappa.prototype.writer = function (name, cb) {
-  this._logs.writer(name, cb)
-}
-
-Kappa.prototype.feed = function (key) {
-  return this._logs.feed(key)
-}
-
-Kappa.prototype.replicate = function (opts) {
-  return this._logs.replicate(opts)
-}
-
-function noop () {}
+module.exports.Kappa = Kappa
+module.exports.SimpleState = SimpleState
3 changes: 3 additions & 0 deletions kappa-graph.svg