(index<- )        ./librustuv/queue.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! A concurrent queue used to signal remote event loops
  12  //!
  13  //! This queue implementation is used to send tasks among event loops. This is
  14  //! backed by a multi-producer/single-consumer queue from libstd and uv_async_t
  15  //! handles (to wake up a remote event loop).
  16  //!
  17  //! The uv_async_t is stored next to the event loop, so in order to not keep the
  18  //! event loop alive we use uv_ref and uv_unref in order to control when the
  19  //! async handle is active or not.
  20  
  21  #![allow(dead_code)]
  22  
  23  use libc::c_void;
  24  use std::cast;
  25  use std::rt::task::BlockedTask;
  26  use std::unstable::mutex::NativeMutex;
  27  use std::sync::arc::UnsafeArc;
  28  use mpsc = std::sync::mpsc_queue;
  29  
  30  use async::AsyncWatcher;
  31  use super::{Loop, UvHandle};
  32  use uvll;
  33  
  34  enum Message {
  35      Task(BlockedTask),
  36      Increment,
  37      Decrement,
  38  }
  39  
  40  struct State {
  41      handle: *uvll::uv_async_t,
  42      lock: NativeMutex, // see comments in async_cb for why this is needed
  43      queue: mpsc::Queue<Message>,
  44  }
  45  
  46  /// This structure is intended to be stored next to the event loop, and it is
  47  /// used to create new `Queue` structures.
  48  pub struct QueuePool {
  49      queue: UnsafeArc<State>,
  50      refcnt: uint,
  51  }
  52  
  53  /// This type is used to send messages back to the original event loop.
  54  pub struct Queue {
  55      queue: UnsafeArc<State>,
  56  }
  57  
  58  extern fn async_cb(handle: *uvll::uv_async_t) {
  59      let pool&mut QueuePool = unsafe {
  60          cast::transmute(uvll::get_data_for_uv_handle(handle))
  61      };
  62      let state&mut State = unsafe { cast::transmute(pool.queue.get()) };
  63  
  64      // Remember that there is no guarantee about how many times an async
  65      // callback is called with relation to the number of sends, so process the
  66      // entire queue in a loop.
  67      loop {
  68          match state.queue.pop() {
  69              mpsc::Data(Task(task)) => {
  70                  let _ = task.wake().map(|t| t.reawaken());
  71              }
  72              mpsc::Data(Increment) => unsafe {
  73                  if pool.refcnt == 0 {
  74                      uvll::uv_ref(state.handle);
  75                  }
  76                  pool.refcnt += 1;
  77              },
  78              mpsc::Data(Decrement) => unsafe {
  79                  pool.refcnt -= 1;
  80                  if pool.refcnt == 0 {
  81                      uvll::uv_unref(state.handle);
  82                  }
  83              },
  84              mpsc::Empty | mpsc::Inconsistent => break
  85          };
  86      }
  87  
  88      // If the refcount is now zero after processing the queue, then there is no
  89      // longer a reference on the async handle and it is possible that this event
  90      // loop can exit. What we're not guaranteed, however, is that a producer in
  91      // the middle of dropping itself is yet done with the handle. It could be
  92      // possible that we saw their Decrement message but they have yet to signal
  93      // on the async handle. If we were to return immediately, the entire uv loop
  94      // could be destroyed meaning the call to uv_async_send would abort()
  95      //
  96      // In order to fix this, an OS mutex is used to wait for the other end to
  97      // finish before we continue. The drop block on a handle will acquire a
  98      // mutex and then drop it after both the push and send have been completed.
  99      // If we acquire the mutex here, then we are guaranteed that there are no
 100      // longer any senders which are holding on to their handles, so we can
 101      // safely allow the event loop to exit.
 102      if pool.refcnt == 0 {
 103          unsafe {
 104              let _l = state.lock.lock();
 105          }
 106      }
 107  }
 108  
 109  impl QueuePool {
 110      pub fn new(loop_&mut Loop) -> Box<QueuePool> {
 111          let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
 112          let state = UnsafeArc::new(State {
 113              handle: handle,
 114              lock: unsafe {NativeMutex::new()},
 115              queue: mpsc::Queue::new(),
 116          });
 117          let q = box QueuePool {
 118              refcnt: 0,
 119              queue: state,
 120          };
 121  
 122          unsafe {
 123              assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0);
 124              uvll::uv_unref(handle);
 125              let data = &*q as *QueuePool as *c_void;
 126              uvll::set_data_for_uv_handle(handle, data);
 127          }
 128  
 129          return q;
 130      }
 131  
 132      pub fn queue(&mut self) -> Queue {
 133          unsafe {
 134              if self.refcnt == 0 {
 135                  uvll::uv_ref((*self.queue.get()).handle);
 136              }
 137              self.refcnt += 1;
 138          }
 139          Queue { queue: self.queue.clone() }
 140      }
 141  
 142      pub fn handle(&self) -> *uvll::uv_async_t {
 143          unsafe { (*self.queue.get()).handle }
 144      }
 145  }
 146  
 147  impl Queue {
 148      pub fn push(&mut self, taskBlockedTask) {
 149          unsafe {
 150              (*self.queue.get()).queue.push(Task(task));
 151              uvll::uv_async_send((*self.queue.get()).handle);
 152          }
 153      }
 154  }
 155  
 156  impl Clone for Queue {
 157      fn clone(&self) -> Queue {
 158          // Push a request to increment on the queue, but there's no need to
 159          // signal the event loop to process it at this time. We're guaranteed
 160          // that the count is at least one (because we have a queue right here),
 161          // and if the queue is dropped later on it'll see the increment for the
 162          // decrement anyway.
 163          unsafe {
 164              (*self.queue.get()).queue.push(Increment);
 165          }
 166          Queue { queue: self.queue.clone() }
 167      }
 168  }
 169  
 170  impl Drop for Queue {
 171      fn drop(&mut self) {
 172          // See the comments in the async_cb function for why there is a lock
 173          // that is acquired only on a drop.
 174          unsafe {
 175              let state = self.queue.get();
 176              let _l = (*state).lock.lock();
 177              (*state).queue.push(Decrement);
 178              uvll::uv_async_send((*state).handle);
 179          }
 180      }
 181  }
 182  
 183  impl Drop for State {
 184      fn drop(&mut self) {
 185          unsafe {
 186              uvll::uv_close(self.handle, cast::transmute(0));
 187              // Note that this does *not* free the handle, that is the
 188              // responsibility of the caller because the uv loop must be closed
 189              // before we deallocate this uv handle.
 190          }
 191      }
 192  }


