(index<- )        ./libsync/mpsc_intrusive.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Wed Apr  9 17:27:03 2014
   1  /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
   2   * Redistribution and use in source and binary forms, with or without
   3   * modification, are permitted provided that the following conditions are met:
   4   *
   5   *    1. Redistributions of source code must retain the above copyright notice,
   6   *       this list of conditions and the following disclaimer.
   7   *
   8   *    2. Redistributions in binary form must reproduce the above copyright
   9   *       notice, this list of conditions and the following disclaimer in the
  10   *       documentation and/or other materials provided with the distribution.
  11   *
  12   * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
  13   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
  14   * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
  15   * EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  16   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  17   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
  18   * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  19   * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  20   * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
  21   * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  22   *
  23   * The views and conclusions contained in the software and documentation are
  24   * those of the authors and should not be interpreted as representing official
  25   * policies, either expressed or implied, of Dmitry Vyukov.
  26   */
  27  
  28  //! A mostly lock-free multi-producer, single consumer queue.
  29  //!
  30  //! This module implements an intrusive MPSC queue. This queue is incredibly
  31  //! unsafe (due to use of unsafe pointers for nodes), and hence is not public.
  32  
  33  // http://www.1024cores.net/home/lock-free-algorithms
  34  //                         /queues/intrusive-mpsc-node-based-queue
  35  
  36  use std::cast;
  37  use std::sync::atomics;
  38  use std::ty::Unsafe;
  39  
  40  // NB: all links are done as AtomicUint instead of AtomicPtr to allow for static
  41  // initialization.
  42  
  43  pub struct Node<T> {
  44      pub next: atomics::AtomicUint,
  45      pub data: T,
  46  }
  47  
  48  pub struct DummyNode {
  49      pub next: atomics::AtomicUint,
  50  }
  51  
  52  pub struct Queue<T> {
  53      pub head: atomics::AtomicUint,
  54      pub tail: Unsafe<*mut Node<T>>,
  55      pub stub: DummyNode,
  56  }
  57  
  58  impl<T: Send> Queue<T> {
  59      pub fn new() -> Queue<T> {
  60          Queue {
  61              head: atomics::AtomicUint::new(0),
  62              tail: Unsafe::new(0 as *mut Node<T>),
  63              stub: DummyNode {
  64                  next: atomics::AtomicUint::new(0),
  65              },
  66          }
  67      }
  68  
  69      pub unsafe fn push(&self, node*mut Node<T>) {
  70          (*node).next.store(0, atomics::Release);
  71          let prev = self.head.swap(node as uint, atomics::AcqRel);
  72  
  73          // Note that this code is slightly modified to allow static
  74          // initialization of these queues with rust's flavor of static
  75          // initialization.
  76          if prev == 0 {
  77              self.stub.next.store(node as uint, atomics::Release);
  78          } else {
  79              let prev = prev as *mut Node<T>;
  80              (*prev).next.store(node as uint, atomics::Release);
  81          }
  82      }
  83  
  84      /// You'll note that the other MPSC queue in std::sync is non-intrusive and
  85      /// returns a `PopResult` here to indicate when the queue is inconsistent.
  86      /// An "inconsistent state" in the other queue means that a pusher has
  87      /// pushed, but it hasn't finished linking the rest of the chain.
  88      ///
  89      /// This queue also suffers from this problem, but I currently haven't been
  90      /// able to detangle when this actually happens. This code is translated
  91      /// verbatim from the website above, and is more complicated than the
  92      /// non-intrusive version.
  93      ///
  94      /// Right now consumers of this queue must be ready for this fact. Just
  95      /// because `pop` returns `None` does not mean that there is not data
  96      /// on the queue.
  97      pub unsafe fn pop(&self) -> Option<*mut Node<T>> {
  98          let tail = *self.tail.get();
  99          let mut tail = if !tail.is_null() {tail} else {
 100              cast::transmute(&self.stub)
 101          };
 102          let mut next = (*tail).next(atomics::Relaxed);
 103          if tail as uint == &self.stub as *DummyNode as uint {
 104              if next.is_null() {
 105                  return None;
 106              }
 107              *self.tail.get() = next;
 108              tail = next;
 109              next = (*next).next(atomics::Relaxed);
 110          }
 111          if !next.is_null() {
 112              *self.tail.get() = next;
 113              return Some(tail);
 114          }
 115          let head = self.head.load(atomics::Acquire) as *mut Node<T>;
 116          if tail != head {
 117              return None;
 118          }
 119          let stub = cast::transmute(&self.stub);
 120          self.push(stub);
 121          next = (*tail).next(atomics::Relaxed);
 122          if !next.is_null() {
 123              *self.tail.get() = next;
 124              return Some(tail);
 125          }
 126          return None
 127      }
 128  }
 129  
 130  impl<T: Send> Node<T> {
 131      pub fn new(tT) -> Node<T> {
 132          Node {
 133              data: t,
 134              next: atomics::AtomicUint::new(0),
 135          }
 136      }
 137      pub unsafe fn next(&self, ordatomics::Ordering) -> *mut Node<T> {
 138          cast::transmute::<uint, *mut Node<T>>(self.next.load(ord))
 139      }
 140  }


