(index<- )        ./libstd/comm/sync.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  /// Synchronous channels/ports
  12  ///
  13  /// This channel implementation differs significantly from the asynchronous
  14  /// implementations found next to it (oneshot/stream/share). This is an
  15  /// implementation of a synchronous, bounded buffer channel.
  16  ///
  17  /// Each channel is created with some amount of backing buffer, and sends will
  18  /// *block* until buffer space becomes available. A buffer size of 0 is valid,
  19  /// which means that every successful send is paired with a successful recv.
  20  ///
  21  /// This flavor of channels defines a new `send_opt` method for channels which
  22  /// is the method by which a message is sent but the task does not fail if it
  23  /// cannot be delivered.
  24  ///
  25  /// Another major difference is that send() will *always* return back the data
  26  /// if it couldn't be sent. This is because it is deterministically known when
  27  /// the data is received and when it is not received.
  28  ///
  29  /// Implementation-wise, it can all be summed up with "use a mutex plus some
  30  /// logic". The mutex used here is an OS native mutex, meaning that no user code
  31  /// is run inside of the mutex (to prevent context switching). This
  32  /// implementation shares almost all code for the buffered and unbuffered cases
  33  /// of a synchronous channel. There are a few branches for the unbuffered case,
  34  /// but they're mostly just relevant to blocking senders.
  35  
  36  use cast;
  37  use container::Container;
  38  use iter::Iterator;
  39  use kinds::Send;
  40  use mem;
  41  use ops::Drop;
  42  use option::{Some, None, Option};
  43  use owned::Box;
  44  use ptr::RawPtr;
  45  use result::{Result, Ok, Err};
  46  use rt::local::Local;
  47  use rt::task::{Task, BlockedTask};
  48  use sync::atomics;
  49  use ty::Unsafe;
  50  use unstable::mutex::{NativeMutex, LockGuard};
  51  use vec::Vec;
  52  
  53  pub struct Packet<T> {
  54      /// Only field outside of the mutex. Just done for kicks, but mainly because
  55      /// the other shared channel already had the code implemented
  56      channels: atomics::AtomicUint,
  57  
  58      /// The state field is protected by this mutex
  59      lock: NativeMutex,
  60      state: Unsafe<State<T>>,
  61  }
  62  
  63  struct State<T> {
  64      disconnected: bool, // Is the channel disconnected yet?
  65      queue: Queue,       // queue of senders waiting to send data
  66      blocker: Blocker,   // currently blocked task on this channel
  67      buf: Buffer<T>,     // storage for buffered messages
  68      cap: uint,          // capacity of this channel
  69  
  70      /// A curious flag used to indicate whether a sender failed or succeeded in
  71      /// blocking. This is used to transmit information back to the task that it
  72      /// must dequeue its message from the buffer because it was not received.
  73      /// This is only relevant in the 0-buffer case. This obviously cannot be
  74      /// safely constructed, but it's guaranteed to always have a valid pointer
  75      /// value.
  76      canceled: Option<&'static mut bool>,
  77  }
  78  
  79  /// Possible flavors of tasks who can be blocked on this channel.
  80  enum Blocker {
  81      BlockedSender(BlockedTask),
  82      BlockedReceiver(BlockedTask),
  83      NoneBlocked
  84  }
  85  
  86  /// Simple queue for threading tasks together. Nodes are stack-allocated, so
  87  /// this structure is not safe at all
  88  struct Queue {
  89      head: *mut Node,
  90      tail: *mut Node,
  91  }
  92  
  93  struct Node {
  94      task: Option<BlockedTask>,
  95      next: *mut Node,
  96  }
  97  
  98  /// A simple ring-buffer
  99  struct Buffer<T> {
 100      buf: Vec<Option<T>>,
 101      start: uint,
 102      size: uint,
 103  }
 104  
 105  #[deriving(Show)]
 106  pub enum Failure {
 107      Empty,
 108      Disconnected,
 109  }
 110  
 111  /// Atomically blocks the current task, placing it into `slot`, unlocking `lock`
 112  /// in the meantime. This re-locks the mutex upon returning.
 113  fn wait(slot: &mut Blocker, ffn(BlockedTask) -> Blocker,
 114          lock: &NativeMutex) {
 115      let meBox<Task> = Local::take();
 116      me.deschedule(1, |task| {
 117          match mem::replace(slot, f(task)) {
 118              NoneBlocked => {}
 119              _ => unreachable!(),
 120          }
 121          unsafe { lock.unlock_noguard(); }
 122          Ok(())
 123      });
 124      unsafe { lock.lock_noguard(); }
 125  }
 126  
 127  /// Wakes up a task, dropping the lock at the correct time
 128  fn wakeup(taskBlockedTask, guardLockGuard) {
 129      // We need to be careful to wake up the waiting task *outside* of the mutex
 130      // in case it incurs a context switch.
 131      mem::drop(guard);
 132      task.wake().map(|t| t.reawaken());
 133  }
 134  
 135  impl<T: Send> Packet<T> {
 136      pub fn new(capuint) -> Packet<T> {
 137          Packet {
 138              channels: atomics::AtomicUint::new(1),
 139              lock: unsafe { NativeMutex::new() },
 140              state: Unsafe::new(State {
 141                  disconnected: false,
 142                  blocker: NoneBlocked,
 143                  cap: cap,
 144                  canceled: None,
 145                  queue: Queue {
 146                      head: 0 as *mut Node,
 147                      tail: 0 as *mut Node,
 148                  },
 149                  buf: Buffer {
 150                      buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None),
 151                      start: 0,
 152                      size: 0,
 153                  },
 154              }),
 155          }
 156      }
 157  
 158      // Locks this channel, returning a guard for the state and the mutable state
 159      // itself. Care should be taken to ensure that the state does not escape the
 160      // guard!
 161      //
 162      // Note that we're ok promoting an & reference to an &mut reference because
 163      // the lock ensures that we're the only ones in the world with a pointer to
 164      // the state.
 165      fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) {
 166          unsafe {
 167              let guard = self.lock.lock();
 168              (guard, &mut *self.state.get())
 169          }
 170      }
 171  
 172      pub fn send(&self, tT) -> Result<(), T> {
 173          let (guard, state) = self.lock();
 174  
 175          // wait for a slot to become available, and enqueue the data
 176          while !state.disconnected && state.buf.size() == state.buf.cap() {
 177              state.queue.enqueue(&self.lock);
 178          }
 179          if state.disconnected { return Err(t) }
 180          state.buf.enqueue(t);
 181  
 182          match mem::replace(&mut state.blocker, NoneBlocked) {
 183              // if our capacity is 0, then we need to wait for a receiver to be
 184              // available to take our data. After waiting, we check again to make
 185              // sure the port didn't go away in the meantime. If it did, we need
 186              // to hand back our data.
 187              NoneBlocked if state.cap == 0 => {
 188                  let mut canceled = false;
 189                  assert!(state.canceled.is_none());
 190                  state.canceled = Some(unsafe { cast::transmute(&mut canceled) });
 191                  wait(&mut state.blocker, BlockedSender, &self.lock);
 192                  if canceled {Err(state.buf.dequeue())} else {Ok(())}
 193              }
 194  
 195              // success, we buffered some data
 196              NoneBlocked => Ok(()),
 197  
 198              // success, someone's about to receive our buffered data.
 199              BlockedReceiver(task) => { wakeup(task, guard); Ok(()) }
 200  
 201              BlockedSender(..) => fail!("lolwut"),
 202          }
 203      }
 204  
 205      pub fn try_send(&self, tT) -> Result<(), super::TrySendError<T>> {
 206          let (guard, state) = self.lock();
 207          if state.disconnected {
 208              Err(super::RecvDisconnected(t))
 209          } else if state.buf.size() == state.buf.cap() {
 210              Err(super::Full(t))
 211          } else if state.cap == 0 {
 212              // With capacity 0, even though we have buffer space we can't
 213              // transfer the data unless there's a receiver waiting.
 214              match mem::replace(&mut state.blocker, NoneBlocked) {
 215                  NoneBlocked => Err(super::Full(t)),
 216                  BlockedSender(..) => unreachable!(),
 217                  BlockedReceiver(task) => {
 218                      state.buf.enqueue(t);
 219                      wakeup(task, guard);
 220                      Ok(())
 221                  }
 222              }
 223          } else {
 224              // If the buffer has some space and the capacity isn't 0, then we
 225              // just enqueue the data for later retrieval.
 226              assert!(state.buf.size() < state.buf.cap());
 227              state.buf.enqueue(t);
 228              Ok(())
 229          }
 230      }
 231  
 232      // Receives a message from this channel
 233      //
 234      // When reading this, remember that there can only ever be one receiver at
 235      // time.
 236      pub fn recv(&self) -> Result<T, ()> {
 237          let (guard, state) = self.lock();
 238  
 239          // Wait for the buffer to have something in it. No need for a while loop
 240          // because we're the only receiver.
 241          let mut waited = false;
 242          if !state.disconnected && state.buf.size() == 0 {
 243              wait(&mut state.blocker, BlockedReceiver, &self.lock);
 244              waited = true;
 245          }
 246          if state.disconnected && state.buf.size() == 0 { return Err(()) }
 247  
 248          // Pick up the data, wake up our neighbors, and carry on
 249          assert!(state.buf.size() > 0);
 250          let ret = state.buf.dequeue();
 251          self.wakeup_senders(waited, guard, state);
 252          return Ok(ret);
 253      }
 254  
 255      pub fn try_recv(&self) -> Result<T, Failure> {
 256          let (guard, state) = self.lock();
 257  
 258          // Easy cases first
 259          if state.disconnected { return Err(Disconnected) }
 260          if state.buf.size() == 0 { return Err(Empty) }
 261  
 262          // Be sure to wake up neighbors
 263          let ret = Ok(state.buf.dequeue());
 264          self.wakeup_senders(false, guard, state);
 265  
 266          return ret;
 267      }
 268  
 269      // Wake up pending senders after some data has been received
 270      //
 271      // * `waited` - flag if the receiver blocked to receive some data, or if it
 272      //              just picked up some data on the way out
 273      // * `guard` - the lock guard that is held over this channel's lock
 274      fn wakeup_senders(&self, waitedbool,
 275                        guardLockGuard,
 276                        state&mut State<T>) {
 277          let pending_sender1Option<BlockedTask> = state.queue.dequeue();
 278  
 279          // If this is a no-buffer channel (cap == 0), then if we didn't wait we
 280          // need to ACK the sender. If we waited, then the sender waking us up
 281          // was already the ACK.
 282          let pending_sender2 = if state.cap == 0 && !waited {
 283              match mem::replace(&mut state.blocker, NoneBlocked) {
 284                  NoneBlocked => None,
 285                  BlockedReceiver(..) => unreachable!(),
 286                  BlockedSender(task) => {
 287                      state.canceled.take();
 288                      Some(task)
 289                  }
 290              }
 291          } else {
 292              None
 293          };
 294          mem::drop((state, guard));
 295  
 296          // only outside of the lock do we wake up the pending tasks
 297          pending_sender1.map(|t| t.wake().map(|t| t.reawaken()));
 298          pending_sender2.map(|t| t.wake().map(|t| t.reawaken()));
 299      }
 300  
 301      // Prepares this shared packet for a channel clone, essentially just bumping
 302      // a refcount.
 303      pub fn clone_chan(&self) {
 304          self.channels.fetch_add(1, atomics::SeqCst);
 305      }
 306  
 307      pub fn drop_chan(&self) {
 308          // Only flag the channel as disconnected if we're the last channel
 309          match self.channels.fetch_sub(1, atomics::SeqCst) {
 310              1 => {}
 311              _ => return
 312          }
 313  
 314          // Not much to do other than wake up a receiver if one's there
 315          let (guard, state) = self.lock();
 316          if state.disconnected { return }
 317          state.disconnected = true;
 318          match mem::replace(&mut state.blocker, NoneBlocked) {
 319              NoneBlocked => {}
 320              BlockedSender(..) => unreachable!(),
 321              BlockedReceiver(task) => wakeup(task, guard),
 322          }
 323      }
 324  
 325      pub fn drop_port(&self) {
 326          let (guard, state) = self.lock();
 327  
 328          if state.disconnected { return }
 329          state.disconnected = true;
 330  
 331          // If the capacity is 0, then the sender may want its data back after
 332          // we're disconnected. Otherwise it's now our responsibility to destroy
 333          // the buffered data. As with many other portions of this code, this
 334          // needs to be careful to destroy the data *outside* of the lock to
 335          // prevent deadlock.
 336          let _data = if state.cap != 0 {
 337              mem::replace(&mut state.buf.buf, Vec::new())
 338          } else {
 339              Vec::new()
 340          };
 341          let mut queue = mem::replace(&mut state.queue, Queue {
 342              head: 0 as *mut Node,
 343              tail: 0 as *mut Node,
 344          });
 345  
 346          let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
 347              NoneBlocked => None,
 348              BlockedSender(task) => {
 349                  *state.canceled.take_unwrap() = true;
 350                  Some(task)
 351              }
 352              BlockedReceiver(..) => unreachable!(),
 353          };
 354          mem::drop((state, guard));
 355  
 356          loop {
 357              match queue.dequeue() {
 358                  Some(task) => { task.wake().map(|t| t.reawaken()); }
 359                  None => break,
 360              }
 361          }
 362          waiter.map(|t| t.wake().map(|t| t.reawaken()));
 363      }
 364  
 365      ////////////////////////////////////////////////////////////////////////////
 366      // select implementation
 367      ////////////////////////////////////////////////////////////////////////////
 368  
 369      // If Ok, the value is whether this port has data, if Err, then the upgraded
 370      // port needs to be checked instead of this one.
 371      pub fn can_recv(&self) -> bool {
 372          let (_g, state) = self.lock();
 373          state.disconnected || state.buf.size() > 0
 374      }
 375  
 376      // Attempts to start selection on this port. This can either succeed or fail
 377      // because there is data waiting.
 378      pub fn start_selection(&self, taskBlockedTask) -> Result<(), BlockedTask>{
 379          let (_g, state) = self.lock();
 380          if state.disconnected || state.buf.size() > 0 {
 381              Err(task)
 382          } else {
 383              match mem::replace(&mut state.blocker, BlockedReceiver(task)) {
 384                  NoneBlocked => {}
 385                  BlockedSender(..) => unreachable!(),
 386                  BlockedReceiver(..) => unreachable!(),
 387              }
 388              Ok(())
 389          }
 390      }
 391  
 392      // Remove a previous selecting task from this port. This ensures that the
 393      // blocked task will no longer be visible to any other threads.
 394      //
 395      // The return value indicates whether there's data on this port.
 396      pub fn abort_selection(&self) -> bool {
 397          let (_g, state) = self.lock();
 398          match mem::replace(&mut state.blocker, NoneBlocked) {
 399              NoneBlocked => true,
 400              BlockedSender(task) => {
 401                  state.blocker = BlockedSender(task);
 402                  true
 403              }
 404              BlockedReceiver(task) => { task.trash(); false }
 405          }
 406      }
 407  }
 408  
 409  #[unsafe_destructor]
 410  impl<T: Send> Drop for Packet<T> {
 411      fn drop(&mut self) {
 412          assert_eq!(self.channels.load(atomics::SeqCst), 0);
 413          let (_g, state) = self.lock();
 414          assert!(state.queue.dequeue().is_none());
 415          assert!(state.canceled.is_none());
 416      }
 417  }
 418  
 419  
 420  ////////////////////////////////////////////////////////////////////////////////
 421  // Buffer, a simple ring buffer backed by Vec<T>
 422  ////////////////////////////////////////////////////////////////////////////////
 423  
 424  impl<T> Buffer<T> {
 425      fn enqueue(&mut self, tT) {
 426          let pos = (self.start + self.size) % self.buf.len();
 427          self.size += 1;
 428          let prev = mem::replace(self.buf.get_mut(pos), Some(t));
 429          assert!(prev.is_none());
 430      }
 431  
 432      fn dequeue(&mut self) -> T {
 433          let start = self.start;
 434          self.size -= 1;
 435          self.start = (self.start + 1) % self.buf.len();
 436          self.buf.get_mut(start).take_unwrap()
 437      }
 438  
 439      fn size(&self) -> uint { self.size }
 440      fn cap(&self) -> uint { self.buf.len() }
 441  }
 442  
 443  ////////////////////////////////////////////////////////////////////////////////
 444  // Queue, a simple queue to enqueue tasks with (stack-allocated nodes)
 445  ////////////////////////////////////////////////////////////////////////////////
 446  
 447  impl Queue {
 448      fn enqueue(&mut self, lock&NativeMutex) {
 449          let taskBox<Task> = Local::take();
 450          let mut node = Node {
 451              task: None,
 452              next: 0 as *mut Node,
 453          };
 454          task.deschedule(1, |task| {
 455              node.task = Some(task);
 456              if self.tail.is_null() {
 457                  self.head = &mut node as *mut Node;
 458                  self.tail = &mut node as *mut Node;
 459              } else {
 460                  unsafe {
 461                      (*self.tail).next = &mut node as *mut Node;
 462                      self.tail = &mut node as *mut Node;
 463                  }
 464              }
 465              unsafe { lock.unlock_noguard(); }
 466              Ok(())
 467          });
 468          unsafe { lock.lock_noguard(); }
 469          assert!(node.next.is_null());
 470      }
 471  
 472      fn dequeue(&mut self) -> Option<BlockedTask> {
 473          if self.head.is_null() {
 474              return None
 475          }
 476          let node = self.head;
 477          self.head = unsafe { (*node).next };
 478          if self.head.is_null() {
 479              self.tail = 0 as *mut Node;
 480          }
 481          unsafe {
 482              (*node).next = 0 as *mut Node;
 483              Some((*node).task.take_unwrap())
 484          }
 485      }
 486  }


