From f011220ea80502e4d41c1bf279929b9e341c3222 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 18 May 2023 19:51:39 +0400 Subject: [PATCH 1/3] handle goahead signal for unincluded segment --- pallets/parachain-system/src/lib.rs | 71 +++++++++++----- pallets/parachain-system/src/tests.rs | 55 ++++++++++++ .../src/unincluded_segment.rs | 84 ++++++++++++++++++- 3 files changed, 185 insertions(+), 25 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 8c8a57107dd..5e1eb12ec99 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -225,6 +225,7 @@ pub mod pallet { fn on_finalize(_: T::BlockNumber) { >::kill(); >::kill(); + let relay_upgrade_go_ahead = >::take(); assert!( >::exists(), @@ -338,20 +339,30 @@ pub mod pallet { .collect(); let used_bandwidth = UsedBandwidth { ump_msg_count, ump_total_bytes, hrmp_outgoing }; + + let mut aggregated_segment = + AggregatedUnincludedSegment::::get().unwrap_or_default(); + let consumed_go_ahead_signal = + if aggregated_segment.consumed_go_ahead_signal().is_some() { + // Some ancestor within the segment already processed this signal -- validated during + // inherent creation. + None + } else { + relay_upgrade_go_ahead + }; // The bandwidth constructed was ensured to satisfy relay chain constraints. - let ancestor = Ancestor::new_unchecked(used_bandwidth); + let ancestor = Ancestor::new_unchecked(used_bandwidth, consumed_go_ahead_signal); let watermark = HrmpWatermark::::get(); let watermark_update = HrmpWatermarkUpdate::new(watermark, LastRelayChainBlockNumber::::get()); - AggregatedUnincludedSegment::::mutate(|agg| { - let agg = agg.get_or_insert_with(SegmentTracker::default); - // TODO: In order of this panic to be correct, outbound message source should - // respect bandwidth limits as well. - // - agg.append(&ancestor, watermark_update, &total_bandwidth_out) - .expect("unincluded segment limits exceeded"); - }); + // TODO: In order of this panic to be correct, outbound message source should + // respect bandwidth limits as well. + // + aggregated_segment + .append(&ancestor, watermark_update, &total_bandwidth_out) + .expect("unincluded segment limits exceeded"); + AggregatedUnincludedSegment::::put(aggregated_segment); // Check in `on_initialize` guarantees there's space for this block. UnincludedSegment::::append(ancestor); } @@ -425,7 +436,7 @@ pub mod pallet { 4 + hrmp_max_message_num_per_candidate as u64, ); - // Always try to read `MaxUnincludedLen` in `on_finalize`. + // Always try to read `UpgradeGoAhead` in `on_finalize`. weight += T::DbWeight::get().reads(1); weight @@ -456,6 +467,9 @@ pub mod pallet { "ValidationData must be updated only once in a block", ); + // TODO: This is more than zero, but will need benchmarking to figure out what. + let mut total_weight = Weight::zero(); + // NOTE: the inherent data is expected to be unique, even if this block is built // in the context of the same relay parent as the previous one. In particular, // the inherent shouldn't contain messages that were already processed by any of the @@ -486,6 +500,8 @@ pub mod pallet { // Update the desired maximum capacity according to the consensus hook. let (consensus_hook_weight, capacity) = T::ConsensusHook::on_state_proof(&relay_state_proof); + total_weight += consensus_hook_weight; + total_weight += Self::maybe_drop_included_ancestors(&relay_state_proof, capacity); // initialization logic: we know that this runs exactly once every block, // which means we can put the initialization logic here to remove the @@ -493,7 +509,19 @@ pub mod pallet { let upgrade_go_ahead_signal = relay_state_proof .read_upgrade_go_ahead_signal() .expect("Invalid upgrade go ahead signal"); + + let upgrade_signal_in_segment = AggregatedUnincludedSegment::::get() + .as_ref() + .and_then(SegmentTracker::consumed_go_ahead_signal); + if let Some(signal_in_segment) = upgrade_signal_in_segment.as_ref() { + // Unincluded ancestor consuming upgrade signal is still within the segment, + // sanity check that it matches with the signal from relay chain. + assert_eq!(upgrade_go_ahead_signal, Some(*signal_in_segment)); + } match upgrade_go_ahead_signal { + Some(_signal) if upgrade_signal_in_segment.is_some() => { + // Do nothing, processing logic was executed by unincluded ancestor. + }, Some(relay_chain::UpgradeGoAhead::GoAhead) => { assert!( >::exists(), @@ -518,6 +546,7 @@ pub mod pallet { .read_upgrade_restriction_signal() .expect("Invalid upgrade restriction signal"), ); + >::put(upgrade_go_ahead_signal); let host_config = relay_state_proof .read_abridged_host_configuration() @@ -533,18 +562,6 @@ pub mod pallet { ::on_validation_data(&vfp); - // TODO: This is more than zero, but will need benchmarking to figure out what. - // NOTE: We don't account for the amount of processed messages from - // downward and horizontal channels in the unincluded segment. - // - // This is correct only because the current implementation always attempts - // to exhaust each message queue and panics if the DMQ head doesn't match. - // - // If one or more messages were ever "re-processed" in a parachain block before its - // ancestor was included, the MQC heads wouldn't match and the block would be invalid. - // - // - let mut total_weight = consensus_hook_weight; total_weight += Self::process_inbound_downward_messages( relevant_messaging_state.dmq_mqc_head, downward_messages, @@ -554,7 +571,6 @@ pub mod pallet { horizontal_messages, vfp.relay_parent_number, ); - total_weight += Self::maybe_drop_included_ancestors(&relay_state_proof, capacity); Ok(PostDispatchInfo { actual_weight: Some(total_weight), pays_fee: Pays::No }) } @@ -719,6 +735,15 @@ pub mod pallet { pub(super) type UpgradeRestrictionSignal = StorageValue<_, Option, ValueQuery>; + /// Optional upgrade go-ahead signal from the relay-chain. + /// + /// This storage item is a mirror of the corresponding value for the current parachain from the + /// relay-chain. This value is ephemeral which means it doesn't hit the storage. This value is + /// set after the inherent. + #[pallet::storage] + pub(super) type UpgradeGoAhead = + StorageValue<_, Option, ValueQuery>; + /// The state proof for the last relay parent block. /// /// This field is meant to be updated each block with the validation data inherent. Therefore, diff --git a/pallets/parachain-system/src/tests.rs b/pallets/parachain-system/src/tests.rs index 574ab43078d..a7b39b4482c 100755 --- a/pallets/parachain-system/src/tests.rs +++ b/pallets/parachain-system/src/tests.rs @@ -495,6 +495,61 @@ fn unincluded_segment_is_limited() { .add(124, || {}); // The previous block wasn't included yet, should panic in `create_inherent`. } +#[test] +fn unincluded_code_upgrade_handles_signal() { + CONSENSUS_HOOK.with(|c| { + *c.borrow_mut() = Box::new(|_| (Weight::zero(), NonZeroU32::new(2).unwrap().into())) + }); + + BlockTests::new() + .with_inclusion_delay(1) + .with_relay_sproof_builder(|_, block_number, builder| { + if block_number > 123 && block_number <= 125 { + builder.upgrade_go_ahead = Some(relay_chain::UpgradeGoAhead::GoAhead); + } + }) + .add(123, || { + assert_ok!(System::set_code(RawOrigin::Root.into(), Default::default())); + }) + .add_with_post_test( + 124, + || {}, + || { + assert!( + !>::exists(), + "validation function must have been unset" + ); + }, + ) + .add_with_post_test( + 125, + || { + // The signal is present in relay state proof and ignored. + // Block that processed the signal is still not included. + }, + || { + let segment = >::get(); + assert_eq!(segment.len(), 2); + let aggregated_segment = + >::get().expect("segment is non-empty"); + assert_eq!( + aggregated_segment.consumed_go_ahead_signal(), + Some(relay_chain::UpgradeGoAhead::GoAhead) + ); + }, + ) + .add_with_post_test( + 126, + || {}, + || { + let aggregated_segment = + >::get().expect("segment is non-empty"); + // Block that processed the signal is included. + assert!(aggregated_segment.consumed_go_ahead_signal().is_none()); + }, + ); +} + #[test] fn events() { BlockTests::new() diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index a61c5725556..0694bc6956e 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -138,6 +138,9 @@ pub enum BandwidthUpdateError { /// Latest tracked HRMP watermark. latest: relay_chain::BlockNumber, }, + /// Upgrade signal sent by relay chain was already processed by + /// some ancestor from the segment. + UpgradeGoAheadAlreadyProcessed, } /// The number of messages and size in bytes submitted to HRMP channel. @@ -268,12 +271,16 @@ pub struct Ancestor { /// Output head data hash of this block. This may be optional in case the head data has not /// yet been posted on chain, but should be updated during initialization of the next block. para_head_hash: Option, + consumed_go_ahead_signal: Option, } impl Ancestor { /// Creates new ancestor without validating the bandwidth used. - pub fn new_unchecked(used_bandwidth: UsedBandwidth) -> Self { - Self { used_bandwidth, para_head_hash: None } + pub fn new_unchecked( + used_bandwidth: UsedBandwidth, + consumed_go_ahead_signal: Option, + ) -> Self { + Self { used_bandwidth, para_head_hash: None, consumed_go_ahead_signal } } /// Returns [`UsedBandwidth`] of this block. @@ -328,6 +335,7 @@ pub struct SegmentTracker { used_bandwidth: UsedBandwidth, /// The mark which specifies the block number up to which all inbound HRMP messages are processed. hrmp_watermark: Option, + consumed_go_ahead_signal: Option, /// `H` is the type of para head hash. phantom_data: PhantomData, } @@ -342,6 +350,9 @@ impl SegmentTracker { new_watermark: HrmpWatermarkUpdate, limits: &OutboundBandwidthLimits, ) -> Result<(), BandwidthUpdateError> { + if self.consumed_go_ahead_signal.is_some() && block.consumed_go_ahead_signal.is_some() { + return Err(BandwidthUpdateError::UpgradeGoAheadAlreadyProcessed) + } if let Some(watermark) = self.hrmp_watermark.as_ref() { if let HrmpWatermarkUpdate::Trunk(new) = new_watermark { if &new <= watermark { @@ -354,6 +365,10 @@ impl SegmentTracker { } self.used_bandwidth = self.used_bandwidth.append(block.used_bandwidth(), limits)?; + + if let Some(consumed) = block.consumed_go_ahead_signal.as_ref() { + self.consumed_go_ahead_signal.replace(*consumed); + } self.hrmp_watermark.replace(match new_watermark { HrmpWatermarkUpdate::Trunk(w) | HrmpWatermarkUpdate::Head(w) => w, }); @@ -364,6 +379,11 @@ impl SegmentTracker { /// Removes previously added block from the tracker. pub fn subtract(&mut self, block: &Ancestor) { self.used_bandwidth.subtract(block.used_bandwidth()); + if let Some(consumed) = block.consumed_go_ahead_signal.as_ref() { + // This is the same signal stored in the tracker. + let signal_in_segment = self.consumed_go_ahead_signal.take(); + assert_eq!(signal_in_segment, Some(*consumed)); + } // Watermark doesn't need to be updated since the is always dropped // from the tail of the segment. } @@ -372,6 +392,11 @@ impl SegmentTracker { pub fn used_bandwidth(&self) -> &UsedBandwidth { &self.used_bandwidth } + + /// Return go ahead signal consumed by some ancestor in a segment, if any. + pub fn consumed_go_ahead_signal(&self) -> Option { + self.consumed_go_ahead_signal.clone() + } } #[cfg(test)] @@ -549,6 +574,7 @@ mod tests { let ancestor_0 = Ancestor { used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), para_head_hash: None::, + consumed_go_ahead_signal: None, }; segment .append(&ancestor_0, HrmpWatermarkUpdate::Trunk(0), &limits) @@ -558,6 +584,7 @@ mod tests { let ancestor = Ancestor { used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), para_head_hash: None::, + consumed_go_ahead_signal: None, }; segment .append(&ancestor, HrmpWatermarkUpdate::Trunk(watermark), &limits) @@ -568,6 +595,7 @@ mod tests { let ancestor_5 = Ancestor { used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), para_head_hash: None::, + consumed_go_ahead_signal: None, }; assert_matches!( segment.append(&ancestor_5, HrmpWatermarkUpdate::Trunk(5), &limits), @@ -587,6 +615,7 @@ mod tests { let ancestor = Ancestor { used_bandwidth: create_used_hrmp([(para_1, para_1_update)].into()), para_head_hash: None::, + consumed_go_ahead_signal: None, }; segment .append(&ancestor, HrmpWatermarkUpdate::Trunk(6), &limits) @@ -621,6 +650,7 @@ mod tests { let ancestor_0 = Ancestor { used_bandwidth: create_used_ump((1, 10)), para_head_hash: None::, + consumed_go_ahead_signal: None, }; segment .append(&ancestor_0, HrmpWatermarkUpdate::Trunk(0), &limits) @@ -630,6 +660,7 @@ mod tests { let ancestor = Ancestor { used_bandwidth: create_used_ump((1, 10)), para_head_hash: None::, + consumed_go_ahead_signal: None, }; segment .append(&ancestor, HrmpWatermarkUpdate::Trunk(watermark), &limits) @@ -639,6 +670,7 @@ mod tests { let ancestor_4 = Ancestor { used_bandwidth: create_used_ump((1, 30)), para_head_hash: None::, + consumed_go_ahead_signal: None, }; assert_matches!( segment.append(&ancestor_4, HrmpWatermarkUpdate::Trunk(4), &limits), @@ -651,6 +683,7 @@ mod tests { let ancestor = Ancestor { used_bandwidth: create_used_ump((1, 5)), para_head_hash: None::, + consumed_go_ahead_signal: None, }; segment .append(&ancestor, HrmpWatermarkUpdate::Trunk(4), &limits) @@ -671,6 +704,7 @@ mod tests { let ancestor = Ancestor { used_bandwidth: UsedBandwidth::default(), para_head_hash: None::, + consumed_go_ahead_signal: None, }; let limits = OutboundBandwidthLimits { ump_messages_remaining: 0, @@ -734,6 +768,7 @@ mod tests { let ancestor_0 = Ancestor { used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), para_head_hash: None::, + consumed_go_ahead_signal: None, }; segment .append(&ancestor_0, HrmpWatermarkUpdate::Head(0), &limits) @@ -742,6 +777,7 @@ mod tests { let ancestor_1 = Ancestor { used_bandwidth: create_used_hrmp([(para_1, para_1_update)].into()), para_head_hash: None::, + consumed_go_ahead_signal: None, }; segment .append(&ancestor_1, HrmpWatermarkUpdate::Head(1), &limits) @@ -755,4 +791,48 @@ mod tests { segment.subtract(&ancestor_1); assert_eq!(segment.used_bandwidth.hrmp_outgoing.len(), 0); } + + #[test] + fn segment_go_ahead_signal_is_unique() { + let limits = OutboundBandwidthLimits { + ump_messages_remaining: 0, + ump_bytes_remaining: 0, + hrmp_outgoing: BTreeMap::default(), + }; + + let mut segment = SegmentTracker::default(); + + let ancestor_0 = Ancestor { + used_bandwidth: UsedBandwidth::default(), + para_head_hash: None::, + consumed_go_ahead_signal: Some(relay_chain::UpgradeGoAhead::GoAhead), + }; + segment + .append(&ancestor_0, HrmpWatermarkUpdate::Head(0), &limits) + .expect("update is within the limits"); + + let ancestor_1 = Ancestor { + used_bandwidth: UsedBandwidth::default(), + para_head_hash: None::, + consumed_go_ahead_signal: None, + }; + segment + .append(&ancestor_1, HrmpWatermarkUpdate::Head(1), &limits) + .expect("update is within the limits"); + + let ancestor_2 = Ancestor { + used_bandwidth: UsedBandwidth::default(), + para_head_hash: None::, + consumed_go_ahead_signal: Some(relay_chain::UpgradeGoAhead::Abort), + }; + assert_matches!( + segment.append(&ancestor_2, HrmpWatermarkUpdate::Head(2), &limits), + Err(BandwidthUpdateError::UpgradeGoAheadAlreadyProcessed) + ); + + segment.subtract(&ancestor_0); + segment + .append(&ancestor_2, HrmpWatermarkUpdate::Head(1), &limits) + .expect("update is within the limits"); + } } From 32669a257c619cd2817044763352291cb33feb77 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 18 May 2023 20:01:19 +0400 Subject: [PATCH 2/3] doc comment --- pallets/parachain-system/src/unincluded_segment.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 0694bc6956e..53907f441cd 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -271,6 +271,7 @@ pub struct Ancestor { /// Output head data hash of this block. This may be optional in case the head data has not /// yet been posted on chain, but should be updated during initialization of the next block. para_head_hash: Option, + /// Optional go-ahead signal sent by the relay-chain this ancestor has processed. consumed_go_ahead_signal: Option, } @@ -335,6 +336,8 @@ pub struct SegmentTracker { used_bandwidth: UsedBandwidth, /// The mark which specifies the block number up to which all inbound HRMP messages are processed. hrmp_watermark: Option, + /// Optional go-ahead signal sent by the relay-chain some ancestor from the segment has processed. + /// Only single block is allowed to have this set within the whole segment. consumed_go_ahead_signal: Option, /// `H` is the type of para head hash. phantom_data: PhantomData, From c89ff87e82670a11cfe29f40c900222db6045a28 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 22 May 2023 16:10:12 +0400 Subject: [PATCH 3/3] add test --- pallets/parachain-system/src/tests.rs | 54 +++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/pallets/parachain-system/src/tests.rs b/pallets/parachain-system/src/tests.rs index a7b39b4482c..8d7b4de2b2e 100755 --- a/pallets/parachain-system/src/tests.rs +++ b/pallets/parachain-system/src/tests.rs @@ -550,6 +550,60 @@ fn unincluded_code_upgrade_handles_signal() { ); } +#[test] +fn unincluded_code_upgrade_scheduled_after_go_ahead() { + CONSENSUS_HOOK.with(|c| { + *c.borrow_mut() = Box::new(|_| (Weight::zero(), NonZeroU32::new(2).unwrap().into())) + }); + + BlockTests::new() + .with_inclusion_delay(1) + .with_relay_sproof_builder(|_, block_number, builder| { + if block_number > 123 && block_number <= 125 { + builder.upgrade_go_ahead = Some(relay_chain::UpgradeGoAhead::GoAhead); + } + }) + .add(123, || { + assert_ok!(System::set_code(RawOrigin::Root.into(), Default::default())); + }) + .add_with_post_test( + 124, + || {}, + || { + assert!( + !>::exists(), + "validation function must have been unset" + ); + // The previous go-ahead signal was processed, schedule another upgrade. + assert_ok!(System::set_code(RawOrigin::Root.into(), Default::default())); + }, + ) + .add_with_post_test( + 125, + || { + // The signal is present in relay state proof and ignored. + // Block that processed the signal is still not included. + }, + || { + let segment = >::get(); + assert_eq!(segment.len(), 2); + let aggregated_segment = + >::get().expect("segment is non-empty"); + assert_eq!( + aggregated_segment.consumed_go_ahead_signal(), + Some(relay_chain::UpgradeGoAhead::GoAhead) + ); + }, + ) + .add_with_post_test( + 126, + || {}, + || { + assert!(>::exists(), "upgrade is pending"); + }, + ); +} + #[test] fn events() { BlockTests::new()