diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b687708
--- /dev/null
+++ b/README.md
@@ -0,0 +1,25 @@
+# btree-2d
+
+Package `btree-2d` implements a 2-dimensional B+tree: a B+tree of primary keys
+whose values are B+trees of secondary keys. The B+tree implementation itself
+is generated by http://github.com/cznic/b.
+
+## Installation
+
+```
+go get github.com/zenhotels/btree-2d
+```
+
+## Example
+
+None yet; see `btree_test.go` for usage of the `BTree2D` API.
+
+## Benchmarks
+
+None yet; see `BenchmarkTreeSync` in `btree_test.go`.
+
+## License
+
+BSD-style for the generated B+tree code (see the headers of
+`primary/primary_btree.go` and `secondary/secondary_btree.go`),
+MIT for the vendored `lockie` package.
diff --git a/btree2d.go b/btree2d.go
new file mode 100644
index 0000000..20666b0
--- /dev/null
+++ b/btree2d.go
@@ -0,0 +1,95 @@
+package btree2d
+
+import (
+	"github.com/zenhotels/btree-2d/common"
+	"github.com/zenhotels/btree-2d/primary"
+	"github.com/zenhotels/btree-2d/secondary"
+)
+
+// BTree2D is a two-dimensional B+tree: a tree of primary keys, each of which
+// points to a secondary layer of finalizable keys.
+type BTree2D interface {
+	// Sync copies the state of next into the receiver, invoking onAdd and
+	// onDel for every (key1, key2) pair added or deleted along the way.
+	Sync(next BTree2D, onAdd, onDel func(key1, key2 common.Comparable))
+	GetLayer(key1 common.Comparable) (secondary.Layer, bool)
+	SetLayer(key1 common.Comparable, layer secondary.Layer)
+	ForEach(fn func(key common.Comparable, layer secondary.Layer) bool)
+	ForEach2(key1 common.Comparable, fn func(key2 common.Comparable) bool)
+	Put(key1 common.Comparable, key2 common.FinalizableComparable)
+	Delete(key1, key2 common.Comparable) bool
+	Drop(key1 common.Comparable) bool
+}
+
+// NewBTree2D returns a new, empty two-dimensional B+tree.
+func NewBTree2D() BTree2D {
+	return btree2d{
+		primary: primary.NewLayer(),
+	}
+}
+
+type btree2d struct {
+	primary primary.Layer
+}
+
+func (prev btree2d) Sync(next BTree2D, onAdd, onDel func(key1, key2 common.Comparable)) {
+	nextBTree2D := next.(btree2d)
+
+	switch {
+	case onAdd != nil && onDel != nil:
+		prev.primary.Sync(nextBTree2D.primary, func(k1 primary.Key, k2 secondary.Key) {
+			onAdd(k1.Value, k2.Value)
+		}, func(k1 primary.Key, k2 secondary.Key) {
+			onDel(k1.Value, k2.Value)
+		})
+	case onAdd != nil:
+		prev.primary.Sync(nextBTree2D.primary, func(k1 primary.Key, k2 secondary.Key) {
+			onAdd(k1.Value, k2.Value)
+		}, nil)
+	case onDel != nil:
+		prev.primary.Sync(nextBTree2D.primary, nil, func(k1 primary.Key, k2 secondary.Key) {
+			onDel(k1.Value, k2.Value)
+		})
+	default:
+		prev.primary.Sync(nextBTree2D.primary, nil, nil)
+	}
+}
+
+func (b btree2d) ForEach(fn func(key common.Comparable, layer secondary.Layer) bool) {
+	b.primary.ForEach(func(key primary.Key, layer secondary.Layer) bool {
+		return fn(key.Value, layer)
+	})
+}
+
+func (b btree2d) ForEach2(key1 common.Comparable, fn func(key2 common.Comparable) bool) {
+	if layer2, ok := b.primary.Get(primary.Key{Value: key1}); ok {
+		layer2.ForEach(func(key secondary.Key) bool {
+			return fn(key.Value)
+		})
+	}
+}
+
+func (b btree2d) SetLayer(key1 common.Comparable, layer secondary.Layer) {
+	b.primary.Set(primary.Key{Value: key1}, layer)
+}
+
+func (b btree2d) GetLayer(key1 common.Comparable) (secondary.Layer, bool) {
+	return b.primary.Get(primary.Key{Value: key1})
+}
+
+func (b btree2d) Drop(key1 common.Comparable) bool {
+	return b.primary.Drop(primary.Key{Value: key1})
+}
+
+func (b btree2d) Put(key1 common.Comparable, key2 common.FinalizableComparable) {
+	b.primary.Put(primary.Key{Value: key1}, secondary.Key{Value: key2})
+}
+
+func (b btree2d) Delete(key1, key2 common.Comparable) (ok bool) {
+	layer2, ok := b.primary.Get(primary.Key{Value: key1})
+	if !ok {
+		return false
+	}
+	fkey := NewFinalizable(key2) // wrap key2 for comparison against the stored finalizable keys
+	return layer2.Delete(secondary.Key{Value: fkey})
+}
diff --git a/btree_test.go b/btree_test.go
new file mode 100644
index 0000000..7d82653
--- /dev/null
+++ b/btree_test.go
@@ -0,0 +1,147 @@
+package btree2d
+
+import (
+	"fmt"
+	"net/http"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/zenhotels/btree-2d/common"
"github.com/zenhotels/btree-2d/secondary" +) + +// coverage: 21.3% of statements +func TestBTree2DSync(t *testing.T) { + assert := assert.New(t) + + next := getTree(1000) + var added int + var deleted int + empty := NewBTree2D() + empty.Sync(next, func(_, _ common.Comparable) { + // onAdd + added++ + }, func(_, _ common.Comparable) { + // onDel + deleted++ + }) + + assert.Equal(1000*1000, added) + assert.Equal(0, deleted) + + var layer1 int + var layer2 int + empty.ForEach(func(_ common.Comparable, layer secondary.Layer) bool { + layer1++ + layer.ForEach(func(_ secondary.Key) bool { + layer2++ + return false + }) + return false + }) + + assert.Equal(1000, layer1) + assert.Equal(1000*1000, layer2) +} + +func getTree(nLimit int, callbacks ...func()) BTree2D { + next := NewBTree2D() + for i := 0; i < nLimit; i++ { + for j := 0; j < nLimit; j++ { + info := &routeInfo{ + Host: uint64((i + 1) * (j + 1)), + } + next.Put(ID(i), NewFinalizable(info)) + } + } + return next +} + +func BenchmarkTreeSync(b *testing.B) { + next := getTree(100, func() {}) + b.ResetTimer() + b.ReportAllocs() + + var added int + var deleted int + for i := 0; i < b.N; i++ { + empty := NewBTree2D() + empty.Sync(next, func(_, _ common.Comparable) { + // onAdd + added++ + }, func(_, _ common.Comparable) { + // onDel + deleted++ + }) + } + + b.StopTimer() + if added != 10000*b.N { + b.Fatal("wrong added count", added) + } +} + +// func BenchmarkRegistrySync(b *testing.B) { +// next := getRegistry(100, func() {}) +// b.ResetTimer() +// b.ReportAllocs() + +// var added int +// var deleted int +// for i := 0; i < b.N; i++ { +// var empty route.Registry +// empty.Sync(next, func(_ uint64, _ route.RouteInfo) { +// // onAdd +// added++ +// }, func(_ uint64, _ route.RouteInfo) { +// // onDel +// deleted++ +// }) +// } + +// b.StopTimer() +// if added != 10000*b.N { +// b.Fatal("wrong added count", added) +// } +// } + +// func getRegistry(nLimit int, callbacks ...func()) *route.Registry { +// next := new(route.Registry) +// for i := 0; i < nLimit; i++ { +// for j := 0; j < nLimit; j++ { +// info := route.RouteInfo{ +// Host: uint64((i + 1) * (j + 1)), +// } +// next.Push(uint64(i), info) +// } +// } +// return next +// } + +type ID uint64 + +func (u ID) Less(u2 common.Comparable) bool { + return u < u2.(ID) +} + +type routeInfo struct { + Host uint64 + Distance int + Upstream *http.Transport +} + +func (r routeInfo) String() string { + return fmt.Sprintf("{%d<-%s:%d}", r.Host, r.Upstream, r.Distance) +} + +func (r routeInfo) Less(r2 common.Comparable) bool { + return r.Host < r2.(*routeInfo).Host +} + +// func (r *routeInfo) AddFinalizer(fn func()) bool { +// return false +// } + +// func (r *routeInfo) Finalize() { + +// } diff --git a/common/common.go b/common/common.go new file mode 100644 index 0000000..be79ca6 --- /dev/null +++ b/common/common.go @@ -0,0 +1,12 @@ +package common + +type Comparable interface { + Less(other Comparable) bool +} + +type FinalizableComparable interface { + Comparable + + Finalize() + AddFinalizer(fn func()) bool +} diff --git a/finalizers.go b/finalizers.go new file mode 100644 index 0000000..c1e71de --- /dev/null +++ b/finalizers.go @@ -0,0 +1,77 @@ +package btree2d + +import ( + "sync" + "sync/atomic" + + "github.com/zenhotels/btree-2d/common" +) + +const MaxFinalizers = 16 + +func NewFinalizable(obj common.Comparable) common.FinalizableComparable { + return &withFinalizers{ + obj: obj, + } +} + +func NonFinalizable(obj common.Comparable) common.FinalizableComparable { + return &withFinalizers{ + 
+		obj: obj,
+	}
+}
+
+type withoutFinalizers struct {
+	obj common.Comparable
+}
+
+func (w *withoutFinalizers) Less(w2 common.Comparable) bool {
+	return w.obj.Less(w2.(*withoutFinalizers).obj)
+}
+
+func (w *withoutFinalizers) Finalize() {
+
+}
+
+func (w *withoutFinalizers) AddFinalizer(fn func()) bool {
+	return false
+}
+
+type withFinalizers struct {
+	obj common.Comparable
+
+	offset      int64
+	list        []func()
+	onceNewList sync.Once
+}
+
+func (w *withFinalizers) Less(w2 common.Comparable) bool {
+	return w.obj.Less(w2.(*withFinalizers).obj)
+}
+
+func (w *withFinalizers) Finalize() {
+	offset := atomic.SwapInt64(&w.offset, 100)
+	if offset >= 100 {
+		return // already finalized
+	} else if offset > MaxFinalizers {
+		offset = MaxFinalizers
+	}
+	if offset == 0 {
+		return // no finalizers were ever added, so w.list may still be nil
+	}
+	_ = w.list[MaxFinalizers-1] // hint to eliminate bounds checks in the loop below
+	for i := int64(0); i < offset; i++ {
+		go w.list[i]()
+	}
+	w.list = nil // safe: offset is now 100, so AddFinalizer claims no new slots
+}
+
+func (w *withFinalizers) AddFinalizer(fn func()) bool {
+	w.onceNewList.Do(func() {
+		w.list = make([]func(), MaxFinalizers)
+	})
+	offset := atomic.AddInt64(&w.offset, 1)
+	if offset > MaxFinalizers {
+		atomic.AddInt64(&w.offset, -1)
+		return false
+	}
+	w.list[offset-1] = fn
+	return true
+}
diff --git a/lockie/LICENSE b/lockie/LICENSE
new file mode 100644
index 0000000..2d31db4
--- /dev/null
+++ b/lockie/LICENSE
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2016 Meteora
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/lockie/lockie.go b/lockie/lockie.go
new file mode 100644
index 0000000..dfb6ce9
--- /dev/null
+++ b/lockie/lockie.go
@@ -0,0 +1,92 @@
+// Package lockie provides a very simple spinlock.
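+//
+// A minimal usage sketch, assuming the constructors and methods defined in
+// this file:
+//
+//	lk := lockie.NewLockie() // returns the 64-bit or 32-bit implementation
+//	lk.Lock()
+//	// ... critical section ...
+//	lk.Unlock()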
+package lockie + +import ( + "runtime" + "sync/atomic" + "unsafe" +) + +// i64b is a boolean value which represents whether or not the current system is 64-bit +var i64b = is64bit() + +// NewLockie returns a Lockie interface +func NewLockie() Lockie { + // If we are not on 64-bit, return Lockie32 + if !i64b { + return &Lockie32{} + } + + // Else, return Lockie64 + return &Lockie64{} +} + +// Lockie is the primary interface for locking/unlocking +type Lockie interface { + Lock() + Unlock() +} + +// NewLockie64 returns a pointer to a new instance of Lockie64 +func NewLockie64() *Lockie64 { + return &Lockie64{} +} + +// Lockie64 is the 64-bit optimized locking mechanism +type Lockie64 struct { + // Lock state + // 0 represents an unlocked state + // 1 represents a locked state + lock int64 +} + +// Lock acquires a write-lock +func (l *Lockie64) Lock() { + // Loop until we are able to swap value of l.lock from 0 to 1 + for !atomic.CompareAndSwapInt64(&l.lock, 0, 1) { + // Allow other go routines to utilize some CPU time + runtime.Gosched() + } +} + +// Unlock releases a lock +func (l *Lockie64) Unlock() { + // Swaps the value of l.lock to 0 + atomic.StoreInt64(&l.lock, 0) +} + +// NewLockie32 returns a pointer to a new instance of Lockie32 +func NewLockie32() *Lockie32 { + return &Lockie32{} +} + +// Lockie32 is the 32-bit optimized locking mechanism +type Lockie32 struct { + // Lock state + // 0 represents an unlocked state + // 1 represents a locked state + lock int32 +} + +// Lock acquires a write-lock +func (l *Lockie32) Lock() { + // Loop until we are able to swap value of l.lock from 0 to 1 + for !atomic.CompareAndSwapInt32(&l.lock, 0, 1) { + // Allow other go routines to utilize some CPU time + runtime.Gosched() + } +} + +// Unlock releases a lock +func (l *Lockie32) Unlock() { + // Swaps the value of l.lock to 0 + atomic.StoreInt32(&l.lock, 0) +} + +// Checks to see if the current system is 64-bit +func is64bit() bool { + var i int + // If size of int is 8 bytes, we are on a 64-bit system + // Otherwise, we are on a 32-bit system (4 byte int length) + return unsafe.Sizeof(&i) == 8 +} diff --git a/lockie/lockie_test.go b/lockie/lockie_test.go new file mode 100644 index 0000000..526225a --- /dev/null +++ b/lockie/lockie_test.go @@ -0,0 +1,151 @@ +package lockie + +import ( + "sync" + "testing" +) + +var ( + mux = &TestItem{ + mux: &sync.Mutex{}, + m: make(map[string]int), + } + + rwmux = &TestItem{ + mux: &sync.RWMutex{}, + m: make(map[string]int), + } + + lkie = &TestItem{ + mux: NewLockie(), + m: make(map[string]int), + } +) + +type Locker interface { + Lock() + Unlock() +} + +type TestItem struct { + mux Locker + m map[string]int +} + +type RWMuxTestItem struct { + mux sync.RWMutex + m map[string]int +} + +func (t *TestItem) Get(k string) (v int) { + t.mux.Lock() + v = t.m[k] + t.mux.Unlock() + return v +} + +func (t *TestItem) Put(k string, v int) { + t.mux.Lock() + t.m[k] = v + t.mux.Unlock() +} + +func (t *RWMuxTestItem) Get(k string) (v int) { + t.mux.RLock() + v = t.m[k] + t.mux.RUnlock() + return v +} + +func (t *RWMuxTestItem) Put(k string, v int) { + t.mux.Lock() + t.m[k] = v + t.mux.Unlock() +} + +type TestDB interface { + Get(string) int + Put(string, int) +} + +func RBench(b *testing.B, db TestDB) { + b.SetParallelism(4) + b.RunParallel(func(p *testing.PB) { + var v int + for p.Next() { + v = db.Get("hello") + } + + if v == -1 { + return + } + }) + + b.ReportAllocs() +} + +func WBench(b *testing.B, db TestDB) { + b.SetParallelism(4) + b.RunParallel(func(p *testing.PB) { + 
for p.Next() { + db.Put("hello", 46) + } + }) + + b.ReportAllocs() +} + +func RWBench(b *testing.B, db TestDB) { + b.SetParallelism(4) + b.RunParallel(func(p *testing.PB) { + var v int + for p.Next() { + v = db.Get("hello") + db.Put("hello", v) + } + }) + + b.ReportAllocs() +} + +func TestMain(t *testing.T) { + mux.Put("hello", 46) + rwmux.Put("hello", 46) + lkie.Put("hello", 46) +} + +func BenchmarkMuxR(b *testing.B) { + RBench(b, mux) +} + +func BenchmarkMuxW(b *testing.B) { + WBench(b, mux) +} + +func BenchmarkMuxRW(b *testing.B) { + RWBench(b, mux) +} + +func BenchmarkRWMuxR(b *testing.B) { + RBench(b, rwmux) +} + +func BenchmarkRWMuxW(b *testing.B) { + WBench(b, rwmux) +} + +func BenchmarkRWMuxRW(b *testing.B) { + RWBench(b, rwmux) +} + +func BenchmarkLockieR(b *testing.B) { + RBench(b, lkie) +} + +func BenchmarkLockieW(b *testing.B) { + WBench(b, lkie) +} + +func BenchmarkLockieRW(b *testing.B) { + RWBench(b, lkie) +} diff --git a/primary/primary.go b/primary/primary.go new file mode 100644 index 0000000..4d09b67 --- /dev/null +++ b/primary/primary.go @@ -0,0 +1,361 @@ +package primary + +import ( + "io" + + "github.com/zenhotels/btree-2d/lockie" + "github.com/zenhotels/btree-2d/secondary" +) + +// Layer represents the primary layer, +// a tree holding Comparable keys pointing to secondary layers. +type Layer struct { + store *Tree + lock lockie.Lockie +} + +// NewLayer initializes a new primary layer handle. +func NewLayer() Layer { + return Layer{ + store: NewTree(treeCmp), + lock: lockie.NewLockie(), + } +} + +// TODO(xlab): Set may trigger finalizers? + +// Set just adds a secondary layer to the tree, overwriting the previous one. +func (l Layer) Set(k Key, layer secondary.Layer) { + l.lock.Lock() + l.store.Set(k, layer) + l.lock.Unlock() +} + +// Put adds secondary keys to the secondary layer, if it not exists +// then it will be atomically created. +func (l Layer) Put(k Key, k2 secondary.Key) { + l.lock.Lock() + l.store.Put(k, func(oldLayer secondary.Layer, exists bool) (newLayer secondary.Layer, write bool) { + if !exists { + oldLayer = secondary.NewLayer() + } + oldLayer.Set(k2) + return oldLayer, true + }) + l.lock.Unlock() +} + +// Seek returns an Enumerator positioned on a secondary layer such that k >= layer's key. +func (l Layer) Seek(k Key) (e *Enumerator, ok bool) { + l.lock.Lock() + e, ok = l.store.Seek(k) + l.lock.Unlock() + return +} + +// SeekFirst returns an Enumerator positioned on the first secondary layer in the tree. +func (l Layer) SeekFirst() (e *Enumerator, err error) { + l.lock.Lock() + e, err = l.store.SeekFirst() + l.lock.Unlock() + return +} + +// ForEach runs the provided function for every element in the layer, +// if function returns true, the loop stops. +func (l Layer) ForEach(fn func(key Key, layer secondary.Layer) bool) { + l.lock.Lock() + e, err := l.store.SeekFirst() + l.lock.Unlock() + if err != io.EOF { + k, layer, err := e.Next() + for err != io.EOF { + if stop := fn(k, layer); stop { + return + } + l.lock.Lock() + k, layer, err = e.Next() + l.lock.Unlock() + } + e.Close() + } +} + +// Drop removes the whole secondary layer associated with the key, +// invokes all the finalizers associated with elements of this secondary layer. +func (l Layer) Drop(k Key) (ok bool) { + l.lock.Lock() + v, found := l.store.Get(k) + if found { + ok = l.store.Delete(k) + } + l.lock.Unlock() + if found { + v.Finalize() + } + return +} + +// Get returns the secondary layer associated with the key. 
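+// A short sketch from a caller's perspective (Key simply wraps a
+// common.Comparable):
+//
+//	if layer, ok := l.Get(primary.Key{Value: k1}); ok {
+//		layer.ForEach(func(k2 secondary.Key) bool { return false })
+//	}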
+func (l Layer) Get(k Key) (layer secondary.Layer, ok bool) { + l.lock.Lock() + v, ok := l.store.Get(k) + l.lock.Unlock() + return v, ok +} + +func (prev Layer) Sync(next Layer, onAdd, onDel func(key1 Key, key2 secondary.Key)) { + if prev.store == next.store { + return + } + // TODO(xlab): init at random versions so would not collide + // if next.store.Ver() <= prev.store.Ver() { + // return + // } + prev.lock.Lock() + next.lock.Lock() + prevIter, prevErr := prev.store.SeekFirst() + nextIter, nextErr := next.store.SeekFirst() + next.lock.Unlock() + prev.lock.Unlock() + + switch { + case prevErr == io.EOF && nextErr == io.EOF: + // do nothing, both are empty + return + case prevErr == io.EOF: + // previous storage is empty, everything is added + addAll(prev, next.lock, nextIter, onAdd) + nextIter.Close() + return + case nextErr == io.EOF: + // next storage is empty, everything is deleted + deleteAll(prev, prev.lock, prevIter, onDel) + prevIter.Close() + return + default: + // do sync and trigger the corresponding callbacks + syncAll(prev, next, prevIter, nextIter, onAdd, onDel) + prevIter.Close() + nextIter.Close() + return + } +} + +func addAll(prev Layer, nextLock lockie.Lockie, nextIter *Enumerator, + onAdd func(key1 Key, key2 secondary.Key)) { + + nextLock.Lock() + nextK, nextLayer, err := nextIter.Next() + nextLock.Unlock() + + for err != io.EOF { + if nextLayer.Len() > 0 { + // create a new layer to set into prev + newLayer := secondary.NewLayer() + + // fills layer while calling the onAdd callback + if onAdd != nil { + newLayer.Sync(nextLayer, func(k2 secondary.Key) { + onAdd(nextK, k2) + }, nil) + } else { + newLayer.Sync(nextLayer, nil, nil) + } + + // set the new layer into prev + prev.lock.Lock() + prev.store.Set(nextK, newLayer) + prev.lock.Unlock() + } + // advance next iter + nextLock.Lock() + nextK, nextLayer, err = nextIter.Next() + nextLock.Unlock() + } +} + +func deleteAll(prev Layer, prevLock lockie.Lockie, prevIter *Enumerator, + onDel func(key1 Key, key2 secondary.Key)) { + + prevLock.Lock() + prevK, prevLayer, err := prevIter.Next() + prevLock.Unlock() + + for err != io.EOF { + // nukes the prevLayer yet calling the onDel callback + if onDel != nil { + prevLayer.Sync(secondary.NewLayer(), nil, func(k2 secondary.Key) { + onDel(prevK, k2) + }) + } else { + prevLayer.Sync(secondary.NewLayer(), nil, nil) + } + + // advance next iter + prevLock.Lock() + prevK, prevLayer, err = prevIter.Next() + prevLock.Unlock() + } + // finally clear the store + prevLock.Lock() + prev.store.Clear() + prevLock.Unlock() +} + +func syncAll(prev, next Layer, prevIter, nextIter *Enumerator, + onAdd, onDel func(k1 Key, k2 secondary.Key)) { + + prev.lock.Lock() + next.lock.Lock() + prevK, prevLayer, prevErr := prevIter.Next() + nextK, nextLayer, nextErr := nextIter.Next() + next.lock.Unlock() + prev.lock.Unlock() + + for { + switch { + case prevErr == io.EOF: + if nextErr == io.EOF { + return // we're done + } + // at this point prev is ended, so nextK is added + if nextLayer.Len() > 0 { + // create a new layer to set into prev + newLayer := secondary.NewLayer() + + // fills layer while calling the onAdd callback + if onAdd != nil { + newLayer.Sync(nextLayer, func(k2 secondary.Key) { + onAdd(nextK, k2) + }, nil) + } else { + newLayer.Sync(nextLayer, nil, nil) + } + + // set the new layer into prev + prev.lock.Lock() + prev.store.Set(nextK, newLayer) + prev.lock.Unlock() + } + // move next iterator + next.lock.Lock() + nextK, nextLayer, nextErr = nextIter.Next() + next.lock.Unlock() + + case 
nextErr == io.EOF: + if prevErr == io.EOF { + return // we're done + } + // at this point next is ended, so prevK is deleted + if onDel != nil { + prevLayer.ForEach(func(k2 secondary.Key) bool { + if onDel != nil { + onDel(prevK, k2) + } + k2.Finalize() + return false + }) + } else { + prevLayer.Finalize() + } + // delete prevK in prev + prev.lock.Lock() + prev.store.Delete(prevK) + // move prev iterator + prevK, prevLayer, prevErr = prevIter.Next() + prev.lock.Unlock() + + case prevK.Less(nextK): + // old prevK has been deleted apparently + if onDel != nil { + prevLayer.ForEach(func(k2 secondary.Key) bool { + if onDel != nil { + onDel(prevK, k2) + } + k2.Finalize() + return false + }) + } else { + prevLayer.Finalize() + } + + // delete prevK in prev + prev.lock.Lock() + prev.store.Delete(prevK) + // move prev iterator + prevK, prevLayer, prevErr = prevIter.Next() + prev.lock.Unlock() + + case nextK.Less(prevK): + // new nextK has been inserted apparently + if nextLayer.Len() > 0 { + // create a new layer to set into prev + newLayer := secondary.NewLayer() + + // fills layer while calling the onAdd callback + if onAdd != nil { + newLayer.Sync(nextLayer, func(k2 secondary.Key) { + onAdd(nextK, k2) + }, nil) + } else { + newLayer.Sync(nextLayer, nil, nil) + } + + // set the new layer into prev + prev.lock.Lock() + prev.store.Set(nextK, newLayer) + prev.lock.Unlock() + } + // move next iterator + next.lock.Lock() + nextK, nextLayer, nextErr = nextIter.Next() + next.lock.Unlock() + + default: + // we're on the same keys, sync the layers + switch { + case onAdd != nil && onDel != nil: + prevLayer.Sync(nextLayer, func(k2 secondary.Key) { + onAdd(nextK, k2) + }, func(k2 secondary.Key) { + onDel(prevK, k2) + }) + case onAdd != nil: + prevLayer.Sync(nextLayer, func(k2 secondary.Key) { + onAdd(nextK, k2) + }, nil) + case onDel != nil: + prevLayer.Sync(nextLayer, nil, func(k2 secondary.Key) { + onDel(prevK, k2) + }) + default: // no callbacks + prevLayer.Sync(nextLayer, nil, nil) + } + + // move both iterators + prev.lock.Lock() + next.lock.Lock() + prevK, prevLayer, prevErr = prevIter.Next() + nextK, nextLayer, nextErr = nextIter.Next() + next.lock.Unlock() + prev.lock.Unlock() + } + } +} + +func (l Layer) Len() int { + l.lock.Lock() + count := l.store.Len() + l.lock.Unlock() + return count +} + +func treeCmp(k1, k2 Key) int { + if k1.Less(k2) { + return -1 + } else if k2.Less(k1) { + return 1 + } + return 0 +} diff --git a/primary/primary_btree.go b/primary/primary_btree.go new file mode 100644 index 0000000..84a1507 --- /dev/null +++ b/primary/primary_btree.go @@ -0,0 +1,936 @@ +// btree.go Copyright 2014 The b Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// This file has been generated by http://github.com/cznic/b + +package primary + +import ( + "fmt" + "io" + "sync" + "sync/atomic" + + "github.com/zenhotels/btree-2d/secondary" +) + +const ( + kx = 64 //TODO benchmark tune this number if using custom key/value type(s). + kd = 64 //TODO benchmark tune this number if using custom key/value type(s). 
+) + +func init() { + if kd < 1 { + panic(fmt.Errorf("kd %d: out of range", kd)) + } + + if kx < 2 { + panic(fmt.Errorf("kx %d: out of range", kx)) + } +} + +var ( + btDPool = sync.Pool{New: func() interface{} { return &d{} }} + btEPool = btEpool{sync.Pool{New: func() interface{} { return &Enumerator{} }}} + btTPool = btTpool{sync.Pool{New: func() interface{} { return &Tree{} }}} + btXPool = sync.Pool{New: func() interface{} { return &x{} }} +) + +type btTpool struct{ sync.Pool } + +func (p *btTpool) get(cmp Cmp) *Tree { + x := p.Get().(*Tree) + x.cmp = cmp + return x +} + +type btEpool struct{ sync.Pool } + +func (p *btEpool) get(err error, hit bool, i int, k Key, q *d, t *Tree, ver uint64) *Enumerator { + x := p.Get().(*Enumerator) + x.err, x.hit, x.i, x.k, x.q, x.t, x.ver = err, hit, i, k, q, t, ver + return x +} + +type ( + // Cmp compares a and b. Return value is: + // + // < 0 if a < b + // 0 if a == b + // > 0 if a > b + // + Cmp func(a, b Key) int + + d struct { // data page + c int + d [2*kd + 1]de + n *d + p *d + } + + de struct { // d element + k Key + v secondary.Layer + } + + // Enumerator captures the state of enumerating a tree. It is returned + // from the Seek* methods. The enumerator is aware of any mutations + // made to the tree in the process of enumerating it and automatically + // resumes the enumeration at the proper key, if possible. + // + // However, once an Enumerator returns io.EOF to signal "no more + // items", it does no more attempt to "resync" on tree mutation(s). In + // other words, io.EOF from an Enumerator is "sticky" (idempotent). + Enumerator struct { + err error + hit bool + i int + k Key + q *d + t *Tree + ver uint64 + } + + // Tree is a B+tree. + Tree struct { + c int + cmp Cmp + first *d + last *d + r interface{} + ver uint64 + } + + xe struct { // x element + ch interface{} + k Key + } + + x struct { // index page + c int + x [2*kx + 2]xe + } +) + +var ( // R/O zero values + zd d + zde de + ze Enumerator + zk Key + zt Tree + zx x + zxe xe +) + +func clr(q interface{}) { + switch x := q.(type) { + case *x: + for i := 0; i <= x.c; i++ { // Ch0 Sep0 ... Chn-1 Sepn-1 Chn + clr(x.x[i].ch) + } + *x = zx + btXPool.Put(x) + case *d: + *x = zd + btDPool.Put(x) + } +} + +// -------------------------------------------------------------------------- x + +func newX(ch0 interface{}) *x { + r := btXPool.Get().(*x) + r.x[0].ch = ch0 + return r +} + +func (q *x) extract(i int) { + q.c-- + if i < q.c { + copy(q.x[i:], q.x[i+1:q.c+1]) + q.x[q.c].ch = q.x[q.c+1].ch + q.x[q.c].k = zk // GC + q.x[q.c+1] = zxe // GC + } +} + +func (q *x) insert(i int, k Key, ch interface{}) *x { + c := q.c + if i < c { + q.x[c+1].ch = q.x[c].ch + copy(q.x[i+2:], q.x[i+1:c]) + q.x[i+1].k = q.x[i].k + } + c++ + q.c = c + q.x[i].k = k + q.x[i+1].ch = ch + return q +} + +func (q *x) siblings(i int) (l, r *d) { + if i >= 0 { + if i > 0 { + l = q.x[i-1].ch.(*d) + } + if i < q.c { + r = q.x[i+1].ch.(*d) + } + } + return +} + +// -------------------------------------------------------------------------- d + +func (l *d) mvL(r *d, c int) { + copy(l.d[l.c:], r.d[:c]) + copy(r.d[:], r.d[c:r.c]) + l.c += c + r.c -= c +} + +func (l *d) mvR(r *d, c int) { + copy(r.d[c:], r.d[:r.c]) + copy(r.d[:c], l.d[l.c-c:]) + r.c += c + l.c -= c +} + +// ----------------------------------------------------------------------- Tree + +// NewTree returns a newly created, empty Tree. The compare function is used +// for key collation. 
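+// For example, a sketch using this package's own collation function (treeCmp,
+// defined in primary.go, orders Keys via their Less method):
+//
+//	t := NewTree(treeCmp)
+//	defer t.Close()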
+func NewTree(cmp Cmp) *Tree { + return btTPool.get(cmp) +} + +// Clear removes all K/V pairs from the tree. +func (t *Tree) Clear() { + if t.r == nil { + return + } + + clr(t.r) + t.c, t.first, t.last, t.r = 0, nil, nil, nil + atomic.AddUint64(&t.ver, 1) +} + +// Close performs Clear and recycles t to a pool for possible later reuse. No +// references to t should exist or such references must not be used afterwards. +func (t *Tree) Close() { + t.Clear() + *t = zt + btTPool.Put(t) +} + +func (t *Tree) cat(p *x, q, r *d, pi int) { + atomic.AddUint64(&t.ver, 1) + q.mvL(r, r.c) + if r.n != nil { + r.n.p = q + } else { + t.last = q + } + q.n = r.n + *r = zd + btDPool.Put(r) + if p.c > 1 { + p.extract(pi) + p.x[pi].ch = q + return + } + + switch x := t.r.(type) { + case *x: + *x = zx + btXPool.Put(x) + case *d: + *x = zd + btDPool.Put(x) + } + t.r = q +} + +func (t *Tree) catX(p, q, r *x, pi int) { + atomic.AddUint64(&t.ver, 1) + q.x[q.c].k = p.x[pi].k + copy(q.x[q.c+1:], r.x[:r.c]) + q.c += r.c + 1 + q.x[q.c].ch = r.x[r.c].ch + *r = zx + btXPool.Put(r) + if p.c > 1 { + p.c-- + pc := p.c + if pi < pc { + p.x[pi].k = p.x[pi+1].k + copy(p.x[pi+1:], p.x[pi+2:pc+1]) + p.x[pc].ch = p.x[pc+1].ch + p.x[pc].k = zk // GC + p.x[pc+1].ch = nil // GC + } + return + } + + switch x := t.r.(type) { + case *x: + *x = zx + btXPool.Put(x) + case *d: + *x = zd + btDPool.Put(x) + } + t.r = q +} + +// Delete removes the k's KV pair, if it exists, in which case Delete returns +// true. +func (t *Tree) Delete(k Key) (ok bool) { + pi := -1 + var p *x + q := t.r + if q == nil { + return false + } + + for { + var i int + i, ok = t.find(q, k) + if ok { + switch x := q.(type) { + case *x: + if x.c < kx && q != t.r { + x, i = t.underflowX(p, x, pi, i) + } + pi = i + 1 + p = x + q = x.x[pi].ch + ok = false + continue + case *d: + t.extract(x, i) + if x.c >= kd { + return true + } + + if q != t.r { + t.underflow(p, x, pi) + } else if t.c == 0 { + t.Clear() + } + return true + } + } + + switch x := q.(type) { + case *x: + if x.c < kx && q != t.r { + x, i = t.underflowX(p, x, pi, i) + } + pi = i + p = x + q = x.x[i].ch + case *d: + return false + } + } +} + +func (t *Tree) extract(q *d, i int) { // (r secondary.Layer) { + atomic.AddUint64(&t.ver, 1) + //r = q.d[i].v // prepared for Extract + q.c-- + if i < q.c { + copy(q.d[i:], q.d[i+1:q.c+1]) + } + q.d[q.c] = zde // GC + t.c-- + return +} + +func (t *Tree) find(q interface{}, k Key) (i int, ok bool) { + var mk Key + l := 0 + switch x := q.(type) { + case *x: + h := x.c - 1 + for l <= h { + m := (l + h) >> 1 + mk = x.x[m].k + switch cmp := t.cmp(k, mk); { + case cmp > 0: + l = m + 1 + case cmp == 0: + return m, true + default: + h = m - 1 + } + } + case *d: + h := x.c - 1 + for l <= h { + m := (l + h) >> 1 + mk = x.d[m].k + switch cmp := t.cmp(k, mk); { + case cmp > 0: + l = m + 1 + case cmp == 0: + return m, true + default: + h = m - 1 + } + } + } + return l, false +} + +// First returns the first item of the tree in the key collating order, or +// (zero-value, zero-value) if the tree is empty. +func (t *Tree) First() (k Key, v secondary.Layer) { + if q := t.first; q != nil { + q := &q.d[0] + k, v = q.k, q.v + } + return +} + +// Get returns the value associated with k and true if it exists. Otherwise Get +// returns (zero-value, false). 
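+// For example (a sketch; check ok, since the zero Layer has no store and is
+// not usable):
+//
+//	if layer, ok := t.Get(k); ok {
+//		_ = layer.Len()
+//	}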
+func (t *Tree) Get(k Key) (v secondary.Layer, ok bool) { + q := t.r + if q == nil { + return + } + + for { + var i int + if i, ok = t.find(q, k); ok { + switch x := q.(type) { + case *x: + q = x.x[i+1].ch + continue + case *d: + return x.d[i].v, true + } + } + switch x := q.(type) { + case *x: + q = x.x[i].ch + default: + return + } + } +} + +func (t *Tree) insert(q *d, i int, k Key, v secondary.Layer) *d { + atomic.AddUint64(&t.ver, 1) + c := q.c + if i < c { + copy(q.d[i+1:], q.d[i:c]) + } + c++ + q.c = c + q.d[i].k, q.d[i].v = k, v + t.c++ + return q +} + +// Last returns the last item of the tree in the key collating order, or +// (zero-value, zero-value) if the tree is empty. +func (t *Tree) Last() (k Key, v secondary.Layer) { + if q := t.last; q != nil { + q := &q.d[q.c-1] + k, v = q.k, q.v + } + return +} + +// Len returns the number of items in the tree. +func (t *Tree) Len() int { + return t.c +} + +func (t *Tree) overflow(p *x, q *d, pi, i int, k Key, v secondary.Layer) { + atomic.AddUint64(&t.ver, 1) + l, r := p.siblings(pi) + + if l != nil && l.c < 2*kd && i != 0 { + l.mvL(q, 1) + t.insert(q, i-1, k, v) + p.x[pi-1].k = q.d[0].k + return + } + + if r != nil && r.c < 2*kd { + if i < 2*kd { + q.mvR(r, 1) + t.insert(q, i, k, v) + p.x[pi].k = r.d[0].k + return + } + + t.insert(r, 0, k, v) + p.x[pi].k = k + return + } + + t.split(p, q, pi, i, k, v) +} + +// Seek returns an Enumerator positioned on an item such that k >= item's key. +// ok reports if k == item.key The Enumerator's position is possibly after the +// last item in the tree. +func (t *Tree) Seek(k Key) (e *Enumerator, ok bool) { + q := t.r + if q == nil { + e = btEPool.get(nil, false, 0, k, nil, t, atomic.LoadUint64(&t.ver)) + return + } + + for { + var i int + if i, ok = t.find(q, k); ok { + switch x := q.(type) { + case *x: + q = x.x[i+1].ch + continue + case *d: + return btEPool.get(nil, ok, i, k, x, t, atomic.LoadUint64(&t.ver)), true + } + } + + switch x := q.(type) { + case *x: + q = x.x[i].ch + case *d: + return btEPool.get(nil, ok, i, k, x, t, atomic.LoadUint64(&t.ver)), false + } + } +} + +// SeekFirst returns an enumerator positioned on the first KV pair in the tree, +// if any. For an empty tree, err == io.EOF is returned and e will be nil. +func (t *Tree) SeekFirst() (e *Enumerator, err error) { + q := t.first + if q == nil { + return nil, io.EOF + } + + return btEPool.get(nil, true, 0, q.d[0].k, q, t, atomic.LoadUint64(&t.ver)), nil +} + +// SeekLast returns an enumerator positioned on the last KV pair in the tree, +// if any. For an empty tree, err == io.EOF is returned and e will be nil. +func (t *Tree) SeekLast() (e *Enumerator, err error) { + q := t.last + if q == nil { + return nil, io.EOF + } + + return btEPool.get(nil, true, q.c-1, q.d[q.c-1].k, q, t, atomic.LoadUint64(&t.ver)), nil +} + +// Set sets the value associated with k. 
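+// For example, a sketch that inserts (or overwrites) a fresh secondary layer:
+//
+//	t.Set(k, secondary.NewLayer())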
+func (t *Tree) Set(k Key, v secondary.Layer) { + //dbg("--- PRE Set(%v, %v)\n%s", k, v, t.dump()) + //defer func() { + // dbg("--- POST\n%s\n====\n", t.dump()) + //}() + + pi := -1 + var p *x + q := t.r + if q == nil { + z := t.insert(btDPool.Get().(*d), 0, k, v) + t.r, t.first, t.last = z, z, z + return + } + + for { + i, ok := t.find(q, k) + if ok { + switch x := q.(type) { + case *x: + if x.c > 2*kx { + x, i = t.splitX(p, x, pi, i) + } + pi = i + 1 + p = x + q = x.x[i+1].ch + continue + case *d: + x.d[i].v = v + } + return + } + + switch x := q.(type) { + case *x: + if x.c > 2*kx { + x, i = t.splitX(p, x, pi, i) + } + pi = i + p = x + q = x.x[i].ch + case *d: + switch { + case x.c < 2*kd: + t.insert(x, i, k, v) + default: + t.overflow(p, x, pi, i, k, v) + } + return + } + } +} + +// Put combines Get and Set in a more efficient way where the tree is walked +// only once. The upd(ater) receives (old-value, true) if a KV pair for k +// exists or (zero-value, false) otherwise. It can then return a (new-value, +// true) to create or overwrite the existing value in the KV pair, or +// (whatever, false) if it decides not to create or not to update the value of +// the KV pair. +// +// tree.Set(k, v) call conceptually equals calling +// +// tree.Put(k, func(Key, bool){ return v, true }) +// +// modulo the differing return values. +func (t *Tree) Put(k Key, upd func(oldV secondary.Layer, exists bool) (newV secondary.Layer, write bool)) (oldV secondary.Layer, written bool) { + pi := -1 + var p *x + q := t.r + var newV secondary.Layer + if q == nil { + // new KV pair in empty tree + newV, written = upd(newV, false) + if !written { + return + } + + z := t.insert(btDPool.Get().(*d), 0, k, newV) + t.r, t.first, t.last = z, z, z + return + } + + for { + i, ok := t.find(q, k) + if ok { + switch x := q.(type) { + case *x: + if x.c > 2*kx { + x, i = t.splitX(p, x, pi, i) + } + pi = i + 1 + p = x + q = x.x[i+1].ch + continue + case *d: + oldV = x.d[i].v + newV, written = upd(oldV, true) + if !written { + return + } + + x.d[i].v = newV + } + return + } + + switch x := q.(type) { + case *x: + if x.c > 2*kx { + x, i = t.splitX(p, x, pi, i) + } + pi = i + p = x + q = x.x[i].ch + case *d: // new KV pair + newV, written = upd(newV, false) + if !written { + return + } + + switch { + case x.c < 2*kd: + t.insert(x, i, k, newV) + default: + t.overflow(p, x, pi, i, k, newV) + } + return + } + } +} + +func (t *Tree) split(p *x, q *d, pi, i int, k Key, v secondary.Layer) { + atomic.AddUint64(&t.ver, 1) + r := btDPool.Get().(*d) + if q.n != nil { + r.n = q.n + r.n.p = r + } else { + t.last = r + } + q.n = r + r.p = q + + copy(r.d[:], q.d[kd:2*kd]) + for i := range q.d[kd:] { + q.d[kd+i] = zde + } + q.c = kd + r.c = kd + var done bool + if i > kd { + done = true + t.insert(r, i-kd, k, v) + } + if pi >= 0 { + p.insert(pi, r.d[0].k, r) + } else { + t.r = newX(q).insert(0, r.d[0].k, r) + } + if done { + return + } + + t.insert(q, i, k, v) +} + +func (t *Tree) splitX(p *x, q *x, pi int, i int) (*x, int) { + atomic.AddUint64(&t.ver, 1) + r := btXPool.Get().(*x) + copy(r.x[:], q.x[kx+1:]) + q.c = kx + r.c = kx + if pi >= 0 { + p.insert(pi, q.x[kx].k, r) + q.x[kx].k = zk + for i := range q.x[kx+1:] { + q.x[kx+i+1] = zxe + } + + switch { + case i < kx: + return q, i + case i == kx: + return p, pi + default: // i > kx + return r, i - kx - 1 + } + } + + nr := newX(q).insert(0, q.x[kx].k, r) + t.r = nr + q.x[kx].k = zk + for i := range q.x[kx+1:] { + q.x[kx+i+1] = zxe + } + + switch { + case i < kx: + return q, i + case i == kx: 
+ return nr, 0 + default: // i > kx + return r, i - kx - 1 + } +} + +func (t *Tree) underflow(p *x, q *d, pi int) { + atomic.AddUint64(&t.ver, 1) + l, r := p.siblings(pi) + + if l != nil && l.c+q.c >= 2*kd { + l.mvR(q, 1) + p.x[pi-1].k = q.d[0].k + return + } + + if r != nil && q.c+r.c >= 2*kd { + q.mvL(r, 1) + p.x[pi].k = r.d[0].k + r.d[r.c] = zde // GC + return + } + + if l != nil { + t.cat(p, l, q, pi-1) + return + } + + t.cat(p, q, r, pi) +} + +func (t *Tree) underflowX(p *x, q *x, pi int, i int) (*x, int) { + atomic.AddUint64(&t.ver, 1) + var l, r *x + + if pi >= 0 { + if pi > 0 { + l = p.x[pi-1].ch.(*x) + } + if pi < p.c { + r = p.x[pi+1].ch.(*x) + } + } + + if l != nil && l.c > kx { + q.x[q.c+1].ch = q.x[q.c].ch + copy(q.x[1:], q.x[:q.c]) + q.x[0].ch = l.x[l.c].ch + q.x[0].k = p.x[pi-1].k + q.c++ + i++ + l.c-- + p.x[pi-1].k = l.x[l.c].k + return q, i + } + + if r != nil && r.c > kx { + q.x[q.c].k = p.x[pi].k + q.c++ + q.x[q.c].ch = r.x[0].ch + p.x[pi].k = r.x[0].k + copy(r.x[:], r.x[1:r.c]) + r.c-- + rc := r.c + r.x[rc].ch = r.x[rc+1].ch + r.x[rc].k = zk + r.x[rc+1].ch = nil + return q, i + } + + if l != nil { + i += l.c + 1 + t.catX(p, l, q, pi-1) + q = l + return q, i + } + + t.catX(p, q, r, pi) + return q, i +} + +// ----------------------------------------------------------------- Enumerator + +// Close recycles e to a pool for possible later reuse. No references to e +// should exist or such references must not be used afterwards. +func (e *Enumerator) Close() { + *e = ze + btEPool.Put(e) +} + +// Next returns the currently enumerated item, if it exists and moves to the +// next item in the key collation order. If there is no item to return, err == +// io.EOF is returned. +func (e *Enumerator) Next() (k Key, v secondary.Layer, err error) { + if err = e.err; err != nil { + return + } + if atomic.LoadUint64(&e.ver) != atomic.LoadUint64(&e.t.ver) { + f, hit := e.t.Seek(e.k) + if !e.hit && hit { + if err = f.next(); err != nil { + return + } + } + + *e = *f + f.Close() + } + if e.q == nil { + e.err, err = io.EOF, io.EOF + return + } + + if e.i >= e.q.c { + if err = e.next(); err != nil { + return + } + } + + i := e.q.d[e.i] + k, v = i.k, i.v + e.k, e.hit = k, false + e.next() + return +} + +func (e *Enumerator) next() error { + if e.q == nil { + e.err = io.EOF + return io.EOF + } + + switch { + case e.i < e.q.c-1: + e.i++ + default: + if e.q, e.i = e.q.n, 0; e.q == nil { + e.err = io.EOF + } + } + return e.err +} + +// Prev returns the currently enumerated item, if it exists and moves to the +// previous item in the key collation order. If there is no item to return, err +// == io.EOF is returned. 
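+// A reverse-scan sketch (ignoring the empty-tree case, in which SeekLast
+// returns io.EOF and a nil enumerator):
+//
+//	e, _ := t.SeekLast()
+//	for k, v, err := e.Prev(); err != io.EOF; k, v, err = e.Prev() {
+//		_, _ = k, v
+//	}
+//	e.Close()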
+func (e *Enumerator) Prev() (k Key, v secondary.Layer, err error) { + if err = e.err; err != nil { + return + } + if atomic.LoadUint64(&e.ver) != atomic.LoadUint64(&e.t.ver) { + f, hit := e.t.Seek(e.k) + if !e.hit && hit { + if err = f.prev(); err != nil { + return + } + } + + *e = *f + f.Close() + } + if e.q == nil { + e.err, err = io.EOF, io.EOF + return + } + + if e.i >= e.q.c { + if err = e.next(); err != nil { + return + } + } + + i := e.q.d[e.i] + k, v = i.k, i.v + e.k, e.hit = k, false + e.prev() + return +} + +func (e *Enumerator) prev() error { + if e.q == nil { + e.err = io.EOF + return io.EOF + } + + switch { + case e.i > 0: + e.i-- + default: + if e.q = e.q.p; e.q == nil { + e.err = io.EOF + break + } + + e.i = e.q.c - 1 + } + return e.err +} + +func (t *Tree) Ver() uint64 { + return atomic.LoadUint64(&t.ver) +} diff --git a/primary/primary_key.go b/primary/primary_key.go new file mode 100644 index 0000000..ba3fd59 --- /dev/null +++ b/primary/primary_key.go @@ -0,0 +1,11 @@ +package primary + +import "github.com/zenhotels/btree-2d/common" + +type Key struct { + Value common.Comparable +} + +func (p Key) Less(p2 Key) bool { + return p.Value.Less(p2.Value) +} diff --git a/secondary/secondary.go b/secondary/secondary.go new file mode 100644 index 0000000..d017490 --- /dev/null +++ b/secondary/secondary.go @@ -0,0 +1,276 @@ +package secondary + +import ( + "io" + + "github.com/zenhotels/btree-2d/lockie" +) + +// Layer represents the secondary layer, +// a tree holding Finalizable yet Comparable keys. +type Layer struct { + store *Tree + lock lockie.Lockie +} + +// NewLayer initializes a new secondary layer handle. +func NewLayer() Layer { + return Layer{ + store: NewTree(treeCmp), + lock: lockie.NewLockie(), + } +} + +// TODO(xlab): Set may trigger finalizers? + +// Set just adds a key to the tree, overwriting the previous one. +func (l Layer) Set(k Key) { + l.lock.Lock() + l.store.Set(k, k) + l.lock.Unlock() +} + +// ForEach runs the provided function for every element in the layer, +// if function returns true, the loop stops. +func (l Layer) ForEach(fn func(key Key) bool) { + l.lock.Lock() + e, err := l.store.SeekFirst() + l.lock.Unlock() + if err != io.EOF { + k, _, err := e.Next() + for err != io.EOF { + if stop := fn(k); stop { + return + } + l.lock.Lock() + k, _, err = e.Next() + l.lock.Unlock() + } + e.Close() + } +} + +// Seek returns an Enumerator positioned on a key such that k >= key. +func (l Layer) Seek(k Key) (e *Enumerator, ok bool) { + l.lock.Lock() + e, ok = l.store.Seek(k) + l.lock.Unlock() + return +} + +// SeekFirst returns an Enumerator positioned on the first key in the tree. +func (l Layer) SeekFirst() (e *Enumerator, err error) { + l.lock.Lock() + e, err = l.store.SeekFirst() + l.lock.Unlock() + return +} + +// Delete removes the key and runs all its finalizers. +func (l Layer) Delete(k Key) (ok bool) { + l.lock.Lock() + v, found := l.store.Get(k) + if found { + ok = l.store.Delete(k) + } + l.lock.Unlock() + if found { + v.Finalize() + } + return +} + +// Finalize locks the layer and runs finalizers of all the keys +// from this layer. Call this if you're going to drop an entire layer. 
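+// A sketch of the interplay with AddFinalizer, assuming k is a Key held by
+// this layer (finalizers are launched in their own goroutines):
+//
+//	k.Value.AddFinalizer(func() { /* release resources tied to k */ })
+//	layer.Finalize() // schedules the callback registered above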
+func (l Layer) Finalize() { + l.lock.Lock() + e, err := l.store.SeekFirst() + if err != io.EOF { + k, _, err := e.Next() + for err != io.EOF { + k.Finalize() + k, _, err = e.Next() + } + e.Close() + } + l.lock.Unlock() +} + +func (prev Layer) Sync(next Layer, onAdd, onDel func(key Key)) { + if prev.store == next.store { + return + } + // TODO(xlab): init at random versions so would not collide + // if next.store.Ver() <= prev.store.Ver() { + // return + // } + prev.lock.Lock() + next.lock.Lock() + prevIter, prevErr := prev.store.SeekFirst() + nextIter, nextErr := next.store.SeekFirst() + next.lock.Unlock() + prev.lock.Unlock() + + switch { + case prevErr == io.EOF && nextErr == io.EOF: + // do nothing, both are empty + return + case prevErr == io.EOF: + // previous storage is empty, everything is added + addAll(prev, next.lock, nextIter, onAdd) + nextIter.Close() + return + case nextErr == io.EOF: + // next storage is empty, everything is deleted + deleteAll(prev, prev.lock, prevIter, onDel) + prevIter.Close() + return + default: + // do sync and trigger the corresponding callbacks + syncAll(prev, next, prevIter, nextIter, onAdd, onDel) + prevIter.Close() + nextIter.Close() + return + } +} + +func addAll(prev Layer, nextLock lockie.Lockie, nextIter *Enumerator, onAdd func(k Key)) { + nextLock.Lock() + k, _, err := nextIter.Next() + nextLock.Unlock() + + for err != io.EOF { + prev.lock.Lock() + prev.store.Set(k, k) + prev.lock.Unlock() + if onAdd != nil { + onAdd(k) + } + nextLock.Lock() + k, _, err = nextIter.Next() + nextLock.Unlock() + } +} + +func deleteAll(prev Layer, prevLock lockie.Lockie, prevIter *Enumerator, onDel func(k Key)) { + prevLock.Lock() + k, _, err := prevIter.Next() + prevLock.Unlock() + + for err != io.EOF { + if onDel != nil { + onDel(k) // run the callback + } + k.Finalize() // emit the finalizers + + prevLock.Lock() + k, _, err = prevIter.Next() + prevLock.Unlock() + } + // finally clear the store + prevLock.Lock() + prev.store.Clear() + prevLock.Unlock() +} + +func syncAll(prev, next Layer, prevIter, nextIter *Enumerator, onAdd, onDel func(k Key)) { + prev.lock.Lock() + next.lock.Lock() + prevK, _, prevErr := prevIter.Next() + nextK, _, nextErr := nextIter.Next() + next.lock.Unlock() + prev.lock.Unlock() + + for { + switch { + case prevErr == io.EOF: + if nextErr == io.EOF { + return // we're done + } + // at this point prev is ended, so nextK is added + if onAdd != nil { + onAdd(nextK) + } + + // set nextK into prev + prev.lock.Lock() + prev.store.Set(nextK, nextK) + prev.lock.Unlock() + // move next iterator + next.lock.Lock() + nextK, _, nextErr = nextIter.Next() + next.lock.Unlock() + + case nextErr == io.EOF: + if prevErr == io.EOF { + return // we're done + } + // at this point next is ended, so prevK is deleted + if onDel != nil { + onDel(prevK) + } + prevK.Finalize() + + // delete prevK in prev + prev.lock.Lock() + prev.store.Delete(prevK) + // move prev iterator + prevK, _, prevErr = prevIter.Next() + prev.lock.Unlock() + + case prevK.Less(nextK): + // old prevK has been deleted apparently + if onDel != nil { + onDel(prevK) + } + prevK.Finalize() + + // delete prevK in prev + prev.lock.Lock() + prev.store.Delete(prevK) + // move prev iterator + prevK, _, prevErr = prevIter.Next() + prev.lock.Unlock() + + case nextK.Less(prevK): + // new nextK has been inserted apparently + if onAdd != nil { + onAdd(nextK) + } + + // set nextK into prev + prev.lock.Lock() + prev.store.Set(nextK, nextK) + prev.lock.Unlock() + // move next iterator + next.lock.Lock() + 
nextK, _, nextErr = nextIter.Next() + next.lock.Unlock() + + default: + // we're on the same keys, move both iterators + prev.lock.Lock() + next.lock.Lock() + prevK, _, prevErr = prevIter.Next() + nextK, _, nextErr = nextIter.Next() + next.lock.Unlock() + prev.lock.Unlock() + } + } +} + +func (l Layer) Len() int { + l.lock.Lock() + count := l.store.Len() + l.lock.Unlock() + return count +} + +func treeCmp(k1, k2 Key) int { + if k1.Less(k2) { + return -1 + } else if k2.Less(k1) { + return 1 + } + return 0 +} diff --git a/secondary/secondary_btree.go b/secondary/secondary_btree.go new file mode 100644 index 0000000..ff9c29d --- /dev/null +++ b/secondary/secondary_btree.go @@ -0,0 +1,934 @@ +// btree.go Copyright 2014 The b Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// This file has been generated by http://github.com/cznic/b + +package secondary + +import ( + "fmt" + "io" + "sync" + "sync/atomic" +) + +const ( + kx = 64 //TODO benchmark tune this number if using custom key/value type(s). + kd = 64 //TODO benchmark tune this number if using custom key/value type(s). +) + +func init() { + if kd < 1 { + panic(fmt.Errorf("kd %d: out of range", kd)) + } + + if kx < 2 { + panic(fmt.Errorf("kx %d: out of range", kx)) + } +} + +var ( + btDPool = sync.Pool{New: func() interface{} { return &d{} }} + btEPool = btEpool{sync.Pool{New: func() interface{} { return &Enumerator{} }}} + btTPool = btTpool{sync.Pool{New: func() interface{} { return &Tree{} }}} + btXPool = sync.Pool{New: func() interface{} { return &x{} }} +) + +type btTpool struct{ sync.Pool } + +func (p *btTpool) get(cmp Cmp) *Tree { + x := p.Get().(*Tree) + x.cmp = cmp + return x +} + +type btEpool struct{ sync.Pool } + +func (p *btEpool) get(err error, hit bool, i int, k Key, q *d, t *Tree, ver uint64) *Enumerator { + x := p.Get().(*Enumerator) + x.err, x.hit, x.i, x.k, x.q, x.t, x.ver = err, hit, i, k, q, t, ver + return x +} + +type ( + // Cmp compares a and b. Return value is: + // + // < 0 if a < b + // 0 if a == b + // > 0 if a > b + // + Cmp func(a, b Key) int + + d struct { // data page + c int + d [2*kd + 1]de + n *d + p *d + } + + de struct { // d element + k Key + v Key + } + + // Enumerator captures the state of enumerating a tree. It is returned + // from the Seek* methods. The enumerator is aware of any mutations + // made to the tree in the process of enumerating it and automatically + // resumes the enumeration at the proper key, if possible. + // + // However, once an Enumerator returns io.EOF to signal "no more + // items", it does no more attempt to "resync" on tree mutation(s). In + // other words, io.EOF from an Enumerator is "sticky" (idempotent). + Enumerator struct { + err error + hit bool + i int + k Key + q *d + t *Tree + ver uint64 + } + + // Tree is a B+tree. + Tree struct { + c int + cmp Cmp + first *d + last *d + r interface{} + ver uint64 + } + + xe struct { // x element + ch interface{} + k Key + } + + x struct { // index page + c int + x [2*kx + 2]xe + } +) + +var ( // R/O zero values + zd d + zde de + ze Enumerator + zk Key + zt Tree + zx x + zxe xe +) + +func clr(q interface{}) { + switch x := q.(type) { + case *x: + for i := 0; i <= x.c; i++ { // Ch0 Sep0 ... 
Chn-1 Sepn-1 Chn + clr(x.x[i].ch) + } + *x = zx + btXPool.Put(x) + case *d: + *x = zd + btDPool.Put(x) + } +} + +// -------------------------------------------------------------------------- x + +func newX(ch0 interface{}) *x { + r := btXPool.Get().(*x) + r.x[0].ch = ch0 + return r +} + +func (q *x) extract(i int) { + q.c-- + if i < q.c { + copy(q.x[i:], q.x[i+1:q.c+1]) + q.x[q.c].ch = q.x[q.c+1].ch + q.x[q.c].k = zk // GC + q.x[q.c+1] = zxe // GC + } +} + +func (q *x) insert(i int, k Key, ch interface{}) *x { + c := q.c + if i < c { + q.x[c+1].ch = q.x[c].ch + copy(q.x[i+2:], q.x[i+1:c]) + q.x[i+1].k = q.x[i].k + } + c++ + q.c = c + q.x[i].k = k + q.x[i+1].ch = ch + return q +} + +func (q *x) siblings(i int) (l, r *d) { + if i >= 0 { + if i > 0 { + l = q.x[i-1].ch.(*d) + } + if i < q.c { + r = q.x[i+1].ch.(*d) + } + } + return +} + +// -------------------------------------------------------------------------- d + +func (l *d) mvL(r *d, c int) { + copy(l.d[l.c:], r.d[:c]) + copy(r.d[:], r.d[c:r.c]) + l.c += c + r.c -= c +} + +func (l *d) mvR(r *d, c int) { + copy(r.d[c:], r.d[:r.c]) + copy(r.d[:c], l.d[l.c-c:]) + r.c += c + l.c -= c +} + +// ----------------------------------------------------------------------- Tree + +// NewTree returns a newly created, empty Tree. The compare function is used +// for key collation. +func NewTree(cmp Cmp) *Tree { + return btTPool.get(cmp) +} + +// Clear removes all K/V pairs from the tree. +func (t *Tree) Clear() { + if t.r == nil { + return + } + + clr(t.r) + t.c, t.first, t.last, t.r = 0, nil, nil, nil + atomic.AddUint64(&t.ver, 1) +} + +// Close performs Clear and recycles t to a pool for possible later reuse. No +// references to t should exist or such references must not be used afterwards. +func (t *Tree) Close() { + t.Clear() + *t = zt + btTPool.Put(t) +} + +func (t *Tree) cat(p *x, q, r *d, pi int) { + atomic.AddUint64(&t.ver, 1) + q.mvL(r, r.c) + if r.n != nil { + r.n.p = q + } else { + t.last = q + } + q.n = r.n + *r = zd + btDPool.Put(r) + if p.c > 1 { + p.extract(pi) + p.x[pi].ch = q + return + } + + switch x := t.r.(type) { + case *x: + *x = zx + btXPool.Put(x) + case *d: + *x = zd + btDPool.Put(x) + } + t.r = q +} + +func (t *Tree) catX(p, q, r *x, pi int) { + atomic.AddUint64(&t.ver, 1) + q.x[q.c].k = p.x[pi].k + copy(q.x[q.c+1:], r.x[:r.c]) + q.c += r.c + 1 + q.x[q.c].ch = r.x[r.c].ch + *r = zx + btXPool.Put(r) + if p.c > 1 { + p.c-- + pc := p.c + if pi < pc { + p.x[pi].k = p.x[pi+1].k + copy(p.x[pi+1:], p.x[pi+2:pc+1]) + p.x[pc].ch = p.x[pc+1].ch + p.x[pc].k = zk // GC + p.x[pc+1].ch = nil // GC + } + return + } + + switch x := t.r.(type) { + case *x: + *x = zx + btXPool.Put(x) + case *d: + *x = zd + btDPool.Put(x) + } + t.r = q +} + +// Delete removes the k's KV pair, if it exists, in which case Delete returns +// true. 
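+// For example (a sketch):
+//
+//	if t.Delete(k) {
+//		// k existed; its finalizers are NOT run here, see Layer.Delete
+//	}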
+func (t *Tree) Delete(k Key) (ok bool) { + pi := -1 + var p *x + q := t.r + if q == nil { + return false + } + + for { + var i int + i, ok = t.find(q, k) + if ok { + switch x := q.(type) { + case *x: + if x.c < kx && q != t.r { + x, i = t.underflowX(p, x, pi, i) + } + pi = i + 1 + p = x + q = x.x[pi].ch + ok = false + continue + case *d: + t.extract(x, i) + if x.c >= kd { + return true + } + + if q != t.r { + t.underflow(p, x, pi) + } else if t.c == 0 { + t.Clear() + } + return true + } + } + + switch x := q.(type) { + case *x: + if x.c < kx && q != t.r { + x, i = t.underflowX(p, x, pi, i) + } + pi = i + p = x + q = x.x[i].ch + case *d: + return false + } + } +} + +func (t *Tree) extract(q *d, i int) { // (r Key) { + atomic.AddUint64(&t.ver, 1) + //r = q.d[i].v // prepared for Extract + q.c-- + if i < q.c { + copy(q.d[i:], q.d[i+1:q.c+1]) + } + q.d[q.c] = zde // GC + t.c-- + return +} + +func (t *Tree) find(q interface{}, k Key) (i int, ok bool) { + var mk Key + l := 0 + switch x := q.(type) { + case *x: + h := x.c - 1 + for l <= h { + m := (l + h) >> 1 + mk = x.x[m].k + switch cmp := t.cmp(k, mk); { + case cmp > 0: + l = m + 1 + case cmp == 0: + return m, true + default: + h = m - 1 + } + } + case *d: + h := x.c - 1 + for l <= h { + m := (l + h) >> 1 + mk = x.d[m].k + switch cmp := t.cmp(k, mk); { + case cmp > 0: + l = m + 1 + case cmp == 0: + return m, true + default: + h = m - 1 + } + } + } + return l, false +} + +// First returns the first item of the tree in the key collating order, or +// (zero-value, zero-value) if the tree is empty. +func (t *Tree) First() (k Key, v Key) { + if q := t.first; q != nil { + q := &q.d[0] + k, v = q.k, q.v + } + return +} + +// Get returns the value associated with k and true if it exists. Otherwise Get +// returns (zero-value, false). +func (t *Tree) Get(k Key) (v Key, ok bool) { + q := t.r + if q == nil { + return + } + + for { + var i int + if i, ok = t.find(q, k); ok { + switch x := q.(type) { + case *x: + q = x.x[i+1].ch + continue + case *d: + return x.d[i].v, true + } + } + switch x := q.(type) { + case *x: + q = x.x[i].ch + default: + return + } + } +} + +func (t *Tree) insert(q *d, i int, k Key, v Key) *d { + atomic.AddUint64(&t.ver, 1) + c := q.c + if i < c { + copy(q.d[i+1:], q.d[i:c]) + } + c++ + q.c = c + q.d[i].k, q.d[i].v = k, v + t.c++ + return q +} + +// Last returns the last item of the tree in the key collating order, or +// (zero-value, zero-value) if the tree is empty. +func (t *Tree) Last() (k Key, v Key) { + if q := t.last; q != nil { + q := &q.d[q.c-1] + k, v = q.k, q.v + } + return +} + +// Len returns the number of items in the tree. +func (t *Tree) Len() int { + return t.c +} + +func (t *Tree) overflow(p *x, q *d, pi, i int, k Key, v Key) { + atomic.AddUint64(&t.ver, 1) + l, r := p.siblings(pi) + + if l != nil && l.c < 2*kd && i != 0 { + l.mvL(q, 1) + t.insert(q, i-1, k, v) + p.x[pi-1].k = q.d[0].k + return + } + + if r != nil && r.c < 2*kd { + if i < 2*kd { + q.mvR(r, 1) + t.insert(q, i, k, v) + p.x[pi].k = r.d[0].k + return + } + + t.insert(r, 0, k, v) + p.x[pi].k = k + return + } + + t.split(p, q, pi, i, k, v) +} + +// Seek returns an Enumerator positioned on an item such that k >= item's key. +// ok reports if k == item.key The Enumerator's position is possibly after the +// last item in the tree. 
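+// A forward-scan sketch starting at k (e is never nil here, though it may
+// already be positioned past the last item):
+//
+//	e, _ := t.Seek(k)
+//	for key, _, err := e.Next(); err != io.EOF; key, _, err = e.Next() {
+//		_ = key
+//	}
+//	e.Close()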
+func (t *Tree) Seek(k Key) (e *Enumerator, ok bool) { + q := t.r + if q == nil { + e = btEPool.get(nil, false, 0, k, nil, t, atomic.LoadUint64(&t.ver)) + return + } + + for { + var i int + if i, ok = t.find(q, k); ok { + switch x := q.(type) { + case *x: + q = x.x[i+1].ch + continue + case *d: + return btEPool.get(nil, ok, i, k, x, t, atomic.LoadUint64(&t.ver)), true + } + } + + switch x := q.(type) { + case *x: + q = x.x[i].ch + case *d: + return btEPool.get(nil, ok, i, k, x, t, atomic.LoadUint64(&t.ver)), false + } + } +} + +// SeekFirst returns an enumerator positioned on the first KV pair in the tree, +// if any. For an empty tree, err == io.EOF is returned and e will be nil. +func (t *Tree) SeekFirst() (e *Enumerator, err error) { + q := t.first + if q == nil { + return nil, io.EOF + } + + return btEPool.get(nil, true, 0, q.d[0].k, q, t, atomic.LoadUint64(&t.ver)), nil +} + +// SeekLast returns an enumerator positioned on the last KV pair in the tree, +// if any. For an empty tree, err == io.EOF is returned and e will be nil. +func (t *Tree) SeekLast() (e *Enumerator, err error) { + q := t.last + if q == nil { + return nil, io.EOF + } + + return btEPool.get(nil, true, q.c-1, q.d[q.c-1].k, q, t, atomic.LoadUint64(&t.ver)), nil +} + +// Set sets the value associated with k. +func (t *Tree) Set(k Key, v Key) { + //dbg("--- PRE Set(%v, %v)\n%s", k, v, t.dump()) + //defer func() { + // dbg("--- POST\n%s\n====\n", t.dump()) + //}() + + pi := -1 + var p *x + q := t.r + if q == nil { + z := t.insert(btDPool.Get().(*d), 0, k, v) + t.r, t.first, t.last = z, z, z + return + } + + for { + i, ok := t.find(q, k) + if ok { + switch x := q.(type) { + case *x: + if x.c > 2*kx { + x, i = t.splitX(p, x, pi, i) + } + pi = i + 1 + p = x + q = x.x[i+1].ch + continue + case *d: + x.d[i].v = v + } + return + } + + switch x := q.(type) { + case *x: + if x.c > 2*kx { + x, i = t.splitX(p, x, pi, i) + } + pi = i + p = x + q = x.x[i].ch + case *d: + switch { + case x.c < 2*kd: + t.insert(x, i, k, v) + default: + t.overflow(p, x, pi, i, k, v) + } + return + } + } +} + +// Put combines Get and Set in a more efficient way where the tree is walked +// only once. The upd(ater) receives (old-value, true) if a KV pair for k +// exists or (zero-value, false) otherwise. It can then return a (new-value, +// true) to create or overwrite the existing value in the KV pair, or +// (whatever, false) if it decides not to create or not to update the value of +// the KV pair. +// +// tree.Set(k, v) call conceptually equals calling +// +// tree.Put(k, func(Key, bool){ return v, true }) +// +// modulo the differing return values. 
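+// For example, an insert-if-absent sketch:
+//
+//	t.Put(k, func(old Key, exists bool) (Key, bool) {
+//		if exists {
+//			return old, false // keep the existing value
+//		}
+//		return k, true // this package stores the key itself as the value
+//	})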
+// Put combines Get and Set in a more efficient way: the tree is walked only
+// once. The upd(ater) receives (old-value, true) if a KV pair for k exists,
+// or (zero-value, false) otherwise. It can then return (new-value, true) to
+// create or overwrite the existing value in the KV pair, or (whatever, false)
+// if it decides not to create or not to update the value of the KV pair.
+//
+// A tree.Set(k, v) call is conceptually equivalent to calling
+//
+//	tree.Put(k, func(Key, bool) (Key, bool) { return v, true })
+//
+// modulo the differing return values.
+func (t *Tree) Put(k Key, upd func(oldV Key, exists bool) (newV Key, write bool)) (oldV Key, written bool) {
+	pi := -1
+	var p *x
+	q := t.r
+	var newV Key
+	if q == nil {
+		// new KV pair in empty tree
+		newV, written = upd(newV, false)
+		if !written {
+			return
+		}
+
+		z := t.insert(btDPool.Get().(*d), 0, k, newV)
+		t.r, t.first, t.last = z, z, z
+		return
+	}
+
+	for {
+		i, ok := t.find(q, k)
+		if ok {
+			switch x := q.(type) {
+			case *x:
+				if x.c > 2*kx {
+					x, i = t.splitX(p, x, pi, i)
+				}
+				pi = i + 1
+				p = x
+				q = x.x[i+1].ch
+				continue
+			case *d:
+				oldV = x.d[i].v
+				newV, written = upd(oldV, true)
+				if !written {
+					return
+				}
+
+				x.d[i].v = newV
+			}
+			return
+		}
+
+		switch x := q.(type) {
+		case *x:
+			if x.c > 2*kx {
+				x, i = t.splitX(p, x, pi, i)
+			}
+			pi = i
+			p = x
+			q = x.x[i].ch
+		case *d: // new KV pair
+			newV, written = upd(newV, false)
+			if !written {
+				return
+			}
+
+			switch {
+			case x.c < 2*kd:
+				t.insert(x, i, k, newV)
+			default:
+				t.overflow(p, x, pi, i, k, newV)
+			}
+			return
+		}
+	}
+}
+
+func (t *Tree) split(p *x, q *d, pi, i int, k Key, v Key) {
+	atomic.AddUint64(&t.ver, 1)
+	r := btDPool.Get().(*d)
+	if q.n != nil {
+		r.n = q.n
+		r.n.p = r
+	} else {
+		t.last = r
+	}
+	q.n = r
+	r.p = q
+
+	copy(r.d[:], q.d[kd:2*kd])
+	for i := range q.d[kd:] {
+		q.d[kd+i] = zde
+	}
+	q.c = kd
+	r.c = kd
+	var done bool
+	if i > kd {
+		done = true
+		t.insert(r, i-kd, k, v)
+	}
+	if pi >= 0 {
+		p.insert(pi, r.d[0].k, r)
+	} else {
+		t.r = newX(q).insert(0, r.d[0].k, r)
+	}
+	if done {
+		return
+	}
+
+	t.insert(q, i, k, v)
+}
+
+func (t *Tree) splitX(p *x, q *x, pi int, i int) (*x, int) {
+	atomic.AddUint64(&t.ver, 1)
+	r := btXPool.Get().(*x)
+	copy(r.x[:], q.x[kx+1:])
+	q.c = kx
+	r.c = kx
+	if pi >= 0 {
+		p.insert(pi, q.x[kx].k, r)
+		q.x[kx].k = zk
+		for i := range q.x[kx+1:] {
+			q.x[kx+i+1] = zxe
+		}
+
+		switch {
+		case i < kx:
+			return q, i
+		case i == kx:
+			return p, pi
+		default: // i > kx
+			return r, i - kx - 1
+		}
+	}
+
+	nr := newX(q).insert(0, q.x[kx].k, r)
+	t.r = nr
+	q.x[kx].k = zk
+	for i := range q.x[kx+1:] {
+		q.x[kx+i+1] = zxe
+	}
+
+	switch {
+	case i < kx:
+		return q, i
+	case i == kx:
+		return nr, 0
+	default: // i > kx
+		return r, i - kx - 1
+	}
+}
+
+func (t *Tree) underflow(p *x, q *d, pi int) {
+	atomic.AddUint64(&t.ver, 1)
+	l, r := p.siblings(pi)
+
+	if l != nil && l.c+q.c >= 2*kd {
+		l.mvR(q, 1)
+		p.x[pi-1].k = q.d[0].k
+		return
+	}
+
+	if r != nil && q.c+r.c >= 2*kd {
+		q.mvL(r, 1)
+		p.x[pi].k = r.d[0].k
+		r.d[r.c] = zde // GC
+		return
+	}
+
+	if l != nil {
+		t.cat(p, l, q, pi-1)
+		return
+	}
+
+	t.cat(p, q, r, pi)
+}
+
+func (t *Tree) underflowX(p *x, q *x, pi int, i int) (*x, int) {
+	atomic.AddUint64(&t.ver, 1)
+	var l, r *x
+
+	if pi >= 0 {
+		if pi > 0 {
+			l = p.x[pi-1].ch.(*x)
+		}
+		if pi < p.c {
+			r = p.x[pi+1].ch.(*x)
+		}
+	}
+
+	if l != nil && l.c > kx {
+		q.x[q.c+1].ch = q.x[q.c].ch
+		copy(q.x[1:], q.x[:q.c])
+		q.x[0].ch = l.x[l.c].ch
+		q.x[0].k = p.x[pi-1].k
+		q.c++
+		i++
+		l.c--
+		p.x[pi-1].k = l.x[l.c].k
+		return q, i
+	}
+
+	if r != nil && r.c > kx {
+		q.x[q.c].k = p.x[pi].k
+		q.c++
+		q.x[q.c].ch = r.x[0].ch
+		p.x[pi].k = r.x[0].k
+		copy(r.x[:], r.x[1:r.c])
+		r.c--
+		rc := r.c
+		r.x[rc].ch = r.x[rc+1].ch
+		r.x[rc].k = zk
+		r.x[rc+1].ch = nil
+		return q, i
+	}
+
+	if l != nil {
+		i += l.c + 1
+		t.catX(p, l, q, pi-1)
+		q = l
+		return q, i
+	}
+
+	t.catX(p, q, r, pi)
+	return q, i
+}
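+
+// examplePutIfAbsent is an illustrative sketch (not used elsewhere in the
+// package): it uses Put's updater to write v only when k has no value yet,
+// in a single tree walk, where a Get followed by Set would walk twice.
+func examplePutIfAbsent(t *Tree, k, v Key) (old Key, written bool) {
+	return t.Put(k, func(cur Key, exists bool) (Key, bool) {
+		if exists {
+			return cur, false // keep the current value, write nothing
+		}
+		return v, true // no pair for k yet: create it
+	})
+}
+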
+// ----------------------------------------------------------------- Enumerator
+
+// Close recycles e to a pool for possible later reuse. No references to e
+// should exist or such references must not be used afterwards.
+func (e *Enumerator) Close() {
+	*e = ze
+	btEPool.Put(e)
+}
+
+// Next returns the currently enumerated item, if it exists, and moves to the
+// next item in the key collation order. If there is no item to return, err ==
+// io.EOF is returned.
+func (e *Enumerator) Next() (k Key, v Key, err error) {
+	if err = e.err; err != nil {
+		return
+	}
+	if atomic.LoadUint64(&e.ver) != atomic.LoadUint64(&e.t.ver) {
+		f, hit := e.t.Seek(e.k)
+		if !e.hit && hit {
+			if err = f.next(); err != nil {
+				return
+			}
+		}
+
+		*e = *f
+		f.Close()
+	}
+	if e.q == nil {
+		e.err, err = io.EOF, io.EOF
+		return
+	}
+
+	if e.i >= e.q.c {
+		if err = e.next(); err != nil {
+			return
+		}
+	}
+
+	i := e.q.d[e.i]
+	k, v = i.k, i.v
+	e.k, e.hit = k, false
+	e.next()
+	return
+}
+
+func (e *Enumerator) next() error {
+	if e.q == nil {
+		e.err = io.EOF
+		return io.EOF
+	}
+
+	switch {
+	case e.i < e.q.c-1:
+		e.i++
+	default:
+		if e.q, e.i = e.q.n, 0; e.q == nil {
+			e.err = io.EOF
+		}
+	}
+	return e.err
+}
+
+// Prev returns the currently enumerated item, if it exists, and moves to the
+// previous item in the key collation order. If there is no item to return,
+// err == io.EOF is returned.
+func (e *Enumerator) Prev() (k Key, v Key, err error) {
+	if err = e.err; err != nil {
+		return
+	}
+	if atomic.LoadUint64(&e.ver) != atomic.LoadUint64(&e.t.ver) {
+		f, hit := e.t.Seek(e.k)
+		if !e.hit && hit {
+			if err = f.prev(); err != nil {
+				return
+			}
+		}
+
+		*e = *f
+		f.Close()
+	}
+	if e.q == nil {
+		e.err, err = io.EOF, io.EOF
+		return
+	}
+
+	if e.i >= e.q.c {
+		if err = e.next(); err != nil {
+			return
+		}
+	}
+
+	i := e.q.d[e.i]
+	k, v = i.k, i.v
+	e.k, e.hit = k, false
+	e.prev()
+	return
+}
+
+func (e *Enumerator) prev() error {
+	if e.q == nil {
+		e.err = io.EOF
+		return io.EOF
+	}
+
+	switch {
+	case e.i > 0:
+		e.i--
+	default:
+		if e.q = e.q.p; e.q == nil {
+			e.err = io.EOF
+			break
+		}
+
+		e.i = e.q.c - 1
+	}
+	return e.err
+}
+
+// Ver returns the tree's version counter; every mutating operation
+// increments it atomically.
+func (t *Tree) Ver() uint64 {
+	return atomic.LoadUint64(&t.ver)
+}
diff --git a/secondary/secondary_key.go b/secondary/secondary_key.go
new file mode 100644
index 0000000..b577c54
--- /dev/null
+++ b/secondary/secondary_key.go
@@ -0,0 +1,15 @@
+package secondary
+
+import "github.com/zenhotels/btree-2d/common"
+
+// Key wraps a FinalizableComparable value for use as a secondary-layer key.
+type Key struct {
+	Value common.FinalizableComparable
+}
+
+// Less delegates the ordering to the wrapped value.
+func (s Key) Less(s2 Key) bool {
+	return s.Value.Less(s2.Value)
+}
+
+// Finalize delegates to the wrapped value's Finalize.
+func (s Key) Finalize() {
+	s.Value.Finalize()
+}
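+
+// intKey is an illustrative sketch, not used by the package: it is the
+// smallest shape a value needs to satisfy common.FinalizableComparable and
+// thus be wrapped in Key. The name and fields are hypothetical.
+type intKey struct{ n int }
+
+func (k *intKey) Less(o common.Comparable) bool { return k.n < o.(*intKey).n }
+func (k *intKey) Finalize()                     {}
+func (k *intKey) AddFinalizer(func()) bool      { return false }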
diff --git a/secondary/secondary_test.go b/secondary/secondary_test.go
new file mode 100644
index 0000000..edcdbfc
--- /dev/null
+++ b/secondary/secondary_test.go
@@ -0,0 +1,53 @@
+package secondary
+
+import (
+	"fmt"
+	"log"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/zenhotels/btree-2d/common"
+)
+
+type testKey struct {
+	N int
+}
+
+func (k testKey) Less(k2 common.Comparable) bool {
+	kk := k2.(*testKey)
+	return k.N < kk.N
+}
+
+func (k testKey) String() string {
+	return fmt.Sprintf("testKey: %d", k.N)
+}
+
+func (k *testKey) Finalize() {}
+
+func (k *testKey) AddFinalizer(func()) bool { return false }
+
+// coverage: 23.9% of statements
+func TestSync(t *testing.T) {
+	assert := assert.New(t)
+	next := NewLayer()
+	next.Set(Key{&testKey{1}})
+	next.Set(Key{&testKey{2}})
+	next.Set(Key{&testKey{3}})
+	prev := NewLayer()
+	prev.Set(Key{&testKey{1}})
+	prev.Set(Key{&testKey{4}})
+
+	added := make([]Key, 0, 3)
+	deleted := make([]Key, 0, 3)
+	prev.Sync(next, func(k Key) {
+		added = append(added, k)
+	}, func(k Key) {
+		deleted = append(deleted, k)
+	})
+
+	assert.Len(added, 2)
+	assert.Len(deleted, 1)
+	log.Println("added:", added)
+	log.Println("deleted:", deleted)
+	assert.Equal(3, prev.Len())
+}
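+
+// TestSyncFromEmpty is an illustrative sketch of the boundary case implied
+// by TestSync above: syncing an empty layer against next should report every
+// key as added and none as deleted. It assumes the Sync semantics shown in
+// TestSync generalize to an empty receiver.
+func TestSyncFromEmpty(t *testing.T) {
+	assert := assert.New(t)
+	next := NewLayer()
+	next.Set(Key{&testKey{7}})
+	next.Set(Key{&testKey{8}})
+
+	prev := NewLayer()
+	var added, deleted int
+	prev.Sync(next, func(Key) {
+		added++
+	}, func(Key) {
+		deleted++
+	})
+
+	assert.Equal(2, added)
+	assert.Equal(0, deleted)
+	assert.Equal(2, prev.Len())
+}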