Skip to content

Commit

Permalink
fix memberlist delete operation (#645)
Browse files Browse the repository at this point in the history
* fix memberlist delete operation

* added new fields to kv pair in update logic

* merged changes from Nikos's branch

* update tests to match Nikos's PR

* fix missing import

* handle snappy decode of empty data

* comment explaining empty incoming data

* fix import lint

* updated logging
  • Loading branch information
aldernero authored Feb 13, 2025
1 parent eab3648 commit e83d24e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 44 deletions.
64 changes: 35 additions & 29 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/hashicorp/memberlist"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
Expand Down Expand Up @@ -367,9 +368,10 @@ func (v ValueDesc) String() string {

var (
// if merge fails because of CAS version mismatch, this error is returned. CAS operation reacts on it
errVersionMismatch = errors.New("version mismatch")
errNoChangeDetected = errors.New("no change detected")
errTooManyRetries = errors.New("too many retries")
errVersionMismatch = errors.New("version mismatch")
errNoChangeDetected = errors.New("no change detected")
errTooManyRetries = errors.New("too many retries")
emptySnappyEncodedData = snappy.Encode(nil, []byte{})
)

// NewKV creates new gossip-based KV service. Note that service needs to be started, until then it doesn't initialize
Expand Down Expand Up @@ -547,7 +549,7 @@ func (m *KV) running(ctx context.Context) error {
case <-obsoleteEntriesTickerChan:
// cleanupObsoleteEntries is normally called during push/pull, but if there are no other
// nodes to push/pull with, we can call it periodically to make sure we remove unused entries from memory.
level.Debug(m.logger).Log("msg", "initiating cleanup of obsolete entries")
level.Info(m.logger).Log("msg", "initiating cleanup of obsolete entries")
m.cleanupObsoleteEntries()

case <-ctx.Done():
Expand Down Expand Up @@ -1045,11 +1047,13 @@ func (m *KV) Delete(key string) error {

c := m.GetCodec(val.CodecID)
if c == nil {
level.Error(m.logger).Log("msg", "could not mark key for deletion due to an invalid codec", "key", key, "codec", val.CodecID)
return fmt.Errorf("invalid codec: %s", val.CodecID)
}

change, newver, deleted, updated, err := m.mergeValueForKey(key, val.value, false, 0, val.CodecID, true, time.Now())
if err != nil {
level.Error(m.logger).Log("msg", "could not mark key for deletion due to error while trying to merge new value", "key", key, "err", err)
return err
}

Expand All @@ -1058,6 +1062,8 @@ func (m *KV) Delete(key string) error {
m.broadcastNewValue(key, change, newver, c, false, deleted, updated)
}

level.Info(m.logger).Log("msg", "successfully marked key for deletion", "key", key)

return nil
}

Expand Down Expand Up @@ -1171,8 +1177,8 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key)
return
}
data, err := codec.Encode(change)

data, err := handlePossibleNilEncode(codec, change)
if err != nil {
level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err)
m.numberOfBroadcastMessagesDropped.Inc()
Expand All @@ -1187,7 +1193,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
return
}

