(index<- ) ./librustuv/timeout.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use libc::c_int;
12 use std::cast;
13 use std::io::IoResult;
14 use std::mem;
15 use std::rt::task::BlockedTask;
16
17 use access;
18 use homing::{HomeHandle, HomingMissile, HomingIO};
19 use timer::TimerWatcher;
20 use uvll;
21 use uvio::UvIoFactory;
22 use {Loop, UvError, uv_error_to_io_error, Request, wakeup};
23 use {UvHandle, wait_until_woken_after};
24
25 /// Managment of a timeout when gaining access to a portion of a duplex stream.
26 pub struct AccessTimeout {
27 state: TimeoutState,
28 timer: Option<Box<TimerWatcher>>,
29 pub access: access::Access,
30 }
31
32 pub struct Guard<'a> {
33 state: &'a mut TimeoutState,
34 pub access: access::Guard<'a>,
35 pub can_timeout: bool,
36 }
37
38 #[deriving(Eq)]
39 enum TimeoutState {
40 NoTimeout,
41 TimeoutPending(ClientState),
42 TimedOut,
43 }
44
45 #[deriving(Eq)]
46 enum ClientState {
47 NoWaiter,
48 AccessPending,
49 RequestPending,
50 }
51
52 struct TimerContext {
53 timeout: *mut AccessTimeout,
54 callback: fn(uint) -> Option<BlockedTask>,
55 payload: uint,
56 }
57
58 impl AccessTimeout {
59 pub fn new() -> AccessTimeout {
60 AccessTimeout {
61 state: NoTimeout,
62 timer: None,
63 access: access::Access::new(),
64 }
65 }
66
67 /// Grants access to half of a duplex stream, timing out if necessary.
68 ///
69 /// On success, Ok(Guard) is returned and access has been granted to the
70 /// stream. If a timeout occurs, then Err is returned with an appropriate
71 /// error.
72 pub fn grant<'a>(&'a mut self, m: HomingMissile) -> IoResult<Guard<'a>> {
73 // First, flag that we're attempting to acquire access. This will allow
74 // us to cancel the pending grant if we timeout out while waiting for a
75 // grant.
76 match self.state {
77 NoTimeout => {},
78 TimeoutPending(ref mut client) => *client = AccessPending,
79 TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
80 }
81 let access = self.access.grant(self as *mut _ as uint, m);
82
83 // After acquiring the grant, we need to flag ourselves as having a
84 // pending request so the timeout knows to cancel the request.
85 let can_timeout = match self.state {
86 NoTimeout => false,
87 TimeoutPending(ref mut client) => { *client = RequestPending; true }
88 TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
89 };
90
91 Ok(Guard {
92 access: access,
93 state: &mut self.state,
94 can_timeout: can_timeout
95 })
96 }
97
98 /// Sets the pending timeout to the value specified.
99 ///
100 /// The home/loop variables are used to construct a timer if one has not
101 /// been previously constructed.
102 ///
103 /// The callback will be invoked if the timeout elapses, and the data of
104 /// the time will be set to `data`.
105 pub fn set_timeout(&mut self, ms: Option<u64>,
106 home: &HomeHandle,
107 loop_: &Loop,
108 cb: fn(uint) -> Option<BlockedTask>,
109 data: uint) {
110 self.state = NoTimeout;
111 let ms = match ms {
112 Some(ms) => ms,
113 None => return match self.timer {
114 Some(ref mut t) => t.stop(),
115 None => {}
116 }
117 };
118
119 // If we have a timeout, lazily initialize the timer which will be used
120 // to fire when the timeout runs out.
121 if self.timer.is_none() {
122 let mut timer = box TimerWatcher::new_home(loop_, home.clone());
123 let cx = box TimerContext {
124 timeout: self as *mut _,
125 callback: cb,
126 payload: data,
127 };
128 unsafe {
129 timer.set_data(&*cx);
130 cast::forget(cx);
131 }
132 self.timer = Some(timer);
133 }
134
135 let timer = self.timer.get_mut_ref();
136 unsafe {
137 let cx = uvll::get_data_for_uv_handle(timer.handle);
138 let cx = cx as *mut TimerContext;
139 (*cx).callback = cb;
140 (*cx).payload = data;
141 }
142 timer.stop();
143 timer.start(timer_cb, ms, 0);
144 self.state = TimeoutPending(NoWaiter);
145
146 extern fn timer_cb(timer: *uvll::uv_timer_t) {
147 let cx: &TimerContext = unsafe {
148 &*(uvll::get_data_for_uv_handle(timer) as *TimerContext)
149 };
150 let me = unsafe { &mut *cx.timeout };
151
152 match mem::replace(&mut me.state, TimedOut) {
153 TimedOut | NoTimeout => unreachable!(),
154 TimeoutPending(NoWaiter) => {}
155 TimeoutPending(AccessPending) => {
156 match unsafe { me.access.dequeue(me as *mut _ as uint) } {
157 Some(task) => task.reawaken(),
158 None => unreachable!(),
159 }
160 }
161 TimeoutPending(RequestPending) => {
162 match (cx.callback)(cx.payload) {
163 Some(task) => task.reawaken(),
164 None => unreachable!(),
165 }
166 }
167 }
168 }
169 }
170 }
171
172 impl Clone for AccessTimeout {
173 fn clone(&self) -> AccessTimeout {
174 AccessTimeout {
175 access: self.access.clone(),
176 state: NoTimeout,
177 timer: None,
178 }
179 }
180 }
181
182 #[unsafe_destructor]
183 impl<'a> Drop for Guard<'a> {
184 fn drop(&mut self) {
185 match *self.state {
186 TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) =>
187 unreachable!(),
188
189 NoTimeout | TimedOut => {}
190 TimeoutPending(RequestPending) => {
191 *self.state = TimeoutPending(NoWaiter);
192 }
193 }
194 }
195 }
196
197 impl Drop for AccessTimeout {
198 fn drop(&mut self) {
199 match self.timer {
200 Some(ref timer) => unsafe {
201 let data = uvll::get_data_for_uv_handle(timer.handle);
202 let _data: Box<TimerContext> = cast::transmute(data);
203 },
204 None => {}
205 }
206 }
207 }
208
209 ////////////////////////////////////////////////////////////////////////////////
210 // Connect timeouts
211 ////////////////////////////////////////////////////////////////////////////////
212
213 pub struct ConnectCtx {
214 pub status: c_int,
215 pub task: Option<BlockedTask>,
216 pub timer: Option<Box<TimerWatcher>>,
217 }
218
219 pub struct AcceptTimeout {
220 timer: Option<TimerWatcher>,
221 timeout_tx: Option<Sender<()>>,
222 timeout_rx: Option<Receiver<()>>,
223 }
224
225 impl ConnectCtx {
226 pub fn connect<T>(
227 mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
228 f: |&Request, &T, uvll::uv_connect_cb| -> c_int
229 ) -> Result<T, UvError> {
230 let mut req = Request::new(uvll::UV_CONNECT);
231 let r = f(&req, &obj, connect_cb);
232 return match r {
233 0 => {
234 req.defuse(); // uv callback now owns this request
235 match timeout {
236 Some(t) => {
237 let mut timer = TimerWatcher::new(io);
238 timer.start(timer_cb, t, 0);
239 self.timer = Some(timer);
240 }
241 None => {}
242 }
243 wait_until_woken_after(&mut self.task, &io.loop_, || {
244 let data = &self as *_;
245 match self.timer {
246 Some(ref mut timer) => unsafe { timer.set_data(data) },
247 None => {}
248 }
249 req.set_data(data);
250 });
251 // Make sure an erroneously fired callback doesn't have access
252 // to the context any more.
253 req.set_data(0 as *int);
254
255 // If we failed because of a timeout, drop the TcpWatcher as
256 // soon as possible because it's data is now set to null and we
257 // want to cancel the callback ASAP.
258 match self.status {
259 0 => Ok(obj),
260 n => { drop(obj); Err(UvError(n)) }
261 }
262 }
263 n => Err(UvError(n))
264 };
265
266 extern fn timer_cb(handle: *uvll::uv_timer_t) {
267 // Don't close the corresponding tcp request, just wake up the task
268 // and let RAII take care of the pending watcher.
269 let cx: &mut ConnectCtx = unsafe {
270 &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
271 };
272 cx.status = uvll::ECANCELED;
273 wakeup(&mut cx.task);
274 }
275
276 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
277 // This callback can be invoked with ECANCELED if the watcher is
278 // closed by the timeout callback. In that case we just want to free
279 // the request and be along our merry way.
280 let req = Request::wrap(req);
281 if status == uvll::ECANCELED { return }
282
283 // Apparently on windows when the handle is closed this callback may
284 // not be invoked with ECANCELED but rather another error code.
285 // Either ways, if the data is null, then our timeout has expired
286 // and there's nothing we can do.
287 let data = unsafe { uvll::get_data_for_req(req.handle) };
288 if data.is_null() { return }
289
290 let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
291 cx.status = status;
292 match cx.timer {
293 Some(ref mut t) => t.stop(),
294 None => {}
295 }
296 // Note that the timer callback doesn't cancel the connect request
297 // (that's the job of uv_close()), so it's possible for this
298 // callback to get triggered after the timeout callback fires, but
299 // before the task wakes up. In that case, we did indeed
300 // successfully connect, but we don't need to wake someone up. We
301 // updated the status above (correctly so), and the task will pick
302 // up on this when it wakes up.
303 if cx.task.is_some() {
304 wakeup(&mut cx.task);
305 }
306 }
307 }
308 }
309
310 impl AcceptTimeout {
311 pub fn new() -> AcceptTimeout {
312 AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
313 }
314
315 pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
316 match self.timeout_rx {
317 None => c.recv(),
318 Some(ref rx) => {
319 use std::comm::Select;
320
321 // Poll the incoming channel first (don't rely on the order of
322 // select just yet). If someone's pending then we should return
323 // them immediately.
324 match c.try_recv() {
325 Ok(data) => return data,
326 Err(..) => {}
327 }
328
329 // Use select to figure out which channel gets ready first. We
330 // do some custom handling of select to ensure that we never
331 // actually drain the timeout channel (we'll keep seeing the
332 // timeout message in the future).
333 let s = Select::new();
334 let mut timeout = s.handle(rx);
335 let mut data = s.handle(c);
336 unsafe {
337 timeout.add();
338 data.add();
339 }
340 if s.wait() == timeout.id() {
341 Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
342 } else {
343 c.recv()
344 }
345 }
346 }
347 }
348
349 pub fn clear(&mut self) {
350 match self.timeout_rx {
351 Some(ref t) => { let _ = t.try_recv(); }
352 None => {}
353 }
354 match self.timer {
355 Some(ref mut t) => t.stop(),
356 None => {}
357 }
358 }
359
360 pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
361 &mut self, ms: u64, t: &mut T
362 ) {
363 // If we have a timeout, lazily initialize the timer which will be used
364 // to fire when the timeout runs out.
365 if self.timer.is_none() {
366 let loop_ = Loop::wrap(unsafe {
367 uvll::get_loop_for_uv_handle(t.uv_handle())
368 });
369 let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
370 unsafe {
371 timer.set_data(self as *mut _ as *AcceptTimeout);
372 }
373 self.timer = Some(timer);
374 }
375
376 // Once we've got a timer, stop any previous timeout, reset it for the
377 // current one, and install some new channels to send/receive data on
378 let timer = self.timer.get_mut_ref();
379 timer.stop();
380 timer.start(timer_cb, ms, 0);
381 let (tx, rx) = channel();
382 self.timeout_tx = Some(tx);
383 self.timeout_rx = Some(rx);
384
385 extern fn timer_cb(timer: *uvll::uv_timer_t) {
386 let acceptor: &mut AcceptTimeout = unsafe {
387 &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
388 };
389 // This send can never fail because if this timer is active then the
390 // receiving channel is guaranteed to be alive
391 acceptor.timeout_tx.get_ref().send(());
392 }
393 }
394 }
librustuv/timeout.rs:45:16-45:16 -enum- definition:
enum ClientState {
NoWaiter,
AccessPending,
references:- 446: enum ClientState {
librustuv/timeout.rs:25:80-25:80 -struct- definition:
/// Managment of a timeout when gaining access to a portion of a duplex stream.
pub struct AccessTimeout {
state: TimeoutState,
references:- 1459: pub fn new() -> AccessTimeout {
60: AccessTimeout {
61: state: NoTimeout,
--
173: fn clone(&self) -> AccessTimeout {
174: AccessTimeout {
175: access: self.access.clone(),
--
197: impl Drop for AccessTimeout {
198: fn drop(&mut self) {
librustuv/net.rs:
486: read_access: AccessTimeout,
487: write_access: AccessTimeout,
librustuv/pipe.rs:
34: // see comments in TcpWatcher for why these exist
35: write_access: AccessTimeout,
36: read_access: AccessTimeout,
37: }
librustuv/timeout.rs:212:1-212:1 -struct- definition:
pub struct ConnectCtx {
pub status: c_int,
pub task: Option<BlockedTask>,
references:- 7librustuv/net.rs:
206: let tcp = TcpWatcher::new(io);
207: let cx = ConnectCtx { status: -1, task: None, timer: None };
208: let (addr, _len) = addr_to_sockaddr(address);
librustuv/timeout.rs:
290: let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
291: cx.status = status;
librustuv/pipe.rs:
93: let pipe = PipeWatcher::new(io, false);
94: let cx = ConnectCtx { status: -1, task: None, timer: None };
95: cx.connect(pipe, timeout, io, |req, pipe, cb| {
librustuv/timeout.rs:38:16-38:16 -enum- definition:
enum TimeoutState {
NoTimeout,
TimeoutPending(ClientState),
references:- 526: pub struct AccessTimeout {
27: state: TimeoutState,
28: timer: Option<Box<TimerWatcher>>,
--
39: enum TimeoutState {
librustuv/timeout.rs:218:1-218:1 -struct- definition:
pub struct AcceptTimeout {
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
references:- 8311: pub fn new() -> AcceptTimeout {
312: AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
313: }
--
385: extern fn timer_cb(timer: *uvll::uv_timer_t) {
386: let acceptor: &mut AcceptTimeout = unsafe {
387: &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
388: };
librustuv/net.rs:
176: listener: Box<TcpListener>,
177: timeout: AcceptTimeout,
178: }
librustuv/pipe.rs:
47: listener: Box<PipeListener>,
48: timeout: AcceptTimeout,
49: }
librustuv/timeout.rs:
370: unsafe {
371: timer.set_data(self as *mut _ as *AcceptTimeout);
372: }
librustuv/timeout.rs:51:1-51:1 -struct- definition:
struct TimerContext {
timeout: *mut AccessTimeout,
callback: fn(uint) -> Option<BlockedTask>,
references:- 5146: extern fn timer_cb(timer: *uvll::uv_timer_t) {
147: let cx: &TimerContext = unsafe {
148: &*(uvll::get_data_for_uv_handle(timer) as *TimerContext)
--
201: let data = uvll::get_data_for_uv_handle(timer.handle);
202: let _data: Box<TimerContext> = cast::transmute(data);
203: },
librustuv/timeout.rs:31:1-31:1 -struct- definition:
pub struct Guard<'a> {
state: &'a mut TimeoutState,
pub access: access::Guard<'a>,
references:- 391: Ok(Guard {
92: access: access,
--
183: impl<'a> Drop for Guard<'a> {
184: fn drop(&mut self) {