(index<- )        ./libnative/io/net.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  use libc;
  12  use std::cast;
  13  use std::io::net::ip;
  14  use std::io;
  15  use std::mem;
  16  use std::rt::rtio;
  17  use std::sync::arc::UnsafeArc;
  18  use std::unstable::mutex;
  19  
  20  use super::{IoResult, retry, keep_going};
  21  use super::c;
  22  use super::util;
  23  
  24  ////////////////////////////////////////////////////////////////////////////////
  25  // sockaddr and misc bindings
  26  ////////////////////////////////////////////////////////////////////////////////
  27  
  28  #[cfg(windows)] pub type sock_t = libc::SOCKET;
  29  #[cfg(unix)]    pub type sock_t = super::file::fd_t;
  30  
  31  pub fn htons(u: u16) -> u16 {
  32      mem::to_be16(u)
  33  }
  34  pub fn ntohs(u: u16) -> u16 {
  35      mem::from_be16(u)
  36  }
  37  
  38  enum InAddr {
  39      InAddr(libc::in_addr),
  40      In6Addr(libc::in6_addr),
  41  }
  42  
  43  fn ip_to_inaddr(ipip::IpAddr) -> InAddr {
  44      match ip {
  45          ip::Ipv4Addr(a, b, c, d) => {
  46              InAddr(libc::in_addr {
  47                  s_addr: (d as u32 << 24) |
  48                          (c as u32 << 16) |
  49                          (b as u32 <<  8) |
  50                          (a as u32 <<  0)
  51              })
  52          }
  53          ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
  54              In6Addr(libc::in6_addr {
  55                  s6_addr: [
  56                      htons(a),
  57                      htons(b),
  58                      htons(c),
  59                      htons(d),
  60                      htons(e),
  61                      htons(f),
  62                      htons(g),
  63                      htons(h),
  64                  ]
  65              })
  66          }
  67      }
  68  }
  69  
  70  fn addr_to_sockaddr(addrip::SocketAddr) -> (libc::sockaddr_storage, uint) {
  71      unsafe {
  72          let storagelibc::sockaddr_storage = mem::init();
  73          let len = match ip_to_inaddr(addr.ip) {
  74              InAddr(inaddr) => {
  75                  let storage*mut libc::sockaddr_in = cast::transmute(&storage);
  76                  (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
  77                  (*storage).sin_port = htons(addr.port);
  78                  (*storage).sin_addr = inaddr;
  79                  mem::size_of::<libc::sockaddr_in>()
  80              }
  81              In6Addr(inaddr) => {
  82                  let storage*mut libc::sockaddr_in6 = cast::transmute(&storage);
  83                  (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
  84                  (*storage).sin6_port = htons(addr.port);
  85                  (*storage).sin6_addr = inaddr;
  86                  mem::size_of::<libc::sockaddr_in6>()
  87              }
  88          };
  89          return (storage, len);
  90      }
  91  }
  92  
  93  fn socket(addrip::SocketAddr, tylibc::c_int) -> IoResult<sock_t> {
  94      unsafe {
  95          let fam = match addr.ip {
  96              ip::Ipv4Addr(..) => libc::AF_INET,
  97              ip::Ipv6Addr(..) => libc::AF_INET6,
  98          };
  99          match libc::socket(fam, ty, 0) {
 100              -1 => Err(super::last_error()),
 101              fd => Ok(fd),
 102          }
 103      }
 104  }
 105  
 106  fn setsockopt<T>(fdsock_t, optlibc::c_int, vallibc::c_int,
 107                   payloadT) -> IoResult<()> {
 108      unsafe {
 109          let payload = &payload as *T as *libc::c_void;
 110          let ret = libc::setsockopt(fd, opt, val,
 111                                     payload,
 112                                     mem::size_of::<T>() as libc::socklen_t);
 113          if ret != 0 {
 114              Err(last_error())
 115          } else {
 116              Ok(())
 117          }
 118      }
 119  }
 120  
 121  pub fn getsockopt<T: Copy>(fdsock_t, optlibc::c_int,
 122                             vallibc::c_int) -> IoResult<T> {
 123      unsafe {
 124          let mut slotT = mem::init();
 125          let mut len = mem::size_of::<T>() as libc::socklen_t;
 126          let ret = c::getsockopt(fd, opt, val,
 127                                  &mut slot as *mut _ as *mut _,
 128                                  &mut len);
 129          if ret != 0 {
 130              Err(last_error())
 131          } else {
 132              assert!(len as uint == mem::size_of::<T>());
 133              Ok(slot)
 134          }
 135      }
 136  }
 137  
 138  #[cfg(windows)]
 139  fn last_error() -> io::IoError {
 140      io::IoError::from_errno(unsafe { c::WSAGetLastError() } as uint, true)
 141  }
 142  
 143  #[cfg(not(windows))]
 144  fn last_error() -> io::IoError {
 145      super::last_error()
 146  }
 147  
 148  #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
 149  #[cfg(unix)]    unsafe fn close(socksock_t) { let _ = libc::close(sock); }
 150  
 151  fn sockname(fdsock_t,
 152              f: extern "system" unsafe fn(sock_t, *mut libc::sockaddr,
 153                                           *mut libc::socklen_t) -> libc::c_int)
 154      -> IoResult<ip::SocketAddr>
 155  {
 156      let mut storagelibc::sockaddr_storage = unsafe { mem::init() };
 157      let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
 158      unsafe {
 159          let storage = &mut storage as *mut libc::sockaddr_storage;
 160          let ret = f(fd,
 161                      storage as *mut libc::sockaddr,
 162                      &mut len as *mut libc::socklen_t);
 163          if ret != 0 {
 164              return Err(last_error())
 165          }
 166      }
 167      return sockaddr_to_addr(&storage, len as uint);
 168  }
 169  
 170  pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
 171                          len: uint) -> IoResult<ip::SocketAddr> {
 172      match storage.ss_family as libc::c_int {
 173          libc::AF_INET => {
 174              assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
 175              let storage&libc::sockaddr_in = unsafe {
 176                  cast::transmute(storage)
 177              };
 178              let addr = storage.sin_addr.s_addr as u32;
 179              let a = (addr >>  0) as u8;
 180              let b = (addr >>  8) as u8;
 181              let c = (addr >> 16) as u8;
 182              let d = (addr >> 24) as u8;
 183              Ok(ip::SocketAddr {
 184                  ip: ip::Ipv4Addr(a, b, c, d),
 185                  port: ntohs(storage.sin_port),
 186              })
 187          }
 188          libc::AF_INET6 => {
 189              assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
 190              let storage&libc::sockaddr_in6 = unsafe {
 191                  cast::transmute(storage)
 192              };
 193              let a = ntohs(storage.sin6_addr.s6_addr[0]);
 194              let b = ntohs(storage.sin6_addr.s6_addr[1]);
 195              let c = ntohs(storage.sin6_addr.s6_addr[2]);
 196              let d = ntohs(storage.sin6_addr.s6_addr[3]);
 197              let e = ntohs(storage.sin6_addr.s6_addr[4]);
 198              let f = ntohs(storage.sin6_addr.s6_addr[5]);
 199              let g = ntohs(storage.sin6_addr.s6_addr[6]);
 200              let h = ntohs(storage.sin6_addr.s6_addr[7]);
 201              Ok(ip::SocketAddr {
 202                  ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
 203                  port: ntohs(storage.sin6_port),
 204              })
 205          }
 206          _ => {
 207              Err(io::standard_error(io::OtherIoError))
 208          }
 209      }
 210  }
 211  
 212  #[cfg(unix)]
 213  pub fn init() {}
 214  
 215  #[cfg(windows)]
 216  pub fn init() {
 217  
 218      unsafe {
 219          use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
 220          static mut INITIALIZED: bool = false;
 221          static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
 222  
 223          let _guard = LOCK.lock();
 224          if !INITIALIZED {
 225              let mut data: c::WSADATA = mem::init();
 226              let ret = c::WSAStartup(0x202,      // version 2.2
 227                                      &mut data);
 228              assert_eq!(ret, 0);
 229              INITIALIZED = true;
 230          }
 231      }
 232  }
 233  
 234  ////////////////////////////////////////////////////////////////////////////////
 235  // TCP streams
 236  ////////////////////////////////////////////////////////////////////////////////
 237  
 238  pub struct TcpStream {
 239      inner: UnsafeArc<Inner>,
 240      read_deadline: u64,
 241      write_deadline: u64,
 242  }
 243  
 244  struct Inner {
 245      fd: sock_t,
 246      lock: mutex::NativeMutex,
 247  }
 248  
 249  pub struct Guard<'a> {
 250      pub fd: sock_t,
 251      pub guard: mutex::LockGuard<'a>,
 252  }
 253  
 254  impl Inner {
 255      fn new(fdsock_t) -> Inner {
 256          Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
 257      }
 258  }
 259  
 260  impl TcpStream {
 261      pub fn connect(addrip::SocketAddr,
 262                     timeoutOption<u64>) -> IoResult<TcpStream> {
 263          let fd = try!(socket(addr, libc::SOCK_STREAM));
 264          let ret = TcpStream::new(Inner::new(fd));
 265  
 266          let (addr, len) = addr_to_sockaddr(addr);
 267          let addrp = &addr as *_ as *libc::sockaddr;
 268          let len = len as libc::socklen_t;
 269  
 270          match timeout {
 271              Some(timeout) => {
 272                  try!(util::connect_timeout(fd, addrp, len, timeout));
 273                  Ok(ret)
 274              },
 275              None => {
 276                  match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
 277                      -1 => Err(last_error()),
 278                      _ => Ok(ret),
 279                  }
 280              }
 281          }
 282      }
 283  
 284      fn new(innerInner) -> TcpStream {
 285          TcpStream {
 286              inner: UnsafeArc::new(inner),
 287              read_deadline: 0,
 288              write_deadline: 0,
 289          }
 290      }
 291  
 292      pub fn fd(&self) -> sock_t {
 293          // This unsafety is fine because it's just a read-only arc
 294          unsafe { (*self.inner.get()).fd }
 295      }
 296  
 297      fn set_nodelay(&mut self, nodelaybool) -> IoResult<()> {
 298          setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
 299                     nodelay as libc::c_int)
 300      }
 301  
 302      fn set_keepalive(&mut self, secondsOption<uint>) -> IoResult<()> {
 303          let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
 304                               seconds.is_some() as libc::c_int);
 305          match seconds {
 306              Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
 307              None => ret,
 308          }
 309      }
 310  
 311      #[cfg(target_os = "macos")]
 312      fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<(){
 313          setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
 314                     seconds as libc::c_int)
 315      }
 316      #[cfg(target_os = "freebsd")]
 317      fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<(){
 318          setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
 319                     seconds as libc::c_int)
 320      }
 321      #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
 322      fn set_tcp_keepalive(&mut self, _secondsuint) -> IoResult<()> {
 323          Ok(())
 324      }
 325  
 326      #[cfg(target_os = "linux")]
 327      fn lock_nonblocking(&self) {}
 328  
 329      #[cfg(not(target_os = "linux"))]
 330      fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
 331          let ret = Guard {
 332              fd: self.fd(),
 333              guard: unsafe { (*self.inner.get()).lock.lock() },
 334          };
 335          assert!(util::set_nonblocking(self.fd(), true).is_ok());
 336          ret
 337      }
 338  }
 339  
 340  #[cfg(windows)] type wrlen = libc::c_int;
 341  #[cfg(not(windows))] type wrlen = libc::size_t;
 342  
 343  impl rtio::RtioTcpStream for TcpStream {
 344      fn read(&mut self, buf&mut [u8]) -> IoResult<uint> {
 345          let fd = self.fd();
 346          let dolock = || self.lock_nonblocking();
 347          let doread = |nb| unsafe {
 348              let flags = if nb {c::MSG_DONTWAIT} else {0};
 349              libc::recv(fd,
 350                         buf.as_mut_ptr() as *mut libc::c_void,
 351                         buf.len() as wrlen,
 352                         flags) as libc::c_int
 353          };
 354          read(fd, self.read_deadline, dolock, doread)
 355      }
 356  
 357      fn write(&mut self, buf&[u8]) -> IoResult<()> {
 358          let fd = self.fd();
 359          let dolock = || self.lock_nonblocking();
 360          let dowrite = |nbbool, buf*u8, lenuintunsafe {
 361              let flags = if nb {c::MSG_DONTWAIT} else {0};
 362              libc::send(fd,
 363                         buf as *mut libc::c_void,
 364                         len as wrlen,
 365                         flags) as i64
 366          };
 367          match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
 368              Ok(_) => Ok(()),
 369              Err(e) => Err(e)
 370          }
 371      }
 372      fn peer_name(&mut self) -> IoResult<ip::SocketAddr> {
 373          sockname(self.fd(), libc::getpeername)
 374      }
 375      fn control_congestion(&mut self) -> IoResult<()> {
 376          self.set_nodelay(false)
 377      }
 378      fn nodelay(&mut self) -> IoResult<()> {
 379          self.set_nodelay(true)
 380      }
 381      fn keepalive(&mut self, delay_in_secondsuint) -> IoResult<()> {
 382          self.set_keepalive(Some(delay_in_seconds))
 383      }
 384      fn letdie(&mut self) -> IoResult<()> {
 385          self.set_keepalive(None)
 386      }
 387  
 388      fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
 389          box TcpStream {
 390              inner: self.inner.clone(),
 391              read_deadline: 0,
 392              write_deadline: 0,
 393          } as Box<rtio::RtioTcpStream:Send>
 394      }
 395  
 396      fn close_write(&mut self) -> IoResult<()> {
 397          super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
 398      }
 399      fn close_read(&mut self) -> IoResult<()> {
 400          super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
 401      }
 402  
 403      fn set_timeout(&mut self, timeoutOption<u64>) {
 404          let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 405          self.read_deadline = deadline;
 406          self.write_deadline = deadline;
 407      }
 408      fn set_read_timeout(&mut self, timeoutOption<u64>) {
 409          self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 410      }
 411      fn set_write_timeout(&mut self, timeoutOption<u64>) {
 412          self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 413      }
 414  }
 415  
 416  impl rtio::RtioSocket for TcpStream {
 417      fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
 418          sockname(self.fd(), libc::getsockname)
 419      }
 420  }
 421  
 422  impl Drop for Inner {
 423      fn drop(&mut self) { unsafe { close(self.fd); } }
 424  }
 425  
 426  #[unsafe_destructor]
 427  impl<'a> Drop for Guard<'a> {
 428      fn drop(&mut self) {
 429          assert!(util::set_nonblocking(self.fd, false).is_ok());
 430      }
 431  }
 432  
 433  ////////////////////////////////////////////////////////////////////////////////
 434  // TCP listeners
 435  ////////////////////////////////////////////////////////////////////////////////
 436  
 437  pub struct TcpListener {
 438      inner: Inner,
 439  }
 440  
 441  impl TcpListener {
 442      pub fn bind(addrip::SocketAddr) -> IoResult<TcpListener> {
 443          let fd = try!(socket(addr, libc::SOCK_STREAM));
 444          let ret = TcpListener { inner: Inner::new(fd) };
 445  
 446          let (addr, len) = addr_to_sockaddr(addr);
 447          let addrp = &addr as *_ as *libc::sockaddr;
 448          let len = len as libc::socklen_t;
 449  
 450          // On platforms with Berkeley-derived sockets, this allows
 451          // to quickly rebind a socket, without needing to wait for
 452          // the OS to clean up the previous one.
 453          if cfg!(unix) {
 454              try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
 455                              1 as libc::c_int));
 456          }
 457  
 458          match unsafe { libc::bind(fd, addrp, len) } {
 459              -1 => Err(last_error()),
 460              _ => Ok(ret),
 461          }
 462      }
 463  
 464      pub fn fd(&self) -> sock_t { self.inner.fd }
 465  
 466      pub fn native_listen(self, backlogint) -> IoResult<TcpAcceptor> {
 467          match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
 468              -1 => Err(last_error()),
 469              _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
 470          }
 471      }
 472  }
 473  
 474  impl rtio::RtioTcpListener for TcpListener {
 475      fn listen(~self) -> IoResult<Box<rtio::RtioTcpAcceptor:Send>> {
 476          self.native_listen(128).map(|a| {
 477              box a as Box<rtio::RtioTcpAcceptor:Send>
 478          })
 479      }
 480  }
 481  
 482  impl rtio::RtioSocket for TcpListener {
 483      fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
 484          sockname(self.fd(), libc::getsockname)
 485      }
 486  }
 487  
 488  pub struct TcpAcceptor {
 489      listener: TcpListener,
 490      deadline: u64,
 491  }
 492  
 493  impl TcpAcceptor {
 494      pub fn fd(&self) -> sock_t { self.listener.fd() }
 495  
 496      pub fn native_accept(&mut self) -> IoResult<TcpStream> {
 497          if self.deadline != 0 {
 498              try!(util::await(self.fd(), Some(self.deadline), util::Readable));
 499          }
 500          unsafe {
 501              let mut storagelibc::sockaddr_storage = mem::init();
 502              let storagep = &mut storage as *mut libc::sockaddr_storage;
 503              let size = mem::size_of::<libc::sockaddr_storage>();
 504              let mut size = size as libc::socklen_t;
 505              match retry(|| {
 506                  libc::accept(self.fd(),
 507                               storagep as *mut libc::sockaddr,
 508                               &mut size as *mut libc::socklen_t) as libc::c_int
 509              }) as sock_t {
 510                  -1 => Err(last_error()),
 511                  fd => Ok(TcpStream::new(Inner::new(fd))),
 512              }
 513          }
 514      }
 515  }
 516  
 517  impl rtio::RtioSocket for TcpAcceptor {
 518      fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
 519          sockname(self.fd(), libc::getsockname)
 520      }
 521  }
 522  
 523  impl rtio::RtioTcpAcceptor for TcpAcceptor {
 524      fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream:Send>> {
 525          self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream:Send>)
 526      }
 527  
 528      fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
 529      fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
 530      fn set_timeout(&mut self, timeoutOption<u64>) {
 531          self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 532      }
 533  }
 534  
 535  ////////////////////////////////////////////////////////////////////////////////
 536  // UDP
 537  ////////////////////////////////////////////////////////////////////////////////
 538  
 539  pub struct UdpSocket {
 540      inner: UnsafeArc<Inner>,
 541      read_deadline: u64,
 542      write_deadline: u64,
 543  }
 544  
 545  impl UdpSocket {
 546      pub fn bind(addrip::SocketAddr) -> IoResult<UdpSocket> {
 547          let fd = try!(socket(addr, libc::SOCK_DGRAM));
 548          let ret = UdpSocket {
 549              inner: UnsafeArc::new(Inner::new(fd)),
 550              read_deadline: 0,
 551              write_deadline: 0,
 552          };
 553  
 554          let (addr, len) = addr_to_sockaddr(addr);
 555          let addrp = &addr as *_ as *libc::sockaddr;
 556          let len = len as libc::socklen_t;
 557  
 558          match unsafe { libc::bind(fd, addrp, len) } {
 559              -1 => Err(last_error()),
 560              _ => Ok(ret),
 561          }
 562      }
 563  
 564      pub fn fd(&self) -> sock_t {
 565          // unsafety is fine because it's just a read-only arc
 566          unsafe { (*self.inner.get()).fd }
 567      }
 568  
 569      pub fn set_broadcast(&mut self, onbool) -> IoResult<()> {
 570          setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
 571                     on as libc::c_int)
 572      }
 573  
 574      pub fn set_multicast_loop(&mut self, onbool) -> IoResult<()> {
 575          setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
 576                     on as libc::c_int)
 577      }
 578  
 579      pub fn set_membership(&mut self, addrip::IpAddr,
 580                            optlibc::c_int) -> IoResult<()> {
 581          match ip_to_inaddr(addr) {
 582              InAddr(addr) => {
 583                  let mreq = libc::ip_mreq {
 584                      imr_multiaddr: addr,
 585                      // interface == INADDR_ANY
 586                      imr_interface: libc::in_addr { s_addr: 0x0 },
 587                  };
 588                  setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
 589              }
 590              In6Addr(addr) => {
 591                  let mreq = libc::ip6_mreq {
 592                      ipv6mr_multiaddr: addr,
 593                      ipv6mr_interface: 0,
 594                  };
 595                  setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
 596              }
 597          }
 598      }
 599  
 600      #[cfg(target_os = "linux")]
 601      fn lock_nonblocking(&self) {}
 602  
 603      #[cfg(not(target_os = "linux"))]
 604      fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
 605          let ret = Guard {
 606              fd: self.fd(),
 607              guard: unsafe { (*self.inner.get()).lock.lock() },
 608          };
 609          assert!(util::set_nonblocking(self.fd(), true).is_ok());
 610          ret
 611      }
 612  }
 613  
 614  impl rtio::RtioSocket for UdpSocket {
 615      fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
 616          sockname(self.fd(), libc::getsockname)
 617      }
 618  }
 619  
 620  #[cfg(windows)] type msglen_t = libc::c_int;
 621  #[cfg(unix)]    type msglen_t = libc::size_t;
 622  
 623  impl rtio::RtioUdpSocket for UdpSocket {
 624      fn recvfrom(&mut self, buf&mut [u8]) -> IoResult<(uint, ip::SocketAddr)> {
 625          let fd = self.fd();
 626          let mut storagelibc::sockaddr_storage = unsafe { mem::init() };
 627          let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
 628          let mut addrlenlibc::socklen_t =
 629                  mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
 630  
 631          let dolock = || self.lock_nonblocking();
 632          let doread = |nb| unsafe {
 633              let flags = if nb {c::MSG_DONTWAIT} else {0};
 634              libc::recvfrom(fd,
 635                             buf.as_mut_ptr() as *mut libc::c_void,
 636                             buf.len() as msglen_t,
 637                             flags,
 638                             storagep,
 639                             &mut addrlen) as libc::c_int
 640          };
 641          let n = try!(read(fd, self.read_deadline, dolock, doread));
 642          sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
 643              Ok((n as uint, addr))
 644          })
 645      }
 646  
 647      fn sendto(&mut self, buf&[u8], dstip::SocketAddr) -> IoResult<()> {
 648          let (dst, dstlen) = addr_to_sockaddr(dst);
 649          let dstp = &dst as *_ as *libc::sockaddr;
 650          let dstlen = dstlen as libc::socklen_t;
 651  
 652          let fd = self.fd();
 653          let dolock = || self.lock_nonblocking();
 654          let dowrite = |nb, buf*u8, lenuintunsafe {
 655              let flags = if nb {c::MSG_DONTWAIT} else {0};
 656              libc::sendto(fd,
 657                           buf as *libc::c_void,
 658                           len as msglen_t,
 659                           flags,
 660                           dstp,
 661                           dstlen) as i64
 662          };
 663  
 664          let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
 665          if n != buf.len() {
 666              Err(io::IoError {
 667                  kind: io::ShortWrite(n),
 668                  desc: "couldn't send entire packet at once",
 669                  detail: None,
 670              })
 671          } else {
 672              Ok(())
 673          }
 674      }
 675  
 676      fn join_multicast(&mut self, multiip::IpAddr) -> IoResult<()> {
 677          match multi {
 678              ip::Ipv4Addr(..) => {
 679                  self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
 680              }
 681              ip::Ipv6Addr(..) => {
 682                  self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
 683              }
 684          }
 685      }
 686      fn leave_multicast(&mut self, multiip::IpAddr) -> IoResult<()> {
 687          match multi {
 688              ip::Ipv4Addr(..) => {
 689                  self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
 690              }
 691              ip::Ipv6Addr(..) => {
 692                  self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
 693              }
 694          }
 695      }
 696  
 697      fn loop_multicast_locally(&mut self) -> IoResult<()> {
 698          self.set_multicast_loop(true)
 699      }
 700      fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
 701          self.set_multicast_loop(false)
 702      }
 703  
 704      fn multicast_time_to_live(&mut self, ttlint) -> IoResult<()> {
 705          setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
 706                     ttl as libc::c_int)
 707      }
 708      fn time_to_live(&mut self, ttlint) -> IoResult<()> {
 709          setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
 710      }
 711  
 712      fn hear_broadcasts(&mut self) -> IoResult<()> {
 713          self.set_broadcast(true)
 714      }
 715      fn ignore_broadcasts(&mut self) -> IoResult<()> {
 716          self.set_broadcast(false)
 717      }
 718  
 719      fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
 720          box UdpSocket {
 721              inner: self.inner.clone(),
 722              read_deadline: 0,
 723              write_deadline: 0,
 724          } as Box<rtio::RtioUdpSocket:Send>
 725      }
 726  
 727      fn set_timeout(&mut self, timeoutOption<u64>) {
 728          let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 729          self.read_deadline = deadline;
 730          self.write_deadline = deadline;
 731      }
 732      fn set_read_timeout(&mut self, timeoutOption<u64>) {
 733          self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 734      }
 735      fn set_write_timeout(&mut self, timeoutOption<u64>) {
 736          self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 737      }
 738  }
 739  
 740  ////////////////////////////////////////////////////////////////////////////////
 741  // Timeout helpers
 742  //
 743  // The read/write functions below are the helpers for reading/writing a socket
 744  // with a possible deadline specified. This is generally viewed as a timed out
 745  // I/O operation.
 746  //
 747  // From the application's perspective, timeouts apply to the I/O object, not to
 748  // the underlying file descriptor (it's one timeout per object). This means that
 749  // we can't use the SO_RCVTIMEO and corresponding send timeout option.
 750  //
 751  // The next idea to implement timeouts would be to use nonblocking I/O. An
 752  // invocation of select() would wait (with a timeout) for a socket to be ready.
 753  // Once its ready, we can perform the operation. Note that the operation *must*
 754  // be nonblocking, even though select() says the socket is ready. This is
 755  // because some other thread could have come and stolen our data (handles can be
 756  // cloned).
 757  //
 758  // To implement nonblocking I/O, the first option we have is to use the
 759  // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
 760  // I/O objects, so this was initially viewed as unwise.
 761  //
 762  // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
 763  // send/recv, but the niftiness wears off once you realize it only works well on
 764  // linux [1] [2]. This means that it's pretty easy to get a nonblocking
 765  // operation on linux (no flag fidding, no affecting other objects), but not on
 766  // other platforms.
 767  //
 768  // To work around this constraint on other platforms, we end up using the
 769  // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
 770  // could cause other objects' blocking operations to suddenly become
 771  // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
 772  // falls back to using the same code path as nonblocking operations, but with an
 773  // infinite timeout (select + send/recv). This helps emulate blocking
 774  // reads/writes despite the underlying descriptor being nonblocking, as well as
 775  // optimizing the fast path of just hitting one syscall in the good case.
 776  //
 777  // As a final caveat, this implementation uses a mutex so only one thread is
 778  // doing a nonblocking operation at at time. This is the operation that comes
 779  // after the select() (at which point we think the socket is ready). This is
 780  // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
 781  // expect (wouldn't want someone turning it on when it should be off!). All
 782  // operations performed in the lock are *nonblocking* to avoid holding the mutex
 783  // forever.
 784  //
 785  // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
 786  // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
 787  // reads/writes are still blocking.
 788  //
 789  // Fun, fun!
 790  //
 791  // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
 792  // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
 793  
 794  pub fn read<T>(fdsock_t,
 795                 deadline: u64,
 796                 lock: || -> T,
 797                 read: |bool| -> libc::c_int) -> IoResult<uint> {
 798      let mut ret = -1;
 799      if deadline == 0 {
 800          ret = retry(|| read(false));
 801      }
 802  
 803      if deadline != 0 || (ret == -1 && util::wouldblock()) {
 804          let deadline = match deadline {
 805              0 => None,
 806              n => Some(n),
 807          };
 808          loop {
 809              // With a timeout, first we wait for the socket to become
 810              // readable using select(), specifying the relevant timeout for
 811              // our previously set deadline.
 812              try!(util::await(fd, deadline, util::Readable));
 813  
 814              // At this point, we're still within the timeout, and we've
 815              // determined that the socket is readable (as returned by
 816              // select). We must still read the socket in *nonblocking* mode
 817              // because some other thread could come steal our data. If we
 818              // fail to read some data, we retry (hence the outer loop) and
 819              // wait for the socket to become readable again.
 820              let _guard = lock();
 821              match retry(|| read(deadline.is_some())) {
 822                  -1 if util::wouldblock() => { assert!(deadline.is_some()); }
 823                  -1 => return Err(last_error()),
 824                 n => { ret = n; break }
 825              }
 826          }
 827      }
 828  
 829      match ret {
 830          0 => Err(io::standard_error(io::EndOfFile)),
 831          n if n < 0 => Err(last_error()),
 832          n => Ok(n as uint)
 833      }
 834  }
 835  
 836  pub fn write<T>(fdsock_t,
 837                  deadline: u64,
 838                  buf: &[u8],
 839                  write_everything: bool,
 840                  lock: || -> T,
 841                  write: |bool, *u8, uint| -> i64) -> IoResult<uint> {
 842      let mut ret = -1;
 843      let mut written = 0;
 844      if deadline == 0 {
 845          if write_everything {
 846              ret = keep_going(buf, |inner, len| {
 847                  written = buf.len() - len;
 848                  write(false, inner, len)
 849              });
 850          } else {
 851              ret = retry(|| {
 852                  write(false, buf.as_ptr(), buf.len()) as libc::c_int
 853              }) as i64;
 854              if ret > 0 { written = ret as uint; }
 855          }
 856      }
 857  
 858      if deadline != 0 || (ret == -1 && util::wouldblock()) {
 859          let deadline = match deadline {
 860              0 => None,
 861              n => Some(n),
 862          };
 863          while written < buf.len() && (write_everything || written == 0) {
 864              // As with read(), first wait for the socket to be ready for
 865              // the I/O operation.
 866              match util::await(fd, deadline, util::Writable) {
 867                  Err(ref e) if e.kind == io::TimedOut && written > 0 => {
 868                      assert!(deadline.is_some());
 869                      return Err(io::IoError {
 870                          kind: io::ShortWrite(written),
 871                          desc: "short write",
 872                          detail: None,
 873                      })
 874                  }
 875                  Err(e) => return Err(e),
 876                  Ok(()) => {}
 877              }
 878  
 879              // Also as with read(), we use MSG_DONTWAIT to guard ourselves
 880              // against unforseen circumstances.
 881              let _guard = lock();
 882              let ptr = buf.slice_from(written).as_ptr();
 883              let len = buf.len() - written;
 884              match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
 885                  -1 if util::wouldblock() => {}
 886                  -1 => return Err(last_error()),
 887                  n => { written += n as uint; }
 888              }
 889          }
 890          ret = 0;
 891      }
 892      if ret < 0 {
 893          Err(last_error())
 894      } else {
 895          Ok(written)
 896      }
 897  }


libnative/io/net.rs:30:1-30:1 -fn- definition:
pub fn htons(u: u16) -> u16 {
    mem::to_be16(u)
}
references:- 10
56:                     htons(a),
57:                     htons(b),
58:                     htons(c),
--
76:                 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
77:                 (*storage).sin_port = htons(addr.port);
78:                 (*storage).sin_addr = inaddr;
--
83:                 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
84:                 (*storage).sin6_port = htons(addr.port);
85:                 (*storage).sin6_addr = inaddr;


libnative/io/net.rs:793:1-793:1 -fn- definition:
pub fn read<T>(fd: sock_t,
               deadline: u64,
               lock: || -> T,
references:- 3
353:         };
354:         read(fd, self.read_deadline, dolock, doread)
355:     }
--
640:         };
641:         let n = try!(read(fd, self.read_deadline, dolock, doread));
642:         sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
libnative/io/pipe_unix.rs:
159:         };
160:         net::read(fd, self.read_deadline, dolock, doread)
161:     }


libnative/io/net.rs:150:1-150:1 -fn- definition:
fn sockname(fd: sock_t,
            f: extern "system" unsafe fn(sock_t, *mut libc::sockaddr,
                                         *mut libc::socklen_t) -> libc::c_int)
references:- 5
372:     fn peer_name(&mut self) -> IoResult<ip::SocketAddr> {
373:         sockname(self.fd(), libc::getpeername)
374:     }
--
615:     fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
616:         sockname(self.fd(), libc::getsockname)
617:     }


