(index<- )        ./libgreen/sched.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
    1  // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
    2  // file at the top-level directory of this distribution and at
    3  // http://rust-lang.org/COPYRIGHT.
    4  //
    5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
    6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
    7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
    8  // option. This file may not be copied, modified, or distributed
    9  // except according to those terms.
   10  
   11  use std::cast;
   12  use std::rt::local::Local;
   13  use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop};
   14  use std::rt::task::BlockedTask;
   15  use std::rt::task::Task;
   16  use std::sync::deque;
   17  use std::unstable::mutex::NativeMutex;
   18  use std::raw;
   19  
   20  use rand::{XorShiftRng, Rng, Rand};
   21  
   22  use TaskState;
   23  use context::Context;
   24  use coroutine::Coroutine;
   25  use sleeper_list::SleeperList;
   26  use stack::StackPool;
   27  use task::{TypeSched, GreenTask, HomeSched, AnySched};
   28  use msgq = message_queue;
   29  
   30  /// A scheduler is responsible for coordinating the execution of Tasks
   31  /// on a single thread. The scheduler runs inside a slightly modified
   32  /// Rust Task. When not running this task is stored in the scheduler
   33  /// struct. The scheduler struct acts like a baton, all scheduling
   34  /// actions are transfers of the baton.
   35  ///
   36  /// FIXME: This creates too many callbacks to run_sched_once, resulting
   37  /// in too much allocation and too many events.
   38  pub struct Scheduler {
   39      /// ID number of the pool that this scheduler is a member of. When
   40      /// reawakening green tasks, this is used to ensure that tasks aren't
   41      /// reawoken on the wrong pool of schedulers.
   42      pub pool_id: uint,
   43      /// The pool of stacks that this scheduler has cached
   44      pub stack_pool: StackPool,
   45      /// Bookkeeping for the number of tasks which are currently running around
   46      /// inside this pool of schedulers
   47      pub task_state: TaskState,
   48      /// There are N work queues, one per scheduler.
   49      work_queue: deque::Worker<Box<GreenTask>>,
   50      /// Work queues for the other schedulers. These are created by
   51      /// cloning the core work queues.
   52      work_queues: Vec<deque::Stealer<Box<GreenTask>>>,
   53      /// The queue of incoming messages from other schedulers.
   54      /// These are enqueued by SchedHandles after which a remote callback
   55      /// is triggered to handle the message.
   56      message_queue: msgq::Consumer<SchedMessage>,
   57      /// Producer used to clone sched handles from
   58      message_producer: msgq::Producer<SchedMessage>,
   59      /// A shared list of sleeping schedulers. We'll use this to wake
   60      /// up schedulers when pushing work onto the work queue.
   61      sleeper_list: SleeperList,
   62      /// Indicates that we have previously pushed a handle onto the
   63      /// SleeperList but have not yet received the Wake message.
   64      /// Being `true` does not necessarily mean that the scheduler is
   65      /// not active since there are multiple event sources that may
   66      /// wake the scheduler. It just prevents the scheduler from pushing
   67      /// multiple handles onto the sleeper list.
   68      sleepy: bool,
   69      /// A flag to indicate we've received the shutdown message and should
   70      /// no longer try to go to sleep, but exit instead.
   71      no_sleep: bool,
   72      /// The scheduler runs on a special task. When it is not running
   73      /// it is stored here instead of the work queue.
   74      sched_task: Option<Box<GreenTask>>,
   75      /// An action performed after a context switch on behalf of the
   76      /// code running before the context switch
   77      cleanup_job: Option<CleanupJob>,
   78      /// If the scheduler shouldn't run some tasks, a friend to send
   79      /// them to.
   80      friend_handle: Option<SchedHandle>,
   81      /// Should this scheduler run any task, or only pinned tasks?
   82      run_anything: bool,
   83      /// A fast XorShift rng for scheduler use
   84      rng: XorShiftRng,
   85      /// A togglable idle callback
   86      idle_callback: Option<Box<PausableIdleCallback:Send>>,
   87      /// A countdown that starts at a random value and is decremented
   88      /// every time a yield check is performed. When it hits 0 a task
   89      /// will yield.
   90      yield_check_count: uint,
   91      /// A flag to tell the scheduler loop it needs to do some stealing
   92      /// in order to introduce randomness as part of a yield
   93      steal_for_yield: bool,
   94  
   95      // n.b. currently destructors of an object are run in top-to-bottom in order
   96      //      of field declaration. Due to its nature, the pausable idle callback
   97      //      must have some sort of handle to the event loop, so it needs to get
   98      //      destroyed before the event loop itself. For this reason, we destroy
   99      //      the event loop last to ensure that any unsafe references to it are
  100      //      destroyed before it's actually destroyed.
  101  
  102      /// The event loop used to drive the scheduler and perform I/O
  103      pub event_loop: Box<EventLoop:Send>,
  104  }
  105  
  106  /// An indication of how hard to work on a given operation, the difference
  107  /// mainly being whether memory is synchronized or not
  108  #[deriving(Eq)]
  109  enum EffortLevel {
  110      DontTryTooHard,
  111      GiveItYourBest
  112  }
  113  
  114  static MAX_YIELD_CHECKS: uint = 20000;
  115  
  116  fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
  117      let ruint = Rand::rand(rng);
  118      r % MAX_YIELD_CHECKS + 1
  119  }
  120  
  121  impl Scheduler {
  122  
  123      // * Initialization Functions
  124  
  125      pub fn new(pool_iduint,
  126                 event_loopBox<EventLoop:Send>,
  127                 work_queuedeque::Worker<Box<GreenTask>>,
  128                 work_queuesVec<deque::Stealer<Box<GreenTask>>>,
  129                 sleeper_listSleeperList,
  130                 stateTaskState)
  131          -> Scheduler {
  132  
  133          Scheduler::new_special(pool_id, event_loop, work_queue, work_queues,
  134                                 sleeper_list, true, None, state)
  135  
  136      }
  137  
  138      pub fn new_special(pool_iduint,
  139                         event_loopBox<EventLoop:Send>,
  140                         work_queuedeque::Worker<Box<GreenTask>>,
  141                         work_queuesVec<deque::Stealer<Box<GreenTask>>>,
  142                         sleeper_listSleeperList,
  143                         run_anythingbool,
  144                         friendOption<SchedHandle>,
  145                         stateTaskState)
  146          -> Scheduler {
  147  
  148          let (consumer, producer) = msgq::queue();
  149          let mut sched = Scheduler {
  150              pool_id: pool_id,
  151              sleeper_list: sleeper_list,
  152              message_queue: consumer,
  153              message_producer: producer,
  154              sleepy: false,
  155              no_sleep: false,
  156              event_loop: event_loop,
  157              work_queue: work_queue,
  158              work_queues: work_queues,
  159              stack_pool: StackPool::new(),
  160              sched_task: None,
  161              cleanup_job: None,
  162              run_anything: run_anything,
  163              friend_handle: friend,
  164              rng: new_sched_rng(),
  165              idle_callback: None,
  166              yield_check_count: 0,
  167              steal_for_yield: false,
  168              task_state: state,
  169          };
  170  
  171          sched.yield_check_count = reset_yield_check(&mut sched.rng);
  172  
  173          return sched;
  174      }
  175  
  176      // FIXME: This may eventually need to be refactored so that
  177      // the scheduler itself doesn't have to call event_loop.run.
  178      // That will be important for embedding the runtime into external
  179      // event loops.
  180  
  181      // Take a main task to run, and a scheduler to run it in. Create a
  182      // scheduler task and bootstrap into it.
  183      pub fn bootstrap(mut ~self) {
  184  
  185          // Build an Idle callback.
  186          let cb = box SchedRunner as Box<Callback:Send>;
  187          self.idle_callback = Some(self.event_loop.pausable_idle_callback(cb));
  188  
  189          // Create a task for the scheduler with an empty context.
  190          let sched_task = GreenTask::new_typed(Some(Coroutine::empty()),
  191                                                TypeSched);
  192  
  193          // Before starting our first task, make sure the idle callback
  194          // is active. As we do not start in the sleep state this is
  195          // important.
  196          self.idle_callback.get_mut_ref().resume();
  197  
  198          // Now, as far as all the scheduler state is concerned, we are inside
  199          // the "scheduler" context. The scheduler immediately hands over control
  200          // to the event loop, and this will only exit once the event loop no
  201          // longer has any references (handles or I/O objects).
  202          rtdebug!("starting scheduler {}", self.sched_id());
  203          let mut sched_task = self.run(sched_task);
  204  
  205          // Close the idle callback.
  206          let mut sched = sched_task.sched.take_unwrap();
  207          sched.idle_callback.take();
  208          // Make one go through the loop to run the close callback.
  209          let mut stask = sched.run(sched_task);
  210  
  211          // Now that we are done with the scheduler, clean up the
  212          // scheduler task. Do so by removing it from TLS and manually
  213          // cleaning up the memory it uses. As we didn't actually call
  214          // task.run() on the scheduler task we never get through all
  215          // the cleanup code it runs.
  216          rtdebug!("stopping scheduler {}", stask.sched.get_ref().sched_id());
  217  
  218          // Should not have any messages
  219          let message = stask.sched.get_mut_ref().message_queue.pop();
  220          rtassert!(match message { msgq::Empty => true, _ => false });
  221  
  222          stask.task.get_mut_ref().destroyed = true;
  223      }
  224  
  225      // This does not return a scheduler, as the scheduler is placed
  226      // inside the task.
  227      pub fn run(mut ~self, staskBox<GreenTask>) -> Box<GreenTask> {
  228  
  229          // This is unsafe because we need to place the scheduler, with
  230          // the event_loop inside, inside our task. But we still need a
  231          // mutable reference to the event_loop to give it the "run"
  232          // command.
  233          unsafe {
  234              let event_loop*mut Box<EventLoop:Send> = &mut self.event_loop;
  235              // Our scheduler must be in the task before the event loop
  236              // is started.
  237              stask.put_with_sched(self);
  238              (*event_loop).run();
  239          }
  240  
  241          //  This is a serious code smell, but this function could be done away
  242          //  with if necessary. The ownership of `stask` was transferred into
  243          //  local storage just before the event loop ran, so it is possible to
  244          //  transmute `stask` as a uint across the running of the event loop to
  245          //  re-acquire ownership here.
  246          //
  247          // This would involve removing the Task from TLS, removing the runtime,
  248          // forgetting the runtime, and then putting the task into `stask`. For
  249          // now, because we have `GreenTask::convert`, I chose to take this
  250          // method for cleanliness. This function is *not* a fundamental reason
  251          // why this function should exist.
  252          GreenTask::convert(Local::take())
  253      }
  254  
  255      // * Execution Functions - Core Loop Logic
  256  
  257      // This function is run from the idle callback on the uv loop, indicating
  258      // that there are no I/O events pending. When this function returns, we will
  259      // fall back to epoll() in the uv event loop, waiting for more things to
  260      // happen. We may come right back off epoll() if the idle callback is still
  261      // active, in which case we're truly just polling to see if I/O events are
  262      // complete.
  263      //
  264      // The model for this function is to execute as much work as possible while
  265      // still fairly considering I/O tasks. Falling back to epoll() frequently is
  266      // often quite expensive, so we attempt to avoid it as much as possible. If
  267      // we have any active I/O on the event loop, then we're forced to fall back
  268      // to epoll() in order to provide fairness, but as long as we're doing work
  269      // and there's no active I/O, we can continue to do work.
  270      //
  271      // If we try really hard to do some work, but no work is available to be
  272      // done, then we fall back to epoll() to block this thread waiting for more
  273      // work (instead of busy waiting).
  274      fn run_sched_once(mut ~self, staskBox<GreenTask>) {
  275          // Make sure that we're not lying in that the `stask` argument is indeed
  276          // the scheduler task for this scheduler.
  277          assert!(self.sched_task.is_none());
  278  
  279          // Assume that we need to continue idling unless we reach the
  280          // end of this function without performing an action.
  281          self.idle_callback.get_mut_ref().resume();
  282  
  283          // First we check for scheduler messages, these are higher
  284          // priority than regular tasks.
  285          let (mut sched, mut stask, mut did_work) =
  286              self.interpret_message_queue(stask, DontTryTooHard);
  287  
  288          // After processing a message, we consider doing some more work on the
  289          // event loop. The "keep going" condition changes after the first
  290          // iteration becase we don't want to spin here infinitely.
  291          //
  292          // Once we start doing work we can keep doing work so long as the
  293          // iteration does something. Note that we don't want to starve the
  294          // message queue here, so each iteration when we're done working we
  295          // check the message queue regardless of whether we did work or not.
  296          let mut keep_going = !did_work || !sched.event_loop.has_active_io();
  297          while keep_going {
  298              let (a, b, c) = match sched.do_work(stask) {
  299                  (sched, task, false) => {
  300                      sched.interpret_message_queue(task, GiveItYourBest)
  301                  }
  302                  (sched, task, true) => {
  303                      let (sched, task, _) =
  304                          sched.interpret_message_queue(task, GiveItYourBest);
  305                      (sched, task, true)
  306                  }
  307              };
  308              sched = a;
  309              stask = b;
  310              did_work = c;
  311  
  312              // We only keep going if we managed to do something productive and
  313              // also don't have any active I/O. If we didn't do anything, we
  314              // should consider going to sleep, and if we have active I/O we need
  315              // to poll for completion.
  316              keep_going = did_work && !sched.event_loop.has_active_io();
  317          }
  318  
  319          // If we ever did some work, then we shouldn't put our scheduler
  320          // entirely to sleep just yet. Leave the idle callback active and fall
  321          // back to epoll() to see what's going on.
  322          if did_work {
  323              return stask.put_with_sched(sched);
  324          }
  325  
  326          // If we got here then there was no work to do.
  327          // Generate a SchedHandle and push it to the sleeper list so
  328          // somebody can wake us up later.
  329          if !sched.sleepy && !sched.no_sleep {
  330              rtdebug!("scheduler has no work to do, going to sleep");
  331              sched.sleepy = true;
  332              let handle = sched.make_handle();
  333              sched.sleeper_list.push(handle);
  334              // Since we are sleeping, deactivate the idle callback.
  335              sched.idle_callback.get_mut_ref().pause();
  336          } else {
  337              rtdebug!("not sleeping, already doing so or no_sleep set");
  338              // We may not be sleeping, but we still need to deactivate
  339              // the idle callback.
  340              sched.idle_callback.get_mut_ref().pause();
  341          }
  342  
  343          // Finished a cycle without using the Scheduler. Place it back
  344          // in TLS.
  345          stask.put_with_sched(sched);
  346      }
  347  
  348      // This function returns None if the scheduler is "used", or it
  349      // returns the still-available scheduler. At this point all
  350      // message-handling will count as a turn of work, and as a result
  351      // return None.
  352      fn interpret_message_queue(mut ~self, staskBox<GreenTask>,
  353                                 effortEffortLevel)
  354              -> (Box<Scheduler>, Box<GreenTask>, bool)
  355      {
  356  
  357          let msg = if effort == DontTryTooHard {
  358              self.message_queue.casual_pop()
  359          } else {
  360              // When popping our message queue, we could see an "inconsistent"
  361              // state which means that we *should* be able to pop data, but we
  362              // are unable to at this time. Our options are:
  363              //
  364              //  1. Spin waiting for data
  365              //  2. Ignore this and pretend we didn't find a message
  366              //
  367              // If we choose route 1, then if the pusher in question is currently
  368              // pre-empted, we're going to take up our entire time slice just
  369              // spinning on this queue. If we choose route 2, then the pusher in
  370              // question is still guaranteed to make a send() on its async
  371              // handle, so we will guaranteed wake up and see its message at some
  372              // point.
  373              //
  374              // I have chosen to take route #2.
  375              match self.message_queue.pop() {
  376                  msgq::Data(t) => Some(t),
  377                  msgq::Empty | msgq::Inconsistent => None
  378              }
  379          };
  380  
  381          match msg {
  382              Some(PinnedTask(task)) => {
  383                  let mut task = task;
  384                  task.give_home(HomeSched(self.make_handle()));
  385                  let (sched, task) = self.resume_task_immediately(stask, task);
  386                  (sched, task, true)
  387              }
  388              Some(TaskFromFriend(task)) => {
  389                  rtdebug!("got a task from a friend. lovely!");
  390                  let (sched, task) =
  391                      self.process_task(stask, task,
  392                                        Scheduler::resume_task_immediately_cl);
  393                  (sched, task, true)
  394              }
  395              Some(RunOnce(task)) => {
  396                  // bypass the process_task logic to force running this task once
  397                  // on this home scheduler. This is often used for I/O (homing).
  398                  let (sched, task) = self.resume_task_immediately(stask, task);
  399                  (sched, task, true)
  400              }
  401              Some(Wake) => {
  402                  self.sleepy = false;
  403                  (self, stask, true)
  404              }
  405              Some(Shutdown) => {
  406                  rtdebug!("shutting down");
  407                  if self.sleepy {
  408                      // There may be an outstanding handle on the
  409                      // sleeper list.  Pop them all to make sure that's
  410                      // not the case.
  411                      loop {
  412                          match self.sleeper_list.pop() {
  413                              Some(handle) => {
  414                                  let mut handle = handle;
  415                                  handle.send(Wake);
  416                              }
  417                              None => break
  418                          }
  419                      }
  420                  }
  421                  // No more sleeping. After there are no outstanding
  422                  // event loop references we will shut down.
  423                  self.no_sleep = true;
  424                  self.sleepy = false;
  425                  (self, stask, true)
  426              }
  427              Some(NewNeighbor(neighbor)) => {
  428                  self.work_queues.push(neighbor);
  429                  (self, stask, false)
  430              }
  431              None => (self, stask, false)
  432          }
  433      }
  434  
  435      fn do_work(mut ~self, staskBox<GreenTask>)
  436                 -> (Box<Scheduler>, Box<GreenTask>, bool) {
  437          rtdebug!("scheduler calling do work");
  438          match self.find_work() {
  439              Some(task) => {
  440                  rtdebug!("found some work! running the task");
  441                  let (sched, task) =
  442                      self.process_task(stask, task,
  443                                        Scheduler::resume_task_immediately_cl);
  444                  (sched, task, true)
  445              }
  446              None => {
  447                  rtdebug!("no work was found, returning the scheduler struct");
  448                  (self, stask, false)
  449              }
  450          }
  451      }
  452  
  453      // Workstealing: In this iteration of the runtime each scheduler
  454      // thread has a distinct work queue. When no work is available
  455      // locally, make a few attempts to steal work from the queues of
  456      // other scheduler threads. If a few steals fail we end up in the
  457      // old "no work" path which is fine.
  458  
  459      // First step in the process is to find a task. This function does
  460      // that by first checking the local queue, and if there is no work
  461      // there, trying to steal from the remote work queues.
  462      fn find_work(&mut self) -> Option<Box<GreenTask>> {
  463          rtdebug!("scheduler looking for work");
  464          if !self.steal_for_yield {
  465              match self.work_queue.pop() {
  466                  Some(task) => {
  467                      rtdebug!("found a task locally");
  468                      return Some(task)
  469                  }
  470                  None => {
  471                      rtdebug!("scheduler trying to steal");
  472                      return self.try_steals();
  473                  }
  474              }
  475          } else {
  476              // During execution of the last task, it performed a 'yield',
  477              // so we're doing some work stealing in order to introduce some
  478              // scheduling randomness. Otherwise we would just end up popping
  479              // that same task again. This is pretty lame and is to work around
  480              // the problem that work stealing is not designed for 'non-strict'
  481              // (non-fork-join) task parallelism.
  482              self.steal_for_yield = false;
  483              match self.try_steals() {
  484                  Some(task) => {
  485                      rtdebug!("stole a task after yielding");
  486                      return Some(task);
  487                  }
  488                  None => {
  489                      rtdebug!("did not steal a task after yielding");
  490                      // Back to business
  491                      return self.find_work();
  492                  }
  493              }
  494          }
  495      }
  496  
  497      // Try stealing from all queues the scheduler knows about. This
  498      // naive implementation can steal from our own queue or from other
  499      // special schedulers.
  500      fn try_steals(&mut self) -> Option<Box<GreenTask>> {
  501          let work_queues = &mut self.work_queues;
  502          let len = work_queues.len();
  503          let start_index = self.rng.gen_range(0, len);
  504          for index in range(0, len).map(|i| (i + start_index) % len) {
  505              match work_queues.get_mut(index).steal() {
  506                  deque::Data(task) => {
  507                      rtdebug!("found task by stealing");
  508                      return Some(task)
  509                  }
  510                  _ => ()
  511              }
  512          };
  513          rtdebug!("giving up on stealing");
  514          return None;
  515      }
  516  
  517      // * Task Routing Functions - Make sure tasks send up in the right
  518      // place.
  519  
  520      fn process_task(mut ~self,
  521                      curBox<GreenTask>,
  522                      mut nextBox<GreenTask>,
  523                      schedule_fnSchedulingFn)
  524                      -> (Box<Scheduler>, Box<GreenTask>) {
  525          rtdebug!("processing a task");
  526  
  527          match next.take_unwrap_home() {
  528              HomeSched(home_handle) => {
  529                  if home_handle.sched_id != self.sched_id() {
  530                      rtdebug!("sending task home");
  531                      next.give_home(HomeSched(home_handle));
  532                      Scheduler::send_task_home(next);
  533                      (self, cur)
  534                  } else {
  535                      rtdebug!("running task here");
  536                      next.give_home(HomeSched(home_handle));
  537                      schedule_fn(self, cur, next)
  538                  }
  539              }
  540              AnySched if self.run_anything => {
  541                  rtdebug!("running anysched task here");
  542                  next.give_home(AnySched);
  543                  schedule_fn(self, cur, next)
  544              }
  545              AnySched => {
  546                  rtdebug!("sending task to friend");
  547                  next.give_home(AnySched);
  548                  self.send_to_friend(next);
  549                  (self, cur)
  550              }
  551          }
  552      }
  553  
  554      fn send_task_home(taskBox<GreenTask>) {
  555          let mut task = task;
  556          match task.take_unwrap_home() {
  557              HomeSched(mut home_handle) => home_handle.send(PinnedTask(task)),
  558              AnySched => rtabort!("error: cannot send anysched task home"),
  559          }
  560      }
  561  
  562      /// Take a non-homed task we aren't allowed to run here and send
  563      /// it to the designated friend scheduler to execute.
  564      fn send_to_friend(&mut self, taskBox<GreenTask>) {
  565          rtdebug!("sending a task to friend");
  566          match self.friend_handle {
  567              Some(ref mut handle) => {
  568                  handle.send(TaskFromFriend(task));
  569              }
  570              None => {
  571                  rtabort!("tried to send task to a friend but scheduler has no friends");
  572              }
  573          }
  574      }
  575  
  576      /// Schedule a task to be executed later.
  577      ///
  578      /// Pushes the task onto the work stealing queue and tells the
  579      /// event loop to run it later. Always use this instead of pushing
  580      /// to the work queue directly.
  581      pub fn enqueue_task(&mut self, taskBox<GreenTask>) {
  582  
  583          // We push the task onto our local queue clone.
  584          assert!(!task.is_sched());
  585          self.work_queue.push(task);
  586          match self.idle_callback {
  587              Some(ref mut idle) => idle.resume(),
  588              None => {} // allow enqueuing before the scheduler starts
  589          }
  590  
  591          // We've made work available. Notify a
  592          // sleeping scheduler.
  593  
  594          match self.sleeper_list.casual_pop() {
  595              Some(handle) => {
  596                  let mut handle = handle;
  597                  handle.send(Wake)
  598              }
  599              None => { (/* pass */) }
  600          };
  601      }
  602  
  603      // * Core Context Switching Functions
  604  
  605      // The primary function for changing contexts. In the current
  606      // design the scheduler is just a slightly modified GreenTask, so
  607      // all context swaps are from GreenTask to GreenTask. The only difference
  608      // between the various cases is where the inputs come from, and
  609      // what is done with the resulting task. That is specified by the
  610      // cleanup function f, which takes the scheduler and the
  611      // old task as inputs.
  612  
  613      pub fn change_task_context(mut ~self,
  614                                 current_taskBox<GreenTask>,
  615                                 mut next_taskBox<GreenTask>,
  616                                 f|&mut Scheduler, Box<GreenTask>|)
  617                                 -> Box<GreenTask> {
  618          let f_opaque = ClosureConverter::from_fn(f);
  619  
  620          let current_task_dupe = &*current_task as *GreenTask;
  621  
  622          // The current task is placed inside an enum with the cleanup
  623          // function. This enum is then placed inside the scheduler.
  624          self.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
  625  
  626          // The scheduler is then placed inside the next task.
  627          next_task.sched = Some(self);
  628  
  629          // However we still need an internal mutable pointer to the
  630          // original task. The strategy here was "arrange memory, then
  631          // get pointers", so we crawl back up the chain using
  632          // transmute to eliminate borrowck errors.
  633          unsafe {
  634  
  635              let sched&mut Scheduler =
  636                  cast::transmute_mut_lifetime(*next_task.sched.get_mut_ref());
  637  
  638              let current_task&mut GreenTask = match sched.cleanup_job {
  639                  Some(CleanupJob { task: ref mut task, .. }) => &mut **task,
  640                  None => rtabort!("no cleanup job")
  641              };
  642  
  643              let (current_task_context, next_task_context) =
  644                  Scheduler::get_contexts(current_task, next_task);
  645  
  646              // Done with everything - put the next task in TLS. This
  647              // works because due to transmute the borrow checker
  648              // believes that we have no internal pointers to
  649              // next_task.
  650              cast::forget(next_task);
  651  
  652              // The raw context swap operation. The next action taken
  653              // will be running the cleanup job from the context of the
  654              // next task.
  655              Context::swap(current_task_context, next_task_context);
  656          }
  657  
  658          // When the context swaps back to this task we immediately
  659          // run the cleanup job, as expected by the previously called
  660          // swap_contexts function.
  661          let mut current_taskBox<GreenTask> = unsafe {
  662              cast::transmute(current_task_dupe)
  663          };
  664          current_task.sched.get_mut_ref().run_cleanup_job();
  665  
  666          // See the comments in switch_running_tasks_and_then for why a lock
  667          // is acquired here. This is the resumption points and the "bounce"
  668          // that it is referring to.
  669          unsafe {
  670              let _guard = current_task.nasty_deschedule_lock.lock();
  671          }
  672          return current_task;
  673      }
  674  
  675      // Returns a mutable reference to both contexts involved in this
  676      // swap. This is unsafe - we are getting mutable internal
  677      // references to keep even when we don't own the tasks. It looks
  678      // kinda safe because we are doing transmutes before passing in
  679      // the arguments.
  680      pub fn get_contexts<'a>(current_task&mut GreenTask, next_task&mut GreenTask) ->
  681          (&'a mut Context, &'a mut Context) {
  682          let current_task_context =
  683              &mut current_task.coroutine.get_mut_ref().saved_context;
  684          let next_task_context =
  685                  &mut next_task.coroutine.get_mut_ref().saved_context;
  686          unsafe {
  687              (cast::transmute_mut_lifetime(current_task_context),
  688               cast::transmute_mut_lifetime(next_task_context))
  689          }
  690      }
  691  
  692      // * Context Swapping Helpers - Here be ugliness!
  693  
  694      pub fn resume_task_immediately(~self,
  695                                     curBox<GreenTask>,
  696                                     nextBox<GreenTask>)
  697                                     -> (Box<Scheduler>, Box<GreenTask>) {
  698          assert!(cur.is_sched());
  699          let mut cur = self.change_task_context(cur, next, |sched, stask| {
  700              assert!(sched.sched_task.is_none());
  701              sched.sched_task = Some(stask);
  702          });
  703          (cur.sched.take_unwrap(), cur)
  704      }
  705  
  706      fn resume_task_immediately_cl(schedBox<Scheduler>,
  707                                    curBox<GreenTask>,
  708                                    nextBox<GreenTask>)
  709                                    -> (Box<Scheduler>, Box<GreenTask>) {
  710          sched.resume_task_immediately(cur, next)
  711      }
  712  
  713      /// Block a running task, context switch to the scheduler, then pass the
  714      /// blocked task to a closure.
  715      ///
  716      /// # Safety note
  717      ///
  718      /// The closure here is a *stack* closure that lives in the
  719      /// running task.  It gets transmuted to the scheduler's lifetime
  720      /// and called while the task is blocked.
  721      ///
  722      /// This passes a Scheduler pointer to the fn after the context switch
  723      /// in order to prevent that fn from performing further scheduling operations.
  724      /// Doing further scheduling could easily result in infinite recursion.
  725      ///
  726      /// Note that if the closure provided relinquishes ownership of the
  727      /// BlockedTask, then it is possible for the task to resume execution before
  728      /// the closure has finished executing. This would naturally introduce a
  729      /// race if the closure and task shared portions of the environment.
  730      ///
  731      /// This situation is currently prevented, or in other words it is
  732      /// guaranteed that this function will not return before the given closure
  733      /// has returned.
  734      pub fn deschedule_running_task_and_then(mut ~self,
  735                                              curBox<GreenTask>,
  736                                              f|&mut Scheduler, BlockedTask|) {
  737          // Trickier - we need to get the scheduler task out of self
  738          // and use it as the destination.
  739          let stask = self.sched_task.take_unwrap();
  740          // Otherwise this is the same as below.
  741          self.switch_running_tasks_and_then(cur, stask, f)
  742      }
  743  
  744      pub fn switch_running_tasks_and_then(~self,
  745                                           curBox<GreenTask>,
  746                                           nextBox<GreenTask>,
  747                                           f|&mut Scheduler, BlockedTask|) {
  748          // And here comes one of the sad moments in which a lock is used in a
  749          // core portion of the rust runtime. As always, this is highly
  750          // undesirable, so there's a good reason behind it.
  751          //
  752          // There is an excellent outline of the problem in issue #8132, and it's
  753          // summarized in that `f` is executed on a sched task, but its
  754          // environment is on the previous task. If `f` relinquishes ownership of
  755          // the BlockedTask, then it may introduce a race where `f` is using the
  756          // environment as well as the code after the 'deschedule' block.
  757          //
  758          // The solution we have chosen to adopt for now is to acquire a
  759          // task-local lock around this block. The resumption of the task in
  760          // context switching will bounce on the lock, thereby waiting for this
  761          // block to finish, eliminating the race mentioned above.
  762          // fail!("should never return!");
  763          //
  764          // To actually maintain a handle to the lock, we use an unsafe pointer
  765          // to it, but we're guaranteed that the task won't exit until we've
  766          // unlocked the lock so there's no worry of this memory going away.
  767          let cur = self.change_task_context(cur, next, |sched, mut task| {
  768              let lock*mut NativeMutex = &mut task.nasty_deschedule_lock;
  769              unsafe {
  770                  let _guard = (*lock).lock();
  771                  f(sched, BlockedTask::block(task.swap()));
  772              }
  773          });
  774          cur.put();
  775      }
  776  
  777      fn switch_task(schedBox<Scheduler>,
  778                     curBox<GreenTask>,
  779                     nextBox<GreenTask>)
  780                     -> (Box<Scheduler>, Box<GreenTask>) {
  781          let mut cur = sched.change_task_context(cur, next, |sched, last_task| {
  782              if last_task.is_sched() {
  783                  assert!(sched.sched_task.is_none());
  784                  sched.sched_task = Some(last_task);
  785              } else {
  786                  sched.enqueue_task(last_task);
  787              }
  788          });
  789          (cur.sched.take_unwrap(), cur)
  790      }
  791  
  792      // * Task Context Helpers
  793  
  794      /// Called by a running task to end execution, after which it will
  795      /// be recycled by the scheduler for reuse in a new task.
  796      pub fn terminate_current_task(mut ~self, curBox<GreenTask>) -> ! {
  797          // Similar to deschedule running task and then, but cannot go through
  798          // the task-blocking path. The task is already dying.
  799          let stask = self.sched_task.take_unwrap();
  800          let _cur = self.change_task_context(cur, stask, |sched, mut dead_task| {
  801              let coroutine = dead_task.coroutine.take_unwrap();
  802              coroutine.recycle(&mut sched.stack_pool);
  803              sched.task_state.decrement();
  804          });
  805          fail!("should never return!");
  806      }
  807  
  808      pub fn run_task(~self, curBox<GreenTask>, nextBox<GreenTask>) {
  809          let (sched, task) =
  810              self.process_task(cur, next, Scheduler::switch_task);
  811          task.put_with_sched(sched);
  812      }
  813  
  814      pub fn run_task_later(mut curBox<GreenTask>, nextBox<GreenTask>) {
  815          let mut sched = cur.sched.take_unwrap();
  816          sched.enqueue_task(next);
  817          cur.put_with_sched(sched);
  818      }
  819  
  820      /// Yield control to the scheduler, executing another task. This is guaranteed
  821      /// to introduce some amount of randomness to the scheduler. Currently the
  822      /// randomness is a result of performing a round of work stealing (which
  823      /// may end up stealing from the current scheduler).
  824      pub fn yield_now(mut ~self, curBox<GreenTask>) {
  825          // Async handles trigger the scheduler by calling yield_now on the local
  826          // task, which eventually gets us to here. See comments in SchedRunner
  827          // for more info on this.
  828          if cur.is_sched() {
  829              assert!(self.sched_task.is_none());
  830              self.run_sched_once(cur);
  831          } else {
  832              self.yield_check_count = reset_yield_check(&mut self.rng);
  833              // Tell the scheduler to start stealing on the next iteration
  834              self.steal_for_yield = true;
  835              let stask = self.sched_task.take_unwrap();
  836              let cur = self.change_task_context(cur, stask, |sched, task| {
  837                  sched.enqueue_task(task);
  838              });
  839              cur.put()
  840          }
  841      }
  842  
  843      pub fn maybe_yield(mut ~self, curBox<GreenTask>) {
  844          // It's possible for sched tasks to possibly call this function, and it
  845          // just means that they're likely sending on channels (which
  846          // occasionally call this function). Sched tasks follow different paths
  847          // when executing yield_now(), which may possibly trip the assertion
  848          // below. For this reason, we just have sched tasks bail out soon.
  849          //
  850          // Sched tasks have no need to yield anyway because as soon as they
  851          // return they'll yield to other threads by falling back to the event
  852          // loop. Additionally, we completely control sched tasks, so we can make
  853          // sure that they never execute more than enough code.
  854          if cur.is_sched() {
  855              return cur.put_with_sched(self)
  856          }
  857  
  858          // The number of times to do the yield check before yielding, chosen
  859          // arbitrarily.
  860          rtassert!(self.yield_check_count > 0);
  861          self.yield_check_count -= 1;
  862          if self.yield_check_count == 0 {
  863              self.yield_now(cur);
  864          } else {
  865              cur.put_with_sched(self);
  866          }
  867      }
  868  
  869  
  870      // * Utility Functions
  871  
  872      pub fn sched_id(&self) -> uint { self as *Scheduler as uint }
  873  
  874      pub fn run_cleanup_job(&mut self) {
  875          let cleanup_job = self.cleanup_job.take_unwrap();
  876          cleanup_job.run(self)
  877      }
  878  
  879      pub fn make_handle(&mut self) -> SchedHandle {
  880          let remote = self.event_loop.remote_callback(box SchedRunner);
  881  
  882          return SchedHandle {
  883              remote: remote,
  884              queue: self.message_producer.clone(),
  885              sched_id: self.sched_id()
  886          }
  887      }
  888  }
  889  
  890  // Supporting types
  891  
  892  type SchedulingFn = fn(Box<Scheduler>, Box<GreenTask>, Box<GreenTask>)
  893                         -> (Box<Scheduler>, Box<GreenTask>);
  894  
  895  pub enum SchedMessage {
  896      Wake,
  897      Shutdown,
  898      NewNeighbor(deque::Stealer<Box<GreenTask>>),
  899      PinnedTask(Box<GreenTask>),
  900      TaskFromFriend(Box<GreenTask>),
  901      RunOnce(Box<GreenTask>),
  902  }
  903  
  904  pub struct SchedHandle {
  905      remote: Box<RemoteCallback:Send>,
  906      queue: msgq::Producer<SchedMessage>,
  907      pub sched_id: uint
  908  }
  909  
  910  impl SchedHandle {
  911      pub fn send(&mut self, msgSchedMessage) {
  912          self.queue.push(msg);
  913          self.remote.fire();
  914      }
  915  }
  916  
  917  struct SchedRunner;
  918  
  919  impl Callback for SchedRunner {
  920      fn call(&mut self) {
  921          // In theory, this function needs to invoke the `run_sched_once`
  922          // function on the scheduler. Sadly, we have no context here, except for
  923          // knowledge of the local `Task`. In order to avoid a call to
  924          // `GreenTask::convert`, we just call `yield_now` and the scheduler will
  925          // detect when a sched task performs a yield vs a green task performing
  926          // a yield (and act accordingly).
  927          //
  928          // This function could be converted to `GreenTask::convert` if
  929          // absolutely necessary, but for cleanliness it is much better to not
  930          // use the conversion function.
  931          let taskBox<Task> = Local::take();
  932          task.yield_now();
  933      }
  934  }
  935  
  936  struct CleanupJob {
  937      task: Box<GreenTask>,
  938      f: UnsafeTaskReceiver
  939  }
  940  
  941  impl CleanupJob {
  942      pub fn new(taskBox<GreenTask>, fUnsafeTaskReceiver) -> CleanupJob {
  943          CleanupJob {
  944              task: task,
  945              f: f
  946          }
  947      }
  948  
  949      pub fn run(self, sched&mut Scheduler) {
  950          let CleanupJob { task: task, f: f } = self;
  951          f.to_fn()(sched, task)
  952      }
  953  }
  954  
  955  // FIXME: Some hacks to put a || closure in Scheduler without borrowck
  956  // complaining
  957  type UnsafeTaskReceiver = raw::Closure;
  958  trait ClosureConverter {
  959      fn from_fn(|&mut Scheduler, Box<GreenTask>|) -> Self;
  960      fn to_fn(self) -> |&mut Scheduler, Box<GreenTask>|;
  961  }
  962  impl ClosureConverter for UnsafeTaskReceiver {
  963      fn from_fn(f|&mut Scheduler, Box<GreenTask>|) -> UnsafeTaskReceiver {
  964          unsafe { cast::transmute(f) }
  965      }
  966      fn to_fn(self) -> |&mut Scheduler, Box<GreenTask>| {
  967          unsafe { cast::transmute(self) }
  968      }
  969  }
  970  
  971  // On unix, we read randomness straight from /dev/urandom, but the
  972  // default constructor of an XorShiftRng does this via io::fs, which
  973  // relies on the scheduler existing, so we have to manually load
  974  // randomness. Windows has its own C API for this, so we don't need to
  975  // worry there.
  976  #[cfg(windows)]
  977  fn new_sched_rng() -> XorShiftRng {
  978      match XorShiftRng::new() {
  979          Ok(r) => r,
  980          Err(e) => {
  981              rtabort!("sched: failed to create seeded RNG: {}", e)
  982          }
  983      }
  984  }
  985  #[cfg(unix)]
  986  fn new_sched_rng() -> XorShiftRng {
  987      use libc;
  988      use std::mem;
  989      use rand::SeedableRng;
  990  
  991      let fd = "/dev/urandom".with_c_str(|name| {
  992          unsafe { libc::open(name, libc::O_RDONLY, 0) }
  993      });
  994      if fd == -1 {
  995          rtabort!("could not open /dev/urandom for reading.")
  996      }
  997  
  998      let mut seeds = [0u32, .. 4];
  999      let size = mem::size_of_val(&seeds);
 1000      loop {
 1001          let nbytes = unsafe {
 1002              libc::read(fd,
 1003                         seeds.as_mut_ptr() as *mut libc::c_void,
 1004                         size as libc::size_t)
 1005          };
 1006          rtassert!(nbytes as uint == size);
 1007  
 1008          if !seeds.iter().all(|x| *x == 0) {
 1009              break;
 1010          }
 1011      }
 1012  
 1013      unsafe {libc::close(fd);}
 1014  
 1015      SeedableRng::from_seed(seeds)
 1016  }
 1017  
 1018  #[cfg(test)]
 1019  mod test {
 1020      use rustuv;
 1021  
 1022      use std::task::TaskOpts;
 1023      use std::rt::task::Task;
 1024      use std::rt::local::Local;
 1025  
 1026      use {TaskState, PoolConfig, SchedPool};
 1027      use basic;
 1028      use sched::{TaskFromFriend, PinnedTask};
 1029      use task::{GreenTask, HomeSched};
 1030  
 1031      fn pool() -> SchedPool {
 1032          SchedPool::new(PoolConfig {
 1033              threads: 1,
 1034              event_loop_factory: basic::event_loop,
 1035          })
 1036      }
 1037  
 1038      fn run(f: proc():Send) {
 1039          let mut pool = pool();
 1040          pool.spawn(TaskOpts::new(), f);
 1041          pool.shutdown();
 1042      }
 1043  
 1044      fn sched_id() -> uint {
 1045          let mut task = Local::borrow(None::<Task>);
 1046          match task.maybe_take_runtime::<GreenTask>() {
 1047              Some(green) => {
 1048                  let ret = green.sched.get_ref().sched_id();
 1049                  task.put_runtime(green);
 1050                  return ret;
 1051              }
 1052              None => fail!()
 1053          }
 1054      }
 1055  
 1056      #[test]
 1057      fn trivial_run_in_newsched_task_test() {
 1058          let mut task_ran = false;
 1059          let task_ran_ptr: *mut bool = &mut task_ran;
 1060          run(proc() {
 1061              unsafe { *task_ran_ptr = true };
 1062              rtdebug!("executed from the new scheduler")
 1063          });
 1064          assert!(task_ran);
 1065      }
 1066  
 1067      #[test]
 1068      fn multiple_task_test() {
 1069          let total = 10;
 1070          let mut task_run_count = 0;
 1071          let task_run_count_ptr: *mut uint = &mut task_run_count;
 1072          // with only one thread this is safe to run in without worries of
 1073          // contention.
 1074          run(proc() {
 1075              for _ in range(0u, total) {
 1076                  spawn(proc() {
 1077                      unsafe { *task_run_count_ptr = *task_run_count_ptr + 1};
 1078                  });
 1079              }
 1080          });
 1081          assert!(task_run_count == total);
 1082      }
 1083  
 1084      #[test]
 1085      fn multiple_task_nested_test() {
 1086          let mut task_run_count = 0;
 1087          let task_run_count_ptr: *mut uint = &mut task_run_count;
 1088          run(proc() {
 1089              spawn(proc() {
 1090                  unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
 1091                  spawn(proc() {
 1092                      unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
 1093                      spawn(proc() {
 1094                          unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
 1095                      })
 1096                  })
 1097              })
 1098          });
 1099          assert!(task_run_count == 3);
 1100      }
 1101  
 1102      // A very simple test that confirms that a task executing on the
 1103      // home scheduler notices that it is home.
 1104      #[test]
 1105      fn test_home_sched() {
 1106          let mut pool = pool();
 1107  
 1108          let (dtx, drx) = channel();
 1109          {
 1110              let (tx, rx) = channel();
 1111              let mut handle1 = pool.spawn_sched();
 1112              let mut handle2 = pool.spawn_sched();
 1113  
 1114              handle1.send(TaskFromFriend(pool.task(TaskOpts::new(), proc() {
 1115                  tx.send(sched_id());
 1116              })));
 1117              let sched1_id = rx.recv();
 1118  
 1119              let mut task = pool.task(TaskOpts::new(), proc() {
 1120                  assert_eq!(sched_id(), sched1_id);
 1121                  dtx.send(());
 1122              });
 1123              task.give_home(HomeSched(handle1));
 1124              handle2.send(TaskFromFriend(task));
 1125          }
 1126          drx.recv();
 1127  
 1128          pool.shutdown();
 1129      }
 1130  
 1131      // An advanced test that checks all four possible states that a
 1132      // (task,sched) can be in regarding homes.
 1133  
 1134      #[test]
 1135      fn test_schedule_home_states() {
 1136          use sleeper_list::SleeperList;
 1137          use super::{Shutdown, Scheduler, SchedHandle};
 1138          use std::unstable::run_in_bare_thread;
 1139          use std::rt::thread::Thread;
 1140          use std::sync::deque::BufferPool;
 1141  
 1142          run_in_bare_thread(proc() {
 1143              let sleepers = SleeperList::new();
 1144              let mut pool = BufferPool::new();
 1145              let (normal_worker, normal_stealer) = pool.deque();
 1146              let (special_worker, special_stealer) = pool.deque();
 1147              let queues = vec![normal_stealer, special_stealer];
 1148              let (_p, state) = TaskState::new();
 1149  
 1150              // Our normal scheduler
 1151              let mut normal_sched = box Scheduler::new(
 1152                  1,
 1153                  basic::event_loop(),
 1154                  normal_worker,
 1155                  queues.clone(),
 1156                  sleepers.clone(),
 1157                  state.clone());
 1158  
 1159              let normal_handle = normal_sched.make_handle();
 1160              let friend_handle = normal_sched.make_handle();
 1161  
 1162              // Our special scheduler
 1163              let mut special_sched = box Scheduler::new_special(
 1164                  1,
 1165                  basic::event_loop(),
 1166                  special_worker,
 1167                  queues.clone(),
 1168                  sleepers.clone(),
 1169                  false,
 1170                  Some(friend_handle),
 1171                  state);
 1172  
 1173              let special_handle = special_sched.make_handle();
 1174  
 1175              let t1_handle = special_sched.make_handle();
 1176              let t4_handle = special_sched.make_handle();
 1177  
 1178              // Four test tasks:
 1179              //   1) task is home on special
 1180              //   2) task not homed, sched doesn't care
 1181              //   3) task not homed, sched requeues
 1182              //   4) task not home, send home
 1183  
 1184              // Grab both the scheduler and the task from TLS and check if the
 1185              // task is executing on an appropriate scheduler.
 1186              fn on_appropriate_sched() -> bool {
 1187                  use task::{TypeGreen, TypeSched, HomeSched};
 1188                  let task = GreenTask::convert(Local::take());
 1189                  let sched_id = task.sched.get_ref().sched_id();
 1190                  let run_any = task.sched.get_ref().run_anything;
 1191                  let ret = match task.task_type {
 1192                      TypeGreen(Some(AnySched)) => {
 1193                          run_any
 1194                      }
 1195                      TypeGreen(Some(HomeSched(SchedHandle {
 1196                          sched_id: ref id,
 1197                          ..
 1198                      }))) => {
 1199                          *id == sched_id
 1200                      }
 1201                      TypeGreen(None) => { fail!("task without home"); }
 1202                      TypeSched => { fail!("expected green task"); }
 1203                  };
 1204                  task.put();
 1205                  ret
 1206              }
 1207  
 1208              let task1 = GreenTask::new_homed(&mut special_sched.stack_pool,
 1209                                               None, HomeSched(t1_handle), proc() {
 1210                  rtassert!(on_appropriate_sched());
 1211              });
 1212  
 1213              let task2 = GreenTask::new(&mut normal_sched.stack_pool, None, proc() {
 1214                  rtassert!(on_appropriate_sched());
 1215              });
 1216  
 1217              let task3 = GreenTask::new(&mut normal_sched.stack_pool, None, proc() {
 1218                  rtassert!(on_appropriate_sched());
 1219              });
 1220  
 1221              let task4 = GreenTask::new_homed(&mut special_sched.stack_pool,
 1222                                               None, HomeSched(t4_handle), proc() {
 1223                  rtassert!(on_appropriate_sched());
 1224              });
 1225  
 1226              // Signal from the special task that we are done.
 1227              let (tx, rx) = channel::<()>();
 1228  
 1229              fn run(next: Box<GreenTask>) {
 1230                  let mut task = GreenTask::convert(Local::take());
 1231                  let sched = task.sched.take_unwrap();
 1232                  sched.run_task(task, next)
 1233              }
 1234  
 1235              let normal_task = GreenTask::new(&mut normal_sched.stack_pool, None, proc() {
 1236                  run(task2);
 1237                  run(task4);
 1238                  rx.recv();
 1239                  let mut nh = normal_handle;
 1240                  nh.send(Shutdown);
 1241                  let mut sh = special_handle;
 1242                  sh.send(Shutdown);
 1243              });
 1244              normal_sched.enqueue_task(normal_task);
 1245  
 1246              let special_task = GreenTask::new(&mut special_sched.stack_pool, None, proc() {
 1247                  run(task1);
 1248                  run(task3);
 1249                  tx.send(());
 1250              });
 1251              special_sched.enqueue_task(special_task);
 1252  
 1253              let normal_sched = normal_sched;
 1254              let normal_thread = Thread::start(proc() { normal_sched.bootstrap() });
 1255  
 1256              let special_sched = special_sched;
 1257              let special_thread = Thread::start(proc() { special_sched.bootstrap() });
 1258  
 1259              normal_thread.join();
 1260              special_thread.join();
 1261          });
 1262      }
 1263  
 1264      //#[test]
 1265      //fn test_stress_schedule_task_states() {
 1266      //    if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
 1267      //    let n = stress_factor() * 120;
 1268      //    for _ in range(0, n as int) {
 1269      //        test_schedule_home_states();
 1270      //    }
 1271      //}
 1272  
 1273      #[test]
 1274      fn test_io_callback() {
 1275          use std::io::timer;
 1276  
 1277          let mut pool = SchedPool::new(PoolConfig {
 1278              threads: 2,
 1279              event_loop_factory: rustuv::event_loop,
 1280          });
 1281  
 1282          // This is a regression test that when there are no schedulable tasks in
 1283          // the work queue, but we are performing I/O, that once we do put
 1284          // something in the work queue again the scheduler picks it up and
 1285          // doesn't exit before emptying the work queue
 1286          pool.spawn(TaskOpts::new(), proc() {
 1287              spawn(proc() {
 1288                  timer::sleep(10);
 1289              });
 1290          });
 1291  
 1292          pool.shutdown();
 1293      }
 1294  
 1295      #[test]
 1296      fn wakeup_across_scheds() {
 1297          let (tx1, rx1) = channel();
 1298          let (tx2, rx2) = channel();
 1299  
 1300          let mut pool1 = pool();
 1301          let mut pool2 = pool();
 1302  
 1303          pool1.spawn(TaskOpts::new(), proc() {
 1304              let id = sched_id();
 1305              tx1.send(());
 1306              rx2.recv();
 1307              assert_eq!(id, sched_id());
 1308          });
 1309  
 1310          pool2.spawn(TaskOpts::new(), proc() {
 1311              let id = sched_id();
 1312              rx1.recv();
 1313              assert_eq!(id, sched_id());
 1314              tx2.send(());
 1315          });
 1316  
 1317          pool1.shutdown();
 1318          pool2.shutdown();
 1319      }
 1320  
 1321      // A regression test that the final message is always handled.
 1322      // Used to deadlock because Shutdown was never recvd.
 1323      #[test]
 1324      fn no_missed_messages() {
 1325          let mut pool = pool();
 1326  
 1327          let task = pool.task(TaskOpts::new(), proc()());
 1328          pool.spawn_sched().send(TaskFromFriend(task));
 1329  
 1330          pool.shutdown();
 1331      }
 1332  
 1333      #[test]
 1334      fn multithreading() {
 1335          run(proc() {
 1336              let mut rxs = vec![];
 1337              for _ in range(0, 10) {
 1338                  let (tx, rx) = channel();
 1339                  spawn(proc() {
 1340                      tx.send(());
 1341                  });
 1342                  rxs.push(rx);
 1343              }
 1344  
 1345              loop {
 1346                  match rxs.pop() {
 1347                      Some(rx) => rx.recv(),
 1348                      None => break,
 1349                  }
 1350              }
 1351          });
 1352      }
 1353  
 1354       #[test]
 1355      fn thread_ring() {
 1356          run(proc() {
 1357              let (end_tx, end_rx) = channel();
 1358  
 1359              let n_tasks = 10;
 1360              let token = 2000;
 1361  
 1362              let (tx1, mut rx) = channel();
 1363              tx1.send((token, end_tx));
 1364              let mut i = 2;
 1365              while i <= n_tasks {
 1366                  let (tx, next_rx) = channel();
 1367                  let imm_i = i;
 1368                  let imm_rx = rx;
 1369                  spawn(proc() {
 1370                      roundtrip(imm_i, n_tasks, &imm_rx, &tx);
 1371                  });
 1372                  rx = next_rx;
 1373                  i += 1;
 1374              }
 1375              let rx = rx;
 1376              spawn(proc() {
 1377                  roundtrip(1, n_tasks, &rx, &tx1);
 1378              });
 1379  
 1380              end_rx.recv();
 1381          });
 1382  
 1383          fn roundtrip(id: int, n_tasks: int,
 1384                       rx: &Receiver<(int, Sender<()>)>,
 1385                       tx: &Sender<(int, Sender<()>)>) {
 1386              loop {
 1387                  match rx.recv() {
 1388                      (1, end_tx) => {
 1389                          debug!("{}\n", id);
 1390                          end_tx.send(());
 1391                          return;
 1392                      }
 1393                      (token, end_tx) => {
 1394                          debug!("thread: {}   got token: {}", id, token);
 1395                          tx.send((token - 1, end_tx));
 1396                          if token <= n_tasks {
 1397                              return;
 1398                          }
 1399                      }
 1400                  }
 1401              }
 1402          }
 1403      }
 1404  
 1405      #[test]
 1406      fn start_closure_dtor() {
 1407          // Regression test that the `start` task entrypoint can
 1408          // contain dtors that use task resources
 1409          run(proc() {
 1410              struct S { field: () }
 1411  
 1412              impl Drop for S {
 1413                  fn drop(&mut self) {
 1414                      let _foo = box 0;
 1415                  }
 1416              }
 1417  
 1418              let s = S { field: () };
 1419  
 1420              spawn(proc() {
 1421                  let _ss = &s;
 1422              });
 1423          });
 1424      }
 1425  
 1426      #[test]
 1427      fn dont_starve_1() {
 1428          let mut pool = SchedPool::new(PoolConfig {
 1429              threads: 2, // this must be > 1
 1430              event_loop_factory: basic::event_loop,
 1431          });
 1432          pool.spawn(TaskOpts::new(), proc() {
 1433              let (tx, rx) = channel();
 1434  
 1435              // This task should not be able to starve the sender;
 1436              // The sender should get stolen to another thread.
 1437              spawn(proc() {
 1438                  while rx.try_recv().is_err() { }
 1439              });
 1440  
 1441              tx.send(());
 1442          });
 1443          pool.shutdown();
 1444      }
 1445  
 1446      #[test]
 1447      fn dont_starve_2() {
 1448          run(proc() {
 1449              let (tx1, rx1) = channel();
 1450              let (tx2, _rx2) = channel();
 1451  
 1452              // This task should not be able to starve the other task.
 1453              // The sends should eventually yield.
 1454              spawn(proc() {
 1455                  while rx1.try_recv().is_err() {
 1456                      tx2.send(());
 1457                  }
 1458              });
 1459  
 1460              tx1.send(());
 1461          });
 1462      }
 1463  
 1464      // Regression test for a logic bug that would cause single-threaded
 1465      // schedulers to sleep forever after yielding and stealing another task.
 1466      #[test]
 1467      fn single_threaded_yield() {
 1468          use std::task::deschedule;
 1469          run(proc() {
 1470              for _ in range(0, 5) { deschedule(); }
 1471          });
 1472      }
 1473  
 1474      #[test]
 1475      fn test_spawn_sched_blocking() {
 1476          use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
 1477          static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
 1478  
 1479          // Testing that a task in one scheduler can block in foreign code
 1480          // without affecting other schedulers
 1481          for _ in range(0, 20) {
 1482              let mut pool = pool();
 1483              let (start_tx, start_rx) = channel();
 1484              let (fin_tx, fin_rx) = channel();
 1485  
 1486              let mut handle = pool.spawn_sched();
 1487              handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() {
 1488                  unsafe {
 1489                      let guard = LOCK.lock();
 1490  
 1491                      start_tx.send(());
 1492                      guard.wait();   // block the scheduler thread
 1493                      guard.signal(); // let them know we have the lock
 1494                  }
 1495  
 1496                  fin_tx.send(());
 1497              })));
 1498              drop(handle);
 1499  
 1500              let mut handle = pool.spawn_sched();
 1501              handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() {
 1502                  // Wait until the other task has its lock
 1503                  start_rx.recv();
 1504  
 1505                  fn pingpong(po: &Receiver<int>, ch: &Sender<int>) {
 1506                      let mut val = 20;
 1507                      while val > 0 {
 1508                          val = po.recv();
 1509                          let _ = ch.send_opt(val - 1);
 1510                      }
 1511                  }
 1512  
 1513                  let (setup_tx, setup_rx) = channel();
 1514                  let (parent_tx, parent_rx) = channel();
 1515                  spawn(proc() {
 1516                      let (child_tx, child_rx) = channel();
 1517                      setup_tx.send(child_tx);
 1518                      pingpong(&child_rx, &parent_tx);
 1519                  });
 1520  
 1521                  let child_tx = setup_rx.recv();
 1522                  child_tx.send(20);
 1523                  pingpong(&parent_rx, &child_tx);
 1524                  unsafe {
 1525                      let guard = LOCK.lock();
 1526                      guard.signal();   // wakeup waiting scheduler
 1527                      guard.wait();     // wait for them to grab the lock
 1528                  }
 1529              })));
 1530              drop(handle);
 1531  
 1532              fin_rx.recv();
 1533              pool.shutdown();
 1534          }
 1535          unsafe { LOCK.destroy(); }
 1536      }
 1537  }


libgreen/sched.rs:37:48-37:48 -struct- definition:
/// in too much allocation and too many events.
pub struct Scheduler {
    /// ID number of the pool that this scheduler is a member of. When
references:- 27
148:         let (consumer, producer) = msgq::queue();
149:         let mut sched = Scheduler {
150:             pool_id: pool_id,
--
735:                                             cur: Box<GreenTask>,
736:                                             f: |&mut Scheduler, BlockedTask|) {
737:         // Trickier - we need to get the scheduler task out of self
--
958: trait ClosureConverter {
959:     fn from_fn(|&mut Scheduler, Box<GreenTask>|) -> Self;
960:     fn to_fn(self) -> |&mut Scheduler, Box<GreenTask>|;
--
962: impl ClosureConverter for UnsafeTaskReceiver {
963:     fn from_fn(f: |&mut Scheduler, Box<GreenTask>|) -> UnsafeTaskReceiver {
964:         unsafe { cast::transmute(f) }
libgreen/task.rs:
279:     pub fn put_with_sched(mut ~self, sched: Box<Scheduler>) {
280:         assert!(self.sched.is_none());
libgreen/sched.rs:
965:     }
966:     fn to_fn(self) -> |&mut Scheduler, Box<GreenTask>| {
967:         unsafe { cast::transmute(self) }


libgreen/sched.rs:935:1-935:1 -struct- definition:
struct CleanupJob {
    task: Box<GreenTask>,
    f: UnsafeTaskReceiver
references:- 6
942:     pub fn new(task: Box<GreenTask>, f: UnsafeTaskReceiver) -> CleanupJob {
943:         CleanupJob {
944:             task: task,
--
949:     pub fn run(self, sched: &mut Scheduler) {
950:         let CleanupJob { task: task, f: f } = self;
951:         f.to_fn()(sched, task)


libgreen/sched.rs:894:1-894:1 -enum- definition:
pub enum SchedMessage {
    Wake,
    Shutdown,
references:- 4
910: impl SchedHandle {
911:     pub fn send(&mut self, msg: SchedMessage) {
912:         self.queue.push(msg);


libgreen/sched.rs:956:15-956:15 -NK_AS_STR_TODO- definition:
// complaining
type UnsafeTaskReceiver = raw::Closure;
trait ClosureConverter {
references:- 4
941: impl CleanupJob {
942:     pub fn new(task: Box<GreenTask>, f: UnsafeTaskReceiver) -> CleanupJob {
943:         CleanupJob {
--
961: }
962: impl ClosureConverter for UnsafeTaskReceiver {
963:     fn from_fn(f: |&mut Scheduler, Box<GreenTask>|) -> UnsafeTaskReceiver {
964:         unsafe { cast::transmute(f) }


libgreen/sched.rs:108:16-108:16 -enum- definition:
enum EffortLevel {
    DontTryTooHard,
    GiveItYourBest
references:- 4
107: /// mainly being whether memory is synchronized or not
109: enum EffortLevel {
--
352:     fn interpret_message_queue(mut ~self, stask: Box<GreenTask>,
353:                                effort: EffortLevel)
354:             -> (Box<Scheduler>, Box<GreenTask>, bool)


libgreen/sched.rs:957:40-957:40 -trait- definition:
type UnsafeTaskReceiver = raw::Closure;
trait ClosureConverter {
    fn from_fn(|&mut Scheduler, Box<GreenTask>|) -> Self;
references:- 2
961: }
962: impl ClosureConverter for UnsafeTaskReceiver {
963:     fn from_fn(f: |&mut Scheduler, Box<GreenTask>|) -> UnsafeTaskReceiver {


libgreen/sched.rs:903:1-903:1 -struct- definition:
pub struct SchedHandle {
    remote: Box<RemoteCallback:Send>,
    queue: msgq::Producer<SchedMessage>,
references:- 16
882:         return SchedHandle {
883:             remote: remote,
--
910: impl SchedHandle {
911:     pub fn send(&mut self, msg: SchedMessage) {
libgreen/sleeper_list.rs:
31:     pub fn pop(&mut self) -> Option<SchedHandle> {
32:         self.q.pop()
--
35:     pub fn casual_pop(&mut self) -> Option<SchedHandle> {
36:         self.q.pop()
libgreen/task.rs:
79:     AnySched,
80:     HomeSched(SchedHandle),
81: }
--
327:             let mtx = &mut self.nasty_deschedule_lock as *mut NativeMutex;
328:             let handle = self.handle.get_mut_ref() as *mut SchedHandle;
329:             let _guard = (*mtx).lock();
libgreen/lib.rs:
477:     /// other schedulers currently in the scheduler pool.
478:     pub fn spawn_sched(&mut self) -> SchedHandle {
479:         let (worker, stealer) = self.deque_pool.deque();
libgreen/sleeper_list.rs:
18: pub struct SleeperList {
19:     q: Queue<SchedHandle>,
20: }


libgreen/sched.rs:115:1-115:1 -fn- definition:
fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
    let r: uint = Rand::rand(rng);
    r % MAX_YIELD_CHECKS + 1
references:- 2
831:         } else {
832:             self.yield_check_count = reset_yield_check(&mut self.rng);
833:             // Tell the scheduler to start stealing on the next iteration