(index<- )        ./libstd/rt/tube.rs

   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! A very simple unsynchronized channel type for sending buffered data from
  12  //! scheduler context to task context.
  13  //!
  14  //! XXX: This would be safer to use if split into two types like Port/Chan
  15  
  16  use option::*;
  17  use clone::Clone;
  18  use super::rc::RC;
  19  use rt::sched::Scheduler;
  20  use rt::kill::BlockedTask;
  21  use rt::local::Local;
  22  use vec::OwnedVector;
  23  use container::Container;
  24  
  25  struct TubeState<T> {
  26      blocked_task: Option<BlockedTask>,
  27      buf: ~[T]
  28  }
  29  
  30  pub struct Tube<T> {
  31      p: RC<TubeState<T>>
  32  }
  33  
  34  impl<T> Tube<T> {
  35      pub fn new() -> Tube<T> {
  36          Tube {
  37              p: RC::new(TubeState {
  38                  blocked_task: None,
  39                  buf: ~[]
  40              })
  41          }
  42      }
  43  
  44      pub fn send(&mut self, valT) {
  45          rtdebug!("tube send");
  46          unsafe {
  47              let state = self.p.unsafe_borrow_mut();
  48              (*state).buf.push(val);
  49  
  50              if (*state).blocked_task.is_some() {
  51                  // There's a waiting task. Wake it up
  52                  rtdebug!("waking blocked tube");
  53                  let task = (*state).blocked_task.take_unwrap();
  54                  let sched~Scheduler = Local::take();
  55                  sched.resume_blocked_task_immediately(task);
  56              }
  57          }
  58      }
  59  
  60      pub fn recv(&mut self) -> T {
  61          unsafe {
  62              let state = self.p.unsafe_borrow_mut();
  63              if !(*state).buf.is_empty() {
  64                  return (*state).buf.shift();
  65              } else {
  66                  // Block and wait for the next message
  67                  rtdebug!("blocking on tube recv");
  68                  assert!(self.p.refcount() > 1); // There better be somebody to wake us up
  69                  assert!((*state).blocked_task.is_none());
  70                  let sched~Scheduler = Local::take();
  71                  do sched.deschedule_running_task_and_then |_, task| {
  72                      (*state).blocked_task = Some(task);
  73                  }
  74                  rtdebug!("waking after tube recv");
  75                  let buf = &mut (*state).buf;
  76                  assert!(!buf.is_empty());
  77                  return buf.shift();
  78              }
  79          }
  80      }
  81  }
  82  
  83  impl<T> Clone for Tube<T> {
  84      fn clone(&self) -> Tube<T> {
  85          Tube { p: self.p.clone() }
  86      }
  87  }
  88  
  89  #[cfg(test)]
  90  mod test {
  91      use cell::Cell;
  92      use rt::test::*;
  93      use rt::rtio::EventLoop;
  94      use rt::sched::Scheduler;
  95      use rt::local::Local;
  96      use super::*;
  97      use prelude::*;
  98  
  99      #[test]
 100      fn simple_test() {
 101          do run_in_newsched_task {
 102              let mut tube: Tube<int> = Tube::new();
 103              let tube_clone = tube.clone();
 104              let tube_clone_cell = Cell::new(tube_clone);
 105              let sched: ~Scheduler = Local::take();
 106              do sched.deschedule_running_task_and_then |sched, task| {
 107                  let mut tube_clone = tube_clone_cell.take();
 108                  tube_clone.send(1);
 109                  sched.enqueue_blocked_task(task);
 110              }
 111  
 112              assert!(tube.recv() == 1);
 113          }
 114      }
 115  
 116      #[test]
 117      fn blocking_test() {
 118          do run_in_newsched_task {
 119              let mut tube: Tube<int> = Tube::new();
 120              let tube_clone = tube.clone();
 121              let tube_clone = Cell::new(tube_clone);
 122              let sched: ~Scheduler = Local::take();
 123              do sched.deschedule_running_task_and_then |sched, task| {
 124                  let tube_clone = Cell::new(tube_clone.take());
 125                  do sched.event_loop.callback {
 126                      let mut tube_clone = tube_clone.take();
 127                      // The task should be blocked on this now and
 128                      // sending will wake it up.
 129                      tube_clone.send(1);
 130                  }
 131                  sched.enqueue_blocked_task(task);
 132              }
 133  
 134              assert!(tube.recv() == 1);
 135          }
 136      }
 137  
 138      #[test]
 139      fn many_blocking_test() {
 140          static MAX: int = 100;
 141  
 142          do run_in_newsched_task {
 143              let mut tube: Tube<int> = Tube::new();
 144              let tube_clone = tube.clone();
 145              let tube_clone = Cell::new(tube_clone);
 146              let sched: ~Scheduler = Local::take();
 147              do sched.deschedule_running_task_and_then |sched, task| {
 148                  callback_send(tube_clone.take(), 0);
 149  
 150                  fn callback_send(tube: Tube<int>, i: int) {
 151                      if i == 100 { return; }
 152  
 153                      let tube = Cell::new(Cell::new(tube));
 154                      do Local::borrow |sched: &mut Scheduler| {
 155                          let tube = tube.take();
 156                          do sched.event_loop.callback {
 157                              let mut tube = tube.take();
 158                              // The task should be blocked on this now and
 159                              // sending will wake it up.
 160                              tube.send(i);
 161                              callback_send(tube, i + 1);
 162                          }
 163                      }
 164                  }
 165  
 166                  sched.enqueue_blocked_task(task);
 167              }
 168  
 169              for i in range(0, MAX) {
 170                  let j = tube.recv();
 171                  assert!(j == i);
 172              }
 173          }
 174      }
 175  }