(index<- ) ./libstd/rt/uv/uvio.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use c_str::ToCStr;
12 use cast::transmute;
13 use cast;
14 use cell::Cell;
15 use clone::Clone;
16 use libc::{c_int, c_uint, c_void, pid_t};
17 use ops::Drop;
18 use option::*;
19 use ptr;
20 use str;
21 use str::Str;
22 use result::*;
23 use rt::io::IoError;
24 use rt::io::net::ip::{SocketAddr, IpAddr};
25 use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
26 use rt::io::process::ProcessConfig;
27 use rt::kill::BlockedTask;
28 use rt::local::Local;
29 use rt::rtio::*;
30 use rt::sched::{Scheduler, SchedHandle};
31 use rt::tube::Tube;
32 use rt::task::SchedHome;
33 use rt::uv::*;
34 use rt::uv::idle::IdleWatcher;
35 use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr, accum_sockaddrs};
36 use rt::uv::addrinfo::GetAddrInfoRequest;
37 use unstable::sync::Exclusive;
38 use path::{GenericPath, Path};
39 use super::super::io::support::PathLike;
40 use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
41 S_IRUSR, S_IWUSR, S_IRWXU};
42 use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
43 CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite,
44 FileStat};
45 use task;
46
47 #[cfg(test)] use container::Container;
48 #[cfg(test)] use unstable::run_in_bare_thread;
49 #[cfg(test)] use rt::test::{spawntask,
50 next_test_ip4,
51 run_in_mt_newsched_task};
52 #[cfg(test)] use iter::{Iterator, range};
53 #[cfg(test)] use rt::comm::oneshot;
54
55 // XXX we should not be calling uvll functions in here.
56
// Mix-in for I/O objects that are pinned to a particular scheduler ("home").
// Provides helpers that temporarily migrate the running task to the home
// scheduler, run an I/O action there, and then migrate the task back.
trait HomingIO {

    // Handle to the scheduler this I/O object is pinned to.
    fn home<'r>(&'r mut self) -> &'r mut SchedHandle;

    /* XXX This will move pinned tasks to do IO on the proper scheduler
     * and then move them back to their home.
     */
    // Deschedules the running task, sends it to the home scheduler as a
    // PinnedTask, and returns the task's previous home so it can be
    // restored later by `restore_original_home`.
    fn go_to_IO_home(&mut self) -> SchedHome {
        use rt::sched::PinnedTask;

        do task::unkillable { // FIXME(#8674)
            let mut old = None;
            {
                // Raw-ish escape hatch: write the old home through a
                // mutable borrow captured by the deschedule closure.
                let ptr = &mut old;
                let scheduler: ~Scheduler = Local::take();
                do scheduler.deschedule_running_task_and_then |_, task| {
                    /* FIXME(#8674) if the task was already killed then wake
                     * will return None. In that case, the home pointer will never be set.
                     *
                     * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
                     */
                    do task.wake().map |mut task| {
                        *ptr = Some(task.take_unwrap_home());
                        self.home().send(PinnedTask(task));
                    };
                }
            }
            old.expect("No old home because task had already been killed.")
        }
    }

    // XXX dummy self param
    // Gives the task its original home back and reschedules it via the
    // (possibly different) current scheduler's friend channel.
    fn restore_original_home(_dummy_self: Option<Self>, old: SchedHome) {
        use rt::sched::TaskFromFriend;

        let old = Cell::new(old);
        do task::unkillable { // FIXME(#8674)
            let scheduler: ~Scheduler = Local::take();
            do scheduler.deschedule_running_task_and_then |scheduler, task| {
                /* FIXME(#8674) if the task was already killed then wake
                 * will return None. In that case, the home pointer will never be restored.
                 *
                 * RESOLUTION IDEA: Since the task is dead, we should just abort the IO action.
                 */
                do task.wake().map |mut task| {
                    task.give_home(old.take());
                    scheduler.make_handle().send(TaskFromFriend(task));
                };
            }
        }
    }

    // Runs `io` on the home scheduler, then moves the task back.
    fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
        let home = self.go_to_IO_home();
        let a = io(self); // do IO
        HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
        a // return the result of the IO
    }

    // Like `home_for_io` but consumes the receiver (used by e.g. `listen`).
    fn home_for_io_consume<A>(self, io: &fn(Self) -> A) -> A {
        let mut this = self;
        let home = this.go_to_IO_home();
        let a = io(this); // do IO
        HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
        a // return the result of the IO
    }

    // Like `home_for_io` but also hands the action the taken scheduler,
    // for callers that need to deschedule themselves.
    fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
        let home = self.go_to_IO_home();
        let a = do task::unkillable { // FIXME(#8674)
            let scheduler: ~Scheduler = Local::take();
            io_sched(self, scheduler) // do IO and scheduling action
        };
        HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
        a // return result of IO
    }
}
134
// Get a handle for the current scheduler: briefly borrows the scheduler
// from task-local storage and mints a new SchedHandle from it.
macro_rules! get_handle_to_current_scheduler(
    () => (do Local::borrow |sched: &mut Scheduler| { sched.make_handle() })
)
139
// Selects which uv name query `socket_name` performs on a handle.
enum SocketNameKind {
    // Remote peer address of a connected TCP socket (tcp_getpeername).
    TcpPeer,
    // Local address of a TCP socket (tcp_getsockname).
    Tcp,
    // Local address of a UDP socket (udp_getsockname).
    Udp
}
145
146 fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
147 handle: U) -> Result<SocketAddr, IoError> {
148 let getsockname = match sk {
149 TcpPeer => uvll::tcp_getpeername,
150 Tcp => uvll::tcp_getsockname,
151 Udp => uvll::udp_getsockname,
152 };
153
154 // Allocate a sockaddr_storage
155 // since we don't know if it's ipv4 or ipv6
156 let r_addr = unsafe { uvll::malloc_sockaddr_storage() };
157
158 let r = unsafe {
159 getsockname(handle.native_handle() as *c_void, r_addr as *uvll::sockaddr_storage)
160 };
161
162 if r != 0 {
163 let status = status_to_maybe_uv_error(r);
164 return Err(uv_error_to_io_error(status.unwrap()));
165 }
166
167 let addr = unsafe {
168 if uvll::is_ip6_addr(r_addr as *uvll::sockaddr) {
169 net::uv_socket_addr_to_socket_addr(UvIpv6SocketAddr(r_addr as *uvll::sockaddr_in6))
170 } else {
171 net::uv_socket_addr_to_socket_addr(UvIpv4SocketAddr(r_addr as *uvll::sockaddr_in))
172 }
173 };
174
175 unsafe { uvll::free_sockaddr_storage(r_addr); }
176
177 Ok(addr)
178
179 }
180
181 // Obviously an Event Loop is always home.
pub struct UvEventLoop {
    // The I/O factory wrapping the underlying uv loop.
    uvio: UvIoFactory
}
185
186 impl UvEventLoop {
187 pub fn new() -> UvEventLoop {
188 UvEventLoop {
189 uvio: UvIoFactory(Loop::new())
190 }
191 }
192 }
193
impl Drop for UvEventLoop {
    fn drop(&mut self) {
        // Close the underlying uv loop when the event loop is destroyed.
        self.uvio.uv_loop().close();
    }
}
199
impl EventLoop for UvEventLoop {
    // Drive the uv loop until it runs out of active handles.
    fn run(&mut self) {
        self.uvio.uv_loop().run();
    }

    // Schedule `f` to run once on the next loop turn. Uses a one-shot
    // idle watcher that stops and closes itself before invoking `f`.
    fn callback(&mut self, f: ~fn()) {
        let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
        do idle_watcher.start |mut idle_watcher, status| {
            assert!(status.is_none());
            idle_watcher.stop();
            idle_watcher.close(||());
            f();
        }
    }

