(index<- ) ./librustuv/pipe.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc;
12 use std::c_str::CString;
13 use std::cast;
14 use std::io::IoError;
15 use std::io;
16 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
17 use std::rt::task::BlockedTask;
18
19 use homing::{HomingIO, HomeHandle};
20 use net;
21 use rc::Refcount;
22 use stream::StreamWatcher;
23 use super::{Loop, UvError, UvHandle, uv_error_to_io_error};
24 use timeout::{AcceptTimeout, ConnectCtx, AccessTimeout};
25 use uvio::UvIoFactory;
26 use uvll;
27
28 pub struct PipeWatcher {
29 stream: StreamWatcher,
30 home: HomeHandle,
31 defused: bool,
32 refcount: Refcount,
33
34 // see comments in TcpWatcher for why these exist
35 write_access: AccessTimeout,
36 read_access: AccessTimeout,
37 }
38
39 pub struct PipeListener {
40 home: HomeHandle,
41 pipe: *uvll::uv_pipe_t,
42 outgoing: Sender<Result<Box<RtioPipe:Send>, IoError>>,
43 incoming: Receiver<Result<Box<RtioPipe:Send>, IoError>>,
44 }
45
46 pub struct PipeAcceptor {
47 listener: Box<PipeListener>,
48 timeout: AcceptTimeout,
49 }
50
51 // PipeWatcher implementation and traits
52
53 impl PipeWatcher {
54 // Creates an uninitialized pipe watcher. The underlying uv pipe is ready to
55 // get bound to some other source (this is normally a helper method paired
56 // with another call).
57 pub fn new(io: &mut UvIoFactory, ipc: bool) -> PipeWatcher {
58 let home = io.make_handle();
59 PipeWatcher::new_home(&io.loop_, home, ipc)
60 }
61
62 pub fn new_home(loop_: &Loop, home: HomeHandle, ipc: bool) -> PipeWatcher {
63 let handle = unsafe {
64 let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
65 assert!(!handle.is_null());
66 let ipc = ipc as libc::c_int;
67 assert_eq!(uvll::uv_pipe_init(loop_.handle, handle, ipc), 0);
68 handle
69 };
70 PipeWatcher {
71 stream: StreamWatcher::new(handle),
72 home: home,
73 defused: false,
74 refcount: Refcount::new(),
75 read_access: AccessTimeout::new(),
76 write_access: AccessTimeout::new(),
77 }
78 }
79
80 pub fn open(io: &mut UvIoFactory, file: libc::c_int)
81 -> Result<PipeWatcher, UvError>
82 {
83 let pipe = PipeWatcher::new(io, false);
84 match unsafe { uvll::uv_pipe_open(pipe.handle(), file) } {
85 0 => Ok(pipe),
86 n => Err(UvError(n))
87 }
88 }
89
90 pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option<u64>)
91 -> Result<PipeWatcher, UvError>
92 {
93 let pipe = PipeWatcher::new(io, false);
94 let cx = ConnectCtx { status: -1, task: None, timer: None };
95 cx.connect(pipe, timeout, io, |req, pipe, cb| {
96 unsafe {
97 uvll::uv_pipe_connect(req.handle, pipe.handle(),
98 name.with_ref(|p| p), cb)
99 }
100 0
101 })
102 }
103
104 pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
105
106 // Unwraps the underlying uv pipe. This cancels destruction of the pipe and
107 // allows the pipe to get moved elsewhere
108 fn unwrap(mut self) -> *uvll::uv_pipe_t {
109 self.defused = true;
110 return self.stream.handle;
111 }
112 }
113
114 impl RtioPipe for PipeWatcher {
115 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
116 let m = self.fire_homing_missile();
117 let guard = try!(self.read_access.grant(m));
118
119 // see comments in close_read about this check
120 if guard.access.is_closed() {
121 return Err(io::standard_error(io::EndOfFile))
122 }
123
124 self.stream.read(buf).map_err(uv_error_to_io_error)
125 }
126
127 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
128 let m = self.fire_homing_missile();
129 let guard = try!(self.write_access.grant(m));
130 self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
131 }
132
133 fn clone(&self) -> Box<RtioPipe:Send> {
134 box PipeWatcher {
135 stream: StreamWatcher::new(self.stream.handle),
136 defused: false,
137 home: self.home.clone(),
138 refcount: self.refcount.clone(),
139 read_access: self.read_access.clone(),
140 write_access: self.write_access.clone(),
141 } as Box<RtioPipe:Send>
142 }
143
144 fn close_read(&mut self) -> Result<(), IoError> {
145 // The current uv_shutdown method only shuts the writing half of the
146 // connection, and no method is provided to shut down the reading half
147 // of the connection. With a lack of method, we emulate shutting down
148 // the reading half of the connection by manually returning early from
149 // all future calls to `read`.
150 //
151 // Note that we must be careful to ensure that *all* cloned handles see
152 // the closing of the read half, so we stored the "is closed" bit in the
153 // Access struct, not in our own personal watcher. Additionally, the
154 // homing missile is used as a locking mechanism to ensure there is no
155 // contention over this bit.
156 //
157 // To shutdown the read half, we must first flag the access as being
158 // closed, and then afterwards we cease any pending read. Note that this
159 // ordering is crucial because we could in theory be rescheduled during
160 // the uv_read_stop which means that another read invocation could leak
161 // in before we set the flag.
162 let task = {
163 let m = self.fire_homing_missile();
164 self.read_access.access.close(&m);
165 self.stream.cancel_read(uvll::EOF as libc::ssize_t)
166 };
167 let _ = task.map(|t| t.reawaken());
168 Ok(())
169 }
170
171 fn close_write(&mut self) -> Result<(), IoError> {
172 let _m = self.fire_homing_missile();
173 net::shutdown(self.stream.handle, &self.uv_loop())
174 }
175
176 fn set_timeout(&mut self, timeout: Option<u64>) {
177 self.set_read_timeout(timeout);
178 self.set_write_timeout(timeout);
179 }
180
181 fn set_read_timeout(&mut self, ms: Option<u64>) {
182 let _m = self.fire_homing_missile();
183 let loop_ = self.uv_loop();
184 self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
185 &self.stream as *_ as uint);
186
187 fn cancel_read(stream: uint) -> Option<BlockedTask> {
188 let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
189 stream.cancel_read(uvll::ECANCELED as libc::ssize_t)
190 }
191 }
192
193 fn set_write_timeout(&mut self, ms: Option<u64>) {
194 let _m = self.fire_homing_missile();
195 let loop_ = self.uv_loop();
196 self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
197 &self.stream as *_ as uint);
198
199 fn cancel_write(stream: uint) -> Option<BlockedTask> {
200 let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
201 stream.cancel_write()
202 }
203 }
204 }
205
206 impl HomingIO for PipeWatcher {
207 fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
208 }
209
210 impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
211 fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
212 }
213
214 impl Drop for PipeWatcher {
215 fn drop(&mut self) {
216 let _m = self.fire_homing_missile();
217 if !self.defused && self.refcount.decrement() {
218 self.close();
219 }
220 }
221 }
222
223 // PipeListener implementation and traits
224
225 impl PipeListener {
226 pub fn bind(io: &mut UvIoFactory, name: &CString)
227 -> Result<Box<PipeListener>, UvError>
228 {
229 let pipe = PipeWatcher::new(io, false);
230 match unsafe {
231 uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
232 } {
233 0 => {
234 // If successful, unwrap the PipeWatcher because we control how
235 // we close the pipe differently. We can't rely on
236 // StreamWatcher's default close method.
237 let (tx, rx) = channel();
238 let p = box PipeListener {
239 home: io.make_handle(),
240 pipe: pipe.unwrap(),
241 incoming: rx,
242 outgoing: tx,
243 };
244 Ok(p.install())
245 }
246 n => Err(UvError(n))
247 }
248 }
249 }
250
251 impl RtioUnixListener for PipeListener {
252 fn listen(~self) -> Result<Box<RtioUnixAcceptor:Send>, IoError> {
253 // create the acceptor object from ourselves
254 let mut acceptor = box PipeAcceptor {
255 listener: self,
256 timeout: AcceptTimeout::new(),
257 };
258
259 let _m = acceptor.fire_homing_missile();
260 // FIXME: the 128 backlog should be configurable
261 match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
262 0 => Ok(acceptor as Box<RtioUnixAcceptor:Send>),
263 n => Err(uv_error_to_io_error(UvError(n))),
264 }
265 }
266 }
267
268 impl HomingIO for PipeListener {
269 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
270 }
271
272 impl UvHandle<uvll::uv_pipe_t> for PipeListener {
273 fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
274 }
275
276 extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
277 assert!(status != uvll::ECANCELED);
278
279 let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
280 let msg = match status {
281 0 => {
282 let loop_ = Loop::wrap(unsafe {
283 uvll::get_loop_for_uv_handle(server)
284 });
285 let client = PipeWatcher::new_home(&loop_, pipe.home().clone(), false);
286 assert_eq!(unsafe { uvll::uv_accept(server, client.handle()) }, 0);
287 Ok(box client as Box<RtioPipe:Send>)
288 }
289 n => Err(uv_error_to_io_error(UvError(n)))
290 };
291 pipe.outgoing.send(msg);
292 }
293
294 impl Drop for PipeListener {
295 fn drop(&mut self) {
296 let _m = self.fire_homing_missile();
297 self.close();
298 }
299 }
300
301 // PipeAcceptor implementation and traits
302
303 impl RtioUnixAcceptor for PipeAcceptor {
304 fn accept(&mut self) -> Result<Box<RtioPipe:Send>, IoError> {
305 self.timeout.accept(&self.listener.incoming)
306 }
307
308 fn set_timeout(&mut self, timeout_ms: Option<u64>) {
309 match timeout_ms {
310 None => self.timeout.clear(),
311 Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
312 }
313 }
314 }
315
316 impl HomingIO for PipeAcceptor {
317 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
318 }
319
320 #[cfg(test)]
321 mod tests {
322 use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
323 use std::io::test::next_test_unix;
324
325 use super::{PipeWatcher, PipeListener};
326 use super::super::local_loop;
327
328 #[test]
329 fn connect_err() {
330 match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(),
331 None) {
332 Ok(..) => fail!(),
333 Err(..) => {}
334 }
335 }
336
337 #[test]
338 fn bind_err() {
339 match PipeListener::bind(local_loop(), &"path/to/nowhere".to_c_str()) {
340 Ok(..) => fail!(),
341 Err(e) => assert_eq!(e.name(), "EACCES".to_owned()),
342 }
343 }
344
345 #[test]
346 fn bind() {
347 let p = next_test_unix().to_c_str();
348 match PipeListener::bind(local_loop(), &p) {
349 Ok(..) => {}
350 Err(..) => fail!(),
351 }
352 }
353
354 #[test] #[should_fail]
355 fn bind_fail() {
356 let p = next_test_unix().to_c_str();
357 let _w = PipeListener::bind(local_loop(), &p).unwrap();
358 fail!();
359 }
360
361 #[test]
362 fn connect() {
363 let path = next_test_unix();
364 let path2 = path.clone();
365 let (tx, rx) = channel();
366
367 spawn(proc() {
368 let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
369 let mut p = p.listen().unwrap();
370 tx.send(());
371 let mut client = p.accept().unwrap();
372 let mut buf = [0];
373 assert!(client.read(buf).unwrap() == 1);
374 assert_eq!(buf[0], 1);
375 assert!(client.write([2]).is_ok());
376 });
377 rx.recv();
378 let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
379 assert!(c.write([1]).is_ok());
380 let mut buf = [0];
381 assert!(c.read(buf).unwrap() == 1);
382 assert_eq!(buf[0], 2);
383 }
384
385 #[test] #[should_fail]
386 fn connect_fail() {
387 let path = next_test_unix();
388 let path2 = path.clone();
389 let (tx, rx) = channel();
390
391 spawn(proc() {
392 let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
393 let mut p = p.listen().unwrap();
394 tx.send(());
395 drop(p.accept().unwrap());
396 });
397 rx.recv();
398 let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap();
399 fail!()
400
401 }
402 }
librustuv/pipe.rs:27:1-27:1 -struct- definition:
pub struct PipeWatcher {
stream: StreamWatcher,
home: HomeHandle,
references:- 1369: };
70: PipeWatcher {
71: stream: StreamWatcher::new(handle),
--
133: fn clone(&self) -> Box<RtioPipe:Send> {
134: box PipeWatcher {
135: stream: StreamWatcher::new(self.stream.handle),
--
206: impl HomingIO for PipeWatcher {
207: fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
--
214: impl Drop for PipeWatcher {
215: fn drop(&mut self) {
librustuv/process.rs:
137: io: &process::StdioContainer,
138: io_loop: &mut UvIoFactory) -> Option<PipeWatcher> {
139: match *io {
librustuv/pipe.rs:
210: impl UvHandle<uvll::uv_pipe_t> for PipeWatcher {
211: fn uv_handle(&self) -> *uvll::uv_pipe_t { self.stream.handle }
librustuv/pipe.rs:45:1-45:1 -struct- definition:
pub struct PipeAcceptor {
listener: Box<PipeListener>,
timeout: AcceptTimeout,
references:- 3253: // create the acceptor object from ourselves
254: let mut acceptor = box PipeAcceptor {
255: listener: self,
--
303: impl RtioUnixAcceptor for PipeAcceptor {
304: fn accept(&mut self) -> Result<Box<RtioPipe:Send>, IoError> {
--
316: impl HomingIO for PipeAcceptor {
317: fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
librustuv/pipe.rs:38:1-38:1 -struct- definition:
pub struct PipeListener {
home: HomeHandle,
pipe: *uvll::uv_pipe_t,
references:- 9237: let (tx, rx) = channel();
238: let p = box PipeListener {
239: home: io.make_handle(),
--
272: impl UvHandle<uvll::uv_pipe_t> for PipeListener {
273: fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
--
294: impl Drop for PipeListener {
295: fn drop(&mut self) {