(index<- ) ./libextra/arc.rs
1 // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12 * Concurrency-enabled mechanisms for sharing mutable and/or immutable state
13 * between tasks.
14 *
15 * # Example
16 *
17 * In this example, a large vector of floats is shared between several tasks.
18 * With simple pipes, without Arc, a copy would have to be made for each task.
19 *
20 * ~~~ {.rust}
21 * extern mod std;
22 * use extra::arc;
23 * let numbers=vec::from_fn(100, |ind| (ind as float)*rand::random());
24 * let shared_numbers=arc::Arc::new(numbers);
25 *
26 * do 10.times {
27 * let (port, chan) = stream();
28 * chan.send(shared_numbers.clone());
29 *
30 * do spawn {
31 * let shared_numbers=port.recv();
32 * let local_numbers=shared_numbers.get();
33 *
34 * // Work with the local numbers
35 * }
36 * }
37 * ~~~
38 */
39
40 #[allow(missing_doc)];
41
42
43 use sync;
44 use sync::{Mutex, RWLock};
45
46 use std::cast;
47 use std::unstable::sync::UnsafeArc;
48 use std::task;
49 use std::borrow;
50
/// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
pub struct Condvar<'self> {
    // True when this condvar belongs to a MutexArc; passed through to
    // check_poison so the failure message names the right arc kind.
    priv is_mutex: bool,
    // Points at the owning arc's shared poison flag, so waits can detect a
    // failure that happened in another task while we were blocked.
    priv failed: &'self mut bool,
    // The underlying sync::Condvar being wrapped.
    priv cond: &'self sync::Condvar<'self>
}
57
impl<'self> Condvar<'self> {
    /// Atomically exit the associated Arc and block until a signal is sent.
    #[inline]
    pub fn wait(&self) { self.wait_on(0) }

    /**
     * Atomically exit the associated Arc and block on a specified condvar
     * until a signal is sent on that same condvar (as sync::cond.wait_on).
     *
     * wait() is equivalent to wait_on(0).
     */
    #[inline]
    pub fn wait_on(&self, condvar_id: uint) {
        // Refuse to wait on an arc that is already poisoned.
        assert!(!*self.failed);
        self.cond.wait_on(condvar_id);
        // This is why we need to wrap sync::condvar.
        // Re-check after waking: another task may have failed (and poisoned
        // the arc) while we were blocked.
        check_poison(self.is_mutex, *self.failed);
    }

    /// Wake up a blocked task. Returns false if there was no blocked task.
    #[inline]
    pub fn signal(&self) -> bool { self.signal_on(0) }

    /**
     * Wake up a blocked task on a specified condvar (as
     * sync::cond.signal_on). Returns false if there was no blocked task.
     */
    #[inline]
    pub fn signal_on(&self, condvar_id: uint) -> bool {
        assert!(!*self.failed);
        self.cond.signal_on(condvar_id)
    }

    /// Wake up all blocked tasks. Returns the number of tasks woken.
    #[inline]
    pub fn broadcast(&self) -> uint { self.broadcast_on(0) }

    /**
     * Wake up all blocked tasks on a specified condvar (as
     * sync::cond.broadcast_on). Returns the number of tasks woken.
     */
    #[inline]
    pub fn broadcast_on(&self, condvar_id: uint) -> uint {
        assert!(!*self.failed);
        self.cond.broadcast_on(condvar_id)
    }
}
105
106 /****************************************************************************
107 * Immutable Arc
108 ****************************************************************************/
109
/// An atomically reference counted wrapper for shared immutable state.
// Thin newtype over UnsafeArc; the Freeze bound on the impl (below) is what
// makes handing out shared references safe.
pub struct Arc<T> { priv x: UnsafeArc<T> }
112
113
/**
 * Access the underlying data in an atomically reference counted
 * wrapper.
 */
impl<T:Freeze+Send> Arc<T> {
    /// Create an atomically reference counted wrapper.
    pub fn new(data: T) -> Arc<T> {
        Arc { x: UnsafeArc::new(data) }
    }

