(index<- )        ./libstd/rt/uv/uvio.rs

    1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
    2  // file at the top-level directory of this distribution and at
    3  // http://rust-lang.org/COPYRIGHT.
    4  //
    5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
    6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
    7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
    8  // option. This file may not be copied, modified, or distributed
    9  // except according to those terms.
   10  
   11  use c_str::ToCStr;
   12  use cast::transmute;
   13  use cast;
   14  use cell::Cell;
   15  use clone::Clone;
   16  use libc::{c_int, c_uint, c_void, pid_t};
   17  use ops::Drop;
   18  use option::*;
   19  use ptr;
   20  use str;
   21  use str::Str;
   22  use result::*;
   23  use rt::io::IoError;
   24  use rt::io::net::ip::{SocketAddr, IpAddr};
   25  use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
   26  use rt::io::process::ProcessConfig;
   27  use rt::kill::BlockedTask;
   28  use rt::local::Local;
   29  use rt::rtio::*;
   30  use rt::sched::{Scheduler, SchedHandle};
   31  use rt::tube::Tube;
   32  use rt::task::SchedHome;
   33  use rt::uv::*;
   34  use rt::uv::idle::IdleWatcher;
   35  use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr, accum_sockaddrs};
   36  use rt::uv::addrinfo::GetAddrInfoRequest;
   37  use unstable::sync::Exclusive;
   38  use path::{GenericPath, Path};
   39  use super::super::io::support::PathLike;
   40  use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
   41            S_IRUSR, S_IWUSR, S_IRWXU};
   42  use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
   43               CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite,
   44               FileStat};
   45  use task;
   46  
   47  #[cfg(test)] use container::Container;
   48  #[cfg(test)] use unstable::run_in_bare_thread;
   49  #[cfg(test)] use rt::test::{spawntask,
   50                              next_test_ip4,
   51                              run_in_mt_newsched_task};
   52  #[cfg(test)] use iter::{Iterator, range};
   53  #[cfg(test)] use rt::comm::oneshot;
   54  
   55  // XXX we should not be calling uvll functions in here.
   56  
   57  trait HomingIO {
   58  
   59      fn home<'r>(&'r mut self) -> &'r mut SchedHandle;
   60  
   61      /* XXX This will move pinned tasks to do IO on the proper scheduler
   62       * and then move them back to their home.
   63       */
   64      fn go_to_IO_home(&mut self) -> SchedHome {
   65          use rt::sched::PinnedTask;
   66  
   67          do task::unkillable { // FIXME(#8674)
   68              let mut old = None;
   69              {
   70                  let ptr = &mut old;
   71                  let scheduler~Scheduler = Local::take();
   72                  do scheduler.deschedule_running_task_and_then |_, task| {
   73                      /* FIXME(#8674) if the task was already killed then wake
   74                       * will return None. In that case, the home pointer will never be set.
   75                       *
   76                       * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
   77                       */
   78                      do task.wake().map |mut task| {
   79                          *ptr = Some(task.take_unwrap_home());
   80                          self.home().send(PinnedTask(task));
   81                      };
   82                  }
   83              }
   84              old.expect("No old home because task had already been killed.")
   85          }
   86      }
   87  
   88      // XXX dummy self param
   89      fn restore_original_home(_dummy_selfOption<Self>, oldSchedHome) {
   90          use rt::sched::TaskFromFriend;
   91  
   92          let old = Cell::new(old);
   93          do task::unkillable { // FIXME(#8674)
   94              let scheduler~Scheduler = Local::take();
   95              do scheduler.deschedule_running_task_and_then |scheduler, task| {
   96                  /* FIXME(#8674) if the task was already killed then wake
   97                   * will return None. In that case, the home pointer will never be restored.
   98                   *
   99                   * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
  100                   */
  101                  do task.wake().map |mut task| {
  102                      task.give_home(old.take());
  103                      scheduler.make_handle().send(TaskFromFriend(task));
  104                  };
  105              }
  106          }
  107      }
  108  
  109      fn home_for_io<A>(&mut self, io&fn(&mut Self) -> A) -> A {
  110          let home = self.go_to_IO_home();
  111          let a = io(self); // do IO
  112          HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
  113          a // return the result of the IO
  114      }
  115  
  116      fn home_for_io_consume<A>(self, io&fn(Self) -> A) -> A {
  117          let mut this = self;
  118          let home = this.go_to_IO_home();
  119          let a = io(this); // do IO
  120          HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
  121          a // return the result of the IO
  122      }
  123  
  124      fn home_for_io_with_sched<A>(&mut self, io_sched&fn(&mut Self, ~Scheduler) -> A) -> A {
  125          let home = self.go_to_IO_home();
  126          let a = do task::unkillable { // FIXME(#8674)
  127              let scheduler~Scheduler = Local::take();
  128              io_sched(self, scheduler) // do IO and scheduling action
  129          };
  130          HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
  131          a // return result of IO
  132      }
  133  }
  134  
  135  // get a handle for the current scheduler
  136  macro_rules! get_handle_to_current_scheduler(
  137      () => (do Local::borrow |sched&mut Scheduler{ sched.make_handle() })
  138  )
  139  
  140  enum SocketNameKind {
  141      TcpPeer,
  142      Tcp,
  143      Udp
  144  }
  145  
  146  fn socket_name<T, U: Watcher + NativeHandle<*T>>(skSocketNameKind,
  147                                                   handleU) -> Result<SocketAddr, IoError> {
  148      let getsockname = match sk {
  149          TcpPeer => uvll::tcp_getpeername,
  150          Tcp     => uvll::tcp_getsockname,
  151          Udp     => uvll::udp_getsockname,
  152      };
  153  
  154      // Allocate a sockaddr_storage
  155      // since we don't know if it's ipv4 or ipv6
  156      let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
  157  
  158      let r = unsafe {
  159          getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
  160      };
  161  
  162      if r != 0 {
  163          let status = status_to_maybe_uv_error(r);
  164          return Err(uv_error_to_io_error(status.unwrap()));
  165      }
  166  
  167      let addr = unsafe {
  168          if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
  169              net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
  170          } else {
  171              net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
  172          }
  173      };
  174  
  175      unsafe { uvll::free_sockaddr_storage(r_addr); }
  176  
  177      Ok(addr)
  178  
  179  }
  180  
  181  // Obviously an Event Loop is always home.
  182  pub struct UvEventLoop {
  183      uvio: UvIoFactory
  184  }
  185  
  186  impl UvEventLoop {
  187      pub fn new() -> UvEventLoop {
  188          UvEventLoop {
  189              uvio: UvIoFactory(Loop::new())
  190          }
  191      }
  192  }
  193  
  194  impl Drop for UvEventLoop {
  195      fn drop(&mut self) {
  196          self.uvio.uv_loop().close();
  197      }
  198  }
  199  
  200  impl EventLoop for UvEventLoop {
  201      fn run(&mut self) {
  202          self.uvio.uv_loop().run();
  203      }
  204  
  205      fn callback(&mut self, f~fn()) {
  206          let mut idle_watcher =  IdleWatcher::new(self.uvio.uv_loop());
  207          do idle_watcher.start |mut idle_watcher, status| {
  208              assert!(status.is_none());
  209              idle_watcher.stop();
  210              idle_watcher.close(||());
  211              f();
  212          }
  213      }
  214  
  215      fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
  216          let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
  217          return ~UvPausibleIdleCallback {
  218              watcher: idle_watcher,
  219              idle_flag: false,
  220              closed: false
  221          };
  222      }
  223  
  224      fn callback_ms(&mut self, msu64, f~fn()) {
  225          let mut timer =  TimerWatcher::new(self.uvio.uv_loop());
  226          do timer.start(ms, 0) |timer, status| {
  227              assert!(status.is_none());
  228              timer.close(||());
  229              f();
  230          }
  231      }
  232  
  233      fn remote_callback(&mut self, f~fn()) -> ~RemoteCallbackObject {
  234          ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
  235      }
  236  
  237      fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
  238          Some(&mut self.uvio)
  239      }
  240  }
  241  
  242  pub struct UvPausibleIdleCallback {
  243      watcher: IdleWatcher,
  244      idle_flag: bool,
  245      closed: bool
  246  }
  247  
  248  impl UvPausibleIdleCallback {
  249      #[inline]
  250      pub fn start(&mut self, f~fn()) {
  251          do self.watcher.start |_idle_watcher, _status| {
  252              f();
  253          };
  254          self.idle_flag = true;
  255      }
  256      #[inline]
  257      pub fn pause(&mut self) {
  258          if self.idle_flag == true {
  259              self.watcher.stop();
  260              self.idle_flag = false;
  261          }
  262      }
  263      #[inline]
  264      pub fn resume(&mut self) {
  265          if self.idle_flag == false {
  266              self.watcher.restart();
  267              self.idle_flag = true;
  268          }
  269      }
  270      #[inline]
  271      pub fn close(&mut self) {
  272          self.pause();
  273          if !self.closed {
  274              self.closed = true;
  275              self.watcher.close(||{});
  276          }
  277      }
  278  }
  279  
  280  #[test]
  281  fn test_callback_run_once() {
  282      do run_in_bare_thread {
  283          let mut event_loop = UvEventLoop::new();
  284          let mut count = 0;
  285          let count_ptr: *mut int = &mut count;
  286          do event_loop.callback {
  287              unsafe { *count_ptr += 1 }
  288          }
  289          event_loop.run();
  290          assert_eq!(count, 1);
  291      }
  292  }
  293  
  294  // The entire point of async is to call into a loop from other threads so it does not need to home.
  295  pub struct UvRemoteCallback {
  296      // The uv async handle for triggering the callback
  297      async: AsyncWatcher,
  298      // A flag to tell the callback to exit, set from the dtor. This is
  299      // almost never contested - only in rare races with the dtor.
  300      exit_flag: Exclusive<bool>
  301  }
  302  
  303  impl UvRemoteCallback {
  304      pub fn new(loop_&mut Loop, f~fn()) -> UvRemoteCallback {
  305          let exit_flag = Exclusive::new(false);
  306          let exit_flag_clone = exit_flag.clone();
  307          let async = do AsyncWatcher::new(loop_) |watcher, status| {
  308              assert!(status.is_none());
  309  
  310              // The synchronization logic here is subtle. To review,
  311              // the uv async handle type promises that, after it is
  312              // triggered the remote callback is definitely called at
  313              // least once. UvRemoteCallback needs to maintain those
  314              // semantics while also shutting down cleanly from the
  315              // dtor. In our case that means that, when the
  316              // UvRemoteCallback dtor calls `async.send()`, here `f` is
  317              // always called later.
  318  
  319              // In the dtor both the exit flag is set and the async
  320              // callback fired under a lock.  Here, before calling `f`,
  321              // we take the lock and check the flag. Because we are
  322              // checking the flag before calling `f`, and the flag is
  323              // set under the same lock as the send, then if the flag
  324              // is set then we're guaranteed to call `f` after the
  325              // final send.
  326  
  327              // If the check was done after `f()` then there would be a
  328              // period between that call and the check where the dtor
  329              // could be called in the other thread, missing the final
  330              // callback while still destroying the handle.
  331  
  332              let should_exit = unsafe {
  333                  exit_flag_clone.with_imm(|&should_exit| should_exit)
  334              };
  335  
  336              f();
  337  
  338              if should_exit {
  339                  watcher.close(||());
  340              }
  341  
  342          };
  343          UvRemoteCallback {
  344              async: async,
  345              exit_flag: exit_flag
  346          }
  347      }
  348  }
  349  
  350  impl RemoteCallback for UvRemoteCallback {
  351      fn fire(&mut self) { self.async.send() }
  352  }
  353  
  354  impl Drop for UvRemoteCallback {
  355      fn drop(&mut self) {
  356          unsafe {
  357              let this&mut UvRemoteCallback = cast::transmute_mut(self);
  358              do this.exit_flag.with |should_exit| {
  359                  // NB: These two things need to happen atomically. Otherwise
  360                  // the event handler could wake up due to a *previous*
  361                  // signal and see the exit flag, destroying the handle
  362                  // before the final send.
  363                  *should_exit = true;
  364                  this.async.send();
  365              }
  366          }
  367      }
  368  }
  369  
  370  #[cfg(test)]
  371  mod test_remote {
  372      use cell::Cell;
  373      use rt::test::*;
  374      use rt::thread::Thread;
  375      use rt::tube::Tube;
  376      use rt::rtio::EventLoop;
  377      use rt::local::Local;
  378      use rt::sched::Scheduler;
  379  
  380      #[test]
  381      fn test_uv_remote() {
  382          do run_in_mt_newsched_task {
  383              let mut tube = Tube::new();
  384              let tube_clone = tube.clone();
  385              let remote_cell = Cell::new_empty();
  386              do Local::borrow |sched: &mut Scheduler| {
  387                  let tube_clone = tube_clone.clone();
  388                  let tube_clone_cell = Cell::new(tube_clone);
  389                  let remote = do sched.event_loop.remote_callback {
  390                      // This could be called multiple times
  391                      if !tube_clone_cell.is_empty() {
  392                          tube_clone_cell.take().send(1);
  393                      }
  394                  };
  395                  remote_cell.put_back(remote);
  396              }
  397              let thread = do Thread::start {
  398                  remote_cell.take().fire();
  399              };
  400  
  401              assert!(tube.recv() == 1);
  402              thread.join();
  403          }
  404      }
  405  }
  406  
  407  pub struct UvIoFactory(Loop);
  408  
  409  impl UvIoFactory {
  410      pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
  411          match self { &UvIoFactory(ref mut ptr) => ptr }
  412      }
  413  }
  414  
  415  /// Helper for a variety of simple uv_fs_* functions that
  416  /// have no ret val
  417  fn uv_fs_helper<P: PathLike>(loop_&mut Loop, path&P,
  418                               cb~fn(&mut FsRequest, &mut Loop, &P,
  419                                       ~fn(&FsRequest, Option<UvError>)))
  420          -> Result<(), IoError> {
  421      let result_cell = Cell::new_empty();
  422      let result_cell_ptr*Cell<Result<(), IoError>> = &result_cell;
  423      let path_cell = Cell::new(path);
  424      do task::unkillable { // FIXME(#8674)
  425          let scheduler~Scheduler = Local::take();
  426          let mut new_req = FsRequest::new();
  427          do scheduler.deschedule_running_task_and_then |_, task| {
  428              let task_cell = Cell::new(task);
  429              let path = path_cell.take();
  430              do cb(&mut new_req, loop_, path) |_, err| {
  431                  let res = match err {
  432                      None => Ok(()),
  433                      Some(err) => Err(uv_error_to_io_error(err))
  434                  };
  435                  unsafe { (*result_cell_ptr).put_back(res); }
  436                  let scheduler~Scheduler = Local::take();
  437                  scheduler.resume_blocked_task_immediately(task_cell.take());
  438              };
  439          }
  440      }
  441      assert!(!result_cell.is_empty());
  442      return result_cell.take();
  443  }
  444  
  445  impl IoFactory for UvIoFactory {
  446      // Connect to an address and return a new stream
  447      // NB: This blocks the task waiting on the connection.
  448      // It would probably be better to return a future
  449      fn tcp_connect(&mut self, addrSocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
  450          // Create a cell in the task to hold the result. We will fill
  451          // the cell before resuming the task.
  452          let result_cell = Cell::new_empty();
  453          let result_cell_ptr*Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
  454  
  455          // Block this task and take ownership, switch to scheduler context
  456          do task::unkillable { // FIXME(#8674)
  457              let scheduler~Scheduler = Local::take();
  458              do scheduler.deschedule_running_task_and_then |_, task| {
  459  
  460                  let mut tcp = TcpWatcher::new(self.uv_loop());
  461                  let task_cell = Cell::new(task);
  462  
  463                  // Wait for a connection
  464                  do tcp.connect(addr) |stream, status| {
  465                      match status {
  466                          None => {
  467                              let tcp = NativeHandle::from_native_handle(stream.native_handle());
  468                              let home = get_handle_to_current_scheduler!();
  469                              let res = Ok(~UvTcpStream { watcher: tcp, home: home });
  470  
  471                              // Store the stream in the task's stack
  472                              unsafe { (*result_cell_ptr).put_back(res); }
  473  
  474                              // Context switch
  475                              let scheduler~Scheduler = Local::take();
  476                              scheduler.resume_blocked_task_immediately(task_cell.take());
  477                          }
  478                          Some(_) => {
  479                              let task_cell = Cell::new(task_cell.take());
  480                              do stream.close {
  481                                  let res = Err(uv_error_to_io_error(status.unwrap()));
  482                                  unsafe { (*result_cell_ptr).put_back(res); }
  483                                  let scheduler~Scheduler = Local::take();
  484                                  scheduler.resume_blocked_task_immediately(task_cell.take());
  485                              }
  486                          }
  487                      }
  488                  }
  489              }
  490          }
  491  
  492          assert!(!result_cell.is_empty());
  493          return result_cell.take();
  494      }
  495  
  496      fn tcp_bind(&mut self, addrSocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
  497          let mut watcher = TcpWatcher::new(self.uv_loop());
  498          match watcher.bind(addr) {
  499              Ok(_) => {
  500                  let home = get_handle_to_current_scheduler!();
  501                  Ok(~UvTcpListener::new(watcher, home))
  502              }
  503              Err(uverr) => {
  504                  do task::unkillable { // FIXME(#8674)
  505                      let scheduler~Scheduler = Local::take();
  506                      do scheduler.deschedule_running_task_and_then |_, task| {
  507                          let task_cell = Cell::new(task);
  508                          do watcher.as_stream().close {
  509                              let scheduler~Scheduler = Local::take();
  510                              scheduler.resume_blocked_task_immediately(task_cell.take());
  511                          }
  512                      }
  513                      Err(uv_error_to_io_error(uverr))
  514                  }
  515              }
  516          }
  517      }
  518  
  519      fn udp_bind(&mut self, addrSocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
  520          let mut watcher = UdpWatcher::new(self.uv_loop());
  521          match watcher.bind(addr) {
  522              Ok(_) => {
  523                  let home = get_handle_to_current_scheduler!();
  524                  Ok(~UvUdpSocket { watcher: watcher, home: home })
  525              }
  526              Err(uverr) => {
  527                  do task::unkillable { // FIXME(#8674)
  528                      let scheduler~Scheduler = Local::take();
  529                      do scheduler.deschedule_running_task_and_then |_, task| {
  530                          let task_cell = Cell::new(task);
  531                          do watcher.close {
  532                              let scheduler~Scheduler = Local::take();
  533                              scheduler.resume_blocked_task_immediately(task_cell.take());
  534                          }
  535                      }
  536                      Err(uv_error_to_io_error(uverr))
  537                  }
  538              }
  539          }
  540      }
  541  
  542      fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
  543          let watcher = TimerWatcher::new(self.uv_loop());
  544          let home = get_handle_to_current_scheduler!();
  545          Ok(~UvTimer::new(watcher, home))
  546      }
  547  
  548      fn fs_from_raw_fd(&mut self, fdc_int, close_on_dropbool) -> ~RtioFileStream {
  549          let loop_ = Loop {handle: self.uv_loop().native_handle()};
  550          let home = get_handle_to_current_scheduler!();
  551          ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream
  552      }
  553  
  554      fn fs_open<P: PathLike>(&mut self, path&P, fmFileMode, faFileAccess)
  555          -> Result<~RtioFileStream, IoError> {
  556          let mut flags = match fm {
  557              Open => 0,
  558              Create => O_CREAT,
  559              OpenOrCreate => O_CREAT,
  560              Append => O_APPEND,
  561              Truncate => O_TRUNC,
  562              CreateOrTruncate => O_TRUNC | O_CREAT
  563          };
  564          flags = match fa {
  565              Read => flags | O_RDONLY,
  566              Write => flags | O_WRONLY,
  567              ReadWrite => flags | O_RDWR
  568          };
  569          let create_mode = match fm {
  570              Create|OpenOrCreate|CreateOrTruncate =>
  571                  S_IRUSR | S_IWUSR,
  572              _ => 0
  573          };
  574          let result_cell = Cell::new_empty();
  575          let result_cell_ptr*Cell<Result<~RtioFileStream,
  576                                             IoError>> = &result_cell;
  577          let path_cell = Cell::new(path);
  578          do task::unkillable { // FIXME(#8674)
  579              let scheduler~Scheduler = Local::take();
  580              let open_req = file::FsRequest::new();
  581              do scheduler.deschedule_running_task_and_then |_, task| {
  582                  let task_cell = Cell::new(task);
  583                  let path = path_cell.take();
  584                  do open_req.open(self.uv_loop(), path, flags as int, create_mode as int)
  585                        |req,err| {
  586                      if err.is_none() {
  587                          let loop_ = Loop {handle: req.get_loop().native_handle()};
  588                          let home = get_handle_to_current_scheduler!();
  589                          let fd = req.get_result() as c_int;
  590                          let fs = ~UvFileStream::new(
  591                              loop_, fd, true, home) as ~RtioFileStream;
  592                          let res = Ok(fs);
  593                          unsafe { (*result_cell_ptr).put_back(res); }
  594                          let scheduler~Scheduler = Local::take();
  595                          scheduler.resume_blocked_task_immediately(task_cell.take());
  596                      } else {
  597                          let res = Err(uv_error_to_io_error(err.unwrap()));
  598                          unsafe { (*result_cell_ptr).put_back(res); }
  599                          let scheduler~Scheduler = Local::take();
  600                          scheduler.resume_blocked_task_immediately(task_cell.take());
  601                      }
  602                  };
  603              };
  604          };
  605          assert!(!result_cell.is_empty());
  606          return result_cell.take();
  607      }
  608  
  609      fn fs_unlink<P: PathLike>(&mut self, path&P) -> Result<(), IoError> {
  610          do uv_fs_helper(self.uv_loop(), path) |unlink_req, l, p, cb| {
  611              do unlink_req.unlink(l, p) |req, err| {
  612                  cb(req, err)
  613              };
  614          }
  615      }
  616      fn fs_stat<P: PathLike>(&mut self, path&P) -> Result<FileStat, IoError> {
  617          use str::StrSlice;
  618          let result_cell = Cell::new_empty();
  619          let result_cell_ptr*Cell<Result<FileStat,
  620                                             IoError>> = &result_cell;
  621          let path_cell = Cell::new(path);
  622          do task::unkillable { // FIXME(#8674)
  623              let scheduler~Scheduler = Local::take();
  624              let stat_req = file::FsRequest::new();
  625              do scheduler.deschedule_running_task_and_then |_, task| {
  626                  let task_cell = Cell::new(task);
  627                  let path = path_cell.take();
  628                  let path_str = path.path_as_str(|p| p.to_owned());
  629                  do stat_req.stat(self.uv_loop(), path)
  630                        |req,err| {
  631                      let res = match err {
  632                          None => {
  633                              let stat = req.get_stat();
  634                              Ok(FileStat {
  635                                  path: Path::new(path_str.as_slice()),
  636                                  is_file: stat.is_file(),
  637                                  is_dir: stat.is_dir(),
  638                                  size: stat.st_size,
  639                                  created: stat.st_ctim.tv_sec as u64,
  640                                  modified: stat.st_mtim.tv_sec as u64,
  641                                  accessed: stat.st_atim.tv_sec as u64
  642                              })
  643                          },
  644                          Some(e) => {
  645                              Err(uv_error_to_io_error(e))
  646                          }
  647                      };
  648                      unsafe { (*result_cell_ptr).put_back(res); }
  649                      let scheduler~Scheduler = Local::take();
  650                      scheduler.resume_blocked_task_immediately(task_cell.take());
  651                  };
  652              };
  653          };
  654          assert!(!result_cell.is_empty());
  655          return result_cell.take();
  656      }
  657  
  658      fn get_host_addresses(&mut self, host&str) -> Result<~[IpAddr], IoError> {
  659          let result_cell = Cell::new_empty();
  660          let result_cell_ptr*Cell<Result<~[IpAddr], IoError>> = &result_cell;
  661          let host_ptr*&str = &host;
  662          let addrinfo_req = GetAddrInfoRequest::new();
  663          let addrinfo_req_cell = Cell::new(addrinfo_req);
  664          do task::unkillable { // FIXME(#8674)
  665              let scheduler~Scheduler = Local::take();
  666              do scheduler.deschedule_running_task_and_then |_, task| {
  667                  let task_cell = Cell::new(task);
  668                  let mut addrinfo_req = addrinfo_req_cell.take();
  669                  unsafe {
  670                      do addrinfo_req.getaddrinfo(self.uv_loop(),
  671                                                  Some(*host_ptr),
  672                                                  None, None) |_, addrinfo, err| {
  673                          let res = match err {
  674                              None => Ok(accum_sockaddrs(addrinfo).map(|addr| addr.ip.clone())),
  675                              Some(err) => Err(uv_error_to_io_error(err))
  676                          };
  677                          (*result_cell_ptr).put_back(res);
  678                          let scheduler~Scheduler = Local::take();
  679                          scheduler.resume_blocked_task_immediately(task_cell.take());
  680                      }
  681                  }
  682              }
  683          }
  684          addrinfo_req.delete();
  685          assert!(!result_cell.is_empty());
  686          return result_cell.take();
  687      }
  688      fn fs_mkdir<P: PathLike>(&mut self, path&P) -> Result<(), IoError> {
  689          let mode = S_IRWXU as int;
  690          do uv_fs_helper(self.uv_loop(), path) |mkdir_req, l, p, cb| {
  691              do mkdir_req.mkdir(l, p, mode as int) |req, err| {
  692                  cb(req, err)
  693              };
  694          }
  695      }
  696      fn fs_rmdir<P: PathLike>(&mut self, path&P) -> Result<(), IoError> {
  697          do uv_fs_helper(self.uv_loop(), path) |rmdir_req, l, p, cb| {
  698              do rmdir_req.rmdir(l, p) |req, err| {
  699                  cb(req, err)
  700              };
  701          }
  702      }
  703      fn fs_readdir<P: PathLike>(&mut self, path&P, flagsc_int) ->
  704          Result<~[Path], IoError> {
  705          use str::StrSlice;
  706          let result_cell = Cell::new_empty();
  707          let result_cell_ptr*Cell<Result<~[Path],
  708                                             IoError>> = &result_cell;
  709          let path_cell = Cell::new(path);
  710          do task::unkillable { // FIXME(#8674)
  711              let scheduler~Scheduler = Local::take();
  712              let stat_req = file::FsRequest::new();
  713              do scheduler.deschedule_running_task_and_then |_, task| {
  714                  let task_cell = Cell::new(task);
  715                  let path = path_cell.take();
  716                  let path_str = path.path_as_str(|p| p.to_owned());
  717                  do stat_req.readdir(self.uv_loop(), path, flags)
  718                        |req,err| {
  719                      let res = match err {
  720                          None => {
  721                              let rel_paths = req.get_paths();
  722                              let mut paths = ~[];
  723                              for r in rel_paths.iter() {
  724                                  let mut p = Path::new(path_str.as_slice());
  725                                  p.push(r.as_slice());
  726                                  paths.push(p);
  727                              }
  728                              Ok(paths)
  729                          },
  730                          Some(e) => {
  731                              Err(uv_error_to_io_error(e))
  732                          }
  733                      };
  734                      unsafe { (*result_cell_ptr).put_back(res); }
  735                      let scheduler~Scheduler = Local::take();
  736                      scheduler.resume_blocked_task_immediately(task_cell.take());
  737                  };
  738              };
  739          };
  740          assert!(!result_cell.is_empty());
  741          return result_cell.take();
  742      }
  743  
  744      fn pipe_init(&mut self, ipcbool) -> Result<~RtioUnboundPipeObject, IoError> {
  745          let home = get_handle_to_current_scheduler!();
  746          Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
  747      }
  748  
  749      fn spawn(&mut self, configProcessConfig)
  750              -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>
  751      {
  752          // Sadly, we must create the UvProcess before we actually call uv_spawn
  753          // so that the exit_cb can close over it and notify it when the process
  754          // has exited.
  755          let mut ret = ~UvProcess {
  756              process: Process::new(),
  757              home: None,
  758              exit_status: None,
  759              term_signal: None,
  760              exit_error: None,
  761              descheduled: None,
  762          };
  763          let ret_ptr = unsafe {
  764              *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
  765          };
  766  
  767          // The purpose of this exit callback is to record the data about the
  768          // exit and then wake up the task which may be waiting for the process
  769          // to exit. This is all performed in the current io-loop, and the
  770          // implementation of UvProcess ensures that reading these fields always
  771          // occurs on the current io-loop.
  772          let exit_cbExitCallback = |_, exit_status, term_signal, error| {
  773              unsafe {
  774                  assert!((*ret_ptr).exit_status.is_none());
  775                  (*ret_ptr).exit_status = Some(exit_status);
  776                  (*ret_ptr).term_signal = Some(term_signal);
  777                  (*ret_ptr).exit_error = error;
  778                  match (*ret_ptr).descheduled.take() {
  779                      Some(task) => {
  780                          let scheduler~Scheduler = Local::take();
  781                          scheduler.resume_blocked_task_immediately(task);
  782                      }
  783                      None => {}
  784                  }
  785              }
  786          };
  787  
  788          match ret.process.spawn(self.uv_loop(), config, exit_cb) {
  789              Ok(io) => {
  790                  // Only now do we actually get a handle to this scheduler.
  791                  ret.home = Some(get_handle_to_current_scheduler!());
  792                  Ok((ret, io))
  793              }
  794              Err(uverr) => {
  795                  // We still need to close the process handle we created, but
  796                  // that's taken care for us in the destructor of UvProcess
  797                  Err(uv_error_to_io_error(uverr))
  798              }
  799          }
  800      }
  801  }
  802  
  803  pub struct UvTcpListener {
  804      watcher : TcpWatcher,
  805      home: SchedHandle,
  806  }
  807  
  808  impl HomingIO for UvTcpListener {
  809      fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
  810  }
  811  
  812  impl UvTcpListener {
  813      fn new(watcherTcpWatcher, homeSchedHandle) -> UvTcpListener {
  814          UvTcpListener { watcher: watcher, home: home }
  815      }
  816  }
  817  
  818  impl Drop for UvTcpListener {
  819      fn drop(&mut self) {
  820          do self.home_for_io_with_sched |self_, scheduler| {
  821              do scheduler.deschedule_running_task_and_then |_, task| {
  822                  let task = Cell::new(task);
  823                  do self_.watcher.as_stream().close {
  824                      let scheduler~Scheduler = Local::take();
  825                      scheduler.resume_blocked_task_immediately(task.take());
  826                  }
  827              }
  828          }
  829      }
  830  }
  831  
  832  impl RtioSocket for UvTcpListener {
  833      fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
  834          do self.home_for_io |self_| {
  835              socket_name(Tcp, self_.watcher)
  836          }
  837      }
  838  }
  839  
  840  impl RtioTcpListener for UvTcpListener {
  841      fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> {
  842          do self.home_for_io_consume |self_| {
  843              let mut acceptor = ~UvTcpAcceptor::new(self_);
  844              let incoming = Cell::new(acceptor.incoming.clone());
  845              do acceptor.listener.watcher.listen |mut server, status| {
  846                  do incoming.with_mut_ref |incoming| {
  847                      let inc = match status {
  848                          Some(_) => Err(standard_error(OtherIoError)),
  849                          None => {
  850                              let inc = TcpWatcher::new(&server.event_loop());
  851                              // first accept call in the callback guarenteed to succeed
  852                              server.accept(inc.as_stream());
  853                              let home = get_handle_to_current_scheduler!();
  854                              Ok(~UvTcpStream { watcher: inc, home: home })
  855                          }
  856                      };
  857                      incoming.send(inc);
  858                  }
  859              };
  860              Ok(acceptor)
  861          }
  862      }
  863  }
  864  
  865  pub struct UvTcpAcceptor {
  866      listener: UvTcpListener,
  867      incoming: Tube<Result<~RtioTcpStreamObject, IoError>>,
  868  }
  869  
  870  impl HomingIO for UvTcpAcceptor {
  871      fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
  872  }
  873  
  874  impl UvTcpAcceptor {
  875      fn new(listenerUvTcpListener) -> UvTcpAcceptor {
  876          UvTcpAcceptor { listener: listener, incoming: Tube::new() }
  877      }
  878  }
  879  
  880  impl RtioSocket for UvTcpAcceptor {
  881      fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
  882          do self.home_for_io |self_| {
  883              socket_name(Tcp, self_.listener.watcher)
  884          }
  885      }
  886  }
  887  
  888  impl RtioTcpAcceptor for UvTcpAcceptor {
  889      fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
  890          do self.home_for_io |self_| {
  891              self_.incoming.recv()
  892          }
  893      }
  894  
  895      fn accept_simultaneously(&mut self) -> Result<(), IoError> {
  896          do self.home_for_io |self_| {
  897              let r = unsafe {
  898                  uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int)
  899              };
  900  
  901              match status_to_maybe_uv_error(r) {
  902                  Some(err) => Err(uv_error_to_io_error(err)),
  903                  None => Ok(())
  904              }
  905          }
  906      }
  907  
  908      fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
  909          do self.home_for_io |self_| {
  910              let r = unsafe {
  911                  uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int)
  912              };
  913  
  914              match status_to_maybe_uv_error(r) {
  915                  Some(err) => Err(uv_error_to_io_error(err)),
  916                  None => Ok(())
  917              }
  918          }
  919      }
  920  }
  921  
  922  fn read_stream(mut watcherStreamWatcher,
  923                 scheduler~Scheduler,
  924                 buf&mut [u8]) -> Result<uint, IoError> {
  925      let result_cell = Cell::new_empty();
  926      let result_cell_ptr*Cell<Result<uint, IoError>> = &result_cell;
  927  
  928      let buf_ptr*&mut [u8] = &buf;
  929      do scheduler.deschedule_running_task_and_then |_sched, task| {
  930          let task_cell = Cell::new(task);
  931          // XXX: We shouldn't reallocate these callbacks every
  932          // call to read
  933          let allocAllocCallback = |_| unsafe {
  934              slice_to_uv_buf(*buf_ptr)
  935          };
  936          do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
  937  
  938              // Stop reading so that no read callbacks are
  939              // triggered before the user calls `read` again.
  940              // XXX: Is there a performance impact to calling
  941              // stop here?
  942              watcher.read_stop();
  943  
  944              let result = if status.is_none() {
  945                  assert!(nread >= 0);
  946                  Ok(nread as uint)
  947              } else {
  948                  Err(uv_error_to_io_error(status.unwrap()))
  949              };
  950  
  951              unsafe { (*result_cell_ptr).put_back(result); }
  952  
  953              let scheduler~Scheduler = Local::take();
  954              scheduler.resume_blocked_task_immediately(task_cell.take());
  955          }
  956      }
  957  
  958      assert!(!result_cell.is_empty());
  959      result_cell.take()
  960  }
  961  
  962  fn write_stream(mut watcherStreamWatcher,
  963                  scheduler~Scheduler,
  964                  buf&[u8]) -> Result<(), IoError> {
  965      let result_cell = Cell::new_empty();
  966      let result_cell_ptr*Cell<Result<(), IoError>> = &result_cell;
  967      let buf_ptr*&[u8] = &buf;
  968      do scheduler.deschedule_running_task_and_then |_, task| {
  969          let task_cell = Cell::new(task);
  970          let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
  971          do watcher.write(buf) |_watcher, status| {
  972              let result = if status.is_none() {
  973                  Ok(())
  974              } else {
  975                  Err(uv_error_to_io_error(status.unwrap()))
  976              };
  977  
  978              unsafe { (*result_cell_ptr).put_back(result); }
  979  
  980              let scheduler~Scheduler = Local::take();
  981              scheduler.resume_blocked_task_immediately(task_cell.take());
  982          }
  983      }
  984  
  985      assert!(!result_cell.is_empty());
  986      result_cell.take()
  987  }
  988  
  989  pub struct UvUnboundPipe {
  990      pipe: Pipe,
  991      home: SchedHandle,
  992  }
  993  
  994  impl HomingIO for UvUnboundPipe {
  995      fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
  996  }
  997  
  998  impl Drop for UvUnboundPipe {
  999      fn drop(&mut self) {
 1000          do self.home_for_io |self_| {
 1001              let scheduler~Scheduler = Local::take();
 1002              do scheduler.deschedule_running_task_and_then |_, task| {
 1003                  let task_cell = Cell::new(task);
 1004                  do self_.pipe.close {
 1005                      let scheduler~Scheduler = Local::take();
 1006                      scheduler.resume_blocked_task_immediately(task_cell.take());
 1007                  }
 1008              }
 1009          }
 1010      }
 1011  }
 1012  
 1013  impl UvUnboundPipe {
 1014      pub unsafe fn bind(~self) -> UvPipeStream {
 1015          UvPipeStream { inner: self }
 1016      }
 1017  }
 1018  
 1019  pub struct UvPipeStream {
 1020      priv inner: ~UvUnboundPipe,
 1021  }
 1022  
 1023  impl UvPipeStream {
 1024      pub fn new(inner~UvUnboundPipe) -> UvPipeStream {
 1025          UvPipeStream { inner: inner }
 1026      }
 1027  }
 1028  
 1029  impl RtioPipe for UvPipeStream {
 1030      fn read(&mut self, buf&mut [u8]) -> Result<uint, IoError> {
 1031          do self.inner.home_for_io_with_sched |self_, scheduler| {
 1032              read_stream(self_.pipe.as_stream(), scheduler, buf)
 1033          }
 1034      }
 1035      fn write(&mut self, buf&[u8]) -> Result<(), IoError> {
 1036          do self.inner.home_for_io_with_sched |self_, scheduler| {
 1037              write_stream(self_.pipe.as_stream(), scheduler, buf)
 1038          }
 1039      }
 1040  }
 1041  
 1042  pub struct UvTcpStream {
 1043      watcher: TcpWatcher,
 1044      home: SchedHandle,
 1045  }
 1046  
 1047  impl HomingIO for UvTcpStream {
 1048      fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 1049  }
 1050  
 1051  impl Drop for UvTcpStream {
 1052      fn drop(&mut self) {
 1053          do self.home_for_io_with_sched |self_, scheduler| {
 1054              do scheduler.deschedule_running_task_and_then |_, task| {
 1055                  let task_cell = Cell::new(task);
 1056                  do self_.watcher.as_stream().close {
 1057                      let scheduler~Scheduler = Local::take();
 1058                      scheduler.resume_blocked_task_immediately(task_cell.take());
 1059                  }
 1060              }
 1061          }
 1062      }
 1063  }
 1064  
 1065  impl RtioSocket for UvTcpStream {
 1066      fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
 1067          do self.home_for_io |self_| {
 1068              socket_name(Tcp, self_.watcher)
 1069          }
 1070      }
 1071  }
 1072  
 1073  impl RtioTcpStream for UvTcpStream {
 1074      fn read(&mut self, buf&mut [u8]) -> Result<uint, IoError> {
 1075          do self.home_for_io_with_sched |self_, scheduler| {
 1076              read_stream(self_.watcher.as_stream(), scheduler, buf)
 1077          }
 1078      }
 1079  
 1080      fn write(&mut self, buf&[u8]) -> Result<(), IoError> {
 1081          do self.home_for_io_with_sched |self_, scheduler| {
 1082              write_stream(self_.watcher.as_stream(), scheduler, buf)
 1083          }
 1084      }
 1085  
 1086      fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
 1087          do self.home_for_io |self_| {
 1088              socket_name(TcpPeer, self_.watcher)
 1089          }
 1090      }
 1091  
 1092      fn control_congestion(&mut self) -> Result<(), IoError> {
 1093          do self.home_for_io |self_| {
 1094              let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
 1095  
 1096              match status_to_maybe_uv_error(r) {
 1097                  Some(err) => Err(uv_error_to_io_error(err)),
 1098                  None => Ok(())
 1099              }
 1100          }
 1101      }
 1102  
 1103      fn nodelay(&mut self) -> Result<(), IoError> {
 1104          do self.home_for_io |self_| {
 1105              let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
 1106  
 1107              match status_to_maybe_uv_error(r) {
 1108                  Some(err) => Err(uv_error_to_io_error(err)),
 1109                  None => Ok(())
 1110              }
 1111          }
 1112      }
 1113  
 1114      fn keepalive(&mut self, delay_in_secondsuint) -> Result<(), IoError> {
 1115          do self.home_for_io |self_| {
 1116              let r = unsafe {
 1117                  uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
 1118                                      delay_in_seconds as c_uint)
 1119              };
 1120  
 1121              match status_to_maybe_uv_error(r) {
 1122                  Some(err) => Err(uv_error_to_io_error(err)),
 1123                  None => Ok(())
 1124              }
 1125          }
 1126      }
 1127  
 1128      fn letdie(&mut self) -> Result<(), IoError> {
 1129          do self.home_for_io |self_| {
 1130              let r = unsafe {
 1131                  uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
 1132              };
 1133  
 1134              match status_to_maybe_uv_error(r) {
 1135                  Some(err) => Err(uv_error_to_io_error(err)),
 1136                  None => Ok(())
 1137              }
 1138          }
 1139      }
 1140  }
 1141  
 1142  pub struct UvUdpSocket {
 1143      watcher: UdpWatcher,
 1144      home: SchedHandle,
 1145  }
 1146  
 1147  impl HomingIO for UvUdpSocket {
 1148      fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 1149  }
 1150  
 1151  impl Drop for UvUdpSocket {
 1152      fn drop(&mut self) {
 1153          do self.home_for_io_with_sched |self_, scheduler| {
 1154              do scheduler.deschedule_running_task_and_then |_, task| {
 1155                  let task_cell = Cell::new(task);
 1156                  do self_.watcher.close {
 1157                      let scheduler~Scheduler = Local::take();
 1158                      scheduler.resume_blocked_task_immediately(task_cell.take());
 1159                  }
 1160              }
 1161          }
 1162      }
 1163  }
 1164  
 1165  impl RtioSocket for UvUdpSocket {
 1166      fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
 1167          do self.home_for_io |self_| {
 1168              socket_name(Udp, self_.watcher)
 1169          }
 1170      }
 1171  }
 1172  
 1173  impl RtioUdpSocket for UvUdpSocket {
 1174      fn recvfrom(&mut self, buf&mut [u8]) -> Result<(uint, SocketAddr), IoError> {
 1175          do self.home_for_io_with_sched |self_, scheduler| {
 1176              let result_cell = Cell::new_empty();
 1177              let result_cell_ptr*Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
 1178  
 1179              let buf_ptr*&mut [u8] = &buf;
 1180              do scheduler.deschedule_running_task_and_then |_, task| {
 1181                  let task_cell = Cell::new(task);
 1182                  let allocAllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
 1183                  do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
 1184                      let _ = flags; // /XXX add handling for partials?
 1185  
 1186                      watcher.recv_stop();
 1187  
 1188                      let result = match status {
 1189                          None => {
 1190                              assert!(nread >= 0);
 1191                              Ok((nread as uint, addr))
 1192                          }
 1193                          Some(err) => Err(uv_error_to_io_error(err)),
 1194                      };
 1195  
 1196                      unsafe { (*result_cell_ptr).put_back(result); }
 1197  
 1198                      let scheduler~Scheduler = Local::take();
 1199                      scheduler.resume_blocked_task_immediately(task_cell.take());
 1200                  }
 1201              }
 1202  
 1203              assert!(!result_cell.is_empty());
 1204              result_cell.take()
 1205          }
 1206      }
 1207  
 1208      fn sendto(&mut self, buf&[u8], dstSocketAddr) -> Result<(), IoError> {
 1209          do self.home_for_io_with_sched |self_, scheduler| {
 1210              let result_cell = Cell::new_empty();
 1211              let result_cell_ptr*Cell<Result<(), IoError>> = &result_cell;
 1212              let buf_ptr*&[u8] = &buf;
 1213              do scheduler.deschedule_running_task_and_then |_, task| {
 1214                  let task_cell = Cell::new(task);
 1215                  let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
 1216                  do self_.watcher.send(buf, dst) |_watcher, status| {
 1217  
 1218                      let result = match status {
 1219                          None => Ok(()),
 1220                          Some(err) => Err(uv_error_to_io_error(err)),
 1221                      };
 1222  
 1223                      unsafe { (*result_cell_ptr).put_back(result); }
 1224  
 1225                      let scheduler~Scheduler = Local::take();
 1226                      scheduler.resume_blocked_task_immediately(task_cell.take());
 1227                  }
 1228              }
 1229  
 1230              assert!(!result_cell.is_empty());
 1231              result_cell.take()
 1232          }
 1233      }
 1234  
 1235      fn join_multicast(&mut self, multiIpAddr) -> Result<(), IoError> {
 1236          do self.home_for_io |self_| {
 1237              let r = unsafe {
 1238                  do multi.to_str().with_c_str |m_addr| {
 1239                      uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
 1240                                               ptr::null(), uvll::UV_JOIN_GROUP)
 1241                  }
 1242              };
 1243  
 1244              match status_to_maybe_uv_error(r) {
 1245                  Some(err) => Err(uv_error_to_io_error(err)),
 1246                  None => Ok(())
 1247              }
 1248          }
 1249      }
 1250  
 1251      fn leave_multicast(&mut self, multiIpAddr) -> Result<(), IoError> {
 1252          do self.home_for_io |self_| {
 1253              let r = unsafe {
 1254                  do multi.to_str().with_c_str |m_addr| {
 1255                      uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
 1256                                               ptr::null(), uvll::UV_LEAVE_GROUP)
 1257                  }
 1258              };
 1259  
 1260              match status_to_maybe_uv_error(r) {
 1261                  Some(err) => Err(uv_error_to_io_error(err)),
 1262                  None => Ok(())
 1263              }
 1264          }
 1265      }
 1266  
 1267      fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
 1268          do self.home_for_io |self_| {
 1269  
 1270              let r = unsafe {
 1271                  uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
 1272              };
 1273  
 1274              match status_to_maybe_uv_error(r) {
 1275                  Some(err) => Err(uv_error_to_io_error(err)),
 1276                  None => Ok(())
 1277              }
 1278          }
 1279      }
 1280  
 1281      fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
 1282          do self.home_for_io |self_| {
 1283  
 1284              let r = unsafe {
 1285                  uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
 1286              };
 1287  
 1288              match status_to_maybe_uv_error(r) {
 1289                  Some(err) => Err(uv_error_to_io_error(err)),
 1290                  None => Ok(())
 1291              }
 1292          }
 1293      }
 1294  
 1295      fn multicast_time_to_live(&mut self, ttlint) -> Result<(), IoError> {
 1296          do self.home_for_io |self_| {
 1297  
 1298              let r = unsafe {
 1299                  uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
 1300              };
 1301  
 1302              match status_to_maybe_uv_error(r) {
 1303                  Some(err) => Err(uv_error_to_io_error(err)),
 1304                  None => Ok(())
 1305              }
 1306          }
 1307      }
 1308  
 1309      fn time_to_live(&mut self, ttlint) -> Result<(), IoError> {
 1310          do self.home_for_io |self_| {
 1311  
 1312              let r = unsafe {
 1313                  uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
 1314              };
 1315  
 1316              match status_to_maybe_uv_error(r) {
 1317                  Some(err) => Err(uv_error_to_io_error(err)),
 1318                  None => Ok(())
 1319              }
 1320          }
 1321      }
 1322  
 1323      fn hear_broadcasts(&mut self) -> Result<(), IoError> {
 1324          do self.home_for_io |self_| {
 1325  
 1326              let r = unsafe {
 1327                  uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
 1328              };
 1329  
 1330              match status_to_maybe_uv_error(r) {
 1331                  Some(err) => Err(uv_error_to_io_error(err)),
 1332                  None => Ok(())
 1333              }
 1334          }
 1335      }
 1336  
 1337      fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
 1338          do self.home_for_io |self_| {
 1339  
 1340              let r = unsafe {
 1341                  uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
 1342              };
 1343  
 1344              match status_to_maybe_uv_error(r) {
 1345                  Some(err) => Err(uv_error_to_io_error(err)),
 1346                  None => Ok(())
 1347              }
 1348          }
 1349      }
 1350  }
 1351  
 1352  pub struct UvTimer {
 1353      watcher: timer::TimerWatcher,
 1354      home: SchedHandle,
 1355  }
 1356  
 1357  impl HomingIO for UvTimer {
 1358      fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 1359  }
 1360  
 1361  impl UvTimer {
 1362      fn new(wtimer::TimerWatcher, homeSchedHandle) -> UvTimer {
 1363          UvTimer { watcher: w, home: home }
 1364      }
 1365  }
 1366  
 1367  impl Drop for UvTimer {
 1368      fn drop(&mut self) {
 1369          do self.home_for_io_with_sched |self_, scheduler| {
 1370              rtdebug!("closing UvTimer");
 1371              do scheduler.deschedule_running_task_and_then |_, task| {
 1372                  let task_cell = Cell::new(task);
 1373                  do self_.watcher.close {
 1374                      let scheduler~Scheduler = Local::take();
 1375                      scheduler.resume_blocked_task_immediately(task_cell.take());
 1376                  }
 1377              }
 1378          }
 1379      }
 1380  }
 1381  
 1382  impl RtioTimer for UvTimer {
 1383      fn sleep(&mut self, msecsu64) {
 1384          do self.home_for_io_with_sched |self_, scheduler| {
 1385              do scheduler.deschedule_running_task_and_then |_sched, task| {
 1386                  rtdebug!("sleep: entered scheduler context");
 1387                  let task_cell = Cell::new(task);
 1388                  do self_.watcher.start(msecs, 0) |_, status| {
 1389                      assert!(status.is_none());
 1390                      let scheduler~Scheduler = Local::take();
 1391                      scheduler.resume_blocked_task_immediately(task_cell.take());
 1392                  }
 1393              }
 1394              self_.watcher.stop();
 1395          }
 1396      }
 1397  }
 1398  
 1399  pub struct UvFileStream {
 1400      loop_: Loop,
 1401      fd: c_int,
 1402      close_on_drop: bool,
 1403      home: SchedHandle
 1404  }
 1405  
 1406  impl HomingIO for UvFileStream {
 1407      fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
 1408  }
 1409  
 1410  impl UvFileStream {
 1411      fn new(loop_Loop, fdc_int, close_on_dropbool,
 1412             homeSchedHandle) -> UvFileStream {
 1413          UvFileStream {
 1414              loop_: loop_,
 1415              fd: fd,
 1416              close_on_drop: close_on_drop,
 1417              home: home
 1418          }
 1419      }
 1420      fn base_read(&mut self, buf&mut [u8], offseti64) -> Result<int, IoError> {
 1421          let result_cell = Cell::new_empty();
 1422          let result_cell_ptr*Cell<Result<int, IoError>> = &result_cell;
 1423          let buf_ptr*&mut [u8] = &buf;
 1424          do self.home_for_io_with_sched |self_, scheduler| {
 1425              do scheduler.deschedule_running_task_and_then |_, task| {
 1426                  let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
 1427                  let task_cell = Cell::new(task);
 1428                  let read_req = file::FsRequest::new();
 1429                  do read_req.read(&self_.loop_, self_.fd, buf, offset) |req, uverr| {
 1430                      let res = match uverr  {
 1431                          None => Ok(req.get_result() as int),
 1432                          Some(err) => Err(uv_error_to_io_error(err))
 1433                      };
 1434                      unsafe { (*result_cell_ptr).put_back(res); }
 1435                      let scheduler~Scheduler = Local::take();
 1436                      scheduler.resume_blocked_task_immediately(task_cell.take());
 1437                  };
 1438              };
 1439          };
 1440          result_cell.take()
 1441      }
 1442      fn base_write(&mut self, buf&[u8], offseti64) -> Result<(), IoError> {
 1443          let result_cell = Cell::new_empty();
 1444          let result_cell_ptr*Cell<Result<(), IoError>> = &result_cell;
 1445          let buf_ptr*&[u8] = &buf;
 1446          do self.home_for_io_with_sched |self_, scheduler| {
 1447              do scheduler.deschedule_running_task_and_then |_, task| {
 1448                  let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
 1449                  let task_cell = Cell::new(task);
 1450                  let write_req = file::FsRequest::new();
 1451                  do write_req.write(&self_.loop_, self_.fd, buf, offset) |_, uverr| {
 1452                      let res = match uverr  {
 1453                          None => Ok(()),
 1454                          Some(err) => Err(uv_error_to_io_error(err))
 1455                      };
 1456                      unsafe { (*result_cell_ptr).put_back(res); }
 1457                      let scheduler~Scheduler = Local::take();
 1458                      scheduler.resume_blocked_task_immediately(task_cell.take());
 1459                  };
 1460              };
 1461          };
 1462          result_cell.take()
 1463      }
 1464      fn seek_common(&mut self, posi64, whencec_int) ->
 1465          Result<u64, IoError>{
 1466          #[fixed_stack_segment]; #[inline(never)];
 1467          unsafe {
 1468              match lseek(self.fd, pos as off_t, whence) {
 1469                  -1 => {
 1470                      Err(IoError {
 1471                          kind: OtherIoError,
 1472                          desc: "Failed to lseek.",
 1473                          detail: None
 1474                      })
 1475                  },
 1476                  n => Ok(n as u64)
 1477              }
 1478          }
 1479      }
 1480  }
 1481  
 1482  impl Drop for UvFileStream {
 1483      fn drop(&mut self) {
 1484          if self.close_on_drop {
 1485              do self.home_for_io_with_sched |self_, scheduler| {
 1486                  do scheduler.deschedule_running_task_and_then |_, task| {
 1487                      let task_cell = Cell::new(task);
 1488                      let close_req = file::FsRequest::new();
 1489                      do close_req.close(&self_.loop_, self_.fd) |_,_| {
 1490                          let scheduler~Scheduler = Local::take();
 1491                          scheduler.resume_blocked_task_immediately(task_cell.take());
 1492                      };
 1493                  };
 1494              }
 1495          }
 1496      }
 1497  }
 1498  
 1499  impl RtioFileStream for UvFileStream {
 1500      fn read(&mut self, buf&mut [u8]) -> Result<int, IoError> {
 1501          self.base_read(buf, -1)
 1502      }
 1503      fn write(&mut self, buf&[u8]) -> Result<(), IoError> {
 1504          self.base_write(buf, -1)
 1505      }
 1506      fn pread(&mut self, buf&mut [u8], offsetu64) -> Result<int, IoError> {
 1507          self.base_read(buf, offset as i64)
 1508      }
 1509      fn pwrite(&mut self, buf&[u8], offsetu64) -> Result<(), IoError> {
 1510          self.base_write(buf, offset as i64)
 1511      }
 1512      fn seek(&mut self, posi64, whenceSeekStyle) -> Result<u64, IoError> {
 1513          use libc::{SEEK_SET, SEEK_CUR, SEEK_END};
 1514          let whence = match whence {
 1515              SeekSet => SEEK_SET,
 1516              SeekCur => SEEK_CUR,
 1517              SeekEnd => SEEK_END
 1518          };
 1519          self.seek_common(pos, whence)
 1520      }
 1521      fn tell(&self) -> Result<u64, IoError> {
 1522          use libc::SEEK_CUR;
 1523          // this is temporary
 1524          let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
 1525          self_.seek_common(0, SEEK_CUR)
 1526      }
 1527      fn flush(&mut self) -> Result<(), IoError> {
 1528          Ok(())
 1529      }
 1530  }
 1531  
 1532  pub struct UvProcess {
 1533      process: process::Process,
 1534  
 1535      // Sadly, this structure must be created before we return it, so in that
 1536      // brief interim the `home` is None.
 1537      home: Option<SchedHandle>,
 1538  
 1539      // All None until the process exits (exit_error may stay None)
 1540      priv exit_status: Option<int>,
 1541      priv term_signal: Option<int>,
 1542      priv exit_error: Option<UvError>,
 1543  
 1544      // Used to store which task to wake up from the exit_cb
 1545      priv descheduled: Option<BlockedTask>,
 1546  }
 1547  
 1548  impl HomingIO for UvProcess {
 1549      fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() }
 1550  }
 1551  
 1552  impl Drop for UvProcess {
 1553      fn drop(&mut self) {
 1554          let close = |self_&mut UvProcess{
 1555              let scheduler~Scheduler = Local::take();
 1556              do scheduler.deschedule_running_task_and_then |_, task| {
 1557                  let task = Cell::new(task);
 1558                  do self_.process.close {
 1559                      let scheduler~Scheduler = Local::take();
 1560                      scheduler.resume_blocked_task_immediately(task.take());
 1561                  }
 1562              }
 1563          };
 1564  
 1565          // If home is none, then this process never actually successfully
 1566          // spawned, so there's no need to switch event loops
 1567          if self.home.is_none() {
 1568              close(self)
 1569          } else {
 1570              self.home_for_io(close)
 1571          }
 1572      }
 1573  }
 1574  
 1575  impl RtioProcess for UvProcess {
 1576      fn id(&self) -> pid_t {
 1577          self.process.pid()
 1578      }
 1579  
 1580      fn kill(&mut self, signalint) -> Result<(), IoError> {
 1581          do self.home_for_io |self_| {
 1582              match self_.process.kill(signal) {
 1583                  Ok(()) => Ok(()),
 1584                  Err(uverr) => Err(uv_error_to_io_error(uverr))
 1585              }
 1586          }
 1587      }
 1588  
 1589      fn wait(&mut self) -> int {
 1590          // Make sure (on the home scheduler) that we have an exit status listed
 1591          do self.home_for_io |self_| {
 1592              match self_.exit_status {
 1593                  Some(*) => {}
 1594                  None => {
 1595                      // If there's no exit code previously listed, then the
 1596                      // process's exit callback has yet to be invoked. We just
 1597                      // need to deschedule ourselves and wait to be reawoken.
 1598                      let scheduler~Scheduler = Local::take();
 1599                      do scheduler.deschedule_running_task_and_then |_, task| {
 1600                          assert!(self_.descheduled.is_none());
 1601                          self_.descheduled = Some(task);
 1602                      }
 1603                      assert!(self_.exit_status.is_some());
 1604                  }
 1605              }
 1606          }
 1607  
 1608          self.exit_status.unwrap()
 1609      }
 1610  }
 1611  
 1612  #[test]
 1613  fn test_simple_io_no_connect() {
 1614      do run_in_mt_newsched_task {
 1615          unsafe {
 1616              let io: *mut IoFactoryObject = Local::unsafe_borrow();
 1617              let addr = next_test_ip4();
 1618              let maybe_chan = (*io).tcp_connect(addr);
 1619              assert!(maybe_chan.is_err());
 1620          }
 1621      }
 1622  }
 1623  
 1624  #[test]
 1625  fn test_simple_udp_io_bind_only() {
 1626      do run_in_mt_newsched_task {
 1627          unsafe {
 1628              let io: *mut IoFactoryObject = Local::unsafe_borrow();
 1629              let addr = next_test_ip4();
 1630              let maybe_socket = (*io).udp_bind(addr);
 1631              assert!(maybe_socket.is_ok());
 1632          }
 1633      }
 1634  }
 1635  
 1636  #[test]
 1637  fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
 1638      use rt::sleeper_list::SleeperList;
 1639      use rt::work_queue::WorkQueue;
 1640      use rt::thread::Thread;
 1641      use rt::task::Task;
 1642      use rt::sched::{Shutdown, TaskFromFriend};
 1643      do run_in_bare_thread {
 1644          let sleepers = SleeperList::new();
 1645          let work_queue1 = WorkQueue::new();
 1646          let work_queue2 = WorkQueue::new();
 1647          let queues = ~[work_queue1.clone(), work_queue2.clone()];
 1648  
 1649          let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
 1650                                           sleepers.clone());
 1651          let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
 1652                                           sleepers.clone());
 1653  
 1654          let handle1 = Cell::new(sched1.make_handle());
 1655          let handle2 = Cell::new(sched2.make_handle());
 1656          let tasksFriendHandle = Cell::new(sched2.make_handle());
 1657  
 1658          let on_exit: ~fn(bool) = |exit_status| {
 1659              handle1.take().send(Shutdown);
 1660              handle2.take().send(Shutdown);
 1661              rtassert!(exit_status);
 1662          };
 1663  
 1664          let test_function: ~fn() = || {
 1665              let io: *mut IoFactoryObject = unsafe {
 1666                  Local::unsafe_borrow()
 1667              };
 1668              let addr = next_test_ip4();
 1669              let maybe_socket = unsafe { (*io).udp_bind(addr) };
 1670              // this socket is bound to this event loop
 1671              assert!(maybe_socket.is_ok());
 1672  
 1673              // block self on sched1
 1674              do task::unkillable { // FIXME(#8674)
 1675                  let scheduler: ~Scheduler = Local::take();
 1676                  do scheduler.deschedule_running_task_and_then |_, task| {
 1677                      // unblock task
 1678                      do task.wake().map |task| {
 1679                        // send self to sched2
 1680                        tasksFriendHandle.take().send(TaskFromFriend(task));
 1681                      };
 1682                      // sched1 should now sleep since it has nothing else to do
 1683                  }
 1684              }
 1685              // sched2 will wake up and get the task
 1686              // as we do nothing else, the function ends and the socket goes out of scope
 1687              // sched2 will start to run the destructor
 1688              // the destructor will first block the task, set it's home as sched1, then enqueue it
 1689              // sched2 will dequeue the task, see that it has a home, and send it to sched1
 1690              // sched1 will wake up, exec the close function on the correct loop, and then we're done
 1691          };
 1692  
 1693          let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
 1694          main_task.death.on_exit = Some(on_exit);
 1695          let main_task = Cell::new(main_task);
 1696  
 1697          let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});
 1698  
 1699          let sched1 = Cell::new(sched1);
 1700          let sched2 = Cell::new(sched2);
 1701  
 1702          let thread1 = do Thread::start {
 1703              sched1.take().bootstrap(main_task.take());
 1704          };
 1705          let thread2 = do Thread::start {
 1706              sched2.take().bootstrap(null_task.take());
 1707          };
 1708  
 1709          thread1.join();
 1710          thread2.join();
 1711      }
 1712  }
 1713  
 1714  #[test]
 1715  fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
 1716      use rt::sleeper_list::SleeperList;
 1717      use rt::work_queue::WorkQueue;
 1718      use rt::thread::Thread;
 1719      use rt::task::Task;
 1720      use rt::comm::oneshot;
 1721      use rt::sched::Shutdown;
 1722      do run_in_bare_thread {
 1723          let sleepers = SleeperList::new();
 1724          let work_queue1 = WorkQueue::new();
 1725          let work_queue2 = WorkQueue::new();
 1726          let queues = ~[work_queue1.clone(), work_queue2.clone()];
 1727  
 1728          let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
 1729                                           sleepers.clone());
 1730          let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
 1731                                           sleepers.clone());
 1732  
 1733          let handle1 = Cell::new(sched1.make_handle());
 1734          let handle2 = Cell::new(sched2.make_handle());
 1735  
 1736          let (port, chan) = oneshot();
 1737          let port = Cell::new(port);
 1738          let chan = Cell::new(chan);
 1739  
 1740          let body1: ~fn() = || {
 1741              let io: *mut IoFactoryObject = unsafe {
 1742                  Local::unsafe_borrow()
 1743              };
 1744              let addr = next_test_ip4();
 1745              let socket = unsafe { (*io).udp_bind(addr) };
 1746              assert!(socket.is_ok());
 1747              chan.take().send(socket);
 1748          };
 1749  
 1750          let body2: ~fn() = || {
 1751              let socket = port.take().recv();
 1752              assert!(socket.is_ok());
 1753              /* The socket goes out of scope and the destructor is called.
 1754               * The destructor:
 1755               *  - sends itself back to sched1
 1756               *  - frees the socket
 1757               *  - resets the home of the task to whatever it was previously
 1758               */
 1759          };
 1760  
 1761          let on_exit: ~fn(bool) = |exit| {
 1762              handle1.take().send(Shutdown);
 1763              handle2.take().send(Shutdown);
 1764              rtassert!(exit);
 1765          };
 1766  
 1767          let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));
 1768  
 1769          let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
 1770          task2.death.on_exit = Some(on_exit);
 1771          let task2 = Cell::new(task2);
 1772  
 1773          let sched1 = Cell::new(sched1);
 1774          let sched2 = Cell::new(sched2);
 1775  
 1776          let thread1 = do Thread::start {
 1777              sched1.take().bootstrap(task1.take());
 1778          };
 1779          let thread2 = do Thread::start {
 1780              sched2.take().bootstrap(task2.take());
 1781          };
 1782  
 1783          thread1.join();
 1784          thread2.join();
 1785      }
 1786  }
 1787  
 1788  #[test]
 1789  fn test_simple_tcp_server_and_client() {
 1790      do run_in_mt_newsched_task {
 1791          let addr = next_test_ip4();
 1792          let (port, chan) = oneshot();
 1793          let port = Cell::new(port);
 1794          let chan = Cell::new(chan);
 1795  
 1796          // Start the server first so it's listening when we connect
 1797          do spawntask {
 1798              unsafe {
 1799                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 1800                  let listener = (*io).tcp_bind(addr).unwrap();
 1801                  let mut acceptor = listener.listen().unwrap();
 1802                  chan.take().send(());
 1803                  let mut stream = acceptor.accept().unwrap();
 1804                  let mut buf = [0, .. 2048];
 1805                  let nread = stream.read(buf).unwrap();
 1806                  assert_eq!(nread, 8);
 1807                  for i in range(0u, nread) {
 1808                      rtdebug!("{}", buf[i]);
 1809                      assert_eq!(buf[i], i as u8);
 1810                  }
 1811              }
 1812          }
 1813  
 1814          do spawntask {
 1815              unsafe {
 1816                  port.take().recv();
 1817                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 1818                  let mut stream = (*io).tcp_connect(addr).unwrap();
 1819                  stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
 1820              }
 1821          }
 1822      }
 1823  }
 1824  
 1825  #[test]
 1826  fn test_simple_tcp_server_and_client_on_diff_threads() {
 1827      use rt::sleeper_list::SleeperList;
 1828      use rt::work_queue::WorkQueue;
 1829      use rt::thread::Thread;
 1830      use rt::task::Task;
 1831      use rt::sched::{Shutdown};
 1832      do run_in_bare_thread {
 1833          let sleepers = SleeperList::new();
 1834  
 1835          let server_addr = next_test_ip4();
 1836          let client_addr = server_addr.clone();
 1837  
 1838          let server_work_queue = WorkQueue::new();
 1839          let client_work_queue = WorkQueue::new();
 1840          let queues = ~[server_work_queue.clone(), client_work_queue.clone()];
 1841  
 1842          let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
 1843                                                 queues.clone(), sleepers.clone());
 1844          let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
 1845                                                 queues.clone(), sleepers.clone());
 1846  
 1847          let server_handle = Cell::new(server_sched.make_handle());
 1848          let client_handle = Cell::new(client_sched.make_handle());
 1849  
 1850          let server_on_exit: ~fn(bool) = |exit_status| {
 1851              server_handle.take().send(Shutdown);
 1852              rtassert!(exit_status);
 1853          };
 1854  
 1855          let client_on_exit: ~fn(bool) = |exit_status| {
 1856              client_handle.take().send(Shutdown);
 1857              rtassert!(exit_status);
 1858          };
 1859  
 1860          let server_fn: ~fn() = || {
 1861              let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
 1862              let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
 1863              let mut acceptor = listener.listen().unwrap();
 1864              let mut stream = acceptor.accept().unwrap();
 1865              let mut buf = [0, .. 2048];
 1866              let nread = stream.read(buf).unwrap();
 1867              assert_eq!(nread, 8);
 1868              for i in range(0u, nread) {
 1869                  assert_eq!(buf[i], i as u8);
 1870              }
 1871          };
 1872  
 1873          let client_fn: ~fn() = || {
 1874              let io: *mut IoFactoryObject = unsafe {
 1875                  Local::unsafe_borrow()
 1876              };
 1877              let mut stream = unsafe { (*io).tcp_connect(client_addr) };
 1878              while stream.is_err() {
 1879                  stream = unsafe { (*io).tcp_connect(client_addr) };
 1880              }
 1881              stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
 1882          };
 1883  
 1884          let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
 1885          server_task.death.on_exit = Some(server_on_exit);
 1886          let server_task = Cell::new(server_task);
 1887  
 1888          let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
 1889          client_task.death.on_exit = Some(client_on_exit);
 1890          let client_task = Cell::new(client_task);
 1891  
 1892          let server_sched = Cell::new(server_sched);
 1893          let client_sched = Cell::new(client_sched);
 1894  
 1895          let server_thread = do Thread::start {
 1896              server_sched.take().bootstrap(server_task.take());
 1897          };
 1898          let client_thread = do Thread::start {
 1899              client_sched.take().bootstrap(client_task.take());
 1900          };
 1901  
 1902          server_thread.join();
 1903          client_thread.join();
 1904      }
 1905  }
 1906  
 1907  #[test]
 1908  fn test_simple_udp_server_and_client() {
 1909      do run_in_mt_newsched_task {
 1910          let server_addr = next_test_ip4();
 1911          let client_addr = next_test_ip4();
 1912          let (port, chan) = oneshot();
 1913          let port = Cell::new(port);
 1914          let chan = Cell::new(chan);
 1915  
 1916          do spawntask {
 1917              unsafe {
 1918                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 1919                  let mut server_socket = (*io).udp_bind(server_addr).unwrap();
 1920                  chan.take().send(());
 1921                  let mut buf = [0, .. 2048];
 1922                  let (nread,src) = server_socket.recvfrom(buf).unwrap();
 1923                  assert_eq!(nread, 8);
 1924                  for i in range(0u, nread) {
 1925                      rtdebug!("{}", buf[i]);
 1926                      assert_eq!(buf[i], i as u8);
 1927                  }
 1928                  assert_eq!(src, client_addr);
 1929              }
 1930          }
 1931  
 1932          do spawntask {
 1933              unsafe {
 1934                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 1935                  let mut client_socket = (*io).udp_bind(client_addr).unwrap();
 1936                  port.take().recv();
 1937                  client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
 1938              }
 1939          }
 1940      }
 1941  }
 1942  
 1943  #[test] #[ignore(reason = "busted")]
 1944  fn test_read_and_block() {
 1945      do run_in_mt_newsched_task {
 1946          let addr = next_test_ip4();
 1947          let (port, chan) = oneshot();
 1948          let port = Cell::new(port);
 1949          let chan = Cell::new(chan);
 1950  
 1951          do spawntask {
 1952              let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
 1953              let listener = unsafe { (*io).tcp_bind(addr).unwrap() };
 1954              let mut acceptor = listener.listen().unwrap();
 1955              chan.take().send(());
 1956              let mut stream = acceptor.accept().unwrap();
 1957              let mut buf = [0, .. 2048];
 1958  
 1959              let expected = 32;
 1960              let mut current = 0;
 1961              let mut reads = 0;
 1962  
 1963              while current < expected {
 1964                  let nread = stream.read(buf).unwrap();
 1965                  for i in range(0u, nread) {
 1966                      let val = buf[i] as uint;
 1967                      assert_eq!(val, current % 8);
 1968                      current += 1;
 1969                  }
 1970                  reads += 1;
 1971  
 1972                  do task::unkillable { // FIXME(#8674)
 1973                      let scheduler: ~Scheduler = Local::take();
 1974                      // Yield to the other task in hopes that it
 1975                      // will trigger a read callback while we are
 1976                      // not ready for it
 1977                      do scheduler.deschedule_running_task_and_then |sched, task| {
 1978                          let task = Cell::new(task);
 1979                          sched.enqueue_blocked_task(task.take());
 1980                      }
 1981                  }
 1982              }
 1983  
 1984              // Make sure we had multiple reads
 1985              assert!(reads > 1);
 1986          }
 1987  
 1988          do spawntask {
 1989              unsafe {
 1990                  port.take().recv();
 1991                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 1992                  let mut stream = (*io).tcp_connect(addr).unwrap();
 1993                  stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
 1994                  stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
 1995                  stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
 1996                  stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
 1997              }
 1998          }
 1999  
 2000      }
 2001  }
 2002  
 2003  #[test]
 2004  fn test_read_read_read() {
 2005      do run_in_mt_newsched_task {
 2006          let addr = next_test_ip4();
 2007          static MAX: uint = 500000;
 2008          let (port, chan) = oneshot();
 2009          let port = Cell::new(port);
 2010          let chan = Cell::new(chan);
 2011  
 2012          do spawntask {
 2013              unsafe {
 2014                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 2015                  let listener = (*io).tcp_bind(addr).unwrap();
 2016                  let mut acceptor = listener.listen().unwrap();
 2017                  chan.take().send(());
 2018                  let mut stream = acceptor.accept().unwrap();
 2019                  let buf = [1, .. 2048];
 2020                  let mut total_bytes_written = 0;
 2021                  while total_bytes_written < MAX {
 2022                      stream.write(buf);
 2023                      total_bytes_written += buf.len();
 2024                  }
 2025              }
 2026          }
 2027  
 2028          do spawntask {
 2029              unsafe {
 2030                  port.take().recv();
 2031                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 2032                  let mut stream = (*io).tcp_connect(addr).unwrap();
 2033                  let mut buf = [0, .. 2048];
 2034                  let mut total_bytes_read = 0;
 2035                  while total_bytes_read < MAX {
 2036                      let nread = stream.read(buf).unwrap();
 2037                      rtdebug!("read {} bytes", nread);
 2038                      total_bytes_read += nread;
 2039                      for i in range(0u, nread) {
 2040                          assert_eq!(buf[i], 1);
 2041                      }
 2042                  }
 2043                  rtdebug!("read {} bytes total", total_bytes_read);
 2044              }
 2045          }
 2046      }
 2047  }
 2048  
 2049  #[test]
 2050  fn test_udp_twice() {
 2051      do run_in_mt_newsched_task {
 2052          let server_addr = next_test_ip4();
 2053          let client_addr = next_test_ip4();
 2054          let (port, chan) = oneshot();
 2055          let port = Cell::new(port);
 2056          let chan = Cell::new(chan);
 2057  
 2058          do spawntask {
 2059              unsafe {
 2060                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 2061                  let mut client = (*io).udp_bind(client_addr).unwrap();
 2062                  port.take().recv();
 2063                  assert!(client.sendto([1], server_addr).is_ok());
 2064                  assert!(client.sendto([2], server_addr).is_ok());
 2065              }
 2066          }
 2067  
 2068          do spawntask {
 2069              unsafe {
 2070                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 2071                  let mut server = (*io).udp_bind(server_addr).unwrap();
 2072                  chan.take().send(());
 2073                  let mut buf1 = [0];
 2074                  let mut buf2 = [0];
 2075                  let (nread1, src1) = server.recvfrom(buf1).unwrap();
 2076                  let (nread2, src2) = server.recvfrom(buf2).unwrap();
 2077                  assert_eq!(nread1, 1);
 2078                  assert_eq!(nread2, 1);
 2079                  assert_eq!(src1, client_addr);
 2080                  assert_eq!(src2, client_addr);
 2081                  assert_eq!(buf1[0], 1);
 2082                  assert_eq!(buf2[0], 2);
 2083              }
 2084          }
 2085      }
 2086  }
 2087  
 2088  #[test]
 2089  fn test_udp_many_read() {
 2090      do run_in_mt_newsched_task {
 2091          let server_out_addr = next_test_ip4();
 2092          let server_in_addr = next_test_ip4();
 2093          let client_out_addr = next_test_ip4();
 2094          let client_in_addr = next_test_ip4();
 2095          static MAX: uint = 500_000;
 2096  
 2097          let (p1, c1) = oneshot();
 2098          let (p2, c2) = oneshot();
 2099  
 2100          let first = Cell::new((p1, c2));
 2101          let second = Cell::new((p2, c1));
 2102  
 2103          do spawntask {
 2104              unsafe {
 2105                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 2106                  let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
 2107                  let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
 2108                  let (port, chan) = first.take();
 2109                  chan.send(());
 2110                  port.recv();
 2111                  let msg = [1, .. 2048];
 2112                  let mut total_bytes_sent = 0;
 2113                  let mut buf = [1];
 2114                  while buf[0] == 1 {
 2115                      // send more data
 2116                      assert!(server_out.sendto(msg, client_in_addr).is_ok());
 2117                      total_bytes_sent += msg.len();
 2118                      // check if the client has received enough
 2119                      let res = server_in.recvfrom(buf);
 2120                      assert!(res.is_ok());
 2121                      let (nread, src) = res.unwrap();
 2122                      assert_eq!(nread, 1);
 2123                      assert_eq!(src, client_out_addr);
 2124                  }
 2125                  assert!(total_bytes_sent >= MAX);
 2126              }
 2127          }
 2128  
 2129          do spawntask {
 2130              unsafe {
 2131                  let io: *mut IoFactoryObject = Local::unsafe_borrow();
 2132                  let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
 2133                  let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
 2134                  let (port, chan) = second.take();
 2135                  port.recv();
 2136                  chan.send(());
 2137                  let mut total_bytes_recv = 0;
 2138                  let mut buf = [0, .. 2048];
 2139                  while total_bytes_recv < MAX {
 2140                      // ask for more
 2141                      assert!(client_out.sendto([1], server_in_addr).is_ok());
 2142                      // wait for data
 2143                      let res = client_in.recvfrom(buf);
 2144                      assert!(res.is_ok());
 2145                      let (nread, src) = res.unwrap();
 2146                      assert_eq!(src, server_out_addr);
 2147                      total_bytes_recv += nread;
 2148                      for i in range(0u, nread) {
 2149                          assert_eq!(buf[i], 1);
 2150                      }
 2151                  }
 2152                  // tell the server we're done
 2153                  assert!(client_out.sendto([0], server_in_addr).is_ok());
 2154              }
 2155          }
 2156      }
 2157  }
 2158  
 2159  #[test]
 2160  fn test_timer_sleep_simple() {
 2161      do run_in_mt_newsched_task {
 2162          unsafe {
 2163              let io: *mut IoFactoryObject = Local::unsafe_borrow();
 2164              let timer = (*io).timer_init();
 2165              do timer.map_move |mut t| { t.sleep(1) };
 2166          }
 2167      }
 2168  }
 2169  
 2170  fn file_test_uvio_full_simple_impl() {
 2171      use str::StrSlice; // why does this have to be explicitly imported to work?
 2172                         // compiler was complaining about no trait for str that
 2173                         // does .as_bytes() ..
 2174      use path::Path;
 2175      use rt::io::{Open, Create, ReadWrite, Read};
 2176      unsafe {
 2177          let io*mut IoFactoryObject = Local::unsafe_borrow();
 2178          let write_val = "hello uvio!";
 2179          let path = "./tmp/file_test_uvio_full.txt";
 2180          {
 2181              let create_fm = Create;
 2182              let create_fa = ReadWrite;
 2183              let mut fd = (*io).fs_open(&Path::new(path), create_fm, create_fa).unwrap();
 2184              let write_buf = write_val.as_bytes();
 2185              fd.write(write_buf);
 2186          }
 2187          {
 2188              let ro_fm = Open;
 2189              let ro_fa = Read;
 2190              let mut fd = (*io).fs_open(&Path::new(path), ro_fm, ro_fa).unwrap();
 2191              let mut read_vec = [0, .. 1028];
 2192              let nread = fd.read(read_vec).unwrap();
 2193              let read_val = str::from_utf8(read_vec.slice(0, nread as uint));
 2194              assert!(read_val == write_val.to_owned());
 2195          }
 2196          (*io).fs_unlink(&Path::new(path));
 2197      }
 2198  }
 2199  
 2200  #[test]
 2201  fn file_test_uvio_full_simple() {
 2202      do run_in_mt_newsched_task {
 2203          file_test_uvio_full_simple_impl();
 2204      }
 2205  }
 2206  
 2207  fn uvio_naive_print(input&str) {
 2208      use str::StrSlice;
 2209      unsafe {
 2210          use libc::{STDOUT_FILENO};
 2211          let io*mut IoFactoryObject = Local::unsafe_borrow();
 2212          {
 2213              let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false);
 2214              let write_buf = input.as_bytes();
 2215              fd.write(write_buf);
 2216          }
 2217      }
 2218  }
 2219  
 2220  #[test]
 2221  fn file_test_uvio_write_to_stdout() {
 2222      do run_in_mt_newsched_task {
 2223          uvio_naive_print("jubilation\n");
 2224      }
 2225  }

