(index<- )        ./librustuv/uvio.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! The implementation of `rtio` for libuv
  12  
  13  use std::c_str::CString;
  14  use std::cast;
  15  use std::io::IoError;
  16  use std::io::net::ip::SocketAddr;
  17  use std::io::process::ProcessConfig;
  18  use std::io::signal::Signum;
  19  use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
  20                ReadWrite, FileStat};
  21  use std::io;
  22  use libc::c_int;
  23  use libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR,
  24                  S_IWUSR};
  25  use libc;
  26  use std::path::Path;
  27  use std::rt::rtio;
  28  use std::rt::rtio::{IoFactory, EventLoop};
  29  use ai = std::io::net::addrinfo;
  30  
  31  #[cfg(test)] use std::unstable::run_in_bare_thread;
  32  
  33  use super::{uv_error_to_io_error, Loop};
  34  
  35  use addrinfo::GetAddrInfoRequest;
  36  use async::AsyncWatcher;
  37  use file::{FsRequest, FileWatcher};
  38  use queue::QueuePool;
  39  use homing::HomeHandle;
  40  use idle::IdleWatcher;
  41  use net::{TcpWatcher, TcpListener, UdpWatcher};
  42  use pipe::{PipeWatcher, PipeListener};
  43  use process::Process;
  44  use signal::SignalWatcher;
  45  use timer::TimerWatcher;
  46  use tty::TtyWatcher;
  47  use uvll;
  48  
  49  // Obviously an Event Loop is always home.
  50  pub struct UvEventLoop {
  51      uvio: UvIoFactory
  52  }
  53  
  54  impl UvEventLoop {
  55      pub fn new() -> UvEventLoop {
  56          let mut loop_ = Loop::new();
  57          let handle_pool = QueuePool::new(&mut loop_);
  58          UvEventLoop {
  59              uvio: UvIoFactory {
  60                  loop_: loop_,
  61                  handle_pool: Some(handle_pool),
  62              }
  63          }
  64      }
  65  }
  66  
  67  impl Drop for UvEventLoop {
  68      fn drop(&mut self) {
  69          // Must first destroy the pool of handles before we destroy the loop
  70          // because otherwise the contained async handle will be destroyed after
  71          // the loop is free'd (use-after-free). We also must free the uv handle
  72          // after the loop has been closed because during the closing of the loop
  73          // the handle is required to be used apparently.
  74          //
  75          // Lastly, after we've closed the pool of handles we pump the event loop
  76          // one last time to run any closing callbacks to make sure the loop
  77          // shuts down cleanly.
  78          let handle = self.uvio.handle_pool.get_ref().handle();
  79          drop(self.uvio.handle_pool.take());
  80          self.run();
  81  
  82          self.uvio.loop_.close();
  83          unsafe { uvll::free_handle(handle) }
  84      }
  85  }
  86  
  87  impl EventLoop for UvEventLoop {
  88      fn run(&mut self) {
  89          self.uvio.loop_.run();
  90      }
  91  
  92      fn callback(&mut self, fproc()) {
  93          IdleWatcher::onetime(&mut self.uvio.loop_, f);
  94      }
  95  
  96      fn pausable_idle_callback(&mut self, cbBox<rtio::Callback:Send>)
  97                                -> Box<rtio::PausableIdleCallback:Send> {
  98          IdleWatcher::new(&mut self.uvio.loop_, cb)
  99                           as Box<rtio::PausableIdleCallback:Send>
 100      }
 101  
 102      fn remote_callback(&mut self, fBox<rtio::Callback:Send>)
 103                         -> Box<rtio::RemoteCallback:Send> {
 104          box AsyncWatcher::new(&mut self.uvio.loop_, f) as
 105              Box<rtio::RemoteCallback:Send>
 106      }
 107  
 108      fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
 109          let factory = &mut self.uvio as &mut rtio::IoFactory;
 110          Some(factory)
 111      }
 112  
 113      fn has_active_io(&self) -> bool {
 114          self.uvio.loop_.get_blockers() > 0
 115      }
 116  }
 117  
 118  #[test]
 119  fn test_callback_run_once() {
 120      run_in_bare_thread(proc() {
 121          let mut event_loop = UvEventLoop::new();
 122          let mut count = 0;
 123          let count_ptr: *mut int = &mut count;
 124          event_loop.callback(proc() {
 125              unsafe { *count_ptr += 1 }
 126          });
 127          event_loop.run();
 128          assert_eq!(count, 1);
 129      });
 130  }
 131  
 132  pub struct UvIoFactory {
 133      pub loop_: Loop,
 134      handle_pool: Option<Box<QueuePool>>,
 135  }
 136  
 137  impl UvIoFactory {
 138      pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }
 139  
 140      pub fn make_handle(&mut self) -> HomeHandle {
 141          // It's understood by the homing code that the "local id" is just the
 142          // pointer of the local I/O factory cast to a uint.
 143          let iduint = unsafe { cast::transmute_copy(&self) };
 144          HomeHandle::new(id, &mut **self.handle_pool.get_mut_ref())
 145      }
 146  }
 147  
 148  impl IoFactory for UvIoFactory {
 149      // Connect to an address and return a new stream
 150      // NB: This blocks the task waiting on the connection.
 151      // It would probably be better to return a future
 152      fn tcp_connect(&mut self, addrSocketAddr, timeoutOption<u64>)
 153                     -> Result<Box<rtio::RtioTcpStream:Send>, IoError> {
 154          match TcpWatcher::connect(self, addr, timeout) {
 155              Ok(t) => Ok(box t as Box<rtio::RtioTcpStream:Send>),
 156              Err(e) => Err(uv_error_to_io_error(e)),
 157          }
 158      }
 159  
 160      fn tcp_bind(&mut self, addrSocketAddr)
 161                  -> Result<Box<rtio::RtioTcpListener:Send>, IoError> {
 162          match TcpListener::bind(self, addr) {
 163              Ok(t) => Ok(t as Box<rtio::RtioTcpListener:Send>),
 164              Err(e) => Err(uv_error_to_io_error(e)),
 165          }
 166      }
 167  
 168      fn udp_bind(&mut self, addrSocketAddr)
 169                  -> Result<Box<rtio::RtioUdpSocket:Send>, IoError> {
 170          match UdpWatcher::bind(self, addr) {
 171              Ok(u) => Ok(box u as Box<rtio::RtioUdpSocket:Send>),
 172              Err(e) => Err(uv_error_to_io_error(e)),
 173          }
 174      }
 175  
 176      fn timer_init(&mut self) -> Result<Box<rtio::RtioTimer:Send>, IoError> {
 177          Ok(TimerWatcher::new(self) as Box<rtio::RtioTimer:Send>)
 178      }
 179  
 180      fn get_host_addresses(&mut self, hostOption<&str>, servnameOption<&str>,
 181                            hintOption<ai::Hint>) -> Result<Vec<ai::Info>, IoError> {
 182          let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint);
 183          r.map_err(uv_error_to_io_error)
 184      }
 185  
 186      fn fs_from_raw_fd(&mut self, fdc_int, closertio::CloseBehavior)
 187                        -> Box<rtio::RtioFileStream:Send> {
 188          box FileWatcher::new(self, fd, close) as
 189              Box<rtio::RtioFileStream:Send>
 190      }
 191  
 192      fn fs_open(&mut self, path&CString, fmFileMode, faFileAccess)
 193                 -> Result<Box<rtio::RtioFileStream:Send>, IoError> {
 194          let flags = match fm {
 195              io::Open => 0,
 196              io::Append => libc::O_APPEND,
 197              io::Truncate => libc::O_TRUNC,
 198          };
 199          // Opening with a write permission must silently create the file.
 200          let (flags, mode) = match fa {
 201              io::Read => (flags | libc::O_RDONLY, 0),
 202              io::Write => (flags | libc::O_WRONLY | libc::O_CREAT,
 203                            libc::S_IRUSR | libc::S_IWUSR),
 204              io::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT,
 205                                libc::S_IRUSR | libc::S_IWUSR),
 206          };
 207  
 208          match FsRequest::open(self, path, flags as int, mode as int) {
 209              Ok(fs) => Ok(box fs as Box<rtio::RtioFileStream:Send>),
 210              Err(e) => Err(uv_error_to_io_error(e))
 211          }
 212      }
 213  
 214      fn fs_unlink(&mut self, path&CString) -> Result<(), IoError> {
 215          let r = FsRequest::unlink(&self.loop_, path);
 216          r.map_err(uv_error_to_io_error)
 217      }
 218      fn fs_lstat(&mut self, path&CString) -> Result<FileStat, IoError> {
 219          let r = FsRequest::lstat(&self.loop_, path);
 220          r.map_err(uv_error_to_io_error)
 221      }
 222      fn fs_stat(&mut self, path&CString) -> Result<FileStat, IoError> {
 223          let r = FsRequest::stat(&self.loop_, path);
 224          r.map_err(uv_error_to_io_error)
 225      }
 226      fn fs_mkdir(&mut self, path&CString,
 227                  permio::FilePermission) -> Result<(), IoError> {
 228          let r = FsRequest::mkdir(&self.loop_, path, perm.bits() as c_int);
 229          r.map_err(uv_error_to_io_error)
 230      }
 231      fn fs_rmdir(&mut self, path&CString) -> Result<(), IoError> {
 232          let r = FsRequest::rmdir(&self.loop_, path);
 233          r.map_err(uv_error_to_io_error)
 234      }
 235      fn fs_rename(&mut self, path&CString, to&CString) -> Result<(), IoError> {
 236          let r = FsRequest::rename(&self.loop_, path, to);
 237          r.map_err(uv_error_to_io_error)
 238      }
 239      fn fs_chmod(&mut self, path&CString,
 240                  permio::FilePermission) -> Result<(), IoError> {
 241          let r = FsRequest::chmod(&self.loop_, path, perm.bits() as c_int);
 242          r.map_err(uv_error_to_io_error)
 243      }
 244      fn fs_readdir(&mut self, path&CString, flagsc_int)
 245          -> Result<Vec<Path>, IoError>
 246      {
 247          let r = FsRequest::readdir(&self.loop_, path, flags);
 248          r.map_err(uv_error_to_io_error)
 249      }
 250      fn fs_link(&mut self, src&CString, dst&CString) -> Result<(), IoError> {
 251          let r = FsRequest::link(&self.loop_, src, dst);
 252          r.map_err(uv_error_to_io_error)
 253      }
 254      fn fs_symlink(&mut self, src&CString, dst&CString) -> Result<(), IoError> {
 255          let r = FsRequest::symlink(&self.loop_, src, dst);
 256          r.map_err(uv_error_to_io_error)
 257      }
 258      fn fs_chown(&mut self, path&CString, uidint, gidint) -> Result<(), IoError> {
 259          let r = FsRequest::chown(&self.loop_, path, uid, gid);
 260          r.map_err(uv_error_to_io_error)
 261      }
 262      fn fs_readlink(&mut self, path&CString) -> Result<Path, IoError> {
 263          let r = FsRequest::readlink(&self.loop_, path);
 264          r.map_err(uv_error_to_io_error)
 265      }
 266      fn fs_utime(&mut self, path&CString, atimeu64, mtimeu64)
 267          -> Result<(), IoError>
 268      {
 269          let r = FsRequest::utime(&self.loop_, path, atime, mtime);
 270          r.map_err(uv_error_to_io_error)
 271      }
 272  
 273      fn spawn(&mut self, configProcessConfig)
 274              -> Result<(Box<rtio::RtioProcess:Send>,
 275                         Vec<Option<Box<rtio::RtioPipe:Send>>>),
 276                        IoError>
 277      {
 278          match Process::spawn(self, config) {
 279              Ok((p, io)) => {
 280                  Ok((p as Box<rtio::RtioProcess:Send>,
 281                      io.move_iter().map(|i| i.map(|p| {
 282                          box p as Box<rtio::RtioPipe:Send>
 283                      })).collect()))
 284              }
 285              Err(e) => Err(uv_error_to_io_error(e)),
 286          }
 287      }
 288  
 289      fn kill(&mut self, pidlibc::pid_t, signumint) -> Result<(), IoError> {
 290          Process::kill(pid, signum).map_err(uv_error_to_io_error)
 291      }
 292  
 293      fn unix_bind(&mut self, path&CString)
 294                   -> Result<Box<rtio::RtioUnixListener:Send>, IoError> {
 295          match PipeListener::bind(self, path) {
 296              Ok(p) => Ok(p as Box<rtio::RtioUnixListener:Send>),
 297              Err(e) => Err(uv_error_to_io_error(e)),
 298          }
 299      }
 300  
 301      fn unix_connect(&mut self, path&CString, timeoutOption<u64>)
 302                      -> Result<Box<rtio::RtioPipe:Send>, IoError> {
 303          match PipeWatcher::connect(self, path, timeout) {
 304              Ok(p) => Ok(box p as Box<rtio::RtioPipe:Send>),
 305              Err(e) => Err(uv_error_to_io_error(e)),
 306          }
 307      }
 308  
 309      fn tty_open(&mut self, fdc_int, readablebool)
 310              -> Result<Box<rtio::RtioTTY:Send>, IoError> {
 311          match TtyWatcher::new(self, fd, readable) {
 312              Ok(tty) => Ok(box tty as Box<rtio::RtioTTY:Send>),
 313              Err(e) => Err(uv_error_to_io_error(e))
 314          }
 315      }
 316  
 317      fn pipe_open(&mut self, fdc_int)
 318                   -> Result<Box<rtio::RtioPipe:Send>, IoError> {
 319          match PipeWatcher::open(self, fd) {
 320              Ok(s) => Ok(box s as Box<rtio::RtioPipe:Send>),
 321              Err(e) => Err(uv_error_to_io_error(e))
 322          }
 323      }
 324  
 325      fn signal(&mut self, signumSignum, channelSender<Signum>)
 326          -> Result<Box<rtio::RtioSignal:Send>, IoError> {
 327          match SignalWatcher::new(self, signum, channel) {
 328              Ok(s) => Ok(s as Box<rtio::RtioSignal:Send>),
 329              Err(e) => Err(uv_error_to_io_error(e)),
 330          }
 331      }
 332  }


