(index<- ) ./librustuv/homing.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Homing I/O implementation
12 //!
13 //! In libuv, whenever a handle is created on an I/O loop it is illegal to use
14 //! that handle outside of that I/O loop. We use libuv I/O with our green
15 //! scheduler, and each green scheduler corresponds to a different I/O loop on a
16 //! different OS thread. Green tasks are also free to roam among schedulers,
17 //! which implies that it is possible to create an I/O handle on one event loop
18 //! and then attempt to use it on another.
19 //!
20 //! In order to solve this problem, this module implements the notion of a
21 //! "homing operation" which will transplant a task from its currently running
22 //! scheduler back onto the original I/O loop. This is accomplished entirely at
23 //! the librustuv layer with very little cooperation from the scheduler (which
24 //! we don't even know exists technically).
25 //!
26 //! These homing operations are completed by first realizing that we're on the
27 //! wrong I/O loop, then descheduling ourselves, sending ourselves to the
28 //! correct I/O loop, and then waking up the I/O loop in order to process its
29 //! local queue of tasks which need to run.
30 //!
31 //! This enqueueing is done with a concurrent queue from libstd, and the
32 //! signalling is achieved with an async handle.
33
34 #![allow(dead_code)]
35
36 use std::cast;
37 use std::rt::local::Local;
38 use std::rt::rtio::LocalIo;
39 use std::rt::task::{Task, BlockedTask};
40
41 use ForbidUnwind;
42 use queue::{Queue, QueuePool};
43
44 /// A handle to a remote libuv event loop. This handle will keep the event loop
45 /// alive while active in order to ensure that a homing operation can always be
46 /// completed.
47 ///
48 /// Handles are clone-able in order to derive new handles from existing handles
49 /// (very useful for when accepting a socket from a server).
50 pub struct HomeHandle {
51 queue: Queue,
52 id: uint,
53 }
54
55 impl HomeHandle {
56 pub fn new(id: uint, pool: &mut QueuePool) -> HomeHandle {
57 HomeHandle { queue: pool.queue(), id: id }
58 }
59
60 fn send(&mut self, task: BlockedTask) {
61 self.queue.push(task);
62 }
63 }
64
65 impl Clone for HomeHandle {
66 fn clone(&self) -> HomeHandle {
67 HomeHandle {
68 queue: self.queue.clone(),
69 id: self.id,
70 }
71 }
72 }
73
74 pub fn local_id() -> uint {
75 let mut io = match LocalIo::borrow() {
76 Some(io) => io, None => return 0,
77 };
78 let io = io.get();
79 unsafe {
80 let (_vtable, ptr): (uint, uint) = cast::transmute(io);
81 return ptr;
82 }
83 }
84
85 pub trait HomingIO {
86 fn home<'r>(&'r mut self) -> &'r mut HomeHandle;
87
88 /// This function will move tasks to run on their home I/O scheduler. Note
89 /// that this function does *not* pin the task to the I/O scheduler, but
90 /// rather it simply moves it to running on the I/O scheduler.
91 fn go_to_IO_home(&mut self) -> uint {
92 let _f = ForbidUnwind::new("going home");
93
94 let cur_loop_id = local_id();
95 let destination = self.home().id;
96
97 // Try at all costs to avoid the homing operation because it is quite
98 // expensive. Hence, we only deschedule/send if we're not on the correct
99 // event loop. If we're already on the home event loop, then we're good
100 // to go (remember we have no preemption, so we're guaranteed to stay on
101 // this event loop as long as we avoid the scheduler).
102 if cur_loop_id != destination {
103 let cur_task: Box<Task> = Local::take();
104 cur_task.deschedule(1, |task| {
105 self.home().send(task);
106 Ok(())
107 });
108
109 // Once we wake up, assert that we're in the right location
110 assert_eq!(local_id(), destination);
111 }
112
113 return destination;
114 }
115
116 /// Fires a single homing missile, returning another missile targeted back
117 /// at the original home of this task. In other words, this function will
118 /// move the local task to its I/O scheduler and then return an RAII wrapper
119 /// which will return the task home.
120 fn fire_homing_missile(&mut self) -> HomingMissile {
121 HomingMissile { io_home: self.go_to_IO_home() }
122 }
123 }
124
125 /// After a homing operation has been completed, this will return the current
126 /// task back to its appropriate home (if applicable). The field is used to
127 /// assert that we are where we think we are.
128 pub struct HomingMissile {
129 io_home: uint,
130 }
131
132 impl HomingMissile {
133 /// Check at runtime that the task has *not* transplanted itself to a
134 /// different I/O loop while executing.
135 pub fn check(&self, msg: &'static str) {
136 assert!(local_id() == self.io_home, "{}", msg);
137 }
138 }
139
140 impl Drop for HomingMissile {
141 fn drop(&mut self) {
142 let _f = ForbidUnwind::new("leaving home");
143
144 // It would truly be a sad day if we had moved off the home I/O
145 // scheduler while we were doing I/O.
146 self.check("task moved away from the home scheduler");
147 }
148 }
149
150 #[cfg(test)]
151 mod test {
152 use green::sched;
153 use green::{SchedPool, PoolConfig};
154 use std::rt::rtio::RtioUdpSocket;
155 use std::io::test::next_test_ip4;
156 use std::task::TaskOpts;
157
158 use net::UdpWatcher;
159 use super::super::local_loop;
160
161 // On one thread, create a udp socket. Then send that socket to another
162 // thread and destroy the socket on the remote thread. This should make sure
163 // that homing kicks in for the socket to go back home to the original
164 // thread, close itself, and then come back to the last thread.
165 #[test]
166 fn test_homing_closes_correctly() {
167 let (tx, rx) = channel();
168 let mut pool = SchedPool::new(PoolConfig {
169 threads: 1,
170 event_loop_factory: ::event_loop,
171 });
172
173 pool.spawn(TaskOpts::new(), proc() {
174 let listener = UdpWatcher::bind(local_loop(), next_test_ip4());
175 tx.send(listener.unwrap());
176 });
177
178 let task = pool.task(TaskOpts::new(), proc() {
179 drop(rx.recv());
180 });
181 pool.spawn_sched().send(sched::TaskFromFriend(task));
182
183 pool.shutdown();
184 }
185
186 #[test]
187 fn test_homing_read() {
188 let (tx, rx) = channel();
189 let mut pool = SchedPool::new(PoolConfig {
190 threads: 1,
191 event_loop_factory: ::event_loop,
192 });
193
194 pool.spawn(TaskOpts::new(), proc() {
195 let addr1 = next_test_ip4();
196 let addr2 = next_test_ip4();
197 let listener = UdpWatcher::bind(local_loop(), addr2);
198 tx.send((listener.unwrap(), addr1));
199 let mut listener = UdpWatcher::bind(local_loop(), addr1).unwrap();
200 listener.sendto([1, 2, 3, 4], addr2).unwrap();
201 });
202
203 let task = pool.task(TaskOpts::new(), proc() {
204 let (mut watcher, addr) = rx.recv();
205 let mut buf = [0, ..10];
206 assert_eq!(watcher.recvfrom(buf).unwrap(), (4, addr));
207 });
208 pool.spawn_sched().send(sched::TaskFromFriend(task));
209
210 pool.shutdown();
211 }
212 }
librustuv/homing.rs:49:61-49:61 -struct- definition:
/// (very useful for when accepting a socket from a server).
pub struct HomeHandle {
queue: Queue,
references:- 34librustuv/uvio.rs:
librustuv/file.rs:
librustuv/net.rs:
librustuv/timer.rs:
librustuv/process.rs:
librustuv/pipe.rs:
librustuv/tty.rs:
librustuv/signal.rs:
librustuv/timeout.rs:
librustuv/file.rs:
librustuv/homing.rs:84:1-84:1 -trait- definition:
pub trait HomingIO {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle;
/// This function will move tasks to run on their home I/O scheduler. Note
references:- 13librustuv/file.rs:
355: impl HomingIO for FileWatcher {
356: fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
librustuv/net.rs:
379: impl HomingIO for TcpListener {
380: fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
--
534: impl HomingIO for UdpWatcher {
535: fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
librustuv/process.rs:
203: impl HomingIO for Process {
204: fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
librustuv/pipe.rs:
206: impl HomingIO for PipeWatcher {
207: fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
--
268: impl HomingIO for PipeListener {
269: fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
--
316: impl HomingIO for PipeAcceptor {
317: fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.listener.home }
librustuv/tty.rs:
126: impl HomingIO for TtyWatcher {
127: fn home<'a>(&'a mut self) -> &'a mut HomeHandle { &mut self.home }
librustuv/signal.rs:
57: impl HomingIO for SignalWatcher {
58: fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
librustuv/timeout.rs:
360: pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
361: &mut self, ms: u64, t: &mut T
librustuv/timer.rs:
68: impl HomingIO for TimerWatcher {
69: fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
librustuv/homing.rs:73:1-73:1 -fn- definition:
pub fn local_id() -> uint {
let mut io = match LocalIo::borrow() {
Some(io) => io, None => return 0,
references:- 594: let cur_loop_id = local_id();
95: let destination = self.home().id;
--
135: pub fn check(&self, msg: &'static str) {
136: assert!(local_id() == self.io_home, "{}", msg);
137: }
librustuv/lib.rs:
206: msg: s,
207: io: homing::local_id(),
208: }
--
213: fn drop(&mut self) {
214: assert!(self.io == homing::local_id(),
215: "didnt want a scheduler switch: {}",
librustuv/homing.rs:127:46-127:46 -struct- definition:
/// assert that we are where we think we are.
pub struct HomingMissile {
io_home: uint,
references:- 8120: fn fire_homing_missile(&mut self) -> HomingMissile {
121: HomingMissile { io_home: self.go_to_IO_home() }
122: }
--
140: impl Drop for HomingMissile {
141: fn drop(&mut self) {
librustuv/access.rs:
50: pub fn grant<'a>(&'a mut self, token: uint,
51: missile: HomingMissile) -> Guard<'a> {
52: // This unsafety is actually OK because the homing missile argument
--
71: pub fn close(&self, _missile: &HomingMissile) {
72: // This unsafety is OK because with a homing missile we're guaranteed to
librustuv/timeout.rs:
71: /// error.
72: pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a>> {
73: // First, flag that we're attempting to acquire access. This will allow