(index<- )        ./librustuv/pipe.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  use libc;
  12  use std::c_str::CString;
  13  use std::cast;
  14  use std::io::IoError;
  15  use std::io;
  16  use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
  17  use std::rt::task::BlockedTask;
  18  
  19  use homing::{HomingIO, HomeHandle};
  20  use net;
  21  use rc::Refcount;
  22  use stream::StreamWatcher;
  23  use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
  24  use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout};
  25  use uvio::UvIoFactory;
  26  use uvll;
  27  
  28  pub struct PipeWatcher {
  29      stream: StreamWatcher,
  30      home: HomeHandle,
  31      defused: bool,
  32      refcount: Refcount,
  33  
  34      // see comments in TcpWatcher for why these exist
  35      write_access: AccessTimeout,
  36      read_access: AccessTimeout,
  37  }
  38  
  39  pub struct PipeListener {
  40      home: HomeHandle,
  41      pipe: *uvll::uv_pipe_t,
  42      outgoing: Sender<Result<Box<RtioPipe:Send>, IoError>>,
  43      incoming: Receiver<Result<Box<RtioPipe:Send>, IoError>>,
  44  }
  45  
  46  pub struct PipeAcceptor {
  47      listener: Box<PipeListener>,
  48      timeout: AcceptTimeout,
  49  }
  50  
  51  // PipeWatcher implementation and traits
  52  
  53  impl PipeWatcher {
  54      // Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
  55      // get bound to some other source (this is normally a helper method paired
  56      // with another call).
  57      pub fn new(io&mut UvIoFactory, ipcbool) -> PipeWatcher {
  58          let home = io.make_handle();
  59          PipeWatcher::new_home(&io.loop_, home, ipc)
  60      }
  61  
  62      pub fn new_home(loop_&Loop, homeHomeHandle, ipcbool) -> PipeWatcher {
  63          let handle = unsafe {
  64              let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
  65              assert!(!handle.is_null());
  66              let ipc = ipc as libc::c_int;
  67              assert_eq!(uvll::uv_pipe_init(loop_.handle, handle, ipc), 0);
  68              handle
  69          };
  70          PipeWatcher {
  71              stream: StreamWatcher::new(handle),
  72              home: home,
  73              defused: false,
  74              refcount: Refcount::new(),
  75              read_access: AccessTimeout::new(),
  76              write_access: AccessTimeout::new(),
  77          }
  78      }
  79  
  80      pub fn open(io&mut UvIoFactory, filelibc::c_int)
  81          -> Result<PipeWatcher, UvError>
  82      {
  83          let pipe = PipeWatcher::new(io, false);
  84          match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
  85              0 => Ok(pipe),
  86              n => Err(UvError(n))
  87          }
  88      }
  89  
  90      pub fn connect(io&mut UvIoFactory, name&CString, timeoutOption<u64>)
  91          -> Result<PipeWatcher, UvError>
  92      {
  93          let pipe = PipeWatcher::new(io, false);
  94          let cx = ConnectCtx { status: -1, task: None, timer: None };
  95          cx.connect(pipe, timeout, io, |req, pipe, cb| {
  96              unsafe {
  97                  uvll::uv_pipe_connect(req.handle, pipe.handle(),
  98                                        name.with_ref(|p| p), cb)
  99              }
 100              0
 101          })
 102      }
 103  
 104      pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
 105  
 106      // Unwraps the underlying uv pipe. This cancels destruction of the pipe and
 107      // allows the pipe to get moved elsewhere
 108      fn unwrap(mut self) -> *uvll::uv_pipe_t {
 109          self.defused = true;
 110          return self.stream.handle;
 111      }
 112  }
 113  
 114  impl RtioPipe for PipeWatcher {
 115      fn read(&mut self, buf&mut [u8]) -> Result<uint, IoError> {
 116          let m = self.fire_homing_missile();
 117          let guard = try!(self.read_access.grant(m));
 118  
 119          // see comments in close_read about this check
 120          if guard.access.is_closed() {
 121              return Err(io::standard_error(io::EndOfFile))
 122          }
 123  
 124          self.stream.read(buf).map_err(uv_error_to_io_error)
 125      }
 126  
 127      fn write(&mut self, buf&[u8]) -> Result<(), IoError> {
 128          let m = self.fire_homing_missile();
 129          let guard = try!(self.write_access.grant(m));
 130          self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
 131      }
 132  
 133      fn clone(&self) -> Box<RtioPipe:Send> {
 134          box PipeWatcher {
 135              stream: StreamWatcher::new(self.stream.handle),
 136              defused: false,
 137              home: self.home.clone(),
 138              refcount: self.refcount.clone(),
 139              read_access: self.read_access.clone(),
 140              write_access: self.write_access.clone(),
 141          } as Box<RtioPipe:Send>
 142      }
 143  
 144      fn close_read(&mut self) -> Result<(), IoError> {
 145          // The current uv_shutdown method only shuts the writing half of the
 146          // connection, and no method is provided to shut down the reading half
 147          // of the connection. With a lack of method, we emulate shutting down
 148          // the reading half of the connection by manually returning early from
 149          // all future calls to `read`.
 150          //
 151          // Note that we must be careful to ensure that *all* cloned handles see
 152          // the closing of the read half, so we stored the "is closed" bit in the
 153          // Access struct, not in our own personal watcher. Additionally, the
 154          // homing missile is used as a locking mechanism to ensure there is no
 155          // contention over this bit.
 156          //
 157          // To shutdown the read half, we must first flag the access as being
 158          // closed, and then afterwards we cease any pending read. Note that this
 159          // ordering is crucial because we could in theory be rescheduled during
 160          // the uv_read_stop which means that another read invocation could leak
 161          // in before we set the flag.
 162          let task = {
 163              let m = self.fire_homing_missile();
 164              self.read_access.access.close(&m);
 165              self.stream.cancel_read(uvll::EOF as libc::ssize_t)
 166          };
 167          let _ = task.map(|t| t.reawaken());
 168          Ok(())
 169      }
 170  
 171      fn close_write(&mut self) -> Result<(), IoError> {
 172          let _m = self.fire_homing_missile();
 173          net::shutdown(self.stream.handle, &self.uv_loop())
 174      }
 175  
 176      fn set_timeout(&mut self, timeoutOption<u64>) {
 177          self.set_read_timeout(timeout);
 178          self.set_write_timeout(timeout);
 179      }
 180  
 181      fn set_read_timeout(&mut self, msOption<u64>) {
 182          let _m = self.fire_homing_missile();
 183          let loop_ = self.uv_loop();
 184          self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
 185                                       &self.stream as *_ as uint);
 186  
 187          fn cancel_read(stream: uint) -> Option<BlockedTask> {
 188              let stream&mut StreamWatcher = unsafe { cast::transmute(stream) };
 189              stream.cancel_read(uvll::ECANCELED as libc::ssize_t)
 190          }
 191      }
 192  
 193      fn set_write_timeout(&mut self, msOption<u64>) {
 194          let _m = self.fire_homing_missile();
 195          let loop_ = self.uv_loop();
 196          self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
 197                                        &self.stream as *_ as uint);
 198  
 199          fn cancel_write(streamuint) -> Option<BlockedTask> {
 200              let stream&mut StreamWatcher = unsafe { cast::transmute(stream) };
 201              stream.cancel_write()
 202          }
 203      }
 204  }
 205  
 206  impl HomingIO for PipeWatcher {
 207      fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
 208  }
 209  
 210  impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
 211      fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
 212  }
 213  
 214  impl Drop for PipeWatcher {
 215      fn drop(&mut self) {
 216          let _m = self.fire_homing_missile();
 217          if !self.defused && self.refcount.decrement() {
 218              self.close();
 219          }
 220      }
 221  }
 222  
 223  // PipeListener implementation and traits
 224  
 225  impl PipeListener {
 226      pub fn bind(io&mut UvIoFactory, name&CString)
 227          -> Result<Box<PipeListener>, UvError>
 228      {
 229          let pipe = PipeWatcher::new(io, false);
 230          match unsafe {
 231              uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
 232          } {
 233              0 => {
 234                  // If successful, unwrap the PipeWatcher because we control how
 235                  // we close the pipe differently. We can't rely on
 236                  // StreamWatcher's default close method.
 237                  let (tx, rx) = channel();
 238                  let p = box PipeListener {
 239                      home: io.make_handle(),
 240                      pipe: pipe.unwrap(),
 241                      incoming: rx,
 242                      outgoing: tx,
 243                  };
 244                  Ok(p.install())
 245              }
 246              n => Err(UvError(n))
 247          }
 248      }
 249  }
 250  
 251  impl RtioUnixListener for PipeListener {
 252      fn listen(~self) -> Result<Box<RtioUnixAcceptor:Send>, IoError> {
 253          // create the acceptor object from ourselves
 254          let mut acceptor = box PipeAcceptor {
 255              listener: self,
 256              timeout: AcceptTimeout::new(),
 257          };
 258  
 259          let _m = acceptor.fire_homing_missile();
 260          // FIXME: the 128 backlog should be configurable
 261          match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
 262              0 => Ok(acceptor as Box<RtioUnixAcceptor:Send>),
 263              n => Err(uv_error_to_io_error(UvError(n))),
 264          }
 265      }
 266  }
 267  
 268  impl HomingIO for PipeListener {
 269      fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
 270  }
 271  
 272  impl UvHandle<uvll::uv_pipe_t> for PipeListener {
 273      fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
 274  }
 275  
 276  extern fn listen_cb(server: *uvll::uv_stream_t, statuslibc::c_int) {
 277      assert!(status != uvll::ECANCELED);
 278  
 279      let pipe&mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
 280      let msg = match status {
 281          0 => {
 282              let loop_ = Loop::wrap(unsafe {
 283                  uvll::get_loop_for_uv_handle(server)
 284              });
 285              let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
 286              assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
 287              Ok(box client as Box<RtioPipe:Send>)
 288          }
 289          n => Err(uv_error_to_io_error(UvError(n)))
 290      };
 291      pipe.outgoing.send(msg);
 292  }
 293  
 294  impl Drop for PipeListener {
 295      fn drop(&mut self) {
 296          let _m = self.fire_homing_missile();
 297          self.close();
 298      }
 299  }
 300  
 301  // PipeAcceptor implementation and traits
 302  
 303  impl RtioUnixAcceptor for PipeAcceptor {
 304      fn accept(&mut self) -> Result<Box<RtioPipe:Send>, IoError> {
 305          self.timeout.accept(&self.listener.incoming)
 306      }
 307  
 308      fn set_timeout(&mut self, timeout_msOption<u64>) {
 309          match timeout_ms {
 310              None => self.timeout.clear(),
 311              Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
 312          }
 313      }
 314  }
 315  
 316  impl HomingIO for PipeAcceptor {
 317      fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
 318  }
 319  
 320  #[cfg(test)]
 321  mod tests {
 322      use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
 323      use std::io::test::next_test_unix;
 324  
 325      use super::{PipeWatcher, PipeListener};
 326      use super::super::local_loop;
 327  
 328      #[test]
 329      fn connect_err() {
 330          match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
 331                                     None) {
 332              Ok(..) => fail!(),
 333              Err(..) => {}
 334          }
 335      }
 336  
 337      #[test]
 338      fn bind_err() {
 339          match PipeListener::bind(local_loop(), &"path/to/nowhere".to_c_str()) {
 340              Ok(..) => fail!(),
 341              Err(e) => assert_eq!(e.name(), "EACCES".to_owned()),
 342          }
 343      }
 344  
 345      #[test]
 346      fn bind() {
 347          let p = next_test_unix().to_c_str();
 348          match PipeListener::bind(local_loop(), &p) {
 349              Ok(..) => {}
 350              Err(..) => fail!(),
 351          }
 352      }
 353  
 354      #[test] #[should_fail]
 355      fn bind_fail() {
 356          let p = next_test_unix().to_c_str();
 357          let _w = PipeListener::bind(local_loop(), &p).unwrap();
 358          fail!();
 359      }
 360  
 361      #[test]
 362      fn connect() {
 363          let path = next_test_unix();
 364          let path2 = path.clone();
 365          let (tx, rx) = channel();
 366  
 367          spawn(proc() {
 368              let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
 369              let mut p = p.listen().unwrap();
 370              tx.send(());
 371              let mut client = p.accept().unwrap();
 372              let mut buf = [0];
 373              assert!(client.read(buf).unwrap() == 1);
 374              assert_eq!(buf[0], 1);
 375              assert!(client.write([2]).is_ok());
 376          });
 377          rx.recv();
 378          let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
 379          assert!(c.write([1]).is_ok());
 380          let mut buf = [0];
 381          assert!(c.read(buf).unwrap() == 1);
 382          assert_eq!(buf[0], 2);
 383      }
 384  
 385      #[test] #[should_fail]
 386      fn connect_fail() {
 387          let path = next_test_unix();
 388          let path2 = path.clone();
 389          let (tx, rx) = channel();
 390  
 391          spawn(proc() {
 392              let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
 393              let mut p = p.listen().unwrap();
 394              tx.send(());
 395              drop(p.accept().unwrap());
 396          });
 397          rx.recv();
 398          let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
 399          fail!()
 400  
 401      }
 402  }


