forked from etcd-io/bbolt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtx_check.go
225 lines (195 loc) · 7.67 KB
/
tx_check.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package bbolt
import (
"encoding/hex"
"fmt"
"go.etcd.io/bbolt/internal/common"
)
// Check performs several consistency checks on the database for this transaction.
// An error is returned if any inconsistency is found.
//
// It can be safely run concurrently on a writable transaction. However, this
// incurs a high cost for large databases and databases with a lot of subbuckets
// because of caching. This overhead can be removed if running on a read-only
// transaction, however, it is not safe to execute other writer transactions at
// the same time.
//
// It also allows users to provide a customized `KVStringer` implementation,
// so that bolt can generate human-readable diagnostic messages.
func (tx *Tx) Check(options ...CheckOption) <-chan error {
chkConfig := checkConfig{
kvStringer: HexKVStringer(),
}
for _, op := range options {
op(&chkConfig)
}
ch := make(chan error)
go tx.check(chkConfig.kvStringer, ch)
return ch
}
func (tx *Tx) check(kvStringer KVStringer, ch chan error) {
// Force loading free list if opened in ReadOnly mode.
tx.db.loadFreelist()
// Check if any pages are double freed.
freed := make(map[common.Pgid]bool)
all := make([]common.Pgid, tx.db.freelist.count())
tx.db.freelist.copyall(all)
for _, id := range all {
if freed[id] {
ch <- fmt.Errorf("page %d: already freed", id)
}
freed[id] = true
}
// Track every reachable page.
reachable := make(map[common.Pgid]*common.Page)
reachable[0] = tx.page(0) // meta0
reachable[1] = tx.page(1) // meta1
if tx.meta.Freelist() != common.PgidNoFreelist {
for i := uint32(0); i <= tx.page(tx.meta.Freelist()).Overflow(); i++ {
reachable[tx.meta.Freelist()+common.Pgid(i)] = tx.page(tx.meta.Freelist())
}
}
// Recursively check buckets.
tx.checkBucket(&tx.root, reachable, freed, kvStringer, ch)
// Ensure all pages below high water mark are either reachable or freed.
for i := common.Pgid(0); i < tx.meta.Pgid(); i++ {
_, isReachable := reachable[i]
if !isReachable && !freed[i] {
ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
}
}
// Close the channel to signal completion.
close(ch)
}
func (tx *Tx) checkBucket(b *Bucket, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool,
kvStringer KVStringer, ch chan error) {
// Ignore inline buckets.
if b.RootPage() == 0 {
return
}
// Check every page used by this bucket.
b.tx.forEachPage(b.RootPage(), func(p *common.Page, _ int, stack []common.Pgid) {
if p.Id() > tx.meta.Pgid() {
ch <- fmt.Errorf("page %d: out of bounds: %d (stack: %v)", int(p.Id()), int(b.tx.meta.Pgid()), stack)
}
// Ensure each page is only referenced once.
for i := common.Pgid(0); i <= common.Pgid(p.Overflow()); i++ {
var id = p.Id() + i
if _, ok := reachable[id]; ok {
ch <- fmt.Errorf("page %d: multiple references (stack: %v)", int(id), stack)
}
reachable[id] = p
}
// We should only encounter un-freed leaf and branch pages.
if freed[p.Id()] {
ch <- fmt.Errorf("page %d: reachable freed", int(p.Id()))
} else if !p.IsBranchPage() && !p.IsLeafPage() {
ch <- fmt.Errorf("page %d: invalid type: %s (stack: %v)", int(p.Id()), p.Typ(), stack)
}
})
tx.recursivelyCheckPages(b.RootPage(), kvStringer.KeyToString, ch)
// Check each bucket within this bucket.
_ = b.ForEachBucket(func(k []byte) error {
if child := b.Bucket(k); child != nil {
tx.checkBucket(child, reachable, freed, kvStringer, ch)
}
return nil
})
}
// recursivelyCheckPages confirms database consistency with respect to b-tree
// key order constraints:
// - keys on pages must be sorted
// - keys on children pages are between 2 consecutive keys on the parent's branch page).
func (tx *Tx) recursivelyCheckPages(pgId common.Pgid, keyToString func([]byte) string, ch chan error) {
tx.recursivelyCheckPagesInternal(pgId, nil, nil, nil, keyToString, ch)
}
// recursivelyCheckPagesInternal verifies that all keys in the subtree rooted at `pgid` are:
// - >=`minKeyClosed` (can be nil)
// - <`maxKeyOpen` (can be nil)
// - Are in right ordering relationship to their parents.
// `pagesStack` is expected to contain IDs of pages from the tree root to `pgid` for the clean debugging message.
func (tx *Tx) recursivelyCheckPagesInternal(
pgId common.Pgid, minKeyClosed, maxKeyOpen []byte, pagesStack []common.Pgid,
keyToString func([]byte) string, ch chan error) (maxKeyInSubtree []byte) {
p := tx.page(pgId)
pagesStack = append(pagesStack, pgId)
switch {
case p.IsBranchPage():
// For branch page we navigate ranges of all subpages.
runningMin := minKeyClosed
for i := range p.BranchPageElements() {
elem := p.BranchPageElement(uint16(i))
verifyKeyOrder(elem.Pgid(), "branch", i, elem.Key(), runningMin, maxKeyOpen, ch, keyToString, pagesStack)
maxKey := maxKeyOpen
if i < len(p.BranchPageElements())-1 {
maxKey = p.BranchPageElement(uint16(i + 1)).Key()
}
maxKeyInSubtree = tx.recursivelyCheckPagesInternal(elem.Pgid(), elem.Key(), maxKey, pagesStack, keyToString, ch)
runningMin = maxKeyInSubtree
}
return maxKeyInSubtree
case p.IsLeafPage():
runningMin := minKeyClosed
for i := range p.LeafPageElements() {
elem := p.LeafPageElement(uint16(i))
verifyKeyOrder(pgId, "leaf", i, elem.Key(), runningMin, maxKeyOpen, ch, keyToString, pagesStack)
runningMin = elem.Key()
}
if p.Count() > 0 {
return p.LeafPageElement(p.Count() - 1).Key()
}
default:
ch <- fmt.Errorf("unexpected page type for pgId:%d", pgId)
}
return maxKeyInSubtree
}
/***
* verifyKeyOrder checks whether an entry with given #index on pgId (pageType: "branch|leaf") that has given "key",
* is within range determined by (previousKey..maxKeyOpen) and reports found violations to the channel (ch).
*/
func verifyKeyOrder(pgId common.Pgid, pageType string, index int, key []byte, previousKey []byte, maxKeyOpen []byte, ch chan error, keyToString func([]byte) string, pagesStack []common.Pgid) {
if index == 0 && previousKey != nil && compareKeys(previousKey, key) > 0 {
ch <- fmt.Errorf("the first key[%d]=(hex)%s on %s page(%d) needs to be >= the key in the ancestor (%s). Stack: %v",
index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
}
if index > 0 {
cmpRet := compareKeys(previousKey, key)
if cmpRet > 0 {
ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be > (found <) than previous element (hex)%s. Stack: %v",
index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
}
if cmpRet == 0 {
ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be > (found =) than previous element (hex)%s. Stack: %v",
index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
}
}
if maxKeyOpen != nil && compareKeys(key, maxKeyOpen) >= 0 {
ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be < than key of the next element in ancestor (hex)%s. Pages stack: %v",
index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
}
}
// ===========================================================================================
type checkConfig struct {
kvStringer KVStringer
}
type CheckOption func(options *checkConfig)
func WithKVStringer(kvStringer KVStringer) CheckOption {
return func(c *checkConfig) {
c.kvStringer = kvStringer
}
}
// KVStringer allows to prepare human-readable diagnostic messages.
type KVStringer interface {
KeyToString([]byte) string
ValueToString([]byte) string
}
// HexKVStringer serializes both key & value to hex representation.
func HexKVStringer() KVStringer {
return hexKvStringer{}
}
type hexKvStringer struct{}
func (_ hexKvStringer) KeyToString(key []byte) string {
return hex.EncodeToString(key)
}
func (_ hexKvStringer) ValueToString(value []byte) string {
return hex.EncodeToString(value)
}