(index<- )        ./libstd/rt/task.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! Language-level runtime services that should reasonably expected
  12  //! to be available 'everywhere'. Local heaps, GC, unwinding,
  13  //! local storage, and logging. Even a 'freestanding' Rust would likely want
  14  //! to implement this.
  15  
  16  use any::AnyOwnExt;
  17  use cast;
  18  use cleanup;
  19  use clone::Clone;
  20  use comm::Sender;
  21  use io::Writer;
  22  use iter::{Iterator, Take};
  23  use kinds::Send;
  24  use local_data;
  25  use ops::Drop;
  26  use option::{Option, Some, None};
  27  use owned::Box;
  28  use prelude::drop;
  29  use result::{Result, Ok, Err};
  30  use rt::Runtime;
  31  use rt::local::Local;
  32  use rt::local_heap::LocalHeap;
  33  use rt::rtio::LocalIo;
  34  use rt::unwind::Unwinder;
  35  use str::SendStr;
  36  use sync::arc::UnsafeArc;
  37  use sync::atomics::{AtomicUint, SeqCst};
  38  use task::{TaskResult, TaskOpts};
  39  use unstable::finally::Finally;
  40  
  41  /// The Task struct represents all state associated with a rust
  42  /// task. There are at this point two primary "subtypes" of task,
  43  /// however instead of using a subtype we just have a "task_type" field
  44  /// in the struct. This contains a pointer to another struct that holds
  45  /// the type-specific state.
  46  pub struct Task {
  47      pub heap: LocalHeap,
  48      pub gc: GarbageCollector,
  49      pub storage: LocalStorage,
  50      pub unwinder: Unwinder,
  51      pub death: Death,
  52      pub destroyed: bool,
  53      pub name: Option<SendStr>,
  54  
  55      pub stdout: Option<Box<Writer:Send>>,
  56      pub stderr: Option<Box<Writer:Send>>,
  57  
  58      imp: Option<Box<Runtime:Send>>,
  59  }
  60  
  61  pub struct GarbageCollector;
  62  pub struct LocalStorage(pub Option<local_data::Map>);
  63  
  64  /// A handle to a blocked task. Usually this means having the Box<Task>
  65  /// pointer by ownership, but if the task is killable, a killer can steal it
  66  /// at any time.
  67  pub enum BlockedTask {
  68      Owned(Box<Task>),
  69      Shared(UnsafeArc<AtomicUint>),
  70  }
  71  
  72  pub enum DeathAction {
  73      /// Action to be done with the exit code. If set, also makes the task wait
  74      /// until all its watched children exit before collecting the status.
  75      Execute(proc(TaskResult):Send),
  76      /// A channel to send the result of the task on when the task exits
  77      SendMessage(Sender<TaskResult>),
  78  }
  79  
  80  /// Per-task state related to task death, killing, failure, etc.
  81  pub struct Death {
  82      pub on_exit: Option<DeathAction>,
  83  }
  84  
  85  pub struct BlockedTasks {
  86      inner: UnsafeArc<AtomicUint>,
  87  }
  88  
  89  impl Task {
  90      pub fn new() -> Task {
  91          Task {
  92              heap: LocalHeap::new(),
  93              gc: GarbageCollector,
  94              storage: LocalStorage(None),
  95              unwinder: Unwinder::new(),
  96              death: Death::new(),
  97              destroyed: false,
  98              name: None,
  99              stdout: None,
 100              stderr: None,
 101              imp: None,
 102          }
 103      }
 104  
 105      /// Executes the given closure as if it's running inside this task. The task
 106      /// is consumed upon entry, and the destroyed task is returned from this
 107      /// function in order for the caller to free. This function is guaranteed to
 108      /// not unwind because the closure specified is run inside of a `rust_try`
 109      /// block. (this is the only try/catch block in the world).
 110      ///
 111      /// This function is *not* meant to be abused as a "try/catch" block. This
 112      /// is meant to be used at the absolute boundaries of a task's lifetime, and
 113      /// only for that purpose.
 114      pub fn run(~self, mut f||) -> Box<Task> {
 115          // Need to put ourselves into TLS, but also need access to the unwinder.
 116          // Unsafely get a handle to the task so we can continue to use it after
 117          // putting it in tls (so we can invoke the unwinder).
 118          let handle*mut Task = unsafe {
 119              *cast::transmute::<&Box<Task>, &*mut Task>(&self)
 120          };
 121          Local::put(self);
 122  
 123          // The only try/catch block in the world. Attempt to run the task's
 124          // client-specified code and catch any failures.
 125          let try_block = || {
 126  
 127              // Run the task main function, then do some cleanup.
 128              f.finally(|| {
 129                  #[allow(unused_must_use)]
 130                  fn close_outputs() {
 131                      let mut task = Local::borrow(None::<Task>);
 132                      let stderr = task.stderr.take();
 133                      let stdout = task.stdout.take();
 134                      drop(task);
 135                      match stdout { Some(mut w) => { w.flush(); }, None => {} }
 136                      match stderr { Some(mut w) => { w.flush(); }, None => {} }
 137                  }
 138  
 139                  // First, flush/destroy the user stdout/logger because these
 140                  // destructors can run arbitrary code.
 141                  close_outputs();
 142  
 143                  // First, destroy task-local storage. This may run user dtors.
 144                  //
 145                  // FIXME #8302: Dear diary. I'm so tired and confused.
 146                  // There's some interaction in rustc between the box
 147                  // annihilator and the TLS dtor by which TLS is
 148                  // accessed from annihilated box dtors *after* TLS is
 149                  // destroyed. Somehow setting TLS back to null, as the
 150                  // old runtime did, makes this work, but I don't currently
 151                  // understand how. I would expect that, if the annihilator
 152                  // reinvokes TLS while TLS is uninitialized, that
 153                  // TLS would be reinitialized but never destroyed,
 154                  // but somehow this works. I have no idea what's going
 155                  // on but this seems to make things magically work. FML.
 156                  //
 157                  // (added after initial comment) A possible interaction here is
 158                  // that the destructors for the objects in TLS themselves invoke
 159                  // TLS, or possibly some destructors for those objects being
 160                  // annihilated invoke TLS. Sadly these two operations seemed to
 161                  // be intertwined, and miraculously work for now...
 162                  let mut task = Local::borrow(None::<Task>);
 163                  let storage_map = {
 164                      let &LocalStorage(ref mut optmap) = &mut task.storage;
 165                      optmap.take()
 166                  };
 167                  drop(task);
 168                  drop(storage_map);
 169  
 170                  // Destroy remaining boxes. Also may run user dtors.
 171                  unsafe { cleanup::annihilate(); }
 172  
 173                  // Finally, just in case user dtors printed/logged during TLS
 174                  // cleanup and annihilation, re-destroy stdout and the logger.
 175                  // Note that these will have been initialized with a
 176                  // runtime-provided type which we have control over what the
 177                  // destructor does.
 178                  close_outputs();
 179              })
 180          };
 181  
 182          unsafe { (*handle).unwinder.try(try_block); }
 183  
 184          // Here we must unsafely borrow the task in order to not remove it from
 185          // TLS. When collecting failure, we may attempt to send on a channel (or
 186          // just run aribitrary code), so we must be sure to still have a local
 187          // task in TLS.
 188          unsafe {
 189              let me*mut Task = Local::unsafe_borrow();
 190              (*me).death.collect_failure((*me).unwinder.result());
 191          }
 192          let mut meBox<Task> = Local::take();
 193          me.destroyed = true;
 194          return me;
 195      }
 196  
 197      /// Inserts a runtime object into this task, transferring ownership to the
 198      /// task. It is illegal to replace a previous runtime object in this task
 199      /// with this argument.
 200      pub fn put_runtime(&mut self, opsBox<Runtime:Send>) {
 201          assert!(self.imp.is_none());
 202          self.imp = Some(ops);
 203      }
 204  
 205      /// Attempts to extract the runtime as a specific type. If the runtime does
 206      /// not have the provided type, then the runtime is not removed. If the
 207      /// runtime does have the specified type, then it is removed and returned
 208      /// (transfer of ownership).
 209      ///
 210      /// It is recommended to only use this method when *absolutely necessary*.
 211      /// This function may not be available in the future.
 212      pub fn maybe_take_runtime<T: 'static>(&mut self) -> Option<Box<T>> {
 213          // This is a terrible, terrible function. The general idea here is to
 214          // take the runtime, cast it to Box<Any>, check if it has the right
 215          // type, and then re-cast it back if necessary. The method of doing
 216          // this is pretty sketchy and involves shuffling vtables of trait
 217          // objects around, but it gets the job done.
 218          //
 219          // FIXME: This function is a serious code smell and should be avoided at
 220          //      all costs. I have yet to think of a method to avoid this
 221          //      function, and I would be saddened if more usage of the function
 222          //      crops up.
 223          unsafe {
 224              let imp = self.imp.take_unwrap();
 225              let &(vtable, _)&(uint, uint) = cast::transmute(&imp);
 226              match imp.wrap().move::<T>() {
 227                  Ok(t) => Some(t),
 228                  Err(t) => {
 229                      let (_, obj)(uint, uint) = cast::transmute(t);
 230                      let objBox<Runtime:Send> =
 231                          cast::transmute((vtable, obj));
 232                      self.put_runtime(obj);
 233                      None
 234                  }
 235              }
 236          }
 237      }
 238  
 239      /// Spawns a sibling to this task. The newly spawned task is configured with
 240      /// the `opts` structure and will run `f` as the body of its code.
 241      pub fn spawn_sibling(mut ~self, optsTaskOpts, fproc():Send) {
 242          let ops = self.imp.take_unwrap();
 243          ops.spawn_sibling(self, opts, f)
 244      }
 245  
 246      /// Deschedules the current task, invoking `f` `amt` times. It is not
 247      /// recommended to use this function directly, but rather communication
 248      /// primitives in `std::comm` should be used.
 249      pub fn deschedule(mut ~self, amtuint,
 250                        f|BlockedTask-> Result<(), BlockedTask>) {
 251          let ops = self.imp.take_unwrap();
 252          ops.deschedule(amt, self, f)
 253      }
 254  
 255      /// Wakes up a previously blocked task, optionally specifying whether the
 256      /// current task can accept a change in scheduling. This function can only
 257      /// be called on tasks that were previously blocked in `deschedule`.
 258      pub fn reawaken(mut ~self) {
 259          let ops = self.imp.take_unwrap();
 260          ops.reawaken(self);
 261      }
 262  
 263      /// Yields control of this task to another task. This function will
 264      /// eventually return, but possibly not immediately. This is used as an
 265      /// opportunity to allow other tasks a chance to run.
 266      pub fn yield_now(mut ~self) {
 267          let ops = self.imp.take_unwrap();
 268          ops.yield_now(self);
 269      }
 270  
 271      /// Similar to `yield_now`, except that this function may immediately return
 272      /// without yielding (depending on what the runtime decides to do).
 273      pub fn maybe_yield(mut ~self) {
 274          let ops = self.imp.take_unwrap();
 275          ops.maybe_yield(self);
 276      }
 277  
 278      /// Acquires a handle to the I/O factory that this task contains, normally
 279      /// stored in the task's runtime. This factory may not always be available,
 280      /// which is why the return type is `Option`
 281      pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> {
 282          self.imp.get_mut_ref().local_io()
 283      }
 284  
 285      /// Returns the stack bounds for this task in (lo, hi) format. The stack
 286      /// bounds may not be known for all tasks, so the return value may be
 287      /// `None`.
 288      pub fn stack_bounds(&self) -> (uint, uint) {
 289          self.imp.get_ref().stack_bounds()
 290      }
 291  
 292      /// Returns whether it is legal for this task to block the OS thread that it
 293      /// is running on.
 294      pub fn can_block(&self) -> bool {
 295          self.imp.get_ref().can_block()
 296      }
 297  }
 298  
 299  impl Drop for Task {
 300      fn drop(&mut self) {
 301          rtdebug!("called drop for a task: {}", self as *mut Task as uint);
 302          rtassert!(self.destroyed);
 303      }
 304  }
 305  
 306  impl Iterator<BlockedTask> for BlockedTasks {
 307      fn next(&mut self) -> Option<BlockedTask> {
 308          Some(Shared(self.inner.clone()))
 309      }
 310  }
 311  
 312  impl BlockedTask {
 313      /// Returns Some if the task was successfully woken; None if already killed.
 314      pub fn wake(self) -> Option<Box<Task>> {
 315          match self {
 316              Owned(task) => Some(task),
 317              Shared(arc) => unsafe {
 318                  match (*arc.get()).swap(0, SeqCst) {
 319                      0 => None,
 320                      n => Some(cast::transmute(n)),
 321                  }
 322              }
 323          }
 324      }
 325  
 326      /// Reawakens this task if ownership is acquired. If finer-grained control
 327      /// is desired, use `wake` instead.
 328      pub fn reawaken(self) {
 329          self.wake().map(|t| t.reawaken());
 330      }
 331  
 332      // This assertion has two flavours because the wake involves an atomic op.
 333      // In the faster version, destructors will fail dramatically instead.
 334      #[cfg(not(test))] pub fn trash(self) { }
 335      #[cfg(test)]      pub fn trash(self) { assert!(self.wake().is_none()); }
 336  
 337      /// Create a blocked task, unless the task was already killed.
 338      pub fn block(taskBox<Task>) -> BlockedTask {
 339          Owned(task)
 340      }
 341  
 342      /// Converts one blocked task handle to a list of many handles to the same.
 343      pub fn make_selectable(self, num_handlesuint) -> Take<BlockedTasks> {
 344          let arc = match self {
 345              Owned(task) => {
 346                  let flag = unsafe { AtomicUint::new(cast::transmute(task)) };
 347                  UnsafeArc::new(flag)
 348              }
 349              Shared(arc) => arc.clone(),
 350          };
 351          BlockedTasks{ inner: arc }.take(num_handles)
 352      }
 353  
 354      /// Convert to an unsafe uint value. Useful for storing in a pipe's state
 355      /// flag.
 356      #[inline]
 357      pub unsafe fn cast_to_uint(self) -> uint {
 358          match self {
 359              Owned(task) => {
 360                  let blocked_task_ptruint = cast::transmute(task);
 361                  rtassert!(blocked_task_ptr & 0x1 == 0);
 362                  blocked_task_ptr
 363              }
 364              Shared(arc) => {
 365                  let blocked_task_ptruint = cast::transmute(box arc);
 366                  rtassert!(blocked_task_ptr & 0x1 == 0);
 367                  blocked_task_ptr | 0x1
 368              }
 369          }
 370      }
 371  
 372      /// Convert from an unsafe uint value. Useful for retrieving a pipe's state
 373      /// flag.
 374      #[inline]
 375      pub unsafe fn cast_from_uint(blocked_task_ptruint) -> BlockedTask {
 376          if blocked_task_ptr & 0x1 == 0 {
 377              Owned(cast::transmute(blocked_task_ptr))
 378          } else {
 379              let ptrBox<UnsafeArc<AtomicUint>> =
 380                  cast::transmute(blocked_task_ptr & !1);
 381              Shared(*ptr)
 382          }
 383      }
 384  }
 385  
 386  impl Death {
 387      pub fn new() -> Death {
 388          Death { on_exit: None, }
 389      }
 390  
 391      /// Collect failure exit codes from children and propagate them to a parent.
 392      pub fn collect_failure(&mut self, resultTaskResult) {
 393          match self.on_exit.take() {
 394              Some(Execute(f)) => f(result),
 395              Some(SendMessage(ch)) => { let _ = ch.send_opt(result); }
 396              None => {}
 397          }
 398      }
 399  }
 400  
 401  impl Drop for Death {
 402      fn drop(&mut self) {
 403          // make this type noncopyable
 404      }
 405  }
 406  
 407  #[cfg(test)]
 408  mod test {
 409      use super::*;
 410      use prelude::*;
 411      use task;
 412  
 413      #[test]
 414      fn local_heap() {
 415          let a = @5;
 416          let b = a;
 417          assert!(*a == 5);
 418          assert!(*b == 5);
 419      }
 420  
 421      #[test]
 422      fn tls() {
 423          local_data_key!(key: @~str)
 424          key.replace(Some(@"data".to_owned()));
 425          assert_eq!(key.get().unwrap().as_slice(), "data");
 426          local_data_key!(key2: @~str)
 427          key2.replace(Some(@"data".to_owned()));
 428          assert_eq!(key2.get().unwrap().as_slice(), "data");
 429      }
 430  
 431      #[test]
 432      fn unwind() {
 433          let result = task::try(proc()());
 434          rtdebug!("trying first assert");
 435          assert!(result.is_ok());
 436          let result = task::try::<()>(proc() fail!());
 437          rtdebug!("trying second assert");
 438          assert!(result.is_err());
 439      }
 440  
 441      #[test]
 442      fn rng() {
 443          use rand::{StdRng, Rng};
 444          let mut r = StdRng::new().ok().unwrap();
 445          let _ = r.next_u32();
 446      }
 447  
 448      #[test]
 449      fn logging() {
 450          info!("here i am. logging in a newsched task");
 451      }
 452  
 453      #[test]
 454      fn comm_stream() {
 455          let (tx, rx) = channel();
 456          tx.send(10);
 457          assert!(rx.recv() == 10);
 458      }
 459  
 460      #[test]
 461      fn comm_shared_chan() {
 462          let (tx, rx) = channel();
 463          tx.send(10);
 464          assert!(rx.recv() == 10);
 465      }
 466  
 467      #[test]
 468      fn heap_cycles() {
 469          use cell::RefCell;
 470          use option::{Option, Some, None};
 471  
 472          struct List {
 473              next: Option<@RefCell<List>>,
 474          }
 475  
 476          let a = @RefCell::new(List { next: None });
 477          let b = @RefCell::new(List { next: Some(a) });
 478  
 479          {
 480              let mut a = a.borrow_mut();
 481              a.next = Some(b);
 482          }
 483      }
 484  
 485      #[test]
 486      #[should_fail]
 487      fn test_begin_unwind() {
 488          use rt::unwind::begin_unwind;
 489          begin_unwind("cause", file!(), line!())
 490      }
 491  
 492      // Task blocking tests
 493  
 494      #[test]
 495      fn block_and_wake() {
 496          let task = box Task::new();
 497          let mut task = BlockedTask::block(task).wake().unwrap();
 498          task.destroyed = true;
 499      }
 500  }