    /// Borrow the shared data immutably; the reference lives as long as
    /// this Arc handle.
    pub fn get<'a>(&'a self) -> &'a T {
        unsafe { &*self.x.get_immut() }
    }

    /**
     * Retrieve the data back out of the Arc. This function blocks until the
     * reference given to it is the last existing one, and then unwrap the data
     * instead of destroying it.
     *
     * If multiple tasks call unwrap, all but the first will fail. Do not call
     * unwrap from a task that holds another reference to the same Arc; it is
     * guaranteed to deadlock.
     */
    pub fn unwrap(self) -> T {
        // Destructure to take ownership of the inner UnsafeArc.
        let Arc { x: x } = self;
        x.unwrap()
    }
}
142
impl<T:Freeze + Send> Clone for Arc<T> {
    /**
     * Duplicate an atomically reference counted wrapper.
     *
     * The resulting two `arc` objects will point to the same underlying data
     * object. However, one of the `arc` objects can be sent to another task,
     * allowing them to share the underlying data.
     */
    fn clone(&self) -> Arc<T> {
        // Only the reference count changes; the data is not copied.
        Arc { x: self.x.clone() }
    }
}
155
156 /****************************************************************************
157 * Mutex protected Arc (unsafe)
158 ****************************************************************************/
159
#[doc(hidden)]
// Shared state behind a MutexArc: the lock itself, a poison flag that is set
// when a task fails while holding the lock, and the protected user data.
struct MutexArcInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
162
/// An Arc with mutable data protected by a blocking mutex.
// no_freeze: contains interior mutability, so it must not be treated as
// deeply immutable.
#[no_freeze]
struct MutexArc<T> { priv x: UnsafeArc<MutexArcInner<T>> }
166
167
impl<T:Send> Clone for MutexArc<T> {
    /// Duplicate a mutex-protected Arc. See arc::clone for more details.
    fn clone(&self) -> MutexArc<T> {
        // NB: Cloning the underlying mutex is not necessary. Its reference
        // count would be exactly the same as the shared state's.
        MutexArc { x: self.x.clone() }
    }
}
176
impl<T:Send> MutexArc<T> {
    /// Create a mutex-protected Arc with the supplied data.
    pub fn new(user_data: T) -> MutexArc<T> {
        // Default to a single condvar.
        MutexArc::new_with_condvars(user_data, 1)
    }

    /**
     * Create a mutex-protected Arc with the supplied data and a specified number
     * of condvars (as sync::Mutex::new_with_condvars).
     */
    pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc<T> {
        let data = MutexArcInner {
            lock: Mutex::new_with_condvars(num_condvars),
            failed: false, data: user_data
        };
        MutexArc { x: UnsafeArc::new(data) }
    }

