(index<- )        ./libstd/rt/io/net/tcp.rs

   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  use option::{Option, Some, None};
  12  use result::{Ok, Err};
  13  use rt::io::net::ip::SocketAddr;
  14  use rt::io::{Reader, Writer, Listener, Acceptor};
  15  use rt::io::{io_error, read_error, EndOfFile};
  16  use rt::rtio::{IoFactory, IoFactoryObject,
  17                 RtioSocket,
  18                 RtioTcpListener, RtioTcpListenerObject,
  19                 RtioTcpAcceptor, RtioTcpAcceptorObject,
  20                 RtioTcpStream, RtioTcpStreamObject};
  21  use rt::local::Local;
  22  
  23  pub struct TcpStream {
  24      priv obj: ~RtioTcpStreamObject
  25  }
  26  
  27  impl TcpStream {
  28      fn new(s~RtioTcpStreamObject) -> TcpStream {
  29          TcpStream { obj: s }
  30      }
  31  
  32      pub fn connect(addrSocketAddr) -> Option<TcpStream> {
  33          let stream = unsafe {
  34              rtdebug!("borrowing io to connect");
  35              let io*mut IoFactoryObject = Local::unsafe_borrow();
  36              rtdebug!("about to connect");
  37              (*io).tcp_connect(addr)
  38          };
  39  
  40          match stream {
  41              Ok(s) => Some(TcpStream::new(s)),
  42              Err(ioerr) => {
  43                  rtdebug!("failed to connect: {:?}", ioerr);
  44                  io_error::cond.raise(ioerr);
  45                  None
  46              }
  47          }
  48      }
  49  
  50      pub fn peer_name(&mut self) -> Option<SocketAddr> {
  51          match self.obj.peer_name() {
  52              Ok(pn) => Some(pn),
  53              Err(ioerr) => {
  54                  rtdebug!("failed to get peer name: {:?}", ioerr);
  55                  io_error::cond.raise(ioerr);
  56                  None
  57              }
  58          }
  59      }
  60  
  61      pub fn socket_name(&mut self) -> Option<SocketAddr> {
  62          match self.obj.socket_name() {
  63              Ok(sn) => Some(sn),
  64              Err(ioerr) => {
  65                  rtdebug!("failed to get socket name: {:?}", ioerr);
  66                  io_error::cond.raise(ioerr);
  67                  None
  68              }
  69          }
  70      }
  71  }
  72  
  73  impl Reader for TcpStream {
  74      fn read(&mut self, buf&mut [u8]) -> Option<uint> {
  75          match self.obj.read(buf) {
  76              Ok(read) => Some(read),
  77              Err(ioerr) => {
  78                  // EOF is indicated by returning None
  79                  if ioerr.kind != EndOfFile {
  80                      read_error::cond.raise(ioerr);
  81                  }
  82                  return None;
  83              }
  84          }
  85      }
  86  
  87      fn eof(&mut self) -> bool { fail2!() }
  88  }
  89  
  90  impl Writer for TcpStream {
  91      fn write(&mut self, buf&[u8]) {
  92          match self.obj.write(buf) {
  93              Ok(_) => (),
  94              Err(ioerr) => io_error::cond.raise(ioerr),
  95          }
  96      }
  97  
  98      fn flush(&mut self) { /* no-op */ }
  99  }
 100  
 101  pub struct TcpListener {
 102      priv obj: ~RtioTcpListenerObject
 103  }
 104  
 105  impl TcpListener {
 106      pub fn bind(addrSocketAddr) -> Option<TcpListener> {
 107          let listener = unsafe {
 108              let io*mut IoFactoryObject = Local::unsafe_borrow();
 109              (*io).tcp_bind(addr)
 110          };
 111          match listener {
 112              Ok(l) => Some(TcpListener { obj: l }),
 113              Err(ioerr) => {
 114                  io_error::cond.raise(ioerr);
 115                  return None;
 116              }
 117          }
 118      }
 119  
 120      pub fn socket_name(&mut self) -> Option<SocketAddr> {
 121          match self.obj.socket_name() {
 122              Ok(sn) => Some(sn),
 123              Err(ioerr) => {
 124                  rtdebug!("failed to get socket name: {:?}", ioerr);
 125                  io_error::cond.raise(ioerr);
 126                  None
 127              }
 128          }
 129      }
 130  }
 131  
 132  impl Listener<TcpStream, TcpAcceptor> for TcpListener {
 133      fn listen(self) -> Option<TcpAcceptor> {
 134          match self.obj.listen() {
 135              Ok(acceptor) => Some(TcpAcceptor { obj: acceptor }),
 136              Err(ioerr) => {
 137                  io_error::cond.raise(ioerr);
 138                  None
 139              }
 140          }
 141      }
 142  }
 143  
 144  pub struct TcpAcceptor {
 145      priv obj: ~RtioTcpAcceptorObject
 146  }
 147  
 148  impl Acceptor<TcpStream> for TcpAcceptor {
 149      fn accept(&mut self) -> Option<TcpStream> {
 150          match self.obj.accept() {
 151              Ok(s) => Some(TcpStream::new(s)),
 152              Err(ioerr) => {
 153                  io_error::cond.raise(ioerr);
 154                  None
 155              }
 156          }
 157      }
 158  }
 159  
 160  #[cfg(test)]
 161  mod test {
 162      use super::*;
 163      use cell::Cell;
 164      use rt::test::*;
 165      use rt::io::net::ip::{Ipv4Addr, SocketAddr};
 166      use rt::io::*;
 167      use prelude::*;
 168      use rt::comm::oneshot;
 169  
 170      #[test] #[ignore]
 171      fn bind_error() {
 172          do run_in_mt_newsched_task {
 173              let mut called = false;
 174              do io_error::cond.trap(|e| {
 175                  assert!(e.kind == PermissionDenied);
 176                  called = true;
 177              }).inside {
 178                  let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
 179                  let listener = TcpListener::bind(addr);
 180                  assert!(listener.is_none());
 181              }
 182              assert!(called);
 183          }
 184      }
 185  
 186      #[test]
 187      fn connect_error() {
 188          do run_in_mt_newsched_task {
 189              let mut called = false;
 190              do io_error::cond.trap(|e| {
 191                  let expected_error = if cfg!(unix) {
 192                      ConnectionRefused
 193                  } else {
 194                      // On Win32, opening port 1 gives WSAEADDRNOTAVAIL error.
 195                      OtherIoError
 196                  };
 197                  assert_eq!(e.kind, expected_error);
 198                  called = true;
 199              }).inside {
 200                  let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
 201                  let stream = TcpStream::connect(addr);
 202                  assert!(stream.is_none());
 203              }
 204              assert!(called);
 205          }
 206      }
 207  
 208      #[test]
 209      fn smoke_test_ip4() {
 210          do run_in_mt_newsched_task {
 211              let addr = next_test_ip4();
 212              let (port, chan) = oneshot();
 213              let port = Cell::new(port);
 214              let chan = Cell::new(chan);
 215  
 216              do spawntask {
 217                  let mut acceptor = TcpListener::bind(addr).listen();
 218                  chan.take().send(());
 219                  let mut stream = acceptor.accept();
 220                  let mut buf = [0];
 221                  stream.read(buf);
 222                  assert!(buf[0] == 99);
 223              }
 224  
 225              do spawntask {
 226                  port.take().recv();
 227                  let mut stream = TcpStream::connect(addr);
 228                  stream.write([99]);
 229              }
 230          }
 231      }
 232  
 233      #[test]
 234      fn smoke_test_ip6() {
 235          do run_in_mt_newsched_task {
 236              let addr = next_test_ip6();
 237              let (port, chan) = oneshot();
 238              let port = Cell::new(port);
 239              let chan = Cell::new(chan);
 240  
 241              do spawntask {
 242                  let mut acceptor = TcpListener::bind(addr).listen();
 243                  chan.take().send(());
 244                  let mut stream = acceptor.accept();
 245                  let mut buf = [0];
 246                  stream.read(buf);
 247                  assert!(buf[0] == 99);
 248              }
 249  
 250              do spawntask {
 251                  port.take().recv();
 252                  let mut stream = TcpStream::connect(addr);
 253                  stream.write([99]);
 254              }
 255          }
 256      }
 257  
 258      #[test]
 259      fn read_eof_ip4() {
 260          do run_in_mt_newsched_task {
 261              let addr = next_test_ip4();
 262              let (port, chan) = oneshot();
 263              let port = Cell::new(port);
 264              let chan = Cell::new(chan);
 265  
 266              do spawntask {
 267                  let mut acceptor = TcpListener::bind(addr).listen();
 268                  chan.take().send(());
 269                  let mut stream = acceptor.accept();
 270                  let mut buf = [0];
 271                  let nread = stream.read(buf);
 272                  assert!(nread.is_none());
 273              }
 274  
 275              do spawntask {
 276                  port.take().recv();
 277                  let _stream = TcpStream::connect(addr);
 278                  // Close
 279              }
 280          }
 281      }
 282  
 283      #[test]
 284      fn read_eof_ip6() {
 285          do run_in_mt_newsched_task {
 286              let addr = next_test_ip6();
 287              let (port, chan) = oneshot();
 288              let port = Cell::new(port);
 289              let chan = Cell::new(chan);
 290  
 291              do spawntask {
 292                  let mut acceptor = TcpListener::bind(addr).listen();
 293                  chan.take().send(());
 294                  let mut stream = acceptor.accept();
 295                  let mut buf = [0];
 296                  let nread = stream.read(buf);
 297                  assert!(nread.is_none());
 298              }
 299  
 300              do spawntask {
 301                  port.take().recv();
 302                  let _stream = TcpStream::connect(addr);
 303                  // Close
 304              }
 305          }
 306      }
 307  
 308      #[test]
 309      fn read_eof_twice_ip4() {
 310          do run_in_mt_newsched_task {
 311              let addr = next_test_ip4();
 312              let (port, chan) = oneshot();
 313              let port = Cell::new(port);
 314              let chan = Cell::new(chan);
 315  
 316              do spawntask {
 317                  let mut acceptor = TcpListener::bind(addr).listen();
 318                  chan.take().send(());
 319                  let mut stream = acceptor.accept();
 320                  let mut buf = [0];
 321                  let nread = stream.read(buf);
 322                  assert!(nread.is_none());
 323                  do read_error::cond.trap(|e| {
 324                      if cfg!(windows) {
 325                          assert_eq!(e.kind, NotConnected);
 326                      } else {
 327                          fail2!();
 328                      }
 329                  }).inside {
 330                      let nread = stream.read(buf);
 331                      assert!(nread.is_none());
 332                  }
 333              }
 334  
 335              do spawntask {
 336                  port.take().recv();
 337                  let _stream = TcpStream::connect(addr);
 338                  // Close
 339              }
 340          }
 341      }
 342  
 343      #[test]
 344      fn read_eof_twice_ip6() {
 345          do run_in_mt_newsched_task {
 346              let addr = next_test_ip6();
 347              let (port, chan) = oneshot();
 348              let port = Cell::new(port);
 349              let chan = Cell::new(chan);
 350  
 351              do spawntask {
 352                  let mut acceptor = TcpListener::bind(addr).listen();
 353                  chan.take().send(());
 354                  let mut stream = acceptor.accept();
 355                  let mut buf = [0];
 356                  let nread = stream.read(buf);
 357                  assert!(nread.is_none());
 358                  do read_error::cond.trap(|e| {
 359                      if cfg!(windows) {
 360                          assert_eq!(e.kind, NotConnected);
 361                      } else {
 362                          fail2!();
 363                      }
 364                  }).inside {
 365                      let nread = stream.read(buf);
 366                      assert!(nread.is_none());
 367                  }
 368              }
 369  
 370              do spawntask {
 371                  port.take().recv();
 372                  let _stream = TcpStream::connect(addr);
 373                  // Close
 374              }
 375          }
 376      }
 377  
 378      #[test]
 379      #[ignore(cfg(windows))] // FIXME #8811
 380      fn write_close_ip4() {
 381          do run_in_mt_newsched_task {
 382              let addr = next_test_ip4();
 383              let (port, chan) = oneshot();
 384              let port = Cell::new(port);
 385              let chan = Cell::new(chan);
 386  
 387              do spawntask {
 388                  let mut acceptor = TcpListener::bind(addr).listen();
 389                  chan.take().send(());
 390                  let mut stream = acceptor.accept();
 391                  let buf = [0];
 392                  loop {
 393                      let mut stop = false;
 394                      do io_error::cond.trap(|e| {
 395                          // NB: ECONNRESET on linux, EPIPE on mac
 396                          assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
 397                          stop = true;
 398                      }).inside {
 399                          stream.write(buf);
 400                      }
 401                      if stop { break }
 402                  }
 403              }
 404  
 405              do spawntask {
 406                  port.take().recv();
 407                  let _stream = TcpStream::connect(addr);
 408                  // Close
 409              }
 410          }
 411      }
 412  
 413      #[test]
 414      #[ignore(cfg(windows))] // FIXME #8811
 415      fn write_close_ip6() {
 416          do run_in_mt_newsched_task {
 417              let addr = next_test_ip6();
 418              let (port, chan) = oneshot();
 419              let port = Cell::new(port);
 420              let chan = Cell::new(chan);
 421  
 422              do spawntask {
 423                  let mut acceptor = TcpListener::bind(addr).listen();
 424                  chan.take().send(());
 425                  let mut stream = acceptor.accept();
 426                  let buf = [0];
 427                  loop {
 428                      let mut stop = false;
 429                      do io_error::cond.trap(|e| {
 430                          // NB: ECONNRESET on linux, EPIPE on mac
 431                          assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
 432                          stop = true;
 433                      }).inside {
 434                          stream.write(buf);
 435                      }
 436                      if stop { break }
 437                  }
 438              }
 439  
 440              do spawntask {
 441                  port.take().recv();
 442                  let _stream = TcpStream::connect(addr);
 443                  // Close
 444              }
 445          }
 446      }
 447  
 448      #[test]
 449      fn multiple_connect_serial_ip4() {
 450          do run_in_mt_newsched_task {
 451              let addr = next_test_ip4();
 452              let max = 10;
 453              let (port, chan) = oneshot();
 454              let port = Cell::new(port);
 455              let chan = Cell::new(chan);
 456  
 457              do spawntask {
 458                  let mut acceptor = TcpListener::bind(addr).listen();
 459                  chan.take().send(());
 460                  for ref mut stream in acceptor.incoming().take(max) {
 461                      let mut buf = [0];
 462                      stream.read(buf);
 463                      assert_eq!(buf[0], 99);
 464                  }
 465              }
 466  
 467              do spawntask {
 468                  port.take().recv();
 469                  do max.times {
 470                      let mut stream = TcpStream::connect(addr);
 471                      stream.write([99]);
 472                  }
 473              }
 474          }
 475      }
 476  
 477      #[test]
 478      fn multiple_connect_serial_ip6() {
 479          do run_in_mt_newsched_task {
 480              let addr = next_test_ip6();
 481              let max = 10;
 482              let (port, chan) = oneshot();
 483              let port = Cell::new(port);
 484              let chan = Cell::new(chan);
 485  
 486              do spawntask {
 487                  let mut acceptor = TcpListener::bind(addr).listen();
 488                  chan.take().send(());
 489                  for ref mut stream in acceptor.incoming().take(max) {
 490                      let mut buf = [0];
 491                      stream.read(buf);
 492                      assert_eq!(buf[0], 99);
 493                  }
 494              }
 495  
 496              do spawntask {
 497                  port.take().recv();
 498                  do max.times {
 499                      let mut stream = TcpStream::connect(addr);
 500                      stream.write([99]);
 501                  }
 502              }
 503          }
 504      }
 505  
 506      #[test]
 507      fn multiple_connect_interleaved_greedy_schedule_ip4() {
 508          do run_in_mt_newsched_task {
 509              let addr = next_test_ip4();
 510              static MAX: int = 10;
 511              let (port, chan) = oneshot();
 512              let chan = Cell::new(chan);
 513  
 514              do spawntask {
 515                  let mut acceptor = TcpListener::bind(addr).listen();
 516                  chan.take().send(());
 517                  for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
 518                      let stream = Cell::new(stream);
 519                      // Start another task to handle the connection
 520                      do spawntask {
 521                          let mut stream = stream.take();
 522                          let mut buf = [0];
 523                          stream.read(buf);
 524                          assert!(buf[0] == i as u8);
 525                          rtdebug!("read");
 526                      }
 527                  }
 528              }
 529  
 530              port.recv();
 531              connect(0, addr);
 532  
 533              fn connect(i: int, addr: SocketAddr) {
 534                  if i == MAX { return }
 535  
 536                  do spawntask {
 537                      rtdebug!("connecting");
 538                      let mut stream = TcpStream::connect(addr);
 539                      // Connect again before writing
 540                      connect(i + 1, addr);
 541                      rtdebug!("writing");
 542                      stream.write([i as u8]);
 543                  }
 544              }
 545          }
 546      }
 547  
 548      #[test]
 549      fn multiple_connect_interleaved_greedy_schedule_ip6() {
 550          do run_in_mt_newsched_task {
 551              let addr = next_test_ip6();
 552              static MAX: int = 10;
 553              let (port, chan) = oneshot();
 554              let chan = Cell::new(chan);
 555  
 556              do spawntask {
 557                  let mut acceptor = TcpListener::bind(addr).listen();
 558                  chan.take().send(());
 559                  for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
 560                      let stream = Cell::new(stream);
 561                      // Start another task to handle the connection
 562                      do spawntask {
 563                          let mut stream = stream.take();
 564                          let mut buf = [0];
 565                          stream.read(buf);
 566                          assert!(buf[0] == i as u8);
 567                          rtdebug!("read");
 568                      }
 569                  }
 570              }
 571  
 572              port.recv();
 573              connect(0, addr);
 574  
 575              fn connect(i: int, addr: SocketAddr) {
 576                  if i == MAX { return }
 577  
 578                  do spawntask {
 579                      rtdebug!("connecting");
 580                      let mut stream = TcpStream::connect(addr);
 581                      // Connect again before writing
 582                      connect(i + 1, addr);
 583                      rtdebug!("writing");
 584                      stream.write([i as u8]);
 585                  }
 586              }
 587          }
 588      }
 589  
 590      #[test]
 591      fn multiple_connect_interleaved_lazy_schedule_ip4() {
 592          do run_in_mt_newsched_task {
 593              let addr = next_test_ip4();
 594              static MAX: int = 10;
 595              let (port, chan) = oneshot();
 596              let chan = Cell::new(chan);
 597  
 598              do spawntask {
 599                  let mut acceptor = TcpListener::bind(addr).listen();
 600                  chan.take().send(());
 601                  for stream in acceptor.incoming().take(MAX as uint) {
 602                      let stream = Cell::new(stream);
 603                      // Start another task to handle the connection
 604                      do spawntask_later {
 605                          let mut stream = stream.take();
 606                          let mut buf = [0];
 607                          stream.read(buf);
 608                          assert!(buf[0] == 99);
 609                          rtdebug!("read");
 610                      }
 611                  }
 612              }
 613  
 614              port.recv();
 615              connect(0, addr);
 616  
 617              fn connect(i: int, addr: SocketAddr) {
 618                  if i == MAX { return }
 619  
 620                  do spawntask_later {
 621                      rtdebug!("connecting");
 622                      let mut stream = TcpStream::connect(addr);
 623                      // Connect again before writing
 624                      connect(i + 1, addr);
 625                      rtdebug!("writing");
 626                      stream.write([99]);
 627                  }
 628              }
 629          }
 630      }
 631      #[test]
 632      fn multiple_connect_interleaved_lazy_schedule_ip6() {
 633          do run_in_mt_newsched_task {
 634              let addr = next_test_ip6();
 635              static MAX: int = 10;
 636              let (port, chan) = oneshot();
 637              let chan = Cell::new(chan);
 638  
 639              do spawntask {
 640                  let mut acceptor = TcpListener::bind(addr).listen();
 641                  chan.take().send(());
 642                  for stream in acceptor.incoming().take(MAX as uint) {
 643                      let stream = Cell::new(stream);
 644                      // Start another task to handle the connection
 645                      do spawntask_later {
 646                          let mut stream = stream.take();
 647                          let mut buf = [0];
 648                          stream.read(buf);
 649                          assert!(buf[0] == 99);
 650                          rtdebug!("read");
 651                      }
 652                  }
 653              }
 654  
 655              port.recv();
 656              connect(0, addr);
 657  
 658              fn connect(i: int, addr: SocketAddr) {
 659                  if i == MAX { return }
 660  
 661                  do spawntask_later {
 662                      rtdebug!("connecting");
 663                      let mut stream = TcpStream::connect(addr);
 664                      // Connect again before writing
 665                      connect(i + 1, addr);
 666                      rtdebug!("writing");
 667                      stream.write([99]);
 668                  }
 669              }
 670          }
 671      }
 672  
 673      #[cfg(test)]
 674      fn socket_name(addr: SocketAddr) {
 675          do run_in_mt_newsched_task {
 676              do spawntask {
 677                  let mut listener = TcpListener::bind(addr).unwrap();
 678  
 679                  // Make sure socket_name gives
 680                  // us the socket we binded to.
 681                  let so_name = listener.socket_name();
 682                  assert!(so_name.is_some());
 683                  assert_eq!(addr, so_name.unwrap());
 684  
 685              }
 686          }
 687      }
 688  
 689      #[cfg(test)]
 690      fn peer_name(addr: SocketAddr) {
 691          do run_in_mt_newsched_task {
 692              let (port, chan) = oneshot();
 693              let port = Cell::new(port);
 694              let chan = Cell::new(chan);
 695  
 696              do spawntask {
 697                  let mut acceptor = TcpListener::bind(addr).listen();
 698                  chan.take().send(());
 699  
 700                  acceptor.accept();
 701              }
 702  
 703              do spawntask {
 704                  port.take().recv();
 705                  let stream = TcpStream::connect(addr);
 706  
 707                  assert!(stream.is_some());
 708                  let mut stream = stream.unwrap();
 709  
 710                  // Make sure peer_name gives us the
 711                  // address/port of the peer we've
 712                  // connected to.
 713                  let peer_name = stream.peer_name();
 714                  assert!(peer_name.is_some());
 715                  assert_eq!(addr, peer_name.unwrap());
 716              }
 717          }
 718      }
 719  
 720      #[test]
 721      fn socket_and_peer_name_ip4() {
 722          peer_name(next_test_ip4());
 723          socket_name(next_test_ip4());
 724      }
 725  
 726      #[test]
 727      fn socket_and_peer_name_ip6() {
 728          // XXX: peer name is not consistent
 729          //peer_name(next_test_ip6());
 730          socket_name(next_test_ip6());
 731      }
 732  
 733  }