(index<- )        ./libsync/mutex.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! A proper mutex implementation regardless of the "flavor of task" which is
  12  //! acquiring the lock.
  13  
  14  // # Implementation of Rust mutexes
  15  //
  16  // Most answers to the question of "how do I use a mutex" are "use pthreads",
  17  // but for Rust this isn't quite sufficient. Green threads cannot acquire an OS
  18  // mutex because they can context switch among many OS threads, leading to
  19  // deadlocks with other green threads.
  20  //
  21  // Another problem for green threads grabbing an OS mutex is that POSIX dictates
  22  // that unlocking a mutex on a different thread from where it was locked is
  23  // undefined behavior. Remember that green threads can migrate among OS threads,
  24  // so this would mean that we would have to pin green threads to OS threads,
  25  // which is less than ideal.
  26  //
  27  // ## Using deschedule/reawaken
  28  //
  29  // We already have primitives for descheduling/reawakening tasks, so they're the
  30  // first obvious choice when implementing a mutex. The idea would be to have a
  31  // concurrent queue that everyone is pushed on to, and then the owner of the
  32  // mutex is the one popping from the queue.
  33  //
  34  // Unfortunately, this is not very performant for native tasks. The suspected
  35  // reason for this is that each native thread is suspended on its own condition
  36  // variable, unique from all the other threads. In this situation, the kernel
  37  // has no idea what the scheduling semantics are of the user program, so all of
  38  // the threads are distributed among all cores on the system. This ends up
  39  // having very expensive wakeups of remote cores high up in the profile when
  40  // handing off the mutex among native tasks. On the other hand, when using an OS
  41  // mutex, the kernel knows that all native threads are contended on the same
  42  // mutex, so they're in theory all migrated to a single core (fast context
  43  // switching).
  44  //
  45  // ## Mixing implementations
  46  //
  47  // From that above information, we have two constraints. The first is that
  48  // green threads can't touch os mutexes, and the second is that native tasks
  49  // pretty much *must* touch an os mutex.
  50  //
  51  // As a compromise, the queueing implementation is used for green threads and
  52  // the os mutex is used for native threads (why not have both?). This ends up
  53  // leading to fairly decent performance for both native threads and green
  54  // threads on various workloads (uncontended and contended).
  55  //
  56  // The crux of this implementation is an atomic work which is CAS'd on many
  57  // times in order to manage a few flags about who's blocking where and whether
  58  // it's locked or not.
  59  
  60  use std::kinds::marker;
  61  use std::mem;
  62  use std::rt::local::Local;
  63  use std::rt::task::{BlockedTask, Task};
  64  use std::rt::thread::Thread;
  65  use std::sync::atomics;
  66  use std::ty::Unsafe;
  67  use std::unstable::mutex;
  68  
  69  use q = mpsc_intrusive;
  70  
  71  pub static LOCKED: uint = 1 << 0;
  72  pub static GREEN_BLOCKED: uint = 1 << 1;
  73  pub static NATIVE_BLOCKED: uint = 1 << 2;
  74  
  75  /// A mutual exclusion primitive useful for protecting shared data
  76  ///
  77  /// This mutex is an implementation of a lock for all flavors of tasks which may
  78  /// be grabbing. A common problem with green threads is that they cannot grab
  79  /// locks (if they reschedule during the lock a contender could deadlock the
  80  /// system), but this mutex does *not* suffer this problem.
  81  ///
  82  /// This mutex will properly block tasks waiting for the lock to become
  83  /// available. The mutex can also be statically initialized or created via a
  84  /// `new` constructor.
  85  ///
  86  /// # Example
  87  ///
  88  /// ```rust
  89  /// use sync::mutex::Mutex;
  90  ///
  91  /// let m = Mutex::new();
  92  /// let guard = m.lock();
  93  /// // do some work
  94  /// drop(guard); // unlock the lock
  95  /// ```
  96  pub struct Mutex {
  97      lock: StaticMutex,
  98  }
  99  
 100  #[deriving(Eq, Show)]
 101  enum Flavor {
 102      Unlocked,
 103      TryLockAcquisition,
 104      GreenAcquisition,
 105      NativeAcquisition,
 106  }
 107  
 108  /// The static mutex type is provided to allow for static allocation of mutexes.
 109  ///
 110  /// Note that this is a separate type because using a Mutex correctly means that
 111  /// it needs to have a destructor run. In Rust, statics are not allowed to have
 112  /// destructors. As a result, a `StaticMutex` has one extra method when compared
 113  /// to a `Mutex`, a `destroy` method. This method is unsafe to call, and
 114  /// documentation can be found directly on the method.
 115  ///
 116  /// # Example
 117  ///
 118  /// ```rust
 119  /// use sync::mutex::{StaticMutex, MUTEX_INIT};
 120  ///
 121  /// static mut LOCK: StaticMutex = MUTEX_INIT;
 122  ///
 123  /// unsafe {
 124  ///     let _g = LOCK.lock();
 125  ///     // do some productive work
 126  /// }
 127  /// // lock is unlocked here.
 128  /// ```
 129  pub struct StaticMutex {
 130      /// Current set of flags on this mutex
 131      state: atomics::AtomicUint,
 132      /// an OS mutex used by native threads
 133      lock: mutex::StaticNativeMutex,
 134  
 135      /// Type of locking operation currently on this mutex
 136      flavor: Unsafe<Flavor>,
 137      /// uint-cast of the green thread waiting for this mutex
 138      green_blocker: Unsafe<uint>,
 139      /// uint-cast of the native thread waiting for this mutex
 140      native_blocker: Unsafe<uint>,
 141  
 142      /// A concurrent mpsc queue used by green threads, along with a count used
 143      /// to figure out when to dequeue and enqueue.
 144      q: q::Queue<uint>,
 145      green_cnt: atomics::AtomicUint,
 146  }
 147  
 148  /// An RAII implementation of a "scoped lock" of a mutex. When this structure is
 149  /// dropped (falls out of scope), the lock will be unlocked.
 150  #[must_use]
 151  pub struct Guard<'a> {
 152      lock: &'a StaticMutex,
 153  }
 154  
 155  /// Static initialization of a mutex. This constant can be used to initialize
 156  /// other mutex constants.
 157  pub static MUTEX_INIT: StaticMutex = StaticMutex {
 158      lock: mutex::NATIVE_MUTEX_INIT,
 159      state: atomics::INIT_ATOMIC_UINT,
 160      flavor: Unsafe { value: Unlocked, marker1: marker::InvariantType },
 161      green_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
 162      native_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
 163      green_cnt: atomics::INIT_ATOMIC_UINT,
 164      q: q::Queue {
 165          head: atomics::INIT_ATOMIC_UINT,
 166          tail: Unsafe {
 167              value: 0 as *mut q::Node<uint>,
 168              marker1: marker::InvariantType,
 169          },
 170          stub: q::DummyNode {
 171              next: atomics::INIT_ATOMIC_UINT,
 172          }
 173      }
 174  };
 175  
 176  impl StaticMutex {
 177      /// Attempts to grab this lock, see `Mutex::try_lock`
 178      pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
 179          // Attempt to steal the mutex from an unlocked state.
 180          //
 181          // FIXME: this can mess up the fairness of the mutex, seems bad
 182          match self.state.compare_and_swap(0, LOCKED, atomics::SeqCst) {
 183              0 => {
 184                  // After acquiring the mutex, we can safely access the inner
 185                  // fields.
 186                  let prev = unsafe {
 187                      mem::replace(&mut *self.flavor.get(), TryLockAcquisition)
 188                  };
 189                  assert_eq!(prev, Unlocked);
 190                  Some(Guard::new(self))
 191              }
 192              _ => None
 193          }
 194      }
 195  
 196      /// Acquires this lock, see `Mutex::lock`
 197      pub fn lock<'a>(&'a self) -> Guard<'a> {
 198          // First, attempt to steal the mutex from an unlocked state. The "fast
 199          // path" needs to have as few atomic instructions as possible, and this
 200          // one cmpxchg is already pretty expensive.
 201          //
 202          // FIXME: this can mess up the fairness of the mutex, seems bad
 203          match self.try_lock() {
 204              Some(guard) => return guard,
 205              None => {}
 206          }
 207  
 208          // After we've failed the fast path, then we delegate to the differnet
 209          // locking protocols for green/native tasks. This will select two tasks
 210          // to continue further (one native, one green).
 211          let tBox<Task> = Local::take();
 212          let can_block = t.can_block();
 213          let native_bit;
 214          if can_block {
 215              self.native_lock(t);
 216              native_bit = NATIVE_BLOCKED;
 217          } else {
 218              self.green_lock(t);
 219              native_bit = GREEN_BLOCKED;
 220          }
 221  
 222          // After we've arbitrated among task types, attempt to re-acquire the
 223          // lock (avoids a deschedule). This is very important to do in order to
 224          // allow threads coming out of the native_lock function to try their
 225          // best to not hit a cvar in deschedule.
 226          let mut old = match self.state.compare_and_swap(0, LOCKED,
 227                                                          atomics::SeqCst) {
 228              0 => {
 229                  let flavor = if can_block {
 230                      NativeAcquisition
 231                  } else {
 232                      GreenAcquisition
 233                  };
 234                  // We've acquired the lock, so this unsafe access to flavor is
 235                  // allowed.
 236                  unsafe { *self.flavor.get() = flavor; }
 237                  return Guard::new(self)
 238              }
 239              old => old,
 240          };
 241  
 242          // Alright, everything else failed. We need to deschedule ourselves and
 243          // flag ourselves as waiting. Note that this case should only happen
 244          // regularly in native/green contention. Due to try_lock and the header
 245          // of lock stealing the lock, it's also possible for native/native
 246          // contention to hit this location, but as less common.
 247          let tBox<Task> = Local::take();
 248          t.deschedule(1, |task| {
 249              let task = unsafe { task.cast_to_uint() };
 250  
 251              // These accesses are protected by the respective native/green
 252              // mutexes which were acquired above.
 253              let prev = if can_block {
 254                  unsafe { mem::replace(&mut *self.native_blocker.get(), task) }
 255              } else {
 256                  unsafe { mem::replace(&mut *self.green_blocker.get(), task) }
 257              };
 258              assert_eq!(prev, 0);
 259  
 260              loop {
 261                  assert_eq!(old & native_bit, 0);
 262                  // If the old state was locked, then we need to flag ourselves
 263                  // as blocking in the state. If the old state was unlocked, then
 264                  // we attempt to acquire the mutex. Everything here is a CAS
 265                  // loop that'll eventually make progress.
 266                  if old & LOCKED != 0 {
 267                      old = match self.state.compare_and_swap(old,
 268                                                              old | native_bit,
 269                                                              atomics::SeqCst) {
 270                          n if n == old => return Ok(()),
 271                          n => n
 272                      };
 273                  } else {
 274                      assert_eq!(old, 0);
 275                      old = match self.state.compare_and_swap(old,
 276                                                              old | LOCKED,
 277                                                              atomics::SeqCst) {
 278                          n if n == old => {
 279                              // After acquiring the lock, we have access to the
 280                              // flavor field, and we've regained access to our
 281                              // respective native/green blocker field.
 282                              let prev = if can_block {
 283                                  unsafe {
 284                                      *self.native_blocker.get() = 0;
 285                                      mem::replace(&mut *self.flavor.get(),
 286                                                   NativeAcquisition)
 287                                  }
 288                              } else {
 289                                  unsafe {
 290                                      *self.green_blocker.get() = 0;
 291                                      mem::replace(&mut *self.flavor.get(),
 292                                                   GreenAcquisition)
 293                                  }
 294                              };
 295                              assert_eq!(prev, Unlocked);
 296                              return Err(unsafe {
 297                                  BlockedTask::cast_from_uint(task)
 298                              })
 299                          }
 300                          n => n,
 301                      };
 302                  }
 303              }
 304          });
 305  
 306          Guard::new(self)
 307      }
 308  
 309      // Tasks which can block are super easy. These tasks just call the blocking
 310      // `lock()` function on an OS mutex
 311      fn native_lock(&self, tBox<Task>) {
 312          Local::put(t);
 313          unsafe { self.lock.lock_noguard(); }
 314      }
 315  
 316      fn native_unlock(&self) {
 317          unsafe { self.lock.unlock_noguard(); }
 318      }
 319  
 320      fn green_lock(&self, tBox<Task>) {
 321          // Green threads flag their presence with an atomic counter, and if they
 322          // fail to be the first to the mutex, they enqueue themselves on a
 323          // concurrent internal queue with a stack-allocated node.
 324          //
 325          // FIXME: There isn't a cancellation currently of an enqueue, forcing
 326          //        the unlocker to spin for a bit.
 327          if self.green_cnt.fetch_add(1, atomics::SeqCst) == 0 {
 328              Local::put(t);
 329              return
 330          }
 331  
 332          let mut node = q::Node::new(0);
 333          t.deschedule(1, |task| {
 334              unsafe {
 335                  node.data = task.cast_to_uint();
 336                  self.q.push(&mut node);
 337              }
 338              Ok(())
 339          });
 340      }
 341  
 342      fn green_unlock(&self) {
 343          // If we're the only green thread, then no need to check the queue,
 344          // otherwise the fixme above forces us to spin for a bit.
 345          if self.green_cnt.fetch_sub(1, atomics::SeqCst) == 1 { return }
 346          let node;
 347          loop {
 348              match unsafe { self.q.pop() } {
 349                  Some(t) => { node = t; break; }
 350                  None => Thread::yield_now(),
 351              }
 352          }
 353          let task = unsafe { BlockedTask::cast_from_uint((*node).data) };
 354          task.wake().map(|t| t.reawaken());
 355      }
 356  
 357      fn unlock(&self) {
 358          // Unlocking this mutex is a little tricky. We favor any task that is
 359          // manually blocked (not in each of the separate locks) in order to help
 360          // provide a little fairness (green threads will wake up the pending
 361          // native thread and native threads will wake up the pending green
 362          // thread).
 363          //
 364          // There's also the question of when we unlock the actual green/native
 365          // locking halves as well. If we're waking up someone, then we can wait
 366          // to unlock until we've acquired the task to wake up (we're guaranteed
 367          // the mutex memory is still valid when there's contenders), but as soon
 368          // as we don't find any contenders we must unlock the mutex, and *then*
 369          // flag the mutex as unlocked.
 370          //
 371          // This flagging can fail, leading to another round of figuring out if a
 372          // task needs to be woken, and in this case it's ok that the "mutex
 373          // halves" are unlocked, we're just mainly dealing with the atomic state
 374          // of the outer mutex.
 375          let flavor = unsafe { mem::replace(&mut *self.flavor.get(), Unlocked) };
 376  
 377          let mut state = self.state.load(atomics::SeqCst);
 378          let mut unlocked = false;
 379          let task;
 380          loop {
 381              assert!(state & LOCKED != 0);
 382              if state & GREEN_BLOCKED != 0 {
 383                  self.unset(state, GREEN_BLOCKED);
 384                  task = unsafe {
 385                      *self.flavor.get() = GreenAcquisition;
 386                      let task = mem::replace(&mut *self.green_blocker.get(), 0);
 387                      BlockedTask::cast_from_uint(task)
 388                  };
 389                  break;
 390              } else if state & NATIVE_BLOCKED != 0 {
 391                  self.unset(state, NATIVE_BLOCKED);
 392                  task = unsafe {
 393                      *self.flavor.get() = NativeAcquisition;
 394                      let task = mem::replace(&mut *self.native_blocker.get(), 0);
 395                      BlockedTask::cast_from_uint(task)
 396                  };
 397                  break;
 398              } else {
 399                  assert_eq!(state, LOCKED);
 400                  if !unlocked {
 401                      match flavor {
 402                          GreenAcquisition => { self.green_unlock(); }
 403                          NativeAcquisition => { self.native_unlock(); }
 404                          TryLockAcquisition => {}
 405                          Unlocked => unreachable!()
 406                      }
 407                      unlocked = true;
 408                  }
 409                  match self.state.compare_and_swap(LOCKED, 0, atomics::SeqCst) {
 410                      LOCKED => return,
 411                      n => { state = n; }
 412                  }
 413              }
 414          }
 415          if !unlocked {
 416              match flavor {
 417                  GreenAcquisition => { self.green_unlock(); }
 418                  NativeAcquisition => { self.native_unlock(); }
 419                  TryLockAcquisition => {}
 420                  Unlocked => unreachable!()
 421              }
 422          }
 423  
 424          task.wake().map(|t| t.reawaken());
 425      }
 426  
 427      /// Loops around a CAS to unset the `bit` in `state`
 428      fn unset(&self, mut stateuint, bituint) {
 429          loop {
 430              assert!(state & bit != 0);
 431              let new = state ^ bit;
 432              match self.state.compare_and_swap(state, new, atomics::SeqCst) {
 433                  n if n == state => break,
 434                  n => { state = n; }
 435              }
 436          }
 437      }
 438  
 439      /// Deallocates resources associated with this static mutex.
 440      ///
 441      /// This method is unsafe because it provides no guarantees that there are
 442      /// no active users of this mutex, and safety is not guaranteed if there are
 443      /// active users of this mutex.
 444      ///
 445      /// This method is required to ensure that there are no memory leaks on
 446      /// *all* platforms. It may be the case that some platforms do not leak
 447      /// memory if this method is not called, but this is not guaranteed to be
 448      /// true on all platforms.
 449      pub unsafe fn destroy(&self) {
 450          self.lock.destroy()
 451      }
 452  }
 453  
 454  impl Mutex {
 455      /// Creates a new mutex in an unlocked state ready for use.
 456      pub fn new() -> Mutex {
 457          Mutex {
 458              lock: StaticMutex {
 459                  state: atomics::AtomicUint::new(0),
 460                  flavor: Unsafe::new(Unlocked),
 461                  green_blocker: Unsafe::new(0),
 462                  native_blocker: Unsafe::new(0),
 463                  green_cnt: atomics::AtomicUint::new(0),
 464                  q: q::Queue::new(),
 465                  lock: unsafe { mutex::StaticNativeMutex::new() },
 466              }
 467          }
 468      }
 469  
 470      /// Attempts to acquire this lock.
 471      ///
 472      /// If the lock could not be acquired at this time, then `None` is returned.
 473      /// Otherwise, an RAII guard is returned. The lock will be unlocked when the
 474      /// guard is dropped.
 475      ///
 476      /// This function does not block.
 477      pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
 478          self.lock.try_lock()
 479      }
 480  
 481      /// Acquires a mutex, blocking the current task until it is able to do so.
 482      ///
 483      /// This function will block the local task until it is available to acquire
 484      /// the mutex. Upon returning, the task is the only task with the mutex
 485      /// held. An RAII guard is returned to allow scoped unlock of the lock. When
 486      /// the guard goes out of scope, the mutex will be unlocked.
 487      pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() }
 488  }
 489  
 490  impl<'a> Guard<'a> {
 491      fn new<'b>(lock&'b StaticMutex) -> Guard<'b> {
 492          if cfg!(debug) {
 493              // once we've acquired a lock, it's ok to access the flavor
 494              assert!(unsafe { *lock.flavor.get() != Unlocked });
 495              assert!(lock.state.load(atomics::SeqCst) & LOCKED != 0);
 496          }
 497          Guard { lock: lock }
 498      }
 499  }
 500  
 501  #[unsafe_destructor]
 502  impl<'a> Drop for Guard<'a> {
 503      #[inline]
 504      fn drop(&mut self) {
 505          self.lock.unlock();
 506      }
 507  }
 508  
 509  impl Drop for Mutex {
 510      fn drop(&mut self) {
 511          // This is actually safe b/c we know that there is no further usage of
 512          // this mutex (it's up to the user to arrange for a mutex to get
 513          // dropped, that's not our job)
 514          unsafe { self.lock.destroy() }
 515      }
 516  }
 517  
 518  #[cfg(test)]
 519  mod test {
 520      extern crate native;
 521      use super::{Mutex, StaticMutex, MUTEX_INIT};
 522  
 523      #[test]
 524      fn smoke() {
 525          let m = Mutex::new();
 526          drop(m.lock());
 527          drop(m.lock());
 528      }
 529  
 530      #[test]
 531      fn smoke_static() {
 532          static mut m: StaticMutex = MUTEX_INIT;
 533          unsafe {
 534              drop(m.lock());
 535              drop(m.lock());
 536              m.destroy();
 537          }
 538      }
 539  
 540      #[test]
 541      fn lots_and_lots() {
 542          static mut m: StaticMutex = MUTEX_INIT;
 543          static mut CNT: uint = 0;
 544          static M: uint = 1000;
 545          static N: uint = 3;
 546  
 547          fn inc() {
 548              for _ in range(0, M) {
 549                  unsafe {
 550                      let _g = m.lock();
 551                      CNT += 1;
 552                  }
 553              }
 554          }
 555  
 556          let (tx, rx) = channel();
 557          for _ in range(0, N) {
 558              let tx2 = tx.clone();
 559              native::task::spawn(proc() { inc(); tx2.send(()); });
 560              let tx2 = tx.clone();
 561              spawn(proc() { inc(); tx2.send(()); });
 562          }
 563  
 564          drop(tx);
 565          for _ in range(0, 2 * N) {
 566              rx.recv();
 567          }
 568          assert_eq!(unsafe {CNT}, M * N * 2);
 569          unsafe {
 570              m.destroy();
 571          }
 572      }
 573  
 574      #[test]
 575      fn trylock() {
 576          let m = Mutex::new();
 577          assert!(m.try_lock().is_some());
 578      }
 579  }


