(index<- )        ./libstd/comm/shared.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  /// Shared channels
  12  ///
  13  /// This is the flavor of channels which are not necessarily optimized for any
  14  /// particular use case, but are the most general in how they are used. Shared
  15  /// channels are cloneable allowing for multiple senders.
  16  ///
  17  /// High level implementation details can be found in the comment of the parent
  18  /// module. You'll also note that the implementation of the shared and stream
  19  /// channels are quite similar, and this is no coincidence!
  20  
  21  use cmp;
  22  use int;
  23  use iter::Iterator;
  24  use kinds::Send;
  25  use ops::Drop;
  26  use option::{Some, None, Option};
  27  use owned::Box;
  28  use result::{Ok, Err, Result};
  29  use rt::local::Local;
  30  use rt::task::{Task, BlockedTask};
  31  use rt::thread::Thread;
  32  use sync::atomics;
  33  use unstable::mutex::NativeMutex;
  34  
  35  use mpsc = sync::mpsc_queue;
  36  
  37  static DISCONNECTED: int = int::MIN;
  38  static FUDGE: int = 1024;
  39  #[cfg(test)]
  40  static MAX_STEALS: int = 5;
  41  #[cfg(not(test))]
  42  static MAX_STEALS: int = 1 << 20;
  43  
  44  pub struct Packet<T> {
  45      queue: mpsc::Queue<T>,
  46      cnt: atomics::AtomicInt, // How many items are on this channel
  47      steals: int, // How many times has a port received without blocking?
  48      to_wake: atomics::AtomicUint, // Task to wake up
  49  
  50      // The number of channels which are currently using this packet.
  51      channels: atomics::AtomicInt,
  52  
  53      // See the discussion in Port::drop and the channel send methods for what
  54      // these are used for
  55      port_dropped: atomics::AtomicBool,
  56      sender_drain: atomics::AtomicInt,
  57  
  58      // this lock protects various portions of this implementation during
  59      // select()
  60      select_lock: NativeMutex,
  61  }
  62  
  63  pub enum Failure {
  64      Empty,
  65      Disconnected,
  66  }
  67  
  68  impl<T: Send> Packet<T> {
  69      // Creation of a packet *must* be followed by a call to inherit_blocker
  70      pub fn new() -> Packet<T> {
  71          let p = Packet {
  72              queue: mpsc::Queue::new(),
  73              cnt: atomics::AtomicInt::new(0),
  74              steals: 0,
  75              to_wake: atomics::AtomicUint::new(0),
  76              channels: atomics::AtomicInt::new(2),
  77              port_dropped: atomics::AtomicBool::new(false),
  78              sender_drain: atomics::AtomicInt::new(0),
  79              select_lock: unsafe { NativeMutex::new() },
  80          };
  81          // see comments in inherit_blocker about why we grab this lock
  82          unsafe { p.select_lock.lock_noguard() }
  83          return p;
  84      }
  85  
  86      // This function is used at the creation of a shared packet to inherit a
  87      // previously blocked task. This is done to prevent spurious wakeups of
  88      // tasks in select().
  89      //
  90      // This can only be called at channel-creation time
  91      pub fn inherit_blocker(&mut self, taskOption<BlockedTask>) {
  92          match task {
  93              Some(task) => {
  94                  assert_eq!(self.cnt.load(atomics::SeqCst), 0);
  95                  assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
  96                  self.to_wake.store(unsafe { task.cast_to_uint() },
  97                                     atomics::SeqCst);
  98                  self.cnt.store(-1, atomics::SeqCst);
  99  
 100                  // This store is a little sketchy. What's happening here is
 101                  // that we're transferring a blocker from a oneshot or stream
 102                  // channel to this shared channel. In doing so, we never
 103                  // spuriously wake them up and rather only wake them up at the
 104                  // appropriate time. This implementation of shared channels
 105                  // assumes that any blocking recv() will undo the increment of
 106                  // steals performed in try_recv() once the recv is complete.
 107                  // This thread that we're inheriting, however, is not in the
 108                  // middle of recv. Hence, the first time we wake them up,
 109                  // they're going to wake up from their old port, move on to the
 110                  // upgraded port, and then call the block recv() function.
 111                  //
 112                  // When calling this function, they'll find there's data
 113                  // immediately available, counting it as a steal. This in fact
 114                  // wasn't a steal because we appropriately blocked them waiting
 115                  // for data.
 116                  //
 117                  // To offset this bad increment, we initially set the steal
 118                  // count to -1. You'll find some special code in
 119                  // abort_selection() as well to ensure that this -1 steal count
 120                  // doesn't escape too far.
 121                  self.steals = -1;
 122              }
 123              None => {}
 124          }
 125  
 126          // When the shared packet is constructed, we grabbed this lock. The
 127          // purpose of this lock is to ensure that abort_selection() doesn't
 128          // interfere with this method. After we unlock this lock, we're
 129          // signifying that we're done modifying self.cnt and self.to_wake and
 130          // the port is ready for the world to continue using it.
 131          unsafe { self.select_lock.unlock_noguard() }
 132      }
 133  
 134      pub fn send(&mut self, tT) -> Result<(), T> {
 135          // See Port::drop for what's going on
 136          if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
 137  
 138          // Note that the multiple sender case is a little tricker
 139          // semantically than the single sender case. The logic for
 140          // incrementing is "add and if disconnected store disconnected".
 141          // This could end up leading some senders to believe that there
 142          // wasn't a disconnect if in fact there was a disconnect. This means
 143          // that while one thread is attempting to re-store the disconnected
 144          // states, other threads could walk through merrily incrementing
 145          // this very-negative disconnected count. To prevent senders from
 146          // spuriously attempting to send when the channels is actually
 147          // disconnected, the count has a ranged check here.
 148          //
 149          // This is also done for another reason. Remember that the return
 150          // value of this function is:
 151          //
 152          //  `true` == the data *may* be received, this essentially has no
 153          //            meaning
 154          //  `false` == the data will *never* be received, this has a lot of
 155          //             meaning
 156          //
 157          // In the SPSC case, we have a check of 'queue.is_empty()' to see
 158          // whether the data was actually received, but this same condition
 159          // means nothing in a multi-producer context. As a result, this
 160          // preflight check serves as the definitive "this will never be
 161          // received". Once we get beyond this check, we have permanently
 162          // entered the realm of "this may be received"
 163          if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE {
 164              return Err(t)
 165          }
 166  
 167          self.queue.push(t);
 168          match self.cnt.fetch_add(1, atomics::SeqCst) {
 169              -1 => {
 170                  self.take_to_wake().wake().map(|t| t.reawaken());
 171              }
 172  
 173              // In this case, we have possibly failed to send our data, and
 174              // we need to consider re-popping the data in order to fully
 175              // destroy it. We must arbitrate among the multiple senders,
 176              // however, because the queues that we're using are
 177              // single-consumer queues. In order to do this, all exiting
 178              // pushers will use an atomic count in order to count those
 179              // flowing through. Pushers who see 0 are required to drain as
 180              // much as possible, and then can only exit when they are the
 181              // only pusher (otherwise they must try again).
 182              n if n < DISCONNECTED + FUDGE => {
 183                  // see the comment in 'try' for a shared channel for why this
 184                  // window of "not disconnected" is ok.
 185                  self.cnt.store(DISCONNECTED, atomics::SeqCst);
 186  
 187                  if self.sender_drain.fetch_add(1, atomics::SeqCst) == 0 {
 188                      loop {
 189                          // drain the queue, for info on the thread yield see the
 190                          // discussion in try_recv
 191                          loop {
 192                              match self.queue.pop() {
 193                                  mpsc::Data(..) => {}
 194                                  mpsc::Empty => break,
 195                                  mpsc::Inconsistent => Thread::yield_now(),
 196                              }
 197                          }
 198                          // maybe we're done, if we're not the last ones
 199                          // here, then we need to go try again.
 200                          if self.sender_drain.fetch_sub(1, atomics::SeqCst) == 1 {
 201                              break
 202                          }
 203                      }
 204  
 205                      // At this point, there may still be data on the queue,
 206                      // but only if the count hasn't been incremented and
 207                      // some other sender hasn't finished pushing data just
 208                      // yet. That sender in question will drain its own data.
 209                  }
 210              }
 211  
 212              // Can't make any assumptions about this case like in the SPSC case.
 213              _ => {}
 214          }
 215  
 216          Ok(())
 217      }
 218  
 219      pub fn recv(&mut self) -> Result<T, Failure> {
 220          // This code is essentially the exact same as that found in the stream
 221          // case (see stream.rs)
 222          match self.try_recv() {
 223              Err(Empty) => {}
 224              data => return data,
 225          }
 226  
 227          let taskBox<Task> = Local::take();
 228          task.deschedule(1, |task| {
 229              self.decrement(task)
 230          });
 231  
 232          match self.try_recv() {
 233              data @ Ok(..) => { self.steals -= 1; data }
 234              data => data,
 235          }
 236      }
 237  
 238      // Essentially the exact same thing as the stream decrement function.
 239      fn decrement(&mut self, taskBlockedTask) -> Result<(), BlockedTask> {
 240          assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
 241          let n = unsafe { task.cast_to_uint() };
 242          self.to_wake.store(n, atomics::SeqCst);
 243  
 244          let steals = self.steals;
 245          self.steals = 0;
 246  
 247          match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
 248              DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
 249              // If we factor in our steals and notice that the channel has no
 250              // data, we successfully sleep
 251              n => {
 252                  assert!(n >= 0);
 253                  if n - steals <= 0 { return Ok(()) }
 254              }
 255          }
 256  
 257          self.to_wake.store(0, atomics::SeqCst);
 258          Err(unsafe { BlockedTask::cast_from_uint(n) })
 259      }
 260  
 261      pub fn try_recv(&mut self) -> Result<T, Failure> {
 262          let ret = match self.queue.pop() {
 263              mpsc::Data(t) => Some(t),
 264              mpsc::Empty => None,
 265  
 266              // This is a bit of an interesting case. The channel is
 267              // reported as having data available, but our pop() has
 268              // failed due to the queue being in an inconsistent state.
 269              // This means that there is some pusher somewhere which has
 270              // yet to complete, but we are guaranteed that a pop will
 271              // eventually succeed. In this case, we spin in a yield loop
 272              // because the remote sender should finish their enqueue
 273              // operation "very quickly".
 274              //
 275              // Note that this yield loop does *not* attempt to do a green
 276              // yield (regardless of the context), but *always* performs an
 277              // OS-thread yield. The reasoning for this is that the pusher in
 278              // question which is causing the inconsistent state is
 279              // guaranteed to *not* be a blocked task (green tasks can't get
 280              // pre-empted), so it must be on a different OS thread. Also,
 281              // `try_recv` is normally a "guaranteed no rescheduling" context
 282              // in a green-thread situation. By yielding control of the
 283              // thread, we will hopefully allow time for the remote task on
 284              // the other OS thread to make progress.
 285              //
 286              // Avoiding this yield loop would require a different queue
 287              // abstraction which provides the guarantee that after M
 288              // pushes have succeeded, at least M pops will succeed. The
 289              // current queues guarantee that if there are N active
 290              // pushes, you can pop N times once all N have finished.
 291              mpsc::Inconsistent => {
 292                  let data;
 293                  loop {
 294                      Thread::yield_now();
 295                      match self.queue.pop() {
 296                          mpsc::Data(t) => { data = t; break }
 297                          mpsc::Empty => fail!("inconsistent => empty"),
 298                          mpsc::Inconsistent => {}
 299                      }
 300                  }
 301                  Some(data)
 302              }
 303          };
 304          match ret {
 305              // See the discussion in the stream implementation for why we
 306              // might decrement steals.
 307              Some(data) => {
 308                  if self.steals > MAX_STEALS {
 309                      match self.cnt.swap(0, atomics::SeqCst) {
 310                          DISCONNECTED => {
 311                              self.cnt.store(DISCONNECTED, atomics::SeqCst);
 312                          }
 313                          n => {
 314                              let m = cmp::min(n, self.steals);
 315                              self.steals -= m;
 316                              self.bump(n - m);
 317                          }
 318                      }
 319                      assert!(self.steals >= 0);
 320                  }
 321                  self.steals += 1;
 322                  Ok(data)
 323              }
 324  
 325              // See the discussion in the stream implementation for why we try
 326              // again.
 327              None => {
 328                  match self.cnt.load(atomics::SeqCst) {
 329                      n if n != DISCONNECTED => Err(Empty),
 330                      _ => {
 331                          match self.queue.pop() {
 332                              mpsc::Data(t) => Ok(t),
 333                              mpsc::Empty => Err(Disconnected),
 334                              // with no senders, an inconsistency is impossible.
 335                              mpsc::Inconsistent => unreachable!(),
 336                          }
 337                      }
 338                  }
 339              }
 340          }
 341      }
 342  
 343      // Prepares this shared packet for a channel clone, essentially just bumping
 344      // a refcount.
 345      pub fn clone_chan(&mut self) {
 346          self.channels.fetch_add(1, atomics::SeqCst);
 347      }
 348  
 349      // Decrement the reference count on a channel. This is called whenever a
 350      // Chan is dropped and may end up waking up a receiver. It's the receiver's
 351      // responsibility on the other end to figure out that we've disconnected.
 352      pub fn drop_chan(&mut self) {
 353          match self.channels.fetch_sub(1, atomics::SeqCst) {
 354              1 => {}
 355              n if n > 1 => return,
 356              n => fail!("bad number of channels left {}", n),
 357          }
 358  
 359          match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
 360              -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
 361              DISCONNECTED => {}
 362              n => { assert!(n >= 0); }
 363          }
 364      }
 365  
 366      // See the long discussion inside of stream.rs for why the queue is drained,
 367      // and why it is done in this fashion.
 368      pub fn drop_port(&mut self) {
 369          self.port_dropped.store(true, atomics::SeqCst);
 370          let mut steals = self.steals;
 371          while {
 372              let cnt = self.cnt.compare_and_swap(
 373                              steals, DISCONNECTED, atomics::SeqCst);
 374              cnt != DISCONNECTED && cnt != steals
 375          } {
 376              // See the discussion in 'try_recv' for why we yield
 377              // control of this thread.
 378              loop {
 379                  match self.queue.pop() {
 380                      mpsc::Data(..) => { steals += 1; }
 381                      mpsc::Empty | mpsc::Inconsistent => break,
 382                  }
 383              }
 384          }
 385      }
 386  
 387      // Consumes ownership of the 'to_wake' field.
 388      fn take_to_wake(&mut self) -> BlockedTask {
 389          let task = self.to_wake.load(atomics::SeqCst);
 390          self.to_wake.store(0, atomics::SeqCst);
 391          assert!(task != 0);
 392          unsafe { BlockedTask::cast_from_uint(task) }
 393      }
 394  
 395      ////////////////////////////////////////////////////////////////////////////
 396      // select implementation
 397      ////////////////////////////////////////////////////////////////////////////
 398  
 399      // Helper function for select, tests whether this port can receive without
 400      // blocking (obviously not an atomic decision).
 401      //
 402      // This is different than the stream version because there's no need to peek
 403      // at the queue, we can just look at the local count.
 404      pub fn can_recv(&mut self) -> bool {
 405          let cnt = self.cnt.load(atomics::SeqCst);
 406          cnt == DISCONNECTED || cnt - self.steals > 0
 407      }
 408  
 409      // increment the count on the channel (used for selection)
 410      fn bump(&mut self, amtint) -> int {
 411          match self.cnt.fetch_add(amt, atomics::SeqCst) {
 412              DISCONNECTED => {
 413                  self.cnt.store(DISCONNECTED, atomics::SeqCst);
 414                  DISCONNECTED
 415              }
 416              n => n
 417          }
 418      }
 419  
 420      // Inserts the blocked task for selection on this port, returning it back if
 421      // the port already has data on it.
 422      //
 423      // The code here is the same as in stream.rs, except that it doesn't need to
 424      // peek at the channel to see if an upgrade is pending.
 425      pub fn start_selection(&mut self,
 426                             taskBlockedTask) -> Result<(), BlockedTask> {
 427          match self.decrement(task) {
 428              Ok(()) => Ok(()),
 429              Err(task) => {
 430                  let prev = self.bump(1);
 431                  assert!(prev == DISCONNECTED || prev >= 0);
 432                  return Err(task);
 433              }
 434          }
 435      }
 436  
 437      // Cancels a previous task waiting on this port, returning whether there's
 438      // data on the port.
 439      //
 440      // This is similar to the stream implementation (hence fewer comments), but
 441      // uses a different value for the "steals" variable.
 442      pub fn abort_selection(&mut self, _was_upgradebool) -> bool {
 443          // Before we do anything else, we bounce on this lock. The reason for
 444          // doing this is to ensure that any upgrade-in-progress is gone and
 445          // done with. Without this bounce, we can race with inherit_blocker
 446          // about looking at and dealing with to_wake. Once we have acquired the
 447          // lock, we are guaranteed that inherit_blocker is done.
 448          unsafe {
 449              let _guard = self.select_lock.lock();
 450          }
 451  
 452          // Like the stream implementation, we want to make sure that the count
 453          // on the channel goes non-negative. We don't know how negative the
 454          // stream currently is, so instead of using a steal value of 1, we load
 455          // the channel count and figure out what we should do to make it
 456          // positive.
 457          let steals = {
 458              let cnt = self.cnt.load(atomics::SeqCst);
 459              if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
 460          };
 461          let prev = self.bump(steals + 1);
 462  
 463          if prev == DISCONNECTED {
 464              assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
 465              true
 466          } else {
 467              let cur = prev + steals + 1;
 468              assert!(cur >= 0);
 469              if prev < 0 {
 470                  self.take_to_wake().trash();
 471              } else {
 472                  while self.to_wake.load(atomics::SeqCst) != 0 {
 473                      Thread::yield_now();
 474                  }
 475              }
 476              // if the number of steals is -1, it was the pre-emptive -1 steal
 477              // count from when we inherited a blocker. This is fine because
 478              // we're just going to overwrite it with a real value.
 479              assert!(self.steals == 0 || self.steals == -1);
 480              self.steals = steals;
 481              prev >= 0
 482          }
 483      }
 484  }
 485  
 486  #[unsafe_destructor]
 487  impl<T: Send> Drop for Packet<T> {
 488      fn drop(&mut self) {
 489          // Note that this load is not only an assert for correctness about
 490          // disconnection, but also a proper fence before the read of
 491          // `to_wake`, so this assert cannot be removed with also removing
 492          // the `to_wake` assert.
 493          assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
 494          assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
 495          assert_eq!(self.channels.load(atomics::SeqCst), 0);
 496      }
 497  }