(index<- ) ./libstd/io/net/udp.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! UDP (User Datagram Protocol) network connections.
12 //!
13 //! This module contains the ability to open a UDP stream to a socket address.
14 //! The destination and binding addresses can either be an IPv4 or IPv6
15 //! address. There is no corresponding notion of a server because UDP is a
16 //! datagram protocol.
17
18 use clone::Clone;
19 use io::net::ip::{SocketAddr, IpAddr};
20 use io::{Reader, Writer, IoResult};
21 use kinds::Send;
22 use owned::Box;
23 use option::Option;
24 use result::{Ok, Err};
25 use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo};
26
27 /// A User Datagram Protocol socket.
28 ///
29 /// This is an implementation of a bound UDP socket. This supports both IPv4 and
30 /// IPv6 addresses, and there is no corresponding notion of a server because UDP
31 /// is a datagram protocol.
32 ///
33 /// # Example
34 ///
35 /// ```rust,no_run
36 /// # #![allow(unused_must_use)]
37 /// use std::io::net::udp::UdpSocket;
38 /// use std::io::net::ip::{Ipv4Addr, SocketAddr};
39 ///
40 /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
41 /// let mut socket = match UdpSocket::bind(addr) {
42 /// Ok(s) => s,
43 /// Err(e) => fail!("couldn't bind socket: {}", e),
44 /// };
45 ///
46 /// let mut buf = [0, ..10];
47 /// match socket.recvfrom(buf) {
48 /// Ok((amt, src)) => {
49 /// // Send a reply to the socket we received data from
50 /// let buf = buf.mut_slice_to(amt);
51 /// buf.reverse();
52 /// socket.sendto(buf, src);
53 /// }
54 /// Err(e) => println!("couldn't receive a datagram: {}", e)
55 /// }
56 /// drop(socket); // close the socket
57 /// ```
58 pub struct UdpSocket {
59 obj: Box<RtioUdpSocket:Send>,
60 }
61
62 impl UdpSocket {
63 /// Creates a UDP socket from the given socket address.
64 pub fn bind(addr: SocketAddr) -> IoResult<UdpSocket> {
65 LocalIo::maybe_raise(|io| {
66 io.udp_bind(addr).map(|s| UdpSocket { obj: s })
67 })
68 }
69
70 /// Receives data from the socket. On success, returns the number of bytes
71 /// read and the address from whence the data came.
72 pub fn recvfrom(&mut self, buf: &mut [u8])
73 -> IoResult<(uint, SocketAddr)> {
74 self.obj.recvfrom(buf)
75 }
76
77 /// Sends data on the socket to the given address. Returns nothing on
78 /// success.
79 pub fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> IoResult<()> {
80 self.obj.sendto(buf, dst)
81 }
82
83 /// Creates a `UdpStream`, which allows use of the `Reader` and `Writer`
84 /// traits to receive and send data from the same address. This transfers
85 /// ownership of the socket to the stream.
86 ///
87 /// Note that this call does not perform any actual network communication,
88 /// because UDP is a datagram protocol.
89 pub fn connect(self, other: SocketAddr) -> UdpStream {
90 UdpStream {
91 socket: self,
92 connected_to: other,
93 }
94 }
95
96 /// Returns the socket address that this socket was created from.
97 pub fn socket_name(&mut self) -> IoResult<SocketAddr> {
98 self.obj.socket_name()
99 }
100
101 /// Joins a multicast IP address (becomes a member of it)
102 #[experimental]
103 pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
104 self.obj.join_multicast(multi)
105 }
106
107 /// Leaves a multicast IP address (drops membership from it)
108 #[experimental]
109 pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> {
110 self.obj.leave_multicast(multi)
111 }
112
113 /// Set the multicast loop flag to the specified value
114 ///
115 /// This lets multicast packets loop back to local sockets (if enabled)
116 #[experimental]
117 pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
118 if on {
119 self.obj.loop_multicast_locally()
120 } else {
121 self.obj.dont_loop_multicast_locally()
122 }
123 }
124
125 /// Sets the multicast TTL
126 #[experimental]
127 pub fn set_multicast_ttl(&mut self, ttl: int) -> IoResult<()> {
128 self.obj.multicast_time_to_live(ttl)
129 }
130
131 /// Sets this socket's TTL
132 #[experimental]
133 pub fn set_ttl(&mut self, ttl: int) -> IoResult<()> {
134 self.obj.time_to_live(ttl)
135 }
136
137 /// Sets the broadcast flag on or off
138 #[experimental]
139 pub fn set_broadast(&mut self, broadcast: bool) -> IoResult<()> {
140 if broadcast {
141 self.obj.hear_broadcasts()
142 } else {
143 self.obj.ignore_broadcasts()
144 }
145 }
146
147 /// Sets the read/write timeout for this socket.
148 ///
149 /// For more information, see `TcpStream::set_timeout`
150 #[experimental = "the timeout argument may change in type and value"]
151 pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
152 self.obj.set_timeout(timeout_ms)
153 }
154
155 /// Sets the read timeout for this socket.
156 ///
157 /// For more information, see `TcpStream::set_timeout`
158 #[experimental = "the timeout argument may change in type and value"]
159 pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) {
160 self.obj.set_read_timeout(timeout_ms)
161 }
162
163 /// Sets the write timeout for this socket.
164 ///
165 /// For more information, see `TcpStream::set_timeout`
166 #[experimental = "the timeout argument may change in type and value"]
167 pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) {
168 self.obj.set_write_timeout(timeout_ms)
169 }
170 }
171
172 impl Clone for UdpSocket {
173 /// Creates a new handle to this UDP socket, allowing for simultaneous
174 /// reads and writes of the socket.
175 ///
176 /// The underlying UDP socket will not be closed until all handles to the
177 /// socket have been deallocated. Two concurrent reads will not receive
178 /// the same data. Instead, the first read will receive the first packet
179 /// received, and the second read will receive the second packet.
180 fn clone(&self) -> UdpSocket {
181 UdpSocket {
182 obj: self.obj.clone(),
183 }
184 }
185 }
186
187 /// A type that allows convenient usage of a UDP stream connected to one
188 /// address via the `Reader` and `Writer` traits.
189 pub struct UdpStream {
190 socket: UdpSocket,
191 connected_to: SocketAddr
192 }
193
194 impl UdpStream {
195 /// Allows access to the underlying UDP socket owned by this stream. This
196 /// is useful to, for example, use the socket to send data to hosts other
197 /// than the one that this stream is connected to.
198 pub fn as_socket<T>(&mut self, f: |&mut UdpSocket| -> T) -> T {
199 f(&mut self.socket)
200 }
201
202 /// Consumes this UDP stream and returns out the underlying socket.
203 pub fn disconnect(self) -> UdpSocket {
204 self.socket
205 }
206 }
207
208 impl Reader for UdpStream {
209 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
210 let peer = self.connected_to;
211 self.as_socket(|sock| {
212 match sock.recvfrom(buf) {
213 Ok((_nread, src)) if src != peer => Ok(0),
214 Ok((nread, _src)) => Ok(nread),
215 Err(e) => Err(e),
216 }
217 })
218 }
219 }
220
221 impl Writer for UdpStream {
222 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
223 let connected_to = self.connected_to;
224 self.as_socket(|sock| sock.sendto(buf, connected_to))
225 }
226 }
227
228 #[cfg(test)]
229 mod test {
230 use super::*;
231 use io::net::ip::{SocketAddr};
232
233 // FIXME #11530 this fails on android because tests are run as root
234 iotest!(fn bind_error() {
235 let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 };
236 match UdpSocket::bind(addr) {
237 Ok(..) => fail!(),
238 Err(e) => assert_eq!(e.kind, PermissionDenied),
239 }
240 } #[ignore(cfg(windows))] #[ignore(cfg(target_os = "android"))])
241
242 iotest!(fn socket_smoke_test_ip4() {
243 let server_ip = next_test_ip4();
244 let client_ip = next_test_ip4();
245 let (tx1, rx1) = channel();
246 let (tx2, rx2) = channel();
247
248 spawn(proc() {
249 match UdpSocket::bind(client_ip) {
250 Ok(ref mut client) => {
251 rx1.recv();
252 client.sendto([99], server_ip).unwrap()
253 }
254 Err(..) => fail!()
255 }
256 tx2.send(());
257 });
258
259 match UdpSocket::bind(server_ip) {
260 Ok(ref mut server) => {
261 tx1.send(());
262 let mut buf = [0];
263 match server.recvfrom(buf) {
264 Ok((nread, src)) => {
265 assert_eq!(nread, 1);
266 assert_eq!(buf[0], 99);
267 assert_eq!(src, client_ip);
268 }
269 Err(..) => fail!()
270 }
271 }
272 Err(..) => fail!()
273 }
274 rx2.recv();
275 })
276
277 iotest!(fn socket_smoke_test_ip6() {
278 let server_ip = next_test_ip6();
279 let client_ip = next_test_ip6();
280 let (tx, rx) = channel::<()>();
281
282 spawn(proc() {
283 match UdpSocket::bind(client_ip) {
284 Ok(ref mut client) => {
285 rx.recv();
286 client.sendto([99], server_ip).unwrap()
287 }
288 Err(..) => fail!()
289 }
290 });
291
292 match UdpSocket::bind(server_ip) {
293 Ok(ref mut server) => {
294 tx.send(());
295 let mut buf = [0];
296 match server.recvfrom(buf) {
297 Ok((nread, src)) => {
298 assert_eq!(nread, 1);
299 assert_eq!(buf[0], 99);
300 assert_eq!(src, client_ip);
301 }
302 Err(..) => fail!()
303 }
304 }
305 Err(..) => fail!()
306 }
307 })
308
309 iotest!(fn stream_smoke_test_ip4() {
310 let server_ip = next_test_ip4();
311 let client_ip = next_test_ip4();
312 let (tx1, rx1) = channel();
313 let (tx2, rx2) = channel();
314
315 spawn(proc() {
316 match UdpSocket::bind(client_ip) {
317 Ok(client) => {
318 let client = box client;
319 let mut stream = client.connect(server_ip);
320 rx1.recv();
321 stream.write([99]).unwrap();
322 }
323 Err(..) => fail!()
324 }
325 tx2.send(());
326 });
327
328 match UdpSocket::bind(server_ip) {
329 Ok(server) => {
330 let server = box server;
331 let mut stream = server.connect(client_ip);
332 tx1.send(());
333 let mut buf = [0];
334 match stream.read(buf) {
335 Ok(nread) => {
336 assert_eq!(nread, 1);
337 assert_eq!(buf[0], 99);
338 }
339 Err(..) => fail!()
340 }
341 }
342 Err(..) => fail!()
343 }
344 rx2.recv();
345 })
346
347 iotest!(fn stream_smoke_test_ip6() {
348 let server_ip = next_test_ip6();
349 let client_ip = next_test_ip6();
350 let (tx1, rx1) = channel();
351 let (tx2, rx2) = channel();
352
353 spawn(proc() {
354 match UdpSocket::bind(client_ip) {
355 Ok(client) => {
356 let client = box client;
357 let mut stream = client.connect(server_ip);
358 rx1.recv();
359 stream.write([99]).unwrap();
360 }
361 Err(..) => fail!()
362 }
363 tx2.send(());
364 });
365
366 match UdpSocket::bind(server_ip) {
367 Ok(server) => {
368 let server = box server;
369 let mut stream = server.connect(client_ip);
370 tx1.send(());
371 let mut buf = [0];
372 match stream.read(buf) {
373 Ok(nread) => {
374 assert_eq!(nread, 1);
375 assert_eq!(buf[0], 99);
376 }
377 Err(..) => fail!()
378 }
379 }
380 Err(..) => fail!()
381 }
382 rx2.recv();
383 })
384
385 pub fn socket_name(addr: SocketAddr) {
386 use result::ResultUnwrap;
387
388 let server = UdpSocket::bind(addr);
389
390 assert!(server.is_ok());
391 let mut server = server.unwrap();
392
393 // Make sure socket_name gives
394 // us the socket we binded to.
395 let so_name = server.socket_name();
396 assert!(so_name.is_ok());
397 assert_eq!(addr, so_name.unwrap());
398 }
399
400 iotest!(fn socket_name_ip4() {
401 socket_name(next_test_ip4());
402 })
403
404 iotest!(fn socket_name_ip6() {
405 socket_name(next_test_ip6());
406 })
407
408 iotest!(fn udp_clone_smoke() {
409 let addr1 = next_test_ip4();
410 let addr2 = next_test_ip4();
411 let mut sock1 = UdpSocket::bind(addr1).unwrap();
412 let sock2 = UdpSocket::bind(addr2).unwrap();
413
414 spawn(proc() {
415 let mut sock2 = sock2;
416 let mut buf = [0, 0];
417 assert_eq!(sock2.recvfrom(buf), Ok((1, addr1)));
418 assert_eq!(buf[0], 1);
419 sock2.sendto([2], addr1).unwrap();
420 });
421
422 let sock3 = sock1.clone();
423
424 let (tx1, rx1) = channel();
425 let (tx2, rx2) = channel();
426 spawn(proc() {
427 let mut sock3 = sock3;
428 rx1.recv();
429 sock3.sendto([1], addr2).unwrap();
430 tx2.send(());
431 });
432 tx1.send(());
433 let mut buf = [0, 0];
434 assert_eq!(sock1.recvfrom(buf), Ok((1, addr2)));
435 rx2.recv();
436 })
437
438 iotest!(fn udp_clone_two_read() {
439 let addr1 = next_test_ip4();
440 let addr2 = next_test_ip4();
441 let mut sock1 = UdpSocket::bind(addr1).unwrap();
442 let sock2 = UdpSocket::bind(addr2).unwrap();
443 let (tx1, rx) = channel();
444 let tx2 = tx1.clone();
445
446 spawn(proc() {
447 let mut sock2 = sock2;
448 sock2.sendto([1], addr1).unwrap();
449 rx.recv();
450 sock2.sendto([2], addr1).unwrap();
451 rx.recv();
452 });
453
454 let sock3 = sock1.clone();
455
456 let (done, rx) = channel();
457 spawn(proc() {
458 let mut sock3 = sock3;
459 let mut buf = [0, 0];
460 sock3.recvfrom(buf).unwrap();
461 tx2.send(());
462 done.send(());
463 });
464 let mut buf = [0, 0];
465 sock1.recvfrom(buf).unwrap();
466 tx1.send(());
467
468 rx.recv();
469 })
470
471 iotest!(fn udp_clone_two_write() {
472 let addr1 = next_test_ip4();
473 let addr2 = next_test_ip4();
474 let mut sock1 = UdpSocket::bind(addr1).unwrap();
475 let sock2 = UdpSocket::bind(addr2).unwrap();
476
477 let (tx, rx) = channel();
478 let (serv_tx, serv_rx) = channel();
479
480 spawn(proc() {
481 let mut sock2 = sock2;
482 let mut buf = [0, 1];
483
484 rx.recv();
485 match sock2.recvfrom(buf) {
486 Ok(..) => {}
487 Err(e) => fail!("failed receive: {}", e),
488 }
489 serv_tx.send(());
490 });
491
492 let sock3 = sock1.clone();
493
494 let (done, rx) = channel();
495 let tx2 = tx.clone();
496 spawn(proc() {
497 let mut sock3 = sock3;
498 match sock3.sendto([1], addr2) {
499 Ok(..) => { let _ = tx2.send_opt(()); }
500 Err(..) => {}
501 }
502 done.send(());
503 });
504 match sock1.sendto([2], addr2) {
505 Ok(..) => { let _ = tx.send_opt(()); }
506 Err(..) => {}
507 }
508 drop(tx);
509
510 rx.recv();
511 serv_rx.recv();
512 })
513
514 iotest!(fn recvfrom_timeout() {
515 let addr1 = next_test_ip4();
516 let addr2 = next_test_ip4();
517 let mut a = UdpSocket::bind(addr1).unwrap();
518
519 let (tx, rx) = channel();
520 let (tx2, rx2) = channel();
521 spawn(proc() {
522 let mut a = UdpSocket::bind(addr2).unwrap();
523 assert_eq!(a.recvfrom([0]), Ok((1, addr1)));
524 assert_eq!(a.sendto([0], addr1), Ok(()));
525 rx.recv();
526 assert_eq!(a.sendto([0], addr1), Ok(()));
527
528 tx2.send(());
529 });
530
531 // Make sure that reads time out, but writes can continue
532 a.set_read_timeout(Some(20));
533 assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
534 assert_eq!(a.recvfrom([0]).err().unwrap().kind, TimedOut);
535 assert_eq!(a.sendto([0], addr2), Ok(()));
536
537 // Cloned handles should be able to block
538 let mut a2 = a.clone();
539 assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
540
541 // Clearing the timeout should allow for receiving
542 a.set_timeout(None);
543 tx.send(());
544 assert_eq!(a2.recvfrom([0]), Ok((1, addr2)));
545
546 // Make sure the child didn't die
547 rx2.recv();
548 })
549
550 iotest!(fn sendto_timeout() {
551 let addr1 = next_test_ip4();
552 let addr2 = next_test_ip4();
553 let mut a = UdpSocket::bind(addr1).unwrap();
554 let _b = UdpSocket::bind(addr2).unwrap();
555
556 a.set_write_timeout(Some(1000));
557 for _ in range(0, 100) {
558 match a.sendto([0, ..4*1024], addr2) {
559 Ok(()) | Err(IoError { kind: ShortWrite(..), .. }) => {},
560 Err(IoError { kind: TimedOut, .. }) => break,
561 Err(e) => fail!("other error: {}", e),
562 }
563 }
564 })
565 }