librustuv/pipe.rs:27:1-27:1 -struct- definition:
pub struct PipeWatcher {
    stream: StreamWatcher,
    home: HomeHandle,
references:- 13
69:         };
70:         PipeWatcher {
71:             stream: StreamWatcher::new(handle),
--
133:     fn clone(&self) -> Box<RtioPipe:Send> {
134:         box PipeWatcher {
135:             stream: StreamWatcher::new(self.stream.handle),
--
206: impl HomingIO for PipeWatcher {
207:     fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
--
214: impl Drop for PipeWatcher {
215:     fn drop(&mut self) {
librustuv/process.rs:
137:                     io: &process::StdioContainer,
138:                     io_loop: &mut UvIoFactory) -> Option<PipeWatcher> {
139:     match *io {
librustuv/pipe.rs:
210: impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
211:     fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }


librustuv/pipe.rs:45:1-45:1 -struct- definition:
pub struct PipeAcceptor {
    listener: Box<PipeListener>,
    timeout: AcceptTimeout,
references:- 3
253:         // create the acceptor object from ourselves
254:         let mut acceptor = box PipeAcceptor {
255:             listener: self,
--
303: impl RtioUnixAcceptor for PipeAcceptor {
304:     fn accept(&mut self) -> Result<Box<RtioPipe:Send>, IoError> {
--
316: impl HomingIO for PipeAcceptor {
317:     fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }


librustuv/pipe.rs:38:1-38:1 -struct- definition:
pub struct PipeListener {
    home: HomeHandle,
    pipe: *uvll::uv_pipe_t,
references:- 9
237:                 let (tx, rx) = channel();
238:                 let p = box PipeListener {
239:                     home: io.make_handle(),
--
272: impl UvHandle<uvll::uv_pipe_t> for PipeListener {
273:     fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
--
294: impl Drop for PipeListener {
295:     fn drop(&mut self) {