    // Hand out a pausible idle callback; it starts out paused
    // (idle_flag: false) and not yet closed.
    fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
        let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
        return ~UvPausibleIdleCallback {
            watcher: idle_watcher,
            idle_flag: false,
            closed: false
        };
    }

    // Run `f` once after `ms` milliseconds (repeat interval 0); the timer
    // closes itself after firing.
    fn callback_ms(&mut self, ms: u64, f: ~fn()) {
        let mut timer = TimerWatcher::new(self.uvio.uv_loop());
        do timer.start(ms, 0) |timer, status| {
            assert!(status.is_none());
            timer.close(||());
            f();
        }
    }

    // Build a callback that other threads can fire into this loop.
    fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
        ~UvRemoteCallback::new(self.uvio.uv_loop(), f)
    }

    // Expose the I/O factory backed by this loop.
    fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
        Some(&mut self.uvio)
    }
}
241
pub struct UvPausibleIdleCallback {
    // Underlying uv idle watcher.
    watcher: IdleWatcher,
    // True while the watcher is running (i.e. not paused).
    idle_flag: bool,
    // Set by `close` so the handle is only ever closed once.
    closed: bool
}
247
248 impl UvPausibleIdleCallback {
249 #[inline]
250 pub fn start(&mut self, f: ~fn()) {
251 do self.watcher.start |_idle_watcher, _status| {
252 f();
253 };
254 self.idle_flag = true;
255 }
256 #[inline]
257 pub fn pause(&mut self) {
258 if self.idle_flag == true {
259 self.watcher.stop();
260 self.idle_flag = false;
261 }
262 }
263 #[inline]
264 pub fn resume(&mut self) {
265 if self.idle_flag == false {
266 self.watcher.restart();
267 self.idle_flag = true;
268 }
269 }
270 #[inline]
271 pub fn close(&mut self) {
272 self.pause();
273 if !self.closed {
274 self.closed = true;
275 self.watcher.close(||{});
276 }
277 }
278 }
279
280 #[test]
281 fn test_callback_run_once() {
282 do run_in_bare_thread {
283 let mut event_loop = UvEventLoop::new();
284 let mut count = 0;
285 let count_ptr: *mut int = &mut count;
286 do event_loop.callback {
287 unsafe { *count_ptr += 1 }
288 }
289 event_loop.run();
290 assert_eq!(count, 1);
291 }
292 }
293
294 // The entire point of async is to call into a loop from other threads so it does not need to home.
// Wakes up a (possibly remote) uv loop and runs a callback on it.
pub struct UvRemoteCallback {
    // The uv async handle for triggering the callback
    async: AsyncWatcher,
    // A flag to tell the callback to exit, set from the dtor. This is
    // almost never contested - only in rare races with the dtor.
    exit_flag: Exclusive<bool>
}
302
impl UvRemoteCallback {
    // Create the async watcher whose uv callback runs `f` each time the
    // handle is triggered, and closes the handle once the exit flag is set.
    pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
        let exit_flag = Exclusive::new(false);
        let exit_flag_clone = exit_flag.clone();
        let async = do AsyncWatcher::new(loop_) |watcher, status| {
            assert!(status.is_none());

            // The synchronization logic here is subtle. To review,
            // the uv async handle type promises that, after it is
            // triggered the remote callback is definitely called at
            // least once. UvRemoteCallback needs to maintain those
            // semantics while also shutting down cleanly from the
            // dtor. In our case that means that, when the
            // UvRemoteCallback dtor calls `async.send()`, here `f` is
            // always called later.

            // In the dtor both the exit flag is set and the async
            // callback fired under a lock.  Here, before calling `f`,
            // we take the lock and check the flag. Because we are
            // checking the flag before calling `f`, and the flag is
            // set under the same lock as the send, then if the flag
            // is set then we're guaranteed to call `f` after the
            // final send.

            // If the check was done after `f()` then there would be a
            // period between that call and the check where the dtor
            // could be called in the other thread, missing the final
            // callback while still destroying the handle.

            // NB: the flag must be read *before* `f` runs — see above.
            let should_exit = unsafe {
                exit_flag_clone.with_imm(|&should_exit| should_exit)
            };

            f();

            if should_exit {
                watcher.close(||());
            }

        };
        UvRemoteCallback {
            async: async,
            exit_flag: exit_flag
        }
    }
}
349
impl RemoteCallback for UvRemoteCallback {
    // Trigger the async handle; uv then runs the callback at least once.
    fn fire(&mut self) { self.async.send() }
}
353
impl Drop for UvRemoteCallback {
    fn drop(&mut self) {
        unsafe {
            // NOTE(review): transmute_mut works around old-borrowck
            // limitations on capturing `self` in the closure — confirm.
            let this: &mut UvRemoteCallback = cast::transmute_mut(self);
            do this.exit_flag.with |should_exit| {
                // NB: These two things need to happen atomically. Otherwise
                // the event handler could wake up due to a *previous*
                // signal and see the exit flag, destroying the handle
                // before the final send.
                *should_exit = true;
                this.async.send();
            }
        }
    }
}
369
#[cfg(test)]
mod test_remote {
    use cell::Cell;
    use rt::test::*;
    use rt::thread::Thread;
    use rt::tube::Tube;
    use rt::rtio::EventLoop;
    use rt::local::Local;
    use rt::sched::Scheduler;

    // Fire a remote callback from a foreign thread and check that it is
    // delivered to the scheduler's event loop exactly as sent.
    #[test]
    fn test_uv_remote() {
        do run_in_mt_newsched_task {
            let mut tube = Tube::new();
            let tube_clone = tube.clone();
            let remote_cell = Cell::new_empty();
            do Local::borrow |sched: &mut Scheduler| {
                let tube_clone = tube_clone.clone();
                let tube_clone_cell = Cell::new(tube_clone);
                let remote = do sched.event_loop.remote_callback {
                    // This could be called multiple times
                    if !tube_clone_cell.is_empty() {
                        tube_clone_cell.take().send(1);
                    }
                };
                remote_cell.put_back(remote);
            }
            // Fire from a bare OS thread, not a scheduler task.
            let thread = do Thread::start {
                remote_cell.take().fire();
            };

            assert!(tube.recv() == 1);
            thread.join();
        }
    }
}
406
// Newtype over the uv loop; the IoFactory implementation lives on it.
pub struct UvIoFactory(Loop);
408
impl UvIoFactory {
    // Borrow the wrapped uv loop mutably.
    pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
        match self { &UvIoFactory(ref mut ptr) => ptr }
    }
}
414
/// Helper for a variety of simple uv_fs_* functions that
/// have no ret val. `cb` is handed a fresh FsRequest, the loop, the path
/// and a completion callback; it starts the uv operation and forwards
/// completion (with an optional UvError) to that callback. The calling
/// task is blocked until the operation finishes.
fn uv_fs_helper<P: PathLike>(loop_: &mut Loop, path: &P,
                             cb: ~fn(&mut FsRequest, &mut Loop, &P,
                                     ~fn(&FsRequest, Option<UvError>)))
        -> Result<(), IoError> {
    let result_cell = Cell::new_empty();
    // Raw pointer into this task's stack; the task stays descheduled until
    // the uv callback writes the result back through it.
    let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
    let path_cell = Cell::new(path);
    do task::unkillable { // FIXME(#8674)
        let scheduler: ~Scheduler = Local::take();
        let mut new_req = FsRequest::new();
        do scheduler.deschedule_running_task_and_then |_, task| {
            let task_cell = Cell::new(task);
            let path = path_cell.take();
            do cb(&mut new_req, loop_, path) |_, err| {
                let res = match err {
                    None => Ok(()),
                    Some(err) => Err(uv_error_to_io_error(err))
                };
                unsafe { (*result_cell_ptr).put_back(res); }
                // Wake the blocked task now that the result is in place.
                let scheduler: ~Scheduler = Local::take();
                scheduler.resume_blocked_task_immediately(task_cell.take());
            };
        }
    }
    assert!(!result_cell.is_empty());
    return result_cell.take();
}
444
impl IoFactory for UvIoFactory {
    // Connect to an address and return a new stream
    // NB: This blocks the task waiting on the connection.
    // It would probably be better to return a future
    fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
        // Create a cell in the task to hold the result. We will fill
        // the cell before resuming the task.
        let result_cell = Cell::new_empty();
        let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;

        // Block this task and take ownership, switch to scheduler context
        do task::unkillable { // FIXME(#8674)
            let scheduler: ~Scheduler = Local::take();
            do scheduler.deschedule_running_task_and_then |_, task| {

                let mut tcp = TcpWatcher::new(self.uv_loop());
                let task_cell = Cell::new(task);

                // Wait for a connection
                do tcp.connect(addr) |stream, status| {
                    match status {
                        None => {
                            let tcp = NativeHandle::from_native_handle(stream.native_handle());
                            let home = get_handle_to_current_scheduler!();
                            let res = Ok(~UvTcpStream { watcher: tcp, home: home });

                            // Store the stream in the task's stack
                            unsafe { (*result_cell_ptr).put_back(res); }

                            // Context switch
                            let scheduler: ~Scheduler = Local::take();
                            scheduler.resume_blocked_task_immediately(task_cell.take());
                        }
                        Some(_) => {
                            // Connection failed: close the watcher, then
                            // deliver the error and wake the task.
                            let task_cell = Cell::new(task_cell.take());
                            do stream.close {
                                let res = Err(uv_error_to_io_error(status.unwrap()));
                                unsafe { (*result_cell_ptr).put_back(res); }
                                let scheduler: ~Scheduler = Local::take();
                                scheduler.resume_blocked_task_immediately(task_cell.take());
                            }
                        }
                    }
                }
            }
        }

