From 6244e731c44a0b0e91c58c64ee49dc1f53df3b24 Mon Sep 17 00:00:00 2001 From: Nick Fitzgerald Date: Tue, 20 Feb 2024 09:47:40 -0800 Subject: [PATCH] Avoid taking a write lock in `RegisteredType::{root,clone,drop}` By moving the registration count into the `Arc`, that is pulling the `Arc` outwards from containing just the `WasmFuncType` to the registration count as well, and turning it into an atomic, we can manipulate the registration count without a write lock. Once that is done, we have the following: * `RegisteredType::root` only needs a read lock, not a write lock. * `RegisteredType::clone`, which used to need a write lock, doesn't need any locking anymore. * `RegisteredType::drop` doesn't need any locking most of the time. The exception is when this is this drop that moves the refcount to zero, in which case grabbing a write lock is still necessary to remove the type from the registry. --- crates/wasmtime/src/runtime/type_registry.rs | 363 +++++++++++-------- 1 file changed, 207 insertions(+), 156 deletions(-) diff --git a/crates/wasmtime/src/runtime/type_registry.rs b/crates/wasmtime/src/runtime/type_registry.rs index a08a5ea78cdc..22685551c368 100644 --- a/crates/wasmtime/src/runtime/type_registry.rs +++ b/crates/wasmtime/src/runtime/type_registry.rs @@ -5,10 +5,19 @@ use crate::Engine; use std::{ - collections::HashMap, + borrow::Borrow, + collections::{HashMap, HashSet}, convert::TryFrom, fmt::Debug, - sync::{Arc, RwLock}, + hash::{Hash, Hasher}, + ops::Deref, + sync::{ + atomic::{ + AtomicUsize, + Ordering::{AcqRel, Acquire}, + }, + Arc, RwLock, + }, }; use wasmtime_environ::{ EngineOrModuleTypeIndex, ModuleInternedTypeIndex, ModuleTypes, PrimaryMap, TypeTrace, @@ -140,7 +149,7 @@ impl Drop for TypeCollection { .0 .write() .unwrap() - .unregister_types(self); + .unregister_type_collection(self); } } } @@ -160,80 +169,61 @@ fn entry_index(index: VMSharedTypeIndex) -> usize { /// Dereferences to its underlying `WasmFuncType`. pub struct RegisteredType { engine: Engine, - index: VMSharedTypeIndex, - - // This field is not *strictly* necessary to have in this type, since we - // could always grab the registry's lock and look it up by index, but - // holding this reference should make accessing the actual type that much - // cheaper. - ty: Arc, + entry: OccupiedEntry, } impl Debug for RegisteredType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let RegisteredType { - engine: _, - index, - ty, - } = self; + let RegisteredType { engine: _, entry } = self; f.debug_struct("RegisteredType") - .field("index", index) - .field("ty", ty) + .field("entry", entry) .finish_non_exhaustive() } } impl Clone for RegisteredType { fn clone(&self) -> Self { - { - let i = entry_index(self.index); - let mut registry = self.engine.signatures().0.write().unwrap(); - let entry = registry.entries[i].unwrap_occupied_mut(); - entry.references += 1; - log::trace!( - "cloned registered type {:?} (references -> {})", - self.index, - entry.references - ); - } - + self.entry.incref("cloning RegisteredType"); RegisteredType { engine: self.engine.clone(), - index: self.index, - ty: Arc::clone(&self.ty), + entry: self.entry.clone(), } } } impl Drop for RegisteredType { fn drop(&mut self) { - self.engine - .signatures() - .0 - .write() - .unwrap() - .unregister_entry(self.index); + if self.entry.decref("dropping RegisteredType") { + self.engine + .signatures() + .0 + .write() + .unwrap() + .unregister_entry(self.entry.0.index); + } } } impl std::ops::Deref for RegisteredType { - type Target = Arc; + type Target = WasmFuncType; fn deref(&self) -> &Self::Target { - &self.ty + &self.entry.0.ty } } impl PartialEq for RegisteredType { fn eq(&self, other: &Self) -> bool { - let eq = Arc::ptr_eq(&self.ty, &other.ty); + let eq = Arc::ptr_eq(&self.entry.0, &other.entry.0); if cfg!(debug_assertions) { if eq { - assert_eq!(self.index, other.index); assert!(Engine::same(&self.engine, &other.engine)); } else { - assert!(self.index != other.index || !Engine::same(&self.engine, &other.engine)); + assert!( + self.entry.0.index != other.entry.0.index + || !Engine::same(&self.engine, &other.engine) + ); } } @@ -243,9 +233,9 @@ impl PartialEq for RegisteredType { impl Eq for RegisteredType {} -impl std::hash::Hash for RegisteredType { - fn hash(&self, state: &mut H) { - let ptr = Arc::as_ptr(&self.ty); +impl Hash for RegisteredType { + fn hash(&self, state: &mut H) { + let ptr = Arc::as_ptr(&self.entry.0); ptr.hash(state); } } @@ -254,7 +244,7 @@ impl RegisteredType { /// Constructs a new `RegisteredType`, registering the given type with the /// engine's `TypeRegistry`. pub fn new(engine: &Engine, ty: WasmFuncType) -> RegisteredType { - let (index, ty) = { + let entry = { let mut inner = engine.signatures().0.write().unwrap(); log::trace!("RegisteredType::new({ty:?})"); @@ -270,7 +260,7 @@ impl RegisteredType { inner.register_canonicalized(ty) }; - RegisteredType::from_parts(engine.clone(), index, ty) + RegisteredType::from_parts(engine.clone(), entry) } /// Create an owning handle to the given index's associated type. @@ -281,34 +271,37 @@ impl RegisteredType { /// Returns `None` if `index` is not registered in the given engine's /// registry. pub fn root(engine: &Engine, index: VMSharedTypeIndex) -> Option { - let i = entry_index(index); - let ty = { - let mut inner = engine.signatures().0.write().unwrap(); - let e = inner.entries.get_mut(i)?.as_occupied_mut()?; - e.references += 1; - log::trace!("rooting {index:?} (references -> {})", e.references); - Arc::clone(&e.ty) + let entry = { + let i = entry_index(index); + let inner = engine.signatures().0.read().unwrap(); + let e = inner.entries.get(i)?.as_occupied()?; + + // NB: make sure to incref while the lock is held to prevent: + // + // * This thread: read locks registry, gets entry E, unlocks registry + // * Other thread: drops `RegisteredType` for entry E, decref + // reaches zero, unregisters entry + // * This thread: increfs entry, but it isn't in the registry anymore + e.incref("RegisteredType::root"); + + e.clone() }; - Some(RegisteredType::from_parts(engine.clone(), index, ty)) + + Some(RegisteredType::from_parts(engine.clone(), entry)) } /// Construct a new `RegisteredType`. /// /// It is the caller's responsibility to ensure that the entry's reference /// count has already been incremented. - fn from_parts(engine: Engine, index: VMSharedTypeIndex, ty: Arc) -> Self { - debug_assert!({ - let registry = engine.signatures().0.read().unwrap(); - let i = entry_index(index); - let e = registry.entries[i].as_occupied().unwrap(); - e.references > 0 - }); - RegisteredType { engine, index, ty } + fn from_parts(engine: Engine, entry: OccupiedEntry) -> Self { + debug_assert!(entry.0.registrations.load(Acquire) != 0); + RegisteredType { engine, entry } } /// Get this registered type's index. pub fn index(&self) -> VMSharedTypeIndex { - self.index + self.entry.0.index } /// Get the engine whose registry this type is registered within. @@ -317,10 +310,72 @@ impl RegisteredType { } } +/// A Wasm function type, its `VMSharedTypeIndex`, and its registration count. #[derive(Debug)] -struct OccupiedEntry { - ty: Arc, - references: usize, +struct OccupiedEntryInner { + ty: WasmFuncType, + index: VMSharedTypeIndex, + registrations: AtomicUsize, +} + +/// Implements `Borrow`, `Eq`, and `Hash` by forwarding to the underlying Wasm +/// function type, so that this can be a hash consing key in +/// `TypeRegistryInner::map`. +#[derive(Clone, Debug)] +struct OccupiedEntry(Arc); + +impl Deref for OccupiedEntry { + type Target = WasmFuncType; + + fn deref(&self) -> &Self::Target { + &self.0.ty + } +} + +impl PartialEq for OccupiedEntry { + fn eq(&self, other: &Self) -> bool { + self.0.ty == other.0.ty + } +} + +impl Eq for OccupiedEntry {} + +impl Hash for OccupiedEntry { + fn hash(&self, state: &mut H) { + self.0.ty.hash(state); + } +} + +impl Borrow for OccupiedEntry { + fn borrow(&self) -> &WasmFuncType { + &self.0.ty + } +} + +impl OccupiedEntry { + /// Increment the registration count. + fn incref(&self, why: &str) { + let old_count = self.0.registrations.fetch_add(1, AcqRel); + log::trace!( + "increment registration count for {:?} (registrations -> {}): {why}", + self.0.index, + old_count + 1 + ); + } + + /// Decrement the registration count and return `true` if the registration + /// count reached zero and this entry should be removed from the registry. + #[must_use = "caller must remove entry from registry if `decref` returns `true`"] + fn decref(&self, why: &str) -> bool { + let old_count = self.0.registrations.fetch_add(1, AcqRel); + debug_assert_ne!(old_count, 0); + log::trace!( + "decrement registration count for {:?} (registrations -> {}): {why}", + self.0.index, + old_count - 1 + ); + old_count == 1 + } } #[derive(Debug)] @@ -352,17 +407,10 @@ impl RegistryEntry { } } - fn as_occupied_mut(&mut self) -> Option<&mut OccupiedEntry> { - match self { - Self::Occupied(o) => Some(o), - Self::Vacant { .. } => None, - } - } - - fn unwrap_occupied_mut(&mut self) -> &mut OccupiedEntry { + fn unwrap_occupied(&self) -> &OccupiedEntry { match self { Self::Occupied(o) => o, - Self::Vacant { .. } => panic!("unwrap_occupied_mut on vacant entry"), + Self::Vacant { .. } => panic!("unwrap_occupied on vacant entry"), } } @@ -378,7 +426,7 @@ impl RegistryEntry { struct TypeRegistryInner { // A map from the Wasm function type to a `VMSharedTypeIndex`, for all // the Wasm function types we have already registered. - map: HashMap, VMSharedTypeIndex>, + map: HashSet, // A map from `VMSharedTypeIndex::bits()` to the type index's associated // Wasm type. @@ -410,8 +458,8 @@ impl TypeRegistryInner { for (idx, ty) in types.wasm_types() { let mut ty = ty.clone(); self.canonicalize(&map, &mut ty); - let (shared_type_index, _) = self.register_canonicalized(ty); - let map_idx = map.push(shared_type_index); + let entry = self.register_canonicalized(ty); + let map_idx = map.push(entry.0.index); assert_eq!(idx, map_idx); } map @@ -501,18 +549,14 @@ impl TypeRegistryInner { /// The type must be canonicalized and must not already exist in the /// registry. /// - /// Does not increment the new entry's reference count, that is the - /// responsibility of callers. - fn register_new(&mut self, ty: Arc) -> VMSharedTypeIndex { + /// Initializes the new entry's registration count to one, and callers + /// should not further increment the registration count. + fn register_new(&mut self, ty: WasmFuncType) -> OccupiedEntry { assert!( self.is_canonicalized(&ty), "ty is not already canonicalized: {ty:?}" ); - let index = self.alloc_vacant_entry(); - let old_map_entry = self.map.insert(ty.clone(), index); - assert!(old_map_entry.is_none()); - // Increment the ref count of each existing type that is referenced from // this new type. Those types shouldn't be dropped while this type is // still alive. @@ -520,64 +564,62 @@ impl TypeRegistryInner { EngineOrModuleTypeIndex::Engine(id) => { let id = VMSharedTypeIndex::new(id); let i = entry_index(id); - let e = self.entries[i].unwrap_occupied_mut(); - e.references += 1; - log::trace!( - "new type has edge to {id:?} (references -> {})", - e.references - ); + let e = self.entries[i].unwrap_occupied(); + e.incref("new type references existing type in TypeRegistryInner::register_new"); Ok(()) } EngineOrModuleTypeIndex::Module(_) => unreachable!("should be canonicalized"), }) .unwrap(); + let index = self.alloc_vacant_entry(); + let entry = OccupiedEntry(Arc::new(OccupiedEntryInner { + ty, + index, + registrations: AtomicUsize::new(1), + })); + let is_new_entry = self.map.insert(entry.clone()); + assert!(is_new_entry); + let i = entry_index(index); assert!(self.entries[i].is_vacant()); - self.entries[i] = RegistryEntry::Occupied(OccupiedEntry { - ty, - // NB: It is the caller's responsibility to increment this. - references: 0, - }); + self.entries[i] = RegistryEntry::Occupied(entry.clone()); - index + entry } /// Register the given canonicalized type, incrementing its reference count. - fn register_canonicalized( - &mut self, - ty: WasmFuncType, - ) -> (VMSharedTypeIndex, Arc) { + fn register_canonicalized(&mut self, ty: WasmFuncType) -> OccupiedEntry { assert!( self.is_canonicalized(&ty), - "ty is not already canonicalized: {ty:?}" + "type is not already canonicalized: {ty:?}" ); - let index = if let Some(i) = self.map.get(&ty) { - *i + if let Some(entry) = self.map.get(&ty) { + entry.incref( + "registering already-registered type in TypeRegistryInner::register_canonicalized", + ); + entry.clone() } else { - self.register_new(Arc::new(ty)) - }; - - let i = entry_index(index); - let entry = self.entries[i].unwrap_occupied_mut(); - entry.references += 1; - - log::trace!( - "registered {index:?} = {:?} (references -> {})", - entry.ty, - entry.references - ); - - (index, Arc::clone(&entry.ty)) + self.register_new(ty) + } } - fn unregister_types(&mut self, collection: &TypeCollection) { - for (_, index) in collection.types.iter() { - self.unregister_entry(*index); + fn unregister_type_collection(&mut self, collection: &TypeCollection) { + for (_, id) in collection.types.iter() { + let i = entry_index(*id); + let e = self.entries[i].unwrap_occupied(); + if e.decref("TypeRegistryInner::unregister_type_collection") { + self.unregister_entry(*id); + } } } + /// Remove an entry from the registry. + /// + /// This does *not* decrement the entry's registration count, it should + /// instead be invoked after a previous decrement operation observed zero + /// remaining registrations. fn unregister_entry(&mut self, index: VMSharedTypeIndex) { log::trace!("unregistering {index:?}"); @@ -586,41 +628,50 @@ impl TypeRegistryInner { while let Some(id) = self.drop_stack.pop() { let i = entry_index(id); - let entry = self.entries[i].unwrap_occupied_mut(); - - assert!(entry.references > 0); - entry.references -= 1; - log::trace!( - "unregistered {index:?} (references -> {})", - entry.references - ); + let entry = self.entries[i].unwrap_occupied(); + + // We need to double check whether the entry is still at zero + // registrations: Between the time that we observed a zero and + // acquired the lock to call this function, another thread could + // have registered the type and found the 0-registrations entry in + // `self.map` and incremented its count. + // + // We don't need to worry about any concurrent increments during + // this function's invocation after we check for zero because we + // have exclusive access to `&mut self` and therefore no one can + // create a new reference to this entry and bring it back to life. + if entry.0.registrations.load(Acquire) != 0 { + continue; + } - if entry.references == 0 { - // Enqueue the other types that are (shallowly/non-transitively) - // referenced from this type for having their ref count - // decremented as well. This type is no longer holding them - // alive. - entry - .ty - .trace::<_, ()>(&mut |idx| match idx { - EngineOrModuleTypeIndex::Engine(child_id) => { - let child_id = VMSharedTypeIndex::new(child_id); - log::trace!("dropping {id:?} enqueues {child_id:?} for unregistration"); + // Decrement any other types that this type was + // (shallowly/non-transitively) keeping alive. + entry + .0 + .ty + .trace::<_, ()>(&mut |idx| match idx { + EngineOrModuleTypeIndex::Engine(child_id) => { + let child_id = VMSharedTypeIndex::new(child_id); + let child_index = entry_index(child_id); + let child_entry = self.entries[child_index].unwrap_occupied(); + if child_entry.decref( + "referenced by unregistered type in TypeCollection::unregister_entry", + ) { self.drop_stack.push(child_id); - Ok(()) } - EngineOrModuleTypeIndex::Module(_) => { - unreachable!("should be canonicalized") - } - }) - .unwrap(); - - self.map.remove(&entry.ty); - self.entries[i] = RegistryEntry::Vacant { - next_vacant: self.first_vacant.take(), - }; - self.first_vacant = Some(index); - } + Ok(()) + } + EngineOrModuleTypeIndex::Module(_) => { + unreachable!("should be canonicalized") + } + }) + .unwrap(); + + self.map.remove(entry); + self.entries[i] = RegistryEntry::Vacant { + next_vacant: self.first_vacant.take(), + }; + self.first_vacant = Some(id); } } } @@ -662,10 +713,10 @@ impl TypeRegistry { /// still using the resulting value! Use the `RegisteredType::root` /// constructor if you need to ensure that property and you don't have some /// other mechanism already keeping the type registered. - pub fn borrow(&self, index: VMSharedTypeIndex) -> Option> { + pub fn borrow(&self, index: VMSharedTypeIndex) -> Option> { let i = entry_index(index); let inner = self.0.read().unwrap(); let e = inner.entries.get(i)?; - Some(e.as_occupied()?.ty.clone()) + e.as_occupied().cloned() } }