(index<- )        ./libstd/select.rs

   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  #[allow(missing_doc)];
  12  
  13  use cell::Cell;
  14  use comm;
  15  use container::Container;
  16  use iter::{Iterator, DoubleEndedIterator};
  17  use option::*;
  18  // use either::{Either, Left, Right};
  19  // use rt::kill::BlockedTask;
  20  use rt::local::Local;
  21  use rt::shouldnt_be_public::{EventLoop, Scheduler, SelectInner, SelectPortInner};
  22  use task;
  23  use unstable::finally::Finally;
  24  use vec::{OwnedVector, MutableVector};
  25  
  26  /// Trait for message-passing primitives that can be select()ed on.
  27  pub trait Select : SelectInner { }
  28  
  29  /// Trait for message-passing primitives that can use the select2() convenience wrapper.
  30  // (This is separate from the above trait to enable heterogeneous lists of ports
  31  // that implement Select on different types to use select().)
  32  pub trait SelectPort<T> : SelectPortInner<T> { }
  33  
  34  /// Receive a message from any one of many ports at once. Returns the index of the
  35  /// port whose data is ready. (If multiple are ready, returns the lowest index.)
  36  pub fn select<A: Select>(ports&mut [A]) -> uint {
  37      if ports.is_empty() {
  38          fail2!("can't select on an empty list");
  39      }
  40  
  41      for (index, port) in ports.mut_iter().enumerate() {
  42          if port.optimistic_check() {
  43              return index;
  44          }
  45      }
  46  
  47      // If one of the ports already contains data when we go to block on it, we
  48      // don't bother enqueueing on the rest of them, so we shouldn't bother
  49      // unblocking from it either. This is just for efficiency, not correctness.
  50      // (If not, we need to unblock from all of them. Length is a placeholder.)
  51      let mut ready_index = ports.len();
  52  
  53      // XXX: We're using deschedule...and_then in an unsafe way here (see #8132),
  54      // in that we need to continue mutating the ready_index in the environment
  55      // after letting the task get woken up. The and_then closure needs to delay
  56      // the task from resuming until all ports have become blocked_on.
  57      let (p,c) = comm::oneshot();
  58      let p = Cell::new(p);
  59      let c = Cell::new(c);
  60  
  61      do (|| {
  62          let c = Cell::new(c.take());
  63          let sched~Scheduler = Local::take();
  64          do sched.deschedule_running_task_and_then |sched, task| {
  65              let task_handles = task.make_selectable(ports.len());
  66  
  67              for (index, (port, task_handle)) in
  68                      ports.mut_iter().zip(task_handles.move_iter()).enumerate() {
  69                  // If one of the ports has data by now, it will wake the handle.
  70                  if port.block_on(sched, task_handle) {
  71                      ready_index = index;
  72                      break;
  73                  }
  74              }
  75  
  76              let c = Cell::new(c.take());
  77              do sched.event_loop.callback { c.take().send_deferred(()) }
  78          }
  79      }).finally {
  80          let p = Cell::new(p.take());
  81          // Unkillable is necessary not because getting killed is dangerous here,
  82          // but to force the recv not to use the same kill-flag that we used for
  83          // selecting. Otherwise a user-sender could spuriously wakeup us here.
  84          do task::unkillable { p.take().recv(); }
  85      }
  86  
  87      // Task resumes. Now unblock ourselves from all the ports we blocked on.
  88      // If the success index wasn't reset, 'take' will just take all of them.
  89      // Iterate in reverse so the 'earliest' index that's ready gets returned.
  90      for (index, port) in ports.mut_slice(0, ready_index).mut_iter().enumerate().invert() {
  91          if port.unblock_from() {
  92              ready_index = index;
  93          }
  94      }
  95  
  96      assert!(ready_index < ports.len());
  97      return ready_index;
  98  }
  99  
 100  /* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
 101  
 102  impl <'self> Select for &'self mut Select {
 103      fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
 104      fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
 105          self.block_on(sched, task)
 106      }
 107      fn unblock_from(&mut self) -> bool { self.unblock_from() }
 108  }
 109  
 110  pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
 111          -> Either<(Option<TA>, B), (A, Option<TB>)> {
 112      let result = {
 113          let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
 114          select(ports)
 115      };
 116      match result {
 117          0 => Left ((a.recv_ready(), b)),
 118          1 => Right((a, b.recv_ready())),
 119          x => fail2!("impossible case in select2: {:?}", x)
 120      }
 121  }
 122  
 123  */
 124  
 125  #[cfg(test)]
 126  mod test {
 127      use super::*;
 128      use clone::Clone;
 129      use num::Times;
 130      use option::*;
 131      use rt::comm::*;
 132      use rt::test::*;
 133      use vec::*;
 134      use comm::GenericChan;
 135      use task;
 136      use cell::Cell;
 137      use iter::{Iterator, range};
 138  
 139      #[test] #[should_fail]
 140      fn select_doesnt_get_trolled() {
 141          select::<PortOne<()>>([]);
 142      }
 143  
 144      /* non-blocking select tests */
 145  
 146      #[cfg(test)]
 147      fn select_helper(num_ports: uint, send_on_chans: &[uint]) {
 148          // Unfortunately this does not actually test the block_on early-break
 149          // codepath in select -- racing between the sender and the receiver in
 150          // separate tasks is necessary to get around the optimistic check.
 151          let (ports, chans) = unzip(range(0, num_ports).map(|_| oneshot::<()>()));
 152          let mut dead_chans = ~[];
 153          let mut ports = ports;
 154          for (i, chan) in chans.move_iter().enumerate() {
 155              if send_on_chans.contains(&i) {
 156                  chan.send(());
 157              } else {
 158                  dead_chans.push(chan);
 159              }
 160          }
 161          let ready_index = select(ports);
 162          assert!(send_on_chans.contains(&ready_index));
 163          assert!(ports.swap_remove(ready_index).recv_ready().is_some());
 164          let _ = dead_chans;
 165  
 166          // Same thing with streams instead.
 167          // FIXME(#7971): This should be in a macro but borrowck isn't smart enough.
 168          let (ports, chans) = unzip(range(0, num_ports).map(|_| stream::<()>()));
 169          let mut dead_chans = ~[];
 170          let mut ports = ports;
 171          for (i, chan) in chans.move_iter().enumerate() {
 172              if send_on_chans.contains(&i) {
 173                  chan.send(());
 174              } else {
 175                  dead_chans.push(chan);
 176              }
 177          }
 178          let ready_index = select(ports);
 179          assert!(send_on_chans.contains(&ready_index));
 180          assert!(ports.swap_remove(ready_index).recv_ready().is_some());
 181          let _ = dead_chans;
 182      }
 183  
 184      #[test]
 185      fn select_one() {
 186          do run_in_newsched_task { select_helper(1, [0]) }
 187      }
 188  
 189      #[test]
 190      fn select_two() {
 191          // NB. I would like to have a test that tests the first one that is
 192          // ready is the one that's returned, but that can't be reliably tested
 193          // with the randomized behaviour of optimistic_check.
 194          do run_in_newsched_task { select_helper(2, [1]) }
 195          do run_in_newsched_task { select_helper(2, [0]) }
 196          do run_in_newsched_task { select_helper(2, [1,0]) }
 197      }
 198  
 199      #[test]
 200      fn select_a_lot() {
 201          do run_in_newsched_task { select_helper(12, [7,8,9]) }
 202      }
 203  
 204      #[test]
 205      fn select_stream() {
 206          use util;
 207          use comm::GenericChan;
 208  
 209          // Sends 10 buffered packets, and uses select to retrieve them all.
 210          // Puts the port in a different spot in the vector each time.
 211          do run_in_newsched_task {
 212              let (ports, _) = unzip(range(0u, 10).map(|_| stream::<int>()));
 213              let (port, chan) = stream();
 214              do 10.times { chan.send(31337); }
 215              let mut ports = ports;
 216              let mut port = Some(port);
 217              let order = [5u,0,4,3,2,6,9,8,7,1];
 218              for &index in order.iter() {
 219                  // put the port in the vector at any index
 220                  util::swap(port.get_mut_ref(), &mut ports[index]);
 221                  assert!(select(ports) == index);
 222                  // get it back out
 223                  util::swap(port.get_mut_ref(), &mut ports[index]);
 224                  // NB. Not recv(), because optimistic_check randomly fails.
 225                  assert!(port.get_ref().recv_ready().unwrap() == 31337);
 226              }
 227          }
 228      }
 229  
 230      #[test]
 231      fn select_unkillable() {
 232          do run_in_newsched_task {
 233              do task::unkillable { select_helper(2, [1]) }
 234          }
 235      }
 236  
 237      /* blocking select tests */
 238  
 239      #[test]
 240      fn select_blocking() {
 241          select_blocking_helper(true);
 242          select_blocking_helper(false);
 243  
 244          fn select_blocking_helper(killable: bool) {
 245              do run_in_newsched_task {
 246                  let (p1,_c) = oneshot();
 247                  let (p2,c2) = oneshot();
 248                  let mut ports = [p1,p2];
 249  
 250                  let (p3,c3) = oneshot();
 251                  let (p4,c4) = oneshot();
 252  
 253                  let x = Cell::new((c2, p3, c4));
 254                  do task::spawn {
 255                      let (c2, p3, c4) = x.take();
 256                      p3.recv();   // handshake parent
 257                      c4.send(()); // normal receive
 258                      task::deschedule();
 259                      c2.send(()); // select receive
 260                  }
 261  
 262                  // Try to block before child sends on c2.
 263                  c3.send(());
 264                  p4.recv();
 265                  if killable {
 266                      assert!(select(ports) == 1);
 267                  } else {
 268                      do task::unkillable { assert!(select(ports) == 1); }
 269                  }
 270              }
 271          }
 272      }
 273  
 274      #[test]
 275      fn select_racing_senders() {
 276          static NUM_CHANS: uint = 10;
 277  
 278          select_racing_senders_helper(true,  ~[0,1,2,3,4,5,6,7,8,9]);
 279          select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
 280          select_racing_senders_helper(true,  ~[0,1,2]);
 281          select_racing_senders_helper(false, ~[0,1,2]);
 282          select_racing_senders_helper(true,  ~[3,4,5,6]);
 283          select_racing_senders_helper(false, ~[3,4,5,6]);
 284          select_racing_senders_helper(true,  ~[7,8,9]);
 285          select_racing_senders_helper(false, ~[7,8,9]);
 286  
 287          fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
 288              use rt::test::spawntask_random;
 289  
 290              do run_in_newsched_task {
 291                  // A bit of stress, since ordinarily this is just smoke and mirrors.
 292                  do 4.times {
 293                      let send_on_chans = send_on_chans.clone();
 294                      do task::spawn {
 295                          let mut ports = ~[];
 296                          for i in range(0u, NUM_CHANS) {
 297                              let (p,c) = oneshot();
 298                              ports.push(p);
 299                              if send_on_chans.contains(&i) {
 300                                  let c = Cell::new(c);
 301                                  do spawntask_random {
 302                                      task::deschedule();
 303                                      c.take().send(());
 304                                  }
 305                              }
 306                          }
 307                          // nondeterministic result, but should succeed
 308                          if killable {
 309                              select(ports);
 310                          } else {
 311                              do task::unkillable { select(ports); }
 312                          }
 313                      }
 314                  }
 315              }
 316          }
 317      }
 318  
 319      #[test]
 320      fn select_killed() {
 321          do run_in_newsched_task {
 322              let (success_p, success_c) = oneshot::<bool>();
 323              let success_c = Cell::new(success_c);
 324              do task::try {
 325                  let success_c = Cell::new(success_c.take());
 326                  do task::unkillable {
 327                      let (p,c) = oneshot();
 328                      let c = Cell::new(c);
 329                      do task::spawn {
 330                          let (dead_ps, dead_cs) = unzip(range(0u, 5).map(|_| oneshot::<()>()));
 331                          let mut ports = dead_ps;
 332                          select(ports); // should get killed; nothing should leak
 333                          c.take().send(()); // must not happen
 334                          // Make sure dead_cs doesn't get closed until after select.
 335                          let _ = dead_cs;
 336                      }
 337                      do task::spawn {
 338                          fail2!(); // should kill sibling awake
 339                      }
 340  
 341                      // wait for killed selector to close (NOT send on) its c.
 342                      // hope to send 'true'.
 343                      success_c.take().send(p.try_recv().is_none());
 344                  }
 345              };
 346              assert!(success_p.recv());
 347          }
 348      }
 349  }