(index<- )        ./libstd/sync/deque.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! A (mostly) lock-free concurrent work-stealing deque
  12  //!
  13  //! This module contains an implementation of the Chase-Lev work stealing deque
  14  //! described in "Dynamic Circular Work-Stealing Deque". The implementation is
  15  //! heavily based on the pseudocode found in the paper.
  16  //!
  17  //! This implementation does not want to have the restriction of a garbage
  18  //! collector for reclamation of buffers, and instead it uses a shared pool of
  19  //! buffers. This shared pool is required for correctness in this
  20  //! implementation.
  21  //!
  22  //! The only lock-synchronized portions of this deque are the buffer allocation
  23  //! and deallocation portions. Otherwise all operations are lock-free.
  24  //!
  25  //! # Example
  26  //!
  27  //!     use std::rt::deque::BufferPool;
  28  //!
  29  //!     let mut pool = BufferPool::new();
  30  //!     let (mut worker, mut stealer) = pool.deque();
  31  //!
  32  //!     // Only the worker may push/pop
  33  //!     worker.push(1);
  34  //!     worker.pop();
  35  //!
  36  //!     // Stealers take data from the other end of the deque
  37  //!     worker.push(1);
  38  //!     stealer.steal();
  39  //!
  40  //!     // Stealers can be cloned to have many stealers stealing in parallel
  41  //!     worker.push(1);
  42  //!     let mut stealer2 = stealer.clone();
  43  //!     stealer2.steal();
  44  
  45  // NB: the "buffer pool" strategy is not done for speed, but rather for
  46  //     correctness. For more info, see the comment on `swap_buffer`
  47  
  48  // FIXME: all atomic operations in this module use a SeqCst ordering. That is
  49  //      probably overkill
  50  
  51  use cast;
  52  use clone::Clone;
  53  use iter::{range, Iterator};
  54  use kinds::Send;
  55  use libc;
  56  use mem;
  57  use ops::Drop;
  58  use option::{Option, Some, None};
  59  use owned::Box;
  60  use ptr;
  61  use ptr::RawPtr;
  62  use sync::arc::UnsafeArc;
  63  use sync::atomics::{AtomicInt, AtomicPtr, SeqCst};
  64  use unstable::sync::Exclusive;
  65  use slice::ImmutableVector;
  66  use vec::Vec;
  67  
  68  // Once the queue is less than 1/K full, then it will be downsized. Note that
  69  // the deque requires that this number be less than 2.
  70  static K: int = 4;
  71  
  72  // Minimum number of bits that a buffer size should be. No buffer will resize to
  73  // under this value, and all deques will initially contain a buffer of this
  74  // size.
  75  //
  76  // The size in question is 1 << MIN_BITS
  77  static MIN_BITS: int = 7;
  78  
  79  struct Deque<T> {
  80      bottom: AtomicInt,
  81      top: AtomicInt,
  82      array: AtomicPtr<Buffer<T>>,
  83      pool: BufferPool<T>,
  84  }
  85  
  86  /// Worker half of the work-stealing deque. This worker has exclusive access to
  87  /// one side of the deque, and uses `push` and `pop` method to manipulate it.
  88  ///
  89  /// There may only be one worker per deque.
  90  pub struct Worker<T> {
  91      deque: UnsafeArc<Deque<T>>,
  92  }
  93  
  94  /// The stealing half of the work-stealing deque. Stealers have access to the
  95  /// opposite end of the deque from the worker, and they only have access to the
  96  /// `steal` method.
  97  pub struct Stealer<T> {
  98      deque: UnsafeArc<Deque<T>>,
  99  }
 100  
 101  /// When stealing some data, this is an enumeration of the possible outcomes.
 102  #[deriving(Eq, Show)]
 103  pub enum Stolen<T> {
 104      /// The deque was empty at the time of stealing
 105      Empty,
 106      /// The stealer lost the race for stealing data, and a retry may return more
 107      /// data.
 108      Abort,
 109      /// The stealer has successfully stolen some data.
 110      Data(T),
 111  }
 112  
 113  /// The allocation pool for buffers used by work-stealing deques. Right now this
 114  /// structure is used for reclamation of memory after it is no longer in use by
 115  /// deques.
 116  ///
 117  /// This data structure is protected by a mutex, but it is rarely used. Deques
 118  /// will only use this structure when allocating a new buffer or deallocating a
 119  /// previous one.
 120  pub struct BufferPool<T> {
 121      pool: Exclusive<Vec<Box<Buffer<T>>>>,
 122  }
 123  
 124  /// An internal buffer used by the chase-lev deque. This structure is actually
 125  /// implemented as a circular buffer, and is used as the intermediate storage of
 126  /// the data in the deque.
 127  ///
 128  /// This type is implemented with *T instead of Vec<T> for two reasons:
 129  ///
 130  ///   1. There is nothing safe about using this buffer. This easily allows the
 131  ///      same value to be read twice in to rust, and there is nothing to
 132  ///      prevent this. The usage by the deque must ensure that one of the
 133  ///      values is forgotten. Furthermore, we only ever want to manually run
 134  ///      destructors for values in this buffer (on drop) because the bounds
 135  ///      are defined by the deque it's owned by.
 136  ///
 137  ///   2. We can certainly avoid bounds checks using *T instead of Vec<T>, although
 138  ///      LLVM is probably pretty good at doing this already.
 139  struct Buffer<T> {
 140      storage: *T,
 141      log_size: int,
 142  }
 143  
 144  impl<T: Send> BufferPool<T> {
 145      /// Allocates a new buffer pool which in turn can be used to allocate new
 146      /// deques.
 147      pub fn new() -> BufferPool<T> {
 148          BufferPool { pool: Exclusive::new(vec!()) }
 149      }
 150  
 151      /// Allocates a new work-stealing deque which will send/receiving memory to
 152      /// and from this buffer pool.
 153      pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) {
 154          let (a, b) = UnsafeArc::new2(Deque::new(self.clone()));
 155          (Worker { deque: a }, Stealer { deque: b })
 156      }
 157  
 158      fn alloc(&mut self, bitsint) -> Box<Buffer<T>> {
 159          unsafe {
 160              self.pool.with(|pool| {
 161                  match pool.iter().position(|x| x.size() >= (1 << bits)) {
 162                      Some(i) => pool.remove(i).unwrap(),
 163                      None => box Buffer::new(bits)
 164                  }
 165              })
 166          }
 167      }
 168  
 169      fn free(&mut self, bufBox<Buffer<T>>) {
 170          unsafe {
 171              let mut buf = Some(buf);
 172              self.pool.with(|pool| {
 173                  let buf = buf.take_unwrap();
 174                  match pool.iter().position(|v| v.size() > buf.size()) {
 175                      Some(i) => pool.insert(i, buf),
 176                      None => pool.push(buf),
 177                  }
 178              })
 179          }
 180      }
 181  }
 182  
 183  impl<T: Send> Clone for BufferPool<T> {
 184      fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
 185  }
 186  
 187  impl<T: Send> Worker<T> {
 188      /// Pushes data onto the front of this work queue.
 189      pub fn push(&mut self, tT) {
 190          unsafe { (*self.deque.get()).push(t) }
 191      }
 192      /// Pops data off the front of the work queue, returning `None` on an empty
 193      /// queue.
 194      pub fn pop(&mut self) -> Option<T> {
 195          unsafe { (*self.deque.get()).pop() }
 196      }
 197  
 198      /// Gets access to the buffer pool that this worker is attached to. This can
 199      /// be used to create more deques which share the same buffer pool as this
 200      /// deque.
 201      pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
 202          unsafe { &mut (*self.deque.get()).pool }
 203      }
 204  }
 205  
 206  impl<T: Send> Stealer<T> {
 207      /// Steals work off the end of the queue (opposite of the worker's end)
 208      pub fn steal(&mut self) -> Stolen<T> {
 209          unsafe { (*self.deque.get()).steal() }
 210      }
 211  
 212      /// Gets access to the buffer pool that this stealer is attached to. This
 213      /// can be used to create more deques which share the same buffer pool as
 214      /// this deque.
 215      pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
 216          unsafe { &mut (*self.deque.get()).pool }
 217      }
 218  }
 219  
 220  impl<T: Send> Clone for Stealer<T> {
 221      fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
 222  }
 223  
 224  // Almost all of this code can be found directly in the paper so I'm not
 225  // personally going to heavily comment what's going on here.
 226  
 227  impl<T: Send> Deque<T> {
 228      fn new(mut poolBufferPool<T>) -> Deque<T> {
 229          let buf = pool.alloc(MIN_BITS);
 230          Deque {
 231              bottom: AtomicInt::new(0),
 232              top: AtomicInt::new(0),
 233              array: AtomicPtr::new(unsafe { cast::transmute(buf) }),
 234              pool: pool,
 235          }
 236      }
 237  
 238      unsafe fn push(&mut self, dataT) {
 239          let mut b = self.bottom.load(SeqCst);
 240          let t = self.top.load(SeqCst);
 241          let mut a = self.array.load(SeqCst);
 242          let size = b - t;
 243          if size >= (*a).size() - 1 {
 244              // You won't find this code in the chase-lev deque paper. This is
 245              // alluded to in a small footnote, however. We always free a buffer
 246              // when growing in order to prevent leaks.
 247              a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
 248              b = self.bottom.load(SeqCst);
 249          }
 250          (*a).put(b, data);
 251          self.bottom.store(b + 1, SeqCst);
 252      }
 253  
 254      unsafe fn pop(&mut self) -> Option<T> {
 255          let b = self.bottom.load(SeqCst);
 256          let a = self.array.load(SeqCst);
 257          let b = b - 1;
 258          self.bottom.store(b, SeqCst);
 259          let t = self.top.load(SeqCst);
 260          let size = b - t;
 261          if size < 0 {
 262              self.bottom.store(t, SeqCst);
 263              return None;
 264          }
 265          let data = (*a).get(b);
 266          if size > 0 {
 267              self.maybe_shrink(b, t);
 268              return Some(data);
 269          }
 270          if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
 271              self.bottom.store(t + 1, SeqCst);
 272              return Some(data);
 273          } else {
 274              self.bottom.store(t + 1, SeqCst);
 275              cast::forget(data); // someone else stole this value
 276              return None;
 277          }
 278      }
 279  
 280      unsafe fn steal(&mut self) -> Stolen<T> {
 281          let t = self.top.load(SeqCst);
 282          let old = self.array.load(SeqCst);
 283          let b = self.bottom.load(SeqCst);
 284          let a = self.array.load(SeqCst);
 285          let size = b - t;
 286          if size <= 0 { return Empty }
 287          if size % (*a).size() == 0 {
 288              if a == old && t == self.top.load(SeqCst) {
 289                  return Empty
 290              }
 291              return Abort
 292          }
 293          let data = (*a).get(t);
 294          if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
 295              Data(data)
 296          } else {
 297              cast::forget(data); // someone else stole this value
 298              Abort
 299          }
 300      }
 301  
 302      unsafe fn maybe_shrink(&mut self, bint, tint) {
 303          let a = self.array.load(SeqCst);
 304          if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
 305              self.swap_buffer(b, a, (*a).resize(b, t, -1));
 306          }
 307      }
 308  
 309      // Helper routine not mentioned in the paper which is used in growing and
 310      // shrinking buffers to swap in a new buffer into place. As a bit of a
 311      // recap, the whole point that we need a buffer pool rather than just
 312      // calling malloc/free directly is that stealers can continue using buffers
 313      // after this method has called 'free' on it. The continued usage is simply
 314      // a read followed by a forget, but we must make sure that the memory can
 315      // continue to be read after we flag this buffer for reclamation.
 316      unsafe fn swap_buffer(&mut self, bint, old*mut Buffer<T>,
 317                            bufBuffer<T>) -> *mut Buffer<T> {
 318          let newbuf*mut Buffer<T> = cast::transmute(box buf);
 319          self.array.store(newbuf, SeqCst);
 320          let ss = (*newbuf).size();
 321          self.bottom.store(b + ss, SeqCst);
 322          let t = self.top.load(SeqCst);
 323          if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
 324              self.bottom.store(b, SeqCst);
 325          }
 326          self.pool.free(cast::transmute(old));
 327          return newbuf;
 328      }
 329  }
 330  
 331  
 332  #[unsafe_destructor]
 333  impl<T: Send> Drop for Deque<T> {
 334      fn drop(&mut self) {
 335          let t = self.top.load(SeqCst);
 336          let b = self.bottom.load(SeqCst);
 337          let a = self.array.load(SeqCst);
 338          // Free whatever is leftover in the dequeue, and then move the buffer
 339          // back into the pool.
 340          for i in range(t, b) {
 341              let _T = unsafe { (*a).get(i) };
 342          }
 343          self.pool.free(unsafe { cast::transmute(a) });
 344      }
 345  }
 346  
 347  impl<T: Send> Buffer<T> {
 348      unsafe fn new(log_sizeint) -> Buffer<T> {
 349          let size = (1 << log_size) * mem::size_of::<T>();
 350          let buffer = libc::malloc(size as libc::size_t);
 351          assert!(!buffer.is_null());
 352          Buffer {
 353              storage: buffer as *T,
 354              log_size: log_size,
 355          }
 356      }
 357  
 358      fn size(&self) -> int { 1 << self.log_size }
 359  
 360      // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
 361      fn mask(&self) -> int { (1 << self.log_size) - 1 }
 362  
 363      // This does not protect against loading duplicate values of the same cell,
 364      // nor does this clear out the contents contained within. Hence, this is a
 365      // very unsafe method which the caller needs to treat specially in case a
 366      // race is lost.
 367      unsafe fn get(&self, iint) -> T {
 368          ptr::read(self.storage.offset(i & self.mask()))
 369      }
 370  
 371      // Unsafe because this unsafely overwrites possibly uninitialized or
 372      // initialized data.
 373      unsafe fn put(&mut self, iint, tT) {
 374          let ptr = self.storage.offset(i & self.mask());
 375          ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1);
 376          cast::forget(t);
 377      }
 378  
 379      // Again, unsafe because this has incredibly dubious ownership violations.
 380      // It is assumed that this buffer is immediately dropped.
 381      unsafe fn resize(&self, bint, tint, deltaint) -> Buffer<T> {
 382          let mut buf = Buffer::new(self.log_size + delta);
 383          for i in range(t, b) {
 384              buf.put(i, self.get(i));
 385          }
 386          return buf;
 387      }
 388  }
 389  
 390  #[unsafe_destructor]
 391  impl<T: Send> Drop for Buffer<T> {
 392      fn drop(&mut self) {
 393          // It is assumed that all buffers are empty on drop.
 394          unsafe { libc::free(self.storage as *mut libc::c_void) }
 395      }
 396  }
 397  
 398  #[cfg(test)]
 399  mod tests {
 400      use prelude::*;
 401      use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};
 402  
 403      use cast;
 404      use owned::Box;
 405      use rt::thread::Thread;
 406      use rand;
 407      use rand::Rng;
 408      use sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
 409                          AtomicUint, INIT_ATOMIC_UINT};
 410      use vec;
 411  
 412      #[test]
 413      fn smoke() {
 414          let mut pool = BufferPool::new();
 415          let (mut w, mut s) = pool.deque();
 416          assert_eq!(w.pop(), None);
 417          assert_eq!(s.steal(), Empty);
 418          w.push(1);
 419          assert_eq!(w.pop(), Some(1));
 420          w.push(1);
 421          assert_eq!(s.steal(), Data(1));
 422          w.push(1);
 423          assert_eq!(s.clone().steal(), Data(1));
 424      }
 425  
 426      #[test]
 427      fn stealpush() {
 428          static AMT: int = 100000;
 429          let mut pool = BufferPool::<int>::new();
 430          let (mut w, s) = pool.deque();
 431          let t = Thread::start(proc() {
 432              let mut s = s;
 433              let mut left = AMT;
 434              while left > 0 {
 435                  match s.steal() {
 436                      Data(i) => {
 437                          assert_eq!(i, 1);
 438                          left -= 1;
 439                      }
 440                      Abort | Empty => {}
 441                  }
 442              }
 443          });
 444  
 445          for _ in range(0, AMT) {
 446              w.push(1);
 447          }
 448  
 449          t.join();
 450      }
 451  
 452      #[test]
 453      fn stealpush_large() {
 454          static AMT: int = 100000;
 455          let mut pool = BufferPool::<(int, int)>::new();
 456          let (mut w, s) = pool.deque();
 457          let t = Thread::start(proc() {
 458              let mut s = s;
 459              let mut left = AMT;
 460              while left > 0 {
 461                  match s.steal() {
 462                      Data((1, 10)) => { left -= 1; }
 463                      Data(..) => fail!(),
 464                      Abort | Empty => {}
 465                  }
 466              }
 467          });
 468  
 469          for _ in range(0, AMT) {
 470              w.push((1, 10));
 471          }
 472  
 473          t.join();
 474      }
 475  
 476      fn stampede(mut w: Worker<Box<int>>, s: Stealer<Box<int>>,
 477                  nthreads: int, amt: uint) {
 478          for _ in range(0, amt) {
 479              w.push(box 20);
 480          }
 481          let mut remaining = AtomicUint::new(amt);
 482          let unsafe_remaining: *mut AtomicUint = &mut remaining;
 483  
 484          let threads = range(0, nthreads).map(|_| {
 485              let s = s.clone();
 486              Thread::start(proc() {
 487                  unsafe {
 488                      let mut s = s;
 489                      while (*unsafe_remaining).load(SeqCst) > 0 {
 490                          match s.steal() {
 491                              Data(box 20) => {
 492                                  (*unsafe_remaining).fetch_sub(1, SeqCst);
 493                              }
 494                              Data(..) => fail!(),
 495                              Abort | Empty => {}
 496                          }
 497                      }
 498                  }
 499              })
 500          }).collect::<Vec<Thread<()>>>();
 501  
 502          while remaining.load(SeqCst) > 0 {
 503              match w.pop() {
 504                  Some(box 20) => { remaining.fetch_sub(1, SeqCst); }
 505                  Some(..) => fail!(),
 506                  None => {}
 507              }
 508          }
 509  
 510          for thread in threads.move_iter() {
 511              thread.join();
 512          }
 513      }
 514  
 515      #[test]
 516      fn run_stampede() {
 517          let mut pool = BufferPool::<Box<int>>::new();
 518          let (w, s) = pool.deque();
 519          stampede(w, s, 8, 10000);
 520      }
 521  
 522      #[test]
 523      fn many_stampede() {
 524          static AMT: uint = 4;
 525          let mut pool = BufferPool::<Box<int>>::new();
 526          let threads = range(0, AMT).map(|_| {
 527              let (w, s) = pool.deque();
 528              Thread::start(proc() {
 529                  stampede(w, s, 4, 10000);
 530              })
 531          }).collect::<Vec<Thread<()>>>();
 532  
 533          for thread in threads.move_iter() {
 534              thread.join();
 535          }
 536      }
 537  
 538      #[test]
 539      fn stress() {
 540          static AMT: int = 100000;
 541          static NTHREADS: int = 8;
 542          static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
 543          static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
 544          let mut pool = BufferPool::<int>::new();
 545          let (mut w, s) = pool.deque();
 546  
 547          let threads = range(0, NTHREADS).map(|_| {
 548              let s = s.clone();
 549              Thread::start(proc() {
 550                  unsafe {
 551                      let mut s = s;
 552                      loop {
 553                          match s.steal() {
 554                              Data(2) => { HITS.fetch_add(1, SeqCst); }
 555                              Data(..) => fail!(),
 556                              _ if DONE.load(SeqCst) => break,
 557                              _ => {}
 558                          }
 559                      }
 560                  }
 561              })
 562          }).collect::<Vec<Thread<()>>>();
 563  
 564          let mut rng = rand::task_rng();
 565          let mut expected = 0;
 566          while expected < AMT {
 567              if rng.gen_range(0, 3) == 2 {
 568                  match w.pop() {
 569                      None => {}
 570                      Some(2) => unsafe { HITS.fetch_add(1, SeqCst); },
 571                      Some(_) => fail!(),
 572                  }
 573              } else {
 574                  expected += 1;
 575                  w.push(2);
 576              }
 577          }
 578  
 579          unsafe {
 580              while HITS.load(SeqCst) < AMT as uint {
 581                  match w.pop() {
 582                      None => {}
 583                      Some(2) => { HITS.fetch_add(1, SeqCst); },
 584                      Some(_) => fail!(),
 585                  }
 586              }
 587              DONE.store(true, SeqCst);
 588          }
 589  
 590          for thread in threads.move_iter() {
 591              thread.join();
 592          }
 593  
 594          assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint);
 595      }
 596  
 597      #[test]
 598      #[ignore(cfg(windows))] // apparently windows scheduling is weird?
 599      fn no_starvation() {
 600          static AMT: int = 10000;
 601          static NTHREADS: int = 4;
 602          static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
 603          let mut pool = BufferPool::<(int, uint)>::new();
 604          let (mut w, s) = pool.deque();
 605  
 606          let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
 607              let s = s.clone();
 608              let unique_box = box AtomicUint::new(0);
 609              let thread_box = unsafe {
 610                  *cast::transmute::<&Box<AtomicUint>,
 611                                     **mut AtomicUint>(&unique_box)
 612              };
 613              (Thread::start(proc() {
 614                  unsafe {
 615                      let mut s = s;
 616                      loop {
 617                          match s.steal() {
 618                              Data((1, 2)) => {
 619                                  (*thread_box).fetch_add(1, SeqCst);
 620                              }
 621                              Data(..) => fail!(),
 622                              _ if DONE.load(SeqCst) => break,
 623                              _ => {}
 624                          }
 625                      }
 626                  }
 627              }), unique_box)
 628          }));
 629  
 630          let mut rng = rand::task_rng();
 631          let mut myhit = false;
 632          let mut iter = 0;
 633          'outer: loop {
 634              for _ in range(0, rng.gen_range(0, AMT)) {
 635                  if !myhit && rng.gen_range(0, 3) == 2 {
 636                      match w.pop() {
 637                          None => {}
 638                          Some((1, 2)) => myhit = true,
 639                          Some(_) => fail!(),
 640                      }
 641                  } else {
 642                      w.push((1, 2));
 643                  }
 644              }
 645              iter += 1;
 646  
 647              debug!("loop iteration {}", iter);
 648              for (i, slot) in hits.iter().enumerate() {
 649                  let amt = slot.load(SeqCst);
 650                  debug!("thread {}{}", i, amt);
 651                  if amt == 0 { continue 'outer; }
 652              }
 653              if myhit {
 654                  break
 655              }
 656          }
 657  
 658          unsafe { DONE.store(true, SeqCst); }
 659  
 660          for thread in threads.move_iter() {
 661              thread.join();
 662          }
 663      }
 664  }


