/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 * EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are
 * those of the authors and should not be interpreted as representing official
 * policies, either expressed or implied, of Dmitry Vyukov.
 */

//! A mostly lock-free multi-producer, single-consumer queue.
//!
//! This module implements an intrusive MPSC queue. The queue is incredibly
//! unsafe (due to its use of unsafe pointers for nodes), and hence is not
//! public.

// http://www.1024cores.net/home/lock-free-algorithms
// /queues/intrusive-mpsc-node-based-queue

use std::cast;
use std::sync::atomics;
use std::ty::Unsafe;

// NB: all links are done as AtomicUint instead of AtomicPtr to allow for
// static initialization.
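//
// For example, the queue can be embedded in another structure and built
// entirely at compile time. A sketch, mirroring how libsync/mutex.rs
// initializes its queue (the `value`/`marker1` fields assume the layout of
// std::ty::Unsafe, and `marker` refers to std::kinds::marker):
//
//     static mut QUEUE: Queue<uint> = Queue {
//         head: atomics::INIT_ATOMIC_UINT,
//         tail: Unsafe {
//             value: 0 as *mut Node<uint>,
//             marker1: marker::InvariantType,
//         },
//         stub: DummyNode {
//             next: atomics::INIT_ATOMIC_UINT,
//         },
//     };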

pub struct Node<T> {
    /// Link to the next node in the queue; 0 stands in for a null pointer.
    pub next: atomics::AtomicUint,
    pub data: T,
}

/// A payload-less node used as the queue's initial stub.
pub struct DummyNode {
    pub next: atomics::AtomicUint,
}

pub struct Queue<T> {
    /// The producers' end: the most recently pushed node.
    pub head: atomics::AtomicUint,
    /// The consumer's end, touched only by the single popper.
    pub tail: Unsafe<*mut Node<T>>,
    /// Stub linked into the queue whenever it would otherwise be empty.
    pub stub: DummyNode,
}

impl<T: Send> Queue<T> {
    pub fn new() -> Queue<T> {
        Queue {
            head: atomics::AtomicUint::new(0),
            tail: Unsafe::new(0 as *mut Node<T>),
            stub: DummyNode {
                next: atomics::AtomicUint::new(0),
            },
        }
    }

    pub unsafe fn push(&self, node: *mut Node<T>) {
        // The new node is always the end of the queue, so it has no
        // successor yet.
        (*node).next.store(0, atomics::Release);
        // Publish the node as the new head, learning which node used to
        // be at the front.
        let prev = self.head.swap(node as uint, atomics::AcqRel);

        // Note that this code is slightly modified to allow static
        // initialization of these queues with rust's flavor of static
        // initialization.
        if prev == 0 {
            // A zero head means the queue was statically initialized and
            // nothing has been pushed yet, so the stub plays the role of
            // the previous node.
            self.stub.next.store(node as uint, atomics::Release);
        } else {
            // Link the old head to the new node. Between the swap above
            // and this store the queue is momentarily inconsistent.
            let prev = prev as *mut Node<T>;
            (*prev).next.store(node as uint, atomics::Release);
        }
    }
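
    // A producer hands `push` a raw pointer to a heap-allocated node. A
    // minimal sketch of a hypothetical caller (the transmute leaks the
    // owned box, so whoever pops the node must reclaim it):
    //
    //     let q = Queue::new();
    //     let node: ~Node<uint> = ~Node::new(5u);
    //     unsafe { q.push(cast::transmute(node)); }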

    /// You'll note that the other MPSC queue in std::sync is non-intrusive
    /// and returns a `PopResult` here to indicate when the queue is
    /// inconsistent. An "inconsistent state" in the other queue means that
    /// a pusher has pushed, but it hasn't finished linking the rest of the
    /// chain.
    ///
    /// This queue also suffers from this problem, but I currently haven't
    /// been able to untangle exactly when it happens. This code is
    /// translated verbatim from the website above, and is more complicated
    /// than the non-intrusive version.
    ///
    /// Right now consumers of this queue must be ready for this fact: just
    /// because `pop` returns `None` does not mean that there is no data on
    /// the queue.
    pub unsafe fn pop(&self) -> Option<*mut Node<T>> {
        // A null tail means the queue was statically initialized, so start
        // the consumer off at the stub.
        let tail = *self.tail.get();
        let mut tail = if !tail.is_null() {tail} else {
            cast::transmute(&self.stub)
        };
        let mut next = (*tail).next(atomics::Relaxed);
        if tail as uint == &self.stub as *DummyNode as uint {
            // The tail is the stub: either the queue is empty, or the stub
            // must be skipped to reach the first real node.
            if next.is_null() {
                return None;
            }
            *self.tail.get() = next;
            tail = next;
            next = (*next).next(atomics::Relaxed);
        }
        if !next.is_null() {
            // The tail has a successor, so it can be handed out directly.
            *self.tail.get() = next;
            return Some(tail);
        }
        // The tail has no successor. If it is also the head, the queue
        // contains exactly one node; otherwise a push is still in progress
        // and the queue is inconsistent.
        let head = self.head.load(atomics::Acquire) as *mut Node<T>;
        if tail != head {
            return None;
        }
        // Push the stub behind the final node so that the tail can be
        // handed out, then retry following the link.
        let stub = cast::transmute(&self.stub);
        self.push(stub);
        next = (*tail).next(atomics::Relaxed);
        if !next.is_null() {
            *self.tail.get() = next;
            return Some(tail);
        }
        return None
    }
}
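
// Because `pop` can return `None` on a momentarily-inconsistent queue, a
// consumer that knows data is pending must retry rather than trust a single
// `None`. A sketch of a hypothetical consumer (busy-waiting for brevity;
// real callers would block or yield), including reclaiming the leaked box:
//
//     loop {
//         match unsafe { q.pop() } {
//             Some(node) => {
//                 let node: ~Node<uint> = unsafe { cast::transmute(node) };
//                 println!("{}", node.data);
//                 break
//             }
//             None => {} // empty *or* inconsistent: retry
//         }
//     }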

impl<T: Send> Node<T> {
    pub fn new(t: T) -> Node<T> {
        Node {
            data: t,
            next: atomics::AtomicUint::new(0),
        }
    }
    /// Loads this node's link with the given ordering and converts it back
    /// into a possibly-null node pointer.
    pub unsafe fn next(&self, ord: atomics::Ordering) -> *mut Node<T> {
        cast::transmute::<uint, *mut Node<T>>(self.next.load(ord))
    }
}