(index<- )        ./libgreen/task.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! The Green Task implementation
  12  //!
  13  //! This module contains the glue to the libstd runtime necessary to integrate
  14  //! M:N scheduling. This GreenTask structure is hidden as a trait object in all
  15  //! rust tasks and virtual calls are made in order to interface with it.
  16  //!
  17  //! Each green task contains a scheduler if it is currently running, and it also
  18  //! contains the rust task itself in order to juggle around ownership of the
  19  //! values.
  20  
  21  use std::any::Any;
  22  use std::cast;
  23  use std::raw;
  24  use std::rt::Runtime;
  25  use std::rt::env;
  26  use std::rt::local::Local;
  27  use std::rt::rtio;
  28  use std::rt::stack;
  29  use std::rt::task::{Task, BlockedTask, SendMessage};
  30  use std::task::TaskOpts;
  31  use std::unstable::mutex::NativeMutex;
  32  
  33  use context::Context;
  34  use coroutine::Coroutine;
  35  use sched::{Scheduler, SchedHandle, RunOnce};
  36  use stack::StackPool;
  37  
  38  /// The necessary fields needed to keep track of a green task (as opposed to a
  39  /// 1:1 task).
  40  pub struct GreenTask {
  41      /// Coroutine that this task is running on, otherwise known as the register
  42      /// context and the stack that this task owns. This field is optional to
  43      /// relinquish ownership back to a scheduler to recycle stacks at a later
  44      /// date.
  45      pub coroutine: Option<Coroutine>,
  46  
  47      /// Optional handle back into the home sched pool of this task. This field
  48      /// is lazily initialized.
  49      pub handle: Option<SchedHandle>,
  50  
  51      /// Slot for maintaining ownership of a scheduler. If a task is running,
  52      /// this value will be Some(sched) where the task is running on "sched".
  53      pub sched: Option<Box<Scheduler>>,
  54  
  55      /// Temporary ownership slot of a std::rt::task::Task object. This is used
  56      /// to squirrel that libstd task away while we're performing green task
  57      /// operations.
  58      pub task: Option<Box<Task>>,
  59  
  60      /// Dictates whether this is a sched task or a normal green task
  61      pub task_type: TaskType,
  62  
  63      /// Home pool that this task was spawned into. This field is lazily
  64      /// initialized until when the task is initially scheduled, and is used to
  65      /// make sure that tasks are always woken up in the correct pool of
  66      /// schedulers.
  67      pub pool_id: uint,
  68  
  69      // See the comments in the scheduler about why this is necessary
  70      pub nasty_deschedule_lock: NativeMutex,
  71  }
  72  
  73  pub enum TaskType {
  74      TypeGreen(Option<Home>),
  75      TypeSched,
  76  }
  77  
  78  pub enum Home {
  79      AnySched,
  80      HomeSched(SchedHandle),
  81  }
  82  
  83  /// Trampoline code for all new green tasks which are running around. This
  84  /// function is passed through to Context::new as the initial rust landing pad
  85  /// for all green tasks. This code is actually called after the initial context
  86  /// switch onto a green thread.
  87  ///
  88  /// The first argument to this function is the `Box<GreenTask>` pointer, and
  89  /// the next two arguments are the user-provided procedure for running code.
  90  ///
  91  /// The goal for having this weird-looking function is to reduce the number of
  92  /// allocations done on a green-task startup as much as possible.
  93  extern fn bootstrap_green_task(task: uint, code: *(), env: *()) -> ! {
  94      // Acquire ownership of the `proc()`
  95      let startproc() = unsafe {
  96          cast::transmute(raw::Procedure { code: code, env: env })
  97      };
  98  
  99      // Acquire ownership of the `Box<GreenTask>`
 100      let mut taskBox<GreenTask> = unsafe { cast::transmute(task) };
 101  
 102      // First code after swap to this new context. Run our cleanup job
 103      task.pool_id = {
 104          let sched = task.sched.get_mut_ref();
 105          sched.run_cleanup_job();
 106          sched.task_state.increment();
 107          sched.pool_id
 108      };
 109  
 110      // Convert our green task to a libstd task and then execute the code
 111      // requested. This is the "try/catch" block for this green task and
 112      // is the wrapper for *all* code run in the task.
 113      let mut start = Some(start);
 114      let task = task.swap().run(|| start.take_unwrap()());
 115  
 116      // Once the function has exited, it's time to run the termination
 117      // routine. This means we need to context switch one more time but
 118      // clean ourselves up on the other end. Since we have no way of
 119      // preserving a handle to the GreenTask down to this point, this
 120      // unfortunately must call `GreenTask::convert`. In order to avoid
 121      // this we could add a `terminate` function to the `Runtime` trait
 122      // in libstd, but that seems less appropriate since the coversion
 123      // method exists.
 124      GreenTask::convert(task).terminate()
 125  }
 126  
 127  impl GreenTask {
 128      /// Creates a new green task which is not homed to any particular scheduler
 129      /// and will not have any contained Task structure.
 130      pub fn new(stack_pool&mut StackPool,
 131                 stack_sizeOption<uint>,
 132                 startproc():Send) -> Box<GreenTask> {
 133          GreenTask::new_homed(stack_pool, stack_size, AnySched, start)
 134      }
 135  
 136      /// Creates a new task (like `new`), but specifies the home for new task.
 137      pub fn new_homed(stack_pool&mut StackPool,
 138                       stack_sizeOption<uint>,
 139                       homeHome,
 140                       startproc():Send) -> Box<GreenTask> {
 141          // Allocate ourselves a GreenTask structure
 142          let mut ops = GreenTask::new_typed(None, TypeGreen(Some(home)));
 143  
 144          // Allocate a stack for us to run on
 145          let stack_size = stack_size.unwrap_or_else(|| env::min_stack());
 146          let mut stack = stack_pool.take_stack(stack_size);
 147          let context = Context::new(bootstrap_green_task, ops.as_uint(), start,
 148                                     &mut stack);
 149  
 150          // Package everything up in a coroutine and return
 151          ops.coroutine = Some(Coroutine {
 152              current_stack_segment: stack,
 153              saved_context: context,
 154          });
 155          return ops;
 156      }
 157  
 158      /// Creates a new green task with the specified coroutine and type, this is
 159      /// useful when creating scheduler tasks.
 160      pub fn new_typed(coroutineOption<Coroutine>,
 161                       task_typeTaskType) -> Box<GreenTask> {
 162          box GreenTask {
 163              pool_id: 0,
 164              coroutine: coroutine,
 165              task_type: task_type,
 166              sched: None,
 167              handle: None,
 168              nasty_deschedule_lock: unsafe { NativeMutex::new() },
 169              task: Some(box Task::new()),
 170          }
 171      }
 172  
 173      /// Creates a new green task with the given configuration options for the
 174      /// contained Task object. The given stack pool is also used to allocate a
 175      /// new stack for this task.
 176      pub fn configure(pool&mut StackPool,
 177                       optsTaskOpts,
 178                       fproc():Send) -> Box<GreenTask> {
 179          let TaskOpts {
 180              notify_chan, name, stack_size,
 181              stderr, stdout,
 182          } = opts;
 183  
 184          let mut green = GreenTask::new(pool, stack_size, f);
 185          {
 186              let task = green.task.get_mut_ref();
 187              task.name = name;
 188              task.stderr = stderr;
 189              task.stdout = stdout;
 190              match notify_chan {
 191                  Some(chan) => {
 192                      task.death.on_exit = Some(SendMessage(chan));
 193                  }
 194                  None => {}
 195              }
 196          }
 197          return green;
 198      }
 199  
 200      /// Just like the `maybe_take_runtime` function, this function should *not*
 201      /// exist. Usage of this function is _strongly_ discouraged. This is an
 202      /// absolute last resort necessary for converting a libstd task to a green
 203      /// task.
 204      ///
 205      /// This function will assert that the task is indeed a green task before
 206      /// returning (and will kill the entire process if this is wrong).
 207      pub fn convert(mut taskBox<Task>) -> Box<GreenTask> {
 208          match task.maybe_take_runtime::<GreenTask>() {
 209              Some(mut green) => {
 210                  green.put_task(task);
 211                  green
 212              }
 213              None => rtabort!("not a green task any more?"),
 214          }
 215      }
 216  
 217      pub fn give_home(&mut self, new_homeHome) {
 218          match self.task_type {
 219              TypeGreen(ref mut home) => { *home = Some(new_home); }
 220              TypeSched => rtabort!("type error: used SchedTask as GreenTask"),
 221          }
 222      }
 223  
 224      pub fn take_unwrap_home(&mut self) -> Home {
 225          match self.task_type {
 226              TypeGreen(ref mut home) => home.take_unwrap(),
 227              TypeSched => rtabort!("type error: used SchedTask as GreenTask"),
 228          }
 229      }
 230  
 231      // New utility functions for homes.
 232  
 233      pub fn is_home_no_tls(&self, sched&Scheduler) -> bool {
 234          match self.task_type {
 235              TypeGreen(Some(AnySched)) => { false }
 236              TypeGreen(Some(HomeSched(SchedHandle { sched_id: ref id, .. }))) => {
 237                  *id == sched.sched_id()
 238              }
 239              TypeGreen(None) => { rtabort!("task without home"); }
 240              TypeSched => {
 241                  // Awe yea
 242                  rtabort!("type error: expected: TypeGreen, found: TaskSched");
 243              }
 244          }
 245      }
 246  
 247      pub fn homed(&self) -> bool {
 248          match self.task_type {
 249              TypeGreen(Some(AnySched)) => { false }
 250              TypeGreen(Some(HomeSched(SchedHandle { .. }))) => { true }
 251              TypeGreen(None) => {
 252                  rtabort!("task without home");
 253              }
 254              TypeSched => {
 255                  rtabort!("type error: expected: TypeGreen, found: TaskSched");
 256              }
 257          }
 258      }
 259  
 260      pub fn is_sched(&self) -> bool {
 261          match self.task_type {
 262              TypeGreen(..) => false, TypeSched => true,
 263          }
 264      }
 265  
 266      // Unsafe functions for transferring ownership of this GreenTask across
 267      // context switches
 268  
 269      pub fn as_uint(&self) -> uint {
 270          self as *GreenTask as uint
 271      }
 272  
 273      pub unsafe fn from_uint(valuint) -> Box<GreenTask> {
 274          cast::transmute(val)
 275      }
 276  
 277      // Runtime glue functions and helpers
 278  
 279      pub fn put_with_sched(mut ~self, schedBox<Scheduler>) {
 280          assert!(self.sched.is_none());
 281          self.sched = Some(sched);
 282          self.put();
 283      }
 284  
 285      pub fn put_task(&mut self, taskBox<Task>) {
 286          assert!(self.task.is_none());
 287          self.task = Some(task);
 288      }
 289  
 290      pub fn swap(mut ~self) -> Box<Task> {
 291          let mut task = self.task.take_unwrap();
 292          task.put_runtime(self);
 293          return task;
 294      }
 295  
 296      pub fn put(~self) {
 297          assert!(self.sched.is_some());
 298          Local::put(self.swap());
 299      }
 300  
 301      fn terminate(mut ~self) -> ! {
 302          let sched = self.sched.take_unwrap();
 303          sched.terminate_current_task(self)
 304      }
 305  
 306      // This function is used to remotely wakeup this green task back on to its
 307      // original pool of schedulers. In order to do so, each tasks arranges a
 308      // SchedHandle upon descheduling to be available for sending itself back to
 309      // the original pool.
 310      //
 311      // Note that there is an interesting transfer of ownership going on here. We
 312      // must relinquish ownership of the green task, but then also send the task
 313      // over the handle back to the original scheduler. In order to safely do
 314      // this, we leverage the already-present "nasty descheduling lock". The
 315      // reason for doing this is that each task will bounce on this lock after
 316      // resuming after a context switch. By holding the lock over the enqueueing
 317      // of the task, we're guaranteed that the SchedHandle's memory will be valid
 318      // for this entire function.
 319      //
 320      // An alternative would include having incredibly cheaply cloneable handles,
 321      // but right now a SchedHandle is something like 6 allocations, so it is
 322      // *not* a cheap operation to clone a handle. Until the day comes that we
 323      // need to optimize this, a lock should do just fine (it's completely
 324      // uncontended except for when the task is rescheduled).
 325      fn reawaken_remotely(mut ~self) {
 326          unsafe {
 327              let mtx = &mut self.nasty_deschedule_lock as *mut NativeMutex;
 328              let handle = self.handle.get_mut_ref() as *mut SchedHandle;
 329              let _guard = (*mtx).lock();
 330              (*handle).send(RunOnce(self));
 331          }
 332      }
 333  }
 334  
 335  impl Runtime for GreenTask {
 336      fn yield_now(mut ~self, cur_taskBox<Task>) {
 337          self.put_task(cur_task);
 338          let sched = self.sched.take_unwrap();
 339          sched.yield_now(self);
 340      }
 341  
 342      fn maybe_yield(mut ~self, cur_taskBox<Task>) {
 343          self.put_task(cur_task);
 344          let sched = self.sched.take_unwrap();
 345          sched.maybe_yield(self);
 346      }
 347  
 348      fn deschedule(mut ~self, timesuint, cur_taskBox<Task>,
 349                    f|BlockedTask-> Result<(), BlockedTask>) {
 350          self.put_task(cur_task);
 351          let mut sched = self.sched.take_unwrap();
 352  
 353          // In order for this task to be reawoken in all possible contexts, we
 354          // may need a handle back in to the current scheduler. When we're woken
 355          // up in anything other than the local scheduler pool, this handle is
 356          // used to send this task back into the scheduler pool.
 357          if self.handle.is_none() {
 358              self.handle = Some(sched.make_handle());
 359              self.pool_id = sched.pool_id;
 360          }
 361  
 362          // This code is pretty standard, except for the usage of
 363          // `GreenTask::convert`. Right now if we use `reawaken` directly it will
 364          // expect for there to be a task in local TLS, but that is not true for
 365          // this deschedule block (because the scheduler must retain ownership of
 366          // the task while the cleanup job is running). In order to get around
 367          // this for now, we invoke the scheduler directly with the converted
 368          // Task => GreenTask structure.
 369          if times == 1 {
 370              sched.deschedule_running_task_and_then(self, |sched, task| {
 371                  match f(task) {
 372                      Ok(()) => {}
 373                      Err(t) => {
 374                          t.wake().map(|t| {
 375                              sched.enqueue_task(GreenTask::convert(t))
 376                          });
 377                      }
 378                  }
 379              });
 380          } else {
 381              sched.deschedule_running_task_and_then(self, |sched, task| {
 382                  for task in task.make_selectable(times) {
 383                      match f(task) {
 384                          Ok(()) => {},
 385                          Err(task) => {
 386                              task.wake().map(|t| {
 387                                  sched.enqueue_task(GreenTask::convert(t))
 388                              });
 389                              break
 390                          }
 391                      }
 392                  }
 393              });
 394          }
 395      }
 396  
 397      fn reawaken(mut ~self, to_wakeBox<Task>) {
 398          self.put_task(to_wake);
 399          assert!(self.sched.is_none());
 400  
 401          // Optimistically look for a local task, but if one's not available to
 402          // inspect (in order to see if it's in the same sched pool as we are),
 403          // then just use our remote wakeup routine and carry on!
 404          let mut running_taskBox<Task> = match Local::try_take() {
 405              Some(task) => task,
 406              None => return self.reawaken_remotely()
 407          };
 408  
 409          // Waking up a green thread is a bit of a tricky situation. We have no
 410          // guarantee about where the current task is running. The options we
 411          // have for where this current task is running are:
 412          //
 413          //  1. Our original scheduler pool
 414          //  2. Some other scheduler pool
 415          //  3. Something that isn't a scheduler pool
 416          //
 417          // In order to figure out what case we're in, this is the reason that
 418          // the `maybe_take_runtime` function exists. Using this function we can
 419          // dynamically check to see which of these cases is the current
 420          // situation and then dispatch accordingly.
 421          //
 422          // In case 1, we just use the local scheduler to resume ourselves
 423          // immediately (if a rescheduling is possible).
 424          //
 425          // In case 2 and 3, we need to remotely reawaken ourself in order to be
 426          // transplanted back to the correct scheduler pool.
 427          match running_task.maybe_take_runtime::<GreenTask>() {
 428              Some(mut running_green_task) => {
 429                  running_green_task.put_task(running_task);
 430                  let sched = running_green_task.sched.take_unwrap();
 431  
 432                  if sched.pool_id == self.pool_id {
 433                      sched.run_task(running_green_task, self);
 434                  } else {
 435                      self.reawaken_remotely();
 436  
 437                      // put that thing back where it came from!
 438                      running_green_task.put_with_sched(sched);
 439                  }
 440              }
 441              None => {
 442                  self.reawaken_remotely();
 443                  Local::put(running_task);
 444              }
 445          }
 446      }
 447  
 448      fn spawn_sibling(mut ~self,
 449                       cur_taskBox<Task>,
 450                       optsTaskOpts,
 451                       fproc():Send) {
 452          self.put_task(cur_task);
 453  
 454          // Spawns a task into the current scheduler. We allocate the new task's
 455          // stack from the scheduler's stack pool, and then configure it
 456          // accordingly to `opts`. Afterwards we bootstrap it immediately by
 457          // switching to it.
 458          //
 459          // Upon returning, our task is back in TLS and we're good to return.
 460          let mut sched = self.sched.take_unwrap();
 461          let sibling = GreenTask::configure(&mut sched.stack_pool, opts, f);
 462          sched.run_task(self, sibling)
 463      }
 464  
 465      // Local I/O is provided by the scheduler's event loop
 466      fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
 467          match self.sched.get_mut_ref().event_loop.io() {
 468              Some(io) => Some(rtio::LocalIo::new(io)),
 469              None => None,
 470          }
 471      }
 472  
 473      fn stack_bounds(&self) -> (uint, uint) {
 474          let c = self.coroutine.as_ref()
 475              .expect("GreenTask.stack_bounds called without a coroutine");
 476  
 477          // Don't return the red zone as part of the usable stack of this task,
 478          // it's essentially an implementation detail.
 479          (c.current_stack_segment.start() as uint + stack::RED_ZONE,
 480           c.current_stack_segment.end() as uint)
 481      }
 482  
 483      fn can_block(&self) -> bool { false }
 484  
 485      fn wrap(~self) -> Box<Any> { self as Box<Any> }
 486  }
 487  
 488  #[cfg(test)]
 489  mod tests {
 490      use std::rt::local::Local;
 491      use std::rt::task::Task;
 492      use std::task;
 493      use std::task::TaskOpts;
 494  
 495      use super::super::{PoolConfig, SchedPool};
 496      use super::GreenTask;
 497  
 498      fn spawn_opts(opts: TaskOpts, f: proc():Send) {
 499          let mut pool = SchedPool::new(PoolConfig {
 500              threads: 1,
 501              event_loop_factory: ::rustuv::event_loop,
 502          });
 503          pool.spawn(opts, f);
 504          pool.shutdown();
 505      }
 506  
 507      #[test]
 508      fn smoke() {
 509          let (tx, rx) = channel();
 510          spawn_opts(TaskOpts::new(), proc() {
 511              tx.send(());
 512          });
 513          rx.recv();
 514      }
 515  
 516      #[test]
 517      fn smoke_fail() {
 518          let (tx, rx) = channel::<int>();
 519          spawn_opts(TaskOpts::new(), proc() {
 520              let _tx = tx;
 521              fail!()
 522          });
 523          assert_eq!(rx.recv_opt(), Err(()));
 524      }
 525  
 526      #[test]
 527      fn smoke_opts() {
 528          let mut opts = TaskOpts::new();
 529          opts.name = Some("test".into_maybe_owned());
 530          opts.stack_size = Some(20 * 4096);
 531          let (tx, rx) = channel();
 532          opts.notify_chan = Some(tx);
 533          spawn_opts(opts, proc() {});
 534          assert!(rx.recv().is_ok());
 535      }
 536  
 537      #[test]
 538      fn smoke_opts_fail() {
 539          let mut opts = TaskOpts::new();
 540          let (tx, rx) = channel();
 541          opts.notify_chan = Some(tx);
 542          spawn_opts(opts, proc() { fail!() });
 543          assert!(rx.recv().is_err());
 544      }
 545  
 546      #[test]
 547      fn yield_test() {
 548          let (tx, rx) = channel();
 549          spawn_opts(TaskOpts::new(), proc() {
 550              for _ in range(0, 10) { task::deschedule(); }
 551              tx.send(());
 552          });
 553          rx.recv();
 554      }
 555  
 556      #[test]
 557      fn spawn_children() {
 558          let (tx1, rx) = channel();
 559          spawn_opts(TaskOpts::new(), proc() {
 560              let (tx2, rx) = channel();
 561              spawn(proc() {
 562                  let (tx3, rx) = channel();
 563                  spawn(proc() {
 564                      tx3.send(());
 565                  });
 566                  rx.recv();
 567                  tx2.send(());
 568              });
 569              rx.recv();
 570              tx1.send(());
 571          });
 572          rx.recv();
 573      }
 574  
 575      #[test]
 576      fn spawn_inherits() {
 577          let (tx, rx) = channel();
 578          spawn_opts(TaskOpts::new(), proc() {
 579              spawn(proc() {
 580                  let mut task: Box<Task> = Local::take();
 581                  match task.maybe_take_runtime::<GreenTask>() {
 582                      Some(ops) => {
 583                          task.put_runtime(ops);
 584                      }
 585                      None => fail!(),
 586                  }
 587                  Local::put(task);
 588                  tx.send(());
 589              });
 590          });
 591          rx.recv();
 592      }
 593  }