From fa0a7c8a545de644734b5e1e1d743c7a5c873d18 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Fri, 23 Jul 2021 14:55:18 +0800 Subject: [PATCH] support to free or monitor inflight buffers (#423) * free memory for inflight Signed-off-by: jayzhan211 * monitor raft peer capacity Signed-off-by: jayzhan211 --- datadriven/src/test_data.rs | 4 +- harness/tests/integration_cases/test_raft.rs | 5 +- .../test_raft_flow_control.rs | 84 +++++++++++++++++++ src/raft.rs | 18 ++++ src/tracker/inflights.rs | 80 ++++++++++++------ src/tracker/progress.rs | 1 - 6 files changed, 162 insertions(+), 30 deletions(-) diff --git a/datadriven/src/test_data.rs b/datadriven/src/test_data.rs index 2c8d00b77..f3b59bdec 100644 --- a/datadriven/src/test_data.rs +++ b/datadriven/src/test_data.rs @@ -89,7 +89,7 @@ mod tests { vals: vec!["some string".to_string()], }; d.cmd_args.push(cmd_arg); - assert_eq!(d.contains_key("key2"), true); - assert_eq!(d.contains_key("key1"), false); + assert!(d.contains_key("key2")); + assert!(!d.contains_key("key1")); } } diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index 9a22d7c53..109f93939 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -2369,9 +2369,8 @@ fn test_read_only_with_learner() { .read_states .drain(..) .collect(); - assert_eq!( - read_states.is_empty(), - false, + assert!( + !read_states.is_empty(), "#{}: read_states is empty, want non-empty", i ); diff --git a/harness/tests/integration_cases/test_raft_flow_control.rs b/harness/tests/integration_cases/test_raft_flow_control.rs index fc7d84317..73355b1b4 100644 --- a/harness/tests/integration_cases/test_raft_flow_control.rs +++ b/harness/tests/integration_cases/test_raft_flow_control.rs @@ -175,3 +175,87 @@ fn test_msg_app_flow_control_recv_heartbeat() { r.read_messages(); } } + +#[test] +fn test_msg_app_flow_control_with_freeing_resources() { + let l = default_logger(); + let mut r = new_test_raft(1, vec![1, 2, 3], 5, 1, new_storage(), &l); + + r.become_candidate(); + r.become_leader(); + + for (_, pr) in r.prs().iter() { + assert!(!pr.ins.buffer_is_allocated()); + } + + for i in 1..=3 { + // Force the progress to be in replicate state. + r.mut_prs().get_mut(i).unwrap().become_replicate(); + } + + r.step(new_message(1, 1, MessageType::MsgPropose, 1)) + .unwrap(); + + for (&id, pr) in r.prs().iter() { + if id != 1 { + assert!(pr.ins.buffer_is_allocated()); + assert_eq!(pr.ins.count(), 1); + } + } + + /* + 1: cap=0/start=0/count=0/buffer=[] + 2: cap=256/start=0/count=1/buffer=[2] + 3: cap=256/start=0/count=1/buffer=[2] + */ + + let mut resp = new_message(2, 1, MessageType::MsgAppendResponse, 0); + resp.index = r.raft_log.last_index(); + r.step(resp).unwrap(); + + assert_eq!(r.prs().get(2).unwrap().ins.count(), 0); + + /* + 1: cap=0/start=0/count=0/buffer=[] + 2: cap=256/start=1/count=0/buffer=[2] + 3: cap=256/start=0/count=1/buffer=[2] + */ + + r.step(new_message(1, 1, MessageType::MsgPropose, 1)) + .unwrap(); + + assert_eq!(r.prs().get(2).unwrap().ins.count(), 1); + assert_eq!(r.prs().get(3).unwrap().ins.count(), 2); + + /* + 1: cap=0/start=0/count=0/buffer=[] + 2: cap=256/start=1/count=1/buffer=[2,3] + 3: cap=256/start=0/count=2/buffer=[2,3] + */ + + let mut resp = new_message(2, 1, MessageType::MsgAppendResponse, 0); + resp.index = r.raft_log.last_index(); + r.step(resp).unwrap(); + + assert_eq!(r.prs().get(2).unwrap().ins.count(), 0); + assert_eq!(r.prs().get(3).unwrap().ins.count(), 2); + assert_eq!(r.inflight_buffers_size(), 512); + + /* + 1: cap=0/start=0/count=0/buffer=[] + 2: cap=256/start=2/count=0/buffer=[2,3] + 3: cap=256/start=0/count=2/buffer=[2,3] + */ + + r.maybe_free_inflight_buffers(); + + assert!(!r.prs().get(2).unwrap().ins.buffer_is_allocated()); + assert_eq!(r.prs().get(2).unwrap().ins.count(), 0); + assert_eq!(r.inflight_buffers_size(), 256); + + /* + 1: cap=0/start=0/count=0/buffer=[] + 2: cap=0/start=0/count=0/buffer=[] + 3: cap=256/start=0/count=2/buffer=[2,3] + */ +} diff --git a/src/raft.rs b/src/raft.rs index 8d93fe38e..1dfce98d2 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -845,6 +845,15 @@ impl RaftCore { } impl Raft { + /// Get the inflight buffer size. + pub fn inflight_buffers_size(&self) -> usize { + let mut total_size = 0; + for (_, pr) in self.prs().iter() { + total_size += pr.ins.cap(); + } + total_size + } + /// Sends an append RPC with new entries (if any) and the current commit index to the given /// peer. pub fn send_append(&mut self, to: u64) { @@ -2835,4 +2844,13 @@ impl Raft { pub fn uncommitted_size(&self) -> usize { self.uncommitted_state.uncommitted_size } + + /// A Raft leader allocates a vector with capacity `max_inflight_msgs` for every peer. + /// It takes a lot of memory if there are too many Raft groups. `maybe_free_inflight_buffers` + /// is used to free memory if necessary. + pub fn maybe_free_inflight_buffers(&mut self) { + for (_, pr) in self.mut_prs().iter_mut() { + pr.ins.maybe_free_buffer(); + } + } } diff --git a/src/tracker/inflights.rs b/src/tracker/inflights.rs index 860a815d0..bafea066f 100644 --- a/src/tracker/inflights.rs +++ b/src/tracker/inflights.rs @@ -15,7 +15,7 @@ // limitations under the License. /// A buffer of inflight messages. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct Inflights { // the starting index in the buffer start: usize, @@ -24,19 +24,9 @@ pub struct Inflights { // ring buffer buffer: Vec, -} -// The `buffer` must have it's capacity set correctly on clone, normally it does not. -impl Clone for Inflights { - fn clone(&self) -> Self { - let mut buffer = self.buffer.clone(); - buffer.reserve(self.buffer.capacity() - self.buffer.len()); - Inflights { - start: self.start, - count: self.count, - buffer, - } - } + // capacity + cap: usize, } impl Inflights { @@ -46,19 +36,14 @@ impl Inflights { buffer: Vec::with_capacity(cap), start: 0, count: 0, + cap, } } /// Returns true if the inflights is full. #[inline] pub fn full(&self) -> bool { - self.count == self.cap() - } - - /// The buffer capacity. - #[inline] - pub fn cap(&self) -> usize { - self.buffer.capacity() + self.count == self.cap } /// Adds an inflight into inflights @@ -67,9 +52,15 @@ impl Inflights { panic!("cannot add into a full inflights") } + if self.buffer.capacity() == 0 { + debug_assert_eq!(self.count, 0); + debug_assert_eq!(self.start, 0); + self.buffer = Vec::with_capacity(self.cap); + } + let mut next = self.start + self.count; - if next >= self.cap() { - next -= self.cap(); + if next >= self.cap { + next -= self.cap; } assert!(next <= self.buffer.len()); if next == self.buffer.len() { @@ -97,8 +88,8 @@ impl Inflights { // increase index and maybe rotate idx += 1; - if idx >= self.cap() { - idx -= self.cap(); + if idx >= self.cap { + idx -= self.cap; } i += 1; @@ -121,6 +112,38 @@ impl Inflights { pub fn reset(&mut self) { self.count = 0; self.start = 0; + self.buffer = vec![]; + } + + // Number of inflight messages. It's for tests. + #[doc(hidden)] + #[inline] + pub fn count(&self) -> usize { + self.count + } + + // Capacity of inflight buffer. + #[doc(hidden)] + #[inline] + pub fn cap(&self) -> usize { + self.buffer.capacity() + } + + // Whether buffer is allocated or not. It's for tests. + #[doc(hidden)] + #[inline] + pub fn buffer_is_allocated(&self) -> bool { + self.cap() > 0 + } + + /// Free unused memory + #[inline] + pub(crate) fn maybe_free_buffer(&mut self) { + if self.count == 0 { + self.start = 0; + self.buffer = vec![]; + debug_assert_eq!(self.buffer.capacity(), 0); + } } } @@ -139,6 +162,7 @@ mod tests { start: 0, count: 5, buffer: vec![0, 1, 2, 3, 4], + cap: 10, }; assert_eq!(inflight, wantin); @@ -151,6 +175,7 @@ mod tests { start: 0, count: 10, buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + cap: 10, }; assert_eq!(inflight, wantin2); @@ -167,6 +192,7 @@ mod tests { start: 5, count: 5, buffer: vec![0, 0, 0, 0, 0, 0, 1, 2, 3, 4], + cap: 10, }; assert_eq!(inflight2, wantin21); @@ -179,6 +205,7 @@ mod tests { start: 5, count: 10, buffer: vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4], + cap: 10, }; assert_eq!(inflight2, wantin22); @@ -197,6 +224,7 @@ mod tests { start: 5, count: 5, buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + cap: 10, }; assert_eq!(inflight, wantin); @@ -207,6 +235,7 @@ mod tests { start: 9, count: 1, buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + cap: 10, }; assert_eq!(inflight, wantin2); @@ -221,6 +250,7 @@ mod tests { start: 3, count: 2, buffer: vec![10, 11, 12, 13, 14, 5, 6, 7, 8, 9], + cap: 10, }; assert_eq!(inflight, wantin3); @@ -231,6 +261,7 @@ mod tests { start: 5, count: 0, buffer: vec![10, 11, 12, 13, 14, 5, 6, 7, 8, 9], + cap: 10, }; assert_eq!(inflight, wantin4); @@ -249,6 +280,7 @@ mod tests { start: 1, count: 9, buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + cap: 10, }; assert_eq!(inflight, wantin); diff --git a/src/tracker/progress.rs b/src/tracker/progress.rs index af4c29de1..252aec6d8 100644 --- a/src/tracker/progress.rs +++ b/src/tracker/progress.rs @@ -87,7 +87,6 @@ impl Progress { self.pending_snapshot = 0; self.pending_request_snapshot = INVALID_INDEX; self.recent_active = false; - debug_assert!(self.ins.cap() != 0); self.ins.reset(); }