-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdbsim.py
1791 lines (1457 loc) · 62.5 KB
/
dbsim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import argparse, collections, random
from sim import *
def dict_of_sets():
    """Factory for a defaultdict whose missing keys become empty sets."""
    return collections.defaultdict(set)
def dict_of_dicts():
    """Factory for a defaultdict whose missing keys become empty dicts."""
    return collections.defaultdict(dict)
'''A module for performing database workload simulations.
It provides infrastructure for creating clients and transactions, as
well as automatic detection of isolation and serialization failures.
The simulated database contains a fixed number of records, each of
which contains the PID of the last transaction to update it. The
database model handles the actual storage, but whatever values it
returns are vetted for isolation and serialization errors.
'''
def tarjan(tx_deps, include_trivial=False):
    '''A vanilla implementation of Tarjan's algorithm.
    This variant is not used directly, but rather serves as a
    reference point for specialized variants we do use.
    WARNING: it's pretty easy to max out python's recursion limit if
    you throw a large graph at this function...
    '''
    counter = 0
    node_index = {}
    def assign_index(node):
        # Hand out 1-based DFS indexes; second element reports whether
        # the node was seen for the first time.
        nonlocal counter
        idx = node_index.setdefault(node, counter + 1)
        if idx > counter:
            counter += 1
            return idx, True
        return idx, False
    on_stack = set()        # indexes of nodes currently on the DFS path
    path = []               # the DFS path itself (node ids)
    components = []         # collected strongly connected components
    def visit(pid, i, deps):
        lowest = i
        path.append(pid)
        on_stack.add(i)
        for dep in deps:
            j, fresh = assign_index(dep)
            if fresh:
                child_low = visit(dep, j, tx_deps.get(dep, ()))
                if child_low < lowest:
                    lowest = child_low
            elif j in on_stack and j < lowest:
                lowest = j
        if lowest == i:
            # This node roots an SCC: pop the path back to it.
            component = []
            member = None
            while member != pid:
                member = path.pop()
                on_stack.remove(node_index[member])
                component.append(member)
            if include_trivial or len(component) > 1:
                components.append(component)
        return lowest
    for pid, deps in tx_deps.items():
        i, fresh = assign_index(pid)
        if fresh:
            visit(pid, i, deps)
    return components
def tarjan_iter(tx_deps, include_trivial=False):
    '''A vanilla implementation of Tarjan's algorithm.
    It turns out to be pretty easy to max out python's recursion limit
    if you throw a large graph at this function, so we cheat and use
    generators to make an iterative algorithm of sorts.
    '''
    last_index,index = 0,{}
    def safe_index(dep):
        # Assign 1-based DFS indexes on first sight; the boolean tells
        # the caller whether this node was previously unseen.
        nonlocal last_index
        j = index.setdefault(dep, last_index+1)
        if j > last_index:
            last_index += 1
            return j,True
        return j,False
    low,stack,s,scs = {}, set(), [], []
    def connect(pid, i, deps):
        # One DFS step, written as a generator: instead of recursing,
        # it *yields* the child generator to the trampoline below and
        # receives the child's low-link back via send().
        ilow = low[i] = i
        s.append(pid)
        stack.add(i)
        for dep in deps:
            j,unseen = safe_index(dep)
            if unseen:
                jlow = (yield connect(dep, j, tx_deps.get(dep, ())))
                if jlow < ilow:
                    ilow = low[i] = jlow
            elif j in stack and j < ilow:
                ilow = low[i] = j
        if ilow == i:
            # This node roots an SCC: pop back to it along the path.
            sc,dep = [],None
            while pid != dep:
                dep = s.pop()
                stack.remove(index[dep])
                sc.append(dep)
            if len(sc) > 1 or include_trivial:
                scs.append(sc)
        return ilow
    def do_it():
        # Top-level driver, also a generator so the trampoline can run it.
        for pid,deps in tx_deps.items():
            i,unseen = safe_index(pid)
            if unseen:
                yield connect(pid, i, deps)
    # Trampoline: todos alternates [generator, value-to-send, ...].
    # Each send() either yields a sub-generator (push it with a None
    # argument) or raises StopIteration, whose .value is the child's
    # return value and becomes the argument for the parent's next send().
    todos = [do_it(),None]
    while len(todos) > 1:
        args = todos.pop()
        action = todos[-1]
        try:
            rval = action.send(args)
        except StopIteration as stop:
            # resume the parent generator
            todos[-1] = stop.value
        else:
            # rval contains a sub-generator to exhaust first
            todos.append(rval)
            todos.append(None)
    return scs
