(index<- )        ./libextra/sync.rs

    1  // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
    2  // file at the top-level directory of this distribution and at
    3  // http://rust-lang.org/COPYRIGHT.
    4  //
    5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
    6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
    7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
    8  // option. This file may not be copied, modified, or distributed
    9  // except according to those terms.
   10  
   11  /**
   12   * The concurrency primitives you know and love.
   13   *
   14   * Maybe once we have a "core exports x only to std" mechanism, these can be
   15   * in std.
   16   */
   17  
   18  
   19  use std::borrow;
   20  use std::comm;
   21  use std::comm::SendDeferred;
   22  use std::comm::{GenericPort, Peekable};
   23  use std::task;
   24  use std::unstable::sync::{Exclusive, UnsafeArc};
   25  use std::unstable::atomics;
   26  use std::unstable::finally::Finally;
   27  use std::util;
   28  use std::util::NonCopyable;
   29  
   30  /****************************************************************************
   31   * Internals
   32   ****************************************************************************/
   33  
   34  // Each waiting task receives on one of these.
   35  #[doc(hidden)]
   36  type WaitEnd = comm::PortOne<()>;
   37  #[doc(hidden)]
   38  type SignalEnd = comm::ChanOne<()>;
   39  // A doubly-ended queue of waiting tasks.
   40  #[doc(hidden)]
   41  struct WaitQueue { head: comm::Port<SignalEnd>,
   42                     tail: comm::Chan<SignalEnd> }
   43  
   44  impl WaitQueue {
   45      fn new() -> WaitQueue {
   46          let (block_head, block_tail) = comm::stream();
   47          WaitQueue { head: block_head, tail: block_tail }
   48      }
   49  
   50      // Signals one live task from the queue.
   51      fn signal(&self) -> bool {
   52          // The peek is mandatory to make sure recv doesn't block.
   53          if self.head.peek() {
   54              // Pop and send a wakeup signal. If the waiter was killed, its port
   55              // will have closed. Keep trying until we get a live task.
   56              if self.head.recv().try_send_deferred(()) {
   57                  true
   58              } else {
   59                  self.signal()
   60              }
   61          } else {
   62              false
   63          }
   64      }
   65  
   66      fn broadcast(&self) -> uint {
   67          let mut count = 0;
   68          while self.head.peek() {
   69              if self.head.recv().try_send_deferred(()) {
   70                  count += 1;
   71              }
   72          }
   73          count
   74      }
   75  
   76      fn wait_end(&self) -> WaitEnd {
   77          let (wait_end, signal_end) = comm::oneshot();
   78          self.tail.send_deferred(signal_end);
   79          wait_end
   80      }
   81  }
   82  
   83  // The building-block used to make semaphores, mutexes, and rwlocks.
   84  #[doc(hidden)]
   85  struct SemInner<Q> {
   86      count: int,
   87      waiters:   WaitQueue,
   88      // Can be either unit or another waitqueue. Some sems shouldn't come with
   89      // a condition variable attached, others should.
   90      blocked:   Q
   91  }
   92  
   93  #[doc(hidden)]
   94  struct Sem<Q>(Exclusive<SemInner<Q>>);
   95  
   96  #[doc(hidden)]
   97  impl<Q:Send> Sem<Q> {
   98      fn new(countint, qQ) -> Sem<Q> {
   99          Sem(Exclusive::new(SemInner {
  100              count: count, waiters: WaitQueue::new(), blocked: q }))
  101      }
  102  
  103      pub fn acquire(&self) {
  104          unsafe {
  105              let mut waiter_nobe = None;
  106              do (**self).with |state| {
  107                  state.count -= 1;
  108                  if state.count < 0 {
  109                      // Create waiter nobe, enqueue ourself, and tell
  110                      // outer scope we need to block.
  111                      waiter_nobe = Some(state.waiters.wait_end());
  112                  }
  113              }
  114              // Uncomment if you wish to test for sem races. Not valgrind-friendly.
  115              /* do 1000.times { task::deschedule(); } */
  116              // Need to wait outside the exclusive.
  117              if waiter_nobe.is_some() {
  118                  let _ = waiter_nobe.unwrap().recv();
  119              }
  120          }
  121      }
  122  
  123      pub fn release(&self) {
  124          unsafe {
  125              do (**self).with |state| {
  126                  state.count += 1;
  127                  if state.count <= 0 {
  128                      state.waiters.signal();
  129                  }
  130              }
  131          }
  132      }
  133  
  134      pub fn access<U>(&self, blk&fn() -> U) -> U {
  135          do task::unkillable {
  136              do (|| {
  137                  self.acquire();
  138                  do task::rekillable { blk() }
  139              }).finally {
  140                  self.release();
  141              }
  142          }
  143      }
  144  }
  145  
  146  #[doc(hidden)]
  147  impl Sem<~[WaitQueue]> {
  148      fn new_and_signal(countint, num_condvarsuint)
  149          -> Sem<~[WaitQueue]> {
  150          let mut queues = ~[];
  151          do num_condvars.times {
  152              queues.push(WaitQueue::new());
  153          }
  154          Sem::new(count, queues)
  155      }
  156  }
  157  
  158  // FIXME(#3598): Want to use an Option down below, but we need a custom enum
  159  // that's not polymorphic to get around the fact that lifetimes are invariant
  160  // inside of type parameters.
  161  enum ReacquireOrderLock<'self> {
  162      Nothing, // c.c
  163      Just(&'self Semaphore),
  164  }
  165  
  166  /// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
  167  pub struct Condvar<'self> {
  168      // The 'Sem' object associated with this condvar. This is the one that's
  169      // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
  170      priv sem: &'self Sem<~[WaitQueue]>,
  171      // This is (can be) an extra semaphore which is held around the reacquire
  172      // operation on the first one. This is only used in cvars associated with
  173      // rwlocks, and is needed to ensure that, when a downgrader is trying to
  174      // hand off the access lock (which would be the first field, here), a 2nd
  175      // writer waking up from a cvar wait can't race with a reader to steal it,
  176      // See the comment in write_cond for more detail.
  177      priv order: ReacquireOrderLock<'self>,
  178      // Make sure condvars are non-copyable.
  179      priv token: util::NonCopyable,
  180  }
  181  
  182  impl<'self> Condvar<'self> {
  183      /**
  184       * Atomically drop the associated lock, and block until a signal is sent.
  185       *
  186       * # Failure
  187       * A task which is killed (i.e., by linked failure with another task)
  188       * while waiting on a condition variable will wake up, fail, and unlock
  189       * the associated lock as it unwinds.
  190       */
  191      pub fn wait(&self) { self.wait_on(0) }
  192  
  193      /**
  194       * As wait(), but can specify which of multiple condition variables to
  195       * wait on. Only a signal_on() or broadcast_on() with the same condvar_id
  196       * will wake this thread.
  197       *
  198       * The associated lock must have been initialised with an appropriate
  199       * number of condvars. The condvar_id must be between 0 and num_condvars-1
  200       * or else this call will fail.
  201       *
  202       * wait() is equivalent to wait_on(0).
  203       */
  204      pub fn wait_on(&self, condvar_iduint) {
  205          let mut WaitEnd = None;
  206          let mut out_of_bounds = None;
  207          do task::unkillable {
  208              // Release lock, 'atomically' enqueuing ourselves in so doing.
  209              unsafe {
  210                  do (**self.sem).with |state| {
  211                      if condvar_id < state.blocked.len() {
  212                          // Drop the lock.
  213                          state.count += 1;
  214                          if state.count <= 0 {
  215                              state.waiters.signal();
  216                          }
  217                          // Create waiter nobe, and enqueue ourself to
  218                          // be woken up by a signaller.
  219                          WaitEnd = Some(state.blocked[condvar_id].wait_end());
  220                      } else {
  221                          out_of_bounds = Some(state.blocked.len());
  222                      }
  223                  }
  224              }
  225  
  226              // If deschedule checks start getting inserted anywhere, we can be
  227              // killed before or after enqueueing. Deciding whether to
  228              // unkillably reacquire the lock needs to happen atomically
  229              // wrt enqueuing.
  230              do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
  231                  // Unconditionally "block". (Might not actually block if a
  232                  // signaller already sent -- I mean 'unconditionally' in contrast
  233                  // with acquire().)
  234                  do (|| {
  235                      do task::rekillable {
  236                          let _ = WaitEnd.take_unwrap().recv();
  237                      }
  238                  }).finally {
  239                      // Reacquire the condvar. Note this is back in the unkillable
  240                      // section; it needs to succeed, instead of itself dying.
  241                      match self.order {
  242                          Just(lock) => do lock.access {
  243                              self.sem.acquire();
  244                          },
  245                          Nothing => {
  246                              self.sem.acquire();
  247                          },
  248                      }
  249                  }
  250              }
  251          }
  252      }
  253  
  254      /// Wake up a blocked task. Returns false if there was no blocked task.
  255      pub fn signal(&self) -> bool { self.signal_on(0) }
  256  
  257      /// As signal, but with a specified condvar_id. See wait_on.
  258      pub fn signal_on(&self, condvar_iduint) -> bool {
  259          unsafe {
  260              let mut out_of_bounds = None;
  261              let mut result = false;
  262              do (**self.sem).with |state| {
  263                  if condvar_id < state.blocked.len() {
  264                      result = state.blocked[condvar_id].signal();
  265                  } else {
  266                      out_of_bounds = Some(state.blocked.len());
  267                  }
  268              }
  269              do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
  270                  result
  271              }
  272          }
  273      }
  274  
  275      /// Wake up all blocked tasks. Returns the number of tasks woken.
  276      pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
  277  
  278      /// As broadcast, but with a specified condvar_id. See wait_on.
  279      pub fn broadcast_on(&self, condvar_iduint) -> uint {
  280          let mut out_of_bounds = None;
  281          let mut queue = None;
  282          unsafe {
  283              do (**self.sem).with |state| {
  284                  if condvar_id < state.blocked.len() {
  285                      // To avoid :broadcast_heavy, we make a new waitqueue,
  286                      // swap it out with the old one, and broadcast on the
  287                      // old one outside of the little-lock.
  288                      queue = Some(util::replace(&mut state.blocked[condvar_id],
  289                                                 WaitQueue::new()));
  290                  } else {
  291                      out_of_bounds = Some(state.blocked.len());
  292                  }
  293              }
  294              do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
  295                  let queue = queue.take_unwrap();
  296                  queue.broadcast()
  297              }
  298          }
  299      }
  300  }
  301  
  302  // Checks whether a condvar ID was out of bounds, and fails if so, or does
  303  // something else next on success.
  304  #[inline]
  305  #[doc(hidden)]
  306  fn check_cvar_bounds<U>(out_of_boundsOption<uint>, iduint, act&str,
  307                          blk&fn() -> U) -> U {
  308      match out_of_bounds {
  309          Some(0) =>
  310              fail!("%s with illegal ID %u - this lock has no condvars!", act, id),
  311          Some(length) =>
  312              fail!("%s with illegal ID %u - ID must be less than %u", act, id, length),
  313          None => blk()
  314      }
  315  }
  316  
  317  #[doc(hidden)]
  318  impl Sem<~[WaitQueue]> {
  319      // The only other places that condvars get built are rwlock.write_cond()
  320      // and rwlock_write_mode.
  321      pub fn access_cond<U>(&self, blk&fn(c: &Condvar) -> U) -> U {
  322          do self.access {
  323              blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() })
  324          }
  325      }
  326  }
  327  
  328  /****************************************************************************
  329   * Semaphores
  330   ****************************************************************************/
  331  
  332  /// A counting, blocking, bounded-waiting semaphore.
  333  struct Semaphore { priv sem: Sem<()> }
  334  
  335  
  336  impl Clone for Semaphore {
  337      /// Create a new handle to the semaphore.
  338      fn clone(&self) -> Semaphore {
  339          Semaphore { sem: Sem((*self.sem).clone()) }
  340      }
  341  }
  342  
  343  impl Semaphore {
  344      /// Create a new semaphore with the specified count.
  345      pub fn new(countint) -> Semaphore {
  346          Semaphore { sem: Sem::new(count, ()) }
  347      }
  348  
  349      /**
  350       * Acquire a resource represented by the semaphore. Blocks if necessary
  351       * until resource(s) become available.
  352       */
  353      pub fn acquire(&self) { (&self.sem).acquire() }
  354  
  355      /**
  356       * Release a held resource represented by the semaphore. Wakes a blocked
  357       * contending task, if any exist. Won't block the caller.
  358       */
  359      pub fn release(&self) { (&self.sem).release() }
  360  
  361      /// Run a function with ownership of one of the semaphore's resources.
  362      pub fn access<U>(&self, blk&fn() -> U) -> U { (&self.sem).access(blk) }
  363  }
  364  
  365  /****************************************************************************
  366   * Mutexes
  367   ****************************************************************************/
  368  
  369  /**
  370   * A blocking, bounded-waiting, mutual exclusion lock with an associated
  371   * FIFO condition variable.
  372   *
  373   * # Failure
  374   * A task which fails while holding a mutex will unlock the mutex as it
  375   * unwinds.
  376   */
  377  pub struct Mutex { priv sem: Sem<~[WaitQueue]> }
  378  
  379  impl Clone for Mutex {
  380      /// Create a new handle to the mutex.
  381      fn clone(&self) -> Mutex { Mutex { sem: Sem((*self.sem).clone()) } }
  382  }
  383  
  384  impl Mutex {
  385      /// Create a new mutex, with one associated condvar.
  386      pub fn new() -> Mutex { Mutex::new_with_condvars(1) }
  387  
  388      /**
  389      * Create a new mutex, with a specified number of associated condvars. This
  390      * will allow calling wait_on/signal_on/broadcast_on with condvar IDs between
  391      * 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but
  392      * any operations on the condvar will fail.)
  393      */
  394      pub fn new_with_condvars(num_condvarsuint) -> Mutex {
  395          Mutex { sem: Sem::new_and_signal(1, num_condvars) }
  396      }
  397  
  398  
  399      /// Run a function with ownership of the mutex.
  400      pub fn lock<U>(&self, blk&fn() -> U) -> U {
  401          (&self.sem).access(blk)
  402      }
  403  
  404      /// Run a function with ownership of the mutex and a handle to a condvar.
  405      pub fn lock_cond<U>(&self, blk&fn(c: &Condvar) -> U) -> U {
  406          (&self.sem).access_cond(blk)
  407      }
  408  }
  409  
  410  /****************************************************************************
  411   * Reader-writer locks
  412   ****************************************************************************/
  413  
  414  // NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem
  415  
  416  #[doc(hidden)]
  417  struct RWLockInner {
  418      // You might ask, "Why don't you need to use an atomic for the mode flag?"
  419      // This flag affects the behaviour of readers (for plain readers, they
  420      // assert on it; for downgraders, they use it to decide which mode to
  421      // unlock for). Consider that the flag is only unset when the very last
  422      // reader exits; therefore, it can never be unset during a reader/reader
  423      // (or reader/downgrader) race.
  424      // By the way, if we didn't care about the assert in the read unlock path,
  425      // we could instead store the mode flag in write_downgrade's stack frame,
  426      // and have the downgrade tokens store a borrowed pointer to it.
  427      read_mode:  bool,
  428      // The only way the count flag is ever accessed is with xadd. Since it is
  429      // a read-modify-write operation, multiple xadds on different cores will
  430      // always be consistent with respect to each other, so a monotonic/relaxed
  431      // consistency ordering suffices (i.e., no extra barriers are needed).
  432      // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
  433      // acquire/release orderings superfluously. Change these someday.
  434      read_count: atomics::AtomicUint,
  435  }
  436  
  437  /**
  438   * A blocking, no-starvation, reader-writer lock with an associated condvar.
  439   *
  440   * # Failure
  441   * A task which fails while holding an rwlock will unlock the rwlock as it
  442   * unwinds.
  443   */
  444  pub struct RWLock {
  445      priv order_lock:  Semaphore,
  446      priv access_lock: Sem<~[WaitQueue]>,
  447      priv state:       UnsafeArc<RWLockInner>,
  448  }
  449  
  450  impl RWLock {
  451      /// Create a new rwlock, with one associated condvar.
  452      pub fn new() -> RWLock { RWLock::new_with_condvars(1) }
  453  
  454      /**
  455      * Create a new rwlock, with a specified number of associated condvars.
  456      * Similar to mutex_with_condvars.
  457      */
  458      pub fn new_with_condvars(num_condvarsuint) -> RWLock {
  459          let state = UnsafeArc::new(RWLockInner {
  460              read_mode:  false,
  461              read_count: atomics::AtomicUint::new(0),
  462          });
  463          RWLock { order_lock:  Semaphore::new(1),
  464                  access_lock: Sem::new_and_signal(1, num_condvars),
  465                  state:       state, }
  466      }
  467  
  468      /// Create a new handle to the rwlock.
  469      pub fn clone(&self) -> RWLock {
  470          RWLock { order_lock:  (&(self.order_lock)).clone(),
  471                   access_lock: Sem((*self.access_lock).clone()),
  472                   state:       self.state.clone() }
  473      }
  474  
  475      /**
  476       * Run a function with the rwlock in read mode. Calls to 'read' from other
  477       * tasks may run concurrently with this one.
  478       */
  479      pub fn read<U>(&self, blk&fn() -> U) -> U {
  480          unsafe {
  481              do task::unkillable {
  482                  do (&self.order_lock).access {
  483                      let state = &mut *self.state.get();
  484                      let old_count = state.read_count.fetch_add(1, atomics::Acquire);
  485                      if old_count == 0 {
  486                          (&self.access_lock).acquire();
  487                          state.read_mode = true;
  488                      }
  489                  }
  490                  do (|| {
  491                      do task::rekillable { blk() }
  492                  }).finally {
  493                      let state = &mut *self.state.get();
  494                      assert!(state.read_mode);
  495                      let old_count = state.read_count.fetch_sub(1, atomics::Release);
  496                      assert!(old_count > 0);
  497                      if old_count == 1 {
  498                          state.read_mode = false;
  499                          // Note: this release used to be outside of a locked access
  500                          // to exclusive-protected state. If this code is ever
  501                          // converted back to such (instead of using atomic ops),
  502                          // this access MUST NOT go inside the exclusive access.
  503                          (&self.access_lock).release();
  504                      }
  505                  }
  506              }
  507          }
  508      }
  509  
  510      /**
  511       * Run a function with the rwlock in write mode. No calls to 'read' or
  512       * 'write' from other tasks will run concurrently with this one.
  513       */
  514      pub fn write<U>(&self, blk&fn() -> U) -> U {
  515          do task::unkillable {
  516              (&self.order_lock).acquire();
  517              do (&self.access_lock).access {
  518                  (&self.order_lock).release();
  519                  do task::rekillable {
  520                      blk()
  521                  }
  522              }
  523          }
  524      }
  525  
  526      /**
  527       * As write(), but also with a handle to a condvar. Waiting on this
  528       * condvar will allow readers and writers alike to take the rwlock before
  529       * the waiting task is signalled. (Note: a writer that waited and then
  530       * was signalled might reacquire the lock before other waiting writers.)
  531       */
  532      pub fn write_cond<U>(&self, blk&fn(c: &Condvar) -> U) -> U {
  533          // It's important to thread our order lock into the condvar, so that
  534          // when a cond.wait() wakes up, it uses it while reacquiring the
  535          // access lock. If we permitted a waking-up writer to "cut in line",
  536          // there could arise a subtle race when a downgrader attempts to hand
  537          // off the reader cloud lock to a waiting reader. This race is tested
  538          // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
  539          // T1 (writer)              T2 (downgrader)             T3 (reader)
  540          // [in cond.wait()]
  541          //                          [locks for writing]
  542          //                          [holds access_lock]
  543          // [is signalled, perhaps by
  544          //  downgrader or a 4th thread]
  545          // tries to lock access(!)
  546          //                                                      lock order_lock
  547          //                                                      xadd read_count[0->1]
  548          //                                                      tries to lock access
  549          //                          [downgrade]
  550          //                          xadd read_count[1->2]
  551          //                          unlock access
  552          // Since T1 contended on the access lock before T3 did, it will steal
  553          // the lock handoff. Adding order_lock in the condvar reacquire path
  554          // solves this because T1 will hold order_lock while waiting on access,
  555          // which will cause T3 to have to wait until T1 finishes its write,
  556          // which can't happen until T2 finishes the downgrade-read entirely.
  557          // The astute reader will also note that making waking writers use the
  558          // order_lock is better for not starving readers.
  559          do task::unkillable {
  560              (&self.order_lock).acquire();
  561              do (&self.access_lock).access_cond |cond| {
  562                  (&self.order_lock).release();
  563                  do task::rekillable {
  564                      let opt_lock = Just(&self.order_lock);
  565                      blk(&Condvar { sem: cond.sem, order: opt_lock,
  566                                     token: NonCopyable::new() })
  567                  }
  568              }
  569          }
  570      }
  571  
  572      /**
  573       * As write(), but with the ability to atomically 'downgrade' the lock;
  574       * i.e., to become a reader without letting other writers get the lock in
  575       * the meantime (such as unlocking and then re-locking as a reader would
  576       * do). The block takes a "write mode token" argument, which can be
  577       * transformed into a "read mode token" by calling downgrade(). Example:
  578       *
  579       * # Example
  580       *
  581       * ~~~ {.rust}
  582       * do lock.write_downgrade |mut write_token| {
  583       *     do write_token.write_cond |condvar| {
  584       *         ... exclusive access ...
  585       *     }
  586       *     let read_token = lock.downgrade(write_token);
  587       *     do read_token.read {
  588       *         ... shared access ...
  589       *     }
  590       * }
  591       * ~~~
  592       */
  593      pub fn write_downgrade<U>(&self, blk&fn(v: RWLockWriteMode) -> U) -> U {
  594          // Implementation slightly different from the slicker 'write's above.
  595          // The exit path is conditional on whether the caller downgrades.
  596          do task::unkillable {
  597              (&self.order_lock).acquire();
  598              (&self.access_lock).acquire();
  599              (&self.order_lock).release();
  600              do (|| {
  601                  do task::rekillable {
  602                      blk(RWLockWriteMode { lock: self, token: NonCopyable::new() })
  603                  }
  604              }).finally {
  605                  let writer_or_last_reader;
  606                  // Check if we're releasing from read mode or from write mode.
  607                  let state = unsafe { &mut *self.state.get() };
  608                  if state.read_mode {
  609                      // Releasing from read mode.
  610                      let old_count = state.read_count.fetch_sub(1, atomics::Release);
  611                      assert!(old_count > 0);
  612                      // Check if other readers remain.
  613                      if old_count == 1 {
  614                          // Case 1: Writer downgraded & was the last reader
  615                          writer_or_last_reader = true;
  616                          state.read_mode = false;
  617                      } else {
  618                          // Case 2: Writer downgraded & was not the last reader
  619                          writer_or_last_reader = false;
  620                      }
  621                  } else {
  622                      // Case 3: Writer did not downgrade
  623                      writer_or_last_reader = true;
  624                  }
  625                  if writer_or_last_reader {
  626                      // Nobody left inside; release the "reader cloud" lock.
  627                      (&self.access_lock).release();
  628                  }
  629              }
  630          }
  631      }
  632  
  633      /// To be called inside of the write_downgrade block.
  634      pub fn downgrade<'a>(&self, tokenRWLockWriteMode<'a>)
  635                           -> RWLockReadMode<'a> {
  636          if !borrow::ref_eq(self, token.lock) {
  637              fail!("Can't downgrade() with a different rwlock's write_mode!");
  638          }
  639          unsafe {
  640              do task::unkillable {
  641                  let state = &mut *self.state.get();
  642                  assert!(!state.read_mode);
  643                  state.read_mode = true;
  644                  // If a reader attempts to enter at this point, both the
  645                  // downgrader and reader will set the mode flag. This is fine.
  646                  let old_count = state.read_count.fetch_add(1, atomics::Release);
  647                  // If another reader was already blocking, we need to hand-off
  648                  // the "reader cloud" access lock to them.
  649                  if old_count != 0 {
  650                      // Guaranteed not to let another writer in, because
  651                      // another reader was holding the order_lock. Hence they
  652                      // must be the one to get the access_lock (because all
  653                      // access_locks are acquired with order_lock held). See
  654                      // the comment in write_cond for more justification.
  655                      (&self.access_lock).release();
  656                  }
  657              }
  658          }
  659          RWLockReadMode { lock: token.lock, token: NonCopyable::new() }
  660      }
  661  }
  662  
  663  /// The "write permission" token used for rwlock.write_downgrade().
  664  pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable }
  665  
  666  /// The "read permission" token used for rwlock.write_downgrade().
  667  pub struct RWLockReadMode<'self> { priv lock: &'self RWLock,
  668                                     priv token: NonCopyable }
  669  
  670  impl<'self> RWLockWriteMode<'self> {
  671      /// Access the pre-downgrade rwlock in write mode.
  672      pub fn write<U>(&self, blk&fn() -> U) -> U { blk() }
  673      /// Access the pre-downgrade rwlock in write mode with a condvar.
  674      pub fn write_cond<U>(&self, blk&fn(c: &Condvar) -> U) -> U {
  675          // Need to make the condvar use the order lock when reacquiring the
  676          // access lock. See comment in RWLock::write_cond for why.
  677          blk(&Condvar { sem:        &self.lock.access_lock,
  678                         order: Just(&self.lock.order_lock),
  679                         token: NonCopyable::new() })
  680      }
  681  }
  682  
  683  impl<'self> RWLockReadMode<'self> {
  684      /// Access the post-downgrade rwlock in read mode.
  685      pub fn read<U>(&self, blk&fn() -> U) -> U { blk() }
  686  }
  687  
  688  /****************************************************************************
  689   * Tests
  690   ****************************************************************************/
  691  
  692  #[cfg(test)]
  693  mod tests {
  694      use sync::*;
  695  
  696      use std::cast;
  697      use std::cell::Cell;
  698      use std::comm;
  699      use std::result;
  700      use std::task;
  701  
  702      /************************************************************************
  703       * Semaphore tests
  704       ************************************************************************/
  705      #[test]
  706      fn test_sem_acquire_release() {
  707          let s = Semaphore::new(1);
  708          s.acquire();
  709          s.release();
  710          s.acquire();
  711      }
  712      #[test]
  713      fn test_sem_basic() {
  714          let s = Semaphore::new(1);
  715          do s.access { }
  716      }
  717      #[test]
  718      fn test_sem_as_mutex() {
  719          let s = Semaphore::new(1);
  720          let s2 = s.clone();
  721          do task::spawn {
  722              do s2.access {
  723                  do 5.times { task::deschedule(); }
  724              }
  725          }
  726          do s.access {
  727              do 5.times { task::deschedule(); }
  728          }
  729      }
  730      #[test]
  731      fn test_sem_as_cvar() {
  732          /* Child waits and parent signals */
  733          let (p, c) = comm::stream();
  734          let s = Semaphore::new(0);
  735          let s2 = s.clone();
  736          do task::spawn {
  737              s2.acquire();
  738              c.send(());
  739          }
  740          do 5.times { task::deschedule(); }
  741          s.release();
  742          let _ = p.recv();
  743  
  744          /* Parent waits and child signals */
  745          let (p, c) = comm::stream();
  746          let s = Semaphore::new(0);
  747          let s2 = s.clone();
  748          do task::spawn {
  749              do 5.times { task::deschedule(); }
  750              s2.release();
  751              let _ = p.recv();
  752          }
  753          s.acquire();
  754          c.send(());
  755      }
  756      #[test]
  757      fn test_sem_multi_resource() {
  758          // Parent and child both get in the critical section at the same
  759          // time, and shake hands.
  760          let s = Semaphore::new(2);
  761          let s2 = s.clone();
  762          let (p1,c1) = comm::stream();
  763          let (p2,c2) = comm::stream();
  764          do task::spawn {
  765              do s2.access {
  766                  let _ = p2.recv();
  767                  c1.send(());
  768              }
  769          }
  770          do s.access {
  771              c2.send(());
  772              let _ = p1.recv();
  773          }
  774      }
  775      #[test]
  776      fn test_sem_runtime_friendly_blocking() {
  777          // Force the runtime to schedule two threads on the same sched_loop.
  778          // When one blocks, it should schedule the other one.
  779          do task::spawn_sched(task::SingleThreaded) {
  780              let s = Semaphore::new(1);
  781              let s2 = s.clone();
  782              let (p, c) = comm::stream();
  783              let child_data = Cell::new((s2, c));
  784              do s.access {
  785                  let (s2, c) = child_data.take();
  786                  do task::spawn {
  787                      c.send(());
  788                      do s2.access { }
  789                      c.send(());
  790                  }
  791                  let _ = p.recv(); // wait for child to come alive
  792                  do 5.times { task::deschedule(); } // let the child contend
  793              }
  794              let _ = p.recv(); // wait for child to be done
  795          }
  796      }
  797      /************************************************************************
  798       * Mutex tests
  799       ************************************************************************/
  800      #[test]
  801      fn test_mutex_lock() {
  802          // Unsafely achieve shared state, and do the textbook
  803          // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
  804          let (p, c) = comm::stream();
  805          let m = Mutex::new();
  806          let m2 = m.clone();
  807          let mut sharedstate = ~0;
  808          {
  809              let ptr: *int = &*sharedstate;
  810              do task::spawn {
  811                  let sharedstate: &mut int =
  812                      unsafe { cast::transmute(ptr) };
  813                  access_shared(sharedstate, &m2, 10);
  814                  c.send(());
  815  
  816              }
  817          }
  818          {
  819              access_shared(sharedstate, &m, 10);
  820              let _ = p.recv();
  821  
  822              assert_eq!(*sharedstate, 20);
  823          }
  824  
  825          fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) {
  826              do n.times {
  827                  do m.lock {
  828                      let oldval = *sharedstate;
  829                      task::deschedule();
  830                      *sharedstate = oldval + 1;
  831                  }
  832              }
  833          }
  834      }
  835      #[test]
  836      fn test_mutex_cond_wait() {
  837          let m = Mutex::new();
  838  
  839          // Child wakes up parent
  840          do m.lock_cond |cond| {
  841              let m2 = m.clone();
  842              do task::spawn {
  843                  do m2.lock_cond |cond| {
  844                      let woken = cond.signal();
  845                      assert!(woken);
  846                  }
  847              }
  848              cond.wait();
  849          }
  850          // Parent wakes up child
  851          let (port,chan) = comm::stream();
  852          let m3 = m.clone();
  853          do task::spawn {
  854              do m3.lock_cond |cond| {
  855                  chan.send(());
  856                  cond.wait();
  857                  chan.send(());
  858              }
  859          }
  860          let _ = port.recv(); // Wait until child gets in the mutex
  861          do m.lock_cond |cond| {
  862              let woken = cond.signal();
  863              assert!(woken);
  864          }
  865          let _ = port.recv(); // Wait until child wakes up
  866      }
  867      #[cfg(test)]
  868      fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
  869          let m = Mutex::new();
  870          let mut ports = ~[];
  871  
  872          do num_waiters.times {
  873              let mi = m.clone();
  874              let (port, chan) = comm::stream();
  875              ports.push(port);
  876              do task::spawn {
  877                  do mi.lock_cond |cond| {
  878                      chan.send(());
  879                      cond.wait();
  880                      chan.send(());
  881                  }
  882              }
  883          }
  884  
  885          // wait until all children get in the mutex
  886          for port in ports.iter() { let _ = port.recv(); }
  887          do m.lock_cond |cond| {
  888              let num_woken = cond.broadcast();
  889              assert_eq!(num_woken, num_waiters);
  890          }
  891          // wait until all children wake up
  892          for port in ports.iter() { let _ = port.recv(); }
  893      }
  894      #[test]
  895      fn test_mutex_cond_broadcast() {
  896          test_mutex_cond_broadcast_helper(12);
  897      }
  898      #[test]
  899      fn test_mutex_cond_broadcast_none() {
  900          test_mutex_cond_broadcast_helper(0);
  901      }
  902      #[test]
  903      fn test_mutex_cond_no_waiter() {
  904          let m = Mutex::new();
  905          let m2 = m.clone();
  906          do task::try {
  907              do m.lock_cond |_x| { }
  908          };
  909          do m2.lock_cond |cond| {
  910              assert!(!cond.signal());
  911          }
  912      }
  913      #[test]
  914      fn test_mutex_killed_simple() {
  915          // Mutex must get automatically unlocked if failed/killed within.
  916          let m = Mutex::new();
  917          let m2 = m.clone();
  918  
  919          let result: result::Result<(),()> = do task::try {
  920              do m2.lock {
  921                  fail!();
  922              }
  923          };
  924          assert!(result.is_err());
  925          // child task must have finished by the time try returns
  926          do m.lock { }
  927      }
  928      #[ignore(reason = "linked failure")]
  929      #[test]
  930      fn test_mutex_killed_cond() {
  931          // Getting killed during cond wait must not corrupt the mutex while
  932          // unwinding (e.g. double unlock).
  933          let m = Mutex::new();
  934          let m2 = m.clone();
  935  
  936          let result: result::Result<(),()> = do task::try {
  937              let (p, c) = comm::stream();
  938              do task::spawn || { // linked
  939                  let _ = p.recv(); // wait for sibling to get in the mutex
  940                  task::deschedule();
  941                  fail!();
  942              }
  943              do m2.lock_cond |cond| {
  944                  c.send(()); // tell sibling go ahead
  945                  cond.wait(); // block forever
  946              }
  947          };
  948          assert!(result.is_err());
  949          // child task must have finished by the time try returns
  950          do m.lock_cond |cond| {
  951              let woken = cond.signal();
  952              assert!(!woken);
  953          }
  954      }
  955      #[ignore(reason = "linked failure")]
  956      #[test]
  957      fn test_mutex_killed_broadcast() {
  958          use std::unstable::finally::Finally;
  959  
  960          let m = Mutex::new();
  961          let m2 = m.clone();
  962          let (p, c) = comm::stream();
  963  
  964          let result: result::Result<(),()> = do task::try {
  965              let mut sibling_convos = ~[];
  966              do 2.times {
  967                  let (p, c) = comm::stream();
  968                  let c = Cell::new(c);
  969                  sibling_convos.push(p);
  970                  let mi = m2.clone();
  971                  // spawn sibling task
  972                  do task::spawn { // linked
  973                      do mi.lock_cond |cond| {
  974                          let c = c.take();
  975                          c.send(()); // tell sibling to go ahead
  976                          do (|| {
  977                              cond.wait(); // block forever
  978                          }).finally {
  979                              error!("task unwinding and sending");
  980                              c.send(());
  981                              error!("task unwinding and done sending");
  982                          }
  983                      }
  984                  }
  985              }
  986              for p in sibling_convos.iter() {
  987                  let _ = p.recv(); // wait for sibling to get in the mutex
  988              }
  989              do m2.lock { }
  990              c.send(sibling_convos); // let parent wait on all children
  991              fail!();
  992          };
  993          assert!(result.is_err());
  994          // child task must have finished by the time try returns
  995          let r = p.recv();
  996          for p in r.iter() { p.recv(); } // wait on all its siblings
  997          do m.lock_cond |cond| {
  998              let woken = cond.broadcast();
  999              assert_eq!(woken, 0);
 1000          }
 1001      }
 1002      #[test]
 1003      fn test_mutex_cond_signal_on_0() {
 1004          // Tests that signal_on(0) is equivalent to signal().
 1005          let m = Mutex::new();
 1006          do m.lock_cond |cond| {
 1007              let m2 = m.clone();
 1008              do task::spawn {
 1009                  do m2.lock_cond |cond| {
 1010                      cond.signal_on(0);
 1011                  }
 1012              }
 1013              cond.wait();
 1014          }
 1015      }
 1016      #[test]
 1017      fn test_mutex_different_conds() {
 1018          let result = do task::try {
 1019              let m = Mutex::new_with_condvars(2);
 1020              let m2 = m.clone();
 1021              let (p, c) = comm::stream();
 1022              do task::spawn {
 1023                  do m2.lock_cond |cond| {
 1024                      c.send(());
 1025                      cond.wait_on(1);
 1026                  }
 1027              }
 1028              let _ = p.recv();
 1029              do m.lock_cond |cond| {
 1030                  if !cond.signal_on(0) {
 1031                      fail!(); // success; punt sibling awake.
 1032                  }
 1033              }
 1034          };
 1035          assert!(result.is_err());
 1036      }
 1037      #[test]
 1038      fn test_mutex_no_condvars() {
 1039          let result = do task::try {
 1040              let m = Mutex::new_with_condvars(0);
 1041              do m.lock_cond |cond| { cond.wait(); }
 1042          };
 1043          assert!(result.is_err());
 1044          let result = do task::try {
 1045              let m = Mutex::new_with_condvars(0);
 1046              do m.lock_cond |cond| { cond.signal(); }
 1047          };
 1048          assert!(result.is_err());
 1049          let result = do task::try {
 1050              let m = Mutex::new_with_condvars(0);
 1051              do m.lock_cond |cond| { cond.broadcast(); }
 1052          };
 1053          assert!(result.is_err());
 1054      }
 1055      /************************************************************************
 1056       * Reader/writer lock tests
 1057       ************************************************************************/
 1058      #[cfg(test)]
 1059      pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }
 1060      #[cfg(test)]
 1061      fn lock_rwlock_in_mode(x: &RWLock, mode: RWLockMode, blk: &fn()) {
 1062          match mode {
 1063              Read => x.read(blk),
 1064              Write => x.write(blk),
 1065              Downgrade =>
 1066                  do x.write_downgrade |mode| {
 1067                      do mode.write { blk() };
 1068                  },
 1069              DowngradeRead =>
 1070                  do x.write_downgrade |mode| {
 1071                      let mode = x.downgrade(mode);
 1072                      do mode.read { blk() };
 1073                  },
 1074          }
 1075      }
 1076      #[cfg(test)]
 1077      fn test_rwlock_exclusion(x: &RWLock,
 1078                                   mode1: RWLockMode,
 1079                                   mode2: RWLockMode) {
 1080          // Test mutual exclusion between readers and writers. Just like the
 1081          // mutex mutual exclusion test, a ways above.
 1082          let (p, c) = comm::stream();
 1083          let x2 = x.clone();
 1084          let mut sharedstate = ~0;
 1085          {
 1086              let ptr: *int = &*sharedstate;
 1087              do task::spawn {
 1088                  let sharedstate: &mut int =
 1089                      unsafe { cast::transmute(ptr) };
 1090                  access_shared(sharedstate, &x2, mode1, 10);
 1091                  c.send(());
 1092              }
 1093          }
 1094          {
 1095              access_shared(sharedstate, x, mode2, 10);
 1096              let _ = p.recv();
 1097  
 1098              assert_eq!(*sharedstate, 20);
 1099          }
 1100  
 1101          fn access_shared(sharedstate: &mut int, x: &RWLock, mode: RWLockMode,
 1102                           n: uint) {
 1103              do n.times {
 1104                  do lock_rwlock_in_mode(x, mode) {
 1105                      let oldval = *sharedstate;
 1106                      task::deschedule();
 1107                      *sharedstate = oldval + 1;
 1108                  }
 1109              }
 1110          }
 1111      }
 1112      #[test]
 1113      fn test_rwlock_readers_wont_modify_the_data() {
 1114          test_rwlock_exclusion(&RWLock::new(), Read, Write);
 1115          test_rwlock_exclusion(&RWLock::new(), Write, Read);
 1116          test_rwlock_exclusion(&RWLock::new(), Read, Downgrade);
 1117          test_rwlock_exclusion(&RWLock::new(), Downgrade, Read);
 1118      }
 1119      #[test]
 1120      fn test_rwlock_writers_and_writers() {
 1121          test_rwlock_exclusion(&RWLock::new(), Write, Write);
 1122          test_rwlock_exclusion(&RWLock::new(), Write, Downgrade);
 1123          test_rwlock_exclusion(&RWLock::new(), Downgrade, Write);
 1124          test_rwlock_exclusion(&RWLock::new(), Downgrade, Downgrade);
 1125      }
 1126      #[cfg(test)]
 1127      fn test_rwlock_handshake(x: &RWLock,
 1128                                   mode1: RWLockMode,
 1129                                   mode2: RWLockMode,
 1130                                   make_mode2_go_first: bool) {
 1131          // Much like sem_multi_resource.
 1132          let x2 = x.clone();
 1133          let (p1, c1) = comm::stream();
 1134          let (p2, c2) = comm::stream();
 1135          do task::spawn {
 1136              if !make_mode2_go_first {
 1137                  let _ = p2.recv(); // parent sends to us once it locks, or ...
 1138              }
 1139              do lock_rwlock_in_mode(&x2, mode2) {
 1140                  if make_mode2_go_first {
 1141                      c1.send(()); // ... we send to it once we lock
 1142                  }
 1143                  let _ = p2.recv();
 1144                  c1.send(());
 1145              }
 1146          }
 1147          if make_mode2_go_first {
 1148              let _ = p1.recv(); // child sends to us once it locks, or ...
 1149          }
 1150          do lock_rwlock_in_mode(x, mode1) {
 1151              if !make_mode2_go_first {
 1152                  c2.send(()); // ... we send to it once we lock
 1153              }
 1154              c2.send(());
 1155              let _ = p1.recv();
 1156          }
 1157      }
 1158      #[test]
 1159      fn test_rwlock_readers_and_readers() {
 1160          test_rwlock_handshake(&RWLock::new(), Read, Read, false);
 1161          // The downgrader needs to get in before the reader gets in, otherwise
 1162          // they cannot end up reading at the same time.
 1163          test_rwlock_handshake(&RWLock::new(), DowngradeRead, Read, false);
 1164          test_rwlock_handshake(&RWLock::new(), Read, DowngradeRead, true);
 1165          // Two downgrade_reads can never both end up reading at the same time.
 1166      }
 1167      #[test]
 1168      fn test_rwlock_downgrade_unlock() {
 1169          // Tests that downgrade can unlock the lock in both modes
 1170          let x = RWLock::new();
 1171          do lock_rwlock_in_mode(&x, Downgrade) { }
 1172          test_rwlock_handshake(&x, Read, Read, false);
 1173          let y = RWLock::new();
 1174          do lock_rwlock_in_mode(&y, DowngradeRead) { }
 1175          test_rwlock_exclusion(&y, Write, Write);
 1176      }
 1177      #[test]
 1178      fn test_rwlock_read_recursive() {
 1179          let x = RWLock::new();
 1180          do x.read { do x.read { } }
 1181      }
 1182      #[test]
 1183      fn test_rwlock_cond_wait() {
 1184          // As test_mutex_cond_wait above.
 1185          let x = RWLock::new();
 1186  
 1187          // Child wakes up parent
 1188          do x.write_cond |cond| {
 1189              let x2 = x.clone();
 1190              do task::spawn {
 1191                  do x2.write_cond |cond| {
 1192                      let woken = cond.signal();
 1193                      assert!(woken);
 1194                  }
 1195              }
 1196              cond.wait();
 1197          }
 1198          // Parent wakes up child
 1199          let (port, chan) = comm::stream();
 1200          let x3 = x.clone();
 1201          do task::spawn {
 1202              do x3.write_cond |cond| {
 1203                  chan.send(());
 1204                  cond.wait();
 1205                  chan.send(());
 1206              }
 1207          }
 1208          let _ = port.recv(); // Wait until child gets in the rwlock
 1209          do x.read { } // Must be able to get in as a reader in the meantime
 1210          do x.write_cond |cond| { // Or as another writer
 1211              let woken = cond.signal();
 1212              assert!(woken);
 1213          }
 1214          let _ = port.recv(); // Wait until child wakes up
 1215          do x.read { } // Just for good measure
 1216      }
 1217      #[cfg(test)]
 1218      fn test_rwlock_cond_broadcast_helper(num_waiters: uint,
 1219                                               dg1: bool,
 1220                                               dg2: bool) {
 1221          // Much like the mutex broadcast test. Downgrade-enabled.
 1222          fn lock_cond(x: &RWLock, downgrade: bool, blk: &fn(c: &Condvar)) {
 1223              if downgrade {
 1224                  do x.write_downgrade |mode| {
 1225                      do mode.write_cond |c| { blk(c) }
 1226                  }
 1227              } else {
 1228                  do x.write_cond |c| { blk(c) }
 1229              }
 1230          }
 1231          let x = RWLock::new();
 1232          let mut ports = ~[];
 1233  
 1234          do num_waiters.times {
 1235              let xi = x.clone();
 1236              let (port, chan) = comm::stream();
 1237              ports.push(port);
 1238              do task::spawn {
 1239                  do lock_cond(&xi, dg1) |cond| {
 1240                      chan.send(());
 1241                      cond.wait();
 1242                      chan.send(());
 1243                  }
 1244              }
 1245          }
 1246  
 1247          // wait until all children get in the mutex
 1248          for port in ports.iter() { let _ = port.recv(); }
 1249          do lock_cond(&x, dg2) |cond| {
 1250              let num_woken = cond.broadcast();
 1251              assert_eq!(num_woken, num_waiters);
 1252          }
 1253          // wait until all children wake up
 1254          for port in ports.iter() { let _ = port.recv(); }
 1255      }
 1256      #[test]
 1257      fn test_rwlock_cond_broadcast() {
 1258          test_rwlock_cond_broadcast_helper(0, true, true);
 1259          test_rwlock_cond_broadcast_helper(0, true, false);
 1260          test_rwlock_cond_broadcast_helper(0, false, true);
 1261          test_rwlock_cond_broadcast_helper(0, false, false);
 1262          test_rwlock_cond_broadcast_helper(12, true, true);
 1263          test_rwlock_cond_broadcast_helper(12, true, false);
 1264          test_rwlock_cond_broadcast_helper(12, false, true);
 1265          test_rwlock_cond_broadcast_helper(12, false, false);
 1266      }
 1267      #[cfg(test)]
 1268      fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
 1269          // Mutex must get automatically unlocked if failed/killed within.
 1270          let x = RWLock::new();
 1271          let x2 = x.clone();
 1272  
 1273          let result: result::Result<(),()> = do task::try || {
 1274              do lock_rwlock_in_mode(&x2, mode1) {
 1275                  fail!();
 1276              }
 1277          };
 1278          assert!(result.is_err());
 1279          // child task must have finished by the time try returns
 1280          do lock_rwlock_in_mode(&x, mode2) { }
 1281      }
 1282      #[test]
 1283      fn test_rwlock_reader_killed_writer() {
 1284          rwlock_kill_helper(Read, Write);
 1285      }
 1286      #[test]
 1287      fn test_rwlock_writer_killed_reader() {
 1288          rwlock_kill_helper(Write, Read);
 1289      }
 1290      #[test]
 1291      fn test_rwlock_reader_killed_reader() {
 1292          rwlock_kill_helper(Read, Read);
 1293      }
 1294      #[test]
 1295      fn test_rwlock_writer_killed_writer() {
 1296          rwlock_kill_helper(Write, Write);
 1297      }
 1298      #[test]
 1299      fn test_rwlock_kill_downgrader() {
 1300          rwlock_kill_helper(Downgrade, Read);
 1301          rwlock_kill_helper(Read, Downgrade);
 1302          rwlock_kill_helper(Downgrade, Write);
 1303          rwlock_kill_helper(Write, Downgrade);
 1304          rwlock_kill_helper(DowngradeRead, Read);
 1305          rwlock_kill_helper(Read, DowngradeRead);
 1306          rwlock_kill_helper(DowngradeRead, Write);
 1307          rwlock_kill_helper(Write, DowngradeRead);
 1308          rwlock_kill_helper(DowngradeRead, Downgrade);
 1309          rwlock_kill_helper(DowngradeRead, Downgrade);
 1310          rwlock_kill_helper(Downgrade, DowngradeRead);
 1311          rwlock_kill_helper(Downgrade, DowngradeRead);
 1312      }
 1313      #[test] #[should_fail]
 1314      fn test_rwlock_downgrade_cant_swap() {
 1315          // Tests that you can't downgrade with a different rwlock's token.
 1316          let x = RWLock::new();
 1317          let y = RWLock::new();
 1318          do x.write_downgrade |xwrite| {
 1319              let mut xopt = Some(xwrite);
 1320              do y.write_downgrade |_ywrite| {
 1321                  y.downgrade(xopt.take_unwrap());
 1322                  error!("oops, y.downgrade(x) should have failed!");
 1323              }
 1324          }
 1325      }
 1326  }