    /**
     * Access the underlying mutable data with mutual exclusion from other
     * tasks. The argument closure will be run with the mutex locked; all
     * other tasks wishing to access the data will block until the closure
     * finishes running.
     *
     * The reason this function is 'unsafe' is because it is possible to
     * construct a circular reference among multiple Arcs by mutating the
     * underlying data. This creates potential for deadlock, but worse, this
     * will guarantee a memory leak of all involved Arcs. Using MutexArcs
     * inside of other Arcs is safe in absence of circular references.
     *
     * If you wish to nest MutexArcs, one strategy for ensuring safety at
     * runtime is to add a "nesting level counter" inside the stored data, and
     * when traversing the arcs, assert that they monotonically decrease.
     *
     * # Failure
     *
     * Failing while inside the Arc will unlock the Arc while unwinding, so
     * that other tasks won't block forever. It will also poison the Arc:
     * any tasks that subsequently try to access it (including those already
     * blocked on the mutex) will also fail immediately.
     */
    #[inline]
    pub unsafe fn unsafe_access<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
        let state = self.x.get();
        // Borrowck would complain about this if the function were
        // not already unsafe. See borrow_rwlock, far below.
        do (&(*state).lock).lock {
            // Fail now if a previous lock holder failed; otherwise arm the
            // poison guard so our own failure marks the arc as poisoned.
            check_poison(true, (*state).failed);
            let _z = PoisonOnFail(&mut (*state).failed);
            blk(&mut (*state).data)
        }
    }

    /// As unsafe_access(), but with a condvar, as sync::mutex.lock_cond().
    #[inline]
    pub unsafe fn unsafe_access_cond<'x, 'c, U>(&self,
                                                blk: &fn(x: &'x mut T,
                                                         c: &'c Condvar) -> U)
                                                -> U {
        let state = self.x.get();
        do (&(*state).lock).lock_cond |cond| {
            check_poison(true, (*state).failed);
            let _z = PoisonOnFail(&mut (*state).failed);
            // Hand the closure a wrapped condvar that re-checks the poison
            // flag after every wait (see Condvar::wait_on).
            blk(&mut (*state).data,
                &Condvar {is_mutex: true,
                          failed: &mut (*state).failed,
                          cond: cond })
        }
    }

    /**
     * Retrieves the data, blocking until all other references are dropped,
     * exactly as arc::unwrap.
     *
     * Will additionally fail if another task has failed while accessing the arc.
     */
    pub fn unwrap(self) -> T {
        let MutexArc { x: x } = self;
        // Blocks until this is the last reference.
        let inner = x.unwrap();
        let MutexArcInner { failed: failed, data: data, _ } = inner;
        if failed {
            fail!(~"Can't unwrap poisoned MutexArc - another task failed inside!");
        }
        data
    }
}
263
impl<T:Freeze + Send> MutexArc<T> {

    /**
     * As unsafe_access.
     *
     * The difference between access and unsafe_access is that the former
     * forbids mutexes to be nested. While unsafe_access can be used on
     * MutexArcs without freezable interiors, this safe version of access
     * requires the Freeze bound, which prohibits access on MutexArcs which
     * might contain nested MutexArcs inside.
     *
     * The purpose of this is to offer a safe implementation of MutexArc to be
     * used instead of RWArc in cases where no readers are needed and slightly
     * better performance is required.
     *
     * Both methods have the same failure behaviour as unsafe_access and
     * unsafe_access_cond.
     */
    #[inline]
    pub fn access<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
        // Safe because Freeze rules out nested MutexArcs (see doc above).
        unsafe { self.unsafe_access(blk) }
    }

    /// As unsafe_access_cond but safe and Freeze.
    #[inline]
    pub fn access_cond<'x, 'c, U>(&self,
                                  blk: &fn(x: &'x mut T,
                                           c: &'c Condvar) -> U)
                                  -> U {
        unsafe { self.unsafe_access_cond(blk) }
    }
}
296
297 // Common code for {mutex.access,rwlock.write}{,_cond}.
298 #[inline]
299 #[doc(hidden)]
300 fn check_poison(is_mutex: bool, failed: bool) {
301 if failed {
302 if is_mutex {
303 fail!("Poisoned MutexArc - another task failed inside!");
304 } else {
305 fail!("Poisoned rw_arc - another task failed inside!");
306 }
307 }
308 }
309
#[doc(hidden)]
// RAII-style guard: while alive, a task failure will set the pointed-to
// poison flag (see the Drop impl below).
struct PoisonOnFail {
    failed: *mut bool,
}
314
impl Drop for PoisonOnFail {
    // Runs when the guard goes out of scope — including during unwinding,
    // which is the case we care about.
    fn drop(&mut self) {
        unsafe {
            /* assert!(!*self.failed);
               -- might be false in case of cond.wait() */
            // Only mark the arc poisoned if this task is actually failing.
            if task::failing() {
                *self.failed = true;
            }
        }
    }
}
326
// Constructor-style function: build a guard pointing at the given poison flag.
fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
    PoisonOnFail {
        failed: failed
    }
}
332
333 /****************************************************************************
334 * R/W lock protected Arc
335 ****************************************************************************/
336
#[doc(hidden)]
// Shared state behind an RWArc: the reader-writer lock, a poison flag set
// when a writer fails, and the protected user data.
struct RWArcInner<T> { priv lock: RWLock, priv failed: bool, priv data: T }
/**
 * A dual-mode Arc protected by a reader-writer lock. The data can be accessed
 * mutably or immutably, and immutably-accessing tasks may run concurrently.
 *
 * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested.
 */
// no_freeze: contains interior mutability, so it must not be treated as
// deeply immutable.
#[no_freeze]
struct RWArc<T> {
    priv x: UnsafeArc<RWArcInner<T>>,
}
349
impl<T:Freeze + Send> Clone for RWArc<T> {
    /// Duplicate a rwlock-protected Arc. See arc::clone for more details.
    fn clone(&self) -> RWArc<T> {
        // Only bumps the reference count; lock and data are shared.
        RWArc { x: self.x.clone() }
    }

}
357
impl<T:Freeze + Send> RWArc<T> {
    /// Create a reader/writer Arc with the supplied data.
    pub fn new(user_data: T) -> RWArc<T> {
        // Default to a single condvar.
        RWArc::new_with_condvars(user_data, 1)
    }

    /**
     * Create a reader/writer Arc with the supplied data and a specified number
     * of condvars (as sync::RWLock::new_with_condvars).
     */
    pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWArc<T> {
        let data = RWArcInner {
            lock: RWLock::new_with_condvars(num_condvars),
            failed: false, data: user_data
        };
        RWArc { x: UnsafeArc::new(data), }
    }

    /**
     * Access the underlying data mutably. Locks the rwlock in write mode;
     * other readers and writers will block.
     *
     * # Failure
     *
     * Failing while inside the Arc will unlock the Arc while unwinding, so
     * that other tasks won't block forever. As MutexArc.access, it will also
     * poison the Arc, so subsequent readers and writers will both also fail.
     */
    #[inline]
    pub fn write<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
        unsafe {
            let state = self.x.get();
            // borrow_rwlock sidesteps a borrowck complaint; see its comment.
            do (*borrow_rwlock(state)).write {
                // Fail on pre-existing poison, then arm the guard so our own
                // failure poisons the arc for later accessors.
                check_poison(false, (*state).failed);
                let _z = PoisonOnFail(&mut (*state).failed);
                blk(&mut (*state).data)
            }
        }
    }

    /// As write(), but with a condvar, as sync::rwlock.write_cond().
    #[inline]
    pub fn write_cond<'x, 'c, U>(&self,
                                 blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
                                 -> U {
        unsafe {
            let state = self.x.get();
            do (*borrow_rwlock(state)).write_cond |cond| {
                check_poison(false, (*state).failed);
                let _z = PoisonOnFail(&mut (*state).failed);
                // Wrap the condvar so waits re-check the poison flag.
                blk(&mut (*state).data,
                    &Condvar {is_mutex: false,
                              failed: &mut (*state).failed,
                              cond: cond})
            }
        }
    }

    /**
     * Access the underlying data immutably. May run concurrently with other
     * reading tasks.
     *
     * # Failure
     *
     * Failing will unlock the Arc while unwinding. However, unlike all other
     * access modes, this will not poison the Arc.
     */
    pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
        unsafe {
            let state = self.x.get();
            do (*state).lock.read {
                // Readers still observe poison, but no PoisonOnFail guard:
                // a failing reader cannot have corrupted the data.
                check_poison(false, (*state).failed);
                blk(&(*state).data)
            }
        }
    }

    /**
     * As write(), but with the ability to atomically 'downgrade' the lock.
     * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used
     * to obtain the &mut T, and can be transformed into a RWReadMode token by
     * calling downgrade(), after which a &T can be obtained instead.
     *
     * # Example
     *
     * ~~~ {.rust}
     * do arc.write_downgrade |mut write_token| {
     *     do write_token.write_cond |state, condvar| {
     *         ... exclusive access with mutable state ...
     *     }
     *     let read_token = arc.downgrade(write_token);
     *     do read_token.read |state| {
     *         ... shared access with immutable state ...
     *     }
     * }
     * ~~~
     */
    pub fn write_downgrade<U>(&self, blk: &fn(v: RWWriteMode<T>) -> U) -> U {
        unsafe {
            let state = self.x.get();
            do (*borrow_rwlock(state)).write_downgrade |write_mode| {
                check_poison(false, (*state).failed);
                // The poison guard travels inside the token so it stays armed
                // for as long as the write mode is held.
                blk(RWWriteMode {
                    data: &mut (*state).data,
                    token: write_mode,
                    poison: PoisonOnFail(&mut (*state).failed)
                })
            }
        }
    }

    /// To be called inside of the write_downgrade block.
    pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>)
                     -> RWReadMode<'a, T> {
        unsafe {
            // The rwlock should assert that the token belongs to us for us.
            let state = self.x.get();
            let RWWriteMode {
                data: data,
                token: t,
                poison: _poison
            } = token;
            // Let readers in
            let new_token = (*state).lock.downgrade(t);
            // Whatever region the input reference had, it will be safe to use
            // the same region for the output reference. (The only 'unsafe' part
            // of this cast is removing the mutability.)
            let new_data = cast::transmute_immut(data);
            // Downgrade ensured the token belonged to us. Just a sanity check.
            assert!(borrow::ref_eq(&(*state).data, new_data));
            // Produce new token
            RWReadMode {
                data: new_data,
                token: new_token,
            }
        }
    }

    /**
     * Retrieves the data, blocking until all other references are dropped,
     * exactly as arc::unwrap.
     *
     * Will additionally fail if another task has failed while accessing the arc
     * in write mode.
     */
    pub fn unwrap(self) -> T {
        let RWArc { x: x, _ } = self;
        // Blocks until this is the last reference.
        let inner = x.unwrap();
        let RWArcInner { failed: failed, data: data, _ } = inner;
        if failed {
            fail!(~"Can't unwrap poisoned RWArc - another task failed inside!")
        }
        data
    }
}
513
// Borrowck rightly complains about immutably aliasing the rwlock in order to
// lock it. This wraps the unsafety, with the justification that the 'lock'
// field is never overwritten; only 'failed' and 'data'.
#[doc(hidden)]
fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {
    // Cast &lock to a raw pointer to erase the aliasing borrow.
    unsafe { cast::transmute(&(*state).lock) }
}
521
/// The "write permission" token used for RWArc.write_downgrade().
pub struct RWWriteMode<'self, T> {
    // Exclusive borrow of the protected data.
    data: &'self mut T,
    // The sync-level write token proving we hold the lock.
    token: sync::RWLockWriteMode<'self>,
    // Keeps the poison guard armed for the duration of write mode.
    poison: PoisonOnFail,
}
528
/// The "read permission" token used for RWArc.write_downgrade().
pub struct RWReadMode<'self, T> {
    // Shared borrow of the protected data.
    data: &'self T,
    // The sync-level read token proving we hold the lock.
    token: sync::RWLockReadMode<'self>,
}
534
impl<'self, T:Freeze + Send> RWWriteMode<'self, T> {
    /// Access the pre-downgrade RWArc in write mode.
    pub fn write<U>(&mut self, blk: &fn(x: &mut T) -> U) -> U {
        // Destructure via match to re-borrow the data mutably.
        match *self {
            RWWriteMode {
                data: &ref mut data,
                token: ref token,
                poison: _
            } => {
                do token.write {
                    blk(data)
                }
            }
        }
    }

