/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *    1. Redistributions of source code must retain the above copyright notice,
 *       this list of conditions and the following disclaimer.
 *
 *    2. Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
 * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are
 * those of the authors and should not be interpreted as representing official
 * policies, either expressed or implied, of Dmitry Vyukov.
 */

// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue

//! A single-producer single-consumer concurrent queue
//!
//! This module contains the implementation of an SPSC queue which can be used
//! concurrently between two tasks. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.
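//
// A minimal usage sketch (illustrative, not part of the original docs); it
// mirrors the `smoke` test at the bottom of this file. A bound of 0 requests
// an unbounded internal node cache (see `Queue::new` for details).
//
//     let mut q = Queue::new(0);
//     q.push(1);
//     q.push(2);
//     assert_eq!(q.pop(), Some(1));
//     assert_eq!(q.pop(), Some(2));
//     assert_eq!(q.pop(), None);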

use cast;
use kinds::Send;
use ops::Drop;
use option::{Some, None, Option};
use owned::Box;
use ptr::RawPtr;
use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};

// Node within the linked list queue of messages to send
struct Node<T> {
    // FIXME: this could be an uninitialized T if we're careful enough, and
    //        that would reduce memory usage (and be a bit faster).
    //        is it worth it?
    value: Option<T>,         // nullable for re-use of nodes
    next: AtomicPtr<Node<T>>, // next node in the queue
}

/// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an UnsafeArc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
pub struct Queue<T> {
    // consumer fields
    tail: *mut Node<T>,            // where to pop from
    tail_prev: AtomicPtr<Node<T>>, // where to pop from

    // producer fields
    head: *mut Node<T>,      // where to push to
    first: *mut Node<T>,     // where to get new nodes from
    tail_copy: *mut Node<T>, // between first/tail

    // Cache maintenance fields. Additions and subtractions are stored
    // separately in order to allow them to use nonatomic addition/subtraction.
    cache_bound: uint,
    cache_additions: AtomicUint,
    cache_subtractions: AtomicUint,
}
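
// A sharing sketch (illustrative; it mirrors the stress test at the bottom of
// this file). The queue is typically handed out through an UnsafeArc, with the
// caller guaranteeing that exactly one task pushes and exactly one task pops:
//
//     let (producer, consumer) = UnsafeArc::new2(Queue::new(0));
//     // pushing side:  unsafe { (*producer.get()).push(t) }
//     // popping side:  unsafe { (*consumer.get()).pop() }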

impl<T: Send> Node<T> {
    fn new() -> *mut Node<T> {
        unsafe {
            cast::transmute(box Node {
                value: None,
                next: AtomicPtr::new(0 as *mut Node<T>),
            })
        }
    }
}

impl<T: Send> Queue<T> {
    /// Creates a new queue, to be used by exactly one producer (pusher) and
    /// exactly one consumer (popper).
    ///
    /// # Arguments
    ///
    ///   * `bound` - This queue is implemented with a linked list, and this
    ///               means that a push is always a malloc. In order to
    ///               amortize this cost, an internal cache of nodes is
    ///               maintained to prevent a malloc from always being
    ///               necessary. This bound is the limit on the size of the
    ///               cache (if desired). If the value is 0, then the cache has
    ///               no bound. Otherwise, the cache will never grow larger
    ///               than `bound` (although the queue itself could be much
    ///               larger).
    pub fn new(bound: uint) -> Queue<T> {
        let n1 = Node::new();
        let n2 = Node::new();
        unsafe { (*n1).next.store(n2, Relaxed) }
        Queue {
            tail: n2,
            tail_prev: AtomicPtr::new(n1),
            head: n2,
            first: n1,
            tail_copy: n1,
            cache_bound: bound,
            cache_additions: AtomicUint::new(0),
            cache_subtractions: AtomicUint::new(0),
        }
    }
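
    // An illustrative note on choosing `bound` (not from the original sources):
    //
    //     let unbounded_cache: Queue<int> = Queue::new(0);  // every popped node is kept for reuse
    //     let capped_cache: Queue<int> = Queue::new(128);   // at most 128 nodes kept; extras freed in pop
    //
    // The bound only limits the internal reuse cache, never the length of the
    // queue itself.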

    /// Pushes a new value onto this queue. Note that to use this function
    /// safely, it must be externally guaranteed that there is only one pusher.
    pub fn push(&mut self, t: T) {
        unsafe {
            // Acquire a node (which either uses a cached one or allocates a new
            // one), and then append this to the 'head' node.
            let n = self.alloc();
            assert!((*n).value.is_none());
            (*n).value = Some(t);
            (*n).next.store(0 as *mut Node<T>, Relaxed);
            (*self.head).next.store(n, Release);
            self.head = n;
        }
    }

    unsafe fn alloc(&mut self) -> *mut Node<T> {
        // First try to see if we can consume the 'first' node for our uses.
        // We try to avoid as many atomic instructions as possible here, so
        // the addition to cache_subtractions is not atomic (plus we're the
        // only one subtracting from the cache).
        if self.first != self.tail_copy {
            if self.cache_bound > 0 {
                let b = self.cache_subtractions.load(Relaxed);
                self.cache_subtractions.store(b + 1, Relaxed);
            }
            let ret = self.first;
            self.first = (*ret).next.load(Relaxed);
            return ret;
        }
        // If the above fails, then update our copy of the tail and try
        // again.
        self.tail_copy = self.tail_prev.load(Acquire);
        if self.first != self.tail_copy {
            if self.cache_bound > 0 {
                let b = self.cache_subtractions.load(Relaxed);
                self.cache_subtractions.store(b + 1, Relaxed);
            }
            let ret = self.first;
            self.first = (*ret).next.load(Relaxed);
            return ret;
        }
        // If all of that fails, then we have to allocate a new node
        // (there's nothing in the node cache).
        Node::new()
    }

    /// Attempts to pop a value from this queue. Remember that to use this type
    /// safely you must ensure that there is only one popper at a time.
    pub fn pop(&mut self) -> Option<T> {
        unsafe {
            // The `tail` node is not actually a used node, but rather a
            // sentinel from where we should start popping from. Hence, look at
            // tail's next field and see if we can use it. If we do a pop, then
            // the current tail node is a candidate for going into the cache.
            let tail = self.tail;
            let next = (*tail).next.load(Acquire);
            if next.is_null() { return None }
            assert!((*next).value.is_some());
            let ret = (*next).value.take();

            self.tail = next;
            if self.cache_bound == 0 {
                self.tail_prev.store(tail, Release);
            } else {
                // FIXME: this is dubious with overflow.
                let additions = self.cache_additions.load(Relaxed);
                let subtractions = self.cache_subtractions.load(Relaxed);
                let size = additions - subtractions;

                if size < self.cache_bound {
                    self.tail_prev.store(tail, Release);
                    self.cache_additions.store(additions + 1, Relaxed);
                } else {
                    (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
                    // We have successfully erased all references to 'tail', so
                    // now we can safely drop it.
                    let _: Box<Node<T>> = cast::transmute(tail);
                }
            }
            return ret;
        }
    }

    /// Attempts to peek at the head of the queue, returning `None` if the
    /// queue has no data currently.
    pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
        // This is essentially the same as above with all the popping bits
        // stripped out.
        unsafe {
            let tail = self.tail;
            let next = (*tail).next.load(Acquire);
            if next.is_null() { return None }
            return (*next).value.as_mut();
        }
    }
}

#[unsafe_destructor]
impl<T: Send> Drop for Queue<T> {
    fn drop(&mut self) {
        unsafe {
            let mut cur = self.first;
            while !cur.is_null() {
                let next = (*cur).next.load(Relaxed);
                let _n: Box<Node<T>> = cast::transmute(cur);
                cur = next;
            }
        }
    }
}

#[cfg(test)]
mod test {
    use prelude::*;
    use native;
    use super::Queue;
    use sync::arc::UnsafeArc;

    #[test]
    fn smoke() {
        let mut q = Queue::new(0);
        q.push(1);
        q.push(2);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), None);
        q.push(3);
        q.push(4);
        assert_eq!(q.pop(), Some(3));
        assert_eq!(q.pop(), Some(4));
        assert_eq!(q.pop(), None);
    }

    #[test]
    fn drop_full() {
        let mut q = Queue::new(0);
        q.push(box 1);
        q.push(box 2);
    }

    #[test]
    fn smoke_bound() {
        let mut q = Queue::new(1);
        q.push(1);
        q.push(2);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), None);
        q.push(3);
        q.push(4);
        assert_eq!(q.pop(), Some(3));
        assert_eq!(q.pop(), Some(4));
        assert_eq!(q.pop(), None);
    }
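
    // Not in the original test suite: a small sanity check (a sketch relying
    // only on the `push`/`pop`/`peek` methods above) that `peek` sees the
    // front value without consuming it.
    #[test]
    fn peek() {
        let mut q = Queue::new(0);
        q.push(1);
        // a non-empty queue exposes its front element through peek
        assert!(q.peek().is_some());
        assert_eq!(q.pop(), Some(1));
        // once drained, peek reports no data
        assert!(q.peek().is_none());
        assert_eq!(q.pop(), None);
    }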

    #[test]
    fn stress() {
        stress_bound(0);
        stress_bound(1);

        fn stress_bound(bound: uint) {
            let (a, b) = UnsafeArc::new2(Queue::new(bound));
            let (tx, rx) = channel();
            native::task::spawn(proc() {
                for _ in range(0, 100000) {
                    loop {
                        match unsafe { (*b.get()).pop() } {
                            Some(1) => break,
                            Some(_) => fail!(),
                            None => {}
                        }
                    }
                }
                tx.send(());
            });
            for _ in range(0, 100000) {
                unsafe { (*a.get()).push(1); }
            }
            rx.recv();
        }
    }
}