libextra/sync.rs:37:15-37:15 -ty- definition:
#[doc(hidden)]
type SignalEnd = comm::ChanOne<()>;
references:-
42:                    tail: comm::Chan<SignalEnd> }
41: struct WaitQueue { head: comm::Port<SignalEnd>,


libextra/sync.rs:416:15-416:15 -struct- definition:
#[doc(hidden)]
struct RWLockInner {
references:-
447:     priv state:       UnsafeArc<RWLockInner>,
459:         let state = UnsafeArc::new(RWLockInner {


libextra/sync.rs:35:15-35:15 -ty- definition:
#[doc(hidden)]
type WaitEnd = comm::PortOne<()>;
references:-
76:     fn wait_end(&self) -> WaitEnd {


libextra/sync.rs:663:68-663:68 -struct- definition:
/// The "write permission" token used for rwlock.write_downgrade().
pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable }
references:-
634:     pub fn downgrade<'a>(&self, token: RWLockWriteMode<'a>)
602:                     blk(RWLockWriteMode { lock: self, token: NonCopyable::new() })
670: impl<'self> RWLockWriteMode<'self> {
593:     pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U {
libextra/arc.rs:
525:     token: sync::RWLockWriteMode<'self>,


libextra/sync.rs:332:53-332:53 -struct- definition:
/// A counting, blocking, bounded-waiting semaphore.
struct Semaphore { priv sem: Sem<()> }
references:-
338:     fn clone(&self) -> Semaphore {
345:     pub fn new(count: int) -> Semaphore {
163:     Just(&'self Semaphore),
336: impl Clone for Semaphore {
346:         Semaphore { sem: Sem::new(count, ()) }
445:     priv order_lock:  Semaphore,
343: impl Semaphore {
339:         Semaphore { sem: Sem((*self.sem).clone()) }


libextra/sync.rs:160:30-160:30 -enum- definition:
// inside of type parameters.
enum ReacquireOrderLock<'self> {
references:-
177:     priv order: ReacquireOrderLock<'self>,


libextra/sync.rs:376:4-376:4 -struct- definition:
 */
pub struct Mutex { priv sem: Sem<~[WaitQueue]> }
references:-
381:     fn clone(&self) -> Mutex { Mutex { sem: Sem((*self.sem).clone()) } }
394:     pub fn new_with_condvars(num_condvars: uint) -> Mutex {
379: impl Clone for Mutex {
384: impl Mutex {
395:         Mutex { sem: Sem::new_and_signal(1, num_condvars) }
381:     fn clone(&self) -> Mutex { Mutex { sem: Sem((*self.sem).clone()) } }
386:     pub fn new() -> Mutex { Mutex::new_with_condvars(1) }
libextra/arc.rs:
161: struct MutexArcInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }


libextra/sync.rs:84:15-84:15 -struct- definition:
#[doc(hidden)]
struct SemInner<Q> {
references:-
94: struct Sem<Q>(Exclusive<SemInner<Q>>);
99:         Sem(Exclusive::new(SemInner {


libextra/sync.rs:93:15-93:15 -struct- definition:
#[doc(hidden)]
struct Sem<Q>(Exclusive<SemInner<Q>>);
references:-
147: impl Sem<~[WaitQueue]> {
377: pub struct Mutex { priv sem: Sem<~[WaitQueue]> }
446:     priv access_lock: Sem<~[WaitQueue]>,
98:     fn new(count: int, q: Q) -> Sem<Q> {
318: impl Sem<~[WaitQueue]> {
149:         -> Sem<~[WaitQueue]> {
97: impl<Q:Send> Sem<Q> {
333: struct Semaphore { priv sem: Sem<()> }
170:     priv sem: &'self Sem<~[WaitQueue]>,


libextra/sync.rs:443:4-443:4 -struct- definition:
 */
pub struct RWLock {
references:-
667: pub struct RWLockReadMode<'self> { priv lock: &'self RWLock,
452:     pub fn new() -> RWLock { RWLock::new_with_condvars(1) }
469:     pub fn clone(&self) -> RWLock {
458:     pub fn new_with_condvars(num_condvars: uint) -> RWLock {
470:         RWLock { order_lock:  (&(self.order_lock)).clone(),
664: pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable }
463:         RWLock { order_lock:  Semaphore::new(1),
450: impl RWLock {
libextra/arc.rs:
338: struct RWArcInner<T> { priv lock: RWLock, priv failed: bool, priv data: T }
518: fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {


libextra/sync.rs:305:15-305:15 -fn- definition:
#[doc(hidden)]
fn check_cvar_bounds<U>(out_of_bounds: Option<uint>, id: uint, act: &str,
references:-
230:             do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
294:             do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
269:             do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {


libextra/sync.rs:666:67-666:67 -struct- definition:
/// The "read permission" token used for rwlock.write_downgrade().
pub struct RWLockReadMode<'self> { priv lock: &'self RWLock,
references:-
683: impl<'self> RWLockReadMode<'self> {
659:         RWLockReadMode { lock: token.lock, token: NonCopyable::new() }
635:                          -> RWLockReadMode<'a> {
libextra/arc.rs:
532:     token: sync::RWLockReadMode<'self>,


libextra/sync.rs:166:74-166:74 -struct- definition:
/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
pub struct Condvar<'self> {
references:-
323:             blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() })
532:     pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
182: impl<'self> Condvar<'self> {
321:     pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
405:     pub fn lock_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
674:     pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
677:         blk(&Condvar { sem:        &self.lock.access_lock,
565:                     blk(&Condvar { sem: cond.sem, order: opt_lock,
libextra/arc.rs:
55:     priv cond: &'self sync::Condvar<'self>


libextra/sync.rs:40:15-40:15 -struct- definition:
#[doc(hidden)]
struct WaitQueue { head: comm::Port<SignalEnd>,
references:-
87:     waiters:   WaitQueue,
318: impl Sem<~[WaitQueue]> {
377: pub struct Mutex { priv sem: Sem<~[WaitQueue]> }
149:         -> Sem<~[WaitQueue]> {
446:     priv access_lock: Sem<~[WaitQueue]>,
170:     priv sem: &'self Sem<~[WaitQueue]>,
147: impl Sem<~[WaitQueue]> {
44: impl WaitQueue {
47:         WaitQueue { head: block_head, tail: block_tail }
45:     fn new() -> WaitQueue {