From 336f6796b6cf0f083f42a594f383ef38903b59de Mon Sep 17 00:00:00 2001
From: Jason Toffaletti
Date: Sun, 17 Nov 2013 01:11:34 -0800
Subject: [PATCH] improve work queue

---
 src/libstd/rt/work_queue.rs | 270 +++++++++++++++++++++++++++++++-----
 1 file changed, 239 insertions(+), 31 deletions(-)

diff --git a/src/libstd/rt/work_queue.rs b/src/libstd/rt/work_queue.rs
index 24792f3904e51..c5211833a13f4 100644
--- a/src/libstd/rt/work_queue.rs
+++ b/src/libstd/rt/work_queue.rs
@@ -8,68 +8,276 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use container::Container;
-use option::*;
-use vec::OwnedVector;
-use unstable::sync::Exclusive;
-use cell::Cell;
-use kinds::Send;
+use vec;
+use unstable::atomics::{atomic_store, atomic_load, AtomicUint, fence, SeqCst, Acquire, Release, Relaxed};
+use unstable::sync::{UnsafeArc, LittleLock};
+use cast;
+use option::{Option, Some, None};
+use iter::range;
 use clone::Clone;
+use kinds::Send;
 
 pub struct WorkQueue<T> {
-    // XXX: Another mystery bug fixed by boxing this lock
-    priv queue: ~Exclusive<~[T]>
+    priv state: UnsafeArc<State<T>>,
 }
 
 impl<T: Send> WorkQueue<T> {
     pub fn new() -> WorkQueue<T> {
-        WorkQueue {
-            queue: ~Exclusive::new(~[])
+        WorkQueue::with_capacity(2)
+    }
+
+    pub fn with_capacity(capacity: uint) -> WorkQueue<T> {
+        WorkQueue{
+            state: UnsafeArc::new(State::with_capacity(capacity))
         }
     }
 
     pub fn push(&mut self, value: T) {
+        unsafe { (*self.state.get()).push(value) }
+    }
+
+    pub fn pop(&mut self) -> Option<T> {
+        unsafe { (*self.state.get()).pop() }
+    }
+
+    pub fn steal(&mut self) -> Option<T> {
+        unsafe { (*self.state.get()).steal() }
+    }
+
+    pub fn is_empty(&mut self) -> bool {
+        unsafe { (*self.state.get()).is_empty() }
+    }
+
+    pub fn len(&mut self) -> uint {
+        unsafe { (*self.state.get()).len() }
+    }
+}
+
+impl<T: Send> Clone for WorkQueue<T> {
+    fn clone(&self) -> WorkQueue<T> {
+        WorkQueue {
+            state: self.state.clone()
+        }
+    }
+}
+
+struct State<T> {
+    array: ~[*mut T],
+    mask: uint,
+    headIndex: AtomicUint,
+    tailIndex: AtomicUint,
+    lock: LittleLock,
+}
+
+impl<T: Send> State<T> {
+    fn with_capacity(size: uint) -> State<T> {
+        let mut state = State{
+            array: vec::with_capacity(size),
+            mask: size-1,
+            headIndex: AtomicUint::new(0),
+            tailIndex: AtomicUint::new(0),
+            lock: LittleLock::new()
+        };
         unsafe {
-            let value = Cell::new(value);
-            self.queue.with(|q| q.unshift(value.take()) );
+            vec::raw::set_len(&mut state.array, size);
         }
+        state
     }
 
-    pub fn pop(&mut self) -> Option<T> {
+    fn push(&mut self, value: T) {
+        let mut tail = self.tailIndex.load(Acquire);
+        if tail < self.headIndex.load(Acquire) + self.mask {
+            unsafe {
+                atomic_store(&mut self.array[tail & self.mask], cast::transmute(value), Relaxed);
+            }
+            self.tailIndex.store(tail+1, Release);
+        } else {
+            unsafe {
+                let value: *mut T = cast::transmute(value);
+                self.lock.lock(|| {
+                    let head = self.headIndex.load(Acquire);
+                    let count = self.len();
+                    if count >= self.mask {
+                        let arraySize = self.array.len();
+                        let mask = self.mask;
+                        let mut newArray = vec::with_capacity(arraySize*2);
+                        vec::raw::set_len(&mut newArray, arraySize*2);
+                        for i in range(0, count) {
+                            newArray[i] = self.array[(i+head) & mask];
+                        }
+                        self.array = newArray;
+                        self.headIndex.store(0, Release);
+                        self.tailIndex.store(count, Release);
+                        tail = count;
+                        self.mask = (mask * 2) | 1;
+                    }
+                    atomic_store(&mut self.array[tail & self.mask], value, Relaxed);
+                    self.tailIndex.store(tail+1, Release);
+                });
+            }
+        }
+    }
+
+    fn pop(&mut self) -> Option<T> {
+        let mut tail = self.tailIndex.load(Acquire);
+        if tail == 0 {
+            return None
+        }
+        tail -= 1;
+        self.tailIndex.store(tail, Release);
+        fence(SeqCst);
         unsafe {
-            do self.queue.with |q| {
-                if !q.is_empty() {
-                    Some(q.shift())
-                } else {
-                    None
-                }
+            if self.headIndex.load(Acquire) <= tail {
+                Some(cast::transmute(atomic_load(&mut self.array[tail & self.mask], Relaxed)))
+            } else {
+                self.lock.lock(|| {
+                    if self.headIndex.load(Acquire) <= tail {
+                        Some(cast::transmute(atomic_load(&mut self.array[tail & self.mask], Relaxed)))
+                    } else {
+                        self.tailIndex.store(tail+1, Release);
+                        None
+                    }
+                })
             }
         }
     }
 
-    pub fn steal(&mut self) -> Option<T> {
+    fn steal(&mut self) -> Option<T> {
         unsafe {
-            do self.queue.with |q| {
-                if !q.is_empty() {
-                    Some(q.pop())
+            match self.lock.try_lock(|| {
+                let head = self.headIndex.load(Acquire);
+                self.headIndex.store(head+1, Release);
+                fence(SeqCst);
+                if head < self.tailIndex.load(Acquire) {
+                    Some(cast::transmute(atomic_load(&mut self.array[head & self.mask], Relaxed)))
                 } else {
+                    self.headIndex.store(head, Release);
                     None
                 }
+            }) {
+                Some(T) => T,
+                None => None
             }
         }
     }
 
-    pub fn is_empty(&self) -> bool {
-        unsafe {
-            self.queue.with_imm(|q| q.is_empty() )
-        }
+    fn is_empty(&self) -> bool {
+        self.headIndex.load(Acquire) >= self.tailIndex.load(Acquire)
+    }
+
+    fn len(&self) -> uint {
+        self.tailIndex.load(Acquire) - self.headIndex.load(Acquire)
     }
 }
 
-impl<T> Clone for WorkQueue<T> {
-    fn clone(&self) -> WorkQueue<T> {
-        WorkQueue {
-            queue: self.queue.clone()
+#[cfg(test)]
+mod tests {
+    use prelude::*;
+    use task;
+    use comm;
+    use unstable::sync::{UnsafeArc};
+    use unstable::atomics::{AtomicUint, Relaxed};
+    use super::WorkQueue;
+
+    #[test]
+    fn test() {
+        let mut q = WorkQueue::with_capacity(10);
+        q.push(1);
+        assert_eq!(Some(1), q.pop());
+        assert_eq!(None, q.steal());
+        q.push(2);
+        assert_eq!(Some(2), q.steal());
+    }
+
+    #[test]
+    fn test_grow() {
+        let mut q = WorkQueue::with_capacity(2);
+        q.push(1);
+        assert_eq!(Some(1), q.pop());
+        assert_eq!(None, q.steal());
+        q.push(2);
+        assert_eq!(Some(2), q.steal());
+        q.push(3);
+        q.push(4);
+        assert_eq!(Some(4), q.pop());
+        assert_eq!(Some(3), q.pop());
+        assert_eq!(None, q.steal());
+    }
+
+    #[test]
+    fn test_steal() {
+        let work_units = 1000u;
+        let stealers = 8u;
+        let q = WorkQueue::with_capacity(100);
+        let counter = UnsafeArc::new(AtomicUint::new(0));
+        let mut completion_ports = ~[];
+
+        let (port, chan) = comm::stream();
+        let (completion_port, completion_chan) = comm::stream();
+        completion_ports.push(completion_port);
+        chan.send(q.clone());
+        {
+            let counter = counter.clone();
+            do task::spawn_sched(task::SingleThreaded) {
+                let mut q = port.recv();
+                for i in range(0, work_units) {
+                    q.push(i);
+                }
+
+                let mut count = 0u;
+                loop {
+                    match q.pop() {
+                        Some(_) => unsafe {
+                            count += 1;
+                            (*counter.get()).fetch_add(1, Relaxed);
+                            // simulate work
+                            task::deschedule();
+                        },
+                        None => break,
+                    }
+                }
+                debug!("count: {}", count);
+                completion_chan.send(0);
+            }
+        }
+
+        for _ in range(0, stealers) {
+            let (port, chan) = comm::stream();
+            let (completion_port, completion_chan) = comm::stream();
+            completion_ports.push(completion_port);
+            chan.send(q.clone());
+            let counter = counter.clone();
+            do task::spawn_sched(task::SingleThreaded) {
+                let mut count = 0u;
+                let mut q = port.recv();
+                loop {
+                    match q.steal() {
+                        Some(_) => unsafe {
+                            count += 1;
+                            (*counter.get()).fetch_add(1, Relaxed);
+                        },
+                        None => (),
+                    }
+                    // simulate work
+                    task::deschedule();
+                    unsafe {
+                        if (*counter.get()).load(Relaxed) == work_units {
+                            break
+                        }
+                    }
+                }
+                debug!("count: {}", count);
+                completion_chan.send(0);
+            }
+        }
+
+        // wait for all tasks to finish work
+        for completion_port in completion_ports.iter() {
+            assert_eq!(0, completion_port.recv());
+        }
+
+        unsafe {
+            assert_eq!(work_units, (*counter.get()).load(Relaxed));
         }
     }
 }
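
A quick illustration of the ordering the new queue exposes, not part of the patch above: push/pop operate LIFO on the owning end of the deque, while steal removes the oldest element from the opposite end, which is the behaviour the patch's own test_grow and test_steal rely on. The sketch below is written in the same in-module test style; the test name and chosen capacity are illustrative, not taken from the patch.

    // Sketch only, not part of the patch: pop() drains the owner's end
    // LIFO, steal() removes the oldest element from the other end.
    #[test]
    fn test_pop_vs_steal_order() {
        let mut q = WorkQueue::with_capacity(4);
        q.push(1);
        q.push(2);
        q.push(3);
        assert_eq!(Some(1), q.steal()); // oldest item is stolen first
        assert_eq!(Some(3), q.pop());   // newest item is popped first
        assert_eq!(Some(2), q.pop());
        assert_eq!(None, q.pop());      // queue is now empty
    }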