        assert!(!result_cell.is_empty());
        return result_cell.take();
    }

    // Bind a TCP watcher to `addr`, producing a listener. On bind failure
    // the watcher must still be closed (blocking until uv confirms).
    fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> {
        let mut watcher = TcpWatcher::new(self.uv_loop());
        match watcher.bind(addr) {
            Ok(_) => {
                let home = get_handle_to_current_scheduler!();
                Ok(~UvTcpListener::new(watcher, home))
            }
            Err(uverr) => {
                do task::unkillable { // FIXME(#8674)
                    let scheduler: ~Scheduler = Local::take();
                    do scheduler.deschedule_running_task_and_then |_, task| {
                        let task_cell = Cell::new(task);
                        do watcher.as_stream().close {
                            let scheduler: ~Scheduler = Local::take();
                            scheduler.resume_blocked_task_immediately(task_cell.take());
                        }
                    }
                    Err(uv_error_to_io_error(uverr))
                }
            }
        }
    }

    // Bind a UDP watcher to `addr`; same close-on-failure shape as tcp_bind.
    fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
        let mut watcher = UdpWatcher::new(self.uv_loop());
        match watcher.bind(addr) {
            Ok(_) => {
                let home = get_handle_to_current_scheduler!();
                Ok(~UvUdpSocket { watcher: watcher, home: home })
            }
            Err(uverr) => {
                do task::unkillable { // FIXME(#8674)
                    let scheduler: ~Scheduler = Local::take();
                    do scheduler.deschedule_running_task_and_then |_, task| {
                        let task_cell = Cell::new(task);
                        do watcher.close {
                            let scheduler: ~Scheduler = Local::take();
                            scheduler.resume_blocked_task_immediately(task_cell.take());
                        }
                    }
                    Err(uv_error_to_io_error(uverr))
                }
            }
        }
    }

    // Create a timer homed on the current scheduler.
    fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
        let watcher = TimerWatcher::new(self.uv_loop());
        let home = get_handle_to_current_scheduler!();
        Ok(~UvTimer::new(watcher, home))
    }

    // Wrap an already-open file descriptor in a file stream.
    fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream {
        let loop_ = Loop {handle: self.uv_loop().native_handle()};
        let home = get_handle_to_current_scheduler!();
        ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream
    }

    // Open (or create) a file, blocking the task until uv_fs_open completes.
    fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
        -> Result<~RtioFileStream, IoError> {
        // Translate FileMode/FileAccess into POSIX-style open flags.
        let mut flags = match fm {
            Open => 0,
            Create => O_CREAT,
            OpenOrCreate => O_CREAT,
            Append => O_APPEND,
            Truncate => O_TRUNC,
            CreateOrTruncate => O_TRUNC | O_CREAT
        };
        flags = match fa {
            Read => flags | O_RDONLY,
            Write => flags | O_WRONLY,
            ReadWrite => flags | O_RDWR
        };
        // Only modes that may create the file need creation permissions
        // (user read/write).
        let create_mode = match fm {
            Create|OpenOrCreate|CreateOrTruncate =>
                S_IRUSR | S_IWUSR,
            _ => 0
        };
        let result_cell = Cell::new_empty();
        let result_cell_ptr: *Cell<Result<~RtioFileStream,
                                          IoError>> = &result_cell;
        let path_cell = Cell::new(path);
        do task::unkillable { // FIXME(#8674)
            let scheduler: ~Scheduler = Local::take();
            let open_req = file::FsRequest::new();
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                let path = path_cell.take();
                do open_req.open(self.uv_loop(), path, flags as int, create_mode as int)
                        |req,err| {
                    if err.is_none() {
                        // Success: the request's result is the new fd;
                        // close_on_drop is true since we own it.
                        let loop_ = Loop {handle: req.get_loop().native_handle()};
                        let home = get_handle_to_current_scheduler!();
                        let fd = req.get_result() as c_int;
                        let fs = ~UvFileStream::new(
                            loop_, fd, true, home) as ~RtioFileStream;
                        let res = Ok(fs);
                        unsafe { (*result_cell_ptr).put_back(res); }
                        let scheduler: ~Scheduler = Local::take();
                        scheduler.resume_blocked_task_immediately(task_cell.take());
                    } else {
                        let res = Err(uv_error_to_io_error(err.unwrap()));
                        unsafe { (*result_cell_ptr).put_back(res); }
                        let scheduler: ~Scheduler = Local::take();
                        scheduler.resume_blocked_task_immediately(task_cell.take());
                    }
                };
            };
        };
        assert!(!result_cell.is_empty());
        return result_cell.take();
    }

    // Delete a file via uv_fs_unlink (blocking helper).
    fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
        do uv_fs_helper(self.uv_loop(), path) |unlink_req, l, p, cb| {
            do unlink_req.unlink(l, p) |req, err| {
                cb(req, err)
            };
        }
    }
    // Stat a path, converting the uv stat buffer into a FileStat.
    fn fs_stat<P: PathLike>(&mut self, path: &P) -> Result<FileStat, IoError> {
        use str::StrSlice;
        let result_cell = Cell::new_empty();
        let result_cell_ptr: *Cell<Result<FileStat,
                                          IoError>> = &result_cell;
        let path_cell = Cell::new(path);
        do task::unkillable { // FIXME(#8674)
            let scheduler: ~Scheduler = Local::take();
            let stat_req = file::FsRequest::new();
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                let path = path_cell.take();
                // Copy the path out up front: the FileStat needs an owned
                // path after the borrow is gone.
                let path_str = path.path_as_str(|p| p.to_owned());
                do stat_req.stat(self.uv_loop(), path)
                        |req,err| {
                    let res = match err {
                        None => {
                            let stat = req.get_stat();
                            Ok(FileStat {
                                path: Path::new(path_str.as_slice()),
                                is_file: stat.is_file(),
                                is_dir: stat.is_dir(),
                                size: stat.st_size,
                                created: stat.st_ctim.tv_sec as u64,
                                modified: stat.st_mtim.tv_sec as u64,
                                accessed: stat.st_atim.tv_sec as u64
                            })
                        },
                        Some(e) => {
                            Err(uv_error_to_io_error(e))
                        }
                    };
                    unsafe { (*result_cell_ptr).put_back(res); }
                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
                };
            };
        };
        assert!(!result_cell.is_empty());
        return result_cell.take();
    }

    // Resolve `host` to a list of IP addresses via uv getaddrinfo.
    fn get_host_addresses(&mut self, host: &str) -> Result<~[IpAddr], IoError> {
        let result_cell = Cell::new_empty();
        let result_cell_ptr: *Cell<Result<~[IpAddr], IoError>> = &result_cell;
        let host_ptr: *&str = &host;
        let addrinfo_req = GetAddrInfoRequest::new();
        let addrinfo_req_cell = Cell::new(addrinfo_req);
        do task::unkillable { // FIXME(#8674)
            let scheduler: ~Scheduler = Local::take();
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                let mut addrinfo_req = addrinfo_req_cell.take();
                unsafe {
                    do addrinfo_req.getaddrinfo(self.uv_loop(),
                                                Some(*host_ptr),
                                                None, None) |_, addrinfo, err| {
                        let res = match err {
                            None => Ok(accum_sockaddrs(addrinfo).map(|addr| addr.ip.clone())),
                            Some(err) => Err(uv_error_to_io_error(err))
                        };
                        (*result_cell_ptr).put_back(res);
                        let scheduler: ~Scheduler = Local::take();
                        scheduler.resume_blocked_task_immediately(task_cell.take());
                    }
                }
            }
        }
        // NOTE(review): `addrinfo_req` was moved into the cell above;
        // this deletes the outer copy — confirm GetAddrInfoRequest is a
        // copyable handle and this frees the right request.
        addrinfo_req.delete();
        assert!(!result_cell.is_empty());
        return result_cell.take();
    }
    // Create a directory with user rwx permissions.
    fn fs_mkdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
        let mode = S_IRWXU as int;
        do uv_fs_helper(self.uv_loop(), path) |mkdir_req, l, p, cb| {
            do mkdir_req.mkdir(l, p, mode as int) |req, err| {
                cb(req, err)
            };
        }
    }
    // Remove a directory via uv_fs_rmdir (blocking helper).
    fn fs_rmdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
        do uv_fs_helper(self.uv_loop(), path) |rmdir_req, l, p, cb| {
            do rmdir_req.rmdir(l, p) |req, err| {
                cb(req, err)
            };
        }
    }
    // List a directory's entries, returning each as `path` joined with
    // the entry name.
    fn fs_readdir<P: PathLike>(&mut self, path: &P, flags: c_int) ->
        Result<~[Path], IoError> {
        use str::StrSlice;
        let result_cell = Cell::new_empty();
        let result_cell_ptr: *Cell<Result<~[Path],
                                          IoError>> = &result_cell;
        let path_cell = Cell::new(path);
        do task::unkillable { // FIXME(#8674)
            let scheduler: ~Scheduler = Local::take();
            let stat_req = file::FsRequest::new();
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                let path = path_cell.take();
                let path_str = path.path_as_str(|p| p.to_owned());
                do stat_req.readdir(self.uv_loop(), path, flags)
                        |req,err| {
                    let res = match err {
                        None => {
                            // uv returns names relative to the directory;
                            // prepend the queried path to each.
                            let rel_paths = req.get_paths();
                            let mut paths = ~[];
                            for r in rel_paths.iter() {
                                let mut p = Path::new(path_str.as_slice());
                                p.push(r.as_slice());
                                paths.push(p);
                            }
                            Ok(paths)
                        },
                        Some(e) => {
                            Err(uv_error_to_io_error(e))
                        }
                    };
                    unsafe { (*result_cell_ptr).put_back(res); }
                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
                };
            };
        };
        assert!(!result_cell.is_empty());
        return result_cell.take();
    }

    // Create an unbound pipe (optionally with IPC enabled) homed here.
    fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError> {
        let home = get_handle_to_current_scheduler!();
        Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
    }

    // Spawn a child process, returning the process object plus its pipes.
    fn spawn(&mut self, config: ProcessConfig)
            -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>
    {
        // Sadly, we must create the UvProcess before we actually call uv_spawn
        // so that the exit_cb can close over it and notify it when the process
        // has exited.
        let mut ret = ~UvProcess {
            process: Process::new(),
            home: None,
            exit_status: None,
            term_signal: None,
            exit_error: None,
            descheduled: None,
        };
        // Unsafely alias the box so the exit callback can reach it while
        // `ret` is still owned here.
        let ret_ptr = unsafe {
            *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
        };

        // The purpose of this exit callback is to record the data about the
        // exit and then wake up the task which may be waiting for the process
        // to exit. This is all performed in the current io-loop, and the
        // implementation of UvProcess ensures that reading these fields always
        // occurs on the current io-loop.
        let exit_cb: ExitCallback = |_, exit_status, term_signal, error| {
            unsafe {
                assert!((*ret_ptr).exit_status.is_none());
                (*ret_ptr).exit_status = Some(exit_status);
                (*ret_ptr).term_signal = Some(term_signal);
                (*ret_ptr).exit_error = error;
                match (*ret_ptr).descheduled.take() {
                    Some(task) => {
                        let scheduler: ~Scheduler = Local::take();
                        scheduler.resume_blocked_task_immediately(task);
                    }
                    None => {}
                }
            }
        };

        match ret.process.spawn(self.uv_loop(), config, exit_cb) {
            Ok(io) => {
                // Only now do we actually get a handle to this scheduler.
                ret.home = Some(get_handle_to_current_scheduler!());
                Ok((ret, io))
            }
            Err(uverr) => {
                // We still need to close the process handle we created, but
                // that's taken care for us in the destructor of UvProcess
                Err(uv_error_to_io_error(uverr))
            }
        }
    }
}
802
pub struct UvTcpListener {
    // Bound TCP watcher; listening starts in `listen`.
    watcher : TcpWatcher,
    // Scheduler this listener is pinned to.
    home: SchedHandle,
}
807
impl HomingIO for UvTcpListener {
    // Home is the scheduler handle stored at construction.
    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
811
impl UvTcpListener {
    // Wrap an already-bound watcher with its home scheduler handle.
    fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
        UvTcpListener { watcher: watcher, home: home }
    }
}
817
impl Drop for UvTcpListener {
    fn drop(&mut self) {
        // Close must run on the home scheduler; block until uv confirms
        // the handle is closed before finishing the drop.
        do self.home_for_io_with_sched |self_, scheduler| {
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task = Cell::new(task);
                do self_.watcher.as_stream().close {
                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task.take());
                }
            }
        }
    }
}
831
impl RtioSocket for UvTcpListener {
    // Local address the listener is bound to.
    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
        do self.home_for_io |self_| {
            socket_name(Tcp, self_.watcher)
        }
    }
}
839
impl RtioTcpListener for UvTcpListener {
    // Start listening; incoming connections (or errors) are pushed into
    // the acceptor's tube from the uv connection callback.
    fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> {
        do self.home_for_io_consume |self_| {
            let mut acceptor = ~UvTcpAcceptor::new(self_);
            let incoming = Cell::new(acceptor.incoming.clone());
            do acceptor.listener.watcher.listen |mut server, status| {
                do incoming.with_mut_ref |incoming| {
                    let inc = match status {
                        Some(_) => Err(standard_error(OtherIoError)),
                        None => {
                            let inc = TcpWatcher::new(&server.event_loop());
                            // first accept call in the callback guaranteed to succeed
                            server.accept(inc.as_stream());
                            let home = get_handle_to_current_scheduler!();
                            Ok(~UvTcpStream { watcher: inc, home: home })
                        }
                    };
                    incoming.send(inc);
                }
            };
            Ok(acceptor)
        }
    }
}
864
pub struct UvTcpAcceptor {
    // The listening socket whose callback feeds `incoming`.
    listener: UvTcpListener,
    // Queue of accepted streams (or errors) produced by the listen callback.
    incoming: Tube<Result<~RtioTcpStreamObject, IoError>>,
}
869
impl HomingIO for UvTcpAcceptor {
    // Delegates to the wrapped listener's home.
    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
}
873
impl UvTcpAcceptor {
    // Wrap a listener with a fresh, empty incoming-connection tube.
    fn new(listener: UvTcpListener) -> UvTcpAcceptor {
        UvTcpAcceptor { listener: listener, incoming: Tube::new() }
    }
}
879
impl RtioSocket for UvTcpAcceptor {
    // Local address of the underlying listening socket.
    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
        do self.home_for_io |self_| {
            socket_name(Tcp, self_.listener.watcher)
        }
    }
}
887
impl RtioTcpAcceptor for UvTcpAcceptor {
    // Block on the tube until the listen callback delivers a connection.
    fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
        do self.home_for_io |self_| {
            self_.incoming.recv()
        }
    }

    // Enable uv's simultaneous-accepts behavior (flag = 1).
    fn accept_simultaneously(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            let r = unsafe {
                uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int)
            };

            match status_to_maybe_uv_error(r) {
                Some(err) => Err(uv_error_to_io_error(err)),
                None => Ok(())
            }
        }
    }

    // Disable uv's simultaneous-accepts behavior (flag = 0).
    fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            let r = unsafe {
                uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int)
            };

            match status_to_maybe_uv_error(r) {
                Some(err) => Err(uv_error_to_io_error(err)),
                None => Ok(())
            }
        }
    }
}
921
// Perform one blocking read from `watcher` into `buf`, returning the
// number of bytes read. The task is descheduled until the uv read
// callback delivers either data or an error.
fn read_stream(mut watcher: StreamWatcher,
               scheduler: ~Scheduler,
               buf: &mut [u8]) -> Result<uint, IoError> {
    let result_cell = Cell::new_empty();
    let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;

    // Raw pointer to the caller's buffer; valid because the task stays
    // blocked until the callback has finished with it.
    let buf_ptr: *&mut [u8] = &buf;
    do scheduler.deschedule_running_task_and_then |_sched, task| {
        let task_cell = Cell::new(task);
        // XXX: We shouldn't reallocate these callbacks every
        // call to read
        let alloc: AllocCallback = |_| unsafe {
            slice_to_uv_buf(*buf_ptr)
        };
        do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {

            // Stop reading so that no read callbacks are
            // triggered before the user calls `read` again.
            // XXX: Is there a performance impact to calling
            // stop here?
            watcher.read_stop();

            let result = if status.is_none() {
                assert!(nread >= 0);
                Ok(nread as uint)
            } else {
                Err(uv_error_to_io_error(status.unwrap()))
            };

            unsafe { (*result_cell_ptr).put_back(result); }

            let scheduler: ~Scheduler = Local::take();
            scheduler.resume_blocked_task_immediately(task_cell.take());
        }
    }

    assert!(!result_cell.is_empty());
    result_cell.take()
}
961
// Perform one blocking write of `buf` to `watcher`. The task is
// descheduled until the uv write callback reports success or an error.
fn write_stream(mut watcher: StreamWatcher,
                scheduler: ~Scheduler,
                buf: &[u8]) -> Result<(), IoError> {
    let result_cell = Cell::new_empty();
    let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
    // Raw pointer to the caller's buffer; valid because the task stays
    // blocked until the write completes.
    let buf_ptr: *&[u8] = &buf;
    do scheduler.deschedule_running_task_and_then |_, task| {
        let task_cell = Cell::new(task);
        let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
        do watcher.write(buf) |_watcher, status| {
            let result = if status.is_none() {
                Ok(())
            } else {
                Err(uv_error_to_io_error(status.unwrap()))
            };

            unsafe { (*result_cell_ptr).put_back(result); }

            let scheduler: ~Scheduler = Local::take();
            scheduler.resume_blocked_task_immediately(task_cell.take());
        }
    }

    assert!(!result_cell.is_empty());
    result_cell.take()
}
988
// A pipe handle that has not yet been bound into a stream.
pub struct UvUnboundPipe {
    pipe: Pipe,
    // Scheduler this pipe is pinned to.
    home: SchedHandle,
}
993
impl HomingIO for UvUnboundPipe {
    // Home is the scheduler handle stored at creation.
    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
997
impl Drop for UvUnboundPipe {
    fn drop(&mut self) {
        // Close on the home scheduler, blocking until uv confirms.
        // NOTE(review): unlike the Drop impls for UvTcpListener/UvTcpStream
        // this takes the scheduler without home_for_io_with_sched (and thus
        // without task::unkillable) — confirm whether that's intentional.
        do self.home_for_io |self_| {
            let scheduler: ~Scheduler = Local::take();
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                do self_.pipe.close {
                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
                }
            }
        }
    }
}
1012
impl UvUnboundPipe {
    // Promote the unbound pipe into a usable stream.
    // Unsafe: caller must uphold whatever invariant makes binding valid
    // at this point (no checks are performed here).
    pub unsafe fn bind(~self) -> UvPipeStream {
        UvPipeStream { inner: self }
    }
}
1018
// A readable/writable pipe stream wrapping a (bound) unbound pipe.
pub struct UvPipeStream {
    priv inner: ~UvUnboundPipe,
}
1022
impl UvPipeStream {
    // Wrap an unbound pipe directly (no binding is performed here).
    pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream {
        UvPipeStream { inner: inner }
    }
}
1028
impl RtioPipe for UvPipeStream {
    // Blocking read on the pipe's home scheduler.
    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
        do self.inner.home_for_io_with_sched |self_, scheduler| {
            read_stream(self_.pipe.as_stream(), scheduler, buf)
        }
    }
    // Blocking write on the pipe's home scheduler.
    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
        do self.inner.home_for_io_with_sched |self_, scheduler| {
            write_stream(self_.pipe.as_stream(), scheduler, buf)
        }
    }
}
1041
pub struct UvTcpStream {
    // Connected TCP watcher.
    watcher: TcpWatcher,
    // Scheduler this stream is pinned to.
    home: SchedHandle,
}
1046
impl HomingIO for UvTcpStream {
    // Home is the scheduler handle stored at construction.
    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
1050
1051 impl Drop for UvTcpStream {
1052 fn drop(&mut self) {
1053 do self.home_for_io_with_sched |self_, scheduler| {
1054 do scheduler.deschedule_running_task_and_then |_, task| {
1055 let task_cell = Cell::new(task);
1056 do self_.watcher.as_stream().close {
1057 let scheduler: ~Scheduler = Local::take();
1058 scheduler.resume_blocked_task_immediately(task_cell.take());
1059 }
1060 }
1061 }
1062 }
1063 }
1064
1065 impl RtioSocket for UvTcpStream {
1066 fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
1067 do self.home_for_io |self_| {
1068 socket_name(Tcp, self_.watcher)
1069 }
1070 }
1071 }
1072
1073 impl RtioTcpStream for UvTcpStream {
1074 fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
1075 do self.home_for_io_with_sched |self_, scheduler| {
1076 read_stream(self_.watcher.as_stream(), scheduler, buf)
1077 }
1078 }
1079
1080 fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
1081 do self.home_for_io_with_sched |self_, scheduler| {
1082 write_stream(self_.watcher.as_stream(), scheduler, buf)
1083 }
1084 }
1085
1086 fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
1087 do self.home_for_io |self_| {
1088 socket_name(TcpPeer, self_.watcher)
1089 }
1090 }
1091
1092 fn control_congestion(&mut self) -> Result<(), IoError> {
1093 do self.home_for_io |self_| {
1094 let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
1095
1096 match status_to_maybe_uv_error(r) {
1097 Some(err) => Err(uv_error_to_io_error(err)),
1098 None => Ok(())
1099 }
1100 }
1101 }
1102
1103 fn nodelay(&mut self) -> Result<(), IoError> {
1104 do self.home_for_io |self_| {
1105 let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
1106
1107 match status_to_maybe_uv_error(r) {
1108 Some(err) => Err(uv_error_to_io_error(err)),
1109 None => Ok(())
1110 }
1111 }
1112 }
1113
1114 fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
1115 do self.home_for_io |self_| {
1116 let r = unsafe {
1117 uvll::tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
1118 delay_in_seconds as c_uint)
1119 };
1120
1121 match status_to_maybe_uv_error(r) {
1122 Some(err) => Err(uv_error_to_io_error(err)),
1123 None => Ok(())
1124 }
1125 }
1126 }
1127
1128 fn letdie(&mut self) -> Result<(), IoError> {
1129 do self.home_for_io |self_| {
1130 let r = unsafe {
1131 uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
1132 };
1133
1134 match status_to_maybe_uv_error(r) {
1135 Some(err) => Err(uv_error_to_io_error(err)),
1136 None => Ok(())
1137 }
1138 }
1139 }
1140 }
1141
// A bound UDP socket backed by a libuv `UdpWatcher`; homed like the other
// uv-backed I/O objects.
pub struct UvUdpSocket {
    watcher: UdpWatcher,
    home: SchedHandle,
}

