(index<- )        ./libstd/comm/stream.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  /// Stream channels
  12  ///
  13  /// This is the flavor of channels which are optimized for one sender and one
  14  /// receiver. The sender will be upgraded to a shared channel if the channel is
  15  /// cloned.
  16  ///
  17  /// High level implementation details can be found in the comment of the parent
  18  /// module.
  19  
  20  use cmp;
  21  use comm::Receiver;
  22  use int;
  23  use iter::Iterator;
  24  use kinds::Send;
  25  use ops::Drop;
  26  use option::{Some, None};
  27  use owned::Box;
  28  use result::{Ok, Err, Result};
  29  use rt::local::Local;
  30  use rt::task::{Task, BlockedTask};
  31  use rt::thread::Thread;
  32  use spsc = sync::spsc_queue;
  33  use sync::atomics;
  34  
  35  static DISCONNECTED: int = int::MIN;
  36  #[cfg(test)]
  37  static MAX_STEALS: int = 5;
  38  #[cfg(not(test))]
  39  static MAX_STEALS: int = 1 << 20;
  40  
  41  pub struct Packet<T> {
  42      queue: spsc::Queue<Message<T>>, // internal queue for all message
  43  
  44      cnt: atomics::AtomicInt, // How many items are on this channel
  45      steals: int, // How many times has a port received without blocking?
  46      to_wake: atomics::AtomicUint, // Task to wake up
  47  
  48      port_dropped: atomics::AtomicBool, // flag if the channel has been destroyed.
  49  }
  50  
  51  pub enum Failure<T> {
  52      Empty,
  53      Disconnected,
  54      Upgraded(Receiver<T>),
  55  }
  56  
  57  pub enum UpgradeResult {
  58      UpSuccess,
  59      UpDisconnected,
  60      UpWoke(BlockedTask),
  61  }
  62  
  63  pub enum SelectionResult<T> {
  64      SelSuccess,
  65      SelCanceled(BlockedTask),
  66      SelUpgraded(BlockedTask, Receiver<T>),
  67  }
  68  
  69  // Any message could contain an "upgrade request" to a new shared port, so the
  70  // internal queue it's a queue of T, but rather Message<T>
  71  enum Message<T> {
  72      Data(T),
  73      GoUp(Receiver<T>),
  74  }
  75  
  76  impl<T: Send> Packet<T> {
  77      pub fn new() -> Packet<T> {
  78          Packet {
  79              queue: spsc::Queue::new(128),
  80  
  81              cnt: atomics::AtomicInt::new(0),
  82              steals: 0,
  83              to_wake: atomics::AtomicUint::new(0),
  84  
  85              port_dropped: atomics::AtomicBool::new(false),
  86          }
  87      }
  88  
  89  
  90      pub fn send(&mut self, tT) -> Result<(), T> {
  91          // If the other port has deterministically gone away, then definitely
  92          // must return the data back up the stack. Otherwise, the data is
  93          // considered as being sent.
  94          if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
  95  
  96          match self.do_send(Data(t)) {
  97              UpSuccess | UpDisconnected => {},
  98              UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
  99          }
 100          Ok(())
 101      }
 102      pub fn upgrade(&mut self, upReceiver<T>) -> UpgradeResult {
 103          // If the port has gone away, then there's no need to proceed any
 104          // further.
 105          if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
 106  
 107          self.do_send(GoUp(up))
 108      }
 109  
 110      fn do_send(&mut self, tMessage<T>) -> UpgradeResult {
 111          self.queue.push(t);
 112          match self.cnt.fetch_add(1, atomics::SeqCst) {
 113              // As described in the mod's doc comment, -1 == wakeup
 114              -1 => UpWoke(self.take_to_wake()),
 115              // As as described before, SPSC queues must be >= -2
 116              -2 => UpSuccess,
 117  
 118              // Be sure to preserve the disconnected state, and the return value
 119              // in this case is going to be whether our data was received or not.
 120              // This manifests itself on whether we have an empty queue or not.
 121              //
 122              // Primarily, are required to drain the queue here because the port
 123              // will never remove this data. We can only have at most one item to
 124              // drain (the port drains the rest).
 125              DISCONNECTED => {
 126                  self.cnt.store(DISCONNECTED, atomics::SeqCst);
 127                  let first = self.queue.pop();
 128                  let second = self.queue.pop();
 129                  assert!(second.is_none());
 130  
 131                  match first {
 132                      Some(..) => UpSuccess,  // we failed to send the data
 133                      None => UpDisconnected, // we successfully sent data
 134                  }
 135              }
 136  
 137              // Otherwise we just sent some data on a non-waiting queue, so just
 138              // make sure the world is sane and carry on!
 139              n => { assert!(n >= 0); UpSuccess }
 140          }
 141      }
 142  
 143      // Consumes ownership of the 'to_wake' field.
 144      fn take_to_wake(&mut self) -> BlockedTask {
 145          let task = self.to_wake.load(atomics::SeqCst);
 146          self.to_wake.store(0, atomics::SeqCst);
 147          assert!(task != 0);
 148          unsafe { BlockedTask::cast_from_uint(task) }
 149      }
 150  
 151      // Decrements the count on the channel for a sleeper, returning the sleeper
 152      // back if it shouldn't sleep. Note that this is the location where we take
 153      // steals into account.
 154      fn decrement(&mut self, taskBlockedTask) -> Result<(), BlockedTask> {
 155          assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
 156          let n = unsafe { task.cast_to_uint() };
 157          self.to_wake.store(n, atomics::SeqCst);
 158  
 159          let steals = self.steals;
 160          self.steals = 0;
 161  
 162          match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
 163              DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
 164              // If we factor in our steals and notice that the channel has no
 165              // data, we successfully sleep
 166              n => {
 167                  assert!(n >= 0);
 168                  if n - steals <= 0 { return Ok(()) }
 169              }
 170          }
 171  
 172          self.to_wake.store(0, atomics::SeqCst);
 173          Err(unsafe { BlockedTask::cast_from_uint(n) })
 174      }
 175  
 176      pub fn recv(&mut self) -> Result<T, Failure<T>> {
 177          // Optimistic preflight check (scheduling is expensive).
 178          match self.try_recv() {
 179              Err(Empty) => {}
 180              data => return data,
 181          }
 182  
 183          // Welp, our channel has no data. Deschedule the current task and
 184          // initiate the blocking protocol.
 185          let taskBox<Task> = Local::take();
 186          task.deschedule(1, |task| {
 187              self.decrement(task)
 188          });
 189  
 190          match self.try_recv() {
 191              // Messages which actually popped from the queue shouldn't count as
 192              // a steal, so offset the decrement here (we already have our
 193              // "steal" factored into the channel count above).
 194              data @ Ok(..) |
 195              data @ Err(Upgraded(..)) => {
 196                  self.steals -= 1;
 197                  data
 198              }
 199  
 200              data => data,
 201          }
 202      }
 203  
 204      pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
 205          match self.queue.pop() {
 206              // If we stole some data, record to that effect (this will be
 207              // factored into cnt later on).
 208              //
 209              // Note that we don't allow steals to grow without bound in order to
 210              // prevent eventual overflow of either steals or cnt as an overflow
 211              // would have catastrophic results. Sometimes, steals > cnt, but
 212              // other times cnt > steals, so we don't know the relation between
 213              // steals and cnt. This code path is executed only rarely, so we do
 214              // a pretty slow operation, of swapping 0 into cnt, taking steals
 215              // down as much as possible (without going negative), and then
 216              // adding back in whatever we couldn't factor into steals.
 217              Some(data) => {
 218                  if self.steals > MAX_STEALS {
 219                      match self.cnt.swap(0, atomics::SeqCst) {
 220                          DISCONNECTED => {
 221                              self.cnt.store(DISCONNECTED, atomics::SeqCst);
 222                          }
 223                          n => {
 224                              let m = cmp::min(n, self.steals);
 225                              self.steals -= m;
 226                              self.bump(n - m);
 227                          }
 228                      }
 229                      assert!(self.steals >= 0);
 230                  }
 231                  self.steals += 1;
 232                  match data {
 233                      Data(t) => Ok(t),
 234                      GoUp(up) => Err(Upgraded(up)),
 235                  }
 236              }
 237  
 238              None => {
 239                  match self.cnt.load(atomics::SeqCst) {
 240                      n if n != DISCONNECTED => Err(Empty),
 241  
 242                      // This is a little bit of a tricky case. We failed to pop
 243                      // data above, and then we have viewed that the channel is
 244                      // disconnected. In this window more data could have been
 245                      // sent on the channel. It doesn't really make sense to
 246                      // return that the channel is disconnected when there's
 247                      // actually data on it, so be extra sure there's no data by
 248                      // popping one more time.
 249                      //
 250                      // We can ignore steals because the other end is
 251                      // disconnected and we'll never need to really factor in our
 252                      // steals again.
 253                      _ => {
 254                          match self.queue.pop() {
 255                              Some(Data(t)) => Ok(t),
 256                              Some(GoUp(up)) => Err(Upgraded(up)),
 257                              None => Err(Disconnected),
 258                          }
 259                      }
 260                  }
 261              }
 262          }
 263      }
 264  
 265      pub fn drop_chan(&mut self) {
 266          // Dropping a channel is pretty simple, we just flag it as disconnected
 267          // and then wakeup a blocker if there is one.
 268          match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
 269              -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
 270              DISCONNECTED => {}
 271              n => { assert!(n >= 0); }
 272          }
 273      }
 274  
 275      pub fn drop_port(&mut self) {
 276          // Dropping a port seems like a fairly trivial thing. In theory all we
 277          // need to do is flag that we're disconnected and then everything else
 278          // can take over (we don't have anyone to wake up).
 279          //
 280          // The catch for Ports is that we want to drop the entire contents of
 281          // the queue. There are multiple reasons for having this property, the
 282          // largest of which is that if another chan is waiting in this channel
 283          // (but not received yet), then waiting on that port will cause a
 284          // deadlock.
 285          //
 286          // So if we accept that we must now destroy the entire contents of the
 287          // queue, this code may make a bit more sense. The tricky part is that
 288          // we can't let any in-flight sends go un-dropped, we have to make sure
 289          // *everything* is dropped and nothing new will come onto the channel.
 290  
 291          // The first thing we do is set a flag saying that we're done for. All
 292          // sends are gated on this flag, so we're immediately guaranteed that
 293          // there are a bounded number of active sends that we'll have to deal
 294          // with.
 295          self.port_dropped.store(true, atomics::SeqCst);
 296  
 297          // Now that we're guaranteed to deal with a bounded number of senders,
 298          // we need to drain the queue. This draining process happens atomically
 299          // with respect to the "count" of the channel. If the count is nonzero
 300          // (with steals taken into account), then there must be data on the
 301          // channel. In this case we drain everything and then try again. We will
 302          // continue to fail while active senders send data while we're dropping
 303          // data, but eventually we're guaranteed to break out of this loop
 304          // (because there is a bounded number of senders).
 305          let mut steals = self.steals;
 306          while {
 307              let cnt = self.cnt.compare_and_swap(
 308                              steals, DISCONNECTED, atomics::SeqCst);
 309              cnt != DISCONNECTED && cnt != steals
 310          } {
 311              loop {
 312                  match self.queue.pop() {
 313                      Some(..) => { steals += 1; }
 314                      None => break
 315                  }
 316              }
 317          }
 318  
 319          // At this point in time, we have gated all future senders from sending,
 320          // and we have flagged the channel as being disconnected. The senders
 321          // still have some responsibility, however, because some sends may not
 322          // complete until after we flag the disconnection. There are more
 323          // details in the sending methods that see DISCONNECTED
 324      }
 325  
 326      ////////////////////////////////////////////////////////////////////////////
 327      // select implementation
 328      ////////////////////////////////////////////////////////////////////////////
 329  
 330      // Tests to see whether this port can receive without blocking. If Ok is
 331      // returned, then that's the answer. If Err is returned, then the returned
 332      // port needs to be queried instead (an upgrade happened)
 333      pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
 334          // We peek at the queue to see if there's anything on it, and we use
 335          // this return value to determine if we should pop from the queue and
 336          // upgrade this channel immediately. If it looks like we've got an
 337          // upgrade pending, then go through the whole recv rigamarole to update
 338          // the internal state.
 339          match self.queue.peek() {
 340              Some(&GoUp(..)) => {
 341                  match self.recv() {
 342                      Err(Upgraded(port)) => Err(port),
 343                      _ => unreachable!(),
 344                  }
 345              }
 346              Some(..) => Ok(true),
 347              None => Ok(false)
 348          }
 349      }
 350  
 351      // increment the count on the channel (used for selection)
 352      fn bump(&mut self, amtint) -> int {
 353          match self.cnt.fetch_add(amt, atomics::SeqCst) {
 354              DISCONNECTED => {
 355                  self.cnt.store(DISCONNECTED, atomics::SeqCst);
 356                  DISCONNECTED
 357              }
 358              n => n
 359          }
 360      }
 361  
 362      // Attempts to start selecting on this port. Like a oneshot, this can fail
 363      // immediately because of an upgrade.
 364      pub fn start_selection(&mut self, taskBlockedTask) -> SelectionResult<T> {
 365          match self.decrement(task) {
 366              Ok(()) => SelSuccess,
 367              Err(task) => {
 368                  let ret = match self.queue.peek() {
 369                      Some(&GoUp(..)) => {
 370                          match self.queue.pop() {
 371                              Some(GoUp(port)) => SelUpgraded(task, port),
 372                              _ => unreachable!(),
 373                          }
 374                      }
 375                      Some(..) => SelCanceled(task),
 376                      None => SelCanceled(task),
 377                  };
 378                  // Undo our decrement above, and we should be guaranteed that the
 379                  // previous value is positive because we're not going to sleep
 380                  let prev = self.bump(1);
 381                  assert!(prev == DISCONNECTED || prev >= 0);
 382                  return ret;
 383              }
 384          }
 385      }
 386  
 387      // Removes a previous task from being blocked in this port
 388      pub fn abort_selection(&mut self,
 389                             was_upgradebool) -> Result<bool, Receiver<T>> {
 390          // If we're aborting selection after upgrading from a oneshot, then
 391          // we're guarantee that no one is waiting. The only way that we could
 392          // have seen the upgrade is if data was actually sent on the channel
 393          // half again. For us, this means that there is guaranteed to be data on
 394          // this channel. Furthermore, we're guaranteed that there was no
 395          // start_selection previously, so there's no need to modify `self.cnt`
 396          // at all.
 397          //
 398          // Hence, because of these invariants, we immediately return `Ok(true)`.
 399          // Note that the data may not actually be sent on the channel just yet.
 400          // The other end could have flagged the upgrade but not sent data to
 401          // this end. This is fine because we know it's a small bounded windows
 402          // of time until the data is actually sent.
 403          if was_upgrade {
 404              assert_eq!(self.steals, 0);
 405              assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
 406              return Ok(true)
 407          }
 408  
 409          // We want to make sure that the count on the channel goes non-negative,
 410          // and in the stream case we can have at most one steal, so just assume
 411          // that we had one steal.
 412          let steals = 1;
 413          let prev = self.bump(steals + 1);
 414  
 415          // If we were previously disconnected, then we know for sure that there
 416          // is no task in to_wake, so just keep going
 417          let has_data = if prev == DISCONNECTED {
 418              assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
 419              true // there is data, that data is that we're disconnected
 420          } else {
 421              let cur = prev + steals + 1;
 422              assert!(cur >= 0);
 423  
 424              // If the previous count was negative, then we just made things go
 425              // positive, hence we passed the -1 boundary and we're responsible
 426              // for removing the to_wake() field and trashing it.
 427              //
 428              // If the previous count was positive then we're in a tougher
 429              // situation. A possible race is that a sender just incremented
 430              // through -1 (meaning it's going to try to wake a task up), but it
 431              // hasn't yet read the to_wake. In order to prevent a future recv()
 432              // from waking up too early (this sender picking up the plastered
 433              // over to_wake), we spin loop here waiting for to_wake to be 0.
 434              // Note that this entire select() implementation needs an overhaul,
 435              // and this is *not* the worst part of it, so this is not done as a
 436              // final solution but rather out of necessity for now to get
 437              // something working.
 438              if prev < 0 {
 439                  self.take_to_wake().trash();
 440              } else {
 441                  while self.to_wake.load(atomics::SeqCst) != 0 {
 442                      Thread::yield_now();
 443                  }
 444              }
 445              assert_eq!(self.steals, 0);
 446              self.steals = steals;
 447  
 448              // if we were previously positive, then there's surely data to
 449              // receive
 450              prev >= 0
 451          };
 452  
 453          // Now that we've determined that this queue "has data", we peek at the
 454          // queue to see if the data is an upgrade or not. If it's an upgrade,
 455          // then we need to destroy this port and abort selection on the
 456          // upgraded port.
 457          if has_data {
 458              match self.queue.peek() {
 459                  Some(&GoUp(..)) => {
 460                      match self.queue.pop() {
 461                          Some(GoUp(port)) => Err(port),
 462                          _ => unreachable!(),
 463                      }
 464                  }
 465                  _ => Ok(true),
 466              }
 467          } else {
 468              Ok(false)
 469          }
 470      }
 471  }
 472  
 473  #[unsafe_destructor]
 474  impl<T: Send> Drop for Packet<T> {
 475      fn drop(&mut self) {
 476          // Note that this load is not only an assert for correctness about
 477          // disconnection, but also a proper fence before the read of
 478          // `to_wake`, so this assert cannot be removed with also removing
 479          // the `to_wake` assert.
 480          assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
 481          assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
 482      }
 483  }