libnative/io/net.rs:29:16-29:16 -NK_AS_STR_TODO- definition:
pub fn htons(u: u16) -> u16 {
    mem::to_be16(u)
}
references:- 20
249: pub struct Guard<'a> {
250:     pub fd: sock_t,
251:     pub guard: mutex::LockGuard<'a>,
--
564:     pub fn fd(&self) -> sock_t {
565:         // unsafety is fine because it's just a read-only arc
--
794: pub fn read<T>(fd: sock_t,
795:                deadline: u64,
--
836: pub fn write<T>(fd: sock_t,
837:                 deadline: u64,
libnative/io/util.rs:
138: pub fn await(fd: net::sock_t, deadline: Option<u64>,
139:              status: SocketStatus) -> IoResult<()> {
libnative/io/net.rs:
508:                              &mut size as *mut libc::socklen_t) as libc::c_int
509:             }) as sock_t {
510:                 -1 => Err(last_error()),


libnative/io/net.rs:243:1-243:1 -struct- definition:
struct Inner {
    fd: sock_t,
    lock: mutex::NativeMutex,
references:- 8
255:     fn new(fd: sock_t) -> Inner {
256:         Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
257:     }
--
284:     fn new(inner: Inner) -> TcpStream {
285:         TcpStream {
--
422: impl Drop for Inner {
423:     fn drop(&mut self) { unsafe { close(self.fd); } }
--
539: pub struct UdpSocket {
540:     inner: UnsafeArc<Inner>,
541:     read_deadline: u64,


libnative/io/net.rs:92:1-92:1 -fn- definition:
fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
    unsafe {
        let fam = match addr.ip {
references:- 3
262:                    timeout: Option<u64>) -> IoResult<TcpStream> {
263:         let fd = try!(socket(addr, libc::SOCK_STREAM));
264:         let ret = TcpStream::new(Inner::new(fd));
--
442:     pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
443:         let fd = try!(socket(addr, libc::SOCK_STREAM));
444:         let ret = TcpListener { inner: Inner::new(fd) };
--
546:     pub fn bind(addr: ip::SocketAddr) -> IoResult<UdpSocket> {
547:         let fd = try!(socket(addr, libc::SOCK_DGRAM));
548:         let ret = UdpSocket {


libnative/io/net.rs:436:1-436:1 -struct- definition:
pub struct TcpListener {
    inner: Inner,
}
references:- 6
443:         let fd = try!(socket(addr, libc::SOCK_STREAM));
444:         let ret = TcpListener { inner: Inner::new(fd) };
--
488: pub struct TcpAcceptor {
489:     listener: TcpListener,
490:     deadline: u64,


libnative/io/net.rs:169:1-169:1 -fn- definition:
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
                        len: uint) -> IoResult<ip::SocketAddr> {
    match storage.ss_family as libc::c_int {
references:- 3
641:         let n = try!(read(fd, self.read_deadline, dolock, doread));
642:         sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
643:             Ok((n as uint, addr))
libnative/io/addrinfo.rs:
63:             unsafe {
64:                 let addr = match sockaddr_to_addr(cast::transmute((*rp).ai_addr),
65:                                                   (*rp).ai_addrlen as uint) {
libnative/io/net.rs:
166:     }
167:     return sockaddr_to_addr(&storage, len as uint);
168: }


libnative/io/net.rs:69:1-69:1 -fn- definition:
fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
    unsafe {
        let storage: libc::sockaddr_storage = mem::init();
references:- 4
446:         let (addr, len) = addr_to_sockaddr(addr);
447:         let addrp = &addr as *_ as *libc::sockaddr;
--
647:     fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> IoResult<()> {
648:         let (dst, dstlen) = addr_to_sockaddr(dst);
649:         let dstp = &dst as *_ as *libc::sockaddr;


libnative/io/net.rs:341:21-341:21 -NK_AS_STR_TODO- definition:
impl rtio::RtioTcpStream for TcpStream {
    fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
        let fd = self.fd();
references:- 2
363:                        buf as *mut libc::c_void,
364:                        len as wrlen,
365:                        flags) as i64


libnative/io/net.rs:538:1-538:1 -struct- definition:
pub struct UdpSocket {
    inner: UnsafeArc<Inner>,
    read_deadline: u64,
references:- 6
547:         let fd = try!(socket(addr, libc::SOCK_DGRAM));
548:         let ret = UdpSocket {
549:             inner: UnsafeArc::new(Inner::new(fd)),
--
719:     fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
720:         box UdpSocket {
721:             inner: self.inner.clone(),


libnative/io/net.rs:237:1-237:1 -struct- definition:
pub struct TcpStream {
    inner: UnsafeArc<Inner>,
    read_deadline: u64,
references:- 8
284:     fn new(inner: Inner) -> TcpStream {
285:         TcpStream {
286:             inner: UnsafeArc::new(inner),
--
388:     fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
389:         box TcpStream {
390:             inner: self.inner.clone(),
--
416: impl rtio::RtioSocket for TcpStream {
417:     fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
--
496:     pub fn native_accept(&mut self) -> IoResult<TcpStream> {
497:         if self.deadline != 0 {


libnative/io/net.rs:42:1-42:1 -fn- definition:
fn ip_to_inaddr(ip: ip::IpAddr) -> InAddr {
    match ip {
        ip::Ipv4Addr(a, b, c, d) => {
references:- 2
72:         let storage: libc::sockaddr_storage = mem::init();
73:         let len = match ip_to_inaddr(addr.ip) {
74:             InAddr(inaddr) => {
--
580:                           opt: libc::c_int) -> IoResult<()> {
581:         match ip_to_inaddr(addr) {
582:             InAddr(addr) => {


libnative/io/net.rs:105:1-105:1 -fn- definition:
fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
                 payload: T) -> IoResult<()> {
    unsafe {
references:- 9
704:     fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
705:         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
706:                    ttl as libc::c_int)
--
708:     fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
709:         setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
710:     }


libnative/io/net.rs:143:21-143:21 -fn- definition:
fn last_error() -> io::IoError {
    super::last_error()
}
references:- 12
892:     if ret < 0 {
893:         Err(last_error())
894:     } else {


libnative/io/net.rs:621:16-621:16 -NK_AS_STR_TODO- definition:
impl rtio::RtioUdpSocket for UdpSocket {
    fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, ip::SocketAddr)> {
        let fd = self.fd();
references:- 2
635:                            buf.as_mut_ptr() as *mut libc::c_void,
636:                            buf.len() as msglen_t,
637:                            flags,
--
657:                          buf as *libc::c_void,
658:                          len as msglen_t,
659:                          flags,


libnative/io/net.rs:33:2-33:2 -fn- definition:
}
pub fn ntohs(u: u16) -> u16 {
    mem::from_be16(u)
references:- 10
198:             let f = ntohs(storage.sin6_addr.s6_addr[5]);
199:             let g = ntohs(storage.sin6_addr.s6_addr[6]);
200:             let h = ntohs(storage.sin6_addr.s6_addr[7]);
201:             Ok(ip::SocketAddr {
202:                 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
203:                 port: ntohs(storage.sin6_port),
204:             })


libnative/io/net.rs:487:1-487:1 -struct- definition:
pub struct TcpAcceptor {
    listener: TcpListener,
    deadline: u64,
references:- 5
468:             -1 => Err(last_error()),
469:             _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
470:         }
--
523: impl rtio::RtioTcpAcceptor for TcpAcceptor {
524:     fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream:Send>> {


libnative/io/net.rs:835:1-835:1 -fn- definition:
pub fn write<T>(fd: sock_t,
                deadline: u64,
                buf: &[u8],
references:- 3
366:         };
367:         match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
368:             Ok(_) => Ok(()),
--
664:         let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
665:         if n != buf.len() {
libnative/io/pipe_unix.rs:
172:         };
173:         match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) {
174:             Ok(_) => Ok(()),