(index<- ) ./librustuv/queue.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! A concurrent queue used to signal remote event loops
12 //!
13 //! This queue implementation is used to send tasks among event loops. This is
14 //! backed by a multi-producer/single-consumer queue from libstd and uv_async_t
15 //! handles (to wake up a remote event loop).
16 //!
17 //! The uv_async_t is stored next to the event loop, so in order to not keep the
18 //! event loop alive we use uv_ref and uv_unref in order to control when the
19 //! async handle is active or not.
20
21 #![allow(dead_code)]
22
23 use libc::c_void;
24 use std::cast;
25 use std::rt::task::BlockedTask;
26 use std::unstable::mutex::NativeMutex;
27 use std::sync::arc::UnsafeArc;
28 use mpsc = std::sync::mpsc_queue;
29
30 use async::AsyncWatcher;
31 use super::{Loop, UvHandle};
32 use uvll;
33
/// Messages sent from `Queue` handles over the MPSC queue to the event loop
/// that owns the corresponding `QueuePool`.
enum Message {
    /// Wake up the given blocked task on the consuming event loop.
    Task(BlockedTask),
    /// A `Queue` handle was cloned; the pool's refcount must be bumped.
    Increment,
    /// A `Queue` handle was dropped; the pool's refcount must be dropped.
    Decrement,
}
39
/// State shared (via `UnsafeArc`) between a `QueuePool` and every `Queue`
/// handle created from it.
struct State {
    /// The uv_async_t used to wake up the owning event loop; ref'd/unref'd as
    /// the pool's refcount moves to/from zero.
    handle: *uvll::uv_async_t,
    lock: NativeMutex, // see comments in async_cb for why this is needed
    /// Multi-producer/single-consumer queue of pending messages, drained by
    /// `async_cb` on the owning event loop.
    queue: mpsc::Queue<Message>,
}
45
46 /// This structure is intended to be stored next to the event loop, and it is
47 /// used to create new `Queue` structures.
/// This structure is intended to be stored next to the event loop, and it is
/// used to create new `Queue` structures.
pub struct QueuePool {
    /// Shared state, also referenced by every outstanding `Queue`.
    queue: UnsafeArc<State>,
    /// Number of live `Queue` handles. Only mutated on the owning event loop
    /// (directly in `queue()`, or via Increment/Decrement in `async_cb`).
    refcnt: uint,
}
52
53 /// This type is used to send messages back to the original event loop.
/// This type is used to send messages back to the original event loop.
pub struct Queue {
    /// Shared state with the originating `QueuePool`.
    queue: UnsafeArc<State>,
}
57
/// uv callback fired on the owning event loop after `uv_async_send`; drains
/// the shared message queue, waking tasks and tracking the handle refcount.
extern fn async_cb(handle: *uvll::uv_async_t) {
    // The pool was stored in the handle's data pointer by QueuePool::new.
    let pool: &mut QueuePool = unsafe {
        cast::transmute(uvll::get_data_for_uv_handle(handle))
    };
    let state: &mut State = unsafe { cast::transmute(pool.queue.get()) };

    // Remember that there is no guarantee about how many times an async
    // callback is called with relation to the number of sends, so process the
    // entire queue in a loop.
    loop {
        match state.queue.pop() {
            mpsc::Data(Task(task)) => {
                let _ = task.wake().map(|t| t.reawaken());
            }
            mpsc::Data(Increment) => unsafe {
                // First outstanding handle: ref the async handle so the event
                // loop stays alive while producers exist.
                if pool.refcnt == 0 {
                    uvll::uv_ref(state.handle);
                }
                pool.refcnt += 1;
            },
            mpsc::Data(Decrement) => unsafe {
                pool.refcnt -= 1;
                // Last outstanding handle gone: unref so the loop may exit.
                if pool.refcnt == 0 {
                    uvll::uv_unref(state.handle);
                }
            },
            // Inconsistent means a producer is mid-push; that producer also
            // signals the async handle afterwards, so stopping here is safe.
            mpsc::Empty | mpsc::Inconsistent => break
        };
    }

    // If the refcount is now zero after processing the queue, then there is no
    // longer a reference on the async handle and it is possible that this event
    // loop can exit. What we're not guaranteed, however, is that a producer in
    // the middle of dropping itself is yet done with the handle. It could be
    // possible that we saw their Decrement message but they have yet to signal
    // on the async handle. If we were to return immediately, the entire uv loop
    // could be destroyed meaning the call to uv_async_send would abort()
    //
    // In order to fix this, an OS mutex is used to wait for the other end to
    // finish before we continue. The drop block on a handle will acquire a
    // mutex and then drop it after both the push and send have been completed.
    // If we acquire the mutex here, then we are guaranteed that there are no
    // longer any senders which are holding on to their handles, so we can
    // safely allow the event loop to exit.
    if pool.refcnt == 0 {
        unsafe {
            let _l = state.lock.lock();
        }
    }
}
108
109 impl QueuePool {
110 pub fn new(loop_: &mut Loop) -> Box<QueuePool> {
111 let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
112 let state = UnsafeArc::new(State {
113 handle: handle,
114 lock: unsafe {NativeMutex::new()},
115 queue: mpsc::Queue::new(),
116 });
117 let q = box QueuePool {
118 refcnt: 0,
119 queue: state,
120 };
121
122 unsafe {
123 assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0);
124 uvll::uv_unref(handle);
125 let data = &*q as *QueuePool as *c_void;
126 uvll::set_data_for_uv_handle(handle, data);
127 }
128
129 return q;
130 }
131
132 pub fn queue(&mut self) -> Queue {
133 unsafe {
134 if self.refcnt == 0 {
135 uvll::uv_ref((*self.queue.get()).handle);
136 }
137 self.refcnt += 1;
138 }
139 Queue { queue: self.queue.clone() }
140 }
141
142 pub fn handle(&self) -> *uvll::uv_async_t {
143 unsafe { (*self.queue.get()).handle }
144 }
145 }
146
147 impl Queue {
148 pub fn push(&mut self, task: BlockedTask) {
149 unsafe {
150 (*self.queue.get()).queue.push(Task(task));
151 uvll::uv_async_send((*self.queue.get()).handle);
152 }
153 }
154 }
155
156 impl Clone for Queue {
157 fn clone(&self) -> Queue {
158 // Push a request to increment on the queue, but there's no need to
159 // signal the event loop to process it at this time. We're guaranteed
160 // that the count is at least one (because we have a queue right here),
161 // and if the queue is dropped later on it'll see the increment for the
162 // decrement anyway.
163 unsafe {
164 (*self.queue.get()).queue.push(Increment);
165 }
166 Queue { queue: self.queue.clone() }
167 }
168 }
169
impl Drop for Queue {
    fn drop(&mut self) {
        // See the comments in the async_cb function for why there is a lock
        // that is acquired only on a drop.
        unsafe {
            let state = self.queue.get();
            // Holding the lock across both the push and the signal is what
            // stops async_cb from letting the event loop die while our
            // uv_async_send is still in flight.
            let _l = (*state).lock.lock();
            (*state).queue.push(Decrement);
            uvll::uv_async_send((*state).handle);
        }
    }
}
182
impl Drop for State {
    fn drop(&mut self) {
        unsafe {
            // Close with a null close callback (0 transmuted to a fn ptr).
            uvll::uv_close(self.handle, cast::transmute(0));
            // Note that this does *not* free the handle, that is the
            // responsibility of the caller because the uv loop must be closed
            // before we deallocate this uv handle.
        }
    }
}
librustuv/queue.rs:39:1-39:1 -struct- definition:
struct State {
handle: *uvll::uv_async_t,
lock: NativeMutex, // see comments in async_cb for why this is needed
references:- 5111: let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
112: let state = UnsafeArc::new(State {
113: handle: handle,
--
183: impl Drop for State {
184: fn drop(&mut self) {
librustuv/queue.rs:53:72-53:72 -struct- definition:
/// This type is used to send messages back to the original event loop.
pub struct Queue {
queue: UnsafeArc<State>,
references:- 8138: }
139: Queue { queue: self.queue.clone() }
140: }
--
156: impl Clone for Queue {
157: fn clone(&self) -> Queue {
--
170: impl Drop for Queue {
171: fn drop(&mut self) {
librustuv/homing.rs:
50: pub struct HomeHandle {
51: queue: Queue,
52: id: uint,
librustuv/queue.rs:
165: }
166: Queue { queue: self.queue.clone() }
167: }
librustuv/queue.rs:47:43-47:43 -struct- definition:
/// used to create new `Queue` structures.
pub struct QueuePool {
queue: UnsafeArc<State>,
references:- 7116: });
117: let q = box QueuePool {
118: refcnt: 0,
--
124: uvll::uv_unref(handle);
125: let data = &*q as *QueuePool as *c_void;
126: uvll::set_data_for_uv_handle(handle, data);
librustuv/uvio.rs:
133: pub loop_: Loop,
134: handle_pool: Option<Box<QueuePool>>,
135: }
librustuv/homing.rs:
55: impl HomeHandle {
56: pub fn new(id: uint, pool: &mut QueuePool) -> HomeHandle {
57: HomeHandle { queue: pool.queue(), id: id }
librustuv/queue.rs:
109: impl QueuePool {
110: pub fn new(loop_: &mut Loop) -> Box<QueuePool> {