libsync/mpsc_intrusive.rs:47:1-47:1 -struct- definition:
pub struct DummyNode {
    pub next: atomics::AtomicUint,
}
references:- 4
62:             tail: Unsafe::new(0 as *mut Node<T>),
63:             stub: DummyNode {
64:                 next: atomics::AtomicUint::new(0),
libsync/mutex.rs:
169:         },
170:         stub: q::DummyNode {
171:             next: atomics::INIT_ATOMIC_UINT,
libsync/mpsc_intrusive.rs:
102:         let mut next = (*tail).next(atomics::Relaxed);
103:         if tail as uint == &self.stub as *DummyNode as uint {
104:             if next.is_null() {


libsync/mpsc_intrusive.rs:51:1-51:1 -struct- definition:
pub struct Queue<T> {
    pub head: atomics::AtomicUint,
    pub tail: Unsafe<*mut Node<T>>,
references:- 5
59:     pub fn new() -> Queue<T> {
60:         Queue {
61:             head: atomics::AtomicUint::new(0),
libsync/mutex.rs:
163:     green_cnt: atomics::INIT_ATOMIC_UINT,
164:     q: q::Queue {
165:         head: atomics::INIT_ATOMIC_UINT,
libsync/mpsc_intrusive.rs:
58: impl<T: Send> Queue<T> {
59:     pub fn new() -> Queue<T> {
60:         Queue {
libsync/mutex.rs:
143:     /// to figure out when to dequeue and enqueue.
144:     q: q::Queue<uint>,
145:     green_cnt: atomics::AtomicUint,
libsync/mpsc_intrusive.rs:
58: impl<T: Send> Queue<T> {
59:     pub fn new() -> Queue<T> {


libsync/mpsc_intrusive.rs:42:1-42:1 -struct- definition:
pub struct Node<T> {
    pub next: atomics::AtomicUint,
    pub data: T,
references:- 12
131:     pub fn new(t: T) -> Node<T> {
132:         Node {
133:             data: t,
--
137:     pub unsafe fn next(&self, ord: atomics::Ordering) -> *mut Node<T> {
138:         cast::transmute::<uint, *mut Node<T>>(self.next.load(ord))
139:     }
libsync/mutex.rs:
166:         tail: Unsafe {
167:             value: 0 as *mut q::Node<uint>,
168:             marker1: marker::InvariantType,
libsync/mpsc_intrusive.rs:
96:     /// on the queue.
97:     pub unsafe fn pop(&self) -> Option<*mut Node<T>> {
98:         let tail = *self.tail.get();