libstd/comm/sync.rs:105:18-105:18 -enum- definition:
pub enum Failure {
    Empty,
    Disconnected,
references:- 2
255:     pub fn try_recv(&self) -> Result<T, Failure> {
256:         let (guard, state) = self.lock();


libstd/comm/sync.rs:92:1-92:1 -struct- definition:
struct Node {
    task: Option<BlockedTask>,
    next: *mut Node,
references:- 15
449:         let task: Box<Task> = Local::take();
450:         let mut node = Node {
451:             task: None,
--
457:                 self.head = &mut node as *mut Node;
458:                 self.tail = &mut node as *mut Node;
459:             } else {
--
461:                     (*self.tail).next = &mut node as *mut Node;
462:                     self.tail = &mut node as *mut Node;
463:                 }
--
481:         unsafe {
482:             (*node).next = 0 as *mut Node;
483:             Some((*node).task.take_unwrap())


libstd/comm/sync.rs:87:38-87:38 -struct- definition:
/// this structure is not safe at all
struct Queue {
    head: *mut Node,
references:- 4
340:         };
341:         let mut queue = mem::replace(&mut state.queue, Queue {
342:             head: 0 as *mut Node,
--
447: impl Queue {
448:     fn enqueue(&mut self, lock: &NativeMutex) {


libstd/comm/sync.rs:62:1-62:1 -struct- definition:
struct State<T> {
    disconnected: bool, // Is the channel disconnected yet?
    queue: Queue,       // queue of senders waiting to send data
references:- 4
59:     lock: NativeMutex,
60:     state: Unsafe<State<T>>,
61: }
--
164:     // the state.
165:     fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) {
166:         unsafe {
--
275:                       guard: LockGuard,
276:                       state: &mut State<T>) {
277:         let pending_sender1: Option<BlockedTask> = state.queue.dequeue();


libstd/comm/sync.rs:79:66-79:66 -enum- definition:
/// Possible flavors of tasks who can be blocked on this channel.
enum Blocker {
    BlockedSender(BlockedTask),
references:- 3
112: /// in the meantime. This re-locks the mutex upon returning.
113: fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker,
114:         lock: &NativeMutex) {


libstd/comm/sync.rs:112:61-112:61 -fn- definition:
/// in the meantime. This re-locks the mutex upon returning.
fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker,
        lock: &NativeMutex) {
references:- 2
190:                 state.canceled = Some(unsafe { cast::transmute(&mut canceled) });
191:                 wait(&mut state.blocker, BlockedSender, &self.lock);
192:                 if canceled {Err(state.buf.dequeue())} else {Ok(())}
--
242:         if !state.disconnected && state.buf.size() == 0 {
243:             wait(&mut state.blocker, BlockedReceiver, &self.lock);
244:             waited = true;


libstd/comm/sync.rs:98:25-98:25 -struct- definition:
/// A simple ring-buffer
struct Buffer<T> {
    buf: Vec<Option<T>>,
references:- 3
66:     blocker: Blocker,   // currently blocked task on this channel
67:     buf: Buffer<T>,     // storage for buffered messages
68:     cap: uint,          // capacity of this channel
--
424: impl<T> Buffer<T> {
425:     fn enqueue(&mut self, t: T) {


libstd/comm/sync.rs:127:59-127:59 -fn- definition:
/// Wakes up a task, dropping the lock at the correct time
fn wakeup(task: BlockedTask, guard: LockGuard) {
    // We need to be careful to wake up the waiting task *outside* of the mutex
references:- 3
320:             BlockedSender(..) => unreachable!(),
321:             BlockedReceiver(task) => wakeup(task, guard),
322:         }


libstd/comm/sync.rs:52:1-52:1 -struct- definition:
pub struct Packet<T> {
    /// Only field outside of the mutex. Just done for kicks, but mainly because
    /// the other shared channel already had the code implemented
references:- 7
136:     pub fn new(cap: uint) -> Packet<T> {
137:         Packet {
138:             channels: atomics::AtomicUint::new(1),
libstd/comm/mod.rs:
647: impl<T: Send> SyncSender<T> {
648:     fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
649:         SyncSender { inner: inner, marker: marker::NoShare }
libstd/comm/sync.rs:
410: impl<T: Send> Drop for Packet<T> {
411:     fn drop(&mut self) {