(index<- )        ./libgreen/basic.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! This is a basic event loop implementation not meant for any "real purposes"
  12  //! other than testing the scheduler and proving that it's possible to have a
  13  //! pluggable event loop.
  14  //!
  15  //! This implementation is also used as the fallback implementation of an event
  16  //! loop if no other one is provided (and M:N scheduling is desired).
  17  
  18  use std::cast;
  19  use std::mem::replace;
  20  use std::rt::rtio::{EventLoop, IoFactory, RemoteCallback};
  21  use std::rt::rtio::{PausableIdleCallback, Callback};
  22  use std::unstable::sync::Exclusive;
  23  
  24  /// This is the only exported function from this module.
  25  pub fn event_loop() -> Box<EventLoop:Send> {
  26      box BasicLoop::new() as Box<EventLoop:Send>
  27  }
  28  
  29  struct BasicLoop {
  30      work: Vec<proc():Send>,             // pending work
  31      idle: Option<*mut BasicPausable>, // only one is allowed
  32      remotes: Vec<(uint, Box<Callback:Send>)>,
  33      next_remote: uint,
  34      messages: Exclusive<Vec<Message>>,
  35  }
  36  
  37  enum Message { RunRemote(uint), RemoveRemote(uint) }
  38  
  39  impl BasicLoop {
  40      fn new() -> BasicLoop {
  41          BasicLoop {
  42              work: vec![],
  43              idle: None,
  44              next_remote: 0,
  45              remotes: vec![],
  46              messages: Exclusive::new(vec![]),
  47          }
  48      }
  49  
  50      /// Process everything in the work queue (continually)
  51      fn work(&mut self) {
  52          while self.work.len() > 0 {
  53              for work in replace(&mut self.work, vec![]).move_iter() {
  54                  work();
  55              }
  56          }
  57      }
  58  
  59      fn remote_work(&mut self) {
  60          let messages = unsafe {
  61              self.messages.with(|messages| {
  62                  if messages.len() > 0 {
  63                      Some(replace(messages, vec![]))
  64                  } else {
  65                      None
  66                  }
  67              })
  68          };
  69          let messages = match messages {
  70              Some(m) => m, None => return
  71          };
  72          for message in messages.iter() {
  73              self.message(*message);
  74          }
  75      }
  76  
  77      fn message(&mut self, messageMessage) {
  78          match message {
  79              RunRemote(i) => {
  80                  match self.remotes.mut_iter().find(|& &(id, _)| id == i) {
  81                      Some(&(_, ref mut f)) => f.call(),
  82                      None => unreachable!()
  83                  }
  84              }
  85              RemoveRemote(i) => {
  86                  match self.remotes.iter().position(|&(id, _)| id == i) {
  87                      Some(i) => { self.remotes.remove(i).unwrap(); }
  88                      None => unreachable!()
  89                  }
  90              }
  91          }
  92      }
  93  
  94      /// Run the idle callback if one is registered
  95      fn idle(&mut self) {
  96          unsafe {
  97              match self.idle {
  98                  Some(idle) => {
  99                      if (*idle).active {
 100                          (*idle).work.call();
 101                      }
 102                  }
 103                  None => {}
 104              }
 105          }
 106      }
 107  
 108      fn has_idle(&self) -> bool {
 109          unsafe { self.idle.is_some() && (**self.idle.get_ref()).active }
 110      }
 111  }
 112  
 113  impl EventLoop for BasicLoop {
 114      fn run(&mut self) {
 115          // Not exactly efficient, but it gets the job done.
 116          while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() {
 117  
 118              self.work();
 119              self.remote_work();
 120  
 121              if self.has_idle() {
 122                  self.idle();
 123                  continue
 124              }
 125  
 126              unsafe {
 127                  // We block here if we have no messages to process and we may
 128                  // receive a message at a later date
 129                  self.messages.hold_and_wait(|messages| {
 130                      self.remotes.len() > 0 &&
 131                          messages.len() == 0 &&
 132                          self.work.len() == 0
 133                  })
 134              }
 135          }
 136      }
 137  
 138      fn callback(&mut self, fproc():Send) {
 139          self.work.push(f);
 140      }
 141  
 142      // FIXME: Seems like a really weird requirement to have an event loop provide.
 143      fn pausable_idle_callback(&mut self, cbBox<Callback:Send>)
 144                                -> Box<PausableIdleCallback:Send> {
 145          let callback = box BasicPausable::new(self, cb);
 146          rtassert!(self.idle.is_none());
 147          unsafe {
 148              let cb_ptr&*mut BasicPausable = cast::transmute(&callback);
 149              self.idle = Some(*cb_ptr);
 150          }
 151          callback as Box<PausableIdleCallback:Send>
 152      }
 153  
 154      fn remote_callback(&mut self, fBox<Callback:Send>)
 155                         -> Box<RemoteCallback:Send> {
 156          let id = self.next_remote;
 157          self.next_remote += 1;
 158          self.remotes.push((id, f));
 159          box BasicRemote::new(self.messages.clone(), id) as
 160              Box<RemoteCallback:Send>
 161      }
 162  
 163      fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }
 164  
 165      fn has_active_io(&self) -> bool { false }
 166  }
 167  
 168  struct BasicRemote {
 169      queue: Exclusive<Vec<Message>>,
 170      id: uint,
 171  }
 172  
 173  impl BasicRemote {
 174      fn new(queueExclusive<Vec<Message>>, iduint) -> BasicRemote {
 175          BasicRemote { queue: queue, id: id }
 176      }
 177  }
 178  
 179  impl RemoteCallback for BasicRemote {
 180      fn fire(&mut self) {
 181          unsafe {
 182              self.queue.hold_and_signal(|queue| {
 183                  queue.push(RunRemote(self.id));
 184              })
 185          }
 186      }
 187  }
 188  
 189  impl Drop for BasicRemote {
 190      fn drop(&mut self) {
 191          unsafe {
 192              self.queue.hold_and_signal(|queue| {
 193                  queue.push(RemoveRemote(self.id));
 194              })
 195          }
 196      }
 197  }
 198  
 199  struct BasicPausable {
 200      eloop: *mut BasicLoop,
 201      work: Box<Callback:Send>,
 202      active: bool,
 203  }
 204  
 205  impl BasicPausable {
 206      fn new(eloop&mut BasicLoop, cbBox<Callback:Send>) -> BasicPausable {
 207          BasicPausable {
 208              active: false,
 209              work: cb,
 210              eloop: eloop,
 211          }
 212      }
 213  }
 214  
 215  impl PausableIdleCallback for BasicPausable {
 216      fn pause(&mut self) {
 217          self.active = false;
 218      }
 219      fn resume(&mut self) {
 220          self.active = true;
 221      }
 222  }
 223  
 224  impl Drop for BasicPausable {
 225      fn drop(&mut self) {
 226          unsafe {
 227              (*self.eloop).idle = None;
 228          }
 229      }
 230  }
 231  
 232  #[cfg(test)]
 233  mod test {
 234      use std::task::TaskOpts;
 235  
 236      use basic;
 237      use PoolConfig;
 238      use SchedPool;
 239  
 240      fn pool() -> SchedPool {
 241          SchedPool::new(PoolConfig {
 242              threads: 1,
 243              event_loop_factory: basic::event_loop,
 244          })
 245      }
 246  
 247      fn run(f: proc():Send) {
 248          let mut pool = pool();
 249          pool.spawn(TaskOpts::new(), f);
 250          pool.shutdown();
 251      }
 252  
 253      #[test]
 254      fn smoke() {
 255          run(proc() {});
 256      }
 257  
 258      #[test]
 259      fn some_channels() {
 260          run(proc() {
 261              let (tx, rx) = channel();
 262              spawn(proc() {
 263                  tx.send(());
 264              });
 265              rx.recv();
 266          });
 267      }
 268  
 269      #[test]
 270      fn multi_thread() {
 271          let mut pool = SchedPool::new(PoolConfig {
 272              threads: 2,
 273              event_loop_factory: basic::event_loop,
 274          });
 275  
 276          for _ in range(0, 20) {
 277              pool.spawn(TaskOpts::new(), proc() {
 278                  let (tx, rx) = channel();
 279                  spawn(proc() {
 280                      tx.send(());
 281                  });
 282                  rx.recv();
 283              });
 284          }
 285  
 286          pool.shutdown();
 287      }
 288  }


