diff --git a/rocketmq/src/arc_mut.rs b/rocketmq/src/arc_mut.rs new file mode 100644 index 00000000..1c74b5f3 --- /dev/null +++ b/rocketmq/src/arc_mut.rs @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#![allow(dead_code)] + +use std::cell::SyncUnsafeCell; +use std::hash::Hash; +use std::hash::Hasher; +use std::ops::Deref; +use std::ops::DerefMut; +use std::sync::Arc; +use std::sync::Weak; + +pub struct WeakArcMut { + inner: Weak>, +} + +// Implementation of PartialEq for WeakCellWrapper +impl PartialEq for WeakArcMut { + fn eq(&self, other: &Self) -> bool { + // Upgrade the Weak references to Arc, then compare the inner values + if let (Some(self_arc), Some(other_arc)) = (self.inner.upgrade(), other.inner.upgrade()) { + unsafe { *self_arc.get() == *other_arc.get() } + } else { + false + } + } +} + +// Implementation of Eq for WeakCellWrapper +impl Eq for WeakArcMut {} + +// Implementation of Hash for WeakCellWrapper +impl Hash for WeakArcMut { + fn hash(&self, state: &mut H) { + if let Some(arc) = self.inner.upgrade() { + unsafe { (*arc.get()).hash(state) } + } + } +} + +impl Clone for WeakArcMut { + fn clone(&self) -> Self { + WeakArcMut { + inner: self.inner.clone(), + } + } +} + +impl WeakArcMut { + pub fn upgrade(&self) -> Option> { + self.inner.upgrade().map(|value| ArcMut { inner: value }) + } +} + +#[derive(Default)] +pub struct ArcMut { + inner: Arc>, +} + +// Implementation of PartialEq for ArcRefCellWrapper +impl PartialEq for ArcMut { + fn eq(&self, other: &Self) -> bool { + // Compare the inner values by borrowing them unsafely + unsafe { *self.inner.get() == *other.inner.get() } + } +} + +impl Hash for ArcMut { + fn hash(&self, state: &mut H) { + // Compute the hash of the inner value + unsafe { (*self.inner.get()).hash(state) } + } +} + +// Implementation of Eq for ArcRefCellWrapper +// Eq implies PartialEq, so we don't need to add any methods here +impl Eq for ArcMut {} + +impl ArcMut { + #[allow(clippy::mut_from_ref)] + pub fn mut_from_ref(&self) -> &mut T { + unsafe { &mut *self.inner.get() } + } + + pub fn downgrade(this: &Self) -> WeakArcMut { + WeakArcMut { + inner: Arc::downgrade(&this.inner), + } + } + + pub fn get_inner(&self) -> &Arc> { + &self.inner + } +} + +impl ArcMut { + #[inline] + pub fn new(value: T) -> Self { + Self { + inner: Arc::new(SyncUnsafeCell::new(value)), + } + } +} + +impl Clone for ArcMut { + fn clone(&self) -> Self { + ArcMut { + inner: Arc::clone(&self.inner), + } + } +} + +impl AsRef for ArcMut { + fn as_ref(&self) -> &T { + unsafe { &*self.inner.get() } + } +} + +impl AsMut for ArcMut { + fn as_mut(&mut self) -> &mut T { + unsafe { &mut *self.inner.get() } + } +} + +impl Deref for ArcMut { + type Target = T; + fn deref(&self) -> &Self::Target { + self.as_ref() + } +} + +impl DerefMut for ArcMut { + fn deref_mut(&mut self) -> &mut Self::Target { + self.as_mut() + } +} + +pub struct SyncUnsafeCellWrapper { + inner: SyncUnsafeCell, +} + +impl SyncUnsafeCellWrapper { + #[inline] + pub fn new(value: T) -> Self { + Self { + inner: SyncUnsafeCell::new(value), + } + } +} + +impl SyncUnsafeCellWrapper { + #[allow(clippy::mut_from_ref)] + pub fn mut_from_ref(&self) -> &mut T { + unsafe { &mut *self.inner.get() } + } +} + +impl AsRef for SyncUnsafeCellWrapper { + fn as_ref(&self) -> &T { + unsafe { &*self.inner.get() } + } +} + +impl AsMut for SyncUnsafeCellWrapper { + fn as_mut(&mut self) -> &mut T { + &mut *self.inner.get_mut() + } +} + +impl Deref for SyncUnsafeCellWrapper { + type Target = T; + fn deref(&self) -> &Self::Target { + self.as_ref() + } +} + +impl DerefMut for SyncUnsafeCellWrapper { + fn deref_mut(&mut self) -> &mut Self::Target { + self.as_mut() + } +} + +#[cfg(test)] +mod arc_cell_wrapper_tests { + use std::sync::Arc; + + use super::*; + + #[test] + fn new_creates_arc_cell_wrapper_with_provided_value() { + let wrapper = ArcMut::new(10); + assert_eq!(*wrapper.as_ref(), 10); + } + + #[test] + fn clone_creates_a_new_instance_with_same_value() { + let wrapper = ArcMut::new(20); + let cloned_wrapper = wrapper.clone(); + assert_eq!(*cloned_wrapper.as_ref(), 20); + } + + #[test] + fn as_ref_returns_immutable_reference_to_value() { + let wrapper = ArcMut::new(30); + assert_eq!(*wrapper.as_ref(), 30); + } + + #[test] + fn as_mut_returns_mutable_reference_to_value() { + let mut wrapper = ArcMut::new(40); + *wrapper.as_mut() = 50; + assert_eq!(*wrapper.as_ref(), 50); + } + + #[test] + fn deref_returns_reference_to_inner_value() { + let wrapper = ArcMut::new(60); + assert_eq!(*wrapper, 60); + } + + #[test] + fn deref_mut_allows_modification_of_inner_value() { + let mut wrapper = ArcMut::new(70); + *wrapper = 80; + assert_eq!(*wrapper, 80); + } + + #[test] + fn multiple_clones_share_the_same_underlying_data() { + let wrapper = ArcMut::new(Arc::new(90)); + let cloned_wrapper1 = wrapper.clone(); + let cloned_wrapper2 = wrapper.clone(); + + assert_eq!(Arc::strong_count(wrapper.as_ref()), 1); + assert_eq!(Arc::strong_count(cloned_wrapper1.as_ref()), 1); + assert_eq!(Arc::strong_count(cloned_wrapper2.as_ref()), 1); + } +} diff --git a/rocketmq/src/lib.rs b/rocketmq/src/lib.rs index af1f3d8d..2e9ca42b 100644 --- a/rocketmq/src/lib.rs +++ b/rocketmq/src/lib.rs @@ -14,10 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#![feature(sync_unsafe_cell)] + +mod arc_mut; pub mod count_down_latch; pub mod rocketmq_tokio_lock; mod shutdown; +pub use arc_mut::ArcMut; +pub use arc_mut::SyncUnsafeCellWrapper; +pub use arc_mut::WeakArcMut; pub use count_down_latch::CountDownLatch; /// Re-export rocketmq main. pub use rocketmq::main;