(index<- )        ./librustuv/net.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
    1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
    2  // file at the top-level directory of this distribution and at
    3  // http://rust-lang.org/COPYRIGHT.
    4  //
    5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
    6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
    7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
    8  // option. This file may not be copied, modified, or distributed
    9  // except according to those terms.
   10  
   11  use libc::{size_t, ssize_t, c_int, c_void, c_uint};
   12  use libc;
   13  use std::cast;
   14  use std::io;
   15  use std::io::IoError;
   16  use std::io::net::ip;
   17  use std::mem;
   18  use std::ptr;
   19  use std::rt::rtio;
   20  use std::rt::task::BlockedTask;
   21  
   22  use homing::{HomingIO, HomeHandle};
   23  use rc::Refcount;
   24  use stream::StreamWatcher;
   25  use super::{Loop, Request, UvError, Buf, status_to_io_result,
   26              uv_error_to_io_error, UvHandle, slice_to_uv_buf,
   27              wait_until_woken_after, wakeup};
   28  use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
   29  use uvio::UvIoFactory;
   30  use uvll;
   31  
   32  ////////////////////////////////////////////////////////////////////////////////
   33  /// Generic functions related to dealing with sockaddr things
   34  ////////////////////////////////////////////////////////////////////////////////
   35  
   36  pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
   37  pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
   38  
   39  pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
   40                          len: uint) -> ip::SocketAddr {
   41      match storage.ss_family as c_int {
   42          libc::AF_INET => {
   43              assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
   44              let storage&libc::sockaddr_in = unsafe {
   45                  cast::transmute(storage)
   46              };
   47              let addr = storage.sin_addr.s_addr as u32;
   48              let a = (addr >>  0) as u8;
   49              let b = (addr >>  8) as u8;
   50              let c = (addr >> 16) as u8;
   51              let d = (addr >> 24) as u8;
   52              ip::SocketAddr {
   53                  ip: ip::Ipv4Addr(a, b, c, d),
   54                  port: ntohs(storage.sin_port),
   55              }
   56          }
   57          libc::AF_INET6 => {
   58              assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
   59              let storage&libc::sockaddr_in6 = unsafe {
   60                  cast::transmute(storage)
   61              };
   62              let a = ntohs(storage.sin6_addr.s6_addr[0]);
   63              let b = ntohs(storage.sin6_addr.s6_addr[1]);
   64              let c = ntohs(storage.sin6_addr.s6_addr[2]);
   65              let d = ntohs(storage.sin6_addr.s6_addr[3]);
   66              let e = ntohs(storage.sin6_addr.s6_addr[4]);
   67              let f = ntohs(storage.sin6_addr.s6_addr[5]);
   68              let g = ntohs(storage.sin6_addr.s6_addr[6]);
   69              let h = ntohs(storage.sin6_addr.s6_addr[7]);
   70              ip::SocketAddr {
   71                  ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
   72                  port: ntohs(storage.sin6_port),
   73              }
   74          }
   75          n => {
   76              fail!("unknown family {}", n);
   77          }
   78      }
   79  }
   80  
   81  fn addr_to_sockaddr(addrip::SocketAddr) -> (libc::sockaddr_storage, uint) {
   82      unsafe {
   83          let mut storagelibc::sockaddr_storage = mem::init();
   84          let len = match addr.ip {
   85              ip::Ipv4Addr(a, b, c, d) => {
   86                  let storage&mut libc::sockaddr_in =
   87                      cast::transmute(&mut storage);
   88                  (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
   89                  (*storage).sin_port = htons(addr.port);
   90                  (*storage).sin_addr = libc::in_addr {
   91                      s_addr: (d as u32 << 24) |
   92                              (c as u32 << 16) |
   93                              (b as u32 <<  8) |
   94                              (a as u32 <<  0)
   95                  };
   96                  mem::size_of::<libc::sockaddr_in>()
   97              }
   98              ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
   99                  let storage&mut libc::sockaddr_in6 =
  100                      cast::transmute(&mut storage);
  101                  storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
  102                  storage.sin6_port = htons(addr.port);
  103                  storage.sin6_addr = libc::in6_addr {
  104                      s6_addr: [
  105                          htons(a),
  106                          htons(b),
  107                          htons(c),
  108                          htons(d),
  109                          htons(e),
  110                          htons(f),
  111                          htons(g),
  112                          htons(h),
  113                      ]
  114                  };
  115                  mem::size_of::<libc::sockaddr_in6>()
  116              }
  117          };
  118          return (storage, len);
  119      }
  120  }
  121  
  122  enum SocketNameKind {
  123      TcpPeer,
  124      Tcp,
  125      Udp
  126  }
  127  
  128  fn socket_name(skSocketNameKind,
  129                 handle: *c_void) -> Result<ip::SocketAddr, IoError> {
  130      let getsockname = match sk {
  131          TcpPeer => uvll::uv_tcp_getpeername,
  132          Tcp     => uvll::uv_tcp_getsockname,
  133          Udp     => uvll::uv_udp_getsockname,
  134      };
  135  
  136      // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
  137      let mut sockaddrlibc::sockaddr_storage = unsafe { mem::init() };
  138      let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
  139  
  140      let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
  141      match unsafe {
  142          getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
  143      } {
  144          0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
  145          n => Err(uv_error_to_io_error(UvError(n)))
  146      }
  147  }
  148  
  149  ////////////////////////////////////////////////////////////////////////////////
  150  /// TCP implementation
  151  ////////////////////////////////////////////////////////////////////////////////
  152  
  153  pub struct TcpWatcher {
  154      handle: *uvll::uv_tcp_t,
  155      stream: StreamWatcher,
  156      home: HomeHandle,
  157      refcount: Refcount,
  158  
  159      // libuv can't support concurrent reads and concurrent writes of the same
  160      // stream object, so we use these access guards in order to arbitrate among
  161      // multiple concurrent reads and writes. Note that libuv *can* read and
  162      // write simultaneously, it just can't read and read simultaneously.
  163      read_access: AccessTimeout,
  164      write_access: AccessTimeout,
  165  }
  166  
  167  pub struct TcpListener {
  168      home: HomeHandle,
  169      handle: *uvll::uv_pipe_t,
  170      closing_task: Option<BlockedTask>,
  171      outgoing: Sender<Result<Box<rtio::RtioTcpStream:Send>, IoError>>,
  172      incoming: Receiver<Result<Box<rtio::RtioTcpStream:Send>, IoError>>,
  173  }
  174  
  175  pub struct TcpAcceptor {
  176      listener: Box<TcpListener>,
  177      timeout: AcceptTimeout,
  178  }
  179  
  180  // TCP watchers (clients/streams)
  181  
  182  impl TcpWatcher {
  183      pub fn new(io&mut UvIoFactory) -> TcpWatcher {
  184          let handle = io.make_handle();
  185          TcpWatcher::new_home(&io.loop_, handle)
  186      }
  187  
  188      fn new_home(loop_&Loop, homeHomeHandle) -> TcpWatcher {
  189          let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
  190          assert_eq!(unsafe {
  191              uvll::uv_tcp_init(loop_.handle, handle)
  192          }, 0);
  193          TcpWatcher {
  194              home: home,
  195              handle: handle,
  196              stream: StreamWatcher::new(handle),
  197              refcount: Refcount::new(),
  198              read_access: AccessTimeout::new(),
  199              write_access: AccessTimeout::new(),
  200          }
  201      }
  202  
  203      pub fn connect(io&mut UvIoFactory,
  204                     addressip::SocketAddr,
  205                     timeoutOption<u64>) -> Result<TcpWatcher, UvError> {
  206          let tcp = TcpWatcher::new(io);
  207          let cx = ConnectCtx { status: -1, task: None, timer: None };
  208          let (addr, _len) = addr_to_sockaddr(address);
  209          let addr_p = &addr as *_ as *libc::sockaddr;
  210          cx.connect(tcp, timeout, io, |req, tcp, cb| {
  211              unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
  212          })
  213      }
  214  }
  215  
  216  impl HomingIO for TcpWatcher {
  217      fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
  218  }
  219  
  220  impl rtio::RtioSocket for TcpWatcher {
  221      fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
  222          let _m = self.fire_homing_missile();
  223          socket_name(Tcp, self.handle)
  224      }
  225  }
  226  
  227  impl rtio::RtioTcpStream for TcpWatcher {
  228      fn read(&mut self, buf&mut [u8]) -> Result<uint, IoError> {
  229          let m = self.fire_homing_missile();
  230          let guard = try!(self.read_access.grant(m));
  231  
  232          // see comments in close_read about this check
  233          if guard.access.is_closed() {
  234              return Err(io::standard_error(io::EndOfFile))
  235          }
  236  
  237          self.stream.read(buf).map_err(uv_error_to_io_error)
  238      }
  239  
  240      fn write(&mut self, buf&[u8]) -> Result<(), IoError> {
  241          let m = self.fire_homing_missile();
  242          let guard = try!(self.write_access.grant(m));
  243          self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
  244      }
  245  
  246      fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
  247          let _m = self.fire_homing_missile();
  248          socket_name(TcpPeer, self.handle)
  249      }
  250  
  251      fn control_congestion(&mut self) -> Result<(), IoError> {
  252          let _m = self.fire_homing_missile();
  253          status_to_io_result(unsafe {
  254              uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
  255          })
  256      }
  257  
  258      fn nodelay(&mut self) -> Result<(), IoError> {
  259          let _m = self.fire_homing_missile();
  260          status_to_io_result(unsafe {
  261              uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
  262          })
  263      }
  264  
  265      fn keepalive(&mut self, delay_in_secondsuint) -> Result<(), IoError> {
  266          let _m = self.fire_homing_missile();
  267          status_to_io_result(unsafe {
  268              uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
  269                                     delay_in_seconds as c_uint)
  270          })
  271      }
  272  
  273      fn letdie(&mut self) -> Result<(), IoError> {
  274          let _m = self.fire_homing_missile();
  275          status_to_io_result(unsafe {
  276              uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
  277          })
  278      }
  279  
  280      fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
  281          box TcpWatcher {
  282              handle: self.handle,
  283              stream: StreamWatcher::new(self.handle),
  284              home: self.home.clone(),
  285              refcount: self.refcount.clone(),
  286              read_access: self.read_access.clone(),
  287              write_access: self.write_access.clone(),
  288          } as Box<rtio::RtioTcpStream:Send>
  289      }
  290  
  291      fn close_read(&mut self) -> Result<(), IoError> {
  292          // see comments in PipeWatcher::close_read
  293          let task = {
  294              let m = self.fire_homing_missile();
  295              self.read_access.access.close(&m);
  296      self.stream.cancel_read(uvll::EOF as libc::ssize_t)
  297          };
  298          let _ = task.map(|t| t.reawaken());
  299          Ok(())
  300      }
  301  
  302      fn close_write(&mut self) -> Result<(), IoError> {
  303          let _m = self.fire_homing_missile();
  304          shutdown(self.handle, &self.uv_loop())
  305      }
  306  
  307      fn set_timeout(&mut self, timeoutOption<u64>) {
  308          self.set_read_timeout(timeout);
  309          self.set_write_timeout(timeout);
  310      }
  311  
  312      fn set_read_timeout(&mut self, msOption<u64>) {
  313          let _m = self.fire_homing_missile();
  314          let loop_ = self.uv_loop();
  315          self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
  316                                       &self.stream as *_ as uint);
  317  
  318          fn cancel_read(streamuint) -> Option<BlockedTask> {
  319              let stream&mut StreamWatcher = unsafe { cast::transmute(stream) };
  320              stream.cancel_read(uvll::ECANCELED as ssize_t)
  321          }
  322      }
  323  
  324      fn set_write_timeout(&mut self, msOption<u64>) {
  325          let _m = self.fire_homing_missile();
  326          let loop_ = self.uv_loop();
  327          self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
  328                                        &self.stream as *_ as uint);
  329  
  330          fn cancel_write(streamuint) -> Option<BlockedTask> {
  331              let stream&mut StreamWatcher = unsafe { cast::transmute(stream) };
  332              stream.cancel_write()
  333          }
  334      }
  335  }
  336  
  337  impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
  338      fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
  339  }
  340  
  341  impl Drop for TcpWatcher {
  342      fn drop(&mut self) {
  343          let _m = self.fire_homing_missile();
  344          if self.refcount.decrement() {
  345              self.close();
  346          }
  347      }
  348  }
  349  
  350  // TCP listeners (unbound servers)
  351  
  352  impl TcpListener {
  353      pub fn bind(io&mut UvIoFactory, addressip::SocketAddr)
  354                  -> Result<Box<TcpListener>, UvError> {
  355          let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
  356          assert_eq!(unsafe {
  357              uvll::uv_tcp_init(io.uv_loop(), handle)
  358          }, 0);
  359          let (tx, rx) = channel();
  360          let l = box TcpListener {
  361              home: io.make_handle(),
  362              handle: handle,
  363              closing_task: None,
  364              outgoing: tx,
  365              incoming: rx,
  366          };
  367          let (addr, _len) = addr_to_sockaddr(address);
  368          let res = unsafe {
  369              let addr_p = &addr as *libc::sockaddr_storage;
  370              uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
  371          };
  372          return match res {
  373              0 => Ok(l.install()),
  374              n => Err(UvError(n))
  375          };
  376      }
  377  }
  378  
  379  impl HomingIO for TcpListener {
  380      fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
  381  }
  382  
  383  impl UvHandle<uvll::uv_tcp_t> for TcpListener {
  384      fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
  385  }
  386  
  387  impl rtio::RtioSocket for TcpListener {
  388      fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
  389          let _m = self.fire_homing_missile();
  390          socket_name(Tcp, self.handle)
  391      }
  392  }
  393  
  394  impl rtio::RtioTcpListener for TcpListener {
  395      fn listen(~self) -> Result<Box<rtio::RtioTcpAcceptor:Send>, IoError> {
  396          // create the acceptor object from ourselves
  397          let mut acceptor = box TcpAcceptor {
  398              listener: self,
  399              timeout: AcceptTimeout::new(),
  400          };
  401  
  402          let _m = acceptor.fire_homing_missile();
  403          // FIXME: the 128 backlog should be configurable
  404          match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
  405              0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor:Send>),
  406              n => Err(uv_error_to_io_error(UvError(n))),
  407          }
  408      }
  409  }
  410  
  411  extern fn listen_cb(server: *uvll::uv_stream_t, statusc_int) {
  412      assert!(status != uvll::ECANCELED);
  413      let tcp&mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
  414      let msg = match status {
  415          0 => {
  416              let loop_ = Loop::wrap(unsafe {
  417                  uvll::get_loop_for_uv_handle(server)
  418              });
  419              let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
  420              assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
  421              Ok(box client as Box<rtio::RtioTcpStream:Send>)
  422          }
  423          n => Err(uv_error_to_io_error(UvError(n)))
  424      };
  425      tcp.outgoing.send(msg);
  426  }
  427  
  428  impl Drop for TcpListener {
  429      fn drop(&mut self) {
  430          let _m = self.fire_homing_missile();
  431          self.close();
  432      }
  433  }
  434  
  435  // TCP acceptors (bound servers)
  436  
  437  impl HomingIO for TcpAcceptor {
  438      fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
  439  }
  440  
  441  impl rtio::RtioSocket for TcpAcceptor {
  442      fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
  443          let _m = self.fire_homing_missile();
  444          socket_name(Tcp, self.listener.handle)
  445      }
  446  }
  447  
  448  impl rtio::RtioTcpAcceptor for TcpAcceptor {
  449      fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream:Send>, IoError> {
  450          self.timeout.accept(&self.listener.incoming)
  451      }
  452  
  453      fn accept_simultaneously(&mut self) -> Result<(), IoError> {
  454          let _m = self.fire_homing_missile();
  455          status_to_io_result(unsafe {
  456              uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
  457          })
  458      }
  459  
  460      fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
  461          let _m = self.fire_homing_missile();
  462          status_to_io_result(unsafe {
  463              uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
  464          })
  465      }
  466  
  467      fn set_timeout(&mut self, msOption<u64>) {
  468          let _m = self.fire_homing_missile();
  469          match ms {
  470              None => self.timeout.clear(),
  471              Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
  472          }
  473      }
  474  }
  475  
  476  ////////////////////////////////////////////////////////////////////////////////
  477  /// UDP implementation
  478  ////////////////////////////////////////////////////////////////////////////////
  479  
  480  pub struct UdpWatcher {
  481      handle: *uvll::uv_udp_t,
  482      home: HomeHandle,
  483  
  484      // See above for what these fields are
  485      refcount: Refcount,
  486      read_access: AccessTimeout,
  487      write_access: AccessTimeout,
  488  
  489      blocked_sender: Option<BlockedTask>,
  490  }
  491  
  492  struct UdpRecvCtx {
  493      task: Option<BlockedTask>,
  494      buf: Option<Buf>,
  495      result: Option<(ssize_t, Option<ip::SocketAddr>)>,
  496  }
  497  
  498  struct UdpSendCtx {
  499      result: c_int,
  500      data: Option<Vec<u8>>,
  501      udp: *mut UdpWatcher,
  502  }
  503  
  504  impl UdpWatcher {
  505      pub fn bind(io&mut UvIoFactory, addressip::SocketAddr)
  506                  -> Result<UdpWatcher, UvError> {
  507          let udp = UdpWatcher {
  508              handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
  509              home: io.make_handle(),
  510              refcount: Refcount::new(),
  511              read_access: AccessTimeout::new(),
  512              write_access: AccessTimeout::new(),
  513              blocked_sender: None,
  514          };
  515          assert_eq!(unsafe {
  516              uvll::uv_udp_init(io.uv_loop(), udp.handle)
  517          }, 0);
  518          let (addr, _len) = addr_to_sockaddr(address);
  519          let result = unsafe {
  520              let addr_p = &addr as *libc::sockaddr_storage;
  521              uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
  522          };
  523          return match result {
  524              0 => Ok(udp),
  525              n => Err(UvError(n)),
  526          };
  527      }
  528  }
  529  
  530  impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
  531      fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
  532  }
  533  
  534  impl HomingIO for UdpWatcher {
  535      fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
  536  }
  537  
  538  impl rtio::RtioSocket for UdpWatcher {
  539      fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
  540          let _m = self.fire_homing_missile();
  541          socket_name(Udp, self.handle)
  542      }
  543  }
  544  
  545  impl rtio::RtioUdpSocket for UdpWatcher {
  546      fn recvfrom(&mut self, buf&mut [u8])
  547          -> Result<(uint, ip::SocketAddr), IoError>
  548      {
  549          let loop_ = self.uv_loop();
  550          let m = self.fire_homing_missile();
  551          let _guard = try!(self.read_access.grant(m));
  552  
  553          return match unsafe {
  554              uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
  555          } {
  556              0 => {
  557                  let mut cx = UdpRecvCtx {
  558                      task: None,
  559                      buf: Some(slice_to_uv_buf(buf)),
  560                      result: None,
  561                  };
  562                  let handle = self.handle;
  563                  wait_until_woken_after(&mut cx.task, &loop_, || {
  564                      unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
  565                  });
  566                  match cx.result.take_unwrap() {
  567                      (n, _) if n < 0 =>
  568                          Err(uv_error_to_io_error(UvError(n as c_int))),
  569                      (n, addr) => Ok((n as uint, addr.unwrap()))
  570                  }
  571              }
  572              n => Err(uv_error_to_io_error(UvError(n)))
  573          };
  574  
  575          extern fn alloc_cb(handle*uvll::uv_udp_t,
  576                             _suggested_sizesize_t,
  577                             buf*mut Buf) {
  578              unsafe {
  579                  let cx = uvll::get_data_for_uv_handle(handle);
  580                  let cx = &mut *(cx as *mut UdpRecvCtx);
  581                  *buf = cx.buf.take().expect("recv alloc_cb called more than once")
  582              }
  583          }
  584  
  585          extern fn recv_cb(handle*uvll::uv_udp_t, nreadssize_t, buf*Buf,
  586                            addr*libc::sockaddr, _flagsc_uint) {
  587              assert!(nread != uvll::ECANCELED as ssize_t);
  588              let cx = unsafe {
  589                  &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
  590              };
  591  
  592              // When there's no data to read the recv callback can be a no-op.
  593              // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
  594              // this we just drop back to kqueue and wait for the next callback.
  595              if nread == 0 {
  596                  cx.buf = Some(unsafe { *buf });
  597                  return
  598              }
  599  
  600              unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) }
  601              let addr = if addr == ptr::null() {
  602                  None
  603              } else {
  604                  let len = mem::size_of::<libc::sockaddr_storage>();
  605                  Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
  606              };
  607              cx.result = Some((nread, addr));
  608              wakeup(&mut cx.task);
  609          }
  610      }
  611  
  612      fn sendto(&mut self, buf&[u8], dstip::SocketAddr) -> Result<(), IoError> {
  613          let m = self.fire_homing_missile();
  614          let loop_ = self.uv_loop();
  615          let guard = try!(self.write_access.grant(m));
  616  
  617          let mut req = Request::new(uvll::UV_UDP_SEND);
  618          let (addr, _len) = addr_to_sockaddr(dst);
  619          let addr_p = &addr as *_ as *libc::sockaddr;
  620  
  621          // see comments in StreamWatcher::write for why we may allocate a buffer
  622          // here.
  623          let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None};
  624          let uv_buf = if guard.can_timeout {
  625              slice_to_uv_buf(data.get_ref().as_slice())
  626          } else {
  627              slice_to_uv_buf(buf)
  628          };
  629  
  630          return match unsafe {
  631              uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb)
  632          } {
  633              0 => {
  634                  req.defuse(); // uv callback now owns this request
  635                  let mut cx = UdpSendCtx {
  636                      result: uvll::ECANCELED, data: data, udp: self as *mut _
  637                  };
  638                  wait_until_woken_after(&mut self.blocked_sender, &loop_, || {
  639                      req.set_data(&cx);
  640                  });
  641  
  642                  if cx.result != uvll::ECANCELED {
  643                      return match cx.result {
  644                          0 => Ok(()),
  645                          n => Err(uv_error_to_io_error(UvError(n)))
  646                      }
  647                  }
  648                  let new_cx = box UdpSendCtx {
  649                      result: 0,
  650                      udp: 0 as *mut UdpWatcher,
  651                      data: cx.data.take(),
  652                  };
  653                  unsafe {
  654                      req.set_data(&*new_cx);
  655                      cast::forget(new_cx);
  656                  }
  657                  Err(uv_error_to_io_error(UvError(cx.result)))
  658              }
  659              n => Err(uv_error_to_io_error(UvError(n)))
  660          };
  661  
  662          // This function is the same as stream::write_cb, but adapted for udp
  663          // instead of streams.
  664          extern fn send_cb(req*uvll::uv_udp_send_t, statusc_int) {
  665              let req = Request::wrap(req);
  666              let cx&mut UdpSendCtx = unsafe { req.get_data() };
  667              cx.result = status;
  668  
  669              if cx.udp as uint != 0 {
  670                  let udp&mut UdpWatcher = unsafe { &mut *cx.udp };
  671                  wakeup(&mut udp.blocked_sender);
  672              } else {
  673                  let _cxBox<UdpSendCtx> = unsafe { cast::transmute(cx) };
  674              }
  675          }
  676      }
  677  
  678      fn join_multicast(&mut self, multiip::IpAddr) -> Result<(), IoError> {
  679          let _m = self.fire_homing_missile();
  680          status_to_io_result(unsafe {
  681              multi.to_str().with_c_str(|m_addr| {
  682                  uvll::uv_udp_set_membership(self.handle,
  683                                              m_addr, ptr::null(),
  684                                              uvll::UV_JOIN_GROUP)
  685              })
  686          })
  687      }
  688  
  689      fn leave_multicast(&mut self, multiip::IpAddr) -> Result<(), IoError> {
  690          let _m = self.fire_homing_missile();
  691          status_to_io_result(unsafe {
  692              multi.to_str().with_c_str(|m_addr| {
  693                  uvll::uv_udp_set_membership(self.handle,
  694                                              m_addr, ptr::null(),
  695                                              uvll::UV_LEAVE_GROUP)
  696              })
  697          })
  698      }
  699  
  700      fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
  701          let _m = self.fire_homing_missile();
  702          status_to_io_result(unsafe {
  703              uvll::uv_udp_set_multicast_loop(self.handle,
  704                                              1 as c_int)
  705          })
  706      }
  707  
  708      fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
  709          let _m = self.fire_homing_missile();
  710          status_to_io_result(unsafe {
  711              uvll::uv_udp_set_multicast_loop(self.handle,
  712                                              0 as c_int)
  713          })
  714      }
  715  
  716      fn multicast_time_to_live(&mut self, ttlint) -> Result<(), IoError> {
  717          let _m = self.fire_homing_missile();
  718          status_to_io_result(unsafe {
  719              uvll::uv_udp_set_multicast_ttl(self.handle,
  720                                             ttl as c_int)
  721          })
  722      }
  723  
  724      fn time_to_live(&mut self, ttlint) -> Result<(), IoError> {
  725          let _m = self.fire_homing_missile();
  726          status_to_io_result(unsafe {
  727              uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
  728          })
  729      }
  730  
  731      fn hear_broadcasts(&mut self) -> Result<(), IoError> {
  732          let _m = self.fire_homing_missile();
  733          status_to_io_result(unsafe {
  734              uvll::uv_udp_set_broadcast(self.handle,
  735                                         1 as c_int)
  736          })
  737      }
  738  
  739      fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
  740          let _m = self.fire_homing_missile();
  741          status_to_io_result(unsafe {
  742              uvll::uv_udp_set_broadcast(self.handle,
  743                                         0 as c_int)
  744          })
  745      }
  746  
  747      fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
  748          box UdpWatcher {
  749              handle: self.handle,
  750              home: self.home.clone(),
  751              refcount: self.refcount.clone(),
  752              write_access: self.write_access.clone(),
  753              read_access: self.read_access.clone(),
  754              blocked_sender: None,
  755          } as Box<rtio::RtioUdpSocket:Send>
  756      }
  757  
  758      fn set_timeout(&mut self, timeoutOption<u64>) {
  759          self.set_read_timeout(timeout);
  760          self.set_write_timeout(timeout);
  761      }
  762  
  763      fn set_read_timeout(&mut self, msOption<u64>) {
  764          let _m = self.fire_homing_missile();
  765          let loop_ = self.uv_loop();
  766          self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
  767                                       self.handle as uint);
  768  
  769          fn cancel_read(streamuint) -> Option<BlockedTask> {
  770              // This method is quite similar to StreamWatcher::cancel_read, see
  771              // there for more information
  772              let handle = stream as *uvll::uv_udp_t;
  773              assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0);
  774              let data = unsafe {
  775                  let data = uvll::get_data_for_uv_handle(handle);
  776                  if data.is_null() { return None }
  777                  uvll::set_data_for_uv_handle(handle, 0 as *int);
  778                  &mut *(data as *mut UdpRecvCtx)
  779              };
  780              data.result = Some((uvll::ECANCELED as ssize_t, None));
  781              data.task.take()
  782          }
  783      }
  784  
  785      fn set_write_timeout(&mut self, msOption<u64>) {
  786          let _m = self.fire_homing_missile();
  787          let loop_ = self.uv_loop();
  788          self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
  789                                        self as *mut _ as uint);
  790  
  791          fn cancel_write(streamuint) -> Option<BlockedTask> {
  792              let stream&mut UdpWatcher = unsafe { cast::transmute(stream) };
  793              stream.blocked_sender.take()
  794          }
  795      }
  796  }
  797  
  798  impl Drop for UdpWatcher {
  799      fn drop(&mut self) {
  800          // Send ourselves home to close this handle (blocking while doing so).
  801          let _m = self.fire_homing_missile();
  802          if self.refcount.decrement() {
  803              self.close();
  804          }
  805      }
  806  }
  807  
  808  ////////////////////////////////////////////////////////////////////////////////
  809  // Shutdown helper
  810  ////////////////////////////////////////////////////////////////////////////////
  811  
  812  pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
  813      struct Ctx {
  814          slot: Option<BlockedTask>,
  815          status: c_int,
  816      }
  817      let mut req = Request::new(uvll::UV_SHUTDOWN);
  818  
  819      return match unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) } {
  820          0 => {
  821              req.defuse(); // uv callback now owns this request
  822              let mut cx = Ctx { slot: None, status: 0 };
  823  
  824              wait_until_woken_after(&mut cx.slot, loop_, || {
  825                  req.set_data(&cx);
  826              });
  827  
  828              status_to_io_result(cx.status)
  829          }
  830          n => Err(uv_error_to_io_error(UvError(n)))
  831      };
  832  
  833      extern fn shutdown_cb(req: *uvll::uv_shutdown_t, statuslibc::c_int) {
  834          let req = Request::wrap(req);
  835          assert!(status != uvll::ECANCELED);
  836          let cx&mut Ctx = unsafe { req.get_data() };
  837          cx.status = status;
  838          wakeup(&mut cx.slot);
  839      }
  840  }
  841  
  842  #[cfg(test)]
  843  mod test {
  844      use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
  845                          RtioUdpSocket};
  846      use std::io::test::{next_test_ip4, next_test_ip6};
  847  
  848      use super::{UdpWatcher, TcpWatcher, TcpListener};
  849      use super::super::local_loop;
  850  
  851      #[test]
  852      fn connect_close_ip4() {
  853          match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
  854              Ok(..) => fail!(),
  855              Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
  856          }
  857      }
  858  
  859      #[test]
  860      fn connect_close_ip6() {
  861          match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
  862              Ok(..) => fail!(),
  863              Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
  864          }
  865      }
  866  
  867      #[test]
  868      fn udp_bind_close_ip4() {
  869          match UdpWatcher::bind(local_loop(), next_test_ip4()) {
  870              Ok(..) => {}
  871              Err(..) => fail!()
  872          }
  873      }
  874  
  875      #[test]
  876      fn udp_bind_close_ip6() {
  877          match UdpWatcher::bind(local_loop(), next_test_ip6()) {
  878              Ok(..) => {}
  879              Err(..) => fail!()
  880          }
  881      }
  882  
  883      #[test]
  884      fn listen_ip4() {
  885          let (tx, rx) = channel();
  886          let addr = next_test_ip4();
  887  
  888          spawn(proc() {
  889              let w = match TcpListener::bind(local_loop(), addr) {
  890                  Ok(w) => w, Err(e) => fail!("{:?}", e)
  891              };
  892              let mut w = match w.listen() {
  893                  Ok(w) => w, Err(e) => fail!("{:?}", e),
  894              };
  895              tx.send(());
  896              match w.accept() {
  897                  Ok(mut stream) => {
  898                      let mut buf = [0u8, ..10];
  899                      match stream.read(buf) {
  900                          Ok(10) => {} e => fail!("{:?}", e),
  901                      }
  902                      for i in range(0, 10u8) {
  903                          assert_eq!(buf[i as uint], i + 1);
  904                      }
  905                  }
  906                  Err(e) => fail!("{:?}", e)
  907              }
  908          });
  909  
  910          rx.recv();
  911          let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
  912              Ok(w) => w, Err(e) => fail!("{:?}", e)
  913          };
  914          match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
  915              Ok(()) => {}, Err(e) => fail!("{:?}", e)
  916          }
  917      }
  918  
  919      #[test]
  920      fn listen_ip6() {
  921          let (tx, rx) = channel();
  922          let addr = next_test_ip6();
  923  
  924          spawn(proc() {
  925              let w = match TcpListener::bind(local_loop(), addr) {
  926                  Ok(w) => w, Err(e) => fail!("{:?}", e)
  927              };
  928              let mut w = match w.listen() {
  929                  Ok(w) => w, Err(e) => fail!("{:?}", e),
  930              };
  931              tx.send(());
  932              match w.accept() {
  933                  Ok(mut stream) => {
  934                      let mut buf = [0u8, ..10];
  935                      match stream.read(buf) {
  936                          Ok(10) => {} e => fail!("{:?}", e),
  937                      }
  938                      for i in range(0, 10u8) {
  939                          assert_eq!(buf[i as uint], i + 1);
  940                      }
  941                  }
  942                  Err(e) => fail!("{:?}", e)
  943              }
  944          });
  945  
  946          rx.recv();
  947          let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
  948              Ok(w) => w, Err(e) => fail!("{:?}", e)
  949          };
  950          match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
  951              Ok(()) => {}, Err(e) => fail!("{:?}", e)
  952          }
  953      }
  954  
  955      #[test]
  956      fn udp_recv_ip4() {
  957          let (tx, rx) = channel();
  958          let client = next_test_ip4();
  959          let server = next_test_ip4();
  960  
  961          spawn(proc() {
  962              match UdpWatcher::bind(local_loop(), server) {
  963                  Ok(mut w) => {
  964                      tx.send(());
  965                      let mut buf = [0u8, ..10];
  966                      match w.recvfrom(buf) {
  967                          Ok((10, addr)) => assert_eq!(addr, client),
  968                          e => fail!("{:?}", e),
  969                      }
  970                      for i in range(0, 10u8) {
  971                          assert_eq!(buf[i as uint], i + 1);
  972                      }
  973                  }
  974                  Err(e) => fail!("{:?}", e)
  975              }
  976          });
  977  
  978          rx.recv();
  979          let mut w = match UdpWatcher::bind(local_loop(), client) {
  980              Ok(w) => w, Err(e) => fail!("{:?}", e)
  981          };
  982          match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
  983              Ok(()) => {}, Err(e) => fail!("{:?}", e)
  984          }
  985      }
  986  
  987      #[test]
  988      fn udp_recv_ip6() {
  989          let (tx, rx) = channel();
  990          let client = next_test_ip6();
  991          let server = next_test_ip6();
  992  
  993          spawn(proc() {
  994              match UdpWatcher::bind(local_loop(), server) {
  995                  Ok(mut w) => {
  996                      tx.send(());
  997                      let mut buf = [0u8, ..10];
  998                      match w.recvfrom(buf) {
  999                          Ok((10, addr)) => assert_eq!(addr, client),
 1000                          e => fail!("{:?}", e),
 1001                      }
 1002                      for i in range(0, 10u8) {
 1003                          assert_eq!(buf[i as uint], i + 1);
 1004                      }
 1005                  }
 1006                  Err(e) => fail!("{:?}", e)
 1007              }
 1008          });
 1009  
 1010          rx.recv();
 1011          let mut w = match UdpWatcher::bind(local_loop(), client) {
 1012              Ok(w) => w, Err(e) => fail!("{:?}", e)
 1013          };
 1014          match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
 1015              Ok(()) => {}, Err(e) => fail!("{:?}", e)
 1016          }
 1017      }
 1018  
 1019      #[test]
 1020      fn test_read_read_read() {
 1021          let addr = next_test_ip4();
 1022          static MAX: uint = 5000;
 1023          let (tx, rx) = channel();
 1024  
 1025          spawn(proc() {
 1026              let listener = TcpListener::bind(local_loop(), addr).unwrap();
 1027              let mut acceptor = listener.listen().unwrap();
 1028              tx.send(());
 1029              let mut stream = acceptor.accept().unwrap();
 1030              let buf = [1, .. 2048];
 1031              let mut total_bytes_written = 0;
 1032              while total_bytes_written < MAX {
 1033                  assert!(stream.write(buf).is_ok());
 1034                  uvdebug!("wrote bytes");
 1035                  total_bytes_written += buf.len();
 1036              }
 1037          });
 1038  
 1039          rx.recv();
 1040          let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
 1041          let mut buf = [0, .. 2048];
 1042          let mut total_bytes_read = 0;
 1043          while total_bytes_read < MAX {
 1044              let nread = stream.read(buf).unwrap();
 1045              total_bytes_read += nread;
 1046              for i in range(0u, nread) {
 1047                  assert_eq!(buf[i], 1);
 1048              }
 1049          }
 1050          uvdebug!("read {} bytes total", total_bytes_read);
 1051      }
 1052  
 1053      #[test]
 1054      #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
 1055      fn test_udp_twice() {
 1056          let server_addr = next_test_ip4();
 1057          let client_addr = next_test_ip4();
 1058          let (tx, rx) = channel();
 1059  
 1060          spawn(proc() {
 1061              let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
 1062              rx.recv();
 1063              assert!(client.sendto([1], server_addr).is_ok());
 1064              assert!(client.sendto([2], server_addr).is_ok());
 1065          });
 1066  
 1067          let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
 1068          tx.send(());
 1069          let mut buf1 = [0];
 1070          let mut buf2 = [0];
 1071          let (nread1, src1) = server.recvfrom(buf1).unwrap();
 1072          let (nread2, src2) = server.recvfrom(buf2).unwrap();
 1073          assert_eq!(nread1, 1);
 1074          assert_eq!(nread2, 1);
 1075          assert_eq!(src1, client_addr);
 1076          assert_eq!(src2, client_addr);
 1077          assert_eq!(buf1[0], 1);
 1078          assert_eq!(buf2[0], 2);
 1079      }
 1080  
 1081      #[test]
 1082      fn test_udp_many_read() {
 1083          let server_out_addr = next_test_ip4();
 1084          let server_in_addr = next_test_ip4();
 1085          let client_out_addr = next_test_ip4();
 1086          let client_in_addr = next_test_ip4();
 1087          static MAX: uint = 500_000;
 1088  
 1089          let (tx1, rx1) = channel::<()>();
 1090          let (tx2, rx2) = channel::<()>();
 1091  
 1092          spawn(proc() {
 1093              let l = local_loop();
 1094              let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
 1095              let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
 1096              let (tx, rx) = (tx2, rx1);
 1097              tx.send(());
 1098              rx.recv();
 1099              let msg = [1, .. 2048];
 1100              let mut total_bytes_sent = 0;
 1101              let mut buf = [1];
 1102              while buf[0] == 1 {
 1103                  // send more data
 1104                  assert!(server_out.sendto(msg, client_in_addr).is_ok());
 1105                  total_bytes_sent += msg.len();
 1106                  // check if the client has received enough
 1107                  let res = server_in.recvfrom(buf);
 1108                  assert!(res.is_ok());
 1109                  let (nread, src) = res.unwrap();
 1110                  assert_eq!(nread, 1);
 1111                  assert_eq!(src, client_out_addr);
 1112              }
 1113              assert!(total_bytes_sent >= MAX);
 1114          });
 1115  
 1116          let l = local_loop();
 1117          let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
 1118          let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
 1119          let (tx, rx) = (tx1, rx2);
 1120          rx.recv();
 1121          tx.send(());
 1122          let mut total_bytes_recv = 0;
 1123          let mut buf = [0, .. 2048];
 1124          while total_bytes_recv < MAX {
 1125              // ask for more
 1126              assert!(client_out.sendto([1], server_in_addr).is_ok());
 1127              // wait for data
 1128              let res = client_in.recvfrom(buf);
 1129              assert!(res.is_ok());
 1130              let (nread, src) = res.unwrap();
 1131              assert_eq!(src, server_out_addr);
 1132              total_bytes_recv += nread;
 1133              for i in range(0u, nread) {
 1134                  assert_eq!(buf[i], 1);
 1135              }
 1136          }
 1137          // tell the server we're done
 1138          assert!(client_out.sendto([0], server_in_addr).is_ok());
 1139      }
 1140  
 1141      #[test]
 1142      fn test_read_and_block() {
 1143          let addr = next_test_ip4();
 1144          let (tx, rx) = channel::<Receiver<()>>();
 1145  
 1146          spawn(proc() {
 1147              let rx = rx.recv();
 1148              let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
 1149              stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
 1150              stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
 1151              rx.recv();
 1152              stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
 1153              stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
 1154              rx.recv();
 1155          });
 1156  
 1157          let listener = TcpListener::bind(local_loop(), addr).unwrap();
 1158          let mut acceptor = listener.listen().unwrap();
 1159          let (tx2, rx2) = channel();
 1160          tx.send(rx2);
 1161          let mut stream = acceptor.accept().unwrap();
 1162          let mut buf = [0, .. 2048];
 1163  
 1164          let expected = 32;
 1165          let mut current = 0;
 1166          let mut reads = 0;
 1167  
 1168          while current < expected {
 1169              let nread = stream.read(buf).unwrap();
 1170              for i in range(0u, nread) {
 1171                  let val = buf[i] as uint;
 1172                  assert_eq!(val, current % 8);
 1173                  current += 1;
 1174              }
 1175              reads += 1;
 1176  
 1177              let _ = tx2.send_opt(());
 1178          }
 1179  
 1180          // Make sure we had multiple reads
 1181          assert!(reads > 1);
 1182      }
 1183  
 1184      #[test]
 1185      fn test_simple_tcp_server_and_client_on_diff_threads() {
 1186          let addr = next_test_ip4();
 1187  
 1188          spawn(proc() {
 1189              let listener = TcpListener::bind(local_loop(), addr).unwrap();
 1190              let mut acceptor = listener.listen().unwrap();
 1191              let mut stream = acceptor.accept().unwrap();
 1192              let mut buf = [0, .. 2048];
 1193              let nread = stream.read(buf).unwrap();
 1194              assert_eq!(nread, 8);
 1195              for i in range(0u, nread) {
 1196                  assert_eq!(buf[i], i as u8);
 1197              }
 1198          });
 1199  
 1200          let mut stream = TcpWatcher::connect(local_loop(), addr, None);
 1201          while stream.is_err() {
 1202              stream = TcpWatcher::connect(local_loop(), addr, None);
 1203          }
 1204          stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
 1205      }
 1206  
 1207      #[should_fail] #[test]
 1208      fn tcp_listener_fail_cleanup() {
 1209          let addr = next_test_ip4();
 1210          let w = TcpListener::bind(local_loop(), addr).unwrap();
 1211          let _w = w.listen().unwrap();
 1212          fail!();
 1213      }
 1214  
 1215      #[should_fail] #[test]
 1216      fn tcp_stream_fail_cleanup() {
 1217          let (tx, rx) = channel();
 1218          let addr = next_test_ip4();
 1219  
 1220          spawn(proc() {
 1221              let w = TcpListener::bind(local_loop(), addr).unwrap();
 1222              let mut w = w.listen().unwrap();
 1223              tx.send(());
 1224              drop(w.accept().unwrap());
 1225          });
 1226          rx.recv();
 1227          let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
 1228          fail!();
 1229      }
 1230  
 1231      #[should_fail] #[test]
 1232      fn udp_listener_fail_cleanup() {
 1233          let addr = next_test_ip4();
 1234          let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
 1235          fail!();
 1236      }
 1237  
 1238      #[should_fail] #[test]
 1239      fn udp_fail_other_task() {
 1240          let addr = next_test_ip4();
 1241          let (tx, rx) = channel();
 1242  
 1243          // force the handle to be created on a different scheduler, failure in
 1244          // the original task will force a homing operation back to this
 1245          // scheduler.
 1246          spawn(proc() {
 1247              let w = UdpWatcher::bind(local_loop(), addr).unwrap();
 1248              tx.send(w);
 1249          });
 1250  
 1251          let _w = rx.recv();
 1252          fail!();
 1253      }
 1254  }