impl HomingIO for UvUdpSocket {
    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}

impl Drop for UvUdpSocket {
    fn drop(&mut self) {
        // Asynchronous close on the home loop; task resumes from the close cb.
        do self.home_for_io_with_sched |self_, scheduler| {
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                do self_.watcher.close {
                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
                }
            }
        }
    }
}

impl RtioSocket for UvUdpSocket {
    // Local address this socket is bound to.
    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
        do self.home_for_io |self_| {
            socket_name(Udp, self_.watcher)
        }
    }
}

impl RtioUdpSocket for UvUdpSocket {
    // Receive one datagram into `buf`; returns (bytes read, sender address).
    // Blocks the task until exactly one recv callback fires, then stops
    // receiving (recv_stop) so later datagrams are not consumed.
    fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
        do self.home_for_io_with_sched |self_, scheduler| {
            // Raw pointers smuggle the result and buffer into the uv
            // callback; sound because this task stays blocked (keeping the
            // stack frame alive) until the callback resumes it.
            let result_cell = Cell::new_empty();
            let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;

            let buf_ptr: *&mut [u8] = &buf;
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                // Hand uv the caller's buffer instead of allocating one.
                let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
                do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
                    let _ = flags; // XXX: add handling for partial reads?

                    // One datagram per call: stop receiving immediately.
                    watcher.recv_stop();

                    let result = match status {
                        None => {
                            assert!(nread >= 0);
                            Ok((nread as uint, addr))
                        }
                        Some(err) => Err(uv_error_to_io_error(err)),
                    };

                    unsafe { (*result_cell_ptr).put_back(result); }

                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
                }
            }

            assert!(!result_cell.is_empty());
            result_cell.take()
        }
    }

    // Send `buf` as one datagram to `dst`, blocking the task until the uv
    // send callback reports completion.
    fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
        do self.home_for_io_with_sched |self_, scheduler| {
            let result_cell = Cell::new_empty();
            let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
            let buf_ptr: *&[u8] = &buf;
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
                do self_.watcher.send(buf, dst) |_watcher, status| {

                    let result = match status {
                        None => Ok(()),
                        Some(err) => Err(uv_error_to_io_error(err)),
                    };

                    unsafe { (*result_cell_ptr).put_back(result); }

                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
                }
            }

            assert!(!result_cell.is_empty());
            result_cell.take()
        }
    }

    // Join the multicast group `multi` (interface chosen by the OS: the
    // interface argument is null).
    fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            let r = unsafe {
                do multi.to_str().with_c_str |m_addr| {
                    uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
                                             ptr::null(), uvll::UV_JOIN_GROUP)
                }
            };

            match status_to_maybe_uv_error(r) {
                Some(err) => Err(uv_error_to_io_error(err)),
                None => Ok(())
            }
        }
    }

    // Leave the multicast group `multi`.
    fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            let r = unsafe {
                do multi.to_str().with_c_str |m_addr| {
                    uvll::udp_set_membership(self_.watcher.native_handle(), m_addr,
                                             ptr::null(), uvll::UV_LEAVE_GROUP)
                }
            };

            match status_to_maybe_uv_error(r) {
                Some(err) => Err(uv_error_to_io_error(err)),
                None => Ok(())
            }
        }
    }

    // Enable multicast loopback (this host receives its own multicast sends).
    fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {

            let r = unsafe {
                uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
            };

            match status_to_maybe_uv_error(r) {
                Some(err) => Err(uv_error_to_io_error(err)),
                None => Ok(())
            }
        }
    }

    // Disable multicast loopback.
    fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {

            let r = unsafe {
                uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
            };

            match status_to_maybe_uv_error(r) {
                Some(err) => Err(uv_error_to_io_error(err)),
                None => Ok(())
            }
        }
    }

    // Set the TTL for outgoing multicast packets.
    fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
        do self.home_for_io |self_| {

            let r = unsafe {
                uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
            };

            match status_to_maybe_uv_error(r) {
                Some(err) => Err(uv_error_to_io_error(err)),
                None => Ok(())
            }
        }
    }

    // Set the TTL for outgoing unicast packets.
    fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
        do self.home_for_io |self_| {

            let r = unsafe {
                uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
            };

            match status_to_maybe_uv_error(r) {
                Some(err) => Err(uv_error_to_io_error(err)),
                None => Ok(())
            }
        }
    }

    // Enable SO_BROADCAST-style reception/sending of broadcast datagrams.
    fn hear_broadcasts(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {

            let r = unsafe {
                uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
            };

            match status_to_maybe_uv_error(r) {
                Some(err) => Err(uv_error_to_io_error(err)),
                None => Ok(())
            }
        }
    }

    // Disable broadcast on this socket.
    fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
        do self.home_for_io |self_| {

            let r = unsafe {
                uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
            };

            match status_to_maybe_uv_error(r) {
                Some(err) => Err(uv_error_to_io_error(err)),
                None => Ok(())
            }
        }
    }
}
1351
// A one-shot timer backed by a libuv `TimerWatcher`, homed to the scheduler
// owning its event loop.
pub struct UvTimer {
    watcher: timer::TimerWatcher,
    home: SchedHandle,
}

