From e517f49c6427b1c04bb39ace6da8e1c3ebd9bad7 Mon Sep 17 00:00:00 2001 From: Aljoscha Meyer Date: Mon, 6 Jan 2025 12:30:22 +0100 Subject: [PATCH 1/2] Implement and test SharedConsumer --- wb_async_utils/src/lib.rs | 3 + wb_async_utils/src/shared_consumer.rs | 390 ++++++++++++++++++++++++++ wb_async_utils/src/shared_producer.rs | 4 +- 3 files changed, 395 insertions(+), 2 deletions(-) create mode 100644 wb_async_utils/src/shared_consumer.rs diff --git a/wb_async_utils/src/lib.rs b/wb_async_utils/src/lib.rs index 5917851..8443b79 100644 --- a/wb_async_utils/src/lib.rs +++ b/wb_async_utils/src/lib.rs @@ -16,6 +16,9 @@ pub mod spsc; #[cfg(feature = "ufotofu_utils")] pub mod shared_producer; +#[cfg(feature = "ufotofu_utils")] +pub mod shared_consumer; + // This is safe if and only if the object pointed at by `reference` lives for at least `'longer`. // See https://doc.rust-lang.org/nightly/std/intrinsics/fn.transmute.html for more detail. pub(crate) unsafe fn extend_lifetime<'shorter, 'longer, T: ?Sized>(reference: &'shorter T) -> &'longer T { diff --git a/wb_async_utils/src/shared_consumer.rs b/wb_async_utils/src/shared_consumer.rs new file mode 100644 index 0000000..a21ad24 --- /dev/null +++ b/wb_async_utils/src/shared_consumer.rs @@ -0,0 +1,390 @@ +//! Provides [`SharedConsumer`], a way for creating multiple independent handles that coordinate termporary exclusive access to a shared underlying consumer. + +use core::{ + cell::Cell, + fmt::Debug, + ops::{Deref, DerefMut}, +}; + +use ufotofu::{BufferedConsumer, BulkConsumer, Consumer}; + +use crate::{mutex::WriteGuard, Mutex}; + +/// The state shared between all clones of the same [`SharedConsumer`]. This is fully opaque, but we expose it to give control over where it is allocated. +pub struct State { + m: Mutex>, + unclosed_handle_count: Cell, +} + +impl State { + /// Creates a new [`State`] for managing shared access to the same `consumer`. + pub fn new(consumer: C) -> Self { + State { + m: Mutex::new(MutexState { + c: consumer, + error: None, + }), + unclosed_handle_count: Cell::new(1), + } + } +} + +impl Debug for State +where + C: Consumer + Debug, + C::Error: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("State") + .field("m", &self.m) + .field("unclosed_handle_count", &self.unclosed_handle_count) + .finish() + } +} + +#[derive(Debug)] +struct MutexState { + c: C, + error: Option, +} + +/// A consumer adaptor that allows access to the same consumer from multiple parts in the codebase by providing a cloneable handle. +/// +/// This type provides three core pieces of functionality: ensuring exclusive access to the underlying consumer so that independent components do not interleave each other's data, ignoring all `close`s except for the very last one, and cloning and caching errors to present them to all handles. More specifically: +/// +/// A [`SharedConsumer`] handle does not itself implement the consumer traits. Instead, one must call the async [`access_consumer`](SharedConsumer::access_consumer) method, which grants a [`SharedConsumerAccess`] which implements the consumer traits. If another [`SharedConsumerAccess`] is currently alive, the method non-blocks until the inner consumer can be accessed safely. Pending accesses are woken up in FIFO-order. +/// +/// Calling `close` on any handle is a no-op that reports a success and drops the supplied final value, except when there exists exactly one handle which hasn't been closed yet. In that case, the inner consumer is closed with the supplied final value. If you create a [`SharedConsumer`] but then never call [`access_consumer`](SharedConsumer::access_consumer) to `close` the returned [`SharedConsumerAccess`], then the underlying consumer will never be closed. +/// +/// The `Error` type of the inner consumer must implement [`Clone`]. Once the inner consumer emits an error, all [`SharedConsumerAccess`] handles will emit clones of that value on all operations. The implementation ensures that the inner consumer is not used after an error. +/// +/// The shared state between all clones of the same [`SharedConsumer`] must be supplied via a reference of type `R` to an [opaque handle](State) at creation time; this gives control over how to allocate the state and manage its lifetime to the user. Typical choices for `R` would be an `Rc` or a `&shared_producer::State`. +/// +/// ``` +/// use core::time::Duration; +/// use either::Either::*; +/// use wb_async_utils::shared_consumer::*; +/// use smol::{Timer, block_on}; +/// use ufotofu::{Consumer, consumer::{TestConsumer, TestConsumerBuilder}}; +/// +/// let underlying_c: TestConsumer = TestConsumerBuilder::new(-4, 3).build(); +/// let state = State::new(underlying_c); +/// +/// let shared1 = SharedConsumer::new(&state); +/// let shared2 = shared1.clone(); +/// +/// let write_some_items1 = async { +/// { +/// let mut c_handle = shared1.access_consumer().await; +/// Timer::after(Duration::from_millis(50)).await; // Since we hold a handle right now, obtaining the second handle has to wait for us. +/// assert_eq!(Ok(()), c_handle.consume(1).await); +/// } +/// +/// Timer::after(Duration::from_millis(50)).await; // Having dropped p_handle, the other task can jump in now. +/// +/// { +/// let mut c_handle = shared1.access_consumer().await; +/// assert_eq!(Ok(()), c_handle.consume(3).await); +/// assert_eq!(Err(-4), c_handle.consume(4).await); +/// } +/// }; +/// +/// let write_some_items2 = async { +/// Timer::after(Duration::from_millis(10)).await; // Ensure that the other task "starts". +/// +/// { +/// let mut c_handle = shared2.access_consumer().await; +/// assert_eq!(Ok(()), c_handle.consume(2).await); +/// } +/// +/// Timer::after(Duration::from_millis(50)).await; +/// +/// let mut c_handle = shared2.access_consumer().await; +/// assert_eq!(Err(-4), c_handle.consume(4).await); // Replays a cached `-4` instead of using the underlying consumer. +/// }; +/// +/// block_on(futures::future::join(write_some_items1, write_some_items2)); +/// ``` +#[derive(Debug)] +pub struct SharedConsumer +where + C: Consumer, + R: Deref> + Clone, +{ + state_ref: R, +} + +impl Clone for SharedConsumer +where + C: Consumer, + R: Deref> + Clone, +{ + fn clone(&self) -> Self { + self.state_ref + .deref() + .unclosed_handle_count + .set(self.state_ref.deref().unclosed_handle_count.get() + 1); + + Self { + state_ref: self.state_ref.clone(), + } + } +} + +impl SharedConsumer +where + C: Consumer, + R: Deref> + Clone, +{ + /// Creates a new `SharedConsumer` from a cloneable reference to a [`State`]. + pub fn new(state_ref: R) -> Self { + Self { state_ref } + } + + /// Obtains exclusive access to the underlying consumer, waiting if necessary. + pub async fn access_consumer(&self) -> SharedConsumerAccess { + SharedConsumerAccess { + c: self.state_ref.deref().m.write().await, + unclosed_handle_count: &self.state_ref.deref().unclosed_handle_count, + } + } +} + +/// A handle that represents exclusive access to an underlying shared consumer. Implements the consumer traits and forwards method calls to the underlying consumer. After the underlying consumer has emitted its error, a [`SharedConsumerAccess`] replays copies of that error instead of continuing to call methods on the underlying consumer. +pub struct SharedConsumerAccess<'shared_consumer, C: Consumer> { + c: WriteGuard<'shared_consumer, MutexState>, + unclosed_handle_count: &'shared_consumer Cell, +} + +impl<'shared_consumer, C> Debug for SharedConsumerAccess<'shared_consumer, C> +where + C: Consumer + Debug, + C::Error: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SharedConsumerAccess") + .field("c", &self.c) + .field("unclosed_handle_count", &self.unclosed_handle_count) + .finish() + } +} + +impl<'shared_consumer, C> Consumer for SharedConsumerAccess<'shared_consumer, C> +where + C: Consumer, + C::Final: Clone, + C::Error: Clone, +{ + type Item = C::Item; + + type Final = C::Final; + + type Error = C::Error; + + async fn consume(&mut self, item: Self::Item) -> Result<(), Self::Error> { + let inner_state = self.c.deref_mut(); + + match inner_state.error.as_ref() { + Some(err) => Err(err.clone()), + None => match inner_state.c.consume(item).await { + Ok(()) => Ok(()), + Err(err) => { + inner_state.error = Some(err.clone()); + Err(err) + } + }, + } + } + + async fn close(&mut self, fin: Self::Final) -> Result<(), Self::Error> { + let inner_state = self.c.deref_mut(); + + match inner_state.error.as_ref() { + Some(err) => Err(err.clone()), + None => { + self.unclosed_handle_count + .set(self.unclosed_handle_count.get() - 1); + + if self.unclosed_handle_count.get() == 0 { + // Closing the final handle. + match inner_state.c.close(fin).await { + Ok(()) => Ok(()), + Err(err) => { + inner_state.error = Some(err.clone()); + Err(err) + } + } + } else { + // Not the final handle to be closed, so do nothing. + Ok(()) + } + } + } + } +} + +impl<'shared_consumer, C> BufferedConsumer for SharedConsumerAccess<'shared_consumer, C> +where + C: BufferedConsumer, + C::Final: Clone, + C::Error: Clone, +{ + async fn flush(&mut self) -> Result<(), Self::Error> { + let inner_state = self.c.deref_mut(); + + match inner_state.error.as_ref() { + Some(err) => Err(err.clone()), + None => match inner_state.c.flush().await { + Ok(()) => Ok(()), + Err(err) => { + inner_state.error = Some(err.clone()); + Err(err) + } + }, + } + } +} + +impl<'shared_consumer, C> BulkConsumer for SharedConsumerAccess<'shared_consumer, C> +where + C: BulkConsumer, + C::Final: Clone, + C::Error: Clone, +{ + async fn expose_slots<'a>(&'a mut self) -> Result<&'a mut [Self::Item], Self::Error> + where + Self::Item: 'a, + { + let inner_state = self.c.deref_mut(); + + match inner_state.error.as_ref() { + Some(err) => Err(err.clone()), + None => match inner_state.c.expose_slots().await { + Ok(slots) => Ok(slots), + Err(err) => { + inner_state.error = Some(err.clone()); + Err(err) + } + }, + } + } + + async fn consume_slots(&mut self, amount: usize) -> Result<(), Self::Error> { + let inner_state = self.c.deref_mut(); + + match inner_state.error.as_ref() { + Some(err) => Err(err.clone()), + None => match inner_state.c.consume_slots(amount).await { + Ok(()) => Ok(()), + Err(err) => { + inner_state.error = Some(err.clone()); + Err(err) + } + }, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use core::time::Duration; + use either::Either::{Left, Right}; + use smol::{block_on, Timer}; + use ufotofu::{ + consumer::{TestConsumer, TestConsumerBuilder}, + Consumer, Producer, + }; + use ufotofu_queues::Fixed; + + use crate::spsc::{self, new_spsc}; + + #[test] + fn test_shared_consumer_errors() { + let underlying_c: TestConsumer = TestConsumerBuilder::new(-4, 3).build(); + let state = State::new(underlying_c); + + let shared1 = SharedConsumer::new(&state); + let shared2 = shared1.clone(); + + let write_some_items1 = async { + { + let mut c_handle = shared1.access_consumer().await; + Timer::after(Duration::from_millis(50)).await; // Since we hold a handle right now, obtaining the second handle has to wait for us. + assert_eq!(Ok(()), c_handle.consume(1).await); + } + + Timer::after(Duration::from_millis(50)).await; // Having dropped p_handle, the other task can jump in now. + + { + let mut c_handle = shared1.access_consumer().await; + assert_eq!(Ok(()), c_handle.consume(3).await); + assert_eq!(Err(-4), c_handle.consume(4).await); + } + }; + + let write_some_items2 = async { + Timer::after(Duration::from_millis(10)).await; // Ensure that the other task "starts". + + { + let mut c_handle = shared2.access_consumer().await; + assert_eq!(Ok(()), c_handle.consume(2).await); + } + + Timer::after(Duration::from_millis(50)).await; + + let mut c_handle = shared2.access_consumer().await; + assert_eq!(Err(-4), c_handle.consume(4).await); // Replays a cached `-4` instead of using the underlying consumer. + }; + + block_on(futures::future::join(write_some_items1, write_some_items2)); + } + + #[test] + fn test_shared_consumer_closing() { + let spsc_state: spsc::State, i16, ()> = + spsc::State::new(Fixed::new(16 /* capacity */)); + let (sender, mut receiver) = new_spsc(&spsc_state); + + let state = State::new(sender); + let shared1 = SharedConsumer::new(&state); + let shared2 = shared1.clone(); + + let write_some_items1 = async { + { + let mut c_handle = shared1.access_consumer().await; + Timer::after(Duration::from_millis(50)).await; // Since we hold a handle right now, obtaining the second handle has to wait for us. + assert_eq!(Ok(()), c_handle.consume(1).await); + } + + Timer::after(Duration::from_millis(50)).await; // Having dropped p_handle, the other task can jump in now. + + { + let mut c_handle = shared1.access_consumer().await; + assert_eq!(Ok(()), c_handle.consume(3).await); + assert_eq!(Ok(()), c_handle.close(-1).await); + } + }; + + let write_some_items2 = async { + Timer::after(Duration::from_millis(10)).await; // Ensure that the other task "starts". + + { + let mut c_handle = shared2.access_consumer().await; + assert_eq!(Ok(()), c_handle.consume(2).await); + } + + Timer::after(Duration::from_millis(50)).await; + + let mut c_handle = shared2.access_consumer().await; + assert_eq!(Ok(()), c_handle.close(-2).await); // Replays a cached `-4` instead of using the underlying consumer. + + assert_eq!(Ok(Left(1)), receiver.produce().await); + assert_eq!(Ok(Left(2)), receiver.produce().await); + assert_eq!(Ok(Left(3)), receiver.produce().await); + assert_eq!(Ok(Right(-2)), receiver.produce().await); + }; + + block_on(futures::future::join(write_some_items1, write_some_items2)); + } +} diff --git a/wb_async_utils/src/shared_producer.rs b/wb_async_utils/src/shared_producer.rs index 3de7301..786514b 100644 --- a/wb_async_utils/src/shared_producer.rs +++ b/wb_async_utils/src/shared_producer.rs @@ -230,7 +230,7 @@ where let inner_state = self.0.deref_mut(); match inner_state.last.as_ref() { - Some(Ok(_fin)) => Ok(()), // Slurping becomes a no-op after the final value has been emitted on a different handle. + Some(Ok(_fin)) => Ok(()), // Consider_produced becomes a no-op after the final value has been emitted on a different handle. Some(Err(err)) => Err(err.clone()), None => match inner_state.p.consider_produced(amount).await { Ok(()) => Ok(()), @@ -279,7 +279,7 @@ mod tests { }; let read_some_items2 = async { - Timer::after(Duration::from_millis(10)).await; // ensure that the other task "starts" + Timer::after(Duration::from_millis(10)).await; // Ensure that the other task "starts". { let mut p_handle = shared2.access_producer().await; From c36c3e22e5a39ef3fc764f210151611271c66115 Mon Sep 17 00:00:00 2001 From: Aljoscha Meyer Date: Mon, 6 Jan 2025 12:32:46 +0100 Subject: [PATCH 2/2] Run clippy and cargo fmt --- wb_async_utils/src/lib.rs | 6 ++-- wb_async_utils/src/mutex.rs | 14 ++++----- wb_async_utils/src/shared_consumer.rs | 2 +- wb_async_utils/src/shared_producer.rs | 6 ++-- wb_async_utils/src/take_cell.rs | 41 ++++++++++++++------------- 5 files changed, 36 insertions(+), 33 deletions(-) diff --git a/wb_async_utils/src/lib.rs b/wb_async_utils/src/lib.rs index 8443b79..f2466f7 100644 --- a/wb_async_utils/src/lib.rs +++ b/wb_async_utils/src/lib.rs @@ -21,7 +21,9 @@ pub mod shared_consumer; // This is safe if and only if the object pointed at by `reference` lives for at least `'longer`. // See https://doc.rust-lang.org/nightly/std/intrinsics/fn.transmute.html for more detail. -pub(crate) unsafe fn extend_lifetime<'shorter, 'longer, T: ?Sized>(reference: &'shorter T) -> &'longer T { +pub(crate) unsafe fn extend_lifetime<'shorter, 'longer, T: ?Sized>( + reference: &'shorter T, +) -> &'longer T { core::mem::transmute::<&'shorter T, &'longer T>(reference) } @@ -31,4 +33,4 @@ pub(crate) unsafe fn extend_lifetime_mut<'shorter, 'longer, T: ?Sized>( reference: &'shorter mut T, ) -> &'longer mut T { core::mem::transmute::<&'shorter mut T, &'longer mut T>(reference) -} \ No newline at end of file +} diff --git a/wb_async_utils/src/mutex.rs b/wb_async_utils/src/mutex.rs index 938d9d7..939665f 100644 --- a/wb_async_utils/src/mutex.rs +++ b/wb_async_utils/src/mutex.rs @@ -23,7 +23,7 @@ pub struct Mutex { impl Mutex { /// Creates a new mutex storing the given value. - /// + /// /// ``` /// use wb_async_utils::mutex::*; /// @@ -384,8 +384,8 @@ impl Deref for ReadGuard<'_, T> { fn deref(&self) -> &T { let borrowed = unsafe { self.mutex.value.borrow() }; // Safe because a ReadGuard can never live at the same time as another guard or a `&mut Mutex`. - // We can only obtain references with a lifetime tied to `borrowed`, but we know the refs to be both alive and exclusive for as long - // as `self`. + // We can only obtain references with a lifetime tied to `borrowed`, but we know the refs to be both alive and exclusive for as long + // as `self`. unsafe { extend_lifetime(borrowed.deref()) } } } @@ -430,8 +430,8 @@ impl Deref for WriteGuard<'_, T> { fn deref(&self) -> &T { let borrowed = unsafe { self.mutex.value.borrow() }; // Safe because a WriteGuard can never live at the same time as another guard or a `&mut Mutex`. - // We can only obtain references with a lifetime tied to `borrowed`, but we know the refs to be both alive and exclusive for as long - // as `self`. + // We can only obtain references with a lifetime tied to `borrowed`, but we know the refs to be both alive and exclusive for as long + // as `self`. unsafe { extend_lifetime(borrowed.deref()) } } } @@ -439,8 +439,8 @@ impl Deref for WriteGuard<'_, T> { impl DerefMut for WriteGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { let mut borrowed = unsafe { self.mutex.value.borrow_mut() }; // Safe because a WriteGuard can never live at the same time as another or a `&mut Mutex`. - // We can only obtain references with a lifetime tied to `borrowed`, but we know the refs to be both alive and exclusive for as long - // as `self`. + // We can only obtain references with a lifetime tied to `borrowed`, but we know the refs to be both alive and exclusive for as long + // as `self`. unsafe { extend_lifetime_mut(borrowed.deref_mut()) } } } diff --git a/wb_async_utils/src/shared_consumer.rs b/wb_async_utils/src/shared_consumer.rs index a21ad24..e5f2579 100644 --- a/wb_async_utils/src/shared_consumer.rs +++ b/wb_async_utils/src/shared_consumer.rs @@ -156,7 +156,7 @@ pub struct SharedConsumerAccess<'shared_consumer, C: Consumer> { unclosed_handle_count: &'shared_consumer Cell, } -impl<'shared_consumer, C> Debug for SharedConsumerAccess<'shared_consumer, C> +impl Debug for SharedConsumerAccess<'_, C> where C: Consumer + Debug, C::Error: Debug, diff --git a/wb_async_utils/src/shared_producer.rs b/wb_async_utils/src/shared_producer.rs index 786514b..c5f2ae6 100644 --- a/wb_async_utils/src/shared_producer.rs +++ b/wb_async_utils/src/shared_producer.rs @@ -124,7 +124,7 @@ pub struct SharedProducerAccess<'shared_producer, P: Producer>( WriteGuard<'shared_producer, MutexState