def get_rids_from_subgraph(tx_deps, tids):
    '''Return (tids, rids): the given transactions as a set, plus the
    record ids named on dependency edges internal to that set.'''
    members = set(tids)
    records = set()
    for tid in members:
        for neighbor, (rid, edge) in tx_deps[tid].items():
            if neighbor in members:
                records.add(rid)
    return members, records
def subgraph_to_dot(out, tx_deps, tids=None, core={}):
    '''Emit a GraphViz "digraph" rendering of the dependency subgraph
    induced by tids (all of tx_deps if tids is None) to the given log.
    Transactions listed in "core" are highlighted in red; edges are
    labeled with the dependency type and the record that caused them.
    '''
    if tids is None:
        tids = set(tx_deps)
    out = make_log(out)
    tids,rids = get_rids_from_subgraph(tx_deps, tids)
    out('\ndigraph G {')
    for pid in tids:
        if pid in core:
            out('\t"T%d" [ color="red" ]', pid)
        deps = tx_deps.get(pid,{})
        for dep,(rid,tp) in deps.items():
            x,y = tp.split(':')
            if dep in tids:
                out('\t"T%d" -> "T%s" [ label="%s:R%s:%s" ]', pid, dep, y, rid, x)
    # BUG FIX: the closing brace previously went to errlog() while the
    # rest of the graph went to out(), splitting the DOT output across
    # two streams; close the digraph on the same log.
    out('}\n')
def test_tarjan():
    '''Smoke-test the iterative Tarjan variant on a small fixed graph,
    logging each (sorted) strongly connected component it reports.'''
    graph = {
        1: {2, 9},
        2: {3},
        3: {1},
        4: {3, 5},
        5: {4, 6},
        6: {3, 7},
        7: {6},
        8: {5, 8},
        10: {8},
    }
    for component in tarjan_iter(graph, True):
        component.sort()
        errlog('%s', ' '.join(str(member) for member in component))
def tarjan_incycle(tx_deps, who):
    '''A variant of Tarjan's algorithm that only cares about the SCC that
    a given node belongs to. If "who" belongs to a non-trivial SCC,
    return the cluster. Otherwise, return None.
    '''
    counter = 0
    node_index = {}
    def assign_index(node):
        # 1-based DFS indexes; second element flags first sighting.
        nonlocal counter
        idx = node_index.setdefault(node, counter + 1)
        if idx > counter:
            counter += 1
            return idx, True
        return idx, False
    on_stack, path = set(), []
    result = None
    def visit(pid, i, deps):
        nonlocal result
        lowest = i
        path.append(pid)
        on_stack.add(i)
        for dep in deps:
            j, fresh = assign_index(dep)
            if fresh:
                child_low = visit(dep, j, tx_deps.get(dep, ()))
                if child_low < lowest:
                    lowest = child_low
            elif j in on_stack and j < lowest:
                lowest = j
        if lowest == i:
            # Pop the SCC rooted here; remember it only if it contains
            # "who" and is non-trivial.
            component, member, hit = [], None, False
            while member != pid:
                member = path.pop()
                if member == who:
                    hit = True
                on_stack.remove(node_index[member])
                component.append(member)
            if hit and len(component) > 1:
                result = component
        return lowest
    i, fresh = assign_index(who)
    assert fresh
    visit(who, i, tx_deps[who])
    return result
class NamedTuple(object):
    '''A loose bag of named attributes, assembled by merging other
    objects' attribute dictionaries and/or keyword arguments.'''
    def __init__(self, *args, **kwargs):
        '''Initialize a new NamedTuple by merging in zero or more existing
        NamedTuple objects and adding zero or more additional
        properties. Arguments are processed from left to right, with
        last updater taking precedence in case of a naming collision.
        '''
        merged = {}
        for source in args:
            merged.update(source.__dict__)
        merged.update(kwargs)
        self.__dict__.update(merged)
def timestamp_generator(start=1):
    '''Return a zero-argument callable that hands out consecutive
    integer timestamps, beginning at "start".'''
    counter = [start]
    def next_stamp():
        stamp = counter[0]
        counter[0] = stamp + 1
        return stamp
    return next_stamp
def tx_tracker(stats, admission_limit, **extra_kwargs):
    '''Infrastructure to track in-flight transactions. Tracking can be
    enabled and disabled to extract accurate response time
    measurements for transactions that begin during a time window of
    interest in a continuous stream of transactions.
    Note that a transaction should not be reported as "finished" until
    the system is ready to forget it completely. In particular, a
    transaction cannot finish before any deferred checks for
    serialization failures that might be required.
    '''
    # known[pid] = (begin_stamp, end_stamp, is_commit)
    # Sentinels for "not ended yet" / "commit status unknown".
    end_unknown,commit_unknown = None,None
    # PID 0 is a permanent dummy transaction that committed at t=1.
    tx_known,tx_active = {0:(1,1,True)},set()
    tracking_active,tracking_ending = False,False
    tx_tracked,tracking_callback = set(),None
    # Timestamps start at 2; 1 is reserved for the dummy PID 0.
    get_next_stamp = timestamp_generator(2)
    stats.tx_count,stats.tx_commits,stats.tx_aborts = 0,0,0
    def begin_tracking():
        '''Begin a measurement window.
        During a measurement, all transaction arrivals and completions
        are counted in order to determine throughput. Further, any
        transaction which starts while tracking is active will affect
        statistics (isolation, serialization, failures, etc.) and will
        be followed to completion even if tracking is later disabled
        (to measure latency accurately).
        It is an error to begin a new measurement before the previous
        one completes (including completion of all stragglers).
        '''
        nonlocal tracking_active
        assert not tracking_active and not tracking_ending
        tracking_active = True
    def end_tracking(callback=None):
        '''Notify the tracker that a measurement window has closed. All
        in-flight transactions will be followed to completion, but no
        new transactions will be tracked.
        The optional callback will be invoked after the last straggler
        completes and has been checked for serialization failures.
        '''
        nonlocal tracking_active,tracking_ending,tracking_callback
        assert tracking_active
        tracking_active,tracking_ending = False,True
        tracking_callback = callback
    def on_begin(pid):
        '''Notify the tracker of a new transaction, returning its begin
        timestamp and whether it is tracked (if the latter is true, it
        is part of a measurement and will be followed to completion
        even if the measurement window closes first).
        If admission control is active and too many transactions are
        already in flight, raise an AdmissionControl exception;
        completed but unfinalized transactions do not count towards
        the admission control limit.
        '''
        tx_active.add(pid)
        begin = get_next_stamp()
        tx_known[pid] = (begin, end_unknown, commit_unknown)
        if tracking_active:
            tx_tracked.add(pid)
        # NOTE: the transaction is registered *before* the admission
        # check, so a rejected pid is still known/active and must be
        # cleaned up via on_finish/on_finalize by the caller.
        if admission_limit and len(tx_active) > admission_limit:
            raise AdmissionControl
        return begin,tracking_active
    # Precommit timestamps handed out but not yet consumed by on_finish.
    tx_precommit = {}
    def on_precommit(pid):
        '''Notify the tracker that a transaction is attempting to
        complete. The returned timestamp will become the transaction's
        completion time if it commits.
        '''
        # setdefault makes repeated precommits idempotent for one pid.
        return tx_precommit.setdefault(pid, get_next_stamp())
    def on_finish(pid, is_commit):
        '''Notify the tracker that a transaction has completed (abort or
        commit, it doesn't matter). Return the transaction's end
        timestamp and whether it was tracked.
        The transaction will no longer impact admission control, but
        will still be tracked until explicitly laid to rest by a call
        to on_finalize().
        '''
        tracked = is_tracked(pid)
        if tracked:
            stats.tx_count += 1
            if is_commit:
                stats.tx_commits += 1
            else:
                stats.tx_aborts += 1
        tx_active.remove(pid)
        begin,_,_ = tx_known[pid]
        # Reuse the precommit stamp if one was issued; otherwise mint a
        # fresh end timestamp now.
        end = tx_precommit.pop(pid, None) or get_next_stamp()
        tx_known[pid] = (begin, end, is_commit)
        return end,tracked
    def is_known(pid):
        '''Return True if the system knows about the given PID. False
        indicates the PID is either unknown, or has already been
        finalized and forgotten.
        As a special case, PID=0 is permanently known as an invalid
        transaction which committed at t=1.
        '''
        return pid in tx_known or not pid
    def is_tracked(pid):
        '''Return True if the transaction is known to be tracked.
        '''
        return pid in tx_tracked
    def is_committed(pid):
        '''Return the commit timestamp of a transaction (always non-zero) if
        it is known to have committed. Return False if the transaction
        is known to have aborted, and None if still in flight.
        It is an error to call this function for an unknown PID.
        '''
        # is_commit is True/False/None (None => still in flight).
        _,end,is_commit = tx_known[pid]
        return is_commit and end
    def get_begin(pid):
        '''Return the begin time of a known transaction'''
        begin,_,_ = tx_known[pid]
        return begin
    def get_end(pid):
        '''Return the end time of a known transaction. You probably want to
        use is_committed() instead.'''
        _,end,_ = tx_known[pid]
        return end
    def on_finalize(pid):
        '''Notify the tracker that a transaction has left the system entirely
        and can be forgotten. If it was the last tracked transaction,
        invoke the tracking callback (if any).
        '''
        nonlocal tracking_ending, tracking_callback
        assert pid not in tx_precommit
        tx_tracked.discard(pid)
        tx_known.pop(pid)
        if tracking_ending and not tx_tracked:
            # Last straggler gone: fire the end-of-measurement callback.
            cb,tracking_ending,tracking_callback = tracking_callback,False,None
            cb and cb()
    def get_stragglers():
        '''Return the set of tracked transactions that prevents the simulation
        from ending (if tracking is still active, the set is empty).
        '''
        return iter(tx_tracked) if tracking_ending else ()
    # Expose the closure's API as an attribute bag.
    return NamedTuple(
        begin_tracking=begin_tracking,
        end_tracking=end_tracking,
        is_tracking_active=lambda: tracking_active,
        is_tracking_ending=lambda: tracking_ending,
        get_stragglers=get_stragglers,
        get_next_stamp=get_next_stamp,
        is_known=is_known,
        is_tracked=is_tracked,
        is_committed=is_committed,
        get_begin=get_begin,
        get_end=get_end,
        on_begin=on_begin,
        on_precommit=on_precommit,
        on_finish=on_finish,
        on_finalize=on_finalize
    )
