(index<- )        ./libstd/task.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  /*!
  12   * Utilities for managing and scheduling tasks
  13   *
  14   * An executing Rust program consists of a collection of tasks, each with their
  15   * own stack, and sole ownership of their allocated heap data. Tasks communicate
  16   * with each other using channels (see `std::comm` for more info about how
  17   * communication works).
  18   *
  19   * Failure in one task does not propagate to any others (not to parent, not to
  20   * child).  Failure propagation is instead handled by using the channel send()
  21   * and recv() methods which will fail if the other end has hung up already.
  22   *
  23   * Task Scheduling:
  24   *
  25   * By default, every task is created with the same "flavor" as the calling task.
  26   * This flavor refers to the scheduling mode, with two possibilities currently
  27   * being 1:1 and M:N modes. Green (M:N) tasks are cooperatively scheduled and
  28   * native (1:1) tasks are scheduled by the OS kernel.
  29   *
  30   * # Example
  31   *
  32   * ```rust
  33   * spawn(proc() {
  34   *     println!("Hello, World!");
  35   * })
  36   * ```
  37   */
  38  
  39  use any::Any;
  40  use comm::{Sender, Receiver, channel};
  41  use io::Writer;
  42  use kinds::{Send, marker};
  43  use option::{None, Some, Option};
  44  use owned::Box;
  45  use result::{Result, Ok, Err};
  46  use rt::local::Local;
  47  use rt::task::Task;
  48  use str::{Str, SendStr, IntoMaybeOwned};
  49  
  50  #[cfg(test)] use any::{AnyOwnExt, AnyRefExt};
  51  #[cfg(test)] use result;
  52  #[cfg(test)] use str::StrAllocating;
  53  #[cfg(test)] use realstd::result::ResultUnwrap;
  54  
  55  /// Indicates the manner in which a task exited.
  56  ///
  57  /// A task that completes without failing is considered to exit successfully.
  58  ///
  59  /// If you wish for this result's delivery to block until all
  60  /// children tasks complete, recommend using a result future.
  61  pub type TaskResult = Result<(), Box<Any:Send>>;
  62  
  63  /// Task configuration options
  64  pub struct TaskOpts {
  65      /// Enable lifecycle notifications on the given channel
  66      pub notify_chan: Option<Sender<TaskResult>>,
  67      /// A name for the task-to-be, for identification in failure messages
  68      pub name: Option<SendStr>,
  69      /// The size of the stack for the spawned task
  70      pub stack_size: Option<uint>,
  71      /// Task-local stdout
  72      pub stdout: Option<Box<Writer:Send>>,
  73      /// Task-local stderr
  74      pub stderr: Option<Box<Writer:Send>>,
  75  }
  76  
  77  /**
  78   * The task builder type.
  79   *
  80   * Provides detailed control over the properties and behavior of new tasks.
  81   */
  82  // NB: Builders are designed to be single-use because they do stateful
  83  // things that get weird when reusing - e.g. if you create a result future
  84  // it only applies to a single task, so then you have to maintain Some
  85  // potentially tricky state to ensure that everything behaves correctly
  86  // when you try to reuse the builder to spawn a new task. We'll just
  87  // sidestep that whole issue by making builders uncopyable and making
  88  // the run function move them in.
  89  pub struct TaskBuilder {
  90      /// Options to spawn the new task with
  91      pub opts: TaskOpts,
  92      gen_body: Option<proc(v: proc():Send):Send -> proc():Send>,
  93      nocopy: Option<marker::NoCopy>,
  94  }
  95  
  96  impl TaskBuilder {
  97       /// Generate the base configuration for spawning a task, off of which more
  98       /// configuration methods can be chained.
  99      pub fn new() -> TaskBuilder {
 100          TaskBuilder {
 101              opts: TaskOpts::new(),
 102              gen_body: None,
 103              nocopy: None,
 104          }
 105      }
 106  
 107      /// Get a future representing the exit status of the task.
 108      ///
 109      /// Taking the value of the future will block until the child task
 110      /// terminates. The future result return value will be created *before* the task is
 111      /// spawned; as such, do not invoke .get() on it directly;
 112      /// rather, store it in an outer variable/list for later use.
 113      ///
 114      /// # Failure
 115      /// Fails if a future_result was already set for this task.
 116      pub fn future_result(&mut self) -> Receiver<TaskResult> {
 117          // FIXME (#3725): Once linked failure and notification are
 118          // handled in the library, I can imagine implementing this by just
 119          // registering an arbitrary number of task::on_exit handlers and
 120          // sending out messages.
 121  
 122          if self.opts.notify_chan.is_some() {
 123              fail!("Can't set multiple future_results for one task!");
 124          }
 125  
 126          // Construct the future and give it to the caller.
 127          let (tx, rx) = channel();
 128  
 129          // Reconfigure self to use a notify channel.
 130          self.opts.notify_chan = Some(tx);
 131  
 132          rx
 133      }
 134  
 135      /// Name the task-to-be. Currently the name is used for identification
 136      /// only in failure messages.
 137      pub fn named<S: IntoMaybeOwned<'static>>(mut self, nameS) -> TaskBuilder {
 138          self.opts.name = Some(name.into_maybe_owned());
 139          self
 140      }
 141  
 142      /**
 143       * Add a wrapper to the body of the spawned task.
 144       *
 145       * Before the task is spawned it is passed through a 'body generator'
 146       * function that may perform local setup operations as well as wrap
 147       * the task body in remote setup operations. With this the behavior
 148       * of tasks can be extended in simple ways.
 149       *
 150       * This function augments the current body generator with a new body
 151       * generator by applying the task body which results from the
 152       * existing body generator to the new body generator.
 153       */
 154      pub fn with_wrapper(mut self,
 155                          wrapperproc(v: proc():Send):Send -> proc():Send)
 156          -> TaskBuilder
 157      {
 158          self.gen_body = match self.gen_body.take() {
 159              Some(prev) => Some(proc(body) { wrapper(prev(body)) }),
 160              None => Some(wrapper)
 161          };
 162          self
 163      }
 164  
 165      /**
 166       * Creates and executes a new child task
 167       *
 168       * Sets up a new task with its own call stack and schedules it to run
 169       * the provided unique closure. The task has the properties and behavior
 170       * specified by the task_builder.
 171       */
 172      pub fn spawn(mut self, fproc():Send) {
 173          let gen_body = self.gen_body.take();
 174          let f = match gen_body {
 175              Some(gen) => gen(f),
 176              None => f
 177          };
 178          let tBox<Task> = Local::take();
 179          t.spawn_sibling(self.opts, f);
 180      }
 181  
 182      /**
 183       * Execute a function in another task and return either the return value
 184       * of the function or result::err.
 185       *
 186       * # Return value
 187       *
 188       * If the function executed successfully then try returns result::ok
 189       * containing the value returned by the function. If the function fails
 190       * then try returns result::err containing nil.
 191       *
 192       * # Failure
 193       * Fails if a future_result was already set for this task.
 194       */
 195      pub fn try<T:Send>(mut self, fproc():Send -> T)
 196                 -> Result<T, Box<Any:Send>> {
 197          let (tx, rx) = channel();
 198  
 199          let result = self.future_result();
 200  
 201          self.spawn(proc() {
 202              tx.send(f());
 203          });
 204  
 205          match result.recv() {
 206              Ok(())     => Ok(rx.recv()),
 207              Err(cause) => Err(cause)
 208          }
 209      }
 210  }
 211  
 212  /* Task construction */
 213  
 214  impl TaskOpts {
 215      pub fn new() -> TaskOpts {
 216          /*!
 217           * The default task options
 218           */
 219  
 220          TaskOpts {
 221              notify_chan: None,
 222              name: None,
 223              stack_size: None,
 224              stdout: None,
 225              stderr: None,
 226          }
 227      }
 228  }
 229  
 230  /* Spawn convenience functions */
 231  
 232  /// Creates and executes a new child task
 233  ///
 234  /// Sets up a new task with its own call stack and schedules it to run
 235  /// the provided unique closure.
 236  ///
 237  /// This function is equivalent to `TaskBuilder::new().spawn(f)`.
 238  pub fn spawn(f: proc():Send) {
 239      TaskBuilder::new().spawn(f)
 240  }
 241  
 242  /// Execute a function in another task and return either the return value of
 243  /// the function or an error if the task failed
 244  ///
 245  /// This is equivalent to TaskBuilder::new().try
 246  pub fn try<T:Send>(f: proc():Send -> T) -> Result<T, Box<Any:Send>> {
 247      TaskBuilder::new().try(f)
 248  }
 249  
 250  
 251  /* Lifecycle functions */
 252  
 253  /// Read the name of the current task.
 254  pub fn with_task_name<U>(blk: |Option<&str>-> U) -> U {
 255      use rt::task::Task;
 256  
 257      let task = Local::borrow(None::<Task>);
 258      match task.name {
 259          Some(ref name) => blk(Some(name.as_slice())),
 260          None => blk(None)
 261      }
 262  }
 263  
 264  pub fn deschedule() {
 265      //! Yield control to the task scheduler
 266  
 267      use rt::local::Local;
 268  
 269      // FIXME(#7544): Optimize this, since we know we won't block.
 270      let taskBox<Task> = Local::take();
 271      task.yield_now();
 272  }
 273  
 274  pub fn failing() -> bool {
 275      //! True if the running task has failed
 276      use rt::task::Task;
 277      Local::borrow(None::<Task>).unwinder.unwinding()
 278  }
 279  
 280  // The following 8 tests test the following 2^3 combinations:
 281  // {un,}linked {un,}supervised failure propagation {up,down}wards.
 282  
 283  // !!! These tests are dangerous. If Something is buggy, they will hang, !!!
 284  // !!! instead of exiting cleanly. This might wedge the buildbots.       !!!
 285  
 286  #[test]
 287  fn test_unnamed_task() {
 288      spawn(proc() {
 289          with_task_name(|name| {
 290              assert!(name.is_none());
 291          })
 292      })
 293  }
 294  
 295  #[test]
 296  fn test_owned_named_task() {
 297      TaskBuilder::new().named("ada lovelace".to_owned()).spawn(proc() {
 298          with_task_name(|name| {
 299              assert!(name.unwrap() == "ada lovelace");
 300          })
 301      })
 302  }
 303  
 304  #[test]
 305  fn test_static_named_task() {
 306      TaskBuilder::new().named("ada lovelace").spawn(proc() {
 307          with_task_name(|name| {
 308              assert!(name.unwrap() == "ada lovelace");
 309          })
 310      })
 311  }
 312  
 313  #[test]
 314  fn test_send_named_task() {
 315      TaskBuilder::new().named("ada lovelace".into_maybe_owned()).spawn(proc() {
 316          with_task_name(|name| {
 317              assert!(name.unwrap() == "ada lovelace");
 318          })
 319      })
 320  }
 321  
 322  #[test]
 323  fn test_run_basic() {
 324      let (tx, rx) = channel();
 325      TaskBuilder::new().spawn(proc() {
 326          tx.send(());
 327      });
 328      rx.recv();
 329  }
 330  
 331  #[test]
 332  fn test_with_wrapper() {
 333      let (tx, rx) = channel();
 334      TaskBuilder::new().with_wrapper(proc(body) {
 335          let result: proc():Send = proc() {
 336              body();
 337              tx.send(());
 338          };
 339          result
 340      }).spawn(proc() { });
 341      rx.recv();
 342  }
 343  
 344  #[test]
 345  fn test_future_result() {
 346      let mut builder = TaskBuilder::new();
 347      let result = builder.future_result();
 348      builder.spawn(proc() {});
 349      assert!(result.recv().is_ok());
 350  
 351      let mut builder = TaskBuilder::new();
 352      let result = builder.future_result();
 353      builder.spawn(proc() {
 354          fail!();
 355      });
 356      assert!(result.recv().is_err());
 357  }
 358  
 359  #[test] #[should_fail]
 360  fn test_back_to_the_future_result() {
 361      let mut builder = TaskBuilder::new();
 362      builder.future_result();
 363      builder.future_result();
 364  }
 365  
 366  #[test]
 367  fn test_try_success() {
 368      match try(proc() {
 369          "Success!".to_owned()
 370      }).as_ref().map(|s| s.as_slice()) {
 371          result::Ok("Success!") => (),
 372          _ => fail!()
 373      }
 374  }
 375  
 376  #[test]
 377  fn test_try_fail() {
 378      match try(proc() {
 379          fail!()
 380      }) {
 381          result::Err(_) => (),
 382          result::Ok(()) => fail!()
 383      }
 384  }
 385  
 386  #[test]
 387  fn test_spawn_sched() {
 388      use clone::Clone;
 389  
 390      let (tx, rx) = channel();
 391  
 392      fn f(i: int, tx: Sender<()>) {
 393          let tx = tx.clone();
 394          spawn(proc() {
 395              if i == 0 {
 396                  tx.send(());
 397              } else {
 398                  f(i - 1, tx);
 399              }
 400          });
 401  
 402      }
 403      f(10, tx);
 404      rx.recv();
 405  }
 406  
 407  #[test]
 408  fn test_spawn_sched_childs_on_default_sched() {
 409      let (tx, rx) = channel();
 410  
 411      spawn(proc() {
 412          spawn(proc() {
 413              tx.send(());
 414          });
 415      });
 416  
 417      rx.recv();
 418  }
 419  
 420  #[cfg(test)]
 421  fn avoid_copying_the_body(spawnfn: |v: proc():Send|) {
 422      let (tx, rx) = channel::<uint>();
 423  
 424      let x = box 1;
 425      let x_in_parent = (&*x) as *int as uint;
 426  
 427      spawnfn(proc() {
 428          let x_in_child = (&*x) as *int as uint;
 429          tx.send(x_in_child);
 430      });
 431  
 432      let x_in_child = rx.recv();
 433      assert_eq!(x_in_parent, x_in_child);
 434  }
 435  
 436  #[test]
 437  fn test_avoid_copying_the_body_spawn() {
 438      avoid_copying_the_body(spawn);
 439  }
 440  
 441  #[test]
 442  fn test_avoid_copying_the_body_task_spawn() {
 443      avoid_copying_the_body(|f| {
 444          let builder = TaskBuilder::new();
 445          builder.spawn(proc() {
 446              f();
 447          });
 448      })
 449  }
 450  
 451  #[test]
 452  fn test_avoid_copying_the_body_try() {
 453      avoid_copying_the_body(|f| {
 454          let _ = try(proc() {
 455              f()
 456          });
 457      })
 458  }
 459  
 460  #[test]
 461  fn test_child_doesnt_ref_parent() {
 462      // If the child refcounts the parent task, this will stack overflow when
 463      // climbing the task tree to dereference each ancestor. (See #1789)
 464      // (well, it would if the constant were 8000+ - I lowered it to be more
 465      // valgrind-friendly. try this at home, instead..!)
 466      static generations: uint = 16;
 467      fn child_no(x: uint) -> proc():Send {
 468          return proc() {
 469              if x < generations {
 470                  TaskBuilder::new().spawn(child_no(x+1));
 471              }
 472          }
 473      }
 474      TaskBuilder::new().spawn(child_no(0));
 475  }
 476  
 477  #[test]
 478  fn test_simple_newsched_spawn() {
 479      spawn(proc()())
 480  }
 481  
 482  #[test]
 483  fn test_try_fail_message_static_str() {
 484      match try(proc() {
 485          fail!("static string");
 486      }) {
 487          Err(e) => {
 488              type T = &'static str;
 489              assert!(e.is::<T>());
 490              assert_eq!(*e.move::<T>().unwrap(), "static string");
 491          }
 492          Ok(()) => fail!()
 493      }
 494  }
 495  
 496  #[test]
 497  fn test_try_fail_message_owned_str() {
 498      match try(proc() {
 499          fail!("owned string".to_owned());
 500      }) {
 501          Err(e) => {
 502              type T = ~str;
 503              assert!(e.is::<T>());
 504              assert_eq!(*e.move::<T>().unwrap(), "owned string".to_owned());
 505          }
 506          Ok(()) => fail!()
 507      }
 508  }
 509  
 510  #[test]
 511  fn test_try_fail_message_any() {
 512      match try(proc() {
 513          fail!(box 413u16 as Box<Any:Send>);
 514      }) {
 515          Err(e) => {
 516              type T = Box<Any:Send>;
 517              assert!(e.is::<T>());
 518              let any = e.move::<T>().unwrap();
 519              assert!(any.is::<u16>());
 520              assert_eq!(*any.move::<u16>().unwrap(), 413u16);
 521          }
 522          Ok(()) => fail!()
 523      }
 524  }
 525  
 526  #[test]
 527  fn test_try_fail_message_unit_struct() {
 528      struct Juju;
 529  
 530      match try(proc() {
 531          fail!(Juju)
 532      }) {
 533          Err(ref e) if e.is::<Juju>() => {}
 534          Err(_) | Ok(()) => fail!()
 535      }
 536  }


