From 629595b5ee450fad5c9b2a985ce4f37066ff66e5 Mon Sep 17 00:00:00 2001
From: Christian Blichmann
Date: Tue, 20 Aug 2024 13:01:57 +0200
Subject: [PATCH] Add SAFETY comments and notes for `unsafe` blocks

---
 src/header.rs   | 17 +++++++++---
 src/raw.rs      | 70 ++++++++++++++++++++++++++++++++++++++++++++++++-
 src/runnable.rs | 28 ++++++++++++++++++++
 src/task.rs     | 11 ++++++++
 4 files changed, 122 insertions(+), 4 deletions(-)

diff --git a/src/header.rs b/src/header.rs
index ee84035..ce9c42d 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -62,8 +62,10 @@ impl Header {
         let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);

         // If the task was not notifying or registering an awaiter...
+        // Note: The NOTIFYING and REGISTERING bits act as locks on the awaiter UnsafeCell.
         if state & (NOTIFYING | REGISTERING) == 0 {
             // Take the waker out.
+            // SAFETY: self.awaiter is tied to atomic ordering operations on self.state.
             let waker = unsafe { (*self.awaiter.get()).take() };

             // Unset the bit indicating that the task is notifying its awaiter.
@@ -92,9 +94,10 @@ impl Header {
         let mut state = self.state.fetch_or(0, Ordering::Acquire);

         loop {
-            // There can't be two concurrent registrations because `Task` can only be polled
-            // by a unique pinned reference.
-            debug_assert!(state & REGISTERING == 0);
+            // There can't be two concurrent registrations because `Task` can only be polled by a
+            // unique pinned reference. Enforcing this here instead of marking the whole function
+            // unsafe.
+            assert!(state & REGISTERING == 0);

             // If we're in the notifying state at this moment, just wake and return without
             // registering.
@@ -119,6 +122,8 @@ impl Header {
         }

         // Put the waker into the awaiter field.
+        // SAFETY: We have OR'd the state of the header with REGISTERING so we have a lock on
+        // self.awaiter and can write to it.
         unsafe {
             abort_on_panic(|| (*self.awaiter.get()) = Some(waker.clone()));
         }
@@ -130,6 +135,12 @@ impl Header {
         loop {
             // If there was a notification, take the waker out of the awaiter field.
             if state & NOTIFYING != 0 {
+                // SAFETY: We have guaranteed that self.state is or'd with NOTIFYING, which
+                // prevents everyone else from writing to self.awaiter. So we know that we won't
+                // race with any writes from other threads.
+                // We can't reach this branch on the first loop through, but we can if someone
+                // notifies the task while we are in the middle of registering. Normally, they
+                // would also take a waker, but they won't if we have the NOTIFYING bit set.
                 if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
                     abort_on_panic(|| waker = Some(w));
                 }
diff --git a/src/raw.rs b/src/raw.rs
index 7a45dad..279a27c 100644
--- a/src/raw.rs
+++ b/src/raw.rs
@@ -123,6 +123,9 @@ impl RawTask {
         let offset_r = offset_union;

         TaskLayout {
+            // SAFETY: layout came from a Layout::extend call, which dynamically checks the
+            // invariants for StdLayout and returns None if they are not met. The leap_unwrap!
+            // would have panicked before this point.
             layout: unsafe { layout.into_std() },
             offset_s,
             offset_f,
@@ -167,6 +170,8 @@ where
                 Some(p) => p,
             };

+            // SAFETY: task_layout.layout has the correct layout for a C-style struct of Header
+            // followed by S followed by union { F, T }.
            let raw = Self::from_ptr(ptr.as_ptr());

            let crate::Builder {
@@ -176,6 +181,10 @@ where
            } = builder;

            // Write the header as the first field of the task.
+           // SAFETY: This write is OK because it's through a mutable pointer to a Header that
+           // is definitely properly aligned and points to enough memory for a Header. We
+           // didn't pass our pointer through any const references or other const-ifying
+           // operations so the provenance is good.
            (raw.header as *mut Header).write(Header {
                state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
                awaiter: UnsafeCell::new(None),
@@ -195,12 +204,19 @@ where
            });

            // Write the schedule function as the third field of the task.
+           // SAFETY: raw.schedule is also non-null, properly aligned, valid for writes of size
+           // size_of::<S>().
            (raw.schedule as *mut S).write(schedule);

            // Generate the future, now that the metadata has been pinned in place.
+           // SAFETY: Dereferencing raw.header is OK because it's properly initialized since we
+           // wrote to it.
            let future = abort_on_panic(|| future(&(*raw.header).metadata));

            // Write the future as the fourth field of the task.
+           // SAFETY: This write is OK because raw.future is non-null, properly aligned, and valid
+           // for writes of size F. Because we're not casting anything here we know it's the right
+           // type.
            raw.future.write(future);

            ptr
@@ -208,12 +224,17 @@ where
    }

    /// Creates a `RawTask` from a raw task pointer.
+   ///
+   /// ptr must point to a region that has a size and alignment matching task layout, since doing
+   /// pointer arithmetic that leaves the region or creating unaligned pointers is UB.
    #[inline]
-   pub(crate) fn from_ptr(ptr: *const ()) -> Self {
+   pub(crate) unsafe fn from_ptr(ptr: *const ()) -> Self {
        let task_layout = Self::task_layout();
        let p = ptr as *const u8;

        unsafe {
+           // SAFETY: We're just picking apart the given pointer into its constituent fields.
+           // These do correctly correspond to the fields as laid out in task_layout.
            Self {
                header: p as *const Header,
                schedule: p.add(task_layout.offset_s) as *const S,
@@ -229,6 +250,8 @@ where
        Self::TASK_LAYOUT
    }

    /// Wakes a waker.
+   ///
+   /// Assumes ptr points to a valid task.
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
@@ -240,6 +263,8 @@ where

        let raw = Self::from_ptr(ptr);

+       // SAFETY: This is just loading the state. Note that this does implicitly create an
+       // &AtomicUsize, which is intentional.
        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
@@ -295,6 +320,8 @@ where
    }

    /// Wakes a waker by reference.
+   ///
+   /// Assumes ptr points to a valid task.
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

@@ -346,6 +373,8 @@ where
                        // because the schedule function cannot be destroyed while the waker is
                        // still alive.
                        let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ()));
+                       // SAFETY: The task is still alive, so we can call its schedule
+                       // function.
                        (*raw.schedule).schedule(task, ScheduleInfo::new(false));
                    }

@@ -394,9 +423,17 @@ where
                (*raw.header)
                    .state
                    .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
+               // SAFETY: ptr still points to a valid task even though its refcount has dropped
+               // to zero.
+               // NOTE: We should make sure that the executor is properly dropping scheduled tasks
+               // with a refcount of zero.
                Self::schedule(ptr, ScheduleInfo::new(false));
            } else {
                // Otherwise, destroy the task right away.
+               // NOTE: This isn't going to drop the output/result from the future. We have to
+               // have already dealt with it, so whoever is calling drop_waker needs to be
+               // checked. It looks like whoever sets the TASK bit to zero is affirming that they
+               // have moved or dropped the output/result.
                Self::destroy(ptr);
            }
        }
@@ -435,6 +472,8 @@ where
        }

        let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ()));
