From 541e9d36ba5f26115e2698068ebd23f9bf0236b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 17 Jan 2023 16:16:18 +0800 Subject: [PATCH] Feature: add "Inflight" to store info about inflight replication data But it is not used anywhere until replication progress control is moved from `ReplicationCore` to `Engine`. `Inflight` will be part of the **progress** data. --- openraft/src/lib.rs | 2 + openraft/src/log_id_range.rs | 34 ++++ openraft/src/progress/inflight.rs | 287 ++++++++++++++++++++++++++++++ openraft/src/progress/mod.rs | 4 + 4 files changed, 327 insertions(+) create mode 100644 openraft/src/log_id_range.rs create mode 100644 openraft/src/progress/inflight.rs diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 6a40c7100..cdc4c774d 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -33,6 +33,8 @@ mod store_wrapper; mod summary; mod vote; +pub(crate) mod log_id_range; + mod engine; pub mod error; mod internal_server_state; diff --git a/openraft/src/log_id_range.rs b/openraft/src/log_id_range.rs new file mode 100644 index 000000000..9ab93bb9b --- /dev/null +++ b/openraft/src/log_id_range.rs @@ -0,0 +1,34 @@ +use std::fmt::Display; +use std::fmt::Formatter; + +use crate::LogId; +use crate::MessageSummary; +use crate::NodeId; + +/// A log id range of continuous series of log entries. +/// +/// The range of log to send is left open right close: `(prev_log_id, last_log_id]`. +#[derive(Clone, Copy, Debug)] +#[derive(PartialEq, Eq)] +pub(crate) struct LogIdRange { + /// The prev log id before the first to send, exclusive. + pub(crate) prev_log_id: Option>, + + /// The last log id to send, inclusive. + pub(crate) last_log_id: Option>, +} + +impl Display for LogIdRange { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "({}, {}]", self.prev_log_id.summary(), self.last_log_id.summary()) + } +} + +impl LogIdRange { + pub(crate) fn new(prev: Option>, last: Option>) -> Self { + Self { + prev_log_id: prev, + last_log_id: last, + } + } +} diff --git a/openraft/src/progress/inflight.rs b/openraft/src/progress/inflight.rs new file mode 100644 index 000000000..49b0c83f1 --- /dev/null +++ b/openraft/src/progress/inflight.rs @@ -0,0 +1,287 @@ +// TODO: remove it +#![allow(unused)] + +use std::fmt::Display; +use std::fmt::Formatter; + +use crate::log_id_range::LogIdRange; +use crate::LogId; +use crate::LogIdOptionExt; +use crate::MessageSummary; +use crate::NodeId; + +/// The inflight data being transmitting from leader to a follower/learner. +/// +/// If inflight data is non-None, it's waiting for responses from a follower/learner. +/// The follower/learner respond with `ack()` or `conflict()` to update the state of inflight data. +#[derive(Clone, Copy, Debug)] +#[derive(PartialEq, Eq)] +pub(crate) enum Inflight { + None, + + /// Being replicating a series of logs. + Logs(LogIdRange), + + /// Being replicating a snapshot. + Snapshot { + /// The last log id snapshot includes. + /// + /// It is None, if the snapshot is empty. + last_log_id: Option>, + }, +} + +impl Display for Inflight { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Inflight::None => { + write!(f, "None") + } + Inflight::Logs(l) => { + write!(f, "Logs:{}", l) + } + Inflight::Snapshot { last_log_id: last_next } => { + write!(f, "Snapshot:{}", last_next.summary()) + } + } + } +} + +impl Inflight { + pub(crate) fn logs(prev: Option>, last: Option>) -> Self { + #![allow(clippy::nonminimal_bool)] + if !(prev < last) { + Self::None + } else { + Self::Logs(LogIdRange::new(prev, last)) + } + } + + pub(crate) fn snapshot(snapshot_last_log_id: Option>) -> Self { + Self::Snapshot { + last_log_id: snapshot_last_log_id, + } + } + + pub(crate) fn is_none(&self) -> bool { + &Inflight::None == self + } + + // test it if used + #[allow(dead_code)] + pub(crate) fn is_sending_log(&self) -> bool { + matches!(self, Inflight::Logs(_)) + } + + // test it if used + #[allow(dead_code)] + pub(crate) fn is_sending_snapshot(&self) -> bool { + matches!(self, Inflight::Snapshot { .. }) + } + + /// Update inflight state when log upto `upto` is acknowledged by a follower/learner. + pub(crate) fn ack(&mut self, upto: Option>) { + match self { + Inflight::None => { + unreachable!("no inflight data") + } + Inflight::Logs(logs) => { + *self = { + debug_assert!(upto >= logs.prev_log_id); + debug_assert!(upto <= logs.last_log_id); + Inflight::logs(upto, logs.last_log_id) + } + } + Inflight::Snapshot { last_log_id } => { + debug_assert_eq!(&upto, last_log_id); + *self = Inflight::None; + } + } + } + + /// Update inflight state when a conflicting log id is responded by a follower/learner. + pub(crate) fn conflict(&mut self, conflict: u64) { + match self { + Inflight::None => { + unreachable!("no inflight data") + } + Inflight::Logs(logs) => { + // if prev_log_id==None, it will never conflict + debug_assert_eq!(Some(conflict), logs.prev_log_id.index()); + *self = Inflight::None + } + Inflight::Snapshot { last_log_id: _ } => { + unreachable!("sending snapshot should not conflict"); + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::log_id_range::LogIdRange; + use crate::progress::Inflight; + use crate::LeaderId; + use crate::LogId; + + fn log_id(index: u64) -> LogId { + LogId { + leader_id: LeaderId { term: 1, node_id: 1 }, + index, + } + } + + #[test] + fn test_inflight_create() -> anyhow::Result<()> { + // Logs + let l = Inflight::logs(Some(log_id(5)), Some(log_id(10))); + assert_eq!(Inflight::Logs(LogIdRange::new(Some(log_id(5)), Some(log_id(10)))), l); + assert!(!l.is_none()); + + // Empty range + let l = Inflight::logs(Some(log_id(11)), Some(log_id(10))); + assert_eq!(Inflight::None, l); + assert!(l.is_none()); + + // Snapshot + let l = Inflight::snapshot(Some(log_id(10))); + assert_eq!( + Inflight::Snapshot { + last_log_id: Some(log_id(10)) + }, + l + ); + assert!(!l.is_none()); + + Ok(()) + } + + #[test] + fn test_inflight_is_xxx() -> anyhow::Result<()> { + let l = Inflight::::None; + assert!(l.is_none()); + + let l = Inflight::logs(Some(log_id(5)), Some(log_id(10))); + assert!(l.is_sending_log()); + + let l = Inflight::snapshot(Some(log_id(10))); + assert!(l.is_sending_snapshot()); + + Ok(()) + } + + #[test] + fn test_inflight_ack() -> anyhow::Result<()> { + // Update matching when no inflight data + { + let res = std::panic::catch_unwind(|| { + let mut f = Inflight::::None; + f.ack(Some(log_id(4))); + }); + tracing::info!("res: {:?}", res); + assert!(res.is_err(), "Inflight::None can not ack"); + } + + // Update matching when transmitting by logs + { + let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10))); + + f.ack(Some(log_id(5))); + assert_eq!(Inflight::logs(Some(log_id(5)), Some(log_id(10))), f); + + f.ack(Some(log_id(6))); + assert_eq!(Inflight::logs(Some(log_id(6)), Some(log_id(10))), f); + + f.ack(Some(log_id(9))); + assert_eq!(Inflight::logs(Some(log_id(9)), Some(log_id(10))), f); + + f.ack(Some(log_id(10))); + assert_eq!(Inflight::None, f); + + { + let res = std::panic::catch_unwind(|| { + let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10))); + f.ack(Some(log_id(4))); + }); + tracing::info!("res: {:?}", res); + assert!(res.is_err(), "non-matching ack < prev_log_id"); + } + + { + let res = std::panic::catch_unwind(|| { + let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10))); + f.ack(Some(log_id(11))); + }); + tracing::info!("res: {:?}", res); + assert!(res.is_err(), "non-matching ack > prev_log_id"); + } + } + + // Update matching when transmitting by snapshot + { + { + let mut f = Inflight::snapshot(Some(log_id(5))); + f.ack(Some(log_id(5))); + assert_eq!(Inflight::None, f, "valid ack"); + } + + { + let res = std::panic::catch_unwind(|| { + let mut f = Inflight::snapshot(Some(log_id(5))); + f.ack(Some(log_id(4))); + }); + tracing::info!("res: {:?}", res); + assert!(res.is_err(), "non-matching ack != snapshot.last_log_id"); + } + } + + Ok(()) + } + + #[test] + fn test_inflight_conflict() -> anyhow::Result<()> { + { + let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10))); + f.conflict(5); + assert_eq!(Inflight::None, f, "valid conflict"); + } + + { + let res = std::panic::catch_unwind(|| { + let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10))); + f.conflict(4); + }); + tracing::info!("res: {:?}", res); + assert!(res.is_err(), "non-matching conflict < prev_log_id"); + } + + { + let res = std::panic::catch_unwind(|| { + let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10))); + f.conflict(6); + }); + tracing::info!("res: {:?}", res); + assert!(res.is_err(), "non-matching conflict > prev_log_id"); + } + + { + let res = std::panic::catch_unwind(|| { + let mut f = Inflight::::None; + f.conflict(5); + }); + tracing::info!("res: {:?}", res); + assert!(res.is_err(), "conflict is not expected by Inflight::None"); + } + + { + let res = std::panic::catch_unwind(|| { + let mut f = Inflight::snapshot(Some(log_id(5))); + f.conflict(5); + }); + tracing::info!("res: {:?}", res); + assert!(res.is_err(), "conflict is not expected by Inflight::Snapshot"); + } + + Ok(()) + } +} diff --git a/openraft/src/progress/mod.rs b/openraft/src/progress/mod.rs index f36c47dde..8e55602a7 100644 --- a/openraft/src/progress/mod.rs +++ b/openraft/src/progress/mod.rs @@ -8,11 +8,15 @@ #[cfg(test)] mod bench; pub(crate) mod entry; +mod inflight; use std::borrow::Borrow; use std::fmt::Debug; use std::slice::Iter; +// TODO: remove it +#[allow(unused_imports)] pub(crate) use inflight::Inflight; + use crate::quorum::QuorumSet; /// Track progress of several incremental values.