(index<- )        ./librustuv/timeout.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  use libc::c_int;
  12  use std::cast;
  13  use std::io::IoResult;
  14  use std::mem;
  15  use std::rt::task::BlockedTask;
  16  
  17  use access;
  18  use homing::{HomeHandle, HomingMissile, HomingIO};
  19  use timer::TimerWatcher;
  20  use uvll;
  21  use uvio::UvIoFactory;
  22  use {Loop, UvError, uv_error_to_io_error, Request, wakeup};
  23  use {UvHandle, wait_until_woken_after};
  24  
  25  /// Managment of a timeout when gaining access to a portion of a duplex stream.
  26  pub struct AccessTimeout {
  27      state: TimeoutState,
  28      timer: Option<Box<TimerWatcher>>,
  29      pub access: access::Access,
  30  }
  31  
  32  pub struct Guard<'a> {
  33      state: &'a mut TimeoutState,
  34      pub access: access::Guard<'a>,
  35      pub can_timeout: bool,
  36  }
  37  
  38  #[deriving(Eq)]
  39  enum TimeoutState {
  40      NoTimeout,
  41      TimeoutPending(ClientState),
  42      TimedOut,
  43  }
  44  
  45  #[deriving(Eq)]
  46  enum ClientState {
  47      NoWaiter,
  48      AccessPending,
  49      RequestPending,
  50  }
  51  
  52  struct TimerContext {
  53      timeout: *mut AccessTimeout,
  54      callback: fn(uint) -> Option<BlockedTask>,
  55      payload: uint,
  56  }
  57  
  58  impl AccessTimeout {
  59      pub fn new() -> AccessTimeout {
  60          AccessTimeout {
  61              state: NoTimeout,
  62              timer: None,
  63              access: access::Access::new(),
  64          }
  65      }
  66  
  67      /// Grants access to half of a duplex stream, timing out if necessary.
  68      ///
  69      /// On success, Ok(Guard) is returned and access has been granted to the
  70      /// stream. If a timeout occurs, then Err is returned with an appropriate
  71      /// error.
  72      pub fn grant<'a>(&'a mut self, mHomingMissile) -> IoResult<Guard<'a>> {
  73          // First, flag that we're attempting to acquire access. This will allow
  74          // us to cancel the pending grant if we timeout out while waiting for a
  75          // grant.
  76          match self.state {
  77              NoTimeout => {},
  78              TimeoutPending(ref mut client) => *client = AccessPending,
  79              TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
  80          }
  81          let access = self.access.grant(self as *mut _ as uint, m);
  82  
  83          // After acquiring the grant, we need to flag ourselves as having a
  84          // pending request so the timeout knows to cancel the request.
  85          let can_timeout = match self.state {
  86              NoTimeout => false,
  87              TimeoutPending(ref mut client) => { *client = RequestPending; true }
  88              TimedOut => return Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
  89          };
  90  
  91          Ok(Guard {
  92              access: access,
  93              state: &mut self.state,
  94              can_timeout: can_timeout
  95          })
  96      }
  97  
  98      /// Sets the pending timeout to the value specified.
  99      ///
 100      /// The home/loop variables are used to construct a timer if one has not
 101      /// been previously constructed.
 102      ///
 103      /// The callback will be invoked if the timeout elapses, and the data of
 104      /// the time will be set to `data`.
 105      pub fn set_timeout(&mut self, msOption<u64>,
 106                         home&HomeHandle,
 107                         loop_&Loop,
 108                         cbfn(uint) -> Option<BlockedTask>,
 109                         datauint) {
 110          self.state = NoTimeout;
 111          let ms = match ms {
 112              Some(ms) => ms,
 113              None => return match self.timer {
 114                  Some(ref mut t) => t.stop(),
 115                  None => {}
 116              }
 117          };
 118  
 119          // If we have a timeout, lazily initialize the timer which will be used
 120          // to fire when the timeout runs out.
 121          if self.timer.is_none() {
 122              let mut timer = box TimerWatcher::new_home(loop_, home.clone());
 123              let cx = box TimerContext {
 124                  timeout: self as *mut _,
 125                  callback: cb,
 126                  payload: data,
 127              };
 128              unsafe {
 129                  timer.set_data(&*cx);
 130                  cast::forget(cx);
 131              }
 132              self.timer = Some(timer);
 133          }
 134  
 135          let timer = self.timer.get_mut_ref();
 136          unsafe {
 137              let cx = uvll::get_data_for_uv_handle(timer.handle);
 138              let cx = cx as *mut TimerContext;
 139              (*cx).callback = cb;
 140              (*cx).payload = data;
 141          }
 142          timer.stop();
 143          timer.start(timer_cb, ms, 0);
 144          self.state = TimeoutPending(NoWaiter);
 145  
 146          extern fn timer_cb(timer*uvll::uv_timer_t) {
 147              let cx&TimerContext = unsafe {
 148                  &*(uvll::get_data_for_uv_handle(timer) as *TimerContext)
 149              };
 150              let me = unsafe { &mut *cx.timeout };
 151  
 152              match mem::replace(&mut me.state, TimedOut) {
 153                  TimedOut | NoTimeout => unreachable!(),
 154                  TimeoutPending(NoWaiter) => {}
 155                  TimeoutPending(AccessPending) => {
 156                      match unsafe { me.access.dequeue(me as *mut _ as uint) } {
 157                          Some(task) => task.reawaken(),
 158                          None => unreachable!(),
 159                      }
 160                  }
 161                  TimeoutPending(RequestPending) => {
 162                      match (cx.callback)(cx.payload) {
 163                          Some(task) => task.reawaken(),
 164                          None => unreachable!(),
 165                      }
 166                  }
 167              }
 168          }
 169      }
 170  }
 171  
 172  impl Clone for AccessTimeout {
 173      fn clone(&self) -> AccessTimeout {
 174          AccessTimeout {
 175              access: self.access.clone(),
 176              state: NoTimeout,
 177              timer: None,
 178          }
 179      }
 180  }
 181  
 182  #[unsafe_destructor]
 183  impl<'a> Drop for Guard<'a> {
 184      fn drop(&mut self) {
 185          match *self.state {
 186              TimeoutPending(NoWaiter) | TimeoutPending(AccessPending) =>
 187                  unreachable!(),
 188  
 189              NoTimeout | TimedOut => {}
 190              TimeoutPending(RequestPending) => {
 191                  *self.state = TimeoutPending(NoWaiter);
 192              }
 193          }
 194      }
 195  }
 196  
 197  impl Drop for AccessTimeout {
 198      fn drop(&mut self) {
 199          match self.timer {
 200              Some(ref timer) => unsafe {
 201                  let data = uvll::get_data_for_uv_handle(timer.handle);
 202                  let _dataBox<TimerContext> = cast::transmute(data);
 203              },
 204              None => {}
 205          }
 206      }
 207  }
 208  
 209  ////////////////////////////////////////////////////////////////////////////////
 210  // Connect timeouts
 211  ////////////////////////////////////////////////////////////////////////////////
 212  
 213  pub struct ConnectCtx {
 214      pub status: c_int,
 215      pub task: Option<BlockedTask>,
 216      pub timer: Option<Box<TimerWatcher>>,
 217  }
 218  
 219  pub struct AcceptTimeout {
 220      timer: Option<TimerWatcher>,
 221      timeout_tx: Option<Sender<()>>,
 222      timeout_rx: Option<Receiver<()>>,
 223  }
 224  
 225  impl ConnectCtx {
 226      pub fn connect<T>(
 227          mut self, objT, timeoutOption<u64>, io&mut UvIoFactory,
 228          f|&Request, &T, uvll::uv_connect_cb-> c_int
 229      ) -> Result<T, UvError> {
 230          let mut req = Request::new(uvll::UV_CONNECT);
 231          let r = f(&req, &obj, connect_cb);
 232          return match r {
 233              0 => {
 234                  req.defuse(); // uv callback now owns this request
 235                  match timeout {
 236                      Some(t) => {
 237                          let mut timer = TimerWatcher::new(io);
 238                          timer.start(timer_cb, t, 0);
 239                          self.timer = Some(timer);
 240                      }
 241                      None => {}
 242                  }
 243                  wait_until_woken_after(&mut self.task, &io.loop_, || {
 244                      let data = &self as *_;
 245                      match self.timer {
 246                          Some(ref mut timer) => unsafe { timer.set_data(data) },
 247                          None => {}
 248                      }
 249                      req.set_data(data);
 250                  });
 251                  // Make sure an erroneously fired callback doesn't have access
 252                  // to the context any more.
 253                  req.set_data(0 as *int);
 254  
 255                  // If we failed because of a timeout, drop the TcpWatcher as
 256                  // soon as possible because it's data is now set to null and we
 257                  // want to cancel the callback ASAP.
 258                  match self.status {
 259                      0 => Ok(obj),
 260                      n => { drop(obj); Err(UvError(n)) }
 261                  }
 262              }
 263              n => Err(UvError(n))
 264          };
 265  
 266          extern fn timer_cb(handle*uvll::uv_timer_t) {
 267              // Don't close the corresponding tcp request, just wake up the task
 268              // and let RAII take care of the pending watcher.
 269              let cx&mut ConnectCtx = unsafe {
 270                  &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
 271              };
 272              cx.status = uvll::ECANCELED;
 273              wakeup(&mut cx.task);
 274          }
 275  
 276          extern fn connect_cb(req*uvll::uv_connect_t, statusc_int) {
 277              // This callback can be invoked with ECANCELED if the watcher is
 278              // closed by the timeout callback. In that case we just want to free
 279              // the request and be along our merry way.
 280              let req = Request::wrap(req);
 281              if status == uvll::ECANCELED { return }
 282  
 283              // Apparently on windows when the handle is closed this callback may
 284              // not be invoked with ECANCELED but rather another error code.
 285              // Either ways, if the data is null, then our timeout has expired
 286              // and there's nothing we can do.
 287              let data = unsafe { uvll::get_data_for_req(req.handle) };
 288              if data.is_null() { return }
 289  
 290              let cx&mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
 291              cx.status = status;
 292              match cx.timer {
 293                  Some(ref mut t) => t.stop(),
 294                  None => {}
 295              }
 296              // Note that the timer callback doesn't cancel the connect request
 297              // (that's the job of uv_close()), so it's possible for this
 298              // callback to get triggered after the timeout callback fires, but
 299              // before the task wakes up. In that case, we did indeed
 300              // successfully connect, but we don't need to wake someone up. We
 301              // updated the status above (correctly so), and the task will pick
 302              // up on this when it wakes up.
 303              if cx.task.is_some() {
 304                  wakeup(&mut cx.task);
 305              }
 306          }
 307      }
 308  }
 309  
 310  impl AcceptTimeout {
 311      pub fn new() -> AcceptTimeout {
 312          AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
 313      }
 314  
 315      pub fn accept<T: Send>(&mut self, c&Receiver<IoResult<T>>) -> IoResult<T> {
 316          match self.timeout_rx {
 317              None => c.recv(),
 318              Some(ref rx) => {
 319                  use std::comm::Select;
 320  
 321                  // Poll the incoming channel first (don't rely on the order of
 322                  // select just yet). If someone's pending then we should return
 323                  // them immediately.
 324                  match c.try_recv() {
 325                      Ok(data) => return data,
 326                      Err(..) => {}
 327                  }
 328  
 329                  // Use select to figure out which channel gets ready first. We
 330                  // do some custom handling of select to ensure that we never
 331                  // actually drain the timeout channel (we'll keep seeing the
 332                  // timeout message in the future).
 333                  let s = Select::new();
 334                  let mut timeout = s.handle(rx);
 335                  let mut data = s.handle(c);
 336                  unsafe {
 337                      timeout.add();
 338                      data.add();
 339                  }
 340                  if s.wait() == timeout.id() {
 341                      Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
 342                  } else {
 343                      c.recv()
 344                  }
 345              }
 346          }
 347      }
 348  
 349      pub fn clear(&mut self) {
 350          match self.timeout_rx {
 351              Some(ref t) => { let _ = t.try_recv(); }
 352              None => {}
 353          }
 354          match self.timer {
 355              Some(ref mut t) => t.stop(),
 356              None => {}
 357          }
 358      }
 359  
 360      pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
 361          &mut self, msu64, t&mut T
 362      ) {
 363          // If we have a timeout, lazily initialize the timer which will be used
 364          // to fire when the timeout runs out.
 365          if self.timer.is_none() {
 366              let loop_ = Loop::wrap(unsafe {
 367                  uvll::get_loop_for_uv_handle(t.uv_handle())
 368              });
 369              let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
 370              unsafe {
 371                  timer.set_data(self as *mut _ as *AcceptTimeout);
 372              }
 373              self.timer = Some(timer);
 374          }
 375  
 376          // Once we've got a timer, stop any previous timeout, reset it for the
 377          // current one, and install some new channels to send/receive data on
 378          let timer = self.timer.get_mut_ref();
 379          timer.stop();
 380          timer.start(timer_cb, ms, 0);
 381          let (tx, rx) = channel();
 382          self.timeout_tx = Some(tx);
 383          self.timeout_rx = Some(rx);
 384  
 385          extern fn timer_cb(timer*uvll::uv_timer_t) {
 386              let acceptor&mut AcceptTimeout = unsafe {
 387                  &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
 388              };
 389              // This send can never fail because if this timer is active then the
 390              // receiving channel is guaranteed to be alive
 391              acceptor.timeout_tx.get_ref().send(());
 392          }
 393      }
 394  }


