./libsync/mutex.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! A proper mutex implementation regardless of the "flavor of task" which is
//! acquiring the lock.

// # Implementation of Rust mutexes
//
// Most answers to the question of "how do I use a mutex" are "use pthreads",
// but for Rust this isn't quite sufficient. Green threads cannot acquire an OS
// mutex because they can context switch among many OS threads, leading to
// deadlocks with other green threads.
//
// Another problem for green threads grabbing an OS mutex is that POSIX dictates
// that unlocking a mutex on a different thread from where it was locked is
// undefined behavior. Remember that green threads can migrate among OS threads,
// so this would mean that we would have to pin green threads to OS threads,
// which is less than ideal.
//
// ## Using deschedule/reawaken
//
// We already have primitives for descheduling/reawakening tasks, so they're the
// first obvious choice when implementing a mutex. The idea would be to have a
// concurrent queue that everyone is pushed on to, and then the owner of the
// mutex is the one popping from the queue.
//
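// As a rough sketch (hypothetical code, not the implementation below; the
// `locked` flag and `queue` field are illustrative names only), the protocol
// would look something like:
//
//     fn lock(&self) {
//         if self.locked.swap(true, SeqCst) {
//             // Contended: park this task on the queue. The unlocker will
//             // pop it and hand the lock over directly.
//             let t: Box<Task> = Local::take();
//             t.deschedule(1, |task| { self.queue.push(task); Ok(()) });
//         }
//     }
//     fn unlock(&self) {
//         // Hand off to a waiter if there is one, else release the flag.
//         // (A real version must handle the race between a task enqueueing
//         // itself and this pop observing an empty queue.)
//         match self.queue.pop() {
//             Some(task) => { task.wake().map(|t| t.reawaken()); }
//             None => self.locked.store(false, SeqCst),
//         }
//     }
//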
// Unfortunately, this is not very performant for native tasks. The suspected
// reason for this is that each native thread is suspended on its own condition
// variable, unique from all the other threads. In this situation, the kernel
// has no idea what the scheduling semantics are of the user program, so all of
// the threads are distributed among all cores on the system. This ends up
// having very expensive wakeups of remote cores high up in the profile when
// handing off the mutex among native tasks. On the other hand, when using an OS
// mutex, the kernel knows that all native threads are contended on the same
// mutex, so they're in theory all migrated to a single core (fast context
// switching).
//
// ## Mixing implementations
//
// From the above information, we have two constraints. The first is that
// green threads can't touch OS mutexes, and the second is that native tasks
// pretty much *must* touch an OS mutex.
//
// As a compromise, the queueing implementation is used for green threads and
// the OS mutex is used for native threads (why not have both?). This ends up
// leading to fairly decent performance for both native threads and green
// threads on various workloads (uncontended and contended).
//
// The crux of this implementation is an atomic word which is CAS'd on many
// times in order to manage a few flags about who's blocking where and whether
// it's locked or not.
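//
// Concretely, the low three bits of that state word (the `LOCKED`,
// `GREEN_BLOCKED`, and `NATIVE_BLOCKED` constants below) encode who holds or
// is waiting for the lock, and the uncontended fast path is a single
// compare-and-swap of the whole word, roughly (mirroring `try_lock` below):
//
//     match self.state.compare_and_swap(0, LOCKED, atomics::SeqCst) {
//         0 => { /* unlocked -> locked: we own the mutex */ }
//         _ => { /* contended: fall back to the green/native slow paths */ }
//     }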

use std::kinds::marker;
use std::mem;
use std::rt::local::Local;
use std::rt::task::{BlockedTask, Task};
use std::rt::thread::Thread;
use std::sync::atomics;
use std::ty::Unsafe;
use std::unstable::mutex;

use q = mpsc_intrusive;

pub static LOCKED: uint = 1 << 0;
pub static GREEN_BLOCKED: uint = 1 << 1;
pub static NATIVE_BLOCKED: uint = 1 << 2;

/// A mutual exclusion primitive useful for protecting shared data
///
/// This mutex is an implementation of a lock for all flavors of tasks which may
/// be grabbing it. A common problem with green threads is that they cannot grab
/// locks (if they reschedule during the lock a contender could deadlock the
/// system), but this mutex does *not* suffer this problem.
///
/// This mutex will properly block tasks waiting for the lock to become
/// available. The mutex can also be statically initialized or created via a
/// `new` constructor.
///
/// # Example
///
/// ```rust
/// use sync::mutex::Mutex;
///
/// let m = Mutex::new();
/// let guard = m.lock();
/// // do some work
/// drop(guard); // unlock the lock
/// ```
pub struct Mutex {
    lock: StaticMutex,
}

#[deriving(Eq, Show)]
enum Flavor {
    Unlocked,
    TryLockAcquisition,
    GreenAcquisition,
    NativeAcquisition,
}

/// The static mutex type is provided to allow for static allocation of mutexes.
///
/// Note that this is a separate type because using a Mutex correctly means that
/// it needs to have a destructor run. In Rust, statics are not allowed to have
/// destructors. As a result, a `StaticMutex` has one extra method when compared
/// to a `Mutex`, a `destroy` method. This method is unsafe to call, and
/// documentation can be found directly on the method.
///
/// # Example
///
/// ```rust
/// use sync::mutex::{StaticMutex, MUTEX_INIT};
///
/// static mut LOCK: StaticMutex = MUTEX_INIT;
///
/// unsafe {
///     let _g = LOCK.lock();
///     // do some productive work
/// }
/// // lock is unlocked here.
/// ```
pub struct StaticMutex {
    /// Current set of flags on this mutex
    state: atomics::AtomicUint,
    /// An OS mutex used by native threads
    lock: mutex::StaticNativeMutex,

    /// Type of locking operation currently on this mutex
    flavor: Unsafe<Flavor>,
    /// uint-cast of the green thread waiting for this mutex
    green_blocker: Unsafe<uint>,
    /// uint-cast of the native thread waiting for this mutex
    native_blocker: Unsafe<uint>,

    /// A concurrent mpsc queue used by green threads, along with a count used
    /// to figure out when to dequeue and enqueue.
    q: q::Queue<uint>,
    green_cnt: atomics::AtomicUint,
}

/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
/// dropped (falls out of scope), the lock will be unlocked.
#[must_use]
pub struct Guard<'a> {
    lock: &'a StaticMutex,
}

/// Static initialization of a mutex. This constant can be used to initialize
/// other mutex constants.
pub static MUTEX_INIT: StaticMutex = StaticMutex {
    lock: mutex::NATIVE_MUTEX_INIT,
    state: atomics::INIT_ATOMIC_UINT,
    flavor: Unsafe { value: Unlocked, marker1: marker::InvariantType },
    green_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
    native_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
    green_cnt: atomics::INIT_ATOMIC_UINT,
    q: q::Queue {
        head: atomics::INIT_ATOMIC_UINT,
        tail: Unsafe {
            value: 0 as *mut q::Node<uint>,
            marker1: marker::InvariantType,
        },
        stub: q::DummyNode {
            next: atomics::INIT_ATOMIC_UINT,
        }
    }
};

impl StaticMutex {
    /// Attempts to grab this lock, see `Mutex::try_lock`
    pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
        // Attempt to steal the mutex from an unlocked state.
        //
        // FIXME: this can mess up the fairness of the mutex, seems bad
        match self.state.compare_and_swap(0, LOCKED, atomics::SeqCst) {
            0 => {
                // After acquiring the mutex, we can safely access the inner
                // fields.
                let prev = unsafe {
                    mem::replace(&mut *self.flavor.get(), TryLockAcquisition)
                };
                assert_eq!(prev, Unlocked);
                Some(Guard::new(self))
            }
            _ => None
        }
    }

    /// Acquires this lock, see `Mutex::lock`
    pub fn lock<'a>(&'a self) -> Guard<'a> {
        // First, attempt to steal the mutex from an unlocked state. The "fast
        // path" needs to have as few atomic instructions as possible, and this
        // one cmpxchg is already pretty expensive.
        //
        // FIXME: this can mess up the fairness of the mutex, seems bad
        match self.try_lock() {
            Some(guard) => return guard,
            None => {}
        }

        // After we've failed the fast path, we delegate to the different
        // locking protocols for green/native tasks. This will select two tasks
        // to continue further (one native, one green).
        let t: Box<Task> = Local::take();
        let can_block = t.can_block();
        let native_bit;
        if can_block {
            self.native_lock(t);
            native_bit = NATIVE_BLOCKED;
        } else {
            self.green_lock(t);
            native_bit = GREEN_BLOCKED;
        }

        // After we've arbitrated among task types, attempt to re-acquire the
        // lock (avoids a deschedule). This is very important to do in order to
        // allow threads coming out of the native_lock function to try their
        // best to not hit a cvar in deschedule.
        let mut old = match self.state.compare_and_swap(0, LOCKED,
                                                        atomics::SeqCst) {
            0 => {
                let flavor = if can_block {
                    NativeAcquisition
                } else {
                    GreenAcquisition
                };
                // We've acquired the lock, so this unsafe access to flavor is
                // allowed.
                unsafe { *self.flavor.get() = flavor; }
                return Guard::new(self)
            }
            old => old,
        };

        // Alright, everything else failed. We need to deschedule ourselves and
        // flag ourselves as waiting. Note that this case should only happen
        // regularly in native/green contention. Due to try_lock and the fast
        // path at the start of lock stealing the lock, it's also possible for
        // native/native contention to hit this location, but it is less
        // common.
        let t: Box<Task> = Local::take();
        t.deschedule(1, |task| {
            let task = unsafe { task.cast_to_uint() };

            // These accesses are protected by the respective native/green
            // mutexes which were acquired above.
            let prev = if can_block {
                unsafe { mem::replace(&mut *self.native_blocker.get(), task) }
            } else {
                unsafe { mem::replace(&mut *self.green_blocker.get(), task) }
            };
            assert_eq!(prev, 0);

            loop {
                assert_eq!(old & native_bit, 0);
                // If the old state was locked, then we need to flag ourselves
                // as blocking in the state. If the old state was unlocked, then
                // we attempt to acquire the mutex. Everything here is a CAS
                // loop that'll eventually make progress.
                if old & LOCKED != 0 {
                    old = match self.state.compare_and_swap(old,
                                                            old | native_bit,
                                                            atomics::SeqCst) {
                        n if n == old => return Ok(()),
                        n => n
                    };
                } else {
                    assert_eq!(old, 0);
                    old = match self.state.compare_and_swap(old,
                                                            old | LOCKED,
                                                            atomics::SeqCst) {
                        n if n == old => {
                            // After acquiring the lock, we have access to the
                            // flavor field, and we've regained access to our
                            // respective native/green blocker field.
                            let prev = if can_block {
                                unsafe {
                                    *self.native_blocker.get() = 0;
                                    mem::replace(&mut *self.flavor.get(),
                                                 NativeAcquisition)
                                }
                            } else {
                                unsafe {
                                    *self.green_blocker.get() = 0;
                                    mem::replace(&mut *self.flavor.get(),
                                                 GreenAcquisition)
                                }
                            };
                            assert_eq!(prev, Unlocked);
                            return Err(unsafe {
                                BlockedTask::cast_from_uint(task)
                            })
                        }
                        n => n,
                    };
                }
            }
        });

        Guard::new(self)
    }

    // Tasks which can block are super easy. These tasks just call the blocking
    // `lock()` function on an OS mutex
    fn native_lock(&self, t: Box<Task>) {
        Local::put(t);
        unsafe { self.lock.lock_noguard(); }
    }

    fn native_unlock(&self) {
        unsafe { self.lock.unlock_noguard(); }
    }

    fn green_lock(&self, t: Box<Task>) {
        // Green threads flag their presence with an atomic counter, and if they
        // fail to be the first to the mutex, they enqueue themselves on a
        // concurrent internal queue with a stack-allocated node.
        //
        // FIXME: there is currently no way to cancel an enqueue, which forces
        // the unlocker to spin for a bit.
        if self.green_cnt.fetch_add(1, atomics::SeqCst) == 0 {
            Local::put(t);
            return
        }

        let mut node = q::Node::new(0);
        t.deschedule(1, |task| {
            unsafe {
                node.data = task.cast_to_uint();
                self.q.push(&mut node);
            }
            Ok(())
        });
    }

    fn green_unlock(&self) {
        // If we're the only green thread, then there's no need to check the
        // queue; otherwise the FIXME above forces us to spin for a bit.
        if self.green_cnt.fetch_sub(1, atomics::SeqCst) == 1 { return }
        let node;
        loop {
            match unsafe { self.q.pop() } {
                Some(t) => { node = t; break; }
                None => Thread::yield_now(),
            }
        }
        let task = unsafe { BlockedTask::cast_from_uint((*node).data) };
        task.wake().map(|t| t.reawaken());
    }

    fn unlock(&self) {
        // Unlocking this mutex is a little tricky. We favor any task that is
        // manually blocked (not in each of the separate locks) in order to help
        // provide a little fairness (green threads will wake up the pending
        // native thread and native threads will wake up the pending green
        // thread).
        //
        // There's also the question of when we unlock the actual green/native
        // locking halves as well. If we're waking up someone, then we can wait
        // to unlock until we've acquired the task to wake up (we're guaranteed
        // the mutex memory is still valid when there are contenders), but as
        // soon as we don't find any contenders we must unlock the mutex, and
        // *then* flag the mutex as unlocked.
        //
        // This flagging can fail, leading to another round of figuring out if a
        // task needs to be woken, and in this case it's ok that the "mutex
        // halves" are unlocked, we're just mainly dealing with the atomic state
        // of the outer mutex.
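        //
        // Schematically, writing states as sets of the flag bits (an informal
        // summary of the loop below, not extra protocol):
        //
        //   {LOCKED, GREEN_BLOCKED}  -> unset GREEN_BLOCKED, then wake the
        //                               green blocker and hand it the lock
        //   {LOCKED, NATIVE_BLOCKED} -> unset NATIVE_BLOCKED, then wake the
        //                               native blocker and hand it the lock
        //   {LOCKED}                 -> unlock the green/native half, then CAS
        //                               the state to 0 (retry if it changed)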
        let flavor = unsafe { mem::replace(&mut *self.flavor.get(), Unlocked) };

        let mut state = self.state.load(atomics::SeqCst);
        let mut unlocked = false;
        let task;
        loop {
            assert!(state & LOCKED != 0);
            if state & GREEN_BLOCKED != 0 {
                self.unset(state, GREEN_BLOCKED);
                task = unsafe {
                    *self.flavor.get() = GreenAcquisition;
                    let task = mem::replace(&mut *self.green_blocker.get(), 0);
                    BlockedTask::cast_from_uint(task)
                };
                break;
            } else if state & NATIVE_BLOCKED != 0 {
                self.unset(state, NATIVE_BLOCKED);
                task = unsafe {
                    *self.flavor.get() = NativeAcquisition;
                    let task = mem::replace(&mut *self.native_blocker.get(), 0);
                    BlockedTask::cast_from_uint(task)
                };
                break;
            } else {
                assert_eq!(state, LOCKED);
                if !unlocked {
                    match flavor {
                        GreenAcquisition => { self.green_unlock(); }
                        NativeAcquisition => { self.native_unlock(); }
                        TryLockAcquisition => {}
                        Unlocked => unreachable!()
                    }
                    unlocked = true;
                }
                match self.state.compare_and_swap(LOCKED, 0, atomics::SeqCst) {
                    LOCKED => return,
                    n => { state = n; }
                }
            }
        }
        if !unlocked {
            match flavor {
                GreenAcquisition => { self.green_unlock(); }
                NativeAcquisition => { self.native_unlock(); }
                TryLockAcquisition => {}
                Unlocked => unreachable!()
            }
        }

        task.wake().map(|t| t.reawaken());
    }

    /// Loops around a CAS to unset the `bit` in `state`
    fn unset(&self, mut state: uint, bit: uint) {
        loop {
            assert!(state & bit != 0);
            let new = state ^ bit;
            match self.state.compare_and_swap(state, new, atomics::SeqCst) {
                n if n == state => break,
                n => { state = n; }
            }
        }
    }

    /// Deallocates resources associated with this static mutex.
    ///
    /// This method is unsafe because it provides no guarantee that there are
    /// no active users of this mutex; if there are, safety is not guaranteed.
    ///
    /// This method is required to ensure that there are no memory leaks on
    /// *all* platforms. It may be the case that some platforms do not leak
    /// memory if this method is not called, but this is not guaranteed to be
    /// true on all platforms.
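    ///
    /// # Example
    ///
    /// A minimal sketch, assuming no task will ever touch the mutex again
    /// (mirroring the `smoke_static` test below):
    ///
    /// ```rust
    /// use sync::mutex::{StaticMutex, MUTEX_INIT};
    ///
    /// static mut LOCK: StaticMutex = MUTEX_INIT;
    ///
    /// unsafe {
    ///     drop(LOCK.lock());
    ///     // no further users: reclaim the OS mutex's resources
    ///     LOCK.destroy();
    /// }
    /// ```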
    pub unsafe fn destroy(&self) {
        self.lock.destroy()
    }
}

impl Mutex {
    /// Creates a new mutex in an unlocked state ready for use.
    pub fn new() -> Mutex {
        Mutex {
            lock: StaticMutex {
                state: atomics::AtomicUint::new(0),
                flavor: Unsafe::new(Unlocked),
                green_blocker: Unsafe::new(0),
                native_blocker: Unsafe::new(0),
                green_cnt: atomics::AtomicUint::new(0),
                q: q::Queue::new(),
                lock: unsafe { mutex::StaticNativeMutex::new() },
            }
        }
    }

    /// Attempts to acquire this lock.
    ///
    /// If the lock could not be acquired at this time, then `None` is returned.
    /// Otherwise, an RAII guard is returned. The lock will be unlocked when the
    /// guard is dropped.
    ///
    /// This function does not block.
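    ///
    /// # Example
    ///
    /// A small sketch of conditional acquisition:
    ///
    /// ```rust
    /// use sync::mutex::Mutex;
    ///
    /// let m = Mutex::new();
    /// match m.try_lock() {
    ///     Some(_guard) => { /* the lock is held until `_guard` is dropped */ }
    ///     None => { /* some other task holds the lock */ }
    /// }
    /// ```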
    pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
        self.lock.try_lock()
    }

    /// Acquires a mutex, blocking the current task until it is able to do so.
    ///
    /// This function will block the local task until it is able to acquire
    /// the mutex. Upon returning, the task is the only task with the mutex
    /// held. An RAII guard is returned to allow scoped unlock of the lock. When
    /// the guard goes out of scope, the mutex will be unlocked.
    pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() }
}

impl<'a> Guard<'a> {
    fn new<'b>(lock: &'b StaticMutex) -> Guard<'b> {
        if cfg!(debug) {
            // once we've acquired a lock, it's ok to access the flavor
            assert!(unsafe { *lock.flavor.get() != Unlocked });
            assert!(lock.state.load(atomics::SeqCst) & LOCKED != 0);
        }
        Guard { lock: lock }
    }
}

#[unsafe_destructor]
impl<'a> Drop for Guard<'a> {
    #[inline]
    fn drop(&mut self) {
        self.lock.unlock();
    }
}

impl Drop for Mutex {
    fn drop(&mut self) {
        // This is actually safe because we know that there is no further usage
        // of this mutex (it's up to the user to arrange for a mutex to get
        // dropped, that's not our job)
        unsafe { self.lock.destroy() }
    }
}

#[cfg(test)]
mod test {
    extern crate native;
    use super::{Mutex, StaticMutex, MUTEX_INIT};

    #[test]
    fn smoke() {
        let m = Mutex::new();
        drop(m.lock());
        drop(m.lock());
    }

    #[test]
    fn smoke_static() {
        static mut m: StaticMutex = MUTEX_INIT;
        unsafe {
            drop(m.lock());
            drop(m.lock());
            m.destroy();
        }
    }

    #[test]
    fn lots_and_lots() {
        static mut m: StaticMutex = MUTEX_INIT;
        static mut CNT: uint = 0;
        static M: uint = 1000;
        static N: uint = 3;

        fn inc() {
            for _ in range(0, M) {
                unsafe {
                    let _g = m.lock();
                    CNT += 1;
                }
            }
        }

        let (tx, rx) = channel();
        for _ in range(0, N) {
            let tx2 = tx.clone();
            native::task::spawn(proc() { inc(); tx2.send(()); });
            let tx2 = tx.clone();
            spawn(proc() { inc(); tx2.send(()); });
        }

        drop(tx);
        for _ in range(0, 2 * N) {
            rx.recv();
        }
        assert_eq!(unsafe {CNT}, M * N * 2);
        unsafe {
            m.destroy();
        }
    }

    #[test]
    fn trylock() {
        let m = Mutex::new();
        assert!(m.try_lock().is_some());
    }
}