Skip to content

Commit

Permalink
support to adjust max inflight msgs
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
hicqu committed Aug 2, 2021
1 parent fa0a7c8 commit d9ffd50
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 18 deletions.
4 changes: 2 additions & 2 deletions harness/tests/integration_cases/test_raft_flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ fn test_msg_app_flow_control_with_freeing_resources() {

assert_eq!(r.prs().get(2).unwrap().ins.count(), 0);
assert_eq!(r.prs().get(3).unwrap().ins.count(), 2);
assert_eq!(r.inflight_buffers_size(), 512);
assert_eq!(r.inflight_buffers_size(), 4096);

/*
1: cap=0/start=0/count=0/buffer=[]
Expand All @@ -251,7 +251,7 @@ fn test_msg_app_flow_control_with_freeing_resources() {

assert!(!r.prs().get(2).unwrap().ins.buffer_is_allocated());
assert_eq!(r.prs().get(2).unwrap().ins.count(), 0);
assert_eq!(r.inflight_buffers_size(), 256);
assert_eq!(r.inflight_buffers_size(), 2048);

/*
1: cap=0/start=0/count=0/buffer=[]
Expand Down
2 changes: 1 addition & 1 deletion src/confchange/datadriven_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn test_conf_change_data_driven() -> anyhow::Result<()> {
walk("src/confchange/testdata", |path| -> anyhow::Result<()> {
let logger = default_logger();

let mut tr = ProgressTracker::new(10, default_logger());
let mut tr = ProgressTracker::new(10);
let mut idx = 0;

run_test(
Expand Down
11 changes: 9 additions & 2 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ impl<T: Storage> Raft<T> {
voters.len(),
learners.len(),
c.max_inflight_msgs,
logger.clone(),
),
msgs: Default::default(),
r: RaftCore {
Expand Down Expand Up @@ -849,7 +848,7 @@ impl<T: Storage> Raft<T> {
pub fn inflight_buffers_size(&self) -> usize {
let mut total_size = 0;
for (_, pr) in self.prs().iter() {
total_size += pr.ins.cap();
total_size += pr.ins.buffer_capacity() * std::mem::size_of::<u64>();
}
total_size
}
Expand Down Expand Up @@ -2853,4 +2852,12 @@ impl<T: Storage> Raft<T> {
pr.ins.maybe_free_buffer();
}
}

/// To adjust `max_inflight_msgs` for the specified peer.
/// Set to `0` will disable the progress.
pub fn adjust_max_inflight_msgs(&mut self, target: u64, cap: usize) {
if let Some(pr) = self.mut_prs().get_mut(target) {
pr.ins.set_cap(cap);
}
}
}
10 changes: 2 additions & 8 deletions src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@ pub use self::inflights::Inflights;
pub use self::progress::Progress;
pub use self::state::ProgressState;

use slog::Logger;

use crate::confchange::{MapChange, MapChangeType};
use crate::eraftpb::ConfState;
use crate::quorum::{AckedIndexer, Index, VoteResult};
use crate::{DefaultHashBuilder, HashMap, HashSet, JointConfig};
use std::fmt::Debug;

use getset::Getters;

/// Config reflects the configuration tracked in a ProgressTracker.
Expand Down Expand Up @@ -205,21 +202,19 @@ pub struct ProgressTracker {
max_inflight: usize,

group_commit: bool,
pub(crate) logger: Logger,
}

impl ProgressTracker {
/// Creates a new ProgressTracker.
pub fn new(max_inflight: usize, logger: Logger) -> Self {
Self::with_capacity(0, 0, max_inflight, logger)
pub fn new(max_inflight: usize) -> Self {
Self::with_capacity(0, 0, max_inflight)
}

/// Create a progress set with the specified sizes already reserved.
pub fn with_capacity(
voters: usize,
learners: usize,
max_inflight: usize,
logger: Logger,
) -> Self {
ProgressTracker {
progress: HashMap::with_capacity_and_hasher(
Expand All @@ -230,7 +225,6 @@ impl ProgressTracker {
votes: HashMap::with_capacity_and_hasher(voters, DefaultHashBuilder::default()),
max_inflight,
group_commit: false,
logger,
}
}

Expand Down
123 changes: 118 additions & 5 deletions src/tracker/inflights.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;

/// A buffer of inflight messages.
#[derive(Debug, PartialEq, Clone)]
pub struct Inflights {
Expand All @@ -27,6 +29,9 @@ pub struct Inflights {

// capacity
cap: usize,

// To support dynamically change inflight size.
incoming_cap: Option<usize>,
}

impl Inflights {
Expand All @@ -37,13 +42,43 @@ impl Inflights {
start: 0,
count: 0,
cap,
incoming_cap: None,
}
}

/// Adjust inflight buffer capacity. Set it to `0` will disable the progress.
// Calling it between `self.full()` and `self.add()` can cause a panic.
pub fn set_cap(&mut self, incoming_cap: usize) {
match self.cap.cmp(&incoming_cap) {
Ordering::Equal => self.incoming_cap = None,
Ordering::Less => {
self.buffer.reserve(incoming_cap - self.count);
self.cap = incoming_cap;
self.incoming_cap = None;
}
Ordering::Greater => {
if self.count <= incoming_cap && self.start + self.count < incoming_cap {
self.cap = incoming_cap;
let (cur_cap, cur_len) = (self.buffer.capacity(), self.buffer.len());
if cur_cap > incoming_cap && cur_len <= incoming_cap {
// TODO: Simplify it after `shrink_to` is stable.
unsafe {
self.buffer.set_len(incoming_cap);
self.buffer.shrink_to_fit();
self.buffer.set_len(cur_len);
}
}
} else {
self.incoming_cap = Some(incoming_cap);
}
}
}
}

/// Returns true if the inflights is full.
#[inline]
pub fn full(&self) -> bool {
self.count == self.cap
self.count == self.cap || self.incoming_cap.map_or(false, |cap| self.count >= cap)
}

/// Adds an inflight into inflights
Expand All @@ -55,6 +90,7 @@ impl Inflights {
if self.buffer.capacity() == 0 {
debug_assert_eq!(self.count, 0);
debug_assert_eq!(self.start, 0);
debug_assert!(self.incoming_cap.is_none());
self.buffer = Vec::with_capacity(self.cap);
}

Expand Down Expand Up @@ -98,6 +134,14 @@ impl Inflights {
// free i inflights and set new start index
self.count -= i;
self.start = idx;

if self.count == 0 {
if let Some(incoming_cap) = self.incoming_cap.take() {
self.start = 0;
self.cap = incoming_cap;
self.buffer = Vec::with_capacity(self.cap);
}
}
}

/// Frees the first buffer entry.
Expand All @@ -113,6 +157,7 @@ impl Inflights {
self.count = 0;
self.start = 0;
self.buffer = vec![];
self.cap = self.incoming_cap.take().unwrap_or(self.cap);
}

// Number of inflight messages. It's for tests.
Expand All @@ -122,23 +167,23 @@ impl Inflights {
self.count
}

// Capacity of inflight buffer.
// Capacity of the internal buffer.
#[doc(hidden)]
#[inline]
pub fn cap(&self) -> usize {
pub fn buffer_capacity(&self) -> usize {
self.buffer.capacity()
}

// Whether buffer is allocated or not. It's for tests.
#[doc(hidden)]
#[inline]
pub fn buffer_is_allocated(&self) -> bool {
self.cap() > 0
self.buffer_capacity() > 0
}

/// Free unused memory
#[inline]
pub(crate) fn maybe_free_buffer(&mut self) {
pub fn maybe_free_buffer(&mut self) {
if self.count == 0 {
self.start = 0;
self.buffer = vec![];
Expand All @@ -163,6 +208,7 @@ mod tests {
count: 5,
buffer: vec![0, 1, 2, 3, 4],
cap: 10,
incoming_cap: None,
};

assert_eq!(inflight, wantin);
Expand All @@ -176,6 +222,7 @@ mod tests {
count: 10,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
cap: 10,
incoming_cap: None,
};

assert_eq!(inflight, wantin2);
Expand All @@ -193,6 +240,7 @@ mod tests {
count: 5,
buffer: vec![0, 0, 0, 0, 0, 0, 1, 2, 3, 4],
cap: 10,
incoming_cap: None,
};

assert_eq!(inflight2, wantin21);
Expand All @@ -206,6 +254,7 @@ mod tests {
count: 10,
buffer: vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4],
cap: 10,
incoming_cap: None,
};

assert_eq!(inflight2, wantin22);
Expand All @@ -225,6 +274,7 @@ mod tests {
count: 5,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
cap: 10,
incoming_cap: None,
};

assert_eq!(inflight, wantin);
Expand All @@ -236,6 +286,7 @@ mod tests {
count: 1,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
cap: 10,
incoming_cap: None,
};

assert_eq!(inflight, wantin2);
Expand All @@ -251,6 +302,7 @@ mod tests {
count: 2,
buffer: vec![10, 11, 12, 13, 14, 5, 6, 7, 8, 9],
cap: 10,
incoming_cap: None,
};

assert_eq!(inflight, wantin3);
Expand All @@ -262,6 +314,7 @@ mod tests {
count: 0,
buffer: vec![10, 11, 12, 13, 14, 5, 6, 7, 8, 9],
cap: 10,
incoming_cap: None,
};

assert_eq!(inflight, wantin4);
Expand All @@ -281,8 +334,68 @@ mod tests {
count: 9,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
cap: 10,
incoming_cap: None,
};

assert_eq!(inflight, wantin);
}

#[test]
fn test_inflights_set_cap() {
let mut inflight = Inflights::new(128);

(0..16).for_each(|i| inflight.add(i));
assert_eq!(inflight.count(), 16);

// Adjust cap to a larger value.
inflight.set_cap(1024);
assert_eq!(inflight.cap, 1024);
assert_eq!(inflight.incoming_cap, None);
assert_eq!(inflight.buffer_capacity(), 1024);

// Adjust cap to a less value than the current one.
inflight.set_cap(8);
assert_eq!(inflight.cap, 1024);
assert_eq!(inflight.incoming_cap, Some(8));
assert!(inflight.full());

// Free somethings. It should still be full.
inflight.free_to(7);
assert!(inflight.full());

// Free more one slot, then it won't be full. However buffer capacity can't
// shrink in the current implementation.
inflight.free_first_one();
assert!(!inflight.full());
assert_eq!(inflight.buffer_capacity(), 1024);

// The internal buffer can be shrinked after it is freed totally.
inflight.free_to(15);
assert!(inflight.start < inflight.buffer_capacity());
assert_eq!(inflight.buffer_capacity(), 8);

// 1024 -> 8 -> 1024. `incoming_cap` should be cleared after the second `set_cap`.
inflight.set_cap(1024);
(0..16).for_each(|i| inflight.add(i));
inflight.set_cap(8);
assert_eq!(inflight.cap, 1024);
assert_eq!(inflight.incoming_cap, Some(8));
inflight.set_cap(1024);
assert_eq!(inflight.incoming_cap, None);

// 1024 -> 512. The internal buffer should be shrinked.
inflight.set_cap(512);
assert_eq!(inflight.cap, 512);
assert_eq!(inflight.incoming_cap, None);
assert_eq!(inflight.buffer_capacity(), 512);

// 1024 -> 512. The buffer shouldn't be shrinked as the tail part is in used.
let mut inflight = Inflights::new(1024);
inflight.buffer = vec![0; 1024];
inflight.start = 800;
(0..16).for_each(|i| inflight.add(i));
inflight.set_cap(512);
assert_eq!(inflight.incoming_cap, Some(512));
assert_eq!(inflight.buffer_capacity(), 1024);
}
}

0 comments on commit d9ffd50

Please # to comment.