(index<- ) ./libnative/io/net.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc;
12 use std::cast;
13 use std::io::net::ip;
14 use std::io;
15 use std::mem;
16 use std::rt::rtio;
17 use std::sync::arc::UnsafeArc;
18 use std::unstable::mutex;
19
20 use super::{IoResult, retry, keep_going};
21 use super::c;
22 use super::util;
23
24 ////////////////////////////////////////////////////////////////////////////////
25 // sockaddr and misc bindings
26 ////////////////////////////////////////////////////////////////////////////////
27
28 #[cfg(windows)] pub type sock_t = libc::SOCKET;
29 #[cfg(unix)] pub type sock_t = super::file::fd_t;
30
31 pub fn htons(u: u16) -> u16 {
32 mem::to_be16(u)
33 }
34 pub fn ntohs(u: u16) -> u16 {
35 mem::from_be16(u)
36 }
37
38 enum InAddr {
39 InAddr(libc::in_addr),
40 In6Addr(libc::in6_addr),
41 }
42
43 fn ip_to_inaddr(ip: ip::IpAddr) -> InAddr {
44 match ip {
45 ip::Ipv4Addr(a, b, c, d) => {
46 InAddr(libc::in_addr {
47 s_addr: (d as u32 << 24) |
48 (c as u32 << 16) |
49 (b as u32 << 8) |
50 (a as u32 << 0)
51 })
52 }
53 ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
54 In6Addr(libc::in6_addr {
55 s6_addr: [
56 htons(a),
57 htons(b),
58 htons(c),
59 htons(d),
60 htons(e),
61 htons(f),
62 htons(g),
63 htons(h),
64 ]
65 })
66 }
67 }
68 }
69
70 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
71 unsafe {
72 let storage: libc::sockaddr_storage = mem::init();
73 let len = match ip_to_inaddr(addr.ip) {
74 InAddr(inaddr) => {
75 let storage: *mut libc::sockaddr_in = cast::transmute(&storage);
76 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
77 (*storage).sin_port = htons(addr.port);
78 (*storage).sin_addr = inaddr;
79 mem::size_of::<libc::sockaddr_in>()
80 }
81 In6Addr(inaddr) => {
82 let storage: *mut libc::sockaddr_in6 = cast::transmute(&storage);
83 (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
84 (*storage).sin6_port = htons(addr.port);
85 (*storage).sin6_addr = inaddr;
86 mem::size_of::<libc::sockaddr_in6>()
87 }
88 };
89 return (storage, len);
90 }
91 }
92
93 fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
94 unsafe {
95 let fam = match addr.ip {
96 ip::Ipv4Addr(..) => libc::AF_INET,
97 ip::Ipv6Addr(..) => libc::AF_INET6,
98 };
99 match libc::socket(fam, ty, 0) {
100 -1 => Err(super::last_error()),
101 fd => Ok(fd),
102 }
103 }
104 }
105
106 fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
107 payload: T) -> IoResult<()> {
108 unsafe {
109 let payload = &payload as *T as *libc::c_void;
110 let ret = libc::setsockopt(fd, opt, val,
111 payload,
112 mem::size_of::<T>() as libc::socklen_t);
113 if ret != 0 {
114 Err(last_error())
115 } else {
116 Ok(())
117 }
118 }
119 }
120
121 pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
122 val: libc::c_int) -> IoResult<T> {
123 unsafe {
124 let mut slot: T = mem::init();
125 let mut len = mem::size_of::<T>() as libc::socklen_t;
126 let ret = c::getsockopt(fd, opt, val,
127 &mut slot as *mut _ as *mut _,
128 &mut len);
129 if ret != 0 {
130 Err(last_error())
131 } else {
132 assert!(len as uint == mem::size_of::<T>());
133 Ok(slot)
134 }
135 }
136 }
137
138 #[cfg(windows)]
139 fn last_error() -> io::IoError {
140 io::IoError::from_errno(unsafe { c::WSAGetLastError() } as uint, true)
141 }
142
143 #[cfg(not(windows))]
144 fn last_error() -> io::IoError {
145 super::last_error()
146 }
147
148 #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
149 #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
150
151 fn sockname(fd: sock_t,
152 f: extern "system" unsafe fn(sock_t, *mut libc::sockaddr,
153 *mut libc::socklen_t) -> libc::c_int)
154 -> IoResult<ip::SocketAddr>
155 {
156 let mut storage: libc::sockaddr_storage = unsafe { mem::init() };
157 let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
158 unsafe {
159 let storage = &mut storage as *mut libc::sockaddr_storage;
160 let ret = f(fd,
161 storage as *mut libc::sockaddr,
162 &mut len as *mut libc::socklen_t);
163 if ret != 0 {
164 return Err(last_error())
165 }
166 }
167 return sockaddr_to_addr(&storage, len as uint);
168 }
169
170 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
171 len: uint) -> IoResult<ip::SocketAddr> {
172 match storage.ss_family as libc::c_int {
173 libc::AF_INET => {
174 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
175 let storage: &libc::sockaddr_in = unsafe {
176 cast::transmute(storage)
177 };
178 let addr = storage.sin_addr.s_addr as u32;
179 let a = (addr >> 0) as u8;
180 let b = (addr >> 8) as u8;
181 let c = (addr >> 16) as u8;
182 let d = (addr >> 24) as u8;
183 Ok(ip::SocketAddr {
184 ip: ip::Ipv4Addr(a, b, c, d),
185 port: ntohs(storage.sin_port),
186 })
187 }
188 libc::AF_INET6 => {
189 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
190 let storage: &libc::sockaddr_in6 = unsafe {
191 cast::transmute(storage)
192 };
193 let a = ntohs(storage.sin6_addr.s6_addr[0]);
194 let b = ntohs(storage.sin6_addr.s6_addr[1]);
195 let c = ntohs(storage.sin6_addr.s6_addr[2]);
196 let d = ntohs(storage.sin6_addr.s6_addr[3]);
197 let e = ntohs(storage.sin6_addr.s6_addr[4]);
198 let f = ntohs(storage.sin6_addr.s6_addr[5]);
199 let g = ntohs(storage.sin6_addr.s6_addr[6]);
200 let h = ntohs(storage.sin6_addr.s6_addr[7]);
201 Ok(ip::SocketAddr {
202 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
203 port: ntohs(storage.sin6_port),
204 })
205 }
206 _ => {
207 Err(io::standard_error(io::OtherIoError))
208 }
209 }
210 }
211
212 #[cfg(unix)]
213 pub fn init() {}
214
215 #[cfg(windows)]
216 pub fn init() {
217
218 unsafe {
219 use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
220 static mut INITIALIZED: bool = false;
221 static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
222
223 let _guard = LOCK.lock();
224 if !INITIALIZED {
225 let mut data: c::WSADATA = mem::init();
226 let ret = c::WSAStartup(0x202, // version 2.2
227 &mut data);
228 assert_eq!(ret, 0);
229 INITIALIZED = true;
230 }
231 }
232 }
233
234 ////////////////////////////////////////////////////////////////////////////////
235 // TCP streams
236 ////////////////////////////////////////////////////////////////////////////////
237
238 pub struct TcpStream {
239 inner: UnsafeArc<Inner>,
240 read_deadline: u64,
241 write_deadline: u64,
242 }
243
244 struct Inner {
245 fd: sock_t,
246 lock: mutex::NativeMutex,
247 }
248
249 pub struct Guard<'a> {
250 pub fd: sock_t,
251 pub guard: mutex::LockGuard<'a>,
252 }
253
254 impl Inner {
255 fn new(fd: sock_t) -> Inner {
256 Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
257 }
258 }
259
260 impl TcpStream {
261 pub fn connect(addr: ip::SocketAddr,
262 timeout: Option<u64>) -> IoResult<TcpStream> {
263 let fd = try!(socket(addr, libc::SOCK_STREAM));
264 let ret = TcpStream::new(Inner::new(fd));
265
266 let (addr, len) = addr_to_sockaddr(addr);
267 let addrp = &addr as *_ as *libc::sockaddr;
268 let len = len as libc::socklen_t;
269
270 match timeout {
271 Some(timeout) => {
272 try!(util::connect_timeout(fd, addrp, len, timeout));
273 Ok(ret)
274 },
275 None => {
276 match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
277 -1 => Err(last_error()),
278 _ => Ok(ret),
279 }
280 }
281 }
282 }
283
284 fn new(inner: Inner) -> TcpStream {
285 TcpStream {
286 inner: UnsafeArc::new(inner),
287 read_deadline: 0,
288 write_deadline: 0,
289 }
290 }
291
292 pub fn fd(&self) -> sock_t {
293 // This unsafety is fine because it's just a read-only arc
294 unsafe { (*self.inner.get()).fd }
295 }
296
297 fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
298 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
299 nodelay as libc::c_int)
300 }
301
302 fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
303 let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
304 seconds.is_some() as libc::c_int);
305 match seconds {
306 Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
307 None => ret,
308 }
309 }
310
311 #[cfg(target_os = "macos")]
312 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
313 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
314 seconds as libc::c_int)
315 }
316 #[cfg(target_os = "freebsd")]
317 fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
318 setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
319 seconds as libc::c_int)
320 }
321 #[cfg(not(target_os = "macos"), not(target_os = "freebsd"))]
322 fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
323 Ok(())
324 }
325
326 #[cfg(target_os = "linux")]
327 fn lock_nonblocking(&self) {}
328
329 #[cfg(not(target_os = "linux"))]
330 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
331 let ret = Guard {
332 fd: self.fd(),
333 guard: unsafe { (*self.inner.get()).lock.lock() },
334 };
335 assert!(util::set_nonblocking(self.fd(), true).is_ok());
336 ret
337 }
338 }
339
340 #[cfg(windows)] type wrlen = libc::c_int;
341 #[cfg(not(windows))] type wrlen = libc::size_t;
342
343 impl rtio::RtioTcpStream for TcpStream {
344 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
345 let fd = self.fd();
346 let dolock = || self.lock_nonblocking();
347 let doread = |nb| unsafe {
348 let flags = if nb {c::MSG_DONTWAIT} else {0};
349 libc::recv(fd,
350 buf.as_mut_ptr() as *mut libc::c_void,
351 buf.len() as wrlen,
352 flags) as libc::c_int
353 };
354 read(fd, self.read_deadline, dolock, doread)
355 }
356
357 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
358 let fd = self.fd();
359 let dolock = || self.lock_nonblocking();
360 let dowrite = |nb: bool, buf: *u8, len: uint| unsafe {
361 let flags = if nb {c::MSG_DONTWAIT} else {0};
362 libc::send(fd,
363 buf as *mut libc::c_void,
364 len as wrlen,
365 flags) as i64
366 };
367 match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
368 Ok(_) => Ok(()),
369 Err(e) => Err(e)
370 }
371 }
372 fn peer_name(&mut self) -> IoResult<ip::SocketAddr> {
373 sockname(self.fd(), libc::getpeername)
374 }
375 fn control_congestion(&mut self) -> IoResult<()> {
376 self.set_nodelay(false)
377 }
378 fn nodelay(&mut self) -> IoResult<()> {
379 self.set_nodelay(true)
380 }
381 fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
382 self.set_keepalive(Some(delay_in_seconds))
383 }
384 fn letdie(&mut self) -> IoResult<()> {
385 self.set_keepalive(None)
386 }
387
388 fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
389 box TcpStream {
390 inner: self.inner.clone(),
391 read_deadline: 0,
392 write_deadline: 0,
393 } as Box<rtio::RtioTcpStream:Send>
394 }
395
396 fn close_write(&mut self) -> IoResult<()> {
397 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
398 }
399 fn close_read(&mut self) -> IoResult<()> {
400 super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
401 }
402
403 fn set_timeout(&mut self, timeout: Option<u64>) {
404 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
405 self.read_deadline = deadline;
406 self.write_deadline = deadline;
407 }
408 fn set_read_timeout(&mut self, timeout: Option<u64>) {
409 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
410 }
411 fn set_write_timeout(&mut self, timeout: Option<u64>) {
412 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
413 }
414 }
415
416 impl rtio::RtioSocket for TcpStream {
417 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
418 sockname(self.fd(), libc::getsockname)
419 }
420 }
421
422 impl Drop for Inner {
423 fn drop(&mut self) { unsafe { close(self.fd); } }
424 }
425
426 #[unsafe_destructor]
427 impl<'a> Drop for Guard<'a> {
428 fn drop(&mut self) {
429 assert!(util::set_nonblocking(self.fd, false).is_ok());
430 }
431 }
432
433 ////////////////////////////////////////////////////////////////////////////////
434 // TCP listeners
435 ////////////////////////////////////////////////////////////////////////////////
436
437 pub struct TcpListener {
438 inner: Inner,
439 }
440
441 impl TcpListener {
442 pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
443 let fd = try!(socket(addr, libc::SOCK_STREAM));
444 let ret = TcpListener { inner: Inner::new(fd) };
445
446 let (addr, len) = addr_to_sockaddr(addr);
447 let addrp = &addr as *_ as *libc::sockaddr;
448 let len = len as libc::socklen_t;
449
450 // On platforms with Berkeley-derived sockets, this allows
451 // to quickly rebind a socket, without needing to wait for
452 // the OS to clean up the previous one.
453 if cfg!(unix) {
454 try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
455 1 as libc::c_int));
456 }
457
458 match unsafe { libc::bind(fd, addrp, len) } {
459 -1 => Err(last_error()),
460 _ => Ok(ret),
461 }
462 }
463
464 pub fn fd(&self) -> sock_t { self.inner.fd }
465
466 pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
467 match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
468 -1 => Err(last_error()),
469 _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
470 }
471 }
472 }
473
474 impl rtio::RtioTcpListener for TcpListener {
475 fn listen(~self) -> IoResult<Box<rtio::RtioTcpAcceptor:Send>> {
476 self.native_listen(128).map(|a| {
477 box a as Box<rtio::RtioTcpAcceptor:Send>
478 })
479 }
480 }
481
482 impl rtio::RtioSocket for TcpListener {
483 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
484 sockname(self.fd(), libc::getsockname)
485 }
486 }
487
488 pub struct TcpAcceptor {
489 listener: TcpListener,
490 deadline: u64,
491 }
492
493 impl TcpAcceptor {
494 pub fn fd(&self) -> sock_t { self.listener.fd() }
495
496 pub fn native_accept(&mut self) -> IoResult<TcpStream> {
497 if self.deadline != 0 {
498 try!(util::await(self.fd(), Some(self.deadline), util::Readable));
499 }
500 unsafe {
501 let mut storage: libc::sockaddr_storage = mem::init();
502 let storagep = &mut storage as *mut libc::sockaddr_storage;
503 let size = mem::size_of::<libc::sockaddr_storage>();
504 let mut size = size as libc::socklen_t;
505 match retry(|| {
506 libc::accept(self.fd(),
507 storagep as *mut libc::sockaddr,
508 &mut size as *mut libc::socklen_t) as libc::c_int
509 }) as sock_t {
510 -1 => Err(last_error()),
511 fd => Ok(TcpStream::new(Inner::new(fd))),
512 }
513 }
514 }
515 }
516
517 impl rtio::RtioSocket for TcpAcceptor {
518 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
519 sockname(self.fd(), libc::getsockname)
520 }
521 }
522
523 impl rtio::RtioTcpAcceptor for TcpAcceptor {
524 fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream:Send>> {
525 self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream:Send>)
526 }
527
528 fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
529 fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
530 fn set_timeout(&mut self, timeout: Option<u64>) {
531 self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
532 }
533 }
534
535 ////////////////////////////////////////////////////////////////////////////////
536 // UDP
537 ////////////////////////////////////////////////////////////////////////////////
538
539 pub struct UdpSocket {
540 inner: UnsafeArc<Inner>,
541 read_deadline: u64,
542 write_deadline: u64,
543 }
544
545 impl UdpSocket {
546 pub fn bind(addr: ip::SocketAddr) -> IoResult<UdpSocket> {
547 let fd = try!(socket(addr, libc::SOCK_DGRAM));
548 let ret = UdpSocket {
549 inner: UnsafeArc::new(Inner::new(fd)),
550 read_deadline: 0,
551 write_deadline: 0,
552 };
553
554 let (addr, len) = addr_to_sockaddr(addr);
555 let addrp = &addr as *_ as *libc::sockaddr;
556 let len = len as libc::socklen_t;
557
558 match unsafe { libc::bind(fd, addrp, len) } {
559 -1 => Err(last_error()),
560 _ => Ok(ret),
561 }
562 }
563
564 pub fn fd(&self) -> sock_t {
565 // unsafety is fine because it's just a read-only arc
566 unsafe { (*self.inner.get()).fd }
567 }
568
569 pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
570 setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
571 on as libc::c_int)
572 }
573
574 pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
575 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
576 on as libc::c_int)
577 }
578
579 pub fn set_membership(&mut self, addr: ip::IpAddr,
580 opt: libc::c_int) -> IoResult<()> {
581 match ip_to_inaddr(addr) {
582 InAddr(addr) => {
583 let mreq = libc::ip_mreq {
584 imr_multiaddr: addr,
585 // interface == INADDR_ANY
586 imr_interface: libc::in_addr { s_addr: 0x0 },
587 };
588 setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
589 }
590 In6Addr(addr) => {
591 let mreq = libc::ip6_mreq {
592 ipv6mr_multiaddr: addr,
593 ipv6mr_interface: 0,
594 };
595 setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
596 }
597 }
598 }
599
600 #[cfg(target_os = "linux")]
601 fn lock_nonblocking(&self) {}
602
603 #[cfg(not(target_os = "linux"))]
604 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
605 let ret = Guard {
606 fd: self.fd(),
607 guard: unsafe { (*self.inner.get()).lock.lock() },
608 };
609 assert!(util::set_nonblocking(self.fd(), true).is_ok());
610 ret
611 }
612 }
613
614 impl rtio::RtioSocket for UdpSocket {
615 fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
616 sockname(self.fd(), libc::getsockname)
617 }
618 }
619
620 #[cfg(windows)] type msglen_t = libc::c_int;
621 #[cfg(unix)] type msglen_t = libc::size_t;
622
623 impl rtio::RtioUdpSocket for UdpSocket {
624 fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, ip::SocketAddr)> {
625 let fd = self.fd();
626 let mut storage: libc::sockaddr_storage = unsafe { mem::init() };
627 let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
628 let mut addrlen: libc::socklen_t =
629 mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
630
631 let dolock = || self.lock_nonblocking();
632 let doread = |nb| unsafe {
633 let flags = if nb {c::MSG_DONTWAIT} else {0};
634 libc::recvfrom(fd,
635 buf.as_mut_ptr() as *mut libc::c_void,
636 buf.len() as msglen_t,
637 flags,
638 storagep,
639 &mut addrlen) as libc::c_int
640 };
641 let n = try!(read(fd, self.read_deadline, dolock, doread));
642 sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
643 Ok((n as uint, addr))
644 })
645 }
646
647 fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> IoResult<()> {
648 let (dst, dstlen) = addr_to_sockaddr(dst);
649 let dstp = &dst as *_ as *libc::sockaddr;
650 let dstlen = dstlen as libc::socklen_t;
651
652 let fd = self.fd();
653 let dolock = || self.lock_nonblocking();
654 let dowrite = |nb, buf: *u8, len: uint| unsafe {
655 let flags = if nb {c::MSG_DONTWAIT} else {0};
656 libc::sendto(fd,
657 buf as *libc::c_void,
658 len as msglen_t,
659 flags,
660 dstp,
661 dstlen) as i64
662 };
663
664 let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
665 if n != buf.len() {
666 Err(io::IoError {
667 kind: io::ShortWrite(n),
668 desc: "couldn't send entire packet at once",
669 detail: None,
670 })
671 } else {
672 Ok(())
673 }
674 }
675
676 fn join_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> {
677 match multi {
678 ip::Ipv4Addr(..) => {
679 self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
680 }
681 ip::Ipv6Addr(..) => {
682 self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
683 }
684 }
685 }
686 fn leave_multicast(&mut self, multi: ip::IpAddr) -> IoResult<()> {
687 match multi {
688 ip::Ipv4Addr(..) => {
689 self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
690 }
691 ip::Ipv6Addr(..) => {
692 self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
693 }
694 }
695 }
696
697 fn loop_multicast_locally(&mut self) -> IoResult<()> {
698 self.set_multicast_loop(true)
699 }
700 fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
701 self.set_multicast_loop(false)
702 }
703
704 fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
705 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
706 ttl as libc::c_int)
707 }
708 fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
709 setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
710 }
711
712 fn hear_broadcasts(&mut self) -> IoResult<()> {
713 self.set_broadcast(true)
714 }
715 fn ignore_broadcasts(&mut self) -> IoResult<()> {
716 self.set_broadcast(false)
717 }
718
719 fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
720 box UdpSocket {
721 inner: self.inner.clone(),
722 read_deadline: 0,
723 write_deadline: 0,
724 } as Box<rtio::RtioUdpSocket:Send>
725 }
726
727 fn set_timeout(&mut self, timeout: Option<u64>) {
728 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
729 self.read_deadline = deadline;
730 self.write_deadline = deadline;
731 }
732 fn set_read_timeout(&mut self, timeout: Option<u64>) {
733 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
734 }
735 fn set_write_timeout(&mut self, timeout: Option<u64>) {
736 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
737 }
738 }
739
740 ////////////////////////////////////////////////////////////////////////////////
741 // Timeout helpers
742 //
743 // The read/write functions below are the helpers for reading/writing a socket
744 // with a possible deadline specified. This is generally viewed as a timed out
745 // I/O operation.
746 //
747 // From the application's perspective, timeouts apply to the I/O object, not to
748 // the underlying file descriptor (it's one timeout per object). This means that
749 // we can't use the SO_RCVTIMEO and corresponding send timeout option.
750 //
751 // The next idea to implement timeouts would be to use nonblocking I/O. An
752 // invocation of select() would wait (with a timeout) for a socket to be ready.
753 // Once its ready, we can perform the operation. Note that the operation *must*
754 // be nonblocking, even though select() says the socket is ready. This is
755 // because some other thread could have come and stolen our data (handles can be
756 // cloned).
757 //
758 // To implement nonblocking I/O, the first option we have is to use the
759 // O_NONBLOCK flag. Remember though that this is a global setting, affecting all
760 // I/O objects, so this was initially viewed as unwise.
761 //
762 // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
763 // send/recv, but the niftiness wears off once you realize it only works well on
764 // linux [1] [2]. This means that it's pretty easy to get a nonblocking
765 // operation on linux (no flag fidding, no affecting other objects), but not on
766 // other platforms.
767 //
768 // To work around this constraint on other platforms, we end up using the
769 // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
770 // could cause other objects' blocking operations to suddenly become
771 // nonblocking. To get around this, a "blocking operation" which returns EAGAIN
772 // falls back to using the same code path as nonblocking operations, but with an
773 // infinite timeout (select + send/recv). This helps emulate blocking
774 // reads/writes despite the underlying descriptor being nonblocking, as well as
775 // optimizing the fast path of just hitting one syscall in the good case.
776 //
777 // As a final caveat, this implementation uses a mutex so only one thread is
778 // doing a nonblocking operation at at time. This is the operation that comes
779 // after the select() (at which point we think the socket is ready). This is
780 // done for sanity to ensure that the state of the O_NONBLOCK flag is what we
781 // expect (wouldn't want someone turning it on when it should be off!). All
782 // operations performed in the lock are *nonblocking* to avoid holding the mutex
783 // forever.
784 //
785 // So, in summary, linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
786 // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
787 // reads/writes are still blocking.
788 //
789 // Fun, fun!
790 //
791 // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
792 // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
793
794 pub fn read<T>(fd: sock_t,
795 deadline: u64,
796 lock: || -> T,
797 read: |bool| -> libc::c_int) -> IoResult<uint> {
798 let mut ret = -1;
799 if deadline == 0 {
800 ret = retry(|| read(false));
801 }
802
803 if deadline != 0 || (ret == -1 && util::wouldblock()) {
804 let deadline = match deadline {
805 0 => None,
806 n => Some(n),
807 };
808 loop {
809 // With a timeout, first we wait for the socket to become
810 // readable using select(), specifying the relevant timeout for
811 // our previously set deadline.
812 try!(util::await(fd, deadline, util::Readable));
813
814 // At this point, we're still within the timeout, and we've
815 // determined that the socket is readable (as returned by
816 // select). We must still read the socket in *nonblocking* mode
817 // because some other thread could come steal our data. If we
818 // fail to read some data, we retry (hence the outer loop) and
819 // wait for the socket to become readable again.
820 let _guard = lock();
821 match retry(|| read(deadline.is_some())) {
822 -1 if util::wouldblock() => { assert!(deadline.is_some()); }
823 -1 => return Err(last_error()),
824 n => { ret = n; break }
825 }
826 }
827 }
828
829 match ret {
830 0 => Err(io::standard_error(io::EndOfFile)),
831 n if n < 0 => Err(last_error()),
832 n => Ok(n as uint)
833 }
834 }
835
836 pub fn write<T>(fd: sock_t,
837 deadline: u64,
838 buf: &[u8],
839 write_everything: bool,
840 lock: || -> T,
841 write: |bool, *u8, uint| -> i64) -> IoResult<uint> {
842 let mut ret = -1;
843 let mut written = 0;
844 if deadline == 0 {
845 if write_everything {
846 ret = keep_going(buf, |inner, len| {
847 written = buf.len() - len;
848 write(false, inner, len)
849 });
850 } else {
851 ret = retry(|| {
852 write(false, buf.as_ptr(), buf.len()) as libc::c_int
853 }) as i64;
854 if ret > 0 { written = ret as uint; }
855 }
856 }
857
858 if deadline != 0 || (ret == -1 && util::wouldblock()) {
859 let deadline = match deadline {
860 0 => None,
861 n => Some(n),
862 };
863 while written < buf.len() && (write_everything || written == 0) {
864 // As with read(), first wait for the socket to be ready for
865 // the I/O operation.
866 match util::await(fd, deadline, util::Writable) {
867 Err(ref e) if e.kind == io::TimedOut && written > 0 => {
868 assert!(deadline.is_some());
869 return Err(io::IoError {
870 kind: io::ShortWrite(written),
871 desc: "short write",
872 detail: None,
873 })
874 }
875 Err(e) => return Err(e),
876 Ok(()) => {}
877 }
878
879 // Also as with read(), we use MSG_DONTWAIT to guard ourselves
880 // against unforseen circumstances.
881 let _guard = lock();
882 let ptr = buf.slice_from(written).as_ptr();
883 let len = buf.len() - written;
884 match retry(|| write(deadline.is_some(), ptr, len) as libc::c_int) {
885 -1 if util::wouldblock() => {}
886 -1 => return Err(last_error()),
887 n => { written += n as uint; }
888 }
889 }
890 ret = 0;
891 }
892 if ret < 0 {
893 Err(last_error())
894 } else {
895 Ok(written)
896 }
897 }
libnative/io/net.rs:30:1-30:1 -fn- definition:
pub fn htons(u: u16) -> u16 {
mem::to_be16(u)
}
references:- 1056: htons(a),
57: htons(b),
58: htons(c),
--
76: (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
77: (*storage).sin_port = htons(addr.port);
78: (*storage).sin_addr = inaddr;
--
83: (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
84: (*storage).sin6_port = htons(addr.port);
85: (*storage).sin6_addr = inaddr;
libnative/io/net.rs:793:1-793:1 -fn- definition:
pub fn read<T>(fd: sock_t,
deadline: u64,
lock: || -> T,
references:- 3353: };
354: read(fd, self.read_deadline, dolock, doread)
355: }
--
640: };
641: let n = try!(read(fd, self.read_deadline, dolock, doread));
642: sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
libnative/io/pipe_unix.rs:
159: };
160: net::read(fd, self.read_deadline, dolock, doread)
161: }
libnative/io/net.rs:150:1-150:1 -fn- definition:
fn sockname(fd: sock_t,
f: extern "system" unsafe fn(sock_t, *mut libc::sockaddr,
*mut libc::socklen_t) -> libc::c_int)
references:- 5372: fn peer_name(&mut self) -> IoResult<ip::SocketAddr> {
373: sockname(self.fd(), libc::getpeername)
374: }
--
615: fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
616: sockname(self.fd(), libc::getsockname)
617: }
libnative/io/net.rs:29:16-29:16 -NK_AS_STR_TODO- definition:
pub fn htons(u: u16) -> u16 {
mem::to_be16(u)
}
references:- 20249: pub struct Guard<'a> {
250: pub fd: sock_t,
251: pub guard: mutex::LockGuard<'a>,
--
564: pub fn fd(&self) -> sock_t {
565: // unsafety is fine because it's just a read-only arc
--
794: pub fn read<T>(fd: sock_t,
795: deadline: u64,
--
836: pub fn write<T>(fd: sock_t,
837: deadline: u64,
libnative/io/util.rs:
138: pub fn await(fd: net::sock_t, deadline: Option<u64>,
139: status: SocketStatus) -> IoResult<()> {
libnative/io/net.rs:
508: &mut size as *mut libc::socklen_t) as libc::c_int
509: }) as sock_t {
510: -1 => Err(last_error()),
libnative/io/net.rs:243:1-243:1 -struct- definition:
struct Inner {
fd: sock_t,
lock: mutex::NativeMutex,
references:- 8255: fn new(fd: sock_t) -> Inner {
256: Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
257: }
--
284: fn new(inner: Inner) -> TcpStream {
285: TcpStream {
--
422: impl Drop for Inner {
423: fn drop(&mut self) { unsafe { close(self.fd); } }
--
539: pub struct UdpSocket {
540: inner: UnsafeArc<Inner>,
541: read_deadline: u64,
libnative/io/net.rs:92:1-92:1 -fn- definition:
fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
unsafe {
let fam = match addr.ip {
references:- 3262: timeout: Option<u64>) -> IoResult<TcpStream> {
263: let fd = try!(socket(addr, libc::SOCK_STREAM));
264: let ret = TcpStream::new(Inner::new(fd));
--
442: pub fn bind(addr: ip::SocketAddr) -> IoResult<TcpListener> {
443: let fd = try!(socket(addr, libc::SOCK_STREAM));
444: let ret = TcpListener { inner: Inner::new(fd) };
--
546: pub fn bind(addr: ip::SocketAddr) -> IoResult<UdpSocket> {
547: let fd = try!(socket(addr, libc::SOCK_DGRAM));
548: let ret = UdpSocket {
libnative/io/net.rs:436:1-436:1 -struct- definition:
pub struct TcpListener {
inner: Inner,
}
references:- 6443: let fd = try!(socket(addr, libc::SOCK_STREAM));
444: let ret = TcpListener { inner: Inner::new(fd) };
--
488: pub struct TcpAcceptor {
489: listener: TcpListener,
490: deadline: u64,
libnative/io/net.rs:169:1-169:1 -fn- definition:
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
len: uint) -> IoResult<ip::SocketAddr> {
match storage.ss_family as libc::c_int {
references:- 3641: let n = try!(read(fd, self.read_deadline, dolock, doread));
642: sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
643: Ok((n as uint, addr))
libnative/io/addrinfo.rs:
63: unsafe {
64: let addr = match sockaddr_to_addr(cast::transmute((*rp).ai_addr),
65: (*rp).ai_addrlen as uint) {
libnative/io/net.rs:
166: }
167: return sockaddr_to_addr(&storage, len as uint);
168: }
libnative/io/net.rs:69:1-69:1 -fn- definition:
fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
unsafe {
let storage: libc::sockaddr_storage = mem::init();
references:- 4446: let (addr, len) = addr_to_sockaddr(addr);
447: let addrp = &addr as *_ as *libc::sockaddr;
--
647: fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> IoResult<()> {
648: let (dst, dstlen) = addr_to_sockaddr(dst);
649: let dstp = &dst as *_ as *libc::sockaddr;
libnative/io/net.rs:341:21-341:21 -NK_AS_STR_TODO- definition:
impl rtio::RtioTcpStream for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let fd = self.fd();
references:- 2363: buf as *mut libc::c_void,
364: len as wrlen,
365: flags) as i64
libnative/io/net.rs:538:1-538:1 -struct- definition:
pub struct UdpSocket {
inner: UnsafeArc<Inner>,
read_deadline: u64,
references:- 6547: let fd = try!(socket(addr, libc::SOCK_DGRAM));
548: let ret = UdpSocket {
549: inner: UnsafeArc::new(Inner::new(fd)),
--
719: fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
720: box UdpSocket {
721: inner: self.inner.clone(),
libnative/io/net.rs:237:1-237:1 -struct- definition:
pub struct TcpStream {
inner: UnsafeArc<Inner>,
read_deadline: u64,
references:- 8284: fn new(inner: Inner) -> TcpStream {
285: TcpStream {
286: inner: UnsafeArc::new(inner),
--
388: fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
389: box TcpStream {
390: inner: self.inner.clone(),
--
416: impl rtio::RtioSocket for TcpStream {
417: fn socket_name(&mut self) -> IoResult<ip::SocketAddr> {
--
496: pub fn native_accept(&mut self) -> IoResult<TcpStream> {
497: if self.deadline != 0 {
libnative/io/net.rs:42:1-42:1 -fn- definition:
fn ip_to_inaddr(ip: ip::IpAddr) -> InAddr {
match ip {
ip::Ipv4Addr(a, b, c, d) => {
references:- 272: let storage: libc::sockaddr_storage = mem::init();
73: let len = match ip_to_inaddr(addr.ip) {
74: InAddr(inaddr) => {
--
580: opt: libc::c_int) -> IoResult<()> {
581: match ip_to_inaddr(addr) {
582: InAddr(addr) => {
libnative/io/net.rs:105:1-105:1 -fn- definition:
fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
payload: T) -> IoResult<()> {
unsafe {
references:- 9704: fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
705: setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
706: ttl as libc::c_int)
--
708: fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
709: setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
710: }
libnative/io/net.rs:143:21-143:21 -fn- definition:
fn last_error() -> io::IoError {
super::last_error()
}
references:- 12892: if ret < 0 {
893: Err(last_error())
894: } else {
libnative/io/net.rs:621:16-621:16 -NK_AS_STR_TODO- definition:
impl rtio::RtioUdpSocket for UdpSocket {
fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, ip::SocketAddr)> {
let fd = self.fd();
references:- 2635: buf.as_mut_ptr() as *mut libc::c_void,
636: buf.len() as msglen_t,
637: flags,
--
657: buf as *libc::c_void,
658: len as msglen_t,
659: flags,
libnative/io/net.rs:33:2-33:2 -fn- definition:
}
pub fn ntohs(u: u16) -> u16 {
mem::from_be16(u)
references:- 10198: let f = ntohs(storage.sin6_addr.s6_addr[5]);
199: let g = ntohs(storage.sin6_addr.s6_addr[6]);
200: let h = ntohs(storage.sin6_addr.s6_addr[7]);
201: Ok(ip::SocketAddr {
202: ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
203: port: ntohs(storage.sin6_port),
204: })
libnative/io/net.rs:487:1-487:1 -struct- definition:
pub struct TcpAcceptor {
listener: TcpListener,
deadline: u64,
references:- 5468: -1 => Err(last_error()),
469: _ => Ok(TcpAcceptor { listener: self, deadline: 0 })
470: }
--
523: impl rtio::RtioTcpAcceptor for TcpAcceptor {
524: fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream:Send>> {
libnative/io/net.rs:835:1-835:1 -fn- definition:
pub fn write<T>(fd: sock_t,
deadline: u64,
buf: &[u8],
references:- 3366: };
367: match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
368: Ok(_) => Ok(()),
--
664: let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
665: if n != buf.len() {
libnative/io/pipe_unix.rs:
172: };
173: match net::write(fd, self.write_deadline, buf, true, dolock, dowrite) {
174: Ok(_) => Ok(()),