impl HomingIO for UvTimer {
    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}

impl UvTimer {
    fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
        UvTimer { watcher: w, home: home }
    }
}

impl Drop for UvTimer {
    fn drop(&mut self) {
        // Asynchronous close on the home loop, like the other uv handles.
        do self.home_for_io_with_sched |self_, scheduler| {
            rtdebug!("closing UvTimer");
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task_cell = Cell::new(task);
                do self_.watcher.close {
                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
                }
            }
        }
    }
}

impl RtioTimer for UvTimer {
    // Block the current task (not the OS thread) for `msecs` milliseconds.
    // Uses a one-shot uv timer (repeat = 0); stopped afterwards so the
    // watcher can be reused.
    fn sleep(&mut self, msecs: u64) {
        do self.home_for_io_with_sched |self_, scheduler| {
            do scheduler.deschedule_running_task_and_then |_sched, task| {
                rtdebug!("sleep: entered scheduler context");
                let task_cell = Cell::new(task);
                do self_.watcher.start(msecs, 0) |_, status| {
                    assert!(status.is_none());
                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
                }
            }
            self_.watcher.stop();
        }
    }
}
1398
// A file stream wrapping a raw file descriptor, with reads/writes performed
// through the uv fs request API on the home event loop. `close_on_drop`
// controls whether the fd is closed when this object is destroyed (false for
// borrowed fds like stdout — see fs_from_raw_fd usage below).
pub struct UvFileStream {
    loop_: Loop,
    fd: c_int,
    close_on_drop: bool,
    home: SchedHandle
}

impl HomingIO for UvFileStream {
    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}

impl UvFileStream {
    fn new(loop_: Loop, fd: c_int, close_on_drop: bool,
           home: SchedHandle) -> UvFileStream {
        UvFileStream {
            loop_: loop_,
            fd: fd,
            close_on_drop: close_on_drop,
            home: home
        }
    }
    // Shared read path: offset -1 means "use the fd's current position"
    // (uv semantics — see the public read/pread wrappers below). Returns
    // the uv request result (bytes read) on success.
    fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
        // Raw pointers carry the result/buffer into the fs callback; the
        // blocked task keeps this frame alive until the callback runs.
        let result_cell = Cell::new_empty();
        let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
        let buf_ptr: *&mut [u8] = &buf;
        do self.home_for_io_with_sched |self_, scheduler| {
            do scheduler.deschedule_running_task_and_then |_, task| {
                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
                let task_cell = Cell::new(task);
                let read_req = file::FsRequest::new();
                do read_req.read(&self_.loop_, self_.fd, buf, offset) |req, uverr| {
                    let res = match uverr {
                        None => Ok(req.get_result() as int),
                        Some(err) => Err(uv_error_to_io_error(err))
                    };
                    unsafe { (*result_cell_ptr).put_back(res); }
                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
                };
            };
        };
        result_cell.take()
    }
    // Shared write path; same offset convention as base_read.
    fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
        let result_cell = Cell::new_empty();
        let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
        let buf_ptr: *&[u8] = &buf;
        do self.home_for_io_with_sched |self_, scheduler| {
            do scheduler.deschedule_running_task_and_then |_, task| {
                let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
                let task_cell = Cell::new(task);
                let write_req = file::FsRequest::new();
                do write_req.write(&self_.loop_, self_.fd, buf, offset) |_, uverr| {
                    let res = match uverr {
                        None => Ok(()),
                        Some(err) => Err(uv_error_to_io_error(err))
                    };
                    unsafe { (*result_cell_ptr).put_back(res); }
                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task_cell.take());
                };
            };
        };
        result_cell.take()
    }
    // Synchronous lseek(2) on the raw fd (not routed through uv); note this
    // does NOT hop to the home scheduler. Returns the new offset.
    fn seek_common(&mut self, pos: i64, whence: c_int) ->
        Result<u64, IoError>{
        #[fixed_stack_segment]; #[inline(never)];
        unsafe {
            match lseek(self.fd, pos as off_t, whence) {
                -1 => {
                    Err(IoError {
                        kind: OtherIoError,
                        desc: "Failed to lseek.",
                        detail: None
                    })
                },
                n => Ok(n as u64)
            }
        }
    }
}

impl Drop for UvFileStream {
    fn drop(&mut self) {
        // Only close fds we own; borrowed fds (close_on_drop == false) are
        // left open for their real owner.
        if self.close_on_drop {
            do self.home_for_io_with_sched |self_, scheduler| {
                do scheduler.deschedule_running_task_and_then |_, task| {
                    let task_cell = Cell::new(task);
                    let close_req = file::FsRequest::new();
                    do close_req.close(&self_.loop_, self_.fd) |_,_| {
                        let scheduler: ~Scheduler = Local::take();
                        scheduler.resume_blocked_task_immediately(task_cell.take());
                    };
                };
            }
        }
    }
}

impl RtioFileStream for UvFileStream {
    // read/write at the current file position (offset -1 per uv convention).
    fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError> {
        self.base_read(buf, -1)
    }
    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
        self.base_write(buf, -1)
    }
    // Positioned read/write at an explicit offset, leaving the fd's current
    // position alone.
    fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
        self.base_read(buf, offset as i64)
    }
    fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
        self.base_write(buf, offset as i64)
    }
    fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result<u64, IoError> {
        use libc::{SEEK_SET, SEEK_CUR, SEEK_END};
        let whence = match whence {
            SeekSet => SEEK_SET,
            SeekCur => SEEK_CUR,
            SeekEnd => SEEK_END
        };
        self.seek_common(pos, whence)
    }
    fn tell(&self) -> Result<u64, IoError> {
        use libc::SEEK_CUR;
        // this is temporary: seek_common takes &mut self even though a
        // zero-offset SEEK_CUR does not logically mutate, so cast away
        // the immutability.
        let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
        self_.seek_common(0, SEEK_CUR)
    }
    // No userspace buffering here, so flush is a no-op.
    fn flush(&mut self) -> Result<(), IoError> {
        Ok(())
    }
}
1531
// A spawned child process backed by libuv's process handle.
pub struct UvProcess {
    process: process::Process,

