(index<- ) ./libstd/io/net/tcp.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! TCP network connections
12 //!
13 //! This module contains the ability to open a TCP stream to a socket address,
14 //! as well as creating a socket server to accept incoming connections. The
15 //! destination and binding addresses can either be an IPv4 or IPv6 address.
16 //!
17 //! A TCP connection implements the `Reader` and `Writer` traits, while the TCP
18 //! listener (socket server) implements the `Listener` and `Acceptor` traits.
19
20 use clone::Clone;
21 use io::IoResult;
22 use io::net::ip::SocketAddr;
23 use io::{Reader, Writer, Listener, Acceptor};
24 use kinds::Send;
25 use option::{None, Some, Option};
26 use owned::Box;
27 use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
28 use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
29
30 /// A structure which represents a TCP stream between a local socket and a
31 /// remote socket.
32 ///
33 /// # Example
34 ///
35 /// ```no_run
36 /// # #![allow(unused_must_use)]
37 /// use std::io::net::tcp::TcpStream;
38 /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
39 ///
40 /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
41 /// let mut stream = TcpStream::connect(addr);
42 ///
43 /// stream.write([1]);
44 /// let mut buf = [0];
45 /// stream.read(buf);
46 /// drop(stream); // close the connection
47 /// ```
48 pub struct TcpStream {
49 obj: Box<RtioTcpStream:Send>,
50 }
51
52 impl TcpStream {
53 fn new(s: Box<RtioTcpStream:Send>) -> TcpStream {
54 TcpStream { obj: s }
55 }
56
57 /// Creates a TCP connection to a remote socket address.
58 ///
59 /// If no error is encountered, then `Ok(stream)` is returned.
60 pub fn connect(addr: SocketAddr) -> IoResult<TcpStream> {
61 LocalIo::maybe_raise(|io| {
62 io.tcp_connect(addr, None).map(TcpStream::new)
63 })
64 }
65
66 /// Creates a TCP connection to a remote socket address, timing out after
67 /// the specified number of milliseconds.
68 ///
69 /// This is the same as the `connect` method, except that if the timeout
70 /// specified (in milliseconds) elapses before a connection is made an error
71 /// will be returned. The error's kind will be `TimedOut`.
72 #[experimental = "the timeout argument may eventually change types"]
73 pub fn connect_timeout(addr: SocketAddr,
74 timeout_ms: u64) -> IoResult<TcpStream> {
75 LocalIo::maybe_raise(|io| {
76 io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new)
77 })
78 }
79
80 /// Returns the socket address of the remote peer of this TCP connection.
81 pub fn peer_name(&mut self) -> IoResult<SocketAddr> {
82 self.obj.peer_name()
83 }
84
85 /// Returns the socket address of the local half of this TCP connection.
86 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
87 self.obj.socket_name()
88 }
89
90 /// Sets the nodelay flag on this connection to the boolean specified
91 #[experimental]
92 pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
93 if nodelay {
94 self.obj.nodelay()
95 } else {
96 self.obj.control_congestion()
97 }
98 }
99
100 /// Sets the keepalive timeout to the timeout specified.
101 ///
102 /// If the value specified is `None`, then the keepalive flag is cleared on
103 /// this connection. Otherwise, the keepalive timeout will be set to the
104 /// specified time, in seconds.
105 #[experimental]
106 pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> {
107 match delay_in_seconds {
108 Some(i) => self.obj.keepalive(i),
109 None => self.obj.letdie(),
110 }
111 }
112
113 /// Closes the reading half of this connection.
114 ///
115 /// This method will close the reading portion of this connection, causing
116 /// all pending and future reads to immediately return with an error.
117 ///
118 /// # Example
119 ///
120 /// ```no_run
121 /// # #![allow(unused_must_use)]
122 /// use std::io::timer;
123 /// use std::io::net::tcp::TcpStream;
124 /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
125 ///
126 /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
127 /// let mut stream = TcpStream::connect(addr).unwrap();
128 /// let stream2 = stream.clone();
129 ///
130 /// spawn(proc() {
131 /// // close this stream after one second
132 /// timer::sleep(1000);
133 /// let mut stream = stream2;
134 /// stream.close_read();
135 /// });
136 ///
137 /// // wait for some data, will get canceled after one second
138 /// let mut buf = [0];
139 /// stream.read(buf);
140 /// ```
141 ///
142 /// Note that this method affects all cloned handles associated with this
143 /// stream, not just this one handle.
144 pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() }
145
146 /// Closes the writing half of this connection.
147 ///
148 /// This method will close the writing portion of this connection, causing
149 /// all future writes to immediately return with an error.
150 ///
151 /// Note that this method affects all cloned handles associated with this
152 /// stream, not just this one handle.
153 pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
154
155 /// Sets a timeout, in milliseconds, for blocking operations on this stream.
156 ///
157 /// This function will set a timeout for all blocking operations (including
158 /// reads and writes) on this stream. The timeout specified is a relative
159 /// time, in milliseconds, into the future after which point operations will
160 /// time out. This means that the timeout must be reset periodically to keep
161 /// it from expiring. Specifying a value of `None` will clear the timeout
162 /// for this stream.
163 ///
164 /// The timeout on this stream is local to this stream only. Setting a
165 /// timeout does not affect any other cloned instances of this stream, nor
166 /// does the timeout propagated to cloned handles of this stream. Setting
167 /// this timeout will override any specific read or write timeouts
168 /// previously set for this stream.
169 ///
170 /// For clarification on the semantics of interrupting a read and a write,
171 /// take a look at `set_read_timeout` and `set_write_timeout`.
172 #[experimental = "the timeout argument may change in type and value"]
173 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
174 self.obj.set_timeout(timeout_ms)
175 }
176
177 /// Sets the timeout for read operations on this stream.
178 ///
179 /// See documentation in `set_timeout` for the semantics of this read time.
180 /// This will overwrite any previous read timeout set through either this
181 /// function or `set_timeout`.
182 ///
183 /// # Errors
184 ///
185 /// When this timeout expires, if there is no pending read operation, no
186 /// action is taken. Otherwise, the read operation will be scheduled to
187 /// promptly return. If a timeout error is returned, then no data was read
188 /// during the timeout period.
189 #[experimental = "the timeout argument may change in type and value"]
190 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
191 self.obj.set_read_timeout(timeout_ms)
192 }
193
194 /// Sets the timeout for write operations on this stream.
195 ///
196 /// See documentation in `set_timeout` for the semantics of this write time.
197 /// This will overwrite any previous write timeout set through either this
198 /// function or `set_timeout`.
199 ///
200 /// # Errors
201 ///
202 /// When this timeout expires, if there is no pending write operation, no
203 /// action is taken. Otherwise, the pending write operation will be
204 /// scheduled to promptly return. The actual state of the underlying stream
205 /// is not specified.
206 ///
207 /// The write operation may return an error of type `ShortWrite` which
208 /// indicates that the object is known to have written an exact number of
209 /// bytes successfully during the timeout period, and the remaining bytes
210 /// were never written.
211 ///
212 /// If the write operation returns `TimedOut`, then it the timeout primitive
213 /// does not know how many bytes were written as part of the timeout
214 /// operation. It may be the case that bytes continue to be written in an
215 /// asynchronous fashion after the call to write returns.
216 #[experimental = "the timeout argument may change in type and value"]
217 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
218 self.obj.set_write_timeout(timeout_ms)
219 }
220 }
221
222 impl Clone for TcpStream {
223 /// Creates a new handle to this TCP stream, allowing for simultaneous reads
224 /// and writes of this connection.
225 ///
226 /// The underlying TCP stream will not be closed until all handles to the
227 /// stream have been deallocated. All handles will also follow the same
228 /// stream, but two concurrent reads will not receive the same data.
229 /// Instead, the first read will receive the first packet received, and the
230 /// second read will receive the second packet.
231 fn clone(&self) -> TcpStream {
232 TcpStream { obj: self.obj.clone() }
233 }
234 }
235
236 impl Reader for TcpStream {
237 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) }
238 }
239
240 impl Writer for TcpStream {
241 fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) }
242 }
243
244 /// A structure representing a socket server. This listener is used to create a
245 /// `TcpAcceptor` which can be used to accept sockets on a local port.
246 ///
247 /// # Example
248 ///
249 /// ```rust
250 /// # fn main() { }
251 /// # fn foo() {
252 /// # #![allow(dead_code)]
253 /// use std::io::{TcpListener, TcpStream};
254 /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
255 /// use std::io::{Acceptor, Listener};
256 ///
257 /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 80 };
258 /// let listener = TcpListener::bind(addr);
259 ///
260 /// // bind the listener to the specified address
261 /// let mut acceptor = listener.listen();
262 ///
263 /// fn handle_client(mut stream: TcpStream) {
264 /// // ...
265 /// # &mut stream; // silence unused mutability/variable warning
266 /// }
267 /// // accept connections and process them, spawning a new tasks for each one
268 /// for stream in acceptor.incoming() {
269 /// match stream {
270 /// Err(e) => { /* connection failed */ }
271 /// Ok(stream) => spawn(proc() {
272 /// // connection succeeded
273 /// handle_client(stream)
274 /// })
275 /// }
276 /// }
277 ///
278 /// // close the socket server
279 /// drop(acceptor);
280 /// # }
281 /// ```
282 pub struct TcpListener {
283 obj: Box<RtioTcpListener:Send>,
284 }
285
286 impl TcpListener {
287 /// Creates a new `TcpListener` which will be bound to the specified local
288 /// socket address. This listener is not ready for accepting connections,
289 /// `listen` must be called on it before that's possible.
290 ///
291 /// Binding with a port number of 0 will request that the OS assigns a port
292 /// to this listener. The port allocated can be queried via the
293 /// `socket_name` function.
294 pub fn bind(addr: SocketAddr) -> IoResult<TcpListener> {
295 LocalIo::maybe_raise(|io| {
296 io.tcp_bind(addr).map(|l| TcpListener { obj: l })
297 })
298 }
299
300 /// Returns the local socket address of this listener.
301 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
302 self.obj.socket_name()
303 }
304 }
305
306 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
307 fn listen(self) -> IoResult<TcpAcceptor> {
308 self.obj.listen().map(|acceptor| TcpAcceptor { obj: acceptor })
309 }
310 }
311
312 /// The accepting half of a TCP socket server. This structure is created through
313 /// a `TcpListener`'s `listen` method, and this object can be used to accept new
314 /// `TcpStream` instances.
315 pub struct TcpAcceptor {
316 obj: Box<RtioTcpAcceptor:Send>,
317 }
318
319 impl TcpAcceptor {
320 /// Prevents blocking on all future accepts after `ms` milliseconds have
321 /// elapsed.
322 ///
323 /// This function is used to set a deadline after which this acceptor will
324 /// time out accepting any connections. The argument is the relative
325 /// distance, in milliseconds, to a point in the future after which all
326 /// accepts will fail.
327 ///
328 /// If the argument specified is `None`, then any previously registered
329 /// timeout is cleared.
330 ///
331 /// A timeout of `0` can be used to "poll" this acceptor to see if it has
332 /// any pending connections. All pending connections will be accepted,
333 /// regardless of whether the timeout has expired or not (the accept will
334 /// not block in this case).
335 ///
336 /// # Example
337 ///
338 /// ```no_run
339 /// # #![allow(experimental)]
340 /// use std::io::net::tcp::TcpListener;
341 /// use std::io::net::ip::{SocketAddr, Ipv4Addr};
342 /// use std::io::{Listener, Acceptor, TimedOut};
343 ///
344 /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 8482 };
345 /// let mut a = TcpListener::bind(addr).listen().unwrap();
346 ///
347 /// // After 100ms have passed, all accepts will fail
348 /// a.set_timeout(Some(100));
349 ///
350 /// match a.accept() {
351 /// Ok(..) => println!("accepted a socket"),
352 /// Err(ref e) if e.kind == TimedOut => { println!("timed out!"); }
353 /// Err(e) => println!("err: {}", e),
354 /// }
355 ///
356 /// // Reset the timeout and try again
357 /// a.set_timeout(Some(100));
358 /// let socket = a.accept();
359 ///
360 /// // Clear the timeout and block indefinitely waiting for a connection
361 /// a.set_timeout(None);
362 /// let socket = a.accept();
363 /// ```
364 #[experimental = "the type of the argument and name of this function are \
365 subject to change"]
366 pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); }
367 }
368
369 impl Acceptor<TcpStream> for TcpAcceptor {
370 fn accept(&mut self) -> IoResult<TcpStream> {
371 self.obj.accept().map(TcpStream::new)
372 }
373 }
374
375 #[cfg(test)]
376 #[allow(experimental)]
377 mod test {
378 use super::*;
379 use io::net::ip::SocketAddr;
380 use io::*;
381 use prelude::*;
382
383 // FIXME #11530 this fails on android because tests are run as root
384 iotest!(fn bind_error() {
385 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
386 match TcpListener::bind(addr) {
387 Ok(..) => fail!(),
388 Err(e) => assert_eq!(e.kind, PermissionDenied),
389 }
390 } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
391
392 iotest!(fn connect_error() {
393 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
394 match TcpStream::connect(addr) {
395 Ok(..) => fail!(),
396 Err(e) => assert_eq!(e.kind, ConnectionRefused),
397 }
398 })
399
400 iotest!(fn smoke_test_ip4() {
401 let addr = next_test_ip4();
402 let mut acceptor = TcpListener::bind(addr).listen();
403
404 spawn(proc() {
405 let mut stream = TcpStream::connect(addr);
406 stream.write([99]).unwrap();
407 });
408
409 let mut stream = acceptor.accept();
410 let mut buf = [0];
411 stream.read(buf).unwrap();
412 assert!(buf[0] == 99);
413 })
414
415 iotest!(fn smoke_test_ip6() {
416 let addr = next_test_ip6();
417 let mut acceptor = TcpListener::bind(addr).listen();
418
419 spawn(proc() {
420 let mut stream = TcpStream::connect(addr);
421 stream.write([99]).unwrap();
422 });
423
424 let mut stream = acceptor.accept();
425 let mut buf = [0];
426 stream.read(buf).unwrap();
427 assert!(buf[0] == 99);
428 })
429
430 iotest!(fn read_eof_ip4() {
431 let addr = next_test_ip4();
432 let mut acceptor = TcpListener::bind(addr).listen();
433
434 spawn(proc() {
435 let _stream = TcpStream::connect(addr);
436 // Close
437 });
438
439 let mut stream = acceptor.accept();
440 let mut buf = [0];
441 let nread = stream.read(buf);
442 assert!(nread.is_err());
443 })
444
445 iotest!(fn read_eof_ip6() {
446 let addr = next_test_ip6();
447 let mut acceptor = TcpListener::bind(addr).listen();
448
449 spawn(proc() {
450 let _stream = TcpStream::connect(addr);
451 // Close
452 });
453
454 let mut stream = acceptor.accept();
455 let mut buf = [0];
456 let nread = stream.read(buf);
457 assert!(nread.is_err());
458 })
459
460 iotest!(fn read_eof_twice_ip4() {
461 let addr = next_test_ip4();
462 let mut acceptor = TcpListener::bind(addr).listen();
463
464 spawn(proc() {
465 let _stream = TcpStream::connect(addr);
466 // Close
467 });
468
469 let mut stream = acceptor.accept();
470 let mut buf = [0];
471 let nread = stream.read(buf);
472 assert!(nread.is_err());
473
474 match stream.read(buf) {
475 Ok(..) => fail!(),
476 Err(ref e) => {
477 assert!(e.kind == NotConnected || e.kind == EndOfFile,
478 "unknown kind: {:?}", e.kind);
479 }
480 }
481 })
482
483 iotest!(fn read_eof_twice_ip6() {
484 let addr = next_test_ip6();
485 let mut acceptor = TcpListener::bind(addr).listen();
486
487 spawn(proc() {
488 let _stream = TcpStream::connect(addr);
489 // Close
490 });
491
492 let mut stream = acceptor.accept();
493 let mut buf = [0];
494 let nread = stream.read(buf);
495 assert!(nread.is_err());
496
497 match stream.read(buf) {
498 Ok(..) => fail!(),
499 Err(ref e) => {
500 assert!(e.kind == NotConnected || e.kind == EndOfFile,
501 "unknown kind: {:?}", e.kind);
502 }
503 }
504 })
505
506 iotest!(fn write_close_ip4() {
507 let addr = next_test_ip4();
508 let mut acceptor = TcpListener::bind(addr).listen();
509
510 spawn(proc() {
511 let _stream = TcpStream::connect(addr);
512 // Close
513 });
514
515 let mut stream = acceptor.accept();
516 let buf = [0];
517 loop {
518 match stream.write(buf) {
519 Ok(..) => {}
520 Err(e) => {
521 assert!(e.kind == ConnectionReset ||
522 e.kind == BrokenPipe ||
523 e.kind == ConnectionAborted,
524 "unknown error: {:?}", e);
525 break;
526 }
527 }
528 }
529 })
530
531 iotest!(fn write_close_ip6() {
532 let addr = next_test_ip6();
533 let mut acceptor = TcpListener::bind(addr).listen();
534
535 spawn(proc() {
536 let _stream = TcpStream::connect(addr);
537 // Close
538 });
539
540 let mut stream = acceptor.accept();
541 let buf = [0];
542 loop {
543 match stream.write(buf) {
544 Ok(..) => {}
545 Err(e) => {
546 assert!(e.kind == ConnectionReset ||
547 e.kind == BrokenPipe ||
548 e.kind == ConnectionAborted,
549 "unknown error: {:?}", e);
550 break;
551 }
552 }
553 }
554 })
555
556 iotest!(fn multiple_connect_serial_ip4() {
557 let addr = next_test_ip4();
558 let max = 10u;
559 let mut acceptor = TcpListener::bind(addr).listen();
560
561 spawn(proc() {
562 for _ in range(0, max) {
563 let mut stream = TcpStream::connect(addr);
564 stream.write([99]).unwrap();
565 }
566 });
567
568 for ref mut stream in acceptor.incoming().take(max) {
569 let mut buf = [0];
570 stream.read(buf).unwrap();
571 assert_eq!(buf[0], 99);
572 }
573 })
574
575 iotest!(fn multiple_connect_serial_ip6() {
576 let addr = next_test_ip6();
577 let max = 10u;
578 let mut acceptor = TcpListener::bind(addr).listen();
579
580 spawn(proc() {
581 for _ in range(0, max) {
582 let mut stream = TcpStream::connect(addr);
583 stream.write([99]).unwrap();
584 }
585 });
586
587 for ref mut stream in acceptor.incoming().take(max) {
588 let mut buf = [0];
589 stream.read(buf).unwrap();
590 assert_eq!(buf[0], 99);
591 }
592 })
593
594 iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() {
595 let addr = next_test_ip4();
596 static MAX: int = 10;
597 let acceptor = TcpListener::bind(addr).listen();
598
599 spawn(proc() {
600 let mut acceptor = acceptor;
601 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
602 // Start another task to handle the connection
603 spawn(proc() {
604 let mut stream = stream;
605 let mut buf = [0];
606 stream.read(buf).unwrap();
607 assert!(buf[0] == i as u8);
608 debug!("read");
609 });
610 }
611 });
612
613 connect(0, addr);
614
615 fn connect(i: int, addr: SocketAddr) {
616 if i == MAX { return }
617
618 spawn(proc() {
619 debug!("connecting");
620 let mut stream = TcpStream::connect(addr);
621 // Connect again before writing
622 connect(i + 1, addr);
623 debug!("writing");
624 stream.write([i as u8]).unwrap();
625 });
626 }
627 })
628
629 iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() {
630 let addr = next_test_ip6();
631 static MAX: int = 10;
632 let acceptor = TcpListener::bind(addr).listen();
633
634 spawn(proc() {
635 let mut acceptor = acceptor;
636 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
637 // Start another task to handle the connection
638 spawn(proc() {
639 let mut stream = stream;
640 let mut buf = [0];
641 stream.read(buf).unwrap();
642 assert!(buf[0] == i as u8);
643 debug!("read");
644 });
645 }
646 });
647
648 connect(0, addr);
649
650 fn connect(i: int, addr: SocketAddr) {
651 if i == MAX { return }
652
653 spawn(proc() {
654 debug!("connecting");
655 let mut stream = TcpStream::connect(addr);
656 // Connect again before writing
657 connect(i + 1, addr);
658 debug!("writing");
659 stream.write([i as u8]).unwrap();
660 });
661 }
662 })
663
664 iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() {
665 static MAX: int = 10;
666 let addr = next_test_ip4();
667 let acceptor = TcpListener::bind(addr).listen();
668
669 spawn(proc() {
670 let mut acceptor = acceptor;
671 for stream in acceptor.incoming().take(MAX as uint) {
672 // Start another task to handle the connection
673 spawn(proc() {
674 let mut stream = stream;
675 let mut buf = [0];
676 stream.read(buf).unwrap();
677 assert!(buf[0] == 99);
678 debug!("read");
679 });
680 }
681 });
682
683 connect(0, addr);
684
685 fn connect(i: int, addr: SocketAddr) {
686 if i == MAX { return }
687
688 spawn(proc() {
689 debug!("connecting");
690 let mut stream = TcpStream::connect(addr);
691 // Connect again before writing
692 connect(i + 1, addr);
693 debug!("writing");
694 stream.write([99]).unwrap();
695 });
696 }
697 })
698
699 iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() {
700 static MAX: int = 10;
701 let addr = next_test_ip6();
702 let acceptor = TcpListener::bind(addr).listen();
703
704 spawn(proc() {
705 let mut acceptor = acceptor;
706 for stream in acceptor.incoming().take(MAX as uint) {
707 // Start another task to handle the connection
708 spawn(proc() {
709 let mut stream = stream;
710 let mut buf = [0];
711 stream.read(buf).unwrap();
712 assert!(buf[0] == 99);
713 debug!("read");
714 });
715 }
716 });
717
718 connect(0, addr);
719
720 fn connect(i: int, addr: SocketAddr) {
721 if i == MAX { return }
722
723 spawn(proc() {
724 debug!("connecting");
725 let mut stream = TcpStream::connect(addr);
726 // Connect again before writing
727 connect(i + 1, addr);
728 debug!("writing");
729 stream.write([99]).unwrap();
730 });
731 }
732 })
733
734 pub fn socket_name(addr: SocketAddr) {
735 let mut listener = TcpListener::bind(addr).unwrap();
736
737 // Make sure socket_name gives
738 // us the socket we binded to.
739 let so_name = listener.socket_name();
740 assert!(so_name.is_ok());
741 assert_eq!(addr, so_name.unwrap());
742 }
743
744 pub fn peer_name(addr: SocketAddr) {
745 let acceptor = TcpListener::bind(addr).listen();
746 spawn(proc() {
747 let mut acceptor = acceptor;
748 acceptor.accept().unwrap();
749 });
750
751 let stream = TcpStream::connect(addr);
752
753 assert!(stream.is_ok());
754 let mut stream = stream.unwrap();
755
756 // Make sure peer_name gives us the
757 // address/port of the peer we've
758 // connected to.
759 let peer_name = stream.peer_name();
760 assert!(peer_name.is_ok());
761 assert_eq!(addr, peer_name.unwrap());
762 }
763
764 iotest!(fn socket_and_peer_name_ip4() {
765 peer_name(next_test_ip4());
766 socket_name(next_test_ip4());
767 })
768
769 iotest!(fn socket_and_peer_name_ip6() {
770 // FIXME: peer name is not consistent
771 //peer_name(next_test_ip6());
772 socket_name(next_test_ip6());
773 })
774
775 iotest!(fn partial_read() {
776 let addr = next_test_ip4();
777 let (tx, rx) = channel();
778 spawn(proc() {
779 let mut srv = TcpListener::bind(addr).listen().unwrap();
780 tx.send(());
781 let mut cl = srv.accept().unwrap();
782 cl.write([10]).unwrap();
783 let mut b = [0];
784 cl.read(b).unwrap();
785 tx.send(());
786 });
787
788 rx.recv();
789 let mut c = TcpStream::connect(addr).unwrap();
790 let mut b = [0, ..10];
791 assert_eq!(c.read(b), Ok(1));
792 c.write([1]).unwrap();
793 rx.recv();
794 })
795
796 iotest!(fn double_bind() {
797 let addr = next_test_ip4();
798 let listener = TcpListener::bind(addr).unwrap().listen();
799 assert!(listener.is_ok());
800 match TcpListener::bind(addr).listen() {
801 Ok(..) => fail!(),
802 Err(e) => {
803 assert!(e.kind == ConnectionRefused || e.kind == OtherIoError);
804 }
805 }
806 })
807
808 iotest!(fn fast_rebind() {
809 let addr = next_test_ip4();
810 let (tx, rx) = channel();
811
812 spawn(proc() {
813 rx.recv();
814 let _stream = TcpStream::connect(addr).unwrap();
815 // Close
816 rx.recv();
817 });
818
819 {
820 let mut acceptor = TcpListener::bind(addr).listen();
821 tx.send(());
822 {
823 let _stream = acceptor.accept().unwrap();
824 // Close client
825 tx.send(());
826 }
827 // Close listener
828 }
829 let _listener = TcpListener::bind(addr);
830 })
831
832 iotest!(fn tcp_clone_smoke() {
833 let addr = next_test_ip4();
834 let mut acceptor = TcpListener::bind(addr).listen();
835
836 spawn(proc() {
837 let mut s = TcpStream::connect(addr);
838 let mut buf = [0, 0];
839 assert_eq!(s.read(buf), Ok(1));
840 assert_eq!(buf[0], 1);
841 s.write([2]).unwrap();
842 });
843
844 let mut s1 = acceptor.accept().unwrap();
845 let s2 = s1.clone();
846
847 let (tx1, rx1) = channel();
848 let (tx2, rx2) = channel();
849 spawn(proc() {
850 let mut s2 = s2;
851 rx1.recv();
852 s2.write([1]).unwrap();
853 tx2.send(());
854 });
855 tx1.send(());
856 let mut buf = [0, 0];
857 assert_eq!(s1.read(buf), Ok(1));
858 rx2.recv();
859 })
860
861 iotest!(fn tcp_clone_two_read() {
862 let addr = next_test_ip6();
863 let mut acceptor = TcpListener::bind(addr).listen();
864 let (tx1, rx) = channel();
865 let tx2 = tx1.clone();
866
867 spawn(proc() {
868 let mut s = TcpStream::connect(addr);
869 s.write([1]).unwrap();
870 rx.recv();
871 s.write([2]).unwrap();
872 rx.recv();
873 });
874
875 let mut s1 = acceptor.accept().unwrap();
876 let s2 = s1.clone();
877
878 let (done, rx) = channel();
879 spawn(proc() {
880 let mut s2 = s2;
881 let mut buf = [0, 0];
882 s2.read(buf).unwrap();
883 tx2.send(());
884 done.send(());
885 });
886 let mut buf = [0, 0];
887 s1.read(buf).unwrap();
888 tx1.send(());
889
890 rx.recv();
891 })
892
893 iotest!(fn tcp_clone_two_write() {
894 let addr = next_test_ip4();
895 let mut acceptor = TcpListener::bind(addr).listen();
896
897 spawn(proc() {
898 let mut s = TcpStream::connect(addr);
899 let mut buf = [0, 1];
900 s.read(buf).unwrap();
901 s.read(buf).unwrap();
902 });
903
904 let mut s1 = acceptor.accept().unwrap();
905 let s2 = s1.clone();
906
907 let (done, rx) = channel();
908 spawn(proc() {
909 let mut s2 = s2;
910 s2.write([1]).unwrap();
911 done.send(());
912 });
913 s1.write([2]).unwrap();
914
915 rx.recv();
916 })
917
918 iotest!(fn shutdown_smoke() {
919 use rt::rtio::RtioTcpStream;
920
921 let addr = next_test_ip4();
922 let a = TcpListener::bind(addr).unwrap().listen();
923 spawn(proc() {
924 let mut a = a;
925 let mut c = a.accept().unwrap();
926 assert_eq!(c.read_to_end(), Ok(vec!()));
927 c.write([1]).unwrap();
928 });
929
930 let mut s = TcpStream::connect(addr).unwrap();
931 assert!(s.obj.close_write().is_ok());
932 assert!(s.write([1]).is_err());
933 assert_eq!(s.read_to_end(), Ok(vec!(1)));
934 })
935
936 iotest!(fn accept_timeout() {
937 let addr = next_test_ip4();
938 let mut a = TcpListener::bind(addr).unwrap().listen().unwrap();
939
940 a.set_timeout(Some(10));
941
942 // Make sure we time out once and future invocations also time out
943 let err = a.accept().err().unwrap();
944 assert_eq!(err.kind, TimedOut);
945 let err = a.accept().err().unwrap();
946 assert_eq!(err.kind, TimedOut);
947
948 // Also make sure that even though the timeout is expired that we will
949 // continue to receive any pending connections.
950 let (tx, rx) = channel();
951 spawn(proc() {
952 tx.send(TcpStream::connect(addr).unwrap());
953 });
954 let l = rx.recv();
955 for i in range(0, 1001) {
956 match a.accept() {
957 Ok(..) => break,
958 Err(ref e) if e.kind == TimedOut => {}
959 Err(e) => fail!("error: {}", e),
960 }
961 ::task::deschedule();
962 if i == 1000 { fail!("should have a pending connection") }
963 }
964 drop(l);
965
966 // Unset the timeout and make sure that this always blocks.
967 a.set_timeout(None);
968 spawn(proc() {
969 drop(TcpStream::connect(addr).unwrap());
970 });
971 a.accept().unwrap();
972 })
973
974 iotest!(fn close_readwrite_smoke() {
975 let addr = next_test_ip4();
976 let a = TcpListener::bind(addr).listen().unwrap();
977 let (_tx, rx) = channel::<()>();
978 spawn(proc() {
979 let mut a = a;
980 let _s = a.accept().unwrap();
981 let _ = rx.recv_opt();
982 });
983
984 let mut b = [0];
985 let mut s = TcpStream::connect(addr).unwrap();
986 let mut s2 = s.clone();
987
988 // closing should prevent reads/writes
989 s.close_write().unwrap();
990 assert!(s.write([0]).is_err());
991 s.close_read().unwrap();
992 assert!(s.read(b).is_err());
993
994 // closing should affect previous handles
995 assert!(s2.write([0]).is_err());
996 assert!(s2.read(b).is_err());
997
998 // closing should affect new handles
999 let mut s3 = s.clone();
1000 assert!(s3.write([0]).is_err());
1001 assert!(s3.read(b).is_err());
1002
1003 // make sure these don't die
1004 let _ = s2.close_read();
1005 let _ = s2.close_write();
1006 let _ = s3.close_read();
1007 let _ = s3.close_write();
1008 })
1009
1010 iotest!(fn close_read_wakes_up() {
1011 let addr = next_test_ip4();
1012 let a = TcpListener::bind(addr).listen().unwrap();
1013 let (_tx, rx) = channel::<()>();
1014 spawn(proc() {
1015 let mut a = a;
1016 let _s = a.accept().unwrap();
1017 let _ = rx.recv_opt();
1018 });
1019
1020 let mut s = TcpStream::connect(addr).unwrap();
1021 let s2 = s.clone();
1022 let (tx, rx) = channel();
1023 spawn(proc() {
1024 let mut s2 = s2;
1025 assert!(s2.read([0]).is_err());
1026 tx.send(());
1027 });
1028 // this should wake up the child task
1029 s.close_read().unwrap();
1030
1031 // this test will never finish if the child doesn't wake up
1032 rx.recv();
1033 })
1034
1035 iotest!(fn readwrite_timeouts() {
1036 let addr = next_test_ip6();
1037 let mut a = TcpListener::bind(addr).listen().unwrap();
1038 let (tx, rx) = channel::<()>();
1039 spawn(proc() {
1040 let mut s = TcpStream::connect(addr).unwrap();
1041 rx.recv();
1042 assert!(s.write([0]).is_ok());
1043 let _ = rx.recv_opt();
1044 });
1045
1046 let mut s = a.accept().unwrap();
1047 s.set_timeout(Some(20));
1048 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1049 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1050
1051 s.set_timeout(Some(20));
1052 for i in range(0, 1001) {
1053 match s.write([0, .. 128 * 1024]) {
1054 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1055 Err(IoError { kind: TimedOut, .. }) => break,
1056 Err(e) => fail!("{}", e),
1057 }
1058 if i == 1000 { fail!("should have filled up?!"); }
1059 }
1060 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1061
1062 tx.send(());
1063 s.set_timeout(None);
1064 assert_eq!(s.read([0, 0]), Ok(1));
1065 })
1066
1067 iotest!(fn read_timeouts() {
1068 let addr = next_test_ip6();
1069 let mut a = TcpListener::bind(addr).listen().unwrap();
1070 let (tx, rx) = channel::<()>();
1071 spawn(proc() {
1072 let mut s = TcpStream::connect(addr).unwrap();
1073 rx.recv();
1074 let mut amt = 0;
1075 while amt < 100 * 128 * 1024 {
1076 match s.read([0, ..128 * 1024]) {
1077 Ok(n) => { amt += n; }
1078 Err(e) => fail!("{}", e),
1079 }
1080 }
1081 let _ = rx.recv_opt();
1082 });
1083
1084 let mut s = a.accept().unwrap();
1085 s.set_read_timeout(Some(20));
1086 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1087 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1088
1089 tx.send(());
1090 for _ in range(0, 100) {
1091 assert!(s.write([0, ..128 * 1024]).is_ok());
1092 }
1093 })
1094
1095 iotest!(fn write_timeouts() {
1096 let addr = next_test_ip6();
1097 let mut a = TcpListener::bind(addr).listen().unwrap();
1098 let (tx, rx) = channel::<()>();
1099 spawn(proc() {
1100 let mut s = TcpStream::connect(addr).unwrap();
1101 rx.recv();
1102 assert!(s.write([0]).is_ok());
1103 let _ = rx.recv_opt();
1104 });
1105
1106 let mut s = a.accept().unwrap();
1107 s.set_write_timeout(Some(20));
1108 for i in range(0, 1001) {
1109 match s.write([0, .. 128 * 1024]) {
1110 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
1111 Err(IoError { kind: TimedOut, .. }) => break,
1112 Err(e) => fail!("{}", e),
1113 }
1114 if i == 1000 { fail!("should have filled up?!"); }
1115 }
1116 assert_eq!(s.write([0]).err().unwrap().kind, TimedOut);
1117
1118 tx.send(());
1119 assert!(s.read([0]).is_ok());
1120 })
1121
1122 iotest!(fn timeout_concurrent_read() {
1123 let addr = next_test_ip6();
1124 let mut a = TcpListener::bind(addr).listen().unwrap();
1125 let (tx, rx) = channel::<()>();
1126 spawn(proc() {
1127 let mut s = TcpStream::connect(addr).unwrap();
1128 rx.recv();
1129 assert_eq!(s.write([0]), Ok(()));
1130 let _ = rx.recv_opt();
1131 });
1132
1133 let mut s = a.accept().unwrap();
1134 let s2 = s.clone();
1135 let (tx2, rx2) = channel();
1136 spawn(proc() {
1137 let mut s2 = s2;
1138 assert_eq!(s2.read([0]), Ok(1));
1139 tx2.send(());
1140 });
1141
1142 s.set_read_timeout(Some(20));
1143 assert_eq!(s.read([0]).err().unwrap().kind, TimedOut);
1144 tx.send(());
1145
1146 rx2.recv();
1147 })
1148 }