(index<- )        ./libstd/io/net/tcp.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
    1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
    2  // file at the top-level directory of this distribution and at
    3  // http://rust-lang.org/COPYRIGHT.
    4  //
    5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
    6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
    7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
    8  // option. This file may not be copied, modified, or distributed
    9  // except according to those terms.
   10  
   11  //! TCP network connections
   12  //!
   13  //! This module contains the ability to open a TCP stream to a socket address,
   14  //! as well as creating a socket server to accept incoming connections. The
   15  //! destination and binding addresses can either be an IPv4 or IPv6 address.
   16  //!
   17  //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
   18  //! listener (socket server) implements the `Listener` and `Acceptor` traits.
   19  
   20  use clone::Clone;
   21  use io::IoResult;
   22  use io::net::ip::SocketAddr;
   23  use io::{Reader, Writer, Listener, Acceptor};
   24  use kinds::Send;
   25  use option::{None, Some, Option};
   26  use owned::Box;
   27  use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
   28  use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
   29  
   30  /// A structure which represents a TCP stream between a local socket and a
   31  /// remote socket.
   32  ///
   33  /// # Example
   34  ///
   35  /// ```no_run
   36  /// # #![allow(unused_must_use)]
   37  /// use std::io::net::tcp::TcpStream;
   38  /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
   39  ///
   40  /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
   41  /// let mut stream = TcpStream::connect(addr);
   42  ///
   43  /// stream.write([1]);
   44  /// let mut buf = [0];
   45  /// stream.read(buf);
   46  /// drop(stream); // close the connection
   47  /// ```
   48  pub struct TcpStream {
   49      obj: Box<RtioTcpStream:Send>,
   50  }
   51  
   52  impl TcpStream {
   53      fn new(sBox<RtioTcpStream:Send>) -> TcpStream {
   54          TcpStream { obj: s }
   55      }
   56  
   57      /// Creates a TCP connection to a remote socket address.
   58      ///
   59      /// If no error is encountered, then `Ok(stream)` is returned.
   60      pub fn connect(addrSocketAddr) -> IoResult<TcpStream> {
   61          LocalIo::maybe_raise(|io| {
   62              io.tcp_connect(addr, None).map(TcpStream::new)
   63          })
   64      }
   65  
   66      /// Creates a TCP connection to a remote socket address, timing out after
   67      /// the specified number of milliseconds.
   68      ///
   69      /// This is the same as the `connect` method, except that if the timeout
   70      /// specified (in milliseconds) elapses before a connection is made an error
   71      /// will be returned. The error's kind will be `TimedOut`.
   72      #[experimental = "the timeout argument may eventually change types"]
   73      pub fn connect_timeout(addrSocketAddr,
   74                             timeout_msu64) -> IoResult<TcpStream> {
   75          LocalIo::maybe_raise(|io| {
   76              io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
   77          })
   78      }
   79  
   80      /// Returns the socket address of the remote peer of this TCP connection.
   81      pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
   82          self.obj.peer_name()
   83      }
   84  
   85      /// Returns the socket address of the local half of this TCP connection.
   86      pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
   87          self.obj.socket_name()
   88      }
   89  
   90      /// Sets the nodelay flag on this connection to the boolean specified
   91      #[experimental]
   92      pub fn set_nodelay(&mut self, nodelaybool) -> IoResult<()> {
   93          if nodelay {
   94              self.obj.nodelay()
   95          } else {
   96              self.obj.control_congestion()
   97          }
   98      }
   99  
  100      /// Sets the keepalive timeout to the timeout specified.
  101      ///
  102      /// If the value specified is `None`, then the keepalive flag is cleared on
  103      /// this connection. Otherwise, the keepalive timeout will be set to the
  104      /// specified time, in seconds.
  105      #[experimental]
  106      pub fn set_keepalive(&mut self, delay_in_secondsOption<uint>) -> IoResult<()> {
  107          match delay_in_seconds {
  108              Some(i) => self.obj.keepalive(i),
  109              None => self.obj.letdie(),
  110          }
  111      }
  112  
  113      /// Closes the reading half of this connection.
  114      ///
  115      /// This method will close the reading portion of this connection, causing
  116      /// all pending and future reads to immediately return with an error.
  117      ///
  118      /// # Example
  119      ///
  120      /// ```no_run
  121      /// # #![allow(unused_must_use)]
  122      /// use std::io::timer;
  123      /// use std::io::net::tcp::TcpStream;
  124      /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
  125      ///
  126      /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
  127      /// let mut stream = TcpStream::connect(addr).unwrap();
  128      /// let stream2 = stream.clone();
  129      ///
  130      /// spawn(proc() {
  131      ///     // close this stream after one second
  132      ///     timer::sleep(1000);
  133      ///     let mut stream = stream2;
  134      ///     stream.close_read();
  135      /// });
  136      ///
  137      /// // wait for some data, will get canceled after one second
  138      /// let mut buf = [0];
  139      /// stream.read(buf);
  140      /// ```
  141      ///
  142      /// Note that this method affects all cloned handles associated with this
  143      /// stream, not just this one handle.
  144      pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() }
  145  
  146      /// Closes the writing half of this connection.
  147      ///
  148      /// This method will close the writing portion of this connection, causing
  149      /// all future writes to immediately return with an error.
  150      ///
  151      /// Note that this method affects all cloned handles associated with this
  152      /// stream, not just this one handle.
  153      pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
  154  
  155      /// Sets a timeout, in milliseconds, for blocking operations on this stream.
  156      ///
  157      /// This function will set a timeout for all blocking operations (including
  158      /// reads and writes) on this stream. The timeout specified is a relative
  159      /// time, in milliseconds, into the future after which point operations will
  160      /// time out. This means that the timeout must be reset periodically to keep
  161      /// it from expiring. Specifying a value of `None` will clear the timeout
  162      /// for this stream.
  163      ///
  164      /// The timeout on this stream is local to this stream only. Setting a
  165      /// timeout does not affect any other cloned instances of this stream, nor
  166      /// does the timeout propagated to cloned handles of this stream. Setting
  167      /// this timeout will override any specific read or write timeouts
  168      /// previously set for this stream.
  169      ///
  170      /// For clarification on the semantics of interrupting a read and a write,
  171      /// take a look at `set_read_timeout` and `set_write_timeout`.
  172      #[experimental = "the timeout argument may change in type and value"]
  173      pub fn set_timeout(&mut self, timeout_msOption<u64>) {
  174          self.obj.set_timeout(timeout_ms)
  175      }
  176  
  177      /// Sets the timeout for read operations on this stream.
  178      ///
  179      /// See documentation in `set_timeout` for the semantics of this read time.
  180      /// This will overwrite any previous read timeout set through either this
  181      /// function or `set_timeout`.
  182      ///
  183      /// # Errors
  184      ///
  185      /// When this timeout expires, if there is no pending read operation, no
  186      /// action is taken. Otherwise, the read operation will be scheduled to
  187      /// promptly return. If a timeout error is returned, then no data was read
  188      /// during the timeout period.
  189      #[experimental = "the timeout argument may change in type and value"]
  190      pub fn set_read_timeout(&mut self, timeout_msOption<u64>) {
  191          self.obj.set_read_timeout(timeout_ms)
  192      }
  193  
  194      /// Sets the timeout for write operations on this stream.
  195      ///
  196      /// See documentation in `set_timeout` for the semantics of this write time.
  197      /// This will overwrite any previous write timeout set through either this
  198      /// function or `set_timeout`.
  199      ///
  200      /// # Errors
  201      ///
  202      /// When this timeout expires, if there is no pending write operation, no
  203      /// action is taken. Otherwise, the pending write operation will be
  204      /// scheduled to promptly return. The actual state of the underlying stream
  205      /// is not specified.
  206      ///
  207      /// The write operation may return an error of type `ShortWrite` which
  208      /// indicates that the object is known to have written an exact number of
  209      /// bytes successfully during the timeout period, and the remaining bytes
  210      /// were never written.
  211      ///
  212      /// If the write operation returns `TimedOut`, then it the timeout primitive
  213      /// does not know how many bytes were written as part of the timeout
  214      /// operation. It may be the case that bytes continue to be written in an
  215      /// asynchronous fashion after the call to write returns.
  216      #[experimental = "the timeout argument may change in type and value"]
  217      pub fn set_write_timeout(&mut self, timeout_msOption<u64>) {
  218          self.obj.set_write_timeout(timeout_ms)
  219      }
  220  }
  221  
  222  impl Clone for TcpStream {
  223      /// Creates a new handle to this TCP stream, allowing for simultaneous reads
  224      /// and writes of this connection.
  225      ///
  226      /// The underlying TCP stream will not be closed until all handles to the
  227      /// stream have been deallocated. All handles will also follow the same
  228      /// stream, but two concurrent reads will not receive the same data.
  229      /// Instead, the first read will receive the first packet received, and the
  230      /// second read will receive the second packet.
  231      fn clone(&self) -> TcpStream {
  232          TcpStream { obj: self.obj.clone() }
  233      }
  234  }
  235  
  236  impl Reader for TcpStream {
  237      fn read(&mut self, buf&mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
  238  }
  239  
  240  impl Writer for TcpStream {
  241      fn write(&mut self, buf&[u8]) -> IoResult<()> { self.obj.write(buf) }
  242  }
  243  
  244  /// A structure representing a socket server. This listener is used to create a
  245  /// `TcpAcceptor` which can be used to accept sockets on a local port.
  246  ///
  247  /// # Example
  248  ///
  249  /// ```rust
  250  /// # fn main() { }
  251  /// # fn foo() {
  252  /// # #![allow(dead_code)]
  253  /// use std::io::{TcpListener, TcpStream};
  254  /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
  255  /// use std::io::{Acceptor, Listener};
  256  ///
  257  /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 80 };
  258  /// let listener = TcpListener::bind(addr);
  259  ///
  260  /// // bind the listener to the specified address
  261  /// let mut acceptor = listener.listen();
  262  ///
  263  /// fn handle_client(mut stream: TcpStream) {
  264  ///     // ...
  265  /// # &mut stream; // silence unused mutability/variable warning
  266  /// }
  267  /// // accept connections and process them, spawning a new tasks for each one
  268  /// for stream in acceptor.incoming() {
  269  ///     match stream {
  270  ///         Err(e) => { /* connection failed */ }
  271  ///         Ok(stream) => spawn(proc() {
  272  ///             // connection succeeded
  273  ///             handle_client(stream)
  274  ///         })
  275  ///     }
  276  /// }
  277  ///
  278  /// // close the socket server
  279  /// drop(acceptor);
  280  /// # }
  281  /// ```
  282  pub struct TcpListener {
  283      obj: Box<RtioTcpListener:Send>,
  284  }
  285  
  286  impl TcpListener {
  287      /// Creates a new `TcpListener` which will be bound to the specified local
  288      /// socket address. This listener is not ready for accepting connections,
  289      /// `listen` must be called on it before that's possible.
  290      ///
  291      /// Binding with a port number of 0 will request that the OS assigns a port
  292      /// to this listener. The port allocated can be queried via the
  293      /// `socket_name` function.
  294      pub fn bind(addrSocketAddr) -> IoResult<TcpListener> {
  295          LocalIo::maybe_raise(|io| {
  296              io.tcp_bind(addr).map(|l| TcpListener { obj: l })
  297          })
  298      }
  299  
  300      /// Returns the local socket address of this listener.
  301      pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
  302          self.obj.socket_name()
  303      }
  304  }
  305  
  306  impl Listener<TcpStream, TcpAcceptor> for TcpListener {
  307      fn listen(self) -> IoResult<TcpAcceptor> {
  308          self.obj.listen().map(|acceptor| TcpAcceptor { obj: acceptor })
  309      }
  310  }
  311  
  312  /// The accepting half of a TCP socket server. This structure is created through
  313  /// a `TcpListener`'s `listen` method, and this object can be used to accept new
  314  /// `TcpStream` instances.
  315  pub struct TcpAcceptor {
  316      obj: Box<RtioTcpAcceptor:Send>,
  317  }
  318  
  319  impl TcpAcceptor {
  320      /// Prevents blocking on all future accepts after `ms` milliseconds have
  321      /// elapsed.
  322      ///
  323      /// This function is used to set a deadline after which this acceptor will
  324      /// time out accepting any connections. The argument is the relative
  325      /// distance, in milliseconds, to a point in the future after which all
  326      /// accepts will fail.
  327      ///
  328      /// If the argument specified is `None`, then any previously registered
  329      /// timeout is cleared.
  330      ///
  331      /// A timeout of `0` can be used to "poll" this acceptor to see if it has
  332      /// any pending connections. All pending connections will be accepted,
  333      /// regardless of whether the timeout has expired or not (the accept will
  334      /// not block in this case).
  335      ///
  336      /// # Example
  337      ///
  338      /// ```no_run
  339      /// # #![allow(experimental)]
  340      /// use std::io::net::tcp::TcpListener;
  341      /// use std::io::net::ip::{SocketAddr, Ipv4Addr};
  342      /// use std::io::{Listener, Acceptor, TimedOut};
  343      ///
  344      /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 8482 };
  345      /// let mut a = TcpListener::bind(addr).listen().unwrap();
  346      ///
  347      /// // After 100ms have passed, all accepts will fail
  348      /// a.set_timeout(Some(100));
  349      ///
  350      /// match a.accept() {
  351      ///     Ok(..) => println!("accepted a socket"),
  352      ///     Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
  353      ///     Err(e) => println!("err: {}", e),
  354      /// }
  355      ///
  356      /// // Reset the timeout and try again
  357      /// a.set_timeout(Some(100));
  358      /// let socket = a.accept();
  359      ///
  360      /// // Clear the timeout and block indefinitely waiting for a connection
  361      /// a.set_timeout(None);
  362      /// let socket = a.accept();
  363      /// ```
  364      #[experimental = "the type of the argument and name of this function are \
  365                        subject to change"]
  366      pub fn set_timeout(&mut self, msOption<u64>) { self.obj.set_timeout(ms); }
  367  }
  368  
  369  impl Acceptor<TcpStream> for TcpAcceptor {
  370      fn accept(&mut self) -> IoResult<TcpStream> {
  371          self.obj.accept().map(TcpStream::new)
  372      }
  373  }
  374  
  375  #[cfg(test)]
  376  #[allow(experimental)]
  377  mod test {
  378      use super::*;
  379      use io::net::ip::SocketAddr;
  380      use io::*;
  381      use prelude::*;
  382  
  383      // FIXME #11530 this fails on android because tests are run as root
  384      iotest!(fn bind_error() {
  385          let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
  386          match TcpListener::bind(addr) {
  387              Ok(..) => fail!(),
  388              Err(e) => assert_eq!(e.kind, PermissionDenied),
  389          }
  390      } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
  391  
  392      iotest!(fn connect_error() {
  393          let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
  394          match TcpStream::connect(addr) {
  395              Ok(..) => fail!(),
  396              Err(e) => assert_eq!(e.kind, ConnectionRefused),
  397          }
  398      })
  399  
  400      iotest!(fn smoke_test_ip4() {
  401          let addr = next_test_ip4();
  402          let mut acceptor = TcpListener::bind(addr).listen();
  403  
  404          spawn(proc() {
  405              let mut stream = TcpStream::connect(addr);
  406              stream.write([99]).unwrap();
  407          });
  408  
  409          let mut stream = acceptor.accept();
  410          let mut buf = [0];
  411          stream.read(buf).unwrap();
  412          assert!(buf[0] == 99);
  413      })
  414  
  415      iotest!(fn smoke_test_ip6() {
  416          let addr = next_test_ip6();
  417          let mut acceptor = TcpListener::bind(addr).listen();
  418  
  419          spawn(proc() {
  420              let mut stream = TcpStream::connect(addr);
  421              stream.write([99]).unwrap();
  422          });
  423  
  424          let mut stream = acceptor.accept();
  425          let mut buf = [0];
  426          stream.read(buf).unwrap();
  427          assert!(buf[0] == 99);
  428      })
  429  
  430      iotest!(fn read_eof_ip4() {
  431          let addr = next_test_ip4();
  432          let mut acceptor = TcpListener::bind(addr).listen();
  433  
  434          spawn(proc() {
  435              let _stream = TcpStream::connect(addr);
  436              // Close
  437          });
  438  
  439          let mut stream = acceptor.accept();
  440          let mut buf = [0];
  441          let nread = stream.read(buf);
  442          assert!(nread.is_err());
  443      })
  444  
  445      iotest!(fn read_eof_ip6() {
  446          let addr = next_test_ip6();
  447          let mut acceptor = TcpListener::bind(addr).listen();
  448  
  449          spawn(proc() {
  450              let _stream = TcpStream::connect(addr);
  451              // Close
  452          });
  453  
  454          let mut stream = acceptor.accept();
  455          let mut buf = [0];
  456          let nread = stream.read(buf);
  457          assert!(nread.is_err());
  458      })
  459  
  460      iotest!(fn read_eof_twice_ip4() {
  461          let addr = next_test_ip4();
  462          let mut acceptor = TcpListener::bind(addr).listen();
  463  
  464          spawn(proc() {
  465              let _stream = TcpStream::connect(addr);
  466              // Close
  467          });
  468  
  469          let mut stream = acceptor.accept();
  470          let mut buf = [0];
  471          let nread = stream.read(buf);
  472          assert!(nread.is_err());
  473  
  474          match stream.read(buf) {
  475              Ok(..) => fail!(),
  476              Err(ref e) => {
  477                  assert!(e.kind == NotConnected || e.kind == EndOfFile,
  478                          "unknown kind: {:?}", e.kind);
  479              }
  480          }
  481      })
  482  
  483      iotest!(fn read_eof_twice_ip6() {
  484          let addr = next_test_ip6();
  485          let mut acceptor = TcpListener::bind(addr).listen();
  486  
  487          spawn(proc() {
  488              let _stream = TcpStream::connect(addr);
  489              // Close
  490          });
  491  
  492          let mut stream = acceptor.accept();
  493          let mut buf = [0];
  494          let nread = stream.read(buf);
  495          assert!(nread.is_err());
  496  
  497          match stream.read(buf) {
  498              Ok(..) => fail!(),
  499              Err(ref e) => {
  500                  assert!(e.kind == NotConnected || e.kind == EndOfFile,
  501                          "unknown kind: {:?}", e.kind);
  502              }
  503          }
  504      })
  505  
  506      iotest!(fn write_close_ip4() {
  507          let addr = next_test_ip4();
  508          let mut acceptor = TcpListener::bind(addr).listen();
  509  
  510          spawn(proc() {
  511              let _stream = TcpStream::connect(addr);
  512              // Close
  513          });
  514  
  515          let mut stream = acceptor.accept();
  516          let buf = [0];
  517          loop {
  518              match stream.write(buf) {
  519                  Ok(..) => {}
  520                  Err(e) => {
  521                      assert!(e.kind == ConnectionReset ||
  522                              e.kind == BrokenPipe ||
  523                              e.kind == ConnectionAborted,
  524                              "unknown error: {:?}", e);
  525                      break;
  526                  }
  527              }
  528          }
  529      })
  530  
  531      iotest!(fn write_close_ip6() {
  532          let addr = next_test_ip6();
  533          let mut acceptor = TcpListener::bind(addr).listen();
  534  
  535          spawn(proc() {
  536              let _stream = TcpStream::connect(addr);
  537              // Close
  538          });
  539  
  540          let mut stream = acceptor.accept();
  541          let buf = [0];
  542          loop {
  543              match stream.write(buf) {
  544                  Ok(..) => {}
  545                  Err(e) => {
  546                      assert!(e.kind == ConnectionReset ||
  547                              e.kind == BrokenPipe ||
  548                              e.kind == ConnectionAborted,
  549                              "unknown error: {:?}", e);
  550                      break;
  551                  }
  552              }
  553          }
  554      })
  555  
  556      iotest!(fn multiple_connect_serial_ip4() {
  557          let addr = next_test_ip4();
  558          let max = 10u;
  559          let mut acceptor = TcpListener::bind(addr).listen();
  560  
  561          spawn(proc() {
  562              for _ in range(0, max) {
  563                  let mut stream = TcpStream::connect(addr);
  564                  stream.write([99]).unwrap();
  565              }
  566          });
  567  
  568          for ref mut stream in acceptor.incoming().take(max) {
  569              let mut buf = [0];
  570              stream.read(buf).unwrap();
  571              assert_eq!(buf[0], 99);
  572          }
  573      })
  574  
  575      iotest!(fn multiple_connect_serial_ip6() {
  576          let addr = next_test_ip6();
  577          let max = 10u;
  578          let mut acceptor = TcpListener::bind(addr).listen();
  579  
  580          spawn(proc() {
  581              for _ in range(0, max) {
  582                  let mut stream = TcpStream::connect(addr);
  583                  stream.write([99]).unwrap();
  584              }
  585          });
  586  
  587          for ref mut stream in acceptor.incoming().take(max) {
  588              let mut buf = [0];
  589              stream.read(buf).unwrap();
  590              assert_eq!(buf[0], 99);
  591          }
  592      })
  593  
  594      iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() {
  595          let addr = next_test_ip4();
  596          static MAX: int = 10;
  597          let acceptor = TcpListener::bind(addr).listen();
  598  
  599          spawn(proc() {
  600              let mut acceptor = acceptor;
  601              for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
  602                  // Start another task to handle the connection
  603                  spawn(proc() {
  604                      let mut stream = stream;
  605                      let mut buf = [0];
  606                      stream.read(buf).unwrap();
  607                      assert!(buf[0] == i as u8);
  608                      debug!("read");
  609                  });
  610              }
  611          });
  612  
  613          connect(0, addr);
  614  
  615          fn connect(i: int, addr: SocketAddr) {
  616              if i == MAX { return }
  617  
  618              spawn(proc() {
  619                  debug!("connecting");
  620                  let mut stream = TcpStream::connect(addr);
  621                  // Connect again before writing
  622                  connect(i + 1, addr);
  623                  debug!("writing");
  624                  stream.write([i as u8]).unwrap();
  625              });
  626          }
  627      })
  628  
  629      iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() {
  630          let addr = next_test_ip6();
  631          static MAX: int = 10;
  632          let acceptor = TcpListener::bind(addr).listen();
  633  
  634          spawn(proc() {
  635              let mut acceptor = acceptor;
  636              for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
  637                  // Start another task to handle the connection
  638                  spawn(proc() {
  639                      let mut stream = stream;
  640                      let mut buf = [0];
  641                      stream.read(buf).unwrap();
  642                      assert!(buf[0] == i as u8);
  643                      debug!("read");
  644                  });
  645              }
  646          });
  647  
  648          connect(0, addr);
  649  
  650          fn connect(i: int, addr: SocketAddr) {
  651              if i == MAX { return }
  652  
  653              spawn(proc() {
  654                  debug!("connecting");
  655                  let mut stream = TcpStream::connect(addr);
  656                  // Connect again before writing
  657                  connect(i + 1, addr);
  658                  debug!("writing");
  659                  stream.write([i as u8]).unwrap();
  660              });
  661          }
  662      })
  663  
  664      iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() {
  665          static MAX: int = 10;
  666          let addr = next_test_ip4();
  667          let acceptor = TcpListener::bind(addr).listen();
  668  
  669          spawn(proc() {
  670              let mut acceptor = acceptor;
  671              for stream in acceptor.incoming().take(MAX as uint) {
  672                  // Start another task to handle the connection
  673                  spawn(proc() {
  674                      let mut stream = stream;
  675                      let mut buf = [0];
  676                      stream.read(buf).unwrap();
  677                      assert!(buf[0] == 99);
  678                      debug!("read");
  679                  });
  680              }
  681          });
  682  
  683          connect(0, addr);
  684  
  685          fn connect(i: int, addr: SocketAddr) {
  686              if i == MAX { return }
  687  
  688              spawn(proc() {
  689                  debug!("connecting");
  690                  let mut stream = TcpStream::connect(addr);
  691                  // Connect again before writing
  692                  connect(i + 1, addr);
  693                  debug!("writing");
  694                  stream.write([99]).unwrap();
  695              });
  696          }
  697      })
  698  
  699      iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() {
  700          static MAX: int = 10;
  701          let addr = next_test_ip6();
  702          let acceptor = TcpListener::bind(addr).listen();
  703  
  704          spawn(proc() {
  705              let mut acceptor = acceptor;
  706              for stream in acceptor.incoming().take(MAX as uint) {
  707                  // Start another task to handle the connection
  708                  spawn(proc() {
  709                      let mut stream = stream;
  710                      let mut buf = [0];
  711                      stream.read(buf).unwrap();
  712                      assert!(buf[0] == 99);
  713                      debug!("read");
  714                  });
  715              }
  716          });
  717  
  718          connect(0, addr);
  719  
  720          fn connect(i: int, addr: SocketAddr) {
  721              if i == MAX { return }
  722  
  723              spawn(proc() {
  724                  debug!("connecting");
  725                  let mut stream = TcpStream::connect(addr);
  726                  // Connect again before writing
  727                  connect(i + 1, addr);
  728                  debug!("writing");
  729                  stream.write([99]).unwrap();
  730              });
  731          }
  732      })
  733  
  734      pub fn socket_name(addr: SocketAddr) {
  735          let mut listener = TcpListener::bind(addr).unwrap();
  736  
  737          // Make sure socket_name gives
  738          // us the socket we binded to.
  739          let so_name = listener.socket_name();
  740          assert!(so_name.is_ok());
  741          assert_eq!(addr, so_name.unwrap());
  742      }
  743  
  744      pub fn peer_name(addr: SocketAddr) {
  745          let acceptor = TcpListener::bind(addr).listen();
  746          spawn(proc() {
  747              let mut acceptor = acceptor;
  748              acceptor.accept().unwrap();
  749          });
  750  
  751          let stream = TcpStream::connect(addr);
  752  
  753          assert!(stream.is_ok());
  754          let mut stream = stream.unwrap();
  755  
  756          // Make sure peer_name gives us the
  757          // address/port of the peer we've
  758          // connected to.
  759          let peer_name = stream.peer_name();
  760          assert!(peer_name.is_ok());
  761          assert_eq!(addr, peer_name.unwrap());
  762      }
  763  
  764      iotest!(fn socket_and_peer_name_ip4() {
  765          peer_name(next_test_ip4());
  766          socket_name(next_test_ip4());
  767      })
  768  
  769      iotest!(fn socket_and_peer_name_ip6() {
  770          // FIXME: peer name is not consistent
  771          //peer_name(next_test_ip6());
  772          socket_name(next_test_ip6());
  773      })
  774  
  775      iotest!(fn partial_read() {
  776          let addr = next_test_ip4();
  777          let (tx, rx) = channel();
  778          spawn(proc() {
  779              let mut srv = TcpListener::bind(addr).listen().unwrap();
  780              tx.send(());
  781              let mut cl = srv.accept().unwrap();
  782              cl.write([10]).unwrap();
  783              let mut b = [0];
  784              cl.read(b).unwrap();
  785              tx.send(());
  786          });
  787  
  788          rx.recv();
  789          let mut c = TcpStream::connect(addr).unwrap();
  790          let mut b = [0, ..10];
  791          assert_eq!(c.read(b), Ok(1));
  792          c.write([1]).unwrap();
  793          rx.recv();
  794      })
  795  
  796      iotest!(fn double_bind() {
  797          let addr = next_test_ip4();
  798          let listener = TcpListener::bind(addr).unwrap().listen();
  799          assert!(listener.is_ok());
  800          match TcpListener::bind(addr).listen() {
  801              Ok(..) => fail!(),
  802              Err(e) => {
  803                  assert!(e.kind == ConnectionRefused || e.kind == OtherIoError);
  804              }
  805          }
  806      })
  807  
  808      iotest!(fn fast_rebind() {
  809          let addr = next_test_ip4();
  810          let (tx, rx) = channel();
  811  
  812          spawn(proc() {
  813              rx.recv();
  814              let _stream = TcpStream::connect(addr).unwrap();
  815              // Close
  816              rx.recv();
  817          });
  818  
  819          {
  820              let mut acceptor = TcpListener::bind(addr).listen();
  821              tx.send(());
  822              {
  823                  let _stream = acceptor.accept().unwrap();
  824                  // Close client
  825                  tx.send(());
  826              }
  827              // Close listener
  828          }
  829          let _listener = TcpListener::bind(addr);
  830      })
  831  
  832      iotest!(fn tcp_clone_smoke() {
  833          let addr = next_test_ip4();
  834          let mut acceptor = TcpListener::bind(addr).listen();
  835  
  836          spawn(proc() {
  837              let mut s = TcpStream::connect(addr);
  838              let mut buf = [0, 0];
  839              assert_eq!(s.read(buf), Ok(1));
  840              assert_eq!(buf[0], 1);
  841              s.write([2]).unwrap();
  842          });
  843  
  844          let mut s1 = acceptor.accept().unwrap();
  845          let s2 = s1.clone();
  846  
  847          let (tx1, rx1) = channel();
  848          let (tx2, rx2) = channel();
  849          spawn(proc() {
  850              let mut s2 = s2;
  851              rx1.recv();
  852              s2.write([1]).unwrap();
  853              tx2.send(());
  854          });
  855          tx1.send(());
  856          let mut buf = [0, 0];
  857          assert_eq!(s1.read(buf), Ok(1));
  858          rx2.recv();
  859      })
  860  
  861      iotest!(fn tcp_clone_two_read() {
  862          let addr = next_test_ip6();
  863          let mut acceptor = TcpListener::bind(addr).listen();
  864          let (tx1, rx) = channel();
  865          let tx2 = tx1.clone();
  866  
  867          spawn(proc() {
  868              let mut s = TcpStream::connect(addr);
  869              s.write([1]).unwrap();
  870              rx.recv();
  871              s.write([2]).unwrap();
  872              rx.recv();
  873          });
  874  
  875          let mut s1 = acceptor.accept().unwrap();
  876          let s2 = s1.clone();
  877  
  878          let (done, rx) = channel();
  879          spawn(proc() {
  880              let mut s2 = s2;
  881              let mut buf = [0, 0];
  882              s2.read(buf).unwrap();
  883              tx2.send(());
  884              done.send(());
  885          });
  886          let mut buf = [0, 0];
  887          s1.read(buf).unwrap();
  888          tx1.send(());
  889  
  890          rx.recv();
  891      })
  892  
  893      iotest!(fn tcp_clone_two_write() {
  894          let addr = next_test_ip4();
  895          let mut acceptor = TcpListener::bind(addr).listen();
  896  
  897          spawn(proc() {
  898              let mut s = TcpStream::connect(addr);
  899              let mut buf = [0, 1];
  900              s.read(buf).unwrap();
  901              s.read(buf).unwrap();
  902          });
  903  
  904          let mut s1 = acceptor.accept().unwrap();
  905          let s2 = s1.clone();
  906  
  907          let (done, rx) = channel();
  908          spawn(proc() {
  909              let mut s2 = s2;
  910              s2.write([1]).unwrap();
  911              done.send(());
  912          });
  913          s1.write([2]).unwrap();
  914  
  915          rx.recv();
  916      })
  917  
  918      iotest!(fn shutdown_smoke() {
  919          use rt::rtio::RtioTcpStream;
  920  
  921          let addr = next_test_ip4();
  922          let a = TcpListener::bind(addr).unwrap().listen();
  923          spawn(proc() {
  924              let mut a = a;
  925              let mut c = a.accept().unwrap();
  926              assert_eq!(c.read_to_end(), Ok(vec!()));
  927              c.write([1]).unwrap();
  928          });
  929  
  930          let mut s = TcpStream::connect(addr).unwrap();
  931          assert!(s.obj.close_write().is_ok());
  932          assert!(s.write([1]).is_err());
  933          assert_eq!(s.read_to_end(), Ok(vec!(1)));
  934      })
  935  
  936      iotest!(fn accept_timeout() {
  937          let addr = next_test_ip4();
  938          let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
  939  
  940          a.set_timeout(Some(10));
  941  
  942          // Make sure we time out once and future invocations also time out
  943          let err = a.accept().err().unwrap();
  944          assert_eq!(err.kind, TimedOut);
  945          let err = a.accept().err().unwrap();
  946          assert_eq!(err.kind, TimedOut);
  947  
  948          // Also make sure that even though the timeout is expired that we will
  949          // continue to receive any pending connections.
  950          let (tx, rx) = channel();
  951          spawn(proc() {
  952              tx.send(TcpStream::connect(addr).unwrap());
  953          });
  954          let l = rx.recv();
  955          for i in range(0, 1001) {
  956              match a.accept() {
  957                  Ok(..) => break,
  958                  Err(ref e) if e.kind == TimedOut => {}
  959                  Err(e) => fail!("error: {}", e),
  960              }
  961              ::task::deschedule();
  962              if i == 1000 { fail!("should have a pending connection") }
  963          }
  964          drop(l);
  965  
  966          // Unset the timeout and make sure that this always blocks.
  967          a.set_timeout(None);
  968          spawn(proc() {
  969              drop(TcpStream::connect(addr).unwrap());
  970          });
  971          a.accept().unwrap();
  972      })
  973  
  974      iotest!(fn close_readwrite_smoke() {
  975          let addr = next_test_ip4();
  976          let a = TcpListener::bind(addr).listen().unwrap();
  977          let (_tx, rx) = channel::<()>();
  978          spawn(proc() {
  979              let mut a = a;
  980              let _s = a.accept().unwrap();
  981              let _ = rx.recv_opt();
  982          });
  983  
  984          let mut b = [0];
  985          let mut s = TcpStream::connect(addr).unwrap();
  986          let mut s2 = s.clone();
  987  
  988          // closing should prevent reads/writes
  989          s.close_write().unwrap();
  990          assert!(s.write([0]).is_err());
  991          s.close_read().unwrap();
  992          assert!(s.read(b).is_err());
  993  
  994          // closing should affect previous handles
  995          assert!(s2.write([0]).is_err());
  996          assert!(s2.read(b).is_err());
  997  
  998          // closing should affect new handles
  999          let mut s3 = s.clone();
 1000          assert!(s3.write([0]).is_err());
 1001          assert!(s3.read(b).is_err());
 1002  
 1003          // make sure these don't die
 1004          let _ = s2.close_read();
 1005          let _ = s2.close_write();
 1006          let _ = s3.close_read();
 1007          let _ = s3.close_write();
 1008      })
 1009  
 1010      iotest!(fn close_read_wakes_up() {
 1011          let addr = next_test_ip4();
 1012          let a = TcpListener::bind(addr).listen().unwrap();
 1013          let (_tx, rx) = channel::<()>();
 1014          spawn(proc() {
 1015              let mut a = a;
 1016              let _s = a.accept().unwrap();
 1017              let _ = rx.recv_opt();
 1018          });
 1019  
 1020          let mut s = TcpStream::connect(addr).unwrap();
 1021          let s2 = s.clone();
 1022          let (tx, rx) = channel();
 1023          spawn(proc() {
 1024              let mut s2 = s2;
 1025              assert!(s2.read([0]).is_err());
 1026              tx.send(());
 1027          });
 1028          // this should wake up the child task
 1029          s.close_read().unwrap();
 1030  
 1031          // this test will never finish if the child doesn't wake up
 1032          rx.recv();
 1033      })
 1034  
 1035      iotest!(fn readwrite_timeouts() {
 1036          let addr = next_test_ip6();
 1037          let mut a = TcpListener::bind(addr).listen().unwrap();
 1038          let (tx, rx) = channel::<()>();
 1039          spawn(proc() {
 1040              let mut s = TcpStream::connect(addr).unwrap();
 1041              rx.recv();
 1042              assert!(s.write([0]).is_ok());
 1043              let _ = rx.recv_opt();
 1044          });
 1045  
 1046          let mut s = a.accept().unwrap();
 1047          s.set_timeout(Some(20));
 1048          assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
 1049          assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
 1050  
 1051          s.set_timeout(Some(20));
 1052          for i in range(0, 1001) {
 1053              match s.write([0, .. 128 * 1024]) {
 1054                  Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
 1055                  Err(IoError { kind: TimedOut, .. }) => break,
 1056                  Err(e) => fail!("{}", e),
 1057             }
 1058             if i == 1000 { fail!("should have filled up?!"); }
 1059          }
 1060          assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
 1061  
 1062          tx.send(());
 1063          s.set_timeout(None);
 1064          assert_eq!(s.read([0, 0]), Ok(1));
 1065      })
 1066  
 1067      iotest!(fn read_timeouts() {
 1068          let addr = next_test_ip6();
 1069          let mut a = TcpListener::bind(addr).listen().unwrap();
 1070          let (tx, rx) = channel::<()>();
 1071          spawn(proc() {
 1072              let mut s = TcpStream::connect(addr).unwrap();
 1073              rx.recv();
 1074              let mut amt = 0;
 1075              while amt < 100 * 128 * 1024 {
 1076                  match s.read([0, ..128 * 1024]) {
 1077                      Ok(n) => { amt += n; }
 1078                      Err(e) => fail!("{}", e),
 1079                  }
 1080              }
 1081              let _ = rx.recv_opt();
 1082          });
 1083  
 1084          let mut s = a.accept().unwrap();
 1085          s.set_read_timeout(Some(20));
 1086          assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
 1087          assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
 1088  
 1089          tx.send(());
 1090          for _ in range(0, 100) {
 1091              assert!(s.write([0, ..128 * 1024]).is_ok());
 1092          }
 1093      })
 1094  
 1095      iotest!(fn write_timeouts() {
 1096          let addr = next_test_ip6();
 1097          let mut a = TcpListener::bind(addr).listen().unwrap();
 1098          let (tx, rx) = channel::<()>();
 1099          spawn(proc() {
 1100              let mut s = TcpStream::connect(addr).unwrap();
 1101              rx.recv();
 1102              assert!(s.write([0]).is_ok());
 1103              let _ = rx.recv_opt();
 1104          });
 1105  
 1106          let mut s = a.accept().unwrap();
 1107          s.set_write_timeout(Some(20));
 1108          for i in range(0, 1001) {
 1109              match s.write([0, .. 128 * 1024]) {
 1110                  Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
 1111                  Err(IoError { kind: TimedOut, .. }) => break,
 1112                  Err(e) => fail!("{}", e),
 1113             }
 1114             if i == 1000 { fail!("should have filled up?!"); }
 1115          }
 1116          assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
 1117  
 1118          tx.send(());
 1119          assert!(s.read([0]).is_ok());
 1120      })
 1121  
 1122      iotest!(fn timeout_concurrent_read() {
 1123          let addr = next_test_ip6();
 1124          let mut a = TcpListener::bind(addr).listen().unwrap();
 1125          let (tx, rx) = channel::<()>();
 1126          spawn(proc() {
 1127              let mut s = TcpStream::connect(addr).unwrap();
 1128              rx.recv();
 1129              assert_eq!(s.write([0]), Ok(()));
 1130              let _ = rx.recv_opt();
 1131          });
 1132  
 1133          let mut s = a.accept().unwrap();
 1134          let s2 = s.clone();
 1135          let (tx2, rx2) = channel();
 1136          spawn(proc() {
 1137              let mut s2 = s2;
 1138              assert_eq!(s2.read([0]), Ok(1));
 1139              tx2.send(());
 1140          });
 1141  
 1142          s.set_read_timeout(Some(20));
 1143          assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
 1144          tx.send(());
 1145  
 1146          rx2.recv();
 1147      })
 1148  }


