(index<- ) ./libstd/rt/io/net/tcp.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use option::{Option, Some, None};
12 use result::{Ok, Err};
13 use rt::io::net::ip::SocketAddr;
14 use rt::io::{Reader, Writer, Listener, Acceptor};
15 use rt::io::{io_error, read_error, EndOfFile};
16 use rt::rtio::{IoFactory, IoFactoryObject,
17 RtioSocket,
18 RtioTcpListener, RtioTcpListenerObject,
19 RtioTcpAcceptor, RtioTcpAcceptorObject,
20 RtioTcpStream, RtioTcpStreamObject};
21 use rt::local::Local;
22
23 pub struct TcpStream {
24 priv obj: ~RtioTcpStreamObject
25 }
26
27 impl TcpStream {
28 fn new(s: ~RtioTcpStreamObject) -> TcpStream {
29 TcpStream { obj: s }
30 }
31
32 pub fn connect(addr: SocketAddr) -> Option<TcpStream> {
33 let stream = unsafe {
34 rtdebug!("borrowing io to connect");
35 let io: *mut IoFactoryObject = Local::unsafe_borrow();
36 rtdebug!("about to connect");
37 (*io).tcp_connect(addr)
38 };
39
40 match stream {
41 Ok(s) => Some(TcpStream::new(s)),
42 Err(ioerr) => {
43 rtdebug!("failed to connect: {:?}", ioerr);
44 io_error::cond.raise(ioerr);
45 None
46 }
47 }
48 }
49
50 pub fn peer_name(&mut self) -> Option<SocketAddr> {
51 match self.obj.peer_name() {
52 Ok(pn) => Some(pn),
53 Err(ioerr) => {
54 rtdebug!("failed to get peer name: {:?}", ioerr);
55 io_error::cond.raise(ioerr);
56 None
57 }
58 }
59 }
60
61 pub fn socket_name(&mut self) -> Option<SocketAddr> {
62 match self.obj.socket_name() {
63 Ok(sn) => Some(sn),
64 Err(ioerr) => {
65 rtdebug!("failed to get socket name: {:?}", ioerr);
66 io_error::cond.raise(ioerr);
67 None
68 }
69 }
70 }
71 }
72
73 impl Reader for TcpStream {
74 fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
75 match self.obj.read(buf) {
76 Ok(read) => Some(read),
77 Err(ioerr) => {
78 // EOF is indicated by returning None
79 if ioerr.kind != EndOfFile {
80 read_error::cond.raise(ioerr);
81 }
82 return None;
83 }
84 }
85 }
86
87 fn eof(&mut self) -> bool { fail2!() }
88 }
89
90 impl Writer for TcpStream {
91 fn write(&mut self, buf: &[u8]) {
92 match self.obj.write(buf) {
93 Ok(_) => (),
94 Err(ioerr) => io_error::cond.raise(ioerr),
95 }
96 }
97
98 fn flush(&mut self) { /* no-op */ }
99 }
100
101 pub struct TcpListener {
102 priv obj: ~RtioTcpListenerObject
103 }
104
105 impl TcpListener {
106 pub fn bind(addr: SocketAddr) -> Option<TcpListener> {
107 let listener = unsafe {
108 let io: *mut IoFactoryObject = Local::unsafe_borrow();
109 (*io).tcp_bind(addr)
110 };
111 match listener {
112 Ok(l) => Some(TcpListener { obj: l }),
113 Err(ioerr) => {
114 io_error::cond.raise(ioerr);
115 return None;
116 }
117 }
118 }
119
120 pub fn socket_name(&mut self) -> Option<SocketAddr> {
121 match self.obj.socket_name() {
122 Ok(sn) => Some(sn),
123 Err(ioerr) => {
124 rtdebug!("failed to get socket name: {:?}", ioerr);
125 io_error::cond.raise(ioerr);
126 None
127 }
128 }
129 }
130 }
131
132 impl Listener<TcpStream, TcpAcceptor> for TcpListener {
133 fn listen(self) -> Option<TcpAcceptor> {
134 match self.obj.listen() {
135 Ok(acceptor) => Some(TcpAcceptor { obj: acceptor }),
136 Err(ioerr) => {
137 io_error::cond.raise(ioerr);
138 None
139 }
140 }
141 }
142 }
143
144 pub struct TcpAcceptor {
145 priv obj: ~RtioTcpAcceptorObject
146 }
147
148 impl Acceptor<TcpStream> for TcpAcceptor {
149 fn accept(&mut self) -> Option<TcpStream> {
150 match self.obj.accept() {
151 Ok(s) => Some(TcpStream::new(s)),
152 Err(ioerr) => {
153 io_error::cond.raise(ioerr);
154 None
155 }
156 }
157 }
158 }
159
160 #[cfg(test)]
161 mod test {
162 use super::*;
163 use cell::Cell;
164 use rt::test::*;
165 use rt::io::net::ip::{Ipv4Addr, SocketAddr};
166 use rt::io::*;
167 use prelude::*;
168 use rt::comm::oneshot;
169
170 #[test] #[ignore]
171 fn bind_error() {
172 do run_in_mt_newsched_task {
173 let mut called = false;
174 do io_error::cond.trap(|e| {
175 assert!(e.kind == PermissionDenied);
176 called = true;
177 }).inside {
178 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
179 let listener = TcpListener::bind(addr);
180 assert!(listener.is_none());
181 }
182 assert!(called);
183 }
184 }
185
186 #[test]
187 fn connect_error() {
188 do run_in_mt_newsched_task {
189 let mut called = false;
190 do io_error::cond.trap(|e| {
191 let expected_error = if cfg!(unix) {
192 ConnectionRefused
193 } else {
194 // On Win32, opening port 1 gives WSAEADDRNOTAVAIL error.
195 OtherIoError
196 };
197 assert_eq!(e.kind, expected_error);
198 called = true;
199 }).inside {
200 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
201 let stream = TcpStream::connect(addr);
202 assert!(stream.is_none());
203 }
204 assert!(called);
205 }
206 }
207
208 #[test]
209 fn smoke_test_ip4() {
210 do run_in_mt_newsched_task {
211 let addr = next_test_ip4();
212 let (port, chan) = oneshot();
213 let port = Cell::new(port);
214 let chan = Cell::new(chan);
215
216 do spawntask {
217 let mut acceptor = TcpListener::bind(addr).listen();
218 chan.take().send(());
219 let mut stream = acceptor.accept();
220 let mut buf = [0];
221 stream.read(buf);
222 assert!(buf[0] == 99);
223 }
224
225 do spawntask {
226 port.take().recv();
227 let mut stream = TcpStream::connect(addr);
228 stream.write([99]);
229 }
230 }
231 }
232
233 #[test]
234 fn smoke_test_ip6() {
235 do run_in_mt_newsched_task {
236 let addr = next_test_ip6();
237 let (port, chan) = oneshot();
238 let port = Cell::new(port);
239 let chan = Cell::new(chan);
240
241 do spawntask {
242 let mut acceptor = TcpListener::bind(addr).listen();
243 chan.take().send(());
244 let mut stream = acceptor.accept();
245 let mut buf = [0];
246 stream.read(buf);
247 assert!(buf[0] == 99);
248 }
249
250 do spawntask {
251 port.take().recv();
252 let mut stream = TcpStream::connect(addr);
253 stream.write([99]);
254 }
255 }
256 }
257
258 #[test]
259 fn read_eof_ip4() {
260 do run_in_mt_newsched_task {
261 let addr = next_test_ip4();
262 let (port, chan) = oneshot();
263 let port = Cell::new(port);
264 let chan = Cell::new(chan);
265
266 do spawntask {
267 let mut acceptor = TcpListener::bind(addr).listen();
268 chan.take().send(());
269 let mut stream = acceptor.accept();
270 let mut buf = [0];
271 let nread = stream.read(buf);
272 assert!(nread.is_none());
273 }
274
275 do spawntask {
276 port.take().recv();
277 let _stream = TcpStream::connect(addr);
278 // Close
279 }
280 }
281 }
282
283 #[test]
284 fn read_eof_ip6() {
285 do run_in_mt_newsched_task {
286 let addr = next_test_ip6();
287 let (port, chan) = oneshot();
288 let port = Cell::new(port);
289 let chan = Cell::new(chan);
290
291 do spawntask {
292 let mut acceptor = TcpListener::bind(addr).listen();
293 chan.take().send(());
294 let mut stream = acceptor.accept();
295 let mut buf = [0];
296 let nread = stream.read(buf);
297 assert!(nread.is_none());
298 }
299
300 do spawntask {
301 port.take().recv();
302 let _stream = TcpStream::connect(addr);
303 // Close
304 }
305 }
306 }
307
308 #[test]
309 fn read_eof_twice_ip4() {
310 do run_in_mt_newsched_task {
311 let addr = next_test_ip4();
312 let (port, chan) = oneshot();
313 let port = Cell::new(port);
314 let chan = Cell::new(chan);
315
316 do spawntask {
317 let mut acceptor = TcpListener::bind(addr).listen();
318 chan.take().send(());
319 let mut stream = acceptor.accept();
320 let mut buf = [0];
321 let nread = stream.read(buf);
322 assert!(nread.is_none());
323 do read_error::cond.trap(|e| {
324 if cfg!(windows) {
325 assert_eq!(e.kind, NotConnected);
326 } else {
327 fail2!();
328 }
329 }).inside {
330 let nread = stream.read(buf);
331 assert!(nread.is_none());
332 }
333 }
334
335 do spawntask {
336 port.take().recv();
337 let _stream = TcpStream::connect(addr);
338 // Close
339 }
340 }
341 }
342
343 #[test]
344 fn read_eof_twice_ip6() {
345 do run_in_mt_newsched_task {
346 let addr = next_test_ip6();
347 let (port, chan) = oneshot();
348 let port = Cell::new(port);
349 let chan = Cell::new(chan);
350
351 do spawntask {
352 let mut acceptor = TcpListener::bind(addr).listen();
353 chan.take().send(());
354 let mut stream = acceptor.accept();
355 let mut buf = [0];
356 let nread = stream.read(buf);
357 assert!(nread.is_none());
358 do read_error::cond.trap(|e| {
359 if cfg!(windows) {
360 assert_eq!(e.kind, NotConnected);
361 } else {
362 fail2!();
363 }
364 }).inside {
365 let nread = stream.read(buf);
366 assert!(nread.is_none());
367 }
368 }
369
370 do spawntask {
371 port.take().recv();
372 let _stream = TcpStream::connect(addr);
373 // Close
374 }
375 }
376 }
377
378 #[test]
379 #[ignore(cfg(windows))] // FIXME #8811
380 fn write_close_ip4() {
381 do run_in_mt_newsched_task {
382 let addr = next_test_ip4();
383 let (port, chan) = oneshot();
384 let port = Cell::new(port);
385 let chan = Cell::new(chan);
386
387 do spawntask {
388 let mut acceptor = TcpListener::bind(addr).listen();
389 chan.take().send(());
390 let mut stream = acceptor.accept();
391 let buf = [0];
392 loop {
393 let mut stop = false;
394 do io_error::cond.trap(|e| {
395 // NB: ECONNRESET on linux, EPIPE on mac
396 assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
397 stop = true;
398 }).inside {
399 stream.write(buf);
400 }
401 if stop { break }
402 }
403 }
404
405 do spawntask {
406 port.take().recv();
407 let _stream = TcpStream::connect(addr);
408 // Close
409 }
410 }
411 }
412
413 #[test]
414 #[ignore(cfg(windows))] // FIXME #8811
415 fn write_close_ip6() {
416 do run_in_mt_newsched_task {
417 let addr = next_test_ip6();
418 let (port, chan) = oneshot();
419 let port = Cell::new(port);
420 let chan = Cell::new(chan);
421
422 do spawntask {
423 let mut acceptor = TcpListener::bind(addr).listen();
424 chan.take().send(());
425 let mut stream = acceptor.accept();
426 let buf = [0];
427 loop {
428 let mut stop = false;
429 do io_error::cond.trap(|e| {
430 // NB: ECONNRESET on linux, EPIPE on mac
431 assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
432 stop = true;
433 }).inside {
434 stream.write(buf);
435 }
436 if stop { break }
437 }
438 }
439
440 do spawntask {
441 port.take().recv();
442 let _stream = TcpStream::connect(addr);
443 // Close
444 }
445 }
446 }
447
448 #[test]
449 fn multiple_connect_serial_ip4() {
450 do run_in_mt_newsched_task {
451 let addr = next_test_ip4();
452 let max = 10;
453 let (port, chan) = oneshot();
454 let port = Cell::new(port);
455 let chan = Cell::new(chan);
456
457 do spawntask {
458 let mut acceptor = TcpListener::bind(addr).listen();
459 chan.take().send(());
460 for ref mut stream in acceptor.incoming().take(max) {
461 let mut buf = [0];
462 stream.read(buf);
463 assert_eq!(buf[0], 99);
464 }
465 }
466
467 do spawntask {
468 port.take().recv();
469 do max.times {
470 let mut stream = TcpStream::connect(addr);
471 stream.write([99]);
472 }
473 }
474 }
475 }
476
477 #[test]
478 fn multiple_connect_serial_ip6() {
479 do run_in_mt_newsched_task {
480 let addr = next_test_ip6();
481 let max = 10;
482 let (port, chan) = oneshot();
483 let port = Cell::new(port);
484 let chan = Cell::new(chan);
485
486 do spawntask {
487 let mut acceptor = TcpListener::bind(addr).listen();
488 chan.take().send(());
489 for ref mut stream in acceptor.incoming().take(max) {
490 let mut buf = [0];
491 stream.read(buf);
492 assert_eq!(buf[0], 99);
493 }
494 }
495
496 do spawntask {
497 port.take().recv();
498 do max.times {
499 let mut stream = TcpStream::connect(addr);
500 stream.write([99]);
501 }
502 }
503 }
504 }
505
506 #[test]
507 fn multiple_connect_interleaved_greedy_schedule_ip4() {
508 do run_in_mt_newsched_task {
509 let addr = next_test_ip4();
510 static MAX: int = 10;
511 let (port, chan) = oneshot();
512 let chan = Cell::new(chan);
513
514 do spawntask {
515 let mut acceptor = TcpListener::bind(addr).listen();
516 chan.take().send(());
517 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
518 let stream = Cell::new(stream);
519 // Start another task to handle the connection
520 do spawntask {
521 let mut stream = stream.take();
522 let mut buf = [0];
523 stream.read(buf);
524 assert!(buf[0] == i as u8);
525 rtdebug!("read");
526 }
527 }
528 }
529
530 port.recv();
531 connect(0, addr);
532
533 fn connect(i: int, addr: SocketAddr) {
534 if i == MAX { return }
535
536 do spawntask {
537 rtdebug!("connecting");
538 let mut stream = TcpStream::connect(addr);
539 // Connect again before writing
540 connect(i + 1, addr);
541 rtdebug!("writing");
542 stream.write([i as u8]);
543 }
544 }
545 }
546 }
547
548 #[test]
549 fn multiple_connect_interleaved_greedy_schedule_ip6() {
550 do run_in_mt_newsched_task {
551 let addr = next_test_ip6();
552 static MAX: int = 10;
553 let (port, chan) = oneshot();
554 let chan = Cell::new(chan);
555
556 do spawntask {
557 let mut acceptor = TcpListener::bind(addr).listen();
558 chan.take().send(());
559 for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
560 let stream = Cell::new(stream);
561 // Start another task to handle the connection
562 do spawntask {
563 let mut stream = stream.take();
564 let mut buf = [0];
565 stream.read(buf);
566 assert!(buf[0] == i as u8);
567 rtdebug!("read");
568 }
569 }
570 }
571
572 port.recv();
573 connect(0, addr);
574
575 fn connect(i: int, addr: SocketAddr) {
576 if i == MAX { return }
577
578 do spawntask {
579 rtdebug!("connecting");
580 let mut stream = TcpStream::connect(addr);
581 // Connect again before writing
582 connect(i + 1, addr);
583 rtdebug!("writing");
584 stream.write([i as u8]);
585 }
586 }
587 }
588 }
589
590 #[test]
591 fn multiple_connect_interleaved_lazy_schedule_ip4() {
592 do run_in_mt_newsched_task {
593 let addr = next_test_ip4();
594 static MAX: int = 10;
595 let (port, chan) = oneshot();
596 let chan = Cell::new(chan);
597
598 do spawntask {
599 let mut acceptor = TcpListener::bind(addr).listen();
600 chan.take().send(());
601 for stream in acceptor.incoming().take(MAX as uint) {
602 let stream = Cell::new(stream);
603 // Start another task to handle the connection
604 do spawntask_later {
605 let mut stream = stream.take();
606 let mut buf = [0];
607 stream.read(buf);
608 assert!(buf[0] == 99);
609 rtdebug!("read");
610 }
611 }
612 }
613
614 port.recv();
615 connect(0, addr);
616
617 fn connect(i: int, addr: SocketAddr) {
618 if i == MAX { return }
619
620 do spawntask_later {
621 rtdebug!("connecting");
622 let mut stream = TcpStream::connect(addr);
623 // Connect again before writing
624 connect(i + 1, addr);
625 rtdebug!("writing");
626 stream.write([99]);
627 }
628 }
629 }
630 }
631 #[test]
632 fn multiple_connect_interleaved_lazy_schedule_ip6() {
633 do run_in_mt_newsched_task {
634 let addr = next_test_ip6();
635 static MAX: int = 10;
636 let (port, chan) = oneshot();
637 let chan = Cell::new(chan);
638
639 do spawntask {
640 let mut acceptor = TcpListener::bind(addr).listen();
641 chan.take().send(());
642 for stream in acceptor.incoming().take(MAX as uint) {
643 let stream = Cell::new(stream);
644 // Start another task to handle the connection
645 do spawntask_later {
646 let mut stream = stream.take();
647 let mut buf = [0];
648 stream.read(buf);
649 assert!(buf[0] == 99);
650 rtdebug!("read");
651 }
652 }
653 }
654
655 port.recv();
656 connect(0, addr);
657
658 fn connect(i: int, addr: SocketAddr) {
659 if i == MAX { return }
660
661 do spawntask_later {
662 rtdebug!("connecting");
663 let mut stream = TcpStream::connect(addr);
664 // Connect again before writing
665 connect(i + 1, addr);
666 rtdebug!("writing");
667 stream.write([99]);
668 }
669 }
670 }
671 }
672
673 #[cfg(test)]
674 fn socket_name(addr: SocketAddr) {
675 do run_in_mt_newsched_task {
676 do spawntask {
677 let mut listener = TcpListener::bind(addr).unwrap();
678
679 // Make sure socket_name gives
680 // us the socket we binded to.
681 let so_name = listener.socket_name();
682 assert!(so_name.is_some());
683 assert_eq!(addr, so_name.unwrap());
684
685 }
686 }
687 }
688
689 #[cfg(test)]
690 fn peer_name(addr: SocketAddr) {
691 do run_in_mt_newsched_task {
692 let (port, chan) = oneshot();
693 let port = Cell::new(port);
694 let chan = Cell::new(chan);
695
696 do spawntask {
697 let mut acceptor = TcpListener::bind(addr).listen();
698 chan.take().send(());
699
700 acceptor.accept();
701 }
702
703 do spawntask {
704 port.take().recv();
705 let stream = TcpStream::connect(addr);
706
707 assert!(stream.is_some());
708 let mut stream = stream.unwrap();
709
710 // Make sure peer_name gives us the
711 // address/port of the peer we've
712 // connected to.
713 let peer_name = stream.peer_name();
714 assert!(peer_name.is_some());
715 assert_eq!(addr, peer_name.unwrap());
716 }
717 }
718 }
719
// Runs the peer-name and socket-name helpers, each on a fresh IPv4 test
// address.
#[test]
fn socket_and_peer_name_ip4() {
    let peer_addr = next_test_ip4();
    peer_name(peer_addr);

    let sock_addr = next_test_ip4();
    socket_name(sock_addr);
}
725
// Runs only the socket-name helper on a fresh IPv6 test address.
#[test]
fn socket_and_peer_name_ip6() {
    // XXX: peer name is not consistent, so that check stays disabled:
    //peer_name(next_test_ip6());
    let sock_addr = next_test_ip6();
    socket_name(sock_addr);
}
732
733 }