+       // NOTE: The schedule function has to drop tasks with a refcount of zero. That's not
+       // happening in this function, so it has to be happening in the schedule member function.
        (*raw.schedule).schedule(task, info);
    }

@@ -459,6 +498,9 @@ where
    ///
    /// The schedule function will be dropped, and the task will then get deallocated.
    /// The task must be closed before this function is called.
+   ///
+   /// NOTE: Whoever calls this function has to have already dealt with the return value of the
+   /// future or its error if it failed. We are not going to drop it!
    #[inline]
    unsafe fn destroy(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);
@@ -467,13 +509,18 @@ where
        // We need a safeguard against panics because destructors can panic.
        abort_on_panic(|| {
            // Drop the header along with the metadata.
+           // SAFETY: This points to a valid Header that we have permission to move out of and
+           // drop.
            (raw.header as *mut Header).drop_in_place();

            // Drop the schedule function.
+           // SAFETY: This points to a valid S that we have permission to move out of and drop.
            (raw.schedule as *mut S).drop_in_place();
        });

        // Finally, deallocate the memory reserved by the task.
+       // SAFETY: We know that ptr was allocated with layout task_layout.layout, so deallocating
+       // it with the same layout is correct.
        alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
    }

@@ -482,9 +529,11 @@ where
    /// If polling its future panics, the task will be closed and the panic will be propagated into
    /// the caller.
    unsafe fn run(ptr: *const ()) -> bool {
+       // SAFETY: As long as it's a pointer to a valid task, we can get the raw form of it.
        let raw = Self::from_ptr(ptr);

        // Create a context from the raw task pointer and the vtable inside the its header.
+       // SAFETY: The implementation of RAW_WAKER_VTABLE is correct.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
        let cx = &mut Context::from_waker(&waker);

@@ -507,6 +556,8 @@ where
            }

            // Drop the task reference.
