diff --git a/core/src/consensus/mod.rs b/core/src/consensus/mod.rs index c962f38a55..9141133a9a 100644 --- a/core/src/consensus/mod.rs +++ b/core/src/consensus/mod.rs @@ -207,6 +207,7 @@ pub trait ConsensusEngine: Sync + Send { } /// Phase 3 verification. Check block information against parent. Returns either a null `Ok` or a general error detailing the problem with import. + /// The verifiaction must be conducted only with the two headers' information. fn verify_block_family(&self, _header: &Header, _parent: &Header) -> Result<(), Error> { Ok(()) } diff --git a/core/src/consensus/signer.rs b/core/src/consensus/signer.rs index 0bc0e7df95..8ccfe6836b 100644 --- a/core/src/consensus/signer.rs +++ b/core/src/consensus/signer.rs @@ -128,11 +128,6 @@ impl EngineSigner { self.signer.as_ref().map(|(address, _)| address) } - /// Check if the given address is the signing address. - pub fn is_address(&self, a: &Address) -> bool { - self.signer.map_or(false, |(address, _public)| *a == address) - } - /// Check if the signing address was set. pub fn is_some(&self) -> bool { self.signer.is_some() diff --git a/core/src/consensus/sortition/vrf_sortition.rs b/core/src/consensus/sortition/vrf_sortition.rs index 4125ebfd0a..64caadc86e 100644 --- a/core/src/consensus/sortition/vrf_sortition.rs +++ b/core/src/consensus/sortition/vrf_sortition.rs @@ -35,7 +35,7 @@ pub struct VRFSortition { pub vrf_inst: ECVRF, } -#[derive(Eq, PartialEq, Clone, Debug, RlpEncodable, RlpDecodable)] +#[derive(Eq, PartialEq, Clone, Default, Debug, RlpEncodable, RlpDecodable)] pub struct PriorityInfo { signer_idx: usize, priority: Priority, diff --git a/core/src/consensus/stake/action_data.rs b/core/src/consensus/stake/action_data.rs index acc1add3b3..5841d06ac5 100644 --- a/core/src/consensus/stake/action_data.rs +++ b/core/src/consensus/stake/action_data.rs @@ -236,7 +236,6 @@ impl<'a> Delegation<'a> { #[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd, RlpDecodable, RlpEncodable)] pub struct Validator { - weight: StakeQuantity, delegation: StakeQuantity, deposit: Deposit, pubkey: Public, @@ -245,7 +244,6 @@ pub struct Validator { impl Validator { pub fn new_for_test(delegation: StakeQuantity, deposit: Deposit, pubkey: Public) -> Self { Self { - weight: delegation, delegation, deposit, pubkey, @@ -254,17 +252,12 @@ impl Validator { fn new(delegation: StakeQuantity, deposit: Deposit, pubkey: Public) -> Self { Self { - weight: delegation, delegation, deposit, pubkey, } } - fn reset(&mut self) { - self.weight = self.delegation; - } - pub fn pubkey(&self) -> &Public { &self.pubkey } @@ -345,28 +338,6 @@ impl Validators { Ok(()) } - pub fn update_weight(&mut self, block_author: &Address) { - let min_delegation = self.min_delegation(); - for Validator { - weight, - pubkey, - .. - } in self.0.iter_mut().rev() - { - if public_to_address(pubkey) == *block_author { - // block author - *weight = weight.saturating_sub(min_delegation); - break - } - // neglecting validators - *weight = weight.saturating_sub(min_delegation * 2); - } - if self.0.iter().all(|validator| validator.weight == 0) { - self.0.iter_mut().for_each(Validator::reset); - } - self.0.sort_unstable(); - } - pub fn remove(&mut self, target: &Address) { self.0.retain( |Validator { @@ -379,10 +350,6 @@ impl Validators { pub fn delegation(&self, pubkey: &Public) -> Option<StakeQuantity> { self.0.iter().find(|validator| validator.pubkey == *pubkey).map(|&validator| validator.delegation) } - - fn min_delegation(&self) -> StakeQuantity { - self.0.iter().map(|&validator| validator.delegation).min().expect("There must be at least one validators") - } } impl Deref for Validators { @@ -1758,7 +1725,6 @@ mod tests { pubkey: *pubkey, deposit: 0, delegation: 0, - weight: 0, }) .collect(), ); diff --git a/core/src/consensus/stake/mod.rs b/core/src/consensus/stake/mod.rs index 02789499fe..8472e06ca2 100644 --- a/core/src/consensus/stake/mod.rs +++ b/core/src/consensus/stake/mod.rs @@ -341,12 +341,6 @@ pub fn move_current_to_previous_intermediate_rewards(state: &mut TopLevelState) rewards.save_to_state(state) } -pub fn update_validator_weights(state: &mut TopLevelState, block_author: &Address) -> StateResult<()> { - let mut validators = Validators::load_from_state(state)?; - validators.update_weight(block_author); - validators.save_to_state(state) -} - fn change_params( state: &mut TopLevelState, metadata_seq: u64, @@ -392,17 +386,16 @@ pub fn on_term_close( let current_term = metadata.current_term_id(); ctrace!(ENGINE, "on_term_close. current_term: {}", current_term); + let metadata = metadata.params().expect( + "Term close events can be called after the ChangeParams called, \ + so the metadata always has CommonParams", + ); + let custody_period = metadata.custody_period(); + let release_period = metadata.release_period(); + let (nomination_expiration, custody_until, kick_at) = { - let metadata = metadata.params().expect( - "Term close events can be called after the ChangeParams called, \ - so the metadata always has CommonParams", - ); let nomination_expiration = metadata.nomination_expiration(); assert_ne!(0, nomination_expiration); - let custody_period = metadata.custody_period(); - assert_ne!(0, custody_period); - let release_period = metadata.release_period(); - assert_ne!(0, release_period); (nomination_expiration, current_term + custody_period, current_term + release_period) }; @@ -412,7 +405,10 @@ pub fn on_term_close( let reverted: Vec<_> = expired.into_iter().chain(released).collect(); revert_delegations(state, &reverted)?; - jail(state, inactive_validators, custody_until, kick_at)?; + let jail_enabled = custody_period != 0 || release_period != 0; + if jail_enabled { + jail(state, inactive_validators, custody_until, kick_at)?; + } let validators = Validators::elect(state)?; validators.save_to_state(state)?; diff --git a/core/src/consensus/tendermint/backup.rs b/core/src/consensus/tendermint/backup.rs index 9465854848..95b8c879e8 100644 --- a/core/src/consensus/tendermint/backup.rs +++ b/core/src/consensus/tendermint/backup.rs @@ -14,41 +14,77 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use ctypes::BlockHash; use kvdb::{DBTransaction, KeyValueDB}; +use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream}; -use super::message::ConsensusMessage; +use super::message::{ConsensusMessage, SortitionRound}; use super::types::{Height, Step, View}; +use crate::consensus::PriorityInfo; use crate::db; use crate::db_version; const BACKUP_KEY: &[u8] = b"tendermint-backup"; const BACKUP_VERSION: u32 = 1; +pub struct PriorityInfoProjection(pub (SortitionRound, PriorityInfo)); + +impl PriorityInfoProjection { + pub fn get_view(&self) -> PriorityInfoProjectionView { + let (ref sortition_round, ref priority_info) = self.0; + PriorityInfoProjectionView((sortition_round, priority_info)) + } +} + +impl Decodable for PriorityInfoProjection { + fn decode(rlp: &Rlp) -> Result<Self, DecoderError> { + let item_count = rlp.item_count()?; + if item_count != 2 { + Err(DecoderError::RlpIncorrectListLen { + got: item_count, + expected: 2, + }) + } else { + Ok(Self((rlp.val_at(0)?, rlp.val_at(1)?))) + } + } +} + +pub struct PriorityInfoProjectionView<'a>(pub (&'a SortitionRound, &'a PriorityInfo)); + +impl Encodable for PriorityInfoProjectionView<'_> { + fn rlp_append(&self, s: &mut RlpStream) { + let (sortition_round, priority_info) = self.0; + s.begin_list(2).append(sortition_round).append(priority_info); + } +} + pub struct BackupView<'a> { pub height: &'a Height, pub view: &'a View, pub step: &'a Step, pub votes: &'a [ConsensusMessage], + pub priority_infos: &'a [PriorityInfoProjectionView<'a>], pub finalized_view_of_previous_block: &'a View, pub finalized_view_of_current_block: &'a Option<View>, } +#[derive(RlpDecodable)] pub struct BackupDataV0 { pub height: Height, pub view: View, pub step: Step, pub votes: Vec<ConsensusMessage>, - pub proposal: Option<BlockHash>, + pub priority_infos: Vec<PriorityInfoProjection>, pub last_confirmed_view: View, } +#[derive(RlpDecodable)] pub struct BackupDataV1 { pub height: Height, pub view: View, pub step: Step, pub votes: Vec<ConsensusMessage>, - pub proposal: Option<BlockHash>, + pub priority_infos: Vec<PriorityInfoProjection>, pub finalized_view_of_previous_block: View, pub finalized_view_of_current_block: Option<View>, } @@ -59,12 +95,14 @@ pub fn backup(db: &dyn KeyValueDB, backup_data: BackupView) { view, step, votes, + priority_infos, finalized_view_of_previous_block, finalized_view_of_current_block, } = backup_data; let mut s = rlp::RlpStream::new(); - s.begin_list(6); + s.begin_list(7); s.append(height).append(view).append(step).append_list(votes); + s.append_list(priority_infos); s.append(finalized_view_of_previous_block); s.append(finalized_view_of_current_block); @@ -83,19 +121,7 @@ pub fn restore(db: &dyn KeyValueDB) -> Option<BackupDataV1> { if version < BACKUP_VERSION { migrate(db); } - load_v1(db) -} - -fn find_proposal(votes: &[ConsensusMessage], height: Height, view: View) -> Option<BlockHash> { - votes - .iter() - .rev() - .map(|vote| &vote.on) - .find(|vote_on| { - vote_on.step.step == Step::Propose && vote_on.step.view == view && vote_on.step.height == height - }) - .map(|vote_on| vote_on.block_hash) - .unwrap_or(None) + load_with_version::<BackupDataV1>(db) } fn migrate(db: &dyn KeyValueDB) { @@ -114,7 +140,7 @@ fn migrate(db: &dyn KeyValueDB) { } fn migrate_from_0_to_1(db: &dyn KeyValueDB) { - let v0 = if let Some(v0) = load_v0(db) { + let v0 = if let Some(v0) = load_with_version::<BackupDataV0>(db) { v0 } else { return @@ -125,7 +151,7 @@ fn migrate_from_0_to_1(db: &dyn KeyValueDB) { view: v0.view, step: v0.step, votes: v0.votes, - proposal: v0.proposal, + priority_infos: v0.priority_infos, // This is not a correct behavior if step == Step::Commit. // In Commit state, the Tendermint module overwrote the last_confirmed_view to finalized_view_of_current_block. // So we can't restore finalized_view_of_previous block. @@ -142,59 +168,15 @@ fn migrate_from_0_to_1(db: &dyn KeyValueDB) { view: &v1.view, step: &v1.step, votes: &v1.votes, + priority_infos: &v1.priority_infos.iter().map(|projection| projection.get_view()).collect::<Vec<_>>(), finalized_view_of_previous_block: &v1.finalized_view_of_previous_block, finalized_view_of_current_block: &v1.finalized_view_of_current_block, }) } -fn load_v0(db: &dyn KeyValueDB) -> Option<BackupDataV0> { - let value = db.get(db::COL_EXTRA, BACKUP_KEY).expect("Low level database error. Some issue with disk?"); - let (height, view, step, votes, last_confirmed_view) = value.map(|bytes| { - let rlp = rlp::Rlp::new(&bytes); - ( - rlp.val_at(0).unwrap(), - rlp.val_at(1).unwrap(), - rlp.val_at(2).unwrap(), - rlp.at(3).unwrap().as_list().unwrap(), - rlp.val_at(4).unwrap(), - ) - })?; - - let proposal = find_proposal(&votes, height, view); - - Some(BackupDataV0 { - height, - view, - step, - votes, - proposal, - last_confirmed_view, - }) -} - -fn load_v1(db: &dyn KeyValueDB) -> Option<BackupDataV1> { - #[derive(RlpDecodable)] - struct Backup { - height: Height, - view: View, - step: Step, - votes: Vec<ConsensusMessage>, - finalized_view_of_previous_block: View, - finalized_view_of_current_block: Option<View>, - } - +fn load_with_version<T: Decodable>(db: &dyn KeyValueDB) -> Option<T> { let value = db.get(db::COL_EXTRA, BACKUP_KEY).expect("Low level database error. Some issue with disk?")?; - let backup: Backup = rlp::decode(&value).unwrap(); - - let proposal = find_proposal(&backup.votes, backup.height, backup.view); - - Some(BackupDataV1 { - height: backup.height, - view: backup.view, - step: backup.step, - votes: backup.votes, - proposal, - finalized_view_of_previous_block: backup.finalized_view_of_previous_block, - finalized_view_of_current_block: backup.finalized_view_of_current_block, - }) + let backup: T = rlp::decode(&value).unwrap(); + + Some(backup) } diff --git a/core/src/consensus/tendermint/engine.rs b/core/src/consensus/tendermint/engine.rs index dbfbf86198..6a39ef28c2 100644 --- a/core/src/consensus/tendermint/engine.rs +++ b/core/src/consensus/tendermint/engine.rs @@ -184,7 +184,6 @@ impl ConsensusEngine for Tendermint { self.machine.add_balance(block, &author, block_author_reward)?; } _ => { - stake::update_validator_weights(block.state_mut(), &author)?; stake::add_intermediate_rewards(block.state_mut(), author, block_author_reward)?; } } diff --git a/core/src/consensus/tendermint/message.rs b/core/src/consensus/tendermint/message.rs index 3de0721c62..60d90c5141 100644 --- a/core/src/consensus/tendermint/message.rs +++ b/core/src/consensus/tendermint/message.rs @@ -25,6 +25,7 @@ use snap; use super::super::BitSet; use super::{Height, Step, View}; +use crate::consensus::{Priority, PriorityInfo}; /// Step for the sortition round. /// FIXME: It has a large overlap with the previous VoteStep. @@ -119,17 +120,31 @@ const MESSAGE_ID_REQUEST_PROPOSAL: u8 = 0x05; const MESSAGE_ID_REQUEST_COMMIT: u8 = 0x06; const MESSAGE_ID_COMMIT: u8 = 0x07; +#[derive(Clone, Debug, PartialEq, RlpEncodable, RlpDecodable)] +#[cfg_attr(test, derive(Default))] +pub struct ProposalSummary { + pub priority_info: PriorityInfo, + pub block_hash: BlockHash, +} + +impl ProposalSummary { + pub fn priority(&self) -> Priority { + self.priority_info.priority() + } +} + #[derive(Debug, PartialEq)] pub enum TendermintMessage { ConsensusMessage(Vec<Bytes>), ProposalBlock { signature: SchnorrSignature, + priority_info: Box<PriorityInfo>, view: View, message: Bytes, }, StepState { vote_step: VoteStep, - proposal: Option<BlockHash>, + proposal: Box<Option<ProposalSummary>>, lock_view: Option<View>, known_votes: BitSet, }, @@ -138,8 +153,7 @@ pub enum TendermintMessage { requested_votes: BitSet, }, RequestProposal { - height: Height, - view: View, + round: SortitionRound, }, RequestCommit { height: Height, @@ -160,12 +174,14 @@ impl Encodable for TendermintMessage { } TendermintMessage::ProposalBlock { signature, + priority_info, view, message, } => { - s.begin_list(4); + s.begin_list(5); s.append(&MESSAGE_ID_PROPOSAL_BLOCK); s.append(signature); + s.append(&**priority_info); s.append(view); let compressed = { @@ -184,7 +200,7 @@ impl Encodable for TendermintMessage { s.begin_list(5); s.append(&MESSAGE_ID_STEP_STATE); s.append(vote_step); - s.append(proposal); + s.append(&**proposal); s.append(lock_view); s.append(known_votes); } @@ -198,13 +214,11 @@ impl Encodable for TendermintMessage { s.append(requested_votes); } TendermintMessage::RequestProposal { - height, - view, + round, } => { - s.begin_list(3); + s.begin_list(2); s.append(&MESSAGE_ID_REQUEST_PROPOSAL); - s.append(height); - s.append(view); + s.append(round); } TendermintMessage::RequestCommit { height, @@ -242,15 +256,16 @@ impl Decodable for TendermintMessage { } MESSAGE_ID_PROPOSAL_BLOCK => { let item_count = rlp.item_count()?; - if item_count != 4 { + if item_count != 5 { return Err(DecoderError::RlpIncorrectListLen { got: item_count, - expected: 4, + expected: 5, }) } let signature = rlp.at(1)?; - let view = rlp.at(2)?; - let compressed_message: Vec<u8> = rlp.val_at(3)?; + let priority_info = rlp.at(2)?; + let view = rlp.at(3)?; + let compressed_message: Vec<u8> = rlp.val_at(4)?; let uncompressed_message = { // TODO: Cache the Decoder object let mut snappy_decoder = snap::Decoder::new(); @@ -262,6 +277,7 @@ impl Decodable for TendermintMessage { TendermintMessage::ProposalBlock { signature: signature.as_val()?, + priority_info: Box::new(priority_info.as_val()?), view: view.as_val()?, message: uncompressed_message, } @@ -275,7 +291,7 @@ impl Decodable for TendermintMessage { }) } let vote_step = rlp.at(1)?.as_val()?; - let proposal = rlp.at(2)?.as_val()?; + let proposal = Box::new(rlp.at(2)?.as_val()?); let lock_view = rlp.at(3)?.as_val()?; let known_votes = rlp.at(4)?.as_val()?; TendermintMessage::StepState { @@ -302,17 +318,15 @@ impl Decodable for TendermintMessage { } MESSAGE_ID_REQUEST_PROPOSAL => { let item_count = rlp.item_count()?; - if item_count != 3 { + if item_count != 2 { return Err(DecoderError::RlpIncorrectListLen { got: item_count, - expected: 3, + expected: 2, }) } - let height = rlp.at(1)?.as_val()?; - let view = rlp.at(2)?.as_val()?; + let round = rlp.at(1)?.as_val()?; TendermintMessage::RequestProposal { - height, - view, + round, } } MESSAGE_ID_REQUEST_COMMIT => { @@ -422,6 +436,7 @@ mod tests { fn encode_and_decode_tendermint_message_2() { rlp_encode_and_decode_test!(TendermintMessage::ProposalBlock { signature: SchnorrSignature::random(), + priority_info: Box::new(PriorityInfo::new(1, 0xffu64.into(), 0, 1, vec![])), view: 1, message: vec![1u8, 2u8] }); @@ -433,7 +448,7 @@ mod tests { bit_set.set(2); rlp_encode_and_decode_test!(TendermintMessage::StepState { vote_step: VoteStep::new(10, 123, Step::Prevote), - proposal: Some(Default::default()), + proposal: Box::new(Some(Default::default())), lock_view: Some(2), known_votes: bit_set }); @@ -452,8 +467,10 @@ mod tests { #[test] fn encode_and_decode_tendermint_message_5() { rlp_encode_and_decode_test!(TendermintMessage::RequestProposal { - height: 10, - view: 123, + round: SortitionRound { + height: 10, + view: 123, + } }); } diff --git a/core/src/consensus/tendermint/mod.rs b/core/src/consensus/tendermint/mod.rs index 10d55cc2b3..16c5304ad6 100644 --- a/core/src/consensus/tendermint/mod.rs +++ b/core/src/consensus/tendermint/mod.rs @@ -35,7 +35,7 @@ use ctimer::TimerToken; use parking_lot::RwLock; use self::chain_notify::TendermintChainNotify; -pub use self::message::{ConsensusMessage, SortitionRound, VoteOn, VoteStep}; +pub use self::message::{ConsensusMessage, ProposalSummary, SortitionRound, VoteOn, VoteStep}; pub use self::params::{TendermintParams, TimeGapParams, TimeoutParams}; pub use self::types::{Height, Step, View}; use super::{stake, ValidatorSet}; @@ -45,11 +45,8 @@ use crate::ChainNotify; /// Timer token representing the consensus step timeouts. const ENGINE_TIMEOUT_TOKEN_NONCE_BASE: TimerToken = 23; -/// Timer token for empty proposal blocks. -const ENGINE_TIMEOUT_EMPTY_PROPOSAL: TimerToken = 22; /// Timer token for broadcasting step state. const ENGINE_TIMEOUT_BROADCAST_STEP_STATE: TimerToken = 21; - /// Unit: second const ENGINE_TIMEOUT_BROADCAT_STEP_STATE_INTERVAL: u64 = 1; diff --git a/core/src/consensus/tendermint/network.rs b/core/src/consensus/tendermint/network.rs index 84e942a021..a185a8aa0a 100644 --- a/core/src/consensus/tendermint/network.rs +++ b/core/src/consensus/tendermint/network.rs @@ -24,7 +24,6 @@ use ckey::SchnorrSignature; use cnetwork::{Api, NetworkExtension, NodeId}; use crossbeam_channel as crossbeam; use ctimer::TimerToken; -use ctypes::BlockHash; use primitives::Bytes; use rand::prelude::SliceRandom; use rand::thread_rng; @@ -33,13 +32,12 @@ use rlp::{Encodable, Rlp}; use super::super::BitSet; use super::message::*; use super::params::TimeoutParams; -use super::types::{Height, PeerState, Step, View}; +use super::types::{PeerState, Step, View}; use super::worker; -use crate::consensus::EngineError; +use crate::consensus::{EngineError, Priority, PriorityInfo}; use super::{ - ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_BROADCAT_STEP_STATE_INTERVAL, ENGINE_TIMEOUT_EMPTY_PROPOSAL, - ENGINE_TIMEOUT_TOKEN_NONCE_BASE, + ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_BROADCAT_STEP_STATE_INTERVAL, ENGINE_TIMEOUT_TOKEN_NONCE_BASE, }; pub struct TendermintExtension { @@ -74,7 +72,7 @@ impl TendermintExtension { &mut self, token: &NodeId, vote_step: VoteStep, - proposal: Option<BlockHash>, + proposal: Option<ProposalSummary>, messages: BitSet, ) { let peer_state = match self.peers.get_mut(token) { @@ -114,7 +112,7 @@ impl TendermintExtension { fn broadcast_state( &self, vote_step: VoteStep, - proposal: Option<BlockHash>, + proposal: Option<ProposalSummary>, lock_view: Option<View>, votes: BitSet, ) { @@ -123,7 +121,7 @@ impl TendermintExtension { let message = Arc::new( TendermintMessage::StepState { vote_step, - proposal, + proposal: Box::new(proposal), lock_view, known_votes: votes, } @@ -135,10 +133,17 @@ impl TendermintExtension { } } - fn broadcast_proposal_block(&self, signature: SchnorrSignature, view: View, message: Bytes) { + fn broadcast_proposal_block( + &self, + signature: SchnorrSignature, + priority_info: Box<PriorityInfo>, + view: View, + message: Bytes, + ) { let message = Arc::new( TendermintMessage::ProposalBlock { signature, + priority_info, message, view, } @@ -149,33 +154,32 @@ impl TendermintExtension { } } - fn request_proposal_to_any(&self, height: Height, view: View) { - for (token, peer) in &self.peers { - let is_future_height_and_view = { - let higher_height = peer.vote_step.height > height; - let same_height_and_higher_view = peer.vote_step.height == height && peer.vote_step.view > view; - higher_height || same_height_and_higher_view - }; - - if is_future_height_and_view { - self.request_proposal(token, height, view); - continue - } - - let is_same_height_and_view = peer.vote_step.height == height && peer.vote_step.view == view; - - if is_same_height_and_view && peer.proposal.is_some() { - self.request_proposal(token, height, view); - } - } + fn request_proposal_to_superiors(&self, round: SortitionRound, my_highest: Option<Priority>) { + // Request to future round peers + self.peers + .iter() + .filter(|(_, peer)| round < peer.vote_step.into()) + .for_each(|(token, _)| self.request_proposal(token, round)); + + let current_round_peers = self.peers.iter().filter(|(_, peer)| round == peer.vote_step.into()); + // Request to current round higher peers + if let Some(current_round_highest_priority) = current_round_peers + .clone() + .map(|(_id, peer)| peer.priority()) + .max() + .filter(|highest_priority| *highest_priority > my_highest) + { + current_round_peers + .filter(|(_id, peer)| peer.priority() == current_round_highest_priority) + .for_each(|(token, _)| self.request_proposal(token, round)) + }; } - fn request_proposal(&self, token: &NodeId, height: Height, view: View) { - ctrace!(ENGINE, "Request proposal {} {} to {:?}", height, view, token); + fn request_proposal(&self, token: &NodeId, round: SortitionRound) { + ctrace!(ENGINE, "Request proposal {:?} to {:?}", round, token); let message = Arc::new( TendermintMessage::RequestProposal { - height, - view, + round, } .rlp_bytes(), ); @@ -204,19 +208,11 @@ impl TendermintExtension { } fn set_timer_step(&self, step: Step, view: View, expired_token_nonce: TimerToken) { - self.api.clear_timer(ENGINE_TIMEOUT_EMPTY_PROPOSAL).expect("Timer clear succeeds"); self.api.clear_timer(expired_token_nonce).expect("Timer clear succeeds"); self.api .set_timer_once(expired_token_nonce + 1, self.timeouts.timeout(step, view)) .expect("Timer set succeeds"); } - - fn set_timer_empty_proposal(&self, view: View) { - self.api.clear_timer(ENGINE_TIMEOUT_EMPTY_PROPOSAL).expect("Timer clear succeeds"); - self.api - .set_timer_once(ENGINE_TIMEOUT_EMPTY_PROPOSAL, self.timeouts.timeout(Step::Propose, view) / 2) - .expect("Timer set succeeds"); - } } impl NetworkExtension<Event> for TendermintExtension { @@ -275,6 +271,7 @@ impl NetworkExtension<Event> for TendermintExtension { } Ok(TendermintMessage::ProposalBlock { signature, + priority_info, view, message, }) => { @@ -282,6 +279,7 @@ impl NetworkExtension<Event> for TendermintExtension { self.inner .send(worker::Event::ProposalBlock { signature, + priority_info, view, message: message.clone(), result, @@ -307,33 +305,54 @@ impl NetworkExtension<Event> for TendermintExtension { lock_view, known_votes, ); - self.update_peer_state(token, vote_step, proposal, known_votes); - let (result, receiver) = crossbeam::unbounded(); - self.inner - .send(worker::Event::StepState { - token: *token, - vote_step, - proposal, - lock_view, - known_votes: Box::from(known_votes), - result, - }) - .unwrap(); - - while let Ok(message) = receiver.recv() { - self.api.send(token, Arc::new(message)); + let unchanged = match self.peers.get(token) { + Some(peer_state) => *proposal == peer_state.proposal, + None => false, + }; + let verified = unchanged || { + (*proposal) + .clone() + .map(|summary| { + let (result, receiver) = crossbeam::bounded(1); + self.inner + .send(worker::Event::VerifyPriorityInfo { + height: vote_step.height, + view: vote_step.view, + priority_info: summary.priority_info, + result, + }) + .unwrap(); + receiver.recv().unwrap().unwrap_or(false) + }) + .unwrap_or(true) + }; + if verified { + self.update_peer_state(token, vote_step, (*proposal).clone(), known_votes); + let (result, receiver) = crossbeam::unbounded(); + self.inner + .send(worker::Event::StepState { + token: *token, + vote_step, + proposal: *proposal, + lock_view, + known_votes: Box::from(known_votes), + result, + }) + .unwrap(); + + while let Ok(message) = receiver.recv() { + self.api.send(token, Arc::new(message)); + } } } Ok(TendermintMessage::RequestProposal { - height, - view, + round, }) => { let (result, receiver) = crossbeam::bounded(1); self.inner .send(worker::Event::RequestProposal { token: *token, - height, - view, + round, result, }) .unwrap(); @@ -403,11 +422,7 @@ impl NetworkExtension<Event> for TendermintExtension { } fn on_timeout(&mut self, token: TimerToken) { - debug_assert!( - token >= ENGINE_TIMEOUT_TOKEN_NONCE_BASE - || token == ENGINE_TIMEOUT_EMPTY_PROPOSAL - || token == ENGINE_TIMEOUT_BROADCAST_STEP_STATE - ); + debug_assert!(token >= ENGINE_TIMEOUT_TOKEN_NONCE_BASE || token == ENGINE_TIMEOUT_BROADCAST_STEP_STATE); self.inner.send(worker::Event::OnTimeout(token)).unwrap(); } @@ -424,7 +439,7 @@ impl NetworkExtension<Event> for TendermintExtension { lock_view, votes, } => { - self.broadcast_state(vote_step, proposal, lock_view, votes); + self.broadcast_state(vote_step, *proposal, lock_view, votes); } Event::RequestMessagesToAll { vote_step, @@ -432,28 +447,24 @@ impl NetworkExtension<Event> for TendermintExtension { } => { self.request_messages_to_all(vote_step, requested_votes); } - Event::RequestProposalToAny { - height, - view, + Event::RequestProposalToSuperiors { + round, + current_highest, } => { - self.request_proposal_to_any(height, view); + self.request_proposal_to_superiors(round, current_highest); } Event::SetTimerStep { step, view, expired_token_nonce, } => self.set_timer_step(step, view, expired_token_nonce), - Event::SetTimerEmptyProposal { - view, - } => { - self.set_timer_empty_proposal(view); - } Event::BroadcastProposalBlock { signature, + priority_info, view, message, } => { - self.broadcast_proposal_block(signature, view, message); + self.broadcast_proposal_block(signature, priority_info, view, message); } } } @@ -465,7 +476,7 @@ pub enum Event { }, BroadcastState { vote_step: VoteStep, - proposal: Option<BlockHash>, + proposal: Box<Option<ProposalSummary>>, lock_view: Option<View>, votes: BitSet, }, @@ -473,20 +484,18 @@ pub enum Event { vote_step: VoteStep, requested_votes: BitSet, }, - RequestProposalToAny { - height: Height, - view: View, + RequestProposalToSuperiors { + round: SortitionRound, + current_highest: Option<Priority>, }, SetTimerStep { step: Step, view: View, expired_token_nonce: TimerToken, }, - SetTimerEmptyProposal { - view: View, - }, BroadcastProposalBlock { signature: SchnorrSignature, + priority_info: Box<PriorityInfo>, view: View, message: Bytes, }, diff --git a/core/src/consensus/tendermint/types.rs b/core/src/consensus/tendermint/types.rs index 3092eac591..b44567817a 100644 --- a/core/src/consensus/tendermint/types.rs +++ b/core/src/consensus/tendermint/types.rs @@ -22,25 +22,64 @@ use primitives::Bytes; use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream}; use super::super::BitSet; -use super::message::VoteStep; +use super::message::{ProposalSummary, VoteStep}; use crate::block::{IsBlock, SealedBlock}; -use crate::consensus::sortition::SeedInfo; +use crate::consensus::{sortition::seed::SeedInfo, Priority, PriorityInfo}; pub type Height = u64; pub type View = u64; +#[derive(Clone)] +pub struct ProposeInner { + wait_block_generation: Option<(PriorityInfo, BlockHash)>, + wait_imported: Vec<(PriorityInfo, SealedBlock)>, +} + +impl ProposeInner { + pub fn generation_completed(&mut self) -> Option<(PriorityInfo, BlockHash)> { + self.wait_block_generation.take() + } + + pub fn generation_halted(&mut self) { + self.wait_block_generation = None; + } + + fn import_completed(&mut self, target_block_hash: BlockHash) -> Option<(PriorityInfo, SealedBlock)> { + let position = self + .wait_imported + .iter() + .position(|(_, sealed_block)| sealed_block.header().hash() == target_block_hash)?; + Some(self.wait_imported.remove(position)) + } + + fn wait_block_generation(&mut self, my_priority_info: PriorityInfo, parent_hash: BlockHash) { + self.wait_block_generation = Some((my_priority_info, parent_hash)); + } + + fn wait_imported(&mut self, target_priority_info: PriorityInfo, target_block: SealedBlock) { + self.wait_imported.insert(0, (target_priority_info, target_block)); + } + + pub fn get_wait_block_generation(&self) -> &Option<(PriorityInfo, BlockHash)> { + &self.wait_block_generation + } +} + +impl fmt::Debug for ProposeInner { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "waiting block generation {:?} and waiting block imports {:?}", + self.wait_block_generation, + self.wait_imported.iter().map(|(_, sealed)| sealed.header().hash()).collect::<Vec<_>>() + ) + } +} + #[derive(Clone)] pub enum TendermintState { - Propose, - ProposeWaitBlockGeneration { - parent_hash: BlockHash, - }, - ProposeWaitImported { - block: Box<SealedBlock>, - }, - ProposeWaitEmptyBlockTimer { - block: Box<SealedBlock>, - }, + // wait block generation + Propose(Box<ProposeInner>), Prevote, Precommit, Commit { @@ -54,16 +93,50 @@ pub enum TendermintState { } impl TendermintState { + pub fn new_propose_step() -> Self { + TendermintState::Propose(Box::new(ProposeInner { + wait_block_generation: None, + wait_imported: Vec::new(), + })) + } + + pub fn generation_completed(&mut self) -> Option<(PriorityInfo, BlockHash)> { + if let Self::Propose(inner) = self { + inner.generation_completed() + } else { + None + } + } + + pub fn generation_halted(&mut self) { + if let Self::Propose(inner) = self { + inner.generation_halted() + } + } + + pub fn import_completed(&mut self, target_block_hash: BlockHash) -> Option<(PriorityInfo, SealedBlock)> { + if let Self::Propose(inner) = self { + inner.import_completed(target_block_hash) + } else { + None + } + } + + pub fn wait_block_generation(&mut self, my_priority_info: PriorityInfo, parent_hash: BlockHash) { + if let Self::Propose(inner) = self { + inner.wait_block_generation(my_priority_info, parent_hash); + } + } + + pub fn wait_imported(&mut self, target_priority_info: PriorityInfo, target_block: SealedBlock) { + if let Self::Propose(inner) = self { + inner.wait_imported(target_priority_info, target_block) + } + } + pub fn to_step(&self) -> Step { match self { - TendermintState::Propose => Step::Propose, - TendermintState::ProposeWaitBlockGeneration { - .. - } => Step::Propose, - TendermintState::ProposeWaitImported { - .. - } => Step::Propose, - TendermintState::ProposeWaitEmptyBlockTimer { + TendermintState::Propose { .. } => Step::Propose, TendermintState::Prevote => Step::Prevote, @@ -77,15 +150,6 @@ impl TendermintState { } } - pub fn is_propose_wait_empty_block_timer(&self) -> bool { - match self { - TendermintState::ProposeWaitEmptyBlockTimer { - .. - } => true, - _ => false, - } - } - pub fn is_commit(&self) -> bool { match self { TendermintState::Commit { @@ -117,14 +181,7 @@ impl TendermintState { block_hash, view, } => Some((*view, *block_hash)), - TendermintState::Propose => None, - TendermintState::ProposeWaitBlockGeneration { - .. - } => None, - TendermintState::ProposeWaitImported { - .. - } => None, - TendermintState::ProposeWaitEmptyBlockTimer { + TendermintState::Propose { .. } => None, TendermintState::Prevote => None, @@ -136,16 +193,7 @@ impl TendermintState { impl fmt::Debug for TendermintState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - TendermintState::Propose => write!(f, "TendermintState::Propose"), - TendermintState::ProposeWaitBlockGeneration { - parent_hash, - } => write!(f, "TendermintState::ProposeWaitBlockGeneration({})", parent_hash), - TendermintState::ProposeWaitImported { - block, - } => write!(f, "TendermintState::ProposeWaitImported({})", block.header().hash()), - TendermintState::ProposeWaitEmptyBlockTimer { - block, - } => write!(f, "TendermintState::ProposeWaitEmptyBlockTimer({})", block.header().hash()), + TendermintState::Propose(inner) => write!(f, "TenderminState::Propose, {:?}", inner), TendermintState::Prevote => write!(f, "TendermintState::Prevote"), TendermintState::Precommit => write!(f, "TendermintState::Precommit"), TendermintState::Commit { @@ -200,7 +248,7 @@ impl Encodable for Step { pub struct PeerState { pub vote_step: VoteStep, - pub proposal: Option<BlockHash>, + pub proposal: Option<ProposalSummary>, pub messages: BitSet, } @@ -212,6 +260,10 @@ impl PeerState { messages: BitSet::new(), } } + + pub fn priority(&self) -> Option<Priority> { + self.proposal.as_ref().map(|summary| summary.priority()) + } } pub struct TendermintSealView<'a> { @@ -305,43 +357,3 @@ impl TwoThirdsMajority { } } } - -#[derive(Debug, PartialEq)] -pub enum Proposal { - ProposalReceived(BlockHash, Bytes, SchnorrSignature), - ProposalImported(BlockHash), - None, -} - -impl Proposal { - pub fn new_received(hash: BlockHash, block: Bytes, signature: SchnorrSignature) -> Self { - Proposal::ProposalReceived(hash, block, signature) - } - - pub fn new_imported(hash: BlockHash) -> Self { - Proposal::ProposalImported(hash) - } - - pub fn block_hash(&self) -> Option<BlockHash> { - match self { - Proposal::ProposalReceived(hash, ..) => Some(*hash), - Proposal::ProposalImported(hash) => Some(*hash), - Proposal::None => None, - } - } - - pub fn imported_block_hash(&self) -> Option<BlockHash> { - match self { - Proposal::ProposalReceived(..) => None, - Proposal::ProposalImported(hash) => Some(*hash), - Proposal::None => None, - } - } - - pub fn is_none(&self) -> bool { - match self { - Proposal::None => true, - _ => false, - } - } -} diff --git a/core/src/consensus/tendermint/vote_collector.rs b/core/src/consensus/tendermint/vote_collector.rs index f96216fa66..defb3c203a 100644 --- a/core/src/consensus/tendermint/vote_collector.rs +++ b/core/src/consensus/tendermint/vote_collector.rs @@ -14,25 +14,89 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use std::collections::{BTreeMap, HashMap}; -use std::iter::Iterator; +use std::collections::{btree_set::Iter, BTreeMap, BTreeSet, HashMap}; +use std::iter::{Iterator, Rev}; use ckey::SchnorrSignature; use ctypes::BlockHash; use rlp::{Encodable, RlpStream}; +use super::super::{Priority, PriorityInfo}; use super::stake::Action; -use super::{ConsensusMessage, VoteStep}; +use super::{ConsensusMessage, ProposalSummary, SortitionRound, Step, VoteStep}; use crate::consensus::BitSet; /// Storing all Proposals, Prevotes and Precommits. +/// Invariant: Proposal step links to StepCollector::PP variant +/// and Other steps link to StepCollector::PVPC variant #[derive(Debug)] pub struct VoteCollector { votes: BTreeMap<VoteStep, StepCollector>, } +#[derive(Debug)] +enum StepCollector { + PP(PpCollector), + PVPC(PvPcCollector), +} + +impl StepCollector { + fn new_pp() -> Self { + StepCollector::PP(Default::default()) + } + + fn new_pvpc() -> Self { + StepCollector::PVPC(Default::default()) + } + + fn insert_message(&mut self, message: ConsensusMessage) -> Result<bool, DoubleVote> { + match self { + StepCollector::PP(pp_collector) => pp_collector.message_collector.insert(message), + StepCollector::PVPC(pv_pc_collector) => pv_pc_collector.message_collector.insert(message), + } + } + + fn insert_priority(&mut self, info: PriorityInfo) -> bool { + match self { + StepCollector::PP(pp_collector) => pp_collector.priority_collector.insert(info), + _ => panic!("Invariant violated: propose step must be linked to PpCollector"), + } + } + + fn message_collector(&self) -> &MessageCollector { + match self { + StepCollector::PP(pp_collector) => &pp_collector.message_collector, + StepCollector::PVPC(pv_pc_collector) => &pv_pc_collector.message_collector, + } + } + + fn priority_collector(&self) -> &PriorityCollector { + match self { + StepCollector::PP(pp_collector) => &pp_collector.priority_collector, + _ => panic!("Invariant violated: propose step must be linked to PpCollector"), + } + } +} + +// Struct for propose step vote and priority collecting +#[derive(Debug, Default)] +struct PpCollector { + message_collector: MessageCollector, + priority_collector: PriorityCollector, +} + +#[derive(Debug, Default)] +struct PvPcCollector { + message_collector: MessageCollector, +} + +#[derive(Debug, Default)] +struct PriorityCollector { + priorities: BTreeSet<PriorityInfo>, +} + #[derive(Debug, Default)] -struct StepCollector { +struct MessageCollector { voted: HashMap<usize, ConsensusMessage>, block_votes: HashMap<Option<BlockHash>, BTreeMap<usize, SchnorrSignature>>, messages: Vec<ConsensusMessage>, @@ -60,7 +124,27 @@ impl Encodable for DoubleVote { } } -impl StepCollector { +impl PriorityCollector { + // true: a priority is new + // false: a priority is duplicated + fn insert(&mut self, info: PriorityInfo) -> bool { + self.priorities.insert(info) + } + + fn get_highest(&self) -> Option<PriorityInfo> { + self.priorities.iter().rev().next().cloned() + } + + fn iter_from_highest(&self) -> Rev<Iter<'_, PriorityInfo>> { + self.priorities.iter().rev() + } + + fn iter(&self) -> Iter<PriorityInfo> { + self.priorities.iter() + } +} + +impl MessageCollector { /// Some(true): a message is new /// Some(false): a message is duplicated /// Err(DoubleVote): a double vote @@ -108,13 +192,18 @@ impl StepCollector { } result } + + /// get a ConsensusMessage corresponding to a certain index. + fn fetch_by_idx(&self, idx: usize) -> Option<ConsensusMessage> { + self.voted.get(&idx).cloned() + } } impl Default for VoteCollector { fn default() -> Self { let mut collector = BTreeMap::new(); // Insert dummy entry to fulfill invariant: "only messages newer than the oldest are inserted". - collector.insert(Default::default(), Default::default()); + collector.insert(Default::default(), StepCollector::new_pp()); VoteCollector { votes: collector, } @@ -124,12 +213,18 @@ impl Default for VoteCollector { impl VoteCollector { /// Insert vote if it is newer than the oldest one. pub fn collect(&mut self, message: ConsensusMessage) -> Result<bool, DoubleVote> { - self.votes.entry(*message.round()).or_insert_with(Default::default).insert(message) + match message.round().step { + Step::Propose => { + self.votes.entry(*message.round()).or_insert_with(StepCollector::new_pp).insert_message(message) + } + _ => self.votes.entry(*message.round()).or_insert_with(StepCollector::new_pvpc).insert_message(message), + } } /// Checks if the message should be ignored. pub fn is_old_or_known(&self, message: &ConsensusMessage) -> bool { - let is_known = self.votes.get(&message.round()).map_or(false, |c| c.messages.contains(message)); + let is_known = + self.votes.get(&message.round()).map_or(false, |c| c.message_collector().messages.contains(message)); if is_known { cdebug!(ENGINE, "Known message: {:?}.", message); return true @@ -161,7 +256,7 @@ impl VoteCollector { ) -> (Vec<SchnorrSignature>, Vec<usize>) { self.votes .get(round) - .and_then(|c| c.block_votes.get(&Some(*block_hash))) + .and_then(|c| c.message_collector().block_votes.get(&Some(*block_hash))) .map(|votes| { let (indices, sigs) = votes.iter().unzip(); (sigs, indices) @@ -174,14 +269,14 @@ impl VoteCollector { pub fn round_signature(&self, round: &VoteStep, block_hash: &BlockHash) -> Option<SchnorrSignature> { self.votes .get(round) - .and_then(|c| c.block_votes.get(&Some(*block_hash))) + .and_then(|c| c.message_collector().block_votes.get(&Some(*block_hash))) .and_then(|votes| votes.values().next().cloned()) } /// Count votes which agree with the given message. pub fn aligned_votes(&self, message: &ConsensusMessage) -> BitSet { if let Some(votes) = self.votes.get(&message.round()) { - votes.count_block(&message.block_hash()) + votes.message_collector().count_block(&message.block_hash()) } else { Default::default() } @@ -189,7 +284,7 @@ impl VoteCollector { pub fn block_round_votes(&self, round: &VoteStep, block_hash: &Option<BlockHash>) -> BitSet { if let Some(votes) = self.votes.get(round) { - votes.count_block(block_hash) + votes.message_collector().count_block(block_hash) } else { Default::default() } @@ -198,37 +293,107 @@ impl VoteCollector { /// Count all votes collected for a given round. pub fn round_votes(&self, vote_round: &VoteStep) -> BitSet { if let Some(votes) = self.votes.get(vote_round) { - votes.count() + votes.message_collector().count() } else { Default::default() } } - pub fn get_block_hashes(&self, round: &VoteStep) -> Vec<BlockHash> { - self.votes - .get(round) - .map(|c| c.block_votes.keys().cloned().filter_map(|x| x).collect()) - .unwrap_or_else(Vec::new) - } - pub fn has_votes_for(&self, round: &VoteStep, block_hash: BlockHash) -> bool { let votes = self .votes .get(round) - .map(|c| c.block_votes.keys().cloned().filter_map(|x| x).collect()) + .map(|c| c.message_collector().block_votes.keys().cloned().filter_map(|x| x).collect()) .unwrap_or_else(Vec::new); votes.into_iter().any(|vote_block_hash| vote_block_hash == block_hash) } + pub fn fetch_by_idx(&self, round: &VoteStep, idx: usize) -> Option<ConsensusMessage> { + self.votes.get(round).and_then(|collector| collector.message_collector().fetch_by_idx(idx)) + } + pub fn get_all(&self) -> Vec<ConsensusMessage> { - self.votes.iter().flat_map(|(_round, collector)| collector.messages.clone()).collect() + self.votes.iter().flat_map(|(_round, collector)| collector.message_collector().messages.clone()).collect() } pub fn get_all_votes_in_round(&self, round: &VoteStep) -> Vec<ConsensusMessage> { - self.votes.get(round).map(|c| c.messages.clone()).unwrap_or_default() + self.votes.get(round).map(|c| c.message_collector().messages.clone()).unwrap_or_default() } pub fn get_all_votes_and_indices_in_round(&self, round: &VoteStep) -> Vec<(usize, ConsensusMessage)> { - self.votes.get(round).map(|c| c.voted.iter().map(|(k, v)| (*k, v.clone())).collect()).unwrap_or_default() + self.votes + .get(round) + .map(|c| c.message_collector().voted.iter().map(|(k, v)| (*k, v.clone())).collect()) + .unwrap_or_default() + } +} + +impl VoteCollector { + pub fn collect_priority(&mut self, sortition_round: SortitionRound, info: PriorityInfo) -> bool { + self.votes.entry(sortition_round.into()).or_insert_with(StepCollector::new_pp).insert_priority(info) + } + + pub fn get_highest_priority_info(&self, sortition_round: SortitionRound) -> Option<PriorityInfo> { + self.votes + .get(&sortition_round.into()) + .and_then(|step_collector| step_collector.priority_collector().get_highest()) + } + + pub fn get_highest_priority(&self, sortition_round: SortitionRound) -> Option<Priority> { + self.get_highest_priority_info(sortition_round).map(|priority_info| priority_info.priority()) + } + + pub fn get_highest_proposal_hash(&self, sortition_round: SortitionRound) -> Option<BlockHash> { + self.votes.get(&sortition_round.into()).and_then(|step_collector| { + let highest_priority_idx = + step_collector.priority_collector().get_highest().map(|priority_info| priority_info.signer_idx())?; + step_collector + .message_collector() + .fetch_by_idx(highest_priority_idx) + .and_then(|priority_message| priority_message.block_hash()) + }) + } + + pub fn get_highest_proposal_summary(&self, sortition_round: SortitionRound) -> Option<ProposalSummary> { + let block_hash = self.get_highest_proposal_hash(sortition_round)?; + let priority_info = self.get_highest_priority_info(sortition_round)?; + Some(ProposalSummary { + priority_info, + block_hash, + }) + } + + pub fn block_hashes_from_highest(&self, sortition_round: SortitionRound) -> Vec<BlockHash> { + match self.votes.get(&sortition_round.into()) { + Some(step_collector) => { + let message_collector = step_collector.message_collector(); + let priority_iter_from_highest = step_collector.priority_collector().iter_from_highest(); + priority_iter_from_highest + .map(|priority_info| { + message_collector + .fetch_by_idx(priority_info.signer_idx()) + .expect("Signer index was verified") + .block_hash() + .expect("Proposal vote always have BlockHash") + }) + .collect() + } + None => vec![], + } + } + + pub fn get_all_priority_infos(&self) -> Vec<(SortitionRound, PriorityInfo)> { + self.votes + .iter() + .filter(|(vote_step, _)| vote_step.step == Step::Propose) + .flat_map(|(&vote_step, collector)| { + let round: SortitionRound = vote_step.into(); + collector + .priority_collector() + .iter() + .map(|priority_info| (round, priority_info.clone())) + .collect::<Vec<_>>() + }) + .collect() } } diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index d13545fe33..56d75a5120 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -29,28 +29,31 @@ use ctypes::util::unexpected::Mismatch; use ctypes::{BlockHash, BlockNumber, Header}; use primitives::{u256_from_u128, Bytes, U256}; use rlp::{Encodable, Rlp}; +use vrf::openssl::{CipherSuite, ECVRF}; use super::super::BitSet; -use super::backup::{backup, restore, BackupView}; +use super::backup::{backup, restore, BackupView, PriorityInfoProjection, PriorityInfoProjectionView}; use super::message::*; use super::network; use super::params::TimeGapParams; use super::stake::CUSTOM_ACTION_HANDLER_ID; -use super::types::{Height, Proposal, Step, TendermintSealView, TendermintState, TwoThirdsMajority, View}; +use super::types::{Height, Step, TendermintSealView, TendermintState, TwoThirdsMajority, View}; use super::vote_collector::{DoubleVote, VoteCollector}; use super::vote_regression_checker::VoteRegressionChecker; -use super::{ - ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_EMPTY_PROPOSAL, ENGINE_TIMEOUT_TOKEN_NONCE_BASE, SEAL_FIELDS, -}; +use super::{ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_TOKEN_NONCE_BASE, SEAL_FIELDS}; use crate::account_provider::AccountProvider; use crate::block::*; use crate::client::ConsensusClient; use crate::consensus::signer::EngineSigner; use crate::consensus::validator_set::{DynamicValidator, ValidatorSet}; -use crate::consensus::{sortition::VRFSeed, EngineError, Seal}; +use crate::consensus::{ + sortition::seed::{SeedInfo, VRFSeed}, + EngineError, Priority, PriorityInfo, Seal, VRFSortition, +}; use crate::encoded; use crate::error::{BlockError, Error}; use crate::transaction::{SignedTransaction, UnverifiedTransaction}; +use crate::types::BlockStatus; use crate::views::BlockView; use crate::BlockId; use std::cell::Cell; @@ -83,8 +86,6 @@ struct Worker { signer: EngineSigner, /// Last majority last_two_thirds_majority: TwoThirdsMajority, - /// hash of the proposed block, used for seal submission. - proposal: Proposal, /// The finalized view of the previous height's block. /// The signatures for the previous block is signed for the view below. finalized_view_of_previous_block: View, @@ -94,6 +95,8 @@ struct Worker { validators: Arc<DynamicValidator>, /// Channel to the network extension, must be set later. extension: EventSender<network::Event>, + // VRF sortition scheme, + sortition_scheme: VRFSortition, time_gap_params: TimeGapParams, timeout_token_nonce: usize, vote_regression_checker: VoteRegressionChecker, @@ -139,22 +142,28 @@ pub enum Event { Restore(crossbeam::Sender<()>), ProposalBlock { signature: SchnorrSignature, + priority_info: Box<PriorityInfo>, view: View, message: Bytes, result: crossbeam::Sender<Option<Arc<dyn ConsensusClient>>>, }, + VerifyPriorityInfo { + height: Height, + view: View, + priority_info: PriorityInfo, + result: crossbeam::Sender<Option<bool>>, + }, StepState { token: NodeId, vote_step: VoteStep, - proposal: Option<BlockHash>, + proposal: Option<ProposalSummary>, lock_view: Option<View>, known_votes: Box<BitSet>, result: crossbeam::Sender<Bytes>, }, RequestProposal { token: NodeId, - height: Height, - view: View, + round: SortitionRound, result: crossbeam::Sender<Bytes>, }, GetAllVotesAndAuthors { @@ -185,15 +194,19 @@ impl Worker { client, height: 1, view: 0, - step: TendermintState::Propose, + step: TendermintState::new_propose_step(), votes: Default::default(), signer: Default::default(), last_two_thirds_majority: TwoThirdsMajority::Empty, - proposal: Proposal::None, finalized_view_of_previous_block: 0, finalized_view_of_current_block: None, validators, extension, + sortition_scheme: VRFSortition { + total_power: 1000, + expectation: 7.0, + vrf_inst: ECVRF::from_suite(CipherSuite::SECP256K1_SHA256_SVDW).unwrap(), + }, votes_received: MutTrigger::new(BitSet::new()), time_gap_params, timeout_token_nonce: ENGINE_TIMEOUT_TOKEN_NONCE_BASE, @@ -314,11 +327,12 @@ impl Worker { } Ok(Event::ProposalBlock { signature, + priority_info, view, message, result, }) => { - let client = inner.on_proposal_message(signature, view, message); + let client = inner.on_proposal_message(signature, *priority_info, view, message); result.send(client).unwrap(); } Ok(Event::StepState { @@ -328,11 +342,10 @@ impl Worker { } Ok(Event::RequestProposal { token, - height, - view, + round, result, }) => { - inner.on_request_proposal_message(&token, height, view, result); + inner.on_request_proposal_message(&token, round, result); } Ok(Event::GetAllVotesAndAuthors { vote_step, @@ -355,6 +368,14 @@ impl Worker { let client = inner.on_commit_message(block, votes); result.send(client).unwrap(); } + Ok(Event::VerifyPriorityInfo { + height, + view, + priority_info, + result + }) => { + result.send(inner.verify_priority_info(height, view, priority_info)).unwrap(); + } Err(crossbeam::RecvError) => { cerror!(ENGINE, "The event channel for tendermint thread had been closed."); break @@ -389,28 +410,15 @@ impl Worker { .hash() } - fn prev_vrf_seed(&self) -> VRFSeed { - let parent_header = - self.prev_block_header_of_height(self.height).expect("Height is increased when previous block is imported"); - let parent_seal = parent_header.seal(); - let seal_view = TendermintSealView::new(&parent_seal); - seal_view.vrf_seed().unwrap() - } - - /// Get the index of the proposer of a block to check the new proposer is valid. - fn block_proposer_idx(&self, block_hash: BlockHash) -> Option<usize> { - self.client().block_header(&BlockId::Hash(block_hash)).map(|header| { - let proposer = header.author(); - let parent = if header.number() == 0 { - // Genesis block's parent is not exist - // FIXME: The DynamicValidator should handle the Genesis block correctly. - block_hash - } else { - header.parent_hash() - }; + fn fetch_vrf_seed_info(&self, block_hash: BlockHash) -> Option<SeedInfo> { + let block_header = self.client().block_header(&block_hash.into())?; + let block_seal = block_header.seal(); + Some(TendermintSealView::new(&block_seal).vrf_seed_info().expect("Seal fields was verified")) + } - self.validators.get_index_by_address(&parent, &proposer).expect("The proposer must be in the validator set") - }) + fn prev_vrf_seed_of_height(&self, height: Height) -> Option<VRFSeed> { + self.prev_block_header_of_height(height) + .and_then(|parent_header| self.fetch_vrf_seed_info(parent_header.hash()).map(|seed_info| *seed_info.seed())) } /// Get previous block header of given height @@ -451,24 +459,15 @@ impl Worker { false } - /// Find the designated for the given view. - fn view_proposer(&self, prev_block_hash: &BlockHash, view: View) -> Option<Address> { - self.validators.next_block_proposer(prev_block_hash, view) - } + fn highest_proposal_at(&self, sortition_round: SortitionRound) -> Option<(SchnorrSignature, PriorityInfo, Bytes)> { + let vote_step = sortition_round.into(); - fn first_proposal_at(&self, height: Height, view: View) -> Option<(SchnorrSignature, usize, Bytes)> { - let vote_step = VoteStep { - height, - view, - step: Step::Propose, - }; - - let all_votes = self.votes.get_all_votes_in_round(&vote_step); - let proposal = all_votes.first()?; + let highest_priority_info = self.votes.get_highest_priority_info(sortition_round)?; + let proposal = self.votes.fetch_by_idx(&vote_step, highest_priority_info.signer_idx())?; let block_hash = proposal.on.block_hash.expect("Proposal message always include block hash"); let bytes = self.client().block(&BlockId::Hash(block_hash))?.into_inner(); - Some((proposal.signature, proposal.signer_index, bytes)) + Some((proposal.signature, highest_priority_info, bytes)) } fn is_proposal_received(&self, height: Height, view: View, block_hash: BlockHash) -> bool { @@ -491,10 +490,6 @@ impl Worker { } } - fn need_proposal(&self) -> bool { - self.proposal.is_none() && !self.step.is_commit() - } - fn get_all_votes_and_authors( &self, vote_step: &VoteStep, @@ -511,30 +506,35 @@ impl Worker { } } - /// Check if address is a proposer for given view. - fn check_view_proposer( - &self, - parent: &BlockHash, - height: Height, - view: View, - address: &Address, - ) -> Result<(), EngineError> { - let proposer = self.view_proposer(parent, view).ok_or_else(|| EngineError::PrevBlockNotExist { - height: height as u64, - })?; - if proposer == *address { - Ok(()) - } else { - Err(EngineError::NotProposer(Mismatch { - expected: proposer, - found: *address, - })) - } + fn calculate_current_seed(&mut self) -> SeedInfo { + let parent_seed = self.prev_vrf_seed_of_height(self.height).expect("Parent seed must exist"); + let new_proof = self + .signer + .vrf_prove(&parent_seed.generate_next_msg(self.height, self.view), &mut self.sortition_scheme.vrf_inst) + .expect("Signer key was verified"); + let new_seed = self.sortition_scheme.vrf_inst.proof_to_hash(&new_proof).expect("Correctly generated proof"); + SeedInfo::new(self.signer_index().expect("The signer is a validator"), new_seed, new_proof) + } + + /// Check if current signer is eligible to be a proposer + fn is_signer_highest(&mut self, parent_hash: &BlockHash) -> bool { + self.votes + .get_highest_priority_info(self.current_sortition_round()) + .map(|priority_info| { + self.validators.get(parent_hash, priority_info.signer_idx()) + == *self.signer.public().expect("Engine signer must be set") + }) + .unwrap_or(true) } - /// Check if current signer is the current proposer. - fn is_signer_proposer(&self, bh: &BlockHash) -> bool { - self.view_proposer(bh, self.view).map_or(false, |proposer| self.signer.is_address(&proposer)) + fn signer_priority_info(&mut self, parent_block_hash: BlockHash) -> Option<PriorityInfo> { + let parent_seed = + self.prev_vrf_seed_of_height(self.height).expect("Next height propose step has previous height seed"); + let signer_idx = self.signer_index()?; + let voting_power = self.get_voting_power(self.height - 1, &parent_block_hash, signer_idx); + self.sortition_scheme + .create_highest_priority_info(&parent_seed.round_msg(self.view), &self.signer, signer_idx, voting_power) + .ok()? } fn is_step(&self, message: &ConsensusMessage) -> bool { @@ -583,14 +583,14 @@ impl Worker { fn broadcast_state( &self, vote_step: VoteStep, - proposal: Option<BlockHash>, + proposal: Option<ProposalSummary>, lock_view: Option<View>, votes: &BitSet, ) { self.extension .send(network::Event::BroadcastState { vote_step, - proposal, + proposal: Box::new(proposal), lock_view, votes: *votes, }) @@ -606,11 +606,11 @@ impl Worker { .unwrap(); } - fn request_proposal_to_any(&self, height: Height, view: View) { + fn request_proposal_to_superiors(&self, round: SortitionRound, current_highest: Option<Priority>) { self.extension - .send(network::Event::RequestProposalToAny { - height, - view, + .send(network::Event::RequestProposalToSuperiors { + round, + current_highest, }) .unwrap(); } @@ -631,7 +631,6 @@ impl Worker { fn increment_view(&mut self, n: View) { cinfo!(ENGINE, "increment_view: New view."); self.view += n; - self.proposal = Proposal::None; self.votes_received = MutTrigger::new(BitSet::new()); } @@ -648,7 +647,6 @@ impl Worker { self.last_two_thirds_majority = TwoThirdsMajority::Empty; self.height += 1; self.view = 0; - self.proposal = Proposal::None; self.votes_received = MutTrigger::new(BitSet::new()); self.finalized_view_of_previous_block = self.finalized_view_of_current_block.expect("self.step == Step::Commit"); @@ -666,7 +664,6 @@ impl Worker { self.last_two_thirds_majority = TwoThirdsMajority::Empty; self.height = height; self.view = 0; - self.proposal = Proposal::None; self.votes_received = MutTrigger::new(BitSet::new()); self.finalized_view_of_previous_block = finalized_view_of_previous_height; self.finalized_view_of_current_block = None; @@ -702,42 +699,44 @@ impl Worker { // need to reset vote self.broadcast_state( vote_step, - self.proposal.block_hash(), + self.votes.get_highest_proposal_summary(self.current_sortition_round()), self.last_two_thirds_majority.view(), self.votes_received.borrow_anyway(), ); match state.to_step() { Step::Propose => { cinfo!(ENGINE, "move_to_step: Propose."); - // If there are multiple proposals, use the first proposal. - if let Some(hash) = self.votes.get_block_hashes(&vote_step).first() { - if self.client().block(&BlockId::Hash(*hash)).is_none() { - cwarn!(ENGINE, "Proposal is received but not imported"); - // Proposal is received but is not verified yet. - // Wait for verification. - return - } - self.proposal = Proposal::new_imported(*hash); - self.move_to_step(TendermintState::Prevote, is_restoring); - return - } let parent_block_hash = self.prev_block_hash(); - if !self.is_signer_proposer(&parent_block_hash) { - self.request_proposal_to_any(vote_step.height, vote_step.view); - return - } - if let TwoThirdsMajority::Lock(lock_view, locked_block_hash) = self.last_two_thirds_majority { - cinfo!(ENGINE, "I am a proposer, I'll re-propose a locked block"); - match self.locked_proposal_block(lock_view, locked_block_hash) { - Ok(block) => self.repropose_block(block), - Err(error_msg) => cwarn!(ENGINE, "{}", error_msg), + if let Some(priority_info) = self.signer_priority_info(parent_block_hash) { + if let TwoThirdsMajority::Lock(lock_view, locked_block_hash) = self.last_two_thirds_majority { + cinfo!(ENGINE, "I am eligible to be a proposer, round-info {}-{} and {}. I'll re-propose a locked block with priority", + self.height, + self.view, + priority_info, + ); + match self.locked_proposal_block(lock_view, locked_block_hash) { + Ok(block) => self.repropose_block(priority_info, block), + Err(error_msg) => cwarn!(ENGINE, "{}", error_msg), + } + } else { + cinfo!(ENGINE, "I am eligible to be a proposer, round-info {}-{} and {}. I'll create a block with priority", + self.height, + self.view, + priority_info, + ); + self.update_sealing(parent_block_hash); + self.step.wait_block_generation(priority_info, parent_block_hash); } } else { - cinfo!(ENGINE, "I am a proposer, I'll create a block"); - self.update_sealing(parent_block_hash); - self.step = TendermintState::ProposeWaitBlockGeneration { - parent_hash: parent_block_hash, - }; + let sortition_round = vote_step.into(); + let round_highest_priority = self.votes.get_highest_priority(sortition_round); + cinfo!( + ENGINE, + "I am not eligible to be a proposer, round-info {}-{}. I'll request a proposal", + self.height, + self.view, + ); + self.request_proposal_to_superiors(sortition_round, round_highest_priority); } } Step::Prevote => { @@ -748,8 +747,8 @@ impl Worker { self.request_messages_to_all(vote_step, &BitSet::all_set() - &self.votes_received); if !self.already_generated_message() { let block_hash_candidate = match &self.last_two_thirds_majority { - TwoThirdsMajority::Empty => self.proposal.imported_block_hash(), - TwoThirdsMajority::Unlock(_) => self.proposal.imported_block_hash(), + TwoThirdsMajority::Empty => self.highest_imported_block_hash(), + TwoThirdsMajority::Unlock(_) => self.highest_imported_block_hash(), TwoThirdsMajority::Lock(_, block_hash) => Some(*block_hash), }; let block_hash = block_hash_candidate.filter(|hash| { @@ -820,6 +819,21 @@ impl Worker { } } + fn highest_imported_block_hash(&self) -> Option<BlockHash> { + match self.step { + TendermintState::Prevote => { + self.votes.block_hashes_from_highest(self.current_sortition_round()).into_iter().find(|block_hash| { + if let BlockStatus::InChain = self.client().block_status(&(*block_hash).into()) { + true + } else { + false + } + }) + } + _ => panic!(), + } + } + fn is_generation_time_relevant(&self, block_header: &Header) -> bool { let acceptable_past_gap = self.time_gap_params.allowed_past_gap; let acceptable_future_gap = self.time_gap_params.allowed_future_gap; @@ -844,7 +858,9 @@ impl Worker { let received_locked_block = self.votes.has_votes_for(&vote_step, locked_proposal_hash); if !received_locked_block { - self.request_proposal_to_any(self.height, locked_view); + let sortition_round = vote_step.into(); + let round_highest_priority = self.votes.get_highest_priority(sortition_round); + self.request_proposal_to_superiors(sortition_round, round_highest_priority); return Err(format!("Have a lock on {}-{}, but do not received a locked proposal", self.height, locked_view)) } @@ -908,7 +924,7 @@ impl Worker { { if self.can_move_from_commit_to_propose() { self.move_to_the_next_height(); - self.move_to_step(TendermintState::Propose, is_restoring); + self.move_to_step(TendermintState::new_propose_step(), is_restoring); return } @@ -930,7 +946,7 @@ impl Worker { let next_step = match self.step { TendermintState::Precommit if message.on.block_hash.is_none() && has_enough_aligned_votes => { self.increment_view(1); - Some(TendermintState::Propose) + Some(TendermintState::new_propose_step()) } // Avoid counting votes twice. TendermintState::Prevote if lock_change => Some(TendermintState::Precommit), @@ -978,67 +994,50 @@ impl Worker { step: Step::Propose, }); + let proposal_hash = proposal.hash(); let current_height = self.height; let current_vote_step = VoteStep::new(self.height, self.view, self.step.to_step()); - let proposal_is_for_current = self.votes.has_votes_for(¤t_vote_step, proposal.hash()); + let proposal_is_for_current = self.votes.has_votes_for(¤t_vote_step, proposal_hash); if proposal_is_for_current { - self.proposal = Proposal::new_imported(proposal.hash()); - let current_step = self.step.clone(); - match current_step { - TendermintState::Propose => { - self.move_to_step(TendermintState::Prevote, false); - } - TendermintState::ProposeWaitImported { - block, - } => { - if !block.transactions().is_empty() { - cinfo!(ENGINE, "Submitting proposal block {}", block.header().hash()); - self.move_to_step(TendermintState::Prevote, false); - self.broadcast_proposal_block(self.view, encoded::Block::new(block.rlp_bytes())); - } else { - ctrace!(ENGINE, "Empty proposal is generated, set timer"); - self.step = TendermintState::ProposeWaitEmptyBlockTimer { - block, - }; - self.extension - .send(network::Event::SetTimerEmptyProposal { - view: self.view, - }) - .unwrap(); - } - } - TendermintState::ProposeWaitEmptyBlockTimer { - .. - } => unreachable!(), - _ => {} - }; + if let Some((priority_info, imported_block)) = self.step.import_completed(proposal_hash) { + cinfo!(ENGINE, "Submitting proposal block {}", proposal_hash); + match self.highest_proposal_at(self.current_sortition_round()) { + None => self.broadcast_proposal_block( + self.view, + priority_info, + encoded::Block::new(imported_block.rlp_bytes()), + ), + Some((_, highest_priority_info, _)) if priority_info >= highest_priority_info => self + .broadcast_proposal_block( + self.view, + priority_info, + encoded::Block::new(imported_block.rlp_bytes()), + ), + _ => (), + }; + } } else if current_height < height { let finalized_view_of_previous_height = TendermintSealView::new(proposal.seal()).parent_block_finalized_view().unwrap(); self.jump_to_height(height, finalized_view_of_previous_height); - - let proposal_is_for_view0 = self.votes.has_votes_for( - &VoteStep { - height, - view: 0, - step: Step::Propose, - }, - proposal.hash(), - ); - if proposal_is_for_view0 { - self.proposal = Proposal::new_imported(proposal.hash()) - } - self.move_to_step(TendermintState::Prevote, false); + self.move_to_step(TendermintState::new_propose_step(), false); } } fn backup(&self) { + let priority_infos = self.votes.get_all_priority_infos(); + let priority_info_projection_views = priority_infos + .iter() + .map(|(round, priority_info)| PriorityInfoProjectionView((&round, &priority_info))) + .collect::<Vec<PriorityInfoProjectionView>>(); + backup(self.client().get_kvdb().as_ref(), BackupView { height: &self.height, view: &self.view, step: &self.step.to_step(), votes: &self.votes.get_all(), + priority_infos: &priority_info_projection_views, finalized_view_of_previous_block: &self.finalized_view_of_previous_block, finalized_view_of_current_block: &self.finalized_view_of_current_block, }); @@ -1049,7 +1048,7 @@ impl Worker { let backup = restore(client.get_kvdb().as_ref()); if let Some(backup) = backup { let backup_step = match backup.step { - Step::Propose => TendermintState::Propose, + Step::Propose => TendermintState::new_propose_step(), Step::Prevote => TendermintState::Prevote, Step::Precommit => TendermintState::Precommit, // If the backuped step is `Commit`, we should start at `Precommit` to update the @@ -1060,15 +1059,14 @@ impl Worker { self.step = backup_step; self.height = backup.height; self.view = backup.view; + backup.priority_infos.into_iter().for_each(|projection| match projection { + PriorityInfoProjection((round, priority_info)) => { + self.votes.collect_priority(round, priority_info); + } + }); self.finalized_view_of_previous_block = backup.finalized_view_of_previous_block; self.finalized_view_of_current_block = backup.finalized_view_of_current_block; - if let Some(proposal) = backup.proposal { - if client.block(&BlockId::Hash(proposal)).is_some() { - self.proposal = Proposal::ProposalImported(proposal); - } - } - for vote in backup.votes { let bytes = rlp::encode(&vote); if let Err(err) = self.handle_message(&bytes, true) { @@ -1082,39 +1080,39 @@ impl Worker { SEAL_FIELDS } - fn generate_seal(&self, height: Height, parent_hash: BlockHash) -> Seal { + fn generate_seal(&mut self, height: Height, parent_hash: BlockHash) -> Seal { // Block is received from other nodes while creating a block if height < self.height { + self.step.generation_halted(); return Seal::None } - // We don't know at which view the node starts generating a block. - // If this node's signer is not proposer at the current view, return none. - if !self.is_signer_proposer(&parent_hash) { - cwarn!(ENGINE, "Seal request for an old view"); - return Seal::None - } - - assert_eq!(Proposal::None, self.proposal); - assert_eq!(height, self.height); + if self.is_signer_highest(&parent_hash) { + assert_eq!(height, self.height); - let view = self.view; + let view = self.view; + let current_seed = self.calculate_current_seed(); - let last_block_view = &self.finalized_view_of_previous_block; - assert_eq!(self.prev_block_hash(), parent_hash); + let last_block_view = &self.finalized_view_of_previous_block; + assert_eq!(self.prev_block_hash(), parent_hash); - let (precommits, precommit_indices) = self - .votes - .round_signatures_and_indices(&VoteStep::new(height - 1, *last_block_view, Step::Precommit), &parent_hash); - ctrace!(ENGINE, "Collected seal: {:?}({:?})", precommits, precommit_indices); - let precommit_bitset = BitSet::new_with_indices(&precommit_indices); - Seal::Tendermint { - prev_view: *last_block_view, - cur_view: view, - precommits, - precommit_bitset, - vrf_seed: self.prev_vrf_seed(), - vrf_seed_proof: vec![], + let (precommits, precommit_indices) = self.votes.round_signatures_and_indices( + &VoteStep::new(height - 1, *last_block_view, Step::Precommit), + &parent_hash, + ); + ctrace!(ENGINE, "Collected seal: {:?}({:?})", precommits, precommit_indices); + let precommit_bitset = BitSet::new_with_indices(&precommit_indices); + Seal::Tendermint { + prev_view: *last_block_view, + cur_view: view, + precommits, + precommit_bitset, + vrf_seed_info: Box::new(current_seed), + } + } else { + self.step.generation_halted(); + cdebug!(ENGINE, "Seal generation halted because a higher priority is accepted"); + Seal::None } } @@ -1137,31 +1135,22 @@ impl Worker { } let header = sealed_block.header(); + let parent_hash = header.parent_hash(); - if let TendermintState::ProposeWaitBlockGeneration { - parent_hash: expected_parent_hash, - } = self.step - { - let parent_hash = header.parent_hash(); - assert_eq!( - *parent_hash, expected_parent_hash, - "Generated hash({:?}) is different from expected({:?})", - parent_hash, expected_parent_hash - ); - } else { - ctrace!( - ENGINE, - "Proposal is generated after step is changed. Expected step is ProposeWaitBlockGeneration but current step is {:?}", - self.step, - ); - return - } - debug_assert_eq!(Ok(self.view), TendermintSealView::new(header.seal()).author_view()); - - self.vote_on_header_for_proposal(&header).expect("I'm a proposer"); + match self.step.generation_completed() { + Some((target_priority_info, expected_parent_hash)) if expected_parent_hash == *parent_hash => { + debug_assert_eq!(Ok(self.view), TendermintSealView::new(header.seal()).author_view()); - self.step = TendermintState::ProposeWaitImported { - block: Box::new(sealed_block.clone()), + self.vote_on_header_for_proposal(target_priority_info.clone(), &header).expect("I'm a proposer"); + self.step.wait_imported(target_priority_info, sealed_block.clone()); + } + Some((_, expected_parent_hash)) => ctrace!( + ENGINE, + "Generated parent hash({:?}) is different from the expected one in this round({:?}). The generated block may be an old one", + parent_hash, + expected_parent_hash + ), + _ => ctrace!(ENGINE, "Block generation is unexpected in this round. The generated block may be an old one"), }; } @@ -1191,7 +1180,45 @@ impl Worker { Ok(()) } - fn verify_block_external(&self, header: &Header) -> Result<(), Error> { + fn verify_priority_info(&mut self, height: Height, view: View, priority_info: PriorityInfo) -> Option<bool> { + let parent_seed = self.prev_vrf_seed_of_height(height)?; + let parent_hash = self.client().block_hash(&(height - 1).into())?; + let signer_idx = priority_info.signer_idx(); + let signer_public = self.validators.get(&parent_hash, signer_idx); + let voting_power = self.get_voting_power(height - 1, &parent_hash, signer_idx); + + priority_info + .verify(&parent_seed.round_msg(view), &signer_public, voting_power, &mut self.sortition_scheme) + .ok() + } + + fn verify_seed(&mut self, header: &Header, parent: &Header) -> Result<(), Error> { + let current_seal_view = TendermintSealView::new(&header.seal()); + let current_seed_signer_idx = current_seal_view.vrf_seed_info().expect("Seal field verified").signer_idx(); + let current_signer_public = self.validators.get(&parent.hash(), current_seed_signer_idx); + + let parent_seal_view = TendermintSealView::new(&parent.seal()); + + let parent_seed_info = parent_seal_view.vrf_seed_info().map_err(|_| BlockError::InvalidSeal)?; + let current_seed_info = current_seal_view.vrf_seed_info().map_err(|_| BlockError::InvalidSeal)?; + + match current_seed_info.verify( + header.number(), + current_seal_view.author_view().map_err(|_| BlockError::InvalidSeal)?, + parent_seed_info.seed(), + ¤t_signer_public, + &mut self.sortition_scheme.vrf_inst, + ) { + Ok(true) => Ok(()), + _ => Err(BlockError::InvalidSeal.into()), + } + } + + fn verify_block_external(&mut self, header: &Header) -> Result<(), Error> { + let parent_header = + self.client().block_header(&(*header.parent_hash()).into()).expect("The parent block must exist"); + self.verify_seed(header, &parent_header.decode())?; + let height = header.number() as usize; let author_view = TendermintSealView::new(header.seal()).author_view()?; ctrace!(ENGINE, "Verify external at {}-{}, {:?}", height, author_view, header); @@ -1199,7 +1226,6 @@ impl Worker { if !self.is_authority(header.parent_hash(), proposer) { return Err(EngineError::BlockNotAuthorized(*proposer).into()) } - self.check_view_proposer(header.parent_hash(), header.number(), author_view, &proposer)?; let seal_view = TendermintSealView::new(header.seal()); let bitset_count = seal_view.bitset()?.count(); let precommits_count = seal_view.precommits().item_count()?; @@ -1229,11 +1255,7 @@ impl Worker { }; let mut voted_validators = BitSet::new(); - let grand_parent_hash = self - .client() - .block_header(&(*header.parent_hash()).into()) - .expect("The parent block must exist") - .parent_hash(); + let grand_parent_hash = parent_header.parent_hash(); for (bitset_index, signature) in seal_view.signatures()? { let public = self.validators.get(&grand_parent_hash, bitset_index); if !verify_schnorr(&public, &signature, &precommit_vote_on.hash())? { @@ -1257,40 +1279,11 @@ impl Worker { } fn on_timeout(&mut self, token: usize) { - // Timeout from empty block generation - if token == ENGINE_TIMEOUT_EMPTY_PROPOSAL { - let block = if self.step.is_propose_wait_empty_block_timer() { - let previous = mem::replace(&mut self.step, TendermintState::Propose); - match previous { - TendermintState::ProposeWaitEmptyBlockTimer { - block, - } => block, - _ => unreachable!(), - } - } else { - cwarn!(ENGINE, "Empty proposal timer was not cleared."); - return - }; - - // When self.height != block.header().number() && "propose timeout" is already called, - // the state is stuck and can't move to Prevote. We should change the step to Prevote. - self.move_to_step(TendermintState::Prevote, false); - if self.height == block.header().number() { - cdebug!(ENGINE, "Empty proposal timer is finished, go to the prevote step and broadcast the block"); - cinfo!(ENGINE, "Submitting proposal block {}", block.header().hash()); - self.broadcast_proposal_block(self.view, encoded::Block::new(block.rlp_bytes())); - } else { - cwarn!(ENGINE, "Empty proposal timer was for previous height."); - } - - return - } - if token == ENGINE_TIMEOUT_BROADCAST_STEP_STATE { if let Some(votes_received) = self.votes_received.borrow_if_mutated() { self.broadcast_state( self.vote_step(), - self.proposal.block_hash(), + self.votes.get_highest_proposal_summary(self.current_sortition_round()), self.last_two_thirds_majority.view(), votes_received, ); @@ -1303,29 +1296,15 @@ impl Worker { return } - let next_step = match self.step { - TendermintState::Propose => { - cinfo!(ENGINE, "Propose timeout."); + let next_step = match &self.step { + TendermintState::Propose(inner) => { + if let Some(wait_block_generation) = inner.get_wait_block_generation() { + cwarn!(ENGINE, "Propose timed out but block {:?} is not generated yet", wait_block_generation); + return + } + cinfo!(ENGINE, "Propose timed out"); TendermintState::Prevote } - TendermintState::ProposeWaitBlockGeneration { - .. - } => { - cwarn!(ENGINE, "Propose timed out but block is not generated yet"); - return - } - TendermintState::ProposeWaitImported { - .. - } => { - cwarn!(ENGINE, "Propose timed out but still waiting for the block imported"); - return - } - TendermintState::ProposeWaitEmptyBlockTimer { - .. - } => { - cwarn!(ENGINE, "Propose timed out but still waiting for the empty block"); - return - } TendermintState::Prevote if self.has_enough_any_votes() => { cinfo!(ENGINE, "Prevote timeout."); TendermintState::Precommit @@ -1337,7 +1316,7 @@ impl Worker { TendermintState::Precommit if self.has_enough_any_votes() => { cinfo!(ENGINE, "Precommit timeout."); self.increment_view(1); - TendermintState::Propose + TendermintState::new_propose_step() } TendermintState::Precommit => { cinfo!(ENGINE, "Precommit timeout without enough votes."); @@ -1349,20 +1328,20 @@ impl Worker { } => { cinfo!(ENGINE, "Commit timeout."); - let proposal_imported = self.client().block(&block_hash.into()).is_some(); + let proposal_imported = self.client().block(&(*block_hash).into()).is_some(); let best_block_header = self.client().best_block_header(); - if !proposal_imported || best_block_header.hash() != block_hash { + if !proposal_imported || best_block_header.hash() != *block_hash { cwarn!(ENGINE, "Best chain is not updated yet, wait until imported"); self.step = TendermintState::CommitTimedout { - block_hash, - view, + block_hash: *block_hash, + view: *view, }; return } self.move_to_the_next_height(); - TendermintState::Propose + TendermintState::new_propose_step() } TendermintState::CommitTimedout { .. @@ -1509,26 +1488,24 @@ impl Worker { !self.has_enough_precommit_votes(block_hash) } - fn repropose_block(&mut self, block: encoded::Block) { + fn repropose_block(&mut self, priority_info: PriorityInfo, block: encoded::Block) { let header = block.decode_header(); - self.vote_on_header_for_proposal(&header).expect("I am proposer"); - self.proposal = Proposal::new_imported(header.hash()); - self.broadcast_proposal_block(self.view, block); + self.vote_on_header_for_proposal(priority_info.clone(), &header).expect("I am eligible to be a proposer"); + debug_assert_eq!(self.client().block_status(&header.hash().into()), BlockStatus::InChain); + self.broadcast_proposal_block(self.view, priority_info, block); } - fn broadcast_proposal_block(&self, view: View, block: encoded::Block) { + fn broadcast_proposal_block(&self, view: View, priority_info: PriorityInfo, block: encoded::Block) { let header = block.decode_header(); let hash = header.hash(); - let parent_hash = header.parent_hash(); let vote_step = VoteStep::new(header.number() as Height, view, Step::Propose); cdebug!(ENGINE, "Send proposal {:?}", vote_step); - assert!(self.is_signer_proposer(&parent_hash)); - let signature = self.votes.round_signature(&vote_step, &hash).expect("Proposal vote is generated before"); self.extension .send(network::Event::BroadcastProposalBlock { signature, + priority_info: Box::new(priority_info), view, message: block.into_inner(), }) @@ -1567,15 +1544,16 @@ impl Worker { Ok(Some(vote)) } - fn vote_on_header_for_proposal(&mut self, header: &Header) -> Result<ConsensusMessage, Error> { + fn vote_on_header_for_proposal( + &mut self, + priority_info: PriorityInfo, + header: &Header, + ) -> Result<ConsensusMessage, Error> { assert!(header.number() == self.height); - - let parent_hash = header.parent_hash(); - let prev_proposer_idx = self.block_proposer_idx(*parent_hash).expect("Prev block must exists"); - let signer_index = self.validators.proposer_index(*parent_hash, prev_proposer_idx, self.view as usize); + let signer_index = self.signer_index().expect("I am a validator"); let on = VoteOn { - step: VoteStep::new(self.height, self.view, Step::Propose), + step: self.current_sortition_round().into(), block_hash: Some(header.hash()), }; assert!(self.vote_regression_checker.check(&on), "Vote should not regress"); @@ -1588,6 +1566,7 @@ impl Worker { on, }; + self.votes.collect_priority(self.current_sortition_round(), priority_info); self.votes.collect(vote.clone()).expect("Must not attempt double vote on proposal"); cinfo!(ENGINE, "Voted {:?} as {}th proposer.", vote, signer_index); Ok(vote) @@ -1597,12 +1576,9 @@ impl Worker { &self, header: &Header, proposed_view: View, + signer_index: usize, signature: SchnorrSignature, ) -> Option<ConsensusMessage> { - let prev_proposer_idx = self.block_proposer_idx(*header.parent_hash())?; - let signer_index = - self.validators.proposer_index(*header.parent_hash(), prev_proposer_idx, proposed_view as usize); - let on = VoteOn { step: VoteStep::new(header.number(), proposed_view, Step::Propose), block_hash: Some(header.hash()), @@ -1641,7 +1617,7 @@ impl Worker { cdebug!(ENGINE, "Committed block {} is now the best block", committed_block_hash); if self.can_move_from_commit_to_propose() { self.move_to_the_next_height(); - self.move_to_step(TendermintState::Propose, false); + self.move_to_step(TendermintState::new_propose_step(), false); return } } @@ -1673,7 +1649,7 @@ impl Worker { } } if height_at_begin != self.height { - self.move_to_step(TendermintState::Propose, false); + self.move_to_step(TendermintState::new_propose_step(), false); } if let Some(last_proposal_header) = last_proposal_header { self.on_imported_proposal(&last_proposal_header); @@ -1684,12 +1660,14 @@ impl Worker { fn send_proposal_block( &self, signature: SchnorrSignature, + priority_info: PriorityInfo, view: View, message: Bytes, result: crossbeam::Sender<Bytes>, ) { let message = TendermintMessage::ProposalBlock { signature, + priority_info: Box::new(priority_info), message, view, } @@ -1713,11 +1691,10 @@ impl Worker { result.send(message).unwrap(); } - fn send_request_proposal(&self, token: &NodeId, height: Height, view: View, result: &crossbeam::Sender<Bytes>) { - ctrace!(ENGINE, "Request proposal {} {} to {:?}", height, view, token); + fn send_request_proposal(&self, token: &NodeId, round: SortitionRound, result: &crossbeam::Sender<Bytes>) { + ctrace!(ENGINE, "Request proposal {:?} to {:?}", round, token); let message = TendermintMessage::RequestProposal { - height, - view, + round, } .rlp_bytes(); result.send(message).unwrap(); @@ -1741,9 +1718,24 @@ impl Worker { result.send(message.rlp_bytes()).unwrap(); } + fn get_voting_power(&self, height: u64, block_hash: &BlockHash, signer_idx: usize) -> u64 { + self.validators + .normalized_voting_power(height, block_hash, signer_idx, self.sortition_scheme.total_power) + .unwrap() + } + + #[inline] + fn current_sortition_round(&self) -> SortitionRound { + SortitionRound { + height: self.height, + view: self.view, + } + } + fn on_proposal_message( &mut self, signature: SchnorrSignature, + priority_info: PriorityInfo, proposed_view: View, bytes: Bytes, ) -> Option<Arc<dyn ConsensusClient>> { @@ -1754,7 +1746,14 @@ impl Worker { let block_view = BlockView::new(&bytes); let header_view = block_view.header(); let number = header_view.number(); - cinfo!(ENGINE, "Proposal received for {}-{:?}", number, header_view.hash()); + cinfo!( + ENGINE, + "Proposal received for ({},{})-{:?}. The priority info is {}", + number, + proposed_view, + header_view.hash(), + priority_info, + ); let parent_hash = header_view.parent_hash(); { @@ -1770,13 +1769,14 @@ impl Worker { return None } } - let message = match self.recover_proposal_vote(&header_view, proposed_view, signature) { - Some(vote) => vote, - None => { - cwarn!(ENGINE, "Prev block proposer does not exist for height {}", number); - return None - } - }; + let message = + match self.recover_proposal_vote(&header_view, proposed_view, priority_info.signer_idx(), signature) { + Some(vote) => vote, + None => { + cwarn!(ENGINE, "Prev block proposer does not exist for height {}", number); + return None + } + }; // If the proposal's height is current height + 1 and the proposal has valid precommits, // we should import it and increase height @@ -1790,7 +1790,7 @@ impl Worker { return None } - let signer_public = self.validators.get(&parent_hash, message.signer_index); + let signer_public = self.validators.get(&parent_hash, priority_info.signer_idx()); match message.verify(&signer_public) { Ok(false) => { cwarn!(ENGINE, "Proposal verification failed: signer is different"); @@ -1803,11 +1803,45 @@ impl Worker { _ => {} } + // priority verification block + { + let parent_seed = self.prev_vrf_seed_of_height(number)?; + let signer_idx = priority_info.signer_idx(); + + let voting_power = self.get_voting_power(number - 1, &parent_hash, signer_idx); + match priority_info.verify( + &parent_seed.round_msg(proposed_view), + &signer_public, + voting_power, + &mut self.sortition_scheme, + ) { + Ok(true) => {} + _ => { + cwarn!(ENGINE, "Priority message verification failed"); + return None + } + } + } + if self.votes.is_old_or_known(&message) { cdebug!(ENGINE, "Proposal is already known"); return None } + self.votes.collect_priority( + SortitionRound { + height: number, + view: proposed_view, + }, + priority_info.clone(), + ); + + if let Err(double) = self.votes.collect(message) { + cerror!(ENGINE, "Double Vote found {:?}", double); + self.report_double_vote(&double); + return None + } + if number == self.height as u64 && proposed_view == self.view { // The proposer re-proposed its locked proposal. // If we already imported the proposal, we should set `proposal` here. @@ -1821,23 +1855,22 @@ impl Worker { proposed_view, author_view ); - self.proposal = Proposal::new_imported(header_view.hash()); - } else { - self.proposal = Proposal::new_received(header_view.hash(), bytes.clone(), signature); + } else if Some(priority_info.clone()) + >= self.votes.get_highest_priority_info(self.current_sortition_round()) + { + cdebug!( + ENGINE, + "Received a proposal with the priority {}. Replace the highest proposal", + priority_info.priority() + ); } self.broadcast_state( VoteStep::new(self.height, self.view, self.step.to_step()), - self.proposal.block_hash(), + self.votes.get_highest_proposal_summary(self.current_sortition_round()), self.last_two_thirds_majority.view(), self.votes_received.borrow_anyway(), ); } - - if let Err(double) = self.votes.collect(message) { - cerror!(ENGINE, "Double Vote found {:?}", double); - self.report_double_vote(&double); - return None - } } Some(c) @@ -1847,7 +1880,7 @@ impl Worker { &self, token: &NodeId, peer_vote_step: VoteStep, - peer_proposal: Option<BlockHash>, + peer_proposal: Option<ProposalSummary>, peer_lock_view: Option<View>, peer_known_votes: BitSet, result: crossbeam::Sender<Bytes>, @@ -1884,9 +1917,12 @@ impl Worker { || self.view < peer_vote_step.view || self.height < peer_vote_step.height; - let need_proposal = self.need_proposal(); - if need_proposal && peer_has_proposal { - self.send_request_proposal(token, self.height, self.view, &result); + let is_not_commit_step = !self.step.is_commit(); + let peer_has_higher = + self.votes.get_highest_priority(peer_vote_step.into()) < peer_proposal.map(|summary| summary.priority()); + + if is_not_commit_step && peer_has_proposal && peer_has_higher { + self.send_request_proposal(token, self.current_sortition_round(), &result); } let current_step = current_vote_step.step; @@ -1959,28 +1995,22 @@ impl Worker { fn on_request_proposal_message( &self, token: &NodeId, - request_height: Height, - request_view: View, + requested_round: SortitionRound, result: crossbeam::Sender<Bytes>, ) { - if request_height > self.height { - return - } - - if request_height == self.height && request_view > self.view { - return - } - - if let Some((signature, _signer_index, block)) = self.first_proposal_at(request_height, request_view) { - ctrace!(ENGINE, "Send proposal {}-{} to {:?}", request_height, request_view, token); - self.send_proposal_block(signature, request_view, block, result); + if requested_round > self.current_sortition_round() { return } - if request_height == self.height && request_view == self.view { - if let Proposal::ProposalReceived(_hash, block, signature) = &self.proposal { - self.send_proposal_block(*signature, request_view, block.clone(), result); - } + if let Some((signature, highest_priority_info, block)) = self.highest_proposal_at(requested_round) { + ctrace!( + ENGINE, + "Send proposal of priority {:?} in a round {:?} to {:?}", + highest_priority_info.priority(), + requested_round, + token + ); + self.send_proposal_block(signature, highest_priority_info, requested_round.view, block, result); } } @@ -2177,8 +2207,6 @@ impl Worker { } } - // Since we don't have proposal vote, set proposal = None - self.proposal = Proposal::None; self.view = commit_view; self.votes_received = MutTrigger::new(vote_bitset); self.last_two_thirds_majority = TwoThirdsMajority::Empty; diff --git a/core/src/consensus/validator_set/dynamic_validator.rs b/core/src/consensus/validator_set/dynamic_validator.rs index 5b8efcb0ea..857247e314 100644 --- a/core/src/consensus/validator_set/dynamic_validator.rs +++ b/core/src/consensus/validator_set/dynamic_validator.rs @@ -67,16 +67,6 @@ impl DynamicValidator { fn validators_pubkey(&self, parent: BlockHash) -> Option<Vec<Public>> { self.validators(parent).map(|validators| validators.into_iter().map(|val| *val.pubkey()).collect()) } - - pub fn proposer_index(&self, parent: BlockHash, prev_proposer_index: usize, proposed_view: usize) -> usize { - if let Some(validators) = self.validators(parent) { - let num_validators = validators.len(); - proposed_view % num_validators - } else { - let num_validators = self.initial_list.count(&parent); - (prev_proposer_index + proposed_view + 1) % num_validators - } - } } impl ValidatorSet for DynamicValidator { diff --git a/test/src/e2e.dynval/1/dv.changeParams.test.ts b/test/src/e2e.dynval/1/dv.changeParams.test.ts index 82ad2c8510..d815fd77b4 100644 --- a/test/src/e2e.dynval/1/dv.changeParams.test.ts +++ b/test/src/e2e.dynval/1/dv.changeParams.test.ts @@ -192,10 +192,10 @@ describe("Change commonParams that doesn't affects validator set", function() { it("should be applied after a term seconds", async function() { const initialTermSeconds = initialParams.termSeconds; const newTermSeconds = 5; - const margin = 1.3; + const margin = 1.5; this.slow((initialTermSeconds + newTermSeconds) * 1000 * margin); - this.timeout((initialTermSeconds + newTermSeconds) * 1000 * 2); + this.timeout((initialTermSeconds + newTermSeconds) * 1000 * 2.5); const term1Metadata = (await stake.getTermMetadata(nodes[0].sdk))!; { @@ -247,8 +247,8 @@ describe("Change commonParams that doesn't affects validator set", function() { it("Change minimum fee of pay transaction", async function() { const checkingNode = nodes[0]; - this.slow(4_000); - this.timeout(6_000); + this.slow(6_000); + this.timeout(12_000); const changeTxHash = await changeParams(checkingNode, 1, { ...initialParams, @@ -284,7 +284,7 @@ describe("Change commonParams that doesn't affects validator set", function() { it("Should apply larger metadata limit after increment", async function() { this.slow(6_000); - this.timeout(9_000); + this.timeout(12_000); const alice = validators[0]; const checkingNode = nodes[0]; @@ -321,7 +321,7 @@ describe("Change commonParams that doesn't affects validator set", function() { it("Should apply smaller metadata limit after decrement", async function() { this.slow(6_000); - this.timeout(9_000); + this.timeout(12_000); const alice = validators[0]; const checkingNode = nodes[0]; diff --git a/test/src/e2e.dynval/1/dv.m-m'.test.ts b/test/src/e2e.dynval/1/dv.m-m'.test.ts index 695d6d2f09..02739049a6 100644 --- a/test/src/e2e.dynval/1/dv.m-m'.test.ts +++ b/test/src/e2e.dynval/1/dv.m-m'.test.ts @@ -97,6 +97,13 @@ describe("Dynamic Validator M -> M' (Changed the subset, M, M’ = maximum numbe describe("1. Jail one of the validator", async function() { const { nodes } = withNodes(this, { ...nodeParams, + overrideParams: { + maxNumOfValidators, + delegationThreshold: 1000, + minDeposit: 10000, + custodyPeriod: 10, + releasePeriod: 30 + }, onBeforeEnable: async bootstrappingNodes => { await bootstrappingNodes[alice].clean(); // alice will be jailed! } diff --git a/test/src/e2e.dynval/1/dv.n'.test.ts b/test/src/e2e.dynval/1/dv.n'.test.ts index 2c2518d3c6..fa0ac0ea60 100644 --- a/test/src/e2e.dynval/1/dv.n'.test.ts +++ b/test/src/e2e.dynval/1/dv.n'.test.ts @@ -52,6 +52,10 @@ describe("Dynamic Validator N -> N'", function() { const betty = validators[4]; const { nodes } = withNodes(this, { promiseExpect, + overrideParams: { + custodyPeriod: 10, + releasePeriod: 30 + }, validators: [ { signer: validators[0], delegation: 4200, deposit: 100000 }, { signer: validators[1], delegation: 4100, deposit: 100000 }, @@ -112,6 +116,10 @@ describe("Dynamic Validator N -> N'", function() { const betty = validators[4]; const { nodes } = withNodes(this, { promiseExpect, + overrideParams: { + custodyPeriod: 10, + releasePeriod: 30 + }, validators: [ { signer: validators[0], delegation: 4200, deposit: 100000 }, { signer: validators[1], delegation: 4100, deposit: 100000 }, diff --git a/test/src/e2e.dynval/1/dv.n-1.test.ts b/test/src/e2e.dynval/1/dv.n-1.test.ts index 2ac409c63f..f85eb13796 100644 --- a/test/src/e2e.dynval/1/dv.n-1.test.ts +++ b/test/src/e2e.dynval/1/dv.n-1.test.ts @@ -71,6 +71,10 @@ describe("Dynamic Validator N -> N-1", function() { describe("A node is imprisoned to jail", async function() { const { nodes } = withNodes(this, { promiseExpect, + overrideParams: { + custodyPeriod: 10, + releasePeriod: 30 + }, validators: allDynValidators.map((signer, index) => ({ signer, delegation: 5_000, diff --git a/test/src/e2e.dynval/2/dv.shutdown.test.ts b/test/src/e2e.dynval/2/dv.shutdown.test.ts index 8717de37f0..cad8d3e7e7 100644 --- a/test/src/e2e.dynval/2/dv.shutdown.test.ts +++ b/test/src/e2e.dynval/2/dv.shutdown.test.ts @@ -52,7 +52,9 @@ describe("Shutdown test", function() { overrideParams: { minNumOfValidators: 4, maxNumOfValidators: 8, - delegationThreshold: 1 + delegationThreshold: 1, + custodyPeriod: 10, + releasePeriod: 30 }, validators: [ // Observer: no self-nomination, no-deposit @@ -149,7 +151,7 @@ describe("Shutdown test", function() { .and.to.include.members(getAlphaBetas().addrs); } - await termWaiter.waitForTermPeriods(1, 0.5); + await termWaiter.waitForTermPeriods(1, 2); // Revival await Promise.all(getAlphaBetas().nodes.map(node => node.start())); await fullyConnect(nodes, promiseExpect); @@ -196,7 +198,9 @@ describe("Shutdown test", function() { overrideParams: { minNumOfValidators: 3, maxNumOfValidators: 3, - delegationThreshold: 1 + delegationThreshold: 1, + custodyPeriod: 10, + releasePeriod: 30 }, validators: [ // Observer: no self-nomination, no deposit @@ -239,7 +243,7 @@ describe("Shutdown test", function() { .and.to.include.members(getValidators().addrs); } - await termWaiter.waitForTermPeriods(2, 0.5); + await termWaiter.waitForTermPeriods(2, 2); // Revival await Promise.all(getValidators().nodes.map(node => node.start())); await fullyConnect(nodes, promiseExpect); diff --git a/test/src/e2e.dynval/setup.ts b/test/src/e2e.dynval/setup.ts index 493f6ef602..60242269bd 100644 --- a/test/src/e2e.dynval/setup.ts +++ b/test/src/e2e.dynval/setup.ts @@ -354,8 +354,8 @@ export const defaultParams = { termSeconds: 15, nominationExpiration: 10, - custodyPeriod: 10, - releasePeriod: 30, + custodyPeriod: 0, + releasePeriod: 0, maxNumOfValidators: 5, minNumOfValidators: 3, delegationThreshold: 1000, @@ -465,7 +465,7 @@ export function setTermTestTimeout( ): TermWaiter { const { terms, params: { termSeconds } = defaultParams } = options; const slowMargin = 0.5; - const timeoutMargin = 2.0; + const timeoutMargin = 4.0; context.slow(termSeconds * (terms + slowMargin) * 1000); context.timeout(termSeconds * (terms + timeoutMargin) * 1000); function termPeriodsToTime(termPeriods: number, margin: number): number { @@ -485,7 +485,7 @@ export function setTermTestTimeout( ) { await node.waitForTermChange( waiterParams.target, - termPeriodsToTime(waiterParams.termPeriods, 0.5) + termPeriodsToTime(waiterParams.termPeriods, 2) ); } }; diff --git a/test/src/e2e.long/mempoolMinfee.test.ts b/test/src/e2e.long/mempoolMinfee.test.ts index ef6a0c5a49..9f42e57758 100644 --- a/test/src/e2e.long/mempoolMinfee.test.ts +++ b/test/src/e2e.long/mempoolMinfee.test.ts @@ -157,7 +157,7 @@ describe("MemPoolMinFees", async function() { recipient: validator0Address }); - await Promise.all(nodeArray.map(node => node.waitBlockNumber(4))); + await Promise.all(nodeArray.map(node => node.waitBlockNumber(3))); const expectedTrues = await Promise.all( nodeArray.map(node => node.sdk.rpc.chain.containsTransaction( diff --git a/test/src/e2e.long/staking.test.ts b/test/src/e2e.long/staking.test.ts index 2d5510f2e4..939c1c4a0f 100644 --- a/test/src/e2e.long/staking.test.ts +++ b/test/src/e2e.long/staking.test.ts @@ -37,12 +37,12 @@ import CodeChain from "../helper/spawn"; const RLP = require("rlp"); describe("Staking", function() { - this.timeout(60_000); + this.timeout(80_000); const promiseExpect = new PromiseExpect(); let nodes: CodeChain[]; beforeEach(async function() { - this.timeout(60_000); + this.timeout(80_000); const validatorAddresses = [ validator0Address, diff --git a/test/src/e2e/changeParams.test.ts b/test/src/e2e/changeParams.test.ts index af676236da..a9d856367b 100644 --- a/test/src/e2e/changeParams.test.ts +++ b/test/src/e2e/changeParams.test.ts @@ -819,7 +819,7 @@ describe("ChangeParams", function() { ).rejectedWith(/nomination expiration/); }); - it("custody period cannot be zero", async function() { + it.skip("custody period cannot be zero", async function() { const newParams = [ 0x20, // maxExtraDataSize 0x0400, // maxAssetSchemeMetadataSize @@ -882,7 +882,7 @@ describe("ChangeParams", function() { ).rejectedWith(/custody period/); }); - it("release period cannot be zero", async function() { + it.skip("release period cannot be zero", async function() { const newParams = [ 0x20, // maxExtraDataSize 0x0400, // maxAssetSchemeMetadataSize @@ -945,7 +945,7 @@ describe("ChangeParams", function() { ).rejectedWith(/release period/); }); - it("A release period cannot be equal to a custody period", async function() { + it.skip("A release period cannot be equal to a custody period", async function() { const newParams = [ 0x20, // maxExtraDataSize 0x0400, // maxAssetSchemeMetadataSize diff --git a/test/src/helper/mock/index.ts b/test/src/helper/mock/index.ts index ba9b316fc3..68d258ae88 100644 --- a/test/src/helper/mock/index.ts +++ b/test/src/helper/mock/index.ts @@ -611,6 +611,7 @@ export class Mock { new TendermintMessage({ type: "proposalblock", view: message.view, + priorityInfo: message.priorityInfo, message: RLP.encode([newHeader, block[1]]), signature: newSignature.r + newSignature.s }) @@ -623,8 +624,10 @@ export class Mock { this.sendTendermintMessage( new TendermintMessage({ type: "requestproposal", - height: message.voteStep.height, - view: message.voteStep.view + round: { + height: message.voteStep.height, + view: message.voteStep.view + } }) ); }, 200); diff --git a/test/src/helper/mock/tendermintMessage.ts b/test/src/helper/mock/tendermintMessage.ts index 07d7c8e86b..cadb017d4d 100644 --- a/test/src/helper/mock/tendermintMessage.ts +++ b/test/src/helper/mock/tendermintMessage.ts @@ -44,6 +44,19 @@ interface VoteStep { step: Step; } +interface SortitionRound { + height: number; + view: number; +} + +interface PriorityInfo { + signerIdx: number; + priority: H256; + subUserIdx: number; + numberOfElections: number; + vrfProof: Buffer; +} + export interface ConsensusMessage { type: "consensusmessage"; messages: Array<{ @@ -59,14 +72,20 @@ export interface ConsensusMessage { export interface ProposalBlock { type: "proposalblock"; signature: string; + priorityInfo: PriorityInfo; view: number; message: Buffer; } +interface ProposalSummary { + priorityInfo: PriorityInfo; + blockHash: H256; +} + export interface StepState { type: "stepstate"; voteStep: VoteStep; - proposal: H256 | null; + proposal: ProposalSummary | null; lockView: number | null; knownVotes: Buffer; } @@ -79,8 +98,7 @@ export interface RequestMessage { export interface RequestProposal { type: "requestproposal"; - height: number; - view: number; + round: SortitionRound; } type MessageBody = @@ -124,8 +142,15 @@ export class TendermintMessage { message = { type: "proposalblock", signature: decoded[1].toString("hex"), - view: readUIntRLP(decoded[2]), - message: uncompressSync(decoded[3]) + priorityInfo: { + signerIdx: readUIntRLP(decoded[2][0]), + priority: new H256(decoded[2][1].toString("hex")), + subUserIdx: readUIntRLP(decoded[2][2]), + numberOfElections: readUIntRLP(decoded[2][3]), + vrfProof: decoded[2][4] + }, + view: readUIntRLP(decoded[3]), + message: uncompressSync(decoded[4]) }; break; } @@ -137,10 +162,20 @@ export class TendermintMessage { view: readUIntRLP(decoded[1][1]), step: readUIntRLP(decoded[1][2]) as Step }, - proposal: readOptionalRlp( - decoded[2], - buffer => new H256(buffer.toString("hex")) - ), + proposal: readOptionalRlp(decoded[2], (buffer: any) => { + return { + priorityInfo: { + signerIdx: readUIntRLP(buffer[0][0]), + priority: new H256( + buffer[0][1].toString("hex") + ), + subUserIdx: readUIntRLP(buffer[0][2]), + numberOfElections: readUIntRLP(buffer[0][3]), + vrfProof: buffer[0][4] + }, + blockHash: new H256(buffer[1].toString("hex")) + }; + }), lockView: readOptionalRlp(decoded[3], readUIntRLP), knownVotes: decoded[4] }; @@ -161,8 +196,10 @@ export class TendermintMessage { case MessageType.MESSAGE_ID_REQUEST_PROPOSAL: { message = { type: "requestproposal", - height: readUIntRLP(decoded[1]), - view: readUIntRLP(decoded[2]) + round: { + height: readUIntRLP(decoded[1][0]), + view: readUIntRLP(decoded[1][1]) + } }; break; } @@ -210,6 +247,19 @@ export class TendermintMessage { return [ MessageType.MESSAGE_ID_PROPOSAL_BLOCK, Buffer.from(this.body.signature, "hex"), + [ + new U64( + this.body.priorityInfo.signerIdx + ).toEncodeObject(), + this.body.priorityInfo.priority.toEncodeObject(), + new U64( + this.body.priorityInfo.subUserIdx + ).toEncodeObject(), + new U64( + this.body.priorityInfo.numberOfElections + ).toEncodeObject(), + this.body.priorityInfo.vrfProof + ], new U64(this.body.view).toEncodeObject(), compressSync(this.body.message) ]; @@ -224,7 +274,22 @@ export class TendermintMessage { ], this.body.proposal == null ? [] - : [this.body.proposal.toEncodeObject()], + : [ + [ + new U64( + this.body.proposal.priorityInfo.signerIdx + ).toEncodeObject(), + this.body.proposal.priorityInfo.priority.toEncodeObject(), + new U64( + this.body.proposal.priorityInfo.subUserIdx + ).toEncodeObject(), + new U64( + this.body.proposal.priorityInfo.numberOfElections + ).toEncodeObject(), + this.body.proposal.priorityInfo.vrfProof + ], + this.body.proposal.blockHash.toEncodeObject() + ], this.body.lockView == null ? [] : [new U64(this.body.lockView).toEncodeObject()], @@ -245,8 +310,10 @@ export class TendermintMessage { case "requestproposal": { return [ MessageType.MESSAGE_ID_REQUEST_PROPOSAL, - new U64(this.body.height).toEncodeObject(), - new U64(this.body.view).toEncodeObject() + [ + new U64(this.body.round.height).toEncodeObject(), + new U64(this.body.round.view).toEncodeObject() + ] ]; } } diff --git a/test/src/scheme/tendermint-dynval.json b/test/src/scheme/tendermint-dynval.json index ccadaa520d..0eefae40e4 100644 --- a/test/src/scheme/tendermint-dynval.json +++ b/test/src/scheme/tendermint-dynval.json @@ -9,7 +9,7 @@ "0xdb3a858d2bafd2cb5382fcf366b847a86b58b42ce1fc29fec0cb0315af881a2ad495045adbdbc86ef7a777b541c4e62a0747f25ff6068a5ec3a052c690c4ff8a", "0x42829b18de338aa3abf5e6d80cd511121bf9d34be9a135bbace32a3226479e7f3bb6af76c11dcc724a1666a22910d756b075d54d8fdd97be11efd7a0ac3bb222" ], - "timeoutPropose": 2000, + "timeoutPropose": 1500, "timeoutProposeDelta": 100, "timeoutPrevote": 1000, "timeoutPrevoteDelta": 100, diff --git a/test/src/scheme/tendermint-int.json b/test/src/scheme/tendermint-int.json index 92a0828bcb..3b45ac4907 100644 --- a/test/src/scheme/tendermint-int.json +++ b/test/src/scheme/tendermint-int.json @@ -9,7 +9,7 @@ "0xdb3a858d2bafd2cb5382fcf366b847a86b58b42ce1fc29fec0cb0315af881a2ad495045adbdbc86ef7a777b541c4e62a0747f25ff6068a5ec3a052c690c4ff8a", "0x42829b18de338aa3abf5e6d80cd511121bf9d34be9a135bbace32a3226479e7f3bb6af76c11dcc724a1666a22910d756b075d54d8fdd97be11efd7a0ac3bb222" ], - "timeoutPropose": 1000, + "timeoutPropose": 1500, "timeoutProposeDelta": 1000, "timeoutPrevote": 1000, "timeoutPrevoteDelta": 1000, diff --git a/types/src/common_params.rs b/types/src/common_params.rs index e59c1eab9a..8d289dc425 100644 --- a/types/src/common_params.rs +++ b/types/src/common_params.rs @@ -181,12 +181,6 @@ impl CommonParams { if self.nomination_expiration == 0 { return Err("You should set the nomination expiration".to_string()) } - if self.custody_period == 0 { - return Err("You should set the custody period".to_string()) - } - if self.release_period == 0 { - return Err("You should set the release period".to_string()) - } if self.max_num_of_validators == 0 { return Err("You should set the maximum number of validators".to_string()) } @@ -205,7 +199,7 @@ impl CommonParams { self.min_num_of_validators, self.max_num_of_validators )) } - if self.custody_period >= self.release_period { + if self.custody_period > self.release_period { return Err(format!( "The release period({}) should be longer than the custody period({})", self.release_period, self.custody_period