From 92d767d0232548d90f541196d885ad0fb51ba6c3 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Mon, 13 Jan 2020 15:18:40 +0900 Subject: [PATCH 01/13] Modify functionality of vote_collector Previously, vote_collector has been collected only consensusMessage information. Now, vote_collect also collects priority information for propse steps. That's because priority data should be combined to the corresponding consensusMessage data when propagates to peers. --- .../consensus/tendermint/vote_collector.rs | 117 +++++++++++++++--- 1 file changed, 101 insertions(+), 16 deletions(-) diff --git a/core/src/consensus/tendermint/vote_collector.rs b/core/src/consensus/tendermint/vote_collector.rs index f96216fa66..de90ed3019 100644 --- a/core/src/consensus/tendermint/vote_collector.rs +++ b/core/src/consensus/tendermint/vote_collector.rs @@ -14,25 +14,89 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::iter::Iterator; use ckey::SchnorrSignature; use ctypes::BlockHash; use rlp::{Encodable, RlpStream}; +use super::super::PriorityInfo; use super::stake::Action; -use super::{ConsensusMessage, VoteStep}; +use super::{ConsensusMessage, SortitionRound, Step, VoteStep}; use crate::consensus::BitSet; /// Storing all Proposals, Prevotes and Precommits. +/// Invariant: Proposal step links to StepCollector::PP variant +/// and Other steps link to StepCollector::PVPC variant #[derive(Debug)] pub struct VoteCollector { votes: BTreeMap, } +#[derive(Debug)] +enum StepCollector { + PP(PpCollector), + PVPC(PvPcCollector), +} + +impl StepCollector { + fn new_pp() -> Self { + StepCollector::PP(Default::default()) + } + + fn new_pvpc() -> Self { + StepCollector::PVPC(Default::default()) + } + + fn insert_message(&mut self, message: ConsensusMessage) -> Result { + match self { + StepCollector::PP(pp_collector) => pp_collector.message_collector.insert(message), + StepCollector::PVPC(pv_pc_collector) => pv_pc_collector.message_collector.insert(message), + } + } + + fn insert_priority(&mut self, info: PriorityInfo) -> bool { + match self { + StepCollector::PP(pp_collector) => pp_collector.priority_collector.insert(info), + _ => panic!("Invariant violated: propose step must be linked to PpCollector"), + } + } + + fn message_collector(&self) -> &MessageCollector { + match self { + StepCollector::PP(pp_collector) => &pp_collector.message_collector, + StepCollector::PVPC(pv_pc_collector) => &pv_pc_collector.message_collector, + } + } + + fn priority_collector(&self) -> &PriorityCollector { + match self { + StepCollector::PP(pp_collector) => &pp_collector.priority_collector, + _ => panic!("Invariant violated: propose step must be linked to PpCollector"), + } + } +} + +// Struct for propose step vote and priority collecting #[derive(Debug, Default)] -struct StepCollector { +struct PpCollector { + message_collector: MessageCollector, + priority_collector: PriorityCollector, +} + +#[derive(Debug, Default)] +struct PvPcCollector { + message_collector: MessageCollector, +} + +#[derive(Debug, Default)] +struct PriorityCollector { + priorities: BTreeSet, +} + +#[derive(Debug, Default)] +struct MessageCollector { voted: HashMap, block_votes: HashMap, BTreeMap>, messages: Vec, @@ -60,7 +124,15 @@ impl Encodable for DoubleVote { } } -impl StepCollector { +impl PriorityCollector { + // true: a priority is new + // false: a priority is duplicated + fn insert(&mut self, info: PriorityInfo) -> bool { + self.priorities.insert(info) + } +} + +impl MessageCollector { /// Some(true): a message is new /// Some(false): a message is duplicated /// Err(DoubleVote): a double vote @@ -114,7 +186,7 @@ impl Default for VoteCollector { fn default() -> Self { let mut collector = BTreeMap::new(); // Insert dummy entry to fulfill invariant: "only messages newer than the oldest are inserted". - collector.insert(Default::default(), Default::default()); + collector.insert(Default::default(), StepCollector::new_pp()); VoteCollector { votes: collector, } @@ -124,12 +196,18 @@ impl Default for VoteCollector { impl VoteCollector { /// Insert vote if it is newer than the oldest one. pub fn collect(&mut self, message: ConsensusMessage) -> Result { - self.votes.entry(*message.round()).or_insert_with(Default::default).insert(message) + match message.round().step { + Step::Propose => { + self.votes.entry(*message.round()).or_insert_with(StepCollector::new_pp).insert_message(message) + } + _ => self.votes.entry(*message.round()).or_insert_with(StepCollector::new_pvpc).insert_message(message), + } } /// Checks if the message should be ignored. pub fn is_old_or_known(&self, message: &ConsensusMessage) -> bool { - let is_known = self.votes.get(&message.round()).map_or(false, |c| c.messages.contains(message)); + let is_known = + self.votes.get(&message.round()).map_or(false, |c| c.message_collector().messages.contains(message)); if is_known { cdebug!(ENGINE, "Known message: {:?}.", message); return true @@ -161,7 +239,7 @@ impl VoteCollector { ) -> (Vec, Vec) { self.votes .get(round) - .and_then(|c| c.block_votes.get(&Some(*block_hash))) + .and_then(|c| c.message_collector().block_votes.get(&Some(*block_hash))) .map(|votes| { let (indices, sigs) = votes.iter().unzip(); (sigs, indices) @@ -174,14 +252,14 @@ impl VoteCollector { pub fn round_signature(&self, round: &VoteStep, block_hash: &BlockHash) -> Option { self.votes .get(round) - .and_then(|c| c.block_votes.get(&Some(*block_hash))) + .and_then(|c| c.message_collector().block_votes.get(&Some(*block_hash))) .and_then(|votes| votes.values().next().cloned()) } /// Count votes which agree with the given message. pub fn aligned_votes(&self, message: &ConsensusMessage) -> BitSet { if let Some(votes) = self.votes.get(&message.round()) { - votes.count_block(&message.block_hash()) + votes.message_collector().count_block(&message.block_hash()) } else { Default::default() } @@ -189,7 +267,7 @@ impl VoteCollector { pub fn block_round_votes(&self, round: &VoteStep, block_hash: &Option) -> BitSet { if let Some(votes) = self.votes.get(round) { - votes.count_block(block_hash) + votes.message_collector().count_block(block_hash) } else { Default::default() } @@ -198,7 +276,7 @@ impl VoteCollector { /// Count all votes collected for a given round. pub fn round_votes(&self, vote_round: &VoteStep) -> BitSet { if let Some(votes) = self.votes.get(vote_round) { - votes.count() + votes.message_collector().count() } else { Default::default() } @@ -215,20 +293,27 @@ impl VoteCollector { let votes = self .votes .get(round) - .map(|c| c.block_votes.keys().cloned().filter_map(|x| x).collect()) + .map(|c| c.message_collector().block_votes.keys().cloned().filter_map(|x| x).collect()) .unwrap_or_else(Vec::new); votes.into_iter().any(|vote_block_hash| vote_block_hash == block_hash) } + pub fn fetch_by_idx(&self, round: &VoteStep, idx: usize) -> Option { + self.votes.get(round).and_then(|collector| collector.message_collector().fetch_by_idx(idx)) + } + pub fn get_all(&self) -> Vec { - self.votes.iter().flat_map(|(_round, collector)| collector.messages.clone()).collect() + self.votes.iter().flat_map(|(_round, collector)| collector.message_collector().messages.clone()).collect() } pub fn get_all_votes_in_round(&self, round: &VoteStep) -> Vec { - self.votes.get(round).map(|c| c.messages.clone()).unwrap_or_default() + self.votes.get(round).map(|c| c.message_collector().messages.clone()).unwrap_or_default() } pub fn get_all_votes_and_indices_in_round(&self, round: &VoteStep) -> Vec<(usize, ConsensusMessage)> { - self.votes.get(round).map(|c| c.voted.iter().map(|(k, v)| (*k, v.clone())).collect()).unwrap_or_default() + self.votes + .get(round) + .map(|c| c.message_collector().voted.iter().map(|(k, v)| (*k, v.clone())).collect()) + .unwrap_or_default() } } From 6b13d96ac050e37fe3689da274920cba8e5608b5 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Fri, 15 Nov 2019 18:36:51 +0900 Subject: [PATCH 02/13] Introduce verify_seed to Tendermint To verify seed of two consecutive blocks, the function is newly implemented in the Tendermint worker --- core/src/consensus/mod.rs | 1 + core/src/consensus/tendermint/worker.rs | 56 +++++++++++++++++++------ 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/core/src/consensus/mod.rs b/core/src/consensus/mod.rs index c962f38a55..9141133a9a 100644 --- a/core/src/consensus/mod.rs +++ b/core/src/consensus/mod.rs @@ -207,6 +207,7 @@ pub trait ConsensusEngine: Sync + Send { } /// Phase 3 verification. Check block information against parent. Returns either a null `Ok` or a general error detailing the problem with import. + /// The verifiaction must be conducted only with the two headers' information. fn verify_block_family(&self, _header: &Header, _parent: &Header) -> Result<(), Error> { Ok(()) } diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index d13545fe33..4c81487ca4 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -47,7 +47,10 @@ use crate::block::*; use crate::client::ConsensusClient; use crate::consensus::signer::EngineSigner; use crate::consensus::validator_set::{DynamicValidator, ValidatorSet}; -use crate::consensus::{sortition::VRFSeed, EngineError, Seal}; +use crate::consensus::{ + sortition::seed::{SeedInfo, VRFSeed}, + EngineError, Seal, VRFSortition, +}; use crate::encoded; use crate::error::{BlockError, Error}; use crate::transaction::{SignedTransaction, UnverifiedTransaction}; @@ -94,6 +97,8 @@ struct Worker { validators: Arc, /// Channel to the network extension, must be set later. extension: EventSender, + // VRF sortition scheme, + sortition_scheme: VRFSortition, time_gap_params: TimeGapParams, timeout_token_nonce: usize, vote_regression_checker: VoteRegressionChecker, @@ -389,12 +394,15 @@ impl Worker { .hash() } - fn prev_vrf_seed(&self) -> VRFSeed { - let parent_header = - self.prev_block_header_of_height(self.height).expect("Height is increased when previous block is imported"); - let parent_seal = parent_header.seal(); - let seal_view = TendermintSealView::new(&parent_seal); - seal_view.vrf_seed().unwrap() + fn fetch_vrf_seed_info(&self, block_hash: BlockHash) -> Option { + let block_header = self.client().block_header(&block_hash.into())?; + let block_seal = block_header.seal(); + Some(TendermintSealView::new(&block_seal).vrf_seed_info().expect("Seal fields was verified")) + } + + fn prev_vrf_seed_of_height(&self, height: Height) -> Option { + self.prev_block_header_of_height(height) + .and_then(|parent_header| self.fetch_vrf_seed_info(parent_header.hash()).map(|seed_info| *seed_info.seed())) } /// Get the index of the proposer of a block to check the new proposer is valid. @@ -1191,7 +1199,33 @@ impl Worker { Ok(()) } - fn verify_block_external(&self, header: &Header) -> Result<(), Error> { + fn verify_seed(&mut self, header: &Header, parent: &Header) -> Result<(), Error> { + let current_seal_view = TendermintSealView::new(&header.seal()); + let current_seed_signer_idx = current_seal_view.vrf_seed_info().expect("Seal field verified").signer_idx(); + let current_signer_public = self.validators.get(&parent.hash(), current_seed_signer_idx); + + let parent_seal_view = TendermintSealView::new(&parent.seal()); + + let parent_seed_info = parent_seal_view.vrf_seed_info().map_err(|_| BlockError::InvalidSeal)?; + let current_seed_info = current_seal_view.vrf_seed_info().map_err(|_| BlockError::InvalidSeal)?; + + match current_seed_info.verify( + header.number(), + current_seal_view.author_view().map_err(|_| BlockError::InvalidSeal)?, + parent_seed_info.seed(), + ¤t_signer_public, + &mut self.sortition_scheme.vrf_inst, + ) { + Ok(true) => Ok(()), + _ => Err(BlockError::InvalidSeal.into()), + } + } + + fn verify_block_external(&mut self, header: &Header) -> Result<(), Error> { + let parent_header = + self.client().block_header(&(*header.parent_hash()).into()).expect("The parent block must exist"); + self.verify_seed(header, &parent_header.decode())?; + let height = header.number() as usize; let author_view = TendermintSealView::new(header.seal()).author_view()?; ctrace!(ENGINE, "Verify external at {}-{}, {:?}", height, author_view, header); @@ -1229,11 +1263,7 @@ impl Worker { }; let mut voted_validators = BitSet::new(); - let grand_parent_hash = self - .client() - .block_header(&(*header.parent_hash()).into()) - .expect("The parent block must exist") - .parent_hash(); + let grand_parent_hash = parent_header.parent_hash(); for (bitset_index, signature) in seal_view.signatures()? { let public = self.validators.get(&grand_parent_hash, bitset_index); if !verify_schnorr(&public, &signature, &precommit_vote_on.hash())? { From 17f983b68d2141d238beacff10e2aab372484d74 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Fri, 17 Jan 2020 21:46:57 +0900 Subject: [PATCH 03/13] Do not wait when empty block is proposed We don't need to wait additional times because in case an empty block is generated. Because the nodes should wait fixed timeout in the next proposal step. --- core/src/consensus/tendermint/mod.rs | 3 -- core/src/consensus/tendermint/network.rs | 25 +--------- core/src/consensus/tendermint/types.rs | 21 --------- core/src/consensus/tendermint/worker.rs | 60 ++---------------------- 4 files changed, 6 insertions(+), 103 deletions(-) diff --git a/core/src/consensus/tendermint/mod.rs b/core/src/consensus/tendermint/mod.rs index 10d55cc2b3..38ba99492b 100644 --- a/core/src/consensus/tendermint/mod.rs +++ b/core/src/consensus/tendermint/mod.rs @@ -45,11 +45,8 @@ use crate::ChainNotify; /// Timer token representing the consensus step timeouts. const ENGINE_TIMEOUT_TOKEN_NONCE_BASE: TimerToken = 23; -/// Timer token for empty proposal blocks. -const ENGINE_TIMEOUT_EMPTY_PROPOSAL: TimerToken = 22; /// Timer token for broadcasting step state. const ENGINE_TIMEOUT_BROADCAST_STEP_STATE: TimerToken = 21; - /// Unit: second const ENGINE_TIMEOUT_BROADCAT_STEP_STATE_INTERVAL: u64 = 1; diff --git a/core/src/consensus/tendermint/network.rs b/core/src/consensus/tendermint/network.rs index 84e942a021..5b83806b81 100644 --- a/core/src/consensus/tendermint/network.rs +++ b/core/src/consensus/tendermint/network.rs @@ -38,8 +38,7 @@ use super::worker; use crate::consensus::EngineError; use super::{ - ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_BROADCAT_STEP_STATE_INTERVAL, ENGINE_TIMEOUT_EMPTY_PROPOSAL, - ENGINE_TIMEOUT_TOKEN_NONCE_BASE, + ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_BROADCAT_STEP_STATE_INTERVAL, ENGINE_TIMEOUT_TOKEN_NONCE_BASE, }; pub struct TendermintExtension { @@ -204,19 +203,11 @@ impl TendermintExtension { } fn set_timer_step(&self, step: Step, view: View, expired_token_nonce: TimerToken) { - self.api.clear_timer(ENGINE_TIMEOUT_EMPTY_PROPOSAL).expect("Timer clear succeeds"); self.api.clear_timer(expired_token_nonce).expect("Timer clear succeeds"); self.api .set_timer_once(expired_token_nonce + 1, self.timeouts.timeout(step, view)) .expect("Timer set succeeds"); } - - fn set_timer_empty_proposal(&self, view: View) { - self.api.clear_timer(ENGINE_TIMEOUT_EMPTY_PROPOSAL).expect("Timer clear succeeds"); - self.api - .set_timer_once(ENGINE_TIMEOUT_EMPTY_PROPOSAL, self.timeouts.timeout(Step::Propose, view) / 2) - .expect("Timer set succeeds"); - } } impl NetworkExtension for TendermintExtension { @@ -403,11 +394,7 @@ impl NetworkExtension for TendermintExtension { } fn on_timeout(&mut self, token: TimerToken) { - debug_assert!( - token >= ENGINE_TIMEOUT_TOKEN_NONCE_BASE - || token == ENGINE_TIMEOUT_EMPTY_PROPOSAL - || token == ENGINE_TIMEOUT_BROADCAST_STEP_STATE - ); + debug_assert!(token >= ENGINE_TIMEOUT_TOKEN_NONCE_BASE || token == ENGINE_TIMEOUT_BROADCAST_STEP_STATE); self.inner.send(worker::Event::OnTimeout(token)).unwrap(); } @@ -443,11 +430,6 @@ impl NetworkExtension for TendermintExtension { view, expired_token_nonce, } => self.set_timer_step(step, view, expired_token_nonce), - Event::SetTimerEmptyProposal { - view, - } => { - self.set_timer_empty_proposal(view); - } Event::BroadcastProposalBlock { signature, view, @@ -482,9 +464,6 @@ pub enum Event { view: View, expired_token_nonce: TimerToken, }, - SetTimerEmptyProposal { - view: View, - }, BroadcastProposalBlock { signature: SchnorrSignature, view: View, diff --git a/core/src/consensus/tendermint/types.rs b/core/src/consensus/tendermint/types.rs index 3092eac591..fc12d02c25 100644 --- a/core/src/consensus/tendermint/types.rs +++ b/core/src/consensus/tendermint/types.rs @@ -38,9 +38,6 @@ pub enum TendermintState { ProposeWaitImported { block: Box, }, - ProposeWaitEmptyBlockTimer { - block: Box, - }, Prevote, Precommit, Commit { @@ -63,9 +60,6 @@ impl TendermintState { TendermintState::ProposeWaitImported { .. } => Step::Propose, - TendermintState::ProposeWaitEmptyBlockTimer { - .. - } => Step::Propose, TendermintState::Prevote => Step::Prevote, TendermintState::Precommit => Step::Precommit, TendermintState::Commit { @@ -77,15 +71,6 @@ impl TendermintState { } } - pub fn is_propose_wait_empty_block_timer(&self) -> bool { - match self { - TendermintState::ProposeWaitEmptyBlockTimer { - .. - } => true, - _ => false, - } - } - pub fn is_commit(&self) -> bool { match self { TendermintState::Commit { @@ -124,9 +109,6 @@ impl TendermintState { TendermintState::ProposeWaitImported { .. } => None, - TendermintState::ProposeWaitEmptyBlockTimer { - .. - } => None, TendermintState::Prevote => None, TendermintState::Precommit => None, } @@ -143,9 +125,6 @@ impl fmt::Debug for TendermintState { TendermintState::ProposeWaitImported { block, } => write!(f, "TendermintState::ProposeWaitImported({})", block.header().hash()), - TendermintState::ProposeWaitEmptyBlockTimer { - block, - } => write!(f, "TendermintState::ProposeWaitEmptyBlockTimer({})", block.header().hash()), TendermintState::Prevote => write!(f, "TendermintState::Prevote"), TendermintState::Precommit => write!(f, "TendermintState::Precommit"), TendermintState::Commit { diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index 4c81487ca4..335df0b071 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -39,9 +39,7 @@ use super::stake::CUSTOM_ACTION_HANDLER_ID; use super::types::{Height, Proposal, Step, TendermintSealView, TendermintState, TwoThirdsMajority, View}; use super::vote_collector::{DoubleVote, VoteCollector}; use super::vote_regression_checker::VoteRegressionChecker; -use super::{ - ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_EMPTY_PROPOSAL, ENGINE_TIMEOUT_TOKEN_NONCE_BASE, SEAL_FIELDS, -}; +use super::{ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_TOKEN_NONCE_BASE, SEAL_FIELDS}; use crate::account_provider::AccountProvider; use crate::block::*; use crate::client::ConsensusClient; @@ -999,25 +997,10 @@ impl Worker { TendermintState::ProposeWaitImported { block, } => { - if !block.transactions().is_empty() { - cinfo!(ENGINE, "Submitting proposal block {}", block.header().hash()); - self.move_to_step(TendermintState::Prevote, false); - self.broadcast_proposal_block(self.view, encoded::Block::new(block.rlp_bytes())); - } else { - ctrace!(ENGINE, "Empty proposal is generated, set timer"); - self.step = TendermintState::ProposeWaitEmptyBlockTimer { - block, - }; - self.extension - .send(network::Event::SetTimerEmptyProposal { - view: self.view, - }) - .unwrap(); - } + cinfo!(ENGINE, "Submitting proposal block {}", block.header().hash()); + self.move_to_step(TendermintState::Prevote, false); + self.broadcast_proposal_block(self.view, encoded::Block::new(block.rlp_bytes())); } - TendermintState::ProposeWaitEmptyBlockTimer { - .. - } => unreachable!(), _ => {} }; } else if current_height < height { @@ -1287,35 +1270,6 @@ impl Worker { } fn on_timeout(&mut self, token: usize) { - // Timeout from empty block generation - if token == ENGINE_TIMEOUT_EMPTY_PROPOSAL { - let block = if self.step.is_propose_wait_empty_block_timer() { - let previous = mem::replace(&mut self.step, TendermintState::Propose); - match previous { - TendermintState::ProposeWaitEmptyBlockTimer { - block, - } => block, - _ => unreachable!(), - } - } else { - cwarn!(ENGINE, "Empty proposal timer was not cleared."); - return - }; - - // When self.height != block.header().number() && "propose timeout" is already called, - // the state is stuck and can't move to Prevote. We should change the step to Prevote. - self.move_to_step(TendermintState::Prevote, false); - if self.height == block.header().number() { - cdebug!(ENGINE, "Empty proposal timer is finished, go to the prevote step and broadcast the block"); - cinfo!(ENGINE, "Submitting proposal block {}", block.header().hash()); - self.broadcast_proposal_block(self.view, encoded::Block::new(block.rlp_bytes())); - } else { - cwarn!(ENGINE, "Empty proposal timer was for previous height."); - } - - return - } - if token == ENGINE_TIMEOUT_BROADCAST_STEP_STATE { if let Some(votes_received) = self.votes_received.borrow_if_mutated() { self.broadcast_state( @@ -1350,12 +1304,6 @@ impl Worker { cwarn!(ENGINE, "Propose timed out but still waiting for the block imported"); return } - TendermintState::ProposeWaitEmptyBlockTimer { - .. - } => { - cwarn!(ENGINE, "Propose timed out but still waiting for the empty block"); - return - } TendermintState::Prevote if self.has_enough_any_votes() => { cinfo!(ENGINE, "Prevote timeout."); TendermintState::Precommit From 91feaa14d5a931db35944bed3d2688c60e49ccb8 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Fri, 17 Jan 2020 22:11:34 +0900 Subject: [PATCH 04/13] Get rid of the state variable "proposal" Proposal information is now managed only by vote_collector --- core/src/consensus/tendermint/types.rs | 40 ------- .../consensus/tendermint/vote_collector.rs | 65 ++++++++++- core/src/consensus/tendermint/worker.rs | 109 ++++++++---------- 3 files changed, 112 insertions(+), 102 deletions(-) diff --git a/core/src/consensus/tendermint/types.rs b/core/src/consensus/tendermint/types.rs index fc12d02c25..5dd701b916 100644 --- a/core/src/consensus/tendermint/types.rs +++ b/core/src/consensus/tendermint/types.rs @@ -284,43 +284,3 @@ impl TwoThirdsMajority { } } } - -#[derive(Debug, PartialEq)] -pub enum Proposal { - ProposalReceived(BlockHash, Bytes, SchnorrSignature), - ProposalImported(BlockHash), - None, -} - -impl Proposal { - pub fn new_received(hash: BlockHash, block: Bytes, signature: SchnorrSignature) -> Self { - Proposal::ProposalReceived(hash, block, signature) - } - - pub fn new_imported(hash: BlockHash) -> Self { - Proposal::ProposalImported(hash) - } - - pub fn block_hash(&self) -> Option { - match self { - Proposal::ProposalReceived(hash, ..) => Some(*hash), - Proposal::ProposalImported(hash) => Some(*hash), - Proposal::None => None, - } - } - - pub fn imported_block_hash(&self) -> Option { - match self { - Proposal::ProposalReceived(..) => None, - Proposal::ProposalImported(hash) => Some(*hash), - Proposal::None => None, - } - } - - pub fn is_none(&self) -> bool { - match self { - Proposal::None => true, - _ => false, - } - } -} diff --git a/core/src/consensus/tendermint/vote_collector.rs b/core/src/consensus/tendermint/vote_collector.rs index de90ed3019..519639d825 100644 --- a/core/src/consensus/tendermint/vote_collector.rs +++ b/core/src/consensus/tendermint/vote_collector.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::{BTreeMap, BTreeSet, HashMap}; -use std::iter::Iterator; +use std::collections::{btree_set::Iter, BTreeMap, BTreeSet, HashMap}; +use std::iter::{Iterator, Rev}; use ckey::SchnorrSignature; use ctypes::BlockHash; @@ -23,7 +23,7 @@ use rlp::{Encodable, RlpStream}; use super::super::PriorityInfo; use super::stake::Action; -use super::{ConsensusMessage, SortitionRound, Step, VoteStep}; +use super::{ConsensusMessage, ProposalSummary, SortitionRound, Step, VoteStep}; use crate::consensus::BitSet; /// Storing all Proposals, Prevotes and Precommits. @@ -130,6 +130,14 @@ impl PriorityCollector { fn insert(&mut self, info: PriorityInfo) -> bool { self.priorities.insert(info) } + + fn get_highest(&self) -> Option { + self.priorities.iter().rev().next().cloned() + } + + fn iter_from_highest(&self) -> Rev> { + self.priorities.iter().rev() + } } impl MessageCollector { @@ -317,3 +325,54 @@ impl VoteCollector { .unwrap_or_default() } } + +impl VoteCollector { + pub fn collect_priority(&mut self, sortition_round: SortitionRound, info: PriorityInfo) -> bool { + self.votes.entry(sortition_round.into()).or_insert_with(StepCollector::new_pp).insert_priority(info) + } + + pub fn get_highest_priority_info(&self, sortition_round: SortitionRound) -> Option { + self.votes + .get(&sortition_round.into()) + .and_then(|step_collector| step_collector.priority_collector().get_highest()) + } + + pub fn get_highest_proposal_hash(&self, sortition_round: SortitionRound) -> Option { + self.votes.get(&sortition_round.into()).and_then(|step_collector| { + let highest_priority_idx = + step_collector.priority_collector().get_highest().map(|priority_info| priority_info.signer_idx())?; + step_collector + .message_collector() + .fetch_by_idx(highest_priority_idx) + .and_then(|priority_message| priority_message.block_hash()) + }) + } + + pub fn get_highest_proposal_summary(&self, sortition_round: SortitionRound) -> Option { + let block_hash = self.get_highest_proposal_hash(sortition_round)?; + let priority_info = self.get_highest_priority_info(sortition_round)?; + Some(ProposalSummary { + priority_info, + block_hash, + }) + } + + pub fn block_hashes_from_highest(&self, sortition_round: SortitionRound) -> Vec { + match self.votes.get(&sortition_round.into()) { + Some(step_collector) => { + let message_collector = step_collector.message_collector(); + let priority_iter_from_highest = step_collector.priority_collector().iter_from_highest(); + priority_iter_from_highest + .map(|priority_info| { + message_collector + .fetch_by_idx(priority_info.signer_idx()) + .expect("Signer index was verified") + .block_hash() + .expect("Proposal vote always have BlockHash") + }) + .collect() + } + None => vec![], + } + } +} diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index 335df0b071..2bb7f9aa99 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -36,7 +36,7 @@ use super::message::*; use super::network; use super::params::TimeGapParams; use super::stake::CUSTOM_ACTION_HANDLER_ID; -use super::types::{Height, Proposal, Step, TendermintSealView, TendermintState, TwoThirdsMajority, View}; +use super::types::{Height, Step, TendermintSealView, TendermintState, TwoThirdsMajority, View}; use super::vote_collector::{DoubleVote, VoteCollector}; use super::vote_regression_checker::VoteRegressionChecker; use super::{ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_TOKEN_NONCE_BASE, SEAL_FIELDS}; @@ -52,6 +52,7 @@ use crate::consensus::{ use crate::encoded; use crate::error::{BlockError, Error}; use crate::transaction::{SignedTransaction, UnverifiedTransaction}; +use crate::types::BlockStatus; use crate::views::BlockView; use crate::BlockId; use std::cell::Cell; @@ -84,8 +85,6 @@ struct Worker { signer: EngineSigner, /// Last majority last_two_thirds_majority: TwoThirdsMajority, - /// hash of the proposed block, used for seal submission. - proposal: Proposal, /// The finalized view of the previous height's block. /// The signatures for the previous block is signed for the view below. finalized_view_of_previous_block: View, @@ -192,7 +191,6 @@ impl Worker { votes: Default::default(), signer: Default::default(), last_two_thirds_majority: TwoThirdsMajority::Empty, - proposal: Proposal::None, finalized_view_of_previous_block: 0, finalized_view_of_current_block: None, validators, @@ -637,7 +635,6 @@ impl Worker { fn increment_view(&mut self, n: View) { cinfo!(ENGINE, "increment_view: New view."); self.view += n; - self.proposal = Proposal::None; self.votes_received = MutTrigger::new(BitSet::new()); } @@ -654,7 +651,6 @@ impl Worker { self.last_two_thirds_majority = TwoThirdsMajority::Empty; self.height += 1; self.view = 0; - self.proposal = Proposal::None; self.votes_received = MutTrigger::new(BitSet::new()); self.finalized_view_of_previous_block = self.finalized_view_of_current_block.expect("self.step == Step::Commit"); @@ -672,7 +668,6 @@ impl Worker { self.last_two_thirds_majority = TwoThirdsMajority::Empty; self.height = height; self.view = 0; - self.proposal = Proposal::None; self.votes_received = MutTrigger::new(BitSet::new()); self.finalized_view_of_previous_block = finalized_view_of_previous_height; self.finalized_view_of_current_block = None; @@ -708,7 +703,7 @@ impl Worker { // need to reset vote self.broadcast_state( vote_step, - self.proposal.block_hash(), + self.votes.get_highest_proposal_summary(self.current_sortition_round()), self.last_two_thirds_majority.view(), self.votes_received.borrow_anyway(), ); @@ -723,9 +718,6 @@ impl Worker { // Wait for verification. return } - self.proposal = Proposal::new_imported(*hash); - self.move_to_step(TendermintState::Prevote, is_restoring); - return } let parent_block_hash = self.prev_block_hash(); if !self.is_signer_proposer(&parent_block_hash) { @@ -754,8 +746,8 @@ impl Worker { self.request_messages_to_all(vote_step, &BitSet::all_set() - &self.votes_received); if !self.already_generated_message() { let block_hash_candidate = match &self.last_two_thirds_majority { - TwoThirdsMajority::Empty => self.proposal.imported_block_hash(), - TwoThirdsMajority::Unlock(_) => self.proposal.imported_block_hash(), + TwoThirdsMajority::Empty => self.highest_imported_block_hash(), + TwoThirdsMajority::Unlock(_) => self.highest_imported_block_hash(), TwoThirdsMajority::Lock(_, block_hash) => Some(*block_hash), }; let block_hash = block_hash_candidate.filter(|hash| { @@ -826,6 +818,21 @@ impl Worker { } } + fn highest_imported_block_hash(&self) -> Option { + match self.step { + TendermintState::Prevote => { + self.votes.block_hashes_from_highest(self.current_sortition_round()).into_iter().find(|block_hash| { + if let BlockStatus::InChain = self.client().block_status(&(*block_hash).into()) { + true + } else { + false + } + }) + } + _ => panic!(), + } + } + fn is_generation_time_relevant(&self, block_header: &Header) -> bool { let acceptable_past_gap = self.time_gap_params.allowed_past_gap; let acceptable_future_gap = self.time_gap_params.allowed_future_gap; @@ -988,7 +995,6 @@ impl Worker { let current_vote_step = VoteStep::new(self.height, self.view, self.step.to_step()); let proposal_is_for_current = self.votes.has_votes_for(¤t_vote_step, proposal.hash()); if proposal_is_for_current { - self.proposal = Proposal::new_imported(proposal.hash()); let current_step = self.step.clone(); match current_step { TendermintState::Propose => { @@ -1008,19 +1014,7 @@ impl Worker { TendermintSealView::new(proposal.seal()).parent_block_finalized_view().unwrap(); self.jump_to_height(height, finalized_view_of_previous_height); - - let proposal_is_for_view0 = self.votes.has_votes_for( - &VoteStep { - height, - view: 0, - step: Step::Propose, - }, - proposal.hash(), - ); - if proposal_is_for_view0 { - self.proposal = Proposal::new_imported(proposal.hash()) - } - self.move_to_step(TendermintState::Prevote, false); + self.move_to_step(TendermintState::new_propose_step(), false); } } @@ -1054,12 +1048,6 @@ impl Worker { self.finalized_view_of_previous_block = backup.finalized_view_of_previous_block; self.finalized_view_of_current_block = backup.finalized_view_of_current_block; - if let Some(proposal) = backup.proposal { - if client.block(&BlockId::Hash(proposal)).is_some() { - self.proposal = Proposal::ProposalImported(proposal); - } - } - for vote in backup.votes { let bytes = rlp::encode(&vote); if let Err(err) = self.handle_message(&bytes, true) { @@ -1086,7 +1074,6 @@ impl Worker { return Seal::None } - assert_eq!(Proposal::None, self.proposal); assert_eq!(height, self.height); let view = self.view; @@ -1274,7 +1261,7 @@ impl Worker { if let Some(votes_received) = self.votes_received.borrow_if_mutated() { self.broadcast_state( self.vote_step(), - self.proposal.block_hash(), + self.votes.get_highest_proposal_summary(self.current_sortition_round()), self.last_two_thirds_majority.view(), votes_received, ); @@ -1490,7 +1477,7 @@ impl Worker { fn repropose_block(&mut self, block: encoded::Block) { let header = block.decode_header(); self.vote_on_header_for_proposal(&header).expect("I am proposer"); - self.proposal = Proposal::new_imported(header.hash()); + debug_assert_eq!(self.client().block_status(&header.hash().into()), BlockStatus::InChain); self.broadcast_proposal_block(self.view, block); } @@ -1799,13 +1786,21 @@ impl Worker { proposed_view, author_view ); - self.proposal = Proposal::new_imported(header_view.hash()); - } else { - self.proposal = Proposal::new_received(header_view.hash(), bytes.clone(), signature); + } else if Some(priority_info.priority()) + >= self + .votes + .get_highest_priority_info(self.current_sortition_round()) + .map(|priority_info| priority_info.priority()) + { + cdebug!( + ENGINE, + "Received a proposal with the priority {}. Replace the highest proposal", + priority_info.priority() + ); } self.broadcast_state( VoteStep::new(self.height, self.view, self.step.to_step()), - self.proposal.block_hash(), + self.votes.get_highest_proposal_summary(self.current_sortition_round()), self.last_two_thirds_majority.view(), self.votes_received.borrow_anyway(), ); @@ -1862,9 +1857,12 @@ impl Worker { || self.view < peer_vote_step.view || self.height < peer_vote_step.height; - let need_proposal = self.need_proposal(); - if need_proposal && peer_has_proposal { - self.send_request_proposal(token, self.height, self.view, &result); + let is_not_commit_step = !self.step.is_commit(); + let peer_has_higher = + self.votes.get_highest_priority(peer_vote_step.into()) < peer_proposal.map(|summary| summary.priority()); + + if is_not_commit_step && peer_has_proposal && peer_has_higher { + self.send_request_proposal(token, self.current_sortition_round(), &result); } let current_step = current_vote_step.step; @@ -1945,20 +1943,15 @@ impl Worker { return } - if request_height == self.height && request_view > self.view { - return - } - - if let Some((signature, _signer_index, block)) = self.first_proposal_at(request_height, request_view) { - ctrace!(ENGINE, "Send proposal {}-{} to {:?}", request_height, request_view, token); - self.send_proposal_block(signature, request_view, block, result); - return - } - - if request_height == self.height && request_view == self.view { - if let Proposal::ProposalReceived(_hash, block, signature) = &self.proposal { - self.send_proposal_block(*signature, request_view, block.clone(), result); - } + if let Some((signature, highest_priority_info, block)) = self.highest_proposal_at(requested_round) { + ctrace!( + ENGINE, + "Send proposal of priority {:?} in a round {:?} to {:?}", + highest_priority_info.priority(), + requested_round, + token + ); + self.send_proposal_block(signature, highest_priority_info, requested_round.view, block, result); } } @@ -2155,8 +2148,6 @@ impl Worker { } } - // Since we don't have proposal vote, set proposal = None - self.proposal = Proposal::None; self.view = commit_view; self.votes_received = MutTrigger::new(vote_bitset); self.last_two_thirds_majority = TwoThirdsMajority::Empty; From 2beb9b341266e8184872aac0da93624b6bd38777 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Tue, 19 Nov 2019 17:47:33 +0900 Subject: [PATCH 05/13] Introduce randomized leader election to worker --- core/src/consensus/signer.rs | 5 - core/src/consensus/tendermint/message.rs | 57 +-- core/src/consensus/tendermint/mod.rs | 2 +- core/src/consensus/tendermint/network.rs | 69 ++-- core/src/consensus/tendermint/types.rs | 6 +- .../consensus/tendermint/vote_collector.rs | 12 +- core/src/consensus/tendermint/worker.rs | 333 +++++++++--------- .../validator_set/dynamic_validator.rs | 10 - 8 files changed, 246 insertions(+), 248 deletions(-) diff --git a/core/src/consensus/signer.rs b/core/src/consensus/signer.rs index 0bc0e7df95..8ccfe6836b 100644 --- a/core/src/consensus/signer.rs +++ b/core/src/consensus/signer.rs @@ -128,11 +128,6 @@ impl EngineSigner { self.signer.as_ref().map(|(address, _)| address) } - /// Check if the given address is the signing address. - pub fn is_address(&self, a: &Address) -> bool { - self.signer.map_or(false, |(address, _public)| *a == address) - } - /// Check if the signing address was set. pub fn is_some(&self) -> bool { self.signer.is_some() diff --git a/core/src/consensus/tendermint/message.rs b/core/src/consensus/tendermint/message.rs index 3de0721c62..7eb616c462 100644 --- a/core/src/consensus/tendermint/message.rs +++ b/core/src/consensus/tendermint/message.rs @@ -25,6 +25,7 @@ use snap; use super::super::BitSet; use super::{Height, Step, View}; +use crate::consensus::Priority; /// Step for the sortition round. /// FIXME: It has a large overlap with the previous VoteStep. @@ -119,17 +120,25 @@ const MESSAGE_ID_REQUEST_PROPOSAL: u8 = 0x05; const MESSAGE_ID_REQUEST_COMMIT: u8 = 0x06; const MESSAGE_ID_COMMIT: u8 = 0x07; +#[derive(Clone, Debug, PartialEq, RlpEncodable, RlpDecodable)] +#[cfg_attr(test, derive(Default))] +pub struct ProposalSummary { + pub priority: Priority, + pub block_hash: BlockHash, +} + #[derive(Debug, PartialEq)] pub enum TendermintMessage { ConsensusMessage(Vec), ProposalBlock { signature: SchnorrSignature, + priority_info: Box, view: View, message: Bytes, }, StepState { vote_step: VoteStep, - proposal: Option, + proposal: Box>, lock_view: Option, known_votes: BitSet, }, @@ -138,8 +147,7 @@ pub enum TendermintMessage { requested_votes: BitSet, }, RequestProposal { - height: Height, - view: View, + round: SortitionRound, }, RequestCommit { height: Height, @@ -160,12 +168,14 @@ impl Encodable for TendermintMessage { } TendermintMessage::ProposalBlock { signature, + priority_info, view, message, } => { - s.begin_list(4); + s.begin_list(5); s.append(&MESSAGE_ID_PROPOSAL_BLOCK); s.append(signature); + s.append(&**priority_info); s.append(view); let compressed = { @@ -184,7 +194,7 @@ impl Encodable for TendermintMessage { s.begin_list(5); s.append(&MESSAGE_ID_STEP_STATE); s.append(vote_step); - s.append(proposal); + s.append(&**proposal); s.append(lock_view); s.append(known_votes); } @@ -198,13 +208,11 @@ impl Encodable for TendermintMessage { s.append(requested_votes); } TendermintMessage::RequestProposal { - height, - view, + round, } => { - s.begin_list(3); + s.begin_list(2); s.append(&MESSAGE_ID_REQUEST_PROPOSAL); - s.append(height); - s.append(view); + s.append(round); } TendermintMessage::RequestCommit { height, @@ -242,15 +250,16 @@ impl Decodable for TendermintMessage { } MESSAGE_ID_PROPOSAL_BLOCK => { let item_count = rlp.item_count()?; - if item_count != 4 { + if item_count != 5 { return Err(DecoderError::RlpIncorrectListLen { got: item_count, - expected: 4, + expected: 5, }) } let signature = rlp.at(1)?; - let view = rlp.at(2)?; - let compressed_message: Vec = rlp.val_at(3)?; + let priority_info = rlp.at(2)?; + let view = rlp.at(3)?; + let compressed_message: Vec = rlp.val_at(4)?; let uncompressed_message = { // TODO: Cache the Decoder object let mut snappy_decoder = snap::Decoder::new(); @@ -262,6 +271,7 @@ impl Decodable for TendermintMessage { TendermintMessage::ProposalBlock { signature: signature.as_val()?, + priority_info: Box::new(priority_info.as_val()?), view: view.as_val()?, message: uncompressed_message, } @@ -275,7 +285,7 @@ impl Decodable for TendermintMessage { }) } let vote_step = rlp.at(1)?.as_val()?; - let proposal = rlp.at(2)?.as_val()?; + let proposal = Box::new(rlp.at(2)?.as_val()?); let lock_view = rlp.at(3)?.as_val()?; let known_votes = rlp.at(4)?.as_val()?; TendermintMessage::StepState { @@ -302,17 +312,15 @@ impl Decodable for TendermintMessage { } MESSAGE_ID_REQUEST_PROPOSAL => { let item_count = rlp.item_count()?; - if item_count != 3 { + if item_count != 2 { return Err(DecoderError::RlpIncorrectListLen { got: item_count, - expected: 3, + expected: 2, }) } - let height = rlp.at(1)?.as_val()?; - let view = rlp.at(2)?.as_val()?; + let round = rlp.at(1)?.as_val()?; TendermintMessage::RequestProposal { - height, - view, + round, } } MESSAGE_ID_REQUEST_COMMIT => { @@ -422,6 +430,7 @@ mod tests { fn encode_and_decode_tendermint_message_2() { rlp_encode_and_decode_test!(TendermintMessage::ProposalBlock { signature: SchnorrSignature::random(), + priority_info: Box::new(PriorityInfo::new(1, 0xffu64.into(), 0, 1, vec![])), view: 1, message: vec![1u8, 2u8] }); @@ -452,8 +461,10 @@ mod tests { #[test] fn encode_and_decode_tendermint_message_5() { rlp_encode_and_decode_test!(TendermintMessage::RequestProposal { - height: 10, - view: 123, + round: SortitionRound { + height: 10, + view: 123, + } }); } diff --git a/core/src/consensus/tendermint/mod.rs b/core/src/consensus/tendermint/mod.rs index 38ba99492b..16c5304ad6 100644 --- a/core/src/consensus/tendermint/mod.rs +++ b/core/src/consensus/tendermint/mod.rs @@ -35,7 +35,7 @@ use ctimer::TimerToken; use parking_lot::RwLock; use self::chain_notify::TendermintChainNotify; -pub use self::message::{ConsensusMessage, SortitionRound, VoteOn, VoteStep}; +pub use self::message::{ConsensusMessage, ProposalSummary, SortitionRound, VoteOn, VoteStep}; pub use self::params::{TendermintParams, TimeGapParams, TimeoutParams}; pub use self::types::{Height, Step, View}; use super::{stake, ValidatorSet}; diff --git a/core/src/consensus/tendermint/network.rs b/core/src/consensus/tendermint/network.rs index 5b83806b81..3ec981b026 100644 --- a/core/src/consensus/tendermint/network.rs +++ b/core/src/consensus/tendermint/network.rs @@ -24,7 +24,6 @@ use ckey::SchnorrSignature; use cnetwork::{Api, NetworkExtension, NodeId}; use crossbeam_channel as crossbeam; use ctimer::TimerToken; -use ctypes::BlockHash; use primitives::Bytes; use rand::prelude::SliceRandom; use rand::thread_rng; @@ -33,9 +32,9 @@ use rlp::{Encodable, Rlp}; use super::super::BitSet; use super::message::*; use super::params::TimeoutParams; -use super::types::{Height, PeerState, Step, View}; +use super::types::{PeerState, Step, View}; use super::worker; -use crate::consensus::EngineError; +use crate::consensus::{EngineError, PriorityInfo}; use super::{ ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_BROADCAT_STEP_STATE_INTERVAL, ENGINE_TIMEOUT_TOKEN_NONCE_BASE, @@ -73,7 +72,7 @@ impl TendermintExtension { &mut self, token: &NodeId, vote_step: VoteStep, - proposal: Option, + proposal: Option, messages: BitSet, ) { let peer_state = match self.peers.get_mut(token) { @@ -113,7 +112,7 @@ impl TendermintExtension { fn broadcast_state( &self, vote_step: VoteStep, - proposal: Option, + proposal: Option, lock_view: Option, votes: BitSet, ) { @@ -122,7 +121,7 @@ impl TendermintExtension { let message = Arc::new( TendermintMessage::StepState { vote_step, - proposal, + proposal: Box::new(proposal), lock_view, known_votes: votes, } @@ -134,10 +133,17 @@ impl TendermintExtension { } } - fn broadcast_proposal_block(&self, signature: SchnorrSignature, view: View, message: Bytes) { + fn broadcast_proposal_block( + &self, + signature: SchnorrSignature, + priority_info: Box, + view: View, + message: Bytes, + ) { let message = Arc::new( TendermintMessage::ProposalBlock { signature, + priority_info, message, view, } @@ -148,33 +154,28 @@ impl TendermintExtension { } } - fn request_proposal_to_any(&self, height: Height, view: View) { + fn request_proposal_to_any(&self, round: SortitionRound) { for (token, peer) in &self.peers { - let is_future_height_and_view = { - let higher_height = peer.vote_step.height > height; - let same_height_and_higher_view = peer.vote_step.height == height && peer.vote_step.view > view; - higher_height || same_height_and_higher_view - }; + let is_future_height_and_view = round < peer.vote_step.into(); if is_future_height_and_view { - self.request_proposal(token, height, view); + self.request_proposal(token, round); continue } - let is_same_height_and_view = peer.vote_step.height == height && peer.vote_step.view == view; + let is_same_height_and_view = round == peer.vote_step.into(); if is_same_height_and_view && peer.proposal.is_some() { - self.request_proposal(token, height, view); + self.request_proposal(token, round); } } } - fn request_proposal(&self, token: &NodeId, height: Height, view: View) { - ctrace!(ENGINE, "Request proposal {} {} to {:?}", height, view, token); + fn request_proposal(&self, token: &NodeId, round: SortitionRound) { + ctrace!(ENGINE, "Request proposal {:?} to {:?}", round, token); let message = Arc::new( TendermintMessage::RequestProposal { - height, - view, + round, } .rlp_bytes(), ); @@ -266,6 +267,7 @@ impl NetworkExtension for TendermintExtension { } Ok(TendermintMessage::ProposalBlock { signature, + priority_info, view, message, }) => { @@ -273,6 +275,7 @@ impl NetworkExtension for TendermintExtension { self.inner .send(worker::Event::ProposalBlock { signature, + priority_info, view, message: message.clone(), result, @@ -298,13 +301,13 @@ impl NetworkExtension for TendermintExtension { lock_view, known_votes, ); - self.update_peer_state(token, vote_step, proposal, known_votes); + self.update_peer_state(token, vote_step, (*proposal).clone(), known_votes); let (result, receiver) = crossbeam::unbounded(); self.inner .send(worker::Event::StepState { token: *token, vote_step, - proposal, + proposal: *proposal, lock_view, known_votes: Box::from(known_votes), result, @@ -316,15 +319,13 @@ impl NetworkExtension for TendermintExtension { } } Ok(TendermintMessage::RequestProposal { - height, - view, + round, }) => { let (result, receiver) = crossbeam::bounded(1); self.inner .send(worker::Event::RequestProposal { token: *token, - height, - view, + round, result, }) .unwrap(); @@ -411,7 +412,7 @@ impl NetworkExtension for TendermintExtension { lock_view, votes, } => { - self.broadcast_state(vote_step, proposal, lock_view, votes); + self.broadcast_state(vote_step, *proposal, lock_view, votes); } Event::RequestMessagesToAll { vote_step, @@ -420,10 +421,9 @@ impl NetworkExtension for TendermintExtension { self.request_messages_to_all(vote_step, requested_votes); } Event::RequestProposalToAny { - height, - view, + round, } => { - self.request_proposal_to_any(height, view); + self.request_proposal_to_any(round); } Event::SetTimerStep { step, @@ -432,10 +432,11 @@ impl NetworkExtension for TendermintExtension { } => self.set_timer_step(step, view, expired_token_nonce), Event::BroadcastProposalBlock { signature, + priority_info, view, message, } => { - self.broadcast_proposal_block(signature, view, message); + self.broadcast_proposal_block(signature, priority_info, view, message); } } } @@ -447,7 +448,7 @@ pub enum Event { }, BroadcastState { vote_step: VoteStep, - proposal: Option, + proposal: Box>, lock_view: Option, votes: BitSet, }, @@ -456,8 +457,7 @@ pub enum Event { requested_votes: BitSet, }, RequestProposalToAny { - height: Height, - view: View, + round: SortitionRound, }, SetTimerStep { step: Step, @@ -466,6 +466,7 @@ pub enum Event { }, BroadcastProposalBlock { signature: SchnorrSignature, + priority_info: Box, view: View, message: Bytes, }, diff --git a/core/src/consensus/tendermint/types.rs b/core/src/consensus/tendermint/types.rs index 5dd701b916..b50b323e1c 100644 --- a/core/src/consensus/tendermint/types.rs +++ b/core/src/consensus/tendermint/types.rs @@ -22,9 +22,9 @@ use primitives::Bytes; use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream}; use super::super::BitSet; -use super::message::VoteStep; +use super::message::{ProposalSummary, VoteStep}; use crate::block::{IsBlock, SealedBlock}; -use crate::consensus::sortition::SeedInfo; +use crate::consensus::{sortition::seed::SeedInfo, Priority}; pub type Height = u64; pub type View = u64; @@ -179,7 +179,7 @@ impl Encodable for Step { pub struct PeerState { pub vote_step: VoteStep, - pub proposal: Option, + pub proposal: Option, pub messages: BitSet, } diff --git a/core/src/consensus/tendermint/vote_collector.rs b/core/src/consensus/tendermint/vote_collector.rs index 519639d825..2412f9fc8e 100644 --- a/core/src/consensus/tendermint/vote_collector.rs +++ b/core/src/consensus/tendermint/vote_collector.rs @@ -188,6 +188,11 @@ impl MessageCollector { } result } + + /// get a ConsensusMessage corresponding to a certain index. + fn fetch_by_idx(&self, idx: usize) -> Option { + self.voted.get(&idx).cloned() + } } impl Default for VoteCollector { @@ -290,13 +295,6 @@ impl VoteCollector { } } - pub fn get_block_hashes(&self, round: &VoteStep) -> Vec { - self.votes - .get(round) - .map(|c| c.block_votes.keys().cloned().filter_map(|x| x).collect()) - .unwrap_or_else(Vec::new) - } - pub fn has_votes_for(&self, round: &VoteStep, block_hash: BlockHash) -> bool { let votes = self .votes diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index 2bb7f9aa99..a928807082 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -29,6 +29,7 @@ use ctypes::util::unexpected::Mismatch; use ctypes::{BlockHash, BlockNumber, Header}; use primitives::{u256_from_u128, Bytes, U256}; use rlp::{Encodable, Rlp}; +use vrf::openssl::{CipherSuite, ECVRF}; use super::super::BitSet; use super::backup::{backup, restore, BackupView}; @@ -47,7 +48,7 @@ use crate::consensus::signer::EngineSigner; use crate::consensus::validator_set::{DynamicValidator, ValidatorSet}; use crate::consensus::{ sortition::seed::{SeedInfo, VRFSeed}, - EngineError, Seal, VRFSortition, + EngineError, PriorityInfo, Seal, VRFSortition, }; use crate::encoded; use crate::error::{BlockError, Error}; @@ -141,6 +142,7 @@ pub enum Event { Restore(crossbeam::Sender<()>), ProposalBlock { signature: SchnorrSignature, + priority_info: Box, view: View, message: Bytes, result: crossbeam::Sender>>, @@ -148,15 +150,14 @@ pub enum Event { StepState { token: NodeId, vote_step: VoteStep, - proposal: Option, + proposal: Option, lock_view: Option, known_votes: Box, result: crossbeam::Sender, }, RequestProposal { token: NodeId, - height: Height, - view: View, + round: SortitionRound, result: crossbeam::Sender, }, GetAllVotesAndAuthors { @@ -195,6 +196,11 @@ impl Worker { finalized_view_of_current_block: None, validators, extension, + sortition_scheme: VRFSortition { + total_power: 1000, + expectation: 7.0, + vrf_inst: ECVRF::from_suite(CipherSuite::SECP256K1_SHA256_SVDW).unwrap(), + }, votes_received: MutTrigger::new(BitSet::new()), time_gap_params, timeout_token_nonce: ENGINE_TIMEOUT_TOKEN_NONCE_BASE, @@ -315,11 +321,12 @@ impl Worker { } Ok(Event::ProposalBlock { signature, + priority_info, view, message, result, }) => { - let client = inner.on_proposal_message(signature, view, message); + let client = inner.on_proposal_message(signature, *priority_info, view, message); result.send(client).unwrap(); } Ok(Event::StepState { @@ -329,11 +336,10 @@ impl Worker { } Ok(Event::RequestProposal { token, - height, - view, + round, result, }) => { - inner.on_request_proposal_message(&token, height, view, result); + inner.on_request_proposal_message(&token, round, result); } Ok(Event::GetAllVotesAndAuthors { vote_step, @@ -401,22 +407,6 @@ impl Worker { .and_then(|parent_header| self.fetch_vrf_seed_info(parent_header.hash()).map(|seed_info| *seed_info.seed())) } - /// Get the index of the proposer of a block to check the new proposer is valid. - fn block_proposer_idx(&self, block_hash: BlockHash) -> Option { - self.client().block_header(&BlockId::Hash(block_hash)).map(|header| { - let proposer = header.author(); - let parent = if header.number() == 0 { - // Genesis block's parent is not exist - // FIXME: The DynamicValidator should handle the Genesis block correctly. - block_hash - } else { - header.parent_hash() - }; - - self.validators.get_index_by_address(&parent, &proposer).expect("The proposer must be in the validator set") - }) - } - /// Get previous block header of given height fn prev_block_header_of_height(&self, height: Height) -> Option { let prev_height = (height - 1) as u64; @@ -455,24 +445,15 @@ impl Worker { false } - /// Find the designated for the given view. - fn view_proposer(&self, prev_block_hash: &BlockHash, view: View) -> Option
{ - self.validators.next_block_proposer(prev_block_hash, view) - } - - fn first_proposal_at(&self, height: Height, view: View) -> Option<(SchnorrSignature, usize, Bytes)> { - let vote_step = VoteStep { - height, - view, - step: Step::Propose, - }; + fn highest_proposal_at(&self, sortition_round: SortitionRound) -> Option<(SchnorrSignature, PriorityInfo, Bytes)> { + let vote_step = sortition_round.into(); - let all_votes = self.votes.get_all_votes_in_round(&vote_step); - let proposal = all_votes.first()?; + let highest_priority_info = self.votes.get_highest_priority_info(sortition_round)?; + let proposal = self.votes.fetch_by_idx(&vote_step, highest_priority_info.signer_idx())?; let block_hash = proposal.on.block_hash.expect("Proposal message always include block hash"); let bytes = self.client().block(&BlockId::Hash(block_hash))?.into_inner(); - Some((proposal.signature, proposal.signer_index, bytes)) + Some((proposal.signature, highest_priority_info, bytes)) } fn is_proposal_received(&self, height: Height, view: View, block_hash: BlockHash) -> bool { @@ -495,10 +476,6 @@ impl Worker { } } - fn need_proposal(&self) -> bool { - self.proposal.is_none() && !self.step.is_commit() - } - fn get_all_votes_and_authors( &self, vote_step: &VoteStep, @@ -515,30 +492,35 @@ impl Worker { } } - /// Check if address is a proposer for given view. - fn check_view_proposer( - &self, - parent: &BlockHash, - height: Height, - view: View, - address: &Address, - ) -> Result<(), EngineError> { - let proposer = self.view_proposer(parent, view).ok_or_else(|| EngineError::PrevBlockNotExist { - height: height as u64, - })?; - if proposer == *address { - Ok(()) - } else { - Err(EngineError::NotProposer(Mismatch { - expected: proposer, - found: *address, - })) - } + fn calculate_current_seed(&mut self) -> SeedInfo { + let parent_seed = self.prev_vrf_seed_of_height(self.height).expect("Parent seed must exist"); + let new_proof = self + .signer + .vrf_prove(&parent_seed.generate_next_msg(self.height, self.view), &mut self.sortition_scheme.vrf_inst) + .expect("Signer key was verified"); + let new_seed = self.sortition_scheme.vrf_inst.proof_to_hash(&new_proof).expect("Correctly generated proof"); + SeedInfo::new(self.signer_index().expect("The signer is a validator"), new_seed, new_proof) } - /// Check if current signer is the current proposer. - fn is_signer_proposer(&self, bh: &BlockHash) -> bool { - self.view_proposer(bh, self.view).map_or(false, |proposer| self.signer.is_address(&proposer)) + /// Check if current signer is eligible to be a proposer + fn is_signer_highest(&mut self, parent_hash: &BlockHash) -> bool { + self.votes + .get_highest_priority_info(self.current_sortition_round()) + .map(|priority_info| { + self.validators.get(parent_hash, priority_info.signer_idx()) + == *self.signer.public().expect("Engine signer must be set") + }) + .unwrap_or(true) + } + + fn signer_priority_info(&mut self, parent_block_hash: BlockHash) -> Option { + let parent_seed = + self.prev_vrf_seed_of_height(self.height).expect("Next height propose step has previous height seed"); + let signer_idx = self.signer_index()?; + let voting_power = self.get_voting_power(self.height - 1, &parent_block_hash, signer_idx); + self.sortition_scheme + .create_highest_priority_info(&parent_seed.round_msg(self.view), &self.signer, signer_idx, voting_power) + .ok()? } fn is_step(&self, message: &ConsensusMessage) -> bool { @@ -587,14 +569,14 @@ impl Worker { fn broadcast_state( &self, vote_step: VoteStep, - proposal: Option, + proposal: Option, lock_view: Option, votes: &BitSet, ) { self.extension .send(network::Event::BroadcastState { vote_step, - proposal, + proposal: Box::new(proposal), lock_view, votes: *votes, }) @@ -610,11 +592,10 @@ impl Worker { .unwrap(); } - fn request_proposal_to_any(&self, height: Height, view: View) { + fn request_proposal_to_any(&self, round: SortitionRound) { self.extension .send(network::Event::RequestProposalToAny { - height, - view, + round, }) .unwrap(); } @@ -710,32 +691,23 @@ impl Worker { match state.to_step() { Step::Propose => { cinfo!(ENGINE, "move_to_step: Propose."); - // If there are multiple proposals, use the first proposal. - if let Some(hash) = self.votes.get_block_hashes(&vote_step).first() { - if self.client().block(&BlockId::Hash(*hash)).is_none() { - cwarn!(ENGINE, "Proposal is received but not imported"); - // Proposal is received but is not verified yet. - // Wait for verification. - return - } - } let parent_block_hash = self.prev_block_hash(); - if !self.is_signer_proposer(&parent_block_hash) { - self.request_proposal_to_any(vote_step.height, vote_step.view); - return - } - if let TwoThirdsMajority::Lock(lock_view, locked_block_hash) = self.last_two_thirds_majority { - cinfo!(ENGINE, "I am a proposer, I'll re-propose a locked block"); - match self.locked_proposal_block(lock_view, locked_block_hash) { - Ok(block) => self.repropose_block(block), - Err(error_msg) => cwarn!(ENGINE, "{}", error_msg), + if let Some(priority_info) = self.signer_priority_info(parent_block_hash) { + if let TwoThirdsMajority::Lock(lock_view, locked_block_hash) = self.last_two_thirds_majority { + cinfo!(ENGINE, "I am eligible to be a proposer, I'll re-propose a locked block"); + match self.locked_proposal_block(lock_view, locked_block_hash) { + Ok(block) => self.repropose_block(priority_info, block), + Err(error_msg) => cwarn!(ENGINE, "{}", error_msg), + } + } else { + cinfo!(ENGINE, "I am eligible to be a proposer, I'll create a block"); + self.update_sealing(parent_block_hash); + self.step = TendermintState::ProposeWaitBlockGeneration { + parent_hash: parent_block_hash, + }; } } else { - cinfo!(ENGINE, "I am a proposer, I'll create a block"); - self.update_sealing(parent_block_hash); - self.step = TendermintState::ProposeWaitBlockGeneration { - parent_hash: parent_block_hash, - }; + self.request_proposal_to_any(vote_step.into()); } } Step::Prevote => { @@ -857,7 +829,7 @@ impl Worker { let received_locked_block = self.votes.has_votes_for(&vote_step, locked_proposal_hash); if !received_locked_block { - self.request_proposal_to_any(self.height, locked_view); + self.request_proposal_to_any(vote_step.into()); return Err(format!("Have a lock on {}-{}, but do not received a locked proposal", self.height, locked_view)) } @@ -997,15 +969,15 @@ impl Worker { if proposal_is_for_current { let current_step = self.step.clone(); match current_step { - TendermintState::Propose => { - self.move_to_step(TendermintState::Prevote, false); - } TendermintState::ProposeWaitImported { block, } => { cinfo!(ENGINE, "Submitting proposal block {}", block.header().hash()); - self.move_to_step(TendermintState::Prevote, false); - self.broadcast_proposal_block(self.view, encoded::Block::new(block.rlp_bytes())); + self.broadcast_proposal_block( + self.view, + self.votes.get_highest_priority_info(self.current_sortition_round()).unwrap(), + encoded::Block::new(block.rlp_bytes()), + ); } _ => {} }; @@ -1061,38 +1033,37 @@ impl Worker { SEAL_FIELDS } - fn generate_seal(&self, height: Height, parent_hash: BlockHash) -> Seal { + fn generate_seal(&mut self, height: Height, parent_hash: BlockHash) -> Seal { // Block is received from other nodes while creating a block if height < self.height { return Seal::None } - // We don't know at which view the node starts generating a block. - // If this node's signer is not proposer at the current view, return none. - if !self.is_signer_proposer(&parent_hash) { - cwarn!(ENGINE, "Seal request for an old view"); - return Seal::None - } + if self.is_signer_highest(&parent_hash) { + assert_eq!(height, self.height); - assert_eq!(height, self.height); + let view = self.view; + let current_seed = self.calculate_current_seed(); - let view = self.view; + let last_block_view = &self.finalized_view_of_previous_block; + assert_eq!(self.prev_block_hash(), parent_hash); - let last_block_view = &self.finalized_view_of_previous_block; - assert_eq!(self.prev_block_hash(), parent_hash); - - let (precommits, precommit_indices) = self - .votes - .round_signatures_and_indices(&VoteStep::new(height - 1, *last_block_view, Step::Precommit), &parent_hash); - ctrace!(ENGINE, "Collected seal: {:?}({:?})", precommits, precommit_indices); - let precommit_bitset = BitSet::new_with_indices(&precommit_indices); - Seal::Tendermint { - prev_view: *last_block_view, - cur_view: view, - precommits, - precommit_bitset, - vrf_seed: self.prev_vrf_seed(), - vrf_seed_proof: vec![], + let (precommits, precommit_indices) = self.votes.round_signatures_and_indices( + &VoteStep::new(height - 1, *last_block_view, Step::Precommit), + &parent_hash, + ); + ctrace!(ENGINE, "Collected seal: {:?}({:?})", precommits, precommit_indices); + let precommit_bitset = BitSet::new_with_indices(&precommit_indices); + Seal::Tendermint { + prev_view: *last_block_view, + cur_view: view, + precommits, + precommit_bitset, + vrf_seed_info: Box::new(current_seed), + } + } else { + cdebug!(ENGINE, "Seal generation halted because a higher priority is accepted"); + Seal::None } } @@ -1203,7 +1174,6 @@ impl Worker { if !self.is_authority(header.parent_hash(), proposer) { return Err(EngineError::BlockNotAuthorized(*proposer).into()) } - self.check_view_proposer(header.parent_hash(), header.number(), author_view, &proposer)?; let seal_view = TendermintSealView::new(header.seal()); let bitset_count = seal_view.bitset()?.count(); let precommits_count = seal_view.precommits().item_count()?; @@ -1474,26 +1444,24 @@ impl Worker { !self.has_enough_precommit_votes(block_hash) } - fn repropose_block(&mut self, block: encoded::Block) { + fn repropose_block(&mut self, priority_info: PriorityInfo, block: encoded::Block) { let header = block.decode_header(); - self.vote_on_header_for_proposal(&header).expect("I am proposer"); + self.vote_on_header_for_proposal(&header).expect("I am eligible to be a proposer"); debug_assert_eq!(self.client().block_status(&header.hash().into()), BlockStatus::InChain); - self.broadcast_proposal_block(self.view, block); + self.broadcast_proposal_block(self.view, priority_info, block); } - fn broadcast_proposal_block(&self, view: View, block: encoded::Block) { + fn broadcast_proposal_block(&self, view: View, priority_info: PriorityInfo, block: encoded::Block) { let header = block.decode_header(); let hash = header.hash(); - let parent_hash = header.parent_hash(); let vote_step = VoteStep::new(header.number() as Height, view, Step::Propose); cdebug!(ENGINE, "Send proposal {:?}", vote_step); - assert!(self.is_signer_proposer(&parent_hash)); - let signature = self.votes.round_signature(&vote_step, &hash).expect("Proposal vote is generated before"); self.extension .send(network::Event::BroadcastProposalBlock { signature, + priority_info: Box::new(priority_info), view, message: block.into_inner(), }) @@ -1534,10 +1502,7 @@ impl Worker { fn vote_on_header_for_proposal(&mut self, header: &Header) -> Result { assert!(header.number() == self.height); - - let parent_hash = header.parent_hash(); - let prev_proposer_idx = self.block_proposer_idx(*parent_hash).expect("Prev block must exists"); - let signer_index = self.validators.proposer_index(*parent_hash, prev_proposer_idx, self.view as usize); + let signer_index = self.signer_index().expect("I am a validator"); let on = VoteOn { step: VoteStep::new(self.height, self.view, Step::Propose), @@ -1562,12 +1527,9 @@ impl Worker { &self, header: &Header, proposed_view: View, + signer_index: usize, signature: SchnorrSignature, ) -> Option { - let prev_proposer_idx = self.block_proposer_idx(*header.parent_hash())?; - let signer_index = - self.validators.proposer_index(*header.parent_hash(), prev_proposer_idx, proposed_view as usize); - let on = VoteOn { step: VoteStep::new(header.number(), proposed_view, Step::Propose), block_hash: Some(header.hash()), @@ -1649,12 +1611,14 @@ impl Worker { fn send_proposal_block( &self, signature: SchnorrSignature, + priority_info: PriorityInfo, view: View, message: Bytes, result: crossbeam::Sender, ) { let message = TendermintMessage::ProposalBlock { signature, + priority_info: Box::new(priority_info), message, view, } @@ -1678,11 +1642,10 @@ impl Worker { result.send(message).unwrap(); } - fn send_request_proposal(&self, token: &NodeId, height: Height, view: View, result: &crossbeam::Sender) { - ctrace!(ENGINE, "Request proposal {} {} to {:?}", height, view, token); + fn send_request_proposal(&self, token: &NodeId, round: SortitionRound, result: &crossbeam::Sender) { + ctrace!(ENGINE, "Request proposal {:?} to {:?}", round, token); let message = TendermintMessage::RequestProposal { - height, - view, + round, } .rlp_bytes(); result.send(message).unwrap(); @@ -1706,9 +1669,24 @@ impl Worker { result.send(message.rlp_bytes()).unwrap(); } + fn get_voting_power(&self, height: u64, block_hash: &BlockHash, signer_idx: usize) -> u64 { + self.validators + .normalized_voting_power(height, block_hash, signer_idx, self.sortition_scheme.total_power) + .unwrap() + } + + #[inline] + fn current_sortition_round(&self) -> SortitionRound { + SortitionRound { + height: self.height, + view: self.view, + } + } + fn on_proposal_message( &mut self, signature: SchnorrSignature, + priority_info: PriorityInfo, proposed_view: View, bytes: Bytes, ) -> Option> { @@ -1735,13 +1713,14 @@ impl Worker { return None } } - let message = match self.recover_proposal_vote(&header_view, proposed_view, signature) { - Some(vote) => vote, - None => { - cwarn!(ENGINE, "Prev block proposer does not exist for height {}", number); - return None - } - }; + let message = + match self.recover_proposal_vote(&header_view, proposed_view, priority_info.signer_idx(), signature) { + Some(vote) => vote, + None => { + cwarn!(ENGINE, "Prev block proposer does not exist for height {}", number); + return None + } + }; // If the proposal's height is current height + 1 and the proposal has valid precommits, // we should import it and increase height @@ -1755,7 +1734,7 @@ impl Worker { return None } - let signer_public = self.validators.get(&parent_hash, message.signer_index); + let signer_public = self.validators.get(&parent_hash, priority_info.signer_idx()); match message.verify(&signer_public) { Ok(false) => { cwarn!(ENGINE, "Proposal verification failed: signer is different"); @@ -1768,11 +1747,45 @@ impl Worker { _ => {} } + // priority verification block + { + let parent_seed = self.prev_vrf_seed_of_height(number)?; + let signer_idx = priority_info.signer_idx(); + + let voting_power = self.get_voting_power(number - 1, &parent_hash, signer_idx); + match priority_info.verify( + &parent_seed.round_msg(proposed_view), + &signer_public, + voting_power, + &mut self.sortition_scheme, + ) { + Ok(true) => {} + _ => { + cwarn!(ENGINE, "Priority message verification failed"); + return None + } + } + } + if self.votes.is_old_or_known(&message) { cdebug!(ENGINE, "Proposal is already known"); return None } + self.votes.collect_priority( + SortitionRound { + height: number, + view: proposed_view, + }, + priority_info.clone(), + ); + + if let Err(double) = self.votes.collect(message) { + cerror!(ENGINE, "Double Vote found {:?}", double); + self.report_double_vote(&double); + return None + } + if number == self.height as u64 && proposed_view == self.view { // The proposer re-proposed its locked proposal. // If we already imported the proposal, we should set `proposal` here. @@ -1786,11 +1799,8 @@ impl Worker { proposed_view, author_view ); - } else if Some(priority_info.priority()) - >= self - .votes - .get_highest_priority_info(self.current_sortition_round()) - .map(|priority_info| priority_info.priority()) + } else if Some(priority_info.clone()) + >= self.votes.get_highest_priority_info(self.current_sortition_round()) { cdebug!( ENGINE, @@ -1805,12 +1815,6 @@ impl Worker { self.votes_received.borrow_anyway(), ); } - - if let Err(double) = self.votes.collect(message) { - cerror!(ENGINE, "Double Vote found {:?}", double); - self.report_double_vote(&double); - return None - } } Some(c) @@ -1820,7 +1824,7 @@ impl Worker { &self, token: &NodeId, peer_vote_step: VoteStep, - peer_proposal: Option, + peer_proposal: Option, peer_lock_view: Option, peer_known_votes: BitSet, result: crossbeam::Sender, @@ -1935,11 +1939,10 @@ impl Worker { fn on_request_proposal_message( &self, token: &NodeId, - request_height: Height, - request_view: View, + requested_round: SortitionRound, result: crossbeam::Sender, ) { - if request_height > self.height { + if requested_round > self.current_sortition_round() { return } diff --git a/core/src/consensus/validator_set/dynamic_validator.rs b/core/src/consensus/validator_set/dynamic_validator.rs index 5b8efcb0ea..857247e314 100644 --- a/core/src/consensus/validator_set/dynamic_validator.rs +++ b/core/src/consensus/validator_set/dynamic_validator.rs @@ -67,16 +67,6 @@ impl DynamicValidator { fn validators_pubkey(&self, parent: BlockHash) -> Option> { self.validators(parent).map(|validators| validators.into_iter().map(|val| *val.pubkey()).collect()) } - - pub fn proposer_index(&self, parent: BlockHash, prev_proposer_index: usize, proposed_view: usize) -> usize { - if let Some(validators) = self.validators(parent) { - let num_validators = validators.len(); - proposed_view % num_validators - } else { - let num_validators = self.initial_list.count(&parent); - (prev_proposer_index + proposed_view + 1) % num_validators - } - } } impl ValidatorSet for DynamicValidator { From 128c952773c6faa2d8ca7f61fda8aa80ee878fd3 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Mon, 2 Dec 2019 16:29:06 +0900 Subject: [PATCH 06/13] Replace request_proposal_to_any to request_proposal_to_superiors --- core/src/consensus/sortition/vrf_sortition.rs | 2 +- core/src/consensus/tendermint/message.rs | 12 ++- core/src/consensus/tendermint/network.rs | 97 ++++++++++++------- core/src/consensus/tendermint/types.rs | 4 + .../consensus/tendermint/vote_collector.rs | 6 +- core/src/consensus/tendermint/worker.rs | 42 +++++++- 6 files changed, 119 insertions(+), 44 deletions(-) diff --git a/core/src/consensus/sortition/vrf_sortition.rs b/core/src/consensus/sortition/vrf_sortition.rs index 4125ebfd0a..64caadc86e 100644 --- a/core/src/consensus/sortition/vrf_sortition.rs +++ b/core/src/consensus/sortition/vrf_sortition.rs @@ -35,7 +35,7 @@ pub struct VRFSortition { pub vrf_inst: ECVRF, } -#[derive(Eq, PartialEq, Clone, Debug, RlpEncodable, RlpDecodable)] +#[derive(Eq, PartialEq, Clone, Default, Debug, RlpEncodable, RlpDecodable)] pub struct PriorityInfo { signer_idx: usize, priority: Priority, diff --git a/core/src/consensus/tendermint/message.rs b/core/src/consensus/tendermint/message.rs index 7eb616c462..60d90c5141 100644 --- a/core/src/consensus/tendermint/message.rs +++ b/core/src/consensus/tendermint/message.rs @@ -25,7 +25,7 @@ use snap; use super::super::BitSet; use super::{Height, Step, View}; -use crate::consensus::Priority; +use crate::consensus::{Priority, PriorityInfo}; /// Step for the sortition round. /// FIXME: It has a large overlap with the previous VoteStep. @@ -123,10 +123,16 @@ const MESSAGE_ID_COMMIT: u8 = 0x07; #[derive(Clone, Debug, PartialEq, RlpEncodable, RlpDecodable)] #[cfg_attr(test, derive(Default))] pub struct ProposalSummary { - pub priority: Priority, + pub priority_info: PriorityInfo, pub block_hash: BlockHash, } +impl ProposalSummary { + pub fn priority(&self) -> Priority { + self.priority_info.priority() + } +} + #[derive(Debug, PartialEq)] pub enum TendermintMessage { ConsensusMessage(Vec), @@ -442,7 +448,7 @@ mod tests { bit_set.set(2); rlp_encode_and_decode_test!(TendermintMessage::StepState { vote_step: VoteStep::new(10, 123, Step::Prevote), - proposal: Some(Default::default()), + proposal: Box::new(Some(Default::default())), lock_view: Some(2), known_votes: bit_set }); diff --git a/core/src/consensus/tendermint/network.rs b/core/src/consensus/tendermint/network.rs index 3ec981b026..a185a8aa0a 100644 --- a/core/src/consensus/tendermint/network.rs +++ b/core/src/consensus/tendermint/network.rs @@ -34,7 +34,7 @@ use super::message::*; use super::params::TimeoutParams; use super::types::{PeerState, Step, View}; use super::worker; -use crate::consensus::{EngineError, PriorityInfo}; +use crate::consensus::{EngineError, Priority, PriorityInfo}; use super::{ ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_BROADCAT_STEP_STATE_INTERVAL, ENGINE_TIMEOUT_TOKEN_NONCE_BASE, @@ -154,21 +154,25 @@ impl TendermintExtension { } } - fn request_proposal_to_any(&self, round: SortitionRound) { - for (token, peer) in &self.peers { - let is_future_height_and_view = round < peer.vote_step.into(); - - if is_future_height_and_view { - self.request_proposal(token, round); - continue - } - - let is_same_height_and_view = round == peer.vote_step.into(); - - if is_same_height_and_view && peer.proposal.is_some() { - self.request_proposal(token, round); - } - } + fn request_proposal_to_superiors(&self, round: SortitionRound, my_highest: Option) { + // Request to future round peers + self.peers + .iter() + .filter(|(_, peer)| round < peer.vote_step.into()) + .for_each(|(token, _)| self.request_proposal(token, round)); + + let current_round_peers = self.peers.iter().filter(|(_, peer)| round == peer.vote_step.into()); + // Request to current round higher peers + if let Some(current_round_highest_priority) = current_round_peers + .clone() + .map(|(_id, peer)| peer.priority()) + .max() + .filter(|highest_priority| *highest_priority > my_highest) + { + current_round_peers + .filter(|(_id, peer)| peer.priority() == current_round_highest_priority) + .for_each(|(token, _)| self.request_proposal(token, round)) + }; } fn request_proposal(&self, token: &NodeId, round: SortitionRound) { @@ -301,21 +305,44 @@ impl NetworkExtension for TendermintExtension { lock_view, known_votes, ); - self.update_peer_state(token, vote_step, (*proposal).clone(), known_votes); - let (result, receiver) = crossbeam::unbounded(); - self.inner - .send(worker::Event::StepState { - token: *token, - vote_step, - proposal: *proposal, - lock_view, - known_votes: Box::from(known_votes), - result, - }) - .unwrap(); - - while let Ok(message) = receiver.recv() { - self.api.send(token, Arc::new(message)); + let unchanged = match self.peers.get(token) { + Some(peer_state) => *proposal == peer_state.proposal, + None => false, + }; + let verified = unchanged || { + (*proposal) + .clone() + .map(|summary| { + let (result, receiver) = crossbeam::bounded(1); + self.inner + .send(worker::Event::VerifyPriorityInfo { + height: vote_step.height, + view: vote_step.view, + priority_info: summary.priority_info, + result, + }) + .unwrap(); + receiver.recv().unwrap().unwrap_or(false) + }) + .unwrap_or(true) + }; + if verified { + self.update_peer_state(token, vote_step, (*proposal).clone(), known_votes); + let (result, receiver) = crossbeam::unbounded(); + self.inner + .send(worker::Event::StepState { + token: *token, + vote_step, + proposal: *proposal, + lock_view, + known_votes: Box::from(known_votes), + result, + }) + .unwrap(); + + while let Ok(message) = receiver.recv() { + self.api.send(token, Arc::new(message)); + } } } Ok(TendermintMessage::RequestProposal { @@ -420,10 +447,11 @@ impl NetworkExtension for TendermintExtension { } => { self.request_messages_to_all(vote_step, requested_votes); } - Event::RequestProposalToAny { + Event::RequestProposalToSuperiors { round, + current_highest, } => { - self.request_proposal_to_any(round); + self.request_proposal_to_superiors(round, current_highest); } Event::SetTimerStep { step, @@ -456,8 +484,9 @@ pub enum Event { vote_step: VoteStep, requested_votes: BitSet, }, - RequestProposalToAny { + RequestProposalToSuperiors { round: SortitionRound, + current_highest: Option, }, SetTimerStep { step: Step, diff --git a/core/src/consensus/tendermint/types.rs b/core/src/consensus/tendermint/types.rs index b50b323e1c..5c74bd1c22 100644 --- a/core/src/consensus/tendermint/types.rs +++ b/core/src/consensus/tendermint/types.rs @@ -191,6 +191,10 @@ impl PeerState { messages: BitSet::new(), } } + + pub fn priority(&self) -> Option { + self.proposal.as_ref().map(|summary| summary.priority()) + } } pub struct TendermintSealView<'a> { diff --git a/core/src/consensus/tendermint/vote_collector.rs b/core/src/consensus/tendermint/vote_collector.rs index 2412f9fc8e..23289ec677 100644 --- a/core/src/consensus/tendermint/vote_collector.rs +++ b/core/src/consensus/tendermint/vote_collector.rs @@ -21,7 +21,7 @@ use ckey::SchnorrSignature; use ctypes::BlockHash; use rlp::{Encodable, RlpStream}; -use super::super::PriorityInfo; +use super::super::{Priority, PriorityInfo}; use super::stake::Action; use super::{ConsensusMessage, ProposalSummary, SortitionRound, Step, VoteStep}; use crate::consensus::BitSet; @@ -335,6 +335,10 @@ impl VoteCollector { .and_then(|step_collector| step_collector.priority_collector().get_highest()) } + pub fn get_highest_priority(&self, sortition_round: SortitionRound) -> Option { + self.get_highest_priority_info(sortition_round).map(|priority_info| priority_info.priority()) + } + pub fn get_highest_proposal_hash(&self, sortition_round: SortitionRound) -> Option { self.votes.get(&sortition_round.into()).and_then(|step_collector| { let highest_priority_idx = diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index a928807082..4735251f97 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -48,7 +48,7 @@ use crate::consensus::signer::EngineSigner; use crate::consensus::validator_set::{DynamicValidator, ValidatorSet}; use crate::consensus::{ sortition::seed::{SeedInfo, VRFSeed}, - EngineError, PriorityInfo, Seal, VRFSortition, + EngineError, Priority, PriorityInfo, Seal, VRFSortition, }; use crate::encoded; use crate::error::{BlockError, Error}; @@ -147,6 +147,12 @@ pub enum Event { message: Bytes, result: crossbeam::Sender>>, }, + VerifyPriorityInfo { + height: Height, + view: View, + priority_info: PriorityInfo, + result: crossbeam::Sender>, + }, StepState { token: NodeId, vote_step: VoteStep, @@ -362,6 +368,14 @@ impl Worker { let client = inner.on_commit_message(block, votes); result.send(client).unwrap(); } + Ok(Event::VerifyPriorityInfo { + height, + view, + priority_info, + result + }) => { + result.send(inner.verify_priority_info(height, view, priority_info)).unwrap(); + } Err(crossbeam::RecvError) => { cerror!(ENGINE, "The event channel for tendermint thread had been closed."); break @@ -592,10 +606,11 @@ impl Worker { .unwrap(); } - fn request_proposal_to_any(&self, round: SortitionRound) { + fn request_proposal_to_superiors(&self, round: SortitionRound, current_highest: Option) { self.extension - .send(network::Event::RequestProposalToAny { + .send(network::Event::RequestProposalToSuperiors { round, + current_highest, }) .unwrap(); } @@ -707,7 +722,10 @@ impl Worker { }; } } else { - self.request_proposal_to_any(vote_step.into()); + let sortition_round = vote_step.into(); + let round_highest_priority = self.votes.get_highest_priority(sortition_round); + cinfo!(ENGINE, "I am not eligible to be a proposer, I'll request a proposal"); + self.request_proposal_to_superiors(sortition_round, round_highest_priority); } } Step::Prevote => { @@ -829,7 +847,9 @@ impl Worker { let received_locked_block = self.votes.has_votes_for(&vote_step, locked_proposal_hash); if !received_locked_block { - self.request_proposal_to_any(vote_step.into()); + let sortition_round = vote_step.into(); + let round_highest_priority = self.votes.get_highest_priority(sortition_round); + self.request_proposal_to_superiors(sortition_round, round_highest_priority); return Err(format!("Have a lock on {}-{}, but do not received a locked proposal", self.height, locked_view)) } @@ -1140,6 +1160,18 @@ impl Worker { Ok(()) } + fn verify_priority_info(&mut self, height: Height, view: View, priority_info: PriorityInfo) -> Option { + let parent_seed = self.prev_vrf_seed_of_height(height)?; + let parent_hash = self.client().block_hash(&(height - 1).into())?; + let signer_idx = priority_info.signer_idx(); + let signer_public = self.validators.get(&parent_hash, signer_idx); + let voting_power = self.get_voting_power(height - 1, &parent_hash, signer_idx); + + priority_info + .verify(&parent_seed.round_msg(view), &signer_public, voting_power, &mut self.sortition_scheme) + .ok() + } + fn verify_seed(&mut self, header: &Header, parent: &Header) -> Result<(), Error> { let current_seal_view = TendermintSealView::new(&header.seal()); let current_seed_signer_idx = current_seal_view.vrf_seed_info().expect("Seal field verified").signer_idx(); From c97a9a25fba4bcd21b888d17029c8ee89b821e10 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Tue, 3 Dec 2019 17:40:51 +0900 Subject: [PATCH 07/13] Change the possible tendermint states To accept multiple proposals and wait for their imports, I introduced a new type for TendermintState::Propose. --- core/src/consensus/tendermint/types.rs | 119 ++++++++++++++++----- core/src/consensus/tendermint/worker.rs | 132 +++++++++++------------- 2 files changed, 156 insertions(+), 95 deletions(-) diff --git a/core/src/consensus/tendermint/types.rs b/core/src/consensus/tendermint/types.rs index 5c74bd1c22..b44567817a 100644 --- a/core/src/consensus/tendermint/types.rs +++ b/core/src/consensus/tendermint/types.rs @@ -24,20 +24,62 @@ use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream}; use super::super::BitSet; use super::message::{ProposalSummary, VoteStep}; use crate::block::{IsBlock, SealedBlock}; -use crate::consensus::{sortition::seed::SeedInfo, Priority}; +use crate::consensus::{sortition::seed::SeedInfo, Priority, PriorityInfo}; pub type Height = u64; pub type View = u64; +#[derive(Clone)] +pub struct ProposeInner { + wait_block_generation: Option<(PriorityInfo, BlockHash)>, + wait_imported: Vec<(PriorityInfo, SealedBlock)>, +} + +impl ProposeInner { + pub fn generation_completed(&mut self) -> Option<(PriorityInfo, BlockHash)> { + self.wait_block_generation.take() + } + + pub fn generation_halted(&mut self) { + self.wait_block_generation = None; + } + + fn import_completed(&mut self, target_block_hash: BlockHash) -> Option<(PriorityInfo, SealedBlock)> { + let position = self + .wait_imported + .iter() + .position(|(_, sealed_block)| sealed_block.header().hash() == target_block_hash)?; + Some(self.wait_imported.remove(position)) + } + + fn wait_block_generation(&mut self, my_priority_info: PriorityInfo, parent_hash: BlockHash) { + self.wait_block_generation = Some((my_priority_info, parent_hash)); + } + + fn wait_imported(&mut self, target_priority_info: PriorityInfo, target_block: SealedBlock) { + self.wait_imported.insert(0, (target_priority_info, target_block)); + } + + pub fn get_wait_block_generation(&self) -> &Option<(PriorityInfo, BlockHash)> { + &self.wait_block_generation + } +} + +impl fmt::Debug for ProposeInner { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "waiting block generation {:?} and waiting block imports {:?}", + self.wait_block_generation, + self.wait_imported.iter().map(|(_, sealed)| sealed.header().hash()).collect::>() + ) + } +} + #[derive(Clone)] pub enum TendermintState { - Propose, - ProposeWaitBlockGeneration { - parent_hash: BlockHash, - }, - ProposeWaitImported { - block: Box, - }, + // wait block generation + Propose(Box), Prevote, Precommit, Commit { @@ -51,13 +93,50 @@ pub enum TendermintState { } impl TendermintState { + pub fn new_propose_step() -> Self { + TendermintState::Propose(Box::new(ProposeInner { + wait_block_generation: None, + wait_imported: Vec::new(), + })) + } + + pub fn generation_completed(&mut self) -> Option<(PriorityInfo, BlockHash)> { + if let Self::Propose(inner) = self { + inner.generation_completed() + } else { + None + } + } + + pub fn generation_halted(&mut self) { + if let Self::Propose(inner) = self { + inner.generation_halted() + } + } + + pub fn import_completed(&mut self, target_block_hash: BlockHash) -> Option<(PriorityInfo, SealedBlock)> { + if let Self::Propose(inner) = self { + inner.import_completed(target_block_hash) + } else { + None + } + } + + pub fn wait_block_generation(&mut self, my_priority_info: PriorityInfo, parent_hash: BlockHash) { + if let Self::Propose(inner) = self { + inner.wait_block_generation(my_priority_info, parent_hash); + } + } + + pub fn wait_imported(&mut self, target_priority_info: PriorityInfo, target_block: SealedBlock) { + if let Self::Propose(inner) = self { + inner.wait_imported(target_priority_info, target_block) + } + } + pub fn to_step(&self) -> Step { match self { - TendermintState::Propose => Step::Propose, - TendermintState::ProposeWaitBlockGeneration { - .. - } => Step::Propose, - TendermintState::ProposeWaitImported { + TendermintState::Propose { .. } => Step::Propose, TendermintState::Prevote => Step::Prevote, @@ -102,11 +181,7 @@ impl TendermintState { block_hash, view, } => Some((*view, *block_hash)), - TendermintState::Propose => None, - TendermintState::ProposeWaitBlockGeneration { - .. - } => None, - TendermintState::ProposeWaitImported { + TendermintState::Propose { .. } => None, TendermintState::Prevote => None, @@ -118,13 +193,7 @@ impl TendermintState { impl fmt::Debug for TendermintState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - TendermintState::Propose => write!(f, "TendermintState::Propose"), - TendermintState::ProposeWaitBlockGeneration { - parent_hash, - } => write!(f, "TendermintState::ProposeWaitBlockGeneration({})", parent_hash), - TendermintState::ProposeWaitImported { - block, - } => write!(f, "TendermintState::ProposeWaitImported({})", block.header().hash()), + TendermintState::Propose(inner) => write!(f, "TenderminState::Propose, {:?}", inner), TendermintState::Prevote => write!(f, "TendermintState::Prevote"), TendermintState::Precommit => write!(f, "TendermintState::Precommit"), TendermintState::Commit { diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index 4735251f97..751056686c 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -194,7 +194,7 @@ impl Worker { client, height: 1, view: 0, - step: TendermintState::Propose, + step: TendermintState::new_propose_step(), votes: Default::default(), signer: Default::default(), last_two_thirds_majority: TwoThirdsMajority::Empty, @@ -717,9 +717,7 @@ impl Worker { } else { cinfo!(ENGINE, "I am eligible to be a proposer, I'll create a block"); self.update_sealing(parent_block_hash); - self.step = TendermintState::ProposeWaitBlockGeneration { - parent_hash: parent_block_hash, - }; + self.step.wait_block_generation(priority_info, parent_block_hash); } } else { let sortition_round = vote_step.into(); @@ -913,7 +911,7 @@ impl Worker { { if self.can_move_from_commit_to_propose() { self.move_to_the_next_height(); - self.move_to_step(TendermintState::Propose, is_restoring); + self.move_to_step(TendermintState::new_propose_step(), is_restoring); return } @@ -935,7 +933,7 @@ impl Worker { let next_step = match self.step { TendermintState::Precommit if message.on.block_hash.is_none() && has_enough_aligned_votes => { self.increment_view(1); - Some(TendermintState::Propose) + Some(TendermintState::new_propose_step()) } // Avoid counting votes twice. TendermintState::Prevote if lock_change => Some(TendermintState::Precommit), @@ -983,24 +981,28 @@ impl Worker { step: Step::Propose, }); + let proposal_hash = proposal.hash(); let current_height = self.height; let current_vote_step = VoteStep::new(self.height, self.view, self.step.to_step()); - let proposal_is_for_current = self.votes.has_votes_for(¤t_vote_step, proposal.hash()); + let proposal_is_for_current = self.votes.has_votes_for(¤t_vote_step, proposal_hash); if proposal_is_for_current { - let current_step = self.step.clone(); - match current_step { - TendermintState::ProposeWaitImported { - block, - } => { - cinfo!(ENGINE, "Submitting proposal block {}", block.header().hash()); - self.broadcast_proposal_block( + if let Some((priority_info, imported_block)) = self.step.import_completed(proposal_hash) { + cinfo!(ENGINE, "Submitting proposal block {}", proposal_hash); + match self.highest_proposal_at(self.current_sortition_round()) { + None => self.broadcast_proposal_block( self.view, - self.votes.get_highest_priority_info(self.current_sortition_round()).unwrap(), - encoded::Block::new(block.rlp_bytes()), - ); - } - _ => {} - }; + priority_info, + encoded::Block::new(imported_block.rlp_bytes()), + ), + Some((_, highest_priority_info, _)) if priority_info >= highest_priority_info => self + .broadcast_proposal_block( + self.view, + priority_info, + encoded::Block::new(imported_block.rlp_bytes()), + ), + _ => (), + }; + } } else if current_height < height { let finalized_view_of_previous_height = TendermintSealView::new(proposal.seal()).parent_block_finalized_view().unwrap(); @@ -1026,7 +1028,7 @@ impl Worker { let backup = restore(client.get_kvdb().as_ref()); if let Some(backup) = backup { let backup_step = match backup.step { - Step::Propose => TendermintState::Propose, + Step::Propose => TendermintState::new_propose_step(), Step::Prevote => TendermintState::Prevote, Step::Precommit => TendermintState::Precommit, // If the backuped step is `Commit`, we should start at `Precommit` to update the @@ -1056,6 +1058,7 @@ impl Worker { fn generate_seal(&mut self, height: Height, parent_hash: BlockHash) -> Seal { // Block is received from other nodes while creating a block if height < self.height { + self.step.generation_halted(); return Seal::None } @@ -1082,6 +1085,7 @@ impl Worker { vrf_seed_info: Box::new(current_seed), } } else { + self.step.generation_halted(); cdebug!(ENGINE, "Seal generation halted because a higher priority is accepted"); Seal::None } @@ -1106,31 +1110,22 @@ impl Worker { } let header = sealed_block.header(); + let parent_hash = header.parent_hash(); - if let TendermintState::ProposeWaitBlockGeneration { - parent_hash: expected_parent_hash, - } = self.step - { - let parent_hash = header.parent_hash(); - assert_eq!( - *parent_hash, expected_parent_hash, - "Generated hash({:?}) is different from expected({:?})", - parent_hash, expected_parent_hash - ); - } else { - ctrace!( - ENGINE, - "Proposal is generated after step is changed. Expected step is ProposeWaitBlockGeneration but current step is {:?}", - self.step, - ); - return - } - debug_assert_eq!(Ok(self.view), TendermintSealView::new(header.seal()).author_view()); + match self.step.generation_completed() { + Some((target_priority_info, expected_parent_hash)) if expected_parent_hash == *parent_hash => { + debug_assert_eq!(Ok(self.view), TendermintSealView::new(header.seal()).author_view()); - self.vote_on_header_for_proposal(&header).expect("I'm a proposer"); - - self.step = TendermintState::ProposeWaitImported { - block: Box::new(sealed_block.clone()), + self.vote_on_header_for_proposal(target_priority_info.clone(), &header).expect("I'm a proposer"); + self.step.wait_imported(target_priority_info, sealed_block.clone()); + } + Some((_, expected_parent_hash)) => ctrace!( + ENGINE, + "Generated parent hash({:?}) is different from the expected one in this round({:?}). The generated block may be an old one", + parent_hash, + expected_parent_hash + ), + _ => ctrace!(ENGINE, "Block generation is unexpected in this round. The generated block may be an old one"), }; } @@ -1276,23 +1271,15 @@ impl Worker { return } - let next_step = match self.step { - TendermintState::Propose => { - cinfo!(ENGINE, "Propose timeout."); + let next_step = match &self.step { + TendermintState::Propose(inner) => { + if let Some(wait_block_generation) = inner.get_wait_block_generation() { + cwarn!(ENGINE, "Propose timed out but block {:?} is not generated yet", wait_block_generation); + return + } + cinfo!(ENGINE, "Propose timed out"); TendermintState::Prevote } - TendermintState::ProposeWaitBlockGeneration { - .. - } => { - cwarn!(ENGINE, "Propose timed out but block is not generated yet"); - return - } - TendermintState::ProposeWaitImported { - .. - } => { - cwarn!(ENGINE, "Propose timed out but still waiting for the block imported"); - return - } TendermintState::Prevote if self.has_enough_any_votes() => { cinfo!(ENGINE, "Prevote timeout."); TendermintState::Precommit @@ -1304,7 +1291,7 @@ impl Worker { TendermintState::Precommit if self.has_enough_any_votes() => { cinfo!(ENGINE, "Precommit timeout."); self.increment_view(1); - TendermintState::Propose + TendermintState::new_propose_step() } TendermintState::Precommit => { cinfo!(ENGINE, "Precommit timeout without enough votes."); @@ -1316,20 +1303,20 @@ impl Worker { } => { cinfo!(ENGINE, "Commit timeout."); - let proposal_imported = self.client().block(&block_hash.into()).is_some(); + let proposal_imported = self.client().block(&(*block_hash).into()).is_some(); let best_block_header = self.client().best_block_header(); - if !proposal_imported || best_block_header.hash() != block_hash { + if !proposal_imported || best_block_header.hash() != *block_hash { cwarn!(ENGINE, "Best chain is not updated yet, wait until imported"); self.step = TendermintState::CommitTimedout { - block_hash, - view, + block_hash: *block_hash, + view: *view, }; return } self.move_to_the_next_height(); - TendermintState::Propose + TendermintState::new_propose_step() } TendermintState::CommitTimedout { .. @@ -1478,7 +1465,7 @@ impl Worker { fn repropose_block(&mut self, priority_info: PriorityInfo, block: encoded::Block) { let header = block.decode_header(); - self.vote_on_header_for_proposal(&header).expect("I am eligible to be a proposer"); + self.vote_on_header_for_proposal(priority_info.clone(), &header).expect("I am eligible to be a proposer"); debug_assert_eq!(self.client().block_status(&header.hash().into()), BlockStatus::InChain); self.broadcast_proposal_block(self.view, priority_info, block); } @@ -1532,12 +1519,16 @@ impl Worker { Ok(Some(vote)) } - fn vote_on_header_for_proposal(&mut self, header: &Header) -> Result { + fn vote_on_header_for_proposal( + &mut self, + priority_info: PriorityInfo, + header: &Header, + ) -> Result { assert!(header.number() == self.height); let signer_index = self.signer_index().expect("I am a validator"); let on = VoteOn { - step: VoteStep::new(self.height, self.view, Step::Propose), + step: self.current_sortition_round().into(), block_hash: Some(header.hash()), }; assert!(self.vote_regression_checker.check(&on), "Vote should not regress"); @@ -1550,6 +1541,7 @@ impl Worker { on, }; + self.votes.collect_priority(self.current_sortition_round(), priority_info); self.votes.collect(vote.clone()).expect("Must not attempt double vote on proposal"); cinfo!(ENGINE, "Voted {:?} as {}th proposer.", vote, signer_index); Ok(vote) @@ -1600,7 +1592,7 @@ impl Worker { cdebug!(ENGINE, "Committed block {} is now the best block", committed_block_hash); if self.can_move_from_commit_to_propose() { self.move_to_the_next_height(); - self.move_to_step(TendermintState::Propose, false); + self.move_to_step(TendermintState::new_propose_step(), false); return } } @@ -1632,7 +1624,7 @@ impl Worker { } } if height_at_begin != self.height { - self.move_to_step(TendermintState::Propose, false); + self.move_to_step(TendermintState::new_propose_step(), false); } if let Some(last_proposal_header) = last_proposal_header { self.on_imported_proposal(&last_proposal_header); From d40b1f6cbf660b23b021a9c24c71e7bc88b56741 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Fri, 6 Dec 2019 17:08:51 +0900 Subject: [PATCH 08/13] Fix testhelper types --- test/src/helper/mock/index.ts | 7 +- test/src/helper/mock/tendermintMessage.ts | 95 +++++++++++++++++++---- 2 files changed, 86 insertions(+), 16 deletions(-) diff --git a/test/src/helper/mock/index.ts b/test/src/helper/mock/index.ts index ba9b316fc3..68d258ae88 100644 --- a/test/src/helper/mock/index.ts +++ b/test/src/helper/mock/index.ts @@ -611,6 +611,7 @@ export class Mock { new TendermintMessage({ type: "proposalblock", view: message.view, + priorityInfo: message.priorityInfo, message: RLP.encode([newHeader, block[1]]), signature: newSignature.r + newSignature.s }) @@ -623,8 +624,10 @@ export class Mock { this.sendTendermintMessage( new TendermintMessage({ type: "requestproposal", - height: message.voteStep.height, - view: message.voteStep.view + round: { + height: message.voteStep.height, + view: message.voteStep.view + } }) ); }, 200); diff --git a/test/src/helper/mock/tendermintMessage.ts b/test/src/helper/mock/tendermintMessage.ts index 07d7c8e86b..cadb017d4d 100644 --- a/test/src/helper/mock/tendermintMessage.ts +++ b/test/src/helper/mock/tendermintMessage.ts @@ -44,6 +44,19 @@ interface VoteStep { step: Step; } +interface SortitionRound { + height: number; + view: number; +} + +interface PriorityInfo { + signerIdx: number; + priority: H256; + subUserIdx: number; + numberOfElections: number; + vrfProof: Buffer; +} + export interface ConsensusMessage { type: "consensusmessage"; messages: Array<{ @@ -59,14 +72,20 @@ export interface ConsensusMessage { export interface ProposalBlock { type: "proposalblock"; signature: string; + priorityInfo: PriorityInfo; view: number; message: Buffer; } +interface ProposalSummary { + priorityInfo: PriorityInfo; + blockHash: H256; +} + export interface StepState { type: "stepstate"; voteStep: VoteStep; - proposal: H256 | null; + proposal: ProposalSummary | null; lockView: number | null; knownVotes: Buffer; } @@ -79,8 +98,7 @@ export interface RequestMessage { export interface RequestProposal { type: "requestproposal"; - height: number; - view: number; + round: SortitionRound; } type MessageBody = @@ -124,8 +142,15 @@ export class TendermintMessage { message = { type: "proposalblock", signature: decoded[1].toString("hex"), - view: readUIntRLP(decoded[2]), - message: uncompressSync(decoded[3]) + priorityInfo: { + signerIdx: readUIntRLP(decoded[2][0]), + priority: new H256(decoded[2][1].toString("hex")), + subUserIdx: readUIntRLP(decoded[2][2]), + numberOfElections: readUIntRLP(decoded[2][3]), + vrfProof: decoded[2][4] + }, + view: readUIntRLP(decoded[3]), + message: uncompressSync(decoded[4]) }; break; } @@ -137,10 +162,20 @@ export class TendermintMessage { view: readUIntRLP(decoded[1][1]), step: readUIntRLP(decoded[1][2]) as Step }, - proposal: readOptionalRlp( - decoded[2], - buffer => new H256(buffer.toString("hex")) - ), + proposal: readOptionalRlp(decoded[2], (buffer: any) => { + return { + priorityInfo: { + signerIdx: readUIntRLP(buffer[0][0]), + priority: new H256( + buffer[0][1].toString("hex") + ), + subUserIdx: readUIntRLP(buffer[0][2]), + numberOfElections: readUIntRLP(buffer[0][3]), + vrfProof: buffer[0][4] + }, + blockHash: new H256(buffer[1].toString("hex")) + }; + }), lockView: readOptionalRlp(decoded[3], readUIntRLP), knownVotes: decoded[4] }; @@ -161,8 +196,10 @@ export class TendermintMessage { case MessageType.MESSAGE_ID_REQUEST_PROPOSAL: { message = { type: "requestproposal", - height: readUIntRLP(decoded[1]), - view: readUIntRLP(decoded[2]) + round: { + height: readUIntRLP(decoded[1][0]), + view: readUIntRLP(decoded[1][1]) + } }; break; } @@ -210,6 +247,19 @@ export class TendermintMessage { return [ MessageType.MESSAGE_ID_PROPOSAL_BLOCK, Buffer.from(this.body.signature, "hex"), + [ + new U64( + this.body.priorityInfo.signerIdx + ).toEncodeObject(), + this.body.priorityInfo.priority.toEncodeObject(), + new U64( + this.body.priorityInfo.subUserIdx + ).toEncodeObject(), + new U64( + this.body.priorityInfo.numberOfElections + ).toEncodeObject(), + this.body.priorityInfo.vrfProof + ], new U64(this.body.view).toEncodeObject(), compressSync(this.body.message) ]; @@ -224,7 +274,22 @@ export class TendermintMessage { ], this.body.proposal == null ? [] - : [this.body.proposal.toEncodeObject()], + : [ + [ + new U64( + this.body.proposal.priorityInfo.signerIdx + ).toEncodeObject(), + this.body.proposal.priorityInfo.priority.toEncodeObject(), + new U64( + this.body.proposal.priorityInfo.subUserIdx + ).toEncodeObject(), + new U64( + this.body.proposal.priorityInfo.numberOfElections + ).toEncodeObject(), + this.body.proposal.priorityInfo.vrfProof + ], + this.body.proposal.blockHash.toEncodeObject() + ], this.body.lockView == null ? [] : [new U64(this.body.lockView).toEncodeObject()], @@ -245,8 +310,10 @@ export class TendermintMessage { case "requestproposal": { return [ MessageType.MESSAGE_ID_REQUEST_PROPOSAL, - new U64(this.body.height).toEncodeObject(), - new U64(this.body.view).toEncodeObject() + [ + new U64(this.body.round.height).toEncodeObject(), + new U64(this.body.round.view).toEncodeObject() + ] ]; } } From de53612c78ae64f29389613236dd1705c4fd5162 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Wed, 11 Dec 2019 12:14:27 +0900 Subject: [PATCH 09/13] Modify Backup and Restore --- core/src/consensus/tendermint/backup.rs | 120 ++++++++---------- .../consensus/tendermint/vote_collector.rs | 19 +++ core/src/consensus/tendermint/worker.rs | 14 +- 3 files changed, 83 insertions(+), 70 deletions(-) diff --git a/core/src/consensus/tendermint/backup.rs b/core/src/consensus/tendermint/backup.rs index 9465854848..95b8c879e8 100644 --- a/core/src/consensus/tendermint/backup.rs +++ b/core/src/consensus/tendermint/backup.rs @@ -14,41 +14,77 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use ctypes::BlockHash; use kvdb::{DBTransaction, KeyValueDB}; +use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream}; -use super::message::ConsensusMessage; +use super::message::{ConsensusMessage, SortitionRound}; use super::types::{Height, Step, View}; +use crate::consensus::PriorityInfo; use crate::db; use crate::db_version; const BACKUP_KEY: &[u8] = b"tendermint-backup"; const BACKUP_VERSION: u32 = 1; +pub struct PriorityInfoProjection(pub (SortitionRound, PriorityInfo)); + +impl PriorityInfoProjection { + pub fn get_view(&self) -> PriorityInfoProjectionView { + let (ref sortition_round, ref priority_info) = self.0; + PriorityInfoProjectionView((sortition_round, priority_info)) + } +} + +impl Decodable for PriorityInfoProjection { + fn decode(rlp: &Rlp) -> Result { + let item_count = rlp.item_count()?; + if item_count != 2 { + Err(DecoderError::RlpIncorrectListLen { + got: item_count, + expected: 2, + }) + } else { + Ok(Self((rlp.val_at(0)?, rlp.val_at(1)?))) + } + } +} + +pub struct PriorityInfoProjectionView<'a>(pub (&'a SortitionRound, &'a PriorityInfo)); + +impl Encodable for PriorityInfoProjectionView<'_> { + fn rlp_append(&self, s: &mut RlpStream) { + let (sortition_round, priority_info) = self.0; + s.begin_list(2).append(sortition_round).append(priority_info); + } +} + pub struct BackupView<'a> { pub height: &'a Height, pub view: &'a View, pub step: &'a Step, pub votes: &'a [ConsensusMessage], + pub priority_infos: &'a [PriorityInfoProjectionView<'a>], pub finalized_view_of_previous_block: &'a View, pub finalized_view_of_current_block: &'a Option, } +#[derive(RlpDecodable)] pub struct BackupDataV0 { pub height: Height, pub view: View, pub step: Step, pub votes: Vec, - pub proposal: Option, + pub priority_infos: Vec, pub last_confirmed_view: View, } +#[derive(RlpDecodable)] pub struct BackupDataV1 { pub height: Height, pub view: View, pub step: Step, pub votes: Vec, - pub proposal: Option, + pub priority_infos: Vec, pub finalized_view_of_previous_block: View, pub finalized_view_of_current_block: Option, } @@ -59,12 +95,14 @@ pub fn backup(db: &dyn KeyValueDB, backup_data: BackupView) { view, step, votes, + priority_infos, finalized_view_of_previous_block, finalized_view_of_current_block, } = backup_data; let mut s = rlp::RlpStream::new(); - s.begin_list(6); + s.begin_list(7); s.append(height).append(view).append(step).append_list(votes); + s.append_list(priority_infos); s.append(finalized_view_of_previous_block); s.append(finalized_view_of_current_block); @@ -83,19 +121,7 @@ pub fn restore(db: &dyn KeyValueDB) -> Option { if version < BACKUP_VERSION { migrate(db); } - load_v1(db) -} - -fn find_proposal(votes: &[ConsensusMessage], height: Height, view: View) -> Option { - votes - .iter() - .rev() - .map(|vote| &vote.on) - .find(|vote_on| { - vote_on.step.step == Step::Propose && vote_on.step.view == view && vote_on.step.height == height - }) - .map(|vote_on| vote_on.block_hash) - .unwrap_or(None) + load_with_version::(db) } fn migrate(db: &dyn KeyValueDB) { @@ -114,7 +140,7 @@ fn migrate(db: &dyn KeyValueDB) { } fn migrate_from_0_to_1(db: &dyn KeyValueDB) { - let v0 = if let Some(v0) = load_v0(db) { + let v0 = if let Some(v0) = load_with_version::(db) { v0 } else { return @@ -125,7 +151,7 @@ fn migrate_from_0_to_1(db: &dyn KeyValueDB) { view: v0.view, step: v0.step, votes: v0.votes, - proposal: v0.proposal, + priority_infos: v0.priority_infos, // This is not a correct behavior if step == Step::Commit. // In Commit state, the Tendermint module overwrote the last_confirmed_view to finalized_view_of_current_block. // So we can't restore finalized_view_of_previous block. @@ -142,59 +168,15 @@ fn migrate_from_0_to_1(db: &dyn KeyValueDB) { view: &v1.view, step: &v1.step, votes: &v1.votes, + priority_infos: &v1.priority_infos.iter().map(|projection| projection.get_view()).collect::>(), finalized_view_of_previous_block: &v1.finalized_view_of_previous_block, finalized_view_of_current_block: &v1.finalized_view_of_current_block, }) } -fn load_v0(db: &dyn KeyValueDB) -> Option { - let value = db.get(db::COL_EXTRA, BACKUP_KEY).expect("Low level database error. Some issue with disk?"); - let (height, view, step, votes, last_confirmed_view) = value.map(|bytes| { - let rlp = rlp::Rlp::new(&bytes); - ( - rlp.val_at(0).unwrap(), - rlp.val_at(1).unwrap(), - rlp.val_at(2).unwrap(), - rlp.at(3).unwrap().as_list().unwrap(), - rlp.val_at(4).unwrap(), - ) - })?; - - let proposal = find_proposal(&votes, height, view); - - Some(BackupDataV0 { - height, - view, - step, - votes, - proposal, - last_confirmed_view, - }) -} - -fn load_v1(db: &dyn KeyValueDB) -> Option { - #[derive(RlpDecodable)] - struct Backup { - height: Height, - view: View, - step: Step, - votes: Vec, - finalized_view_of_previous_block: View, - finalized_view_of_current_block: Option, - } - +fn load_with_version(db: &dyn KeyValueDB) -> Option { let value = db.get(db::COL_EXTRA, BACKUP_KEY).expect("Low level database error. Some issue with disk?")?; - let backup: Backup = rlp::decode(&value).unwrap(); - - let proposal = find_proposal(&backup.votes, backup.height, backup.view); - - Some(BackupDataV1 { - height: backup.height, - view: backup.view, - step: backup.step, - votes: backup.votes, - proposal, - finalized_view_of_previous_block: backup.finalized_view_of_previous_block, - finalized_view_of_current_block: backup.finalized_view_of_current_block, - }) + let backup: T = rlp::decode(&value).unwrap(); + + Some(backup) } diff --git a/core/src/consensus/tendermint/vote_collector.rs b/core/src/consensus/tendermint/vote_collector.rs index 23289ec677..defb3c203a 100644 --- a/core/src/consensus/tendermint/vote_collector.rs +++ b/core/src/consensus/tendermint/vote_collector.rs @@ -138,6 +138,10 @@ impl PriorityCollector { fn iter_from_highest(&self) -> Rev> { self.priorities.iter().rev() } + + fn iter(&self) -> Iter { + self.priorities.iter() + } } impl MessageCollector { @@ -377,4 +381,19 @@ impl VoteCollector { None => vec![], } } + + pub fn get_all_priority_infos(&self) -> Vec<(SortitionRound, PriorityInfo)> { + self.votes + .iter() + .filter(|(vote_step, _)| vote_step.step == Step::Propose) + .flat_map(|(&vote_step, collector)| { + let round: SortitionRound = vote_step.into(); + collector + .priority_collector() + .iter() + .map(|priority_info| (round, priority_info.clone())) + .collect::>() + }) + .collect() + } } diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index 751056686c..03f60c5a70 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -32,7 +32,7 @@ use rlp::{Encodable, Rlp}; use vrf::openssl::{CipherSuite, ECVRF}; use super::super::BitSet; -use super::backup::{backup, restore, BackupView}; +use super::backup::{backup, restore, BackupView, PriorityInfoProjection, PriorityInfoProjectionView}; use super::message::*; use super::network; use super::params::TimeGapParams; @@ -1013,11 +1013,18 @@ impl Worker { } fn backup(&self) { + let priority_infos = self.votes.get_all_priority_infos(); + let priority_info_projection_views = priority_infos + .iter() + .map(|(round, priority_info)| PriorityInfoProjectionView((&round, &priority_info))) + .collect::>(); + backup(self.client().get_kvdb().as_ref(), BackupView { height: &self.height, view: &self.view, step: &self.step.to_step(), votes: &self.votes.get_all(), + priority_infos: &priority_info_projection_views, finalized_view_of_previous_block: &self.finalized_view_of_previous_block, finalized_view_of_current_block: &self.finalized_view_of_current_block, }); @@ -1039,6 +1046,11 @@ impl Worker { self.step = backup_step; self.height = backup.height; self.view = backup.view; + backup.priority_infos.into_iter().for_each(|projection| match projection { + PriorityInfoProjection((round, priority_info)) => { + self.votes.collect_priority(round, priority_info); + } + }); self.finalized_view_of_previous_block = backup.finalized_view_of_previous_block; self.finalized_view_of_current_block = backup.finalized_view_of_current_block; From a2359917dced32735d80127271c8f098e003d6f0 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Wed, 11 Dec 2019 18:09:04 +0900 Subject: [PATCH 10/13] Switch jail function by CommonParams --- core/src/consensus/stake/mod.rs | 20 +++++++++++--------- types/src/common_params.rs | 8 +------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/core/src/consensus/stake/mod.rs b/core/src/consensus/stake/mod.rs index 02789499fe..bd33db2963 100644 --- a/core/src/consensus/stake/mod.rs +++ b/core/src/consensus/stake/mod.rs @@ -392,17 +392,16 @@ pub fn on_term_close( let current_term = metadata.current_term_id(); ctrace!(ENGINE, "on_term_close. current_term: {}", current_term); + let metadata = metadata.params().expect( + "Term close events can be called after the ChangeParams called, \ + so the metadata always has CommonParams", + ); + let custody_period = metadata.custody_period(); + let release_period = metadata.release_period(); + let (nomination_expiration, custody_until, kick_at) = { - let metadata = metadata.params().expect( - "Term close events can be called after the ChangeParams called, \ - so the metadata always has CommonParams", - ); let nomination_expiration = metadata.nomination_expiration(); assert_ne!(0, nomination_expiration); - let custody_period = metadata.custody_period(); - assert_ne!(0, custody_period); - let release_period = metadata.release_period(); - assert_ne!(0, release_period); (nomination_expiration, current_term + custody_period, current_term + release_period) }; @@ -412,7 +411,10 @@ pub fn on_term_close( let reverted: Vec<_> = expired.into_iter().chain(released).collect(); revert_delegations(state, &reverted)?; - jail(state, inactive_validators, custody_until, kick_at)?; + let jail_enabled = custody_period != 0 || release_period != 0; + if jail_enabled { + jail(state, inactive_validators, custody_until, kick_at)?; + } let validators = Validators::elect(state)?; validators.save_to_state(state)?; diff --git a/types/src/common_params.rs b/types/src/common_params.rs index e59c1eab9a..8d289dc425 100644 --- a/types/src/common_params.rs +++ b/types/src/common_params.rs @@ -181,12 +181,6 @@ impl CommonParams { if self.nomination_expiration == 0 { return Err("You should set the nomination expiration".to_string()) } - if self.custody_period == 0 { - return Err("You should set the custody period".to_string()) - } - if self.release_period == 0 { - return Err("You should set the release period".to_string()) - } if self.max_num_of_validators == 0 { return Err("You should set the maximum number of validators".to_string()) } @@ -205,7 +199,7 @@ impl CommonParams { self.min_num_of_validators, self.max_num_of_validators )) } - if self.custody_period >= self.release_period { + if self.custody_period > self.release_period { return Err(format!( "The release period({}) should be longer than the custody period({})", self.release_period, self.custody_period From 3f0524c9ce60210fa517d1f7ded87be800f326c8 Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Wed, 11 Dec 2019 20:48:07 +0900 Subject: [PATCH 11/13] Fix the test code --- test/src/e2e.dynval/1/dv.changeParams.test.ts | 12 ++++++------ test/src/e2e.dynval/1/dv.m-m'.test.ts | 7 +++++++ test/src/e2e.dynval/1/dv.n'.test.ts | 8 ++++++++ test/src/e2e.dynval/1/dv.n-1.test.ts | 4 ++++ test/src/e2e.dynval/2/dv.shutdown.test.ts | 12 ++++++++---- test/src/e2e.dynval/setup.ts | 8 ++++---- test/src/e2e.long/mempoolMinfee.test.ts | 2 +- test/src/e2e.long/staking.test.ts | 4 ++-- test/src/e2e/changeParams.test.ts | 6 +++--- test/src/scheme/tendermint-dynval.json | 2 +- test/src/scheme/tendermint-int.json | 2 +- 11 files changed, 45 insertions(+), 22 deletions(-) diff --git a/test/src/e2e.dynval/1/dv.changeParams.test.ts b/test/src/e2e.dynval/1/dv.changeParams.test.ts index 82ad2c8510..d815fd77b4 100644 --- a/test/src/e2e.dynval/1/dv.changeParams.test.ts +++ b/test/src/e2e.dynval/1/dv.changeParams.test.ts @@ -192,10 +192,10 @@ describe("Change commonParams that doesn't affects validator set", function() { it("should be applied after a term seconds", async function() { const initialTermSeconds = initialParams.termSeconds; const newTermSeconds = 5; - const margin = 1.3; + const margin = 1.5; this.slow((initialTermSeconds + newTermSeconds) * 1000 * margin); - this.timeout((initialTermSeconds + newTermSeconds) * 1000 * 2); + this.timeout((initialTermSeconds + newTermSeconds) * 1000 * 2.5); const term1Metadata = (await stake.getTermMetadata(nodes[0].sdk))!; { @@ -247,8 +247,8 @@ describe("Change commonParams that doesn't affects validator set", function() { it("Change minimum fee of pay transaction", async function() { const checkingNode = nodes[0]; - this.slow(4_000); - this.timeout(6_000); + this.slow(6_000); + this.timeout(12_000); const changeTxHash = await changeParams(checkingNode, 1, { ...initialParams, @@ -284,7 +284,7 @@ describe("Change commonParams that doesn't affects validator set", function() { it("Should apply larger metadata limit after increment", async function() { this.slow(6_000); - this.timeout(9_000); + this.timeout(12_000); const alice = validators[0]; const checkingNode = nodes[0]; @@ -321,7 +321,7 @@ describe("Change commonParams that doesn't affects validator set", function() { it("Should apply smaller metadata limit after decrement", async function() { this.slow(6_000); - this.timeout(9_000); + this.timeout(12_000); const alice = validators[0]; const checkingNode = nodes[0]; diff --git a/test/src/e2e.dynval/1/dv.m-m'.test.ts b/test/src/e2e.dynval/1/dv.m-m'.test.ts index 695d6d2f09..02739049a6 100644 --- a/test/src/e2e.dynval/1/dv.m-m'.test.ts +++ b/test/src/e2e.dynval/1/dv.m-m'.test.ts @@ -97,6 +97,13 @@ describe("Dynamic Validator M -> M' (Changed the subset, M, M’ = maximum numbe describe("1. Jail one of the validator", async function() { const { nodes } = withNodes(this, { ...nodeParams, + overrideParams: { + maxNumOfValidators, + delegationThreshold: 1000, + minDeposit: 10000, + custodyPeriod: 10, + releasePeriod: 30 + }, onBeforeEnable: async bootstrappingNodes => { await bootstrappingNodes[alice].clean(); // alice will be jailed! } diff --git a/test/src/e2e.dynval/1/dv.n'.test.ts b/test/src/e2e.dynval/1/dv.n'.test.ts index 2c2518d3c6..fa0ac0ea60 100644 --- a/test/src/e2e.dynval/1/dv.n'.test.ts +++ b/test/src/e2e.dynval/1/dv.n'.test.ts @@ -52,6 +52,10 @@ describe("Dynamic Validator N -> N'", function() { const betty = validators[4]; const { nodes } = withNodes(this, { promiseExpect, + overrideParams: { + custodyPeriod: 10, + releasePeriod: 30 + }, validators: [ { signer: validators[0], delegation: 4200, deposit: 100000 }, { signer: validators[1], delegation: 4100, deposit: 100000 }, @@ -112,6 +116,10 @@ describe("Dynamic Validator N -> N'", function() { const betty = validators[4]; const { nodes } = withNodes(this, { promiseExpect, + overrideParams: { + custodyPeriod: 10, + releasePeriod: 30 + }, validators: [ { signer: validators[0], delegation: 4200, deposit: 100000 }, { signer: validators[1], delegation: 4100, deposit: 100000 }, diff --git a/test/src/e2e.dynval/1/dv.n-1.test.ts b/test/src/e2e.dynval/1/dv.n-1.test.ts index 2ac409c63f..f85eb13796 100644 --- a/test/src/e2e.dynval/1/dv.n-1.test.ts +++ b/test/src/e2e.dynval/1/dv.n-1.test.ts @@ -71,6 +71,10 @@ describe("Dynamic Validator N -> N-1", function() { describe("A node is imprisoned to jail", async function() { const { nodes } = withNodes(this, { promiseExpect, + overrideParams: { + custodyPeriod: 10, + releasePeriod: 30 + }, validators: allDynValidators.map((signer, index) => ({ signer, delegation: 5_000, diff --git a/test/src/e2e.dynval/2/dv.shutdown.test.ts b/test/src/e2e.dynval/2/dv.shutdown.test.ts index 8717de37f0..cad8d3e7e7 100644 --- a/test/src/e2e.dynval/2/dv.shutdown.test.ts +++ b/test/src/e2e.dynval/2/dv.shutdown.test.ts @@ -52,7 +52,9 @@ describe("Shutdown test", function() { overrideParams: { minNumOfValidators: 4, maxNumOfValidators: 8, - delegationThreshold: 1 + delegationThreshold: 1, + custodyPeriod: 10, + releasePeriod: 30 }, validators: [ // Observer: no self-nomination, no-deposit @@ -149,7 +151,7 @@ describe("Shutdown test", function() { .and.to.include.members(getAlphaBetas().addrs); } - await termWaiter.waitForTermPeriods(1, 0.5); + await termWaiter.waitForTermPeriods(1, 2); // Revival await Promise.all(getAlphaBetas().nodes.map(node => node.start())); await fullyConnect(nodes, promiseExpect); @@ -196,7 +198,9 @@ describe("Shutdown test", function() { overrideParams: { minNumOfValidators: 3, maxNumOfValidators: 3, - delegationThreshold: 1 + delegationThreshold: 1, + custodyPeriod: 10, + releasePeriod: 30 }, validators: [ // Observer: no self-nomination, no deposit @@ -239,7 +243,7 @@ describe("Shutdown test", function() { .and.to.include.members(getValidators().addrs); } - await termWaiter.waitForTermPeriods(2, 0.5); + await termWaiter.waitForTermPeriods(2, 2); // Revival await Promise.all(getValidators().nodes.map(node => node.start())); await fullyConnect(nodes, promiseExpect); diff --git a/test/src/e2e.dynval/setup.ts b/test/src/e2e.dynval/setup.ts index 493f6ef602..60242269bd 100644 --- a/test/src/e2e.dynval/setup.ts +++ b/test/src/e2e.dynval/setup.ts @@ -354,8 +354,8 @@ export const defaultParams = { termSeconds: 15, nominationExpiration: 10, - custodyPeriod: 10, - releasePeriod: 30, + custodyPeriod: 0, + releasePeriod: 0, maxNumOfValidators: 5, minNumOfValidators: 3, delegationThreshold: 1000, @@ -465,7 +465,7 @@ export function setTermTestTimeout( ): TermWaiter { const { terms, params: { termSeconds } = defaultParams } = options; const slowMargin = 0.5; - const timeoutMargin = 2.0; + const timeoutMargin = 4.0; context.slow(termSeconds * (terms + slowMargin) * 1000); context.timeout(termSeconds * (terms + timeoutMargin) * 1000); function termPeriodsToTime(termPeriods: number, margin: number): number { @@ -485,7 +485,7 @@ export function setTermTestTimeout( ) { await node.waitForTermChange( waiterParams.target, - termPeriodsToTime(waiterParams.termPeriods, 0.5) + termPeriodsToTime(waiterParams.termPeriods, 2) ); } }; diff --git a/test/src/e2e.long/mempoolMinfee.test.ts b/test/src/e2e.long/mempoolMinfee.test.ts index ef6a0c5a49..9f42e57758 100644 --- a/test/src/e2e.long/mempoolMinfee.test.ts +++ b/test/src/e2e.long/mempoolMinfee.test.ts @@ -157,7 +157,7 @@ describe("MemPoolMinFees", async function() { recipient: validator0Address }); - await Promise.all(nodeArray.map(node => node.waitBlockNumber(4))); + await Promise.all(nodeArray.map(node => node.waitBlockNumber(3))); const expectedTrues = await Promise.all( nodeArray.map(node => node.sdk.rpc.chain.containsTransaction( diff --git a/test/src/e2e.long/staking.test.ts b/test/src/e2e.long/staking.test.ts index 2d5510f2e4..939c1c4a0f 100644 --- a/test/src/e2e.long/staking.test.ts +++ b/test/src/e2e.long/staking.test.ts @@ -37,12 +37,12 @@ import CodeChain from "../helper/spawn"; const RLP = require("rlp"); describe("Staking", function() { - this.timeout(60_000); + this.timeout(80_000); const promiseExpect = new PromiseExpect(); let nodes: CodeChain[]; beforeEach(async function() { - this.timeout(60_000); + this.timeout(80_000); const validatorAddresses = [ validator0Address, diff --git a/test/src/e2e/changeParams.test.ts b/test/src/e2e/changeParams.test.ts index af676236da..a9d856367b 100644 --- a/test/src/e2e/changeParams.test.ts +++ b/test/src/e2e/changeParams.test.ts @@ -819,7 +819,7 @@ describe("ChangeParams", function() { ).rejectedWith(/nomination expiration/); }); - it("custody period cannot be zero", async function() { + it.skip("custody period cannot be zero", async function() { const newParams = [ 0x20, // maxExtraDataSize 0x0400, // maxAssetSchemeMetadataSize @@ -882,7 +882,7 @@ describe("ChangeParams", function() { ).rejectedWith(/custody period/); }); - it("release period cannot be zero", async function() { + it.skip("release period cannot be zero", async function() { const newParams = [ 0x20, // maxExtraDataSize 0x0400, // maxAssetSchemeMetadataSize @@ -945,7 +945,7 @@ describe("ChangeParams", function() { ).rejectedWith(/release period/); }); - it("A release period cannot be equal to a custody period", async function() { + it.skip("A release period cannot be equal to a custody period", async function() { const newParams = [ 0x20, // maxExtraDataSize 0x0400, // maxAssetSchemeMetadataSize diff --git a/test/src/scheme/tendermint-dynval.json b/test/src/scheme/tendermint-dynval.json index ccadaa520d..0eefae40e4 100644 --- a/test/src/scheme/tendermint-dynval.json +++ b/test/src/scheme/tendermint-dynval.json @@ -9,7 +9,7 @@ "0xdb3a858d2bafd2cb5382fcf366b847a86b58b42ce1fc29fec0cb0315af881a2ad495045adbdbc86ef7a777b541c4e62a0747f25ff6068a5ec3a052c690c4ff8a", "0x42829b18de338aa3abf5e6d80cd511121bf9d34be9a135bbace32a3226479e7f3bb6af76c11dcc724a1666a22910d756b075d54d8fdd97be11efd7a0ac3bb222" ], - "timeoutPropose": 2000, + "timeoutPropose": 1500, "timeoutProposeDelta": 100, "timeoutPrevote": 1000, "timeoutPrevoteDelta": 100, diff --git a/test/src/scheme/tendermint-int.json b/test/src/scheme/tendermint-int.json index 92a0828bcb..3b45ac4907 100644 --- a/test/src/scheme/tendermint-int.json +++ b/test/src/scheme/tendermint-int.json @@ -9,7 +9,7 @@ "0xdb3a858d2bafd2cb5382fcf366b847a86b58b42ce1fc29fec0cb0315af881a2ad495045adbdbc86ef7a777b541c4e62a0747f25ff6068a5ec3a052c690c4ff8a", "0x42829b18de338aa3abf5e6d80cd511121bf9d34be9a135bbace32a3226479e7f3bb6af76c11dcc724a1666a22910d756b075d54d8fdd97be11efd7a0ac3bb222" ], - "timeoutPropose": 1000, + "timeoutPropose": 1500, "timeoutProposeDelta": 1000, "timeoutPrevote": 1000, "timeoutPrevoteDelta": 1000, From ad6cd2671600c02d9ede490bcda56344d7b6f7ba Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Mon, 16 Dec 2019 21:27:51 +0900 Subject: [PATCH 12/13] Remove needless weight data from Validator In randomized leader election environment, CodeChain doesn't need to manage weight values. Previously, it is utilized to guarantee stake-proportional leader election but now the property comes from the nature of the probability model. --- core/src/consensus/stake/action_data.rs | 34 ------------------------- core/src/consensus/stake/mod.rs | 6 ----- core/src/consensus/tendermint/engine.rs | 1 - 3 files changed, 41 deletions(-) diff --git a/core/src/consensus/stake/action_data.rs b/core/src/consensus/stake/action_data.rs index acc1add3b3..5841d06ac5 100644 --- a/core/src/consensus/stake/action_data.rs +++ b/core/src/consensus/stake/action_data.rs @@ -236,7 +236,6 @@ impl<'a> Delegation<'a> { #[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd, RlpDecodable, RlpEncodable)] pub struct Validator { - weight: StakeQuantity, delegation: StakeQuantity, deposit: Deposit, pubkey: Public, @@ -245,7 +244,6 @@ pub struct Validator { impl Validator { pub fn new_for_test(delegation: StakeQuantity, deposit: Deposit, pubkey: Public) -> Self { Self { - weight: delegation, delegation, deposit, pubkey, @@ -254,17 +252,12 @@ impl Validator { fn new(delegation: StakeQuantity, deposit: Deposit, pubkey: Public) -> Self { Self { - weight: delegation, delegation, deposit, pubkey, } } - fn reset(&mut self) { - self.weight = self.delegation; - } - pub fn pubkey(&self) -> &Public { &self.pubkey } @@ -345,28 +338,6 @@ impl Validators { Ok(()) } - pub fn update_weight(&mut self, block_author: &Address) { - let min_delegation = self.min_delegation(); - for Validator { - weight, - pubkey, - .. - } in self.0.iter_mut().rev() - { - if public_to_address(pubkey) == *block_author { - // block author - *weight = weight.saturating_sub(min_delegation); - break - } - // neglecting validators - *weight = weight.saturating_sub(min_delegation * 2); - } - if self.0.iter().all(|validator| validator.weight == 0) { - self.0.iter_mut().for_each(Validator::reset); - } - self.0.sort_unstable(); - } - pub fn remove(&mut self, target: &Address) { self.0.retain( |Validator { @@ -379,10 +350,6 @@ impl Validators { pub fn delegation(&self, pubkey: &Public) -> Option { self.0.iter().find(|validator| validator.pubkey == *pubkey).map(|&validator| validator.delegation) } - - fn min_delegation(&self) -> StakeQuantity { - self.0.iter().map(|&validator| validator.delegation).min().expect("There must be at least one validators") - } } impl Deref for Validators { @@ -1758,7 +1725,6 @@ mod tests { pubkey: *pubkey, deposit: 0, delegation: 0, - weight: 0, }) .collect(), ); diff --git a/core/src/consensus/stake/mod.rs b/core/src/consensus/stake/mod.rs index bd33db2963..8472e06ca2 100644 --- a/core/src/consensus/stake/mod.rs +++ b/core/src/consensus/stake/mod.rs @@ -341,12 +341,6 @@ pub fn move_current_to_previous_intermediate_rewards(state: &mut TopLevelState) rewards.save_to_state(state) } -pub fn update_validator_weights(state: &mut TopLevelState, block_author: &Address) -> StateResult<()> { - let mut validators = Validators::load_from_state(state)?; - validators.update_weight(block_author); - validators.save_to_state(state) -} - fn change_params( state: &mut TopLevelState, metadata_seq: u64, diff --git a/core/src/consensus/tendermint/engine.rs b/core/src/consensus/tendermint/engine.rs index dbfbf86198..6a39ef28c2 100644 --- a/core/src/consensus/tendermint/engine.rs +++ b/core/src/consensus/tendermint/engine.rs @@ -184,7 +184,6 @@ impl ConsensusEngine for Tendermint { self.machine.add_balance(block, &author, block_author_reward)?; } _ => { - stake::update_validator_weights(block.state_mut(), &author)?; stake::add_intermediate_rewards(block.state_mut(), author, block_author_reward)?; } } From 6cbf36af45a377e0c9decd49bdb1553dee5e98eb Mon Sep 17 00:00:00 2001 From: Seonpyo Kim Date: Tue, 17 Dec 2019 12:16:18 +0900 Subject: [PATCH 13/13] Add logging to get statistics --- core/src/consensus/tendermint/worker.rs | 28 +++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index 03f60c5a70..56d75a5120 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -709,20 +709,33 @@ impl Worker { let parent_block_hash = self.prev_block_hash(); if let Some(priority_info) = self.signer_priority_info(parent_block_hash) { if let TwoThirdsMajority::Lock(lock_view, locked_block_hash) = self.last_two_thirds_majority { - cinfo!(ENGINE, "I am eligible to be a proposer, I'll re-propose a locked block"); + cinfo!(ENGINE, "I am eligible to be a proposer, round-info {}-{} and {}. I'll re-propose a locked block with priority", + self.height, + self.view, + priority_info, + ); match self.locked_proposal_block(lock_view, locked_block_hash) { Ok(block) => self.repropose_block(priority_info, block), Err(error_msg) => cwarn!(ENGINE, "{}", error_msg), } } else { - cinfo!(ENGINE, "I am eligible to be a proposer, I'll create a block"); + cinfo!(ENGINE, "I am eligible to be a proposer, round-info {}-{} and {}. I'll create a block with priority", + self.height, + self.view, + priority_info, + ); self.update_sealing(parent_block_hash); self.step.wait_block_generation(priority_info, parent_block_hash); } } else { let sortition_round = vote_step.into(); let round_highest_priority = self.votes.get_highest_priority(sortition_round); - cinfo!(ENGINE, "I am not eligible to be a proposer, I'll request a proposal"); + cinfo!( + ENGINE, + "I am not eligible to be a proposer, round-info {}-{}. I'll request a proposal", + self.height, + self.view, + ); self.request_proposal_to_superiors(sortition_round, round_highest_priority); } } @@ -1733,7 +1746,14 @@ impl Worker { let block_view = BlockView::new(&bytes); let header_view = block_view.header(); let number = header_view.number(); - cinfo!(ENGINE, "Proposal received for {}-{:?}", number, header_view.hash()); + cinfo!( + ENGINE, + "Proposal received for ({},{})-{:?}. The priority info is {}", + number, + proposed_view, + header_view.hash(), + priority_info, + ); let parent_hash = header_view.parent_hash(); {