Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memberlist: Optimise receive path by not cloning current ring state. #76

Merged
merged 14 commits into from
Nov 26, 2021
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
* [ENHANCEMENT] Add grpcclient, grpcencoding and grpcutil packages. #39
* [ENHANCEMENT] Replace go-kit/kit/log with go-kit/log. #52
* [ENHANCEMENT] Add spanlogger package. #42
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
9 changes: 8 additions & 1 deletion kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
m.storeMu.Lock()
defer m.storeMu.Unlock()

curr := m.store[key].Clone()
// Note that we do not take a deep copy of curr.value here, it is modified in-place.
// This is safe because the entire function runs under the store lock; we do not return
// the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS).
curr := m.store[key]
// if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
if casVersion > 0 && curr.version != casVersion {
return nil, 0, errVersionMismatch
Expand Down Expand Up @@ -1224,6 +1227,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
codecID: codec.CodecID(),
}

// The "changes" returned by Merge() can contain references to the "result"
// state. Therefore, make sure we clone it before releasing the lock.
change = change.Clone()

return change, newVersion, nil
}

Expand Down
100 changes: 100 additions & 0 deletions ring/bench/ring_memberlist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package bench

import (
"context"
"fmt"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
)

// dnsProviderMock is a minimal in-memory stand-in for the DNS provider that is
// handed to memberlist.NewKV in this benchmark. It performs no lookups; it only
// remembers the last set of addresses it was asked to resolve.
type dnsProviderMock struct {
	resolved []string
}

// Resolve records addrs as the resolved set, replacing any earlier value.
// The context is ignored and the returned error is always nil.
func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error {
	p.resolved = addrs
	return nil
}

// Addresses returns the slice most recently passed to Resolve, or nil if
// Resolve has not been called yet.
//
// It uses a pointer receiver so that every method of the type shares the same
// receiver kind as Resolve.
func (p *dnsProviderMock) Addresses() []string {
	return p.resolved
}

// encodeMessage builds the byte payload that the benchmark feeds to
// KV.NotifyMsg: d is encoded with the ring codec, wrapped in a
// memberlist.KeyValuePair under key (tagged with the codec ID), and the pair is
// marshalled.
//
// Any encoding or marshalling error fails the benchmark immediately.
func encodeMessage(b *testing.B, key string, d *ring.Desc) []byte {
	// Mark as a helper so a failure is attributed to the calling line.
	b.Helper()

	c := ring.GetCodec()
	val, err := c.Encode(d)
	require.NoError(b, err)

	kvPair := memberlist.KeyValuePair{
		Key:   key,
		Value: val,
		Codec: c.CodecID(),
	}

	ser, err := kvPair.Marshal()
	require.NoError(b, err)
	return ser
}

// BenchmarkMemberlistReceiveWithRingDesc benchmarks the memberlist receive path
// when it is being used as the ring backing store.
//
// It starts a memberlist KV, seeds the "ring" key with a large ring descriptor
// via a single NotifyMsg call, and then measures repeated NotifyMsg calls that
// each carry a small, pre-encoded single-instance update.
func BenchmarkMemberlistReceiveWithRingDesc(b *testing.B) {
	c := ring.GetCodec()

	var cfg memberlist.KVConfig
	flagext.DefaultValues(&cfg)
	cfg.TCPTransport = memberlist.TCPTransportConfig{
		BindAddrs: []string{"localhost"},
	}
	cfg.Codecs = []codec.Codec{c}

	mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
	require.NoError(b, services.StartAndAwaitRunning(context.Background(), mkv))
	defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck

	_, err := memberlist.NewClient(mkv, c)
	require.NoError(b, err)

	// Build the initial ring state:
	// - The ring isn't actually in use, so the fields such as address/zone are not important.
	// - The number of keys in the store has no impact for this test, so simulate a single ring.
	// - The number of instances in the ring does have a big impact.
	const numInstances = 600
	const numTokens = 128
	{
		// Tokens already handed out to earlier instances. This is passed to
		// GenerateTokens for every instance, so it must be kept up to date;
		// otherwise it stays nil and has no effect.
		tokensUsed := make([]uint32, 0, numInstances*numTokens)

		initialDesc := ring.NewDesc()
		for i := 0; i < numInstances; i++ {
			tokens := ring.GenerateTokens(numTokens, tokensUsed)
			tokensUsed = append(tokensUsed, tokens...)
			initialDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", tokens, ring.ACTIVE, time.Now())
		}
		// Send a single update to populate the store.
		msg := encodeMessage(b, "ring", initialDesc)
		mkv.NotifyMsg(msg)
	}

	// Pre-encode some payloads. It's not significant what the payloads actually
	// update in the ring, though it may be important for future optimisations.
	testMsgs := make([][]byte, 100)
	for i := range testMsgs {
		testDesc := ring.NewDesc()
		testDesc.AddIngester(fmt.Sprintf("instance-%d", i), "127.0.0.1", "zone", nil, ring.ACTIVE, time.Now())
		testMsgs[i] = encodeMessage(b, "ring", testDesc)
	}

	// Exclude all of the setup above from the measured time.
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		mkv.NotifyMsg(testMsgs[i%len(testMsgs)])
	}
}