librustuv/queue.rs:39:1-39:1 -struct- definition:
struct State {
    handle: *uvll::uv_async_t,
    lock: NativeMutex, // see comments in async_cb for why this is needed
references:- 5
111:         let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
112:         let state = UnsafeArc::new(State {
113:             handle: handle,
--
183: impl Drop for State {
184:     fn drop(&mut self) {


librustuv/queue.rs:53:72-53:72 -struct- definition:
/// This type is used to send messages back to the original event loop.
pub struct Queue {
    queue: UnsafeArc<State>,
references:- 8
138:         }
139:         Queue { queue: self.queue.clone() }
140:     }
--
156: impl Clone for Queue {
157:     fn clone(&self) -> Queue {
--
170: impl Drop for Queue {
171:     fn drop(&mut self) {
librustuv/homing.rs:
50: pub struct HomeHandle {
51:     queue: Queue,
52:     id: uint,
librustuv/queue.rs:
165:         }
166:         Queue { queue: self.queue.clone() }
167:     }


librustuv/queue.rs:47:43-47:43 -struct- definition:
/// used to create new `Queue` structures.
pub struct QueuePool {
    queue: UnsafeArc<State>,
references:- 7
116:         });
117:         let q = box QueuePool {
118:             refcnt: 0,
--
124:             uvll::uv_unref(handle);
125:             let data = &*q as *QueuePool as *c_void;
126:             uvll::set_data_for_uv_handle(handle, data);
librustuv/uvio.rs:
133:     pub loop_: Loop,
134:     handle_pool: Option<Box<QueuePool>>,
135: }
librustuv/homing.rs:
55: impl HomeHandle {
56:     pub fn new(id: uint, pool: &mut QueuePool) -> HomeHandle {
57:         HomeHandle { queue: pool.queue(), id: id }
librustuv/queue.rs:
109: impl QueuePool {
110:     pub fn new(loop_: &mut Loop) -> Box<QueuePool> {