    // Sadly, this structure must be created before we return it, so in that
    // brief interim the `home` is None.
    home: Option<SchedHandle>,

    // All None until the process exits (exit_error may stay None)
    priv exit_status: Option<int>,
    priv term_signal: Option<int>,
    priv exit_error: Option<UvError>,

    // Used to store which task to wake up from the exit_cb
    priv descheduled: Option<BlockedTask>,
}

impl HomingIO for UvProcess {
    // Panics (via get_mut_ref) if called before the home is set — only
    // valid on a successfully spawned process.
    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() }
}

impl Drop for UvProcess {
    fn drop(&mut self) {
        // Asynchronously close the uv process handle, blocking this task
        // until the close callback fires.
        let close = |self_: &mut UvProcess| {
            let scheduler: ~Scheduler = Local::take();
            do scheduler.deschedule_running_task_and_then |_, task| {
                let task = Cell::new(task);
                do self_.process.close {
                    let scheduler: ~Scheduler = Local::take();
                    scheduler.resume_blocked_task_immediately(task.take());
                }
            }
        };

        // If home is none, then this process never actually successfully
        // spawned, so there's no need to switch event loops
        if self.home.is_none() {
            close(self)
        } else {
            self.home_for_io(close)
        }
    }
}

impl RtioProcess for UvProcess {
    // OS pid of the child.
    fn id(&self) -> pid_t {
        self.process.pid()
    }

    // Deliver `signal` to the child via uv; errors are translated to IoError.
    fn kill(&mut self, signal: int) -> Result<(), IoError> {
        do self.home_for_io |self_| {
            match self_.process.kill(signal) {
                Ok(()) => Ok(()),
                Err(uverr) => Err(uv_error_to_io_error(uverr))
            }
        }
    }

    // Block until the child has exited, then return its exit status.
    fn wait(&mut self) -> int {
        // Make sure (on the home scheduler) that we have an exit status listed
        do self.home_for_io |self_| {
            match self_.exit_status {
                Some(*) => {}
                None => {
                    // If there's no exit code previously listed, then the
                    // process's exit callback has yet to be invoked. We just
                    // need to deschedule ourselves and wait to be reawoken.
                    let scheduler: ~Scheduler = Local::take();
                    do scheduler.deschedule_running_task_and_then |_, task| {
                        assert!(self_.descheduled.is_none());
                        self_.descheduled = Some(task);
                    }
                    // The exit callback (elsewhere in this file) must have
                    // filled this in before waking us.
                    assert!(self_.exit_status.is_some());
                }
            }
        }

        self.exit_status.unwrap()
    }
}
1611
#[test]
fn test_simple_io_no_connect() {
    // A TCP connect to an address nobody is listening on must return Err,
    // not hang or crash.
    do run_in_mt_newsched_task {
        unsafe {
            let io: *mut IoFactoryObject = Local::unsafe_borrow();
            let addr = next_test_ip4();
            let maybe_chan = (*io).tcp_connect(addr);
            assert!(maybe_chan.is_err());
        }
    }
}

#[test]
fn test_simple_udp_io_bind_only() {
    // Binding a UDP socket to a fresh test address should succeed.
    do run_in_mt_newsched_task {
        unsafe {
            let io: *mut IoFactoryObject = Local::unsafe_borrow();
            let addr = next_test_ip4();
            let maybe_socket = (*io).udp_bind(addr);
            assert!(maybe_socket.is_ok());
        }
    }
}
1635
#[test]
fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
    // Exercises the "homing" machinery: a UDP socket bound on sched1's loop
    // must be closed on sched1 even when its owning task has migrated to
    // sched2 — the destructor travels home before closing.
    use rt::sleeper_list::SleeperList;
    use rt::work_queue::WorkQueue;
    use rt::thread::Thread;
    use rt::task::Task;
    use rt::sched::{Shutdown, TaskFromFriend};
    do run_in_bare_thread {
        let sleepers = SleeperList::new();
        let work_queue1 = WorkQueue::new();
        let work_queue2 = WorkQueue::new();
        let queues = ~[work_queue1.clone(), work_queue2.clone()];

        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
                                         sleepers.clone());
        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
                                         sleepers.clone());

        let handle1 = Cell::new(sched1.make_handle());
        let handle2 = Cell::new(sched2.make_handle());
        let tasksFriendHandle = Cell::new(sched2.make_handle());

        let on_exit: ~fn(bool) = |exit_status| {
            handle1.take().send(Shutdown);
            handle2.take().send(Shutdown);
            rtassert!(exit_status);
        };

        let test_function: ~fn() = || {
            let io: *mut IoFactoryObject = unsafe {
                Local::unsafe_borrow()
            };
            let addr = next_test_ip4();
            let maybe_socket = unsafe { (*io).udp_bind(addr) };
            // this socket is bound to this event loop
            assert!(maybe_socket.is_ok());

            // block self on sched1
            do task::unkillable { // FIXME(#8674)
                let scheduler: ~Scheduler = Local::take();
                do scheduler.deschedule_running_task_and_then |_, task| {
                    // unblock task
                    do task.wake().map |task| {
                        // send self to sched2
                        tasksFriendHandle.take().send(TaskFromFriend(task));
                    };
                    // sched1 should now sleep since it has nothing else to do
                }
            }
            // sched2 will wake up and get the task
            // as we do nothing else, the function ends and the socket goes out of scope
            // sched2 will start to run the destructor
            // the destructor will first block the task, set it's home as sched1, then enqueue it
            // sched2 will dequeue the task, see that it has a home, and send it to sched1
            // sched1 will wake up, exec the close function on the correct loop, and then we're done
        };

        let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
        main_task.death.on_exit = Some(on_exit);
        let main_task = Cell::new(main_task);

        // sched2 needs at least one task to bootstrap with; it does nothing.
        let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});

        let sched1 = Cell::new(sched1);
        let sched2 = Cell::new(sched2);

        let thread1 = do Thread::start {
            sched1.take().bootstrap(main_task.take());
        };
        let thread2 = do Thread::start {
            sched2.take().bootstrap(null_task.take());
        };

        thread1.join();
        thread2.join();
    }
}

#[test]
fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() {
    // Same homing property as above, but the socket VALUE (not the task) is
    // moved between schedulers via a oneshot channel before being dropped.
    use rt::sleeper_list::SleeperList;
    use rt::work_queue::WorkQueue;
    use rt::thread::Thread;
    use rt::task::Task;
    use rt::comm::oneshot;
    use rt::sched::Shutdown;
    do run_in_bare_thread {
        let sleepers = SleeperList::new();
        let work_queue1 = WorkQueue::new();
        let work_queue2 = WorkQueue::new();
        let queues = ~[work_queue1.clone(), work_queue2.clone()];

        let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
                                         sleepers.clone());
        let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
                                         sleepers.clone());

        let handle1 = Cell::new(sched1.make_handle());
        let handle2 = Cell::new(sched2.make_handle());

        let (port, chan) = oneshot();
        let port = Cell::new(port);
        let chan = Cell::new(chan);

        let body1: ~fn() = || {
            let io: *mut IoFactoryObject = unsafe {
                Local::unsafe_borrow()
            };
            let addr = next_test_ip4();
            let socket = unsafe { (*io).udp_bind(addr) };
            assert!(socket.is_ok());
            chan.take().send(socket);
        };

        let body2: ~fn() = || {
            let socket = port.take().recv();
            assert!(socket.is_ok());
            /* The socket goes out of scope and the destructor is called.
             * The destructor:
             *  - sends itself back to sched1
             *  - frees the socket
             *  - resets the home of the task to whatever it was previously
             */
        };

        let on_exit: ~fn(bool) = |exit| {
            handle1.take().send(Shutdown);
            handle2.take().send(Shutdown);
            rtassert!(exit);
        };

        let task1 = Cell::new(~Task::new_root(&mut sched1.stack_pool, None, body1));

        let mut task2 = ~Task::new_root(&mut sched2.stack_pool, None, body2);
        task2.death.on_exit = Some(on_exit);
        let task2 = Cell::new(task2);

        let sched1 = Cell::new(sched1);
        let sched2 = Cell::new(sched2);

        let thread1 = do Thread::start {
            sched1.take().bootstrap(task1.take());
        };
        let thread2 = do Thread::start {
            sched2.take().bootstrap(task2.take());
        };

        thread1.join();
        thread2.join();
    }
}
1787
#[test]
fn test_simple_tcp_server_and_client() {
    // Round-trip 8 bytes between two tasks on the same multithreaded
    // scheduler; the oneshot gates the client until the server listens.
    do run_in_mt_newsched_task {
        let addr = next_test_ip4();
        let (port, chan) = oneshot();
        let port = Cell::new(port);
        let chan = Cell::new(chan);

        // Start the server first so it's listening when we connect
        do spawntask {
            unsafe {
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let listener = (*io).tcp_bind(addr).unwrap();
                let mut acceptor = listener.listen().unwrap();
                chan.take().send(());
                let mut stream = acceptor.accept().unwrap();
                let mut buf = [0, .. 2048];
                let nread = stream.read(buf).unwrap();
                assert_eq!(nread, 8);
                for i in range(0u, nread) {
                    rtdebug!("{}", buf[i]);
                    assert_eq!(buf[i], i as u8);
                }
            }
        }

        do spawntask {
            unsafe {
                port.take().recv();
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let mut stream = (*io).tcp_connect(addr).unwrap();
                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
            }
        }
    }
}

