(index<- )        ./libstd/io/net/udp.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! UDP (User Datagram Protocol) network connections.
  12  //!
  13  //! This module contains the ability to open a UDP stream to a socket address.
  14  //! The destination and binding addresses can either be an IPv4 or IPv6
  15  //! address. There is no corresponding notion of a server because UDP is a
  16  //! datagram protocol.
  17  
  18  use clone::Clone;
  19  use io::net::ip::{SocketAddr, IpAddr};
  20  use io::{Reader, Writer, IoResult};
  21  use kinds::Send;
  22  use owned::Box;
  23  use option::Option;
  24  use result::{Ok, Err};
  25  use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo};
  26  
  27  /// A User Datagram Protocol socket.
  28  ///
  29  /// This is an implementation of a bound UDP socket. This supports both IPv4 and
  30  /// IPv6 addresses, and there is no corresponding notion of a server because UDP
  31  /// is a datagram protocol.
  32  ///
  33  /// # Example
  34  ///
  35  /// ```rust,no_run
  36  /// # #![allow(unused_must_use)]
  37  /// use std::io::net::udp::UdpSocket;
  38  /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
  39  ///
  40  /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
  41  /// let mut socket = match UdpSocket::bind(addr) {
  42  ///     Ok(s) => s,
  43  ///     Err(e) => fail!("couldn't bind socket: {}", e),
  44  /// };
  45  ///
  46  /// let mut buf = [0, ..10];
  47  /// match socket.recvfrom(buf) {
  48  ///     Ok((amt, src)) => {
  49  ///         // Send a reply to the socket we received data from
  50  ///         let buf = buf.mut_slice_to(amt);
  51  ///         buf.reverse();
  52  ///         socket.sendto(buf, src);
  53  ///     }
  54  ///     Err(e) => println!("couldn't receive a datagram: {}", e)
  55  /// }
  56  /// drop(socket); // close the socket
  57  /// ```
  58  pub struct UdpSocket {
  59      obj: Box<RtioUdpSocket:Send>,
  60  }
  61  
  62  impl UdpSocket {
  63      /// Creates a UDP socket from the given socket address.
  64      pub fn bind(addrSocketAddr) -> IoResult<UdpSocket> {
  65          LocalIo::maybe_raise(|io| {
  66              io.udp_bind(addr).map(|s| UdpSocket { obj: s })
  67          })
  68      }
  69  
  70      /// Receives data from the socket. On success, returns the number of bytes
  71      /// read and the address from whence the data came.
  72      pub fn recvfrom(&mut self, buf&mut [u8])
  73                      -> IoResult<(uint, SocketAddr)> {
  74          self.obj.recvfrom(buf)
  75      }
  76  
  77      /// Sends data on the socket to the given address. Returns nothing on
  78      /// success.
  79      pub fn sendto(&mut self, buf&[u8], dstSocketAddr) -> IoResult<()> {
  80          self.obj.sendto(buf, dst)
  81      }
  82  
  83      /// Creates a `UdpStream`, which allows use of the `Reader` and `Writer`
  84      /// traits to receive and send data from the same address. This transfers
  85      /// ownership of the socket to the stream.
  86      ///
  87      /// Note that this call does not perform any actual network communication,
  88      /// because UDP is a datagram protocol.
  89      pub fn connect(self, otherSocketAddr) -> UdpStream {
  90          UdpStream {
  91              socket: self,
  92              connected_to: other,
  93          }
  94      }
  95  
  96      /// Returns the socket address that this socket was created from.
  97      pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
  98          self.obj.socket_name()
  99      }
 100  
 101      /// Joins a multicast IP address (becomes a member of it)
 102      #[experimental]
 103      pub fn join_multicast(&mut self, multiIpAddr) -> IoResult<()> {
 104          self.obj.join_multicast(multi)
 105      }
 106  
 107      /// Leaves a multicast IP address (drops membership from it)
 108      #[experimental]
 109      pub fn leave_multicast(&mut self, multiIpAddr) -> IoResult<()> {
 110          self.obj.leave_multicast(multi)
 111      }
 112  
 113      /// Set the multicast loop flag to the specified value
 114      ///
 115      /// This lets multicast packets loop back to local sockets (if enabled)
 116      #[experimental]
 117      pub fn set_multicast_loop(&mut self, onbool) -> IoResult<()> {
 118          if on {
 119              self.obj.loop_multicast_locally()
 120          } else {
 121              self.obj.dont_loop_multicast_locally()
 122          }
 123      }
 124  
 125      /// Sets the multicast TTL
 126      #[experimental]
 127      pub fn set_multicast_ttl(&mut self, ttlint) -> IoResult<()> {
 128          self.obj.multicast_time_to_live(ttl)
 129      }
 130  
 131      /// Sets this socket's TTL
 132      #[experimental]
 133      pub fn set_ttl(&mut self, ttlint) -> IoResult<()> {
 134          self.obj.time_to_live(ttl)
 135      }
 136  
 137      /// Sets the broadcast flag on or off
 138      #[experimental]
 139      pub fn set_broadast(&mut self, broadcastbool) -> IoResult<()> {
 140          if broadcast {
 141              self.obj.hear_broadcasts()
 142          } else {
 143              self.obj.ignore_broadcasts()
 144          }
 145      }
 146  
 147      /// Sets the read/write timeout for this socket.
 148      ///
 149      /// For more information, see `TcpStream::set_timeout`
 150      #[experimental = "the timeout argument may change in type and value"]
 151      pub fn set_timeout(&mut self, timeout_msOption<u64>) {
 152          self.obj.set_timeout(timeout_ms)
 153      }
 154  
 155      /// Sets the read timeout for this socket.
 156      ///
 157      /// For more information, see `TcpStream::set_timeout`
 158      #[experimental = "the timeout argument may change in type and value"]
 159      pub fn set_read_timeout(&mut self, timeout_msOption<u64>) {
 160          self.obj.set_read_timeout(timeout_ms)
 161      }
 162  
 163      /// Sets the write timeout for this socket.
 164      ///
 165      /// For more information, see `TcpStream::set_timeout`
 166      #[experimental = "the timeout argument may change in type and value"]
 167      pub fn set_write_timeout(&mut self, timeout_msOption<u64>) {
 168          self.obj.set_write_timeout(timeout_ms)
 169      }
 170  }
 171  
 172  impl Clone for UdpSocket {
 173      /// Creates a new handle to this UDP socket, allowing for simultaneous
 174      /// reads and writes of the socket.
 175      ///
 176      /// The underlying UDP socket will not be closed until all handles to the
 177      /// socket have been deallocated. Two concurrent reads will not receive
 178      /// the same data. Instead, the first read will receive the first packet
 179      /// received, and the second read will receive the second packet.
 180      fn clone(&self) -> UdpSocket {
 181          UdpSocket {
 182              obj: self.obj.clone(),
 183          }
 184      }
 185  }
 186  
 187  /// A type that allows convenient usage of a UDP stream connected to one
 188  /// address via the `Reader` and `Writer` traits.
 189  pub struct UdpStream {
 190      socket: UdpSocket,
 191      connected_to: SocketAddr
 192  }
 193  
 194  impl UdpStream {
 195      /// Allows access to the underlying UDP socket owned by this stream. This
 196      /// is useful to, for example, use the socket to send data to hosts other
 197      /// than the one that this stream is connected to.
 198      pub fn as_socket<T>(&mut self, f|&mut UdpSocket-> T) -> T {
 199          f(&mut self.socket)
 200      }
 201  
 202      /// Consumes this UDP stream and returns out the underlying socket.
 203      pub fn disconnect(self) -> UdpSocket {
 204          self.socket
 205      }
 206  }
 207  
 208  impl Reader for UdpStream {
 209      fn read(&mut self, buf&mut [u8]) -> IoResult<uint> {
 210          let peer = self.connected_to;
 211          self.as_socket(|sock| {
 212              match sock.recvfrom(buf) {
 213                  Ok((_nread, src)) if src != peer => Ok(0),
 214                  Ok((nread, _src)) => Ok(nread),
 215                  Err(e) => Err(e),
 216              }
 217          })
 218      }
 219  }
 220  
 221  impl Writer for UdpStream {
 222      fn write(&mut self, buf&[u8]) -> IoResult<()> {
 223          let connected_to = self.connected_to;
 224          self.as_socket(|sock| sock.sendto(buf, connected_to))
 225      }
 226  }
 227  
 228  #[cfg(test)]
 229  mod test {
 230      use super::*;
 231      use io::net::ip::{SocketAddr};
 232  
 233      // FIXME #11530 this fails on android because tests are run as root
 234      iotest!(fn bind_error() {
 235          let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
 236          match UdpSocket::bind(addr) {
 237              Ok(..) => fail!(),
 238              Err(e) => assert_eq!(e.kind, PermissionDenied),
 239          }
 240      } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
 241  
 242      iotest!(fn socket_smoke_test_ip4() {
 243          let server_ip = next_test_ip4();
 244          let client_ip = next_test_ip4();
 245          let (tx1, rx1) = channel();
 246          let (tx2, rx2) = channel();
 247  
 248          spawn(proc() {
 249              match UdpSocket::bind(client_ip) {
 250                  Ok(ref mut client) => {
 251                      rx1.recv();
 252                      client.sendto([99], server_ip).unwrap()
 253                  }
 254                  Err(..) => fail!()
 255              }
 256              tx2.send(());
 257          });
 258  
 259          match UdpSocket::bind(server_ip) {
 260              Ok(ref mut server) => {
 261                  tx1.send(());
 262                  let mut buf = [0];
 263                  match server.recvfrom(buf) {
 264                      Ok((nread, src)) => {
 265                          assert_eq!(nread, 1);
 266                          assert_eq!(buf[0], 99);
 267                          assert_eq!(src, client_ip);
 268                      }
 269                      Err(..) => fail!()
 270                  }
 271              }
 272              Err(..) => fail!()
 273          }
 274          rx2.recv();
 275      })
 276  
 277      iotest!(fn socket_smoke_test_ip6() {
 278          let server_ip = next_test_ip6();
 279          let client_ip = next_test_ip6();
 280          let (tx, rx) = channel::<()>();
 281  
 282          spawn(proc() {
 283              match UdpSocket::bind(client_ip) {
 284                  Ok(ref mut client) => {
 285                      rx.recv();
 286                      client.sendto([99], server_ip).unwrap()
 287                  }
 288                  Err(..) => fail!()
 289              }
 290          });
 291  
 292          match UdpSocket::bind(server_ip) {
 293              Ok(ref mut server) => {
 294                  tx.send(());
 295                  let mut buf = [0];
 296                  match server.recvfrom(buf) {
 297                      Ok((nread, src)) => {
 298                          assert_eq!(nread, 1);
 299                          assert_eq!(buf[0], 99);
 300                          assert_eq!(src, client_ip);
 301                      }
 302                      Err(..) => fail!()
 303                  }
 304              }
 305              Err(..) => fail!()
 306          }
 307      })
 308  
 309      iotest!(fn stream_smoke_test_ip4() {
 310          let server_ip = next_test_ip4();
 311          let client_ip = next_test_ip4();
 312          let (tx1, rx1) = channel();
 313          let (tx2, rx2) = channel();
 314  
 315          spawn(proc() {
 316              match UdpSocket::bind(client_ip) {
 317                  Ok(client) => {
 318                      let client = box client;
 319                      let mut stream = client.connect(server_ip);
 320                      rx1.recv();
 321                      stream.write([99]).unwrap();
 322                  }
 323                  Err(..) => fail!()
 324              }
 325              tx2.send(());
 326          });
 327  
 328          match UdpSocket::bind(server_ip) {
 329              Ok(server) => {
 330                  let server = box server;
 331                  let mut stream = server.connect(client_ip);
 332                  tx1.send(());
 333                  let mut buf = [0];
 334                  match stream.read(buf) {
 335                      Ok(nread) => {
 336                          assert_eq!(nread, 1);
 337                          assert_eq!(buf[0], 99);
 338                      }
 339                      Err(..) => fail!()
 340                  }
 341              }
 342              Err(..) => fail!()
 343          }
 344          rx2.recv();
 345      })
 346  
 347      iotest!(fn stream_smoke_test_ip6() {
 348          let server_ip = next_test_ip6();
 349          let client_ip = next_test_ip6();
 350          let (tx1, rx1) = channel();
 351          let (tx2, rx2) = channel();
 352  
 353          spawn(proc() {
 354              match UdpSocket::bind(client_ip) {
 355                  Ok(client) => {
 356                      let client = box client;
 357                      let mut stream = client.connect(server_ip);
 358                      rx1.recv();
 359                      stream.write([99]).unwrap();
 360                  }
 361                  Err(..) => fail!()
 362              }
 363              tx2.send(());
 364          });
 365  
 366          match UdpSocket::bind(server_ip) {
 367              Ok(server) => {
 368                  let server = box server;
 369                  let mut stream = server.connect(client_ip);
 370                  tx1.send(());
 371                  let mut buf = [0];
 372                  match stream.read(buf) {
 373                      Ok(nread) => {
 374                          assert_eq!(nread, 1);
 375                          assert_eq!(buf[0], 99);
 376                      }
 377                      Err(..) => fail!()
 378                  }
 379              }
 380              Err(..) => fail!()
 381          }
 382          rx2.recv();
 383      })
 384  
 385      pub fn socket_name(addr: SocketAddr) {
 386          use result::ResultUnwrap;
 387  
 388          let server = UdpSocket::bind(addr);
 389  
 390          assert!(server.is_ok());
 391          let mut server = server.unwrap();
 392  
 393          // Make sure socket_name gives
 394          // us the socket we binded to.
 395          let so_name = server.socket_name();
 396          assert!(so_name.is_ok());
 397          assert_eq!(addr, so_name.unwrap());
 398      }
 399  
 400      iotest!(fn socket_name_ip4() {
 401          socket_name(next_test_ip4());
 402      })
 403  
 404      iotest!(fn socket_name_ip6() {
 405          socket_name(next_test_ip6());
 406      })
 407  
 408      iotest!(fn udp_clone_smoke() {
 409          let addr1 = next_test_ip4();
 410          let addr2 = next_test_ip4();
 411          let mut sock1 = UdpSocket::bind(addr1).unwrap();
 412          let sock2 = UdpSocket::bind(addr2).unwrap();
 413  
 414          spawn(proc() {
 415              let mut sock2 = sock2;
 416              let mut buf = [0, 0];
 417              assert_eq!(sock2.recvfrom(buf), Ok((1, addr1)));
 418              assert_eq!(buf[0], 1);
 419              sock2.sendto([2], addr1).unwrap();
 420          });
 421  
 422          let sock3 = sock1.clone();
 423  
 424          let (tx1, rx1) = channel();
 425          let (tx2, rx2) = channel();
 426          spawn(proc() {
 427              let mut sock3 = sock3;
 428              rx1.recv();
 429              sock3.sendto([1], addr2).unwrap();
 430              tx2.send(());
 431          });
 432          tx1.send(());
 433          let mut buf = [0, 0];
 434          assert_eq!(sock1.recvfrom(buf), Ok((1, addr2)));
 435          rx2.recv();
 436      })
 437  
 438      iotest!(fn udp_clone_two_read() {
 439          let addr1 = next_test_ip4();
 440          let addr2 = next_test_ip4();
 441          let mut sock1 = UdpSocket::bind(addr1).unwrap();
 442          let sock2 = UdpSocket::bind(addr2).unwrap();
 443          let (tx1, rx) = channel();
 444          let tx2 = tx1.clone();
 445  
 446          spawn(proc() {
 447              let mut sock2 = sock2;
 448              sock2.sendto([1], addr1).unwrap();
 449              rx.recv();
 450              sock2.sendto([2], addr1).unwrap();
 451              rx.recv();
 452          });
 453  
 454          let sock3 = sock1.clone();
 455  
 456          let (done, rx) = channel();
 457          spawn(proc() {
 458              let mut sock3 = sock3;
 459              let mut buf = [0, 0];
 460              sock3.recvfrom(buf).unwrap();
 461              tx2.send(());
 462              done.send(());
 463          });
 464          let mut buf = [0, 0];
 465          sock1.recvfrom(buf).unwrap();
 466          tx1.send(());
 467  
 468          rx.recv();
 469      })
 470  
 471      iotest!(fn udp_clone_two_write() {
 472          let addr1 = next_test_ip4();
 473          let addr2 = next_test_ip4();
 474          let mut sock1 = UdpSocket::bind(addr1).unwrap();
 475          let sock2 = UdpSocket::bind(addr2).unwrap();
 476  
 477          let (tx, rx) = channel();
 478          let (serv_tx, serv_rx) = channel();
 479  
 480          spawn(proc() {
 481              let mut sock2 = sock2;
 482              let mut buf = [0, 1];
 483  
 484              rx.recv();
 485              match sock2.recvfrom(buf) {
 486                  Ok(..) => {}
 487                  Err(e) => fail!("failed receive: {}", e),
 488              }
 489              serv_tx.send(());
 490          });
 491  
 492          let sock3 = sock1.clone();
 493  
 494          let (done, rx) = channel();
 495          let tx2 = tx.clone();
 496          spawn(proc() {
 497              let mut sock3 = sock3;
 498              match sock3.sendto([1], addr2) {
 499                  Ok(..) => { let _ = tx2.send_opt(()); }
 500                  Err(..) => {}
 501              }
 502              done.send(());
 503          });
 504          match sock1.sendto([2], addr2) {
 505              Ok(..) => { let _ = tx.send_opt(()); }
 506              Err(..) => {}
 507          }
 508          drop(tx);
 509  
 510          rx.recv();
 511          serv_rx.recv();
 512      })
 513  
 514      iotest!(fn recvfrom_timeout() {
 515          let addr1 = next_test_ip4();
 516          let addr2 = next_test_ip4();
 517          let mut a = UdpSocket::bind(addr1).unwrap();
 518  
 519          let (tx, rx) = channel();
 520          let (tx2, rx2) = channel();
 521          spawn(proc() {
 522              let mut a = UdpSocket::bind(addr2).unwrap();
 523              assert_eq!(a.recvfrom([0]), Ok((1, addr1)));
 524              assert_eq!(a.sendto([0], addr1), Ok(()));
 525              rx.recv();
 526              assert_eq!(a.sendto([0], addr1), Ok(()));
 527  
 528              tx2.send(());
 529          });
 530  
 531          // Make sure that reads time out, but writes can continue
 532          a.set_read_timeout(Some(20));
 533          assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
 534          assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
 535          assert_eq!(a.sendto([0], addr2), Ok(()));
 536  
 537          // Cloned handles should be able to block
 538          let mut a2 = a.clone();
 539          assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
 540  
 541          // Clearing the timeout should allow for receiving
 542          a.set_timeout(None);
 543          tx.send(());
 544          assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
 545  
 546          // Make sure the child didn't die
 547          rx2.recv();
 548      })
 549  
 550      iotest!(fn sendto_timeout() {
 551          let addr1 = next_test_ip4();
 552          let addr2 = next_test_ip4();
 553          let mut a = UdpSocket::bind(addr1).unwrap();
 554          let _b = UdpSocket::bind(addr2).unwrap();
 555  
 556          a.set_write_timeout(Some(1000));
 557          for _ in range(0, 100) {
 558              match a.sendto([0, ..4*1024], addr2) {
 559                  Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
 560                  Err(IoError { kind: TimedOut, .. }) => break,
 561                  Err(e) => fail!("other error: {}", e),
 562              }
 563          }
 564      })
 565  }