Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[FRAME] Use 'ready' pages in XCMP suspend logic #2393

Merged
merged 11 commits into from
Mar 5, 2024
6 changes: 3 additions & 3 deletions cumulus/pallets/xcmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ impl<T: Config> Pallet<T> {
let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
let fp = T::XcmpQueue::footprint(sender);
// Assume that it will not fit into the current page:
let new_pages = fp.pages.saturating_add(1);
let new_pages = fp.ready_pages.saturating_add(1);
if new_pages > drop_threshold {
// This should not happen since the channel should have been suspended in
// [`on_queue_changed`].
Expand Down Expand Up @@ -673,12 +673,12 @@ impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
let suspended = suspended_channels.contains(&para);

if suspended && fp.pages <= resume_threshold {
if suspended && fp.ready_pages <= resume_threshold {
Self::send_signal(para, ChannelSignal::Resume);

suspended_channels.remove(&para);
<InboundXcmpSuspended<T>>::put(suspended_channels);
} else if !suspended && fp.pages >= suspend_threshold {
} else if !suspended && fp.ready_pages >= suspend_threshold {
log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
Self::send_signal(para, ChannelSignal::Suspend);

Expand Down
1 change: 1 addition & 0 deletions cumulus/pallets/xcmp-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl<T: OnQueueChanged<ParaId>> EnqueueMessage<ParaId> for EnqueueToLocalStorage
}
}
footprint.pages = footprint.storage.size as u32 / 16; // Number does not matter
footprint.ready_pages = footprint.pages;
footprint
}
}
Expand Down
6 changes: 5 additions & 1 deletion cumulus/pallets/xcmp-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ fn xcm_enqueueing_starts_dropping_on_overflow() {
// The drop threshold for pages is 48, the others numbers dont really matter:
assert_eq!(
<Test as Config>::XcmpQueue::footprint(1000.into()),
QueueFootprint { storage: Footprint { count: 256, size: 768 }, pages: 48 }
QueueFootprint {
storage: Footprint { count: 256, size: 768 },
pages: 48,
ready_pages: 48
}
);
})
}
Expand Down
5 changes: 5 additions & 0 deletions substrate/frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ fn process_some_messages(num_msgs: u32) {
ServiceWeight::set(Some(weight));
let consumed = next_block();

for origin in BookStateFor::<Test>::iter_keys() {
let fp = MessageQueue::footprint(origin);
assert_eq!(fp.pages, fp.ready_pages);
}

assert_eq!(consumed, weight, "\n{}", MessageQueue::debug_info());
assert_eq!(NumMessagesProcessed::take(), num_msgs as usize);
}
Expand Down
6 changes: 4 additions & 2 deletions substrate/frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ use frame_support::{
defensive,
pallet_prelude::*,
traits::{
DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage,
ProcessMessageError, QueueFootprint, QueuePausedQuery, ServiceQueues,
DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError,
Footprint, ProcessMessage, ProcessMessageError, QueueFootprint, QueuePausedQuery,
ServiceQueues,
},
BoundedSlice, CloneNoBound, DefaultNoBound,
};
Expand Down Expand Up @@ -427,6 +428,7 @@ impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
fn from(book: BookState<MessageOrigin>) -> Self {
QueueFootprint {
pages: book.count,
ready_pages: book.end.defensive_saturating_sub(book.begin),
storage: Footprint { count: book.message_count, size: book.size },
}
}
Expand Down
4 changes: 2 additions & 2 deletions substrate/frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,6 @@ pub fn num_overweight_enqueued_events() -> u32 {
.count() as u32
}

pub fn fp(pages: u32, count: u64, size: u64) -> QueueFootprint {
QueueFootprint { storage: Footprint { count, size }, pages }
pub fn fp(pages: u32, ready_pages: u32, count: u64, size: u64) -> QueueFootprint {
QueueFootprint { storage: Footprint { count, size }, pages, ready_pages }
}
13 changes: 9 additions & 4 deletions substrate/frame/message-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,28 +1064,33 @@ fn footprint_num_pages_works() {
MessageQueue::enqueue_message(msg("weight=2"), Here);
MessageQueue::enqueue_message(msg("weight=3"), Here);

assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 16));
assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 2, 16));

// Mark the messages as overweight.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 0.into_weight());
assert_eq!(System::events().len(), 2);
// Overweight does not change the footprint.
assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 16));
// `ready_pages` decreases but `page` count does not.
assert_eq!(MessageQueue::footprint(Here), fp(2, 0, 2, 16));

// Now execute the second message.
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(3.into_weight(), (Here, 1, 0))
.unwrap(),
3.into_weight()
);
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 8));
assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 1, 8));
// And the first one:
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(2.into_weight(), (Here, 0, 0))
.unwrap(),
2.into_weight()
);
assert_eq!(MessageQueue::footprint(Here), Default::default());
assert_eq!(MessageQueue::footprint(Here), fp(0, 0, 0, 0));

// `ready_pages` and normal `pages` increases again:
MessageQueue::enqueue_message(msg("weight=3"), Here);
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 1, 8));
})
}

Expand Down
2 changes: 2 additions & 0 deletions substrate/frame/support/src/traits/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ impl<OverweightAddr> ServiceQueues for NoopServiceQueues<OverweightAddr> {
pub struct QueueFootprint {
/// The number of pages in the queue (including overweight pages).
pub pages: u32,
/// The number of pages that are ready (not yet processed and also not overweight).
pub ready_pages: u32,
/// The storage footprint of the queue (including overweight messages).
pub storage: Footprint,
}
Expand Down