#[test]
fn test_simple_tcp_server_and_client_on_diff_threads() {
    // Same round-trip, but server and client each run on their own
    // scheduler/thread; the client retries connecting until the server's
    // listener is up (no channel synchronization here).
    use rt::sleeper_list::SleeperList;
    use rt::work_queue::WorkQueue;
    use rt::thread::Thread;
    use rt::task::Task;
    use rt::sched::{Shutdown};
    do run_in_bare_thread {
        let sleepers = SleeperList::new();

        let server_addr = next_test_ip4();
        let client_addr = server_addr.clone();

        let server_work_queue = WorkQueue::new();
        let client_work_queue = WorkQueue::new();
        let queues = ~[server_work_queue.clone(), client_work_queue.clone()];

        let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue,
                                               queues.clone(), sleepers.clone());
        let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue,
                                               queues.clone(), sleepers.clone());

        let server_handle = Cell::new(server_sched.make_handle());
        let client_handle = Cell::new(client_sched.make_handle());

        let server_on_exit: ~fn(bool) = |exit_status| {
            server_handle.take().send(Shutdown);
            rtassert!(exit_status);
        };

        let client_on_exit: ~fn(bool) = |exit_status| {
            client_handle.take().send(Shutdown);
            rtassert!(exit_status);
        };

        let server_fn: ~fn() = || {
            let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
            let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
            let mut acceptor = listener.listen().unwrap();
            let mut stream = acceptor.accept().unwrap();
            let mut buf = [0, .. 2048];
            let nread = stream.read(buf).unwrap();
            assert_eq!(nread, 8);
            for i in range(0u, nread) {
                assert_eq!(buf[i], i as u8);
            }
        };

        let client_fn: ~fn() = || {
            let io: *mut IoFactoryObject = unsafe {
                Local::unsafe_borrow()
            };
            // Spin until the server is accepting connections.
            let mut stream = unsafe { (*io).tcp_connect(client_addr) };
            while stream.is_err() {
                stream = unsafe { (*io).tcp_connect(client_addr) };
            }
            stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
        };

        let mut server_task = ~Task::new_root(&mut server_sched.stack_pool, None, server_fn);
        server_task.death.on_exit = Some(server_on_exit);
        let server_task = Cell::new(server_task);

        let mut client_task = ~Task::new_root(&mut client_sched.stack_pool, None, client_fn);
        client_task.death.on_exit = Some(client_on_exit);
        let client_task = Cell::new(client_task);

        let server_sched = Cell::new(server_sched);
        let client_sched = Cell::new(client_sched);

        let server_thread = do Thread::start {
            server_sched.take().bootstrap(server_task.take());
        };
        let client_thread = do Thread::start {
            client_sched.take().bootstrap(client_task.take());
        };

        server_thread.join();
        client_thread.join();
    }
}
1906
#[test]
fn test_simple_udp_server_and_client() {
    // One datagram from client to server; checks payload, length, and that
    // recvfrom reports the client's address.
    do run_in_mt_newsched_task {
        let server_addr = next_test_ip4();
        let client_addr = next_test_ip4();
        let (port, chan) = oneshot();
        let port = Cell::new(port);
        let chan = Cell::new(chan);

        do spawntask {
            unsafe {
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let mut server_socket = (*io).udp_bind(server_addr).unwrap();
                chan.take().send(());
                let mut buf = [0, .. 2048];
                let (nread,src) = server_socket.recvfrom(buf).unwrap();
                assert_eq!(nread, 8);
                for i in range(0u, nread) {
                    rtdebug!("{}", buf[i]);
                    assert_eq!(buf[i], i as u8);
                }
                assert_eq!(src, client_addr);
            }
        }

        do spawntask {
            unsafe {
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let mut client_socket = (*io).udp_bind(client_addr).unwrap();
                port.take().recv();
                client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
            }
        }
    }
}

#[test] #[ignore(reason = "busted")]
fn test_read_and_block() {
    // Reads 32 bytes across several reads while deliberately yielding
    // between reads, to provoke a read callback arriving while the reader
    // task is not blocked. Currently ignored as broken.
    do run_in_mt_newsched_task {
        let addr = next_test_ip4();
        let (port, chan) = oneshot();
        let port = Cell::new(port);
        let chan = Cell::new(chan);

        do spawntask {
            let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
            let listener = unsafe { (*io).tcp_bind(addr).unwrap() };
            let mut acceptor = listener.listen().unwrap();
            chan.take().send(());
            let mut stream = acceptor.accept().unwrap();
            let mut buf = [0, .. 2048];

            let expected = 32;
            let mut current = 0;
            let mut reads = 0;

            while current < expected {
                let nread = stream.read(buf).unwrap();
                for i in range(0u, nread) {
                    let val = buf[i] as uint;
                    assert_eq!(val, current % 8);
                    current += 1;
                }
                reads += 1;

                do task::unkillable { // FIXME(#8674)
                    let scheduler: ~Scheduler = Local::take();
                    // Yield to the other task in hopes that it
                    // will trigger a read callback while we are
                    // not ready for it
                    do scheduler.deschedule_running_task_and_then |sched, task| {
                        let task = Cell::new(task);
                        sched.enqueue_blocked_task(task.take());
                    }
                }
            }

            // Make sure we had multiple reads
            assert!(reads > 1);
        }

        do spawntask {
            unsafe {
                port.take().recv();
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let mut stream = (*io).tcp_connect(addr).unwrap();
                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
            }
        }

    }
}
2002
#[test]
fn test_read_read_read() {
    // Stress test: stream ~500 KB of 1-bytes over TCP and verify every byte
    // arrives, across however many reads it takes.
    do run_in_mt_newsched_task {
        let addr = next_test_ip4();
        static MAX: uint = 500000;
        let (port, chan) = oneshot();
        let port = Cell::new(port);
        let chan = Cell::new(chan);

        do spawntask {
            unsafe {
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let listener = (*io).tcp_bind(addr).unwrap();
                let mut acceptor = listener.listen().unwrap();
                chan.take().send(());
                let mut stream = acceptor.accept().unwrap();
                let buf = [1, .. 2048];
                let mut total_bytes_written = 0;
                while total_bytes_written < MAX {
                    stream.write(buf);
                    total_bytes_written += buf.len();
                }
            }
        }

        do spawntask {
            unsafe {
                port.take().recv();
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let mut stream = (*io).tcp_connect(addr).unwrap();
                let mut buf = [0, .. 2048];
                let mut total_bytes_read = 0;
                while total_bytes_read < MAX {
                    let nread = stream.read(buf).unwrap();
                    rtdebug!("read {} bytes", nread);
                    total_bytes_read += nread;
                    for i in range(0u, nread) {
                        assert_eq!(buf[i], 1);
                    }
                }
                rtdebug!("read {} bytes total", total_bytes_read);
            }
        }
    }
}

#[test]
fn test_udp_twice() {
    // Two consecutive datagrams must both be received, in order, each with
    // the correct source address — exercises recv_start/recv_stop per call.
    do run_in_mt_newsched_task {
        let server_addr = next_test_ip4();
        let client_addr = next_test_ip4();
        let (port, chan) = oneshot();
        let port = Cell::new(port);
        let chan = Cell::new(chan);

        do spawntask {
            unsafe {
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let mut client = (*io).udp_bind(client_addr).unwrap();
                port.take().recv();
                assert!(client.sendto([1], server_addr).is_ok());
                assert!(client.sendto([2], server_addr).is_ok());
            }
        }

        do spawntask {
            unsafe {
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let mut server = (*io).udp_bind(server_addr).unwrap();
                chan.take().send(());
                let mut buf1 = [0];
                let mut buf2 = [0];
                let (nread1, src1) = server.recvfrom(buf1).unwrap();
                let (nread2, src2) = server.recvfrom(buf2).unwrap();
                assert_eq!(nread1, 1);
                assert_eq!(nread2, 1);
                assert_eq!(src1, client_addr);
                assert_eq!(src2, client_addr);
                assert_eq!(buf1[0], 1);
                assert_eq!(buf2[0], 2);
            }
        }
    }
}
2087
#[test]
fn test_udp_many_read() {
    // Flow-controlled UDP stress test over four sockets: the client asks for
    // data with a [1] datagram, the server replies with 2 KB chunks, and the
    // client signals completion with [0] once it has received >= MAX bytes.
    do run_in_mt_newsched_task {
        let server_out_addr = next_test_ip4();
        let server_in_addr = next_test_ip4();
        let client_out_addr = next_test_ip4();
        let client_in_addr = next_test_ip4();
        static MAX: uint = 500_000;

        let (p1, c1) = oneshot();
        let (p2, c2) = oneshot();

        let first = Cell::new((p1, c2));
        let second = Cell::new((p2, c1));

        do spawntask {
            unsafe {
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
                let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
                let (port, chan) = first.take();
                chan.send(());
                port.recv();
                let msg = [1, .. 2048];
                let mut total_bytes_sent = 0;
                let mut buf = [1];
                while buf[0] == 1 {
                    // send more data
                    assert!(server_out.sendto(msg, client_in_addr).is_ok());
                    total_bytes_sent += msg.len();
                    // check if the client has received enough
                    let res = server_in.recvfrom(buf);
                    assert!(res.is_ok());
                    let (nread, src) = res.unwrap();
                    assert_eq!(nread, 1);
                    assert_eq!(src, client_out_addr);
                }
                assert!(total_bytes_sent >= MAX);
            }
        }

        do spawntask {
            unsafe {
                let io: *mut IoFactoryObject = Local::unsafe_borrow();
                let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
                let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
                let (port, chan) = second.take();
                port.recv();
                chan.send(());
                let mut total_bytes_recv = 0;
                let mut buf = [0, .. 2048];
                while total_bytes_recv < MAX {
                    // ask for more
                    assert!(client_out.sendto([1], server_in_addr).is_ok());
                    // wait for data
                    let res = client_in.recvfrom(buf);
                    assert!(res.is_ok());
                    let (nread, src) = res.unwrap();
                    assert_eq!(src, server_out_addr);
                    total_bytes_recv += nread;
                    for i in range(0u, nread) {
                        assert_eq!(buf[i], 1);
                    }
                }
                // tell the server we're done
                assert!(client_out.sendto([0], server_in_addr).is_ok());
            }
        }
    }
}

#[test]
fn test_timer_sleep_simple() {
    // Smoke test: a 1 ms timer sleep completes without error.
    do run_in_mt_newsched_task {
        unsafe {
            let io: *mut IoFactoryObject = Local::unsafe_borrow();
            let timer = (*io).timer_init();
            do timer.map_move |mut t| { t.sleep(1) };
        }
    }
}
2169
// Full file round-trip: create ./tmp/file_test_uvio_full.txt, write a string,
// reopen read-only, read it back and compare, then unlink the file.
// NOTE(review): assumes ./tmp exists in the working directory — confirm the
// test harness creates it.
fn file_test_uvio_full_simple_impl() {
    use str::StrSlice; // why does this have to be explicitly imported to work?
                       // compiler was complaining about no trait for str that
                       // does .as_bytes() ..
    use path::Path;
    use rt::io::{Open, Create, ReadWrite, Read};
    unsafe {
        let io: *mut IoFactoryObject = Local::unsafe_borrow();
        let write_val = "hello uvio!";
        let path = "./tmp/file_test_uvio_full.txt";
        {
            let create_fm = Create;
            let create_fa = ReadWrite;
            let mut fd = (*io).fs_open(&Path::new(path), create_fm, create_fa).unwrap();
            let write_buf = write_val.as_bytes();
            fd.write(write_buf);
        }
        {
            let ro_fm = Open;
            let ro_fa = Read;
            let mut fd = (*io).fs_open(&Path::new(path), ro_fm, ro_fa).unwrap();
            let mut read_vec = [0, .. 1028];
            let nread = fd.read(read_vec).unwrap();
            let read_val = str::from_utf8(read_vec.slice(0, nread as uint));
            assert!(read_val == write_val.to_owned());
        }
        (*io).fs_unlink(&Path::new(path));
    }
}