    /// Access the pre-downgrade RWArc in write mode with a condvar.
    pub fn write_cond<'x, 'c, U>(&mut self,
                                 blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
                                 -> U {
        match *self {
            RWWriteMode {
                data: &ref mut data,
                token: ref token,
                poison: ref poison
            } => {
                do token.write_cond |cond| {
                    unsafe {
                        // Rebuild the poison-aware condvar wrapper from the
                        // guard's raw flag pointer.
                        let cvar = Condvar {
                            is_mutex: false,
                            failed: &mut *poison.failed,
                            cond: cond
                        };
                        blk(data, &cvar)
                    }
                }
            }
        }
    }
}
575
impl<'self, T:Freeze + Send> RWReadMode<'self, T> {
    /// Access the post-downgrade rwlock in read mode.
    pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
        match *self {
            RWReadMode {
                data: data,
                token: ref token
            } => {
                do token.read { blk(data) }
            }
        }
    }
}
589
590 /****************************************************************************
591 * Tests
592 ****************************************************************************/
593
#[cfg(test)]
mod tests {

    use arc::*;

    use std::cell::Cell;
    use std::comm;
    use std::task;

    // Plain Arc: send a clone over a channel and read from both tasks.
    #[test]
    fn manually_share_arc() {
        let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let arc_v = Arc::new(v);

        let (p, c) = comm::stream();

        do task::spawn {
            let arc_v: Arc<~[int]> = p.recv();

            let v = arc_v.get().clone();
            assert_eq!(v[3], 4);
        };

        c.send(arc_v.clone());

        assert_eq!(arc_v.get()[2], 3);
        assert_eq!(arc_v.get()[4], 5);

        info!(arc_v);
    }

