(index<- )        ./libextra/arc.rs

   1  // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  /*!
  12   * Concurrency-enabled mechanisms for sharing mutable and/or immutable state
  13   * between tasks.
  14   *
  15   * # Example
  16   *
  17   * In this example, a large vector of floats is shared between several tasks.
  18   * With simple pipes, without Arc, a copy would have to be made for each task.
  19   *
  20   * ~~~ {.rust}
  21   * extern mod std;
  22   * use extra::arc;
  23   * let numbers=vec::from_fn(100, |ind| (ind as float)*rand::random());
  24   * let shared_numbers=arc::Arc::new(numbers);
  25   *
  26   *   do 10.times {
  27   *       let (port, chan)  = stream();
  28   *       chan.send(shared_numbers.clone());
  29   *
  30   *       do spawn {
  31   *           let shared_numbers=port.recv();
  32   *           let local_numbers=shared_numbers.get();
  33   *
  34   *           // Work with the local numbers
  35   *       }
  36   *   }
  37   * ~~~
  38   */
  39  
  40  #[allow(missing_doc)];
  41  
  42  
  43  use sync;
  44  use sync::{Mutex, RWLock};
  45  
  46  use std::cast;
  47  use std::unstable::sync::UnsafeArc;
  48  use std::task;
  49  use std::borrow;
  50  
  51  /// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
  52  pub struct Condvar<'self> {
  53      priv is_mutex: bool,
  54      priv failed: &'self mut bool,
  55      priv cond: &'self sync::Condvar<'self>
  56  }
  57  
  58  impl<'self> Condvar<'self> {
  59      /// Atomically exit the associated Arc and block until a signal is sent.
  60      #[inline]
  61      pub fn wait(&self) { self.wait_on(0) }
  62  
  63      /**
  64       * Atomically exit the associated Arc and block on a specified condvar
  65       * until a signal is sent on that same condvar (as sync::cond.wait_on).
  66       *
  67       * wait() is equivalent to wait_on(0).
  68       */
  69      #[inline]
  70      pub fn wait_on(&self, condvar_iduint) {
  71          assert!(!*self.failed);
  72          self.cond.wait_on(condvar_id);
  73          // This is why we need to wrap sync::condvar.
  74          check_poison(self.is_mutex, *self.failed);
  75      }
  76  
  77      /// Wake up a blocked task. Returns false if there was no blocked task.
  78      #[inline]
  79      pub fn signal(&self) -> bool { self.signal_on(0) }
  80  
  81      /**
  82       * Wake up a blocked task on a specified condvar (as
  83       * sync::cond.signal_on). Returns false if there was no blocked task.
  84       */
  85      #[inline]
  86      pub fn signal_on(&self, condvar_iduint) -> bool {
  87          assert!(!*self.failed);
  88          self.cond.signal_on(condvar_id)
  89      }
  90  
  91      /// Wake up all blocked tasks. Returns the number of tasks woken.
  92      #[inline]
  93      pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
  94  
  95      /**
  96       * Wake up all blocked tasks on a specified condvar (as
  97       * sync::cond.broadcast_on). Returns the number of tasks woken.
  98       */
  99      #[inline]
 100      pub fn broadcast_on(&self, condvar_iduint) -> uint {
 101          assert!(!*self.failed);
 102          self.cond.broadcast_on(condvar_id)
 103      }
 104  }
 105  
 106  /****************************************************************************
 107   * Immutable Arc
 108   ****************************************************************************/
 109  
 110  /// An atomically reference counted wrapper for shared immutable state.
 111  pub struct Arc<T> { priv x: UnsafeArc<T> }
 112  
 113  
 114  /**
 115   * Access the underlying data in an atomically reference counted
 116   * wrapper.
 117   */
 118  impl<T:Freeze+Send> Arc<T> {
 119      /// Create an atomically reference counted wrapper.
 120      pub fn new(dataT) -> Arc<T> {
 121          Arc { x: UnsafeArc::new(data) }
 122      }
 123  
 124      pub fn get<'a>(&'a self) -> &'a T {
 125          unsafe { &*self.x.get_immut() }
 126      }
 127  
 128      /**
 129       * Retrieve the data back out of the Arc. This function blocks until the
 130       * reference given to it is the last existing one, and then unwrap the data
 131       * instead of destroying it.
 132       *
 133       * If multiple tasks call unwrap, all but the first will fail. Do not call
 134       * unwrap from a task that holds another reference to the same Arc; it is
 135       * guaranteed to deadlock.
 136       */
 137      pub fn unwrap(self) -> T {
 138          let Arc { x: x } = self;
 139          x.unwrap()
 140      }
 141  }
 142  
 143  impl<T:Freeze + Send> Clone for Arc<T> {
 144      /**
 145      * Duplicate an atomically reference counted wrapper.
 146      *
 147      * The resulting two `arc` objects will point to the same underlying data
 148      * object. However, one of the `arc` objects can be sent to another task,
 149      * allowing them to share the underlying data.
 150      */
 151      fn clone(&self) -> Arc<T> {
 152          Arc { x: self.x.clone() }
 153      }
 154  }
 155  
 156  /****************************************************************************
 157   * Mutex protected Arc (unsafe)
 158   ****************************************************************************/
 159  
 160  #[doc(hidden)]
 161  struct MutexArcInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
 162  
 163  /// An Arc with mutable data protected by a blocking mutex.
 164  #[no_freeze]
 165  struct MutexArc<T> { priv x: UnsafeArc<MutexArcInner<T>> }
 166  
 167  
 168  impl<T:Send> Clone for MutexArc<T> {
 169      /// Duplicate a mutex-protected Arc. See arc::clone for more details.
 170      fn clone(&self) -> MutexArc<T> {
 171          // NB: Cloning the underlying mutex is not necessary. Its reference
 172          // count would be exactly the same as the shared state's.
 173          MutexArc { x: self.x.clone() }
 174      }
 175  }
 176  
 177  impl<T:Send> MutexArc<T> {
 178      /// Create a mutex-protected Arc with the supplied data.
 179      pub fn new(user_dataT) -> MutexArc<T> {
 180          MutexArc::new_with_condvars(user_data, 1)
 181      }
 182  
 183      /**
 184       * Create a mutex-protected Arc with the supplied data and a specified number
 185       * of condvars (as sync::Mutex::new_with_condvars).
 186       */
 187      pub fn new_with_condvars(user_dataT, num_condvarsuint) -> MutexArc<T> {
 188          let data = MutexArcInner {
 189              lock: Mutex::new_with_condvars(num_condvars),
 190              failed: false, data: user_data
 191          };
 192          MutexArc { x: UnsafeArc::new(data) }
 193      }
 194  
 195      /**
 196       * Access the underlying mutable data with mutual exclusion from other
 197       * tasks. The argument closure will be run with the mutex locked; all
 198       * other tasks wishing to access the data will block until the closure
 199       * finishes running.
 200       *
 201       * The reason this function is 'unsafe' is because it is possible to
 202       * construct a circular reference among multiple Arcs by mutating the
 203       * underlying data. This creates potential for deadlock, but worse, this
 204       * will guarantee a memory leak of all involved Arcs. Using MutexArcs
 205       * inside of other Arcs is safe in absence of circular references.
 206       *
 207       * If you wish to nest MutexArcs, one strategy for ensuring safety at
 208       * runtime is to add a "nesting level counter" inside the stored data, and
 209       * when traversing the arcs, assert that they monotonically decrease.
 210       *
 211       * # Failure
 212       *
 213       * Failing while inside the Arc will unlock the Arc while unwinding, so
 214       * that other tasks won't block forever. It will also poison the Arc:
 215       * any tasks that subsequently try to access it (including those already
 216       * blocked on the mutex) will also fail immediately.
 217       */
 218      #[inline]
 219      pub unsafe fn unsafe_access<U>(&self, blk&fn(x: &mut T) -> U) -> U {
 220          let state = self.x.get();
 221          // Borrowck would complain about this if the function were
 222          // not already unsafe. See borrow_rwlock, far below.
 223          do (&(*state).lock).lock {
 224              check_poison(true, (*state).failed);
 225              let _z = PoisonOnFail(&mut (*state).failed);
 226              blk(&mut (*state).data)
 227          }
 228      }
 229  
 230      /// As unsafe_access(), but with a condvar, as sync::mutex.lock_cond().
 231      #[inline]
 232      pub unsafe fn unsafe_access_cond<'x, 'c, U>(&self,
 233                                           blk&fn(x: &'x mut T,
 234                                                    c: &'c Condvar) -> U)
 235                                           -> U {
 236          let state = self.x.get();
 237          do (&(*state).lock).lock_cond |cond| {
 238              check_poison(true, (*state).failed);
 239              let _z = PoisonOnFail(&mut (*state).failed);
 240              blk(&mut (*state).data,
 241                  &Condvar {is_mutex: true,
 242                            failed: &mut (*state).failed,
 243                            cond: cond })
 244          }
 245      }
 246  
 247      /**
 248       * Retrieves the data, blocking until all other references are dropped,
 249       * exactly as arc::unwrap.
 250       *
 251       * Will additionally fail if another task has failed while accessing the arc.
 252       */
 253      pub fn unwrap(self) -> T {
 254          let MutexArc { x: x } = self;
 255          let inner = x.unwrap();
 256          let MutexArcInner { failed: failed, data: data, _ } = inner;
 257          if failed {
 258              fail!(~"Can't unwrap poisoned MutexArc - another task failed inside!");
 259          }
 260          data
 261      }
 262  }
 263  
 264  impl<T:Freeze + Send> MutexArc<T> {
 265  
 266      /**
 267       * As unsafe_access.
 268       *
 269       * The difference between access and unsafe_access is that the former
 270       * forbids mutexes to be nested. While unsafe_access can be used on
 271       * MutexArcs without freezable interiors, this safe version of access
 272       * requires the Freeze bound, which prohibits access on MutexArcs which
 273       * might contain nested MutexArcs inside.
 274       *
 275       * The purpose of this is to offer a safe implementation of MutexArc to be
 276       * used instead of RWArc in cases where no readers are needed and sightly
 277       * better performance is required.
 278       *
 279       * Both methods have the same failure behaviour as unsafe_access and
 280       * unsafe_access_cond.
 281       */
 282      #[inline]
 283      pub fn access<U>(&self, blk&fn(x: &mut T) -> U) -> U {
 284          unsafe { self.unsafe_access(blk) }
 285      }
 286  
 287      /// As unsafe_access_cond but safe and Freeze.
 288      #[inline]
 289      pub fn access_cond<'x, 'c, U>(&self,
 290                                    blk&fn(x: &'x mut T,
 291                                             c: &'c Condvar) -> U)
 292                                    -> U {
 293          unsafe { self.unsafe_access_cond(blk) }
 294      }
 295  }
 296  
 297  // Common code for {mutex.access,rwlock.write}{,_cond}.
 298  #[inline]
 299  #[doc(hidden)]
 300  fn check_poison(is_mutexbool, failedbool) {
 301      if failed {
 302          if is_mutex {
 303              fail!("Poisoned MutexArc - another task failed inside!");
 304          } else {
 305              fail!("Poisoned rw_arc - another task failed inside!");
 306          }
 307      }
 308  }
 309  
 310  #[doc(hidden)]
 311  struct PoisonOnFail {
 312      failed: *mut bool,
 313  }
 314  
 315  impl Drop for PoisonOnFail {
 316      fn drop(&mut self) {
 317          unsafe {
 318              /* assert!(!*self.failed);
 319                 -- might be false in case of cond.wait() */
 320              if task::failing() {
 321                  *self.failed = true;
 322              }
 323          }
 324      }
 325  }
 326  
 327  fn PoisonOnFail<'r>(failed&'r mut bool) -> PoisonOnFail {
 328      PoisonOnFail {
 329          failed: failed
 330      }
 331  }
 332  
 333  /****************************************************************************
 334   * R/W lock protected Arc
 335   ****************************************************************************/
 336  
 337  #[doc(hidden)]
 338  struct RWArcInner<T> { priv lock: RWLock, priv failed: bool, priv data: T }
 339  /**
 340   * A dual-mode Arc protected by a reader-writer lock. The data can be accessed
 341   * mutably or immutably, and immutably-accessing tasks may run concurrently.
 342   *
 343   * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested.
 344   */
 345  #[no_freeze]
 346  struct RWArc<T> {
 347      priv x: UnsafeArc<RWArcInner<T>>,
 348  }
 349  
 350  impl<T:Freeze + Send> Clone for RWArc<T> {
 351      /// Duplicate a rwlock-protected Arc. See arc::clone for more details.
 352      fn clone(&self) -> RWArc<T> {
 353          RWArc { x: self.x.clone() }
 354      }
 355  
 356  }
 357  
 358  impl<T:Freeze + Send> RWArc<T> {
 359      /// Create a reader/writer Arc with the supplied data.
 360      pub fn new(user_dataT) -> RWArc<T> {
 361          RWArc::new_with_condvars(user_data, 1)
 362      }
 363  
 364      /**
 365       * Create a reader/writer Arc with the supplied data and a specified number
 366       * of condvars (as sync::RWLock::new_with_condvars).
 367       */
 368      pub fn new_with_condvars(user_dataT, num_condvarsuint) -> RWArc<T> {
 369          let data = RWArcInner {
 370              lock: RWLock::new_with_condvars(num_condvars),
 371              failed: false, data: user_data
 372          };
 373          RWArc { x: UnsafeArc::new(data), }
 374      }
 375  
 376      /**
 377       * Access the underlying data mutably. Locks the rwlock in write mode;
 378       * other readers and writers will block.
 379       *
 380       * # Failure
 381       *
 382       * Failing while inside the Arc will unlock the Arc while unwinding, so
 383       * that other tasks won't block forever. As MutexArc.access, it will also
 384       * poison the Arc, so subsequent readers and writers will both also fail.
 385       */
 386      #[inline]
 387      pub fn write<U>(&self, blk&fn(x: &mut T) -> U) -> U {
 388          unsafe {
 389              let state = self.x.get();
 390              do (*borrow_rwlock(state)).write {
 391                  check_poison(false, (*state).failed);
 392                  let _z = PoisonOnFail(&mut (*state).failed);
 393                  blk(&mut (*state).data)
 394              }
 395          }
 396      }
 397  
 398      /// As write(), but with a condvar, as sync::rwlock.write_cond().
 399      #[inline]
 400      pub fn write_cond<'x, 'c, U>(&self,
 401                                   blk&fn(x: &'x mut T, c: &'c Condvar) -> U)
 402                                   -> U {
 403          unsafe {
 404              let state = self.x.get();
 405              do (*borrow_rwlock(state)).write_cond |cond| {
 406                  check_poison(false, (*state).failed);
 407                  let _z = PoisonOnFail(&mut (*state).failed);
 408                  blk(&mut (*state).data,
 409                      &Condvar {is_mutex: false,
 410                                failed: &mut (*state).failed,
 411                                cond: cond})
 412              }
 413          }
 414      }
 415  
 416      /**
 417       * Access the underlying data immutably. May run concurrently with other
 418       * reading tasks.
 419       *
 420       * # Failure
 421       *
 422       * Failing will unlock the Arc while unwinding. However, unlike all other
 423       * access modes, this will not poison the Arc.
 424       */
 425      pub fn read<U>(&self, blk&fn(x: &T) -> U) -> U {
 426          unsafe {
 427              let state = self.x.get();
 428              do (*state).lock.read {
 429                  check_poison(false, (*state).failed);
 430                  blk(&(*state).data)
 431              }
 432          }
 433      }
 434  
 435      /**
 436       * As write(), but with the ability to atomically 'downgrade' the lock.
 437       * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used
 438       * to obtain the &mut T, and can be transformed into a RWReadMode token by
 439       * calling downgrade(), after which a &T can be obtained instead.
 440       *
 441       * # Example
 442       *
 443       * ~~~ {.rust}
 444       * do arc.write_downgrade |mut write_token| {
 445       *     do write_token.write_cond |state, condvar| {
 446       *         ... exclusive access with mutable state ...
 447       *     }
 448       *     let read_token = arc.downgrade(write_token);
 449       *     do read_token.read |state| {
 450       *         ... shared access with immutable state ...
 451       *     }
 452       * }
 453       * ~~~
 454       */
 455      pub fn write_downgrade<U>(&self, blk&fn(v: RWWriteMode<T>) -> U) -> U {
 456          unsafe {
 457              let state = self.x.get();
 458              do (*borrow_rwlock(state)).write_downgrade |write_mode| {
 459                  check_poison(false, (*state).failed);
 460                  blk(RWWriteMode {
 461                      data: &mut (*state).data,
 462                      token: write_mode,
 463                      poison: PoisonOnFail(&mut (*state).failed)
 464                  })
 465              }
 466          }
 467      }
 468  
 469      /// To be called inside of the write_downgrade block.
 470      pub fn downgrade<'a>(&self, tokenRWWriteMode<'a, T>)
 471                           -> RWReadMode<'a, T> {
 472          unsafe {
 473              // The rwlock should assert that the token belongs to us for us.
 474              let state = self.x.get();
 475              let RWWriteMode {
 476                  data: data,
 477                  token: t,
 478                  poison: _poison
 479              } = token;
 480              // Let readers in
 481              let new_token = (*state).lock.downgrade(t);
 482              // Whatever region the input reference had, it will be safe to use
 483              // the same region for the output reference. (The only 'unsafe' part
 484              // of this cast is removing the mutability.)
 485              let new_data = cast::transmute_immut(data);
 486              // Downgrade ensured the token belonged to us. Just a sanity check.
 487              assert!(borrow::ref_eq(&(*state).data, new_data));
 488              // Produce new token
 489              RWReadMode {
 490                  data: new_data,
 491                  token: new_token,
 492              }
 493          }
 494      }
 495  
 496      /**
 497       * Retrieves the data, blocking until all other references are dropped,
 498       * exactly as arc::unwrap.
 499       *
 500       * Will additionally fail if another task has failed while accessing the arc
 501       * in write mode.
 502       */
 503      pub fn unwrap(self) -> T {
 504          let RWArc { x: x, _ } = self;
 505          let inner = x.unwrap();
 506          let RWArcInner { failed: failed, data: data, _ } = inner;
 507          if failed {
 508              fail!(~"Can't unwrap poisoned RWArc - another task failed inside!")
 509          }
 510          data
 511      }
 512  }
 513  
 514  // Borrowck rightly complains about immutably aliasing the rwlock in order to
 515  // lock it. This wraps the unsafety, with the justification that the 'lock'
 516  // field is never overwritten; only 'failed' and 'data'.
 517  #[doc(hidden)]
 518  fn borrow_rwlock<T:Freeze + Send>(state*mut RWArcInner<T>) -> *RWLock {
 519      unsafe { cast::transmute(&(*state).lock) }
 520  }
 521  
 522  /// The "write permission" token used for RWArc.write_downgrade().
 523  pub struct RWWriteMode<'self, T> {
 524      data: &'self mut T,
 525      token: sync::RWLockWriteMode<'self>,
 526      poison: PoisonOnFail,
 527  }
 528  
 529  /// The "read permission" token used for RWArc.write_downgrade().
 530  pub struct RWReadMode<'self, T> {
 531      data: &'self T,
 532      token: sync::RWLockReadMode<'self>,
 533  }
 534  
 535  impl<'self, T:Freeze + Send> RWWriteMode<'self, T> {
 536      /// Access the pre-downgrade RWArc in write mode.
 537      pub fn write<U>(&mut self, blk&fn(x: &mut T) -> U) -> U {
 538          match *self {
 539              RWWriteMode {
 540                  data: &ref mut data,
 541                  token: ref token,
 542                  poison: _
 543              } => {
 544                  do token.write {
 545                      blk(data)
 546                  }
 547              }
 548          }
 549      }
 550  
 551      /// Access the pre-downgrade RWArc in write mode with a condvar.
 552      pub fn write_cond<'x, 'c, U>(&mut self,
 553                                   blk&fn(x: &'x mut T, c: &'c Condvar) -> U)
 554                                   -> U {
 555          match *self {
 556              RWWriteMode {
 557                  data: &ref mut data,
 558                  token: ref token,
 559                  poison: ref poison
 560              } => {
 561                  do token.write_cond |cond| {
 562                      unsafe {
 563                          let cvar = Condvar {
 564                              is_mutex: false,
 565                              failed: &mut *poison.failed,
 566                              cond: cond
 567                          };
 568                          blk(data, &cvar)
 569                      }
 570                  }
 571              }
 572          }
 573      }
 574  }
 575  
 576  impl<'self, T:Freeze + Send> RWReadMode<'self, T> {
 577      /// Access the post-downgrade rwlock in read mode.
 578      pub fn read<U>(&self, blk&fn(x: &T) -> U) -> U {
 579          match *self {
 580              RWReadMode {
 581                  data: data,
 582                  token: ref token
 583              } => {
 584                  do token.read { blk(data) }
 585              }
 586          }
 587      }
 588  }
 589  
 590  /****************************************************************************
 591   * Tests
 592   ****************************************************************************/
 593  
 594  #[cfg(test)]
 595  mod tests {
 596  
 597      use arc::*;
 598  
 599      use std::cell::Cell;
 600      use std::comm;
 601      use std::task;
 602  
 603      #[test]
 604      fn manually_share_arc() {
 605          let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
 606          let arc_v = Arc::new(v);
 607  
 608          let (p, c) = comm::stream();
 609  
 610          do task::spawn {
 611              let arc_v: Arc<~[int]> = p.recv();
 612  
 613              let v = arc_v.get().clone();
 614              assert_eq!(v[3], 4);
 615          };
 616  
 617          c.send(arc_v.clone());
 618  
 619          assert_eq!(arc_v.get()[2], 3);
 620          assert_eq!(arc_v.get()[4], 5);
 621  
 622          info!(arc_v);
 623      }
 624  
 625      #[test]
 626      fn test_mutex_arc_condvar() {
 627          let arc = ~MutexArc::new(false);
 628          let arc2 = ~arc.clone();
 629          let (p,c) = comm::oneshot();
 630          let (c,p) = (Cell::new(c), Cell::new(p));
 631          do task::spawn || {
 632              // wait until parent gets in
 633              p.take().recv();
 634              do arc2.access_cond |state, cond| {
 635                  *state = true;
 636                  cond.signal();
 637              }
 638          }
 639  
 640          do arc.access_cond |state, cond| {
 641              c.take().send(());
 642              assert!(!*state);
 643              while !*state {
 644                  cond.wait();
 645              }
 646          }
 647      }
 648  
 649      #[test] #[should_fail]
 650      fn test_arc_condvar_poison() {
 651          let arc = ~MutexArc::new(1);
 652          let arc2 = ~arc.clone();
 653          let (p, c) = comm::stream();
 654  
 655          do task::spawn_unlinked || {
 656              let _ = p.recv();
 657              do arc2.access_cond |one, cond| {
 658                  cond.signal();
 659                  // Parent should fail when it wakes up.
 660                  assert_eq!(*one, 0);
 661              }
 662          }
 663  
 664          do arc.access_cond |one, cond| {
 665              c.send(());
 666              while *one == 1 {
 667                  cond.wait();
 668              }
 669          }
 670      }
 671  
 672      #[test] #[should_fail]
 673      fn test_mutex_arc_poison() {
 674          let arc = ~MutexArc::new(1);
 675          let arc2 = ~arc.clone();
 676          do task::try || {
 677              do arc2.access |one| {
 678                  assert_eq!(*one, 2);
 679              }
 680          };
 681          do arc.access |one| {
 682              assert_eq!(*one, 1);
 683          }
 684      }
 685  
 686      #[test] #[should_fail]
 687      pub fn test_mutex_arc_unwrap_poison() {
 688          let arc = MutexArc::new(1);
 689          let arc2 = ~(&arc).clone();
 690          let (p, c) = comm::stream();
 691          do task::spawn {
 692              do arc2.access |one| {
 693                  c.send(());
 694                  assert!(*one == 2);
 695              }
 696          }
 697          let _ = p.recv();
 698          let one = arc.unwrap();
 699          assert!(one == 1);
 700      }
 701  
 702      #[test]
 703      fn test_unsafe_mutex_arc_nested() {
 704          unsafe {
 705              // Tests nested mutexes and access
 706              // to underlaying data.
 707              let arc = ~MutexArc::new(1);
 708              let arc2 = ~MutexArc::new(*arc);
 709              do task::spawn || {
 710                  do (*arc2).unsafe_access |mutex| {
 711                      do (*mutex).access |one| {
 712                          assert!(*one == 1);
 713                      }
 714                  }
 715              };
 716          }
 717      }
 718  
 719      #[test] #[should_fail]
 720      fn test_rw_arc_poison_wr() {
 721          let arc = RWArc::new(1);
 722          let arc2 = arc.clone();
 723          do task::try {
 724              do arc2.write |one| {
 725                  assert_eq!(*one, 2);
 726              }
 727          };
 728          do arc.read |one| {
 729              assert_eq!(*one, 1);
 730          }
 731      }
 732  
 733      #[test] #[should_fail]
 734      fn test_rw_arc_poison_ww() {
 735          let arc = RWArc::new(1);
 736          let arc2 = arc.clone();
 737          do task::try {
 738              do arc2.write |one| {
 739                  assert_eq!(*one, 2);
 740              }
 741          };
 742          do arc.write |one| {
 743              assert_eq!(*one, 1);
 744          }
 745      }
 746      #[test] #[should_fail]
 747      fn test_rw_arc_poison_dw() {
 748          let arc = RWArc::new(1);
 749          let arc2 = arc.clone();
 750          do task::try {
 751              do arc2.write_downgrade |mut write_mode| {
 752                  do write_mode.write |one| {
 753                      assert_eq!(*one, 2);
 754                  }
 755              }
 756          };
 757          do arc.write |one| {
 758              assert_eq!(*one, 1);
 759          }
 760      }
 761      #[test]
 762      fn test_rw_arc_no_poison_rr() {
 763          let arc = RWArc::new(1);
 764          let arc2 = arc.clone();
 765          do task::try {
 766              do arc2.read |one| {
 767                  assert_eq!(*one, 2);
 768              }
 769          };
 770          do arc.read |one| {
 771              assert_eq!(*one, 1);
 772          }
 773      }
 774      #[test]
 775      fn test_rw_arc_no_poison_rw() {
 776          let arc = RWArc::new(1);
 777          let arc2 = arc.clone();
 778          do task::try {
 779              do arc2.read |one| {
 780                  assert_eq!(*one, 2);
 781              }
 782          };
 783          do arc.write |one| {
 784              assert_eq!(*one, 1);
 785          }
 786      }
 787      #[test]
 788      fn test_rw_arc_no_poison_dr() {
 789          let arc = RWArc::new(1);
 790          let arc2 = arc.clone();
 791          do task::try {
 792              do arc2.write_downgrade |write_mode| {
 793                  let read_mode = arc2.downgrade(write_mode);
 794                  do read_mode.read |one| {
 795                      assert_eq!(*one, 2);
 796                  }
 797              }
 798          };
 799          do arc.write |one| {
 800              assert_eq!(*one, 1);
 801          }
 802      }
 803      #[test]
 804      fn test_rw_arc() {
 805          let arc = RWArc::new(0);
 806          let arc2 = arc.clone();
 807          let (p, c) = comm::stream();
 808  
 809          do task::spawn {
 810              do arc2.write |num| {
 811                  do 10.times {
 812                      let tmp = *num;
 813                      *num = -1;
 814                      task::deschedule();
 815                      *num = tmp + 1;
 816                  }
 817                  c.send(());
 818              }
 819          }
 820  
 821          // Readers try to catch the writer in the act
 822          let mut children = ~[];
 823          do 5.times {
 824              let arc3 = arc.clone();
 825              let mut builder = task::task();
 826              builder.future_result(|r| children.push(r));
 827              do builder.spawn {
 828                  do arc3.read |num| {
 829                      assert!(*num >= 0);
 830                  }
 831              }
 832          }
 833  
 834          // Wait for children to pass their asserts
 835          for r in children.iter() {
 836              r.recv();
 837          }
 838  
 839          // Wait for writer to finish
 840          p.recv();
 841          do arc.read |num| {
 842              assert_eq!(*num, 10);
 843          }
 844      }
 845      #[test]
 846      fn test_rw_downgrade() {
 847          // (1) A downgrader gets in write mode and does cond.wait.
 848          // (2) A writer gets in write mode, sets state to 42, and does signal.
 849          // (3) Downgrader wakes, sets state to 31337.
 850          // (4) tells writer and all other readers to contend as it downgrades.
 851          // (5) Writer attempts to set state back to 42, while downgraded task
 852          //     and all reader tasks assert that it's 31337.
 853          let arc = RWArc::new(0);
 854  
 855          // Reader tasks
 856          let mut reader_convos = ~[];
 857          do 10.times {
 858              let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream());
 859              reader_convos.push((rc1, rp2));
 860              let arcn = arc.clone();
 861              do task::spawn {
 862                  rp1.recv(); // wait for downgrader to give go-ahead
 863                  do arcn.read |state| {
 864                      assert_eq!(*state, 31337);
 865                      rc2.send(());
 866                  }
 867              }
 868          }
 869  
 870          // Writer task
 871          let arc2 = arc.clone();
 872          let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream());
 873          do task::spawn || {
 874              wp1.recv();
 875              do arc2.write_cond |state, cond| {
 876                  assert_eq!(*state, 0);
 877                  *state = 42;
 878                  cond.signal();
 879              }
 880              wp1.recv();
 881              do arc2.write |state| {
 882                  // This shouldn't happen until after the downgrade read
 883                  // section, and all other readers, finish.
 884                  assert_eq!(*state, 31337);
 885                  *state = 42;
 886              }
 887              wc2.send(());
 888          }
 889  
 890          // Downgrader (us)
 891          do arc.write_downgrade |mut write_mode| {
 892              do write_mode.write_cond |state, cond| {
 893                  wc1.send(()); // send to another writer who will wake us up
 894                  while *state == 0 {
 895                      cond.wait();
 896                  }
 897                  assert_eq!(*state, 42);
 898                  *state = 31337;
 899                  // send to other readers
 900                  for &(ref rc, _) in reader_convos.iter() {
 901                      rc.send(())
 902                  }
 903              }
 904              let read_mode = arc.downgrade(write_mode);
 905              do read_mode.read |state| {
 906                  // complete handshake with other readers
 907                  for &(_, ref rp) in reader_convos.iter() {
 908                      rp.recv()
 909                  }
 910                  wc1.send(()); // tell writer to try again
 911                  assert_eq!(*state, 31337);
 912              }
 913          }
 914  
 915          wp2.recv(); // complete handshake with writer
 916      }
 917      #[cfg(test)]
 918      fn test_rw_write_cond_downgrade_read_race_helper() {
 919          // Tests that when a downgrader hands off the "reader cloud" lock
 920          // because of a contending reader, a writer can't race to get it
 921          // instead, which would result in readers_and_writers. This tests
 922          // the sync module rather than this one, but it's here because an
 923          // rwarc gives us extra shared state to help check for the race.
 924          // If you want to see this test fail, go to sync.rs and replace the
 925          // line in RWLock::write_cond() that looks like:
 926          //     "blk(&Condvar { order: opt_lock, ..*cond })"
 927          // with just "blk(cond)".
 928          let x = RWArc::new(true);
 929          let (wp, wc) = comm::stream();
 930  
 931          // writer task
 932          let xw = x.clone();
 933          do task::spawn {
 934              do xw.write_cond |state, c| {
 935                  wc.send(()); // tell downgrader it's ok to go
 936                  c.wait();
 937                  // The core of the test is here: the condvar reacquire path
 938                  // must involve order_lock, so that it cannot race with a reader
 939                  // trying to receive the "reader cloud lock hand-off".
 940                  *state = false;
 941              }
 942          }
 943  
 944          wp.recv(); // wait for writer to get in
 945  
 946          do x.write_downgrade |mut write_mode| {
 947              do write_mode.write_cond |state, c| {
 948                  assert!(*state);
 949                  // make writer contend in the cond-reacquire path
 950                  c.signal();
 951              }
 952              // make a reader task to trigger the "reader cloud lock" handoff
 953              let xr = x.clone();
 954              let (rp, rc) = comm::stream();
 955              do task::spawn {
 956                  rc.send(());
 957                  do xr.read |_state| { }
 958              }
 959              rp.recv(); // wait for reader task to exist
 960  
 961              let read_mode = x.downgrade(write_mode);
 962              do read_mode.read |state| {
 963                  // if writer mistakenly got in, make sure it mutates state
 964                  // before we assert on it
 965                  do 5.times { task::deschedule(); }
 966                  // make sure writer didn't get in.
 967                  assert!(*state);
 968              }
 969          }
 970      }
 971      #[test]
 972      fn test_rw_write_cond_downgrade_read_race() {
 973          // Ideally the above test case would have deschedule statements in it that
 974          // helped to expose the race nearly 100% of the time... but adding
 975          // deschedules in the intuitively-right locations made it even less likely,
 976          // and I wasn't sure why :( . This is a mediocre "next best" option.
 977          do 8.times { test_rw_write_cond_downgrade_read_race_helper() }
 978      }
 979  }

