(index<- )        ./libstd/comm/oneshot.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  /// Oneshot channels/ports
  12  ///
  13  /// This is the initial flavor of channels/ports used for comm module. This is
  14  /// an optimization for the one-use case of a channel. The major optimization of
  15  /// this type is to have one and exactly one allocation when the chan/port pair
  16  /// is created.
  17  ///
  18  /// Another possible optimization would be to not use an UnsafeArc box because
  19  /// in theory we know when the shared packet can be deallocated (no real need
  20  /// for the atomic reference counting), but I was having trouble how to destroy
  21  /// the data early in a drop of a Port.
  22  ///
  23  /// # Implementation
  24  ///
  25  /// Oneshots are implemented around one atomic uint variable. This variable
  26  /// indicates both the state of the port/chan but also contains any tasks
  27  /// blocked on the port. All atomic operations happen on this one word.
  28  ///
  29  /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
  30  /// on behalf of the channel side of things (it can be mentally thought of as
  31  /// consuming the port). This upgrade is then also stored in the shared packet.
  32  /// The one caveat to consider is that when a port sees a disconnected channel
  33  /// it must check for data because there is no "data plus upgrade" state.
  34  
  35  use comm::Receiver;
  36  use kinds::Send;
  37  use mem;
  38  use ops::Drop;
  39  use option::{Some, None, Option};
  40  use owned::Box;
  41  use result::{Result, Ok, Err};
  42  use rt::local::Local;
  43  use rt::task::{Task, BlockedTask};
  44  use sync::atomics;
  45  
  46  // Various states you can find a port in.
  47  static EMPTY: uint = 0;
  48  static DATA: uint = 1;
  49  static DISCONNECTED: uint = 2;
  50  
  51  pub struct Packet<T> {
  52      // Internal state of the chan/port pair (stores the blocked task as well)
  53      state: atomics::AtomicUint,
  54      // One-shot data slot location
  55      data: Option<T>,
  56      // when used for the second time, a oneshot channel must be upgraded, and
  57      // this contains the slot for the upgrade
  58      upgrade: MyUpgrade<T>,
  59  }
  60  
  61  pub enum Failure<T> {
  62      Empty,
  63      Disconnected,
  64      Upgraded(Receiver<T>),
  65  }
  66  
  67  pub enum UpgradeResult {
  68      UpSuccess,
  69      UpDisconnected,
  70      UpWoke(BlockedTask),
  71  }
  72  
  73  pub enum SelectionResult<T> {
  74      SelCanceled(BlockedTask),
  75      SelUpgraded(BlockedTask, Receiver<T>),
  76      SelSuccess,
  77  }
  78  
  79  enum MyUpgrade<T> {
  80      NothingSent,
  81      SendUsed,
  82      GoUp(Receiver<T>),
  83  }
  84  
  85  impl<T: Send> Packet<T> {
  86      pub fn new() -> Packet<T> {
  87          Packet {
  88              data: None,
  89              upgrade: NothingSent,
  90              state: atomics::AtomicUint::new(EMPTY),
  91          }
  92      }
  93  
  94      pub fn send(&mut self, tT) -> Result<(), T> {
  95          // Sanity check
  96          match self.upgrade {
  97              NothingSent => {}
  98              _ => fail!("sending on a oneshot that's already sent on "),
  99          }
 100          assert!(self.data.is_none());
 101          self.data = Some(t);
 102          self.upgrade = SendUsed;
 103  
 104          match self.state.swap(DATA, atomics::SeqCst) {
 105              // Sent the data, no one was waiting
 106              EMPTY => Ok(()),
 107  
 108              // Couldn't send the data, the port hung up first. Return the data
 109              // back up the stack.
 110              DISCONNECTED => {
 111                  Err(self.data.take_unwrap())
 112              }
 113  
 114              // Not possible, these are one-use channels
 115              DATA => unreachable!(),
 116  
 117              // Anything else means that there was a task waiting on the other
 118              // end. We leave the 'DATA' state inside so it'll pick it up on the
 119              // other end.
 120              n => unsafe {
 121                  let t = BlockedTask::cast_from_uint(n);
 122                  t.wake().map(|t| t.reawaken());
 123                  Ok(())
 124              }
 125          }
 126      }
 127  
 128      // Just tests whether this channel has been sent on or not, this is only
 129      // safe to use from the sender.
 130      pub fn sent(&self) -> bool {
 131          match self.upgrade {
 132              NothingSent => false,
 133              _ => true,
 134          }
 135      }
 136  
 137      pub fn recv(&mut self) -> Result<T, Failure<T>> {
 138          // Attempt to not block the task (it's a little expensive). If it looks
 139          // like we're not empty, then immediately go through to `try_recv`.
 140          if self.state.load(atomics::SeqCst) == EMPTY {
 141              let tBox<Task> = Local::take();
 142              t.deschedule(1, |task| {
 143                  let n = unsafe { task.cast_to_uint() };
 144                  match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) {
 145                      // Nothing on the channel, we legitimately block
 146                      EMPTY => Ok(()),
 147  
 148                      // If there's data or it's a disconnected channel, then we
 149                      // failed the cmpxchg, so we just wake ourselves back up
 150                      DATA | DISCONNECTED => {
 151                          unsafe { Err(BlockedTask::cast_from_uint(n)) }
 152                      }
 153  
 154                      // Only one thread is allowed to sleep on this port
 155                      _ => unreachable!()
 156                  }
 157              });
 158          }
 159  
 160          self.try_recv()
 161      }
 162  
 163      pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
 164          match self.state.load(atomics::SeqCst) {
 165              EMPTY => Err(Empty),
 166  
 167              // We saw some data on the channel, but the channel can be used
 168              // again to send us an upgrade. As a result, we need to re-insert
 169              // into the channel that there's no data available (otherwise we'll
 170              // just see DATA next time). This is done as a cmpxchg because if
 171              // the state changes under our feet we'd rather just see that state
 172              // change.
 173              DATA => {
 174                  self.state.compare_and_swap(DATA, EMPTY, atomics::SeqCst);
 175                  match self.data.take() {
 176                      Some(data) => Ok(data),
 177                      None => unreachable!(),
 178                  }
 179              }
 180  
 181              // There's no guarantee that we receive before an upgrade happens,
 182              // and an upgrade flags the channel as disconnected, so when we see
 183              // this we first need to check if there's data available and *then*
 184              // we go through and process the upgrade.
 185              DISCONNECTED => {
 186                  match self.data.take() {
 187                      Some(data) => Ok(data),
 188                      None => {
 189                          match mem::replace(&mut self.upgrade, SendUsed) {
 190                              SendUsed | NothingSent => Err(Disconnected),
 191                              GoUp(upgrade) => Err(Upgraded(upgrade))
 192                          }
 193                      }
 194                  }
 195              }
 196              _ => unreachable!()
 197          }
 198      }
 199  
 200      // Returns whether the upgrade was completed. If the upgrade wasn't
 201      // completed, then the port couldn't get sent to the other half (it will
 202      // never receive it).
 203      pub fn upgrade(&mut self, upReceiver<T>) -> UpgradeResult {
 204          let prev = match self.upgrade {
 205              NothingSent => NothingSent,
 206              SendUsed => SendUsed,
 207              _ => fail!("upgrading again"),
 208          };
 209          self.upgrade = GoUp(up);
 210  
 211          match self.state.swap(DISCONNECTED, atomics::SeqCst) {
 212              // If the channel is empty or has data on it, then we're good to go.
 213              // Senders will check the data before the upgrade (in case we
 214              // plastered over the DATA state).
 215              DATA | EMPTY => UpSuccess,
 216  
 217              // If the other end is already disconnected, then we failed the
 218              // upgrade. Be sure to trash the port we were given.
 219              DISCONNECTED => { self.upgrade = prev; UpDisconnected }
 220  
 221              // If someone's waiting, we gotta wake them up
 222              n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) })
 223          }
 224      }
 225  
 226      pub fn drop_chan(&mut self) {
 227          match self.state.swap(DISCONNECTED, atomics::SeqCst) {
 228              DATA | DISCONNECTED | EMPTY => {}
 229  
 230              // If someone's waiting, we gotta wake them up
 231              n => unsafe {
 232                  let t = BlockedTask::cast_from_uint(n);
 233                  t.wake().map(|t| t.reawaken());
 234              }
 235          }
 236      }
 237  
 238      pub fn drop_port(&mut self) {
 239          match self.state.swap(DISCONNECTED, atomics::SeqCst) {
 240              // An empty channel has nothing to do, and a remotely disconnected
 241              // channel also has nothing to do b/c we're about to run the drop
 242              // glue
 243              DISCONNECTED | EMPTY => {}
 244  
 245              // There's data on the channel, so make sure we destroy it promptly.
 246              // This is why not using an arc is a little difficult (need the box
 247              // to stay valid while we take the data).
 248              DATA => { self.data.take_unwrap(); }
 249  
 250              // We're the only ones that can block on this port
 251              _ => unreachable!()
 252          }
 253      }
 254  
 255      ////////////////////////////////////////////////////////////////////////////
 256      // select implementation
 257      ////////////////////////////////////////////////////////////////////////////
 258  
 259      // If Ok, the value is whether this port has data, if Err, then the upgraded
 260      // port needs to be checked instead of this one.
 261      pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
 262          match self.state.load(atomics::SeqCst) {
 263              EMPTY => Ok(false), // Welp, we tried
 264              DATA => Ok(true),   // we have some un-acquired data
 265              DISCONNECTED if self.data.is_some() => Ok(true), // we have data
 266              DISCONNECTED => {
 267                  match mem::replace(&mut self.upgrade, SendUsed) {
 268                      // The other end sent us an upgrade, so we need to
 269                      // propagate upwards whether the upgrade can receive
 270                      // data
 271                      GoUp(upgrade) => Err(upgrade),
 272  
 273                      // If the other end disconnected without sending an
 274                      // upgrade, then we have data to receive (the channel is
 275                      // disconnected).
 276                      up => { self.upgrade = up; Ok(true) }
 277                  }
 278              }
 279              _ => unreachable!(), // we're the "one blocker"
 280          }
 281      }
 282  
 283      // Attempts to start selection on this port. This can either succeed, fail
 284      // because there is data, or fail because there is an upgrade pending.
 285      pub fn start_selection(&mut self, taskBlockedTask) -> SelectionResult<T> {
 286          let n = unsafe { task.cast_to_uint() };
 287          match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) {
 288              EMPTY => SelSuccess,
 289              DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }),
 290              DISCONNECTED if self.data.is_some() => {
 291                  SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
 292              }
 293              DISCONNECTED => {
 294                  match mem::replace(&mut self.upgrade, SendUsed) {
 295                      // The other end sent us an upgrade, so we need to
 296                      // propagate upwards whether the upgrade can receive
 297                      // data
 298                      GoUp(upgrade) => {
 299                          SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) },
 300                                      upgrade)
 301                      }
 302  
 303                      // If the other end disconnected without sending an
 304                      // upgrade, then we have data to receive (the channel is
 305                      // disconnected).
 306                      up => {
 307                          self.upgrade = up;
 308                          SelCanceled(unsafe { BlockedTask::cast_from_uint(n) })
 309                      }
 310                  }
 311              }
 312              _ => unreachable!(), // we're the "one blocker"
 313          }
 314      }
 315  
 316      // Remove a previous selecting task from this port. This ensures that the
 317      // blocked task will no longer be visible to any other threads.
 318      //
 319      // The return value indicates whether there's data on this port.
 320      pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
 321          let state = match self.state.load(atomics::SeqCst) {
 322              // Each of these states means that no further activity will happen
 323              // with regard to abortion selection
 324              s @ EMPTY |
 325              s @ DATA |
 326              s @ DISCONNECTED => s,
 327  
 328              // If we've got a blocked task, then use an atomic to gain ownership
 329              // of it (may fail)
 330              n => self.state.compare_and_swap(n, EMPTY, atomics::SeqCst)
 331          };
 332  
 333          // Now that we've got ownership of our state, figure out what to do
 334          // about it.
 335          match state {
 336              EMPTY => unreachable!(),
 337              // our task used for select was stolen
 338              DATA => Ok(true),
 339  
 340              // If the other end has hung up, then we have complete ownership
 341              // of the port. First, check if there was data waiting for us. This
 342              // is possible if the other end sent something and then hung up.
 343              //
 344              // We then need to check to see if there was an upgrade requested,
 345              // and if so, the upgraded port needs to have its selection aborted.
 346              DISCONNECTED => {
 347                  if self.data.is_some() {
 348                      Ok(true)
 349                  } else {
 350                      match mem::replace(&mut self.upgrade, SendUsed) {
 351                          GoUp(port) => Err(port),
 352                          _ => Ok(true),
 353                      }
 354                  }
 355              }
 356  
 357              // We woke ourselves up from select. Assert that the task should be
 358              // trashed and returne that we don't have any data.
 359              n => {
 360                  let t = unsafe { BlockedTask::cast_from_uint(n) };
 361                  t.trash();
 362                  Ok(false)
 363              }
 364          }
 365      }
 366  }
 367  
 368  #[unsafe_destructor]
 369  impl<T: Send> Drop for Packet<T> {
 370      fn drop(&mut self) {
 371          assert_eq!(self.state.load(atomics::SeqCst), DISCONNECTED);
 372      }
 373  }