(index<- ) ./libstd/comm/select.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Selection over an array of receivers
12 //!
13 //! This module contains the implementation machinery necessary for selecting
14 //! over a number of receivers. One large goal of this module is to provide an
15 //! efficient interface to selecting over any receiver of any type.
16 //!
17 //! This is achieved through an architecture of a "receiver set" in which
18 //! receivers are added to a set and then the entire set is waited on at once.
19 //! The set can be waited on multiple times to prevent re-adding each receiver
20 //! to the set.
21 //!
22 //! Usage of this module is currently encouraged to go through the use of the
23 //! `select!` macro. This macro allows natural binding of variables to the
24 //! received values of receivers in a much more natural syntax than usage of the
25 //! `Select` structure directly.
26 //!
27 //! # Example
28 //!
29 //! ```rust
30 //! let (tx1, rx1) = channel();
31 //! let (tx2, rx2) = channel();
32 //!
33 //! tx1.send(1);
34 //! tx2.send(2);
35 //!
36 //! select! {
37 //! val = rx1.recv() => {
38 //! assert_eq!(val, 1);
39 //! },
40 //! val = rx2.recv() => {
41 //! assert_eq!(val, 2);
42 //! }
43 //! }
44 //! ```
45
46 #![allow(dead_code)]
47
48 use cast;
49 use cell::Cell;
50 use iter::Iterator;
51 use kinds::marker;
52 use kinds::Send;
53 use ops::Drop;
54 use option::{Some, None, Option};
55 use owned::Box;
56 use ptr::RawPtr;
57 use result::{Ok, Err, Result};
58 use rt::local::Local;
59 use rt::task::{Task, BlockedTask};
60 use super::Receiver;
61 use uint;
62
63 /// The "receiver set" of the select interface. This structure is used to manage
64 /// a set of receivers which are being selected over.
65 pub struct Select {
66 head: *mut Handle<'static, ()>, // first handle in the linked list of added handles (null when the set is empty)
67 tail: *mut Handle<'static, ()>, // last handle in that list (null when the set is empty)
68 next_id: Cell<uint>, // id handed to the next handle created by `handle()`; starts at 1
69 marker1: marker::NoSend, // NOTE(review): presumably keeps a `Select` from being sent to another task -- confirm
70 }
71
72 /// A handle to a receiver which can be made a member of a `Select` set of
73 /// receivers. This handle is used to keep the receiver in the set as well as
74 /// interact with the underlying receiver.
75 pub struct Handle<'rx, T> {
76 /// The ID of this handle, used to compare against the return value of
77 /// `Select::wait()`
78 id: uint,
79 selector: &'rx Select, // the set this handle was created from
80 next: *mut Handle<'static, ()>, // following handle in the set's list; null if last or not added
81 prev: *mut Handle<'static, ()>, // preceding handle in the set's list; null if first or not added
82 added: bool, // true between a call to `add()` and the matching `remove()`
83 packet: &'rx Packet, // the same receiver as `rx`, viewed through the type-independent `Packet` trait
84
85 // Due to our fun transmutes (handles are linked as `Handle<'static, ()>`),
86 // this field must stay at the end: nothing before it relies on T.
87 rx: &'rx Receiver<T>,
88 }
89
90 struct Packets { cur: *mut Handle<'static, ()> } // iterator state over a `Select`'s handle list: next handle to yield, null at the end
91
92 #[doc(hidden)]
93 pub trait Packet { // the operations `Select` needs from a receiver, independent of the receiver's item type
94 fn can_recv(&self) -> bool; // polled by the preflight scan in `Select::wait2`
95 fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>; // `Err(task)` hands the task back; `wait2` then treats this receiver as ready
96 fn abort_selection(&self) -> bool; // `true` means the receiver is ready to receive some data (see `wait2`)
97 }
98
99 impl Select {
100 /// Creates a new, empty selection structure. No handles are linked in yet,
101 /// and `wait` will fail!() if called before a handle has been added.
102 ///
103 /// Direct use of this struct can be burdensome; going through the `select!`
104 /// macro is usually much easier.
105 pub fn new() -> Select {
106 Select {
107 head: 0 as *mut Handle<'static, ()>,
108 tail: 0 as *mut Handle<'static, ()>,
109 next_id: Cell::new(1),
110 marker1: marker::NoSend,
111 }
112 }
113
114 /// Creates a new handle into this receiver set for `rx`, giving it the next
115 /// unused id. The handle is *not* a member of the set yet; to link it in you
116 /// must call the `add` method on the handle itself.
117 pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
118 let fresh_id = self.next_id.get();
119 self.next_id.set(fresh_id + 1);
120 Handle {
121 id: fresh_id,
122 selector: self,
123 next: 0 as *mut Handle<'static, ()>,
124 prev: 0 as *mut Handle<'static, ()>,
125 added: false,
126 packet: rx,
127 rx: rx,
128 }
129 }
130
131 /// Waits for an event on this receiver set and returns the *id* (not an
132 /// index) of a handle with an event available; compare it against the `id`
133 /// method of the active `Handle` structures. The event is either that data
134 /// is available or that the corresponding channel has been closed. Handles
135 /// which are already ready are detected by a preflight scan before blocking.
136 /// Fails (via `assert!`) if no handle has been added to the set.
137 pub fn wait(&self) -> uint {
138 self.wait2(true) // run the preflight scan; only tests skip it by calling `wait2(false)` directly
139 }
140
141 /// Core of `wait`. When `do_preflight_checks` is true, the id of the first handle whose packet reports `can_recv()` is returned without blocking; tests pass `false` to skip that scan.
142 fn wait2(&self, do_preflight_checks: bool) -> uint {
143 // Note that this is currently an inefficient implementation. We in
144 // theory have knowledge about all receivers in the set ahead of time,
145 // so this method shouldn't really have to iterate over all of them yet
146 // again. The idea with this "receiver set" interface is to get the
147 // interface right this time around, and later this implementation can
148 // be optimized.
149 //
150 // This implementation can be summarized by:
151 //
152 // fn select(receivers) {
153 // if any receiver ready { return ready index }
154 // deschedule {
155 // block on all receivers
156 // }
157 // unblock on all receivers
158 // return ready index
159 // }
160 //
161 // Most notably, the iterations over all of the receivers shouldn't be
162 // necessary.
163 unsafe {
164 let mut amt = 0; // number of handles currently in the set
165 for p in self.iter() {
166 amt += 1;
167 if do_preflight_checks && (*p).packet.can_recv() {
168 return (*p).id;
169 }
170 }
171 assert!(amt > 0); // waiting on an empty set is a usage error
172
173 let mut ready_index = amt; // position of the handle that refused to block; stays `amt` if every handle blocked
174 let mut ready_id = uint::MAX; // sentinel meaning "no ready handle identified yet"
175 let mut iter = self.iter().enumerate();
176
177 // Acquire a number of blocking contexts, and block on each one
178 // sequentially until one fails. If one fails, then abort
179 // immediately so we can go unblock on all the other receivers.
180 let task: Box<Task> = Local::take();
181 task.deschedule(amt, |task| {
182 // Prepare for the block
183 let (i, handle) = iter.next().unwrap();
184 match (*handle).packet.start_selection(task) {
185 Ok(()) => Ok(()),
186 Err(task) => {
187 ready_index = i;
188 ready_id = (*handle).id;
189 Err(task)
190 }
191 }
192 });
193
194 // Abort the selection process on each receiver. If the abort
195 // process returns `true`, then that means that the receiver is
196 // ready to receive some data. Note that this also means that the
197 // receiver may have yet to have fully read the `to_wake` field and
198 // woken us up (although the wakeup is guaranteed to fail).
199 //
200 // This situation happens in the window of where a sender invokes
201 // increment(), sees -1, and then decides to wake up the task. After
202 // all this is done, the sending thread will set `selecting` to
203 // `false`. Until this is done, we cannot return. If we were to
204 // return, then a sender could wake up a receiver which has gone
205 // back to sleep after this call to `select`.
206 //
207 // Note that it is a "fairly small window" in which an increment()
208 // views that it should wake a thread up until the `selecting` bit
209 // is set to false. For now, the implementation currently just spins
210 // in a yield loop. This is very distasteful, but this
211 // implementation is already nowhere near what it should ideally be.
212 // A rewrite should focus on avoiding a yield loop, and for now this
213 // implementation is tiding us over to a more efficient "don't
214 // iterate over everything every time" implementation.
215 for handle in self.iter().take(ready_index) { // only the handles whose `start_selection` returned `Ok`
216 if (*handle).packet.abort_selection() {
217 ready_id = (*handle).id;
218 }
219 }
220
221 assert!(ready_id != uint::MAX); // some handle must have been identified as ready by now
222 return ready_id;
223 }
224 }
225
226 fn iter(&self) -> Packets { Packets { cur: self.head } } // iterate over the added handles, starting from the head of the list
227 }
228
229 impl<'rx, T: Send> Handle<'rx, T> {
230 /// Retrieve the id of this handle, for comparison with the return value of `Select::wait()`.
231 #[inline]
232 pub fn id(&self) -> uint { self.id }
233
234 /// Receive a value on the underlying receiver. Has the same semantics as
235 /// `Receiver.recv`
236 pub fn recv(&mut self) -> T { self.rx.recv() }
237 /// Block to receive a value on the underlying receiver, returning `Ok` on
238 /// success or `Err(())` if the channel disconnects. This function has the same
239 /// semantics as `Receiver.recv_opt`
240 pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
241
242 /// Adds this handle to the receiver set that the handle was created from. This
243 /// method can be called multiple times, but it has no effect if `add` was
244 /// called previously.
245 ///
246 /// This method is unsafe because it requires that the `Handle` is not moved
247 /// while it is added to the `Select` set.
248 pub unsafe fn add(&mut self) {
249 if self.added { return }
250 let selector: &mut Select = cast::transmute(&*self.selector); // mutable view of the shared selector so head/tail can be updated
251 let me: *mut Handle<'static, ()> = cast::transmute(&*self); // type-erased raw pointer to this handle; the set stores this address
252
253 if selector.head.is_null() { // empty set: this handle becomes both head and tail
254 selector.head = me;
255 selector.tail = me;
256 } else { // otherwise append after the current tail
257 (*me).prev = selector.tail;
258 assert!((*me).next.is_null());
259 (*selector.tail).next = me;
260 selector.tail = me;
261 }
262 self.added = true;
263 }
264
265 /// Removes this handle from the `Select` set. This method is unsafe because
266 /// it has no guarantee that the `Handle` was not moved since `add` was
267 /// called. Does nothing if the handle is not currently added.
268 pub unsafe fn remove(&mut self) {
269 if !self.added { return }
270
271 let selector: &mut Select = cast::transmute(&*self.selector); // mutable view of the shared selector so head/tail can be updated
272 let me: *mut Handle<'static, ()> = cast::transmute(&*self); // this handle's address, as stored in the list
273
274 if self.prev.is_null() { // no predecessor: this handle must be the head
275 assert_eq!(selector.head, me);
276 selector.head = self.next;
277 } else {
278 (*self.prev).next = self.next;
279 }
280 if self.next.is_null() { // no successor: this handle must be the tail
281 assert_eq!(selector.tail, me);
282 selector.tail = self.prev;
283 } else {
284 (*self.next).prev = self.prev;
285 }
286
287 self.next = 0 as *mut Handle<'static, ()>; // reset the links so the handle can be added again
288 self.prev = 0 as *mut Handle<'static, ()>;
289
290 self.added = false;
291 }
292 }
293
294 #[unsafe_destructor]
295 impl Drop for Select {
296 fn drop(&mut self) { // a `Select` must be empty when dropped: every added handle has to be removed first
297 assert!(self.head.is_null());
298 assert!(self.tail.is_null());
299 }
300 }
301
302 #[unsafe_destructor]
303 impl<'rx, T: Send> Drop for Handle<'rx, T> {
304 fn drop(&mut self) {
305 unsafe { self.remove() } // unlink from the set before the handle goes away; `remove` is a no-op if it was never added
306 }
307 }
308
309 impl Iterator<*mut Handle<'static, ()>> for Packets {
310 fn next(&mut self) -> Option<*mut Handle<'static, ()>> { // yields each linked handle in turn, then `None`
311 let current = self.cur;
312 if current.is_null() {
313 return None;
314 }
315 // Step the cursor to the following handle before yielding this one.
316 unsafe { self.cur = (*current).next; }
317 Some(current)
318 }
319 }
320
321 #[cfg(test)]
322 #[allow(unused_imports)]
323 mod test { // tests for `select!` and for the raw `Select`/`Handle` API
324 use super::super::*;
325 use prelude::*;
326
327 test!(fn smoke() { // select picks whichever receiver has data, then reports dropped senders through recv_opt
328 let (tx1, rx1) = channel::<int>();
329 let (tx2, rx2) = channel::<int>();
330 tx1.send(1);
331 select! (
332 foo = rx1.recv() => { assert_eq!(foo, 1); },
333 _bar = rx2.recv() => { fail!() }
334 )
335 tx2.send(2);
336 select! (
337 _foo = rx1.recv() => { fail!() },
338 bar = rx2.recv() => { assert_eq!(bar, 2) }
339 )
340 drop(tx1);
341 select! (
342 foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
343 _bar = rx2.recv() => { fail!() }
344 )
345 drop(tx2);
346 select! (
347 bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
348 )
349 })
350
351 test!(fn smoke2() { // only the last of five receivers has data
352 let (_tx1, rx1) = channel::<int>();
353 let (_tx2, rx2) = channel::<int>();
354 let (_tx3, rx3) = channel::<int>();
355 let (_tx4, rx4) = channel::<int>();
356 let (tx5, rx5) = channel::<int>();
357 tx5.send(4);
358 select! (
359 _foo = rx1.recv() => { fail!("1") },
360 _foo = rx2.recv() => { fail!("2") },
361 _foo = rx3.recv() => { fail!("3") },
362 _foo = rx4.recv() => { fail!("4") },
363 foo = rx5.recv() => { assert_eq!(foo, 4); }
364 )
365 })
366
367 test!(fn closed() { // a dropped sender counts as an event on its receiver
368 let (_tx1, rx1) = channel::<int>();
369 let (tx2, rx2) = channel::<int>();
370 drop(tx2);
371
372 select! (
373 _a1 = rx1.recv_opt() => { fail!() },
374 a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
375 )
376 })
377
378 test!(fn unblocks() { // select wakes up when another task sends, and later when that task's sender goes away
379 let (tx1, rx1) = channel::<int>();
380 let (_tx2, rx2) = channel::<int>();
381 let (tx3, rx3) = channel::<int>();
382
383 spawn(proc() {
384 for _ in range(0, 20) { task::deschedule(); }
385 tx1.send(1);
386 rx3.recv();
387 for _ in range(0, 20) { task::deschedule(); }
388 });
389
390 select! (
391 a = rx1.recv() => { assert_eq!(a, 1); },
392 _b = rx2.recv() => { fail!() }
393 )
394 tx3.send(1);
395 select! (
396 a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
397 _b = rx2.recv() => { fail!() }
398 )
399 })
400
401 test!(fn both_ready() { // both receivers get data; two selects drain one message each
402 let (tx1, rx1) = channel::<int>();
403 let (tx2, rx2) = channel::<int>();
404 let (tx3, rx3) = channel::<()>();
405
406 spawn(proc() {
407 for _ in range(0, 20) { task::deschedule(); }
408 tx1.send(1);
409 tx2.send(2);
410 rx3.recv();
411 });
412
413 select! (
414 a = rx1.recv() => { assert_eq!(a, 1); },
415 a = rx2.recv() => { assert_eq!(a, 2); }
416 )
417 select! (
418 a = rx1.recv() => { assert_eq!(a, 1); },
419 a = rx2.recv() => { assert_eq!(a, 2); }
420 )
421 assert_eq!(rx1.try_recv(), Err(Empty));
422 assert_eq!(rx2.try_recv(), Err(Empty));
423 tx3.send(());
424 })
425
426 test!(fn stress() { // AMT sends alternating between two channels, kept in lock-step through tx3/rx3
427 static AMT: int = 10000;
428 let (tx1, rx1) = channel::<int>();
429 let (tx2, rx2) = channel::<int>();
430 let (tx3, rx3) = channel::<()>();
431
432 spawn(proc() {
433 for i in range(0, AMT) {
434 if i % 2 == 0 {
435 tx1.send(i);
436 } else {
437 tx2.send(i);
438 }
439 rx3.recv();
440 }
441 });
442
443 for i in range(0, AMT) {
444 select! (
445 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); },
446 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); }
447 )
448 tx3.send(());
449 }
450 })
451
452 test!(fn cloning() { // the sender is cloned in another task before it sends
453 let (tx1, rx1) = channel::<int>();
454 let (_tx2, rx2) = channel::<int>();
455 let (tx3, rx3) = channel::<()>();
456
457 spawn(proc() {
458 rx3.recv();
459 tx1.clone();
460 assert_eq!(rx3.try_recv(), Err(Empty));
461 tx1.send(2);
462 rx3.recv();
463 });
464
465 tx3.send(());
466 select!(
467 _i1 = rx1.recv() => {},
468 _i2 = rx2.recv() => fail!()
469 )
470 tx3.send(());
471 })
472
473 test!(fn cloning2() { // NOTE(review): body is identical to `cloning` above -- confirm whether a variation was intended
474 let (tx1, rx1) = channel::<int>();
475 let (_tx2, rx2) = channel::<int>();
476 let (tx3, rx3) = channel::<()>();
477
478 spawn(proc() {
479 rx3.recv();
480 tx1.clone();
481 assert_eq!(rx3.try_recv(), Err(Empty));
482 tx1.send(2);
483 rx3.recv();
484 });
485
486 tx3.send(());
487 select!(
488 _i1 = rx1.recv() => {},
489 _i2 = rx2.recv() => fail!()
490 )
491 tx3.send(());
492 })
493
494 test!(fn cloning3() { // raw `Select` API: h2 is added before h1, tx1 is only cloned, tx2 sends
495 let (tx1, rx1) = channel::<()>();
496 let (tx2, rx2) = channel::<()>();
497 let (tx3, rx3) = channel::<()>();
498 spawn(proc() {
499 let s = Select::new();
500 let mut h1 = s.handle(&rx1);
501 let mut h2 = s.handle(&rx2);
502 unsafe { h2.add(); }
503 unsafe { h1.add(); }
504 assert_eq!(s.wait(), h2.id);
505 tx3.send(());
506 });
507
508 for _ in range(0, 1000) { task::deschedule(); }
509 drop(tx1.clone());
510 tx2.send(());
511 rx3.recv();
512 })
513
514 test!(fn preflight1() { // one message already queued before select!
515 let (tx, rx) = channel();
516 tx.send(());
517 select!(
518 () = rx.recv() => {}
519 )
520 })
521
522 test!(fn preflight2() { // two messages already queued before select!
523 let (tx, rx) = channel();
524 tx.send(());
525 tx.send(());
526 select!(
527 () = rx.recv() => {}
528 )
529 })
530
531 test!(fn preflight3() { // sender cloned, then one message queued before select!
532 let (tx, rx) = channel();
533 drop(tx.clone());
534 tx.send(());
535 select!(
536 () = rx.recv() => {}
537 )
538 })
539
540 test!(fn preflight4() { // as preflight1, but through wait2(false), i.e. without the preflight scan
541 let (tx, rx) = channel();
542 tx.send(());
543 let s = Select::new();
544 let mut h = s.handle(&rx);
545 unsafe { h.add(); }
546 assert_eq!(s.wait2(false), h.id);
547 })
548
549 test!(fn preflight5() { // as preflight2, but through wait2(false)
550 let (tx, rx) = channel();
551 tx.send(());
552 tx.send(());
553 let s = Select::new();
554 let mut h = s.handle(&rx);
555 unsafe { h.add(); }
556 assert_eq!(s.wait2(false), h.id);
557 })
558
559 test!(fn preflight6() { // as preflight3, but through wait2(false)
560 let (tx, rx) = channel();
561 drop(tx.clone());
562 tx.send(());
563 let s = Select::new();
564 let mut h = s.handle(&rx);
565 unsafe { h.add(); }
566 assert_eq!(s.wait2(false), h.id);
567 })
568
569 test!(fn preflight7() { // sender dropped without sending; wait2(false) must still report the handle
570 let (tx, rx) = channel::<()>();
571 drop(tx);
572 let s = Select::new();
573 let mut h = s.handle(&rx);
574 unsafe { h.add(); }
575 assert_eq!(s.wait2(false), h.id);
576 })
577
578 test!(fn preflight8() { // message sent and consumed, sender dropped; wait2(false) must still report the handle
579 let (tx, rx) = channel();
580 tx.send(());
581 drop(tx);
582 rx.recv();
583 let s = Select::new();
584 let mut h = s.handle(&rx);
585 unsafe { h.add(); }
586 assert_eq!(s.wait2(false), h.id);
587 })
588
589 test!(fn preflight9() { // as preflight8, with the sender cloned first
590 let (tx, rx) = channel();
591 drop(tx.clone());
592 tx.send(());
593 drop(tx);
594 rx.recv();
595 let s = Select::new();
596 let mut h = s.handle(&rx);
597 unsafe { h.add(); }
598 assert_eq!(s.wait2(false), h.id);
599 })
600
601 test!(fn oneshot_data_waiting() { // select blocks in a spawned task on a fresh channel until the main task sends
602 let (tx1, rx1) = channel();
603 let (tx2, rx2) = channel();
604 spawn(proc() {
605 select! {
606 () = rx1.recv() => {}
607 }
608 tx2.send(());
609 });
610
611 for _ in range(0, 100) { task::deschedule() }
612 tx1.send(());
613 rx2.recv();
614 })
615
616 test!(fn stream_data_waiting() { // same, after two messages were already sent and received on the channel
617 let (tx1, rx1) = channel();
618 let (tx2, rx2) = channel();
619 tx1.send(());
620 tx1.send(());
621 rx1.recv();
622 rx1.recv();
623 spawn(proc() {
624 select! {
625 () = rx1.recv() => {}
626 }
627 tx2.send(());
628 });
629
630 for _ in range(0, 100) { task::deschedule() }
631 tx1.send(());
632 rx2.recv();
633 })
634
635 test!(fn shared_data_waiting() { // same, after the sender was cloned and one message was sent and received
636 let (tx1, rx1) = channel();
637 let (tx2, rx2) = channel();
638 drop(tx1.clone());
639 tx1.send(());
640 rx1.recv();
641 spawn(proc() {
642 select! {
643 () = rx1.recv() => {}
644 }
645 tx2.send(());
646 });
647
648 for _ in range(0, 100) { task::deschedule() }
649 tx1.send(());
650 rx2.recv();
651 })
652
653 test!(fn sync1() { // sync_channel(1) with a message already sent
654 let (tx, rx) = sync_channel(1);
655 tx.send(1);
656 select! {
657 n = rx.recv() => { assert_eq!(n, 1); }
658 }
659 })
660
661 test!(fn sync2() { // sync_channel(0) whose sender runs in another task
662 let (tx, rx) = sync_channel(0);
663 spawn(proc() {
664 for _ in range(0, 100) { task::deschedule() }
665 tx.send(1);
666 });
667 select! {
668 n = rx.recv() => { assert_eq!(n, 1); }
669 }
670 })
671
672 test!(fn sync3() { // one sync_channel(0) and one channel(), both sent to from other tasks; either arm may fire
673 let (tx1, rx1) = sync_channel(0);
674 let (tx2, rx2) = channel();
675 spawn(proc() { tx1.send(1); });
676 spawn(proc() { tx2.send(2); });
677 select! {
678 n = rx1.recv() => {
679 assert_eq!(n, 1);
680 assert_eq!(rx2.recv(), 2);
681 },
682 n = rx2.recv() => {
683 assert_eq!(n, 2);
684 assert_eq!(rx1.recv(), 1);
685 }
686 }
687 })
688 }
libstd/comm/select.rs:89:1-89:1 -struct- definition:
struct Packets { cur: *mut Handle<'static, ()> }
pub trait Packet {
fn can_recv(&self) -> bool;
references:- 3309: impl Iterator<*mut Handle<'static, ()>> for Packets {
310: fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
libstd/comm/select.rs:92:15-92:15 -trait- definition:
pub trait Packet {
fn can_recv(&self) -> bool;
fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>;
references:- 282: added: bool,
83: packet: &'rx Packet,
libstd/comm/mod.rs:
875: impl<T: Send> select::Packet for Receiver<T> {
876: fn can_recv(&self) -> bool {
libstd/comm/select.rs:74:43-74:43 -struct- definition:
/// interact with the underlying receiver.
pub struct Handle<'rx, T> {
/// The ID of this handle, used to compare against the return value of
references:- 19119: self.next_id.set(id + 1);
120: Handle {
121: id: id,
--
287: self.next = 0 as *mut Handle<'static, ()>;
288: self.prev = 0 as *mut Handle<'static, ()>;
--
309: impl Iterator<*mut Handle<'static, ()>> for Packets {
310: fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
311: if self.cur.is_null() {
libstd/comm/select.rs:64:54-64:54 -struct- definition:
/// a set of receivers which are being selected over.
pub struct Select {
head: *mut Handle<'static, ()>,
references:- 7105: pub fn new() -> Select {
106: Select {
107: marker1: marker::NoSend,
--
271: let selector: &mut Select = cast::transmute(&*self.selector);
272: let me: *mut Handle<'static, ()> = cast::transmute(&*self);
--
295: impl Drop for Select {
296: fn drop(&mut self) {