Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

paras-scheduler: Fix migration to V1 #1969

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion polkadot/runtime/parachains/src/assigner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ impl<T: Config> Pallet<T> {
fn is_bulk_core(core_idx: &CoreIndex) -> bool {
let parachain_cores =
<ParachainAssigner<T> as AssignmentProvider<BlockNumberFor<T>>>::session_core_count();
(0..parachain_cores).contains(&core_idx.0)

core_idx.0 < parachain_cores
}
}

Expand Down
4 changes: 2 additions & 2 deletions polkadot/runtime/parachains/src/assigner_parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod pallet {

impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
fn session_core_count() -> u32 {
<paras::Pallet<T>>::parachains().len() as u32
paras::Parachains::<T>::decode_len().unwrap_or(0) as u32
}

fn pop_assignment_for_core(
Expand All @@ -62,7 +62,7 @@ impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pallet<T> {
max_availability_timeouts: 0,
// The next assignment already goes to the same [`ParaId`], this can be any number
// that's high enough to clear the time it takes to clear backing/availability.
ttl: BlockNumberFor::<T>::from(10u32),
ttl: 10u32.into(),
}
}
}
13 changes: 5 additions & 8 deletions polkadot/runtime/parachains/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,14 +605,10 @@ impl<T: Config> Pallet<T> {
/// Moves all elements in the claimqueue forward.
fn move_claimqueue_forward() {
let mut cq = ClaimQueue::<T>::get();
for (_, core_queue) in cq.iter_mut() {
for core_queue in cq.values_mut() {
// First pop the finished claims from the front.
match core_queue.front() {
None => {},
Some(None) => {
core_queue.pop_front();
},
Some(_) => {},
if let Some(None) = core_queue.front() {
core_queue.pop_front();
}
}

Expand All @@ -628,9 +624,10 @@ impl<T: Config> Pallet<T> {

// This can only happen on new sessions at which we move all assignments back to the
// provider. Hence, there's nothing we need to do here.
if ValidatorGroups::<T>::get().is_empty() {
if ValidatorGroups::<T>::decode_len().map_or(true, |l| l == 0) {
return
}

let n_lookahead = Self::claimqueue_lookahead();
let n_session_cores = T::AssignmentProvider::session_core_count();
let cq = ClaimQueue::<T>::get();
Expand Down
111 changes: 74 additions & 37 deletions polkadot/runtime/parachains/src/scheduler/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,45 @@ use frame_support::{
mod v0 {
use super::*;

use primitives::CollatorId;
use primitives::{CollatorId, Id};

#[storage_alias]
pub(super) type Scheduled<T: Config> = StorageValue<Pallet<T>, Vec<CoreAssignment>, ValueQuery>;

#[derive(Encode, Decode)]
pub struct QueuedParathread {
claim: primitives::ParathreadEntry,
core_offset: u32,
}
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub struct ParathreadClaim(pub Id, pub CollatorId);

#[derive(Encode, Decode, Default)]
pub struct ParathreadClaimQueue {
queue: Vec<QueuedParathread>,
next_core_offset: u32,
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub struct ParathreadEntry {
/// The claim.
pub claim: ParathreadClaim,
/// Number of retries.
pub retries: u32,
}

// Only here to facilitate the migration.
impl ParathreadClaimQueue {
pub fn len(self) -> usize {
self.queue.len()
}
/// What is occupying a specific availability core.
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub enum CoreOccupied {
/// A parathread.
Parathread(ParathreadEntry),
/// A parachain.
Parachain,
}

/// The actual type isn't important, as we only delete the key in the state.
#[storage_alias]
pub(super) type ParathreadQueue<T: Config> =
StorageValue<Pallet<T>, ParathreadClaimQueue, ValueQuery>;
pub(crate) type AvailabilityCores<T: Config> =
StorageValue<Pallet<T>, Vec<Option<CoreOccupied>>, ValueQuery>;

/// The actual type isn't important, as we only delete the key in the state.
#[storage_alias]
pub(super) type ParathreadClaimIndex<T: Config> =
StorageValue<Pallet<T>, Vec<ParaId>, ValueQuery>;
pub(super) type ParathreadQueue<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;

#[storage_alias]
pub(super) type ParathreadClaimIndex<T: Config> = StorageValue<Pallet<T>, (), ValueQuery>;

/// The assignment type.
#[derive(Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
Expand Down Expand Up @@ -108,30 +117,36 @@ pub mod v1 {

#[cfg(feature = "try-runtime")]
fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
log::trace!(
let n: u32 = v0::Scheduled::<T>::get().len() as u32 +
v0::AvailabilityCores::<T>::get().iter().filter(|c| c.is_some()).count() as u32;

log::info!(
target: crate::scheduler::LOG_TARGET,
"Scheduled before migration: {}",
v0::Scheduled::<T>::get().len()
"Number of scheduled and waiting for availability before: {n}",
);

let bytes = u32::to_be_bytes(v0::Scheduled::<T>::get().len() as u32);

Ok(bytes.to_vec())
Ok(n.encode())
}

#[cfg(feature = "try-runtime")]
fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
log::trace!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");
log::info!(target: crate::scheduler::LOG_TARGET, "Running post_upgrade()");

ensure!(
v0::Scheduled::<T>::get().len() == 0,
v0::Scheduled::<T>::get().is_empty(),
"Scheduled should be empty after the migration"
);

let sched_len = u32::from_be_bytes(state.try_into().unwrap());
let expected_len = u32::decode(&mut &state[..]).unwrap();
let availability_cores_waiting = super::AvailabilityCores::<T>::get()
.iter()
.filter(|c| !matches!(c, CoreOccupied::Free))
.count();

ensure!(
Pallet::<T>::claimqueue_len() as u32 == sched_len,
"Scheduled completely moved to ClaimQueue after migration"
Pallet::<T>::claimqueue_len() as u32 + availability_cores_waiting as u32 ==
expected_len,
"ClaimQueue and AvailabilityCores should have the correct length",
);

Ok(())
Expand All @@ -142,11 +157,8 @@ pub mod v1 {
pub fn migrate_to_v1<T: crate::scheduler::Config>() -> Weight {
let mut weight: Weight = Weight::zero();

let pq = v0::ParathreadQueue::<T>::take();
let pq_len = pq.len() as u64;

let pci = v0::ParathreadClaimIndex::<T>::take();
let pci_len = pci.len() as u64;
v0::ParathreadQueue::<T>::kill();
v0::ParathreadClaimIndex::<T>::kill();

let now = <frame_system::Pallet<T>>::block_number();
let scheduled = v0::Scheduled::<T>::take();
Expand All @@ -158,10 +170,35 @@ pub fn migrate_to_v1<T: crate::scheduler::Config>() -> Weight {
Pallet::<T>::add_to_claimqueue(core_idx, pe);
}

let parachains = paras::Pallet::<T>::parachains();
let availability_cores = v0::AvailabilityCores::<T>::take();
let mut new_availability_cores = Vec::new();

for (core_index, core) in availability_cores.into_iter().enumerate() {
let new_core = if let Some(core) = core {
match core {
v0::CoreOccupied::Parachain => CoreOccupied::Paras(ParasEntry::new(
Assignment::new(parachains[core_index]),
now,
)),
v0::CoreOccupied::Parathread(entry) =>
CoreOccupied::Paras(ParasEntry::new(Assignment::new(entry.claim.0), now)),
}
} else {
CoreOccupied::Free
};

new_availability_cores.push(new_core);
}

super::AvailabilityCores::<T>::set(new_availability_cores);

// 2x as once for Scheduled and once for Claimqueue
weight = weight.saturating_add(T::DbWeight::get().reads_writes(2 * sched_len, 2 * sched_len));
weight = weight.saturating_add(T::DbWeight::get().reads_writes(pq_len, pq_len));
weight = weight.saturating_add(T::DbWeight::get().reads_writes(pci_len, pci_len));
// reading parachains + availability_cores, writing AvailabilityCores
weight = weight.saturating_add(T::DbWeight::get().reads_writes(2, 1));
// 2x kill
weight = weight.saturating_add(T::DbWeight::get().writes(2));

weight
}