+           // SAFETY: This pointer is definitely alive. The Waker that is registered into the
+           // executor holds it.
            Self::drop_ref(ptr);

            // Notify the awaiter that the future has been dropped.
@@ -563,7 +614,10 @@ where
        match poll {
            Poll::Ready(out) => {
                // Replace the future with its output.
+               // SAFETY: We have exclusive access to the task so we can drop the future for it.
                Self::drop_future(ptr);
+               // SAFETY: raw.output definitely points to a valid memory location to hold the
+               // Output type of the future.
                raw.output.write(out);

                // The task is now completed.
@@ -593,10 +647,12 @@ where
                // Take the awaiter out.
                let mut awaiter = None;
                if state & AWAITER != 0 {
+                   // SAFETY: This is safe for the same reasons stated earlier.
                    awaiter = (*raw.header).take(None);
                }

                // Drop the task reference.
+               // SAFETY: We "own" the ref to this task and are allowed to drop it.
                Self::drop_ref(ptr);

                // Notify the awaiter that the future has been dropped.
@@ -625,6 +681,9 @@ where
                    if state & CLOSED != 0 && !future_dropped {
                        // The thread that closed the task didn't drop the future because it was
                        // running so now it's our responsibility to do so.
+                       // SAFETY: This is corroborated by header.rs where they state that closing
+                       // a task doesn't drop the future, it just marks it closed and puts it back
+                       // in the polling queue so a poller can drop it.
                        Self::drop_future(ptr);
                        future_dropped = true;
                    }
@@ -648,6 +707,8 @@ where
                    }

                    // Drop the task reference.
+                   // SAFETY: We're allowed to drop the ref as stated earlier. We
+                   // checked that it won't accidentally be double-dropped.
                    Self::drop_ref(ptr);

                    // Notify the awaiter that the future has been dropped.
@@ -657,10 +718,13 @@ where
                } else if state & SCHEDULED != 0 {
                    // The thread that woke the task up didn't reschedule it because
                    // it was running so now it's our responsibility to do so.
+                   // SAFETY: ptr definitely points to a valid task that hasn't been
+                   // dropped. It has its SCHEDULED bit set.
                    Self::schedule(ptr, ScheduleInfo::new(true));
                    return true;
                } else {
                    // Drop the task reference.
+                   // SAFETY: We're still allowed to drop the task reference here.
                    Self::drop_ref(ptr);
                }
                break;
@@ -697,6 +761,7 @@ where
                if state & CLOSED != 0 {
                    // The thread that closed the task didn't drop the future because it
                    // was running so now it's our responsibility to do so.
+                   // SAFETY: If poll panicked then the thread didn't drop the future.
                    RawTask::<F, T, S, M>::drop_future(ptr);

                    // Mark the task as not running and not scheduled.
@@ -711,6 +776,7 @@ where
                }

                // Drop the task reference.
+               // SAFETY: We still have permission to drop a ref.
                RawTask::<F, T, S, M>::drop_ref(ptr);

                // Notify the awaiter that the future has been dropped.
@@ -729,6 +795,8 @@ where
                ) {
                    Ok(state) => {
                        // Drop the future because the task is now closed.
+                       // SAFETY: This is effectively the same situation as earlier.
+                       // TODO: DRY this up by refactoring this.
                        RawTask::<F, T, S, M>::drop_future(ptr);

                        // Take the awaiter out.
diff --git a/src/runnable.rs b/src/runnable.rs
index 25d44dc..5f9b635 100644
--- a/src/runnable.rs
+++ b/src/runnable.rs
@@ -366,6 +366,7 @@ impl Builder {
        Fut::Output: Send + 'static,
        S: Schedule + Send + Sync + 'static,
    {
+       // SAFETY: The Send, Sync, and 'static bounds on this method uphold spawn_unchecked's requirements.
        unsafe { self.spawn_unchecked(future, schedule) }
    }

@@ -438,6 +439,7 @@ impl Builder {
                    self.id == thread_id(),
                    "local task dropped by a thread that didn't spawn it"
                );
+               // SAFETY: Drop is only called once per Checked, and Checked effectively owns F.
                unsafe {
                    ManuallyDrop::drop(&mut self.inner);
                }
@@ -452,6 +454,9 @@ impl Builder {
                    self.id == thread_id(),
                    "local task polled by a thread that didn't spawn it"
                );
+               // SAFETY: The provided closure projects to a subfield, so the returned reference
+               // won't be movable as long as the original value is not movable. We're only
+               // passing this value to poll as pinned, so it won't get moved by us.
                unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
            }
        }
@@ -466,6 +471,9 @@ impl Builder {
            }
        };

