(index<- ) ./librustuv/uvio.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! The implementation of `rtio` for libuv
12
13 use std::c_str::CString;
14 use std::cast;
15 use std::io::IoError;
16 use std::io::net::ip::SocketAddr;
17 use std::io::process::ProcessConfig;
18 use std::io::signal::Signum;
19 use std::io::{FileMode, FileAccess, Open, Append, Truncate, Read, Write,
20 ReadWrite, FileStat};
21 use std::io;
22 use libc::c_int;
23 use libc::{O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR,
24 S_IWUSR};
25 use libc;
26 use std::path::Path;
27 use std::rt::rtio;
28 use std::rt::rtio::{IoFactory, EventLoop};
29 use ai = std::io::net::addrinfo;
30
31 #[cfg(test)] use std::unstable::run_in_bare_thread;
32
33 use super::{uv_error_to_io_error, Loop};
34
35 use addrinfo::GetAddrInfoRequest;
36 use async::AsyncWatcher;
37 use file::{FsRequest, FileWatcher};
38 use queue::QueuePool;
39 use homing::HomeHandle;
40 use idle::IdleWatcher;
41 use net::{TcpWatcher, TcpListener, UdpWatcher};
42 use pipe::{PipeWatcher, PipeListener};
43 use process::Process;
44 use signal::SignalWatcher;
45 use timer::TimerWatcher;
46 use tty::TtyWatcher;
47 use uvll;
48
49 // Obviously an Event Loop is always home.
50 pub struct UvEventLoop {
51 uvio: UvIoFactory
52 }
53
54 impl UvEventLoop {
55 pub fn new() -> UvEventLoop {
56 let mut loop_ = Loop::new();
57 let handle_pool = QueuePool::new(&mut loop_);
58 UvEventLoop {
59 uvio: UvIoFactory {
60 loop_: loop_,
61 handle_pool: Some(handle_pool),
62 }
63 }
64 }
65 }
66
67 impl Drop for UvEventLoop {
68 fn drop(&mut self) {
69 // Must first destroy the pool of handles before we destroy the loop
70 // because otherwise the contained async handle will be destroyed after
71 // the loop is free'd (use-after-free). We also must free the uv handle
72 // after the loop has been closed because during the closing of the loop
73 // the handle is required to be used apparently.
74 //
75 // Lastly, after we've closed the pool of handles we pump the event loop
76 // one last time to run any closing callbacks to make sure the loop
77 // shuts down cleanly.
78 let handle = self.uvio.handle_pool.get_ref().handle();
79 drop(self.uvio.handle_pool.take());
80 self.run();
81
82 self.uvio.loop_.close();
83 unsafe { uvll::free_handle(handle) }
84 }
85 }
86
87 impl EventLoop for UvEventLoop {
88 fn run(&mut self) {
89 self.uvio.loop_.run();
90 }
91
92 fn callback(&mut self, f: proc()) {
93 IdleWatcher::onetime(&mut self.uvio.loop_, f);
94 }
95
96 fn pausable_idle_callback(&mut self, cb: Box<rtio::Callback:Send>)
97 -> Box<rtio::PausableIdleCallback:Send> {
98 IdleWatcher::new(&mut self.uvio.loop_, cb)
99 as Box<rtio::PausableIdleCallback:Send>
100 }
101
102 fn remote_callback(&mut self, f: Box<rtio::Callback:Send>)
103 -> Box<rtio::RemoteCallback:Send> {
104 box AsyncWatcher::new(&mut self.uvio.loop_, f) as
105 Box<rtio::RemoteCallback:Send>
106 }
107
108 fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
109 let factory = &mut self.uvio as &mut rtio::IoFactory;
110 Some(factory)
111 }
112
113 fn has_active_io(&self) -> bool {
114 self.uvio.loop_.get_blockers() > 0
115 }
116 }
117
118 #[test]
119 fn test_callback_run_once() {
120 run_in_bare_thread(proc() {
121 let mut event_loop = UvEventLoop::new();
122 let mut count = 0;
123 let count_ptr: *mut int = &mut count;
124 event_loop.callback(proc() {
125 unsafe { *count_ptr += 1 }
126 });
127 event_loop.run();
128 assert_eq!(count, 1);
129 });
130 }
131
132 pub struct UvIoFactory {
133 pub loop_: Loop,
134 handle_pool: Option<Box<QueuePool>>,
135 }
136
137 impl UvIoFactory {
138 pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }
139
140 pub fn make_handle(&mut self) -> HomeHandle {
141 // It's understood by the homing code that the "local id" is just the
142 // pointer of the local I/O factory cast to a uint.
143 let id: uint = unsafe { cast::transmute_copy(&self) };
144 HomeHandle::new(id, &mut **self.handle_pool.get_mut_ref())
145 }
146 }
147
148 impl IoFactory for UvIoFactory {
149 // Connect to an address and return a new stream
150 // NB: This blocks the task waiting on the connection.
151 // It would probably be better to return a future
152 fn tcp_connect(&mut self, addr: SocketAddr, timeout: Option<u64>)
153 -> Result<Box<rtio::RtioTcpStream:Send>, IoError> {
154 match TcpWatcher::connect(self, addr, timeout) {
155 Ok(t) => Ok(box t as Box<rtio::RtioTcpStream:Send>),
156 Err(e) => Err(uv_error_to_io_error(e)),
157 }
158 }
159
160 fn tcp_bind(&mut self, addr: SocketAddr)
161 -> Result<Box<rtio::RtioTcpListener:Send>, IoError> {
162 match TcpListener::bind(self, addr) {
163 Ok(t) => Ok(t as Box<rtio::RtioTcpListener:Send>),
164 Err(e) => Err(uv_error_to_io_error(e)),
165 }
166 }
167
168 fn udp_bind(&mut self, addr: SocketAddr)
169 -> Result<Box<rtio::RtioUdpSocket:Send>, IoError> {
170 match UdpWatcher::bind(self, addr) {
171 Ok(u) => Ok(box u as Box<rtio::RtioUdpSocket:Send>),
172 Err(e) => Err(uv_error_to_io_error(e)),
173 }
174 }
175
176 fn timer_init(&mut self) -> Result<Box<rtio::RtioTimer:Send>, IoError> {
177 Ok(TimerWatcher::new(self) as Box<rtio::RtioTimer:Send>)
178 }
179
180 fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
181 hint: Option<ai::Hint>) -> Result<Vec<ai::Info>, IoError> {
182 let r = GetAddrInfoRequest::run(&self.loop_, host, servname, hint);
183 r.map_err(uv_error_to_io_error)
184 }
185
186 fn fs_from_raw_fd(&mut self, fd: c_int, close: rtio::CloseBehavior)
187 -> Box<rtio::RtioFileStream:Send> {
188 box FileWatcher::new(self, fd, close) as
189 Box<rtio::RtioFileStream:Send>
190 }
191
192 fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess)
193 -> Result<Box<rtio::RtioFileStream:Send>, IoError> {
194 let flags = match fm {
195 io::Open => 0,
196 io::Append => libc::O_APPEND,
197 io::Truncate => libc::O_TRUNC,
198 };
199 // Opening with a write permission must silently create the file.
200 let (flags, mode) = match fa {
201 io::Read => (flags | libc::O_RDONLY, 0),
202 io::Write => (flags | libc::O_WRONLY | libc::O_CREAT,
203 libc::S_IRUSR | libc::S_IWUSR),
204 io::ReadWrite => (flags | libc::O_RDWR | libc::O_CREAT,
205 libc::S_IRUSR | libc::S_IWUSR),
206 };
207
208 match FsRequest::open(self, path, flags as int, mode as int) {
209 Ok(fs) => Ok(box fs as Box<rtio::RtioFileStream:Send>),
210 Err(e) => Err(uv_error_to_io_error(e))
211 }
212 }
213
214 fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> {
215 let r = FsRequest::unlink(&self.loop_, path);
216 r.map_err(uv_error_to_io_error)
217 }
218 fn fs_lstat(&mut self, path: &CString) -> Result<FileStat, IoError> {
219 let r = FsRequest::lstat(&self.loop_, path);
220 r.map_err(uv_error_to_io_error)
221 }
222 fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError> {
223 let r = FsRequest::stat(&self.loop_, path);
224 r.map_err(uv_error_to_io_error)
225 }
226 fn fs_mkdir(&mut self, path: &CString,
227 perm: io::FilePermission) -> Result<(), IoError> {
228 let r = FsRequest::mkdir(&self.loop_, path, perm.bits() as c_int);
229 r.map_err(uv_error_to_io_error)
230 }
231 fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> {
232 let r = FsRequest::rmdir(&self.loop_, path);
233 r.map_err(uv_error_to_io_error)
234 }
235 fn fs_rename(&mut self, path: &CString, to: &CString) -> Result<(), IoError> {
236 let r = FsRequest::rename(&self.loop_, path, to);
237 r.map_err(uv_error_to_io_error)
238 }
239 fn fs_chmod(&mut self, path: &CString,
240 perm: io::FilePermission) -> Result<(), IoError> {
241 let r = FsRequest::chmod(&self.loop_, path, perm.bits() as c_int);
242 r.map_err(uv_error_to_io_error)
243 }
244 fn fs_readdir(&mut self, path: &CString, flags: c_int)
245 -> Result<Vec<Path>, IoError>
246 {
247 let r = FsRequest::readdir(&self.loop_, path, flags);
248 r.map_err(uv_error_to_io_error)
249 }
250 fn fs_link(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
251 let r = FsRequest::link(&self.loop_, src, dst);
252 r.map_err(uv_error_to_io_error)
253 }
254 fn fs_symlink(&mut self, src: &CString, dst: &CString) -> Result<(), IoError> {
255 let r = FsRequest::symlink(&self.loop_, src, dst);
256 r.map_err(uv_error_to_io_error)
257 }
258 fn fs_chown(&mut self, path: &CString, uid: int, gid: int) -> Result<(), IoError> {
259 let r = FsRequest::chown(&self.loop_, path, uid, gid);
260 r.map_err(uv_error_to_io_error)
261 }
262 fn fs_readlink(&mut self, path: &CString) -> Result<Path, IoError> {
263 let r = FsRequest::readlink(&self.loop_, path);
264 r.map_err(uv_error_to_io_error)
265 }
266 fn fs_utime(&mut self, path: &CString, atime: u64, mtime: u64)
267 -> Result<(), IoError>
268 {
269 let r = FsRequest::utime(&self.loop_, path, atime, mtime);
270 r.map_err(uv_error_to_io_error)
271 }
272
273 fn spawn(&mut self, config: ProcessConfig)
274 -> Result<(Box<rtio::RtioProcess:Send>,
275 Vec<Option<Box<rtio::RtioPipe:Send>>>),
276 IoError>
277 {
278 match Process::spawn(self, config) {
279 Ok((p, io)) => {
280 Ok((p as Box<rtio::RtioProcess:Send>,
281 io.move_iter().map(|i| i.map(|p| {
282 box p as Box<rtio::RtioPipe:Send>
283 })).collect()))
284 }
285 Err(e) => Err(uv_error_to_io_error(e)),
286 }
287 }
288
289 fn kill(&mut self, pid: libc::pid_t, signum: int) -> Result<(), IoError> {
290 Process::kill(pid, signum).map_err(uv_error_to_io_error)
291 }
292
293 fn unix_bind(&mut self, path: &CString)
294 -> Result<Box<rtio::RtioUnixListener:Send>, IoError> {
295 match PipeListener::bind(self, path) {
296 Ok(p) => Ok(p as Box<rtio::RtioUnixListener:Send>),
297 Err(e) => Err(uv_error_to_io_error(e)),
298 }
299 }
300
301 fn unix_connect(&mut self, path: &CString, timeout: Option<u64>)
302 -> Result<Box<rtio::RtioPipe:Send>, IoError> {
303 match PipeWatcher::connect(self, path, timeout) {
304 Ok(p) => Ok(box p as Box<rtio::RtioPipe:Send>),
305 Err(e) => Err(uv_error_to_io_error(e)),
306 }
307 }
308
309 fn tty_open(&mut self, fd: c_int, readable: bool)
310 -> Result<Box<rtio::RtioTTY:Send>, IoError> {
311 match TtyWatcher::new(self, fd, readable) {
312 Ok(tty) => Ok(box tty as Box<rtio::RtioTTY:Send>),
313 Err(e) => Err(uv_error_to_io_error(e))
314 }
315 }
316
317 fn pipe_open(&mut self, fd: c_int)
318 -> Result<Box<rtio::RtioPipe:Send>, IoError> {
319 match PipeWatcher::open(self, fd) {
320 Ok(s) => Ok(box s as Box<rtio::RtioPipe:Send>),
321 Err(e) => Err(uv_error_to_io_error(e))
322 }
323 }
324
325 fn signal(&mut self, signum: Signum, channel: Sender<Signum>)
326 -> Result<Box<rtio::RtioSignal:Send>, IoError> {
327 match SignalWatcher::new(self, signum, channel) {
328 Ok(s) => Ok(s as Box<rtio::RtioSignal:Send>),
329 Err(e) => Err(uv_error_to_io_error(e)),
330 }
331 }
332 }
librustuv/uvio.rs:49:43-49:43 -struct- definition:
// Obviously an Event Loop is always home.
pub struct UvEventLoop {
uvio: UvIoFactory
references:- 557: let handle_pool = QueuePool::new(&mut loop_);
58: UvEventLoop {
59: uvio: UvIoFactory {
--
87: impl EventLoop for UvEventLoop {
88: fn run(&mut self) {
librustuv/uvio.rs:131:1-131:1 -struct- definition:
pub struct UvIoFactory {
pub loop_: Loop,
handle_pool: Option<Box<QueuePool>>,
references:- 2058: UvEventLoop {
59: uvio: UvIoFactory {
60: loop_: loop_,
--
148: impl IoFactory for UvIoFactory {
149: // Connect to an address and return a new stream
librustuv/file.rs:
39: impl FsRequest {
40: pub fn open(io: &mut UvIoFactory, path: &CString, flags: int, mode: int)
41: -> Result<FileWatcher, UvError>
--
359: impl FileWatcher {
360: pub fn new(io: &mut UvIoFactory, fd: c_int,
361: close: rtio::CloseBehavior) -> FileWatcher {
librustuv/net.rs:
352: impl TcpListener {
353: pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
354: -> Result<Box<TcpListener>, UvError> {
--
504: impl UdpWatcher {
505: pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
506: -> Result<UdpWatcher, UvError> {
librustuv/timer.rs:
34: impl TimerWatcher {
35: pub fn new(io: &mut UvIoFactory) -> Box<TimerWatcher> {
36: let handle = io.make_handle();
librustuv/process.rs:
41: /// occurred.
42: pub fn spawn(io_loop: &mut UvIoFactory, config: process::ProcessConfig)
43: -> Result<(Box<Process>, Vec<Option<PipeWatcher>>), UvError>
--
137: io: &process::StdioContainer,
138: io_loop: &mut UvIoFactory) -> Option<PipeWatcher> {
139: match *io {
librustuv/pipe.rs:
80: pub fn open(io: &mut UvIoFactory, file: libc::c_int)
81: -> Result<PipeWatcher, UvError>
--
225: impl PipeListener {
226: pub fn bind(io: &mut UvIoFactory, name: &CString)
227: -> Result<Box<PipeListener>, UvError>
librustuv/tty.rs:
29: impl TtyWatcher {
30: pub fn new(io: &mut UvIoFactory, fd: libc::c_int, readable: bool)
31: -> Result<TtyWatcher, UvError>
librustuv/signal.rs:
28: impl SignalWatcher {
29: pub fn new(io: &mut UvIoFactory, signum: Signum, channel: Sender<Signum>)
30: -> Result<Box<SignalWatcher>, UvError> {
librustuv/timeout.rs:
226: pub fn connect<T>(
227: mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
228: f: |&Request, &T, uvll::uv_connect_cb| -> c_int
librustuv/uvio.rs:
137: impl UvIoFactory {
138: pub fn uv_loop<'a>(&mut self) -> *uvll::uv_loop_t { self.loop_.handle }