forked from xline-kv/Xline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrevision_check.rs
101 lines (93 loc) · 3.25 KB
/
revision_check.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use xlineapi::execute_error::ExecuteError;
use crate::rpc::{CompactionRequest, RangeRequest, Request, TxnRequest};
/// A union of requests that need revision check
pub(crate) enum RevisionRequest<'a> {
/// Compaction request
Compaction(&'a CompactionRequest),
/// Range request
Range(&'a RangeRequest),
/// Txn request
Txn(&'a TxnRequest),
}
impl<'a> From<&'a CompactionRequest> for RevisionRequest<'a> {
fn from(req: &'a CompactionRequest) -> Self {
RevisionRequest::Compaction(req)
}
}
impl<'a> From<&'a RangeRequest> for RevisionRequest<'a> {
fn from(req: &'a RangeRequest) -> Self {
RevisionRequest::Range(req)
}
}
impl<'a> From<&'a TxnRequest> for RevisionRequest<'a> {
fn from(req: &'a TxnRequest) -> Self {
RevisionRequest::Txn(req)
}
}
/// Revision check
pub(crate) trait RevisionCheck {
/// check if the request is valid given the compacted and current revision
fn check_revision(
self,
compacted_revision: i64,
current_revision: i64,
) -> Result<(), ExecuteError>;
}
impl<'a, T> RevisionCheck for &'a T
where
&'a T: Into<RevisionRequest<'a>>,
{
fn check_revision(
self,
compacted_revision: i64,
current_revision: i64,
) -> Result<(), ExecuteError> {
debug_assert!(
compacted_revision <= current_revision,
"compacted revision should not larger than current revision"
);
let request = self.into();
match request {
RevisionRequest::Compaction(r) => {
if r.revision <= compacted_revision {
Err(ExecuteError::RevisionCompacted(
r.revision,
compacted_revision,
))
} else if r.revision > current_revision {
Err(ExecuteError::RevisionTooLarge(r.revision, current_revision))
} else {
Ok(())
}
}
RevisionRequest::Range(r) => {
if r.revision > current_revision {
Err(ExecuteError::RevisionTooLarge(r.revision, current_revision))
} else {
(r.revision >= compacted_revision || r.revision <= 0)
.then_some(())
.ok_or(ExecuteError::RevisionCompacted(
r.revision,
compacted_revision,
))
}
}
RevisionRequest::Txn(r) => {
for op in r.success.iter().chain(r.failure.iter()) {
if let Some(ref req) = op.request {
match *req {
Request::RequestRange(ref req) => {
req.check_revision(compacted_revision, current_revision)?;
}
Request::RequestTxn(ref req) => {
req.check_revision(compacted_revision, current_revision)?;
}
Request::RequestPut(_) | Request::RequestDeleteRange(_) => (),
}
}
}
Ok(())
}
}
}
}