diff --git a/src/barrier.rs b/src/barrier.rs index bacce90..259ac54 100644 --- a/src/barrier.rs +++ b/src/barrier.rs @@ -19,13 +19,13 @@ use {Pulse, Signal, Waiting, Signals}; pub struct Inner { pub count: AtomicUsize, - pub trigger: Mutex> + pub trigger: Mutex>, } /// A `Barrier` can listen for 1 or more `Signals`. It will only transition /// to a `Pulsed` state once all the `Signals` have `Pulsed`. pub struct Barrier { - inner: Arc + inner: Arc, } pub struct Handle(pub Arc); @@ -34,9 +34,9 @@ impl Barrier { /// Create a new Barrier from an Vector of `Siganl`s pub fn new(pulses: &[Signal]) -> Barrier { // count items - let inner = Arc::new(Inner{ + let inner = Arc::new(Inner { count: AtomicUsize::new(pulses.len()), - trigger: Mutex::new(None) + trigger: Mutex::new(None), }); for pulse in pulses { @@ -45,13 +45,13 @@ impl Barrier { Barrier { inner: inner } } - } impl Signals for Barrier { fn signal(&self) -> Signal { - let mut guard = self.inner.trigger.lock().unwrap(); let (p, t) = Signal::new(); + + let mut guard = self.inner.trigger.lock().unwrap(); if self.inner.count.load(Ordering::Relaxed) == 0 { t.pulse(); } else { diff --git a/src/lib.rs b/src/lib.rs index 5c0e1ea..f874e76 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,7 +15,7 @@ extern crate atom; extern crate time; -use std::sync::atomic::{AtomicUsize}; +use std::sync::atomic::AtomicUsize; use std::thread; use std::mem; use std::fmt; @@ -38,7 +38,7 @@ mod fnbox; /// and Waiting is Dropped struct Inner { state: AtomicUsize, - waiting: Atom> + waiting: Atom>, } // TODO 64bit sized, probably does not matter now @@ -49,12 +49,12 @@ const REF_COUNT: usize = !TX_FLAGS; struct Waiting { next: Option>, - wake: Wake + wake: Wake, } impl GetNextMut for Box { type NextPtr = Option>; - + fn get_next(&mut self) -> &mut Option> { &mut self.next } @@ -64,7 +64,7 @@ enum Wake { Thread(thread::Thread), Select(select::Handle), Barrier(barrier::Handle), - Callback(Box) + Callback(Box), } impl Waiting { @@ -106,28 +106,30 @@ impl Waiting { fn thread() -> Box { Box::new(Waiting { next: None, - wake: Wake::Thread(thread::current()) + wake: Wake::Thread(thread::current()), }) } fn select(handle: select::Handle) -> Box { - Box::new(Waiting{ + Box::new(Waiting { next: None, - wake: Wake::Select(handle) + wake: Wake::Select(handle), }) } fn barrier(handle: barrier::Handle) -> Box { - Box::new(Waiting{ + Box::new(Waiting { next: None, - wake: Wake::Barrier(handle) + wake: Wake::Barrier(handle), }) } - fn callback(cb: F) -> Box where F: FnOnce() + 'static { - Box::new(Waiting{ + fn callback(cb: F) -> Box + where F: FnOnce() + 'static + { + Box::new(Waiting { next: None, - wake: Wake::Callback(Box::new(cb)) + wake: Wake::Callback(Box::new(cb)), }) } } @@ -142,7 +144,7 @@ unsafe impl Sync for Pulse {} /// as to never allow it to fire again. `Dropping` a pulse will `pulse` /// The signal, but the signal will enter an error state. pub struct Pulse { - inner: *mut Inner + inner: *mut Inner, } impl fmt::Debug for Pulse { @@ -154,10 +156,8 @@ impl fmt::Debug for Pulse { fn delete_inner(state: usize, inner: *mut Inner) { if state & REF_COUNT == 1 { - let inner: Box = unsafe { - mem::transmute(inner) - }; - drop(inner); + let inner: Box = unsafe { mem::transmute(inner) }; + drop(inner); } } @@ -174,9 +174,7 @@ impl Pulse { /// Create a Pulse from a usize. This is naturally unsafe. #[inline] pub unsafe fn cast_from_usize(ptr: usize) -> Pulse { - Pulse { - inner: mem::transmute(ptr) - } + Pulse { inner: mem::transmute(ptr) } } /// Convert a trigger to a `usize`, This is unsafe @@ -203,7 +201,7 @@ impl Pulse { let id = unsafe { mem::transmute(self.inner) }; match self.inner().waiting.take() { None => (), - Some(v) => Waiting::wake(v, id) + Some(v) => Waiting::wake(v, id), } } @@ -232,12 +230,15 @@ unsafe impl Sync for Signal {} /// pulse has fired, and no longer exists. Errored means the pulse was dropped /// without firing. This normally means a programming error of some sort. pub struct Signal { - inner: *mut Inner + inner: *mut Inner, } impl fmt::Debug for Signal { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "Signal(id={:?}, pending={:?})", self.id(), self.is_pending()) + write!(f, + "Signal(id={:?}, pending={:?})", + self.id(), + self.is_pending()) } } @@ -262,27 +263,22 @@ impl Signal { pub fn new() -> (Signal, Pulse) { let inner = Box::new(Inner { state: AtomicUsize::new(2), - waiting: Atom::empty() + waiting: Atom::empty(), }); - let inner = unsafe {mem::transmute(inner)}; + let inner = unsafe { mem::transmute(inner) }; - (Signal { - inner: inner - }, - Pulse { - inner: inner, - }) + (Signal { inner: inner }, Pulse { inner: inner }) } /// Create a signal that is already pulsed pub fn pulsed() -> Signal { let inner = Box::new(Inner { state: AtomicUsize::new(1 | PULSED), - waiting: Atom::empty() + waiting: Atom::empty(), }); - let inner = unsafe {mem::transmute(inner)}; + let inner = unsafe { mem::transmute(inner) }; Signal { inner: inner } } @@ -299,7 +295,7 @@ impl Signal { match (flags & TX_DROP == TX_DROP, flags & PULSED == PULSED) { (_, true) => SignalState::Pulsed, (true, _) => SignalState::Dropped, - (_, _) => SignalState::Pending + (_, _) => SignalState::Pending, } } @@ -346,7 +342,7 @@ impl Signal { let id = self.add_to_waitlist(waiter); ArmedSignal { id: id, - pulse: self + pulse: self, } } @@ -382,7 +378,9 @@ impl Signal { }) } - pub fn callback(self, cb: F) where F: FnOnce() + 'static { + pub fn callback(self, cb: F) + where F: FnOnce() + 'static + { self.add_to_waitlist(Waiting::callback(cb)); } } @@ -392,7 +390,7 @@ impl Signal { pub enum SignalState { Pending, Pulsed, - Dropped + Dropped, } impl IntoRawPtr for Pulse { @@ -431,7 +429,7 @@ impl FromRawPtr for Signal { #[derive(Debug, PartialEq, Eq)] pub enum WaitError { /// The `Pulse` was dropped before it could `Pulse` - Dropped + Dropped, } /// Represents the possible errors from a wait timeout @@ -440,18 +438,20 @@ pub enum TimeoutError { /// A `WaitError` has occurred Error(WaitError), /// The `Signal` timed-out before a `Pulse` was observed. - Timeout + Timeout, } struct ArmedSignal { pulse: Signal, - id: usize + id: usize, } impl Deref for ArmedSignal { type Target = Signal; - fn deref(&self) -> &Signal { &self.pulse } + fn deref(&self) -> &Signal { + &self.pulse + } } impl ArmedSignal { @@ -511,7 +511,7 @@ impl Scheduler for ThreadScheduler { match signal.state() { SignalState::Pending => (), SignalState::Pulsed => return Ok(()), - SignalState::Dropped => return Err(WaitError::Dropped) + SignalState::Dropped => return Err(WaitError::Dropped), } } } @@ -525,7 +525,7 @@ impl Scheduler for ThreadScheduler { if signal.is_pending() { now = (precise_time_s() * 1000.) as u64; if now > end { - return Err(TimeoutError::Timeout) + return Err(TimeoutError::Timeout); } thread::park_timeout_ms((end - now) as u32); } @@ -534,7 +534,7 @@ impl Scheduler for ThreadScheduler { match signal.state() { SignalState::Pending => (), SignalState::Pulsed => return Ok(()), - SignalState::Dropped => return Err(TimeoutError::Error(WaitError::Dropped)) + SignalState::Dropped => return Err(TimeoutError::Error(WaitError::Dropped)), } } } @@ -566,8 +566,10 @@ pub fn swap_scheduler(sched: Box) -> Option> { } /// Call the suppled closure using the supplied schedulee -pub fn with_scheduler(f: F, sched: Box) -> Option> where F: FnOnce() { +pub fn with_scheduler(f: F, sched: Box) -> Option> + where F: FnOnce() +{ let old = swap_scheduler(sched); f(); old.and_then(|o| swap_scheduler(o)) -} \ No newline at end of file +} diff --git a/src/select.rs b/src/select.rs index 81bae01..701bf02 100644 --- a/src/select.rs +++ b/src/select.rs @@ -19,7 +19,7 @@ use {Signal, ArmedSignal, Pulse, Waiting, Barrier, Signals}; pub struct Inner { pub ready: Vec, - pub trigger: Option + pub trigger: Option, } pub struct Handle(pub Arc>); @@ -31,18 +31,18 @@ pub struct Handle(pub Arc>); /// pending. pub struct Select { inner: Arc>, - signals: HashMap + signals: HashMap, } impl Select { /// Create a new empty `Select` pub fn new() -> Select { Select { - inner: Arc::new(Mutex::new(Inner{ + inner: Arc::new(Mutex::new(Inner { ready: Vec::new(), - trigger: None + trigger: None, })), - signals: HashMap::new() + signals: HashMap::new(), } } @@ -58,17 +58,17 @@ impl Select { /// Remove a `Signal1 from the `Select` using it's unique id. pub fn remove(&mut self, id: usize) -> Option { - self.signals.remove(&id) + self.signals + .remove(&id) .map(|x| x.disarm()) } /// Convert all the signals present in the `Select` into a `Barrier` pub fn into_barrier(self) -> Barrier { - let vec: Vec = - self.signals - .into_iter() - .map(|(_, p)| p.disarm()) - .collect(); + let vec: Vec = self.signals + .into_iter() + .map(|(_, p)| p.disarm()) + .collect(); Barrier::new(&vec) } @@ -79,7 +79,7 @@ impl Select { pub fn try_next(&mut self) -> Option { let mut guard = self.inner.lock().unwrap(); if let Some(x) = guard.ready.pop() { - return Some(self.signals.remove(&x).map(|x| x.disarm()).unwrap()) + return Some(self.signals.remove(&x).map(|x| x.disarm()).unwrap()); } None } @@ -124,7 +124,7 @@ impl Signals for Select { } else { t.pulse(); } - pulse + pulse } } @@ -133,7 +133,7 @@ impl Signals for Select { /// will return an supplied object. pub struct SelectMap { select: Select, - items: HashMap + items: HashMap, } impl SelectMap { @@ -141,7 +141,7 @@ impl SelectMap { pub fn new() -> SelectMap { SelectMap { select: Select::new(), - items: HashMap::new() + items: HashMap::new(), } } @@ -162,7 +162,9 @@ impl SelectMap { } /// Get the number of items in the `SelectMap` - pub fn len(&self) -> usize { self.items.len() } + pub fn len(&self) -> usize { + self.items.len() + } } impl Iterator for SelectMap { @@ -171,7 +173,7 @@ impl Iterator for SelectMap { fn next(&mut self) -> Option<(Signal, T)> { self.select.next().map(|x| { let id = x.id(); - (x, self.items.remove(&id).unwrap()) + (x, self.items.remove(&id).unwrap()) }) } }