(index<- ) ./librustuv/net.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
12 use libc;
13 use std::cast;
14 use std::io;
15 use std::io::IoError;
16 use std::io::net::ip;
17 use std::mem;
18 use std::ptr;
19 use std::rt::rtio;
20 use std::rt::task::BlockedTask;
21
22 use homing::{HomingIO, HomeHandle};
23 use rc::Refcount;
24 use stream::StreamWatcher;
25 use super::{Loop, Request, UvError, Buf, status_to_io_result,
26 uv_error_to_io_error, UvHandle, slice_to_uv_buf,
27 wait_until_woken_after, wakeup};
28 use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
29 use uvio::UvIoFactory;
30 use uvll;
31
32 ////////////////////////////////////////////////////////////////////////////////
33 /// Generic functions related to dealing with sockaddr things
34 ////////////////////////////////////////////////////////////////////////////////
35
36 pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
37 pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
38
39 pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
40 len: uint) -> ip::SocketAddr {
41 match storage.ss_family as c_int {
42 libc::AF_INET => {
43 assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
44 let storage: &libc::sockaddr_in = unsafe {
45 cast::transmute(storage)
46 };
47 let addr = storage.sin_addr.s_addr as u32;
48 let a = (addr >> 0) as u8;
49 let b = (addr >> 8) as u8;
50 let c = (addr >> 16) as u8;
51 let d = (addr >> 24) as u8;
52 ip::SocketAddr {
53 ip: ip::Ipv4Addr(a, b, c, d),
54 port: ntohs(storage.sin_port),
55 }
56 }
57 libc::AF_INET6 => {
58 assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
59 let storage: &libc::sockaddr_in6 = unsafe {
60 cast::transmute(storage)
61 };
62 let a = ntohs(storage.sin6_addr.s6_addr[0]);
63 let b = ntohs(storage.sin6_addr.s6_addr[1]);
64 let c = ntohs(storage.sin6_addr.s6_addr[2]);
65 let d = ntohs(storage.sin6_addr.s6_addr[3]);
66 let e = ntohs(storage.sin6_addr.s6_addr[4]);
67 let f = ntohs(storage.sin6_addr.s6_addr[5]);
68 let g = ntohs(storage.sin6_addr.s6_addr[6]);
69 let h = ntohs(storage.sin6_addr.s6_addr[7]);
70 ip::SocketAddr {
71 ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
72 port: ntohs(storage.sin6_port),
73 }
74 }
75 n => {
76 fail!("unknown family {}", n);
77 }
78 }
79 }
80
81 fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
82 unsafe {
83 let mut storage: libc::sockaddr_storage = mem::init();
84 let len = match addr.ip {
85 ip::Ipv4Addr(a, b, c, d) => {
86 let storage: &mut libc::sockaddr_in =
87 cast::transmute(&mut storage);
88 (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
89 (*storage).sin_port = htons(addr.port);
90 (*storage).sin_addr = libc::in_addr {
91 s_addr: (d as u32 << 24) |
92 (c as u32 << 16) |
93 (b as u32 << 8) |
94 (a as u32 << 0)
95 };
96 mem::size_of::<libc::sockaddr_in>()
97 }
98 ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
99 let storage: &mut libc::sockaddr_in6 =
100 cast::transmute(&mut storage);
101 storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
102 storage.sin6_port = htons(addr.port);
103 storage.sin6_addr = libc::in6_addr {
104 s6_addr: [
105 htons(a),
106 htons(b),
107 htons(c),
108 htons(d),
109 htons(e),
110 htons(f),
111 htons(g),
112 htons(h),
113 ]
114 };
115 mem::size_of::<libc::sockaddr_in6>()
116 }
117 };
118 return (storage, len);
119 }
120 }
121
122 enum SocketNameKind {
123 TcpPeer,
124 Tcp,
125 Udp
126 }
127
128 fn socket_name(sk: SocketNameKind,
129 handle: *c_void) -> Result<ip::SocketAddr, IoError> {
130 let getsockname = match sk {
131 TcpPeer => uvll::uv_tcp_getpeername,
132 Tcp => uvll::uv_tcp_getsockname,
133 Udp => uvll::uv_udp_getsockname,
134 };
135
136 // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
137 let mut sockaddr: libc::sockaddr_storage = unsafe { mem::init() };
138 let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
139
140 let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
141 match unsafe {
142 getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
143 } {
144 0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
145 n => Err(uv_error_to_io_error(UvError(n)))
146 }
147 }
148
149 ////////////////////////////////////////////////////////////////////////////////
150 /// TCP implementation
151 ////////////////////////////////////////////////////////////////////////////////
152
153 pub struct TcpWatcher {
154 handle: *uvll::uv_tcp_t,
155 stream: StreamWatcher,
156 home: HomeHandle,
157 refcount: Refcount,
158
159 // libuv can't support concurrent reads and concurrent writes of the same
160 // stream object, so we use these access guards in order to arbitrate among
161 // multiple concurrent reads and writes. Note that libuv *can* read and
162 // write simultaneously, it just can't read and read simultaneously.
163 read_access: AccessTimeout,
164 write_access: AccessTimeout,
165 }
166
167 pub struct TcpListener {
168 home: HomeHandle,
169 handle: *uvll::uv_pipe_t,
170 closing_task: Option<BlockedTask>,
171 outgoing: Sender<Result<Box<rtio::RtioTcpStream:Send>, IoError>>,
172 incoming: Receiver<Result<Box<rtio::RtioTcpStream:Send>, IoError>>,
173 }
174
175 pub struct TcpAcceptor {
176 listener: Box<TcpListener>,
177 timeout: AcceptTimeout,
178 }
179
180 // TCP watchers (clients/streams)
181
182 impl TcpWatcher {
183 pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
184 let handle = io.make_handle();
185 TcpWatcher::new_home(&io.loop_, handle)
186 }
187
188 fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
189 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
190 assert_eq!(unsafe {
191 uvll::uv_tcp_init(loop_.handle, handle)
192 }, 0);
193 TcpWatcher {
194 home: home,
195 handle: handle,
196 stream: StreamWatcher::new(handle),
197 refcount: Refcount::new(),
198 read_access: AccessTimeout::new(),
199 write_access: AccessTimeout::new(),
200 }
201 }
202
203 pub fn connect(io: &mut UvIoFactory,
204 address: ip::SocketAddr,
205 timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
206 let tcp = TcpWatcher::new(io);
207 let cx = ConnectCtx { status: -1, task: None, timer: None };
208 let (addr, _len) = addr_to_sockaddr(address);
209 let addr_p = &addr as *_ as *libc::sockaddr;
210 cx.connect(tcp, timeout, io, |req, tcp, cb| {
211 unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
212 })
213 }
214 }
215
216 impl HomingIO for TcpWatcher {
217 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
218 }
219
220 impl rtio::RtioSocket for TcpWatcher {
221 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
222 let _m = self.fire_homing_missile();
223 socket_name(Tcp, self.handle)
224 }
225 }
226
227 impl rtio::RtioTcpStream for TcpWatcher {
228 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
229 let m = self.fire_homing_missile();
230 let guard = try!(self.read_access.grant(m));
231
232 // see comments in close_read about this check
233 if guard.access.is_closed() {
234 return Err(io::standard_error(io::EndOfFile))
235 }
236
237 self.stream.read(buf).map_err(uv_error_to_io_error)
238 }
239
240 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
241 let m = self.fire_homing_missile();
242 let guard = try!(self.write_access.grant(m));
243 self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
244 }
245
246 fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
247 let _m = self.fire_homing_missile();
248 socket_name(TcpPeer, self.handle)
249 }
250
251 fn control_congestion(&mut self) -> Result<(), IoError> {
252 let _m = self.fire_homing_missile();
253 status_to_io_result(unsafe {
254 uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
255 })
256 }
257
258 fn nodelay(&mut self) -> Result<(), IoError> {
259 let _m = self.fire_homing_missile();
260 status_to_io_result(unsafe {
261 uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
262 })
263 }
264
265 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
266 let _m = self.fire_homing_missile();
267 status_to_io_result(unsafe {
268 uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
269 delay_in_seconds as c_uint)
270 })
271 }
272
273 fn letdie(&mut self) -> Result<(), IoError> {
274 let _m = self.fire_homing_missile();
275 status_to_io_result(unsafe {
276 uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
277 })
278 }
279
280 fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
281 box TcpWatcher {
282 handle: self.handle,
283 stream: StreamWatcher::new(self.handle),
284 home: self.home.clone(),
285 refcount: self.refcount.clone(),
286 read_access: self.read_access.clone(),
287 write_access: self.write_access.clone(),
288 } as Box<rtio::RtioTcpStream:Send>
289 }
290
291 fn close_read(&mut self) -> Result<(), IoError> {
292 // see comments in PipeWatcher::close_read
293 let task = {
294 let m = self.fire_homing_missile();
295 self.read_access.access.close(&m);
296 self.stream.cancel_read(uvll::EOF as libc::ssize_t)
297 };
298 let _ = task.map(|t| t.reawaken());
299 Ok(())
300 }
301
302 fn close_write(&mut self) -> Result<(), IoError> {
303 let _m = self.fire_homing_missile();
304 shutdown(self.handle, &self.uv_loop())
305 }
306
307 fn set_timeout(&mut self, timeout: Option<u64>) {
308 self.set_read_timeout(timeout);
309 self.set_write_timeout(timeout);
310 }
311
312 fn set_read_timeout(&mut self, ms: Option<u64>) {
313 let _m = self.fire_homing_missile();
314 let loop_ = self.uv_loop();
315 self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
316 &self.stream as *_ as uint);
317
318 fn cancel_read(stream: uint) -> Option<BlockedTask> {
319 let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
320 stream.cancel_read(uvll::ECANCELED as ssize_t)
321 }
322 }
323
324 fn set_write_timeout(&mut self, ms: Option<u64>) {
325 let _m = self.fire_homing_missile();
326 let loop_ = self.uv_loop();
327 self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
328 &self.stream as *_ as uint);
329
330 fn cancel_write(stream: uint) -> Option<BlockedTask> {
331 let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
332 stream.cancel_write()
333 }
334 }
335 }
336
337 impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
338 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
339 }
340
341 impl Drop for TcpWatcher {
342 fn drop(&mut self) {
343 let _m = self.fire_homing_missile();
344 if self.refcount.decrement() {
345 self.close();
346 }
347 }
348 }
349
350 // TCP listeners (unbound servers)
351
352 impl TcpListener {
353 pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
354 -> Result<Box<TcpListener>, UvError> {
355 let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
356 assert_eq!(unsafe {
357 uvll::uv_tcp_init(io.uv_loop(), handle)
358 }, 0);
359 let (tx, rx) = channel();
360 let l = box TcpListener {
361 home: io.make_handle(),
362 handle: handle,
363 closing_task: None,
364 outgoing: tx,
365 incoming: rx,
366 };
367 let (addr, _len) = addr_to_sockaddr(address);
368 let res = unsafe {
369 let addr_p = &addr as *libc::sockaddr_storage;
370 uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
371 };
372 return match res {
373 0 => Ok(l.install()),
374 n => Err(UvError(n))
375 };
376 }
377 }
378
379 impl HomingIO for TcpListener {
380 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
381 }
382
383 impl UvHandle<uvll::uv_tcp_t> for TcpListener {
384 fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
385 }
386
387 impl rtio::RtioSocket for TcpListener {
388 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
389 let _m = self.fire_homing_missile();
390 socket_name(Tcp, self.handle)
391 }
392 }
393
394 impl rtio::RtioTcpListener for TcpListener {
395 fn listen(~self) -> Result<Box<rtio::RtioTcpAcceptor:Send>, IoError> {
396 // create the acceptor object from ourselves
397 let mut acceptor = box TcpAcceptor {
398 listener: self,
399 timeout: AcceptTimeout::new(),
400 };
401
402 let _m = acceptor.fire_homing_missile();
403 // FIXME: the 128 backlog should be configurable
404 match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
405 0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor:Send>),
406 n => Err(uv_error_to_io_error(UvError(n))),
407 }
408 }
409 }
410
411 extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
412 assert!(status != uvll::ECANCELED);
413 let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
414 let msg = match status {
415 0 => {
416 let loop_ = Loop::wrap(unsafe {
417 uvll::get_loop_for_uv_handle(server)
418 });
419 let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
420 assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
421 Ok(box client as Box<rtio::RtioTcpStream:Send>)
422 }
423 n => Err(uv_error_to_io_error(UvError(n)))
424 };
425 tcp.outgoing.send(msg);
426 }
427
428 impl Drop for TcpListener {
429 fn drop(&mut self) {
430 let _m = self.fire_homing_missile();
431 self.close();
432 }
433 }
434
435 // TCP acceptors (bound servers)
436
437 impl HomingIO for TcpAcceptor {
438 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
439 }
440
441 impl rtio::RtioSocket for TcpAcceptor {
442 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
443 let _m = self.fire_homing_missile();
444 socket_name(Tcp, self.listener.handle)
445 }
446 }
447
448 impl rtio::RtioTcpAcceptor for TcpAcceptor {
449 fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream:Send>, IoError> {
450 self.timeout.accept(&self.listener.incoming)
451 }
452
453 fn accept_simultaneously(&mut self) -> Result<(), IoError> {
454 let _m = self.fire_homing_missile();
455 status_to_io_result(unsafe {
456 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
457 })
458 }
459
460 fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
461 let _m = self.fire_homing_missile();
462 status_to_io_result(unsafe {
463 uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
464 })
465 }
466
467 fn set_timeout(&mut self, ms: Option<u64>) {
468 let _m = self.fire_homing_missile();
469 match ms {
470 None => self.timeout.clear(),
471 Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
472 }
473 }
474 }
475
476 ////////////////////////////////////////////////////////////////////////////////
477 /// UDP implementation
478 ////////////////////////////////////////////////////////////////////////////////
479
480 pub struct UdpWatcher {
481 handle: *uvll::uv_udp_t,
482 home: HomeHandle,
483
484 // See above for what these fields are
485 refcount: Refcount,
486 read_access: AccessTimeout,
487 write_access: AccessTimeout,
488
489 blocked_sender: Option<BlockedTask>,
490 }
491
492 struct UdpRecvCtx {
493 task: Option<BlockedTask>,
494 buf: Option<Buf>,
495 result: Option<(ssize_t, Option<ip::SocketAddr>)>,
496 }
497
498 struct UdpSendCtx {
499 result: c_int,
500 data: Option<Vec<u8>>,
501 udp: *mut UdpWatcher,
502 }
503
504 impl UdpWatcher {
505 pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
506 -> Result<UdpWatcher, UvError> {
507 let udp = UdpWatcher {
508 handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
509 home: io.make_handle(),
510 refcount: Refcount::new(),
511 read_access: AccessTimeout::new(),
512 write_access: AccessTimeout::new(),
513 blocked_sender: None,
514 };
515 assert_eq!(unsafe {
516 uvll::uv_udp_init(io.uv_loop(), udp.handle)
517 }, 0);
518 let (addr, _len) = addr_to_sockaddr(address);
519 let result = unsafe {
520 let addr_p = &addr as *libc::sockaddr_storage;
521 uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
522 };
523 return match result {
524 0 => Ok(udp),
525 n => Err(UvError(n)),
526 };
527 }
528 }
529
530 impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
531 fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
532 }
533
534 impl HomingIO for UdpWatcher {
535 fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
536 }
537
538 impl rtio::RtioSocket for UdpWatcher {
539 fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
540 let _m = self.fire_homing_missile();
541 socket_name(Udp, self.handle)
542 }
543 }
544
545 impl rtio::RtioUdpSocket for UdpWatcher {
546 fn recvfrom(&mut self, buf: &mut [u8])
547 -> Result<(uint, ip::SocketAddr), IoError>
548 {
549 let loop_ = self.uv_loop();
550 let m = self.fire_homing_missile();
551 let _guard = try!(self.read_access.grant(m));
552
553 return match unsafe {
554 uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
555 } {
556 0 => {
557 let mut cx = UdpRecvCtx {
558 task: None,
559 buf: Some(slice_to_uv_buf(buf)),
560 result: None,
561 };
562 let handle = self.handle;
563 wait_until_woken_after(&mut cx.task, &loop_, || {
564 unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
565 });
566 match cx.result.take_unwrap() {
567 (n, _) if n < 0 =>
568 Err(uv_error_to_io_error(UvError(n as c_int))),
569 (n, addr) => Ok((n as uint, addr.unwrap()))
570 }
571 }
572 n => Err(uv_error_to_io_error(UvError(n)))
573 };
574
575 extern fn alloc_cb(handle: *uvll::uv_udp_t,
576 _suggested_size: size_t,
577 buf: *mut Buf) {
578 unsafe {
579 let cx = uvll::get_data_for_uv_handle(handle);
580 let cx = &mut *(cx as *mut UdpRecvCtx);
581 *buf = cx.buf.take().expect("recv alloc_cb called more than once")
582 }
583 }
584
585 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
586 addr: *libc::sockaddr, _flags: c_uint) {
587 assert!(nread != uvll::ECANCELED as ssize_t);
588 let cx = unsafe {
589 &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
590 };
591
592 // When there's no data to read the recv callback can be a no-op.
593 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
594 // this we just drop back to kqueue and wait for the next callback.
595 if nread == 0 {
596 cx.buf = Some(unsafe { *buf });
597 return
598 }
599
600 unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) }
601 let addr = if addr == ptr::null() {
602 None
603 } else {
604 let len = mem::size_of::<libc::sockaddr_storage>();
605 Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
606 };
607 cx.result = Some((nread, addr));
608 wakeup(&mut cx.task);
609 }
610 }
611
612 fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
613 let m = self.fire_homing_missile();
614 let loop_ = self.uv_loop();
615 let guard = try!(self.write_access.grant(m));
616
617 let mut req = Request::new(uvll::UV_UDP_SEND);
618 let (addr, _len) = addr_to_sockaddr(dst);
619 let addr_p = &addr as *_ as *libc::sockaddr;
620
621 // see comments in StreamWatcher::write for why we may allocate a buffer
622 // here.
623 let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None};
624 let uv_buf = if guard.can_timeout {
625 slice_to_uv_buf(data.get_ref().as_slice())
626 } else {
627 slice_to_uv_buf(buf)
628 };
629
630 return match unsafe {
631 uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb)
632 } {
633 0 => {
634 req.defuse(); // uv callback now owns this request
635 let mut cx = UdpSendCtx {
636 result: uvll::ECANCELED, data: data, udp: self as *mut _
637 };
638 wait_until_woken_after(&mut self.blocked_sender, &loop_, || {
639 req.set_data(&cx);
640 });
641
642 if cx.result != uvll::ECANCELED {
643 return match cx.result {
644 0 => Ok(()),
645 n => Err(uv_error_to_io_error(UvError(n)))
646 }
647 }
648 let new_cx = box UdpSendCtx {
649 result: 0,
650 udp: 0 as *mut UdpWatcher,
651 data: cx.data.take(),
652 };
653 unsafe {
654 req.set_data(&*new_cx);
655 cast::forget(new_cx);
656 }
657 Err(uv_error_to_io_error(UvError(cx.result)))
658 }
659 n => Err(uv_error_to_io_error(UvError(n)))
660 };
661
662 // This function is the same as stream::write_cb, but adapted for udp
663 // instead of streams.
664 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
665 let req = Request::wrap(req);
666 let cx: &mut UdpSendCtx = unsafe { req.get_data() };
667 cx.result = status;
668
669 if cx.udp as uint != 0 {
670 let udp: &mut UdpWatcher = unsafe { &mut *cx.udp };
671 wakeup(&mut udp.blocked_sender);
672 } else {
673 let _cx: Box<UdpSendCtx> = unsafe { cast::transmute(cx) };
674 }
675 }
676 }
677
678 fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
679 let _m = self.fire_homing_missile();
680 status_to_io_result(unsafe {
681 multi.to_str().with_c_str(|m_addr| {
682 uvll::uv_udp_set_membership(self.handle,
683 m_addr, ptr::null(),
684 uvll::UV_JOIN_GROUP)
685 })
686 })
687 }
688
689 fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
690 let _m = self.fire_homing_missile();
691 status_to_io_result(unsafe {
692 multi.to_str().with_c_str(|m_addr| {
693 uvll::uv_udp_set_membership(self.handle,
694 m_addr, ptr::null(),
695 uvll::UV_LEAVE_GROUP)
696 })
697 })
698 }
699
700 fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
701 let _m = self.fire_homing_missile();
702 status_to_io_result(unsafe {
703 uvll::uv_udp_set_multicast_loop(self.handle,
704 1 as c_int)
705 })
706 }
707
708 fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
709 let _m = self.fire_homing_missile();
710 status_to_io_result(unsafe {
711 uvll::uv_udp_set_multicast_loop(self.handle,
712 0 as c_int)
713 })
714 }
715
716 fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
717 let _m = self.fire_homing_missile();
718 status_to_io_result(unsafe {
719 uvll::uv_udp_set_multicast_ttl(self.handle,
720 ttl as c_int)
721 })
722 }
723
724 fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
725 let _m = self.fire_homing_missile();
726 status_to_io_result(unsafe {
727 uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
728 })
729 }
730
731 fn hear_broadcasts(&mut self) -> Result<(), IoError> {
732 let _m = self.fire_homing_missile();
733 status_to_io_result(unsafe {
734 uvll::uv_udp_set_broadcast(self.handle,
735 1 as c_int)
736 })
737 }
738
739 fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
740 let _m = self.fire_homing_missile();
741 status_to_io_result(unsafe {
742 uvll::uv_udp_set_broadcast(self.handle,
743 0 as c_int)
744 })
745 }
746
747 fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
748 box UdpWatcher {
749 handle: self.handle,
750 home: self.home.clone(),
751 refcount: self.refcount.clone(),
752 write_access: self.write_access.clone(),
753 read_access: self.read_access.clone(),
754 blocked_sender: None,
755 } as Box<rtio::RtioUdpSocket:Send>
756 }
757
758 fn set_timeout(&mut self, timeout: Option<u64>) {
759 self.set_read_timeout(timeout);
760 self.set_write_timeout(timeout);
761 }
762
763 fn set_read_timeout(&mut self, ms: Option<u64>) {
764 let _m = self.fire_homing_missile();
765 let loop_ = self.uv_loop();
766 self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
767 self.handle as uint);
768
769 fn cancel_read(stream: uint) -> Option<BlockedTask> {
770 // This method is quite similar to StreamWatcher::cancel_read, see
771 // there for more information
772 let handle = stream as *uvll::uv_udp_t;
773 assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0);
774 let data = unsafe {
775 let data = uvll::get_data_for_uv_handle(handle);
776 if data.is_null() { return None }
777 uvll::set_data_for_uv_handle(handle, 0 as *int);
778 &mut *(data as *mut UdpRecvCtx)
779 };
780 data.result = Some((uvll::ECANCELED as ssize_t, None));
781 data.task.take()
782 }
783 }
784
785 fn set_write_timeout(&mut self, ms: Option<u64>) {
786 let _m = self.fire_homing_missile();
787 let loop_ = self.uv_loop();
788 self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
789 self as *mut _ as uint);
790
791 fn cancel_write(stream: uint) -> Option<BlockedTask> {
792 let stream: &mut UdpWatcher = unsafe { cast::transmute(stream) };
793 stream.blocked_sender.take()
794 }
795 }
796 }
797
798 impl Drop for UdpWatcher {
799 fn drop(&mut self) {
800 // Send ourselves home to close this handle (blocking while doing so).
801 let _m = self.fire_homing_missile();
802 if self.refcount.decrement() {
803 self.close();
804 }
805 }
806 }
807
808 ////////////////////////////////////////////////////////////////////////////////
809 // Shutdown helper
810 ////////////////////////////////////////////////////////////////////////////////
811
812 pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
813 struct Ctx {
814 slot: Option<BlockedTask>,
815 status: c_int,
816 }
817 let mut req = Request::new(uvll::UV_SHUTDOWN);
818
819 return match unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) } {
820 0 => {
821 req.defuse(); // uv callback now owns this request
822 let mut cx = Ctx { slot: None, status: 0 };
823
824 wait_until_woken_after(&mut cx.slot, loop_, || {
825 req.set_data(&cx);
826 });
827
828 status_to_io_result(cx.status)
829 }
830 n => Err(uv_error_to_io_error(UvError(n)))
831 };
832
833 extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
834 let req = Request::wrap(req);
835 assert!(status != uvll::ECANCELED);
836 let cx: &mut Ctx = unsafe { req.get_data() };
837 cx.status = status;
838 wakeup(&mut cx.slot);
839 }
840 }
841
842 #[cfg(test)]
843 mod test {
844 use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
845 RtioUdpSocket};
846 use std::io::test::{next_test_ip4, next_test_ip6};
847
848 use super::{UdpWatcher, TcpWatcher, TcpListener};
849 use super::super::local_loop;
850
851 #[test]
852 fn connect_close_ip4() {
853 match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
854 Ok(..) => fail!(),
855 Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
856 }
857 }
858
859 #[test]
860 fn connect_close_ip6() {
861 match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
862 Ok(..) => fail!(),
863 Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
864 }
865 }
866
867 #[test]
868 fn udp_bind_close_ip4() {
869 match UdpWatcher::bind(local_loop(), next_test_ip4()) {
870 Ok(..) => {}
871 Err(..) => fail!()
872 }
873 }
874
875 #[test]
876 fn udp_bind_close_ip6() {
877 match UdpWatcher::bind(local_loop(), next_test_ip6()) {
878 Ok(..) => {}
879 Err(..) => fail!()
880 }
881 }
882
883 #[test]
884 fn listen_ip4() {
885 let (tx, rx) = channel();
886 let addr = next_test_ip4();
887
888 spawn(proc() {
889 let w = match TcpListener::bind(local_loop(), addr) {
890 Ok(w) => w, Err(e) => fail!("{:?}", e)
891 };
892 let mut w = match w.listen() {
893 Ok(w) => w, Err(e) => fail!("{:?}", e),
894 };
895 tx.send(());
896 match w.accept() {
897 Ok(mut stream) => {
898 let mut buf = [0u8, ..10];
899 match stream.read(buf) {
900 Ok(10) => {} e => fail!("{:?}", e),
901 }
902 for i in range(0, 10u8) {
903 assert_eq!(buf[i as uint], i + 1);
904 }
905 }
906 Err(e) => fail!("{:?}", e)
907 }
908 });
909
910 rx.recv();
911 let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
912 Ok(w) => w, Err(e) => fail!("{:?}", e)
913 };
914 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
915 Ok(()) => {}, Err(e) => fail!("{:?}", e)
916 }
917 }
918
919 #[test]
920 fn listen_ip6() {
921 let (tx, rx) = channel();
922 let addr = next_test_ip6();
923
924 spawn(proc() {
925 let w = match TcpListener::bind(local_loop(), addr) {
926 Ok(w) => w, Err(e) => fail!("{:?}", e)
927 };
928 let mut w = match w.listen() {
929 Ok(w) => w, Err(e) => fail!("{:?}", e),
930 };
931 tx.send(());
932 match w.accept() {
933 Ok(mut stream) => {
934 let mut buf = [0u8, ..10];
935 match stream.read(buf) {
936 Ok(10) => {} e => fail!("{:?}", e),
937 }
938 for i in range(0, 10u8) {
939 assert_eq!(buf[i as uint], i + 1);
940 }
941 }
942 Err(e) => fail!("{:?}", e)
943 }
944 });
945
946 rx.recv();
947 let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
948 Ok(w) => w, Err(e) => fail!("{:?}", e)
949 };
950 match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
951 Ok(()) => {}, Err(e) => fail!("{:?}", e)
952 }
953 }
954
955 #[test]
956 fn udp_recv_ip4() {
957 let (tx, rx) = channel();
958 let client = next_test_ip4();
959 let server = next_test_ip4();
960
961 spawn(proc() {
962 match UdpWatcher::bind(local_loop(), server) {
963 Ok(mut w) => {
964 tx.send(());
965 let mut buf = [0u8, ..10];
966 match w.recvfrom(buf) {
967 Ok((10, addr)) => assert_eq!(addr, client),
968 e => fail!("{:?}", e),
969 }
970 for i in range(0, 10u8) {
971 assert_eq!(buf[i as uint], i + 1);
972 }
973 }
974 Err(e) => fail!("{:?}", e)
975 }
976 });
977
978 rx.recv();
979 let mut w = match UdpWatcher::bind(local_loop(), client) {
980 Ok(w) => w, Err(e) => fail!("{:?}", e)
981 };
982 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
983 Ok(()) => {}, Err(e) => fail!("{:?}", e)
984 }
985 }
986
987 #[test]
988 fn udp_recv_ip6() {
989 let (tx, rx) = channel();
990 let client = next_test_ip6();
991 let server = next_test_ip6();
992
993 spawn(proc() {
994 match UdpWatcher::bind(local_loop(), server) {
995 Ok(mut w) => {
996 tx.send(());
997 let mut buf = [0u8, ..10];
998 match w.recvfrom(buf) {
999 Ok((10, addr)) => assert_eq!(addr, client),
1000 e => fail!("{:?}", e),
1001 }
1002 for i in range(0, 10u8) {
1003 assert_eq!(buf[i as uint], i + 1);
1004 }
1005 }
1006 Err(e) => fail!("{:?}", e)
1007 }
1008 });
1009
1010 rx.recv();
1011 let mut w = match UdpWatcher::bind(local_loop(), client) {
1012 Ok(w) => w, Err(e) => fail!("{:?}", e)
1013 };
1014 match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
1015 Ok(()) => {}, Err(e) => fail!("{:?}", e)
1016 }
1017 }
1018
1019 #[test]
1020 fn test_read_read_read() {
1021 let addr = next_test_ip4();
1022 static MAX: uint = 5000;
1023 let (tx, rx) = channel();
1024
1025 spawn(proc() {
1026 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1027 let mut acceptor = listener.listen().unwrap();
1028 tx.send(());
1029 let mut stream = acceptor.accept().unwrap();
1030 let buf = [1, .. 2048];
1031 let mut total_bytes_written = 0;
1032 while total_bytes_written < MAX {
1033 assert!(stream.write(buf).is_ok());
1034 uvdebug!("wrote bytes");
1035 total_bytes_written += buf.len();
1036 }
1037 });
1038
1039 rx.recv();
1040 let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1041 let mut buf = [0, .. 2048];
1042 let mut total_bytes_read = 0;
1043 while total_bytes_read < MAX {
1044 let nread = stream.read(buf).unwrap();
1045 total_bytes_read += nread;
1046 for i in range(0u, nread) {
1047 assert_eq!(buf[i], 1);
1048 }
1049 }
1050 uvdebug!("read {} bytes total", total_bytes_read);
1051 }
1052
1053 #[test]
1054 #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
1055 fn test_udp_twice() {
1056 let server_addr = next_test_ip4();
1057 let client_addr = next_test_ip4();
1058 let (tx, rx) = channel();
1059
1060 spawn(proc() {
1061 let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
1062 rx.recv();
1063 assert!(client.sendto([1], server_addr).is_ok());
1064 assert!(client.sendto([2], server_addr).is_ok());
1065 });
1066
1067 let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
1068 tx.send(());
1069 let mut buf1 = [0];
1070 let mut buf2 = [0];
1071 let (nread1, src1) = server.recvfrom(buf1).unwrap();
1072 let (nread2, src2) = server.recvfrom(buf2).unwrap();
1073 assert_eq!(nread1, 1);
1074 assert_eq!(nread2, 1);
1075 assert_eq!(src1, client_addr);
1076 assert_eq!(src2, client_addr);
1077 assert_eq!(buf1[0], 1);
1078 assert_eq!(buf2[0], 2);
1079 }
1080
1081 #[test]
1082 fn test_udp_many_read() {
1083 let server_out_addr = next_test_ip4();
1084 let server_in_addr = next_test_ip4();
1085 let client_out_addr = next_test_ip4();
1086 let client_in_addr = next_test_ip4();
1087 static MAX: uint = 500_000;
1088
1089 let (tx1, rx1) = channel::<()>();
1090 let (tx2, rx2) = channel::<()>();
1091
1092 spawn(proc() {
1093 let l = local_loop();
1094 let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
1095 let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1096 let (tx, rx) = (tx2, rx1);
1097 tx.send(());
1098 rx.recv();
1099 let msg = [1, .. 2048];
1100 let mut total_bytes_sent = 0;
1101 let mut buf = [1];
1102 while buf[0] == 1 {
1103 // send more data
1104 assert!(server_out.sendto(msg, client_in_addr).is_ok());
1105 total_bytes_sent += msg.len();
1106 // check if the client has received enough
1107 let res = server_in.recvfrom(buf);
1108 assert!(res.is_ok());
1109 let (nread, src) = res.unwrap();
1110 assert_eq!(nread, 1);
1111 assert_eq!(src, client_out_addr);
1112 }
1113 assert!(total_bytes_sent >= MAX);
1114 });
1115
1116 let l = local_loop();
1117 let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
1118 let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1119 let (tx, rx) = (tx1, rx2);
1120 rx.recv();
1121 tx.send(());
1122 let mut total_bytes_recv = 0;
1123 let mut buf = [0, .. 2048];
1124 while total_bytes_recv < MAX {
1125 // ask for more
1126 assert!(client_out.sendto([1], server_in_addr).is_ok());
1127 // wait for data
1128 let res = client_in.recvfrom(buf);
1129 assert!(res.is_ok());
1130 let (nread, src) = res.unwrap();
1131 assert_eq!(src, server_out_addr);
1132 total_bytes_recv += nread;
1133 for i in range(0u, nread) {
1134 assert_eq!(buf[i], 1);
1135 }
1136 }
1137 // tell the server we're done
1138 assert!(client_out.sendto([0], server_in_addr).is_ok());
1139 }
1140
1141 #[test]
1142 fn test_read_and_block() {
1143 let addr = next_test_ip4();
1144 let (tx, rx) = channel::<Receiver<()>>();
1145
1146 spawn(proc() {
1147 let rx = rx.recv();
1148 let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1149 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1150 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1151 rx.recv();
1152 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1153 stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1154 rx.recv();
1155 });
1156
1157 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1158 let mut acceptor = listener.listen().unwrap();
1159 let (tx2, rx2) = channel();
1160 tx.send(rx2);
1161 let mut stream = acceptor.accept().unwrap();
1162 let mut buf = [0, .. 2048];
1163
1164 let expected = 32;
1165 let mut current = 0;
1166 let mut reads = 0;
1167
1168 while current < expected {
1169 let nread = stream.read(buf).unwrap();
1170 for i in range(0u, nread) {
1171 let val = buf[i] as uint;
1172 assert_eq!(val, current % 8);
1173 current += 1;
1174 }
1175 reads += 1;
1176
1177 let _ = tx2.send_opt(());
1178 }
1179
1180 // Make sure we had multiple reads
1181 assert!(reads > 1);
1182 }
1183
1184 #[test]
1185 fn test_simple_tcp_server_and_client_on_diff_threads() {
1186 let addr = next_test_ip4();
1187
1188 spawn(proc() {
1189 let listener = TcpListener::bind(local_loop(), addr).unwrap();
1190 let mut acceptor = listener.listen().unwrap();
1191 let mut stream = acceptor.accept().unwrap();
1192 let mut buf = [0, .. 2048];
1193 let nread = stream.read(buf).unwrap();
1194 assert_eq!(nread, 8);
1195 for i in range(0u, nread) {
1196 assert_eq!(buf[i], i as u8);
1197 }
1198 });
1199
1200 let mut stream = TcpWatcher::connect(local_loop(), addr, None);
1201 while stream.is_err() {
1202 stream = TcpWatcher::connect(local_loop(), addr, None);
1203 }
1204 stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1205 }
1206
1207 #[should_fail] #[test]
1208 fn tcp_listener_fail_cleanup() {
1209 let addr = next_test_ip4();
1210 let w = TcpListener::bind(local_loop(), addr).unwrap();
1211 let _w = w.listen().unwrap();
1212 fail!();
1213 }
1214
1215 #[should_fail] #[test]
1216 fn tcp_stream_fail_cleanup() {
1217 let (tx, rx) = channel();
1218 let addr = next_test_ip4();
1219
1220 spawn(proc() {
1221 let w = TcpListener::bind(local_loop(), addr).unwrap();
1222 let mut w = w.listen().unwrap();
1223 tx.send(());
1224 drop(w.accept().unwrap());
1225 });
1226 rx.recv();
1227 let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1228 fail!();
1229 }
1230
1231 #[should_fail] #[test]
1232 fn udp_listener_fail_cleanup() {
1233 let addr = next_test_ip4();
1234 let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
1235 fail!();
1236 }
1237
1238 #[should_fail] #[test]
1239 fn udp_fail_other_task() {
1240 let addr = next_test_ip4();
1241 let (tx, rx) = channel();
1242
1243 // force the handle to be created on a different scheduler, failure in
1244 // the original task will force a homing operation back to this
1245 // scheduler.
1246 spawn(proc() {
1247 let w = UdpWatcher::bind(local_loop(), addr).unwrap();
1248 tx.send(w);
1249 });
1250
1251 let _w = rx.recv();
1252 fail!();
1253 }
1254 }
librustuv/net.rs:491:1-491:1 -struct- definition:
struct UdpRecvCtx {
task: Option<BlockedTask>,
buf: Option<Buf>,
references:- 4556: 0 => {
557: let mut cx = UdpRecvCtx {
558: task: None,
--
588: let cx = unsafe {
589: &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
590: };
--
777: uvll::set_data_for_uv_handle(handle, 0 as *int);
778: &mut *(data as *mut UdpRecvCtx)
779: };
librustuv/net.rs:127:1-127:1 -fn- definition:
fn socket_name(sk: SocketNameKind,
handle: *c_void) -> Result<ip::SocketAddr, IoError> {
let getsockname = match sk {
references:- 5389: let _m = self.fire_homing_missile();
390: socket_name(Tcp, self.handle)
391: }
--
443: let _m = self.fire_homing_missile();
444: socket_name(Tcp, self.listener.handle)
445: }
--
540: let _m = self.fire_homing_missile();
541: socket_name(Udp, self.handle)
542: }
librustuv/net.rs:174:1-174:1 -struct- definition:
pub struct TcpAcceptor {
listener: Box<TcpListener>,
timeout: AcceptTimeout,
references:- 4437: impl HomingIO for TcpAcceptor {
438: fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
--
448: impl rtio::RtioTcpAcceptor for TcpAcceptor {
449: fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream:Send>, IoError> {
librustuv/net.rs:38:1-38:1 -fn- definition:
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
len: uint) -> ip::SocketAddr {
match storage.ss_family as c_int {
references:- 3143: } {
144: 0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
145: n => Err(uv_error_to_io_error(UvError(n)))
librustuv/addrinfo.rs:
142: loop {
143: let rustaddr = net::sockaddr_to_addr(cast::transmute((*addr).ai_addr),
144: (*addr).ai_addrlen as uint);
librustuv/net.rs:
604: let len = mem::size_of::<libc::sockaddr_storage>();
605: Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
606: };
librustuv/net.rs:35:1-35:1 -fn- definition:
pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
references:- 10111: htons(g),
112: htons(h),
113: ]
librustuv/net.rs:166:1-166:1 -struct- definition:
pub struct TcpListener {
home: HomeHandle,
handle: *uvll::uv_pipe_t,
references:- 10359: let (tx, rx) = channel();
360: let l = box TcpListener {
361: home: io.make_handle(),
--
383: impl UvHandle<uvll::uv_tcp_t> for TcpListener {
384: fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
--
412: assert!(status != uvll::ECANCELED);
413: let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
414: let msg = match status {
--
428: impl Drop for TcpListener {
429: fn drop(&mut self) {
librustuv/net.rs:813:4-813:4 -struct- definition:
struct Ctx {
slot: Option<BlockedTask>,
status: c_int,
references:- 2821: req.defuse(); // uv callback now owns this request
822: let mut cx = Ctx { slot: None, status: 0 };
--
835: assert!(status != uvll::ECANCELED);
836: let cx: &mut Ctx = unsafe { req.get_data() };
837: cx.status = status;
librustuv/net.rs:152:1-152:1 -struct- definition:
pub struct TcpWatcher {
handle: *uvll::uv_tcp_t,
stream: StreamWatcher,
references:- 11192: }, 0);
193: TcpWatcher {
194: home: home,
--
341: impl Drop for TcpWatcher {
342: fn drop(&mut self) {
librustuv/net.rs:36:48-36:48 -fn- definition:
pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
references:- 1068: let g = ntohs(storage.sin6_addr.s6_addr[6]);
69: let h = ntohs(storage.sin6_addr.s6_addr[7]);
70: ip::SocketAddr {
71: ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
72: port: ntohs(storage.sin6_port),
73: }
librustuv/net.rs:497:1-497:1 -struct- definition:
struct UdpSendCtx {
result: c_int,
data: Option<Vec<u8>>,
references:- 4634: req.defuse(); // uv callback now owns this request
635: let mut cx = UdpSendCtx {
636: result: uvll::ECANCELED, data: data, udp: self as *mut _
--
647: }
648: let new_cx = box UdpSendCtx {
649: result: 0,
--
665: let req = Request::wrap(req);
666: let cx: &mut UdpSendCtx = unsafe { req.get_data() };
667: cx.result = status;
--
672: } else {
673: let _cx: Box<UdpSendCtx> = unsafe { cast::transmute(cx) };
674: }
librustuv/net.rs:811:1-811:1 -fn- definition:
pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
struct Ctx {
slot: Option<BlockedTask>,
references:- 2303: let _m = self.fire_homing_missile();
304: shutdown(self.handle, &self.uv_loop())
305: }
librustuv/pipe.rs:
172: let _m = self.fire_homing_missile();
173: net::shutdown(self.stream.handle, &self.uv_loop())
174: }
librustuv/net.rs:479:1-479:1 -struct- definition:
pub struct UdpWatcher {
handle: *uvll::uv_udp_t,
home: HomeHandle,
references:- 13506: -> Result<UdpWatcher, UvError> {
507: let udp = UdpWatcher {
508: handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
--
747: fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
748: box UdpWatcher {
749: handle: self.handle,
--
791: fn cancel_write(stream: uint) -> Option<BlockedTask> {
792: let stream: &mut UdpWatcher = unsafe { cast::transmute(stream) };
793: stream.blocked_sender.take()
--
798: impl Drop for UdpWatcher {
799: fn drop(&mut self) {
librustuv/net.rs:80:1-80:1 -fn- definition:
fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
unsafe {
let mut storage: libc::sockaddr_storage = mem::init();
references:- 4207: let cx = ConnectCtx { status: -1, task: None, timer: None };
208: let (addr, _len) = addr_to_sockaddr(address);
209: let addr_p = &addr as *_ as *libc::sockaddr;
--
366: };
367: let (addr, _len) = addr_to_sockaddr(address);
368: let res = unsafe {
--
517: }, 0);
518: let (addr, _len) = addr_to_sockaddr(address);
519: let result = unsafe {
--
617: let mut req = Request::new(uvll::UV_UDP_SEND);
618: let (addr, _len) = addr_to_sockaddr(dst);
619: let addr_p = &addr as *_ as *libc::sockaddr;