Skip to content

Commit

Permalink
support to free or monitor inflight buffers (#423)
Browse files Browse the repository at this point in the history
* free memory for inflight

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* monitor raft peer capacity

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 authored Jul 23, 2021
1 parent 7c21f8d commit fa0a7c8
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 30 deletions.
4 changes: 2 additions & 2 deletions datadriven/src/test_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ mod tests {
vals: vec!["some string".to_string()],
};
d.cmd_args.push(cmd_arg);
assert_eq!(d.contains_key("key2"), true);
assert_eq!(d.contains_key("key1"), false);
assert!(d.contains_key("key2"));
assert!(!d.contains_key("key1"));
}
}
5 changes: 2 additions & 3 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2369,9 +2369,8 @@ fn test_read_only_with_learner() {
.read_states
.drain(..)
.collect();
assert_eq!(
read_states.is_empty(),
false,
assert!(
!read_states.is_empty(),
"#{}: read_states is empty, want non-empty",
i
);
Expand Down
84 changes: 84 additions & 0 deletions harness/tests/integration_cases/test_raft_flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,87 @@ fn test_msg_app_flow_control_recv_heartbeat() {
r.read_messages();
}
}

#[test]
fn test_msg_app_flow_control_with_freeing_resources() {
let l = default_logger();
let mut r = new_test_raft(1, vec![1, 2, 3], 5, 1, new_storage(), &l);

r.become_candidate();
r.become_leader();

for (_, pr) in r.prs().iter() {
assert!(!pr.ins.buffer_is_allocated());
}

for i in 1..=3 {
// Force the progress to be in replicate state.
r.mut_prs().get_mut(i).unwrap().become_replicate();
}

r.step(new_message(1, 1, MessageType::MsgPropose, 1))
.unwrap();

for (&id, pr) in r.prs().iter() {
if id != 1 {
assert!(pr.ins.buffer_is_allocated());
assert_eq!(pr.ins.count(), 1);
}
}

/*
1: cap=0/start=0/count=0/buffer=[]
2: cap=256/start=0/count=1/buffer=[2]
3: cap=256/start=0/count=1/buffer=[2]
*/

let mut resp = new_message(2, 1, MessageType::MsgAppendResponse, 0);
resp.index = r.raft_log.last_index();
r.step(resp).unwrap();

assert_eq!(r.prs().get(2).unwrap().ins.count(), 0);

/*
1: cap=0/start=0/count=0/buffer=[]
2: cap=256/start=1/count=0/buffer=[2]
3: cap=256/start=0/count=1/buffer=[2]
*/

r.step(new_message(1, 1, MessageType::MsgPropose, 1))
.unwrap();

assert_eq!(r.prs().get(2).unwrap().ins.count(), 1);
assert_eq!(r.prs().get(3).unwrap().ins.count(), 2);

/*
1: cap=0/start=0/count=0/buffer=[]
2: cap=256/start=1/count=1/buffer=[2,3]
3: cap=256/start=0/count=2/buffer=[2,3]
*/

let mut resp = new_message(2, 1, MessageType::MsgAppendResponse, 0);
resp.index = r.raft_log.last_index();
r.step(resp).unwrap();

assert_eq!(r.prs().get(2).unwrap().ins.count(), 0);
assert_eq!(r.prs().get(3).unwrap().ins.count(), 2);
assert_eq!(r.inflight_buffers_size(), 512);

/*
1: cap=0/start=0/count=0/buffer=[]
2: cap=256/start=2/count=0/buffer=[2,3]
3: cap=256/start=0/count=2/buffer=[2,3]
*/

r.maybe_free_inflight_buffers();

assert!(!r.prs().get(2).unwrap().ins.buffer_is_allocated());
assert_eq!(r.prs().get(2).unwrap().ins.count(), 0);
assert_eq!(r.inflight_buffers_size(), 256);

/*
1: cap=0/start=0/count=0/buffer=[]
2: cap=0/start=0/count=0/buffer=[]
3: cap=256/start=0/count=2/buffer=[2,3]
*/
}
18 changes: 18 additions & 0 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,15 @@ impl<T: Storage> RaftCore<T> {
}

impl<T: Storage> Raft<T> {
/// Get the inflight buffer size.
pub fn inflight_buffers_size(&self) -> usize {
let mut total_size = 0;
for (_, pr) in self.prs().iter() {
total_size += pr.ins.cap();
}
total_size
}

/// Sends an append RPC with new entries (if any) and the current commit index to the given
/// peer.
pub fn send_append(&mut self, to: u64) {
Expand Down Expand Up @@ -2835,4 +2844,13 @@ impl<T: Storage> Raft<T> {
pub fn uncommitted_size(&self) -> usize {
self.uncommitted_state.uncommitted_size
}

/// A Raft leader allocates a vector with capacity `max_inflight_msgs` for every peer.
/// It takes a lot of memory if there are too many Raft groups. `maybe_free_inflight_buffers`
/// is used to free memory if necessary.
pub fn maybe_free_inflight_buffers(&mut self) {
for (_, pr) in self.mut_prs().iter_mut() {
pr.ins.maybe_free_buffer();
}
}
}
80 changes: 56 additions & 24 deletions src/tracker/inflights.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// limitations under the License.

/// A buffer of inflight messages.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct Inflights {
// the starting index in the buffer
start: usize,
Expand All @@ -24,19 +24,9 @@ pub struct Inflights {

// ring buffer
buffer: Vec<u64>,
}

// The `buffer` must have it's capacity set correctly on clone, normally it does not.
impl Clone for Inflights {
fn clone(&self) -> Self {
let mut buffer = self.buffer.clone();
buffer.reserve(self.buffer.capacity() - self.buffer.len());
Inflights {
start: self.start,
count: self.count,
buffer,
}
}
// capacity
cap: usize,
}

impl Inflights {
Expand All @@ -46,19 +36,14 @@ impl Inflights {
buffer: Vec::with_capacity(cap),
start: 0,
count: 0,
cap,
}
}

/// Returns true if the inflights is full.
#[inline]
pub fn full(&self) -> bool {
self.count == self.cap()
}

/// The buffer capacity.
#[inline]
pub fn cap(&self) -> usize {
self.buffer.capacity()
self.count == self.cap
}

/// Adds an inflight into inflights
Expand All @@ -67,9 +52,15 @@ impl Inflights {
panic!("cannot add into a full inflights")
}

if self.buffer.capacity() == 0 {
debug_assert_eq!(self.count, 0);
debug_assert_eq!(self.start, 0);
self.buffer = Vec::with_capacity(self.cap);
}

let mut next = self.start + self.count;
if next >= self.cap() {
next -= self.cap();
if next >= self.cap {
next -= self.cap;
}
assert!(next <= self.buffer.len());
if next == self.buffer.len() {
Expand Down Expand Up @@ -97,8 +88,8 @@ impl Inflights {

// increase index and maybe rotate
idx += 1;
if idx >= self.cap() {
idx -= self.cap();
if idx >= self.cap {
idx -= self.cap;
}

i += 1;
Expand All @@ -121,6 +112,38 @@ impl Inflights {
pub fn reset(&mut self) {
self.count = 0;
self.start = 0;
self.buffer = vec![];
}

// Number of inflight messages. It's for tests.
#[doc(hidden)]
#[inline]
pub fn count(&self) -> usize {
self.count
}

// Capacity of inflight buffer.
#[doc(hidden)]
#[inline]
pub fn cap(&self) -> usize {
self.buffer.capacity()
}

// Whether buffer is allocated or not. It's for tests.
#[doc(hidden)]
#[inline]
pub fn buffer_is_allocated(&self) -> bool {
self.cap() > 0
}

/// Free unused memory
#[inline]
pub(crate) fn maybe_free_buffer(&mut self) {
if self.count == 0 {
self.start = 0;
self.buffer = vec![];
debug_assert_eq!(self.buffer.capacity(), 0);
}
}
}

Expand All @@ -139,6 +162,7 @@ mod tests {
start: 0,
count: 5,
buffer: vec![0, 1, 2, 3, 4],
cap: 10,
};

assert_eq!(inflight, wantin);
Expand All @@ -151,6 +175,7 @@ mod tests {
start: 0,
count: 10,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
cap: 10,
};

assert_eq!(inflight, wantin2);
Expand All @@ -167,6 +192,7 @@ mod tests {
start: 5,
count: 5,
buffer: vec![0, 0, 0, 0, 0, 0, 1, 2, 3, 4],
cap: 10,
};

assert_eq!(inflight2, wantin21);
Expand All @@ -179,6 +205,7 @@ mod tests {
start: 5,
count: 10,
buffer: vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4],
cap: 10,
};

assert_eq!(inflight2, wantin22);
Expand All @@ -197,6 +224,7 @@ mod tests {
start: 5,
count: 5,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
cap: 10,
};

assert_eq!(inflight, wantin);
Expand All @@ -207,6 +235,7 @@ mod tests {
start: 9,
count: 1,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
cap: 10,
};

assert_eq!(inflight, wantin2);
Expand All @@ -221,6 +250,7 @@ mod tests {
start: 3,
count: 2,
buffer: vec![10, 11, 12, 13, 14, 5, 6, 7, 8, 9],
cap: 10,
};

assert_eq!(inflight, wantin3);
Expand All @@ -231,6 +261,7 @@ mod tests {
start: 5,
count: 0,
buffer: vec![10, 11, 12, 13, 14, 5, 6, 7, 8, 9],
cap: 10,
};

assert_eq!(inflight, wantin4);
Expand All @@ -249,6 +280,7 @@ mod tests {
start: 1,
count: 9,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
cap: 10,
};

assert_eq!(inflight, wantin);
Expand Down
1 change: 0 additions & 1 deletion src/tracker/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ impl Progress {
self.pending_snapshot = 0;
self.pending_request_snapshot = INVALID_INDEX;
self.recent_active = false;
debug_assert!(self.ins.cap() != 0);
self.ins.reset();
}

Expand Down

0 comments on commit fa0a7c8

Please # to comment.