From 839ebfdd52d2a92e68890bd796595f1ae29a3836 Mon Sep 17 00:00:00 2001
From: maypok86
Date: Thu, 11 Jan 2024 20:36:21 +0300
Subject: [PATCH] [#27] Add Range function

---
 cache.go                       |  13 ++++
 cache_test.go                  |  28 ++++++++
 internal/hashtable/map.go      |  49 +++++++++++++
 internal/hashtable/map_test.go | 122 +++++++++++++++++++++++++++++++++
 4 files changed, 212 insertions(+)

diff --git a/cache.go b/cache.go
index ba3024c..e5b6d2e 100644
--- a/cache.go
+++ b/cache.go
@@ -265,6 +265,19 @@ func (c *Cache[K, V]) process() {
 	}
 }
 
+// Range iterates over all items in the cache.
+//
+// Iteration stops early when the given function returns false.
+func (c *Cache[K, V]) Range(f func(key K, value V) bool) {
+	c.hashmap.Range(func(n *node.Node[K, V]) bool {
+		if n.IsExpired() {
+			return true
+		}
+
+		return f(n.Key(), n.Value())
+	})
+}
+
 // Clear clears the hash table, all policies, buffers, etc.
 //
 // NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined.
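For illustration only, not part of the patch: a minimal sketch of how the new Cache.Range method could be called from client code. It assumes the import path github.com/maypok86/otter and the MustBuilder, Set, and Range APIs exercised by the tests in this patch; the keys and values are hypothetical.

package main

import (
	"fmt"

	"github.com/maypok86/otter"
)

func main() {
	// Build a small cache; Build returns an error for an invalid configuration.
	cache, err := otter.MustBuilder[string, int](100).Build()
	if err != nil {
		panic(err)
	}

	cache.Set("first", 1)
	cache.Set("second", 2)

	// Range visits every live (non-expired) entry.
	// Returning false from the callback stops the iteration early.
	cache.Range(func(key string, value int) bool {
		fmt.Println(key, value)
		return true
	})
}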
diff --git a/cache_test.go b/cache_test.go
index a567828..4e3581d 100644
--- a/cache_test.go
+++ b/cache_test.go
@@ -22,6 +22,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/maypok86/otter/internal/node"
 	"github.com/maypok86/otter/internal/xruntime"
 )
 
@@ -148,6 +149,33 @@ func TestCache_Ratio(t *testing.T) {
 	t.Logf("actual: %.2f, optimal: %.2f", c.Ratio(), o.Ratio())
 }
 
+func TestCache_Range(t *testing.T) {
+	size := 10
+	c, err := MustBuilder[int, int](size).Build()
+	if err != nil {
+		t.Fatalf("can not create cache: %v", err)
+	}
+
+	time.Sleep(3 * time.Second)
+
+	c.Set(1, 1)
+	c.hashmap.Set(node.New(2, 2, 1, 1))
+	c.Set(3, 3)
+	aliveNodes := 2
+	iters := 0
+	c.Range(func(key, value int) bool {
+		if key != value {
+			t.Fatalf("got unexpected key/value for iteration %d: %d/%d", iters, key, value)
+			return false
+		}
+		iters++
+		return true
+	})
+	if iters != aliveNodes {
+		t.Fatalf("got unexpected number of iterations: %d", iters)
+	}
+}
+
 func TestCache_Close(t *testing.T) {
 	size := 10
 	c, err := MustBuilder[int, int](size).Build()
diff --git a/internal/hashtable/map.go b/internal/hashtable/map.go
index c6d8dfc..3caa9b8 100644
--- a/internal/hashtable/map.go
+++ b/internal/hashtable/map.go
@@ -448,6 +448,55 @@ func (m *Map[K, V]) waitForResize() {
 	m.resizeMutex.Unlock()
 }
 
+// Range calls f sequentially for each node present in the
+// map. If f returns false, Range stops the iteration.
+//
+// Range does not necessarily correspond to any consistent snapshot
+// of the Map's contents: no key will be visited more than once, but
+// if the value for any key is stored or deleted concurrently, Range
+// may reflect any mapping for that key from any point during the
+// Range call.
+//
+// It is safe to modify the map while iterating it. However, the
+// usual concurrent modification rules apply: changes may not be
+// reflected in subsequently iterated nodes.
+func (m *Map[K, V]) Range(f func(*node.Node[K, V]) bool) {
+	var zeroPtr unsafe.Pointer
+	// Pre-allocate a buffer big enough to fit the nodes of most bucket chains.
+	buffer := make([]unsafe.Pointer, 0, 16*bucketSize)
+	tp := atomic.LoadPointer(&m.table)
+	t := *(*table[K])(tp)
+	for i := range t.buckets {
+		rootBucket := &t.buckets[i]
+		b := rootBucket
+		// Prevent concurrent modifications and copy all nodes into
+		// the intermediate slice.
+		rootBucket.mutex.Lock()
+		for {
+			for i := 0; i < bucketSize; i++ {
+				if b.nodes[i] != nil {
+					buffer = append(buffer, b.nodes[i])
+				}
+			}
+			if b.next == nil {
+				rootBucket.mutex.Unlock()
+				break
+			}
+			b = (*paddedBucket)(b.next)
+		}
+		// Call the function for all copied nodes.
+		for j := range buffer {
+			n := (*node.Node[K, V])(buffer[j])
+			if !f(n) {
+				return
+			}
+			// Remove the reference to allow the copied nodes to be GCed before this method finishes.
+			buffer[j] = zeroPtr
+		}
+		buffer = buffer[:0]
+	}
+}
+
 // Clear deletes all keys and values currently stored in the map.
 func (m *Map[K, V]) Clear() {
 	table := (*table[K])(atomic.LoadPointer(&m.table))
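A note on the design of the map.go change above: the root bucket's mutex is held only while node pointers are copied into the scratch buffer, and the callback f runs after the bucket has been unlocked. That is what makes it safe for f to call Set or Delete on the same map (see TestMap_RangeNestedDelete below), and it keeps user code from running under an internal lock. The following self-contained sketch shows the same copy-then-call pattern on a plain mutex-guarded map; the store type and its methods are hypothetical illustrations, not otter code.

package main

import (
	"fmt"
	"sync"
)

// store is a toy mutex-guarded map used to illustrate the
// copy-then-call pattern from Map.Range.
type store struct {
	mu    sync.Mutex
	items map[string]int
}

type kv struct {
	k string
	v int
}

// Range snapshots the entries under the lock, releases the lock,
// and only then invokes the callback.
func (s *store) Range(f func(k string, v int) bool) {
	s.mu.Lock()
	buf := make([]kv, 0, len(s.items))
	for k, v := range s.items {
		buf = append(buf, kv{k, v})
	}
	s.mu.Unlock()
	// f runs without the lock held, so it may safely call
	// methods that take the lock again, such as Delete.
	for _, e := range buf {
		if !f(e.k, e.v) {
			return
		}
	}
}

func (s *store) Delete(k string) {
	s.mu.Lock()
	delete(s.items, k)
	s.mu.Unlock()
}

func main() {
	s := &store{items: map[string]int{"a": 1, "b": 2}}
	s.Range(func(k string, v int) bool {
		s.Delete(k) // no deadlock: Range is not holding the lock here
		return true
	})
	fmt.Println(len(s.items)) // prints 0
}

The trade-off, as the doc comment states, is weak consistency: each bucket chain is snapshotted at a different moment, so Range reflects no single point-in-time view of the map.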
diff --git a/internal/hashtable/map_test.go b/internal/hashtable/map_test.go
index 6686557..d848625 100644
--- a/internal/hashtable/map_test.go
+++ b/internal/hashtable/map_test.go
@@ -130,6 +130,65 @@ func TestMap_SetThenDelete(t *testing.T) {
 	}
 }
 
+func TestMap_Range(t *testing.T) {
+	const numNodes = 1000
+	m := New[string, int]()
+	for i := 0; i < numNodes; i++ {
+		m.Set(newNode(strconv.Itoa(i), i))
+	}
+	iters := 0
+	met := make(map[string]int)
+	m.Range(func(n *node.Node[string, int]) bool {
+		if n.Key() != strconv.Itoa(n.Value()) {
+			t.Fatalf("got unexpected key/value for iteration %d: %v/%v", iters, n.Key(), n.Value())
+			return false
+		}
+		met[n.Key()] += 1
+		iters++
+		return true
+	})
+	if iters != numNodes {
+		t.Fatalf("got unexpected number of iterations: %d", iters)
+	}
+	for i := 0; i < numNodes; i++ {
+		if c := met[strconv.Itoa(i)]; c != 1 {
+			t.Fatalf("range did not iterate correctly over %d: %d", i, c)
+		}
+	}
+}
+
+func TestMap_RangeFalseReturned(t *testing.T) {
+	m := New[string, int]()
+	for i := 0; i < 100; i++ {
+		m.Set(newNode(strconv.Itoa(i), i))
+	}
+	iters := 0
+	m.Range(func(n *node.Node[string, int]) bool {
+		iters++
+		return iters != 13
+	})
+	if iters != 13 {
+		t.Fatalf("got unexpected number of iterations: %d", iters)
+	}
+}
+
+func TestMap_RangeNestedDelete(t *testing.T) {
+	const numNodes = 256
+	m := New[string, int]()
+	for i := 0; i < numNodes; i++ {
+		m.Set(newNode(strconv.Itoa(i), i))
+	}
+	m.Range(func(n *node.Node[string, int]) bool {
+		m.Delete(n.Key())
+		return true
+	})
+	for i := 0; i < numNodes; i++ {
+		if _, ok := m.Get(strconv.Itoa(i)); ok {
+			t.Fatalf("value found for %d", i)
+		}
+	}
+}
+
 func TestMap_Size(t *testing.T) {
 	const numberOfNodes = 1000
 	m := New[string, int]()
@@ -284,3 +343,66 @@ func TestMap_ParallelSetsAndDeletes(t *testing.T) {
 
 	wg.Wait()
 }
+
+func parallelRangeSetter(t *testing.T, m *Map[int, int], numNodes int, stopFlag *int64, cdone chan bool) {
+	t.Helper()
+
+	for {
+		for i := 0; i < numNodes; i++ {
+			m.Set(newNode(i, i))
+		}
+		if atomic.LoadInt64(stopFlag) != 0 {
+			break
+		}
+	}
+	cdone <- true
+}
+
+func parallelRangeDeleter(t *testing.T, m *Map[int, int], numNodes int, stopFlag *int64, cdone chan bool) {
+	t.Helper()
+
+	for {
+		for i := 0; i < numNodes; i++ {
+			m.Delete(i)
+		}
+		if atomic.LoadInt64(stopFlag) != 0 {
+			break
+		}
+	}
+	cdone <- true
+}
+
+func TestMap_ParallelRange(t *testing.T) {
+	const numNodes = 10_000
+	m := New[int, int]()
+	for i := 0; i < numNodes; i++ {
+		m.Set(newNode(i, i))
+	}
+	// Start goroutines that store and delete items in parallel.
+	cdone := make(chan bool)
+	stopFlag := int64(0)
+	go parallelRangeSetter(t, m, numNodes, &stopFlag, cdone)
+	go parallelRangeDeleter(t, m, numNodes, &stopFlag, cdone)
+	// Iterate the map and verify that no key is visited more than once.
+	met := make(map[int]int)
+	m.Range(func(n *node.Node[int, int]) bool {
+		if n.Key() != n.Value() {
+			t.Fatalf("got unexpected value for key %d: %d", n.Key(), n.Value())
+			return false
+		}
+		met[n.Key()] += 1
+		return true
+	})
+	if len(met) == 0 {
+		t.Fatal("no nodes were met when iterating")
+	}
+	for k, c := range met {
+		if c != 1 {
+			t.Fatalf("met key %d multiple times: %d", k, c)
+		}
+	}
+	// Make sure that both goroutines finish.
+	atomic.StoreInt64(&stopFlag, 1)
+	<-cdone
+	<-cdone
+}
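A note on verification, not part of the patch: TestMap_ParallelRange runs Range against concurrent Set and Delete calls, but its assertions alone cannot prove the locking is correct, since a racy iteration may still happen to satisfy them. Assuming a standard Go toolchain, the test is most useful under the race detector, which flags any unsynchronized access the assertions miss:

go test -race ./internal/hashtable/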