Optimize RBMutex footprint and thread-to-slot distribution (#75)
puzpuzpuz authored Oct 31, 2022
1 parent 24ebe2c commit 1f19135
Showing 5 changed files with 160 additions and 113 deletions.
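For a sense of the footprint change (editorial arithmetic, not part of the commit): the old `readers [rslots]int32` array with `rslots = 4096` pinned 16 KiB per mutex, while the new `rslots []rslot` slice holds one cache-line-padded slot per unit of CPU parallelism rounded up to a power of two, e.g. roughly 8 × 64 B = 512 B on an eight-core machine.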
2 changes: 1 addition & 1 deletion BENCHMARKS.md
@@ -7,7 +7,7 @@ $ go test -bench .

To limit the number of used CPU cores, append the `-cpu=<number>` argument to the above command.

-This document contains some benchmark results obtained on a cloud VM.
+This document contains some benchmark results obtained for xsync v1.0.0 on a cloud VM.

### Counter vs. atomic int64

12 changes: 6 additions & 6 deletions README.md
@@ -101,17 +101,17 @@ To get the optimal performance, you may want to set the queue size to be large e
A `RBMutex` is a reader biased reader/writer mutual exclusion lock. The lock can be held by an arbitrary number of readers or a single writer.

```go
-var m xsync.RBMutex
+mu := xsync.NewRBMutex()
// reader lock calls return a token
-t := m.RLock()
+t := mu.RLock()
// the token must be later used to unlock the mutex
-m.RUnlock(t)
+mu.RUnlock(t)
// writer locks are the same as in sync.RWMutex
-m.Lock()
-m.Unlock()
+mu.Lock()
+mu.Unlock()
```

-`RBMutex` is based on the BRAVO (Biased Locking for Reader-Writer Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf
+`RBMutex` is based on a modified version of the BRAVO (Biased Locking for Reader-Writer Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf

The idea of the algorithm is to build on top of an existing reader-writer mutex and introduce a fast path for readers. On the fast path, reader lock attempts are sharded over an internal array based on the reader identity (a token in the case of Go). This means that readers do not contend over a single atomic counter like they do in, say, `sync.RWMutex`, allowing for better scalability in terms of cores.
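As a rough sketch of that fast path (illustrative only, not the library's code; `shardedReaders`, `nslots`, and `token` are invented names), each reader bumps a per-slot counter picked by masking its identity, so readers only contend when they map to the same slot:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const nslots = 8 // any power of two; xsync sizes this from CPU parallelism

type shardedReaders struct {
	slots [nslots]int32 // one reader count per slot (padding omitted)
}

// tryRLock attempts the reader fast path for the given identity.
func (s *shardedReaders) tryRLock(token uint32) bool {
	// A power-of-two length lets us mask instead of taking a modulo.
	slot := &s.slots[token&(nslots-1)]
	n := atomic.LoadInt32(slot)
	return atomic.CompareAndSwapInt32(slot, n, n+1)
}

func (s *shardedReaders) rUnlock(token uint32) {
	atomic.AddInt32(&s.slots[token&(nslots-1)], -1)
}

func main() {
	var s shardedReaders
	if s.tryRLock(42) {
		fmt.Println("fast-path reader on slot", 42&(nslots-1))
		s.rUnlock(42)
	}
}
```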

3 changes: 3 additions & 0 deletions counter.go
@@ -16,6 +16,8 @@ var ptokenPool sync.Pool
// the counter
type ptoken struct {
	idx uint32
+	//lint:ignore U1000 prevents false sharing
+	pad [cacheLineSize - 4]byte
}

// A Counter is a striped int64 counter.
@@ -35,6 +37,7 @@ type cstripe struct {
	pad [cacheLineSize - 8]byte
}

+// NewCounter creates a new Counter instance.
func NewCounter() *Counter {
	nstripes := nextPowOf2(parallelism())
	c := Counter{
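The `pad [cacheLineSize - 4]byte` fields added above keep each hot value on its own cache line so that cores updating neighboring stripes don't invalidate each other's caches. A self-contained sketch of the layout (the 64-byte line size is an assumption here; the library derives `cacheLineSize` per platform):

```go
package main

import (
	"fmt"
	"unsafe"
)

const cacheLineSize = 64 // assumed; the real constant is platform-specific

type stripe struct {
	n int64
	//lint:ignore U1000 prevents false sharing
	pad [cacheLineSize - 8]byte // 8 = size of int64
}

func main() {
	var s stripe
	// The struct fills a whole cache line, so adjacent stripes in an
	// array or slice never share one.
	fmt.Println(unsafe.Sizeof(s)) // 64
}
```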
96 changes: 62 additions & 34 deletions rbmutex.go
@@ -7,19 +7,17 @@ import (
"time"
)

-const (
-	// number of reader slots; must be a power of two
-	rslots = 4096
-	// slow-down guard
-	nslowdown = 9
-)
+// slow-down guard
+const nslowdown = 7

// pool for reader tokens
var rtokenPool sync.Pool

// RToken is a reader lock token.
type RToken struct {
	slot uint32
+	//lint:ignore U1000 prevents false sharing
+	pad [cacheLineSize - 4]byte
}

// A RBMutex is a reader biased reader/writer mutual exclusion lock.
@@ -28,49 +26,79 @@ type RToken struct {
//
// A RBMutex must not be copied after first use.
//
-// RBMutex is based on the BRAVO (Biased Locking for Reader-Writer
-// Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf
+// RBMutex is based on a modified version of the BRAVO
+// (Biased Locking for Reader-Writer Locks) algorithm:
+// https://arxiv.org/pdf/1810.01553.pdf
//
// RBMutex is a specialized mutex for scenarios, such as caches,
// where the vast majority of locks are acquired by readers and write
// lock acquire attempts are infrequent. In such scenarios, RBMutex
-// performs better than the sync.RWMutex on large multicore machines.
+// performs better than sync.RWMutex on large multicore machines.
//
// RBMutex extends sync.RWMutex internally and uses it as the "reader
// bias disabled" fallback, so the same semantics apply. The only
// noticeable difference is in reader tokens returned from the
// RLock/RUnlock methods.
type RBMutex struct {
-	readers      [rslots]int32
+	rslots       []rslot
+	rmask        uint32
	rbias        int32
	inhibitUntil time.Time
	rw           sync.RWMutex
}

+type rslot struct {
+	mu int32
+	//lint:ignore U1000 prevents false sharing
+	pad [cacheLineSize - 4]byte
+}

+// NewRBMutex creates a new RBMutex instance.
+func NewRBMutex() *RBMutex {
+	nslots := nextPowOf2(parallelism())
+	mu := RBMutex{
+		rslots: make([]rslot, nslots),
+		rmask:  nslots - 1,
+		rbias:  1,
+	}
+	return &mu
+}

// RLock locks m for reading and returns a reader token. The
// token must be used in the later RUnlock call.
//
// Should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock.
-func (m *RBMutex) RLock() *RToken {
-	if atomic.LoadInt32(&m.rbias) == 1 {
+func (mu *RBMutex) RLock() *RToken {
+	if atomic.LoadInt32(&mu.rbias) == 1 {
		t, ok := rtokenPool.Get().(*RToken)
		if !ok {
			t = new(RToken)
-			// Since rslots is a power of two, we can use & instead of %.
-			t.slot = uint32(fastrand() & (rslots - 1))
+			t.slot = fastrand()
		}
-		if atomic.CompareAndSwapInt32(&m.readers[t.slot], 0, 1) {
-			if atomic.LoadInt32(&m.rbias) == 1 {
-				return t
+		// Try all available slots to distribute reader threads to slots.
+		for i := 0; i < len(mu.rslots); i++ {
+			slot := t.slot + uint32(i)
+			rslot := &mu.rslots[slot&mu.rmask]
+			rslotmu := atomic.LoadInt32(&rslot.mu)
+			if atomic.CompareAndSwapInt32(&rslot.mu, rslotmu, rslotmu+1) {
+				if atomic.LoadInt32(&mu.rbias) == 1 {
+					// Hot path succeeded.
+					t.slot = slot
+					return t
+				}
+				// The mutex is no longer reader biased. Go to the slow path.
+				atomic.AddInt32(&rslot.mu, -1)
+				rtokenPool.Put(t)
+				break
			}
-			atomic.StoreInt32(&m.readers[t.slot], 0)
+			// Contention detected. Give a try with the next slot.
		}
-		rtokenPool.Put(t)
	}
-	m.rw.RLock()
-	if atomic.LoadInt32(&m.rbias) == 0 && time.Now().After(m.inhibitUntil) {
-		atomic.StoreInt32(&m.rbias, 1)
+	// Slow path.
+	mu.rw.RLock()
+	if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
+		atomic.StoreInt32(&mu.rbias, 1)
	}
	return nil
}
@@ -79,30 +107,30 @@ func (m *RBMutex) RLock() *RToken {
// the RLock call must be provided. RUnlock does not affect other
// simultaneous readers. A panic is raised if m is not locked for
// reading on entry to RUnlock.
-func (m *RBMutex) RUnlock(t *RToken) {
+func (mu *RBMutex) RUnlock(t *RToken) {
	if t == nil {
-		m.rw.RUnlock()
+		mu.rw.RUnlock()
		return
	}
-	if !atomic.CompareAndSwapInt32(&m.readers[t.slot], 1, 0) {
+	if atomic.AddInt32(&mu.rslots[t.slot&mu.rmask].mu, -1) < 0 {
		panic("invalid reader state detected")
	}
	rtokenPool.Put(t)
}

// Lock locks m for writing. If the lock is already locked for
// reading or writing, Lock blocks until the lock is available.
-func (m *RBMutex) Lock() {
-	m.rw.Lock()
-	if atomic.LoadInt32(&m.rbias) == 1 {
-		atomic.StoreInt32(&m.rbias, 0)
+func (mu *RBMutex) Lock() {
+	mu.rw.Lock()
+	if atomic.LoadInt32(&mu.rbias) == 1 {
+		atomic.StoreInt32(&mu.rbias, 0)
		start := time.Now()
-		for i := 0; i < rslots; i++ {
-			for atomic.LoadInt32(&m.readers[i]) == 1 {
+		for i := 0; i < len(mu.rslots); i++ {
+			for atomic.LoadInt32(&mu.rslots[i].mu) > 0 {
				runtime.Gosched()
			}
		}
-		m.inhibitUntil = time.Now().Add(time.Since(start) * nslowdown)
+		mu.inhibitUntil = time.Now().Add(time.Since(start) * nslowdown)
	}
}

@@ -112,6 +140,6 @@ func (m *RBMutex) Lock() {
// As with RWMutex, a locked RBMutex is not associated with a
// particular goroutine. One goroutine may RLock (Lock) a RBMutex and
// then arrange for another goroutine to RUnlock (Unlock) it.
-func (m *RBMutex) Unlock() {
-	m.rw.Unlock()
+func (mu *RBMutex) Unlock() {
+	mu.rw.Unlock()
}
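To see how the new thread-to-slot distribution behaves, here is a standalone sketch of the probe sequence (editorial illustration; the values are made up, and the real code takes `nslots` from `nextPowOf2(parallelism())` and the token from `fastrand()`):

```go
package main

import "fmt"

func main() {
	const nslots = 8 // stands in for nextPowOf2(parallelism())
	const rmask = nslots - 1
	token := uint32(0xdeadbeef) // stands in for a fastrand() token
	// On contention a reader moves to the next slot, wrapping via the
	// mask, so it visits every slot at most once.
	for i := uint32(0); i < nslots; i++ {
		fmt.Print((token+i)&rmask, " ") // prints: 7 0 1 2 3 4 5 6
	}
	fmt.Println()
}
```

Note also the write-path guard in `Lock` above: after a writer drains the reader slots, `inhibitUntil` keeps reader bias disabled for `nslowdown` (now 7) times as long as the drain took, so write-heavy phases don't constantly flip the bias back on.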