(index<- )        ./libstd/sync/spsc_queue.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
   2   * Redistribution and use in source and binary forms, with or without
   3   * modification, are permitted provided that the following conditions are met:
   4   *
   5   *    1. Redistributions of source code must retain the above copyright notice,
   6   *       this list of conditions and the following disclaimer.
   7   *
   8   *    2. Redistributions in binary form must reproduce the above copyright
   9   *       notice, this list of conditions and the following disclaimer in the
  10   *       documentation and/or other materials provided with the distribution.
  11   *
  12   * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
  13   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
  14   * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
  15   * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  16   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  17   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  18   * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  19   * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
  20   * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
  21   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  22   *
  23   * The views and conclusions contained in the software and documentation are
  24   * those of the authors and should not be interpreted as representing official
  25   * policies, either expressed or implied, of Dmitry Vyukov.
  26   */
  27  
  28  // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
  29  
  30  //! A single-producer single-consumer concurrent queue
  31  //!
  32  //! This module contains the implementation of an SPSC queue which can be used
  33  //! concurrently between two tasks. This data structure is safe to use and
  34  //! enforces the semantics that there is one pusher and one popper.
  35  
  36  use cast;
  37  use kinds::Send;
  38  use ops::Drop;
  39  use option::{Some, None, Option};
  40  use owned::Box;
  41  use ptr::RawPtr;
  42  use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
  43  
  44  // Node within the linked list queue of messages to send
  45  struct Node<T> {
  46      // FIXME: this could be an uninitialized T if we're careful enough, and
  47      //      that would reduce memory usage (and be a bit faster).
  48      //      is it worth it?
  49      value: Option<T>,           // nullable for re-use of nodes
  50      next: AtomicPtr<Node<T>>,   // next node in the queue
  51  }
  52  
  53  /// The single-producer single-consumer queue. This structure is not cloneable,
  54  /// but it can be safely shared in an UnsafeArc if it is guaranteed that there
  55  /// is only one popper and one pusher touching the queue at any one point in
  56  /// time.
  57  pub struct Queue<T> {
  58      // consumer fields
  59      tail: *mut Node<T>, // where to pop from
  60      tail_prev: AtomicPtr<Node<T>>, // where to pop from
  61  
  62      // producer fields
  63      head: *mut Node<T>,      // where to push to
  64      first: *mut Node<T>,     // where to get new nodes from
  65      tail_copy: *mut Node<T>, // between first/tail
  66  
  67      // Cache maintenance fields. Additions and subtractions are stored
  68      // separately in order to allow them to use nonatomic addition/subtraction.
  69      cache_bound: uint,
  70      cache_additions: AtomicUint,
  71      cache_subtractions: AtomicUint,
  72  }
  73  
  74  impl<T: Send> Node<T> {
  75      fn new() -> *mut Node<T> {
  76          unsafe {
  77              cast::transmute(box Node {
  78                  value: None,
  79                  next: AtomicPtr::new(0 as *mut Node<T>),
  80              })
  81          }
  82      }
  83  }
  84  
  85  impl<T: Send> Queue<T> {
  86      /// Creates a new queue. The producer returned is connected to the consumer
  87      /// to push all data to the consumer.
  88      ///
  89      /// # Arguments
  90      ///
  91      ///   * `bound` - This queue implementation is implemented with a linked
  92      ///               list, and this means that a push is always a malloc. In
  93      ///               order to amortize this cost, an internal cache of nodes is
  94      ///               maintained to prevent a malloc from always being
  95      ///               necessary. This bound is the limit on the size of the
  96      ///               cache (if desired). If the value is 0, then the cache has
  97      ///               no bound. Otherwise, the cache will never grow larger than
  98      ///               `bound` (although the queue itself could be much larger.
  99      pub fn new(bounduint) -> Queue<T> {
 100          let n1 = Node::new();
 101          let n2 = Node::new();
 102          unsafe { (*n1).next.store(n2, Relaxed) }
 103          Queue {
 104              tail: n2,
 105              tail_prev: AtomicPtr::new(n1),
 106              head: n2,
 107              first: n1,
 108              tail_copy: n1,
 109              cache_bound: bound,
 110              cache_additions: AtomicUint::new(0),
 111              cache_subtractions: AtomicUint::new(0),
 112          }
 113      }
 114  
 115      /// Pushes a new value onto this queue. Note that to use this function
 116      /// safely, it must be externally guaranteed that there is only one pusher.
 117      pub fn push(&mut self, tT) {
 118          unsafe {
 119              // Acquire a node (which either uses a cached one or allocates a new
 120              // one), and then append this to the 'head' node.
 121              let n = self.alloc();
 122              assert!((*n).value.is_none());
 123              (*n).value = Some(t);
 124              (*n).next.store(0 as *mut Node<T>, Relaxed);
 125              (*self.head).next.store(n, Release);
 126              self.head = n;
 127          }
 128      }
 129  
 130      unsafe fn alloc(&mut self) -> *mut Node<T> {
 131          // First try to see if we can consume the 'first' node for our uses.
 132          // We try to avoid as many atomic instructions as possible here, so
 133          // the addition to cache_subtractions is not atomic (plus we're the
 134          // only one subtracting from the cache).
 135          if self.first != self.tail_copy {
 136              if self.cache_bound > 0 {
 137                  let b = self.cache_subtractions.load(Relaxed);
 138                  self.cache_subtractions.store(b + 1, Relaxed);
 139              }
 140              let ret = self.first;
 141              self.first = (*ret).next.load(Relaxed);
 142              return ret;
 143          }
 144          // If the above fails, then update our copy of the tail and try
 145          // again.
 146          self.tail_copy = self.tail_prev.load(Acquire);
 147          if self.first != self.tail_copy {
 148              if self.cache_bound > 0 {
 149                  let b = self.cache_subtractions.load(Relaxed);
 150                  self.cache_subtractions.store(b + 1, Relaxed);
 151              }
 152              let ret = self.first;
 153              self.first = (*ret).next.load(Relaxed);
 154              return ret;
 155          }
 156          // If all of that fails, then we have to allocate a new node
 157          // (there's nothing in the node cache).
 158          Node::new()
 159      }
 160  
 161      /// Attempts to pop a value from this queue. Remember that to use this type
 162      /// safely you must ensure that there is only one popper at a time.
 163      pub fn pop(&mut self) -> Option<T> {
 164          unsafe {
 165              // The `tail` node is not actually a used node, but rather a
 166              // sentinel from where we should start popping from. Hence, look at
 167              // tail's next field and see if we can use it. If we do a pop, then
 168              // the current tail node is a candidate for going into the cache.
 169              let tail = self.tail;
 170              let next = (*tail).next.load(Acquire);
 171              if next.is_null() { return None }
 172              assert!((*next).value.is_some());
 173              let ret = (*next).value.take();
 174  
 175              self.tail = next;
 176              if self.cache_bound == 0 {
 177                  self.tail_prev.store(tail, Release);
 178              } else {
 179                  // FIXME: this is dubious with overflow.
 180                  let additions = self.cache_additions.load(Relaxed);
 181                  let subtractions = self.cache_subtractions.load(Relaxed);
 182                  let size = additions - subtractions;
 183  
 184                  if size < self.cache_bound {
 185                      self.tail_prev.store(tail, Release);
 186                      self.cache_additions.store(additions + 1, Relaxed);
 187                  } else {
 188                      (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
 189                      // We have successfully erased all references to 'tail', so
 190                      // now we can safely drop it.
 191                      let _Box<Node<T>> = cast::transmute(tail);
 192                  }
 193              }
 194              return ret;
 195          }
 196      }
 197  
 198      /// Attempts to peek at the head of the queue, returning `None` if the queue
 199      /// has no data currently
 200      pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
 201          // This is essentially the same as above with all the popping bits
 202          // stripped out.
 203          unsafe {
 204              let tail = self.tail;
 205              let next = (*tail).next.load(Acquire);
 206              if next.is_null() { return None }
 207              return (*next).value.as_mut();
 208          }
 209      }
 210  }
 211  
 212  #[unsafe_destructor]
 213  impl<T: Send> Drop for Queue<T> {
 214      fn drop(&mut self) {
 215          unsafe {
 216              let mut cur = self.first;
 217              while !cur.is_null() {
 218                  let next = (*cur).next.load(Relaxed);
 219                  let _nBox<Node<T>> = cast::transmute(cur);
 220                  cur = next;
 221              }
 222          }
 223      }
 224  }
 225  
 226  #[cfg(test)]
 227  mod test {
 228      use prelude::*;
 229      use native;
 230      use super::Queue;
 231      use sync::arc::UnsafeArc;
 232  
 233      #[test]
 234      fn smoke() {
 235          let mut q = Queue::new(0);
 236          q.push(1);
 237          q.push(2);
 238          assert_eq!(q.pop(), Some(1));
 239          assert_eq!(q.pop(), Some(2));
 240          assert_eq!(q.pop(), None);
 241          q.push(3);
 242          q.push(4);
 243          assert_eq!(q.pop(), Some(3));
 244          assert_eq!(q.pop(), Some(4));
 245          assert_eq!(q.pop(), None);
 246      }
 247  
 248      #[test]
 249      fn drop_full() {
 250          let mut q = Queue::new(0);
 251          q.push(box 1);
 252          q.push(box 2);
 253      }
 254  
 255      #[test]
 256      fn smoke_bound() {
 257          let mut q = Queue::new(1);
 258          q.push(1);
 259          q.push(2);
 260          assert_eq!(q.pop(), Some(1));
 261          assert_eq!(q.pop(), Some(2));
 262          assert_eq!(q.pop(), None);
 263          q.push(3);
 264          q.push(4);
 265          assert_eq!(q.pop(), Some(3));
 266          assert_eq!(q.pop(), Some(4));
 267          assert_eq!(q.pop(), None);
 268      }
 269  
 270      #[test]
 271      fn stress() {
 272          stress_bound(0);
 273          stress_bound(1);
 274  
 275          fn stress_bound(bound: uint) {
 276              let (a, b) = UnsafeArc::new2(Queue::new(bound));
 277              let (tx, rx) = channel();
 278              native::task::spawn(proc() {
 279                  for _ in range(0, 100000) {
 280                      loop {
 281                          match unsafe { (*b.get()).pop() } {
 282                              Some(1) => break,
 283                              Some(_) => fail!(),
 284                              None => {}
 285                          }
 286                      }
 287                  }
 288                  tx.send(());
 289              });
 290              for _ in range(0, 100000) {
 291                  unsafe { (*a.get()).push(1); }
 292              }
 293              rx.recv();
 294          }
 295      }
 296  }