librustuv/uvio.rs:49:43-49:43 -struct- definition:
// Obviously an Event Loop is always home.
pub struct UvEventLoop {
    uvio: UvIoFactory
references:- 5
57:         let handle_pool = QueuePool::new(&mut loop_);
58:         UvEventLoop {
59:             uvio: UvIoFactory {
--
87: impl EventLoop for UvEventLoop {
88:     fn run(&mut self) {


librustuv/uvio.rs:131:1-131:1 -struct- definition:
pub struct UvIoFactory {
    pub loop_: Loop,
    handle_pool: Option<Box<QueuePool>>,
references:- 20
58:         UvEventLoop {
59:             uvio: UvIoFactory {
60:                 loop_: loop_,
--
148: impl IoFactory for UvIoFactory {
149:     // Connect to an address and return a new stream
librustuv/file.rs:
39: impl FsRequest {
40:     pub fn open(io: &mut UvIoFactory, path: &CString, flags: int, mode: int)
41:         -> Result<FileWatcher, UvError>
--
359: impl FileWatcher {
360:     pub fn new(io: &mut UvIoFactory, fd: c_int,
361:                close: rtio::CloseBehavior) -> FileWatcher {
librustuv/net.rs:
352: impl TcpListener {
353:     pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
354:                 -> Result<Box<TcpListener>, UvError> {
--
504: impl UdpWatcher {
505:     pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
506:                 -> Result<UdpWatcher, UvError> {
librustuv/timer.rs:
34: impl TimerWatcher {
35:     pub fn new(io: &mut UvIoFactory) -> Box<TimerWatcher> {
36:         let handle = io.make_handle();
librustuv/process.rs:
41:     /// occurred.
42:     pub fn spawn(io_loop: &mut UvIoFactory, config: process::ProcessConfig)
43:                 -> Result<(Box<Process>, Vec<Option<PipeWatcher>>), UvError>
--
137:                     io: &process::StdioContainer,
138:                     io_loop: &mut UvIoFactory) -> Option<PipeWatcher> {
139:     match *io {
librustuv/pipe.rs:
80:     pub fn open(io: &mut UvIoFactory, file: libc::c_int)
81:         -> Result<PipeWatcher, UvError>
--
225: impl PipeListener {
226:     pub fn bind(io: &mut UvIoFactory, name: &CString)
227:         -> Result<Box<PipeListener>, UvError>
librustuv/tty.rs:
29: impl TtyWatcher {
30:     pub fn new(io: &mut UvIoFactory, fd: libc::c_int, readable: bool)
31:         -> Result<TtyWatcher, UvError>
librustuv/signal.rs:
28: impl SignalWatcher {
29:     pub fn new(io: &mut UvIoFactory, signum: Signum, channel: Sender<Signum>)
30:                -> Result<Box<SignalWatcher>, UvError> {
librustuv/timeout.rs:
226:     pub fn connect<T>(
227:         mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
228:         f: |&Request, &T, uvll::uv_connect_cb| -> c_int
librustuv/uvio.rs:
137: impl UvIoFactory {
138:     pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }