(index<- ) ./libgreen/basic.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! This is a basic event loop implementation not meant for any "real purposes"
12 //! other than testing the scheduler and proving that it's possible to have a
13 //! pluggable event loop.
14 //!
15 //! This implementation is also used as the fallback implementation of an event
16 //! loop if no other one is provided (and M:N scheduling is desired).
17
18 use std::cast;
19 use std::mem::replace;
20 use std::rt::rtio::{EventLoop, IoFactory, RemoteCallback};
21 use std::rt::rtio::{PausableIdleCallback, Callback};
22 use std::unstable::sync::Exclusive;
23
24 /// This is the only exported function from this module.
25 pub fn event_loop() -> Box<EventLoop:Send> {
26 box BasicLoop::new() as Box<EventLoop:Send>
27 }
28
29 struct BasicLoop {
30 work: Vec<proc():Send>, // pending work
31 idle: Option<*mut BasicPausable>, // only one is allowed
32 remotes: Vec<(uint, Box<Callback:Send>)>,
33 next_remote: uint,
34 messages: Exclusive<Vec<Message>>,
35 }
36
37 enum Message { RunRemote(uint), RemoveRemote(uint) }
38
39 impl BasicLoop {
40 fn new() -> BasicLoop {
41 BasicLoop {
42 work: vec![],
43 idle: None,
44 next_remote: 0,
45 remotes: vec![],
46 messages: Exclusive::new(vec![]),
47 }
48 }
49
50 /// Process everything in the work queue (continually)
51 fn work(&mut self) {
52 while self.work.len() > 0 {
53 for work in replace(&mut self.work, vec![]).move_iter() {
54 work();
55 }
56 }
57 }
58
59 fn remote_work(&mut self) {
60 let messages = unsafe {
61 self.messages.with(|messages| {
62 if messages.len() > 0 {
63 Some(replace(messages, vec![]))
64 } else {
65 None
66 }
67 })
68 };
69 let messages = match messages {
70 Some(m) => m, None => return
71 };
72 for message in messages.iter() {
73 self.message(*message);
74 }
75 }
76
77 fn message(&mut self, message: Message) {
78 match message {
79 RunRemote(i) => {
80 match self.remotes.mut_iter().find(|& &(id, _)| id == i) {
81 Some(&(_, ref mut f)) => f.call(),
82 None => unreachable!()
83 }
84 }
85 RemoveRemote(i) => {
86 match self.remotes.iter().position(|&(id, _)| id == i) {
87 Some(i) => { self.remotes.remove(i).unwrap(); }
88 None => unreachable!()
89 }
90 }
91 }
92 }
93
94 /// Run the idle callback if one is registered
95 fn idle(&mut self) {
96 unsafe {
97 match self.idle {
98 Some(idle) => {
99 if (*idle).active {
100 (*idle).work.call();
101 }
102 }
103 None => {}
104 }
105 }
106 }
107
108 fn has_idle(&self) -> bool {
109 unsafe { self.idle.is_some() && (**self.idle.get_ref()).active }
110 }
111 }
112
113 impl EventLoop for BasicLoop {
114 fn run(&mut self) {
115 // Not exactly efficient, but it gets the job done.
116 while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() {
117
118 self.work();
119 self.remote_work();
120
121 if self.has_idle() {
122 self.idle();
123 continue
124 }
125
126 unsafe {
127 // We block here if we have no messages to process and we may
128 // receive a message at a later date
129 self.messages.hold_and_wait(|messages| {
130 self.remotes.len() > 0 &&
131 messages.len() == 0 &&
132 self.work.len() == 0
133 })
134 }
135 }
136 }
137
138 fn callback(&mut self, f: proc():Send) {
139 self.work.push(f);
140 }
141
142 // FIXME: Seems like a really weird requirement to have an event loop provide.
143 fn pausable_idle_callback(&mut self, cb: Box<Callback:Send>)
144 -> Box<PausableIdleCallback:Send> {
145 let callback = box BasicPausable::new(self, cb);
146 rtassert!(self.idle.is_none());
147 unsafe {
148 let cb_ptr: &*mut BasicPausable = cast::transmute(&callback);
149 self.idle = Some(*cb_ptr);
150 }
151 callback as Box<PausableIdleCallback:Send>
152 }
153
154 fn remote_callback(&mut self, f: Box<Callback:Send>)
155 -> Box<RemoteCallback:Send> {
156 let id = self.next_remote;
157 self.next_remote += 1;
158 self.remotes.push((id, f));
159 box BasicRemote::new(self.messages.clone(), id) as
160 Box<RemoteCallback:Send>
161 }
162
163 fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }
164
165 fn has_active_io(&self) -> bool { false }
166 }
167
168 struct BasicRemote {
169 queue: Exclusive<Vec<Message>>,
170 id: uint,
171 }
172
173 impl BasicRemote {
174 fn new(queue: Exclusive<Vec<Message>>, id: uint) -> BasicRemote {
175 BasicRemote { queue: queue, id: id }
176 }
177 }
178
179 impl RemoteCallback for BasicRemote {
180 fn fire(&mut self) {
181 unsafe {
182 self.queue.hold_and_signal(|queue| {
183 queue.push(RunRemote(self.id));
184 })
185 }
186 }
187 }
188
189 impl Drop for BasicRemote {
190 fn drop(&mut self) {
191 unsafe {
192 self.queue.hold_and_signal(|queue| {
193 queue.push(RemoveRemote(self.id));
194 })
195 }
196 }
197 }
198
199 struct BasicPausable {
200 eloop: *mut BasicLoop,
201 work: Box<Callback:Send>,
202 active: bool,
203 }
204
205 impl BasicPausable {
206 fn new(eloop: &mut BasicLoop, cb: Box<Callback:Send>) -> BasicPausable {
207 BasicPausable {
208 active: false,
209 work: cb,
210 eloop: eloop,
211 }
212 }
213 }
214
215 impl PausableIdleCallback for BasicPausable {
216 fn pause(&mut self) {
217 self.active = false;
218 }
219 fn resume(&mut self) {
220 self.active = true;
221 }
222 }
223
224 impl Drop for BasicPausable {
225 fn drop(&mut self) {
226 unsafe {
227 (*self.eloop).idle = None;
228 }
229 }
230 }
231
232 #[cfg(test)]
233 mod test {
234 use std::task::TaskOpts;
235
236 use basic;
237 use PoolConfig;
238 use SchedPool;
239
240 fn pool() -> SchedPool {
241 SchedPool::new(PoolConfig {
242 threads: 1,
243 event_loop_factory: basic::event_loop,
244 })
245 }
246
247 fn run(f: proc():Send) {
248 let mut pool = pool();
249 pool.spawn(TaskOpts::new(), f);
250 pool.shutdown();
251 }
252
253 #[test]
254 fn smoke() {
255 run(proc() {});
256 }
257
258 #[test]
259 fn some_channels() {
260 run(proc() {
261 let (tx, rx) = channel();
262 spawn(proc() {
263 tx.send(());
264 });
265 rx.recv();
266 });
267 }
268
269 #[test]
270 fn multi_thread() {
271 let mut pool = SchedPool::new(PoolConfig {
272 threads: 2,
273 event_loop_factory: basic::event_loop,
274 });
275
276 for _ in range(0, 20) {
277 pool.spawn(TaskOpts::new(), proc() {
278 let (tx, rx) = channel();
279 spawn(proc() {
280 tx.send(());
281 });
282 rx.recv();
283 });
284 }
285
286 pool.shutdown();
287 }
288 }
libgreen/basic.rs:28:1-28:1 -struct- definition:
struct BasicLoop {
work: Vec<proc():Send>, // pending work
idle: Option<*mut BasicPausable>, // only one is allowed
references:- 639: impl BasicLoop {
40: fn new() -> BasicLoop {
41: BasicLoop {
--
199: struct BasicPausable {
200: eloop: *mut BasicLoop,
201: work: Box<Callback:Send>,
--
205: impl BasicPausable {
206: fn new(eloop: &mut BasicLoop, cb: Box<Callback:Send>) -> BasicPausable {
207: BasicPausable {
libgreen/basic.rs:167:1-167:1 -struct- definition:
struct BasicRemote {
queue: Exclusive<Vec<Message>>,
id: uint,
references:- 5174: fn new(queue: Exclusive<Vec<Message>>, id: uint) -> BasicRemote {
175: BasicRemote { queue: queue, id: id }
176: }
--
189: impl Drop for BasicRemote {
190: fn drop(&mut self) {
libgreen/basic.rs:198:1-198:1 -struct- definition:
struct BasicPausable {
eloop: *mut BasicLoop,
work: Box<Callback:Send>,
references:- 7206: fn new(eloop: &mut BasicLoop, cb: Box<Callback:Send>) -> BasicPausable {
207: BasicPausable {
208: active: false,
--
215: impl PausableIdleCallback for BasicPausable {
216: fn pause(&mut self) {
--
224: impl Drop for BasicPausable {
225: fn drop(&mut self) {
libgreen/basic.rs:36:1-36:1 -enum- definition:
enum Message { RunRemote(uint), RemoveRemote(uint) }
impl BasicLoop {
fn new() -> BasicLoop {
references:- 433: next_remote: uint,
34: messages: Exclusive<Vec<Message>>,
35: }
--
77: fn message(&mut self, message: Message) {
78: match message {
--
168: struct BasicRemote {
169: queue: Exclusive<Vec<Message>>,
170: id: uint,
--
173: impl BasicRemote {
174: fn new(queue: Exclusive<Vec<Message>>, id: uint) -> BasicRemote {
175: BasicRemote { queue: queue, id: id }