def historic_log():
    '''Any real implementation is rather limited in what it can track at
    runtime, which makes it difficult or impossible to reconstruct
    history when something goes wrong. In particular, it is impossible
    to detect cycles with 100% accuracy and precision (no false
    positives or negatives) unless the entire history has been stored.
    The reason cycles are hard to detect is that a chain of
    anti-dependencies could form as follows:
    T{i} r:w ... r:W T1 w:r T{i}, with transactions numbered by the
    order they commit. This is true even if we limit how long an old
    version is available once an overwrite commits. As long as any
    write by T1 remains visible (not overwritten), T{i} could close a
    cycle simply by accessing it.
    In order to provide reliable cycle detection and reconstruction of
    history, without allowing implementations to "cheat", we log all
    events (read, write, commit) to this module, and request cycle
    detection and other services only once the simulation
    completes. Implementations can then track as much or as little
    information as they want, without giving up diagnostic capability.
    '''
    get_next_stamp = timestamp_generator()
    # tx_reads[pid] / tx_writes[pid] = set of (rid,dep,when) access records
    tx_reads,tx_writes = dict_of_sets(),dict_of_sets()
    # tx_events[pid] = (begin,commit,abort); commit/abort are None until set
    tx_tracked = set()
    tx_events = {0:(1,1,None)}
    R,W,C = 'read', 'overwrite', 'commit'
    def on_begin(pid, is_tracked):
        # Record the begin stamp; remember the pid if it is measured.
        if is_tracked:
            tx_tracked.add(pid)
        when = get_next_stamp()
        tx_events[pid] = (when,None,None)
    def on_access(pid, rid, dep, is_read):
        # Log that pid read (or overwrote) the version of rid created by dep.
        when = get_next_stamp()
        who = tx_reads if is_read else tx_writes
        who[pid].add((rid,dep,when))
    def on_finish(pid, is_commit):
        # Stamp either the commit or the abort slot of the event triple.
        when = get_next_stamp()
        begin,c,a = tx_events[pid]
        if is_commit:
            c = when
        else:
            a = when
        tx_events[pid] = (begin,c,a)
    def get_tx_deps(include_aborts=False, include_inflight=False):
        '''Build a consolidated dependency graph, with at most one edge
        joining any pair of transactions in each direction.
        The graph will normally include only committed transactions
        and edges between them, but callers can request to include
        aborted and/or in-flight transactions as well.
        The graph is suitable for running tarjan(), and includes the
        necessary information to reconstruct swim lanes on the
        resulting strongly connected components as well.
        '''
        tx_deps,clobbers = dict_of_dicts(),{}
        if include_aborts:
            if include_inflight:
                # can abort, can be in flight
                is_valid = lambda pid: True
            else:
                # can abort, but must not be in flight
                is_valid = lambda pid: tx_events[pid][1:3] != (None,None)
        else:
            if include_inflight:
                # no aborts, can be in flight
                is_valid = lambda pid: tx_events[pid][2] is None
            else:
                # must have committed
                is_valid = lambda pid: tx_events[pid][1] is not None
        for pid,writes in tx_writes.items():
            if is_valid(pid):
                for rid,dep,when in writes:
                    if pid != dep and is_valid(dep):
                        #errlog('%d w:%d:w %d', dep,rid,pid)
                        tx_deps[pid].setdefault(dep, (rid,'w:w')) # dep w:w pid
                        # remember who overwrote (rid,dep) to emit r:w edges below
                        clobbers[rid,dep] = pid
        for pid,reads in tx_reads.items():
            if is_valid(pid):
                for rid,dep,when in reads:
                    if pid != dep and is_valid(dep):
                        #errlog('%d w:%d:r %d', dep,rid,pid)
                        tx_deps[pid].setdefault(dep, (rid,'w:r')) # dep w:r pid
                        clobber = clobbers.get((rid,dep),None)
                        if clobber:
                            #errlog('%d r:%d:w %d', pid,rid,clobber)
                            # anti-dependency: pid read the version clobber replaced
                            tx_deps[clobber].setdefault(pid,(rid,'r:w')) # pid r:w clobber
        return tx_deps
    def get_neighborhood(tx_deps, tid):
        '''Return the subset of tx_deps that involves any peer of the given
        tid.
        '''
        # find the time range we care about
        first_begin,c,a = tx_events[tid]
        last_end = a or c
        # find the transactions whose lifetime overlaps
        # NOTE(review): "end" is None for a dep that is still in flight,
        # and None < int raises TypeError in Python 3 — this appears to
        # assume a committed/aborted-only graph (get_tx_deps defaults);
        # confirm before passing include_inflight graphs here.
        peers = set()
        for dep in tx_deps:
            begin,c,a = tx_events[dep]
            end = a or c
            if end < first_begin:
                pass
            elif last_end < begin:
                pass
            else:
                #if not (end < first_begin or last_end < begin):
                peers.add(dep)
        assert tid in peers
        # find transactions that tid depends on (transitive closure
        # following outgoing edges)
        changed = False
        connected = set()
        def add(tid):
            nonlocal changed
            if tid not in connected:
                changed = True
                # NOTE(review): debug logging left enabled; noisy on
                # large graphs.
                errlog('adding %d', tid)
                connected.add(tid)
                for dep in tx_deps.get(tid,()):
                    add(dep)
        add(tid)
        # then repeatedly pull in transactions that depend on anything
        # already connected (incoming edges), until a fixpoint
        while changed:
            changed = False
            for dep,xdeps in tx_deps.items():
                if dep in connected:
                    continue
                for x in xdeps:
                    if x in connected:
                        changed = True
                        connected.add(dep)
                        break
        # the overlap-based peers set above is superseded by the
        # connectivity-based one
        peers = connected
        assert tid in peers
        # now build the sub-graph
        result = {}
        for tid,deps in tx_deps.items():
            if tid not in peers:
                continue
            deps = {dep:edge for dep,edge in deps.items() if dep in peers}
            if deps:
                result[tid] = deps
        return result
    def print_swimlane(out, tx_deps, tids):
        # Render each transaction as a column, its events in time order.
        out = make_log(out)
        tids,rids = get_rids_from_subgraph(tx_deps, tids)
        # print header row
        out('%s', '\t\t'.join('T%d' % (i+1) for i in range(len(tids))))
        # collect relevant reads and writes from all transactions
        R,W,C,A = 'read', 'overwrite', 'commit', 'abort'
        events = set()
        for tid in tids:
            try:
                begin,c,a = tx_events[tid]
            except KeyError:
                pass
            else:
                events.add((begin,tid,'begin',None,None))
                if c is not None:
                    events.add((c,tid,'commit',None,None))
                if a is not None:
                    events.add((a,tid,'abort',None,None))
                for rid,dep,when in tx_reads.get(tid,()):
                    if rid in rids:
                        events.add((when,tid,R,rid,dep))
                for rid,dep,when in tx_writes.get(tid,()):
                    if rid in rids:
                        events.add((when,tid,W,rid,dep))
        # print events, renumbering transactions and records in order
        # of first appearance
        events = sorted(events)
        tmap = collections.defaultdict(timestamp_generator(1))
        rmap = collections.defaultdict(timestamp_generator(1))
        for when,tid,action,rid,dep in events:
            tnum,rnum = tmap[tid],rmap[rid]
            indent = '\t\t'*(tnum-1)
            if rid is None:
                # begin/commit/abort
                out('%s%s', indent, action)
            elif dep not in tids:
                # read or write to baseline data
                out('%s%s R%d', indent, action, rnum)
            else:
                # read or write to data produced in the cycle
                out('%s%s R%d/T%d', indent, action, rnum, tmap[dep])
        # footer: map the column numbers back to real pids
        tmp = sorted((a,b) for b,a in tmap.items())
        out('%s', '\t\t'.join('T%d' % pid for (i,pid) in tmp))
    def get_lifetimes(tids):
        '''For each tid on the list that is known, return its event triple
        (begin, commit, abort), where commit/abort are None if unset.
        '''
        return {tid:tx_events[tid] for tid in tids if tid in tx_events}
    def is_tracked(tid):
        # True if tid began inside a measurement window.
        return tid in tx_tracked
    # Expose the closure's API as an attribute bag.
    return NamedTuple(on_begin=on_begin,
                      on_access=on_access,
                      on_finish=on_finish,
                      get_tx_deps=get_tx_deps,
                      get_lifetimes=get_lifetimes,
                      get_neighborhood=get_neighborhood,
                      is_tracked=is_tracked,
                      print_swimlane=print_swimlane)