>, ); -impl<'shared_producer, P> Debug for SharedProducerAccess<'shared_producer, P> +impl

Debug for SharedProducerAccess<'_, P> where P: Producer + Debug, P::Final: Debug, @@ -156,9 +156,7 @@ where Some(Ok(fin)) => Ok(Right(fin.clone())), Some(Err(err)) => Err(err.clone()), None => match inner_state.p.produce().await { - Ok(Left(item)) => { - Ok(Left(item)) - } + Ok(Left(item)) => Ok(Left(item)), Ok(Right(fin)) => { inner_state.last = Some(Ok(fin.clone())); Ok(Right(fin)) diff --git a/wb_async_utils/src/take_cell.rs b/wb_async_utils/src/take_cell.rs index 8637c79..f402f79 100644 --- a/wb_async_utils/src/take_cell.rs +++ b/wb_async_utils/src/take_cell.rs @@ -3,8 +3,11 @@ use core::{ pin::Pin, task::{Context, Poll, Waker}, }; -use std::{collections::VecDeque, ops::{Deref, DerefMut}}; use std::fmt; +use std::{ + collections::VecDeque, + ops::{Deref, DerefMut}, +}; use fairly_unsafe_cell::FairlyUnsafeCell; @@ -24,10 +27,10 @@ impl Default for TakeCell { impl TakeCell { /// Creates a new, empty [`TakeCell`]. - /// + /// /// ``` /// use wb_async_utils::TakeCell; - /// + /// /// let c = TakeCell::<()>::new(); /// assert_eq!(None, c.into_inner()); /// ``` @@ -39,10 +42,10 @@ impl TakeCell { } /// Creates a new [`TakeCell`] storing the given value. - /// + /// /// ``` /// use wb_async_utils::TakeCell; - /// + /// /// let c = TakeCell::new_with(17); /// assert_eq!(Some(17), c.into_inner()); /// ``` @@ -54,10 +57,10 @@ impl TakeCell { } /// Consumes the [`TakeCell`] and returns the wrapped value, if any. - /// + /// /// ``` /// use wb_async_utils::TakeCell; - /// + /// /// let c = TakeCell::new_with(17); /// assert_eq!(Some(17), c.into_inner()); /// ``` @@ -66,13 +69,13 @@ impl TakeCell { } /// Returns whether the [`TakeCell`] is currently empty. - /// + /// /// ``` /// use wb_async_utils::TakeCell; - /// + /// /// let c1 = TakeCell::<()>::new(); /// assert!(c1.is_empty()); - /// + /// /// let c2 = TakeCell::new_with(17); /// assert!(!c2.is_empty()); /// ``` @@ -102,7 +105,7 @@ impl TakeCell { } /// Sets the value in the cell, and returns the old value (if any). If the cell was empty, wakes the oldest pending async method call that was waiting for a value in the cell. - /// + /// /// ``` /// use futures::join; /// use wb_async_utils::TakeCell; @@ -134,13 +137,13 @@ impl TakeCell { } /// Takes the current value out of the cell if there is one, or returns `None` otherwise. - /// + /// /// ``` /// use wb_async_utils::TakeCell; - /// + /// /// let c1 = TakeCell::<()>::new(); /// assert_eq!(None, c1.try_take()); - /// + /// /// let c2 = TakeCell::new_with(17); /// assert_eq!(Some(17), c2.try_take()); /// ``` @@ -167,7 +170,7 @@ impl TakeCell { /// Set the value based on the current value (or abscence thereof). /// /// If the cell was empty, this wakes the oldest pending async method call that was waiting for a value in the cell. - /// + /// /// ``` /// use futures::join; /// use wb_async_utils::TakeCell; @@ -192,7 +195,7 @@ impl TakeCell { /// Fallibly set the value based on the current value (or absence thereof). If `with` returns an `Err`, the cell is emptied. /// /// If the cell was empty and `with` returned `Ok`, this wakes the oldest pending async method call that was waiting for a value in the cell. - /// + /// /// ``` /// use futures::join; /// use wb_async_utils::TakeCell; @@ -217,9 +220,9 @@ impl TakeCell { self.set(with(self.try_take())?); Ok(()) } - + /// Returns how many tasks are currently waiting for the cell to be filled. - /// + /// /// ``` /// use futures::join; /// use wb_async_utils::TakeCell; @@ -252,7 +255,7 @@ impl TakeCell { fn park(&self, cx: &mut Context<'_>) { let mut parked = unsafe { self.parked.borrow_mut() }; - + parked.deref_mut().push_back(cx.waker().clone()); } }