libstd/rt/task.rs:66:17-66:17 -enum- definition:
/// at any time.
pub enum BlockedTask {
    Owned(Box<Task>),
references:- 39
libstd/comm/mod.rs:
libstd/comm/select.rs:
libstd/comm/oneshot.rs:
libstd/comm/stream.rs:
libstd/comm/shared.rs:
libstd/comm/sync.rs:
libstd/rt/mod.rs:
libstd/rt/task.rs:


libstd/rt/task.rs:130:16-130:16 -fn- definition:
                fn close_outputs() {
                    let mut task = Local::borrow(None::<Task>);
                    let stderr = task.stderr.take();
references:- 2
177:                 // destructor does.
178:                 close_outputs();
179:             })


libstd/rt/task.rs:45:29-45:29 -struct- definition:
/// the type-specific state.
pub struct Task {
    pub heap: LocalHeap,
references:- 57
libstd/rt/rtio.rs:
libstd/rt/local.rs:
libstd/rt/local_heap.rs:
libstd/rt/unwind.rs:
libstd/rt/stack.rs:
libstd/task.rs:
libstd/comm/mod.rs:
libstd/comm/select.rs:
libstd/comm/oneshot.rs:
libstd/comm/stream.rs:
libstd/comm/shared.rs:
libstd/comm/sync.rs:
libstd/local_data.rs:
libstd/io/stdio.rs:
libstd/rt/mod.rs:


libstd/rt/task.rs:84:1-84:1 -struct- definition:
pub struct BlockedTasks {
    inner: UnsafeArc<AtomicUint>,
}
references:- 3
342:     /// Converts one blocked task handle to a list of many handles to the same.
343:     pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks> {
344:         let arc = match self {
--
350:         };
351:         BlockedTasks{ inner: arc }.take(num_handles)
352:     }


libstd/rt/task.rs:80:65-80:65 -struct- definition:
/// Per-task state related to task death, killing, failure, etc.
pub struct Death {
    pub on_exit: Option<DeathAction>,
references:- 5
401: impl Drop for Death {
402:     fn drop(&mut self) {