(index<- ) ./libstd/sync/deque.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! A (mostly) lock-free concurrent work-stealing deque
12 //!
13 //! This module contains an implementation of the Chase-Lev work stealing deque
14 //! described in "Dynamic Circular Work-Stealing Deque". The implementation is
15 //! heavily based on the pseudocode found in the paper.
16 //!
17 //! This implementation does not rely on a garbage collector for reclaiming
18 //! buffers (as the paper assumes); instead it uses a shared pool of
19 //! buffers. This shared pool is required for correctness in this
20 //! implementation.
21 //!
22 //! The only lock-synchronized portions of this deque are the buffer allocation
23 //! and deallocation portions. Otherwise all operations are lock-free.
24 //!
25 //! # Example
26 //!
27 //! use std::sync::deque::BufferPool;
28 //!
29 //! let mut pool = BufferPool::new();
30 //! let (mut worker, mut stealer) = pool.deque();
31 //!
32 //! // Only the worker may push/pop
33 //! worker.push(1);
34 //! worker.pop();
35 //!
36 //! // Stealers take data from the other end of the deque
37 //! worker.push(1);
38 //! stealer.steal();
39 //!
40 //! // Stealers can be cloned to have many stealers stealing in parallel
41 //! worker.push(1);
42 //! let mut stealer2 = stealer.clone();
43 //! stealer2.steal();
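//!
//! // A sketch of a typical steal loop (assumes the `Data`, `Abort`, and
//! // `Empty` variants of `Stolen` below are imported from this module):
//! worker.push(1);
//! loop {
//!     match stealer2.steal() {
//!         Data(i) => println!("stole {}", i), // got an item
//!         Abort => continue, // lost a race with another thread, retry
//!         Empty => break,    // nothing left to steal
//!     }
//! }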
44
45 // NB: the "buffer pool" strategy is not done for speed, but rather for
46 // correctness. For more info, see the comment on `swap_buffer`
47
48 // FIXME: all atomic operations in this module use a SeqCst ordering. That is
49 // probably overkill
50
51 use cast;
52 use clone::Clone;
53 use iter::{range, Iterator};
54 use kinds::Send;
55 use libc;
56 use mem;
57 use ops::Drop;
58 use option::{Option, Some, None};
59 use owned::Box;
60 use ptr;
61 use ptr::RawPtr;
62 use sync::arc::UnsafeArc;
63 use sync::atomics::{AtomicInt, AtomicPtr, SeqCst};
64 use unstable::sync::Exclusive;
65 use slice::ImmutableVector;
66 use vec::Vec;
67
68 // Once the queue is less than 1/K full, then it will be downsized. Note that
69 // the deque requires that this number be less than 2.
70 static K: int = 4;
71
72 // The minimum number of bits in a buffer's size. No buffer will be resized
73 // below this value, and all deques will initially contain a buffer of this
74 // size.
75 //
76 // The size in question is 1 << MIN_BITS
77 static MIN_BITS: int = 7;
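
// For concreteness: with MIN_BITS = 7 every deque starts with a buffer of
// 1 << 7 = 128 slots, and with K = 4 a 1024-slot buffer only becomes a
// candidate for halving once fewer than 1024 / 4 = 256 elements remain (see
// `maybe_shrink` below, which also never shrinks while the element count is
// at or below 1 << MIN_BITS).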
78
79 struct Deque<T> {
80 bottom: AtomicInt,
81 top: AtomicInt,
82 array: AtomicPtr<Buffer<T>>,
83 pool: BufferPool<T>,
84 }
85
86 /// Worker half of the work-stealing deque. This worker has exclusive access to
87 /// one side of the deque, and uses the `push` and `pop` methods to manipulate it.
88 ///
89 /// There may only be one worker per deque.
90 pub struct Worker<T> {
91 deque: UnsafeArc<Deque<T>>,
92 }
93
94 /// The stealing half of the work-stealing deque. Stealers have access to the
95 /// opposite end of the deque from the worker, and they only have access to the
96 /// `steal` method.
97 pub struct Stealer<T> {
98 deque: UnsafeArc<Deque<T>>,
99 }
100
101 /// An enumeration of the possible outcomes of a steal operation.
102 #[deriving(Eq, Show)]
103 pub enum Stolen<T> {
104 /// The deque was empty at the time of stealing
105 Empty,
106 /// The stealer lost the race for stealing data, and a retry may return more
107 /// data.
108 Abort,
109 /// The stealer has successfully stolen some data.
110 Data(T),
111 }
112
113 /// The allocation pool for buffers used by work-stealing deques. Right now this
114 /// structure is used for reclamation of memory after it is no longer in use by
115 /// deques.
116 ///
117 /// This data structure is protected by a mutex, but it is rarely used. Deques
118 /// will only use this structure when allocating a new buffer or deallocating a
119 /// previous one.
120 pub struct BufferPool<T> {
121 pool: Exclusive<Vec<Box<Buffer<T>>>>,
122 }
123
124 /// An internal buffer used by the Chase-Lev deque. This structure is actually
125 /// implemented as a circular buffer, and is used as the intermediate storage of
126 /// the data in the deque.
127 ///
128 /// This type is implemented with *T instead of Vec<T> for two reasons:
129 ///
130 /// 1. There is nothing safe about using this buffer. This easily allows the
131 /// same value to be read twice into Rust, and there is nothing to
132 /// prevent this. The usage by the deque must ensure that one of the
133 /// values is forgotten. Furthermore, we only ever want to manually run
134 /// destructors for values in this buffer (on drop) because the bounds
135 /// are defined by the deque it's owned by.
136 ///
137 /// 2. We can certainly avoid bounds checks using *T instead of Vec<T>, although
138 /// LLVM is probably pretty good at doing this already.
139 struct Buffer<T> {
140 storage: *T,
141 log_size: int,
142 }
143
144 impl<T: Send> BufferPool<T> {
145 /// Allocates a new buffer pool which in turn can be used to allocate new
146 /// deques.
147 pub fn new() -> BufferPool<T> {
148 BufferPool { pool: Exclusive::new(vec!()) }
149 }
150
151 /// Allocates a new work-stealing deque which will send/receive memory to
152 /// and from this buffer pool.
153 pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) {
154 let (a, b) = UnsafeArc::new2(Deque::new(self.clone()));
155 (Worker { deque: a }, Stealer { deque: b })
156 }
157
158 fn alloc(&mut self, bits: int) -> Box<Buffer<T>> {
159 unsafe {
160 self.pool.with(|pool| {
161 match pool.iter().position(|x| x.size() >= (1 << bits)) {
162 Some(i) => pool.remove(i).unwrap(),
163 None => box Buffer::new(bits)
164 }
165 })
166 }
167 }
168
169 fn free(&mut self, buf: Box<Buffer<T>>) {
170 unsafe {
171 let mut buf = Some(buf);
172 self.pool.with(|pool| {
173 let buf = buf.take_unwrap();
174 match pool.iter().position(|v| v.size() > buf.size()) {
175 Some(i) => pool.insert(i, buf),
176 None => pool.push(buf),
177 }
178 })
179 }
180 }
181 }
182
183 impl<T: Send> Clone for BufferPool<T> {
184 fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
185 }
186
187 impl<T: Send> Worker<T> {
188 /// Pushes data onto the front of this work queue.
189 pub fn push(&mut self, t: T) {
190 unsafe { (*self.deque.get()).push(t) }
191 }
192 /// Pops data off the front of the work queue, returning `None` on an empty
193 /// queue.
194 pub fn pop(&mut self) -> Option<T> {
195 unsafe { (*self.deque.get()).pop() }
196 }
197
198 /// Gets access to the buffer pool that this worker is attached to. This can
199 /// be used to create more deques which share the same buffer pool as this
200 /// deque.
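///
/// For example (an illustrative sketch; `worker` here is any existing
/// `Worker`), a sibling deque sharing the same pool could be created with:
///
///     let (mut w2, mut s2) = worker.pool().deque();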
201 pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
202 unsafe { &mut (*self.deque.get()).pool }
203 }
204 }
205
206 impl<T: Send> Stealer<T> {
207 /// Steals work off the end of the queue (opposite of the worker's end)
208 pub fn steal(&mut self) -> Stolen<T> {
209 unsafe { (*self.deque.get()).steal() }
210 }
211
212 /// Gets access to the buffer pool that this stealer is attached to. This
213 /// can be used to create more deques which share the same buffer pool as
214 /// this deque.
215 pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
216 unsafe { &mut (*self.deque.get()).pool }
217 }
218 }
219
220 impl<T: Send> Clone for Stealer<T> {
221 fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
222 }
223
224 // Almost all of this code can be found directly in the paper so I'm not
225 // personally going to heavily comment what's going on here.
226
227 impl<T: Send> Deque<T> {
228 fn new(mut pool: BufferPool<T>) -> Deque<T> {
229 let buf = pool.alloc(MIN_BITS);
230 Deque {
231 bottom: AtomicInt::new(0),
232 top: AtomicInt::new(0),
233 array: AtomicPtr::new(unsafe { cast::transmute(buf) }),
234 pool: pool,
235 }
236 }
237
238 unsafe fn push(&mut self, data: T) {
239 let mut b = self.bottom.load(SeqCst);
240 let t = self.top.load(SeqCst);
241 let mut a = self.array.load(SeqCst);
242 let size = b - t;
243 if size >= (*a).size() - 1 {
244 // You won't find this code in the Chase-Lev deque paper. This is
245 // alluded to in a small footnote, however. We always free a buffer
246 // when growing in order to prevent leaks.
247 a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
248 b = self.bottom.load(SeqCst);
249 }
250 (*a).put(b, data);
251 self.bottom.store(b + 1, SeqCst);
252 }
253
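// A brief recap of the pop protocol (the paper carries the full argument):
// the worker first publishes its intent by decrementing `bottom`, and only
// then reads `top`. If at least two elements were present (size > 0 after
// the decrement) no stealer can reach the slot at the new `bottom`, so the
// value is returned without further synchronization. If exactly one element
// was present (size == 0) the worker races the stealers with a CAS on `top`;
// the loser must `forget` the value it already read because the winner owns
// it, and `bottom` is reset to the canonical empty position `t + 1`.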
254 unsafe fn pop(&mut self) -> Option<T> {
255 let b = self.bottom.load(SeqCst);
256 let a = self.array.load(SeqCst);
257 let b = b - 1;
258 self.bottom.store(b, SeqCst);
259 let t = self.top.load(SeqCst);
260 let size = b - t;
261 if size < 0 {
262 self.bottom.store(t, SeqCst);
263 return None;
264 }
265 let data = (*a).get(b);
266 if size > 0 {
267 self.maybe_shrink(b, t);
268 return Some(data);
269 }
270 if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
271 self.bottom.store(t + 1, SeqCst);
272 return Some(data);
273 } else {
274 self.bottom.store(t + 1, SeqCst);
275 cast::forget(data); // someone else stole this value
276 return None;
277 }
278 }
279
280 unsafe fn steal(&mut self) -> Stolen<T> {
281 let t = self.top.load(SeqCst);
282 let old = self.array.load(SeqCst);
283 let b = self.bottom.load(SeqCst);
284 let a = self.array.load(SeqCst);
285 let size = b - t;
286 if size <= 0 { return Empty }
287 if size % (*a).size() == 0 {
288 if a == old && t == self.top.load(SeqCst) {
289 return Empty
290 }
291 return Abort
292 }
293 let data = (*a).get(t);
294 if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
295 Data(data)
296 } else {
297 cast::forget(data); // someone else stole this value
298 Abort
299 }
300 }
301
302 unsafe fn maybe_shrink(&mut self, b: int, t: int) {
303 let a = self.array.load(SeqCst);
304 if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
305 self.swap_buffer(b, a, (*a).resize(b, t, -1));
306 }
307 }
308
309 // Helper routine, not mentioned in the paper, which is used when growing and
310 // shrinking buffers to swap a new buffer into place. As a bit of a recap, the
311 // whole reason we need a buffer pool rather than calling malloc/free directly
312 // is that stealers can continue using buffers after this method has called
313 // 'free' on them. The continued usage is simply a read followed by a forget,
314 // but we must make sure that the memory can continue to be read after we
315 // flag this buffer for reclamation.
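// Concretely, the interleaving that motivates the pool (an illustrative
// scenario): a stealer loads the `array` pointer and is preempted just
// before calling `get(t)`; the worker then grows the deque, swapping in a
// new buffer and handing the old one to `pool.free`. Because `free` only
// returns the buffer to the pool instead of deallocating it, the stealer's
// subsequent read still touches valid memory rather than a freed allocation.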
316 unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>,
317 buf: Buffer<T>) -> *mut Buffer<T> {
318 let newbuf: *mut Buffer<T> = cast::transmute(box buf);
319 self.array.store(newbuf, SeqCst);
320 let ss = (*newbuf).size();
321 self.bottom.store(b + ss, SeqCst);
322 let t = self.top.load(SeqCst);
323 if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
324 self.bottom.store(b, SeqCst);
325 }
326 self.pool.free(cast::transmute(old));
327 return newbuf;
328 }
329 }
330
331
332 #[unsafe_destructor]
333 impl<T: Send> Drop for Deque<T> {
334 fn drop(&mut self) {
335 let t = self.top.load(SeqCst);
336 let b = self.bottom.load(SeqCst);
337 let a = self.array.load(SeqCst);
338 // Free whatever is leftover in the deque, and then move the buffer
339 // back into the pool.
340 for i in range(t, b) {
341 let _: T = unsafe { (*a).get(i) };
342 }
343 self.pool.free(unsafe { cast::transmute(a) });
344 }
345 }
346
347 impl<T: Send> Buffer<T> {
348 unsafe fn new(log_size: int) -> Buffer<T> {
349 let size = (1 << log_size) * mem::size_of::<T>();
350 let buffer = libc::malloc(size as libc::size_t);
351 assert!(!buffer.is_null());
352 Buffer {
353 storage: buffer as *T,
354 log_size: log_size,
355 }
356 }
357
358 fn size(&self) -> int { 1 << self.log_size }
359
360 // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
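// For example, with log_size = 7 the mask is 127, so a logical index of 130
// wraps to physical slot 130 & 127 = 2.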
361 fn mask(&self) -> int { (1 << self.log_size) - 1 }
362
363 // This does not protect against loading duplicate values of the same cell,
364 // nor does this clear out the contents contained within. Hence, this is a
365 // very unsafe method which the caller needs to treat specially in case a
366 // race is lost.
367 unsafe fn get(&self, i: int) -> T {
368 ptr::read(self.storage.offset(i & self.mask()))
369 }
370
371 // Unsafe because this overwrites possibly uninitialized or
372 // initialized data.
373 unsafe fn put(&mut self, i: int, t: T) {
374 let ptr = self.storage.offset(i & self.mask());
375 ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1);
376 cast::forget(t);
377 }
378
379 // Again, unsafe because this has incredibly dubious ownership violations.
380 // It is assumed that this buffer is immediately dropped.
381 unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
382 let mut buf = Buffer::new(self.log_size + delta);
383 for i in range(t, b) {
384 buf.put(i, self.get(i));
385 }
386 return buf;
387 }
388 }
389
390 #[unsafe_destructor]
391 impl<T: Send> Drop for Buffer<T> {
392 fn drop(&mut self) {
393 // It is assumed that all buffers are empty on drop.
394 unsafe { libc::free(self.storage as *mut libc::c_void) }
395 }
396 }
397
398 #[cfg(test)]
399 mod tests {
400 use prelude::*;
401 use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};
402
403 use cast;
404 use owned::Box;
405 use rt::thread::Thread;
406 use rand;
407 use rand::Rng;
408 use sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
409 AtomicUint, INIT_ATOMIC_UINT};
410 use vec;
411
412 #[test]
413 fn smoke() {
414 let mut pool = BufferPool::new();
415 let (mut w, mut s) = pool.deque();
416 assert_eq!(w.pop(), None);
417 assert_eq!(s.steal(), Empty);
418 w.push(1);
419 assert_eq!(w.pop(), Some(1));
420 w.push(1);
421 assert_eq!(s.steal(), Data(1));
422 w.push(1);
423 assert_eq!(s.clone().steal(), Data(1));
424 }
425
426 #[test]
427 fn stealpush() {
428 static AMT: int = 100000;
429 let mut pool = BufferPool::<int>::new();
430 let (mut w, s) = pool.deque();
431 let t = Thread::start(proc() {
432 let mut s = s;
433 let mut left = AMT;
434 while left > 0 {
435 match s.steal() {
436 Data(i) => {
437 assert_eq!(i, 1);
438 left -= 1;
439 }
440 Abort | Empty => {}
441 }
442 }
443 });
444
445 for _ in range(0, AMT) {
446 w.push(1);
447 }
448
449 t.join();
450 }
451
452 #[test]
453 fn stealpush_large() {
454 static AMT: int = 100000;
455 let mut pool = BufferPool::<(int, int)>::new();
456 let (mut w, s) = pool.deque();
457 let t = Thread::start(proc() {
458 let mut s = s;
459 let mut left = AMT;
460 while left > 0 {
461 match s.steal() {
462 Data((1, 10)) => { left -= 1; }
463 Data(..) => fail!(),
464 Abort | Empty => {}
465 }
466 }
467 });
468
469 for _ in range(0, AMT) {
470 w.push((1, 10));
471 }
472
473 t.join();
474 }
475
476 fn stampede(mut w: Worker<Box<int>>, s: Stealer<Box<int>>,
477 nthreads: int, amt: uint) {
478 for _ in range(0, amt) {
479 w.push(box 20);
480 }
481 let mut remaining = AtomicUint::new(amt);
482 let unsafe_remaining: *mut AtomicUint = &mut remaining;
483
484 let threads = range(0, nthreads).map(|_| {
485 let s = s.clone();
486 Thread::start(proc() {
487 unsafe {
488 let mut s = s;
489 while (*unsafe_remaining).load(SeqCst) > 0 {
490 match s.steal() {
491 Data(box 20) => {
492 (*unsafe_remaining).fetch_sub(1, SeqCst);
493 }
494 Data(..) => fail!(),
495 Abort | Empty => {}
496 }
497 }
498 }
499 })
500 }).collect::<Vec<Thread<()>>>();
501
502 while remaining.load(SeqCst) > 0 {
503 match w.pop() {
504 Some(box 20) => { remaining.fetch_sub(1, SeqCst); }
505 Some(..) => fail!(),
506 None => {}
507 }
508 }
509
510 for thread in threads.move_iter() {
511 thread.join();
512 }
513 }
514
515 #[test]
516 fn run_stampede() {
517 let mut pool = BufferPool::<Box<int>>::new();
518 let (w, s) = pool.deque();
519 stampede(w, s, 8, 10000);
520 }
521
522 #[test]
523 fn many_stampede() {
524 static AMT: uint = 4;
525 let mut pool = BufferPool::<Box<int>>::new();
526 let threads = range(0, AMT).map(|_| {
527 let (w, s) = pool.deque();
528 Thread::start(proc() {
529 stampede(w, s, 4, 10000);
530 })
531 }).collect::<Vec<Thread<()>>>();
532
533 for thread in threads.move_iter() {
534 thread.join();
535 }
536 }
537
538 #[test]
539 fn stress() {
540 static AMT: int = 100000;
541 static NTHREADS: int = 8;
542 static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
543 static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
544 let mut pool = BufferPool::<int>::new();
545 let (mut w, s) = pool.deque();
546
547 let threads = range(0, NTHREADS).map(|_| {
548 let s = s.clone();
549 Thread::start(proc() {
550 unsafe {
551 let mut s = s;
552 loop {
553 match s.steal() {
554 Data(2) => { HITS.fetch_add(1, SeqCst); }
555 Data(..) => fail!(),
556 _ if DONE.load(SeqCst) => break,
557 _ => {}
558 }
559 }
560 }
561 })
562 }).collect::<Vec<Thread<()>>>();
563
564 let mut rng = rand::task_rng();
565 let mut expected = 0;
566 while expected < AMT {
567 if rng.gen_range(0, 3) == 2 {
568 match w.pop() {
569 None => {}
570 Some(2) => unsafe { HITS.fetch_add(1, SeqCst); },
571 Some(_) => fail!(),
572 }
573 } else {
574 expected += 1;
575 w.push(2);
576 }
577 }
578
579 unsafe {
580 while HITS.load(SeqCst) < AMT as uint {
581 match w.pop() {
582 None => {}
583 Some(2) => { HITS.fetch_add(1, SeqCst); },
584 Some(_) => fail!(),
585 }
586 }
587 DONE.store(true, SeqCst);
588 }
589
590 for thread in threads.move_iter() {
591 thread.join();
592 }
593
594 assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint);
595 }
596
597 #[test]
598 #[ignore(cfg(windows))] // apparently windows scheduling is weird?
599 fn no_starvation() {
600 static AMT: int = 10000;
601 static NTHREADS: int = 4;
602 static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
603 let mut pool = BufferPool::<(int, uint)>::new();
604 let (mut w, s) = pool.deque();
605
606 let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
607 let s = s.clone();
608 let unique_box = box AtomicUint::new(0);
609 let thread_box = unsafe {
610 *cast::transmute::<&Box<AtomicUint>,
611 **mut AtomicUint>(&unique_box)
612 };
613 (Thread::start(proc() {
614 unsafe {
615 let mut s = s;
616 loop {
617 match s.steal() {
618 Data((1, 2)) => {
619 (*thread_box).fetch_add(1, SeqCst);
620 }
621 Data(..) => fail!(),
622 _ if DONE.load(SeqCst) => break,
623 _ => {}
624 }
625 }
626 }
627 }), unique_box)
628 }));
629
630 let mut rng = rand::task_rng();
631 let mut myhit = false;
632 let mut iter = 0;
633 'outer: loop {
634 for _ in range(0, rng.gen_range(0, AMT)) {
635 if !myhit && rng.gen_range(0, 3) == 2 {
636 match w.pop() {
637 None => {}
638 Some((1, 2)) => myhit = true,
639 Some(_) => fail!(),
640 }
641 } else {
642 w.push((1, 2));
643 }
644 }
645 iter += 1;
646
647 debug!("loop iteration {}", iter);
648 for (i, slot) in hits.iter().enumerate() {
649 let amt = slot.load(SeqCst);
650 debug!("thread {}: {}", i, amt);
651 if amt == 0 { continue 'outer; }
652 }
653 if myhit {
654 break
655 }
656 }
657
658 unsafe { DONE.store(true, SeqCst); }
659
660 for thread in threads.move_iter() {
661 thread.join();
662 }
663 }
664 }