libextra/arc.rs:51:77-51:77 -struct- definition:
/// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
pub struct Condvar<'self> {
references:-
409:                     &Condvar {is_mutex: false,
553:                                  blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
563:                         let cvar = Condvar {
234:                                                   c: &'c Condvar) -> U)
291:                                            c: &'c Condvar) -> U)
241:                 &Condvar {is_mutex: true,
401:                                  blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
58: impl<'self> Condvar<'self> {


libextra/arc.rs:345:13-345:13 -struct- definition:
#[no_freeze]
struct RWArc<T> {
references:-
353:         RWArc { x: self.x.clone() }
352:     fn clone(&self) -> RWArc<T> {
350: impl<T:Freeze + Send> Clone for RWArc<T> {
373:         RWArc { x: UnsafeArc::new(data), }
360:     pub fn new(user_data: T) -> RWArc<T> {
504:         let RWArc { x: x, _ } = self;
368:     pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWArc<T> {
358: impl<T:Freeze + Send> RWArc<T> {
libextra/workcache.rs:
297:                               lg: RWArc<Logger>,
230:     db: RWArc<Database>,
291:                lg: RWArc<Logger>,
290:     pub fn new(db: RWArc<Database>,
296:     pub fn new_with_freshness(db: RWArc<Database>,
231:     logger: RWArc<Logger>,


libextra/arc.rs:326:1-326:1 -fn- definition:

fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
references:-
392:                 let _z = PoisonOnFail(&mut (*state).failed);
225:             let _z = PoisonOnFail(&mut (*state).failed);
239:             let _z = PoisonOnFail(&mut (*state).failed);
407:                 let _z = PoisonOnFail(&mut (*state).failed);
463:                     poison: PoisonOnFail(&mut (*state).failed)


libextra/arc.rs:337:15-337:15 -struct- definition:
#[doc(hidden)]
struct RWArcInner<T> { priv lock: RWLock, priv failed: bool, priv data: T }
references:-
347:     priv x: UnsafeArc<RWArcInner<T>>,
369:         let data = RWArcInner {
518: fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {
506:         let RWArcInner { failed: failed, data: data, _ } = inner;


libextra/arc.rs:160:15-160:15 -struct- definition:
#[doc(hidden)]
struct MutexArcInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
references:-
165: struct MutexArc<T> { priv x: UnsafeArc<MutexArcInner<T>> }
188:         let data = MutexArcInner {
256:         let MutexArcInner { failed: failed, data: data, _ } = inner;


libextra/arc.rs:299:15-299:15 -fn- definition:
#[doc(hidden)]
fn check_poison(is_mutex: bool, failed: bool) {
references:-
459:                 check_poison(false, (*state).failed);
429:                 check_poison(false, (*state).failed);
74:         check_poison(self.is_mutex, *self.failed);
391:                 check_poison(false, (*state).failed);
406:                 check_poison(false, (*state).failed);
224:             check_poison(true, (*state).failed);
238:             check_poison(true, (*state).failed);


libextra/arc.rs:517:15-517:15 -fn- definition:
#[doc(hidden)]
fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {
references:-
458:             do (*borrow_rwlock(state)).write_downgrade |write_mode| {
405:             do (*borrow_rwlock(state)).write_cond |cond| {
390:             do (*borrow_rwlock(state)).write {


libextra/arc.rs:164:13-164:13 -struct- definition:
#[no_freeze]
struct MutexArc<T> { priv x: UnsafeArc<MutexArcInner<T>> }
references:-
173:         MutexArc { x: self.x.clone() }
264: impl<T:Freeze + Send> MutexArc<T> {
179:     pub fn new(user_data: T) -> MutexArc<T> {
187:     pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc<T> {
170:     fn clone(&self) -> MutexArc<T> {
177: impl<T:Send> MutexArc<T> {
168: impl<T:Send> Clone for MutexArc<T> {
254:         let MutexArc { x: x } = self;
192:         MutexArc { x: UnsafeArc::new(data) }


libextra/arc.rs:529:66-529:66 -struct- definition:
/// The "read permission" token used for RWArc.write_downgrade().
pub struct RWReadMode<'self, T> {
references:-
580:             RWReadMode {
576: impl<'self, T:Freeze + Send> RWReadMode<'self, T> {
489:             RWReadMode {
471:                          -> RWReadMode<'a, T> {


libextra/arc.rs:310:15-310:15 -struct- definition:
#[doc(hidden)]
struct PoisonOnFail {
references:-
526:     poison: PoisonOnFail,
315: impl Drop for PoisonOnFail {
328:     PoisonOnFail {
327: fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {


libextra/arc.rs:110:72-110:72 -struct- definition:
/// An atomically reference counted wrapper for shared immutable state.
pub struct Arc<T> { priv x: UnsafeArc<T> }
references:-
120:     pub fn new(data: T) -> Arc<T> {
151:     fn clone(&self) -> Arc<T> {
118: impl<T:Freeze+Send> Arc<T> {
138:         let Arc { x: x } = self;
143: impl<T:Freeze + Send> Clone for Arc<T> {
152:         Arc { x: self.x.clone() }
121:         Arc { x: UnsafeArc::new(data) }
libextra/workcache.rs:
299:                               freshness: Arc<FreshnessMap>) -> Context {
232:     cfg: Arc<json::Object>,
239:     freshness: Arc<FreshnessMap>
298:                               cfg: Arc<json::Object>,
292:                cfg: Arc<json::Object>) -> Context {


libextra/arc.rs:522:67-522:67 -struct- definition:
/// The "write permission" token used for RWArc.write_downgrade().
pub struct RWWriteMode<'self, T> {
references:-
475:             let RWWriteMode {
556:             RWWriteMode {
455:     pub fn write_downgrade<U>(&self, blk: &fn(v: RWWriteMode<T>) -> U) -> U {
535: impl<'self, T:Freeze + Send> RWWriteMode<'self, T> {
460:                 blk(RWWriteMode {
539:             RWWriteMode {
470:     pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>)