(index<- ) ./libstd/io/net/unix.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12
13 Named pipes
14
15 This module contains the ability to communicate over named pipes with
16 synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
17 while on Unix it corresponds to UNIX domain sockets.
18
19 These pipes are similar to TCP in the sense that you can have both a stream to a
20 server and a server itself. The server provided accepts other `UnixStream`
21 instances as clients.
22
23 */
24
25 #![allow(missing_doc)]
26
27 use prelude::*;
28
29 use c_str::ToCStr;
30 use clone::Clone;
31 use io::{Listener, Acceptor, Reader, Writer, IoResult};
32 use kinds::Send;
33 use owned::Box;
34 use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
35 use rt::rtio::{RtioUnixAcceptor, RtioPipe};
36
37 /// A stream which communicates over a named pipe.
38 pub struct UnixStream {
39 obj: Box<RtioPipe:Send>,
40 }
41
42 impl UnixStream {
43 /// Connect to a pipe named by `path`. This will attempt to open a
44 /// connection to the underlying socket.
45 ///
46 /// The returned stream will be closed when the object falls out of scope.
47 ///
48 /// # Example
49 ///
50 /// ```rust
51 /// # #![allow(unused_must_use)]
52 /// use std::io::net::unix::UnixStream;
53 ///
54 /// let server = Path::new("path/to/my/socket");
55 /// let mut stream = UnixStream::connect(&server);
56 /// stream.write([1, 2, 3]);
57 /// ```
58 pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
59 LocalIo::maybe_raise(|io| {
60 io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
61 })
62 }
63
64 /// Connect to a pipe named by `path`, timing out if the specified number of
65 /// milliseconds.
66 ///
67 /// This function is similar to `connect`, except that if `timeout_ms`
68 /// elapses the function will return an error of kind `TimedOut`.
69 #[experimental = "the timeout argument is likely to change types"]
70 pub fn connect_timeout<P: ToCStr>(path: &P,
71 timeout_ms: u64) -> IoResult<UnixStream> {
72 LocalIo::maybe_raise(|io| {
73 let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
74 s.map(|p| UnixStream { obj: p })
75 })
76 }
77
78
79 /// Closes the reading half of this connection.
80 ///
81 /// This method will close the reading portion of this connection, causing
82 /// all pending and future reads to immediately return with an error.
83 ///
84 /// Note that this method affects all cloned handles associated with this
85 /// stream, not just this one handle.
86 pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() }
87
88 /// Closes the writing half of this connection.
89 ///
90 /// This method will close the writing portion of this connection, causing
91 /// all pending and future writes to immediately return with an error.
92 ///
93 /// Note that this method affects all cloned handles associated with this
94 /// stream, not just this one handle.
95 pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
96
97 /// Sets the read/write timeout for this socket.
98 ///
99 /// For more information, see `TcpStream::set_timeout`
100 #[experimental = "the timeout argument may change in type and value"]
101 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
102 self.obj.set_timeout(timeout_ms)
103 }
104
105 /// Sets the read timeout for this socket.
106 ///
107 /// For more information, see `TcpStream::set_timeout`
108 #[experimental = "the timeout argument may change in type and value"]
109 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
110 self.obj.set_read_timeout(timeout_ms)
111 }
112
113 /// Sets the write timeout for this socket.
114 ///
115 /// For more information, see `TcpStream::set_timeout`
116 #[experimental = "the timeout argument may change in type and value"]
117 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
118 self.obj.set_write_timeout(timeout_ms)
119 }
120 }
121
122 impl Clone for UnixStream {
123 fn clone(&self) -> UnixStream {
124 UnixStream { obj: self.obj.clone() }
125 }
126 }
127
128 impl Reader for UnixStream {
129 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
130 }
131
132 impl Writer for UnixStream {
133 fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
134 }
135
136 /// A value that can listen for incoming named pipe connection requests.
137 pub struct UnixListener {
138 /// The internal, opaque runtime Unix listener.
139 obj: Box<RtioUnixListener:Send>,
140 }
141
142 impl UnixListener {
143
144 /// Creates a new listener, ready to receive incoming connections on the
145 /// specified socket. The server will be named by `path`.
146 ///
147 /// This listener will be closed when it falls out of scope.
148 ///
149 /// # Example
150 ///
151 /// ```
152 /// # fn main() {}
153 /// # fn foo() {
154 /// # #![allow(unused_must_use)]
155 /// use std::io::net::unix::UnixListener;
156 /// use std::io::{Listener, Acceptor};
157 ///
158 /// let server = Path::new("/path/to/my/socket");
159 /// let stream = UnixListener::bind(&server);
160 /// for mut client in stream.listen().incoming() {
161 /// client.write([1, 2, 3, 4]);
162 /// }
163 /// # }
164 /// ```
165 pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> {
166 LocalIo::maybe_raise(|io| {
167 io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
168 })
169 }
170 }
171
172 impl Listener<UnixStream, UnixAcceptor> for UnixListener {
173 fn listen(self) -> IoResult<UnixAcceptor> {
174 self.obj.listen().map(|obj| UnixAcceptor { obj: obj })
175 }
176 }
177
178 /// A value that can accept named pipe connections, returned from `listen()`.
179 pub struct UnixAcceptor {
180 /// The internal, opaque runtime Unix acceptor.
181 obj: Box<RtioUnixAcceptor:Send>,
182 }
183
184 impl UnixAcceptor {
185 /// Sets a timeout for this acceptor, after which accept() will no longer
186 /// block indefinitely.
187 ///
188 /// The argument specified is the amount of time, in milliseconds, into the
189 /// future after which all invocations of accept() will not block (and any
190 /// pending invocation will return). A value of `None` will clear any
191 /// existing timeout.
192 ///
193 /// When using this method, it is likely necessary to reset the timeout as
194 /// appropriate, the timeout specified is specific to this object, not
195 /// specific to the next request.
196 #[experimental = "the name and arguments to this function are likely \
197 to change"]
198 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
199 self.obj.set_timeout(timeout_ms)
200 }
201 }
202
203 impl Acceptor<UnixStream> for UnixAcceptor {
204 fn accept(&mut self) -> IoResult<UnixStream> {
205 self.obj.accept().map(|s| UnixStream { obj: s })
206 }
207 }
208
209 #[cfg(test)]
210 #[allow(experimental)]
211 mod tests {
212 use prelude::*;
213 use super::*;
214 use io::*;
215 use io::test::*;
216
217 pub fn smalltest(server: proc(UnixStream):Send, client: proc(UnixStream):Send) {
218 let path1 = next_test_unix();
219 let path2 = path1.clone();
220
221 let mut acceptor = UnixListener::bind(&path1).listen();
222
223 spawn(proc() {
224 match UnixStream::connect(&path2) {
225 Ok(c) => client(c),
226 Err(e) => fail!("failed connect: {}", e),
227 }
228 });
229
230 match acceptor.accept() {
231 Ok(c) => server(c),
232 Err(e) => fail!("failed accept: {}", e),
233 }
234 }
235
236 iotest!(fn bind_error() {
237 let path = "path/to/nowhere";
238 match UnixListener::bind(&path) {
239 Ok(..) => fail!(),
240 Err(e) => {
241 assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
242 e.kind == InvalidInput);
243 }
244 }
245 })
246
247 iotest!(fn connect_error() {
248 let path = if cfg!(windows) {
249 r"\\.\pipe\this_should_not_exist_ever"
250 } else {
251 "path/to/nowhere"
252 };
253 match UnixStream::connect(&path) {
254 Ok(..) => fail!(),
255 Err(e) => {
256 assert!(e.kind == FileNotFound || e.kind == OtherIoError);
257 }
258 }
259 })
260
261 iotest!(fn smoke() {
262 smalltest(proc(mut server) {
263 let mut buf = [0];
264 server.read(buf).unwrap();
265 assert!(buf[0] == 99);
266 }, proc(mut client) {
267 client.write([99]).unwrap();
268 })
269 })
270
271 iotest!(fn read_eof() {
272 smalltest(proc(mut server) {
273 let mut buf = [0];
274 assert!(server.read(buf).is_err());
275 assert!(server.read(buf).is_err());
276 }, proc(_client) {
277 // drop the client
278 })
279 } #[ignore(cfg(windows))]) // FIXME(#12516)
280
281 iotest!(fn write_begone() {
282 smalltest(proc(mut server) {
283 let buf = [0];
284 loop {
285 match server.write(buf) {
286 Ok(..) => {}
287 Err(e) => {
288 assert!(e.kind == BrokenPipe ||
289 e.kind == NotConnected ||
290 e.kind == ConnectionReset,
291 "unknown error {:?}", e);
292 break;
293 }
294 }
295 }
296 }, proc(_client) {
297 // drop the client
298 })
299 })
300
301 iotest!(fn accept_lots() {
302 let times = 10;
303 let path1 = next_test_unix();
304 let path2 = path1.clone();
305
306 let mut acceptor = match UnixListener::bind(&path1).listen() {
307 Ok(a) => a,
308 Err(e) => fail!("failed listen: {}", e),
309 };
310
311 spawn(proc() {
312 for _ in range(0, times) {
313 let mut stream = UnixStream::connect(&path2);
314 match stream.write([100]) {
315 Ok(..) => {}
316 Err(e) => fail!("failed write: {}", e)
317 }
318 }
319 });
320
321 for _ in range(0, times) {
322 let mut client = acceptor.accept();
323 let mut buf = [0];
324 match client.read(buf) {
325 Ok(..) => {}
326 Err(e) => fail!("failed read/accept: {}", e),
327 }
328 assert_eq!(buf[0], 100);
329 }
330 })
331
332 #[cfg(unix)]
333 iotest!(fn path_exists() {
334 let path = next_test_unix();
335 let _acceptor = UnixListener::bind(&path).listen();
336 assert!(path.exists());
337 })
338
339 iotest!(fn unix_clone_smoke() {
340 let addr = next_test_unix();
341 let mut acceptor = UnixListener::bind(&addr).listen();
342
343 spawn(proc() {
344 let mut s = UnixStream::connect(&addr);
345 let mut buf = [0, 0];
346 debug!("client reading");
347 assert_eq!(s.read(buf), Ok(1));
348 assert_eq!(buf[0], 1);
349 debug!("client writing");
350 s.write([2]).unwrap();
351 debug!("client dropping");
352 });
353
354 let mut s1 = acceptor.accept().unwrap();
355 let s2 = s1.clone();
356
357 let (tx1, rx1) = channel();
358 let (tx2, rx2) = channel();
359 spawn(proc() {
360 let mut s2 = s2;
361 rx1.recv();
362 debug!("writer writing");
363 s2.write([1]).unwrap();
364 debug!("writer done");
365 tx2.send(());
366 });
367 tx1.send(());
368 let mut buf = [0, 0];
369 debug!("reader reading");
370 assert_eq!(s1.read(buf), Ok(1));
371 debug!("reader done");
372 rx2.recv();
373 })
374
375 iotest!(fn unix_clone_two_read() {
376 let addr = next_test_unix();
377 let mut acceptor = UnixListener::bind(&addr).listen();
378 let (tx1, rx) = channel();
379 let tx2 = tx1.clone();
380
381 spawn(proc() {
382 let mut s = UnixStream::connect(&addr);
383 s.write([1]).unwrap();
384 rx.recv();
385 s.write([2]).unwrap();
386 rx.recv();
387 });
388
389 let mut s1 = acceptor.accept().unwrap();
390 let s2 = s1.clone();
391
392 let (done, rx) = channel();
393 spawn(proc() {
394 let mut s2 = s2;
395 let mut buf = [0, 0];
396 s2.read(buf).unwrap();
397 tx2.send(());
398 done.send(());
399 });
400 let mut buf = [0, 0];
401 s1.read(buf).unwrap();
402 tx1.send(());
403
404 rx.recv();
405 })
406
407 iotest!(fn unix_clone_two_write() {
408 let addr = next_test_unix();
409 let mut acceptor = UnixListener::bind(&addr).listen();
410
411 spawn(proc() {
412 let mut s = UnixStream::connect(&addr);
413 let mut buf = [0, 1];
414 s.read(buf).unwrap();
415 s.read(buf).unwrap();
416 });
417
418 let mut s1 = acceptor.accept().unwrap();
419 let s2 = s1.clone();
420
421 let (tx, rx) = channel();
422 spawn(proc() {
423 let mut s2 = s2;
424 s2.write([1]).unwrap();
425 tx.send(());
426 });
427 s1.write([2]).unwrap();
428
429 rx.recv();
430 })
431
432 iotest!(fn drop_removes_listener_path() {
433 let path = next_test_unix();
434 let l = UnixListener::bind(&path).unwrap();
435 assert!(path.exists());
436 drop(l);
437 assert!(!path.exists());
438 } #[cfg(not(windows))])
439
440 iotest!(fn drop_removes_acceptor_path() {
441 let path = next_test_unix();
442 let l = UnixListener::bind(&path).unwrap();
443 assert!(path.exists());
444 drop(l.listen().unwrap());
445 assert!(!path.exists());
446 } #[cfg(not(windows))])
447
448 iotest!(fn accept_timeout() {
449 let addr = next_test_unix();
450 let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
451
452 a.set_timeout(Some(10));
453
454 // Make sure we time out once and future invocations also time out
455 let err = a.accept().err().unwrap();
456 assert_eq!(err.kind, TimedOut);
457 let err = a.accept().err().unwrap();
458 assert_eq!(err.kind, TimedOut);
459
460 // Also make sure that even though the timeout is expired that we will
461 // continue to receive any pending connections.
462 let (tx, rx) = channel();
463 let addr2 = addr.clone();
464 spawn(proc() {
465 tx.send(UnixStream::connect(&addr2).unwrap());
466 });
467 let l = rx.recv();
468 for i in range(0, 1001) {
469 match a.accept() {
470 Ok(..) => break,
471 Err(ref e) if e.kind == TimedOut => {}
472 Err(e) => fail!("error: {}", e),
473 }
474 ::task::deschedule();
475 if i == 1000 { fail!("should have a pending connection") }
476 }
477 drop(l);
478
479 // Unset the timeout and make sure that this always blocks.
480 a.set_timeout(None);
481 let addr2 = addr.clone();
482 spawn(proc() {
483 drop(UnixStream::connect(&addr2).unwrap());
484 });
485 a.accept().unwrap();
486 })
487
488 iotest!(fn connect_timeout_error() {
489 let addr = next_test_unix();
490 assert!(UnixStream::connect_timeout(&addr, 100).is_err());
491 })
492
493 iotest!(fn connect_timeout_success() {
494 let addr = next_test_unix();
495 let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
496 assert!(UnixStream::connect_timeout(&addr, 100).is_ok());
497 })
498
499 iotest!(fn close_readwrite_smoke() {
500 let addr = next_test_unix();
501 let a = UnixListener::bind(&addr).listen().unwrap();
502 let (_tx, rx) = channel::<()>();
503 spawn(proc() {
504 let mut a = a;
505 let _s = a.accept().unwrap();
506 let _ = rx.recv_opt();
507 });
508
509 let mut b = [0];
510 let mut s = UnixStream::connect(&addr).unwrap();
511 let mut s2 = s.clone();
512
513 // closing should prevent reads/writes
514 s.close_write().unwrap();
515 assert!(s.write([0]).is_err());
516 s.close_read().unwrap();
517 assert!(s.read(b).is_err());
518
519 // closing should affect previous handles
520 assert!(s2.write([0]).is_err());
521 assert!(s2.read(b).is_err());
522
523 // closing should affect new handles
524 let mut s3 = s.clone();
525 assert!(s3.write([0]).is_err());
526 assert!(s3.read(b).is_err());
527
528 // make sure these don't die
529 let _ = s2.close_read();
530 let _ = s2.close_write();
531 let _ = s3.close_read();
532 let _ = s3.close_write();
533 })
534
535 iotest!(fn close_read_wakes_up() {
536 let addr = next_test_unix();
537 let a = UnixListener::bind(&addr).listen().unwrap();
538 let (_tx, rx) = channel::<()>();
539 spawn(proc() {
540 let mut a = a;
541 let _s = a.accept().unwrap();
542 let _ = rx.recv_opt();
543 });
544
545 let mut s = UnixStream::connect(&addr).unwrap();
546 let s2 = s.clone();
547 let (tx, rx) = channel();
548 spawn(proc() {
549 let mut s2 = s2;
550 assert!(s2.read([0]).is_err());
551 tx.send(());
552 });
553 // this should wake up the child task
554 s.close_read().unwrap();
555
556 // this test will never finish if the child doesn't wake up
557 rx.recv();
558 })
559
560 iotest!(fn readwrite_timeouts() {
561 let addr = next_test_unix();
562 let mut a = UnixListener::bind(&addr).listen().unwrap();
563 let (tx, rx) = channel::<()>();
564 spawn(proc() {
565 let mut s = UnixStream::connect(&addr).unwrap();
566 rx.recv();
567 assert!(s.write([0]).is_ok());
568 let _ = rx.recv_opt();
569 });
570
571 let mut s = a.accept().unwrap();
572 s.set_timeout(Some(20));
573 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
574 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
575
576 s.set_timeout(Some(20));
577 for i in range(0, 1001) {
578 match s.write([0, .. 128 * 1024]) {
579 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
580 Err(IoError { kind: TimedOut, .. }) => break,
581 Err(e) => fail!("{}", e),
582 }
583 if i == 1000 { fail!("should have filled up?!"); }
584 }
585
586 // I'm not sure as to why, but apparently the write on windows always
587 // succeeds after the previous timeout. Who knows?
588 if !cfg!(windows) {
589 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
590 }
591
592 tx.send(());
593 s.set_timeout(None);
594 assert_eq!(s.read([0, 0]), Ok(1));
595 })
596
597 iotest!(fn read_timeouts() {
598 let addr = next_test_unix();
599 let mut a = UnixListener::bind(&addr).listen().unwrap();
600 let (tx, rx) = channel::<()>();
601 spawn(proc() {
602 let mut s = UnixStream::connect(&addr).unwrap();
603 rx.recv();
604 let mut amt = 0;
605 while amt < 100 * 128 * 1024 {
606 match s.read([0, ..128 * 1024]) {
607 Ok(n) => { amt += n; }
608 Err(e) => fail!("{}", e),
609 }
610 }
611 let _ = rx.recv_opt();
612 });
613
614 let mut s = a.accept().unwrap();
615 s.set_read_timeout(Some(20));
616 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
617 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
618
619 tx.send(());
620 for _ in range(0, 100) {
621 assert!(s.write([0, ..128 * 1024]).is_ok());
622 }
623 })
624
625 iotest!(fn write_timeouts() {
626 let addr = next_test_unix();
627 let mut a = UnixListener::bind(&addr).listen().unwrap();
628 let (tx, rx) = channel::<()>();
629 spawn(proc() {
630 let mut s = UnixStream::connect(&addr).unwrap();
631 rx.recv();
632 assert!(s.write([0]).is_ok());
633 let _ = rx.recv_opt();
634 });
635
636 let mut s = a.accept().unwrap();
637 s.set_write_timeout(Some(20));
638 for i in range(0, 1001) {
639 match s.write([0, .. 128 * 1024]) {
640 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
641 Err(IoError { kind: TimedOut, .. }) => break,
642 Err(e) => fail!("{}", e),
643 }
644 if i == 1000 { fail!("should have filled up?!"); }
645 }
646
647 tx.send(());
648 assert!(s.read([0]).is_ok());
649 })
650
651 iotest!(fn timeout_concurrent_read() {
652 let addr = next_test_unix();
653 let mut a = UnixListener::bind(&addr).listen().unwrap();
654 let (tx, rx) = channel::<()>();
655 spawn(proc() {
656 let mut s = UnixStream::connect(&addr).unwrap();
657 rx.recv();
658 assert!(s.write([0]).is_ok());
659 let _ = rx.recv_opt();
660 });
661
662 let mut s = a.accept().unwrap();
663 let s2 = s.clone();
664 let (tx2, rx2) = channel();
665 spawn(proc() {
666 let mut s2 = s2;
667 assert!(s2.read([0]).is_ok());
668 tx2.send(());
669 });
670
671 s.set_read_timeout(Some(20));
672 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
673 tx.send(());
674
675 rx2.recv();
676 })
677 }