(index<- )        ./libnative/io/pipe_unix.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  use libc;
  12  use std::c_str::CString;
  13  use std::cast;
  14  use std::intrinsics;
  15  use std::io;
  16  use std::mem;
  17  use std::rt::rtio;
  18  use std::sync::arc::UnsafeArc;
  19  use std::unstable::mutex;
  20  
  21  use super::{IoResult, retry};
  22  use super::net;
  23  use super::util;
  24  use super::c;
  25  use super::file::fd_t;
  26  
  27  fn unix_socket(tylibc::c_int) -> IoResult<fd_t> {
  28      match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
  29          -1 => Err(super::last_error()),
  30          fd => Ok(fd)
  31      }
  32  }
  33  
  34  fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> {
  35      // the sun_path length is limited to SUN_LEN (with null)
  36      assert!(mem::size_of::<libc::sockaddr_storage>() >=
  37              mem::size_of::<libc::sockaddr_un>());
  38      let mut storagelibc::sockaddr_storage = unsafe { intrinsics::init() };
  39      let s&mut libc::sockaddr_un = unsafe { cast::transmute(&mut storage) };
  40  
  41      let len = addr.len();
  42      if len > s.sun_path.len() - 1 {
  43          return Err(io::IoError {
  44              kind: io::InvalidInput,
  45              desc: "path must be smaller than SUN_LEN",
  46              detail: None,
  47          })
  48      }
  49      s.sun_family = libc::AF_UNIX as libc::sa_family_t;
  50      for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) {
  51          *slot = value;
  52      }
  53  
  54      // count the null terminator
  55      let len = mem::size_of::<libc::sa_family_t>() + len + 1;
  56      return Ok((storage, len));
  57  }
  58  
  59  struct Inner {
  60      fd: fd_t,
  61      lock: mutex::NativeMutex,
  62  }
  63  
  64  impl Inner {
  65      fn new(fdfd_t) -> Inner {
  66          Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
  67      }
  68  }
  69  
  70  impl Drop for Inner {
  71      fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
  72  }
  73  
  74  fn connect(addr: &CString, tylibc::c_int,
  75             timeoutOption<u64>) -> IoResult<Inner> {
  76      let (addr, len) = try!(addr_to_sockaddr_un(addr));
  77      let inner = Inner::new(try!(unix_socket(ty)));
  78      let addrp = &addr as *_ as *libc::sockaddr;
  79      let len = len as libc::socklen_t;
  80  
  81      match timeout {
  82          None => {
  83              match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
  84                  -1 => Err(super::last_error()),
  85                  _  => Ok(inner)
  86              }
  87          }
  88          Some(timeout_ms) => {
  89              try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
  90              Ok(inner)
  91          }
  92      }
  93  }
  94  
  95  fn bind(addr: &CString, tylibc::c_int) -> IoResult<Inner> {
  96      let (addr, len) = try!(addr_to_sockaddr_un(addr));
  97      let inner = Inner::new(try!(unix_socket(ty)));
  98      let addrp = &addr as *libc::sockaddr_storage;
  99      match unsafe {
 100          libc::bind(inner.fd, addrp as *libc::sockaddr, len as libc::socklen_t)
 101      } {
 102          -1 => Err(super::last_error()),
 103          _  => Ok(inner)
 104      }
 105  }
 106  
 107  ////////////////////////////////////////////////////////////////////////////////
 108  // Unix Streams
 109  ////////////////////////////////////////////////////////////////////////////////
 110  
 111  pub struct UnixStream {
 112      inner: UnsafeArc<Inner>,
 113      read_deadline: u64,
 114      write_deadline: u64,
 115  }
 116  
 117  impl UnixStream {
 118      pub fn connect(addr&CString,
 119                     timeoutOption<u64>) -> IoResult<UnixStream> {
 120          connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
 121              UnixStream::new(UnsafeArc::new(inner))
 122          })
 123      }
 124  
 125      fn new(innerUnsafeArc<Inner>) -> UnixStream {
 126          UnixStream {
 127              inner: inner,
 128              read_deadline: 0,
 129              write_deadline: 0,
 130          }
 131      }
 132  
 133      fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } }
 134  
 135      #[cfg(target_os = "linux")]
 136      fn lock_nonblocking(&self) {}
 137  
 138      #[cfg(not(target_os = "linux"))]
 139      fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> {
 140          let ret = net::Guard {
 141              fd: self.fd(),
 142              guard: unsafe { (*self.inner.get()).lock.lock() },
 143          };
 144          assert!(util::set_nonblocking(self.fd(), true).is_ok());
 145          ret
 146      }
 147  }
 148  
 149  impl rtio::RtioPipe for UnixStream {
 150      fn read(&mut self, buf&mut [u8]) -> IoResult<uint> {
 151          let fd = self.fd();
 152          let dolock = || self.lock_nonblocking();
 153          let doread = |nb| unsafe {
 154              let flags = if nb {c::MSG_DONTWAIT} else {0};
 155              libc::recv(fd,
 156                         buf.as_mut_ptr() as *mut libc::c_void,
 157                         buf.len() as libc::size_t,
 158                         flags) as libc::c_int
 159          };
 160          net::read(fd, self.read_deadline, dolock, doread)
 161      }
 162  
 163      fn write(&mut self, buf&[u8]) -> IoResult<()> {
 164          let fd = self.fd();
 165          let dolock = || self.lock_nonblocking();
 166          let dowrite = |nbbool, buf*u8, lenuintunsafe {
 167              let flags = if nb {c::MSG_DONTWAIT} else {0};
 168              libc::send(fd,
 169                         buf as *mut libc::c_void,
 170                         len as libc::size_t,
 171                         flags) as i64
 172          };
 173          match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) {
 174              Ok(_) => Ok(()),
 175              Err(e) => Err(e)
 176          }
 177      }
 178  
 179      fn clone(&self) -> Box<rtio::RtioPipe:Send> {
 180          box UnixStream::new(self.inner.clone()) as Box<rtio::RtioPipe:Send>
 181      }
 182  
 183      fn close_write(&mut self) -> IoResult<()> {
 184          super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
 185      }
 186      fn close_read(&mut self) -> IoResult<()> {
 187          super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
 188      }
 189      fn set_timeout(&mut self, timeoutOption<u64>) {
 190          let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 191          self.read_deadline = deadline;
 192          self.write_deadline = deadline;
 193      }
 194      fn set_read_timeout(&mut self, timeoutOption<u64>) {
 195          self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 196      }
 197      fn set_write_timeout(&mut self, timeoutOption<u64>) {
 198          self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 199      }
 200  }
 201  
 202  ////////////////////////////////////////////////////////////////////////////////
 203  // Unix Listener
 204  ////////////////////////////////////////////////////////////////////////////////
 205  
 206  pub struct UnixListener {
 207      inner: Inner,
 208      path: CString,
 209  }
 210  
 211  impl UnixListener {
 212      pub fn bind(addr&CString) -> IoResult<UnixListener> {
 213          bind(addr, libc::SOCK_STREAM).map(|fd| {
 214              UnixListener { inner: fd, path: addr.clone() }
 215          })
 216      }
 217  
 218      fn fd(&self) -> fd_t { self.inner.fd }
 219  
 220      pub fn native_listen(self, backlogint) -> IoResult<UnixAcceptor> {
 221          match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
 222              -1 => Err(super::last_error()),
 223              _ => Ok(UnixAcceptor { listener: self, deadline: 0 })
 224          }
 225      }
 226  }
 227  
 228  impl rtio::RtioUnixListener for UnixListener {
 229      fn listen(~self) -> IoResult<Box<rtio::RtioUnixAcceptor:Send>> {
 230          self.native_listen(128).map(|a| {
 231              box a as Box<rtio::RtioUnixAcceptor:Send>
 232          })
 233      }
 234  }
 235  
 236  pub struct UnixAcceptor {
 237      listener: UnixListener,
 238      deadline: u64,
 239  }
 240  
 241  impl UnixAcceptor {
 242      fn fd(&self) -> fd_t { self.listener.fd() }
 243  
 244      pub fn native_accept(&mut self) -> IoResult<UnixStream> {
 245          if self.deadline != 0 {
 246              try!(util::await(self.fd(), Some(self.deadline), util::Readable));
 247          }
 248          let mut storagelibc::sockaddr_storage = unsafe { intrinsics::init() };
 249          let storagep = &mut storage as *mut libc::sockaddr_storage;
 250          let size = mem::size_of::<libc::sockaddr_storage>();
 251          let mut size = size as libc::socklen_t;
 252          match retry(|| unsafe {
 253              libc::accept(self.fd(),
 254                           storagep as *mut libc::sockaddr,
 255                           &mut size as *mut libc::socklen_t) as libc::c_int
 256          }) {
 257              -1 => Err(super::last_error()),
 258              fd => Ok(UnixStream::new(UnsafeArc::new(Inner::new(fd))))
 259          }
 260      }
 261  }
 262  
 263  impl rtio::RtioUnixAcceptor for UnixAcceptor {
 264      fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe:Send>> {
 265          self.native_accept().map(|s| box s as Box<rtio::RtioPipe:Send>)
 266      }
 267      fn set_timeout(&mut self, timeoutOption<u64>) {
 268          self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 269      }
 270  }
 271  
 272  impl Drop for UnixListener {
 273      fn drop(&mut self) {
 274          // Unlink the path to the socket to ensure that it doesn't linger. We're
 275          // careful to unlink the path before we close the file descriptor to
 276          // prevent races where we unlink someone else's path.
 277          unsafe {
 278              let _ = libc::unlink(self.path.with_ref(|p| p));
 279          }
 280      }
 281  }