#[test]
fn file_test_uvio_full_simple() {
    do run_in_mt_newsched_task {
        file_test_uvio_full_simple_impl();
    }
}

// Write `input` to stdout through the uv file API, borrowing STDOUT_FILENO
// (close_on_drop = false, so stdout stays open afterwards).
fn uvio_naive_print(input: &str) {
    use str::StrSlice;
    unsafe {
        use libc::{STDOUT_FILENO};
        let io: *mut IoFactoryObject = Local::unsafe_borrow();
        {
            let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false);
            let write_buf = input.as_bytes();
            fd.write(write_buf);
        }
    }
}

#[test]
fn file_test_uvio_write_to_stdout() {
    do run_in_mt_newsched_task {
        uvio_naive_print("jubilation\n");
    }
}
libstd/rt/uv/uvio.rs:294:100-294:100 -struct- definition:
// The entire point of async is to call into a loop from other threads so it does not need to home.
pub struct UvRemoteCallback {
references:-354: impl Drop for UvRemoteCallback {
350: impl RemoteCallback for UvRemoteCallback {
343: UvRemoteCallback {
304: pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
357: let this: &mut UvRemoteCallback = cast::transmute_mut(self);
303: impl UvRemoteCallback {
libstd/rt/rtio.rs:
28: pub type RemoteCallbackObject = uvio::UvRemoteCallback;
libstd/rt/uv/uvio.rs:1141:1-1141:1 -struct- definition:
pub struct UvUdpSocket {
references:-524: Ok(~UvUdpSocket { watcher: watcher, home: home })
1165: impl RtioSocket for UvUdpSocket {
1147: impl HomingIO for UvUdpSocket {
1173: impl RtioUdpSocket for UvUdpSocket {
1151: impl Drop for UvUdpSocket {
libstd/rt/rtio.rs:
33: pub type RtioUdpSocketObject = uvio::UvUdpSocket;
libstd/rt/uv/uvio.rs:961:1-961:1 -fn- definition:
fn write_stream(mut watcher: StreamWatcher,
references:-1037: write_stream(self_.pipe.as_stream(), scheduler, buf)
1082: write_stream(self_.watcher.as_stream(), scheduler, buf)
libstd/rt/uv/uvio.rs:802:1-802:1 -struct- definition:
pub struct UvTcpListener {
references:-812: impl UvTcpListener {
814: UvTcpListener { watcher: watcher, home: home }
813: fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
832: impl RtioSocket for UvTcpListener {
840: impl RtioTcpListener for UvTcpListener {
818: impl Drop for UvTcpListener {
866: listener: UvTcpListener,
808: impl HomingIO for UvTcpListener {
875: fn new(listener: UvTcpListener) -> UvTcpAcceptor {
libstd/rt/rtio.rs:
32: pub type RtioTcpListenerObject = uvio::UvTcpListener;
libstd/rt/uv/uvio.rs:416:20-416:20 -fn- definition:
/// have no ret val
fn uv_fs_helper<P: PathLike>(loop_: &mut Loop, path: &P,
references:-690: do uv_fs_helper(self.uv_loop(), path) |mkdir_req, l, p, cb| {
697: do uv_fs_helper(self.uv_loop(), path) |rmdir_req, l, p, cb| {
610: do uv_fs_helper(self.uv_loop(), path) |unlink_req, l, p, cb| {
libstd/rt/uv/uvio.rs:1018:1-1018:1 -struct- definition:
pub struct UvPipeStream {
references:-1024: pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream {
1023: impl UvPipeStream {
1015: UvPipeStream { inner: self }
1025: UvPipeStream { inner: inner }
1014: pub unsafe fn bind(~self) -> UvPipeStream {
1029: impl RtioPipe for UvPipeStream {
libstd/rt/uv/process.rs:
147: io: StdioContainer) -> Option<UvPipeStream> {
47: -> Result<~[Option<UvPipeStream>], uv::UvError>
libstd/rt/rtio.rs:
36: pub type RtioPipeObject = uvio::UvPipeStream;
libstd/rt/uv/uvio.rs:921:1-921:1 -fn- definition:
fn read_stream(mut watcher: StreamWatcher,
references:-1032: read_stream(self_.pipe.as_stream(), scheduler, buf)
1076: read_stream(self_.watcher.as_stream(), scheduler, buf)
libstd/rt/uv/uvio.rs:56:1-56:1 -trait- definition:
trait HomingIO {
references:-1047: impl HomingIO for UvTcpStream {
1357: impl HomingIO for UvTimer {
1406: impl HomingIO for UvFileStream {
112: HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
89: fn restore_original_home(_dummy_self: Option<Self>, old: SchedHome) {
1147: impl HomingIO for UvUdpSocket {
1548: impl HomingIO for UvProcess {
994: impl HomingIO for UvUnboundPipe {
124: fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
120: HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
109: fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
130: HomingIO::restore_original_home(None::<Self> /* XXX dummy self */, home);
808: impl HomingIO for UvTcpListener {
870: impl HomingIO for UvTcpAcceptor {
116: fn home_for_io_consume<A>(self, io: &fn(Self) -> A) -> A {
libstd/rt/uv/uvio.rs:181:43-181:43 -struct- definition:
// Obviously an Event Loop is always home.
pub struct UvEventLoop {
references:-186: impl UvEventLoop {
188: UvEventLoop {
194: impl Drop for UvEventLoop {
200: impl EventLoop for UvEventLoop {
187: pub fn new() -> UvEventLoop {
libstd/rt/rtio.rs:
27: pub type EventLoopObject = uvio::UvEventLoop;
libstd/rt/uv/uvio.rs:1351:1-1351:1 -struct- definition:
pub struct UvTimer {
references:-1357: impl HomingIO for UvTimer {
1367: impl Drop for UvTimer {
1362: fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
1361: impl UvTimer {
1363: UvTimer { watcher: w, home: home }
1382: impl RtioTimer for UvTimer {
libstd/rt/rtio.rs:
34: pub type RtioTimerObject = uvio::UvTimer;
libstd/rt/uv/uvio.rs:241:1-241:1 -struct- definition:
pub struct UvPausibleIdleCallback {
references:-248: impl UvPausibleIdleCallback {
217: return ~UvPausibleIdleCallback {
libstd/rt/rtio.rs:
35: pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
libstd/rt/uv/uvio.rs:1531:1-1531:1 -struct- definition:
pub struct UvProcess {
references:-1554: let close = |self_: &mut UvProcess| {
1552: impl Drop for UvProcess {
1548: impl HomingIO for UvProcess {
764: *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
755: let mut ret = ~UvProcess {
1575: impl RtioProcess for UvProcess {
764: *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
libstd/rt/rtio.rs:
38: pub type RtioProcessObject = uvio::UvProcess;
libstd/rt/uv/uvio.rs:988:1-988:1 -struct- definition:
pub struct UvUnboundPipe {
references:-746: Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
998: impl Drop for UvUnboundPipe {
1020: priv inner: ~UvUnboundPipe,
1024: pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream {
1013: impl UvUnboundPipe {
994: impl HomingIO for UvUnboundPipe {
libstd/rt/rtio.rs:
37: pub type RtioUnboundPipeObject = uvio::UvUnboundPipe;
libstd/rt/uv/uvio.rs:406:1-406:1 -struct- definition:
pub struct UvIoFactory(Loop);
references:-409: impl UvIoFactory {
183: uvio: UvIoFactory
445: impl IoFactory for UvIoFactory {
libstd/rt/rtio.rs:
29: pub type IoFactoryObject = uvio::UvIoFactory;
libstd/rt/uv/uvio.rs:864:1-864:1 -struct- definition:
pub struct UvTcpAcceptor {
references:-870: impl HomingIO for UvTcpAcceptor {
875: fn new(listener: UvTcpListener) -> UvTcpAcceptor {
874: impl UvTcpAcceptor {
880: impl RtioSocket for UvTcpAcceptor {
888: impl RtioTcpAcceptor for UvTcpAcceptor {
876: UvTcpAcceptor { listener: listener, incoming: Tube::new() }
libstd/rt/rtio.rs:
31: pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor;
libstd/rt/uv/uvio.rs:139:1-139:1 -enum- definition:
enum SocketNameKind {
references:-146: fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
libstd/rt/uv/uvio.rs:1041:1-1041:1 -struct- definition:
pub struct UvTcpStream {
references:-1073: impl RtioTcpStream for UvTcpStream {
469: let res = Ok(~UvTcpStream { watcher: tcp, home: home });
1047: impl HomingIO for UvTcpStream {
1065: impl RtioSocket for UvTcpStream {
854: Ok(~UvTcpStream { watcher: inc, home: home })
1051: impl Drop for UvTcpStream {
libstd/rt/rtio.rs:
30: pub type RtioTcpStreamObject = uvio::UvTcpStream;
libstd/rt/uv/uvio.rs:1398:1-1398:1 -struct- definition:
pub struct UvFileStream {
references:-1482: impl Drop for UvFileStream {
1524: let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
1413: UvFileStream {
1412: home: SchedHandle) -> UvFileStream {
1524: let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
1406: impl HomingIO for UvFileStream {
1499: impl RtioFileStream for UvFileStream {
1410: impl UvFileStream {
libstd/rt/uv/uvio.rs:145:1-145:1 -fn- definition:
fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
references:-883: socket_name(Tcp, self_.listener.watcher)
1068: socket_name(Tcp, self_.watcher)
835: socket_name(Tcp, self_.watcher)
1168: socket_name(Udp, self_.watcher)
1088: socket_name(TcpPeer, self_.watcher)