class SerializationFailure(Exception):
    '''Raised when the dependency tracker detects a serialization failure.'''
def safe_dependency_tracker(stats,
db_size,
ww_blocks,
wr_blocks,
allow_cycles,
report_cycles,
tid_watch,
rid_watch,
analyze_tids,
verbose,
**extra_kwargs):
'''Create a epoch-based multi-version dependency tracker.
The tracker partitions time into a series of epochs, as follows:
Every epoch begins when some transaction ends.
Whenever an epoch begins, the ending timestamp and set of
in-flight transactions are recorded; the timestamp will open the
next epoch once all in-flight transactions end.
By construction, no transaction can span more than two epochs,
giving a hard limit to how far back in time we need to go in order
to detect all cycles. A version V1 has the following lifecycle, in
terms of epochs:
Transaction T1 commits V1 some time during E1. V0 (the version V1
replaces) is still available for new and existing transactions,
however. We say that T1 is "new."
Once E2 arrives, we can forbid any new transactions from accessing
V0. However, some of T1's peers from E1 could still be in flight,
and they might still need V0 (SI requires it, for example). T1 and
V0 are thus "cooling off" during E2.
Once E3 arrives, All transactions that were using V0 must have
ended, and we can safely delete V0. If T1 is involved in a cycle,
other transactions from the cycle could come from E0 (T1 committed
in E1 but could have started during E0), and could also come from
E2 (those that started in E1 and committed in E2). that cycle
must either be in We cannot forget about T1 quite yet, though, due
to the following scenario:
T3(E2/E3) r:w T2(E1/E2) r:w T1(E0/E1) r:w T3
In the above case, T2 cannot see V1, because it started before T1
committed. Further, T3 cannot see T2's writes for the same
reason. However, T3 can most definitely see T1's writes, closing a
cycle that spans three epochs. Therefore, we continue to remember
T1, even though it is "cold."
Once E4 arrives, we can finally detect all cycles that T1 might be
involved in, using edges from E1/E2/E3.
A transaction that started in E1 could acquire an anti-dependency on T1
each ending when all in-flight transactions from the
The tracker maintains an internal dependency graph in order to
detect committed cycles, but does not use that graph to decide
when to finalize transactions. Instead, the tracker maintains a
"safe point." No transaction that commits before a safe point will
carry any dependencies on any transaction that commits after. As
each new safe point is installed, the tracker discards the
corresponding subset of the dependency graph, ensuring that the
cost of cycle checking stays consistent over time.
The tracker manages the lifecycle and statistics for a simulated
database workload, and provides an independent means for detecting
isolation and serialization failures.
Internally, the tracker maintains data structures that protect old
transactions from repeated checking, so that the cost of checking
does not grow over time.
The central concept used is a "safe point" -- the pid of some
transaction known to have committed before any active transaction
began. Transactions incur no dependencies when accessing values
written before the current safe point began. An optimal safe point
could be maintained using a heap, but would be expensive---heap
maintenance is O(n lg n)---and would force us to check each
transaction individually for failures over an ever-growing
dependency graph. Instead, the system maintains a pending safe
point as well as a set of "pending" transactions. The pending safe
point will be installed once all pending transactions have
finished. The last such transaction to complete is chosen to
become the next pending safe point, and all currently active
transactions are added to the pending set.
There are six kinds of transactions in the dependency graph, based
on how their lifetime compares with the selection and installation
of safe points:
now
|
v
-------------- A --------------- B --------------- C --------------
|--T1--| |--T3--| |--T5--|
|--T2--| |--T4--| |--T6--|
In the above figure, time flows from left to right and is measured
in terms of transaction ids, which increase
monotonically. Installation of safe point A must wait until all
transactions that coexisted with A complete; B is the last to do
so, and is chosen to become the next safe point. As before, it
cannot be installed until all coexisting transactions complete,
with C being the last to do so. Note that the definition of a safe
point means that every transaction will see at most one safe point
installed during its lifetime.
We choose to perform serialization checks whenever a new safe
point is installed. In the above figure, suppose that C has just
committed, allowing us to install safe point B. There is no point
in checking live transactions (e.g. T6) yet because they could
still enter a cycle after the check. It's also unpalatable to
check the set of T4 and T5, because differentiating between T3 and
T4 depends on commit "time" which is messy in our formulation
based on transaction starts. Instead, at the time C commits (and B
is installed) we check all transactions that began between A and
B. All such are guaranteed to have committed, and most SCC we find
will not have been reported before.
At each round, the checker will report failures involving T3, T4,
T2/T3, T2/T4, T3/T4, and T2/T3/T4. We distinguish T3 from T4 by
noting that all deps for T3 occur *before* new_safe_point, while
one or more deps occur *after* new_safe_point in T4.
NOTE: While running Tarjan, we could encounter strongly connected
components involving T3/T4/T5. However, our formulation of safe
points disallows any direct dependencies between T3 and T5,
meaning that T4 must be the common member of 2+ unrelated cycles
(a figure-eight with T4 at the crossing). Each cycle is a
different (unrelated) serialization failure, so we are content to
partition the SCC, reporting the T3/T4 subcomponent now and the
T4/T5 subcomponent next time; by a similar argument, the T1/T2
subcomponent of a T1/T2/T3 failure will have been reported last
time, and we must now report the T2/T3 subcomponent.
NOTE: The first transaction to commit after system startup also
discovers an empty pending set, and will do the right thing by
becoming the new safe point and populating the pending set.
'''
# A database record is a list of versions, newest left-most. The
# only information we actually track is the tid of each version's
# creator. An old version can be reclaimed once the transaction
# that overwrites it has been finalized.
original_version = (0,1)
db = [collections.deque([original_version]) for i in range(db_size)]
tracker = tx_tracker(stats, **extra_kwargs)
log = historic_log()
safe_point = 0
# install next safe point when all pending transactions end
active,pending = set(),set()
pending_safe_point = 1
# active_* -> in flight accesses
active_reads, active_writes = dict_of_sets(), dict_of_sets()
# tx_* -> committed accesses
tx_reads,tx_writes = {},{}
# fresh_tx committed during current safe point; we can safely
# discard aging_tx at next safe point
fresh_tx,aging_tx = {},{}
stats.iso_failures,stats.ser_failures = 0,0
stats.acc_count,stats.rcount,stats.wcount = 0,0,0
stats.ww_waits,stats.wr_waits = 0,0
q,e = not verbose, errlog
def on_begin(pid):
begin,tracked = tracker.on_begin(pid)
active.add(pid)
log.on_begin(pid, tracked)
return begin
clobbering,blocked,no_clobber = {},set(),(None,None)
def on_access(pid, rid, read):
'''Notify the tracker that transaction "pid" is accessing record
"rid", and whether that access is a read or a write. Return
the value that was read or overwritten.
If bool(read) is False, a write is assumed and the most
recently committed version will be overwritten (writes are
coherent); otherwise, a read is performed. Transactions may
also provide a callable "read", if they wish to select a
specific version out of the ones currently available.
The version selector callback should accept a single argument,
an iterator over the triple (id, tid, stamp), where "id" is an
opaque value identifying the version, "tid" identifies the
transaction that created the version, and "stamp" is the
version's commit timestamp. Versions are presented
newest-first, with a transaction's own write (if any)
preceding all committed versions (and having tid=pid and
stamp=None). The callback should either return the id of the
version it wishes to read, or throw an instance of
AbortTransaction if no suitable version was available.
WARNING: allowing a transaction to read two different versions
(perhaps because a new version became available), or read and
overwrite different versions, introduces a dependency
cycle. Transactions can safely read a version they overwrite,
however, even after issuing the write (though by default the
transaction will read its own writes).
'''
Q = (q and pid not in tid_watch and rid not in rid_watch)
if tracker.is_tracked(pid):
stats.acc_count += 1
if read:
stats.rcount += 1
else:
stats.wcount += 1
versions = db[rid]
def iter_versions():
dep,stamp = versions[0]
#e('%d D=%d S=%d', rid, dep, stamp)
assert (not tracker.is_known(dep)
or stamp == tracker.get_end(dep))
if dep == pid or stamp is not None:
#e('A %d D=%d S=%d', rid, dep, stamp)
yield 0,dep,stamp
#e('len(ver)=%d', len(versions))
for i in range(1,len(versions)):
dep,stamp = versions[i]
#e('R%d i=%d D=%d S=%d', rid, i, dep, stamp)
assert (not tracker.is_known(dep)
or stamp == tracker.is_committed(dep))
yield i,dep,stamp
# versions become unavailable once their overwrite finalizes
if not tracker.is_known(dep):
#e('%d not known', dep)
break
if read:
if callable(read):
# caller wants to select the version to use
dep,stamp = versions[read(iter_versions())]
#e('DEP=%d %s', dep, stamp)
elif wr_blocks and rid in clobbering:
# uncommitted overwrite, must block
c,w,rlist = clobbering[rid]
if c in blocked:
raise WaitDepth
rval = []
stats.wr_waits += 1
rlist.append((pid,rval))
blocked.add(pid)
yield from sys_park()
assert len(rval)
dep,stamp = rval[0]
#e('DEP1=%d', dep)
else:
# use most recently-committed version
_,dep,stamp = next(iter_versions())
#e('DEP2=%d', dep)
else:
# write
i,dep,stamp = next(iter_versions())
if not i:
if dep != pid:
Q or e('\tpid=%d taking empty clobber slot for rid=%d', pid, rid)
assert rid not in clobbering
if ww_blocks:
clobbering[rid] = (pid,None,[])
active_writes[pid].add((rid,dep))
versions.appendleft((pid,None))
elif not ww_blocks:
raise WWConflict
else:
c,w,rlist = clobbering[rid]
if w or (c in blocked):
raise WaitDepth
clobbering[rid] = (c,pid,rlist)
Q or e('\tpid=%d blocked on WW conflict on rid=%d with pid=%d', pid, rid, c)
stats.ww_waits += 1
blocked.add(pid)
yield from sys_park()
assert clobbering[rid][0] is pid
assert versions[0] == (pid,None)
dep,stamp = versions[1]
active_writes[pid].add((rid,dep))
Q or e('T=%d %s version of R=%d created by X=%d',
pid, 'reads' if read else 'overwrites', rid, dep)
log.on_access(pid, rid, dep, read)
return dep,stamp
def on_finish(pid, is_commit, callback=None):
'''Notify the tracker that transaction "pid" has finished, and whether