libnative/io/pipe_unix.rs:205:1-205:1 -struct- definition:
pub struct UnixListener {
    inner: Inner,
    path: CString,
references:- 6
213:         bind(addr, libc::SOCK_STREAM).map(|fd| {
214:             UnixListener { inner: fd, path: addr.clone() }
215:         })
--
272: impl Drop for UnixListener {
273:     fn drop(&mut self) {


libnative/io/pipe_unix.rs:235:1-235:1 -struct- definition:
pub struct UnixAcceptor {
    listener: UnixListener,
    deadline: u64,
references:- 4
222:             -1 => Err(super::last_error()),
223:             _ => Ok(UnixAcceptor { listener: self, deadline: 0 })
224:         }
--
241: impl UnixAcceptor {
242:     fn fd(&self) -> fd_t { self.listener.fd() }
--
263: impl rtio::RtioUnixAcceptor for UnixAcceptor {
264:     fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe:Send>> {


libnative/io/pipe_unix.rs:33:1-33:1 -fn- definition:
fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> {
    // the sun_path length is limited to SUN_LEN (with null)
    assert!(mem::size_of::<libc::sockaddr_storage>() >=
references:- 2
95: fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
96:     let (addr, len) = try!(addr_to_sockaddr_un(addr));
97:     let inner = Inner::new(try!(unix_socket(ty)));


libnative/io/pipe_unix.rs:110:1-110:1 -struct- definition:
pub struct UnixStream {
    inner: UnsafeArc<Inner>,
    read_deadline: u64,
references:- 6
125:     fn new(inner: UnsafeArc<Inner>) -> UnixStream {
126:         UnixStream {
--
244:     pub fn native_accept(&mut self) -> IoResult<UnixStream> {
245:         if self.deadline != 0 {


libnative/io/pipe_unix.rs:26:1-26:1 -fn- definition:
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
    match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
        -1 => Err(super::last_error()),
references:- 2
96:     let (addr, len) = try!(addr_to_sockaddr_un(addr));
97:     let inner = Inner::new(try!(unix_socket(ty)));
98:     let addrp = &addr as *libc::sockaddr_storage;


libnative/io/pipe_unix.rs:58:1-58:1 -struct- definition:
struct Inner {
    fd: fd_t,
    lock: mutex::NativeMutex,
references:- 9
65:     fn new(fd: fd_t) -> Inner {
66:         Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
67:     }
--
111: pub struct UnixStream {
112:     inner: UnsafeArc<Inner>,
113:     read_deadline: u64,
--
206: pub struct UnixListener {
207:     inner: Inner,
208:     path: CString,