1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Timers for non-linux/non-windows OSes
12 //!
13 //! This module implements timers with a worker thread, select(), and a lot of
14 //! witchcraft that turns out to be horribly inaccurate timers. The unfortunate
15 //! part is that I'm at a loss of what else to do on these OSes. This is also
16 //! why linux has a specialized timerfd implementation and windows has its own
17 //! implementation (they're more accurate than this one).
18 //!
19 //! The basic idea is that there is a worker thread that's communicated to via a
20 //! channel and a pipe, the pipe is used by the worker thread in a select()
21 //! syscall with a timeout. The timeout is the "next timer timeout" while the
22 //! channel is used to send data over to the worker thread.
23 //!
24 //! Whenever the call to select() times out, then a channel receives a message.
25 //! Whenever the call returns that the file descriptor has information, then the
26 //! channel from timers is drained, enqueueing all incoming requests.
27 //!
28 //! The actual implementation of the helper thread is a sorted array of
29 //! timers in terms of target firing date. The target is the absolute time at
30 //! which the timer should fire. Timers are then re-enqueued after a firing if
31 //! the repeat boolean is set.
32 //!
33 //! Naturally, all this logic of adding times and keeping track of
34 //! relative/absolute time is a little lossy and not quite exact. I've done the
35 //! best I could to reduce the amount of calls to 'now()', but there's likely
36 //! still inaccuracies trickling in here and there.
37 //!
38 //! One of the tricky parts of this implementation is that whenever a timer is
39 //! acted upon, it must cancel whatever the previous action was (if one is
40 //! active) in order to act like the other implementations of this timer. In
41 //! order to do this, the timer's inner pointer is transferred to the worker
42 //! thread. Whenever the timer is modified, it first takes ownership back from
43 //! the worker thread in order to modify the same data structure. This has the
44 //! side effect of "cancelling" the previous requests while allowing a
45 //! re-enqueueing later on.
46 //!
47 //! Note that all time units in this file are in *milliseconds*.
48
49 use libc;
50 use std::mem;
51 use std::os;
52 use std::ptr;
53 use std::rt::rtio;
54 use std::sync::atomics;
55
56 use io::IoResult;
57 use io::c;
58 use io::file::FileDesc;
59 use io::timer_helper;
60
61 pub struct Timer {
62 id: uint,
63 inner: Option<Box<Inner>>,
64 }
65
66 struct Inner {
67 tx: Option<Sender<()>>,
68 interval: u64,
69 repeat: bool,
70 target: u64,
71 id: uint,
72 }
73
74 #[allow(visible_private_types)]
75 pub enum Req {
76 // Add a new timer to the helper thread.
77 NewTimer(Box<Inner>),
78
79 // Remove a timer based on its id and then send it back on the channel
80 // provided
81 RemoveTimer(uint, Sender<Box<Inner>>),
82
83 // Shut down the loop and then ACK this channel once it's shut down
84 Shutdown,
85 }
86
87 // returns the current time (in milliseconds)
88 pub fn now() -> u64 {
89 unsafe {
90 let mut now: libc::timeval = mem::init();
91 assert_eq!(c::gettimeofday(&mut now, ptr::null()), 0);
92 return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000;
93 }
94 }
95
96 fn helper(input: libc::c_int, messages: Receiver<Req>) {
97 let mut set: c::fd_set = unsafe { mem::init() };
98
99 let mut fd = FileDesc::new(input, true);
100 let mut timeout: libc::timeval = unsafe { mem::init() };
101
102 // active timers are those which are able to be selected upon (and it's a
103 // sorted list, and dead timers are those which have expired, but ownership
104 // hasn't yet been transferred back to the timer itself.
105 let mut active: Vec<Box<Inner>> = vec![];
106 let mut dead = vec![];
107
108 // inserts a timer into an array of timers (sorted by firing time)
109 fn insert(t: Box<Inner>, active: &mut Vec<Box<Inner>>) {
110 match active.iter().position(|tm| tm.target > t.target) {
111 Some(pos) => { active.insert(pos, t); }
112 None => { active.push(t); }
113 }
114 }
115
116 // signals the first requests in the queue, possible re-enqueueing it.
117 fn signal(active: &mut Vec<Box<Inner>>,
118 dead: &mut Vec<(uint, Box<Inner>)>) {
119 let mut timer = match active.shift() {
120 Some(timer) => timer, None => return
121 };
122 let tx = timer.tx.take_unwrap();
123 if tx.send_opt(()).is_ok() && timer.repeat {
124 timer.tx = Some(tx);
125 timer.target += timer.interval;
126 insert(timer, active);
127 } else {
128 drop(tx);
129 dead.push((timer.id, timer));
130 }
131 }
132
133 'outer: loop {
134 let timeout = if active.len() == 0 {
135 // Empty array? no timeout (wait forever for the next request)
136 ptr::null()
137 } else {
138 let now = now();
139 // If this request has already expired, then signal it and go
140 // through another iteration
141 if active.get(0).target <= now {
142 signal(&mut active, &mut dead);
143 continue;
144 }
145
146 // The actual timeout listed in the requests array is an
147 // absolute date, so here we translate the absolute time to a
148 // relative time.
149 let tm = active.get(0).target - now;
150 timeout.tv_sec = (tm / 1000) as libc::time_t;
151 timeout.tv_usec = ((tm % 1000) * 1000) as libc::suseconds_t;
152 &timeout as *libc::timeval
153 };
154
155 c::fd_set(&mut set, input);
156 match unsafe {
157 c::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
158 } {
159 // timed out
160 0 => signal(&mut active, &mut dead),
161
162 // file descriptor write woke us up, we've got some new requests
163 1 => {
164 loop {
165 match messages.try_recv() {
166 Ok(Shutdown) => {
167 assert!(active.len() == 0);
168 break 'outer;
169 }
170
171 Ok(NewTimer(timer)) => insert(timer, &mut active),
172
173 Ok(RemoveTimer(id, ack)) => {
174 match dead.iter().position(|&(i, _)| id == i) {
175 Some(i) => {
176 let (_, i) = dead.remove(i).unwrap();
177 ack.send(i);
178 continue
179 }
180 None => {}
181 }
182 let i = active.iter().position(|i| i.id == id);
183 let i = i.expect("no timer found");
184 let t = active.remove(i).unwrap();
185 ack.send(t);
186 }
187 Err(..) => break
188 }
189 }
190
191 // drain the file descriptor
192 let mut buf = [0];
193 assert_eq!(fd.inner_read(buf).unwrap(), 1);
194 }
195
196 -1 if os::errno() == libc::EINTR as int => {}
197 n => fail!("helper thread failed in select() with error: {} ({})",
198 n, os::last_os_error())
199 }
200 }
201 }
202
203 impl Timer {
204 pub fn new() -> IoResult<Timer> {
205 timer_helper::boot(helper);
206
207 static mut ID: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
208 let id = unsafe { ID.fetch_add(1, atomics::Relaxed) };
209 Ok(Timer {
210 id: id,
211 inner: Some(box Inner {
212 tx: None,
213 interval: 0,
214 target: 0,
215 repeat: false,
216 id: id,
217 })
218 })
219 }
220
221 pub fn sleep(ms: u64) {
222 let mut to_sleep = libc::timespec {
223 tv_sec: (ms / 1000) as libc::time_t,
224 tv_nsec: ((ms % 1000) * 1000000) as libc::c_long,
225 };
226 while unsafe { libc::nanosleep(&to_sleep, &mut to_sleep) } != 0 {
227 if os::errno() as int != libc::EINTR as int {
228 fail!("failed to sleep, but not because of EINTR?");
229 }
230 }
231 }
232
233 fn inner(&mut self) -> Box<Inner> {
234 match self.inner.take() {
235 Some(i) => i,
236 None => {
237 let (tx, rx) = channel();
238 timer_helper::send(RemoveTimer(self.id, tx));
239 rx.recv()
240 }
241 }
242 }
243 }
244
245 impl rtio::RtioTimer for Timer {
246 fn sleep(&mut self, msecs: u64) {
247 let mut inner = self.inner();
248 inner.tx = None; // cancel any previous request
249 self.inner = Some(inner);
250
251 Timer::sleep(msecs);
252 }
253
254 fn oneshot(&mut self, msecs: u64) -> Receiver<()> {
255 let now = now();
256 let mut inner = self.inner();
257
258 let (tx, rx) = channel();
259 inner.repeat = false;
260 inner.tx = Some(tx);
261 inner.interval = msecs;
262 inner.target = now + msecs;
263
264 timer_helper::send(NewTimer(inner));
265 return rx;
266 }
267
268 fn period(&mut self, msecs: u64) -> Receiver<()> {
269 let now = now();
270 let mut inner = self.inner();
271
272 let (tx, rx) = channel();
273 inner.repeat = true;
274 inner.tx = Some(tx);
275 inner.interval = msecs;
276 inner.target = now + msecs;
277
278 timer_helper::send(NewTimer(inner));
279 return rx;
280 }
281 }
282
283 impl Drop for Timer {
284 fn drop(&mut self) {
285 self.inner = Some(self.inner());
286 }
287 }
libnative/io/timer_unix.rs:74:32-74:32 -enum- definition:
pub enum Req {
// Add a new timer to the helper thread.
NewTimer(Box<Inner>),
references:- 796: fn helper(input: libc::c_int, messages: Receiver<Req>) {
97: let mut set: c::fd_set = unsafe { mem::init() };
libnative/io/timer_helper.rs:
35: // are safe to use concurrently.
36: static mut HELPER_CHAN: *mut Sender<Req> = 0 as *mut Sender<Req>;
37: static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
--
67: pub fn send(req: Req) {
68: unsafe {
--
88: imp::close(HELPER_SIGNAL);
89: let _chan: Box<Sender<Req>> = cast::transmute(HELPER_CHAN);
90: HELPER_CHAN = 0 as *mut Sender<Req>;
91: HELPER_SIGNAL = 0 as imp::signal;
libnative/io/timer_unix.rs:109:4-109:4 -fn- definition:
fn insert(t: Box<Inner>, active: &mut Vec<Box<Inner>>) {
match active.iter().position(|tm| tm.target > t.target) {
Some(pos) => { active.insert(pos, t); }
references:- 2125: timer.target += timer.interval;
126: insert(timer, active);
127: } else {
--
171: Ok(NewTimer(timer)) => insert(timer, &mut active),
libnative/io/timer_unix.rs:65:1-65:1 -struct- definition:
struct Inner {
tx: Option<Sender<()>>,
interval: u64,
references:- 10210: id: id,
211: inner: Some(box Inner {
212: tx: None,
--
233: fn inner(&mut self) -> Box<Inner> {
234: match self.inner.take() {
libnative/io/timer_unix.rs:117:4-117:4 -fn- definition:
fn signal(active: &mut Vec<Box<Inner>>,
dead: &mut Vec<(uint, Box<Inner>)>) {
let mut timer = match active.shift() {
references:- 2159: // timed out
160: 0 => signal(&mut active, &mut dead),
libnative/io/timer_unix.rs:60:1-60:1 -struct- definition:
pub struct Timer {
id: uint,
inner: Option<Box<Inner>>,
references:- 5208: let id = unsafe { ID.fetch_add(1, atomics::Relaxed) };
209: Ok(Timer {
210: id: id,
--
245: impl rtio::RtioTimer for Timer {
246: fn sleep(&mut self, msecs: u64) {
--
283: impl Drop for Timer {
284: fn drop(&mut self) {
libnative/io/timer_unix.rs:87:46-87:46 -fn- definition:
// returns the current time (in milliseconds)
pub fn now() -> u64 {
unsafe {
references:- 17254: fn oneshot(&mut self, msecs: u64) -> Receiver<()> {
255: let now = now();
256: let mut inner = self.inner();
--
268: fn period(&mut self, msecs: u64) -> Receiver<()> {
269: let now = now();
270: let mut inner = self.inner();
libnative/io/pipe_unix.rs:
189: fn set_timeout(&mut self, timeout: Option<u64>) {
190: let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
191: self.read_deadline = deadline;
--
267: fn set_timeout(&mut self, timeout: Option<u64>) {
268: self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
269: }
libnative/io/net.rs:
411: fn set_write_timeout(&mut self, timeout: Option<u64>) {
412: self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
413: }
--
732: fn set_read_timeout(&mut self, timeout: Option<u64>) {
733: self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
734: }
735: fn set_write_timeout(&mut self, timeout: Option<u64>) {
736: self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
737: }
libnative/io/util.rs:
119: timeout: u64) -> libc::c_int {
120: let start = ::io::timer::now();
121: retry(|| unsafe {
--
148: match retry(|| {
149: let now = ::io::timer::now();
150: let tvp = match deadline {