libstd/task.rs:63:31-63:31 -struct- definition:
/// Task configuration options
pub struct TaskOpts {
    /// Enable lifecycle notifications on the given channel
references:- 6
220:         TaskOpts {
221:             notify_chan: None,
libstd/rt/mod.rs:
164:                      cur_task: Box<Task>,
165:                      opts: TaskOpts,
166:                      f: proc():Send);
libstd/rt/task.rs:
240:     /// the `opts` structure and will run `f` as the body of its code.
241:     pub fn spawn_sibling(mut ~self, opts: TaskOpts, f: proc():Send) {
242:         let ops = self.imp.take_unwrap();
libstd/task.rs:
90:     /// Options to spawn the new task with
91:     pub opts: TaskOpts,
92:     gen_body: Option<proc(v: proc():Send):Send -> proc():Send>,


libstd/task.rs:60:62-60:62 -NK_AS_STR_TODO- definition:
/// children tasks complete, recommend using a result future.
pub type TaskResult = Result<(), Box<Any:Send>>;
/// Task configuration options
references:- 6
115:     /// Fails if a future_result was already set for this task.
116:     pub fn future_result(&mut self) -> Receiver<TaskResult> {
117:         // FIXME (#3725): Once linked failure and notification are
libstd/rt/task.rs:
74:     /// until all its watched children exit before collecting the status.
75:     Execute(proc(TaskResult):Send),
76:     /// A channel to send the result of the task on when the task exits
77:     SendMessage(Sender<TaskResult>),
78: }
libstd/rt/unwind.rs:
164:     pub fn result(&mut self) -> TaskResult {
165:         if self.unwinding {
libstd/rt/task.rs:
391:     /// Collect failure exit codes from children and propagate them to a parent.
392:     pub fn collect_failure(&mut self, result: TaskResult) {
393:         match self.on_exit.take() {


libstd/task.rs:88:34-88:34 -struct- definition:
// the run function move them in.
pub struct TaskBuilder {
    /// Options to spawn the new task with
references:- 5
99:     pub fn new() -> TaskBuilder {
100:         TaskBuilder {
101:             opts: TaskOpts::new(),
--
155:                         wrapper: proc(v: proc():Send):Send -> proc():Send)
156:         -> TaskBuilder
157:     {