./libsync/arc.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

/*!
 * Concurrency-enabled mechanisms for sharing mutable and/or immutable state
 * between tasks.
 */

use std::cast;
use std::ptr;
use std::rt::global_heap;
use std::sync::atomics;

/// An atomically reference counted wrapper for shared state.
///
/// # Example
///
/// In this example, a large vector of floats is shared between several tasks.
/// With simple pipes, without `Arc`, a copy would have to be made for each
/// task.
///
/// ```rust
/// use sync::Arc;
///
/// fn main() {
///     let numbers = Vec::from_fn(100, |i| i as f32);
///     let shared_numbers = Arc::new(numbers);
///
///     for _ in range(0, 10) {
///         let child_numbers = shared_numbers.clone();
///
///         spawn(proc() {
///             let local_numbers = child_numbers.as_slice();
///
///             // Work with the local numbers
///         });
///     }
/// }
/// ```
#[unsafe_no_drop_flag]
pub struct Arc<T> {
    x: *mut ArcInner<T>,
}

/// A weak pointer to an `Arc`.
///
/// Weak pointers will not keep the data inside of the `Arc` alive, and can be
/// used to break cycles between `Arc` pointers.
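///
/// # Example
///
/// A minimal sketch of breaking a reference cycle with a weak back-pointer,
/// adapted from the `weak_self_cyclic` test below (it assumes the `sync`
/// crate also exports `Mutex`, as that test does):
///
/// ```rust
/// use sync::{Arc, Weak, Mutex};
///
/// struct Cycle {
///     x: Mutex<Option<Weak<Cycle>>>,
/// }
///
/// let a = Arc::new(Cycle { x: Mutex::new(None) });
/// // Hold only a weak back-pointer, so dropping `a` still frees the data.
/// let b = a.clone().downgrade();
/// *a.deref().x.lock().deref_mut() = Some(b);
/// ```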
#[unsafe_no_drop_flag]
pub struct Weak<T> {
    x: *mut ArcInner<T>,
}

struct ArcInner<T> {
    strong: atomics::AtomicUint,
    weak: atomics::AtomicUint,
    data: T,
}

impl<T: Share + Send> Arc<T> {
    /// Create an atomically reference counted wrapper.
    #[inline]
    pub fn new(data: T) -> Arc<T> {
        // Start the weak pointer count as 1: it represents the single weak
        // pointer held collectively by all of the strong pointers (see
        // std/rc.rs for more info).
        let x = box ArcInner {
            strong: atomics::AtomicUint::new(1),
            weak: atomics::AtomicUint::new(1),
            data: data,
        };
        Arc { x: unsafe { cast::transmute(x) } }
    }

    #[inline]
    fn inner<'a>(&'a self) -> &'a ArcInner<T> {
        // This unsafety is ok because while this arc is alive we're guaranteed
        // that the inner pointer is valid. Furthermore, we know that the
        // `ArcInner` structure itself is `Share` because the inner data is
        // `Share` as well, so we're ok loaning out an immutable pointer to
        // these contents.
        unsafe { &*self.x }
    }

    /// Downgrades a strong pointer to a weak pointer
    ///
    /// Weak pointers will not keep the data alive. Once all strong references
    /// to the underlying data have been dropped, the data itself will be
    /// destroyed.
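    ///
    /// # Example
    ///
    /// A small usage sketch, mirroring the `test_dead` test below: the weak
    /// pointer does not keep the value alive once the last strong pointer is
    /// dropped.
    ///
    /// ```rust
    /// use sync::Arc;
    ///
    /// let strong = Arc::new(5);
    /// let weak = strong.downgrade();
    /// drop(strong);
    /// // With no strong pointers left, the data is gone and the upgrade fails.
    /// assert!(weak.upgrade().is_none());
    /// ```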
    pub fn downgrade(&self) -> Weak<T> {
        // See the clone() impl for why this is relaxed
        self.inner().weak.fetch_add(1, atomics::Relaxed);
        Weak { x: self.x }
    }
}

impl<T: Share + Send> Clone for Arc<T> {
    /// Duplicate an atomically reference counted wrapper.
    ///
    /// The resulting two `Arc` objects will point to the same underlying data
    /// object. However, one of the `Arc` objects can be sent to another task,
    /// allowing them to share the underlying data.
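    ///
    /// # Example
    ///
    /// A brief sketch in the spirit of the `Arc` example above: each task
    /// gets its own cloned handle, but all handles point at the same value.
    ///
    /// ```rust
    /// use sync::Arc;
    ///
    /// let five = Arc::new(5);
    ///
    /// for _ in range(0, 10) {
    ///     let handle = five.clone();
    ///     spawn(proc() {
    ///         // Each task owns its handle to the shared value.
    ///         assert_eq!(*handle, 5);
    ///     });
    /// }
    /// ```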
    #[inline]
    fn clone(&self) -> Arc<T> {
        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1], increasing the
        // reference counter can always be done with memory_order_relaxed: new
        // references to an object can only be formed from an existing
        // reference, and passing an existing reference from one thread to
        // another must already provide any required synchronization.
        //
        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
        self.inner().strong.fetch_add(1, atomics::Relaxed);
        Arc { x: self.x }
    }
}

impl<T: Send + Share> Deref<T> for Arc<T> {
    #[inline]
    fn deref<'a>(&'a self) -> &'a T {
        &self.inner().data
    }
}

impl<T: Send + Share + Clone> Arc<T> {
    /// Acquires a mutable pointer to the inner contents by guaranteeing that
    /// the reference count is one (no sharing is possible).
    ///
    /// This is also referred to as a copy-on-write operation because the inner
    /// data is cloned if the reference count is greater than one.
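    ///
    /// # Example
    ///
    /// A minimal sketch of the copy-on-write behaviour, mirroring the
    /// `test_cowarc_clone_unique2` test below (the `allow` is there because
    /// this method is still experimental):
    ///
    /// ```rust
    /// use sync::Arc;
    ///
    /// #[allow(experimental)]
    /// fn main() {
    ///     let mut cow0 = Arc::new(75u);
    ///     let cow1 = cow0.clone();
    ///
    ///     // The count is 2, so `make_unique` clones the data before handing
    ///     // out a mutable reference; `cow1` is unaffected.
    ///     *cow0.make_unique() += 1;
    ///
    ///     assert_eq!(*cow0, 76u);
    ///     assert_eq!(*cow1, 75u);
    /// }
    /// ```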
    #[inline]
    #[experimental]
    pub fn make_unique<'a>(&'a mut self) -> &'a mut T {
        if self.inner().strong.load(atomics::SeqCst) != 1 {
            *self = Arc::new(self.deref().clone())
        }
        // This unsafety is ok because we're guaranteed that the pointer
        // returned is the *only* pointer that will ever be returned to T. Our
        // reference count is guaranteed to be 1 at this point, and we required
        // the Arc itself to be `mut`, so we're returning the only possible
        // reference to the inner data.
        unsafe { cast::transmute::<&_, &mut _>(self.deref()) }
    }
}

#[unsafe_destructor]
impl<T: Share + Send> Drop for Arc<T> {
    fn drop(&mut self) {
        // This structure has #[unsafe_no_drop_flag], so this drop glue may run
        // more than once (but it is guaranteed to be zeroed after the first if
        // it's run more than once)
        if self.x.is_null() { return }

        // Because `fetch_sub` is already atomic, we do not need to synchronize
        // with other threads unless we are going to delete the object. This
        // same logic applies to the below `fetch_sub` to the `weak` count.
        if self.inner().strong.fetch_sub(1, atomics::Release) != 1 { return }

        // This fence is needed to prevent reordering of use of the data and
        // deletion of the data. Because it is marked `Release`, the
        // decreasing of the reference count synchronizes with this `Acquire`
        // fence. This means that use of the data happens before decreasing
        // the reference count, which happens before this fence, which
        // happens before the deletion of the data.
        //
        // As explained in the [Boost documentation][1],
        //
        // It is important to enforce any possible access to the object in
        // one thread (through an existing reference) to *happen before*
        // deleting the object in a different thread. This is achieved by a
        // "release" operation after dropping a reference (any access to the
        // object through this reference must obviously happen before),
        // and an "acquire" operation before deleting the object.
        //
        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
        atomics::fence(atomics::Acquire);

        // Destroy the data at this time, even though we may not free the box
        // allocation itself (there may still be weak pointers lying around).
        unsafe { drop(ptr::read(&self.inner().data)); }

        if self.inner().weak.fetch_sub(1, atomics::Release) == 1 {
            atomics::fence(atomics::Acquire);
            unsafe { global_heap::exchange_free(self.x as *u8) }
        }
    }
}

impl<T: Share + Send> Weak<T> {
    /// Attempts to upgrade this weak reference to a strong reference.
    ///
    /// This method will fail to upgrade this reference if the strong reference
    /// count has already reached 0. If there are still other active strong
    /// references, this function will return a new strong reference to the
    /// data.
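    ///
    /// # Example
    ///
    /// A small sketch mirroring the `test_live` test below: while a strong
    /// pointer still exists, upgrading yields a new strong pointer to the
    /// same data.
    ///
    /// ```rust
    /// use sync::Arc;
    ///
    /// let strong = Arc::new(5);
    /// let weak = strong.downgrade();
    /// match weak.upgrade() {
    ///     Some(upgraded) => assert_eq!(*upgraded, 5),
    ///     None => fail!("the strong pointer is still alive"),
    /// }
    /// ```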
    pub fn upgrade(&self) -> Option<Arc<T>> {
        // We use a CAS loop to increment the strong count instead of a
        // fetch_add because once the count hits 0 it must never be above 0.
        let inner = self.inner();
        loop {
            let n = inner.strong.load(atomics::SeqCst);
            if n == 0 { return None }
            let old = inner.strong.compare_and_swap(n, n + 1, atomics::SeqCst);
            if old == n { return Some(Arc { x: self.x }) }
        }
    }

    #[inline]
    fn inner<'a>(&'a self) -> &'a ArcInner<T> {
        // See comments above for why this is "safe"
        unsafe { &*self.x }
    }
}

impl<T: Share + Send> Clone for Weak<T> {
    #[inline]
    fn clone(&self) -> Weak<T> {
        // See comments in Arc::clone() for why this is relaxed
        self.inner().weak.fetch_add(1, atomics::Relaxed);
        Weak { x: self.x }
    }
}

#[unsafe_destructor]
impl<T: Share + Send> Drop for Weak<T> {
    fn drop(&mut self) {
        // see comments above for why this check is here
        if self.x.is_null() { return }

        // If we find out that we were the last weak pointer, then it's time to
        // deallocate the data entirely. See the discussion in Arc::drop() about
        // the memory orderings.
        if self.inner().weak.fetch_sub(1, atomics::Release) == 1 {
            atomics::fence(atomics::Acquire);
            unsafe { global_heap::exchange_free(self.x as *u8) }
        }
    }
}

#[cfg(test)]
#[allow(experimental)]
mod tests {
    use super::{Arc, Weak};
    use std::sync::atomics;
    use std::task;
    use Mutex;

    struct Canary(*mut atomics::AtomicUint);

    impl Drop for Canary {
        fn drop(&mut self) {
            unsafe {
                match *self {
                    Canary(c) => {
                        (*c).fetch_add(1, atomics::SeqCst);
                    }
                }
            }
        }
    }

    #[test]
    fn manually_share_arc() {
        let v = vec!(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        let arc_v = Arc::new(v);

        let (tx, rx) = channel();

        task::spawn(proc() {
            let arc_v: Arc<Vec<int>> = rx.recv();
            assert_eq!(*arc_v.get(3), 4);
        });

        tx.send(arc_v.clone());

        assert_eq!(*arc_v.get(2), 3);
        assert_eq!(*arc_v.get(4), 5);

        info!("{:?}", arc_v);
    }

    #[test]
    fn test_cowarc_clone_make_unique() {
        let mut cow0 = Arc::new(75u);
        let mut cow1 = cow0.clone();
        let mut cow2 = cow1.clone();

        assert!(75 == *cow0.make_unique());
        assert!(75 == *cow1.make_unique());
        assert!(75 == *cow2.make_unique());

        *cow0.make_unique() += 1;
        *cow1.make_unique() += 2;
        *cow2.make_unique() += 3;

        assert!(76 == *cow0);
        assert!(77 == *cow1);
        assert!(78 == *cow2);

        // none should point to the same backing memory
        assert!(*cow0 != *cow1);
        assert!(*cow0 != *cow2);
        assert!(*cow1 != *cow2);
    }

    #[test]
    fn test_cowarc_clone_unique2() {
        let mut cow0 = Arc::new(75u);
        let cow1 = cow0.clone();
        let cow2 = cow1.clone();

        assert!(75 == *cow0);
        assert!(75 == *cow1);
        assert!(75 == *cow2);

        *cow0.make_unique() += 1;

        assert!(76 == *cow0);
        assert!(75 == *cow1);
        assert!(75 == *cow2);

        // cow1 and cow2 should share the same contents
        // cow0 should have a unique reference
        assert!(*cow0 != *cow1);
        assert!(*cow0 != *cow2);
        assert!(*cow1 == *cow2);
    }

    #[test]
    fn test_live() {
        let x = Arc::new(5);
        let y = x.downgrade();
        assert!(y.upgrade().is_some());
    }

    #[test]
    fn test_dead() {
        let x = Arc::new(5);
        let y = x.downgrade();
        drop(x);
        assert!(y.upgrade().is_none());
    }

    #[test]
    fn weak_self_cyclic() {
        struct Cycle {
            x: Mutex<Option<Weak<Cycle>>>
        }

        let a = Arc::new(Cycle { x: Mutex::new(None) });
        let b = a.clone().downgrade();
        *a.deref().x.lock().deref_mut() = Some(b);

        // hopefully we don't double-free (or leak)...
    }

    #[test]
    fn drop_arc() {
        let mut canary = atomics::AtomicUint::new(0);
        let x = Arc::new(Canary(&mut canary as *mut atomics::AtomicUint));
        drop(x);
        assert!(canary.load(atomics::Acquire) == 1);
    }

    #[test]
    fn drop_arc_weak() {
        let mut canary = atomics::AtomicUint::new(0);
        let arc = Arc::new(Canary(&mut canary as *mut atomics::AtomicUint));
        let arc_weak = arc.downgrade();
        assert!(canary.load(atomics::Acquire) == 0);
        drop(arc);
        assert!(canary.load(atomics::Acquire) == 1);
        drop(arc_weak);
    }
}