
Commit 06e0523

craig[bot], hakuuww, and stevendanna committed
142239: raft: introduce term cache r=pav-kv,tbg a=hakuuww

This PR introduces a new sub-structure of `raftLog`, the `termCache`, which stores a suffix of the `raftLog` in a compressed representation and helps look up the term of a particular raft entry. The `termCache` integrates tightly with `raftLog`, which means it can rely on many invariants guaranteed by `raftLog`, allowing a concise implementation.

---

First, here is an example of what a raftLog may look like (entryID: term/index):

`[t5/10, t5/11, t5/12, t5/13, t6/14, t6/15, t6/16, t6/17, t6/18, t6/19, t7/20, t7/21, t7/22, t7/23, t7/24, t7/25, t7/26, t7/27, t8/28, t8/29, t8/30, t8/31, t10/32, t10/33, t10/34]`

Properties of a raftLog:
- entryID.index values are strictly increasing and contiguous.
- entryID.term values are monotonically increasing and may have gaps.

---

These properties let us use term change points to express a long, contiguous raftLog. The example raftLog above can be expressed as follows in the `termCache` representation, where each entry is a term change point:

`[t5/10, t6/14, t7/20, t8/28, t10/32]`

In practice, a raftLog may be hundreds of entries long but contain only a few term changes, so this compressed representation lets us describe a long raftLog's entryIDs cheaply (see the illustrative sketch after this description).

---

One immediate benefit is that there should no longer be any [raftEntry cache accesses or pebble calls when we want to know the term of a storage-persisted entry](https://github.com/cockroachdb/cockroach/blob/e587879be8cd0f1ace03952decf6dda2573f0b56/pkg/kv/kvserver/logstore/logstore.go#L614-L639). (This assumes term flips are rare; we can still hit pebble when we want the term of a very early entry that is more than `termCacheSize` terms old.) This helps avoid:
- unhelpful evictions in the raftEntry cache
- pebble accesses

Currently, neither scenario incurs a big cost, but we can still save some work.

---

A second benefit: since we now keep a compressed representation of a suffix of the raftLog, we can use it to carry more information during the leader's probing of followers. Currently, a MsgAppResp{reject = true} from a follower carries only a single hintIndex and hintTerm. With the term cache, we can include more information about the follower's raftLog in its MsgAppResp at relatively low overhead, which can be used to reduce the round trips involved in leader/follower probing. Assuming we keep a few (say 4) term change points in the `termCache`, we can attach all 4 of those data points to our raft RPC messages, which should be enough to cover the whole raftLog of a raft node. The term cache covers entryIDs in the range `[raftLog.first, raftLog.last]`, or something like `[entryID at committed index, raftLog.lastIndex]`. (In the real implementation we also need to attach a lastIndex, which the term cache doesn't keep, but unstable/raftLog does.) When the leader receives this `termCache` information in a `MsgAppResp{reject=true}` or `MsgVoteResp`, it can immediately determine the exact fork point at which to send the next MsgApp, instead of spending several probing round trips to find it. (Our current probing algorithm may take 2-3 round trips between leader and follower to find a fork point in a bad case involving multiple leadership changes and partitions.)

Part of #136296

Epic: None

Release note: None
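To make the compressed representation concrete, here is a minimal, self-contained sketch of the idea in Go. It is not the `term_cache.go` added by this PR (that file is not shown in this excerpt); the `entryID`/`termCache` names mirror the diff below, but the `add` helper and the method bodies are simplified illustrations.

```go
package main

import "fmt"

type entryID struct{ term, index uint64 }

// termCache keeps only term-change points, so it can describe a long raft log
// suffix with a handful of entryIDs.
type termCache struct {
    cache   []entryID // change points, oldest first
    maxSize int
}

// add records an entry, keeping it only if it starts a new term and evicting
// the oldest change point once maxSize is exceeded.
func (tc *termCache) add(id entryID) {
    if len(tc.cache) == 0 || id.term != tc.cache[len(tc.cache)-1].term {
        tc.cache = append(tc.cache, id)
        if len(tc.cache) > tc.maxSize {
            tc.cache = tc.cache[1:]
        }
    }
}

// term answers lookups for any index at or above the oldest cached change
// point. The caller is expected to bound index by the log's last index and to
// fall back to storage when found == false.
func (tc *termCache) term(index uint64) (term uint64, found bool) {
    for i := len(tc.cache) - 1; i >= 0; i-- {
        if index >= tc.cache[i].index {
            return tc.cache[i].term, true
        }
    }
    return 0, false
}

func main() {
    tc := termCache{maxSize: 4}
    // The example log above compresses to five change points; with maxSize 4
    // the oldest one (t5/10) is evicted.
    for _, id := range []entryID{{5, 10}, {6, 14}, {7, 20}, {8, 28}, {10, 32}} {
        tc.add(id)
    }
    fmt.Println(tc.term(25)) // 7 true  (t7 covers indices 20-27)
    fmt.Println(tc.term(12)) // 0 false (older than the cache; read from storage)
}
```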
143127: kvserver: add per-operation lock reliability settings r=yuzefovich a=stevendanna

Preserving unreplicated locks during splits, merges, and lease transfers has different trade-offs. For instance, during a split all lock updates are done in memory without any new replicated writes, whereas merges and lease transfers require replicating locks through raft. Here, we put the different operations under different settings, since we may want to ship different defaults for each operation.

Epic: none

Release note: None

Co-authored-by: Anthony Xu <anthony.xu@cockroachlabs.com>
Co-authored-by: Steven Danna <danna@cockroachlabs.com>
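For the per-operation settings change (143127), the practical difference is that each operation can now be toggled on its own. Below is a hedged Go sketch of that, mirroring the Override calls in the concurrency_manager_test.go diff further down; the helper name `makeLockReliabilitySettings` is hypothetical, while the setting variables and the Override signature come from this commit.

```go
package kvserver_test

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// makeLockReliabilitySettings is a hypothetical helper (not part of this
// commit) that builds cluster settings with each lock-reliability behavior
// enabled or disabled independently.
func makeLockReliabilitySettings(split, merge, leaseTransfer bool) *cluster.Settings {
    ctx := context.Background()
    st := cluster.MakeClusterSettings()
    concurrency.UnreplicatedLockReliabilitySplit.Override(ctx, &st.SV, split)
    concurrency.UnreplicatedLockReliabilityMerge.Override(ctx, &st.SV, merge)
    concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, leaseTransfer)
    return st
}
```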
3 parents 263f14f + 03b193f + e1318fa commit 06e0523

10 files changed (+571, -15 lines)

pkg/kv/kvserver/client_replica_test.go (+2, -2)

@@ -5781,7 +5781,7 @@ func TestLeaseTransferReplicatesLocks(t *testing.T) {
    // txn2 is never unblocked (from the perspective of the client).
    ctx := context.Background()
    st := cluster.MakeClusterSettings()
-   concurrency.UnreplicatedLockReliability.Override(ctx, &st.SV, true)
+   concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
    tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
      ServerArgs: base.TestServerArgs{
        Settings: st,
@@ -5891,7 +5891,7 @@ func TestMergeReplicatesLocks(t *testing.T) {
      ctx = context.Background()
      st = cluster.MakeClusterSettings()
    )
-   concurrency.UnreplicatedLockReliability.Override(ctx, &st.SV, true)
+   concurrency.UnreplicatedLockReliabilityMerge.Override(ctx, &st.SV, true)

    for _, b := range []bool{true, false} {
      name := "lhs-lock"

pkg/kv/kvserver/concurrency/concurrency_manager.go (+27, -9)

@@ -114,13 +114,31 @@ var BatchPushedLockResolution = settings.RegisterBoolSetting(
    true,
  )

- // UnreplicatedLockReliability controls whether the replica will attempt
- // to keep unreplicated locks during node operations such as split.
- var UnreplicatedLockReliability = settings.RegisterBoolSetting(
+ // UnreplicatedLockReliabilitySplit controls whether the replica will attempt
+ // to keep unreplicated locks during range split operations.
+ var UnreplicatedLockReliabilitySplit = settings.RegisterBoolSetting(
    settings.SystemOnly,
-   "kv.lock_table.unreplicated_lock_reliability.enabled",
-   "whether the replica should attempt to keep unreplicated locks during various node operations",
-   metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.enabled", true),
+   "kv.lock_table.unreplicated_lock_reliability.split.enabled",
+   "whether the replica should attempt to keep unreplicated locks during range splits",
+   metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.split.enabled", true),
+ )
+
+ // UnreplicatedLockReliabilityLeaseTransfer controls whether the replica will attempt
+ // to keep unreplicated locks during lease transfer operations.
+ var UnreplicatedLockReliabilityLeaseTransfer = settings.RegisterBoolSetting(
+   settings.SystemOnly,
+   "kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled",
+   "whether the replica should attempt to keep unreplicated locks during lease transfers",
+   metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled", true),
+ )
+
+ // UnreplicatedLockReliabilityMerge controls whether the replica will
+ // attempt to keep unreplicated locks during range merge operations.
+ var UnreplicatedLockReliabilityMerge = settings.RegisterBoolSetting(
+   settings.SystemOnly,
+   "kv.lock_table.unreplicated_lock_reliability.merge.enabled",
+   "whether the replica should attempt to keep unreplicated locks during range merges",
+   metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.merge.enabled", true),
  )

  // managerImpl implements the Manager interface.
@@ -590,7 +608,7 @@ var allKeysSpan = roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}

  // OnRangeLeaseTransferEval implements the RangeStateListener interface.
  func (m *managerImpl) OnRangeLeaseTransferEval() []*roachpb.LockAcquisition {
-   if !UnreplicatedLockReliability.Get(&m.st.SV) {
+   if !UnreplicatedLockReliabilityLeaseTransfer.Get(&m.st.SV) {
      return nil
    }

@@ -606,7 +624,7 @@ func (m *managerImpl) OnRangeLeaseTransferEval() []*roachpb.LockAcquisition {
  // during evalutation of Subsume. The returned LockAcquisition structs represent
  // held locks that we may want to flush to disk as replicated.
  func (m *managerImpl) OnRangeSubsumeEval() []*roachpb.LockAcquisition {
-   if !UnreplicatedLockReliability.Get(&m.st.SV) {
+   if !UnreplicatedLockReliabilityMerge.Get(&m.st.SV) {
      return nil
    }

@@ -636,7 +654,7 @@ func (m *managerImpl) OnRangeLeaseUpdated(seq roachpb.LeaseSequence, isLeasehold
  // LHS replica of a split and should be passed the new RHS start key (LHS
  // EndKey).
  func (m *managerImpl) OnRangeSplit(rhsStartKey roachpb.Key) []roachpb.LockAcquisition {
-   if UnreplicatedLockReliability.Get(&m.st.SV) {
+   if UnreplicatedLockReliabilitySplit.Get(&m.st.SV) {
      lockToMove := m.lt.ClearGE(rhsStartKey)
      m.twq.ClearGE(rhsStartKey)
      return lockToMove

pkg/kv/kvserver/concurrency/concurrency_manager_test.go (+3, -1)

@@ -735,7 +735,9 @@ func newClusterWithSettings(st *clustersettings.Settings) *cluster {
    // Set the latch manager's long latch threshold to infinity to disable
    // logging, which could cause a test to erroneously fail.
    spanlatch.LongLatchHoldThreshold.Override(context.Background(), &st.SV, math.MaxInt64)
-   concurrency.UnreplicatedLockReliability.Override(context.Background(), &st.SV, true)
+   concurrency.UnreplicatedLockReliabilitySplit.Override(context.Background(), &st.SV, true)
+   concurrency.UnreplicatedLockReliabilityMerge.Override(context.Background(), &st.SV, true)
+   concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(context.Background(), &st.SV, true)
    manual := timeutil.NewManualTime(timeutil.Unix(123, 0))
    return &cluster{
      nodeDesc: &roachpb.NodeDescriptor{NodeID: 1},

pkg/kv/kvserver/replica_command.go (+1, -1)

@@ -915,7 +915,7 @@ func (r *Replica) AdminMerge(
    // This must be a single request in a BatchRequest: there are multiple
    // places that do special logic (needed for safety) that rely on
    // BatchRequest.IsSingleSubsumeRequest() returning true.
-   shouldPreserveLocks := concurrency.UnreplicatedLockReliability.Get(&r.ClusterSettings().SV)
+   shouldPreserveLocks := concurrency.UnreplicatedLockReliabilityMerge.Get(&r.ClusterSettings().SV)
    br, pErr := kv.SendWrapped(ctx, r.store.DB().NonTransactionalSender(),
      &kvpb.SubsumeRequest{
        RequestHeader: kvpb.RequestHeader{

pkg/raft/BUILD.bazel (+2)

@@ -13,6 +13,7 @@ go_library(
      "rawnode.go",
      "status.go",
      "storage.go",
+     "term_cache.go",
      "testing_knobs.go",
      "types.go",
      "util.go",
@@ -50,6 +51,7 @@ go_test(
      "raft_test.go",
      "rawnode_test.go",
      "storage_test.go",
+     "term_cache_test.go",
      "types_test.go",
      "util_test.go",
    ],

pkg/raft/log.go (+28, -2)

@@ -42,10 +42,16 @@ type LogSnapshot struct {
    storage LogStorage
    // unstable contains the unstable log entries.
    unstable LeadSlice
+   // termCache contains a compressed entryID suffix of raftLog.
+   termCache termCache
    // logger gives access to logging errors.
    logger raftlogger.Logger
  }

+ // termCacheSize is the default max size of the termCache. It is small because
+ // term flips are very rare in practice.
+ const termCacheSize = 4
+
  type raftLog struct {
    // storage contains all stable entries since the last snapshot.
    storage Storage
@@ -54,6 +60,10 @@ type raftLog struct {
    // they will be saved into storage.
    unstable unstable

+   // termCache contains a suffix of the raftLog (both stable and unstable)
+   // used for term lookup.
+   termCache termCache
+
    // committed is the highest log position that is known to be in
    // stable storage on a quorum of nodes.
    committed uint64
@@ -108,6 +118,7 @@ func newLogWithSize(
    return &raftLog{
      storage: storage,
      unstable: newUnstable(last, logger),
+     termCache: newTermCache(termCacheSize, last),
      maxApplyingEntsSize: maxApplyingEntsSize,

      // Initialize our committed and applied pointers to the time of the last
@@ -177,15 +188,23 @@ func (l *raftLog) maybeAppend(a LeadSlice) bool {
    if first := a.entries[0].Index; first <= l.committed {
      l.logger.Panicf("entry %d is already committed [committed(%d)]", first, l.committed)
    }
-   return l.unstable.truncateAndAppend(a)
+   if !l.unstable.truncateAndAppend(a) {
+     return false
+   }
+   l.termCache.truncateAndAppend(a.LogSlice)
+   return true
  }

  // append adds the given log slice to the end of the log.
  //
  // Returns false if the operation can not be done: entry a.prev does not match
  // the lastEntryID of this log, or a.term is outdated.
  func (l *raftLog) append(a LeadSlice) bool {
-   return l.unstable.append(a)
+   if l.unstable.append(a) {
+     l.termCache.truncateAndAppend(a.LogSlice)
+     return true
+   }
+   return false
  }

  // match finds the longest prefix of the given log slice that matches the log.
@@ -449,6 +468,9 @@ func (l LogSnapshot) term(index uint64) (uint64, error) {
      return 0, ErrCompacted
    }

+   if term, found := l.termCache.term(index); found {
+     return term, nil
+   }
    term, err := l.storage.Term(index)
    if err == nil {
      return term, nil
@@ -516,6 +538,7 @@ func (l *raftLog) restore(s snapshot) bool {
    if !l.unstable.restore(s) {
      return false
    }
+   l.termCache.reset(id)
    l.committed = id.index
    return true
  }
@@ -667,10 +690,13 @@ func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 {
  // snap returns a point-in-time snapshot of the raft log. This snapshot can be
  // read from while the underlying storage is not mutated.
  func (l *raftLog) snap(storage LogStorage) LogSnapshot {
+   // NB: termCache and unstable slice are safe to copy, and make sure to not
+   // corrupt their shallow copies.
    return LogSnapshot{
      compacted: l.compacted(),
      storage: storage,
      unstable: l.unstable.LeadSlice,
+     termCache: l.termCache,
      logger: l.logger,
    }
  }
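The hunks above call newTermCache, termCache.truncateAndAppend, termCache.term, and termCache.reset, but term_cache.go itself is not part of this excerpt. The standalone sketch below suggests, under that caveat, roughly what the maintenance operations could do; it reuses the same simplified entryID/termCache shape as the sketch near the top of the commit message and is illustrative only (the real truncateAndAppend takes a LogSlice rather than a prev/entries pair).

```go
package main

import "fmt"

type entryID struct{ term, index uint64 }

type termCache struct {
    cache   []entryID // term-change points, oldest first
    maxSize int
}

// newTermCache seeds the cache with the last known entryID, as the
// newTermCache(termCacheSize, last) call in newLogWithSize does above.
func newTermCache(maxSize int, last entryID) termCache {
    return termCache{cache: []entryID{last}, maxSize: maxSize}
}

// reset re-initializes the cache to a single entryID, as raftLog.restore does
// after applying a snapshot.
func (tc *termCache) reset(id entryID) {
    tc.cache = append(tc.cache[:0], id)
}

// truncateAndAppend drops cached change points that the appended slice
// overwrites (index > prev.index), then records the change points of the new
// entries, bounded by maxSize.
func (tc *termCache) truncateAndAppend(prev entryID, entries []entryID) {
    for len(tc.cache) > 0 && tc.cache[len(tc.cache)-1].index > prev.index {
        tc.cache = tc.cache[:len(tc.cache)-1]
    }
    if len(tc.cache) == 0 {
        tc.cache = append(tc.cache, prev)
    }
    for _, e := range entries {
        if e.term != tc.cache[len(tc.cache)-1].term {
            tc.cache = append(tc.cache, e)
            if len(tc.cache) > tc.maxSize {
                tc.cache = tc.cache[1:] // evict the oldest change point
            }
        }
    }
}

func main() {
    tc := newTermCache(4, entryID{term: 5, index: 10})
    // Regular appends: only the t5 -> t6 flip at index 14 becomes a new point.
    tc.truncateAndAppend(entryID{term: 5, index: 10},
        []entryID{{5, 11}, {5, 12}, {5, 13}, {6, 14}, {6, 15}})
    // A conflicting append after index 12 truncates the t6/14 point and
    // records the new t7/13 change point instead.
    tc.truncateAndAppend(entryID{term: 5, index: 12}, []entryID{{7, 13}})
    fmt.Println(tc.cache) // [{5 10} {7 13}]
}
```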

pkg/raft/log_test.go (+73)

@@ -586,6 +586,76 @@ func TestStableTo(t *testing.T) {
    }
  }

+ // TestTermCacheLookUpAfterStableTo tests the term cache lookup after we have
+ // persisted entries from unstable to stable. The test asserts that the term
+ // cache is used for lookups when possible and that the storage is not accessed.
+ func TestTermCacheLookUpAfterStableTo(t *testing.T) {
+   for _, tt := range []struct {
+     init LeadSlice
+     stableTo LogMark
+     wantTermCalls int // the expected number of LogStorage.Term() calls
+   }{{
+     init: entryID{}.append(1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5),
+     stableTo: LogMark{Term: 5, Index: 10},
+     wantTermCalls: 2, // indices 0-1
+   }, {
+     init: entryID{}.append(1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9),
+     stableTo: LogMark{Term: 9, Index: 10},
+     wantTermCalls: 6, // indices 0-5
+   }, {
+     init: entryID{}.append(1, 2, 3),
+     stableTo: LogMark{Term: 3, Index: 2},
+     wantTermCalls: 0,
+   }, {
+     init: entryID{}.append(1, 2, 3, 4),
+     stableTo: LogMark{Term: 4, Index: 3},
+     wantTermCalls: 1, // index 0
+   }, {
+     init: entryID{}.append(1, 2, 3, 4, 5),
+     stableTo: LogMark{Term: 5, Index: 4},
+     wantTermCalls: 2, // indices 0-1
+   }, {
+     init: entryID{}.append(1, 2, 3, 4, 5, 5, 5, 5, 5),
+     stableTo: LogMark{Term: 5, Index: 7},
+     wantTermCalls: 2, // indices 0-1
+   }, {
+     init: entryID{}.append(1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3),
+     stableTo: LogMark{Term: 3, Index: 10},
+     wantTermCalls: 0,
+   }, {
+     init: entryID{index: 10, term: 4}.append(5, 5, 6, 6, 7, 7, 8, 8, 9, 9),
+     stableTo: LogMark{Term: 9, Index: 16},
+     wantTermCalls: 3, // indices 10-12
+   }} {
+     t.Run("", func(t *testing.T) {
+       // Initialize the log storage to a particular truncated state.
+       storage := NewMemoryStorage()
+       initID := tt.init.prev
+       if initID.index != 0 {
+         require.NoError(t, storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{
+           Index: initID.index, Term: initID.term,
+         }}))
+       }
+       // Initialize the raft log from the storage.
+       raftLog := newLog(storage, raftlogger.DiscardLogger)
+       // Manually hardcode the term cache size for testing.
+       raftLog.termCache.maxSize = 4
+       require.True(t, raftLog.append(tt.init))
+       // Imitate a transfer of some unstable entries into storage.
+       require.NoError(t, storage.Append(tt.init.sub(initID.index, tt.stableTo.Index)))
+       raftLog.stableTo(tt.stableTo)
+
+       // Do term lookup for the parts of raftLog not covered by unstable.
+       start, end := tt.init.LogSlice.prev.index, raftLog.unstable.prev.index
+       for i := start; i <= end; i++ {
+         _, err := raftLog.term(i)
+         require.NoError(t, err)
+       }
+       require.Equal(t, tt.wantTermCalls, storage.callStats.term-1)
+     })
+   }
+ }
+
  func TestStableToWithSnap(t *testing.T) {
    snapID := entryID{term: 2, index: 5}
    snap := pb.Snapshot{Metadata: pb.SnapshotMetadata{Term: snapID.term, Index: snapID.index}}
@@ -674,6 +744,9 @@ func TestLogRestore(t *testing.T) {
    require.Equal(t, index, raftLog.committed)
    require.Equal(t, index, raftLog.unstable.prev.index)
    require.Equal(t, term, mustTerm(raftLog.term(index)))
+   // Term cache should be re-initialized to only have the snapshot entryID.
+   require.Equal(t, entryID{index: index, term: term}, raftLog.termCache.first())
+   require.Equal(t, 1, len(raftLog.termCache.cache))
  }

  func TestIsOutOfBounds(t *testing.T) {

pkg/raft/storage.go (+1)

@@ -334,6 +334,7 @@ func MakeLogSnapshot(ms *MemoryStorage) LogSnapshot {
      compacted: ms.Compacted(),
      storage: ms.LogSnapshot(),
      unstable: LeadSlice{term: ls.lastEntryID().term, LogSlice: ls},
+     termCache: newTermCache(1, ms.ls.lastEntryID()),
      logger: raftlogger.DiscardLogger,
    }
  }
