1  /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
   2   * Redistribution and use in source and binary forms, with or without
   3   * modification, are permitted provided that the following conditions are met:
   4   *
   5   *    1. Redistributions of source code must retain the above copyright notice,
   6   *       this list of conditions and the following disclaimer.
   7   *
   8   *    2. Redistributions in binary form must reproduce the above copyright
   9   *       notice, this list of conditions and the following disclaimer in the
  10   *       documentation and/or other materials provided with the distribution.
  11   *
  12   * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
  13   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
  14   * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
  15   * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  16   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  17   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  18   * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  19   * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
  20   * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
  21   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  22   *
  23   * The views and conclusions contained in the software and documentation are
  24   * those of the authors and should not be interpreted as representing official
  25   * policies, either expressed or implied, of Dmitry Vyukov.
  26   */
  27  
  28  //! A mostly lock-free multi-producer, single consumer queue.
  29  //!
  30  //! This module contains an implementation of a concurrent MPSC queue. This
  31  //! queue can be used to share data between tasks, and is also used as the
  32  //! building block of channels in rust.
  33  //!
  34  //! Note that the current implementation of this queue has a caveat of the `pop`
  35  //! method, and see the method for more information about it. Due to this
  36  //! caveat, this queue may not be appropriate for all use-cases.
  37  
  38  // http://www.1024cores.net/home/lock-free-algorithms
  39  //                         /queues/non-intrusive-mpsc-node-based-queue
  40  
  41  use cast;
  42  use kinds::Send;
  43  use ops::Drop;
  44  use option::{Option, None, Some};
  45  use owned::Box;
  46  use ptr::RawPtr;
  47  use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
  48  
  49  /// A result of the `pop` function.
  50  pub enum PopResult<T> {
  51      /// Some data has been popped
  52      Data(T),
  53      /// The queue is empty
  54      Empty,
  55      /// The queue is in an inconsistent state. Popping data should succeed, but
  56      /// some pushers have yet to make enough progress in order allow a pop to
  57      /// succeed. It is recommended that a pop() occur "in the near future" in
  58      /// order to see if the sender has made progress or not
  59      Inconsistent,
  60  }
  61  
  62  struct Node<T> {
  63      next: AtomicPtr<Node<T>>,
  64      value: Option<T>,
  65  }
  66  
  67  /// The multi-producer single-consumer structure. This is not cloneable, but it
  68  /// may be safely shared so long as it is guaranteed that there is only one
  69  /// popper at a time (many pushers are allowed).
  70  pub struct Queue<T> {
  71      head: AtomicPtr<Node<T>>,
  72      tail: *mut Node<T>,
  73  }
  74  
  75  impl<T> Node<T> {
  76      unsafe fn new(v: Option<T>) -> *mut Node<T> {
  77          cast::transmute(box Node {
  78              next: AtomicPtr::new(0 as *mut Node<T>),
  79              value: v,
  80          })
  81      }
  82  }
  83  
  84  impl<T: Send> Queue<T> {
  85      /// Creates a new queue that is safe to share among multiple producers and
  86      /// one consumer.
  87      pub fn new() -> Queue<T> {
  88          let stub = unsafe { Node::new(None) };
  89          Queue {
  90              head: AtomicPtr::new(stub),
  91              tail: stub,
  92          }
  93      }
  94  
  95      /// Pushes a new value onto this queue.
  96      pub fn push(&mut self, t: T) {
  97          unsafe {
  98              let n = Node::new(Some(t));
  99              let prev = self.head.swap(n, AcqRel);
 100              (*prev).next.store(n, Release);
 101          }
 102      }
 103  
 104      /// Pops some data from this queue.
 105      ///
 106      /// Note that the current implementation means that this function cannot
 107      /// return `Option<T>`. It is possible for this queue to be in an
 108      /// inconsistent state where many pushes have succeeded and completely
 109      /// finished, but pops cannot return `Some(t)`. This inconsistent state
 110      /// happens when a pusher is pre-empted at an inopportune moment.
 111      ///
 112      /// This inconsistent state means that this queue does indeed have data, but
 113      /// it does not currently have access to it at this time.
 114      pub fn pop(&mut self) -> PopResult<T> {
 115          unsafe {
 116              let tail = self.tail;
 117              let next = (*tail).next.load(Acquire);
 118  
 119              if !next.is_null() {
 120                  self.tail = next;
 121                  assert!((*tail).value.is_none());
 122                  assert!((*next).value.is_some());
 123                  let ret = (*next).value.take_unwrap();
 124                  let _: Box<Node<T>> = cast::transmute(tail);
 125                  return Data(ret);
 126              }
 127  
 128              if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
 129          }
 130      }
 131  
 132      /// Attempts to pop data from this queue, but doesn't attempt too hard. This
 133      /// will canonicalize inconsistent states to a `None` value.
 134      pub fn casual_pop(&mut self) -> Option<T> {
 135          match self.pop() {
 136              Data(t) => Some(t),
 137              Empty | Inconsistent => None,
 138          }
 139      }
 140  }
 141  
 142  #[unsafe_destructor]
 143  impl<T: Send> Drop for Queue<T> {
 144      fn drop(&mut self) {
 145          unsafe {
 146              let mut cur = self.tail;
 147              while !cur.is_null() {
 148                  let next = (*cur).next.load(Relaxed);
 149                  let _: Box<Node<T>> = cast::transmute(cur);
 150                  cur = next;
 151              }
 152          }
 153      }
 154  }
 155  
 156  #[cfg(test)]
 157  mod tests {
 158      use prelude::*;
 159  
 160      use native;
 161      use super::{Queue, Data, Empty, Inconsistent};
 162      use sync::arc::UnsafeArc;
 163  
 164      #[test]
 165      fn test_full() {
 166          let mut q = Queue::new();
 167          q.push(box 1);
 168          q.push(box 2);
 169      }
 170  
 171      #[test]
 172      fn test() {
 173          let nthreads = 8u;
 174          let nmsgs = 1000u;
 175          let mut q = Queue::new();
 176          match q.pop() {
 177              Empty => {}
 178              Inconsistent | Data(..) => fail!()
 179          }
 180          let (tx, rx) = channel();
 181          let q = UnsafeArc::new(q);
 182  
 183          for _ in range(0, nthreads) {
 184              let tx = tx.clone();
 185              let q = q.clone();
 186              native::task::spawn(proc() {
 187                  for i in range(0, nmsgs) {
 188                      unsafe { (*q.get()).push(i); }
 189                  }
 190                  tx.send(());
 191              });
 192          }
 193  
 194          let mut i = 0u;
 195          while i < nthreads * nmsgs {
 196              match unsafe { (*q.get()).pop() } {
 197                  Empty | Inconsistent => {},
 198                  Data(_) => { i += 1 }
 199              }
 200          }
 201          drop(tx);
 202          for _ in range(0, nthreads) {
 203              rx.recv();
 204          }
 205      }
 206  }