(index<- ) ./libstd/comm/oneshot.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /// Oneshot channels/ports
12 ///
13 /// This is the initial flavor of channels/ports used for comm module. This is
14 /// an optimization for the one-use case of a channel. The major optimization of
15 /// this type is to have one and exactly one allocation when the chan/port pair
16 /// is created.
17 ///
18 /// Another possible optimization would be to not use an UnsafeArc box because
19 /// in theory we know when the shared packet can be deallocated (no real need
20 /// for the atomic reference counting), but I was having trouble how to destroy
21 /// the data early in a drop of a Port.
22 ///
23 /// # Implementation
24 ///
25 /// Oneshots are implemented around one atomic uint variable. This variable
26 /// indicates both the state of the port/chan but also contains any tasks
27 /// blocked on the port. All atomic operations happen on this one word.
28 ///
29 /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
30 /// on behalf of the channel side of things (it can be mentally thought of as
31 /// consuming the port). This upgrade is then also stored in the shared packet.
32 /// The one caveat to consider is that when a port sees a disconnected channel
33 /// it must check for data because there is no "data plus upgrade" state.
34
35 use comm::Receiver;
36 use kinds::Send;
37 use mem;
38 use ops::Drop;
39 use option::{Some, None, Option};
40 use owned::Box;
41 use result::{Result, Ok, Err};
42 use rt::local::Local;
43 use rt::task::{Task, BlockedTask};
44 use sync::atomics;
45
46 // Various states you can find a port in.
47 static EMPTY: uint = 0;
48 static DATA: uint = 1;
49 static DISCONNECTED: uint = 2;
50
51 pub struct Packet<T> {
52 // Internal state of the chan/port pair (stores the blocked task as well)
53 state: atomics::AtomicUint,
54 // One-shot data slot location
55 data: Option<T>,
56 // when used for the second time, a oneshot channel must be upgraded, and
57 // this contains the slot for the upgrade
58 upgrade: MyUpgrade<T>,
59 }
60
61 pub enum Failure<T> {
62 Empty,
63 Disconnected,
64 Upgraded(Receiver<T>),
65 }
66
67 pub enum UpgradeResult {
68 UpSuccess,
69 UpDisconnected,
70 UpWoke(BlockedTask),
71 }
72
73 pub enum SelectionResult<T> {
74 SelCanceled(BlockedTask),
75 SelUpgraded(BlockedTask, Receiver<T>),
76 SelSuccess,
77 }
78
79 enum MyUpgrade<T> {
80 NothingSent,
81 SendUsed,
82 GoUp(Receiver<T>),
83 }
84
85 impl<T: Send> Packet<T> {
86 pub fn new() -> Packet<T> {
87 Packet {
88 data: None,
89 upgrade: NothingSent,
90 state: atomics::AtomicUint::new(EMPTY),
91 }
92 }
93
94 pub fn send(&mut self, t: T) -> Result<(), T> {
95 // Sanity check
96 match self.upgrade {
97 NothingSent => {}
98 _ => fail!("sending on a oneshot that's already sent on "),
99 }
100 assert!(self.data.is_none());
101 self.data = Some(t);
102 self.upgrade = SendUsed;
103
104 match self.state.swap(DATA, atomics::SeqCst) {
105 // Sent the data, no one was waiting
106 EMPTY => Ok(()),
107
108 // Couldn't send the data, the port hung up first. Return the data
109 // back up the stack.
110 DISCONNECTED => {
111 Err(self.data.take_unwrap())
112 }
113
114 // Not possible, these are one-use channels
115 DATA => unreachable!(),
116
117 // Anything else means that there was a task waiting on the other
118 // end. We leave the 'DATA' state inside so it'll pick it up on the
119 // other end.
120 n => unsafe {
121 let t = BlockedTask::cast_from_uint(n);
122 t.wake().map(|t| t.reawaken());
123 Ok(())
124 }
125 }
126 }
127
128 // Just tests whether this channel has been sent on or not, this is only
129 // safe to use from the sender.
130 pub fn sent(&self) -> bool {
131 match self.upgrade {
132 NothingSent => false,
133 _ => true,
134 }
135 }
136
137 pub fn recv(&mut self) -> Result<T, Failure<T>> {
138 // Attempt to not block the task (it's a little expensive). If it looks
139 // like we're not empty, then immediately go through to `try_recv`.
140 if self.state.load(atomics::SeqCst) == EMPTY {
141 let t: Box<Task> = Local::take();
142 t.deschedule(1, |task| {
143 let n = unsafe { task.cast_to_uint() };
144 match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) {
145 // Nothing on the channel, we legitimately block
146 EMPTY => Ok(()),
147
148 // If there's data or it's a disconnected channel, then we
149 // failed the cmpxchg, so we just wake ourselves back up
150 DATA | DISCONNECTED => {
151 unsafe { Err(BlockedTask::cast_from_uint(n)) }
152 }
153
154 // Only one thread is allowed to sleep on this port
155 _ => unreachable!()
156 }
157 });
158 }
159
160 self.try_recv()
161 }
162
163 pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
164 match self.state.load(atomics::SeqCst) {
165 EMPTY => Err(Empty),
166
167 // We saw some data on the channel, but the channel can be used
168 // again to send us an upgrade. As a result, we need to re-insert
169 // into the channel that there's no data available (otherwise we'll
170 // just see DATA next time). This is done as a cmpxchg because if
171 // the state changes under our feet we'd rather just see that state
172 // change.
173 DATA => {
174 self.state.compare_and_swap(DATA, EMPTY, atomics::SeqCst);
175 match self.data.take() {
176 Some(data) => Ok(data),
177 None => unreachable!(),
178 }
179 }
180
181 // There's no guarantee that we receive before an upgrade happens,
182 // and an upgrade flags the channel as disconnected, so when we see
183 // this we first need to check if there's data available and *then*
184 // we go through and process the upgrade.
185 DISCONNECTED => {
186 match self.data.take() {
187 Some(data) => Ok(data),
188 None => {
189 match mem::replace(&mut self.upgrade, SendUsed) {
190 SendUsed | NothingSent => Err(Disconnected),
191 GoUp(upgrade) => Err(Upgraded(upgrade))
192 }
193 }
194 }
195 }
196 _ => unreachable!()
197 }
198 }
199
200 // Returns whether the upgrade was completed. If the upgrade wasn't
201 // completed, then the port couldn't get sent to the other half (it will
202 // never receive it).
203 pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
204 let prev = match self.upgrade {
205 NothingSent => NothingSent,
206 SendUsed => SendUsed,
207 _ => fail!("upgrading again"),
208 };
209 self.upgrade = GoUp(up);
210
211 match self.state.swap(DISCONNECTED, atomics::SeqCst) {
212 // If the channel is empty or has data on it, then we're good to go.
213 // Senders will check the data before the upgrade (in case we
214 // plastered over the DATA state).
215 DATA | EMPTY => UpSuccess,
216
217 // If the other end is already disconnected, then we failed the
218 // upgrade. Be sure to trash the port we were given.
219 DISCONNECTED => { self.upgrade = prev; UpDisconnected }
220
221 // If someone's waiting, we gotta wake them up
222 n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) })
223 }
224 }
225
226 pub fn drop_chan(&mut self) {
227 match self.state.swap(DISCONNECTED, atomics::SeqCst) {
228 DATA | DISCONNECTED | EMPTY => {}
229
230 // If someone's waiting, we gotta wake them up
231 n => unsafe {
232 let t = BlockedTask::cast_from_uint(n);
233 t.wake().map(|t| t.reawaken());
234 }
235 }
236 }
237
238 pub fn drop_port(&mut self) {
239 match self.state.swap(DISCONNECTED, atomics::SeqCst) {
240 // An empty channel has nothing to do, and a remotely disconnected
241 // channel also has nothing to do b/c we're about to run the drop
242 // glue
243 DISCONNECTED | EMPTY => {}
244
245 // There's data on the channel, so make sure we destroy it promptly.
246 // This is why not using an arc is a little difficult (need the box
247 // to stay valid while we take the data).
248 DATA => { self.data.take_unwrap(); }
249
250 // We're the only ones that can block on this port
251 _ => unreachable!()
252 }
253 }
254
255 ////////////////////////////////////////////////////////////////////////////
256 // select implementation
257 ////////////////////////////////////////////////////////////////////////////
258
259 // If Ok, the value is whether this port has data, if Err, then the upgraded
260 // port needs to be checked instead of this one.
261 pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
262 match self.state.load(atomics::SeqCst) {
263 EMPTY => Ok(false), // Welp, we tried
264 DATA => Ok(true), // we have some un-acquired data
265 DISCONNECTED if self.data.is_some() => Ok(true), // we have data
266 DISCONNECTED => {
267 match mem::replace(&mut self.upgrade, SendUsed) {
268 // The other end sent us an upgrade, so we need to
269 // propagate upwards whether the upgrade can receive
270 // data
271 GoUp(upgrade) => Err(upgrade),
272
273 // If the other end disconnected without sending an
274 // upgrade, then we have data to receive (the channel is
275 // disconnected).
276 up => { self.upgrade = up; Ok(true) }
277 }
278 }
279 _ => unreachable!(), // we're the "one blocker"
280 }
281 }
282
283 // Attempts to start selection on this port. This can either succeed, fail
284 // because there is data, or fail because there is an upgrade pending.
285 pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
286 let n = unsafe { task.cast_to_uint() };
287 match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) {
288 EMPTY => SelSuccess,
289 DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }),
290 DISCONNECTED if self.data.is_some() => {
291 SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
292 }
293 DISCONNECTED => {
294 match mem::replace(&mut self.upgrade, SendUsed) {
295 // The other end sent us an upgrade, so we need to
296 // propagate upwards whether the upgrade can receive
297 // data
298 GoUp(upgrade) => {
299 SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) },
300 upgrade)
301 }
302
303 // If the other end disconnected without sending an
304 // upgrade, then we have data to receive (the channel is
305 // disconnected).
306 up => {
307 self.upgrade = up;
308 SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
309 }
310 }
311 }
312 _ => unreachable!(), // we're the "one blocker"
313 }
314 }
315
316 // Remove a previous selecting task from this port. This ensures that the
317 // blocked task will no longer be visible to any other threads.
318 //
319 // The return value indicates whether there's data on this port.
320 pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
321 let state = match self.state.load(atomics::SeqCst) {
322 // Each of these states means that no further activity will happen
323 // with regard to abortion selection
324 s @ EMPTY |
325 s @ DATA |
326 s @ DISCONNECTED => s,
327
328 // If we've got a blocked task, then use an atomic to gain ownership
329 // of it (may fail)
330 n => self.state.compare_and_swap(n, EMPTY, atomics::SeqCst)
331 };
332
333 // Now that we've got ownership of our state, figure out what to do
334 // about it.
335 match state {
336 EMPTY => unreachable!(),
337 // our task used for select was stolen
338 DATA => Ok(true),
339
340 // If the other end has hung up, then we have complete ownership
341 // of the port. First, check if there was data waiting for us. This
342 // is possible if the other end sent something and then hung up.
343 //
344 // We then need to check to see if there was an upgrade requested,
345 // and if so, the upgraded port needs to have its selection aborted.
346 DISCONNECTED => {
347 if self.data.is_some() {
348 Ok(true)
349 } else {
350 match mem::replace(&mut self.upgrade, SendUsed) {
351 GoUp(port) => Err(port),
352 _ => Ok(true),
353 }
354 }
355 }
356
357 // We woke ourselves up from select. Assert that the task should be
358 // trashed and returne that we don't have any data.
359 n => {
360 let t = unsafe { BlockedTask::cast_from_uint(n) };
361 t.trash();
362 Ok(false)
363 }
364 }
365 }
366 }
367
368 #[unsafe_destructor]
369 impl<T: Send> Drop for Packet<T> {
370 fn drop(&mut self) {
371 assert_eq!(self.state.load(atomics::SeqCst), DISCONNECTED);
372 }
373 }