(index<- ) ./libstd/comm/shared.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /// Shared channels
12 ///
13 /// This is the flavor of channels which are not necessarily optimized for any
14 /// particular use case, but are the most general in how they are used. Shared
15 /// channels are cloneable allowing for multiple senders.
16 ///
17 /// High-level implementation details can be found in the comment of the parent
18 /// module. You'll also note that the implementations of the shared and stream
19 /// channels are quite similar, and this is no coincidence!
20
21 use cmp;
22 use int;
23 use iter::Iterator;
24 use kinds::Send;
25 use ops::Drop;
26 use option::{Some, None, Option};
27 use owned::Box;
28 use result::{Ok, Err, Result};
29 use rt::local::Local;
30 use rt::task::{Task, BlockedTask};
31 use rt::thread::Thread;
32 use sync::atomics;
33 use unstable::mutex::NativeMutex;
34
35 use mpsc = sync::mpsc_queue;
36
// Sentinel stored in `cnt` once the channel is disconnected (written by
// drop_chan and drop_port, and re-stored by send, decrement, try_recv and
// bump when their arithmetic disturbed it).
37 static DISCONNECTED: int = int::MIN;
// Width of the window above DISCONNECTED that send() still treats as
// "disconnected" (`cnt < DISCONNECTED + FUDGE`), since concurrent senders may
// have incremented the sentinel before it is stored back.
38 static FUDGE: int = 1024;
// Threshold on the receiver's local `steals` counter: once try_recv sees
// `steals` above this value it folds the steals back into `cnt`. The value is
// 5 in test builds and 1 << 20 otherwise (presumably so tests exercise the
// fold-back path -- confirm).
39 #[cfg(test)]
40 static MAX_STEALS: int = 5;
41 #[cfg(not(test))]
42 static MAX_STEALS: int = 1 << 20;
43
// Shared state behind a multi-sender channel: the queue of pending items plus
// the counters and flags the methods below use to coordinate senders, the
// receiver, and select().
44 pub struct Packet<T> {
// Queue of sent items; pushed in send(), popped in try_recv(), drop_port() and
// the sender drain loop.
45 queue: mpsc::Queue<T>,
46 cnt: atomics::AtomicInt, // How many items are on this channel; -1 is stored alongside a blocked task (inherit_blocker), DISCONNECTED after disconnect
47 steals: int, // How many times has a port received without blocking? (plain, non-atomic counter)
48 to_wake: atomics::AtomicUint, // Task to wake up: a BlockedTask cast to uint, 0 when there is none
49
50 // The number of channels which are currently using this packet. Starts at
// 2 in new(), bumped by clone_chan(), decremented by drop_chan().
51 channels: atomics::AtomicInt,
52
53 // See the discussion in Port::drop and the channel send methods for what
54 // these are used for. In this file: port_dropped is set by drop_port() and
// checked first thing in send(); sender_drain counts the senders currently
// passing through the drain path of send().
55 port_dropped: atomics::AtomicBool,
56 sender_drain: atomics::AtomicInt,
57
58 // this lock protects various portions of this implementation during
59 // select(). It is taken in new(), released in inherit_blocker(), and
// bounced (locked then immediately unlocked) in abort_selection().
60 select_lock: NativeMutex,
61 }
62
// Reasons recv()/try_recv() can fail to produce a value.
63 pub enum Failure {
// The queue had no data and `cnt` was not DISCONNECTED.
64 Empty,
// `cnt` was DISCONNECTED and a final pop of the queue found it empty.
65 Disconnected,
66 }
67
68 impl<T: Send> Packet<T> {
69 // Creation of a packet *must* be followed by a call to inherit_blocker
// (new() returns with select_lock still held; inherit_blocker is what
// releases it).
//
// Initial state: empty queue, cnt = 0, steals = 0, no task to wake, port not
// dropped, no sender draining. `channels` starts at 2 -- presumably the two
// sender handles that exist when a channel is upgraded to shared; confirm
// against the caller.
70 pub fn new() -> Packet<T> {
71 let p = Packet {
72 queue: mpsc::Queue::new(),
73 cnt: atomics::AtomicInt::new(0),
74 steals: 0,
75 to_wake: atomics::AtomicUint::new(0),
76 channels: atomics::AtomicInt::new(2),
77 port_dropped: atomics::AtomicBool::new(false),
78 sender_drain: atomics::AtomicInt::new(0),
79 select_lock: unsafe { NativeMutex::new() },
80 };
81 // see comments in inherit_blocker about why we grab this lock
82 unsafe { p.select_lock.lock_noguard() }
83 return p;
84 }
85
86 // This function is used at the creation of a shared packet to inherit a
87 // previously blocked task. This is done to prevent spurious wakeups of
88 // tasks in select().
89 //
90 // This can only be called at channel-creation time
//
// With `Some(task)`: asserts the packet is still pristine (cnt == 0 and
// to_wake == 0), records the task in to_wake, sets cnt to -1 and steals to -1.
// With `None`: no state is changed. In both cases the select_lock taken in
// new() is released before returning.
91 pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
92 match task {
93 Some(task) => {
94 assert_eq!(self.cnt.load(atomics::SeqCst), 0);
95 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
96 self.to_wake.store(unsafe { task.cast_to_uint() },
97 atomics::SeqCst);
98 self.cnt.store(-1, atomics::SeqCst);
99
100 // This store is a little sketchy. What's happening here is
101 // that we're transferring a blocker from a oneshot or stream
102 // channel to this shared channel. In doing so, we never
103 // spuriously wake them up and rather only wake them up at the
104 // appropriate time. This implementation of shared channels
105 // assumes that any blocking recv() will undo the increment of
106 // steals performed in try_recv() once the recv is complete.
107 // This thread that we're inheriting, however, is not in the
108 // middle of recv. Hence, the first time we wake them up,
109 // they're going to wake up from their old port, move on to the
110 // upgraded port, and then call the blocking recv() function.
111 //
112 // When calling this function, they'll find there's data
113 // immediately available, counting it as a steal. This in fact
114 // wasn't a steal because we appropriately blocked them waiting
115 // for data.
116 //
117 // To offset this bad increment, we initially set the steal
118 // count to -1. You'll find some special code in
119 // abort_selection() as well to ensure that this -1 steal count
120 // doesn't escape too far.
121 self.steals = -1;
122 }
123 None => {}
124 }
125
126 // When the shared packet is constructed, we grabbed this lock. The
127 // purpose of this lock is to ensure that abort_selection() doesn't
128 // interfere with this method. After we unlock this lock, we're
129 // signifying that we're done modifying self.cnt and self.to_wake and
130 // the port is ready for the world to continue using it.
131 unsafe { self.select_lock.unlock_noguard() }
132 }
133
// Attempts to enqueue `t` on the channel.
//
// Returns `Err(t)`, handing the value back to the caller, if the port has
// already been dropped or the count is already inside the disconnected
// window. Otherwise the value is pushed and `Ok(())` is returned, after
// either waking a blocked receiver (previous count was -1) or, if the
// channel turned out to be disconnected, restoring the DISCONNECTED
// sentinel and taking part in draining the queue.
134 pub fn send(&mut self, t: T) -> Result<(), T> {
135 // See Port::drop for what's going on
136 if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
137
138 // Note that the multiple sender case is a little trickier
139 // semantically than the single sender case. The logic for
140 // incrementing is "add and if disconnected store disconnected".
141 // This could end up leading some senders to believe that there
142 // wasn't a disconnect if in fact there was a disconnect. This means
143 // that while one thread is attempting to re-store the disconnected
144 // states, other threads could walk through merrily incrementing
145 // this very-negative disconnected count. To prevent senders from
146 // spuriously attempting to send when the channel is actually
147 // disconnected, the count has a ranged check here.
148 //
149 // This is also done for another reason. Remember that the return
150 // value of this function is:
151 //
152 // `Ok(())` == the data *may* be received, this essentially has no
153 // meaning
154 // `Err(t)` == the data will *never* be received, this has a lot of
155 // meaning
156 //
157 // In the SPSC case, we have a check of 'queue.is_empty()' to see
158 // whether the data was actually received, but this same condition
159 // means nothing in a multi-producer context. As a result, this
160 // preflight check serves as the definitive "this will never be
161 // received". Once we get beyond this check, we have permanently
162 // entered the realm of "this may be received"
163 if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE {
164 return Err(t)
165 }
166
167 self.queue.push(t);
168 match self.cnt.fetch_add(1, atomics::SeqCst) {
// A previous count of -1 means a receiver is parked in to_wake: take it
// and wake it up.
169 -1 => {
170 self.take_to_wake().wake().map(|t| t.reawaken());
171 }
172
173 // In this case, we have possibly failed to send our data, and
174 // we need to consider re-popping the data in order to fully
175 // destroy it. We must arbitrate among the multiple senders,
176 // however, because the queues that we're using are
177 // single-consumer queues. In order to do this, all exiting
178 // pushers will use an atomic count in order to count those
179 // flowing through. Pushers who see 0 are required to drain as
180 // much as possible, and then can only exit when they are the
181 // only pusher (otherwise they must try again).
182 n if n < DISCONNECTED + FUDGE => {
183 // see the comment in 'try' for a shared channel for why this
184 // window of "not disconnected" is ok.
185 self.cnt.store(DISCONNECTED, atomics::SeqCst);
186
// Only the sender that moves sender_drain from 0 to 1 becomes the
// drainer; everyone else just leaves their increment behind.
187 if self.sender_drain.fetch_add(1, atomics::SeqCst) == 0 {
188 loop {
189 // drain the queue, for info on the thread yield see the
190 // discussion in try_recv
191 loop {
192 match self.queue.pop() {
193 mpsc::Data(..) => {}
194 mpsc::Empty => break,
195 mpsc::Inconsistent => Thread::yield_now(),
196 }
197 }
198 // maybe we're done, if we're not the last ones
199 // here, then we need to go try again.
200 if self.sender_drain.fetch_sub(1, atomics::SeqCst) == 1 {
201 break
202 }
203 }
204
205 // At this point, there may still be data on the queue,
206 // but only if the count hasn't been incremented and
207 // some other sender hasn't finished pushing data just
208 // yet. That sender in question will drain its own data.
209 }
210 }
211
212 // Can't make any assumptions about this case like in the SPSC case.
213 _ => {}
214 }
215
// Reached from every arm above, including the disconnected/drain arm.
216 Ok(())
217 }
218
// Blocking receive.
//
// First tries a non-blocking try_recv(); anything other than `Err(Empty)`
// is returned as-is. On `Empty`, the current task is taken and descheduled,
// with decrement() deciding whether it actually goes to sleep. After the
// task resumes, try_recv() is attempted once more and its result (data or
// failure) is returned.
219 pub fn recv(&mut self) -> Result<T, Failure> {
220 // This code is essentially the exact same as that found in the stream
221 // case (see stream.rs)
222 match self.try_recv() {
223 Err(Empty) => {}
224 data => return data,
225 }
226
227 let task: Box<Task> = Local::take();
228 task.deschedule(1, |task| {
229 self.decrement(task)
230 });
231
232 match self.try_recv() {
// try_recv() counted this successful pop as a steal; undo that, since
// we blocked for the data rather than stealing it.
233 data @ Ok(..) => { self.steals -= 1; data }
234 data => data,
235 }
236 }
237
238 // Essentially the exact same thing as the stream decrement function.
//
// Registers `task` in to_wake and subtracts `1 + steals` from cnt (resetting
// the local steal count to 0). Returns `Ok(())` when, after factoring in the
// steals, the channel has no data, leaving the task parked in to_wake.
// Otherwise (data is available, or the channel is disconnected) to_wake is
// cleared and the task is handed back as `Err(task)`.
239 fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
240 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
// `n` is the task handle as a uint; it is turned back into a BlockedTask
// on the Err path at the bottom.
241 let n = unsafe { task.cast_to_uint() };
242 self.to_wake.store(n, atomics::SeqCst);
243
244 let steals = self.steals;
245 self.steals = 0;
246
247 match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
// The subtraction disturbed the sentinel; put it back.
248 DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
249 // If we factor in our steals and notice that the channel has no
250 // data, we successfully sleep
// (this `n` is the previous count and shadows the task handle only inside
// this arm)
251 n => {
252 assert!(n >= 0);
253 if n - steals <= 0 { return Ok(()) }
254 }
255 }
256
// Not sleeping after all: unregister the task and give it back.
257 self.to_wake.store(0, atomics::SeqCst);
258 Err(unsafe { BlockedTask::cast_from_uint(n) })
259 }
260
// Non-blocking receive.
//
// Pops the queue once (spinning with OS-thread yields if the queue reports
// Inconsistent). If an item was obtained it is returned as `Ok` and counted
// as a steal. If the queue was empty, returns `Err(Empty)` unless cnt is
// DISCONNECTED, in which case the queue is popped one final time and either
// that item or `Err(Disconnected)` is returned.
261 pub fn try_recv(&mut self) -> Result<T, Failure> {
262 let ret = match self.queue.pop() {
263 mpsc::Data(t) => Some(t),
264 mpsc::Empty => None,
265
266 // This is a bit of an interesting case. The channel is
267 // reported as having data available, but our pop() has
268 // failed due to the queue being in an inconsistent state.
269 // This means that there is some pusher somewhere which has
270 // yet to complete, but we are guaranteed that a pop will
271 // eventually succeed. In this case, we spin in a yield loop
272 // because the remote sender should finish their enqueue
273 // operation "very quickly".
274 //
275 // Note that this yield loop does *not* attempt to do a green
276 // yield (regardless of the context), but *always* performs an
277 // OS-thread yield. The reasoning for this is that the pusher in
278 // question which is causing the inconsistent state is
279 // guaranteed to *not* be a blocked task (green tasks can't get
280 // pre-empted), so it must be on a different OS thread. Also,
281 // `try_recv` is normally a "guaranteed no rescheduling" context
282 // in a green-thread situation. By yielding control of the
283 // thread, we will hopefully allow time for the remote task on
284 // the other OS thread to make progress.
285 //
286 // Avoiding this yield loop would require a different queue
287 // abstraction which provides the guarantee that after M
288 // pushes have succeeded, at least M pops will succeed. The
289 // current queues guarantee that if there are N active
290 // pushes, you can pop N times once all N have finished.
291 mpsc::Inconsistent => {
292 let data;
293 loop {
294 Thread::yield_now();
295 match self.queue.pop() {
296 mpsc::Data(t) => { data = t; break }
297 mpsc::Empty => fail!("inconsistent => empty"),
298 mpsc::Inconsistent => {}
299 }
300 }
301 Some(data)
302 }
303 };
304 match ret {
305 // See the discussion in the stream implementation for why we
306 // might decrement steals.
307 Some(data) => {
// Once the local steal count exceeds MAX_STEALS, fold it back into cnt:
// zero cnt, cancel out as many steals as the old count covers, and
// re-add whatever remains of the old count.
308 if self.steals > MAX_STEALS {
309 match self.cnt.swap(0, atomics::SeqCst) {
310 DISCONNECTED => {
311 self.cnt.store(DISCONNECTED, atomics::SeqCst);
312 }
313 n => {
314 let m = cmp::min(n, self.steals);
315 self.steals -= m;
316 self.bump(n - m);
317 }
318 }
319 assert!(self.steals >= 0);
320 }
321 self.steals += 1;
322 Ok(data)
323 }
324
325 // See the discussion in the stream implementation for why we try
326 // again.
327 None => {
328 match self.cnt.load(atomics::SeqCst) {
329 n if n != DISCONNECTED => Err(Empty),
330 _ => {
// Note: an item returned from this final pop is not counted in steals.
331 match self.queue.pop() {
332 mpsc::Data(t) => Ok(t),
333 mpsc::Empty => Err(Disconnected),
334 // with no senders, an inconsistency is impossible.
335 mpsc::Inconsistent => unreachable!(),
336 }
337 }
338 }
339 }
340 }
341 }
342
343 // Prepares this shared packet for a channel clone, essentially just bumping
344 // a refcount (the `channels` counter that drop_chan() later decrements).
345 pub fn clone_chan(&mut self) {
346 self.channels.fetch_add(1, atomics::SeqCst);
347 }
348
349 // Decrement the reference count on a channel. This is called whenever a
350 // Chan is dropped and may end up waking up a receiver. It's the receiver's
351 // responsibility on the other end to figure out that we've disconnected.
//
// Only the drop that takes `channels` from 1 to 0 goes on to mark the packet
// DISCONNECTED; earlier drops return immediately. A previous `channels`
// value below 1 is treated as a bug and fails the task.
352 pub fn drop_chan(&mut self) {
353 match self.channels.fetch_sub(1, atomics::SeqCst) {
354 1 => {}
355 n if n > 1 => return,
356 n => fail!("bad number of channels left {}", n),
357 }
358
// Last sender gone: publish the disconnect. A previous count of -1 means a
// receiver is parked in to_wake and must be woken so it can notice.
359 match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
360 -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
361 DISCONNECTED => {}
362 n => { assert!(n >= 0); }
363 }
364 }
365
366 // See the long discussion inside of stream.rs for why the queue is drained,
367 // and why it is done in this fashion.
//
// Marks the port as dropped (so send() starts rejecting), then repeatedly
// tries to compare-and-swap cnt from the current steal count to
// DISCONNECTED. The loop ends once cnt was already DISCONNECTED or the swap
// succeeded (cnt == steals); otherwise the queue is drained and the attempt
// is repeated with the updated steal count.
368 pub fn drop_port(&mut self) {
369 self.port_dropped.store(true, atomics::SeqCst);
370 let mut steals = self.steals;
371 while {
372 let cnt = self.cnt.compare_and_swap(
373 steals, DISCONNECTED, atomics::SeqCst);
374 cnt != DISCONNECTED && cnt != steals
375 } {
376 // Drain whatever is queued, counting each popped item as a steal.
377 // Unlike try_recv, Inconsistent does not yield here: it just breaks
// out so the compare-and-swap above is retried.
378 loop {
379 match self.queue.pop() {
380 mpsc::Data(..) => { steals += 1; }
381 mpsc::Empty | mpsc::Inconsistent => break,
382 }
383 }
384 }
385 }
386
387 // Consumes ownership of the 'to_wake' field.
//
// Reads the stored task handle, resets to_wake to 0, and converts the handle
// back into a BlockedTask. Asserts that a task was actually registered
// (non-zero); note the assert runs after to_wake has already been cleared.
388 fn take_to_wake(&mut self) -> BlockedTask {
389 let task = self.to_wake.load(atomics::SeqCst);
390 self.to_wake.store(0, atomics::SeqCst);
391 assert!(task != 0);
392 unsafe { BlockedTask::cast_from_uint(task) }
393 }
394
395 ////////////////////////////////////////////////////////////////////////////
396 // select implementation
397 ////////////////////////////////////////////////////////////////////////////
398
399 // Helper function for select, tests whether this port can receive without
400 // blocking (obviously not an atomic decision).
401 //
402 // This is different than the stream version because there's no need to peek
403 // at the queue, we can just look at the local count.
//
// Returns true when the channel is disconnected, or when the count minus the
// local steal count is positive.
404 pub fn can_recv(&mut self) -> bool {
405 let cnt = self.cnt.load(atomics::SeqCst);
406 cnt == DISCONNECTED || cnt - self.steals > 0
407 }
408
409 // increment the count on the channel (used for selection)
//
// Adds `amt` to cnt and returns the value cnt held before the addition. If
// that previous value was DISCONNECTED, the sentinel is stored back (the
// addition disturbed it) and DISCONNECTED is returned.
410 fn bump(&mut self, amt: int) -> int {
411 match self.cnt.fetch_add(amt, atomics::SeqCst) {
412 DISCONNECTED => {
413 self.cnt.store(DISCONNECTED, atomics::SeqCst);
414 DISCONNECTED
415 }
416 n => n
417 }
418 }
419
420 // Inserts the blocked task for selection on this port, returning it back if
421 // the port already has data on it.
422 //
423 // The code here is the same as in stream.rs, except that it doesn't need to
424 // peek at the channel to see if an upgrade is pending.
//
// Returns `Ok(())` when decrement() parked the task; otherwise undoes the
// decrement by bumping the count by 1 and returns the task as `Err(task)`.
425 pub fn start_selection(&mut self,
426 task: BlockedTask) -> Result<(), BlockedTask> {
427 match self.decrement(task) {
428 Ok(()) => Ok(()),
429 Err(task) => {
// decrement() took 1 off the count before declining to block; add it back.
430 let prev = self.bump(1);
431 assert!(prev == DISCONNECTED || prev >= 0);
432 return Err(task);
433 }
434 }
435 }
436
437 // Cancels a previous task waiting on this port, returning whether there's
438 // data on the port.
439 //
440 // This is similar to the stream implementation (hence fewer comments), but
441 // uses a different value for the "steals" variable.
//
// Returns true if the channel is disconnected or the count was already
// non-negative before the adjustment made here; false otherwise. The
// `_was_upgrade` argument is not used by this implementation.
442 pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
443 // Before we do anything else, we bounce on this lock. The reason for
444 // doing this is to ensure that any upgrade-in-progress is gone and
445 // done with. Without this bounce, we can race with inherit_blocker
446 // about looking at and dealing with to_wake. Once we have acquired the
447 // lock, we are guaranteed that inherit_blocker is done.
448 unsafe {
449 let _guard = self.select_lock.lock();
450 }
451
452 // Like the stream implementation, we want to make sure that the count
453 // on the channel goes non-negative. We don't know how negative the
454 // count currently is, so instead of using a steal value of 1, we load
455 // the channel count and figure out what we should do to make it
456 // positive.
457 let steals = {
458 let cnt = self.cnt.load(atomics::SeqCst);
459 if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
460 };
461 let prev = self.bump(steals + 1);
462
463 if prev == DISCONNECTED {
464 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
465 true
466 } else {
467 let cur = prev + steals + 1;
468 assert!(cur >= 0);
// A negative previous count means our task is still registered in to_wake,
// so take it back and discard it. Otherwise spin (yielding the OS thread)
// until to_wake has been cleared.
469 if prev < 0 {
470 self.take_to_wake().trash();
471 } else {
472 while self.to_wake.load(atomics::SeqCst) != 0 {
473 Thread::yield_now();
474 }
475 }
476 // if the number of steals is -1, it was the pre-emptive -1 steal
477 // count from when we inherited a blocker. This is fine because
478 // we're just going to overwrite it with a real value.
479 assert!(self.steals == 0 || self.steals == -1);
480 self.steals = steals;
481 prev >= 0
482 }
483 }
484 }
485
// Destructor: performs no cleanup of its own, only checks that the packet
// was fully shut down before being destroyed (count is DISCONNECTED, no task
// is registered to wake, and no channels remain).
486 #[unsafe_destructor]
487 impl<T: Send> Drop for Packet<T> {
488 fn drop(&mut self) {
489 // Note that this load is not only an assert for correctness about
490 // disconnection, but also a proper fence before the read of
491 // `to_wake`, so this assert cannot be removed without also removing
492 // the `to_wake` assert.
493 assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
494 assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
495 assert_eq!(self.channels.load(atomics::SeqCst), 0);
496 }
497 }
libstd/comm/shared.rs:62:1-62:1 -enum- definition:
pub enum Failure {
Empty,
Disconnected,
references:- 2219: pub fn recv(&mut self) -> Result<T, Failure> {
220: // This code is essentially the exact same as that found in the stream
--
261: pub fn try_recv(&mut self) -> Result<T, Failure> {
262: let ret = match self.queue.pop() {
libstd/comm/shared.rs:43:1-43:1 -struct- definition:
pub struct Packet<T> {
queue: mpsc::Queue<T>,
cnt: atomics::AtomicInt, // How many items are on this channel
references:- 570: pub fn new() -> Packet<T> {
71: let p = Packet {
72: queue: mpsc::Queue::new(),
--
487: impl<T: Send> Drop for Packet<T> {
488: fn drop(&mut self) {
libstd/comm/mod.rs:
390: Stream(UnsafeArc<stream::Packet<T>>),
391: Shared(UnsafeArc<shared::Packet<T>>),
392: Sync(UnsafeArc<sync::Packet<T>>),
libstd/comm/shared.rs:
68: impl<T: Send> Packet<T> {
69: // Creation of a packet *must* be followed by a call to inherit_blocker