diff --git a/BENCHMARKS.md b/BENCHMARKS.md
index 62c30ce..5311216 100644
--- a/BENCHMARKS.md
+++ b/BENCHMARKS.md
@@ -7,7 +7,7 @@ $ go test -bench .
 
 To limit the number of used CPU cores, append the `-cpu=` argument to the above command.
 
-This document contains some benchmark results obtained on a cloud VM.
+This document contains some benchmark results obtained for xsync v1.0.0 on a cloud VM.
 
 ### Counter vs. atomic int64
 
diff --git a/README.md b/README.md
index 0f0ab07..b00f553 100644
--- a/README.md
+++ b/README.md
@@ -101,17 +101,17 @@ To get the optimal performance, you may want to set the queue size to be large e
 A `RBMutex` is a reader biased reader/writer mutual exclusion lock. The lock can be held by many readers or a single writer.
 
 ```go
-var m xsync.RBMutex
+mu := xsync.NewRBMutex()
 // reader lock calls return a token
-t := m.RLock()
+t := mu.RLock()
 // the token must be later used to unlock the mutex
-m.RUnlock(t)
+mu.RUnlock(t)
 // writer locks are the same as in sync.RWMutex
-m.Lock()
-m.Unlock()
+mu.Lock()
+mu.Unlock()
 ```
 
-`RBMutex` is based on the BRAVO (Biased Locking for Reader-Writer Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf
+`RBMutex` is based on a modified version of the BRAVO (Biased Locking for Reader-Writer Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf
 
 The idea of the algorithm is to build on top of an existing reader-writer mutex and introduce a fast path for readers. On the fast path, reader lock attempts are sharded over an internal array based on the reader identity (a token in the case of Go). This means that readers do not contend over a single atomic counter the way they do in, say, `sync.RWMutex`, allowing for better scalability across cores.
 
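To make the fast-path idea above concrete, here is a minimal, self-contained sketch of BRAVO-style reader sharding. It is illustrative only: the `slots` array, `tryReadLock`, and `readUnlock` names are invented for this example, and a fixed reader ID stands in for xsync's pooled tokens.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// nslots must be a power of two so that id & (nslots-1) replaces id % nslots.
const nslots = 8

// Each reader marks a slot derived from its identity instead of bumping one
// shared counter, so readers on different cores touch different words.
var slots [nslots]int32

// tryReadLock attempts the fast path: claim the reader's slot via CAS.
func tryReadLock(readerID uint32) (uint32, bool) {
	slot := readerID & (nslots - 1)
	return slot, atomic.CompareAndSwapInt32(&slots[slot], 0, 1)
}

// readUnlock releases the previously claimed slot.
func readUnlock(slot uint32) {
	atomic.StoreInt32(&slots[slot], 0)
}

func main() {
	if slot, ok := tryReadLock(42); ok {
		fmt.Println("fast-path read lock acquired on slot", slot)
		readUnlock(slot)
	}
}
```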
diff --git a/counter.go b/counter.go
index 5f4989d..4bf2c91 100644
--- a/counter.go
+++ b/counter.go
@@ -16,6 +16,8 @@ var ptokenPool sync.Pool
 // the counter
 type ptoken struct {
 	idx uint32
+	//lint:ignore U1000 prevents false sharing
+	pad [cacheLineSize - 4]byte
 }
 
 // A Counter is a striped int64 counter.
@@ -35,6 +37,7 @@ type cstripe struct {
 	pad [cacheLineSize - 8]byte
 }
 
+// NewCounter creates a new Counter instance.
 func NewCounter() *Counter {
 	nstripes := nextPowOf2(parallelism())
 	c := Counter{
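For context on the `pad` fields added above, here is a self-contained sketch of the false-sharing guard they implement. The 64-byte cache line is an assumption for this example; xsync defines `cacheLineSize` per platform.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// Assumed 64-byte cache lines for this sketch.
const cacheLineSize = 64

// Padding rounds each stripe up to a full cache line, so two goroutines
// incrementing neighboring stripes do not invalidate each other's lines.
type stripe struct {
	c   int64
	pad [cacheLineSize - 8]byte
}

func main() {
	stripes := make([]stripe, 4)
	var wg sync.WaitGroup
	for i := range stripes {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for n := 0; n < 1000000; n++ {
				atomic.AddInt64(&stripes[i].c, 1) // each goroutine owns its line
			}
		}(i)
	}
	wg.Wait()
}
```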
diff --git a/rbmutex.go b/rbmutex.go
index 10b8148..c4a503f 100644
--- a/rbmutex.go
+++ b/rbmutex.go
@@ -7,12 +7,8 @@ import (
 	"time"
 )
 
-const (
-	// number of reader slots; must be a power of two
-	rslots = 4096
-	// slow-down guard
-	nslowdown = 9
-)
+// slow-down guard
+const nslowdown = 7
 
 // pool for reader tokens
 var rtokenPool sync.Pool
@@ -20,6 +16,8 @@ var rtokenPool sync.Pool
 // RToken is a reader lock token.
 type RToken struct {
 	slot uint32
+	//lint:ignore U1000 prevents false sharing
+	pad [cacheLineSize - 4]byte
 }
 
 // A RBMutex is a reader biased reader/writer mutual exclusion lock.
@@ -28,49 +26,79 @@ type RToken struct {
 //
 // A RBMutex must not be copied after first use.
 //
-// RBMutex is based on the BRAVO (Biased Locking for Reader-Writer
-// Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf
+// RBMutex is based on a modified version of the BRAVO
+// (Biased Locking for Reader-Writer Locks) algorithm:
+// https://arxiv.org/pdf/1810.01553.pdf
 //
 // RBMutex is a specialized mutex for scenarios, such as caches,
 // where the vast majority of locks are acquired by readers and write
 // lock acquire attempts are infrequent. In such scenarios, RBMutex
-// performs better than the sync.RWMutex on large multicore machines.
+// performs better than sync.RWMutex on large multicore machines.
 //
 // RBMutex extends sync.RWMutex internally and uses it as the "reader
 // bias disabled" fallback, so the same semantics apply. The only
 // noticeable difference is in the reader tokens returned by RLock
 // and consumed by RUnlock.
 type RBMutex struct {
-	readers      [rslots]int32
+	rslots       []rslot
+	rmask        uint32
 	rbias        int32
 	inhibitUntil time.Time
 	rw           sync.RWMutex
 }
 
+type rslot struct {
+	mu int32
+	//lint:ignore U1000 prevents false sharing
+	pad [cacheLineSize - 4]byte
+}
+
+// NewRBMutex creates a new RBMutex instance.
+func NewRBMutex() *RBMutex {
+	nslots := nextPowOf2(parallelism())
+	mu := RBMutex{
+		rslots: make([]rslot, nslots),
+		rmask:  nslots - 1,
+		rbias:  1,
+	}
+	return &mu
+}
+
 // RLock locks m for reading and returns a reader token. The
 // token must be used in the later RUnlock call.
 //
 // Should not be used for recursive read locking; a blocked Lock
 // call excludes new readers from acquiring the lock.
-func (m *RBMutex) RLock() *RToken {
-	if atomic.LoadInt32(&m.rbias) == 1 {
+func (mu *RBMutex) RLock() *RToken {
+	if atomic.LoadInt32(&mu.rbias) == 1 {
 		t, ok := rtokenPool.Get().(*RToken)
 		if !ok {
 			t = new(RToken)
-			// Since rslots is a power of two, we can use & instead of %.
-			t.slot = uint32(fastrand() & (rslots - 1))
+			t.slot = fastrand()
 		}
-		if atomic.CompareAndSwapInt32(&m.readers[t.slot], 0, 1) {
-			if atomic.LoadInt32(&m.rbias) == 1 {
-				return t
+		// Try all available slots to distribute reader goroutines across slots.
+		for i := 0; i < len(mu.rslots); i++ {
+			slot := t.slot + uint32(i)
+			rslot := &mu.rslots[slot&mu.rmask]
+			rslotmu := atomic.LoadInt32(&rslot.mu)
+			if atomic.CompareAndSwapInt32(&rslot.mu, rslotmu, rslotmu+1) {
+				if atomic.LoadInt32(&mu.rbias) == 1 {
+					// Hot path succeeded.
+					t.slot = slot
+					return t
+				}
+				// The mutex is no longer reader biased. Go to the slow path.
+				atomic.AddInt32(&rslot.mu, -1)
+				rtokenPool.Put(t)
+				break
 			}
-			atomic.StoreInt32(&m.readers[t.slot], 0)
+			// Contention detected. Try the next slot.
 		}
-		rtokenPool.Put(t)
 	}
-	m.rw.RLock()
-	if atomic.LoadInt32(&m.rbias) == 0 && time.Now().After(m.inhibitUntil) {
-		atomic.StoreInt32(&m.rbias, 1)
+	// Slow path.
+	mu.rw.RLock()
+	if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
+		atomic.StoreInt32(&mu.rbias, 1)
 	}
 	return nil
 }
 
@@ -79,12 +107,12 @@ func (m *RBMutex) RLock() *RToken {
 // the RLock call must be provided. RUnlock does not affect other
 // simultaneous readers. A panic is raised if m is not locked for
 // reading on entry to RUnlock.
-func (m *RBMutex) RUnlock(t *RToken) {
+func (mu *RBMutex) RUnlock(t *RToken) {
 	if t == nil {
-		m.rw.RUnlock()
+		mu.rw.RUnlock()
 		return
 	}
-	if !atomic.CompareAndSwapInt32(&m.readers[t.slot], 1, 0) {
+	if atomic.AddInt32(&mu.rslots[t.slot&mu.rmask].mu, -1) < 0 {
 		panic("invalid reader state detected")
 	}
 	rtokenPool.Put(t)
@@ -92,17 +120,17 @@ func (m *RBMutex) RUnlock(t *RToken) {
 
 // Lock locks m for writing. If the lock is already locked for
 // reading or writing, Lock blocks until the lock is available.
-func (m *RBMutex) Lock() {
-	m.rw.Lock()
-	if atomic.LoadInt32(&m.rbias) == 1 {
-		atomic.StoreInt32(&m.rbias, 0)
+func (mu *RBMutex) Lock() {
+	mu.rw.Lock()
+	if atomic.LoadInt32(&mu.rbias) == 1 {
+		atomic.StoreInt32(&mu.rbias, 0)
 		start := time.Now()
-		for i := 0; i < rslots; i++ {
-			for atomic.LoadInt32(&m.readers[i]) == 1 {
+		for i := 0; i < len(mu.rslots); i++ {
+			for atomic.LoadInt32(&mu.rslots[i].mu) > 0 {
 				runtime.Gosched()
 			}
 		}
-		m.inhibitUntil = time.Now().Add(time.Since(start) * nslowdown)
+		mu.inhibitUntil = time.Now().Add(time.Since(start) * nslowdown)
 	}
 }
 
@@ -112,6 +140,6 @@ func (m *RBMutex) Lock() {
 // As with RWMutex, a locked RBMutex is not associated with a
 // particular goroutine. One goroutine may RLock (Lock) a RBMutex and
 // then arrange for another goroutine to RUnlock (Unlock) it.
-func (m *RBMutex) Unlock() {
-	m.rw.Unlock()
+func (mu *RBMutex) Unlock() {
+	mu.rw.Unlock()
 }
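The `inhibitUntil` bookkeeping in `Lock` above is the heart of the slow-down guard: if draining the reader slots took `d`, reader bias stays disabled for the next `nslowdown * d`, so the constant's drop from 9 to 7 shortens the penalty window after each writer. A toy illustration of that arithmetic, with a sleep standing in for the wait on active readers:

```go
package main

import (
	"fmt"
	"time"
)

const nslowdown = 7

func main() {
	start := time.Now()
	time.Sleep(2 * time.Millisecond) // stand-in for spinning on reader slots
	drain := time.Since(start)
	// Reader bias stays disabled for nslowdown times the drain duration.
	inhibitUntil := time.Now().Add(drain * nslowdown)
	fmt.Printf("drained in %v; bias inhibited until %v\n", drain, inhibitUntil)
}
```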
diff --git a/rbmutex_test.go b/rbmutex_test.go
index c6cf4b7..a551294 100644
--- a/rbmutex_test.go
+++ b/rbmutex_test.go
@@ -15,29 +15,33 @@ import (
 )
 
 func TestRBMutexSerialReader(t *testing.T) {
-	var m RBMutex
-	for i := 0; i < 10; i++ {
-		tk := m.RLock()
-		m.RUnlock(tk)
+	const numIters = 10
+	mu := NewRBMutex()
+	var rtokens [numIters]*RToken
+	for i := 0; i < numIters; i++ {
+		rtokens[i] = mu.RLock()
+	}
+	for i := 0; i < numIters; i++ {
+		mu.RUnlock(rtokens[i])
 	}
 }
 
-func parallelReader(m *RBMutex, clocked, cunlock, cdone chan bool) {
-	tk := m.RLock()
+func parallelReader(mu *RBMutex, clocked, cunlock, cdone chan bool) {
+	tk := mu.RLock()
 	clocked <- true
 	<-cunlock
-	m.RUnlock(tk)
+	mu.RUnlock(tk)
 	cdone <- true
 }
 
 func doTestParallelReaders(numReaders, gomaxprocs int) {
 	runtime.GOMAXPROCS(gomaxprocs)
-	var m RBMutex
+	mu := NewRBMutex()
 	clocked := make(chan bool)
 	cunlock := make(chan bool)
 	cdone := make(chan bool)
 	for i := 0; i < numReaders; i++ {
-		go parallelReader(&m, clocked, cunlock, cdone)
+		go parallelReader(mu, clocked, cunlock, cdone)
 	}
 	// Wait for all parallel RLock()s to succeed.
 	for i := 0; i < numReaders; i++ {
@@ -53,60 +58,58 @@ func doTestParallelReaders(numReaders, gomaxprocs int) {
 }
 
 func TestRBMutexParallelReaders(t *testing.T) {
-	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(0))
	doTestParallelReaders(1, 4)
 	doTestParallelReaders(3, 4)
 	doTestParallelReaders(4, 2)
 }
 
-func reader(m *RBMutex, numIterations int, activity *int32, cdone chan bool) {
+func reader(mu *RBMutex, numIterations int, activity *int32, cdone chan bool) {
 	for i := 0; i < numIterations; i++ {
-		tk := m.RLock()
+		tk := mu.RLock()
 		n := atomic.AddInt32(activity, 1)
 		if n < 1 || n >= 10000 {
-			m.RUnlock(tk)
+			mu.RUnlock(tk)
 			panic(fmt.Sprintf("rlock(%d)\n", n))
 		}
 		for i := 0; i < 100; i++ {
 		}
 		atomic.AddInt32(activity, -1)
-		m.RUnlock(tk)
+		mu.RUnlock(tk)
 	}
 	cdone <- true
 }
 
-func writer(m *RBMutex, numIterations int, activity *int32, cdone chan bool) {
+func writer(mu *RBMutex, numIterations int, activity *int32, cdone chan bool) {
 	for i := 0; i < numIterations; i++ {
-		m.Lock()
+		mu.Lock()
 		n := atomic.AddInt32(activity, 10000)
 		if n != 10000 {
-			m.Unlock()
+			mu.Unlock()
 			panic(fmt.Sprintf("wlock(%d)\n", n))
 		}
 		for i := 0; i < 100; i++ {
 		}
 		atomic.AddInt32(activity, -10000)
-		m.Unlock()
+		mu.Unlock()
 	}
 	cdone <- true
 }
 
 func hammerRBMutex(gomaxprocs, numReaders, numIterations int) {
 	runtime.GOMAXPROCS(gomaxprocs)
-	var (
-		// Number of active readers + 10000 * number of active writers.
-		activity int32
-		m        RBMutex
-	)
+	// Number of active readers + 10000 * number of active writers.
+	var activity int32
+	mu := NewRBMutex()
 	cdone := make(chan bool)
-	go writer(&m, numIterations, &activity, cdone)
+	go writer(mu, numIterations, &activity, cdone)
 	var i int
 	for i = 0; i < numReaders/2; i++ {
-		go reader(&m, numIterations, &activity, cdone)
+		go reader(mu, numIterations, &activity, cdone)
 	}
-	go writer(&m, numIterations, &activity, cdone)
+	go writer(mu, numIterations, &activity, cdone)
 	for ; i < numReaders; i++ {
-		go reader(&m, numIterations, &activity, cdone)
+		go reader(mu, numIterations, &activity, cdone)
 	}
 	// Wait for the 2 writers and all readers to finish.
 	for i := 0; i < 2+numReaders; i++ {
@@ -115,11 +118,8 @@ func hammerRBMutex(gomaxprocs, numReaders, numIterations int) {
 }
 
 func TestRBMutex(t *testing.T) {
-	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
-	n := 1000
-	if testing.Short() {
-		n = 5
-	}
+	const n = 1000
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(0))
 	hammerRBMutex(1, 1, n)
 	hammerRBMutex(1, 3, n)
 	hammerRBMutex(1, 10, n)
@@ -132,112 +132,128 @@ func TestRBMutex(t *testing.T) {
 	hammerRBMutex(10, 5, n)
 }
 
-func benchmarkRBMutex(b *testing.B, localWork, writeRatio int) {
-	var m RBMutex
+func benchmarkRBMutex(b *testing.B, parallelism, localWork, writeRatio int) {
+	mu := NewRBMutex()
+	b.SetParallelism(parallelism)
 	b.RunParallel(func(pb *testing.PB) {
 		foo := 0
 		for pb.Next() {
 			foo++
 			if writeRatio > 0 && foo%writeRatio == 0 {
-				m.Lock()
-				//lint:ignore SA2001 critical section is empty in this benchmark
-				m.Unlock()
+				mu.Lock()
+				for i := 0; i != localWork; i += 1 {
+					foo *= 2
+					foo /= 2
+				}
+				mu.Unlock()
 			} else {
-				tk := m.RLock()
+				tk := mu.RLock()
 				for i := 0; i != localWork; i += 1 {
 					foo *= 2
 					foo /= 2
 				}
-				m.RUnlock(tk)
+				mu.RUnlock(tk)
 			}
 		}
 		_ = foo
 	})
 }
 
+func BenchmarkRBMutexReadOnly_HighParallelism(b *testing.B) {
+	benchmarkRBMutex(b, 1024, 0, -1)
+}
+
 func BenchmarkRBMutexReadOnly(b *testing.B) {
-	benchmarkRBMutex(b, 0, -1)
+	benchmarkRBMutex(b, -1, 0, -1)
+}
+
+func BenchmarkRBMutexWrite100000(b *testing.B) {
+	benchmarkRBMutex(b, -1, 0, 100000)
 }
 
 func BenchmarkRBMutexWrite10000(b *testing.B) {
-	benchmarkRBMutex(b, 0, 10000)
+	benchmarkRBMutex(b, -1, 0, 10000)
 }
 
 func BenchmarkRBMutexWrite1000(b *testing.B) {
-	benchmarkRBMutex(b, 0, 1000)
+	benchmarkRBMutex(b, -1, 0, 1000)
 }
 
-func BenchmarkRBMutexWrite100(b *testing.B) {
-	benchmarkRBMutex(b, 0, 100)
+func BenchmarkRBMutexWorkReadOnly(b *testing.B) {
+	benchmarkRBMutex(b, -1, 100, -1)
 }
 
-func BenchmarkRBMutexWorkReadOnly(b *testing.B) {
-	benchmarkRBMutex(b, 100, -1)
+func BenchmarkRBMutexWorkWrite100000(b *testing.B) {
+	benchmarkRBMutex(b, -1, 100, 100000)
 }
 
 func BenchmarkRBMutexWorkWrite10000(b *testing.B) {
-	benchmarkRBMutex(b, 100, 10000)
+	benchmarkRBMutex(b, -1, 100, 10000)
 }
 
 func BenchmarkRBMutexWorkWrite1000(b *testing.B) {
-	benchmarkRBMutex(b, 100, 1000)
+	benchmarkRBMutex(b, -1, 100, 1000)
 }
 
-func BenchmarkRBMutexWorkWrite100(b *testing.B) {
-	benchmarkRBMutex(b, 100, 100)
-}
-
-func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
-	var m sync.RWMutex
+func benchmarkRWMutex(b *testing.B, parallelism, localWork, writeRatio int) {
+	var mu sync.RWMutex
+	b.SetParallelism(parallelism)
 	b.RunParallel(func(pb *testing.PB) {
 		foo := 0
 		for pb.Next() {
 			foo++
 			if writeRatio > 0 && foo%writeRatio == 0 {
-				m.Lock()
-				//lint:ignore SA2001 critical section is empty in this benchmark
-				m.Unlock()
+				mu.Lock()
+				for i := 0; i != localWork; i += 1 {
+					foo *= 2
+					foo /= 2
+				}
+				mu.Unlock()
 			} else {
-				m.RLock()
+				mu.RLock()
 				for i := 0; i != localWork; i += 1 {
 					foo *= 2
 					foo /= 2
 				}
-				m.RUnlock()
+				mu.RUnlock()
 			}
 		}
 		_ = foo
 	})
 }
 
+func BenchmarkRWMutexReadOnly_HighParallelism(b *testing.B) {
+	benchmarkRWMutex(b, 1024, 0, -1)
+}
+
 func BenchmarkRWMutexReadOnly(b *testing.B) {
-	benchmarkRWMutex(b, 0, -1)
+	benchmarkRWMutex(b, -1, 0, -1)
+}
+
+func BenchmarkRWMutexWrite100000(b *testing.B) {
+	benchmarkRWMutex(b, -1, 0, 100000)
 }
 
 func BenchmarkRWMutexWrite10000(b *testing.B) {
-	benchmarkRWMutex(b, 0, 10000)
+	benchmarkRWMutex(b, -1, 0, 10000)
 }
 
 func BenchmarkRWMutexWrite1000(b *testing.B) {
-	benchmarkRWMutex(b, 0, 1000)
+	benchmarkRWMutex(b, -1, 0, 1000)
 }
 
-func BenchmarkRWMutexWrite100(b *testing.B) {
-	benchmarkRWMutex(b, 0, 100)
+func BenchmarkRWMutexWorkReadOnly(b *testing.B) {
+	benchmarkRWMutex(b, -1, 100, -1)
 }
 
-func BenchmarkRWMutexWorkReadOnly(b *testing.B) {
-	benchmarkRWMutex(b, 100, -1)
+func BenchmarkRWMutexWorkWrite100000(b *testing.B) {
+	benchmarkRWMutex(b, -1, 100, 100000)
 }
 
 func BenchmarkRWMutexWorkWrite10000(b *testing.B) {
-	benchmarkRWMutex(b, 100, 10000)
+	benchmarkRWMutex(b, -1, 100, 10000)
 }
 
 func BenchmarkRWMutexWorkWrite1000(b *testing.B) {
-	benchmarkRWMutex(b, 100, 1000)
-}
-
-func BenchmarkRWMutexWorkWrite100(b *testing.B) {
-	benchmarkRWMutex(b, 100, 100)
+	benchmarkRWMutex(b, -1, 100, 1000)
 }
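One note on the reworked benchmarks: `b.SetParallelism(p)` makes `RunParallel` use `p*GOMAXPROCS` goroutines and has no effect when `p` is less than 1, which is why the existing benchmarks can pass `-1` to keep the default while the new `*_HighParallelism` variants pass `1024` to heavily oversubscribe the CPUs. A minimal stand-alone example (the benchmark name and body are hypothetical):

```go
package example

import "testing"

// BenchmarkOversubscribed runs its body on 1024*GOMAXPROCS goroutines.
// SetParallelism(p) is a no-op for p < 1, so passing -1 keeps the default
// of one goroutine per GOMAXPROCS processor.
func BenchmarkOversubscribed(b *testing.B) {
	b.SetParallelism(1024)
	b.RunParallel(func(pb *testing.PB) {
		foo := 0
		for pb.Next() {
			foo++ // stand-in for the critical section under test
		}
		_ = foo
	})
}
```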