librustuv/net.rs:491:1-491:1 -struct- definition:
struct UdpRecvCtx {
    task: Option<BlockedTask>,
    buf: Option<Buf>,
references:- 4
556:             0 => {
557:                 let mut cx = UdpRecvCtx {
558:                     task: None,
--
588:             let cx = unsafe {
589:                 &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
590:             };
--
777:                 uvll::set_data_for_uv_handle(handle, 0 as *int);
778:                 &mut *(data as *mut UdpRecvCtx)
779:             };


librustuv/net.rs:127:1-127:1 -fn- definition:
fn socket_name(sk: SocketNameKind,
               handle: *c_void) -> Result<ip::SocketAddr, IoError> {
    let getsockname = match sk {
references:- 5
389:         let _m = self.fire_homing_missile();
390:         socket_name(Tcp, self.handle)
391:     }
--
443:         let _m = self.fire_homing_missile();
444:         socket_name(Tcp, self.listener.handle)
445:     }
--
540:         let _m = self.fire_homing_missile();
541:         socket_name(Udp, self.handle)
542:     }


librustuv/net.rs:174:1-174:1 -struct- definition:
pub struct TcpAcceptor {
    listener: Box<TcpListener>,
    timeout: AcceptTimeout,
references:- 4
437: impl HomingIO for TcpAcceptor {
438:     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
--
448: impl rtio::RtioTcpAcceptor for TcpAcceptor {
449:     fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream:Send>, IoError> {


librustuv/net.rs:38:1-38:1 -fn- definition:
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
                        len: uint) -> ip::SocketAddr {
    match storage.ss_family as c_int {
references:- 3
143:     } {
144:         0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
145:         n => Err(uv_error_to_io_error(UvError(n)))
librustuv/addrinfo.rs:
142:         loop {
143:             let rustaddr = net::sockaddr_to_addr(cast::transmute((*addr).ai_addr),
144:                                                  (*addr).ai_addrlen as uint);
librustuv/net.rs:
604:                 let len = mem::size_of::<libc::sockaddr_storage>();
605:                 Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
606:             };


librustuv/net.rs:35:1-35:1 -fn- definition:
pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
references:- 10
111:                         htons(g),
112:                         htons(h),
113:                     ]


librustuv/net.rs:166:1-166:1 -struct- definition:
pub struct TcpListener {
    home: HomeHandle,
    handle: *uvll::uv_pipe_t,
references:- 10
359:         let (tx, rx) = channel();
360:         let l = box TcpListener {
361:             home: io.make_handle(),
--
383: impl UvHandle<uvll::uv_tcp_t> for TcpListener {
384:     fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
--
412:     assert!(status != uvll::ECANCELED);
413:     let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
414:     let msg = match status {
--
428: impl Drop for TcpListener {
429:     fn drop(&mut self) {


librustuv/net.rs:813:4-813:4 -struct- definition:
    struct Ctx {
        slot: Option<BlockedTask>,
        status: c_int,
references:- 2
821:             req.defuse(); // uv callback now owns this request
822:             let mut cx = Ctx { slot: None, status: 0 };
--
835:         assert!(status != uvll::ECANCELED);
836:         let cx: &mut Ctx = unsafe { req.get_data() };
837:         cx.status = status;


librustuv/net.rs:152:1-152:1 -struct- definition:
pub struct TcpWatcher {
    handle: *uvll::uv_tcp_t,
    stream: StreamWatcher,
references:- 11
192:         }, 0);
193:         TcpWatcher {
194:             home: home,
--
341: impl Drop for TcpWatcher {
342:     fn drop(&mut self) {


librustuv/net.rs:36:48-36:48 -fn- definition:
pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
references:- 10
68:             let g = ntohs(storage.sin6_addr.s6_addr[6]);
69:             let h = ntohs(storage.sin6_addr.s6_addr[7]);
70:             ip::SocketAddr {
71:                 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
72:                 port: ntohs(storage.sin6_port),
73:             }


librustuv/net.rs:497:1-497:1 -struct- definition:
struct UdpSendCtx {
    result: c_int,
    data: Option<Vec<u8>>,
references:- 4
634:                 req.defuse(); // uv callback now owns this request
635:                 let mut cx = UdpSendCtx {
636:                     result: uvll::ECANCELED, data: data, udp: self as *mut _
--
647:                 }
648:                 let new_cx = box UdpSendCtx {
649:                     result: 0,
--
665:             let req = Request::wrap(req);
666:             let cx: &mut UdpSendCtx = unsafe { req.get_data() };
667:             cx.result = status;
--
672:             } else {
673:                 let _cx: Box<UdpSendCtx> = unsafe { cast::transmute(cx) };
674:             }


librustuv/net.rs:811:1-811:1 -fn- definition:
pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
    struct Ctx {
        slot: Option<BlockedTask>,
references:- 2
303:         let _m = self.fire_homing_missile();
304:         shutdown(self.handle, &self.uv_loop())
305:     }
librustuv/pipe.rs:
172:         let _m = self.fire_homing_missile();
173:         net::shutdown(self.stream.handle, &self.uv_loop())
174:     }


librustuv/net.rs:479:1-479:1 -struct- definition:
pub struct UdpWatcher {
    handle: *uvll::uv_udp_t,
    home: HomeHandle,
references:- 13
506:                 -> Result<UdpWatcher, UvError> {
507:         let udp = UdpWatcher {
508:             handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
--
747:     fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
748:         box UdpWatcher {
749:             handle: self.handle,
--
791:         fn cancel_write(stream: uint) -> Option<BlockedTask> {
792:             let stream: &mut UdpWatcher = unsafe { cast::transmute(stream) };
793:             stream.blocked_sender.take()
--
798: impl Drop for UdpWatcher {
799:     fn drop(&mut self) {


librustuv/net.rs:80:1-80:1 -fn- definition:
fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
    unsafe {
        let mut storage: libc::sockaddr_storage = mem::init();
references:- 4
207:         let cx = ConnectCtx { status: -1, task: None, timer: None };
208:         let (addr, _len) = addr_to_sockaddr(address);
209:         let addr_p = &addr as *_ as *libc::sockaddr;
--
366:         };
367:         let (addr, _len) = addr_to_sockaddr(address);
368:         let res = unsafe {
--
517:         }, 0);
518:         let (addr, _len) = addr_to_sockaddr(address);
519:         let result = unsafe {
--
617:         let mut req = Request::new(uvll::UV_UDP_SEND);
618:         let (addr, _len) = addr_to_sockaddr(dst);
619:         let addr_p = &addr as *_ as *libc::sockaddr;