libgreen/basic.rs:28:1-28:1 -struct- definition:
struct BasicLoop {
    work: Vec<proc():Send>,             // pending work
    idle: Option<*mut BasicPausable>, // only one is allowed
references:- 6
39: impl BasicLoop {
40:     fn new() -> BasicLoop {
41:         BasicLoop {
--
199: struct BasicPausable {
200:     eloop: *mut BasicLoop,
201:     work: Box<Callback:Send>,
--
205: impl BasicPausable {
206:     fn new(eloop: &mut BasicLoop, cb: Box<Callback:Send>) -> BasicPausable {
207:         BasicPausable {


libgreen/basic.rs:167:1-167:1 -struct- definition:
struct BasicRemote {
    queue: Exclusive<Vec<Message>>,
    id: uint,
references:- 5
174:     fn new(queue: Exclusive<Vec<Message>>, id: uint) -> BasicRemote {
175:         BasicRemote { queue: queue, id: id }
176:     }
--
189: impl Drop for BasicRemote {
190:     fn drop(&mut self) {


libgreen/basic.rs:198:1-198:1 -struct- definition:
struct BasicPausable {
    eloop: *mut BasicLoop,
    work: Box<Callback:Send>,
references:- 7
206:     fn new(eloop: &mut BasicLoop, cb: Box<Callback:Send>) -> BasicPausable {
207:         BasicPausable {
208:             active: false,
--
215: impl PausableIdleCallback for BasicPausable {
216:     fn pause(&mut self) {
--
224: impl Drop for BasicPausable {
225:     fn drop(&mut self) {


libgreen/basic.rs:36:1-36:1 -enum- definition:
enum Message { RunRemote(uint), RemoveRemote(uint) }
impl BasicLoop {
    fn new() -> BasicLoop {
references:- 4
33:     next_remote: uint,
34:     messages: Exclusive<Vec<Message>>,
35: }
--
77:     fn message(&mut self, message: Message) {
78:         match message {
--
168: struct BasicRemote {
169:     queue: Exclusive<Vec<Message>>,
170:     id: uint,
--
173: impl BasicRemote {
174:     fn new(queue: Exclusive<Vec<Message>>, id: uint) -> BasicRemote {
175:         BasicRemote { queue: queue, id: id }