mergedChanges := handlePossibleNilMergeContent(change)
mergedChanges := change.MergeContent()
m.addSentMessage(Message{
Time: time.Now(),
Size: len(pairData),
Expand Down Expand Up @@ -1295,9 +1301,11 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
Time: time.Now(),
Size: update.messageSize,
Pair: KeyValuePair{
Key: key,
Value: update.value,
Codec: update.codec.CodecID(),
Key: key,
Value: update.value,
Codec: update.codec.CodecID(),
Deleted: deleted,
UpdateTimeMillis: updateTimeMillis(updated),
},
Version: version,
Changes: changes,
Expand Down Expand Up @@ -1496,6 +1504,10 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) {
}

func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, bool, time.Time, error) {
// Even if there is no change to the Mergeable, we still may need to update the timestamp and deleted state.
if len(incomingData) == 0 {
incomingData = emptySnappyEncodedData
}
decodedValue, err := codec.Decode(incomingData)
if err != nil {
return nil, 0, false, time.Time{}, fmt.Errorf("failed to decode value: %v", err)
Expand All @@ -1522,6 +1534,12 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue
// This is safe because the entire function runs under the store lock; we do not return
// the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS).
curr := m.store[key]

// if current entry is nil but the incoming for that key is deleted then we return no change, as we do not want to revive the entry.
if curr.value == nil && deleted {
return nil, 0, false, time.Time{}, err
}

// if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
if casVersion > 0 && curr.Version != casVersion {
return nil, 0, false, time.Time{}, errVersionMismatch
Expand All @@ -1530,13 +1548,11 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue
if err != nil {
return nil, 0, false, time.Time{}, err
}

newVersion = curr.Version + 1
newUpdated = curr.UpdateTime
newDeleted = curr.Deleted

// If incoming value is newer, use its timestamp and deleted value
if !updateTime.IsZero() && updateTime.After(newUpdated) {
if !updateTime.IsZero() && updateTime.After(newUpdated) && deleted {
newUpdated = updateTime
newDeleted = deleted
}
Expand Down Expand Up @@ -1566,6 +1582,12 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue
}
}

if change == nil && curr.Deleted != newDeleted {
// return result as change if the only thing that changes is the Delete state of the entry.
change = result
}

newVersion = curr.Version + 1
m.store[key] = ValueDesc{
value: result,
Version: newVersion,
Expand Down Expand Up @@ -1664,7 +1686,7 @@ func (m *KV) cleanupObsoleteEntries() {

for k, v := range m.store {
if v.Deleted && time.Since(v.UpdateTime) > m.cfg.ObsoleteEntriesTimeout {
level.Debug(m.logger).Log("msg", "deleting entry from KV store", "key", k)
level.Info(m.logger).Log("msg", "deleting entry from KV store", "key", k)
delete(m.store, k)
}
}
Expand Down Expand Up @@ -1695,19 +1717,3 @@ func updateTimeMillis(ts time.Time) int64 {
}
return ts.UnixMilli()
}

func handlePossibleNilEncode(codec codec.Codec, change Mergeable) ([]byte, error) {
if change == nil {
return []byte{}, nil
}

return codec.Encode(change)
}

func handlePossibleNilMergeContent(change Mergeable) []string {
if change == nil {
return []string{}
}

return change.MergeContent()
}
40 changes: 25 additions & 15 deletions kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/test"
)

const ACTIVE = 1
Expand Down Expand Up @@ -252,6 +253,16 @@ func getLocalhostAddrs() []string {
return []string{localhostIP}
}

func checkMemberlistEntry(t *testing.T, kv *Client, key string, duration time.Duration) {
test.Poll(t, duration, nil, func() interface{} {
val := get(t, kv, key)
if val != nil {
return fmt.Errorf("expected nil, got: %v", val)
}
return nil
})
}

func TestBasicGetAndCas(t *testing.T) {
c := dataCodec{}

Expand Down Expand Up @@ -580,6 +591,8 @@ func TestDelete(t *testing.T) {

c := dataCodec{}

reg := prometheus.NewRegistry()

var cfg KVConfig
flagext.DefaultValues(&cfg)
cfg.TCPTransport = TCPTransportConfig{
Expand All @@ -588,11 +601,11 @@ func TestDelete(t *testing.T) {
}
cfg.GossipNodes = 1
cfg.GossipInterval = 100 * time.Millisecond
cfg.ObsoleteEntriesTimeout = 1 * time.Second
cfg.ObsoleteEntriesTimeout = 500 * time.Millisecond
cfg.ClusterLabelVerificationDisabled = true
cfg.Codecs = []codec.Codec{c}

mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, reg)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))
defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck

Expand All @@ -614,12 +627,14 @@ func TestDelete(t *testing.T) {
t.Fatalf("Failed to delete key %s: %v", key, err)
}

time.Sleep(2 * time.Second) // wait for obsolete entries to be removed
val = get(t, kv, key)
checkMemberlistEntry(t, kv, key, 2*time.Second)

if val != nil {
t.Errorf("Expected nil, got: %v", val)
}
// Validate that there are no encoding errors during the Delete flow.
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP memberlist_client_messages_to_broadcast_dropped_total Number of broadcast messages intended to be sent but were dropped due to encoding errors or for being too big
# TYPE memberlist_client_messages_to_broadcast_dropped_total counter
memberlist_client_messages_to_broadcast_dropped_total 0
`), "memberlist_client_messages_to_broadcast_dropped_total"))
}

func TestDeleteMultipleClients(t *testing.T) {
Expand Down Expand Up @@ -676,14 +691,9 @@ func TestDeleteMultipleClients(t *testing.T) {
t.Fatalf("Failed to delete key %s: %v", key, err)
}

time.Sleep(5 * deleteTime) // wait for obsolete entries to be removed

val, err = kv1.Get(context.Background(), key)
require.NoError(t, err)
require.Nil(t, val)
val, err = kv2.Get(context.Background(), key)
require.NoError(t, err)
require.Nil(t, val)
// wait for the obselete entries to be removed.
checkMemberlistEntry(t, kv1, key, 10*deleteTime)
checkMemberlistEntry(t, kv2, key, 10*deleteTime)
}

func TestMultipleClients(t *testing.T) {
Expand Down

0 comments on commit e83d24e

Please # to comment.