(index<- ) ./libstd/rt/uv/net.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::*;
12 use libc::{size_t, ssize_t, c_int, c_void, c_uint};
13 use rt::uv::uvll;
14 use rt::uv::uvll::*;
15 use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
16 use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
17 status_to_maybe_uv_error, vec_to_uv_buf};
18 use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
19 use vec;
20 use str;
21 use from_str::{FromStr};
22
23 pub struct UvAddrInfo(*uvll::addrinfo);
24
25 pub enum UvSocketAddr {
26 UvIpv4SocketAddr(*sockaddr_in),
27 UvIpv6SocketAddr(*sockaddr_in6),
28 }
29
30 fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
31 unsafe {
32 assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
33 assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
34 match addr {
35 _ if is_ip4_addr(addr) => UvIpv4SocketAddr(addr as *uvll::sockaddr_in),
36 _ if is_ip6_addr(addr) => UvIpv6SocketAddr(addr as *uvll::sockaddr_in6),
37 _ => fail2!(),
38 }
39 }
40 }
41
42 fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
43 let malloc = match addr.ip {
44 Ipv4Addr(*) => malloc_ip4_addr,
45 Ipv6Addr(*) => malloc_ip6_addr,
46 };
47 let wrap = match addr.ip {
48 Ipv4Addr(*) => UvIpv4SocketAddr,
49 Ipv6Addr(*) => UvIpv6SocketAddr,
50 };
51 let free = match addr.ip {
52 Ipv4Addr(*) => free_ip4_addr,
53 Ipv6Addr(*) => free_ip6_addr,
54 };
55
56 let addr = unsafe { malloc(addr.ip.to_str(), addr.port as int) };
57 do (|| {
58 f(wrap(addr))
59 }).finally {
60 unsafe { free(addr) };
61 }
62 }
63
64 fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
65 let ip_size = match addr {
66 UvIpv4SocketAddr(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
67 UvIpv6SocketAddr(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
68 };
69 let ip_name = {
70 let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
71 unsafe {
72 let buf_ptr = vec::raw::to_ptr(buf);
73 match addr {
74 UvIpv4SocketAddr(addr) => uvll::ip4_name(addr, buf_ptr, ip_size as size_t),
75 UvIpv6SocketAddr(addr) => uvll::ip6_name(addr, buf_ptr, ip_size as size_t),
76 }
77 };
78 buf
79 };
80 let ip_port = unsafe {
81 let port = match addr {
82 UvIpv4SocketAddr(addr) => uvll::ip4_port(addr),
83 UvIpv6SocketAddr(addr) => uvll::ip6_port(addr),
84 };
85 port as u16
86 };
87 let ip_str = str::from_utf8_slice(ip_name).trim_right_chars(&'\x00');
88 let ip_addr = FromStr::from_str(ip_str).unwrap();
89
90 // finally run the closure
91 f(SocketAddr { ip: ip_addr, port: ip_port })
92 }
93
94 pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
95 use util;
96 uv_socket_addr_as_socket_addr(addr, util::id)
97 }
98
99 // Traverse the addrinfo linked list, producing a vector of Rust socket addresses
100 pub fn accum_sockaddrs(addr: &UvAddrInfo) -> ~[SocketAddr] {
101 unsafe {
102 let &UvAddrInfo(addr) = addr;
103 let mut addr = addr;
104
105 let mut addrs = ~[];
106 loop {
107 let uvaddr = sockaddr_to_UvSocketAddr((*addr).ai_addr);
108 let rustaddr = uv_socket_addr_to_socket_addr(uvaddr);
109 addrs.push(rustaddr);
110 if (*addr).ai_next.is_not_null() {
111 addr = (*addr).ai_next;
112 } else {
113 break;
114 }
115 }
116
117 return addrs;
118 }
119 }
120
121 #[cfg(test)]
122 #[test]
123 fn test_ip4_conversion() {
124 use rt;
125 let ip4 = rt::test::next_test_ip4();
126 assert_eq!(ip4, socket_addr_as_uv_socket_addr(ip4, uv_socket_addr_to_socket_addr));
127 }
128
129 #[cfg(test)]
130 #[test]
131 fn test_ip6_conversion() {
132 use rt;
133 let ip6 = rt::test::next_test_ip6();
134 assert_eq!(ip6, socket_addr_as_uv_socket_addr(ip6, uv_socket_addr_to_socket_addr));
135 }
136
137 // uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
138 // and uv_file_t
139 pub struct StreamWatcher(*uvll::uv_stream_t);
140 impl Watcher for StreamWatcher { }
141
142 impl StreamWatcher {
143 pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
144 {
145 let data = self.get_watcher_data();
146 data.alloc_cb = Some(alloc);
147 data.read_cb = Some(cb);
148 }
149
150 let ret = unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb) };
151
152 if ret != 0 {
153 // uvll::read_start failed, so read_cb will not be called.
154 // Call it manually for scheduling.
155 call_read_cb(self.native_handle(), ret as ssize_t);
156 }
157
158 fn call_read_cb(stream: *uvll::uv_stream_t, errno: ssize_t) {
159 #[fixed_stack_segment]; #[inline(never)];
160 read_cb(stream, errno, vec_to_uv_buf(~[]));
161 }
162
163 extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
164 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
165 let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
166 return (*alloc_cb)(suggested_size as uint);
167 }
168
169 extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
170 rtdebug!("buf addr: {}", buf.base);
171 rtdebug!("buf len: {}", buf.len);
172 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
173 let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
174 let status = status_to_maybe_uv_error(nread as c_int);
175 (*cb)(stream_watcher, nread as int, buf, status);
176 }
177 }
178
179 pub fn read_stop(&mut self) {
180 // It would be nice to drop the alloc and read callbacks here,
181 // but read_stop may be called from inside one of them and we
182 // would end up freeing the in-use environment
183 let handle = self.native_handle();
184 unsafe { uvll::read_stop(handle); }
185 }
186
187 pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
188 {
189 let data = self.get_watcher_data();
190 assert!(data.write_cb.is_none());
191 data.write_cb = Some(cb);
192 }
193
194 let req = WriteRequest::new();
195 unsafe {
196 assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
197 }
198
199 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
200 let write_request: WriteRequest = NativeHandle::from_native_handle(req);
201 let mut stream_watcher = write_request.stream();
202 write_request.delete();
203 let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
204 let status = status_to_maybe_uv_error(status);
205 cb(stream_watcher, status);
206 }
207 }
208
209 pub fn accept(&mut self, stream: StreamWatcher) {
210 let self_handle = self.native_handle() as *c_void;
211 let stream_handle = stream.native_handle() as *c_void;
212 assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
213 }
214
215 pub fn close(self, cb: NullCallback) {
216 {
217 let mut this = self;
218 let data = this.get_watcher_data();
219 assert!(data.close_cb.is_none());
220 data.close_cb = Some(cb);
221 }
222
223 unsafe { uvll::close(self.native_handle(), close_cb); }
224
225 extern fn close_cb(handle: *uvll::uv_stream_t) {
226 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
227 let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
228 stream_watcher.drop_watcher_data();
229 unsafe { free_handle(handle as *c_void) }
230 cb();
231 }
232 }
233 }
234
235 impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
236 fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
237 StreamWatcher(handle)
238 }
239 fn native_handle(&self) -> *uvll::uv_stream_t {
240 match self { &StreamWatcher(ptr) => ptr }
241 }
242 }
243
244 pub struct TcpWatcher(*uvll::uv_tcp_t);
245 impl Watcher for TcpWatcher { }
246
247 impl TcpWatcher {
248 pub fn new(loop_: &Loop) -> TcpWatcher {
249 unsafe {
250 let handle = malloc_handle(UV_TCP);
251 assert!(handle.is_not_null());
252 assert_eq!(0, uvll::tcp_init(loop_.native_handle(), handle));
253 let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
254 watcher.install_watcher_data();
255 return watcher;
256 }
257 }
258
259 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
260 do socket_addr_as_uv_socket_addr(address) |addr| {
261 let result = unsafe {
262 match addr {
263 UvIpv4SocketAddr(addr) => uvll::tcp_bind(self.native_handle(), addr),
264 UvIpv6SocketAddr(addr) => uvll::tcp_bind6(self.native_handle(), addr),
265 }
266 };
267 match result {
268 0 => Ok(()),
269 _ => Err(UvError(result)),
270 }
271 }
272 }
273
274 pub fn connect(&mut self, address: SocketAddr, cb: ConnectionCallback) {
275 unsafe {
276 assert!(self.get_watcher_data().connect_cb.is_none());
277 self.get_watcher_data().connect_cb = Some(cb);
278
279 let connect_handle = ConnectRequest::new().native_handle();
280 rtdebug!("connect_t: {}", connect_handle);
281 do socket_addr_as_uv_socket_addr(address) |addr| {
282 let result = match addr {
283 UvIpv4SocketAddr(addr) => uvll::tcp_connect(connect_handle,
284 self.native_handle(), addr, connect_cb),
285 UvIpv6SocketAddr(addr) => uvll::tcp_connect6(connect_handle,
286 self.native_handle(), addr, connect_cb),
287 };
288 assert_eq!(0, result);
289 }
290
291 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
292 rtdebug!("connect_t: {}", req);
293 let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
294 let mut stream_watcher = connect_request.stream();
295 connect_request.delete();
296 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
297 let status = status_to_maybe_uv_error(status);
298 cb(stream_watcher, status);
299 }
300 }
301 }
302
303 pub fn listen(&mut self, cb: ConnectionCallback) {
304 {
305 let data = self.get_watcher_data();
306 assert!(data.connect_cb.is_none());
307 data.connect_cb = Some(cb);
308 }
309
310 unsafe {
311 static BACKLOG: c_int = 128; // XXX should be configurable
312 // XXX: This can probably fail
313 assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
314 }
315
316 extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
317 rtdebug!("connection_cb");
318 let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
319 let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
320 let status = status_to_maybe_uv_error(status);
321 (*cb)(stream_watcher, status);
322 }
323 }
324
325 pub fn as_stream(&self) -> StreamWatcher {
326 NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t)
327 }
328 }
329
330 impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
331 fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
332 TcpWatcher(handle)
333 }
334 fn native_handle(&self) -> *uvll::uv_tcp_t {
335 match self { &TcpWatcher(ptr) => ptr }
336 }
337 }
338
339 pub struct UdpWatcher(*uvll::uv_udp_t);
340 impl Watcher for UdpWatcher { }
341
342 impl UdpWatcher {
343 pub fn new(loop_: &Loop) -> UdpWatcher {
344 unsafe {
345 let handle = malloc_handle(UV_UDP);
346 assert!(handle.is_not_null());
347 assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
348 let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
349 watcher.install_watcher_data();
350 return watcher;
351 }
352 }
353
354 pub fn bind(&mut self, address: SocketAddr) -> Result<(), UvError> {
355 do socket_addr_as_uv_socket_addr(address) |addr| {
356 let result = unsafe {
357 match addr {
358 UvIpv4SocketAddr(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
359 UvIpv6SocketAddr(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
360 }
361 };
362 match result {
363 0 => Ok(()),
364 _ => Err(UvError(result)),
365 }
366 }
367 }
368
369 pub fn recv_start(&mut self, alloc: AllocCallback, cb: UdpReceiveCallback) {
370 {
371 let data = self.get_watcher_data();
372 data.alloc_cb = Some(alloc);
373 data.udp_recv_cb = Some(cb);
374 }
375
376 unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
377
378 extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
379 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
380 let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
381 return (*alloc_cb)(suggested_size as uint);
382 }
383
384 extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
385 addr: *uvll::sockaddr, flags: c_uint) {
386 // When there's no data to read the recv callback can be a no-op.
387 // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
388 // this we just drop back to kqueue and wait for the next callback.
389 if nread == 0 {
390 return;
391 }
392
393 rtdebug!("buf addr: {}", buf.base);
394 rtdebug!("buf len: {}", buf.len);
395 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
396 let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
397 let status = status_to_maybe_uv_error(nread as c_int);
398 let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
399 (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
400 }
401 }
402
403 pub fn recv_stop(&mut self) {
404 unsafe { uvll::udp_recv_stop(self.native_handle()); }
405 }
406
407 pub fn send(&mut self, buf: Buf, address: SocketAddr, cb: UdpSendCallback) {
408 {
409 let data = self.get_watcher_data();
410 assert!(data.udp_send_cb.is_none());
411 data.udp_send_cb = Some(cb);
412 }
413
414 let req = UdpSendRequest::new();
415 do socket_addr_as_uv_socket_addr(address) |addr| {
416 let result = unsafe {
417 match addr {
418 UvIpv4SocketAddr(addr) => uvll::udp_send(req.native_handle(),
419 self.native_handle(), [buf], addr, send_cb),
420 UvIpv6SocketAddr(addr) => uvll::udp_send6(req.native_handle(),
421 self.native_handle(), [buf], addr, send_cb),
422 }
423 };
424 assert_eq!(0, result);
425 }
426
427 extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
428 let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
429 let mut udp_watcher = send_request.handle();
430 send_request.delete();
431 let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
432 let status = status_to_maybe_uv_error(status);
433 cb(udp_watcher, status);
434 }
435 }
436
437 pub fn close(self, cb: NullCallback) {
438 {
439 let mut this = self;
440 let data = this.get_watcher_data();
441 assert!(data.close_cb.is_none());
442 data.close_cb = Some(cb);
443 }
444
445 unsafe { uvll::close(self.native_handle(), close_cb); }
446
447 extern fn close_cb(handle: *uvll::uv_udp_t) {
448 let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
449 let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
450 udp_watcher.drop_watcher_data();
451 unsafe { free_handle(handle as *c_void) }
452 cb();
453 }
454 }
455 }
456
457 impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
458 fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
459 UdpWatcher(handle)
460 }
461 fn native_handle(&self) -> *uvll::uv_udp_t {
462 match self { &UdpWatcher(ptr) => ptr }
463 }
464 }
465
466 // uv_connect_t is a subclass of uv_req_t
467 struct ConnectRequest(*uvll::uv_connect_t);
468 impl Request for ConnectRequest { }
469
470 impl ConnectRequest {
471
472 fn new() -> ConnectRequest {
473 let connect_handle = unsafe { malloc_req(UV_CONNECT) };
474 assert!(connect_handle.is_not_null());
475 ConnectRequest(connect_handle as *uvll::uv_connect_t)
476 }
477
478 fn stream(&self) -> StreamWatcher {
479 unsafe {
480 let stream_handle = uvll::get_stream_handle_from_connect_req(self.native_handle());
481 NativeHandle::from_native_handle(stream_handle)
482 }
483 }
484
485 fn delete(self) {
486 unsafe { free_req(self.native_handle() as *c_void) }
487 }
488 }
489
490 impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
491 fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
492 ConnectRequest(handle)
493 }
494 fn native_handle(&self) -> *uvll::uv_connect_t {
495 match self { &ConnectRequest(ptr) => ptr }
496 }
497 }
498
499 pub struct WriteRequest(*uvll::uv_write_t);
500
501 impl Request for WriteRequest { }
502
503 impl WriteRequest {
504 pub fn new() -> WriteRequest {
505 let write_handle = unsafe { malloc_req(UV_WRITE) };
506 assert!(write_handle.is_not_null());
507 WriteRequest(write_handle as *uvll::uv_write_t)
508 }
509
510 pub fn stream(&self) -> StreamWatcher {
511 unsafe {
512 let stream_handle = uvll::get_stream_handle_from_write_req(self.native_handle());
513 NativeHandle::from_native_handle(stream_handle)
514 }
515 }
516
517 pub fn delete(self) {
518 unsafe { free_req(self.native_handle() as *c_void) }
519 }
520 }
521
522 impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
523 fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
524 WriteRequest(handle)
525 }
526 fn native_handle(&self) -> *uvll::uv_write_t {
527 match self { &WriteRequest(ptr) => ptr }
528 }
529 }
530
531 pub struct UdpSendRequest(*uvll::uv_udp_send_t);
532 impl Request for UdpSendRequest { }
533
534 impl UdpSendRequest {
535 pub fn new() -> UdpSendRequest {
536 let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
537 assert!(send_handle.is_not_null());
538 UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
539 }
540
541 pub fn handle(&self) -> UdpWatcher {
542 let send_request_handle = unsafe {
543 uvll::get_udp_handle_from_send_req(self.native_handle())
544 };
545 NativeHandle::from_native_handle(send_request_handle)
546 }
547
548 pub fn delete(self) {
549 unsafe { free_req(self.native_handle() as *c_void) }
550 }
551 }
552
553 impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
554 fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
555 UdpSendRequest(handle)
556 }
557 fn native_handle(&self) -> *uvll::uv_udp_send_t {
558 match self { &UdpSendRequest(ptr) => ptr }
559 }
560 }
561
562 #[cfg(test)]
563 mod test {
564 use super::*;
565 use util::ignore;
566 use cell::Cell;
567 use vec;
568 use unstable::run_in_bare_thread;
569 use rt::thread::Thread;
570 use rt::test::*;
571 use rt::uv::{Loop, AllocCallback};
572 use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
573 use prelude::*;
574
575 #[test]
576 fn connect_close_ip4() {
577 do run_in_bare_thread() {
578 let mut loop_ = Loop::new();
579 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
580 // Connect to a port where nobody is listening
581 let addr = next_test_ip4();
582 do tcp_watcher.connect(addr) |stream_watcher, status| {
583 rtdebug!("tcp_watcher.connect!");
584 assert!(status.is_some());
585 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
586 stream_watcher.close(||());
587 }
588 loop_.run();
589 loop_.close();
590 }
591 }
592
593 #[test]
594 fn connect_close_ip6() {
595 do run_in_bare_thread() {
596 let mut loop_ = Loop::new();
597 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
598 // Connect to a port where nobody is listening
599 let addr = next_test_ip6();
600 do tcp_watcher.connect(addr) |stream_watcher, status| {
601 rtdebug!("tcp_watcher.connect!");
602 assert!(status.is_some());
603 assert_eq!(status.unwrap().name(), ~"ECONNREFUSED");
604 stream_watcher.close(||());
605 }
606 loop_.run();
607 loop_.close();
608 }
609 }
610
611 #[test]
612 fn udp_bind_close_ip4() {
613 do run_in_bare_thread() {
614 let mut loop_ = Loop::new();
615 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
616 let addr = next_test_ip4();
617 udp_watcher.bind(addr);
618 udp_watcher.close(||());
619 loop_.run();
620 loop_.close();
621 }
622 }
623
624 #[test]
625 fn udp_bind_close_ip6() {
626 do run_in_bare_thread() {
627 let mut loop_ = Loop::new();
628 let mut udp_watcher = { UdpWatcher::new(&mut loop_) };
629 let addr = next_test_ip6();
630 udp_watcher.bind(addr);
631 udp_watcher.close(||());
632 loop_.run();
633 loop_.close();
634 }
635 }
636
637 #[test]
638 fn listen_ip4() {
639 do run_in_bare_thread() {
640 static MAX: int = 10;
641 let mut loop_ = Loop::new();
642 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
643 let addr = next_test_ip4();
644 server_tcp_watcher.bind(addr);
645 let loop_ = loop_;
646 rtdebug!("listening");
647 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
648 rtdebug!("listened!");
649 assert!(status.is_none());
650 let mut loop_ = loop_;
651 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
652 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
653 server_stream_watcher.accept(client_tcp_watcher);
654 let count_cell = Cell::new(0);
655 let server_stream_watcher = server_stream_watcher;
656 rtdebug!("starting read");
657 let alloc: AllocCallback = |size| {
658 vec_to_uv_buf(vec::from_elem(size, 0u8))
659 };
660 do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
661
662 rtdebug!("i'm reading!");
663 let buf = vec_from_uv_buf(buf);
664 let mut count = count_cell.take();
665 if status.is_none() {
666 rtdebug!("got {} bytes", nread);
667 let buf = buf.unwrap();
668 for byte in buf.slice(0, nread as uint).iter() {
669 assert!(*byte == count as u8);
670 rtdebug!("{}", *byte as uint);
671 count += 1;
672 }
673 } else {
674 assert_eq!(count, MAX);
675 do stream_watcher.close {
676 server_stream_watcher.close(||());
677 }
678 }
679 count_cell.put_back(count);
680 }
681 }
682
683 let client_thread = do Thread::start {
684 rtdebug!("starting client thread");
685 let mut loop_ = Loop::new();
686 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
687 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
688 rtdebug!("connecting");
689 assert!(status.is_none());
690 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
691 let buf = slice_to_uv_buf(msg);
692 let msg_cell = Cell::new(msg);
693 do stream_watcher.write(buf) |stream_watcher, status| {
694 rtdebug!("writing");
695 assert!(status.is_none());
696 let msg_cell = Cell::new(msg_cell.take());
697 stream_watcher.close(||ignore(msg_cell.take()));
698 }
699 }
700 loop_.run();
701 loop_.close();
702 };
703
704 let mut loop_ = loop_;
705 loop_.run();
706 loop_.close();
707 client_thread.join();
708 }
709 }
710
711 #[test]
712 fn listen_ip6() {
713 do run_in_bare_thread() {
714 static MAX: int = 10;
715 let mut loop_ = Loop::new();
716 let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
717 let addr = next_test_ip6();
718 server_tcp_watcher.bind(addr);
719 let loop_ = loop_;
720 rtdebug!("listening");
721 do server_tcp_watcher.listen |mut server_stream_watcher, status| {
722 rtdebug!("listened!");
723 assert!(status.is_none());
724 let mut loop_ = loop_;
725 let client_tcp_watcher = TcpWatcher::new(&mut loop_);
726 let mut client_tcp_watcher = client_tcp_watcher.as_stream();
727 server_stream_watcher.accept(client_tcp_watcher);
728 let count_cell = Cell::new(0);
729 let server_stream_watcher = server_stream_watcher;
730 rtdebug!("starting read");
731 let alloc: AllocCallback = |size| {
732 vec_to_uv_buf(vec::from_elem(size, 0u8))
733 };
734 do client_tcp_watcher.read_start(alloc)
735 |stream_watcher, nread, buf, status| {
736
737 rtdebug!("i'm reading!");
738 let buf = vec_from_uv_buf(buf);
739 let mut count = count_cell.take();
740 if status.is_none() {
741 rtdebug!("got {} bytes", nread);
742 let buf = buf.unwrap();
743 let r = buf.slice(0, nread as uint);
744 for byte in r.iter() {
745 assert!(*byte == count as u8);
746 rtdebug!("{}", *byte as uint);
747 count += 1;
748 }
749 } else {
750 assert_eq!(count, MAX);
751 do stream_watcher.close {
752 server_stream_watcher.close(||());
753 }
754 }
755 count_cell.put_back(count);
756 }
757 }
758
759 let client_thread = do Thread::start {
760 rtdebug!("starting client thread");
761 let mut loop_ = Loop::new();
762 let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
763 do tcp_watcher.connect(addr) |mut stream_watcher, status| {
764 rtdebug!("connecting");
765 assert!(status.is_none());
766 let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
767 let buf = slice_to_uv_buf(msg);
768 let msg_cell = Cell::new(msg);
769 do stream_watcher.write(buf) |stream_watcher, status| {
770 rtdebug!("writing");
771 assert!(status.is_none());
772 let msg_cell = Cell::new(msg_cell.take());
773 stream_watcher.close(||ignore(msg_cell.take()));
774 }
775 }
776 loop_.run();
777 loop_.close();
778 };
779
780 let mut loop_ = loop_;
781 loop_.run();
782 loop_.close();
783 client_thread.join();
784 }
785 }
786
787 #[test]
788 fn udp_recv_ip4() {
789 do run_in_bare_thread() {
790 static MAX: int = 10;
791 let mut loop_ = Loop::new();
792 let server_addr = next_test_ip4();
793 let client_addr = next_test_ip4();
794
795 let mut server = UdpWatcher::new(&loop_);
796 assert!(server.bind(server_addr).is_ok());
797
798 rtdebug!("starting read");
799 let alloc: AllocCallback = |size| {
800 vec_to_uv_buf(vec::from_elem(size, 0u8))
801 };
802
803 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
804 server.recv_stop();
805 rtdebug!("i'm reading!");
806 assert!(status.is_none());
807 assert_eq!(flags, 0);
808 assert_eq!(src, client_addr);
809
810 let buf = vec_from_uv_buf(buf);
811 let mut count = 0;
812 rtdebug!("got {} bytes", nread);
813
814 let buf = buf.unwrap();
815 for &byte in buf.slice(0, nread as uint).iter() {
816 assert!(byte == count as u8);
817 rtdebug!("{}", byte as uint);
818 count += 1;
819 }
820 assert_eq!(count, MAX);
821
822 server.close(||{});
823 }
824
825 let thread = do Thread::start {
826 let mut loop_ = Loop::new();
827 let mut client = UdpWatcher::new(&loop_);
828 assert!(client.bind(client_addr).is_ok());
829 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
830 let buf = slice_to_uv_buf(msg);
831 do client.send(buf, server_addr) |client, status| {
832 rtdebug!("writing");
833 assert!(status.is_none());
834 client.close(||{});
835 }
836
837 loop_.run();
838 loop_.close();
839 };
840
841 loop_.run();
842 loop_.close();
843 thread.join();
844 }
845 }
846
847 #[test]
848 fn udp_recv_ip6() {
849 do run_in_bare_thread() {
850 static MAX: int = 10;
851 let mut loop_ = Loop::new();
852 let server_addr = next_test_ip6();
853 let client_addr = next_test_ip6();
854
855 let mut server = UdpWatcher::new(&loop_);
856 assert!(server.bind(server_addr).is_ok());
857
858 rtdebug!("starting read");
859 let alloc: AllocCallback = |size| {
860 vec_to_uv_buf(vec::from_elem(size, 0u8))
861 };
862
863 do server.recv_start(alloc) |mut server, nread, buf, src, flags, status| {
864 server.recv_stop();
865 rtdebug!("i'm reading!");
866 assert!(status.is_none());
867 assert_eq!(flags, 0);
868 assert_eq!(src, client_addr);
869
870 let buf = vec_from_uv_buf(buf);
871 let mut count = 0;
872 rtdebug!("got {} bytes", nread);
873
874 let buf = buf.unwrap();
875 for &byte in buf.slice(0, nread as uint).iter() {
876 assert!(byte == count as u8);
877 rtdebug!("{}", byte as uint);
878 count += 1;
879 }
880 assert_eq!(count, MAX);
881
882 server.close(||{});
883 }
884
885 let thread = do Thread::start {
886 let mut loop_ = Loop::new();
887 let mut client = UdpWatcher::new(&loop_);
888 assert!(client.bind(client_addr).is_ok());
889 let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
890 let buf = slice_to_uv_buf(msg);
891 do client.send(buf, server_addr) |client, status| {
892 rtdebug!("writing");
893 assert!(status.is_none());
894 client.close(||{});
895 }
896
897 loop_.run();
898 loop_.close();
899 };
900
901 loop_.run();
902 loop_.close();
903 thread.join();
904 }
905 }
906 }
libstd/rt/uv/net.rs:138:17-138:17 -struct- definition:
// and uv_file_t
pub struct StreamWatcher(*uvll::uv_stream_t);
references:-142: impl StreamWatcher {
235: impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
164: let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
318: let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
478: fn stream(&self) -> StreamWatcher {
236: fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
172: let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
325: pub fn as_stream(&self) -> StreamWatcher {
209: pub fn accept(&mut self, stream: StreamWatcher) {
510: pub fn stream(&self) -> StreamWatcher {
140: impl Watcher for StreamWatcher { }
226: let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
libstd/rt/uv/pipe.rs:
36: pub fn as_stream(&self) -> net::StreamWatcher {
libstd/rt/uv/mod.rs:
132: pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
129: pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
libstd/rt/uv/uvio.rs:
922: fn read_stream(mut watcher: StreamWatcher,
962: fn write_stream(mut watcher: StreamWatcher,
libstd/rt/uv/net.rs:338:1-338:1 -struct- definition:
pub struct UdpWatcher(*uvll::uv_udp_t);
references:-343: pub fn new(loop_: &Loop) -> UdpWatcher {
457: impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
458: fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
348: let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
379: let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
340: impl Watcher for UdpWatcher { }
395: let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
342: impl UdpWatcher {
448: let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
541: pub fn handle(&self) -> UdpWatcher {
libstd/rt/uv/mod.rs:
138: pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
139: pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
libstd/rt/uv/uvio.rs:
1143: watcher: UdpWatcher,
libstd/rt/uv/net.rs:29:1-29:1 -fn- definition:
fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
references:-107: let uvaddr = sockaddr_to_UvSocketAddr((*addr).ai_addr);
398: let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
libstd/rt/uv/net.rs:93:1-93:1 -fn- definition:
pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
references:-398: let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
108: let rustaddr = uv_socket_addr_to_socket_addr(uvaddr);
libstd/rt/uv/uvio.rs:
171: net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
169: net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
libstd/rt/uv/net.rs:498:1-498:1 -struct- definition:
pub struct WriteRequest(*uvll::uv_write_t);
references:-523: fn from_native_handle(handle: *uvll:: uv_write_t) -> WriteRequest {
503: impl WriteRequest {
504: pub fn new() -> WriteRequest {
200: let write_request: WriteRequest = NativeHandle::from_native_handle(req);
501: impl Request for WriteRequest { }
522: impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
libstd/rt/uv/net.rs:22:1-22:1 -struct- definition:
pub struct UvAddrInfo(*uvll::addrinfo);
references:-100: pub fn accum_sockaddrs(addr: &UvAddrInfo) -> ~[SocketAddr] {
libstd/rt/uv/addrinfo.rs:
23: type GetAddrInfoCallback = ~fn(GetAddrInfoRequest, &UvAddrInfo, Option<UvError>);
41: service: Option<&str>, hints: Option<UvAddrInfo>,
libstd/rt/uv/net.rs:384:8-384:8 -fn- definition:
extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
addr: *uvll::sockaddr, flags: c_uint) {
references:-376: unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
libstd/rt/uv/net.rs:63:1-63:1 -fn- definition:
fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
references:-96: uv_socket_addr_as_socket_addr(addr, util::id)
libstd/rt/uv/net.rs:158:8-158:8 -fn- definition:
fn call_read_cb(stream: *uvll::uv_stream_t, errno: ssize_t) {
#[fixed_stack_segment]; #[inline(never)];
references:-155: call_read_cb(self.native_handle(), ret as ssize_t);
libstd/rt/uv/net.rs:163:8-163:8 -fn- definition:
extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
references:-150: let ret = unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb) };
libstd/rt/uv/net.rs:378:8-378:8 -fn- definition:
extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
references:-376: unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
libstd/rt/uv/net.rs:169:8-169:8 -fn- definition:
extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
rtdebug!("buf addr: {}", buf.base);
references:-160: read_cb(stream, errno, vec_to_uv_buf(~[]));
150: let ret = unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb) };
libstd/rt/uv/net.rs:41:1-41:1 -fn- definition:
fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
references:-415: do socket_addr_as_uv_socket_addr(address) |addr| {
355: do socket_addr_as_uv_socket_addr(address) |addr| {
281: do socket_addr_as_uv_socket_addr(address) |addr| {
260: do socket_addr_as_uv_socket_addr(address) |addr| {
libstd/rt/uv/net.rs:530:1-530:1 -struct- definition:
pub struct UdpSendRequest(*uvll::uv_udp_send_t);
references:-554: fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
553: impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
534: impl UdpSendRequest {
428: let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
535: pub fn new() -> UdpSendRequest {
532: impl Request for UdpSendRequest { }
libstd/rt/uv/net.rs:291:12-291:12 -fn- definition:
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
rtdebug!("connect_t: {}", req);
references:-286: self.native_handle(), addr, connect_cb),
284: self.native_handle(), addr, connect_cb),
libstd/rt/uv/net.rs:225:8-225:8 -fn- definition:
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
references:-223: unsafe { uvll::close(self.native_handle(), close_cb); }
libstd/rt/uv/net.rs:24:1-24:1 -enum- definition:
pub enum UvSocketAddr {
references:-64: fn uv_socket_addr_as_socket_addr<T>(addr: UvSocketAddr, f: &fn(SocketAddr) -> T) -> T {
94: pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr {
30: fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr {
42: fn socket_addr_as_uv_socket_addr<T>(addr: SocketAddr, f: &fn(UvSocketAddr) -> T) -> T {
libstd/rt/uv/net.rs:99:82-99:82 -fn- definition:
// Traverse the addrinfo linked list, producing a vector of Rust socket addresses
pub fn accum_sockaddrs(addr: &UvAddrInfo) -> ~[SocketAddr] {
references:-libstd/rt/uv/uvio.rs:
674: None => Ok(accum_sockaddrs(addrinfo).map(|addr| addr.ip.clone())),
libstd/rt/uv/net.rs:447:8-447:8 -fn- definition:
extern fn close_cb(handle: *uvll::uv_udp_t) {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
references:-445: unsafe { uvll::close(self.native_handle(), close_cb); }
libstd/rt/uv/net.rs:316:8-316:8 -fn- definition:
extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
rtdebug!("connection_cb");
references:-313: assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
libstd/rt/uv/net.rs:427:8-427:8 -fn- definition:
extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
references:-419: self.native_handle(), [buf], addr, send_cb),
421: self.native_handle(), [buf], addr, send_cb),
libstd/rt/uv/net.rs:243:1-243:1 -struct- definition:
pub struct TcpWatcher(*uvll::uv_tcp_t);
references:-331: fn from_native_handle(handle: *uvll::uv_tcp_t) -> TcpWatcher {
247: impl TcpWatcher {
330: impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
245: impl Watcher for TcpWatcher { }
253: let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
248: pub fn new(loop_: &Loop) -> TcpWatcher {
libstd/rt/uv/uvio.rs:
804: watcher : TcpWatcher,
813: fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
1043: watcher: TcpWatcher,
libstd/rt/uv/net.rs:466:42-466:42 -struct- definition:
// uv_connect_t is a subclass of uv_req_t
struct ConnectRequest(*uvll::uv_connect_t);
references:-468: impl Request for ConnectRequest { }
470: impl ConnectRequest {
472: fn new() -> ConnectRequest {
490: impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
293: let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
491: fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
libstd/rt/uv/net.rs:199:8-199:8 -fn- definition:
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let write_request: WriteRequest = NativeHandle::from_native_handle(req);
references:-196: assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));