(index<- )        ./libnative/task.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! Tasks implemented on top of OS threads
  12  //!
  13  //! This module contains the implementation of the 1:1 threading module required
  14  //! by rust tasks. This implements the necessary API traits laid out by std::rt
  15  //! in order to spawn new tasks and deschedule the current task.
  16  
  17  use std::any::Any;
  18  use std::cast;
  19  use std::rt::bookkeeping;
  20  use std::rt::env;
  21  use std::rt::local::Local;
  22  use std::rt::rtio;
  23  use std::rt::stack;
  24  use std::rt::task::{Task, BlockedTask, SendMessage};
  25  use std::rt::thread::Thread;
  26  use std::rt;
  27  use std::task::TaskOpts;
  28  use std::unstable::mutex::NativeMutex;
  29  
  30  use io;
  31  use task;
  32  
  33  /// Creates a new Task which is ready to execute as a 1:1 task.
  34  pub fn new(stack_bounds(uint, uint)) -> Box<Task> {
  35      let mut task = box Task::new();
  36      let mut ops = ops();
  37      ops.stack_bounds = stack_bounds;
  38      task.put_runtime(ops);
  39      return task;
  40  }
  41  
  42  fn ops() -> Box<Ops> {
  43      box Ops {
  44          lock: unsafe { NativeMutex::new() },
  45          awoken: false,
  46          io: io::IoFactory::new(),
  47          // these *should* get overwritten
  48          stack_bounds: (0, 0),
  49      }
  50  }
  51  
  52  /// Spawns a function with the default configuration
  53  pub fn spawn(f: proc():Send) {
  54      spawn_opts(TaskOpts::new(), f)
  55  }
  56  
  57  /// Spawns a new task given the configuration options and a procedure to run
  58  /// inside the task.
  59  pub fn spawn_opts(optsTaskOpts, f: proc():Send) {
  60      let TaskOpts {
  61          notify_chan, name, stack_size,
  62          stderr, stdout,
  63      } = opts;
  64  
  65      let mut task = box Task::new();
  66      task.name = name;
  67      task.stderr = stderr;
  68      task.stdout = stdout;
  69      match notify_chan {
  70          Some(chan) => { task.death.on_exit = Some(SendMessage(chan)); }
  71          None => {}
  72      }
  73  
  74      let stack = stack_size.unwrap_or(env::min_stack());
  75      let task = task;
  76      let ops = ops();
  77  
  78      // Note that this increment must happen *before* the spawn in order to
  79      // guarantee that if this task exits it will always end up waiting for the
  80      // spawned task to exit.
  81      bookkeeping::increment();
  82  
  83      // Spawning a new OS thread guarantees that __morestack will never get
  84      // triggered, but we must manually set up the actual stack bounds once this
  85      // function starts executing. This raises the lower limit by a bit because
  86      // by the time that this function is executing we've already consumed at
  87      // least a little bit of stack (we don't know the exact byte address at
  88      // which our stack started).
  89      Thread::spawn_stack(stack, proc() {
  90          let something_around_the_top_of_the_stack = 1;
  91          let addr = &something_around_the_top_of_the_stack as *int;
  92          let my_stack = addr as uint;
  93          unsafe {
  94              stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
  95          }
  96          let mut ops = ops;
  97          ops.stack_bounds = (my_stack - stack + 1024, my_stack);
  98  
  99          let mut f = Some(f);
 100          let mut task = task;
 101          task.put_runtime(ops);
 102          let t = task.run(|| { f.take_unwrap()() });
 103          drop(t);
 104          bookkeeping::decrement();
 105      })
 106  }
 107  
 108  // This structure is the glue between channels and the 1:1 scheduling mode. This
 109  // structure is allocated once per task.
 110  struct Ops {
 111      lock: NativeMutex,       // native synchronization
 112      awoken: bool,      // used to prevent spurious wakeups
 113      io: io::IoFactory, // local I/O factory
 114  
 115      // This field holds the known bounds of the stack in (lo, hi) form. Not all
 116      // native tasks necessarily know their precise bounds, hence this is
 117      // optional.
 118      stack_bounds: (uint, uint),
 119  }
 120  
 121  impl rt::Runtime for Ops {
 122      fn yield_now(~self, mut cur_taskBox<Task>) {
 123          // put the task back in TLS and then invoke the OS thread yield
 124          cur_task.put_runtime(self);
 125          Local::put(cur_task);
 126          Thread::yield_now();
 127      }
 128  
 129      fn maybe_yield(~self, mut cur_taskBox<Task>) {
 130          // just put the task back in TLS, on OS threads we never need to
 131          // opportunistically yield b/c the OS will do that for us (preemption)
 132          cur_task.put_runtime(self);
 133          Local::put(cur_task);
 134      }
 135  
 136      fn wrap(~self) -> Box<Any> {
 137          self as Box<Any>
 138      }
 139  
 140      fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }
 141  
 142      fn can_block(&self) -> bool { true }
 143  
 144      // This function gets a little interesting. There are a few safety and
 145      // ownership violations going on here, but this is all done in the name of
 146      // shared state. Additionally, all of the violations are protected with a
 147      // mutex, so in theory there are no races.
 148      //
 149      // The first thing we need to do is to get a pointer to the task's internal
 150      // mutex. This address will not be changing (because the task is allocated
 151      // on the heap). We must have this handle separately because the task will
 152      // have its ownership transferred to the given closure. We're guaranteed,
 153      // however, that this memory will remain valid because *this* is the current
 154      // task's execution thread.
 155      //
 156      // The next weird part is where ownership of the task actually goes. We
 157      // relinquish it to the `f` blocking function, but upon returning this
 158      // function needs to replace the task back in TLS. There is no communication
 159      // from the wakeup thread back to this thread about the task pointer, and
 160      // there's really no need to. In order to get around this, we cast the task
 161      // to a `uint` which is then used at the end of this function to cast back
 162      // to a `Box<Task>` object. Naturally, this looks like it violates
 163      // ownership semantics in that there may be two `Box<Task>` objects.
 164      //
 165      // The fun part is that the wakeup half of this implementation knows to
 166      // "forget" the task on the other end. This means that the awakening half of
 167      // things silently relinquishes ownership back to this thread, but not in a
 168      // way that the compiler can understand. The task's memory is always valid
 169      // for both tasks because these operations are all done inside of a mutex.
 170      //
 171      // You'll also find that if blocking fails (the `f` function hands the
 172      // BlockedTask back to us), we will `cast::forget` the handles. The
 173      // reasoning for this is the same logic as above in that the task silently
 174      // transfers ownership via the `uint`, not through normal compiler
 175      // semantics.
 176      //
 177      // On a mildly unrelated note, it should also be pointed out that OS
 178      // condition variables are susceptible to spurious wakeups, which we need to
 179      // be ready for. In order to accomodate for this fact, we have an extra
 180      // `awoken` field which indicates whether we were actually woken up via some
 181      // invocation of `reawaken`. This flag is only ever accessed inside the
 182      // lock, so there's no need to make it atomic.
 183      fn deschedule(mut ~self, timesuint, mut cur_taskBox<Task>,
 184                    f|BlockedTask-> Result<(), BlockedTask>) {
 185          let me = &mut *self as *mut Ops;
 186          cur_task.put_runtime(self);
 187  
 188          unsafe {
 189              let cur_task_dupe = &*cur_task as *Task;
 190              let task = BlockedTask::block(cur_task);
 191  
 192              if times == 1 {
 193                  let guard = (*me).lock.lock();
 194                  (*me).awoken = false;
 195                  match f(task) {
 196                      Ok(()) => {
 197                          while !(*me).awoken {
 198                              guard.wait();
 199                          }
 200                      }
 201                      Err(task) => { cast::forget(task.wake()); }
 202                  }
 203              } else {
 204                  let iter = task.make_selectable(times);
 205                  let guard = (*me).lock.lock();
 206                  (*me).awoken = false;
 207  
 208                  // Apply the given closure to all of the "selectable tasks",
 209                  // bailing on the first one that produces an error. Note that
 210                  // care must be taken such that when an error is occurred, we
 211                  // may not own the task, so we may still have to wait for the
 212                  // task to become available. In other words, if task.wake()
 213                  // returns `None`, then someone else has ownership and we must
 214                  // wait for their signal.
 215                  match iter.map(f).filter_map(|a| a.err()).next() {
 216                      None => {}
 217                      Some(task) => {
 218                          match task.wake() {
 219                              Some(task) => {
 220                                  cast::forget(task);
 221                                  (*me).awoken = true;
 222                              }
 223                              None => {}
 224                          }
 225                      }
 226                  }
 227                  while !(*me).awoken {
 228                      guard.wait();
 229                  }
 230              }
 231              // re-acquire ownership of the task
 232              cur_task = cast::transmute(cur_task_dupe);
 233          }
 234  
 235          // put the task back in TLS, and everything is as it once was.
 236          Local::put(cur_task);
 237      }
 238  
 239      // See the comments on `deschedule` for why the task is forgotten here, and
 240      // why it's valid to do so.
 241      fn reawaken(mut ~self, mut to_wakeBox<Task>) {
 242          unsafe {
 243              let me = &mut *self as *mut Ops;
 244              to_wake.put_runtime(self);
 245              cast::forget(to_wake);
 246              let guard = (*me).lock.lock();
 247              (*me).awoken = true;
 248              guard.signal();
 249          }
 250      }
 251  
 252      fn spawn_sibling(~self,
 253                       mut cur_taskBox<Task>,
 254                       optsTaskOpts,
 255                       fproc():Send) {
 256          cur_task.put_runtime(self);
 257          Local::put(cur_task);
 258  
 259          task::spawn_opts(opts, f);
 260      }
 261  
 262      fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
 263          Some(rtio::LocalIo::new(&mut self.io as &mut rtio::IoFactory))
 264      }
 265  }
 266  
 267  #[cfg(test)]
 268  mod tests {
 269      use std::rt::local::Local;
 270      use std::rt::task::Task;
 271      use std::task;
 272      use std::task::TaskOpts;
 273      use super::{spawn, spawn_opts, Ops};
 274  
 275      #[test]
 276      fn smoke() {
 277          let (tx, rx) = channel();
 278          spawn(proc() {
 279              tx.send(());
 280          });
 281          rx.recv();
 282      }
 283  
 284      #[test]
 285      fn smoke_fail() {
 286          let (tx, rx) = channel::<()>();
 287          spawn(proc() {
 288              let _tx = tx;
 289              fail!()
 290          });
 291          assert_eq!(rx.recv_opt(), Err(()));
 292      }
 293  
 294      #[test]
 295      fn smoke_opts() {
 296          let mut opts = TaskOpts::new();
 297          opts.name = Some("test".into_maybe_owned());
 298          opts.stack_size = Some(20 * 4096);
 299          let (tx, rx) = channel();
 300          opts.notify_chan = Some(tx);
 301          spawn_opts(opts, proc() {});
 302          assert!(rx.recv().is_ok());
 303      }
 304  
 305      #[test]
 306      fn smoke_opts_fail() {
 307          let mut opts = TaskOpts::new();
 308          let (tx, rx) = channel();
 309          opts.notify_chan = Some(tx);
 310          spawn_opts(opts, proc() { fail!() });
 311          assert!(rx.recv().is_err());
 312      }
 313  
 314      #[test]
 315      fn yield_test() {
 316          let (tx, rx) = channel();
 317          spawn(proc() {
 318              for _ in range(0, 10) { task::deschedule(); }
 319              tx.send(());
 320          });
 321          rx.recv();
 322      }
 323  
 324      #[test]
 325      fn spawn_children() {
 326          let (tx1, rx) = channel();
 327          spawn(proc() {
 328              let (tx2, rx) = channel();
 329              spawn(proc() {
 330                  let (tx3, rx) = channel();
 331                  spawn(proc() {
 332                      tx3.send(());
 333                  });
 334                  rx.recv();
 335                  tx2.send(());
 336              });
 337              rx.recv();
 338              tx1.send(());
 339          });
 340          rx.recv();
 341      }
 342  
 343      #[test]
 344      fn spawn_inherits() {
 345          let (tx, rx) = channel();
 346          spawn(proc() {
 347              spawn(proc() {
 348                  let mut task: Box<Task> = Local::take();
 349                  match task.maybe_take_runtime::<Ops>() {
 350                      Some(ops) => {
 351                          task.put_runtime(ops);
 352                      }
 353                      None => fail!(),
 354                  }
 355                  Local::put(task);
 356                  tx.send(());
 357              });
 358          });
 359          rx.recv();
 360      }
 361  }