    // Child flips the flag and signals; parent waits on the condvar until
    // it observes the change.
    #[test]
    fn test_mutex_arc_condvar() {
        let arc = ~MutexArc::new(false);
        let arc2 = ~arc.clone();
        let (p,c) = comm::oneshot();
        let (c,p) = (Cell::new(c), Cell::new(p));
        do task::spawn || {
            // wait until parent gets in
            p.take().recv();
            do arc2.access_cond |state, cond| {
                *state = true;
                cond.signal();
            }
        }

        do arc.access_cond |state, cond| {
            c.take().send(());
            assert!(!*state);
            while !*state {
                cond.wait();
            }
        }
    }

    // Child fails inside the arc; the parent's cond.wait must then fail
    // due to poisoning.
    #[test] #[should_fail]
    fn test_arc_condvar_poison() {
        let arc = ~MutexArc::new(1);
        let arc2 = ~arc.clone();
        let (p, c) = comm::stream();

        do task::spawn_unlinked || {
            let _ = p.recv();
            do arc2.access_cond |one, cond| {
                cond.signal();
                // Parent should fail when it wakes up.
                assert_eq!(*one, 0);
            }
        }

        do arc.access_cond |one, cond| {
            c.send(());
            while *one == 1 {
                cond.wait();
            }
        }
    }

    // A failed access in one task poisons the arc for subsequent accesses.
    #[test] #[should_fail]
    fn test_mutex_arc_poison() {
        let arc = ~MutexArc::new(1);
        let arc2 = ~arc.clone();
        do task::try || {
            do arc2.access |one| {
                assert_eq!(*one, 2);
            }
        };
        do arc.access |one| {
            assert_eq!(*one, 1);
        }
    }

    // unwrap must fail if another task failed while holding the lock.
    #[test] #[should_fail]
    pub fn test_mutex_arc_unwrap_poison() {
        let arc = MutexArc::new(1);
        let arc2 = ~(&arc).clone();
        let (p, c) = comm::stream();
        do task::spawn {
            do arc2.access |one| {
                c.send(());
                assert!(*one == 2);
            }
        }
        let _ = p.recv();
        let one = arc.unwrap();
        assert!(one == 1);
    }

