From 8299c9748312b0aac64774371929453a8ae2aa2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Sat, 21 Oct 2023 01:44:11 +0200 Subject: [PATCH 1/2] paras-scheduler: Fix migration to V1 The migration was missing to migrate `AvailabilityCores`. If this isn't migrated, all parachains in the availability phase would stall until the next session is started. This pull request fixes this by migrating this data. Besides that it is doing some cosmetics. --- polkadot/runtime/parachains/src/assigner.rs | 3 +- .../parachains/src/assigner_parachains.rs | 4 +- polkadot/runtime/parachains/src/scheduler.rs | 13 +- .../parachains/src/scheduler/migration.rs | 111 ++++++++++++------ 4 files changed, 83 insertions(+), 48 deletions(-) diff --git a/polkadot/runtime/parachains/src/assigner.rs b/polkadot/runtime/parachains/src/assigner.rs index b21e857a47137..9e408df61dc18 100644 --- a/polkadot/runtime/parachains/src/assigner.rs +++ b/polkadot/runtime/parachains/src/assigner.rs @@ -53,7 +53,8 @@ impl Pallet { fn is_bulk_core(core_idx: &CoreIndex) -> bool { let parachain_cores = as AssignmentProvider>>::session_core_count(); - (0..parachain_cores).contains(&core_idx.0) + + core_idx.0 < parachain_cores } } diff --git a/polkadot/runtime/parachains/src/assigner_parachains.rs b/polkadot/runtime/parachains/src/assigner_parachains.rs index d605d86605151..866e8290052a8 100644 --- a/polkadot/runtime/parachains/src/assigner_parachains.rs +++ b/polkadot/runtime/parachains/src/assigner_parachains.rs @@ -39,7 +39,7 @@ pub mod pallet { impl AssignmentProvider> for Pallet { fn session_core_count() -> u32 { - >::parachains().len() as u32 + paras::Parachains::::decode_len().unwrap_or(0) as u32 } fn pop_assignment_for_core( @@ -62,7 +62,7 @@ impl AssignmentProvider> for Pallet { max_availability_timeouts: 0, // The next assignment already goes to the same [`ParaId`], this can be any number // that's high enough to clear the time it takes to clear backing/availability. - ttl: BlockNumberFor::::from(10u32), + ttl: 10u32.into(), } } } diff --git a/polkadot/runtime/parachains/src/scheduler.rs b/polkadot/runtime/parachains/src/scheduler.rs index 60b2a9254600e..d088158fe4ace 100644 --- a/polkadot/runtime/parachains/src/scheduler.rs +++ b/polkadot/runtime/parachains/src/scheduler.rs @@ -605,14 +605,10 @@ impl Pallet { /// Moves all elements in the claimqueue forward. fn move_claimqueue_forward() { let mut cq = ClaimQueue::::get(); - for (_, core_queue) in cq.iter_mut() { + for core_queue in cq.values_mut() { // First pop the finished claims from the front. - match core_queue.front() { - None => {}, - Some(None) => { - core_queue.pop_front(); - }, - Some(_) => {}, + if let Some(None) = core_queue.front() { + core_queue.pop_front(); } } @@ -628,9 +624,10 @@ impl Pallet { // This can only happen on new sessions at which we move all assignments back to the // provider. Hence, there's nothing we need to do here. - if ValidatorGroups::::get().is_empty() { + if ValidatorGroups::::decode_len().map_or(true, |l| l == 0) { return } + let n_lookahead = Self::claimqueue_lookahead(); let n_session_cores = T::AssignmentProvider::session_core_count(); let cq = ClaimQueue::::get(); diff --git a/polkadot/runtime/parachains/src/scheduler/migration.rs b/polkadot/runtime/parachains/src/scheduler/migration.rs index c1ce95b10e3cb..bb9a647e955ca 100644 --- a/polkadot/runtime/parachains/src/scheduler/migration.rs +++ b/polkadot/runtime/parachains/src/scheduler/migration.rs @@ -25,36 +25,45 @@ use frame_support::{ mod v0 { use super::*; - use primitives::CollatorId; + use primitives::{CollatorId, Id}; + #[storage_alias] pub(super) type Scheduled = StorageValue, Vec, ValueQuery>; - #[derive(Encode, Decode)] - pub struct QueuedParathread { - claim: primitives::ParathreadEntry, - core_offset: u32, - } + #[derive(Clone, Encode, Decode)] + #[cfg_attr(feature = "std", derive(PartialEq))] + pub struct ParathreadClaim(pub Id, pub CollatorId); - #[derive(Encode, Decode, Default)] - pub struct ParathreadClaimQueue { - queue: Vec, - next_core_offset: u32, + #[derive(Clone, Encode, Decode)] + #[cfg_attr(feature = "std", derive(PartialEq))] + pub struct ParathreadEntry { + /// The claim. + pub claim: ParathreadClaim, + /// Number of retries. + pub retries: u32, } - // Only here to facilitate the migration. - impl ParathreadClaimQueue { - pub fn len(self) -> usize { - self.queue.len() - } + /// What is occupying a specific availability core. + #[derive(Clone, Encode, Decode)] + #[cfg_attr(feature = "std", derive(PartialEq))] + pub enum CoreOccupied { + /// A parathread. + Parathread(ParathreadEntry), + /// A parachain. + Parachain, } + /// The actual type isn't important, as we only delete the key in the state. #[storage_alias] - pub(super) type ParathreadQueue = - StorageValue, ParathreadClaimQueue, ValueQuery>; + pub(crate) type AvailabilityCores = + StorageValue, Vec>, ValueQuery>; + /// The actual type isn't important, as we only delete the key in the state. #[storage_alias] - pub(super) type ParathreadClaimIndex = - StorageValue, Vec, ValueQuery>; + pub(super) type ParathreadQueue = StorageValue, (), ValueQuery>; + + #[storage_alias] + pub(super) type ParathreadClaimIndex = StorageValue, (), ValueQuery>; /// The assignment type. #[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)] @@ -108,30 +117,36 @@ pub mod v1 { #[cfg(feature = "try-runtime")] fn pre_upgrade() -> Result, sp_runtime::DispatchError> { - log::trace!( + let n: u32 = v0::Scheduled::::get().len() as u32 + + v0::AvailabilityCores::::get().iter().filter(|c| c.is_some()).count() as u32; + + log::info!( target: crate::scheduler::LOG_TARGET, - "Scheduled before migration: {}", - v0::Scheduled::::get().len() + "Number of scheduled and waiting for availability before: {n}", ); - let bytes = u32::to_be_bytes(v0::Scheduled::::get().len() as u32); - - Ok(bytes.to_vec()) + Ok(n.encode()) } #[cfg(feature = "try-runtime")] fn post_upgrade(state: Vec) -> Result<(), sp_runtime::DispatchError> { - log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()"); + log::info!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()"); ensure!( - v0::Scheduled::::get().len() == 0, + v0::Scheduled::::get().is_empty(), "Scheduled should be empty after the migration" ); - let sched_len = u32::from_be_bytes(state.try_into().unwrap()); + let expected_len = u32::decode(&mut &state[..]).unwrap(); + let availability_cores_waiting = super::AvailabilityCores::::get() + .iter() + .filter(|c| !matches!(c, CoreOccupied::Free)) + .count(); + ensure!( - Pallet::::claimqueue_len() as u32 == sched_len, - "Scheduled completely moved to ClaimQueue after migration" + Pallet::::claimqueue_len() as u32 + availability_cores_waiting as u32 == + expected_len, + "ClaimQueue and AvailabilityCores should have the correct length", ); Ok(()) @@ -142,11 +157,8 @@ pub mod v1 { pub fn migrate_to_v1() -> Weight { let mut weight: Weight = Weight::zero(); - let pq = v0::ParathreadQueue::::take(); - let pq_len = pq.len() as u64; - - let pci = v0::ParathreadClaimIndex::::take(); - let pci_len = pci.len() as u64; + v0::ParathreadQueue::::kill(); + v0::ParathreadClaimIndex::::kill(); let now = >::block_number(); let scheduled = v0::Scheduled::::take(); @@ -158,10 +170,35 @@ pub fn migrate_to_v1() -> Weight { Pallet::::add_to_claimqueue(core_idx, pe); } + let parachains = paras::Pallet::::parachains(); + let availability_cores = v0::AvailabilityCores::::take(); + let mut new_availability_cores = Vec::new(); + + for (core_index, core) in availability_cores.into_iter().enumerate() { + let new_core = if let Some(core) = core { + match core { + v0::CoreOccupied::Parachain => CoreOccupied::Paras(ParasEntry::new( + Assignment::new(parachains[core_index]), + now, + )), + v0::CoreOccupied::Parathread(entry) => + CoreOccupied::Paras(ParasEntry::new(Assignment::new(entry.claim.0), now)), + } + } else { + CoreOccupied::Free + }; + + new_availability_cores.push(new_core); + } + + super::AvailabilityCores::::set(new_availability_cores); + // 2x as once for Scheduled and once for Claimqueue weight = weight.saturating_add(T::DbWeight::get().reads_writes(2 * sched_len, 2 * sched_len)); - weight = weight.saturating_add(T::DbWeight::get().reads_writes(pq_len, pq_len)); - weight = weight.saturating_add(T::DbWeight::get().reads_writes(pci_len, pci_len)); + // reading parachains + availability_cores, writing AvailabilityCores + weight = weight.saturating_add(T::DbWeight::get().reads_writes(2, 1)); + // 2x kill + weight = weight.saturating_add(T::DbWeight::get().writes(2)); weight } From f27a5a38509dff9bc45c569ae5703569e95dd805 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Sat, 21 Oct 2023 19:19:08 +0200 Subject: [PATCH 2/2] Easier code --- polkadot/runtime/parachains/src/scheduler.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/polkadot/runtime/parachains/src/scheduler.rs b/polkadot/runtime/parachains/src/scheduler.rs index d088158fe4ace..af48019124de2 100644 --- a/polkadot/runtime/parachains/src/scheduler.rs +++ b/polkadot/runtime/parachains/src/scheduler.rs @@ -683,8 +683,7 @@ impl Pallet { fn add_to_claimqueue(core_idx: CoreIndex, pe: ParasEntry>) { ClaimQueue::::mutate(|la| { - let la_deque = la.entry(core_idx).or_insert_with(|| VecDeque::new()); - la_deque.push_back(Some(pe)); + la.entry(core_idx).or_default().push_back(Some(pe)); }); }