libsync/mutex.rs:95:8-95:8 -struct- definition:
/// ```
pub struct Mutex {
    lock: StaticMutex,
references:- 5
456:     pub fn new() -> Mutex {
457:         Mutex {
458:             lock: StaticMutex {
--
509: impl Drop for Mutex {
510:     fn drop(&mut self) {
libsync/raw.rs:
84: struct Sem<Q> {
85:     lock: mutex::Mutex,
86:     // n.b, we need Sem to be `Share`, but the WaitQueue type is not send/share
libsync/mutex.rs:
454: impl Mutex {
455:     /// Creates a new mutex in an unlocked state ready for use.


libsync/mutex.rs:128:8-128:8 -struct- definition:
/// ```
pub struct StaticMutex {
    /// Current set of flags on this mutex
references:- 8
457:         Mutex {
458:             lock: StaticMutex {
459:                 state: atomics::AtomicUint::new(0),
--
490: impl<'a> Guard<'a> {
491:     fn new<'b>(lock: &'b StaticMutex) -> Guard<'b> {
492:         if cfg!(debug) {
libsync/one.rs:
43: pub struct Once {
44:     mutex: StaticMutex,
45:     cnt: atomics::AtomicInt,
libsync/mutex.rs:
96: pub struct Mutex {
97:     lock: StaticMutex,
98: }


libsync/mutex.rs:100:22-100:22 -enum- definition:
enum Flavor {
    Unlocked,
    TryLockAcquisition,
references:- 5
101: enum Flavor {
--
135:     /// Type of locking operation currently on this mutex
136:     flavor: Unsafe<Flavor>,
137:     /// uint-cast of the green thread waiting for this mutex


libsync/mutex.rs:150:12-150:12 -struct- definition:
pub struct Guard<'a> {
    lock: &'a StaticMutex,
}
references:- 8
496:         }
497:         Guard { lock: lock }
498:     }
--
502: impl<'a> Drop for Guard<'a> {
503:     #[inline]