+       // SAFETY: We wrapped our future in a type that checks that we're on the correct thread
+       // before polling or dropping, so we'll panic if we ever go to poll or drop on the wrong
+       // thread instead of touching the given function which is !Send.
        unsafe { self.spawn_unchecked(future, schedule) }
    }

@@ -570,6 +578,8 @@ where
    F::Output: Send + 'static,
    S: Schedule + Send + Sync + 'static,
{
+   // SAFETY: F, F::Output, and S are Send + 'static, and S is additionally Sync so we're
+   // upholding all of the requirements to call spawn_unchecked.
    unsafe { spawn_unchecked(future, schedule) }
}

@@ -740,6 +750,14 @@ impl Runnable {
        let header = ptr as *const Header;
        mem::forget(self);

+       // SAFETY: vtable.schedule is unsafe to call with the implied requirements that ptr is
+       // non-null, and points to a live task. The assumption here is that our task is not
+       // currently scheduled because schedule consumes self (so a previous scheduling would have
+       // moved the value). However, canceling a task from the task half of the task-runnable pair
+       // also schedules the task. That sets the task as CLOSED, but we're not checking the CLOSED
+       // bit in our header before sending this task to the scheduler. So it's really on the
+       // scheduler to make sure that the same task doesn't get scheduled twice. Is this actually
+       // upheld in practice?
        unsafe {
            ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
        }
@@ -778,6 +796,9 @@ impl Runnable {
        let header = ptr as *const Header;
        mem::forget(self);

+       // SAFETY: Same as above, this API has mostly the same surface and is just a shim for the
+       // raw vtable behavior. This also has the same problem that run doesn't promise to check
+       // the header bits to avoid double-polling a cancelled task.
        unsafe { ((*header).vtable.run)(ptr) }
    }

@@ -808,6 +829,9 @@ impl Runnable {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header;

+       // SAFETY: This is also a shim for calling the raw clone_waker function. Additionally, we
+       // need to know that the returned RawWaker obeys the contracts for Waker and WakerVTable,
+       // but that's all implicit unfortunately.
        unsafe {
            let raw_waker = ((*header).vtable.clone_waker)(ptr);
            Waker::from_raw(raw_waker)
@@ -815,6 +839,8 @@ impl Runnable {
    }

    fn header(&self) -> &Header {
+       // SAFETY: As long as Runnable exists, we're guaranteed that the pointed-to task won't be
+       // dropped. The header of a live task is always safe to alias shared.
        unsafe { &*(self.ptr.as_ptr() as *const Header) }
    }

@@ -938,6 +964,8 @@ impl fmt::Debug for Runnable {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header;

+       // TODO: Call .header() instead. That's why the convenience function for this exists in the
+       // first place.
        f.debug_struct("Runnable")
            .field("header", unsafe { &(*header) })
            .finish()
diff --git a/src/task.rs b/src/task.rs
index 93990f5..0c9b77d 100644
--- a/src/task.rs
+++ b/src/task.rs
@@ -184,6 +184,9 @@ impl Task {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header;

+       // TODO: This function claims that canceling a task will cause it to be scheduled so the
+       // executor can drop the future, but Runnable will drop the future in its own Drop, which
+       // should cause a race. This seems very bad.
        unsafe {
            let mut state = (*header).state.load(Ordering::Acquire);

@@ -323,6 +326,8 @@ impl Task {
        let header = ptr as *const Header;

        unsafe {
+           // SAFETY: We only take the output if we're the ones who successfully set the CLOSED
+           // bit.
            let mut state = (*header).state.load(Ordering::Acquire);

            loop {
@@ -332,6 +337,8 @@ impl Task {
                // dropped.
                if state & (SCHEDULED | RUNNING) != 0 {
                    // Replace the waker with one associated with the current task.
+                   // NOTE: This assumes that REGISTERING is mutually exclusive with CLOSED |
+                   // SCHEDULED | RUNNING.
                    (*header).register(cx.waker());

                    // Reload the state after registering. It is possible changes occurred just
@@ -413,6 +420,8 @@ impl Task {
    fn header(&self) -> &Header {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header;
+       // SAFETY: We're holding a refcount on our allocated task header, so it won't be dropped
+       // while we're alive.
        unsafe { &*header }
    }

@@ -423,6 +432,8 @@ impl Task {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header;

+       // SAFETY: The only unsafe thing we're doing is dereferencing header, which is safe for the
+       // reason described above.
        unsafe {
            let state = (*header).state.load(Ordering::Acquire);
            state & (CLOSED | COMPLETED) != 0