./libextra/sync.rs
1 // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /**
12 * The concurrency primitives you know and love.
13 *
14 * Maybe once we have a "core exports x only to std" mechanism, these can be
15 * in std.
16 */
17
18
19 use std::borrow;
20 use std::comm;
21 use std::comm::SendDeferred;
22 use std::comm::{GenericPort, Peekable};
23 use std::task;
24 use std::unstable::sync::{Exclusive, UnsafeArc};
25 use std::unstable::atomics;
26 use std::unstable::finally::Finally;
27 use std::util;
28 use std::util::NonCopyable;
29
30 /****************************************************************************
31 * Internals
32 ****************************************************************************/
33
34 // Each waiting task receives on one of these.
35 #[doc(hidden)]
36 type WaitEnd = comm::PortOne<()>;
37 #[doc(hidden)]
38 type SignalEnd = comm::ChanOne<()>;
39 // A FIFO queue of waiting tasks: waiters enqueue on 'tail'; signal/broadcast pop from 'head'.
40 #[doc(hidden)]
41 struct WaitQueue { head: comm::Port<SignalEnd>,
42 tail: comm::Chan<SignalEnd> }
43
44 impl WaitQueue {
45 fn new() -> WaitQueue {
46 let (block_head, block_tail) = comm::stream();
47 WaitQueue { head: block_head, tail: block_tail }
48 }
49
50 // Signals one live task from the queue.
51 fn signal(&self) -> bool {
52 // The peek is mandatory to make sure recv doesn't block.
53 if self.head.peek() {
54 // Pop and send a wakeup signal. If the waiter was killed, its port
55 // will have closed. Keep trying until we get a live task.
56 if self.head.recv().try_send_deferred(()) {
57 true
58 } else {
59 self.signal()
60 }
61 } else {
62 false
63 }
64 }
65
66 fn broadcast(&self) -> uint {
67 let mut count = 0;
68 while self.head.peek() {
69 if self.head.recv().try_send_deferred(()) {
70 count += 1;
71 }
72 }
73 count
74 }
75
76 fn wait_end(&self) -> WaitEnd {
77 let (wait_end, signal_end) = comm::oneshot();
78 self.tail.send_deferred(signal_end);
79 wait_end
80 }
81 }
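
// For illustration only (WaitQueue is internal): a waiter enqueues a oneshot
// with wait_end() and blocks on its port; a waker later pops the matching
// chan and fires it via signal(). A minimal sketch:
//
//     let q = WaitQueue::new();
//     let wait_end = q.wait_end(); // waiter enqueues itself...
//     assert!(q.signal());         // ...a waker fires the oneshot...
//     wait_end.recv();             // ...and the waiter unblocks.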
82
83 // The building-block used to make semaphores, mutexes, and rwlocks.
84 #[doc(hidden)]
85 struct SemInner<Q> {
86 count: int,
87 waiters: WaitQueue,
88 // Can be either unit or another waitqueue. Some sems shouldn't come with
89 // a condition variable attached, others should.
90 blocked: Q
91 }
92
93 #[doc(hidden)]
94 struct Sem<Q>(Exclusive<SemInner<Q>>);
95
96 #[doc(hidden)]
97 impl<Q:Send> Sem<Q> {
98 fn new(count: int, q: Q) -> Sem<Q> {
99 Sem(Exclusive::new(SemInner {
100 count: count, waiters: WaitQueue::new(), blocked: q }))
101 }
102
103 pub fn acquire(&self) {
104 unsafe {
105 let mut waiter_nobe = None;
106 do (**self).with |state| {
107 state.count -= 1;
108 if state.count < 0 {
109 // Create waiter nobe, enqueue ourself, and tell
110 // outer scope we need to block.
111 waiter_nobe = Some(state.waiters.wait_end());
112 }
113 }
114 // Uncomment if you wish to test for sem races. Not valgrind-friendly.
115 /* do 1000.times { task::deschedule(); } */
116 // Need to wait outside the exclusive.
117 if waiter_nobe.is_some() {
118 let _ = waiter_nobe.unwrap().recv();
119 }
120 }
121 }
122
123 pub fn release(&self) {
124 unsafe {
125 do (**self).with |state| {
126 state.count += 1;
127 if state.count <= 0 {
128 state.waiters.signal();
129 }
130 }
131 }
132 }
133
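    // Runs blk with one unit of the semaphore held. The finally block
    // guarantees the release runs even if blk fails, and the pairing of
    // unkillable with rekillable keeps a kill signal from landing between
    // the acquire and the installation of that cleanup guard.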
134 pub fn access<U>(&self, blk: &fn() -> U) -> U {
135 do task::unkillable {
136 do (|| {
137 self.acquire();
138 do task::rekillable { blk() }
139 }).finally {
140 self.release();
141 }
142 }
143 }
144 }
145
146 #[doc(hidden)]
147 impl Sem<~[WaitQueue]> {
148 fn new_and_signal(count: int, num_condvars: uint)
149 -> Sem<~[WaitQueue]> {
150 let mut queues = ~[];
151 do num_condvars.times {
152 queues.push(WaitQueue::new());
153 }
154 Sem::new(count, queues)
155 }
156 }
157
158 // FIXME(#3598): Want to use an Option down below, but we need a custom enum
159 // that's not polymorphic to get around the fact that lifetimes are invariant
160 // inside of type parameters.
161 enum ReacquireOrderLock<'self> {
162 Nothing, // c.c
163 Just(&'self Semaphore),
164 }
165
166 /// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
167 pub struct Condvar<'self> {
168 // The 'Sem' object associated with this condvar. This is the one that's
169 // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
170 priv sem: &'self Sem<~[WaitQueue]>,
171 // This is (can be) an extra semaphore which is held around the reacquire
172 // operation on the first one. This is only used in cvars associated with
173 // rwlocks, and is needed to ensure that, when a downgrader is trying to
174 // hand off the access lock (which would be the first field, here), a 2nd
175     // writer waking up from a cvar wait can't race with a reader to steal it.
176 // See the comment in write_cond for more detail.
177 priv order: ReacquireOrderLock<'self>,
178 // Make sure condvars are non-copyable.
179 priv token: util::NonCopyable,
180 }
181
182 impl<'self> Condvar<'self> {
183 /**
184 * Atomically drop the associated lock, and block until a signal is sent.
185 *
186 * # Failure
187 * A task which is killed (i.e., by linked failure with another task)
188 * while waiting on a condition variable will wake up, fail, and unlock
189 * the associated lock as it unwinds.
190 */
191 pub fn wait(&self) { self.wait_on(0) }
192
193 /**
194 * As wait(), but can specify which of multiple condition variables to
195 * wait on. Only a signal_on() or broadcast_on() with the same condvar_id
196 * will wake this thread.
197 *
198 * The associated lock must have been initialised with an appropriate
199 * number of condvars. The condvar_id must be between 0 and num_condvars-1
200 * or else this call will fail.
201 *
202 * wait() is equivalent to wait_on(0).
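     *
     * For instance, a minimal sketch (mirroring test_mutex_different_conds
     * in the tests below):
     *
     * ~~~ {.rust}
     * let m = Mutex::new_with_condvars(2);
     * do m.lock_cond |cond| {
     *     // Only a signal_on(1) or broadcast_on(1) can wake this up.
     *     cond.wait_on(1);
     * }
     * ~~~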
203 */
204 pub fn wait_on(&self, condvar_id: uint) {
205         let mut wait_end = None;
206 let mut out_of_bounds = None;
207 do task::unkillable {
208 // Release lock, 'atomically' enqueuing ourselves in so doing.
209 unsafe {
210 do (**self.sem).with |state| {
211 if condvar_id < state.blocked.len() {
212 // Drop the lock.
213 state.count += 1;
214 if state.count <= 0 {
215 state.waiters.signal();
216 }
217 // Create waiter nobe, and enqueue ourself to
218 // be woken up by a signaller.
219                         wait_end = Some(state.blocked[condvar_id].wait_end());
220 } else {
221 out_of_bounds = Some(state.blocked.len());
222 }
223 }
224 }
225
226 // If deschedule checks start getting inserted anywhere, we can be
227 // killed before or after enqueueing. Deciding whether to
228 // unkillably reacquire the lock needs to happen atomically
229             // wrt enqueueing.
230 do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
231 // Unconditionally "block". (Might not actually block if a
232 // signaller already sent -- I mean 'unconditionally' in contrast
233 // with acquire().)
234 do (|| {
235 do task::rekillable {
236                         let _ = wait_end.take_unwrap().recv();
237 }
238 }).finally {
239 // Reacquire the condvar. Note this is back in the unkillable
240 // section; it needs to succeed, instead of itself dying.
241 match self.order {
242 Just(lock) => do lock.access {
243 self.sem.acquire();
244 },
245 Nothing => {
246 self.sem.acquire();
247 },
248 }
249 }
250 }
251 }
252 }
253
254 /// Wake up a blocked task. Returns false if there was no blocked task.
255 pub fn signal(&self) -> bool { self.signal_on(0) }
256
257 /// As signal, but with a specified condvar_id. See wait_on.
258 pub fn signal_on(&self, condvar_id: uint) -> bool {
259 unsafe {
260 let mut out_of_bounds = None;
261 let mut result = false;
262 do (**self.sem).with |state| {
263 if condvar_id < state.blocked.len() {
264 result = state.blocked[condvar_id].signal();
265 } else {
266 out_of_bounds = Some(state.blocked.len());
267 }
268 }
269 do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
270 result
271 }
272 }
273 }
274
275 /// Wake up all blocked tasks. Returns the number of tasks woken.
276 pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
277
278 /// As broadcast, but with a specified condvar_id. See wait_on.
279 pub fn broadcast_on(&self, condvar_id: uint) -> uint {
280 let mut out_of_bounds = None;
281 let mut queue = None;
282 unsafe {
283 do (**self.sem).with |state| {
284 if condvar_id < state.blocked.len() {
285 // To avoid :broadcast_heavy, we make a new waitqueue,
286 // swap it out with the old one, and broadcast on the
287 // old one outside of the little-lock.
288 queue = Some(util::replace(&mut state.blocked[condvar_id],
289 WaitQueue::new()));
290 } else {
291 out_of_bounds = Some(state.blocked.len());
292 }
293 }
294             do check_cvar_bounds(out_of_bounds, condvar_id, "cond.broadcast_on()") {
295 let queue = queue.take_unwrap();
296 queue.broadcast()
297 }
298 }
299 }
300 }
301
302 // Checks whether a condvar ID was out of bounds, and fails if so, or does
303 // something else next on success.
304 #[inline]
305 #[doc(hidden)]
306 fn check_cvar_bounds<U>(out_of_bounds: Option<uint>, id: uint, act: &str,
307 blk: &fn() -> U) -> U {
308 match out_of_bounds {
309 Some(0) =>
310 fail!("%s with illegal ID %u - this lock has no condvars!", act, id),
311 Some(length) =>
312 fail!("%s with illegal ID %u - ID must be less than %u", act, id, length),
313 None => blk()
314 }
315 }
316
317 #[doc(hidden)]
318 impl Sem<~[WaitQueue]> {
319 // The only other places that condvars get built are rwlock.write_cond()
320 // and rwlock_write_mode.
321 pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
322 do self.access {
323 blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() })
324 }
325 }
326 }
327
328 /****************************************************************************
329 * Semaphores
330 ****************************************************************************/
331
332 /// A counting, blocking, bounded-waiting semaphore.
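///
/// # Example
///
/// A minimal sketch of counted access:
///
/// ~~~ {.rust}
/// let s = Semaphore::new(2);
/// do s.access {
///     // At most two tasks can be inside this block at once.
/// }
/// ~~~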
333 pub struct Semaphore { priv sem: Sem<()> }
334
335
336 impl Clone for Semaphore {
337 /// Create a new handle to the semaphore.
338 fn clone(&self) -> Semaphore {
339 Semaphore { sem: Sem((*self.sem).clone()) }
340 }
341 }
342
343 impl Semaphore {
344 /// Create a new semaphore with the specified count.
345 pub fn new(count: int) -> Semaphore {
346 Semaphore { sem: Sem::new(count, ()) }
347 }
348
349 /**
350 * Acquire a resource represented by the semaphore. Blocks if necessary
351 * until resource(s) become available.
352 */
353 pub fn acquire(&self) { (&self.sem).acquire() }
354
355 /**
356 * Release a held resource represented by the semaphore. Wakes a blocked
357 * contending task, if any exist. Won't block the caller.
358 */
359 pub fn release(&self) { (&self.sem).release() }
360
361 /// Run a function with ownership of one of the semaphore's resources.
362 pub fn access<U>(&self, blk: &fn() -> U) -> U { (&self.sem).access(blk) }
363 }
364
365 /****************************************************************************
366 * Mutexes
367 ****************************************************************************/
368
369 /**
370 * A blocking, bounded-waiting, mutual exclusion lock with an associated
371 * FIFO condition variable.
372 *
373 * # Failure
374 * A task which fails while holding a mutex will unlock the mutex as it
375 * unwinds.
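 *
 * # Example
 *
 * A minimal usage sketch:
 *
 * ~~~ {.rust}
 * let m = Mutex::new();
 * do m.lock {
 *     // Only one task at a time can be in here.
 * }
 * ~~~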
376 */
377 pub struct Mutex { priv sem: Sem<~[WaitQueue]> }
378
379 impl Clone for Mutex {
380 /// Create a new handle to the mutex.
381 fn clone(&self) -> Mutex { Mutex { sem: Sem((*self.sem).clone()) } }
382 }
383
384 impl Mutex {
385 /// Create a new mutex, with one associated condvar.
386 pub fn new() -> Mutex { Mutex::new_with_condvars(1) }
387
388 /**
389 * Create a new mutex, with a specified number of associated condvars. This
390 * will allow calling wait_on/signal_on/broadcast_on with condvar IDs between
391 * 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but
392 * any operations on the condvar will fail.)
393 */
394 pub fn new_with_condvars(num_condvars: uint) -> Mutex {
395 Mutex { sem: Sem::new_and_signal(1, num_condvars) }
396 }
397
398
399 /// Run a function with ownership of the mutex.
400 pub fn lock<U>(&self, blk: &fn() -> U) -> U {
401 (&self.sem).access(blk)
402 }
403
404 /// Run a function with ownership of the mutex and a handle to a condvar.
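    ///
    /// A sketch of the child-wakes-parent pattern (as in the tests below):
    ///
    /// ~~~ {.rust}
    /// let m = Mutex::new();
    /// do m.lock_cond |cond| {
    ///     let m2 = m.clone();
    ///     do task::spawn {
    ///         do m2.lock_cond |cond| { cond.signal(); }
    ///     }
    ///     cond.wait(); // atomically unlocks, letting the child in
    /// }
    /// ~~~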
405 pub fn lock_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
406 (&self.sem).access_cond(blk)
407 }
408 }
409
410 /****************************************************************************
411 * Reader-writer locks
412 ****************************************************************************/
413
414 // NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem
415
416 #[doc(hidden)]
417 struct RWLockInner {
418 // You might ask, "Why don't you need to use an atomic for the mode flag?"
419 // This flag affects the behaviour of readers (for plain readers, they
420 // assert on it; for downgraders, they use it to decide which mode to
421 // unlock for). Consider that the flag is only unset when the very last
422 // reader exits; therefore, it can never be unset during a reader/reader
423 // (or reader/downgrader) race.
424 // By the way, if we didn't care about the assert in the read unlock path,
425 // we could instead store the mode flag in write_downgrade's stack frame,
426 // and have the downgrade tokens store a borrowed pointer to it.
427 read_mode: bool,
428 // The only way the count flag is ever accessed is with xadd. Since it is
429 // a read-modify-write operation, multiple xadds on different cores will
430 // always be consistent with respect to each other, so a monotonic/relaxed
431 // consistency ordering suffices (i.e., no extra barriers are needed).
432 // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
433 // acquire/release orderings superfluously. Change these someday.
434 read_count: atomics::AtomicUint,
435 }
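
// In brief, the reader protocol implemented below is (an illustrative
// sketch, eliding the ordering flags):
//
//     // enter, inside order_lock.access:
//     if read_count.fetch_add(1, ...) == 0 { access_lock.acquire(); read_mode = true; }
//     // exit:
//     if read_count.fetch_sub(1, ...) == 1 { read_mode = false; access_lock.release(); }
//
// Hence read_mode only ever flips while the flipping task is the lone reader.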
436
437 /**
438 * A blocking, no-starvation, reader-writer lock with an associated condvar.
439 *
440 * # Failure
441 * A task which fails while holding an rwlock will unlock the rwlock as it
442 * unwinds.
443 */
444 pub struct RWLock {
445 priv order_lock: Semaphore,
446 priv access_lock: Sem<~[WaitQueue]>,
447 priv state: UnsafeArc<RWLockInner>,
448 }
449
450 impl RWLock {
451 /// Create a new rwlock, with one associated condvar.
452 pub fn new() -> RWLock { RWLock::new_with_condvars(1) }
453
454 /**
455 * Create a new rwlock, with a specified number of associated condvars.
456      * Similar to Mutex::new_with_condvars.
457 */
458 pub fn new_with_condvars(num_condvars: uint) -> RWLock {
459 let state = UnsafeArc::new(RWLockInner {
460 read_mode: false,
461 read_count: atomics::AtomicUint::new(0),
462 });
463 RWLock { order_lock: Semaphore::new(1),
464 access_lock: Sem::new_and_signal(1, num_condvars),
465 state: state, }
466 }
467
468 /// Create a new handle to the rwlock.
469 pub fn clone(&self) -> RWLock {
470 RWLock { order_lock: (&(self.order_lock)).clone(),
471 access_lock: Sem((*self.access_lock).clone()),
472 state: self.state.clone() }
473 }
474
475 /**
476 * Run a function with the rwlock in read mode. Calls to 'read' from other
477 * tasks may run concurrently with this one.
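     *
     * A minimal sketch:
     *
     * ~~~ {.rust}
     * let lock = RWLock::new();
     * do lock.read {
     *     // Any number of concurrent readers may be in here.
     * }
     * ~~~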
478 */
479 pub fn read<U>(&self, blk: &fn() -> U) -> U {
480 unsafe {
481 do task::unkillable {
482 do (&self.order_lock).access {
483 let state = &mut *self.state.get();
484 let old_count = state.read_count.fetch_add(1, atomics::Acquire);
485 if old_count == 0 {
486 (&self.access_lock).acquire();
487 state.read_mode = true;
488 }
489 }
490 do (|| {
491 do task::rekillable { blk() }
492 }).finally {
493 let state = &mut *self.state.get();
494 assert!(state.read_mode);
495 let old_count = state.read_count.fetch_sub(1, atomics::Release);
496 assert!(old_count > 0);
497 if old_count == 1 {
498 state.read_mode = false;
499 // Note: this release used to be outside of a locked access
500 // to exclusive-protected state. If this code is ever
501 // converted back to such (instead of using atomic ops),
502 // this access MUST NOT go inside the exclusive access.
503 (&self.access_lock).release();
504 }
505 }
506 }
507 }
508 }
509
510 /**
511 * Run a function with the rwlock in write mode. No calls to 'read' or
512 * 'write' from other tasks will run concurrently with this one.
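     *
     * A minimal sketch:
     *
     * ~~~ {.rust}
     * let lock = RWLock::new();
     * do lock.write {
     *     // This task has exclusive access.
     * }
     * ~~~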
513 */
514 pub fn write<U>(&self, blk: &fn() -> U) -> U {
515 do task::unkillable {
516 (&self.order_lock).acquire();
517 do (&self.access_lock).access {
518 (&self.order_lock).release();
519 do task::rekillable {
520 blk()
521 }
522 }
523 }
524 }
525
526 /**
527 * As write(), but also with a handle to a condvar. Waiting on this
528 * condvar will allow readers and writers alike to take the rwlock before
529 * the waiting task is signalled. (Note: a writer that waited and then
530 * was signalled might reacquire the lock before other waiting writers.)
531 */
532 pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
533 // It's important to thread our order lock into the condvar, so that
534 // when a cond.wait() wakes up, it uses it while reacquiring the
535 // access lock. If we permitted a waking-up writer to "cut in line",
536 // there could arise a subtle race when a downgrader attempts to hand
537 // off the reader cloud lock to a waiting reader. This race is tested
538 // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
539 // T1 (writer) T2 (downgrader) T3 (reader)
540 // [in cond.wait()]
541 // [locks for writing]
542 // [holds access_lock]
543 // [is signalled, perhaps by
544 // downgrader or a 4th thread]
545 // tries to lock access(!)
546 // lock order_lock
547 // xadd read_count[0->1]
548 // tries to lock access
549 // [downgrade]
550 // xadd read_count[1->2]
551 // unlock access
552 // Since T1 contended on the access lock before T3 did, it will steal
553 // the lock handoff. Adding order_lock in the condvar reacquire path
554 // solves this because T1 will hold order_lock while waiting on access,
555 // which will cause T3 to have to wait until T1 finishes its write,
556 // which can't happen until T2 finishes the downgrade-read entirely.
557 // The astute reader will also note that making waking writers use the
558 // order_lock is better for not starving readers.
559 do task::unkillable {
560 (&self.order_lock).acquire();
561 do (&self.access_lock).access_cond |cond| {
562 (&self.order_lock).release();
563 do task::rekillable {
564 let opt_lock = Just(&self.order_lock);
565 blk(&Condvar { sem: cond.sem, order: opt_lock,
566 token: NonCopyable::new() })
567 }
568 }
569 }
570 }
571
572 /**
573 * As write(), but with the ability to atomically 'downgrade' the lock;
574 * i.e., to become a reader without letting other writers get the lock in
575 * the meantime (such as unlocking and then re-locking as a reader would
576 * do). The block takes a "write mode token" argument, which can be
577      * transformed into a "read mode token" by calling downgrade().
578 *
579 * # Example
580 *
581 * ~~~ {.rust}
582 * do lock.write_downgrade |mut write_token| {
583 * do write_token.write_cond |condvar| {
584 * ... exclusive access ...
585 * }
586 * let read_token = lock.downgrade(write_token);
587 * do read_token.read {
588 * ... shared access ...
589 * }
590 * }
591 * ~~~
592 */
593 pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U {
594 // Implementation slightly different from the slicker 'write's above.
595 // The exit path is conditional on whether the caller downgrades.
596 do task::unkillable {
597 (&self.order_lock).acquire();
598 (&self.access_lock).acquire();
599 (&self.order_lock).release();
600 do (|| {
601 do task::rekillable {
602 blk(RWLockWriteMode { lock: self, token: NonCopyable::new() })
603 }
604 }).finally {
605 let writer_or_last_reader;
606 // Check if we're releasing from read mode or from write mode.
607 let state = unsafe { &mut *self.state.get() };
608 if state.read_mode {
609 // Releasing from read mode.
610 let old_count = state.read_count.fetch_sub(1, atomics::Release);
611 assert!(old_count > 0);
612 // Check if other readers remain.
613 if old_count == 1 {
614 // Case 1: Writer downgraded & was the last reader
615 writer_or_last_reader = true;
616 state.read_mode = false;
617 } else {
618 // Case 2: Writer downgraded & was not the last reader
619 writer_or_last_reader = false;
620 }
621 } else {
622 // Case 3: Writer did not downgrade
623 writer_or_last_reader = true;
624 }
625 if writer_or_last_reader {
626 // Nobody left inside; release the "reader cloud" lock.
627 (&self.access_lock).release();
628 }
629 }
630 }
631 }
632
633 /// To be called inside of the write_downgrade block.
634 pub fn downgrade<'a>(&self, token: RWLockWriteMode<'a>)
635 -> RWLockReadMode<'a> {
636 if !borrow::ref_eq(self, token.lock) {
637 fail!("Can't downgrade() with a different rwlock's write_mode!");
638 }
639 unsafe {
640 do task::unkillable {
641 let state = &mut *self.state.get();
642 assert!(!state.read_mode);
643 state.read_mode = true;
644 // If a reader attempts to enter at this point, both the
645 // downgrader and reader will set the mode flag. This is fine.
646 let old_count = state.read_count.fetch_add(1, atomics::Release);
647                 // If another reader was already blocking, we need to hand off
648 // the "reader cloud" access lock to them.
649 if old_count != 0 {
650 // Guaranteed not to let another writer in, because
651 // another reader was holding the order_lock. Hence they
652 // must be the one to get the access_lock (because all
653 // access_locks are acquired with order_lock held). See
654 // the comment in write_cond for more justification.
655 (&self.access_lock).release();
656 }
657 }
658 }
659 RWLockReadMode { lock: token.lock, token: NonCopyable::new() }
660 }
661 }
662
663 /// The "write permission" token used for rwlock.write_downgrade().
664 pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable }
665
666 /// The "read permission" token used for rwlock.write_downgrade().
667 pub struct RWLockReadMode<'self> { priv lock: &'self RWLock,
668 priv token: NonCopyable }
669
670 impl<'self> RWLockWriteMode<'self> {
671 /// Access the pre-downgrade rwlock in write mode.
672 pub fn write<U>(&self, blk: &fn() -> U) -> U { blk() }
673 /// Access the pre-downgrade rwlock in write mode with a condvar.
674 pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
675 // Need to make the condvar use the order lock when reacquiring the
676 // access lock. See comment in RWLock::write_cond for why.
677 blk(&Condvar { sem: &self.lock.access_lock,
678 order: Just(&self.lock.order_lock),
679 token: NonCopyable::new() })
680 }
681 }
682
683 impl<'self> RWLockReadMode<'self> {
684 /// Access the post-downgrade rwlock in read mode.
685 pub fn read<U>(&self, blk: &fn() -> U) -> U { blk() }
686 }
687
688 /****************************************************************************
689 * Tests
690 ****************************************************************************/
691
692 #[cfg(test)]
693 mod tests {
694 use sync::*;
695
696 use std::cast;
697 use std::cell::Cell;
698 use std::comm;
699 use std::result;
700 use std::task;
701
702 /************************************************************************
703 * Semaphore tests
704 ************************************************************************/
705 #[test]
706 fn test_sem_acquire_release() {
707 let s = Semaphore::new(1);
708 s.acquire();
709 s.release();
710 s.acquire();
711 }
712 #[test]
713 fn test_sem_basic() {
714 let s = Semaphore::new(1);
715 do s.access { }
716 }
717 #[test]
718 fn test_sem_as_mutex() {
719 let s = Semaphore::new(1);
720 let s2 = s.clone();
721 do task::spawn {
722 do s2.access {
723 do 5.times { task::deschedule(); }
724 }
725 }
726 do s.access {
727 do 5.times { task::deschedule(); }
728 }
729 }
730 #[test]
731 fn test_sem_as_cvar() {
732 /* Child waits and parent signals */
733 let (p, c) = comm::stream();
734 let s = Semaphore::new(0);
735 let s2 = s.clone();
736 do task::spawn {
737 s2.acquire();
738 c.send(());
739 }
740 do 5.times { task::deschedule(); }
741 s.release();
742 let _ = p.recv();
743
744 /* Parent waits and child signals */
745 let (p, c) = comm::stream();
746 let s = Semaphore::new(0);
747 let s2 = s.clone();
748 do task::spawn {
749 do 5.times { task::deschedule(); }
750 s2.release();
751 let _ = p.recv();
752 }
753 s.acquire();
754 c.send(());
755 }
756 #[test]
757 fn test_sem_multi_resource() {
758 // Parent and child both get in the critical section at the same
759 // time, and shake hands.
760 let s = Semaphore::new(2);
761 let s2 = s.clone();
762 let (p1,c1) = comm::stream();
763 let (p2,c2) = comm::stream();
764 do task::spawn {
765 do s2.access {
766 let _ = p2.recv();
767 c1.send(());
768 }
769 }
770 do s.access {
771 c2.send(());
772 let _ = p1.recv();
773 }
774 }
775 #[test]
776 fn test_sem_runtime_friendly_blocking() {
777 // Force the runtime to schedule two threads on the same sched_loop.
778 // When one blocks, it should schedule the other one.
779 do task::spawn_sched(task::SingleThreaded) {
780 let s = Semaphore::new(1);
781 let s2 = s.clone();
782 let (p, c) = comm::stream();
783 let child_data = Cell::new((s2, c));
784 do s.access {
785 let (s2, c) = child_data.take();
786 do task::spawn {
787 c.send(());
788 do s2.access { }
789 c.send(());
790 }
791 let _ = p.recv(); // wait for child to come alive
792 do 5.times { task::deschedule(); } // let the child contend
793 }
794 let _ = p.recv(); // wait for child to be done
795 }
796 }
797 /************************************************************************
798 * Mutex tests
799 ************************************************************************/
800 #[test]
801 fn test_mutex_lock() {
802 // Unsafely achieve shared state, and do the textbook
803 // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
804 let (p, c) = comm::stream();
805 let m = Mutex::new();
806 let m2 = m.clone();
807 let mut sharedstate = ~0;
808 {
809 let ptr: *int = &*sharedstate;
810 do task::spawn {
811 let sharedstate: &mut int =
812 unsafe { cast::transmute(ptr) };
813 access_shared(sharedstate, &m2, 10);
814 c.send(());
815
816 }
817 }
818 {
819 access_shared(sharedstate, &m, 10);
820 let _ = p.recv();
821
822 assert_eq!(*sharedstate, 20);
823 }
824
825 fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) {
826 do n.times {
827 do m.lock {
828 let oldval = *sharedstate;
829 task::deschedule();
830 *sharedstate = oldval + 1;
831 }
832 }
833 }
834 }
835 #[test]
836 fn test_mutex_cond_wait() {
837 let m = Mutex::new();
838
839 // Child wakes up parent
840 do m.lock_cond |cond| {
841 let m2 = m.clone();
842 do task::spawn {
843 do m2.lock_cond |cond| {
844 let woken = cond.signal();
845 assert!(woken);
846 }
847 }
848 cond.wait();
849 }
850 // Parent wakes up child
851 let (port,chan) = comm::stream();
852 let m3 = m.clone();
853 do task::spawn {
854 do m3.lock_cond |cond| {
855 chan.send(());
856 cond.wait();
857 chan.send(());
858 }
859 }
860 let _ = port.recv(); // Wait until child gets in the mutex
861 do m.lock_cond |cond| {
862 let woken = cond.signal();
863 assert!(woken);
864 }
865 let _ = port.recv(); // Wait until child wakes up
866 }
867 #[cfg(test)]
868 fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
869 let m = Mutex::new();
870 let mut ports = ~[];
871
872 do num_waiters.times {
873 let mi = m.clone();
874 let (port, chan) = comm::stream();
875 ports.push(port);
876 do task::spawn {
877 do mi.lock_cond |cond| {
878 chan.send(());
879 cond.wait();
880 chan.send(());
881 }
882 }
883 }
884
885 // wait until all children get in the mutex
886 for port in ports.iter() { let _ = port.recv(); }
887 do m.lock_cond |cond| {
888 let num_woken = cond.broadcast();
889 assert_eq!(num_woken, num_waiters);
890 }
891 // wait until all children wake up
892 for port in ports.iter() { let _ = port.recv(); }
893 }
894 #[test]
895 fn test_mutex_cond_broadcast() {
896 test_mutex_cond_broadcast_helper(12);
897 }
898 #[test]
899 fn test_mutex_cond_broadcast_none() {
900 test_mutex_cond_broadcast_helper(0);
901 }
902 #[test]
903 fn test_mutex_cond_no_waiter() {
904 let m = Mutex::new();
905 let m2 = m.clone();
906 do task::try {
907 do m.lock_cond |_x| { }
908 };
909 do m2.lock_cond |cond| {
910 assert!(!cond.signal());
911 }
912 }
913 #[test]
914 fn test_mutex_killed_simple() {
915 // Mutex must get automatically unlocked if failed/killed within.
916 let m = Mutex::new();
917 let m2 = m.clone();
918
919 let result: result::Result<(),()> = do task::try {
920 do m2.lock {
921 fail!();
922 }
923 };
924 assert!(result.is_err());
925 // child task must have finished by the time try returns
926 do m.lock { }
927 }
928 #[ignore(reason = "linked failure")]
929 #[test]
930 fn test_mutex_killed_cond() {
931 // Getting killed during cond wait must not corrupt the mutex while
932 // unwinding (e.g. double unlock).
933 let m = Mutex::new();
934 let m2 = m.clone();
935
936 let result: result::Result<(),()> = do task::try {
937 let (p, c) = comm::stream();
938 do task::spawn || { // linked
939 let _ = p.recv(); // wait for sibling to get in the mutex
940 task::deschedule();
941 fail!();
942 }
943 do m2.lock_cond |cond| {
944 c.send(()); // tell sibling go ahead
945 cond.wait(); // block forever
946 }
947 };
948 assert!(result.is_err());
949 // child task must have finished by the time try returns
950 do m.lock_cond |cond| {
951 let woken = cond.signal();
952 assert!(!woken);
953 }
954 }
955 #[ignore(reason = "linked failure")]
956 #[test]
957 fn test_mutex_killed_broadcast() {
958 use std::unstable::finally::Finally;
959
960 let m = Mutex::new();
961 let m2 = m.clone();
962 let (p, c) = comm::stream();
963
964 let result: result::Result<(),()> = do task::try {
965 let mut sibling_convos = ~[];
966 do 2.times {
967 let (p, c) = comm::stream();
968 let c = Cell::new(c);
969 sibling_convos.push(p);
970 let mi = m2.clone();
971 // spawn sibling task
972 do task::spawn { // linked
973 do mi.lock_cond |cond| {
974 let c = c.take();
975 c.send(()); // tell sibling to go ahead
976 do (|| {
977 cond.wait(); // block forever
978 }).finally {
979 error!("task unwinding and sending");
980 c.send(());
981 error!("task unwinding and done sending");
982 }
983 }
984 }
985 }
986 for p in sibling_convos.iter() {
987 let _ = p.recv(); // wait for sibling to get in the mutex
988 }
989 do m2.lock { }
990 c.send(sibling_convos); // let parent wait on all children
991 fail!();
992 };
993 assert!(result.is_err());
994 // child task must have finished by the time try returns
995 let r = p.recv();
996 for p in r.iter() { p.recv(); } // wait on all its siblings
997 do m.lock_cond |cond| {
998 let woken = cond.broadcast();
999 assert_eq!(woken, 0);
1000 }
1001 }
1002 #[test]
1003 fn test_mutex_cond_signal_on_0() {
1004 // Tests that signal_on(0) is equivalent to signal().
1005 let m = Mutex::new();
1006 do m.lock_cond |cond| {
1007 let m2 = m.clone();
1008 do task::spawn {
1009 do m2.lock_cond |cond| {
1010 cond.signal_on(0);
1011 }
1012 }
1013 cond.wait();
1014 }
1015 }
1016 #[test]
1017 fn test_mutex_different_conds() {
1018 let result = do task::try {
1019 let m = Mutex::new_with_condvars(2);
1020 let m2 = m.clone();
1021 let (p, c) = comm::stream();
1022 do task::spawn {
1023 do m2.lock_cond |cond| {
1024 c.send(());
1025 cond.wait_on(1);
1026 }
1027 }
1028 let _ = p.recv();
1029 do m.lock_cond |cond| {
1030 if !cond.signal_on(0) {
1031                     fail!(); // success: no waiter on condvar 0; failing also kills the linked sibling
1032 }
1033 }
1034 };
1035 assert!(result.is_err());
1036 }
1037 #[test]
1038 fn test_mutex_no_condvars() {
1039 let result = do task::try {
1040 let m = Mutex::new_with_condvars(0);
1041 do m.lock_cond |cond| { cond.wait(); }
1042 };
1043 assert!(result.is_err());
1044 let result = do task::try {
1045 let m = Mutex::new_with_condvars(0);
1046 do m.lock_cond |cond| { cond.signal(); }
1047 };
1048 assert!(result.is_err());
1049 let result = do task::try {
1050 let m = Mutex::new_with_condvars(0);
1051 do m.lock_cond |cond| { cond.broadcast(); }
1052 };
1053 assert!(result.is_err());
1054 }
1055 /************************************************************************
1056 * Reader/writer lock tests
1057 ************************************************************************/
1058 #[cfg(test)]
1059 pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }
1060 #[cfg(test)]
1061 fn lock_rwlock_in_mode(x: &RWLock, mode: RWLockMode, blk: &fn()) {
1062 match mode {
1063 Read => x.read(blk),
1064 Write => x.write(blk),
1065 Downgrade =>
1066 do x.write_downgrade |mode| {
1067 do mode.write { blk() };
1068 },
1069 DowngradeRead =>
1070 do x.write_downgrade |mode| {
1071 let mode = x.downgrade(mode);
1072 do mode.read { blk() };
1073 },
1074 }
1075 }
1076 #[cfg(test)]
1077 fn test_rwlock_exclusion(x: &RWLock,
1078 mode1: RWLockMode,
1079 mode2: RWLockMode) {
1080 // Test mutual exclusion between readers and writers. Just like the
1081 // mutex mutual exclusion test, a ways above.
1082 let (p, c) = comm::stream();
1083 let x2 = x.clone();
1084 let mut sharedstate = ~0;
1085 {
1086 let ptr: *int = &*sharedstate;
1087 do task::spawn {
1088 let sharedstate: &mut int =
1089 unsafe { cast::transmute(ptr) };
1090 access_shared(sharedstate, &x2, mode1, 10);
1091 c.send(());
1092 }
1093 }
1094 {
1095 access_shared(sharedstate, x, mode2, 10);
1096 let _ = p.recv();
1097
1098 assert_eq!(*sharedstate, 20);
1099 }
1100
1101 fn access_shared(sharedstate: &mut int, x: &RWLock, mode: RWLockMode,
1102 n: uint) {
1103 do n.times {
1104 do lock_rwlock_in_mode(x, mode) {
1105 let oldval = *sharedstate;
1106 task::deschedule();
1107 *sharedstate = oldval + 1;
1108 }
1109 }
1110 }
1111 }
1112 #[test]
1113 fn test_rwlock_readers_wont_modify_the_data() {
1114 test_rwlock_exclusion(&RWLock::new(), Read, Write);
1115 test_rwlock_exclusion(&RWLock::new(), Write, Read);
1116 test_rwlock_exclusion(&RWLock::new(), Read, Downgrade);
1117 test_rwlock_exclusion(&RWLock::new(), Downgrade, Read);
1118 }
1119 #[test]
1120 fn test_rwlock_writers_and_writers() {
1121 test_rwlock_exclusion(&RWLock::new(), Write, Write);
1122 test_rwlock_exclusion(&RWLock::new(), Write, Downgrade);
1123 test_rwlock_exclusion(&RWLock::new(), Downgrade, Write);
1124 test_rwlock_exclusion(&RWLock::new(), Downgrade, Downgrade);
1125 }
1126 #[cfg(test)]
1127 fn test_rwlock_handshake(x: &RWLock,
1128 mode1: RWLockMode,
1129 mode2: RWLockMode,
1130 make_mode2_go_first: bool) {
1131 // Much like sem_multi_resource.
1132 let x2 = x.clone();
1133 let (p1, c1) = comm::stream();
1134 let (p2, c2) = comm::stream();
1135 do task::spawn {
1136 if !make_mode2_go_first {
1137 let _ = p2.recv(); // parent sends to us once it locks, or ...
1138 }
1139 do lock_rwlock_in_mode(&x2, mode2) {
1140 if make_mode2_go_first {
1141 c1.send(()); // ... we send to it once we lock
1142 }
1143 let _ = p2.recv();
1144 c1.send(());
1145 }
1146 }
1147 if make_mode2_go_first {
1148 let _ = p1.recv(); // child sends to us once it locks, or ...
1149 }
1150 do lock_rwlock_in_mode(x, mode1) {
1151 if !make_mode2_go_first {
1152 c2.send(()); // ... we send to it once we lock
1153 }
1154 c2.send(());
1155 let _ = p1.recv();
1156 }
1157 }
1158 #[test]
1159 fn test_rwlock_readers_and_readers() {
1160 test_rwlock_handshake(&RWLock::new(), Read, Read, false);
1161 // The downgrader needs to get in before the reader gets in, otherwise
1162 // they cannot end up reading at the same time.
1163 test_rwlock_handshake(&RWLock::new(), DowngradeRead, Read, false);
1164 test_rwlock_handshake(&RWLock::new(), Read, DowngradeRead, true);
1165 // Two downgrade_reads can never both end up reading at the same time.
1166 }
1167 #[test]
1168 fn test_rwlock_downgrade_unlock() {
1169 // Tests that downgrade can unlock the lock in both modes
1170 let x = RWLock::new();
1171 do lock_rwlock_in_mode(&x, Downgrade) { }
1172 test_rwlock_handshake(&x, Read, Read, false);
1173 let y = RWLock::new();
1174 do lock_rwlock_in_mode(&y, DowngradeRead) { }
1175 test_rwlock_exclusion(&y, Write, Write);
1176 }
1177 #[test]
1178 fn test_rwlock_read_recursive() {
1179 let x = RWLock::new();
1180 do x.read { do x.read { } }
1181 }
1182 #[test]
1183 fn test_rwlock_cond_wait() {
1184 // As test_mutex_cond_wait above.
1185 let x = RWLock::new();
1186
1187 // Child wakes up parent
1188 do x.write_cond |cond| {
1189 let x2 = x.clone();
1190 do task::spawn {
1191 do x2.write_cond |cond| {
1192 let woken = cond.signal();
1193 assert!(woken);
1194 }
1195 }
1196 cond.wait();
1197 }
1198 // Parent wakes up child
1199 let (port, chan) = comm::stream();
1200 let x3 = x.clone();
1201 do task::spawn {
1202 do x3.write_cond |cond| {
1203 chan.send(());
1204 cond.wait();
1205 chan.send(());
1206 }
1207 }
1208 let _ = port.recv(); // Wait until child gets in the rwlock
1209 do x.read { } // Must be able to get in as a reader in the meantime
1210 do x.write_cond |cond| { // Or as another writer
1211 let woken = cond.signal();
1212 assert!(woken);
1213 }
1214 let _ = port.recv(); // Wait until child wakes up
1215 do x.read { } // Just for good measure
1216 }
1217 #[cfg(test)]
1218 fn test_rwlock_cond_broadcast_helper(num_waiters: uint,
1219 dg1: bool,
1220 dg2: bool) {
1221 // Much like the mutex broadcast test. Downgrade-enabled.
1222 fn lock_cond(x: &RWLock, downgrade: bool, blk: &fn(c: &Condvar)) {
1223 if downgrade {
1224 do x.write_downgrade |mode| {
1225 do mode.write_cond |c| { blk(c) }
1226 }
1227 } else {
1228 do x.write_cond |c| { blk(c) }
1229 }
1230 }
1231 let x = RWLock::new();
1232 let mut ports = ~[];
1233
1234 do num_waiters.times {
1235 let xi = x.clone();
1236 let (port, chan) = comm::stream();
1237 ports.push(port);
1238 do task::spawn {
1239 do lock_cond(&xi, dg1) |cond| {
1240 chan.send(());
1241 cond.wait();
1242 chan.send(());
1243 }
1244 }
1245 }
1246
1247         // wait until all children get in the rwlock
1248 for port in ports.iter() { let _ = port.recv(); }
1249 do lock_cond(&x, dg2) |cond| {
1250 let num_woken = cond.broadcast();
1251 assert_eq!(num_woken, num_waiters);
1252 }
1253 // wait until all children wake up
1254 for port in ports.iter() { let _ = port.recv(); }
1255 }
1256 #[test]
1257 fn test_rwlock_cond_broadcast() {
1258 test_rwlock_cond_broadcast_helper(0, true, true);
1259 test_rwlock_cond_broadcast_helper(0, true, false);
1260 test_rwlock_cond_broadcast_helper(0, false, true);
1261 test_rwlock_cond_broadcast_helper(0, false, false);
1262 test_rwlock_cond_broadcast_helper(12, true, true);
1263 test_rwlock_cond_broadcast_helper(12, true, false);
1264 test_rwlock_cond_broadcast_helper(12, false, true);
1265 test_rwlock_cond_broadcast_helper(12, false, false);
1266 }
1267 #[cfg(test)]
1268 fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
1269 // Mutex must get automatically unlocked if failed/killed within.
1270 let x = RWLock::new();
1271 let x2 = x.clone();
1272
1273 let result: result::Result<(),()> = do task::try || {
1274 do lock_rwlock_in_mode(&x2, mode1) {
1275 fail!();
1276 }
1277 };
1278 assert!(result.is_err());
1279 // child task must have finished by the time try returns
1280 do lock_rwlock_in_mode(&x, mode2) { }
1281 }
1282 #[test]
1283 fn test_rwlock_reader_killed_writer() {
1284 rwlock_kill_helper(Read, Write);
1285 }
1286 #[test]
1287 fn test_rwlock_writer_killed_reader() {
1288 rwlock_kill_helper(Write, Read);
1289 }
1290 #[test]
1291 fn test_rwlock_reader_killed_reader() {
1292 rwlock_kill_helper(Read, Read);
1293 }
1294 #[test]
1295 fn test_rwlock_writer_killed_writer() {
1296 rwlock_kill_helper(Write, Write);
1297 }
1298 #[test]
1299 fn test_rwlock_kill_downgrader() {
1300 rwlock_kill_helper(Downgrade, Read);
1301 rwlock_kill_helper(Read, Downgrade);
1302 rwlock_kill_helper(Downgrade, Write);
1303 rwlock_kill_helper(Write, Downgrade);
1304 rwlock_kill_helper(DowngradeRead, Read);
1305 rwlock_kill_helper(Read, DowngradeRead);
1306 rwlock_kill_helper(DowngradeRead, Write);
1307 rwlock_kill_helper(Write, DowngradeRead);
1308 rwlock_kill_helper(DowngradeRead, Downgrade);
1309 rwlock_kill_helper(DowngradeRead, Downgrade);
1310 rwlock_kill_helper(Downgrade, DowngradeRead);
1311 rwlock_kill_helper(Downgrade, DowngradeRead);
1312 }
1313 #[test] #[should_fail]
1314 fn test_rwlock_downgrade_cant_swap() {
1315 // Tests that you can't downgrade with a different rwlock's token.
1316 let x = RWLock::new();
1317 let y = RWLock::new();
1318 do x.write_downgrade |xwrite| {
1319 let mut xopt = Some(xwrite);
1320 do y.write_downgrade |_ywrite| {
1321 y.downgrade(xopt.take_unwrap());
1322 error!("oops, y.downgrade(x) should have failed!");
1323 }
1324 }
1325 }
1326 }