libstd/sync/deque.rs:89:44-89:44 -struct- definition:
/// There may only be one worker per deque.
pub struct Worker<T> {
    deque: UnsafeArc<Deque<T>>,
references:- 3
154:         let (a, b) = UnsafeArc::new2(Deque::new(self.clone()));
155:         (Worker { deque: a }, Stealer { deque: b })
156:     }
--
187: impl<T: Send> Worker<T> {
188:     /// Pushes data onto the front of this work queue.


libstd/sync/deque.rs:96:20-96:20 -struct- definition:
/// `steal` method.
pub struct Stealer<T> {
    deque: UnsafeArc<Deque<T>>,
references:- 6
154:         let (a, b) = UnsafeArc::new2(Deque::new(self.clone()));
155:         (Worker { deque: a }, Stealer { deque: b })
156:     }
--
220: impl<T: Send> Clone for Stealer<T> {
221:     fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
222: }


libstd/sync/deque.rs:138:61-138:61 -struct- definition:
///      LLVM is probably pretty good at doing this already.
struct Buffer<T> {
    storage: *T,
references:- 13
351:         assert!(!buffer.is_null());
352:         Buffer {
353:             storage: buffer as *T,
--
391: impl<T: Send> Drop for Buffer<T> {
392:     fn drop(&mut self) {


libstd/sync/deque.rs:78:1-78:1 -struct- definition:
struct Deque<T> {
    bottom: AtomicInt,
    top: AtomicInt,
references:- 6
229:         let buf = pool.alloc(MIN_BITS);
230:         Deque {
231:             bottom: AtomicInt::new(0),
--
333: impl<T: Send> Drop for Deque<T> {
334:     fn drop(&mut self) {


libstd/sync/deque.rs:119:18-119:18 -struct- definition:
/// previous one.
pub struct BufferPool<T> {
    pool: Exclusive<Vec<Box<Buffer<T>>>>,
references:- 10
147:     pub fn new() -> BufferPool<T> {
148:         BufferPool { pool: Exclusive::new(vec!()) }
149:     }
--
183: impl<T: Send> Clone for BufferPool<T> {
184:     fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
185: }
--
227: impl<T: Send> Deque<T> {
228:     fn new(mut pool: BufferPool<T>) -> Deque<T> {
229:         let buf = pool.alloc(MIN_BITS);


libstd/sync/deque.rs:102:22-102:22 -enum- definition:
pub enum Stolen<T> {
    /// The deque was empty at the time of stealing
    Empty,
references:- 6
101: /// When stealing some data, this is an enumeration of the possible outcomes.
103: pub enum Stolen<T> {
--
207:     /// Steals work off the end of the queue (opposite of the worker's end)
208:     pub fn steal(&mut self) -> Stolen<T> {
209:         unsafe { (*self.deque.get()).steal() }
--
280:     unsafe fn steal(&mut self) -> Stolen<T> {
281:         let t = self.top.load(SeqCst);