    #[test]
    fn test_unsafe_mutex_arc_nested() {
        unsafe {
            // Tests nested mutexes and access
            // to underlying data.
            let arc = ~MutexArc::new(1);
            let arc2 = ~MutexArc::new(*arc);
            do task::spawn || {
                do (*arc2).unsafe_access |mutex| {
                    do (*mutex).access |one| {
                        assert!(*one == 1);
                    }
                }
            };
        }
    }

    // Failed write poisons subsequent reads.
    #[test] #[should_fail]
    fn test_rw_arc_poison_wr() {
        let arc = RWArc::new(1);
        let arc2 = arc.clone();
        do task::try {
            do arc2.write |one| {
                assert_eq!(*one, 2);
            }
        };
        do arc.read |one| {
            assert_eq!(*one, 1);
        }
    }

    // Failed write poisons subsequent writes.
    #[test] #[should_fail]
    fn test_rw_arc_poison_ww() {
        let arc = RWArc::new(1);
        let arc2 = arc.clone();
        do task::try {
            do arc2.write |one| {
                assert_eq!(*one, 2);
            }
        };
        do arc.write |one| {
            assert_eq!(*one, 1);
        }
    }
    // Failed downgrade-mode write poisons subsequent writes.
    #[test] #[should_fail]
    fn test_rw_arc_poison_dw() {
        let arc = RWArc::new(1);
        let arc2 = arc.clone();
        do task::try {
            do arc2.write_downgrade |mut write_mode| {
                do write_mode.write |one| {
                    assert_eq!(*one, 2);
                }
            }
        };
        do arc.write |one| {
            assert_eq!(*one, 1);
        }
    }
    // Failed reads never poison.
    #[test]
    fn test_rw_arc_no_poison_rr() {
        let arc = RWArc::new(1);
        let arc2 = arc.clone();
        do task::try {
            do arc2.read |one| {
                assert_eq!(*one, 2);
            }
        };
        do arc.read |one| {
            assert_eq!(*one, 1);
        }
    }
    // Failed reads do not poison subsequent writes either.
    #[test]
    fn test_rw_arc_no_poison_rw() {
        let arc = RWArc::new(1);
        let arc2 = arc.clone();
        do task::try {
            do arc2.read |one| {
                assert_eq!(*one, 2);
            }
        };
        do arc.write |one| {
            assert_eq!(*one, 1);
        }
    }
    // A failure in the downgraded-read section does not poison.
    #[test]
    fn test_rw_arc_no_poison_dr() {
        let arc = RWArc::new(1);
        let arc2 = arc.clone();
        do task::try {
            do arc2.write_downgrade |write_mode| {
                let read_mode = arc2.downgrade(write_mode);
                do read_mode.read |one| {
                    assert_eq!(*one, 2);
                }
            }
        };
        do arc.write |one| {
            assert_eq!(*one, 1);
        }
    }
    // Writer increments through a deliberately racy-looking sequence while
    // readers assert they never observe the intermediate -1 state.
    #[test]
    fn test_rw_arc() {
        let arc = RWArc::new(0);
        let arc2 = arc.clone();
        let (p, c) = comm::stream();

        do task::spawn {
            do arc2.write |num| {
                do 10.times {
                    let tmp = *num;
                    *num = -1;
                    task::deschedule();
                    *num = tmp + 1;
                }
                c.send(());
            }
        }

        // Readers try to catch the writer in the act
        let mut children = ~[];
        do 5.times {
            let arc3 = arc.clone();
            let mut builder = task::task();
            builder.future_result(|r| children.push(r));
            do builder.spawn {
                do arc3.read |num| {
                    assert!(*num >= 0);
                }
            }
        }

        // Wait for children to pass their asserts
        for r in children.iter() {
            r.recv();
        }

        // Wait for writer to finish
        p.recv();
        do arc.read |num| {
            assert_eq!(*num, 10);
        }
    }
    #[test]
    fn test_rw_downgrade() {
        // (1) A downgrader gets in write mode and does cond.wait.
        // (2) A writer gets in write mode, sets state to 42, and does signal.
        // (3) Downgrader wakes, sets state to 31337.
        // (4) tells writer and all other readers to contend as it downgrades.
        // (5) Writer attempts to set state back to 42, while downgraded task
        //     and all reader tasks assert that it's 31337.
        let arc = RWArc::new(0);

        // Reader tasks
        let mut reader_convos = ~[];
        do 10.times {
            let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream());
            reader_convos.push((rc1, rp2));
            let arcn = arc.clone();
            do task::spawn {
                rp1.recv(); // wait for downgrader to give go-ahead
                do arcn.read |state| {
                    assert_eq!(*state, 31337);
                    rc2.send(());
                }
            }
        }

