(index<- )        ./libstd/rt/sched.rs

    1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
    2  // file at the top-level directory of this distribution and at
    3  // http://rust-lang.org/COPYRIGHT.
    4  //
    5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
    6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
    7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
    8  // option. This file may not be copied, modified, or distributed
    9  // except according to those terms.
   10  
   11  use either::{Left, Right};
   12  use option::{Option, Some, None};
   13  use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
   14  use clone::Clone;
   15  use unstable::raw;
   16  use super::sleeper_list::SleeperList;
   17  use super::work_queue::WorkQueue;
   18  use super::stack::{StackPool};
   19  use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject};
   20  use super::context::Context;
   21  use super::task::{Task, AnySched, Sched};
   22  use super::message_queue::MessageQueue;
   23  use rt::kill::BlockedTask;
   24  use rt::local_ptr;
   25  use rt::local::Local;
   26  use rt::rtio::{RemoteCallback, PausibleIdleCallback};
   27  use borrow::{to_uint};
   28  use cell::Cell;
   29  use rand::{XorShiftRng, Rng, Rand};
   30  use iter::range;
   31  use vec::{OwnedVector};
   32  
   33  /// A scheduler is responsible for coordinating the execution of Tasks
   34  /// on a single thread. The scheduler runs inside a slightly modified
   35  /// Rust Task. When not running this task is stored in the scheduler
   36  /// struct. The scheduler struct acts like a baton, all scheduling
   37  /// actions are transfers of the baton.
   38  ///
   39  /// XXX: This creates too many callbacks to run_sched_once, resulting
   40  /// in too much allocation and too many events.
   41  pub struct Scheduler {
   42      /// There are N work queues, one per scheduler.
   43      priv work_queue: WorkQueue<~Task>,
   44      /// Work queues for the other schedulers. These are created by
   45      /// cloning the core work queues.
   46      work_queues: ~[WorkQueue<~Task>],
   47      /// The queue of incoming messages from other schedulers.
   48      /// These are enqueued by SchedHandles after which a remote callback
   49      /// is triggered to handle the message.
   50      priv message_queue: MessageQueue<SchedMessage>,
   51      /// A shared list of sleeping schedulers. We'll use this to wake
   52      /// up schedulers when pushing work onto the work queue.
   53      sleeper_list: SleeperList,
   54      /// Indicates that we have previously pushed a handle onto the
   55      /// SleeperList but have not yet received the Wake message.
   56      /// Being `true` does not necessarily mean that the scheduler is
   57      /// not active since there are multiple event sources that may
   58      /// wake the scheduler. It just prevents the scheduler from pushing
   59      /// multiple handles onto the sleeper list.
   60      priv sleepy: bool,
   61      /// A flag to indicate we've received the shutdown message and should
   62      /// no longer try to go to sleep, but exit instead.
   63      no_sleep: bool,
   64      stack_pool: StackPool,
   65      /// The event loop used to drive the scheduler and perform I/O
   66      event_loop: ~EventLoopObject,
   67      /// The scheduler runs on a special task. When it is not running
   68      /// it is stored here instead of the work queue.
   69      sched_task: Option<~Task>,
   70      /// An action performed after a context switch on behalf of the
   71      /// code running before the context switch
   72      cleanup_job: Option<CleanupJob>,
   73      /// Should this scheduler run any task, or only pinned tasks?
   74      run_anything: bool,
   75      /// If the scheduler shouldn't run some tasks, a friend to send
   76      /// them to.
   77      friend_handle: Option<SchedHandle>,
   78      /// A fast XorShift rng for scheduler use
   79      rng: XorShiftRng,
   80      /// A toggleable idle callback
   81      idle_callback: Option<~PausibleIdleCallback>,
   82      /// A countdown that starts at a random value and is decremented
   83      /// every time a yield check is performed. When it hits 0 a task
   84      /// will yield.
   85      yield_check_count: uint,
   86      /// A flag to tell the scheduler loop it needs to do some stealing
   87      /// in order to introduce randomness as part of a yield
   88      steal_for_yield: bool
   89  }
   90  
   91  /// An indication of how hard to work on a given operation, the difference
   92  /// mainly being whether memory is synchronized or not
   93  #[deriving(Eq)]
   94  enum EffortLevel {
   95      DontTryTooHard,
   96      GiveItYourBest
   97  }
   98  
   99  static MAX_YIELD_CHECKS: uint = 200;
  100  
  101  fn reset_yield_check(rng&mut XorShiftRng) -> uint {
  102      let ruint = Rand::rand(rng);
  103      r % MAX_YIELD_CHECKS + 1
  104  }
  105  
  106  impl Scheduler {
  107  
  108      // * Initialization Functions
  109  
  110      pub fn new(event_loop~EventLoopObject,
  111                 work_queueWorkQueue<~Task>,
  112                 work_queues~[WorkQueue<~Task>],
  113                 sleeper_listSleeperList)
  114          -> Scheduler {
  115  
  116          Scheduler::new_special(event_loop, work_queue,
  117                                 work_queues,
  118                                 sleeper_list, true, None)
  119  
  120      }
  121  
  122      pub fn new_special(event_loop~EventLoopObject,
  123                         work_queueWorkQueue<~Task>,
  124                         work_queues~[WorkQueue<~Task>],
  125                         sleeper_listSleeperList,
  126                         run_anythingbool,
  127                         friendOption<SchedHandle>)
  128          -> Scheduler {
  129  
  130          let mut sched = Scheduler {
  131              sleeper_list: sleeper_list,
  132              message_queue: MessageQueue::new(),
  133              sleepy: false,
  134              no_sleep: false,
  135              event_loop: event_loop,
  136              work_queue: work_queue,
  137              work_queues: work_queues,
  138              stack_pool: StackPool::new(),
  139              sched_task: None,
  140              cleanup_job: None,
  141              run_anything: run_anything,
  142              friend_handle: friend,
  143              rng: new_sched_rng(),
  144              idle_callback: None,
  145              yield_check_count: 0,
  146              steal_for_yield: false
  147          };
  148  
  149          sched.yield_check_count = reset_yield_check(&mut sched.rng);
  150  
  151          return sched;
  152      }
  153  
  154      // XXX: This may eventually need to be refactored so that
  155      // the scheduler itself doesn't have to call event_loop.run.
  156      // That will be important for embedding the runtime into external
  157      // event loops.
  158  
  159      // Take a main task to run, and a scheduler to run it in. Create a
  160      // scheduler task and bootstrap into it.
  161      pub fn bootstrap(~self, task~Task) {
  162  
  163          let mut this = self;
  164  
  165          // Build an Idle callback.
  166          this.idle_callback = Some(this.event_loop.pausible_idle_callback());
  167  
  168          // Initialize the TLS key.
  169          local_ptr::init_tls_key();
  170  
  171          // Create a task for the scheduler with an empty context.
  172          let sched_task = ~Task::new_sched_task();
  173  
  174          // Now that we have an empty task struct for the scheduler
  175          // task, put it in TLS.
  176          Local::put::(sched_task);
  177  
  178          // Before starting our first task, make sure the idle callback
  179          // is active. As we do not start in the sleep state this is
  180          // important.
  181          this.idle_callback.get_mut_ref().start(Scheduler::run_sched_once);
  182  
  183          // Now, as far as all the scheduler state is concerned, we are
  184          // inside the "scheduler" context. So we can act like the
  185          // scheduler and resume the provided task.
  186          this.resume_task_immediately(task);
  187  
  188          // Now we are back in the scheduler context, having
  189          // successfully run the input task. Start by running the
  190          // scheduler. Grab it out of TLS - performing the scheduler
  191          // action will have given it away.
  192          let sched~Scheduler = Local::take();
  193  
  194          rtdebug!("starting scheduler {}", sched.sched_id());
  195          sched.run();
  196  
  197          // Close the idle callback.
  198          let mut sched~Scheduler = Local::take();
  199          sched.idle_callback.get_mut_ref().close();
  200          // Make one go through the loop to run the close callback.
  201          sched.run();
  202  
  203          // Now that we are done with the scheduler, clean up the
  204          // scheduler task. Do so by removing it from TLS and manually
  205          // cleaning up the memory it uses. As we didn't actually call
  206          // task.run() on the scheduler task we never get through all
  207          // the cleanup code it runs.
  208          let mut stask~Task = Local::take();
  209  
  210          rtdebug!("stopping scheduler {}", stask.sched.get_ref().sched_id());
  211  
  212          // Should not have any messages
  213          let message = stask.sched.get_mut_ref().message_queue.pop();
  214          rtassert!(message.is_none());
  215  
  216          stask.destroyed = true;
  217      }
  218  
  219      // This does not return a scheduler, as the scheduler is placed
  220      // inside the task.
  221      pub fn run(~self) {
  222  
  223          let mut self_sched = self;
  224  
  225          // This is unsafe because we need to place the scheduler, with
  226          // the event_loop inside, inside our task. But we still need a
  227          // mutable reference to the event_loop to give it the "run"
  228          // command.
  229          unsafe {
  230              let event_loop*mut ~EventLoopObject = &mut self_sched.event_loop;
  231  
  232              // Our scheduler must be in the task before the event loop
  233              // is started.
  234              let self_sched = Cell::new(self_sched);
  235              do Local::borrow |stask&mut Task{
  236                  stask.sched = Some(self_sched.take());
  237              };
  238  
  239              (*event_loop).run();
  240          }
  241      }
  242  
  243      // * Execution Functions - Core Loop Logic
  244  
  245      // The model for this function is that you continue through it
  246      // until you either use the scheduler while performing a schedule
  247      // action, in which case you give it away and return early, or
  248      // you reach the end and sleep. In the case that a scheduler
  249      // action is performed the loop is evented such that this function
  250      // is called again.
  251      fn run_sched_once() {
  252  
  253          // When we reach the scheduler context via the event loop we
  254          // already have a scheduler stored in our local task, so we
  255          // start off by taking it. This is the only path through the
  256          // scheduler where we get the scheduler this way.
  257          let mut sched~Scheduler = Local::take();
  258  
  259          // Assume that we need to continue idling unless we reach the
  260          // end of this function without performing an action.
  261          sched.idle_callback.get_mut_ref().resume();
  262  
  263          // First we check for scheduler messages, these are higher
  264          // priority than regular tasks.
  265          let sched = match sched.interpret_message_queue(DontTryTooHard) {
  266              Some(sched) => sched,
  267              None => return
  268          };
  269  
  270          // This helper will use a randomized work-stealing algorithm
  271          // to find work.
  272          let sched = match sched.do_work() {
  273              Some(sched) => sched,
  274              None => return
  275          };
  276  
  277          // Now, before sleeping we need to find out if there really
  278          // were any messages. Give it your best!
  279          let mut sched = match sched.interpret_message_queue(GiveItYourBest) {
  280              Some(sched) => sched,
  281              None => return
  282          };
  283  
  284          // If we got here then there was no work to do.
  285          // Generate a SchedHandle and push it to the sleeper list so
  286          // somebody can wake us up later.
  287          if !sched.sleepy && !sched.no_sleep {
  288              rtdebug!("scheduler has no work to do, going to sleep");
  289              sched.sleepy = true;
  290              let handle = sched.make_handle();
  291              sched.sleeper_list.push(handle);
  292              // Since we are sleeping, deactivate the idle callback.
  293              sched.idle_callback.get_mut_ref().pause();
  294          } else {
  295              rtdebug!("not sleeping, already doing so or no_sleep set");
  296              // We may not be sleeping, but we still need to deactivate
  297              // the idle callback.
  298              sched.idle_callback.get_mut_ref().pause();
  299          }
  300  
  301          // Finished a cycle without using the Scheduler. Place it back
  302          // in TLS.
  303          Local::put(sched);
  304      }
  305  
  306      // This function returns None if the scheduler is "used", or it
  307      // returns the still-available scheduler. At this point all
  308      // message-handling will count as a turn of work, and as a result
  309      // return None.
  310      fn interpret_message_queue(~self, effortEffortLevel) -> Option<~Scheduler> {
  311  
  312          let mut this = self;
  313  
  314          let msg = if effort == DontTryTooHard {
  315              // Do a cheap check that may miss messages
  316              this.message_queue.casual_pop()
  317          } else {
  318              this.message_queue.pop()
  319          };
  320  
  321          match msg {
  322              Some(PinnedTask(task)) => {
  323                  let mut task = task;
  324                  task.give_home(Sched(this.make_handle()));
  325                  this.resume_task_immediately(task);
  326                  return None;
  327              }
  328              Some(TaskFromFriend(task)) => {
  329                  rtdebug!("got a task from a friend. lovely!");
  330                  this.process_task(task, Scheduler::resume_task_immediately_cl);
  331                  return None;
  332              }
  333              Some(Wake) => {
  334                  this.sleepy = false;
  335                  Local::put(this);
  336                  return None;
  337              }
  338              Some(Shutdown) => {
  339                  rtdebug!("shutting down");
  340                  if this.sleepy {
  341                      // There may be an outstanding handle on the
  342                      // sleeper list.  Pop them all to make sure that's
  343                      // not the case.
  344                      loop {
  345                          match this.sleeper_list.pop() {
  346                              Some(handle) => {
  347                                  let mut handle = handle;
  348                                  handle.send(Wake);
  349                              }
  350                              None => break
  351                          }
  352                      }
  353                  }
  354                  // No more sleeping. After there are no outstanding
  355                  // event loop references we will shut down.
  356                  this.no_sleep = true;
  357                  this.sleepy = false;
  358                  Local::put(this);
  359                  return None;
  360              }
  361              None => {
  362                  return Some(this);
  363              }
  364          }
  365      }
  366  
  367      fn do_work(~self) -> Option<~Scheduler> {
  368          let mut this = self;
  369  
  370          rtdebug!("scheduler calling do work");
  371          match this.find_work() {
  372              Some(task) => {
  373                  rtdebug!("found some work! processing the task");
  374                  this.process_task(task, Scheduler::resume_task_immediately_cl);
  375                  return None;
  376              }
  377              None => {
  378                  rtdebug!("no work was found, returning the scheduler struct");
  379                  return Some(this);
  380              }
  381          }
  382      }
  383  
  384      // Workstealing: In this iteration of the runtime each scheduler
  385      // thread has a distinct work queue. When no work is available
  386      // locally, make a few attempts to steal work from the queues of
  387      // other scheduler threads. If a few steals fail we end up in the
  388      // old "no work" path which is fine.
  389  
  390      // First step in the process is to find a task. This function does
  391      // that by first checking the local queue, and if there is no work
  392      // there, trying to steal from the remote work queues.
  393      fn find_work(&mut self) -> Option<~Task> {
  394          rtdebug!("scheduler looking for work");
  395          if !self.steal_for_yield {
  396              match self.work_queue.pop() {
  397                  Some(task) => {
  398                      rtdebug!("found a task locally");
  399                      return Some(task)
  400                  }
  401                  None => {
  402                      rtdebug!("scheduler trying to steal");
  403                      return self.try_steals();
  404                  }
  405              }
  406          } else {
  407              // During execution of the last task, it performed a 'yield',
  408              // so we're doing some work stealing in order to introduce some
  409              // scheduling randomness. Otherwise we would just end up popping
  410              // that same task again. This is pretty lame and is to work around
  411              // the problem that work stealing is not designed for 'non-strict'
  412              // (non-fork-join) task parallelism.
  413              self.steal_for_yield = false;
  414              match self.try_steals() {
  415                  Some(task) => {
  416                      rtdebug!("stole a task after yielding");
  417                      return Some(task);
  418                  }
  419                  None => {
  420                      rtdebug!("did not steal a task after yielding");
  421                      // Back to business
  422                      return self.find_work();
  423                  }
  424              }
  425          }
  426      }
  427  
  428      // Try stealing from all queues the scheduler knows about. This
  429      // naive implementation can steal from our own queue or from other
  430      // special schedulers.
  431      fn try_steals(&mut self) -> Option<~Task> {
  432          let work_queues = &mut self.work_queues;
  433          let len = work_queues.len();
  434          let start_index = self.rng.gen_integer_range(0, len);
  435          for index in range(0, len).map(|i| (i + start_index) % len) {
  436              match work_queues[index].steal() {
  437                  Some(task) => {
  438                      rtdebug!("found task by stealing");
  439                      return Some(task)
  440                  }
  441                  None => ()
  442              }
  443          };
  444          rtdebug!("giving up on stealing");
  445          return None;
  446      }
  447  
  448      // * Task Routing Functions - Make sure tasks send up in the right
  449      // place.
  450  
  451      fn process_task(~self, task~Task,
  452                      schedule_fnSchedulingFn) {
  453          let mut this = self;
  454          let mut task = task;
  455  
  456          rtdebug!("processing a task");
  457  
  458          let home = task.take_unwrap_home();
  459          match home {
  460              Sched(home_handle) => {
  461                  if home_handle.sched_id != this.sched_id() {
  462                      rtdebug!("sending task home");
  463                      task.give_home(Sched(home_handle));
  464                      Scheduler::send_task_home(task);
  465                      Local::put(this);
  466                  } else {
  467                      rtdebug!("running task here");
  468                      task.give_home(Sched(home_handle));
  469                      schedule_fn(this, task);
  470                  }
  471              }
  472              AnySched if this.run_anything => {
  473                  rtdebug!("running anysched task here");
  474                  task.give_home(AnySched);
  475                  schedule_fn(this, task);
  476              }
  477              AnySched => {
  478                  rtdebug!("sending task to friend");
  479                  task.give_home(AnySched);
  480                  this.send_to_friend(task);
  481                  Local::put(this);
  482              }
  483          }
  484      }
  485  
  486      fn send_task_home(task~Task) {
  487          let mut task = task;
  488          let mut home = task.take_unwrap_home();
  489          match home {
  490              Sched(ref mut home_handle) => {
  491                  home_handle.send(PinnedTask(task));
  492              }
  493              AnySched => {
  494                          rtabort!("error: cannot send anysched task home");
  495              }
  496          }
  497      }
  498  
  499      /// Take a non-homed task we aren't allowed to run here and send
  500      /// it to the designated friend scheduler to execute.
  501      fn send_to_friend(&mut self, task~Task) {
  502          rtdebug!("sending a task to friend");
  503          match self.friend_handle {
  504              Some(ref mut handle) => {
  505                  handle.send(TaskFromFriend(task));
  506              }
  507              None => {
  508                  rtabort!("tried to send task to a friend but scheduler has no friends");
  509              }
  510          }
  511      }
  512  
  513      /// Schedule a task to be executed later.
  514      ///
  515      /// Pushes the task onto the work stealing queue and tells the
  516      /// event loop to run it later. Always use this instead of pushing
  517      /// to the work queue directly.
  518      pub fn enqueue_task(&mut self, task~Task) {
  519  
  520          let this = self;
  521  
  522          // We push the task onto our local queue clone.
  523          this.work_queue.push(task);
  524          this.idle_callback.get_mut_ref().resume();
  525  
  526          // We've made work available. Notify a
  527          // sleeping scheduler.
  528  
  529          match this.sleeper_list.casual_pop() {
  530              Some(handle) => {
  531                          let mut handle = handle;
  532                  handle.send(Wake)
  533              }
  534              None => { (/* pass */) }
  535          };
  536      }
  537  
  538      /// As enqueue_task, but with the possibility for the blocked task to
  539      /// already have been killed.
  540      pub fn enqueue_blocked_task(&mut self, blocked_taskBlockedTask) {
  541          do blocked_task.wake().map |task| {
  542              self.enqueue_task(task);
  543          };
  544      }
  545  
  546      // * Core Context Switching Functions
  547  
  548      // The primary function for changing contexts. In the current
  549      // design the scheduler is just a slightly modified GreenTask, so
  550      // all context swaps are from Task to Task. The only difference
  551      // between the various cases is where the inputs come from, and
  552      // what is done with the resulting task. That is specified by the
  553      // cleanup function f, which takes the scheduler and the
  554      // old task as inputs.
  555  
  556      pub fn change_task_context(~self,
  557                                 next_task~Task,
  558                                 f&fn(&mut Scheduler, ~Task)) {
  559          let mut this = self;
  560  
  561          // The current task is grabbed from TLS, not taken as an input.
  562          // Doing an unsafe_take to avoid writing back a null pointer -
  563          // We're going to call `put` later to do that.
  564          let current_task~Task = unsafe { Local::unsafe_take() };
  565  
  566          // Check that the task is not in an atomically() section (e.g.,
  567          // holding a pthread mutex, which could deadlock the scheduler).
  568          current_task.death.assert_may_sleep();
  569  
  570          // These transmutes do something fishy with a closure.
  571          let f_fake_region = unsafe {
  572              transmute::<&fn(&mut Scheduler, ~Task),
  573                          &fn(&mut Scheduler, ~Task)>(f)
  574          };
  575          let f_opaque = ClosureConverter::from_fn(f_fake_region);
  576  
  577          // The current task is placed inside an enum with the cleanup
  578          // function. This enum is then placed inside the scheduler.
  579          this.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
  580  
  581          // The scheduler is then placed inside the next task.
  582          let mut next_task = next_task;
  583          next_task.sched = Some(this);
  584  
  585          // However we still need an internal mutable pointer to the
  586          // original task. The strategy here was "arrange memory, then
  587          // get pointers", so we crawl back up the chain using
  588          // transmute to eliminate borrowck errors.
  589          unsafe {
  590  
  591              let sched&mut Scheduler =
  592                  transmute_mut_region(*next_task.sched.get_mut_ref());
  593  
  594              let current_task&mut Task = match sched.cleanup_job {
  595                  Some(CleanupJob { task: ref task, _ }) => {
  596                      let task_ptr*~Task = task;
  597                      transmute_mut_region(*transmute_mut_unsafe(task_ptr))
  598                  }
  599                  None => {
  600                      rtabort!("no cleanup job");
  601                  }
  602              };
  603  
  604              let (current_task_context, next_task_context) =
  605                  Scheduler::get_contexts(current_task, next_task);
  606  
  607              // Done with everything - put the next task in TLS. This
  608              // works because due to transmute the borrow checker
  609              // believes that we have no internal pointers to
  610              // next_task.
  611              Local::put(next_task);
  612  
  613              // The raw context swap operation. The next action taken
  614              // will be running the cleanup job from the context of the
  615              // next task.
  616              Context::swap(current_task_context, next_task_context);
  617          }
  618  
  619          // When the context swaps back to this task we immediately
  620          // run the cleanup job, as expected by the previously called
  621          // swap_contexts function.
  622          unsafe {
  623              let task*mut Task = Local::unsafe_borrow();
  624              (*task).sched.get_mut_ref().run_cleanup_job();
  625  
  626              // Must happen after running the cleanup job (of course).
  627              (*task).death.check_killed((*task).unwinder.unwinding);
  628          }
  629      }
  630  
  631      // Returns a mutable reference to both contexts involved in this
  632      // swap. This is unsafe - we are getting mutable internal
  633      // references to keep even when we don't own the tasks. It looks
  634      // kinda safe because we are doing transmutes before passing in
  635      // the arguments.
  636      pub fn get_contexts<'a>(current_task&mut Task, next_task&mut Task) ->
  637          (&'a mut Context, &'a mut Context) {
  638          let current_task_context =
  639              &mut current_task.coroutine.get_mut_ref().saved_context;
  640          let next_task_context =
  641                  &mut next_task.coroutine.get_mut_ref().saved_context;
  642          unsafe {
  643              (transmute_mut_region(current_task_context),
  644               transmute_mut_region(next_task_context))
  645          }
  646      }
  647  
  648      // * Context Swapping Helpers - Here be ugliness!
  649  
  650      pub fn resume_task_immediately(~self, task~Task) {
  651          do self.change_task_context(task) |sched, stask| {
  652              sched.sched_task = Some(stask);
  653          }
  654      }
  655  
  656      fn resume_task_immediately_cl(sched~Scheduler,
  657                                    task~Task) {
  658          sched.resume_task_immediately(task)
  659      }
  660  
  661  
  662      pub fn resume_blocked_task_immediately(~self, blocked_taskBlockedTask) {
  663          match blocked_task.wake() {
  664              Some(task) => { self.resume_task_immediately(task); }
  665              None => Local::put(self)
  666          };
  667      }
  668  
  669      /// Block a running task, context switch to the scheduler, then pass the
  670      /// blocked task to a closure.
  671      ///
  672      /// # Safety note
  673      ///
  674      /// The closure here is a *stack* closure that lives in the
  675      /// running task.  It gets transmuted to the scheduler's lifetime
  676      /// and called while the task is blocked.
  677      ///
  678      /// This passes a Scheduler pointer to the fn after the context switch
  679      /// in order to prevent that fn from performing further scheduling operations.
  680      /// Doing further scheduling could easily result in infinite recursion.
  681      pub fn deschedule_running_task_and_then(~self, f&fn(&mut Scheduler, BlockedTask)) {
  682          // Trickier - we need to get the scheduler task out of self
  683          // and use it as the destination.
  684          let mut this = self;
  685          let stask = this.sched_task.take_unwrap();
  686          // Otherwise this is the same as below.
  687          this.switch_running_tasks_and_then(stask, f);
  688      }
  689  
  690      pub fn switch_running_tasks_and_then(~self, next_task~Task,
  691                                           f&fn(&mut Scheduler, BlockedTask)) {
  692          // This is where we convert the BlockedTask-taking closure into one
  693          // that takes just a Task, and is aware of the block-or-killed protocol.
  694          do self.change_task_context(next_task) |sched, task| {
  695              // Task might need to receive a kill signal instead of blocking.
  696              // We can call the "and_then" only if it blocks successfully.
  697              match BlockedTask::try_block(task) {
  698                  Left(killed_task) => sched.enqueue_task(killed_task),
  699                  Right(blocked_task) => f(sched, blocked_task),
  700              }
  701          }
  702      }
  703  
  704      fn switch_task(sched~Scheduler, task~Task) {
  705          do sched.switch_running_tasks_and_then(task) |sched, last_task| {
  706              sched.enqueue_blocked_task(last_task);
  707          };
  708      }
  709  
  710      // * Task Context Helpers
  711  
  712      /// Called by a running task to end execution, after which it will
  713      /// be recycled by the scheduler for reuse in a new task.
  714      pub fn terminate_current_task(~self) {
  715          // Similar to deschedule running task and then, but cannot go through
  716          // the task-blocking path. The task is already dying.
  717          let mut this = self;
  718          let stask = this.sched_task.take_unwrap();
  719          do this.change_task_context(stask) |sched, mut dead_task| {
  720              let coroutine = dead_task.coroutine.take_unwrap();
  721              coroutine.recycle(&mut sched.stack_pool);
  722          }
  723      }
  724  
  725      pub fn run_task(task~Task) {
  726          let sched~Scheduler = Local::take();
  727          sched.process_task(task, Scheduler::switch_task);
  728      }
  729  
  730      pub fn run_task_later(next_task~Task) {
  731          let next_task = Cell::new(next_task);
  732          do Local::borrow |sched&mut Scheduler{
  733              sched.enqueue_task(next_task.take());
  734          };
  735      }
  736  
  737      /// Yield control to the scheduler, executing another task. This is guaranteed
  738      /// to introduce some amount of randomness to the scheduler. Currently the
  739      /// randomness is a result of performing a round of work stealing (which
  740      /// may end up stealing from the current scheduler).
  741      pub fn yield_now(~self) {
  742          let mut this = self;
  743          this.yield_check_count = reset_yield_check(&mut this.rng);
  744          // Tell the scheduler to start stealing on the next iteration
  745          this.steal_for_yield = true;
  746          do this.deschedule_running_task_and_then |sched, task| {
  747              sched.enqueue_blocked_task(task);
  748          }
  749      }
  750  
  751      pub fn maybe_yield(~self) {
  752          // The number of times to do the yield check before yielding, chosen arbitrarily.
  753          let mut this = self;
  754          rtassert!(this.yield_check_count > 0);
  755          this.yield_check_count -= 1;
  756          if this.yield_check_count == 0 {
  757              this.yield_now();
  758          } else {
  759              Local::put(this);
  760          }
  761      }
  762  
  763  
  764      // * Utility Functions
  765  
  766      pub fn sched_id(&self) -> uint { to_uint(self) }
  767  
  768      pub fn run_cleanup_job(&mut self) {
  769          let cleanup_job = self.cleanup_job.take_unwrap();
  770          cleanup_job.run(self);
  771      }
  772  
  773      pub fn make_handle(&mut self) -> SchedHandle {
  774          let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
  775  
  776          return SchedHandle {
  777              remote: remote,
  778              queue: self.message_queue.clone(),
  779              sched_id: self.sched_id()
  780          };
  781      }
  782  }
  783  
  784  // Supporting types
  785  
  786  type SchedulingFn = ~fn(~Scheduler, ~Task);
  787  
  788  pub enum SchedMessage {
  789      Wake,
  790      Shutdown,
  791      PinnedTask(~Task),
  792      TaskFromFriend(~Task)
  793  }
  794  
  795  pub struct SchedHandle {
  796      priv remote: ~RemoteCallbackObject,
  797      priv queue: MessageQueue<SchedMessage>,
  798      sched_id: uint
  799  }
  800  
  801  impl SchedHandle {
  802      pub fn send(&mut self, msgSchedMessage) {
  803          self.queue.push(msg);
  804          self.remote.fire();
  805      }
  806      pub fn send_task_from_friend(&mut self, friend~Task) {
  807          self.send(TaskFromFriend(friend));
  808      }
  809      pub fn send_shutdown(&mut self) {
  810          self.send(Shutdown);
  811      }
  812  }
  813  
  814  struct CleanupJob {
  815      task: ~Task,
  816      f: UnsafeTaskReceiver
  817  }
  818  
  819  impl CleanupJob {
  820      pub fn new(task~Task, fUnsafeTaskReceiver) -> CleanupJob {
  821          CleanupJob {
  822              task: task,
  823              f: f
  824          }
  825      }
  826  
  827      pub fn run(self, sched&mut Scheduler) {
  828          let CleanupJob { task: task, f: f } = self;
  829          f.to_fn()(sched, task)
  830      }
  831  }
  832  
  833  // XXX: Some hacks to put a &fn in Scheduler without borrowck
  834  // complaining
  835  type UnsafeTaskReceiver = raw::Closure;
  836  trait ClosureConverter {
  837      fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
  838      fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
  839  }
  840  impl ClosureConverter for UnsafeTaskReceiver {
  841      fn from_fn(f&fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
  842          unsafe { transmute(f) }
  843      }
  844      fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
  845  }
  846  
  847  // On unix, we read randomness straight from /dev/urandom, but the
  848  // default constructor of an XorShiftRng does this via io::file, which
  849  // relies on the scheduler existing, so we have to manually load
  850  // randomness. Windows has its own C API for this, so we don't need to
  851  // worry there.
  852  #[cfg(windows)]
  853  fn new_sched_rng() -> XorShiftRng {
  854      XorShiftRng::new()
  855  }
  856  #[cfg(unix)]
  857  #[fixed_stack_segment] #[inline(never)]
  858  fn new_sched_rng() -> XorShiftRng {
  859      use libc;
  860      use sys;
  861      use c_str::ToCStr;
  862      use vec::MutableVector;
  863      use iter::Iterator;
  864      use rand::SeedableRng;
  865  
  866      let fd = do "/dev/urandom".with_c_str |name| {
  867          unsafe { libc::open(name, libc::O_RDONLY, 0) }
  868      };
  869      if fd == -1 {
  870          rtabort!("could not open /dev/urandom for reading.")
  871      }
  872  
  873      let mut seeds = [0u32, .. 4];
  874      let size = sys::size_of_val(&seeds);
  875      loop {
  876          let nbytes = do seeds.as_mut_buf |buf, _| {
  877              unsafe {
  878                  libc::read(fd,
  879                             buf as *mut libc::c_void,
  880                             size as libc::size_t)
  881              }
  882          };
  883          rtassert!(nbytes as uint == size);
  884  
  885          if !seeds.iter().all(|x| *x == 0) {
  886              break;
  887          }
  888      }
  889  
  890      unsafe {libc::close(fd);}
  891  
  892      SeedableRng::from_seed(seeds)
  893  }
  894  
  895  #[cfg(test)]
  896  mod test {
  897      extern mod extra;
  898  
  899      use prelude::*;
  900      use rt::test::*;
  901      use unstable::run_in_bare_thread;
  902      use borrow::to_uint;
  903      use rt::local::*;
  904      use rt::sched::{Scheduler};
  905      use cell::Cell;
  906      use rt::thread::Thread;
  907      use rt::task::{Task, Sched};
  908      use rt::util;
  909      use option::{Some};
  910  
  911      #[test]
  912      fn trivial_run_in_newsched_task_test() {
  913          let mut task_ran = false;
  914          let task_ran_ptr: *mut bool = &mut task_ran;
  915          do run_in_newsched_task || {
  916              unsafe { *task_ran_ptr = true };
  917              rtdebug!("executed from the new scheduler")
  918          }
  919          assert!(task_ran);
  920      }
  921  
  922      #[test]
  923      fn multiple_task_test() {
  924          let total = 10;
  925          let mut task_run_count = 0;
  926          let task_run_count_ptr: *mut uint = &mut task_run_count;
  927          do run_in_newsched_task || {
  928              for _ in range(0u, total) {
  929                  do spawntask || {
  930                      unsafe { *task_run_count_ptr = *task_run_count_ptr + 1};
  931                  }
  932              }
  933          }
  934          assert!(task_run_count == total);
  935      }
  936  
  937      #[test]
  938      fn multiple_task_nested_test() {
  939          let mut task_run_count = 0;
  940          let task_run_count_ptr: *mut uint = &mut task_run_count;
  941          do run_in_newsched_task || {
  942              do spawntask || {
  943                  unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
  944                  do spawntask || {
  945                      unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
  946                      do spawntask || {
  947                          unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
  948                      }
  949                  }
  950              }
  951          }
  952          assert!(task_run_count == 3);
  953      }
  954  
  955      // Confirm that a sched_id actually is the uint form of the
  956      // pointer to the scheduler struct.
  957      #[test]
  958      fn simple_sched_id_test() {
  959          do run_in_bare_thread {
  960              let sched = ~new_test_uv_sched();
  961              assert!(to_uint(sched) == sched.sched_id());
  962          }
  963      }
  964  
  965      // Compare two scheduler ids that are different, this should never
  966      // fail but may catch a mistake someday.
  967      #[test]
  968      fn compare_sched_id_test() {
  969          do run_in_bare_thread {
  970              let sched_one = ~new_test_uv_sched();
  971              let sched_two = ~new_test_uv_sched();
  972              assert!(sched_one.sched_id() != sched_two.sched_id());
  973          }
  974      }
  975  
  976  
  977      // A very simple test that confirms that a task executing on the
  978      // home scheduler notices that it is home.
  979      #[test]
  980      fn test_home_sched() {
  981          do run_in_bare_thread {
  982              let mut task_ran = false;
  983              let task_ran_ptr: *mut bool = &mut task_ran;
  984  
  985              let mut sched = ~new_test_uv_sched();
  986              let sched_handle = sched.make_handle();
  987  
  988              let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, None,
  989                                                  Sched(sched_handle)) {
  990                  unsafe { *task_ran_ptr = true };
  991                  assert!(Task::on_appropriate_sched());
  992              };
  993  
  994              let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
  995              task.death.on_exit = Some(on_exit);
  996  
  997              sched.bootstrap(task);
  998          }
  999      }
 1000  
 1001      // An advanced test that checks all four possible states that a
 1002      // (task,sched) can be in regarding homes.
 1003  
 1004      #[test]
 1005      fn test_schedule_home_states() {
 1006  
 1007          use rt::uv::uvio::UvEventLoop;
 1008          use rt::sleeper_list::SleeperList;
 1009          use rt::work_queue::WorkQueue;
 1010          use rt::sched::Shutdown;
 1011          use borrow;
 1012          use rt::comm::*;
 1013  
 1014          do run_in_bare_thread {
 1015  
 1016              let sleepers = SleeperList::new();
 1017              let normal_queue = WorkQueue::new();
 1018              let special_queue = WorkQueue::new();
 1019              let queues = ~[normal_queue.clone(), special_queue.clone()];
 1020  
 1021              // Our normal scheduler
 1022              let mut normal_sched = ~Scheduler::new(
 1023                  ~UvEventLoop::new(),
 1024                  normal_queue,
 1025                  queues.clone(),
 1026                  sleepers.clone());
 1027  
 1028              let normal_handle = Cell::new(normal_sched.make_handle());
 1029  
 1030              let friend_handle = normal_sched.make_handle();
 1031  
 1032              // Our special scheduler
 1033              let mut special_sched = ~Scheduler::new_special(
 1034                  ~UvEventLoop::new(),
 1035                  special_queue.clone(),
 1036                  queues.clone(),
 1037                  sleepers.clone(),
 1038                  false,
 1039                  Some(friend_handle));
 1040  
 1041              let special_handle = Cell::new(special_sched.make_handle());
 1042  
 1043              let t1_handle = special_sched.make_handle();
 1044              let t4_handle = special_sched.make_handle();
 1045  
 1046              // Four test tasks:
 1047              //   1) task is home on special
 1048              //   2) task not homed, sched doesn't care
 1049              //   3) task not homed, sched requeues
 1050              //   4) task not home, send home
 1051  
 1052              let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None,
 1053                                                   Sched(t1_handle)) || {
 1054                  rtassert!(Task::on_appropriate_sched());
 1055              };
 1056              rtdebug!("task1 id: **{}**", borrow::to_uint(task1));
 1057  
 1058              let task2 = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
 1059                  rtassert!(Task::on_appropriate_sched());
 1060              };
 1061  
 1062              let task3 = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
 1063                  rtassert!(Task::on_appropriate_sched());
 1064              };
 1065  
 1066              let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None,
 1067                                                   Sched(t4_handle)) {
 1068                  rtassert!(Task::on_appropriate_sched());
 1069              };
 1070              rtdebug!("task4 id: **{}**", borrow::to_uint(task4));
 1071  
 1072              let task1 = Cell::new(task1);
 1073              let task2 = Cell::new(task2);
 1074              let task3 = Cell::new(task3);
 1075              let task4 = Cell::new(task4);
 1076  
 1077              // Signal from the special task that we are done.
 1078              let (port, chan) = oneshot::<()>();
 1079              let port = Cell::new(port);
 1080              let chan = Cell::new(chan);
 1081  
 1082              let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) {
 1083                  rtdebug!("*about to submit task2*");
 1084                  Scheduler::run_task(task2.take());
 1085                  rtdebug!("*about to submit task4*");
 1086                  Scheduler::run_task(task4.take());
 1087                  rtdebug!("*normal_task done*");
 1088                  port.take().recv();
 1089                  let mut nh = normal_handle.take();
 1090                  nh.send(Shutdown);
 1091                  let mut sh = special_handle.take();
 1092                  sh.send(Shutdown);
 1093              };
 1094  
 1095              rtdebug!("normal task: {}", borrow::to_uint(normal_task));
 1096  
 1097              let special_task = ~do Task::new_root(&mut special_sched.stack_pool, None) {
 1098                  rtdebug!("*about to submit task1*");
 1099                  Scheduler::run_task(task1.take());
 1100                  rtdebug!("*about to submit task3*");
 1101                  Scheduler::run_task(task3.take());
 1102                  rtdebug!("*done with special_task*");
 1103                  chan.take().send(());
 1104              };
 1105  
 1106              rtdebug!("special task: {}", borrow::to_uint(special_task));
 1107  
 1108              let special_sched = Cell::new(special_sched);
 1109              let normal_sched = Cell::new(normal_sched);
 1110              let special_task = Cell::new(special_task);
 1111              let normal_task = Cell::new(normal_task);
 1112  
 1113              let normal_thread = do Thread::start {
 1114                  normal_sched.take().bootstrap(normal_task.take());
 1115                  rtdebug!("finished with normal_thread");
 1116              };
 1117  
 1118              let special_thread = do Thread::start {
 1119                  special_sched.take().bootstrap(special_task.take());
 1120                  rtdebug!("finished with special_sched");
 1121              };
 1122  
 1123              normal_thread.join();
 1124              special_thread.join();
 1125          }
 1126      }
 1127  
 1128      #[test]
 1129      fn test_stress_schedule_task_states() {
 1130          if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
 1131          let n = stress_factor() * 120;
 1132          for _ in range(0, n as int) {
 1133              test_schedule_home_states();
 1134          }
 1135      }
 1136  
 1137      #[test]
 1138      fn test_io_callback() {
 1139          // This is a regression test that when there are no schedulable tasks
 1140          // in the work queue, but we are performing I/O, that once we do put
 1141          // something in the work queue again the scheduler picks it up and doesn't
 1142          // exit before emptying the work queue
 1143          do run_in_newsched_task {
 1144              do spawntask {
 1145                  let sched: ~Scheduler = Local::take();
 1146                  do sched.deschedule_running_task_and_then |sched, task| {
 1147                      let task = Cell::new(task);
 1148                      do sched.event_loop.callback_ms(10) {
 1149                          rtdebug!("in callback");
 1150                          let mut sched: ~Scheduler = Local::take();
 1151                          sched.enqueue_blocked_task(task.take());
 1152                          Local::put(sched);
 1153                      }
 1154                  }
 1155              }
 1156          }
 1157      }
 1158  
 1159      #[test]
 1160      fn handle() {
 1161          use rt::comm::*;
 1162  
 1163          do run_in_bare_thread {
 1164              let (port, chan) = oneshot::<()>();
 1165              let port = Cell::new(port);
 1166              let chan = Cell::new(chan);
 1167  
 1168              let thread_one = do Thread::start {
 1169                  let chan = Cell::new(chan.take());
 1170                  do run_in_newsched_task_core {
 1171                      chan.take().send(());
 1172                  }
 1173              };
 1174  
 1175              let thread_two = do Thread::start {
 1176                  let port = Cell::new(port.take());
 1177                  do run_in_newsched_task_core {
 1178                      port.take().recv();
 1179                  }
 1180              };
 1181  
 1182              thread_two.join();
 1183              thread_one.join();
 1184          }
 1185      }
 1186  
 1187      // A regression test that the final message is always handled.
 1188      // Used to deadlock because Shutdown was never recvd.
 1189      #[test]
 1190      fn no_missed_messages() {
 1191          use rt::work_queue::WorkQueue;
 1192          use rt::sleeper_list::SleeperList;
 1193          use rt::stack::StackPool;
 1194          use rt::uv::uvio::UvEventLoop;
 1195          use rt::sched::{Shutdown, TaskFromFriend};
 1196          use util;
 1197  
 1198          do run_in_bare_thread {
 1199              do stress_factor().times {
 1200                  let sleepers = SleeperList::new();
 1201                  let queue = WorkQueue::new();
 1202                  let queues = ~[queue.clone()];
 1203  
 1204                  let mut sched = ~Scheduler::new(
 1205                      ~UvEventLoop::new(),
 1206                      queue,
 1207                      queues.clone(),
 1208                      sleepers.clone());
 1209  
 1210                  let mut handle = sched.make_handle();
 1211  
 1212                  let sched = Cell::new(sched);
 1213  
 1214                  let thread = do Thread::start {
 1215                      let mut sched = sched.take();
 1216                      let bootstrap_task = ~Task::new_root(&mut sched.stack_pool, None, ||());
 1217                      sched.bootstrap(bootstrap_task);
 1218                  };
 1219  
 1220                  let mut stack_pool = StackPool::new();
 1221                  let task = ~Task::new_root(&mut stack_pool, None, ||());
 1222                  handle.send(TaskFromFriend(task));
 1223  
 1224                  handle.send(Shutdown);
 1225                  util::ignore(handle);
 1226  
 1227                  thread.join();
 1228              }
 1229          }
 1230      }
 1231  
 1232      #[test]
 1233      fn multithreading() {
 1234          use rt::comm::*;
 1235          use num::Times;
 1236          use vec::OwnedVector;
 1237          use container::Container;
 1238  
 1239          do run_in_mt_newsched_task {
 1240              let mut ports = ~[];
 1241              do 10.times {
 1242                  let (port, chan) = oneshot();
 1243                  let chan_cell = Cell::new(chan);
 1244                  do spawntask_later {
 1245                      chan_cell.take().send(());
 1246                  }
 1247                  ports.push(port);
 1248              }
 1249  
 1250              while !ports.is_empty() {
 1251                  ports.pop().recv();
 1252              }
 1253          }
 1254      }
 1255  
 1256       #[test]
 1257      fn thread_ring() {
 1258          use rt::comm::*;
 1259          use comm::{GenericPort, GenericChan};
 1260  
 1261          do run_in_mt_newsched_task {
 1262                  let (end_port, end_chan) = oneshot();
 1263  
 1264              let n_tasks = 10;
 1265              let token = 2000;
 1266  
 1267                  let (p, ch1) = stream();
 1268              let mut p = p;
 1269                  ch1.send((token, end_chan));
 1270                  let mut i = 2;
 1271              while i <= n_tasks {
 1272                  let (next_p, ch) = stream();
 1273                  let imm_i = i;
 1274                  let imm_p = p;
 1275                  do spawntask_random {
 1276                      roundtrip(imm_i, n_tasks, &imm_p, &ch);
 1277                  };
 1278                  p = next_p;
 1279                  i += 1;
 1280              }
 1281              let imm_p = p;
 1282              let imm_ch = ch1;
 1283              do spawntask_random {
 1284                  roundtrip(1, n_tasks, &imm_p, &imm_ch);
 1285              }
 1286  
 1287              end_port.recv();
 1288          }
 1289  
 1290          fn roundtrip(id: int, n_tasks: int,
 1291                       p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) {
 1292              while (true) {
 1293                  match p.recv() {
 1294                      (1, end_chan) => {
 1295                                  debug2!("{}\n", id);
 1296                                  end_chan.send(());
 1297                                  return;
 1298                      }
 1299                      (token, end_chan) => {
 1300                          debug2!("thread: {}   got token: {}", id, token);
 1301                          ch.send((token - 1, end_chan));
 1302                          if token <= n_tasks {
 1303                              return;
 1304                          }
 1305                      }
 1306                  }
 1307              }
 1308          }
 1309      }
 1310  
 1311      #[test]
 1312      fn start_closure_dtor() {
 1313          use ops::Drop;
 1314  
 1315          // Regression test that the `start` task entrypoint can
 1316          // contain dtors that use task resources
 1317          do run_in_newsched_task {
 1318              struct S { field: () }
 1319  
 1320              impl Drop for S {
 1321                  fn drop(&mut self) {
 1322                      let _foo = @0;
 1323                  }
 1324              }
 1325  
 1326              let s = S { field: () };
 1327  
 1328              do spawntask {
 1329                  let _ss = &s;
 1330              }
 1331          }
 1332      }
 1333  
 1334      // FIXME: #9407: xfail-test
 1335      fn dont_starve_1() {
 1336          use rt::comm::oneshot;
 1337  
 1338          do stress_factor().times {
 1339              do run_in_mt_newsched_task {
 1340                  let (port, chan) = oneshot();
 1341  
 1342                  // This task should not be able to starve the sender;
 1343                  // The sender should get stolen to another thread.
 1344                  do spawntask {
 1345                      while !port.peek() { }
 1346                  }
 1347  
 1348                  chan.send(());
 1349              }
 1350          }
 1351      }
 1352  
 1353      #[test]
 1354      fn dont_starve_2() {
 1355          use rt::comm::oneshot;
 1356  
 1357          do stress_factor().times {
 1358              do run_in_newsched_task {
 1359                  let (port, chan) = oneshot();
 1360                  let (_port2, chan2) = stream();
 1361  
 1362                  // This task should not be able to starve the other task.
 1363                  // The sends should eventually yield.
 1364                  do spawntask {
 1365                      while !port.peek() {
 1366                          chan2.send(());
 1367                      }
 1368                  }
 1369  
 1370                  chan.send(());
 1371              }
 1372          }
 1373      }
 1374  
 1375      // Regression test for a logic bug that would cause single-threaded schedulers
 1376      // to sleep forever after yielding and stealing another task.
 1377      #[test]
 1378      fn single_threaded_yield() {
 1379          use task::{spawn, spawn_sched, SingleThreaded, deschedule};
 1380          use num::Times;
 1381  
 1382          do spawn_sched(SingleThreaded) {
 1383              do 5.times { deschedule(); }
 1384          }
 1385          do spawn { }
 1386          do spawn { }
 1387      }
 1388  }