libstd/io/net/tcp.rs:281:8-281:8 -struct- definition:
/// ```
pub struct TcpListener {
    obj: Box<RtioTcpListener:Send>,
references:- 4
295:         LocalIo::maybe_raise(|io| {
296:             io.tcp_bind(addr).map(|l| TcpListener { obj: l })
297:         })
--
306: impl Listener<TcpStream, TcpAcceptor> for TcpListener {
307:     fn listen(self) -> IoResult<TcpAcceptor> {


libstd/io/net/tcp.rs:47:8-47:8 -struct- definition:
/// ```
pub struct TcpStream {
    obj: Box<RtioTcpStream:Send>,
references:- 13
231:     fn clone(&self) -> TcpStream {
232:         TcpStream { obj: self.obj.clone() }
233:     }
--
240: impl Writer for TcpStream {
241:     fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
--
306: impl Listener<TcpStream, TcpAcceptor> for TcpListener {
307:     fn listen(self) -> IoResult<TcpAcceptor> {
--
369: impl Acceptor<TcpStream> for TcpAcceptor {
370:     fn accept(&mut self) -> IoResult<TcpStream> {
371:         self.obj.accept().map(TcpStream::new)


libstd/io/net/tcp.rs:314:27-314:27 -struct- definition:
/// `TcpStream` instances.
pub struct TcpAcceptor {
    obj: Box<RtioTcpAcceptor:Send>,
references:- 5
307:     fn listen(self) -> IoResult<TcpAcceptor> {
308:         self.obj.listen().map(|acceptor| TcpAcceptor { obj: acceptor })
309:     }
--
319: impl TcpAcceptor {
320:     /// Prevents blocking on all future accepts after `ms` milliseconds have
--
369: impl Acceptor<TcpStream> for TcpAcceptor {
370:     fn accept(&mut self) -> IoResult<TcpStream> {