1 /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2 * Redistribution and use in source and binary forms, with or without
3 * modification, are permitted provided that the following conditions are met:
4 *
5 * 1. Redistributions of source code must retain the above copyright notice,
6 * this list of conditions and the following disclaimer.
7 *
8 * 2. Redistributions in binary form must reproduce the above copyright
9 * notice, this list of conditions and the following disclaimer in the
10 * documentation and/or other materials provided with the distribution.
11 *
12 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
15 * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
18 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
20 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22 *
23 * The views and conclusions contained in the software and documentation are
24 * those of the authors and should not be interpreted as representing official
25 * policies, either expressed or implied, of Dmitry Vyukov.
26 */
27
28 #![allow(missing_doc, dead_code)]
29
30 // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
31
32 use clone::Clone;
33 use kinds::Send;
34 use num::next_power_of_two;
35 use option::{Option, Some, None};
36 use sync::arc::UnsafeArc;
37 use sync::atomics::{AtomicUint,Relaxed,Release,Acquire};
38 use vec::Vec;
39
40 struct Node<T> {
41 sequence: AtomicUint,
42 value: Option<T>,
43 }
44
45 struct State<T> {
46 pad0: [u8, ..64],
47 buffer: Vec<Node<T>>,
48 mask: uint,
49 pad1: [u8, ..64],
50 enqueue_pos: AtomicUint,
51 pad2: [u8, ..64],
52 dequeue_pos: AtomicUint,
53 pad3: [u8, ..64],
54 }
55
56 pub struct Queue<T> {
57 state: UnsafeArc<State<T>>,
58 }
59
60 impl<T: Send> State<T> {
61 fn with_capacity(capacity: uint) -> State<T> {
62 let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
63 if capacity < 2 {
64 2u
65 } else {
66 // use next power of 2 as capacity
67 next_power_of_two(capacity)
68 }
69 } else {
70 capacity
71 };
72 let buffer = Vec::from_fn(capacity, |i| {
73 Node { sequence:AtomicUint::new(i), value: None }
74 });
75 State{
76 pad0: [0, ..64],
77 buffer: buffer,
78 mask: capacity-1,
79 pad1: [0, ..64],
80 enqueue_pos: AtomicUint::new(0),
81 pad2: [0, ..64],
82 dequeue_pos: AtomicUint::new(0),
83 pad3: [0, ..64],
84 }
85 }
86
87 fn push(&mut self, value: T) -> bool {
88 let mask = self.mask;
89 let mut pos = self.enqueue_pos.load(Relaxed);
90 loop {
91 let node = self.buffer.get_mut(pos & mask);
92 let seq = node.sequence.load(Acquire);
93 let diff: int = seq as int - pos as int;
94
95 if diff == 0 {
96 let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
97 if enqueue_pos == pos {
98 node.value = Some(value);
99 node.sequence.store(pos+1, Release);
100 break
101 } else {
102 pos = enqueue_pos;
103 }
104 } else if diff < 0 {
105 return false
106 } else {
107 pos = self.enqueue_pos.load(Relaxed);
108 }
109 }
110 true
111 }
112
113 fn pop(&mut self) -> Option<T> {
114 let mask = self.mask;
115 let mut pos = self.dequeue_pos.load(Relaxed);
116 loop {
117 let node = self.buffer.get_mut(pos & mask);
118 let seq = node.sequence.load(Acquire);
119 let diff: int = seq as int - (pos + 1) as int;
120 if diff == 0 {
121 let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
122 if dequeue_pos == pos {
123 let value = node.value.take();
124 node.sequence.store(pos + mask + 1, Release);
125 return value
126 } else {
127 pos = dequeue_pos;
128 }
129 } else if diff < 0 {
130 return None
131 } else {
132 pos = self.dequeue_pos.load(Relaxed);
133 }
134 }
135 }
136 }
137
138 impl<T: Send> Queue<T> {
139 pub fn with_capacity(capacity: uint) -> Queue<T> {
140 Queue{
141 state: UnsafeArc::new(State::with_capacity(capacity))
142 }
143 }
144
145 pub fn push(&mut self, value: T) -> bool {
146 unsafe { (*self.state.get()).push(value) }
147 }
148
149 pub fn pop(&mut self) -> Option<T> {
150 unsafe { (*self.state.get()).pop() }
151 }
152 }
153
154 impl<T: Send> Clone for Queue<T> {
155 fn clone(&self) -> Queue<T> {
156 Queue {
157 state: self.state.clone()
158 }
159 }
160 }
161
162 #[cfg(test)]
163 mod tests {
164 use prelude::*;
165 use super::Queue;
166 use native;
167
168 #[test]
169 fn test() {
170 let nthreads = 8u;
171 let nmsgs = 1000u;
172 let mut q = Queue::with_capacity(nthreads*nmsgs);
173 assert_eq!(None, q.pop());
174 let (tx, rx) = channel();
175
176 for _ in range(0, nthreads) {
177 let q = q.clone();
178 let tx = tx.clone();
179 native::task::spawn(proc() {
180 let mut q = q;
181 for i in range(0, nmsgs) {
182 assert!(q.push(i));
183 }
184 tx.send(());
185 });
186 }
187
188 let mut completion_rxs = vec![];
189 for _ in range(0, nthreads) {
190 let (tx, rx) = channel();
191 completion_rxs.push(rx);
192 let q = q.clone();
193 native::task::spawn(proc() {
194 let mut q = q;
195 let mut i = 0u;
196 loop {
197 match q.pop() {
198 None => {},
199 Some(_) => {
200 i += 1;
201 if i == nmsgs { break }
202 }
203 }
204 }
205 tx.send(i);
206 });
207 }
208
209 for rx in completion_rxs.mut_iter() {
210 assert_eq!(nmsgs, rx.recv());
211 }
212 for _ in range(0, nthreads) {
213 rx.recv();
214 }
215 }
216 }
libstd/sync/mpmc_bounded_queue.rs:39:1-39:1 -struct- definition:
struct Node<T> {
sequence: AtomicUint,
value: Option<T>,
references:- 272: let buffer = Vec::from_fn(capacity, |i| {
73: Node { sequence:AtomicUint::new(i), value: None }
74: });
libstd/sync/mpmc_bounded_queue.rs:44:1-44:1 -struct- definition:
struct State<T> {
pad0: [u8, ..64],
buffer: Vec<Node<T>>,
references:- 474: });
75: State{
76: pad0: [0, ..64],
libstd/sync/mpmc_bounded_queue.rs:55:1-55:1 -struct- definition:
pub struct Queue<T> {
state: UnsafeArc<State<T>>,
}
references:- 6155: fn clone(&self) -> Queue<T> {
156: Queue {
157: state: self.state.clone()