librustuv/timeout.rs:45:16-45:16 -enum- definition:
enum ClientState {
    NoWaiter,
    AccessPending,
references:- 4
46: enum ClientState {


librustuv/timeout.rs:25:80-25:80 -struct- definition:
/// Managment of a timeout when gaining access to a portion of a duplex stream.
pub struct AccessTimeout {
    state: TimeoutState,
references:- 14
59:     pub fn new() -> AccessTimeout {
60:         AccessTimeout {
61:             state: NoTimeout,
--
173:     fn clone(&self) -> AccessTimeout {
174:         AccessTimeout {
175:             access: self.access.clone(),
--
197: impl Drop for AccessTimeout {
198:     fn drop(&mut self) {
librustuv/net.rs:
486:     read_access: AccessTimeout,
487:     write_access: AccessTimeout,
librustuv/pipe.rs:
34:     // see comments in TcpWatcher for why these exist
35:     write_access: AccessTimeout,
36:     read_access: AccessTimeout,
37: }


librustuv/timeout.rs:212:1-212:1 -struct- definition:
pub struct ConnectCtx {
    pub status: c_int,
    pub task: Option<BlockedTask>,
references:- 7
librustuv/net.rs:
206:         let tcp = TcpWatcher::new(io);
207:         let cx = ConnectCtx { status: -1, task: None, timer: None };
208:         let (addr, _len) = addr_to_sockaddr(address);
librustuv/timeout.rs:
290:             let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
291:             cx.status = status;
librustuv/pipe.rs:
93:         let pipe = PipeWatcher::new(io, false);
94:         let cx = ConnectCtx { status: -1, task: None, timer: None };
95:         cx.connect(pipe, timeout, io, |req, pipe, cb| {


librustuv/timeout.rs:38:16-38:16 -enum- definition:
enum TimeoutState {
    NoTimeout,
    TimeoutPending(ClientState),
references:- 5
26: pub struct AccessTimeout {
27:     state: TimeoutState,
28:     timer: Option<Box<TimerWatcher>>,
--
39: enum TimeoutState {


librustuv/timeout.rs:218:1-218:1 -struct- definition:
pub struct AcceptTimeout {
    timer: Option<TimerWatcher>,
    timeout_tx: Option<Sender<()>>,
references:- 8
311:     pub fn new() -> AcceptTimeout {
312:         AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
313:     }
--
385:         extern fn timer_cb(timer: *uvll::uv_timer_t) {
386:             let acceptor: &mut AcceptTimeout = unsafe {
387:                 &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
388:             };
librustuv/net.rs:
176:     listener: Box<TcpListener>,
177:     timeout: AcceptTimeout,
178: }
librustuv/pipe.rs:
47:     listener: Box<PipeListener>,
48:     timeout: AcceptTimeout,
49: }
librustuv/timeout.rs:
370:             unsafe {
371:                 timer.set_data(self as *mut _ as *AcceptTimeout);
372:             }


librustuv/timeout.rs:51:1-51:1 -struct- definition:
struct TimerContext {
    timeout: *mut AccessTimeout,
    callback: fn(uint) -> Option<BlockedTask>,
references:- 5
146:         extern fn timer_cb(timer: *uvll::uv_timer_t) {
147:             let cx: &TimerContext = unsafe {
148:                 &*(uvll::get_data_for_uv_handle(timer) as *TimerContext)
--
201:                 let data = uvll::get_data_for_uv_handle(timer.handle);
202:                 let _data: Box<TimerContext> = cast::transmute(data);
203:             },


librustuv/timeout.rs:31:1-31:1 -struct- definition:
pub struct Guard<'a> {
    state: &'a mut TimeoutState,
    pub access: access::Guard<'a>,
references:- 3
91:         Ok(Guard {
92:             access: access,
--
183: impl<'a> Drop for Guard<'a> {
184:     fn drop(&mut self) {