(index<- )        ./librustuv/homing.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! Homing I/O implementation
  12  //!
  13  //! In libuv, whenever a handle is created on an I/O loop it is illegal to use
  14  //! that handle outside of that I/O loop. We use libuv I/O with our green
  15  //! scheduler, and each green scheduler corresponds to a different I/O loop on a
  16  //! different OS thread. Green tasks are also free to roam among schedulers,
  17  //! which implies that it is possible to create an I/O handle on one event loop
  18  //! and then attempt to use it on another.
  19  //!
  20  //! In order to solve this problem, this module implements the notion of a
  21  //! "homing operation" which will transplant a task from its currently running
  22  //! scheduler back onto the original I/O loop. This is accomplished entirely at
  23  //! the librustuv layer with very little cooperation from the scheduler (which
  24  //! we don't even know exists technically).
  25  //!
  26  //! These homing operations are completed by first realizing that we're on the
  27  //! wrong I/O loop, then descheduling ourselves, sending ourselves to the
  28  //! correct I/O loop, and then waking up the I/O loop in order to process its
  29  //! local queue of tasks which need to run.
  30  //!
  31  //! This enqueueing is done with a concurrent queue from libstd, and the
  32  //! signalling is achieved with an async handle.
  33  
  34  #![allow(dead_code)]
  35  
  36  use std::cast;
  37  use std::rt::local::Local;
  38  use std::rt::rtio::LocalIo;
  39  use std::rt::task::{Task, BlockedTask};
  40  
  41  use ForbidUnwind;
  42  use queue::{Queue, QueuePool};
  43  
  44  /// A handle to a remote libuv event loop. This handle will keep the event loop
  45  /// alive while active in order to ensure that a homing operation can always be
  46  /// completed.
  47  ///
  48  /// Handles are clone-able in order to derive new handles from existing handles
  49  /// (very useful for when accepting a socket from a server).
  50  pub struct HomeHandle {
  51      queue: Queue,
  52      id: uint,
  53  }
  54  
  55  impl HomeHandle {
  56      pub fn new(iduint, pool&mut QueuePool) -> HomeHandle {
  57          HomeHandle { queue: pool.queue(), id: id }
  58      }
  59  
  60      fn send(&mut self, taskBlockedTask) {
  61          self.queue.push(task);
  62      }
  63  }
  64  
  65  impl Clone for HomeHandle {
  66      fn clone(&self) -> HomeHandle {
  67          HomeHandle {
  68              queue: self.queue.clone(),
  69              id: self.id,
  70          }
  71      }
  72  }
  73  
  74  pub fn local_id() -> uint {
  75      let mut io = match LocalIo::borrow() {
  76          Some(io) => io, None => return 0,
  77      };
  78      let io = io.get();
  79      unsafe {
  80          let (_vtable, ptr)(uint, uint) = cast::transmute(io);
  81          return ptr;
  82      }
  83  }
  84  
  85  pub trait HomingIO {
  86      fn home<'r>(&'r mut self) -> &'r mut HomeHandle;
  87  
  88      /// This function will move tasks to run on their home I/O scheduler. Note
  89      /// that this function does *not* pin the task to the I/O scheduler, but
  90      /// rather it simply moves it to running on the I/O scheduler.
  91      fn go_to_IO_home(&mut self) -> uint {
  92          let _f = ForbidUnwind::new("going home");
  93  
  94          let cur_loop_id = local_id();
  95          let destination = self.home().id;
  96  
  97          // Try at all costs to avoid the homing operation because it is quite
  98          // expensive. Hence, we only deschedule/send if we're not on the correct
  99          // event loop. If we're already on the home event loop, then we're good
 100          // to go (remember we have no preemption, so we're guaranteed to stay on
 101          // this event loop as long as we avoid the scheduler).
 102          if cur_loop_id != destination {
 103              let cur_taskBox<Task> = Local::take();
 104              cur_task.deschedule(1, |task| {
 105                  self.home().send(task);
 106                  Ok(())
 107              });
 108  
 109              // Once we wake up, assert that we're in the right location
 110              assert_eq!(local_id(), destination);
 111          }
 112  
 113          return destination;
 114      }
 115  
 116      /// Fires a single homing missile, returning another missile targeted back
 117      /// at the original home of this task. In other words, this function will
 118      /// move the local task to its I/O scheduler and then return an RAII wrapper
 119      /// which will return the task home.
 120      fn fire_homing_missile(&mut self) -> HomingMissile {
 121          HomingMissile { io_home: self.go_to_IO_home() }
 122      }
 123  }
 124  
 125  /// After a homing operation has been completed, this will return the current
 126  /// task back to its appropriate home (if applicable). The field is used to
 127  /// assert that we are where we think we are.
 128  pub struct HomingMissile {
 129      io_home: uint,
 130  }
 131  
 132  impl HomingMissile {
 133      /// Check at runtime that the task has *not* transplanted itself to a
 134      /// different I/O loop while executing.
 135      pub fn check(&self, msg&'static str) {
 136          assert!(local_id() == self.io_home, "{}", msg);
 137      }
 138  }
 139  
 140  impl Drop for HomingMissile {
 141      fn drop(&mut self) {
 142          let _f = ForbidUnwind::new("leaving home");
 143  
 144          // It would truly be a sad day if we had moved off the home I/O
 145          // scheduler while we were doing I/O.
 146          self.check("task moved away from the home scheduler");
 147      }
 148  }
 149  
 150  #[cfg(test)]
 151  mod test {
 152      use green::sched;
 153      use green::{SchedPool, PoolConfig};
 154      use std::rt::rtio::RtioUdpSocket;
 155      use std::io::test::next_test_ip4;
 156      use std::task::TaskOpts;
 157  
 158      use net::UdpWatcher;
 159      use super::super::local_loop;
 160  
 161      // On one thread, create a udp socket. Then send that socket to another
 162      // thread and destroy the socket on the remote thread. This should make sure
 163      // that homing kicks in for the socket to go back home to the original
 164      // thread, close itself, and then come back to the last thread.
 165      #[test]
 166      fn test_homing_closes_correctly() {
 167          let (tx, rx) = channel();
 168          let mut pool = SchedPool::new(PoolConfig {
 169              threads: 1,
 170              event_loop_factory: ::event_loop,
 171          });
 172  
 173          pool.spawn(TaskOpts::new(), proc() {
 174              let listener = UdpWatcher::bind(local_loop(), next_test_ip4());
 175              tx.send(listener.unwrap());
 176          });
 177  
 178          let task = pool.task(TaskOpts::new(), proc() {
 179              drop(rx.recv());
 180          });
 181          pool.spawn_sched().send(sched::TaskFromFriend(task));
 182  
 183          pool.shutdown();
 184      }
 185  
 186      #[test]
 187      fn test_homing_read() {
 188          let (tx, rx) = channel();
 189          let mut pool = SchedPool::new(PoolConfig {
 190              threads: 1,
 191              event_loop_factory: ::event_loop,
 192          });
 193  
 194          pool.spawn(TaskOpts::new(), proc() {
 195              let addr1 = next_test_ip4();
 196              let addr2 = next_test_ip4();
 197              let listener = UdpWatcher::bind(local_loop(), addr2);
 198              tx.send((listener.unwrap(), addr1));
 199              let mut listener = UdpWatcher::bind(local_loop(), addr1).unwrap();
 200              listener.sendto([1, 2, 3, 4], addr2).unwrap();
 201          });
 202  
 203          let task = pool.task(TaskOpts::new(), proc() {
 204              let (mut watcher, addr) = rx.recv();
 205              let mut buf = [0, ..10];
 206              assert_eq!(watcher.recvfrom(buf).unwrap(), (4, addr));
 207          });
 208          pool.spawn_sched().send(sched::TaskFromFriend(task));
 209  
 210          pool.shutdown();
 211      }
 212  }


