(index<- )        ./libnative/io/timer_unix.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! Timers for non-linux/non-windows OSes
  12  //!
  13  //! This module implements timers with a worker thread, select(), and a lot of
  14  //! witchcraft that turns out to be horribly inaccurate timers. The unfortunate
  15  //! part is that I'm at a loss of what else to do one these OSes. This is also
  16  //! why linux has a specialized timerfd implementation and windows has its own
  17  //! implementation (they're more accurate than this one).
  18  //!
  19  //! The basic idea is that there is a worker thread that's communicated to via a
  20  //! channel and a pipe, the pipe is used by the worker thread in a select()
  21  //! syscall with a timeout. The timeout is the "next timer timeout" while the
  22  //! channel is used to send data over to the worker thread.
  23  //!
  24  //! Whenever the call to select() times out, then a channel receives a message.
  25  //! Whenever the call returns that the file descriptor has information, then the
  26  //! channel from timers is drained, enqueueing all incoming requests.
  27  //!
  28  //! The actual implementation of the helper thread is a sorted array of
  29  //! timers in terms of target firing date. The target is the absolute time at
  30  //! which the timer should fire. Timers are then re-enqueued after a firing if
  31  //! the repeat boolean is set.
  32  //!
  33  //! Naturally, all this logic of adding times and keeping track of
  34  //! relative/absolute time is a little lossy and not quite exact. I've done the
  35  //! best I could to reduce the amount of calls to 'now()', but there's likely
  36  //! still inaccuracies trickling in here and there.
  37  //!
  38  //! One of the tricky parts of this implementation is that whenever a timer is
  39  //! acted upon, it must cancel whatever the previous action was (if one is
  40  //! active) in order to act like the other implementations of this timer. In
  41  //! order to do this, the timer's inner pointer is transferred to the worker
  42  //! thread. Whenever the timer is modified, it first takes ownership back from
  43  //! the worker thread in order to modify the same data structure. This has the
  44  //! side effect of "cancelling" the previous requests while allowing a
  45  //! re-enqueueing later on.
  46  //!
  47  //! Note that all time units in this file are in *milliseconds*.
  48  
  49  use libc;
  50  use std::mem;
  51  use std::os;
  52  use std::ptr;
  53  use std::rt::rtio;
  54  use std::sync::atomics;
  55  
  56  use io::IoResult;
  57  use io::c;
  58  use io::file::FileDesc;
  59  use io::timer_helper;
  60  
  61  pub struct Timer {
  62      id: uint,
  63      inner: Option<Box<Inner>>,
  64  }
  65  
  66  struct Inner {
  67      tx: Option<Sender<()>>,
  68      interval: u64,
  69      repeat: bool,
  70      target: u64,
  71      id: uint,
  72  }
  73  
  74  #[allow(visible_private_types)]
  75  pub enum Req {
  76      // Add a new timer to the helper thread.
  77      NewTimer(Box<Inner>),
  78  
  79      // Remove a timer based on its id and then send it back on the channel
  80      // provided
  81      RemoveTimer(uint, Sender<Box<Inner>>),
  82  
  83      // Shut down the loop and then ACK this channel once it's shut down
  84      Shutdown,
  85  }
  86  
  87  // returns the current time (in milliseconds)
  88  pub fn now() -> u64 {
  89      unsafe {
  90          let mut nowlibc::timeval = mem::init();
  91          assert_eq!(c::gettimeofday(&mut now, ptr::null()), 0);
  92          return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000;
  93      }
  94  }
  95  
  96  fn helper(inputlibc::c_int, messagesReceiver<Req>) {
  97      let mut setc::fd_set = unsafe { mem::init() };
  98  
  99      let mut fd = FileDesc::new(input, true);
 100      let mut timeoutlibc::timeval = unsafe { mem::init() };
 101  
 102      // active timers are those which are able to be selected upon (and it's a
 103      // sorted list, and dead timers are those which have expired, but ownership
 104      // hasn't yet been transferred back to the timer itself.
 105      let mut activeVec<Box<Inner>> = vec![];
 106      let mut dead = vec![];
 107  
 108      // inserts a timer into an array of timers (sorted by firing time)
 109      fn insert(tBox<Inner>, active&mut Vec<Box<Inner>>) {
 110          match active.iter().position(|tm| tm.target > t.target) {
 111              Some(pos) => { active.insert(pos, t); }
 112              None => { active.push(t); }
 113          }
 114      }
 115  
 116      // signals the first requests in the queue, possible re-enqueueing it.
 117      fn signal(active&mut Vec<Box<Inner>>,
 118                dead&mut Vec<(uint, Box<Inner>)>) {
 119          let mut timer = match active.shift() {
 120              Some(timer) => timer, None => return
 121          };
 122          let tx = timer.tx.take_unwrap();
 123          if tx.send_opt(()).is_ok() && timer.repeat {
 124              timer.tx = Some(tx);
 125              timer.target += timer.interval;
 126              insert(timer, active);
 127          } else {
 128              drop(tx);
 129              dead.push((timer.id, timer));
 130          }
 131      }
 132  
 133      'outer: loop {
 134          let timeout = if active.len() == 0 {
 135              // Empty array? no timeout (wait forever for the next request)
 136              ptr::null()
 137          } else {
 138              let now = now();
 139              // If this request has already expired, then signal it and go
 140              // through another iteration
 141              if active.get(0).target <= now {
 142                  signal(&mut active, &mut dead);
 143                  continue;
 144              }
 145  
 146              // The actual timeout listed in the requests array is an
 147              // absolute date, so here we translate the absolute time to a
 148              // relative time.
 149              let tm = active.get(0).target - now;
 150              timeout.tv_sec = (tm / 1000) as libc::time_t;
 151              timeout.tv_usec = ((tm % 1000) * 1000) as libc::suseconds_t;
 152              &timeout as *libc::timeval
 153          };
 154  
 155          c::fd_set(&mut set, input);
 156          match unsafe {
 157              c::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
 158          } {
 159              // timed out
 160              0 => signal(&mut active, &mut dead),
 161  
 162              // file descriptor write woke us up, we've got some new requests
 163              1 => {
 164                  loop {
 165                      match messages.try_recv() {
 166                          Ok(Shutdown) => {
 167                              assert!(active.len() == 0);
 168                              break 'outer;
 169                          }
 170  
 171                          Ok(NewTimer(timer)) => insert(timer, &mut active),
 172  
 173                          Ok(RemoveTimer(id, ack)) => {
 174                              match dead.iter().position(|&(i, _)| id == i) {
 175                                  Some(i) => {
 176                                      let (_, i) = dead.remove(i).unwrap();
 177                                      ack.send(i);
 178                                      continue
 179                                  }
 180                                  None => {}
 181                              }
 182                              let i = active.iter().position(|i| i.id == id);
 183                              let i = i.expect("no timer found");
 184                              let t = active.remove(i).unwrap();
 185                              ack.send(t);
 186                          }
 187                          Err(..) => break
 188                      }
 189                  }
 190  
 191                  // drain the file descriptor
 192                  let mut buf = [0];
 193                  assert_eq!(fd.inner_read(buf).unwrap(), 1);
 194              }
 195  
 196              -1 if os::errno() == libc::EINTR as int => {}
 197              n => fail!("helper thread failed in select() with error: {} ({})",
 198                         n, os::last_os_error())
 199          }
 200      }
 201  }
 202  
 203  impl Timer {
 204      pub fn new() -> IoResult<Timer> {
 205          timer_helper::boot(helper);
 206  
 207          static mut ID: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
 208          let id = unsafe { ID.fetch_add(1, atomics::Relaxed) };
 209          Ok(Timer {
 210              id: id,
 211              inner: Some(box Inner {
 212                  tx: None,
 213                  interval: 0,
 214                  target: 0,
 215                  repeat: false,
 216                  id: id,
 217              })
 218          })
 219      }
 220  
 221      pub fn sleep(msu64) {
 222          let mut to_sleep = libc::timespec {
 223              tv_sec: (ms / 1000) as libc::time_t,
 224              tv_nsec: ((ms % 1000) * 1000000) as libc::c_long,
 225          };
 226          while unsafe { libc::nanosleep(&to_sleep, &mut to_sleep) } != 0 {
 227              if os::errno() as int != libc::EINTR as int {
 228                  fail!("failed to sleep, but not because of EINTR?");
 229              }
 230          }
 231      }
 232  
 233      fn inner(&mut self) -> Box<Inner> {
 234          match self.inner.take() {
 235              Some(i) => i,
 236              None => {
 237                  let (tx, rx) = channel();
 238                  timer_helper::send(RemoveTimer(self.id, tx));
 239                  rx.recv()
 240              }
 241          }
 242      }
 243  }
 244  
 245  impl rtio::RtioTimer for Timer {
 246      fn sleep(&mut self, msecsu64) {
 247          let mut inner = self.inner();
 248          inner.tx = None; // cancel any previous request
 249          self.inner = Some(inner);
 250  
 251          Timer::sleep(msecs);
 252      }
 253  
 254      fn oneshot(&mut self, msecsu64) -> Receiver<()> {
 255          let now = now();
 256          let mut inner = self.inner();
 257  
 258          let (tx, rx) = channel();
 259          inner.repeat = false;
 260          inner.tx = Some(tx);
 261          inner.interval = msecs;
 262          inner.target = now + msecs;
 263  
 264          timer_helper::send(NewTimer(inner));
 265          return rx;
 266      }
 267  
 268      fn period(&mut self, msecsu64) -> Receiver<()> {
 269          let now = now();
 270          let mut inner = self.inner();
 271  
 272          let (tx, rx) = channel();
 273          inner.repeat = true;
 274          inner.tx = Some(tx);
 275          inner.interval = msecs;
 276          inner.target = now + msecs;
 277  
 278          timer_helper::send(NewTimer(inner));
 279          return rx;
 280      }
 281  }
 282  
 283  impl Drop for Timer {
 284      fn drop(&mut self) {
 285          self.inner = Some(self.inner());
 286      }
 287  }