libstd/comm/stream.rs:56:1-56:1 -enum- definition:
pub enum UpgradeResult {
    UpSuccess,
    UpDisconnected,
references:- 2
110:     fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
111:         self.queue.push(t);


libstd/comm/stream.rs:50:1-50:1 -enum- definition:
pub enum Failure<T> {
    Empty,
    Disconnected,
references:- 2
204:     pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
205:         match self.queue.pop() {


libstd/comm/stream.rs:40:1-40:1 -struct- definition:
pub struct Packet<T> {
    queue: spsc::Queue<Message<T>>, // internal queue for all message
    cnt: atomics::AtomicInt, // How many items are on this channel
references:- 5
474: impl<T: Send> Drop for Packet<T> {
475:     fn drop(&mut self) {
libstd/comm/mod.rs:
389:     Oneshot(UnsafeArc<oneshot::Packet<T>>),
390:     Stream(UnsafeArc<stream::Packet<T>>),
391:     Shared(UnsafeArc<shared::Packet<T>>),
libstd/comm/stream.rs:
77:     pub fn new() -> Packet<T> {
78:         Packet {
79:             queue: spsc::Queue::new(128),


libstd/comm/stream.rs:70:59-70:59 -enum- definition:
// internal queue it's a queue of T, but rather Message<T>
enum Message<T> {
    Data(T),
references:- 2
41: pub struct Packet<T> {
42:     queue: spsc::Queue<Message<T>>, // internal queue for all message
--
110:     fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
111:         self.queue.push(t);