libstd/rt/uv/uvio.rs:294:100-294:100 -struct- definition:
// The entire point of async is to call into a loop from other threads so it does not need to home.
pub struct UvRemoteCallback {
references:-
354: impl Drop for UvRemoteCallback {
350: impl RemoteCallback for UvRemoteCallback {
343:         UvRemoteCallback {
304:     pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
357:             let this: &mut UvRemoteCallback = cast::transmute_mut(self);
303: impl UvRemoteCallback {
libstd/rt/rtio.rs:
28: pub type RemoteCallbackObject = uvio::UvRemoteCallback;


libstd/rt/uv/uvio.rs:1141:1-1141:1 -struct- definition:

pub struct UvUdpSocket {
references:-
524:                 Ok(~UvUdpSocket { watcher: watcher, home: home })
1165: impl RtioSocket for UvUdpSocket {
1147: impl HomingIO for UvUdpSocket {
1173: impl RtioUdpSocket for UvUdpSocket {
1151: impl Drop for UvUdpSocket {
libstd/rt/rtio.rs:
33: pub type RtioUdpSocketObject = uvio::UvUdpSocket;


libstd/rt/uv/uvio.rs:961:1-961:1 -fn- definition:

fn write_stream(mut watcher: StreamWatcher,
references:-
1037:             write_stream(self_.pipe.as_stream(), scheduler, buf)
1082:             write_stream(self_.watcher.as_stream(), scheduler, buf)


libstd/rt/uv/uvio.rs:802:1-802:1 -struct- definition:

pub struct UvTcpListener {
references:-
812: impl UvTcpListener {
814:         UvTcpListener { watcher: watcher, home: home }
813:     fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
832: impl RtioSocket for UvTcpListener {
840: impl RtioTcpListener for UvTcpListener {
818: impl Drop for UvTcpListener {
866:     listener: UvTcpListener,
808: impl HomingIO for UvTcpListener {
875:     fn new(listener: UvTcpListener) -> UvTcpAcceptor {
libstd/rt/rtio.rs:
32: pub type RtioTcpListenerObject = uvio::UvTcpListener;


libstd/rt/uv/uvio.rs:416:20-416:20 -fn- definition:
/// have no ret val
fn uv_fs_helper<P: PathLike>(loop_: &mut Loop, path: &P,
references:-
690:         do uv_fs_helper(self.uv_loop(), path) |mkdir_req, l, p, cb| {
697:         do uv_fs_helper(self.uv_loop(), path) |rmdir_req, l, p, cb| {
610:         do uv_fs_helper(self.uv_loop(), path) |unlink_req, l, p, cb| {


libstd/rt/uv/uvio.rs:1018:1-1018:1 -struct- definition:

pub struct UvPipeStream {
references:-
1024:     pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream {
1023: impl UvPipeStream {
1015:         UvPipeStream { inner: self }
1025:         UvPipeStream { inner: inner }
1014:     pub unsafe fn bind(~self) -> UvPipeStream {
1029: impl RtioPipe for UvPipeStream {
libstd/rt/uv/process.rs:
147:                     io: StdioContainer) -> Option<UvPipeStream> {
47:                     -> Result<~[Option<UvPipeStream>], uv::UvError>
libstd/rt/rtio.rs:
36: pub type RtioPipeObject = uvio::UvPipeStream;


libstd/rt/uv/uvio.rs:921:1-921:1 -fn- definition:

fn read_stream(mut watcher: StreamWatcher,
references:-
1032:             read_stream(self_.pipe.as_stream(), scheduler, buf)
1076:             read_stream(self_.watcher.as_stream(), scheduler, buf)


libstd/rt/uv/uvio.rs:56:1-56:1 -trait- definition:

trait HomingIO {
references:-
1047: impl HomingIO for UvTcpStream {
1357: impl HomingIO for UvTimer {
1406: impl HomingIO for UvFileStream {
112:         HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
89:     fn restore_original_home(_dummy_self: Option<Self>, old: SchedHome) {
1147: impl HomingIO for UvUdpSocket {
1548: impl HomingIO for UvProcess {
994: impl HomingIO for UvUnboundPipe {
124:     fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
120:         HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
109:     fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
130:         HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
808: impl HomingIO for UvTcpListener {
870: impl HomingIO for UvTcpAcceptor {
116:     fn home_for_io_consume<A>(self, io: &fn(Self) -> A) -> A {


libstd/rt/uv/uvio.rs:181:43-181:43 -struct- definition:
// Obviously an Event Loop is always home.
pub struct UvEventLoop {
references:-
186: impl UvEventLoop {
188:         UvEventLoop {
194: impl Drop for UvEventLoop {
200: impl EventLoop for UvEventLoop {
187:     pub fn new() -> UvEventLoop {
libstd/rt/rtio.rs:
27: pub type EventLoopObject = uvio::UvEventLoop;


libstd/rt/uv/uvio.rs:1351:1-1351:1 -struct- definition:

pub struct UvTimer {
references:-
1357: impl HomingIO for UvTimer {
1367: impl Drop for UvTimer {
1362:     fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
1361: impl UvTimer {
1363:         UvTimer { watcher: w, home: home }
1382: impl RtioTimer for UvTimer {
libstd/rt/rtio.rs:
34: pub type RtioTimerObject = uvio::UvTimer;


libstd/rt/uv/uvio.rs:241:1-241:1 -struct- definition:

pub struct UvPausibleIdleCallback {
references:-
248: impl UvPausibleIdleCallback {
217:         return ~UvPausibleIdleCallback {
libstd/rt/rtio.rs:
35: pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;


libstd/rt/uv/uvio.rs:1531:1-1531:1 -struct- definition:

pub struct UvProcess {
references:-
1554:         let close = |self_: &mut UvProcess| {
1552: impl Drop for UvProcess {
1548: impl HomingIO for UvProcess {
764:             *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
755:         let mut ret = ~UvProcess {
1575: impl RtioProcess for UvProcess {
764:             *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
libstd/rt/rtio.rs:
38: pub type RtioProcessObject = uvio::UvProcess;


libstd/rt/uv/uvio.rs:988:1-988:1 -struct- definition:

pub struct UvUnboundPipe {
references:-
746:         Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
998: impl Drop for UvUnboundPipe {
1020:     priv inner: ~UvUnboundPipe,
1024:     pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream {
1013: impl UvUnboundPipe {
994: impl HomingIO for UvUnboundPipe {
libstd/rt/rtio.rs:
37: pub type RtioUnboundPipeObject = uvio::UvUnboundPipe;


libstd/rt/uv/uvio.rs:406:1-406:1 -struct- definition:

pub struct UvIoFactory(Loop);
references:-
409: impl UvIoFactory {
183:     uvio: UvIoFactory
445: impl IoFactory for UvIoFactory {
libstd/rt/rtio.rs:
29: pub type IoFactoryObject = uvio::UvIoFactory;


libstd/rt/uv/uvio.rs:864:1-864:1 -struct- definition:

pub struct UvTcpAcceptor {
references:-
870: impl HomingIO for UvTcpAcceptor {
875:     fn new(listener: UvTcpListener) -> UvTcpAcceptor {
874: impl UvTcpAcceptor {
880: impl RtioSocket for UvTcpAcceptor {
888: impl RtioTcpAcceptor for UvTcpAcceptor {
876:         UvTcpAcceptor { listener: listener, incoming: Tube::new() }
libstd/rt/rtio.rs:
31: pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor;


libstd/rt/uv/uvio.rs:139:1-139:1 -enum- definition:

enum SocketNameKind {
references:-
146: fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,


libstd/rt/uv/uvio.rs:1041:1-1041:1 -struct- definition:

pub struct UvTcpStream {
references:-
1073: impl RtioTcpStream for UvTcpStream {
469:                             let res = Ok(~UvTcpStream { watcher: tcp, home: home });
1047: impl HomingIO for UvTcpStream {
1065: impl RtioSocket for UvTcpStream {
854:                             Ok(~UvTcpStream { watcher: inc, home: home })
1051: impl Drop for UvTcpStream {
libstd/rt/rtio.rs:
30: pub type RtioTcpStreamObject = uvio::UvTcpStream;


libstd/rt/uv/uvio.rs:1398:1-1398:1 -struct- definition:

pub struct UvFileStream {
references:-
1482: impl Drop for UvFileStream {
1524:         let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
1413:         UvFileStream {
1412:            home: SchedHandle) -> UvFileStream {
1524:         let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
1406: impl HomingIO for UvFileStream {
1499: impl RtioFileStream for UvFileStream {
1410: impl UvFileStream {


libstd/rt/uv/uvio.rs:145:1-145:1 -fn- definition:

fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
references:-
883:             socket_name(Tcp, self_.listener.watcher)
1068:             socket_name(Tcp, self_.watcher)
835:             socket_name(Tcp, self_.watcher)
1168:             socket_name(Udp, self_.watcher)
1088:             socket_name(TcpPeer, self_.watcher)