        // Writer task
        let arc2 = arc.clone();
        let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream());
        do task::spawn || {
            wp1.recv();
            do arc2.write_cond |state, cond| {
                assert_eq!(*state, 0);
                *state = 42;
                cond.signal();
            }
            wp1.recv();
            do arc2.write |state| {
                // This shouldn't happen until after the downgrade read
                // section, and all other readers, finish.
                assert_eq!(*state, 31337);
                *state = 42;
            }
            wc2.send(());
        }

        // Downgrader (us)
        do arc.write_downgrade |mut write_mode| {
            do write_mode.write_cond |state, cond| {
                wc1.send(()); // send to another writer who will wake us up
                while *state == 0 {
                    cond.wait();
                }
                assert_eq!(*state, 42);
                *state = 31337;
                // send to other readers
                for &(ref rc, _) in reader_convos.iter() {
                    rc.send(())
                }
            }
            let read_mode = arc.downgrade(write_mode);
            do read_mode.read |state| {
                // complete handshake with other readers
                for &(_, ref rp) in reader_convos.iter() {
                    rp.recv()
                }
                wc1.send(()); // tell writer to try again
                assert_eq!(*state, 31337);
            }
        }

        wp2.recv(); // complete handshake with writer
    }
    #[cfg(test)]
    fn test_rw_write_cond_downgrade_read_race_helper() {
        // Tests that when a downgrader hands off the "reader cloud" lock
        // because of a contending reader, a writer can't race to get it
        // instead, which would result in readers_and_writers. This tests
        // the sync module rather than this one, but it's here because an
        // rwarc gives us extra shared state to help check for the race.
        // If you want to see this test fail, go to sync.rs and replace the
        // line in RWLock::write_cond() that looks like:
        //     "blk(&Condvar { order: opt_lock, ..*cond })"
        // with just "blk(cond)".
        let x = RWArc::new(true);
        let (wp, wc) = comm::stream();

        // writer task
        let xw = x.clone();
        do task::spawn {
            do xw.write_cond |state, c| {
                wc.send(()); // tell downgrader it's ok to go
                c.wait();
                // The core of the test is here: the condvar reacquire path
                // must involve order_lock, so that it cannot race with a reader
                // trying to receive the "reader cloud lock hand-off".
                *state = false;
            }
        }

        wp.recv(); // wait for writer to get in

        do x.write_downgrade |mut write_mode| {
            do write_mode.write_cond |state, c| {
                assert!(*state);
                // make writer contend in the cond-reacquire path
                c.signal();
            }
            // make a reader task to trigger the "reader cloud lock" handoff
            let xr = x.clone();
            let (rp, rc) = comm::stream();
            do task::spawn {
                rc.send(());
                do xr.read |_state| { }
            }
            rp.recv(); // wait for reader task to exist

            let read_mode = x.downgrade(write_mode);
            do read_mode.read |state| {
                // if writer mistakenly got in, make sure it mutates state
                // before we assert on it
                do 5.times { task::deschedule(); }
                // make sure writer didn't get in.
                assert!(*state);
            }
        }
    }
    #[test]
    fn test_rw_write_cond_downgrade_read_race() {
        // Ideally the above test case would have deschedule statements in it that
        // helped to expose the race nearly 100% of the time... but adding
        // deschedules in the intuitively-right locations made it even less likely,
        // and I wasn't sure why :( . This is a mediocre "next best" option.
        do 8.times { test_rw_write_cond_downgrade_read_race_helper() }
    }
}
libextra/arc.rs:51:77-51:77 -struct- definition:
/// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
pub struct Condvar<'self> {
references:-409: &Condvar {is_mutex: false,
553: blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
563: let cvar = Condvar {
234: c: &'c Condvar) -> U)
291: c: &'c Condvar) -> U)
241: &Condvar {is_mutex: true,
401: blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
58: impl<'self> Condvar<'self> {
libextra/arc.rs:345:13-345:13 -struct- definition:
#[no_freeze]
struct RWArc<T> {
references:-353: RWArc { x: self.x.clone() }
352: fn clone(&self) -> RWArc<T> {
350: impl<T:Freeze + Send> Clone for RWArc<T> {
373: RWArc { x: UnsafeArc::new(data), }
360: pub fn new(user_data: T) -> RWArc<T> {
504: let RWArc { x: x, _ } = self;
368: pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWArc<T> {
358: impl<T:Freeze + Send> RWArc<T> {
libextra/workcache.rs:
297: lg: RWArc<Logger>,
230: db: RWArc<Database>,
291: lg: RWArc<Logger>,
290: pub fn new(db: RWArc<Database>,
296: pub fn new_with_freshness(db: RWArc<Database>,
231: logger: RWArc<Logger>,
libextra/arc.rs:326:1-326:1 -fn- definition:
fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
references:-392: let _z = PoisonOnFail(&mut (*state).failed);
225: let _z = PoisonOnFail(&mut (*state).failed);
239: let _z = PoisonOnFail(&mut (*state).failed);
407: let _z = PoisonOnFail(&mut (*state).failed);
463: poison: PoisonOnFail(&mut (*state).failed)
libextra/arc.rs:337:15-337:15 -struct- definition:
#[doc(hidden)]
struct RWArcInner<T> { priv lock: RWLock, priv failed: bool, priv data: T }
references:-347: priv x: UnsafeArc<RWArcInner<T>>,
369: let data = RWArcInner {
518: fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {
506: let RWArcInner { failed: failed, data: data, _ } = inner;
libextra/arc.rs:160:15-160:15 -struct- definition:
#[doc(hidden)]
struct MutexArcInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
references:-165: struct MutexArc<T> { priv x: UnsafeArc<MutexArcInner<T>> }
188: let data = MutexArcInner {
256: let MutexArcInner { failed: failed, data: data, _ } = inner;
libextra/arc.rs:299:15-299:15 -fn- definition:
#[doc(hidden)]
fn check_poison(is_mutex: bool, failed: bool) {
references:-459: check_poison(false, (*state).failed);
429: check_poison(false, (*state).failed);
74: check_poison(self.is_mutex, *self.failed);
391: check_poison(false, (*state).failed);
406: check_poison(false, (*state).failed);
224: check_poison(true, (*state).failed);
238: check_poison(true, (*state).failed);
libextra/arc.rs:517:15-517:15 -fn- definition:
#[doc(hidden)]
fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {
references:-458: do (*borrow_rwlock(state)).write_downgrade |write_mode| {
405: do (*borrow_rwlock(state)).write_cond |cond| {
390: do (*borrow_rwlock(state)).write {
libextra/arc.rs:164:13-164:13 -struct- definition:
#[no_freeze]
struct MutexArc<T> { priv x: UnsafeArc<MutexArcInner<T>> }
references:-173: MutexArc { x: self.x.clone() }
264: impl<T:Freeze + Send> MutexArc<T> {
179: pub fn new(user_data: T) -> MutexArc<T> {
187: pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc<T> {
170: fn clone(&self) -> MutexArc<T> {
177: impl<T:Send> MutexArc<T> {
168: impl<T:Send> Clone for MutexArc<T> {
254: let MutexArc { x: x } = self;
192: MutexArc { x: UnsafeArc::new(data) }
libextra/arc.rs:529:66-529:66 -struct- definition:
/// The "read permission" token used for RWArc.write_downgrade().
pub struct RWReadMode<'self, T> {
references:-580: RWReadMode {
576: impl<'self, T:Freeze + Send> RWReadMode<'self, T> {
489: RWReadMode {
471: -> RWReadMode<'a, T> {
libextra/arc.rs:310:15-310:15 -struct- definition:
#[doc(hidden)]
struct PoisonOnFail {
references:-526: poison: PoisonOnFail,
315: impl Drop for PoisonOnFail {
328: PoisonOnFail {
327: fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
libextra/arc.rs:110:72-110:72 -struct- definition:
/// An atomically reference counted wrapper for shared immutable state.
pub struct Arc<T> { priv x: UnsafeArc<T> }
references:-120: pub fn new(data: T) -> Arc<T> {
151: fn clone(&self) -> Arc<T> {
118: impl<T:Freeze+Send> Arc<T> {
138: let Arc { x: x } = self;
143: impl<T:Freeze + Send> Clone for Arc<T> {
152: Arc { x: self.x.clone() }
121: Arc { x: UnsafeArc::new(data) }
libextra/workcache.rs:
299: freshness: Arc<FreshnessMap>) -> Context {
232: cfg: Arc<json::Object>,
239: freshness: Arc<FreshnessMap>
298: cfg: Arc<json::Object>,
292: cfg: Arc<json::Object>) -> Context {
libextra/arc.rs:522:67-522:67 -struct- definition:
/// The "write permission" token used for RWArc.write_downgrade().
pub struct RWWriteMode<'self, T> {
references:-475: let RWWriteMode {
556: RWWriteMode {
455: pub fn write_downgrade<U>(&self, blk: &fn(v: RWWriteMode<T>) -> U) -> U {
535: impl<'self, T:Freeze + Send> RWWriteMode<'self, T> {
460: blk(RWWriteMode {
539: RWWriteMode {
470: pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>)