libnative/io/timer_unix.rs:74:32-74:32 -enum- definition:
pub enum Req {
    // Add a new timer to the helper thread.
    NewTimer(Box<Inner>),
references:- 7
96: fn helper(input: libc::c_int, messages: Receiver<Req>) {
97:     let mut set: c::fd_set = unsafe { mem::init() };
libnative/io/timer_helper.rs:
35: // are safe to use concurrently.
36: static mut HELPER_CHAN: *mut Sender<Req> = 0 as *mut Sender<Req>;
37: static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
--
67: pub fn send(req: Req) {
68:     unsafe {
--
88:         imp::close(HELPER_SIGNAL);
89:         let _chan: Box<Sender<Req>> = cast::transmute(HELPER_CHAN);
90:         HELPER_CHAN = 0 as *mut Sender<Req>;
91:         HELPER_SIGNAL = 0 as imp::signal;


libnative/io/timer_unix.rs:109:4-109:4 -fn- definition:
    fn insert(t: Box<Inner>, active: &mut Vec<Box<Inner>>) {
        match active.iter().position(|tm| tm.target > t.target) {
            Some(pos) => { active.insert(pos, t); }
references:- 2
125:             timer.target += timer.interval;
126:             insert(timer, active);
127:         } else {
--
171:                         Ok(NewTimer(timer)) => insert(timer, &mut active),


libnative/io/timer_unix.rs:65:1-65:1 -struct- definition:
struct Inner {
    tx: Option<Sender<()>>,
    interval: u64,
references:- 10
210:             id: id,
211:             inner: Some(box Inner {
212:                 tx: None,
--
233:     fn inner(&mut self) -> Box<Inner> {
234:         match self.inner.take() {


libnative/io/timer_unix.rs:117:4-117:4 -fn- definition:
    fn signal(active: &mut Vec<Box<Inner>>,
              dead: &mut Vec<(uint, Box<Inner>)>) {
        let mut timer = match active.shift() {
references:- 2
159:             // timed out
160:             0 => signal(&mut active, &mut dead),


libnative/io/timer_unix.rs:60:1-60:1 -struct- definition:
pub struct Timer {
    id: uint,
    inner: Option<Box<Inner>>,
references:- 5
208:         let id = unsafe { ID.fetch_add(1, atomics::Relaxed) };
209:         Ok(Timer {
210:             id: id,
--
245: impl rtio::RtioTimer for Timer {
246:     fn sleep(&mut self, msecs: u64) {
--
283: impl Drop for Timer {
284:     fn drop(&mut self) {


libnative/io/timer_unix.rs:87:46-87:46 -fn- definition:
// returns the current time (in milliseconds)
pub fn now() -> u64 {
    unsafe {
references:- 17
254:     fn oneshot(&mut self, msecs: u64) -> Receiver<()> {
255:         let now = now();
256:         let mut inner = self.inner();
--
268:     fn period(&mut self, msecs: u64) -> Receiver<()> {
269:         let now = now();
270:         let mut inner = self.inner();
libnative/io/pipe_unix.rs:
189:     fn set_timeout(&mut self, timeout: Option<u64>) {
190:         let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
191:         self.read_deadline = deadline;
--
267:     fn set_timeout(&mut self, timeout: Option<u64>) {
268:         self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
269:     }
libnative/io/net.rs:
411:     fn set_write_timeout(&mut self, timeout: Option<u64>) {
412:         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
413:     }
--
732:     fn set_read_timeout(&mut self, timeout: Option<u64>) {
733:         self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
734:     }
735:     fn set_write_timeout(&mut self, timeout: Option<u64>) {
736:         self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
737:     }
libnative/io/util.rs:
119:              timeout: u64) -> libc::c_int {
120:         let start = ::io::timer::now();
121:         retry(|| unsafe {
--
148:     match retry(|| {
149:         let now = ::io::timer::now();
150:         let tvp = match deadline {