libstd/sync/spsc_queue.rs:44:57-44:57 -struct- definition:
// Node within the linked list queue of messages to send
struct Node<T> {
    // FIXME: this could be an uninitialized T if we're careful enough, and
references:- 14
76:         unsafe {
77:             cast::transmute(box Node {
78:                 value: None,
79:                 next: AtomicPtr::new(0 as *mut Node<T>),
80:             })
--
130:     unsafe fn alloc(&mut self) -> *mut Node<T> {
131:         // First try to see if we can consume the 'first' node for our uses.
--
190:                     // now we can safely drop it.
191:                     let _: Box<Node<T>> = cast::transmute(tail);
192:                 }
--
218:                 let next = (*cur).next.load(Relaxed);
219:                 let _n: Box<Node<T>> = cast::transmute(cur);
220:                 cur = next;


libstd/sync/spsc_queue.rs:56:10-56:10 -struct- definition:
/// time.
pub struct Queue<T> {
    // consumer fields
references:- 5
102:         unsafe { (*n1).next.store(n2, Relaxed) }
103:         Queue {
104:             tail: n2,
libstd/comm/stream.rs:
41: pub struct Packet<T> {
42:     queue: spsc::Queue<Message<T>>, // internal queue for all message
libstd/sync/spsc_queue.rs:
213: impl<T: Send> Drop for Queue<T> {
214:     fn drop(&mut self) {