librustuv/homing.rs:49:61-49:61 -struct- definition:
/// (very useful for when accepting a socket from a server).
pub struct HomeHandle {
    queue: Queue,
references:- 34
librustuv/uvio.rs:
librustuv/file.rs:
librustuv/net.rs:
librustuv/timer.rs:
librustuv/process.rs:
librustuv/pipe.rs:
librustuv/tty.rs:
librustuv/signal.rs:
librustuv/timeout.rs:
librustuv/file.rs:


librustuv/homing.rs:84:1-84:1 -trait- definition:
pub trait HomingIO {
    fn home<'r>(&'r mut self) -> &'r mut HomeHandle;
    /// This function will move tasks to run on their home I/O scheduler. Note
references:- 13
librustuv/file.rs:
355: impl HomingIO for FileWatcher {
356:     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
librustuv/net.rs:
379: impl HomingIO for TcpListener {
380:     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
--
534: impl HomingIO for UdpWatcher {
535:     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
librustuv/process.rs:
203: impl HomingIO for Process {
204:     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
librustuv/pipe.rs:
206: impl HomingIO for PipeWatcher {
207:     fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
--
268: impl HomingIO for PipeListener {
269:     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
--
316: impl HomingIO for PipeAcceptor {
317:     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
librustuv/tty.rs:
126: impl HomingIO for TtyWatcher {
127:     fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
librustuv/signal.rs:
57: impl HomingIO for SignalWatcher {
58:     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
librustuv/timeout.rs:
360:     pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
361:         &mut self, ms: u64, t: &mut T
librustuv/timer.rs:
68: impl HomingIO for TimerWatcher {
69:     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }


librustuv/homing.rs:73:1-73:1 -fn- definition:
pub fn local_id() -> uint {
    let mut io = match LocalIo::borrow() {
        Some(io) => io, None => return 0,
references:- 5
94:         let cur_loop_id = local_id();
95:         let destination = self.home().id;
--
135:     pub fn check(&self, msg: &'static str) {
136:         assert!(local_id() == self.io_home, "{}", msg);
137:     }
librustuv/lib.rs:
206:             msg: s,
207:             io: homing::local_id(),
208:         }
--
213:     fn drop(&mut self) {
214:         assert!(self.io == homing::local_id(),
215:                 "didnt want a scheduler switch: {}",


librustuv/homing.rs:127:46-127:46 -struct- definition:
/// assert that we are where we think we are.
pub struct HomingMissile {
    io_home: uint,
references:- 8
120:     fn fire_homing_missile(&mut self) -> HomingMissile {
121:         HomingMissile { io_home: self.go_to_IO_home() }
122:     }
--
140: impl Drop for HomingMissile {
141:     fn drop(&mut self) {
librustuv/access.rs:
50:     pub fn grant<'a>(&'a mut self, token: uint,
51:                      missile: HomingMissile) -> Guard<'a> {
52:         // This unsafety is actually OK because the homing missile argument
--
71:     pub fn close(&self, _missile: &HomingMissile) {
72:         // This unsafety is OK because with a homing missile we're guaranteed to
librustuv/timeout.rs:
71:     /// error.
72:     pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a>> {
73:         // First, flag that we're attempting to acquire access. This will allow