(index<- )        ./libstd/io/net/unix.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  /*!
  12  
  13  Named pipes
  14  
  15  This module contains the ability to communicate over named pipes with
  16  synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
  17  while on Unix it corresponds to UNIX domain sockets.
  18  
  19  These pipes are similar to TCP in the sense that you can have both a stream to a
  20  server and a server itself. The server provided accepts other `UnixStream`
  21  instances as clients.
  22  
  23  */
  24  
  25  #![allow(missing_doc)]
  26  
  27  use prelude::*;
  28  
  29  use c_str::ToCStr;
  30  use clone::Clone;
  31  use io::{Listener, Acceptor, Reader, Writer, IoResult};
  32  use kinds::Send;
  33  use owned::Box;
  34  use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
  35  use rt::rtio::{RtioUnixAcceptor, RtioPipe};
  36  
  37  /// A stream which communicates over a named pipe.
  38  pub struct UnixStream {
  39      obj: Box<RtioPipe:Send>,
  40  }
  41  
  42  impl UnixStream {
  43      /// Connect to a pipe named by `path`. This will attempt to open a
  44      /// connection to the underlying socket.
  45      ///
  46      /// The returned stream will be closed when the object falls out of scope.
  47      ///
  48      /// # Example
  49      ///
  50      /// ```rust
  51      /// # #![allow(unused_must_use)]
  52      /// use std::io::net::unix::UnixStream;
  53      ///
  54      /// let server = Path::new("path/to/my/socket");
  55      /// let mut stream = UnixStream::connect(&server);
  56      /// stream.write([1, 2, 3]);
  57      /// ```
  58      pub fn connect<P: ToCStr>(path&P) -> IoResult<UnixStream> {
  59          LocalIo::maybe_raise(|io| {
  60              io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
  61          })
  62      }
  63  
  64      /// Connect to a pipe named by `path`, timing out if the specified number of
  65      /// milliseconds.
  66      ///
  67      /// This function is similar to `connect`, except that if `timeout_ms`
  68      /// elapses the function will return an error of kind `TimedOut`.
  69      #[experimental = "the timeout argument is likely to change types"]
  70      pub fn connect_timeout<P: ToCStr>(path&P,
  71                                        timeout_msu64) -> IoResult<UnixStream> {
  72          LocalIo::maybe_raise(|io| {
  73              let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
  74              s.map(|p| UnixStream { obj: p })
  75          })
  76      }
  77  
  78  
  79      /// Closes the reading half of this connection.
  80      ///
  81      /// This method will close the reading portion of this connection, causing
  82      /// all pending and future reads to immediately return with an error.
  83      ///
  84      /// Note that this method affects all cloned handles associated with this
  85      /// stream, not just this one handle.
  86      pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() }
  87  
  88      /// Closes the writing half of this connection.
  89      ///
  90      /// This method will close the writing portion of this connection, causing
  91      /// all pending and future writes to immediately return with an error.
  92      ///
  93      /// Note that this method affects all cloned handles associated with this
  94      /// stream, not just this one handle.
  95      pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
  96  
  97      /// Sets the read/write timeout for this socket.
  98      ///
  99      /// For more information, see `TcpStream::set_timeout`
 100      #[experimental = "the timeout argument may change in type and value"]
 101      pub fn set_timeout(&mut self, timeout_msOption<u64>) {
 102          self.obj.set_timeout(timeout_ms)
 103      }
 104  
 105      /// Sets the read timeout for this socket.
 106      ///
 107      /// For more information, see `TcpStream::set_timeout`
 108      #[experimental = "the timeout argument may change in type and value"]
 109      pub fn set_read_timeout(&mut self, timeout_msOption<u64>) {
 110          self.obj.set_read_timeout(timeout_ms)
 111      }
 112  
 113      /// Sets the write timeout for this socket.
 114      ///
 115      /// For more information, see `TcpStream::set_timeout`
 116      #[experimental = "the timeout argument may change in type and value"]
 117      pub fn set_write_timeout(&mut self, timeout_msOption<u64>) {
 118          self.obj.set_write_timeout(timeout_ms)
 119      }
 120  }
 121  
 122  impl Clone for UnixStream {
 123      fn clone(&self) -> UnixStream {
 124          UnixStream { obj: self.obj.clone() }
 125      }
 126  }
 127  
 128  impl Reader for UnixStream {
 129      fn read(&mut self, buf&mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
 130  }
 131  
 132  impl Writer for UnixStream {
 133      fn write(&mut self, buf&[u8]) -> IoResult<()> { self.obj.write(buf) }
 134  }
 135  
 136  /// A value that can listen for incoming named pipe connection requests.
 137  pub struct UnixListener {
 138      /// The internal, opaque runtime Unix listener.
 139      obj: Box<RtioUnixListener:Send>,
 140  }
 141  
 142  impl UnixListener {
 143  
 144      /// Creates a new listener, ready to receive incoming connections on the
 145      /// specified socket. The server will be named by `path`.
 146      ///
 147      /// This listener will be closed when it falls out of scope.
 148      ///
 149      /// # Example
 150      ///
 151      /// ```
 152      /// # fn main() {}
 153      /// # fn foo() {
 154      /// # #![allow(unused_must_use)]
 155      /// use std::io::net::unix::UnixListener;
 156      /// use std::io::{Listener, Acceptor};
 157      ///
 158      /// let server = Path::new("/path/to/my/socket");
 159      /// let stream = UnixListener::bind(&server);
 160      /// for mut client in stream.listen().incoming() {
 161      ///     client.write([1, 2, 3, 4]);
 162      /// }
 163      /// # }
 164      /// ```
 165      pub fn bind<P: ToCStr>(path&P) -> IoResult<UnixListener> {
 166          LocalIo::maybe_raise(|io| {
 167              io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s })
 168          })
 169      }
 170  }
 171  
 172  impl Listener<UnixStream, UnixAcceptor> for UnixListener {
 173      fn listen(self) -> IoResult<UnixAcceptor> {
 174          self.obj.listen().map(|obj| UnixAcceptor { obj: obj })
 175      }
 176  }
 177  
 178  /// A value that can accept named pipe connections, returned from `listen()`.
 179  pub struct UnixAcceptor {
 180      /// The internal, opaque runtime Unix acceptor.
 181      obj: Box<RtioUnixAcceptor:Send>,
 182  }
 183  
 184  impl UnixAcceptor {
 185      /// Sets a timeout for this acceptor, after which accept() will no longer
 186      /// block indefinitely.
 187      ///
 188      /// The argument specified is the amount of time, in milliseconds, into the
 189      /// future after which all invocations of accept() will not block (and any
 190      /// pending invocation will return). A value of `None` will clear any
 191      /// existing timeout.
 192      ///
 193      /// When using this method, it is likely necessary to reset the timeout as
 194      /// appropriate, the timeout specified is specific to this object, not
 195      /// specific to the next request.
 196      #[experimental = "the name and arguments to this function are likely \
 197                        to change"]
 198      pub fn set_timeout(&mut self, timeout_msOption<u64>) {
 199          self.obj.set_timeout(timeout_ms)
 200      }
 201  }
 202  
 203  impl Acceptor<UnixStream> for UnixAcceptor {
 204      fn accept(&mut self) -> IoResult<UnixStream> {
 205          self.obj.accept().map(|s| UnixStream { obj: s })
 206      }
 207  }
 208  
 209  #[cfg(test)]
 210  #[allow(experimental)]
 211  mod tests {
 212      use prelude::*;
 213      use super::*;
 214      use io::*;
 215      use io::test::*;
 216  
 217      pub fn smalltest(server: proc(UnixStream):Send, client: proc(UnixStream):Send) {
 218          let path1 = next_test_unix();
 219          let path2 = path1.clone();
 220  
 221          let mut acceptor = UnixListener::bind(&path1).listen();
 222  
 223          spawn(proc() {
 224              match UnixStream::connect(&path2) {
 225                  Ok(c) => client(c),
 226                  Err(e) => fail!("failed connect: {}", e),
 227              }
 228          });
 229  
 230          match acceptor.accept() {
 231              Ok(c) => server(c),
 232              Err(e) => fail!("failed accept: {}", e),
 233          }
 234      }
 235  
 236      iotest!(fn bind_error() {
 237          let path = "path/to/nowhere";
 238          match UnixListener::bind(&path) {
 239              Ok(..) => fail!(),
 240              Err(e) => {
 241                  assert!(e.kind == PermissionDenied || e.kind == FileNotFound ||
 242                          e.kind == InvalidInput);
 243              }
 244          }
 245      })
 246  
 247      iotest!(fn connect_error() {
 248          let path = if cfg!(windows) {
 249              r"\\.\pipe\this_should_not_exist_ever"
 250          } else {
 251              "path/to/nowhere"
 252          };
 253          match UnixStream::connect(&path) {
 254              Ok(..) => fail!(),
 255              Err(e) => {
 256                  assert!(e.kind == FileNotFound || e.kind == OtherIoError);
 257              }
 258          }
 259      })
 260  
 261      iotest!(fn smoke() {
 262          smalltest(proc(mut server) {
 263              let mut buf = [0];
 264              server.read(buf).unwrap();
 265              assert!(buf[0] == 99);
 266          }, proc(mut client) {
 267              client.write([99]).unwrap();
 268          })
 269      })
 270  
 271      iotest!(fn read_eof() {
 272          smalltest(proc(mut server) {
 273              let mut buf = [0];
 274              assert!(server.read(buf).is_err());
 275              assert!(server.read(buf).is_err());
 276          }, proc(_client) {
 277              // drop the client
 278          })
 279      } #[ignore(cfg(windows))]) // FIXME(#12516)
 280  
 281      iotest!(fn write_begone() {
 282          smalltest(proc(mut server) {
 283              let buf = [0];
 284              loop {
 285                  match server.write(buf) {
 286                      Ok(..) => {}
 287                      Err(e) => {
 288                          assert!(e.kind == BrokenPipe ||
 289                                  e.kind == NotConnected ||
 290                                  e.kind == ConnectionReset,
 291                                  "unknown error {:?}", e);
 292                          break;
 293                      }
 294                  }
 295              }
 296          }, proc(_client) {
 297              // drop the client
 298          })
 299      })
 300  
 301      iotest!(fn accept_lots() {
 302          let times = 10;
 303          let path1 = next_test_unix();
 304          let path2 = path1.clone();
 305  
 306          let mut acceptor = match UnixListener::bind(&path1).listen() {
 307              Ok(a) => a,
 308              Err(e) => fail!("failed listen: {}", e),
 309          };
 310  
 311          spawn(proc() {
 312              for _ in range(0, times) {
 313                  let mut stream = UnixStream::connect(&path2);
 314                  match stream.write([100]) {
 315                      Ok(..) => {}
 316                      Err(e) => fail!("failed write: {}", e)
 317                  }
 318              }
 319          });
 320  
 321          for _ in range(0, times) {
 322              let mut client = acceptor.accept();
 323              let mut buf = [0];
 324              match client.read(buf) {
 325                  Ok(..) => {}
 326                  Err(e) => fail!("failed read/accept: {}", e),
 327              }
 328              assert_eq!(buf[0], 100);
 329          }
 330      })
 331  
 332      #[cfg(unix)]
 333      iotest!(fn path_exists() {
 334          let path = next_test_unix();
 335          let _acceptor = UnixListener::bind(&path).listen();
 336          assert!(path.exists());
 337      })
 338  
 339      iotest!(fn unix_clone_smoke() {
 340          let addr = next_test_unix();
 341          let mut acceptor = UnixListener::bind(&addr).listen();
 342  
 343          spawn(proc() {
 344              let mut s = UnixStream::connect(&addr);
 345              let mut buf = [0, 0];
 346              debug!("client reading");
 347              assert_eq!(s.read(buf), Ok(1));
 348              assert_eq!(buf[0], 1);
 349              debug!("client writing");
 350              s.write([2]).unwrap();
 351              debug!("client dropping");
 352          });
 353  
 354          let mut s1 = acceptor.accept().unwrap();
 355          let s2 = s1.clone();
 356  
 357          let (tx1, rx1) = channel();
 358          let (tx2, rx2) = channel();
 359          spawn(proc() {
 360              let mut s2 = s2;
 361              rx1.recv();
 362              debug!("writer writing");
 363              s2.write([1]).unwrap();
 364              debug!("writer done");
 365              tx2.send(());
 366          });
 367          tx1.send(());
 368          let mut buf = [0, 0];
 369          debug!("reader reading");
 370          assert_eq!(s1.read(buf), Ok(1));
 371          debug!("reader done");
 372          rx2.recv();
 373      })
 374  
 375      iotest!(fn unix_clone_two_read() {
 376          let addr = next_test_unix();
 377          let mut acceptor = UnixListener::bind(&addr).listen();
 378          let (tx1, rx) = channel();
 379          let tx2 = tx1.clone();
 380  
 381          spawn(proc() {
 382              let mut s = UnixStream::connect(&addr);
 383              s.write([1]).unwrap();
 384              rx.recv();
 385              s.write([2]).unwrap();
 386              rx.recv();
 387          });
 388  
 389          let mut s1 = acceptor.accept().unwrap();
 390          let s2 = s1.clone();
 391  
 392          let (done, rx) = channel();
 393          spawn(proc() {
 394              let mut s2 = s2;
 395              let mut buf = [0, 0];
 396              s2.read(buf).unwrap();
 397              tx2.send(());
 398              done.send(());
 399          });
 400          let mut buf = [0, 0];
 401          s1.read(buf).unwrap();
 402          tx1.send(());
 403  
 404          rx.recv();
 405      })
 406  
 407      iotest!(fn unix_clone_two_write() {
 408          let addr = next_test_unix();
 409          let mut acceptor = UnixListener::bind(&addr).listen();
 410  
 411          spawn(proc() {
 412              let mut s = UnixStream::connect(&addr);
 413              let mut buf = [0, 1];
 414              s.read(buf).unwrap();
 415              s.read(buf).unwrap();
 416          });
 417  
 418          let mut s1 = acceptor.accept().unwrap();
 419          let s2 = s1.clone();
 420  
 421          let (tx, rx) = channel();
 422          spawn(proc() {
 423              let mut s2 = s2;
 424              s2.write([1]).unwrap();
 425              tx.send(());
 426          });
 427          s1.write([2]).unwrap();
 428  
 429          rx.recv();
 430      })
 431  
 432      iotest!(fn drop_removes_listener_path() {
 433          let path = next_test_unix();
 434          let l = UnixListener::bind(&path).unwrap();
 435          assert!(path.exists());
 436          drop(l);
 437          assert!(!path.exists());
 438      } #[cfg(not(windows))])
 439  
 440      iotest!(fn drop_removes_acceptor_path() {
 441          let path = next_test_unix();
 442          let l = UnixListener::bind(&path).unwrap();
 443          assert!(path.exists());
 444          drop(l.listen().unwrap());
 445          assert!(!path.exists());
 446      } #[cfg(not(windows))])
 447  
 448      iotest!(fn accept_timeout() {
 449          let addr = next_test_unix();
 450          let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap();
 451  
 452          a.set_timeout(Some(10));
 453  
 454          // Make sure we time out once and future invocations also time out
 455          let err = a.accept().err().unwrap();
 456          assert_eq!(err.kind, TimedOut);
 457          let err = a.accept().err().unwrap();
 458          assert_eq!(err.kind, TimedOut);
 459  
 460          // Also make sure that even though the timeout is expired that we will
 461          // continue to receive any pending connections.
 462          let (tx, rx) = channel();
 463          let addr2 = addr.clone();
 464          spawn(proc() {
 465              tx.send(UnixStream::connect(&addr2).unwrap());
 466          });
 467          let l = rx.recv();
 468          for i in range(0, 1001) {
 469              match a.accept() {
 470                  Ok(..) => break,
 471                  Err(ref e) if e.kind == TimedOut => {}
 472                  Err(e) => fail!("error: {}", e),
 473              }
 474              ::task::deschedule();
 475              if i == 1000 { fail!("should have a pending connection") }
 476          }
 477          drop(l);
 478  
 479          // Unset the timeout and make sure that this always blocks.
 480          a.set_timeout(None);
 481          let addr2 = addr.clone();
 482          spawn(proc() {
 483              drop(UnixStream::connect(&addr2).unwrap());
 484          });
 485          a.accept().unwrap();
 486      })
 487  
 488      iotest!(fn connect_timeout_error() {
 489          let addr = next_test_unix();
 490          assert!(UnixStream::connect_timeout(&addr, 100).is_err());
 491      })
 492  
 493      iotest!(fn connect_timeout_success() {
 494          let addr = next_test_unix();
 495          let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
 496          assert!(UnixStream::connect_timeout(&addr, 100).is_ok());
 497      })
 498  
 499      iotest!(fn close_readwrite_smoke() {
 500          let addr = next_test_unix();
 501          let a = UnixListener::bind(&addr).listen().unwrap();
 502          let (_tx, rx) = channel::<()>();
 503          spawn(proc() {
 504              let mut a = a;
 505              let _s = a.accept().unwrap();
 506              let _ = rx.recv_opt();
 507          });
 508  
 509          let mut b = [0];
 510          let mut s = UnixStream::connect(&addr).unwrap();
 511          let mut s2 = s.clone();
 512  
 513          // closing should prevent reads/writes
 514          s.close_write().unwrap();
 515          assert!(s.write([0]).is_err());
 516          s.close_read().unwrap();
 517          assert!(s.read(b).is_err());
 518  
 519          // closing should affect previous handles
 520          assert!(s2.write([0]).is_err());
 521          assert!(s2.read(b).is_err());
 522  
 523          // closing should affect new handles
 524          let mut s3 = s.clone();
 525          assert!(s3.write([0]).is_err());
 526          assert!(s3.read(b).is_err());
 527  
 528          // make sure these don't die
 529          let _ = s2.close_read();
 530          let _ = s2.close_write();
 531          let _ = s3.close_read();
 532          let _ = s3.close_write();
 533      })
 534  
 535      iotest!(fn close_read_wakes_up() {
 536          let addr = next_test_unix();
 537          let a = UnixListener::bind(&addr).listen().unwrap();
 538          let (_tx, rx) = channel::<()>();
 539          spawn(proc() {
 540              let mut a = a;
 541              let _s = a.accept().unwrap();
 542              let _ = rx.recv_opt();
 543          });
 544  
 545          let mut s = UnixStream::connect(&addr).unwrap();
 546          let s2 = s.clone();
 547          let (tx, rx) = channel();
 548          spawn(proc() {
 549              let mut s2 = s2;
 550              assert!(s2.read([0]).is_err());
 551              tx.send(());
 552          });
 553          // this should wake up the child task
 554          s.close_read().unwrap();
 555  
 556          // this test will never finish if the child doesn't wake up
 557          rx.recv();
 558      })
 559  
 560      iotest!(fn readwrite_timeouts() {
 561          let addr = next_test_unix();
 562          let mut a = UnixListener::bind(&addr).listen().unwrap();
 563          let (tx, rx) = channel::<()>();
 564          spawn(proc() {
 565              let mut s = UnixStream::connect(&addr).unwrap();
 566              rx.recv();
 567              assert!(s.write([0]).is_ok());
 568              let _ = rx.recv_opt();
 569          });
 570  
 571          let mut s = a.accept().unwrap();
 572          s.set_timeout(Some(20));
 573          assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
 574          assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
 575  
 576          s.set_timeout(Some(20));
 577          for i in range(0, 1001) {
 578              match s.write([0, .. 128 * 1024]) {
 579                  Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
 580                  Err(IoError { kind: TimedOut, .. }) => break,
 581                  Err(e) => fail!("{}", e),
 582             }
 583             if i == 1000 { fail!("should have filled up?!"); }
 584          }
 585  
 586          // I'm not sure as to why, but apparently the write on windows always
 587          // succeeds after the previous timeout. Who knows?
 588          if !cfg!(windows) {
 589              assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
 590          }
 591  
 592          tx.send(());
 593          s.set_timeout(None);
 594          assert_eq!(s.read([0, 0]), Ok(1));
 595      })
 596  
 597      iotest!(fn read_timeouts() {
 598          let addr = next_test_unix();
 599          let mut a = UnixListener::bind(&addr).listen().unwrap();
 600          let (tx, rx) = channel::<()>();
 601          spawn(proc() {
 602              let mut s = UnixStream::connect(&addr).unwrap();
 603              rx.recv();
 604              let mut amt = 0;
 605              while amt < 100 * 128 * 1024 {
 606                  match s.read([0, ..128 * 1024]) {
 607                      Ok(n) => { amt += n; }
 608                      Err(e) => fail!("{}", e),
 609                  }
 610              }
 611              let _ = rx.recv_opt();
 612          });
 613  
 614          let mut s = a.accept().unwrap();
 615          s.set_read_timeout(Some(20));
 616          assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
 617          assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
 618  
 619          tx.send(());
 620          for _ in range(0, 100) {
 621              assert!(s.write([0, ..128 * 1024]).is_ok());
 622          }
 623      })
 624  
 625      iotest!(fn write_timeouts() {
 626          let addr = next_test_unix();
 627          let mut a = UnixListener::bind(&addr).listen().unwrap();
 628          let (tx, rx) = channel::<()>();
 629          spawn(proc() {
 630              let mut s = UnixStream::connect(&addr).unwrap();
 631              rx.recv();
 632              assert!(s.write([0]).is_ok());
 633              let _ = rx.recv_opt();
 634          });
 635  
 636          let mut s = a.accept().unwrap();
 637          s.set_write_timeout(Some(20));
 638          for i in range(0, 1001) {
 639              match s.write([0, .. 128 * 1024]) {
 640                  Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
 641                  Err(IoError { kind: TimedOut, .. }) => break,
 642                  Err(e) => fail!("{}", e),
 643             }
 644             if i == 1000 { fail!("should have filled up?!"); }
 645          }
 646  
 647          tx.send(());
 648          assert!(s.read([0]).is_ok());
 649      })
 650  
 651      iotest!(fn timeout_concurrent_read() {
 652          let addr = next_test_unix();
 653          let mut a = UnixListener::bind(&addr).listen().unwrap();
 654          let (tx, rx) = channel::<()>();
 655          spawn(proc() {
 656              let mut s = UnixStream::connect(&addr).unwrap();
 657              rx.recv();
 658              assert!(s.write([0]).is_ok());
 659              let _ = rx.recv_opt();
 660          });
 661  
 662          let mut s = a.accept().unwrap();
 663          let s2 = s.clone();
 664          let (tx2, rx2) = channel();
 665          spawn(proc() {
 666              let mut s2 = s2;
 667              assert!(s2.read([0]).is_ok());
 668              tx2.send(());
 669          });
 670  
 671          s.set_read_timeout(Some(20));
 672          assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
 673          tx.send(());
 674  
 675          rx2.recv();
 676      })
 677  }