(index<- ) ./libstd/select.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 #[allow(missing_doc)];
12
13 use cell::Cell;
14 use comm;
15 use container::Container;
16 use iter::{Iterator, DoubleEndedIterator};
17 use option::*;
18 // use either::{Either, Left, Right};
19 // use rt::kill::BlockedTask;
20 use rt::local::Local;
21 use rt::shouldnt_be_public::{EventLoop, Scheduler, SelectInner, SelectPortInner};
22 use task;
23 use unstable::finally::Finally;
24 use vec::{OwnedVector, MutableVector};
25
26 /// Trait for message-passing primitives that can be select()ed on.
27 pub trait Select : SelectInner { }
28
29 /// Trait for message-passing primitives that can use the select2() convenience wrapper.
30 // (This is separate from the above trait to enable heterogeneous lists of ports
31 // that implement Select on different types to use select().)
32 pub trait SelectPort<T> : SelectPortInner<T> { }
33
34 /// Receive a message from any one of many ports at once. Returns the index of the
35 /// port whose data is ready. (If multiple are ready, returns the lowest index.)
36 pub fn select<A: Select>(ports: &mut [A]) -> uint {
37 if ports.is_empty() {
38 fail2!("can't select on an empty list");
39 }
40
41 for (index, port) in ports.mut_iter().enumerate() {
42 if port.optimistic_check() {
43 return index;
44 }
45 }
46
47 // If one of the ports already contains data when we go to block on it, we
48 // don't bother enqueueing on the rest of them, so we shouldn't bother
49 // unblocking from it either. This is just for efficiency, not correctness.
50 // (If not, we need to unblock from all of them. Length is a placeholder.)
51 let mut ready_index = ports.len();
52
53 // XXX: We're using deschedule...and_then in an unsafe way here (see #8132),
54 // in that we need to continue mutating the ready_index in the environment
55 // after letting the task get woken up. The and_then closure needs to delay
56 // the task from resuming until all ports have become blocked_on.
57 let (p,c) = comm::oneshot();
58 let p = Cell::new(p);
59 let c = Cell::new(c);
60
61 do (|| {
62 let c = Cell::new(c.take());
63 let sched: ~Scheduler = Local::take();
64 do sched.deschedule_running_task_and_then |sched, task| {
65 let task_handles = task.make_selectable(ports.len());
66
67 for (index, (port, task_handle)) in
68 ports.mut_iter().zip(task_handles.move_iter()).enumerate() {
69 // If one of the ports has data by now, it will wake the handle.
70 if port.block_on(sched, task_handle) {
71 ready_index = index;
72 break;
73 }
74 }
75
76 let c = Cell::new(c.take());
77 do sched.event_loop.callback { c.take().send_deferred(()) }
78 }
79 }).finally {
80 let p = Cell::new(p.take());
81 // Unkillable is necessary not because getting killed is dangerous here,
82 // but to force the recv not to use the same kill-flag that we used for
83 // selecting. Otherwise a user-sender could spuriously wakeup us here.
84 do task::unkillable { p.take().recv(); }
85 }
86
87 // Task resumes. Now unblock ourselves from all the ports we blocked on.
88 // If the success index wasn't reset, 'take' will just take all of them.
89 // Iterate in reverse so the 'earliest' index that's ready gets returned.
90 for (index, port) in ports.mut_slice(0, ready_index).mut_iter().enumerate().invert() {
91 if port.unblock_from() {
92 ready_index = index;
93 }
94 }
95
96 assert!(ready_index < ports.len());
97 return ready_index;
98 }
99
100 /* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
101
102 impl <'self> Select for &'self mut Select {
103 fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
104 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
105 self.block_on(sched, task)
106 }
107 fn unblock_from(&mut self) -> bool { self.unblock_from() }
108 }
109
110 pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
111 -> Either<(Option<TA>, B), (A, Option<TB>)> {
112 let result = {
113 let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
114 select(ports)
115 };
116 match result {
117 0 => Left ((a.recv_ready(), b)),
118 1 => Right((a, b.recv_ready())),
119 x => fail2!("impossible case in select2: {:?}", x)
120 }
121 }
122
123 */
124
125 #[cfg(test)]
126 mod test {
127 use super::*;
128 use clone::Clone;
129 use num::Times;
130 use option::*;
131 use rt::comm::*;
132 use rt::test::*;
133 use vec::*;
134 use comm::GenericChan;
135 use task;
136 use cell::Cell;
137 use iter::{Iterator, range};
138
139 #[test] #[should_fail]
140 fn select_doesnt_get_trolled() {
141 select::<PortOne<()>>([]);
142 }
143
144 /* non-blocking select tests */
145
146 #[cfg(test)]
147 fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
148 // Unfortunately this does not actually test the block_on early-break
149 // codepath in select -- racing between the sender and the receiver in
150 // separate tasks is necessary to get around the optimistic check.
151 let (ports, chans) = unzip(range(0, num_ports).map(|_| oneshot::<()>()));
152 let mut dead_chans = ~[];
153 let mut ports = ports;
154 for (i, chan) in chans.move_iter().enumerate() {
155 if send_on_chans.contains(&i) {
156 chan.send(());
157 } else {
158 dead_chans.push(chan);
159 }
160 }
161 let ready_index = select(ports);
162 assert!(send_on_chans.contains(&ready_index));
163 assert!(ports.swap_remove(ready_index).recv_ready().is_some());
164 let _ = dead_chans;
165
166 // Same thing with streams instead.
167 // FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
168 let (ports, chans) = unzip(range(0, num_ports).map(|_| stream::<()>()));
169 let mut dead_chans = ~[];
170 let mut ports = ports;
171 for (i, chan) in chans.move_iter().enumerate() {
172 if send_on_chans.contains(&i) {
173 chan.send(());
174 } else {
175 dead_chans.push(chan);
176 }
177 }
178 let ready_index = select(ports);
179 assert!(send_on_chans.contains(&ready_index));
180 assert!(ports.swap_remove(ready_index).recv_ready().is_some());
181 let _ = dead_chans;
182 }
183
184 #[test]
185 fn select_one() {
186 do run_in_newsched_task { select_helper(1, [0]) }
187 }
188
189 #[test]
190 fn select_two() {
191 // NB. I would like to have a test that tests the first one that is
192 // ready is the one that's returned, but that can't be reliably tested
193 // with the randomized behaviour of optimistic_check.
194 do run_in_newsched_task { select_helper(2, [1]) }
195 do run_in_newsched_task { select_helper(2, [0]) }
196 do run_in_newsched_task { select_helper(2, [1,0]) }
197 }
198
199 #[test]
200 fn select_a_lot() {
201 do run_in_newsched_task { select_helper(12, [7,8,9]) }
202 }
203
204 #[test]
205 fn select_stream() {
206 use util;
207 use comm::GenericChan;
208
209 // Sends 10 buffered packets, and uses select to retrieve them all.
210 // Puts the port in a different spot in the vector each time.
211 do run_in_newsched_task {
212 let (ports, _) = unzip(range(0u, 10).map(|_| stream::<int>()));
213 let (port, chan) = stream();
214 do 10.times { chan.send(31337); }
215 let mut ports = ports;
216 let mut port = Some(port);
217 let order = [5u,0,4,3,2,6,9,8,7,1];
218 for &index in order.iter() {
219 // put the port in the vector at any index
220 util::swap(port.get_mut_ref(), &mut ports[index]);
221 assert!(select(ports) == index);
222 // get it back out
223 util::swap(port.get_mut_ref(), &mut ports[index]);
224 // NB. Not recv(), because optimistic_check randomly fails.
225 assert!(port.get_ref().recv_ready().unwrap() == 31337);
226 }
227 }
228 }
229
230 #[test]
231 fn select_unkillable() {
232 do run_in_newsched_task {
233 do task::unkillable { select_helper(2, [1]) }
234 }
235 }
236
237 /* blocking select tests */
238
239 #[test]
240 fn select_blocking() {
241 select_blocking_helper(true);
242 select_blocking_helper(false);
243
244 fn select_blocking_helper(killable: bool) {
245 do run_in_newsched_task {
246 let (p1,_c) = oneshot();
247 let (p2,c2) = oneshot();
248 let mut ports = [p1,p2];
249
250 let (p3,c3) = oneshot();
251 let (p4,c4) = oneshot();
252
253 let x = Cell::new((c2, p3, c4));
254 do task::spawn {
255 let (c2, p3, c4) = x.take();
256 p3.recv(); // handshake parent
257 c4.send(()); // normal receive
258 task::deschedule();
259 c2.send(()); // select receive
260 }
261
262 // Try to block before child sends on c2.
263 c3.send(());
264 p4.recv();
265 if killable {
266 assert!(select(ports) == 1);
267 } else {
268 do task::unkillable { assert!(select(ports) == 1); }
269 }
270 }
271 }
272 }
273
274 #[test]
275 fn select_racing_senders() {
276 static NUM_CHANS: uint = 10;
277
278 select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]);
279 select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
280 select_racing_senders_helper(true, ~[0,1,2]);
281 select_racing_senders_helper(false, ~[0,1,2]);
282 select_racing_senders_helper(true, ~[3,4,5,6]);
283 select_racing_senders_helper(false, ~[3,4,5,6]);
284 select_racing_senders_helper(true, ~[7,8,9]);
285 select_racing_senders_helper(false, ~[7,8,9]);
286
287 fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
288 use rt::test::spawntask_random;
289
290 do run_in_newsched_task {
291 // A bit of stress, since ordinarily this is just smoke and mirrors.
292 do 4.times {
293 let send_on_chans = send_on_chans.clone();
294 do task::spawn {
295 let mut ports = ~[];
296 for i in range(0u, NUM_CHANS) {
297 let (p,c) = oneshot();
298 ports.push(p);
299 if send_on_chans.contains(&i) {
300 let c = Cell::new(c);
301 do spawntask_random {
302 task::deschedule();
303 c.take().send(());
304 }
305 }
306 }
307 // nondeterministic result, but should succeed
308 if killable {
309 select(ports);
310 } else {
311 do task::unkillable { select(ports); }
312 }
313 }
314 }
315 }
316 }
317 }
318
319 #[test]
320 fn select_killed() {
321 do run_in_newsched_task {
322 let (success_p, success_c) = oneshot::<bool>();
323 let success_c = Cell::new(success_c);
324 do task::try {
325 let success_c = Cell::new(success_c.take());
326 do task::unkillable {
327 let (p,c) = oneshot();
328 let c = Cell::new(c);
329 do task::spawn {
330 let (dead_ps, dead_cs) = unzip(range(0u, 5).map(|_| oneshot::<()>()));
331 let mut ports = dead_ps;
332 select(ports); // should get killed; nothing should leak
333 c.take().send(()); // must not happen
334 // Make sure dead_cs doesn't get closed until after select.
335 let _ = dead_cs;
336 }
337 do task::spawn {
338 fail2!(); // should kill sibling awake
339 }
340
341 // wait for killed selector to close (NOT send on) its c.
342 // hope to send 'true'.
343 success_c.take().send(p.try_recv().is_none());
344 }
345 };
346 assert!(success_p.recv());
347 }
348 }
349 }