libstd/rt/sched.rs:857:40-857:40 -fn- definition:
#[fixed_stack_segment] #[inline(never)]
fn new_sched_rng() -> XorShiftRng {
references:-
143:             rng: new_sched_rng(),


libstd/rt/sched.rs:785:1-785:1 -ty- definition:

type SchedulingFn = ~fn(~Scheduler, ~Task);
references:-
452:                     schedule_fn: SchedulingFn) {


libstd/rt/sched.rs:835:40-835:40 -trait- definition:
type UnsafeTaskReceiver = raw::Closure;
trait ClosureConverter {
references:-
840: impl ClosureConverter for UnsafeTaskReceiver {
837:     fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;


libstd/rt/sched.rs:813:1-813:1 -struct- definition:

struct CleanupJob {
references:-
72:     cleanup_job: Option<CleanupJob>,
820:     pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob {
828:         let CleanupJob { task: task, f: f } = self;
821:         CleanupJob {
819: impl CleanupJob {
595:                 Some(CleanupJob { task: ref task, _ }) => {


libstd/rt/sched.rs:787:1-787:1 -enum- definition:

pub enum SchedMessage {
references:-
797:     priv queue: MessageQueue<SchedMessage>,
802:     pub fn send(&mut self, msg: SchedMessage) {
50:     priv message_queue: MessageQueue<SchedMessage>,


libstd/rt/sched.rs:93:16-93:16 -enum- definition:
#[deriving(Eq)]
enum EffortLevel {
references:-
93: #[deriving(Eq)]
93: #[deriving(Eq)]
310:     fn interpret_message_queue(~self, effort: EffortLevel) -> Option<~Scheduler> {
93: #[deriving(Eq)]


libstd/rt/sched.rs:100:1-100:1 -fn- definition:

fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
references:-
743:         this.yield_check_count = reset_yield_check(&mut this.rng);
149:         sched.yield_check_count = reset_yield_check(&mut sched.rng);


libstd/rt/sched.rs:794:1-794:1 -struct- definition:

pub struct SchedHandle {
references:-
127:                        friend: Option<SchedHandle>)
77:     friend_handle: Option<SchedHandle>,
773:     pub fn make_handle(&mut self) -> SchedHandle {
776:         return SchedHandle {
801: impl SchedHandle {
libstd/rt/uv/uvio.rs:
1044:     home: SchedHandle,
813:     fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
991:     home: SchedHandle,
1358:     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1412:            home: SchedHandle) -> UvFileStream {
1403:     home: SchedHandle
1537:     home: Option<SchedHandle>,
1407:     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
871:     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
1144:     home: SchedHandle,
809:     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1549:     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() }
1362:     fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
805:     home: SchedHandle,
995:     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1148:     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
1048:     fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
59:     fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
1354:     home: SchedHandle,
libstd/rt/sleeper_list.rs:
54:     pub fn pop(&mut self) -> Option<SchedHandle> {
43:     pub fn push(&mut self, handle: SchedHandle) {
70:     pub fn casual_pop(&mut self) -> Option<SchedHandle> {
28:     stack: ~[SchedHandle],
libstd/rt/task.rs:
79:     Sched(SchedHandle)
315:                 GreenTask(Some(Sched(SchedHandle { sched_id: ref id, _ }))) => {
(294)(278)

libstd/rt/sched.rs:40:48-40:48 -struct- definition:
/// in too much allocation and too many events.
pub struct Scheduler {
references:-
691:                                          f: &fn(&mut Scheduler, BlockedTask)) {
681:     pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) {
656:     fn resume_task_immediately_cl(sched: ~Scheduler,
786: type SchedulingFn = ~fn(~Scheduler, ~Task);
841:     fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
367:     fn do_work(~self) -> Option<~Scheduler> {
704:     fn switch_task(sched: ~Scheduler, task: ~Task) {
838:     fn to_fn(self) -> &fn(&mut Scheduler, ~Task);
732:         do Local::borrow |sched: &mut Scheduler| {
558:                                f: &fn(&mut Scheduler, ~Task)) {
257:         let mut sched: ~Scheduler = Local::take();
827:     pub fn run(self, sched: &mut Scheduler) {
573:                         &fn(&mut Scheduler, ~Task)>(f)
844:     fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
130:         let mut sched = Scheduler {
198:         let mut sched: ~Scheduler = Local::take();
726:         let sched: ~Scheduler = Local::take();
114:         -> Scheduler {
310:     fn interpret_message_queue(~self, effort: EffortLevel) -> Option<~Scheduler> {
837:     fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self;
572:             transmute::<&fn(&mut Scheduler, ~Task),
591:             let sched: &mut Scheduler =
192:         let sched: ~Scheduler = Local::take();
106: impl Scheduler {
128:         -> Scheduler {
libstd/rt/uv/uvio.rs:
124:     fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
457:             let scheduler: ~Scheduler = Local::take();
1598:                     let scheduler: ~Scheduler = Local::take();
137:     () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
137:     () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
(94)(735)(1559)(1225)(963)(1374)(137)(137)(923)(594)(137)(509)(1057)(71)(436)(1457)(711)(649)(475)(678)(1390)(953)(425)(665)(1157)(505)(528)(483)(532)(1198)(579)(127)(623)(1001)(137)(1005)(1435)(1490)(780)(137)(137)(980)(1555)(137)(599)(824)
libstd/rt/local.rs:
(133)(108)(60)(95)(114)(59)(100)(83)(68)(96)(75)
libstd/rt/test.rs:
(30)
libstd/rt/tube.rs:
(54)(70)
libstd/rt/comm.rs:
libstd/rt/select.rs:
libstd/task/mod.rs:
libstd/task/spawn.rs:
libstd/select.rs:
libstd/rt/task.rs:
..15more..


libstd/rt/sched.rs:834:15-834:15 -ty- definition:
// complaining
type UnsafeTaskReceiver = raw::Closure;
references:-
816:     f: UnsafeTaskReceiver
841:     fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver {
820:     pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob {
840: impl ClosureConverter for UnsafeTaskReceiver {