(index<- )        ./libstd/sync/mpmc_bounded_queue.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
   2   * Redistribution and use in source and binary forms, with or without
   3   * modification, are permitted provided that the following conditions are met:
   4   *
   5   *    1. Redistributions of source code must retain the above copyright notice,
   6   *       this list of conditions and the following disclaimer.
   7   *
   8   *    2. Redistributions in binary form must reproduce the above copyright
   9   *       notice, this list of conditions and the following disclaimer in the
  10   *       documentation and/or other materials provided with the distribution.
  11   *
  12   * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
  13   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
  14   * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
  15   * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  16   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  17   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  18   * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  19   * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
  20   * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
  21   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  22   *
  23   * The views and conclusions contained in the software and documentation are
  24   * those of the authors and should not be interpreted as representing official
  25   * policies, either expressed or implied, of Dmitry Vyukov.
  26   */
  27  
  28  #![allow(missing_doc, dead_code)]
  29  
  30  // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
  31  
  32  use clone::Clone;
  33  use kinds::Send;
  34  use num::next_power_of_two;
  35  use option::{Option, Some, None};
  36  use sync::arc::UnsafeArc;
  37  use sync::atomics::{AtomicUint,Relaxed,Release,Acquire};
  38  use vec::Vec;
  39  
  40  struct Node<T> {
  41      sequence: AtomicUint,
  42      value: Option<T>,
  43  }
  44  
  45  struct State<T> {
  46      pad0: [u8, ..64],
  47      buffer: Vec<Node<T>>,
  48      mask: uint,
  49      pad1: [u8, ..64],
  50      enqueue_pos: AtomicUint,
  51      pad2: [u8, ..64],
  52      dequeue_pos: AtomicUint,
  53      pad3: [u8, ..64],
  54  }
  55  
  56  pub struct Queue<T> {
  57      state: UnsafeArc<State<T>>,
  58  }
  59  
  60  impl<T: Send> State<T> {
  61      fn with_capacity(capacityuint) -> State<T> {
  62          let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
  63              if capacity < 2 {
  64                  2u
  65              } else {
  66                  // use next power of 2 as capacity
  67                  next_power_of_two(capacity)
  68              }
  69          } else {
  70              capacity
  71          };
  72          let buffer = Vec::from_fn(capacity, |i| {
  73              Node { sequence:AtomicUint::new(i), value: None }
  74          });
  75          State{
  76              pad0: [0, ..64],
  77              buffer: buffer,
  78              mask: capacity-1,
  79              pad1: [0, ..64],
  80              enqueue_pos: AtomicUint::new(0),
  81              pad2: [0, ..64],
  82              dequeue_pos: AtomicUint::new(0),
  83              pad3: [0, ..64],
  84          }
  85      }
  86  
  87      fn push(&mut self, valueT) -> bool {
  88          let mask = self.mask;
  89          let mut pos = self.enqueue_pos.load(Relaxed);
  90          loop {
  91              let node = self.buffer.get_mut(pos & mask);
  92              let seq = node.sequence.load(Acquire);
  93              let diffint = seq as int - pos as int;
  94  
  95              if diff == 0 {
  96                  let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
  97                  if enqueue_pos == pos {
  98                      node.value = Some(value);
  99                      node.sequence.store(pos+1, Release);
 100                      break
 101                  } else {
 102                      pos = enqueue_pos;
 103                  }
 104              } else if diff < 0 {
 105                  return false
 106              } else {
 107                  pos = self.enqueue_pos.load(Relaxed);
 108              }
 109          }
 110          true
 111      }
 112  
 113      fn pop(&mut self) -> Option<T> {
 114          let mask = self.mask;
 115          let mut pos = self.dequeue_pos.load(Relaxed);
 116          loop {
 117              let node = self.buffer.get_mut(pos & mask);
 118              let seq = node.sequence.load(Acquire);
 119              let diffint = seq as int - (pos + 1) as int;
 120              if diff == 0 {
 121                  let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
 122                  if dequeue_pos == pos {
 123                      let value = node.value.take();
 124                      node.sequence.store(pos + mask + 1, Release);
 125                      return value
 126                  } else {
 127                      pos = dequeue_pos;
 128                  }
 129              } else if diff < 0 {
 130                  return None
 131              } else {
 132                  pos = self.dequeue_pos.load(Relaxed);
 133              }
 134          }
 135      }
 136  }
 137  
 138  impl<T: Send> Queue<T> {
 139      pub fn with_capacity(capacityuint) -> Queue<T> {
 140          Queue{
 141              state: UnsafeArc::new(State::with_capacity(capacity))
 142          }
 143      }
 144  
 145      pub fn push(&mut self, valueT) -> bool {
 146          unsafe { (*self.state.get()).push(value) }
 147      }
 148  
 149      pub fn pop(&mut self) -> Option<T> {
 150          unsafe { (*self.state.get()).pop() }
 151      }
 152  }
 153  
 154  impl<T: Send> Clone for Queue<T> {
 155      fn clone(&self) -> Queue<T> {
 156          Queue {
 157              state: self.state.clone()
 158          }
 159      }
 160  }
 161  
 162  #[cfg(test)]
 163  mod tests {
 164      use prelude::*;
 165      use super::Queue;
 166      use native;
 167  
 168      #[test]
 169      fn test() {
 170          let nthreads = 8u;
 171          let nmsgs = 1000u;
 172          let mut q = Queue::with_capacity(nthreads*nmsgs);
 173          assert_eq!(None, q.pop());
 174          let (tx, rx) = channel();
 175  
 176          for _ in range(0, nthreads) {
 177              let q = q.clone();
 178              let tx = tx.clone();
 179              native::task::spawn(proc() {
 180                  let mut q = q;
 181                  for i in range(0, nmsgs) {
 182                      assert!(q.push(i));
 183                  }
 184                  tx.send(());
 185              });
 186          }
 187  
 188          let mut completion_rxs = vec![];
 189          for _ in range(0, nthreads) {
 190              let (tx, rx) = channel();
 191              completion_rxs.push(rx);
 192              let q = q.clone();
 193              native::task::spawn(proc() {
 194                  let mut q = q;
 195                  let mut i = 0u;
 196                  loop {
 197                      match q.pop() {
 198                          None => {},
 199                          Some(_) => {
 200                              i += 1;
 201                              if i == nmsgs { break }
 202                          }
 203                      }
 204                  }
 205                  tx.send(i);
 206              });
 207          }
 208  
 209          for rx in completion_rxs.mut_iter() {
 210              assert_eq!(nmsgs, rx.recv());
 211          }
 212          for _ in range(0, nthreads) {
 213              rx.recv();
 214          }
 215      }
 216  }