(index<- )        ./librustuv/async.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  use std::cast;
  12  use std::rt::rtio::{Callback, RemoteCallback};
  13  use std::unstable::sync::Exclusive;
  14  
  15  use uvll;
  16  use super::{Loop, UvHandle};
  17  
  18  // The entire point of async is to call into a loop from other threads so it
  19  // does not need to home.
  20  pub struct AsyncWatcher {
  21      handle: *uvll::uv_async_t,
  22  
  23      // A flag to tell the callback to exit, set from the dtor. This is
  24      // almost never contested - only in rare races with the dtor.
  25      exit_flag: Exclusive<bool>
  26  }
  27  
  28  struct Payload {
  29      callback: Box<Callback:Send>,
  30      exit_flag: Exclusive<bool>,
  31  }
  32  
  33  impl AsyncWatcher {
  34      pub fn new(loop_&mut Loop, cbBox<Callback:Send>) -> AsyncWatcher {
  35          let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
  36          assert_eq!(unsafe {
  37              uvll::uv_async_init(loop_.handle, handle, async_cb)
  38          }, 0);
  39          let flag = Exclusive::new(false);
  40          let payload = box Payload { callback: cb, exit_flag: flag.clone() };
  41          unsafe {
  42              let payload*u8 = cast::transmute(payload);
  43              uvll::set_data_for_uv_handle(handle, payload);
  44          }
  45          return AsyncWatcher { handle: handle, exit_flag: flag, };
  46      }
  47  }
  48  
  49  impl UvHandle<uvll::uv_async_t> for AsyncWatcher {
  50      fn uv_handle(&self) -> *uvll::uv_async_t { self.handle }
  51      unsafe fn from_uv_handle<'a>(_&'a *uvll::uv_async_t) -> &'a mut AsyncWatcher {
  52          fail!("async watchers can't be built from their handles");
  53      }
  54  }
  55  
  56  extern fn async_cb(handle: *uvll::uv_async_t) {
  57      let payload&mut Payload = unsafe {
  58          cast::transmute(uvll::get_data_for_uv_handle(handle))
  59      };
  60  
  61      // The synchronization logic here is subtle. To review,
  62      // the uv async handle type promises that, after it is
  63      // triggered the remote callback is definitely called at
  64      // least once. UvRemoteCallback needs to maintain those
  65      // semantics while also shutting down cleanly from the
  66      // dtor. In our case that means that, when the
  67      // UvRemoteCallback dtor calls `async.send()`, here `f` is
  68      // always called later.
  69  
  70      // In the dtor both the exit flag is set and the async
  71      // callback fired under a lock.  Here, before calling `f`,
  72      // we take the lock and check the flag. Because we are
  73      // checking the flag before calling `f`, and the flag is
  74      // set under the same lock as the send, then if the flag
  75      // is set then we're guaranteed to call `f` after the
  76      // final send.
  77  
  78      // If the check was done after `f()` then there would be a
  79      // period between that call and the check where the dtor
  80      // could be called in the other thread, missing the final
  81      // callback while still destroying the handle.
  82  
  83      let should_exit = unsafe {
  84          payload.exit_flag.with_imm(|&should_exit| should_exit)
  85      };
  86  
  87      payload.callback.call();
  88  
  89      if should_exit {
  90          unsafe { uvll::uv_close(handle, close_cb) }
  91      }
  92  }
  93  
  94  extern fn close_cb(handle: *uvll::uv_handle_t) {
  95      // drop the payload
  96      let _payloadBox<Payload> = unsafe {
  97          cast::transmute(uvll::get_data_for_uv_handle(handle))
  98      };
  99      // and then free the handle
 100      unsafe { uvll::free_handle(handle) }
 101  }
 102  
 103  impl RemoteCallback for AsyncWatcher {
 104      fn fire(&mut self) {
 105          unsafe { uvll::uv_async_send(self.handle) }
 106      }
 107  }
 108  
 109  impl Drop for AsyncWatcher {
 110      fn drop(&mut self) {
 111          unsafe {
 112              self.exit_flag.with(|should_exit| {
 113                  // NB: These two things need to happen atomically. Otherwise
 114                  // the event handler could wake up due to a *previous*
 115                  // signal and see the exit flag, destroying the handle
 116                  // before the final send.
 117                  *should_exit = true;
 118                  uvll::uv_async_send(self.handle)
 119              })
 120          }
 121      }
 122  }
 123  
 124  #[cfg(test)]
 125  mod test_remote {
 126      use std::rt::rtio::{Callback, RemoteCallback};
 127      use std::rt::thread::Thread;
 128  
 129      use super::AsyncWatcher;
 130      use super::super::local_loop;
 131  
 132      // Make sure that we can fire watchers in remote threads and that they
 133      // actually trigger what they say they will.
 134      #[test]
 135      fn smoke_test() {
 136          struct MyCallback(Option<Sender<int>>);
 137          impl Callback for MyCallback {
 138              fn call(&mut self) {
 139                  // this can get called more than once, but we only want to send
 140                  // once
 141                  let MyCallback(ref mut s) = *self;
 142                  if s.is_some() {
 143                      s.take_unwrap().send(1);
 144                  }
 145              }
 146          }
 147  
 148          let (tx, rx) = channel();
 149          let cb = box MyCallback(Some(tx));
 150          let watcher = AsyncWatcher::new(&mut local_loop().loop_, cb);
 151  
 152          let thread = Thread::start(proc() {
 153              let mut watcher = watcher;
 154              watcher.fire();
 155          });
 156  
 157          assert_eq!(rx.recv(), 1);
 158          thread.join();
 159      }
 160  }