(index<- )        ./libstd/comm/mod.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
    1  // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
    2  // file at the top-level directory of this distribution and at
    3  // http://rust-lang.org/COPYRIGHT.
    4  //
    5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
    6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
    7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
    8  // option. This file may not be copied, modified, or distributed
    9  // except according to those terms.
   10  
   11  //! Communication primitives for concurrent tasks
   12  //!
   13  //! Rust makes it very difficult to share data among tasks to prevent race
   14  //! conditions and to improve parallelism, but there is often a need for
   15  //! communication between concurrent tasks. The primitives defined in this
   16  //! module are the building blocks for synchronization in rust.
   17  //!
   18  //! This module provides message-based communication over channels, concretely
   19  //! defined among three types:
   20  //!
   21  //! * `Sender`
   22  //! * `SyncSender`
   23  //! * `Receiver`
   24  //!
   25  //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
   26  //! senders are clone-able such that many tasks can send simultaneously to one
   27  //! receiver.  These channels are *task blocking*, not *thread blocking*. This
   28  //! means that if one task is blocked on a channel, other tasks can continue to
   29  //! make progress.
   30  //!
   31  //! Rust channels come in one of two flavors:
   32  //!
   33  //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
   34  //!    will return a `(Sender, Receiver)` tuple where all sends will be
   35  //!    **asynchronous** (they never block). The channel conceptually has an
   36  //!    infinite buffer.
   37  //!
   38  //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
   39  //!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
   40  //!    is a pre-allocated buffer of a fixed size. All sends will be
   41  //!    **synchronous** by blocking until there is buffer space available. Note
   42  //!    that a bound of 0 is allowed, causing the channel to become a
   43  //!    "rendezvous" channel where each sender atomically hands off a message to
   44  //!    a receiver.
   45  //!
   46  //! ## Failure Propagation
   47  //!
   48  //! In addition to being a core primitive for communicating in rust, channels
   49  //! are the points at which failure is propagated among tasks.  Whenever the one
   50  //! half of channel is closed, the other half will have its next operation
   51  //! `fail!`. The purpose of this is to allow propagation of failure among tasks
   52  //! that are linked to one another via channels.
   53  //!
   54  //! There are methods on both of senders and receivers to perform their
   55  //! respective operations without failing, however.
   56  //!
   57  //! ## Runtime Requirements
   58  //!
   59  //! The channel types defined in this module generally have very few runtime
   60  //! requirements in order to operate. The major requirement they have is for a
   61  //! local rust `Task` to be available if any *blocking* operation is performed.
   62  //!
   63  //! If a local `Task` is not available (for example an FFI callback), then the
   64  //! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as
   65  //! the `try_send` method on a `SyncSender`, but no other operations are
   66  //! guaranteed to be safe.
   67  //!
   68  //! Additionally, channels can interoperate between runtimes. If one task in a
   69  //! program is running on libnative and another is running on libgreen, they can
   70  //! still communicate with one another using channels.
   71  //!
   72  //! # Example
   73  //!
   74  //! Simple usage:
   75  //!
   76  //! ```
   77  //! // Create a simple streaming channel
   78  //! let (tx, rx) = channel();
   79  //! spawn(proc() {
   80  //!     tx.send(10);
   81  //! });
   82  //! assert_eq!(rx.recv(), 10);
   83  //! ```
   84  //!
   85  //! Shared usage:
   86  //!
   87  //! ```
   88  //! // Create a shared channel which can be sent along from many tasks
   89  //! let (tx, rx) = channel();
   90  //! for i in range(0, 10) {
   91  //!     let tx = tx.clone();
   92  //!     spawn(proc() {
   93  //!         tx.send(i);
   94  //!     })
   95  //! }
   96  //!
   97  //! for _ in range(0, 10) {
   98  //!     let j = rx.recv();
   99  //!     assert!(0 <= j && j < 10);
  100  //! }
  101  //! ```
  102  //!
  103  //! Propagating failure:
  104  //!
  105  //! ```should_fail
  106  //! // The call to recv() will fail!() because the channel has already hung
  107  //! // up (or been deallocated)
  108  //! let (tx, rx) = channel::<int>();
  109  //! drop(tx);
  110  //! rx.recv();
  111  //! ```
  112  //!
  113  //! Synchronous channels:
  114  //!
  115  //! ```
  116  //! let (tx, rx) = sync_channel(0);
  117  //! spawn(proc() {
  118  //!     // This will wait for the parent task to start receiving
  119  //!     tx.send(53);
  120  //! });
  121  //! rx.recv();
  122  //! ```
  123  
  124  // A description of how Rust's channel implementation works
  125  //
  126  // Channels are supposed to be the basic building block for all other
  127  // concurrent primitives that are used in Rust. As a result, the channel type
  128  // needs to be highly optimized, flexible, and broad enough for use everywhere.
  129  //
  130  // The choice of implementation of all channels is to be built on lock-free data
  131  // structures. The channels themselves are then consequently also lock-free data
  132  // structures. As always with lock-free code, this is a very "here be dragons"
  133  // territory, especially because I'm unaware of any academic papers which have
  134  // gone into great length about channels of these flavors.
  135  //
  136  // ## Flavors of channels
  137  //
  138  // From the perspective of a consumer of this library, there is only one flavor
  139  // of channel. This channel can be used as a stream and cloned to allow multiple
  140  // senders. Under the hood, however, there are actually three flavors of
  141  // channels in play.
  142  //
  143  // * Oneshots - these channels are highly optimized for the one-send use case.
  144  //              They contain as few atomics as possible and involve one and
  145  //              exactly one allocation.
  146  // * Streams - these channels are optimized for the non-shared use case. They
  147  //             use a different concurrent queue which is more tailored for this
  148  //             use case. The initial allocation of this flavor of channel is not
  149  //             optimized.
  150  // * Shared - this is the most general form of channel that this module offers,
  151  //            a channel with multiple senders. This type is as optimized as it
  152  //            can be, but the previous two types mentioned are much faster for
  153  //            their use-cases.
  154  //
  155  // ## Concurrent queues
  156  //
  157  // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
  158  // recv() obviously blocks. This means that under the hood there must be some
  159  // shared and concurrent queue holding all of the actual data.
  160  //
  161  // With two flavors of channels, two flavors of queues are also used. We have
  162  // chosen to use queues from a well-known author which are abbreviated as SPSC
  163  // and MPSC (single producer, single consumer and multiple producer, single
  164  // consumer). SPSC queues are used for streams while MPSC queues are used for
  165  // shared channels.
  166  //
  167  // ### SPSC optimizations
  168  //
  169  // The SPSC queue found online is essentially a linked list of nodes where one
  170  // half of the nodes are the "queue of data" and the other half of nodes are a
  171  // cache of unused nodes. The unused nodes are used such that an allocation is
  172  // not required on every push() and a free doesn't need to happen on every
  173  // pop().
  174  //
  175  // As found online, however, the cache of nodes is of an infinite size. This
  176  // means that if a channel at one point in its life had 50k items in the queue,
  177  // then the queue will always have the capacity for 50k items. I believed that
  178  // this was an unnecessary limitation of the implementation, so I have altered
  179  // the queue to optionally have a bound on the cache size.
  180  //
  181  // By default, streams will have an unbounded SPSC queue with a small-ish cache
  182  // size. The hope is that the cache is still large enough to have very fast
  183  // send() operations while not too large such that millions of channels can
  184  // coexist at once.
  185  //
  186  // ### MPSC optimizations
  187  //
  188  // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
  189  // a linked list under the hood to earn its unboundedness, but I have not put
  190  // forth much effort into having a cache of nodes similar to the SPSC queue.
  191  //
  192  // For now, I believe that this is "ok" because shared channels are not the most
  193  // common type, but soon we may wish to revisit this queue choice and determine
  194  // another candidate for backend storage of shared channels.
  195  //
  196  // ## Overview of the Implementation
  197  //
  198  // Now that there's a little background on the concurrent queues used, it's
  199  // worth going into much more detail about the channels themselves. The basic
  200  // pseudocode for a send/recv are:
  201  //
  202  //
  203  //      send(t)                             recv()
  204  //        queue.push(t)                       return if queue.pop()
  205  //        if increment() == -1                deschedule {
  206  //          wakeup()                            if decrement() > 0
  207  //                                                cancel_deschedule()
  208  //                                            }
  209  //                                            queue.pop()
  210  //
  211  // As mentioned before, there are no locks in this implementation, only atomic
  212  // instructions are used.
  213  //
  214  // ### The internal atomic counter
  215  //
  216  // Every channel has a shared counter with each half to keep track of the size
  217  // of the queue. This counter is used to abort descheduling by the receiver and
  218  // to know when to wake up on the sending side.
  219  //
  220  // As seen in the pseudocode, senders will increment this count and receivers
  221  // will decrement the count. The theory behind this is that if a sender sees a
  222  // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
  223  // then it doesn't need to block.
  224  //
  225  // The recv() method has a beginning call to pop(), and if successful, it needs
  226  // to decrement the count. It is a crucial implementation detail that this
  227  // decrement does *not* happen to the shared counter. If this were the case,
  228  // then it would be possible for the counter to be very negative when there were
  229  // no receivers waiting, in which case the senders would have to determine when
  230  // it was actually appropriate to wake up a receiver.
  231  //
  232  // Instead, the "steal count" is kept track of separately (not atomically
  233  // because it's only used by receivers), and then the decrement() call when
  234  // descheduling will lump in all of the recent steals into one large decrement.
  235  //
  236  // The implication of this is that if a sender sees a -1 count, then there's
  237  // guaranteed to be a waiter waiting!
  238  //
  239  // ## Native Implementation
  240  //
  241  // A major goal of these channels is to work seamlessly on and off the runtime.
  242  // All of the previous race conditions have been worded in terms of
  243  // scheduler-isms (which is obviously not available without the runtime).
  244  //
  245  // For now, native usage of channels (off the runtime) will fall back onto
  246  // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
  247  // is still entirely lock-free, the "deschedule" blocks above are surrounded by
  248  // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
  249  // condition variable.
  250  //
  251  // ## Select
  252  //
  253  // Being able to support selection over channels has greatly influenced this
  254  // design, and not only does selection need to work inside the runtime, but also
  255  // outside the runtime.
  256  //
  257  // The implementation is fairly straightforward. The goal of select() is not to
  258  // return some data, but only to return which channel can receive data without
  259  // blocking. The implementation is essentially the entire blocking procedure
  260  // followed by an increment as soon as its woken up. The cancellation procedure
  261  // involves an increment and swapping out of to_wake to acquire ownership of the
  262  // task to unblock.
  263  //
  264  // Sadly this current implementation requires multiple allocations, so I have
  265  // seen the throughput of select() be much worse than it should be. I do not
  266  // believe that there is anything fundamental which needs to change about these
  267  // channels, however, in order to support a more efficient select().
  268  //
  269  // # Conclusion
  270  //
  271  // And now that you've seen all the races that I found and attempted to fix,
  272  // here's the code for you to find some more!
  273  
  274  use cell::Cell;
  275  use clone::Clone;
  276  use iter::Iterator;
  277  use kinds::Send;
  278  use kinds::marker;
  279  use mem;
  280  use ops::Drop;
  281  use option::{Some, None, Option};
  282  use owned::Box;
  283  use result::{Ok, Err, Result};
  284  use rt::local::Local;
  285  use rt::task::{Task, BlockedTask};
  286  use sync::arc::UnsafeArc;
  287  use ty::Unsafe;
  288  
  289  pub use comm::select::{Select, Handle};
  290  
  291  macro_rules! test (
  292      { fn $name:ident() $b:block $(#[$a:meta])*} => (
  293          mod $name {
  294              #![allow(unused_imports)]
  295  
  296              use native;
  297              use comm::*;
  298              use prelude::*;
  299              use super::*;
  300              use super::super::*;
  301              use owned::Box;
  302              use task;
  303  
  304              fn f() $b
  305  
  306              $(#[$a])* #[test] fn uv() { f() }
  307              $(#[$a])* #[test] fn native() {
  308                  use native;
  309                  let (tx, rx) = channel();
  310                  native::task::spawn(proc() { tx.send(f()) });
  311                  rx.recv();
  312              }
  313          }
  314      )
  315  )
  316  
  317  mod select;
  318  mod oneshot;
  319  mod stream;
  320  mod shared;
  321  mod sync;
  322  
  323  // Use a power of 2 to allow LLVM to optimize to something that's not a
  324  // division, this is hit pretty regularly.
  325  static RESCHED_FREQ: int = 256;
  326  
  327  /// The receiving-half of Rust's channel type. This half can only be owned by
  328  /// one task
  329  pub struct Receiver<T> {
  330      inner: Unsafe<Flavor<T>>,
  331      receives: Cell<uint>,
  332      // can't share in an arc
  333      marker: marker::NoShare,
  334  }
  335  
  336  /// An iterator over messages on a receiver, this iterator will block
  337  /// whenever `next` is called, waiting for a new message, and `None` will be
  338  /// returned when the corresponding channel has hung up.
  339  pub struct Messages<'a, T> {
  340      rx: &'a Receiver<T>
  341  }
  342  
  343  /// The sending-half of Rust's asynchronous channel type. This half can only be
  344  /// owned by one task, but it can be cloned to send to other tasks.
  345  pub struct Sender<T> {
  346      inner: Unsafe<Flavor<T>>,
  347      sends: Cell<uint>,
  348      // can't share in an arc
  349      marker: marker::NoShare,
  350  }
  351  
  352  /// The sending-half of Rust's synchronous channel type. This half can only be
  353  /// owned by one task, but it can be cloned to send to other tasks.
  354  pub struct SyncSender<T> {
  355      inner: UnsafeArc<sync::Packet<T>>,
  356      // can't share in an arc
  357      marker: marker::NoShare,
  358  }
  359  
  360  /// This enumeration is the list of the possible reasons that try_recv could not
  361  /// return data when called.
  362  #[deriving(Eq, Clone, Show)]
  363  pub enum TryRecvError {
  364      /// This channel is currently empty, but the sender(s) have not yet
  365      /// disconnected, so data may yet become available.
  366      Empty,
  367      /// This channel's sending half has become disconnected, and there will
  368      /// never be any more data received on this channel
  369      Disconnected,
  370  }
  371  
  372  /// This enumeration is the list of the possible error outcomes for the
  373  /// `SyncSender::try_send` method.
  374  #[deriving(Eq, Clone, Show)]
  375  pub enum TrySendError<T> {
  376      /// The data could not be sent on the channel because it would require that
  377      /// the callee block to send the data.
  378      ///
  379      /// If this is a buffered channel, then the buffer is full at this time. If
  380      /// this is not a buffered channel, then there is no receiver available to
  381      /// acquire the data.
  382      Full(T),
  383      /// This channel's receiving half has disconnected, so the data could not be
  384      /// sent. The data is returned back to the callee in this case.
  385      RecvDisconnected(T),
  386  }
  387  
  388  enum Flavor<T> {
  389      Oneshot(UnsafeArc<oneshot::Packet<T>>),
  390      Stream(UnsafeArc<stream::Packet<T>>),
  391      Shared(UnsafeArc<shared::Packet<T>>),
  392      Sync(UnsafeArc<sync::Packet<T>>),
  393  }
  394  
  395  #[doc(hidden)]
  396  trait UnsafeFlavor<T> {
  397      fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>>;
  398      unsafe fn mut_inner<'a>(&'a self) -> &'a mut Flavor<T> {
  399          &mut *self.inner_unsafe().get()
  400      }
  401      unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
  402          &*self.inner_unsafe().get()
  403      }
  404  }
  405  impl<T> UnsafeFlavor<T> for Sender<T> {
  406      fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
  407          &self.inner
  408      }
  409  }
  410  impl<T> UnsafeFlavor<T> for Receiver<T> {
  411      fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
  412          &self.inner
  413      }
  414  }
  415  
  416  /// Creates a new asynchronous channel, returning the sender/receiver halves.
  417  ///
  418  /// All data sent on the sender will become available on the receiver, and no
  419  /// send will block the calling task (this channel has an "infinite buffer").
  420  ///
  421  /// # Example
  422  ///
  423  /// ```
  424  /// let (tx, rx) = channel();
  425  ///
  426  /// // Spawn off an expensive computation
  427  /// spawn(proc() {
  428  /// #   fn expensive_computation() {}
  429  ///     tx.send(expensive_computation());
  430  /// });
  431  ///
  432  /// // Do some useful work for awhile
  433  ///
  434  /// // Let's see what that answer was
  435  /// println!("{}", rx.recv());
  436  /// ```
  437  pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
  438      let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
  439      (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a)))
  440  }
  441  
  442  /// Creates a new synchronous, bounded channel.
  443  ///
  444  /// Like asynchronous channels, the `Receiver` will block until a message
  445  /// becomes available. These channels differ greatly in the semantics of the
  446  /// sender from asynchronous channels, however.
  447  ///
  448  /// This channel has an internal buffer on which messages will be queued. When
  449  /// the internal buffer becomes full, future sends will *block* waiting for the
  450  /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
  451  /// becomes  "rendezvous channel" where each send will not return until a recv
  452  /// is paired with it.
  453  ///
  454  /// As with asynchronous channels, all senders will fail in `send` if the
  455  /// `Receiver` has been destroyed.
  456  ///
  457  /// # Example
  458  ///
  459  /// ```
  460  /// let (tx, rx) = sync_channel(1);
  461  ///
  462  /// // this returns immediately
  463  /// tx.send(1);
  464  ///
  465  /// spawn(proc() {
  466  ///     // this will block until the previous message has been received
  467  ///     tx.send(2);
  468  /// });
  469  ///
  470  /// assert_eq!(rx.recv(), 1);
  471  /// assert_eq!(rx.recv(), 2);
  472  /// ```
  473  pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
  474      let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
  475      (SyncSender::new(a), Receiver::new(Sync(b)))
  476  }
  477  
  478  ////////////////////////////////////////////////////////////////////////////////
  479  // Sender
  480  ////////////////////////////////////////////////////////////////////////////////
  481  
  482  impl<T: Send> Sender<T> {
  483      fn new(innerFlavor<T>) -> Sender<T> {
  484          Sender { inner: Unsafe::new(inner), sends: Cell::new(0), marker: marker::NoShare }
  485      }
  486  
  487      /// Sends a value along this channel to be received by the corresponding
  488      /// receiver.
  489      ///
  490      /// Rust channels are infinitely buffered so this method will never block.
  491      ///
  492      /// # Failure
  493      ///
  494      /// This function will fail if the other end of the channel has hung up.
  495      /// This means that if the corresponding receiver has fallen out of scope,
  496      /// this function will trigger a fail message saying that a message is
  497      /// being sent on a closed channel.
  498      ///
  499      /// Note that if this function does *not* fail, it does not mean that the
  500      /// data will be successfully received. All sends are placed into a queue,
  501      /// so it is possible for a send to succeed (the other end is alive), but
  502      /// then the other end could immediately disconnect.
  503      ///
  504      /// The purpose of this functionality is to propagate failure among tasks.
  505      /// If failure is not desired, then consider using the `send_opt` method
  506      pub fn send(&self, tT) {
  507          if self.send_opt(t).is_err() {
  508              fail!("sending on a closed channel");
  509          }
  510      }
  511  
  512      /// Attempts to send a value on this channel, returning it back if it could
  513      /// not be sent.
  514      ///
  515      /// A successful send occurs when it is determined that the other end of
  516      /// the channel has not hung up already. An unsuccessful send would be one
  517      /// where the corresponding receiver has already been deallocated. Note
  518      /// that a return value of `Err` means that the data will never be
  519      /// received, but a return value of `Ok` does *not* mean that the data
  520      /// will be received.  It is possible for the corresponding receiver to
  521      /// hang up immediately after this function returns `Ok`.
  522      ///
  523      /// Like `send`, this method will never block.
  524      ///
  525      /// # Failure
  526      ///
  527      /// This method will never fail, it will return the message back to the
  528      /// caller if the other end is disconnected
  529      ///
  530      /// # Example
  531      ///
  532      /// ```
  533      /// let (tx, rx) = channel();
  534      ///
  535      /// // This send is always successful
  536      /// assert_eq!(tx.send_opt(1), Ok(()));
  537      ///
  538      /// // This send will fail because the receiver is gone
  539      /// drop(rx);
  540      /// assert_eq!(tx.send_opt(1), Err(1));
  541      /// ```
  542      pub fn send_opt(&self, tT) -> Result<(), T> {
  543          // In order to prevent starvation of other tasks in situations where
  544          // a task sends repeatedly without ever receiving, we occassionally
  545          // yield instead of doing a send immediately.
  546          //
  547          // Don't unconditionally attempt to yield because the TLS overhead can
  548          // be a bit much, and also use `try_take` instead of `take` because
  549          // there's no reason that this send shouldn't be usable off the
  550          // runtime.
  551          let cnt = self.sends.get() + 1;
  552          self.sends.set(cnt);
  553          if cnt % (RESCHED_FREQ as uint) == 0 {
  554              let taskOption<Box<Task>> = Local::try_take();
  555              task.map(|t| t.maybe_yield());
  556          }
  557  
  558          let (new_inner, ret) = match *unsafe { self.inner() } {
  559              Oneshot(ref p) => {
  560                  let p = p.get();
  561                  unsafe {
  562                      if !(*p).sent() {
  563                          return (*p).send(t);
  564                      } else {
  565                          let (a, b) = UnsafeArc::new2(stream::Packet::new());
  566                          match (*p).upgrade(Receiver::new(Stream(b))) {
  567                              oneshot::UpSuccess => {
  568                                  let ret = (*a.get()).send(t);
  569                                  (a, ret)
  570                              }
  571                              oneshot::UpDisconnected => (a, Err(t)),
  572                              oneshot::UpWoke(task) => {
  573                                  // This send cannot fail because the task is
  574                                  // asleep (we're looking at it), so the receiver
  575                                  // can't go away.
  576                                  (*a.get()).send(t).ok().unwrap();
  577                                  task.wake().map(|t| t.reawaken());
  578                                  (a, Ok(()))
  579                              }
  580                          }
  581                      }
  582                  }
  583              }
  584              Stream(ref p) => return unsafe { (*p.get()).send(t) },
  585              Shared(ref p) => return unsafe { (*p.get()).send(t) },
  586              Sync(..) => unreachable!(),
  587          };
  588  
  589          unsafe {
  590              let tmp = Sender::new(Stream(new_inner));
  591              mem::swap(self.mut_inner(), tmp.mut_inner());
  592          }
  593          return ret;
  594      }
  595  }
  596  
  597  impl<T: Send> Clone for Sender<T> {
  598      fn clone(&self) -> Sender<T> {
  599          let (packet, sleeper) = match *unsafe { self.inner() } {
  600              Oneshot(ref p) => {
  601                  let (a, b) = UnsafeArc::new2(shared::Packet::new());
  602                  match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
  603                      oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
  604                      oneshot::UpWoke(task) => (b, Some(task))
  605                  }
  606              }
  607              Stream(ref p) => {
  608                  let (a, b) = UnsafeArc::new2(shared::Packet::new());
  609                  match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
  610                      stream::UpSuccess | stream::UpDisconnected => (b, None),
  611                      stream::UpWoke(task) => (b, Some(task)),
  612                  }
  613              }
  614              Shared(ref p) => {
  615                  unsafe { (*p.get()).clone_chan(); }
  616                  return Sender::new(Shared(p.clone()));
  617              }
  618              Sync(..) => unreachable!(),
  619          };
  620  
  621          unsafe {
  622              (*packet.get()).inherit_blocker(sleeper);
  623  
  624              let tmp = Sender::new(Shared(packet.clone()));
  625              mem::swap(self.mut_inner(), tmp.mut_inner());
  626          }
  627          Sender::new(Shared(packet))
  628      }
  629  }
  630  
  631  #[unsafe_destructor]
  632  impl<T: Send> Drop for Sender<T> {
  633      fn drop(&mut self) {
  634          match *unsafe { self.mut_inner() } {
  635              Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
  636              Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
  637              Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
  638              Sync(..) => unreachable!(),
  639          }
  640      }
  641  }
  642  
  643  ////////////////////////////////////////////////////////////////////////////////
  644  // SyncSender
  645  ////////////////////////////////////////////////////////////////////////////////
  646  
  647  impl<T: Send> SyncSender<T> {
  648      fn new(innerUnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
  649          SyncSender { inner: inner, marker: marker::NoShare }
  650      }
  651  
  652      /// Sends a value on this synchronous channel.
  653      ///
  654      /// This function will *block* until space in the internal buffer becomes
  655      /// available or a receiver is available to hand off the message to.
  656      ///
  657      /// Note that a successful send does *not* guarantee that the receiver will
  658      /// ever see the data if there is a buffer on this channel. Messages may be
  659      /// enqueued in the internal buffer for the receiver to receive at a later
  660      /// time. If the buffer size is 0, however, it can be guaranteed that the
  661      /// receiver has indeed received the data if this function returns success.
  662      ///
  663      /// # Failure
  664      ///
  665      /// Similarly to `Sender::send`, this function will fail if the
  666      /// corresponding `Receiver` for this channel has disconnected. This
  667      /// behavior is used to propagate failure among tasks.
  668      ///
  669      /// If failure is not desired, you can achieve the same semantics with the
  670      /// `SyncSender::send_opt` method which will not fail if the receiver
  671      /// disconnects.
  672      pub fn send(&self, tT) {
  673          if self.send_opt(t).is_err() {
  674              fail!("sending on a closed channel");
  675          }
  676      }
  677  
  678      /// Send a value on a channel, returning it back if the receiver
  679      /// disconnected
  680      ///
  681      /// This method will *block* to send the value `t` on the channel, but if
  682      /// the value could not be sent due to the receiver disconnecting, the value
  683      /// is returned back to the callee. This function is similar to `try_send`,
  684      /// except that it will block if the channel is currently full.
  685      ///
  686      /// # Failure
  687      ///
  688      /// This function cannot fail.
  689      pub fn send_opt(&self, tT) -> Result<(), T> {
  690          unsafe { (*self.inner.get()).send(t) }
  691      }
  692  
  693      /// Attempts to send a value on this channel without blocking.
  694      ///
  695      /// This method differs from `send_opt` by returning immediately if the
  696      /// channel's buffer is full or no receiver is waiting to acquire some
  697      /// data. Compared with `send_opt`, this function has two failure cases
  698      /// instead of one (one for disconnection, one for a full buffer).
  699      ///
  700      /// See `SyncSender::send` for notes about guarantees of whether the
  701      /// receiver has received the data or not if this function is successful.
  702      ///
  703      /// # Failure
  704      ///
  705      /// This function cannot fail
  706      pub fn try_send(&self, tT) -> Result<(), TrySendError<T>> {
  707          unsafe { (*self.inner.get()).try_send(t) }
  708      }
  709  }
  710  
  711  impl<T: Send> Clone for SyncSender<T> {
  712      fn clone(&self) -> SyncSender<T> {
  713          unsafe { (*self.inner.get()).clone_chan(); }
  714          return SyncSender::new(self.inner.clone());
  715      }
  716  }
  717  
  718  #[unsafe_destructor]
  719  impl<T: Send> Drop for SyncSender<T> {
  720      fn drop(&mut self) {
  721          unsafe { (*self.inner.get()).drop_chan(); }
  722      }
  723  }
  724  
  725  ////////////////////////////////////////////////////////////////////////////////
  726  // Receiver
  727  ////////////////////////////////////////////////////////////////////////////////
  728  
  729  impl<T: Send> Receiver<T> {
  730      fn new(innerFlavor<T>) -> Receiver<T> {
  731          Receiver { inner: Unsafe::new(inner), receives: Cell::new(0), marker: marker::NoShare }
  732      }
  733  
  734      /// Blocks waiting for a value on this receiver
  735      ///
  736      /// This function will block if necessary to wait for a corresponding send
  737      /// on the channel from its paired `Sender` structure. This receiver will
  738      /// be woken up when data is ready, and the data will be returned.
  739      ///
  740      /// # Failure
  741      ///
  742      /// Similar to channels, this method will trigger a task failure if the
  743      /// other end of the channel has hung up (been deallocated). The purpose of
  744      /// this is to propagate failure among tasks.
  745      ///
  746      /// If failure is not desired, then there are two options:
  747      ///
  748      /// * If blocking is still desired, the `recv_opt` method will return `None`
  749      ///   when the other end hangs up
  750      ///
  751      /// * If blocking is not desired, then the `try_recv` method will attempt to
  752      ///   peek at a value on this receiver.
  753      pub fn recv(&self) -> T {
  754          match self.recv_opt() {
  755              Ok(t) => t,
  756              Err(()) => fail!("receiving on a closed channel"),
  757          }
  758      }
  759  
  760      /// Attempts to return a pending value on this receiver without blocking
  761      ///
  762      /// This method will never block the caller in order to wait for data to
  763      /// become available. Instead, this will always return immediately with a
  764      /// possible option of pending data on the channel.
  765      ///
  766      /// This is useful for a flavor of "optimistic check" before deciding to
  767      /// block on a receiver.
  768      ///
  769      /// This function cannot fail.
  770      pub fn try_recv(&self) -> Result<T, TryRecvError> {
  771          // If a thread is spinning in try_recv, we should take the opportunity
  772          // to reschedule things occasionally. See notes above in scheduling on
  773          // sends for why this doesn't always hit TLS, and also for why this uses
  774          // `try_take` instead of `take`.
  775          let cnt = self.receives.get() + 1;
  776          self.receives.set(cnt);
  777          if cnt % (RESCHED_FREQ as uint) == 0 {
  778              let taskOption<Box<Task>> = Local::try_take();
  779              task.map(|t| t.maybe_yield());
  780          }
  781  
  782          loop {
  783              let new_port = match *unsafe { self.inner() } {
  784                  Oneshot(ref p) => {
  785                      match unsafe { (*p.get()).try_recv() } {
  786                          Ok(t) => return Ok(t),
  787                          Err(oneshot::Empty) => return Err(Empty),
  788                          Err(oneshot::Disconnected) => return Err(Disconnected),
  789                          Err(oneshot::Upgraded(rx)) => rx,
  790                      }
  791                  }
  792                  Stream(ref p) => {
  793                      match unsafe { (*p.get()).try_recv() } {
  794                          Ok(t) => return Ok(t),
  795                          Err(stream::Empty) => return Err(Empty),
  796                          Err(stream::Disconnected) => return Err(Disconnected),
  797                          Err(stream::Upgraded(rx)) => rx,
  798                      }
  799                  }
  800                  Shared(ref p) => {
  801                      match unsafe { (*p.get()).try_recv() } {
  802                          Ok(t) => return Ok(t),
  803                          Err(shared::Empty) => return Err(Empty),
  804                          Err(shared::Disconnected) => return Err(Disconnected),
  805                      }
  806                  }
  807                  Sync(ref p) => {
  808                      match unsafe { (*p.get()).try_recv() } {
  809                          Ok(t) => return Ok(t),
  810                          Err(sync::Empty) => return Err(Empty),
  811                          Err(sync::Disconnected) => return Err(Disconnected),
  812                      }
  813                  }
  814              };
  815              unsafe {
  816                  mem::swap(self.mut_inner(),
  817                            new_port.mut_inner());
  818              }
  819          }
  820      }
  821  
  822      /// Attempt to wait for a value on this receiver, but does not fail if the
  823      /// corresponding channel has hung up.
  824      ///
  825      /// This implementation of iterators for ports will always block if there is
  826      /// not data available on the receiver, but it will not fail in the case
  827      /// that the channel has been deallocated.
  828      ///
  829      /// In other words, this function has the same semantics as the `recv`
  830      /// method except for the failure aspect.
  831      ///
  832      /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
  833      /// the value found on the receiver is returned.
  834      pub fn recv_opt(&self) -> Result<T, ()> {
  835          loop {
  836              let new_port = match *unsafe { self.inner() } {
  837                  Oneshot(ref p) => {
  838                      match unsafe { (*p.get()).recv() } {
  839                          Ok(t) => return Ok(t),
  840                          Err(oneshot::Empty) => return unreachable!(),
  841                          Err(oneshot::Disconnected) => return Err(()),
  842                          Err(oneshot::Upgraded(rx)) => rx,
  843                      }
  844                  }
  845                  Stream(ref p) => {
  846                      match unsafe { (*p.get()).recv() } {
  847                          Ok(t) => return Ok(t),
  848                          Err(stream::Empty) => return unreachable!(),
  849                          Err(stream::Disconnected) => return Err(()),
  850                          Err(stream::Upgraded(rx)) => rx,
  851                      }
  852                  }
  853                  Shared(ref p) => {
  854                      match unsafe { (*p.get()).recv() } {
  855                          Ok(t) => return Ok(t),
  856                          Err(shared::Empty) => return unreachable!(),
  857                          Err(shared::Disconnected) => return Err(()),
  858                      }
  859                  }
  860                  Sync(ref p) => return unsafe { (*p.get()).recv() }
  861              };
  862              unsafe {
  863                  mem::swap(self.mut_inner(), new_port.mut_inner());
  864              }
  865          }
  866      }
  867  
  868      /// Returns an iterator which will block waiting for messages, but never
  869      /// `fail!`. It will return `None` when the channel has hung up.
  870      pub fn iter<'a>(&'a self) -> Messages<'a, T> {
  871          Messages { rx: self }
  872      }
  873  }
  874  
  875  impl<T: Send> select::Packet for Receiver<T> {
  876      fn can_recv(&self) -> bool {
  877          loop {
  878              let new_port = match *unsafe { self.inner() } {
  879                  Oneshot(ref p) => {
  880                      match unsafe { (*p.get()).can_recv() } {
  881                          Ok(ret) => return ret,
  882                          Err(upgrade) => upgrade,
  883                      }
  884                  }
  885                  Stream(ref p) => {
  886                      match unsafe { (*p.get()).can_recv() } {
  887                          Ok(ret) => return ret,
  888                          Err(upgrade) => upgrade,
  889                      }
  890                  }
  891                  Shared(ref p) => {
  892                      return unsafe { (*p.get()).can_recv() };
  893                  }
  894                  Sync(ref p) => {
  895                      return unsafe { (*p.get()).can_recv() };
  896                  }
  897              };
  898              unsafe {
  899                  mem::swap(self.mut_inner(),
  900                            new_port.mut_inner());
  901              }
  902          }
  903      }
  904  
  905      fn start_selection(&self, mut taskBlockedTask) -> Result<(), BlockedTask>{
  906          loop {
  907              let (t, new_port) = match *unsafe { self.inner() } {
  908                  Oneshot(ref p) => {
  909                      match unsafe { (*p.get()).start_selection(task) } {
  910                          oneshot::SelSuccess => return Ok(()),
  911                          oneshot::SelCanceled(task) => return Err(task),
  912                          oneshot::SelUpgraded(t, rx) => (t, rx),
  913                      }
  914                  }
  915                  Stream(ref p) => {
  916                      match unsafe { (*p.get()).start_selection(task) } {
  917                          stream::SelSuccess => return Ok(()),
  918                          stream::SelCanceled(task) => return Err(task),
  919                          stream::SelUpgraded(t, rx) => (t, rx),
  920                      }
  921                  }
  922                  Shared(ref p) => {
  923                      return unsafe { (*p.get()).start_selection(task) };
  924                  }
  925                  Sync(ref p) => {
  926                      return unsafe { (*p.get()).start_selection(task) };
  927                  }
  928              };
  929              task = t;
  930              unsafe {
  931                  mem::swap(self.mut_inner(),
  932                            new_port.mut_inner());
  933              }
  934          }
  935      }
  936  
  937      fn abort_selection(&self) -> bool {
  938          let mut was_upgrade = false;
  939          loop {
  940              let result = match *unsafe { self.inner() } {
  941                  Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
  942                  Stream(ref p) => unsafe {
  943                      (*p.get()).abort_selection(was_upgrade)
  944                  },
  945                  Shared(ref p) => return unsafe {
  946                      (*p.get()).abort_selection(was_upgrade)
  947                  },
  948                  Sync(ref p) => return unsafe {
  949                      (*p.get()).abort_selection()
  950                  },
  951              };
  952              let new_port = match result { Ok(b) => return b, Err(p) => p };
  953              was_upgrade = true;
  954              unsafe {
  955                  mem::swap(self.mut_inner(),
  956                            new_port.mut_inner());
  957              }
  958          }
  959      }
  960  }
  961  
  962  impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
  963      fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
  964  }
  965  
  966  #[unsafe_destructor]
  967  impl<T: Send> Drop for Receiver<T> {
  968      fn drop(&mut self) {
  969          match *unsafe { self.mut_inner() } {
  970              Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
  971              Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
  972              Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
  973              Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
  974          }
  975      }
  976  }
  977  
  978  #[cfg(test)]
  979  mod test {
  980      use prelude::*;
  981  
  982      use native;
  983      use os;
  984      use super::*;
  985  
  986      pub fn stress_factor() -> uint {
  987          match os::getenv("RUST_TEST_STRESS") {
  988              Some(val) => from_str::<uint>(val).unwrap(),
  989              None => 1,
  990          }
  991      }
  992  
  993      test!(fn smoke() {
  994          let (tx, rx) = channel();
  995          tx.send(1);
  996          assert_eq!(rx.recv(), 1);
  997      })
  998  
  999      test!(fn drop_full() {
 1000          let (tx, _rx) = channel();
 1001          tx.send(box 1);
 1002      })
 1003  
 1004      test!(fn drop_full_shared() {
 1005          let (tx, _rx) = channel();
 1006          drop(tx.clone());
 1007          drop(tx.clone());
 1008          tx.send(box 1);
 1009      })
 1010  
 1011      test!(fn smoke_shared() {
 1012          let (tx, rx) = channel();
 1013          tx.send(1);
 1014          assert_eq!(rx.recv(), 1);
 1015          let tx = tx.clone();
 1016          tx.send(1);
 1017          assert_eq!(rx.recv(), 1);
 1018      })
 1019  
 1020      test!(fn smoke_threads() {
 1021          let (tx, rx) = channel();
 1022          spawn(proc() {
 1023              tx.send(1);
 1024          });
 1025          assert_eq!(rx.recv(), 1);
 1026      })
 1027  
 1028      test!(fn smoke_port_gone() {
 1029          let (tx, rx) = channel();
 1030          drop(rx);
 1031          tx.send(1);
 1032      } #[should_fail])
 1033  
 1034      test!(fn smoke_shared_port_gone() {
 1035          let (tx, rx) = channel();
 1036          drop(rx);
 1037          tx.send(1);
 1038      } #[should_fail])
 1039  
 1040      test!(fn smoke_shared_port_gone2() {
 1041          let (tx, rx) = channel();
 1042          drop(rx);
 1043          let tx2 = tx.clone();
 1044          drop(tx);
 1045          tx2.send(1);
 1046      } #[should_fail])
 1047  
 1048      test!(fn port_gone_concurrent() {
 1049          let (tx, rx) = channel();
 1050          spawn(proc() {
 1051              rx.recv();
 1052          });
 1053          loop { tx.send(1) }
 1054      } #[should_fail])
 1055  
 1056      test!(fn port_gone_concurrent_shared() {
 1057          let (tx, rx) = channel();
 1058          let tx2 = tx.clone();
 1059          spawn(proc() {
 1060              rx.recv();
 1061          });
 1062          loop {
 1063              tx.send(1);
 1064              tx2.send(1);
 1065          }
 1066      } #[should_fail])
 1067  
 1068      test!(fn smoke_chan_gone() {
 1069          let (tx, rx) = channel::<int>();
 1070          drop(tx);
 1071          rx.recv();
 1072      } #[should_fail])
 1073  
 1074      test!(fn smoke_chan_gone_shared() {
 1075          let (tx, rx) = channel::<()>();
 1076          let tx2 = tx.clone();
 1077          drop(tx);
 1078          drop(tx2);
 1079          rx.recv();
 1080      } #[should_fail])
 1081  
 1082      test!(fn chan_gone_concurrent() {
 1083          let (tx, rx) = channel();
 1084          spawn(proc() {
 1085              tx.send(1);
 1086              tx.send(1);
 1087          });
 1088          loop { rx.recv(); }
 1089      } #[should_fail])
 1090  
 1091      test!(fn stress() {
 1092          let (tx, rx) = channel();
 1093          spawn(proc() {
 1094              for _ in range(0, 10000) { tx.send(1); }
 1095          });
 1096          for _ in range(0, 10000) {
 1097              assert_eq!(rx.recv(), 1);
 1098          }
 1099      })
 1100  
 1101      test!(fn stress_shared() {
 1102          static AMT: uint = 10000;
 1103          static NTHREADS: uint = 8;
 1104          let (tx, rx) = channel::<int>();
 1105          let (dtx, drx) = channel::<()>();
 1106  
 1107          spawn(proc() {
 1108              for _ in range(0, AMT * NTHREADS) {
 1109                  assert_eq!(rx.recv(), 1);
 1110              }
 1111              match rx.try_recv() {
 1112                  Ok(..) => fail!(),
 1113                  _ => {}
 1114              }
 1115              dtx.send(());
 1116          });
 1117  
 1118          for _ in range(0, NTHREADS) {
 1119              let tx = tx.clone();
 1120              spawn(proc() {
 1121                  for _ in range(0, AMT) { tx.send(1); }
 1122              });
 1123          }
 1124          drop(tx);
 1125          drx.recv();
 1126      })
 1127  
 1128      #[test]
 1129      fn send_from_outside_runtime() {
 1130          let (tx1, rx1) = channel::<()>();
 1131          let (tx2, rx2) = channel::<int>();
 1132          let (tx3, rx3) = channel::<()>();
 1133          let tx4 = tx3.clone();
 1134          spawn(proc() {
 1135              tx1.send(());
 1136              for _ in range(0, 40) {
 1137                  assert_eq!(rx2.recv(), 1);
 1138              }
 1139              tx3.send(());
 1140          });
 1141          rx1.recv();
 1142          native::task::spawn(proc() {
 1143              for _ in range(0, 40) {
 1144                  tx2.send(1);
 1145              }
 1146              tx4.send(());
 1147          });
 1148          rx3.recv();
 1149          rx3.recv();
 1150      }
 1151  
 1152      #[test]
 1153      fn recv_from_outside_runtime() {
 1154          let (tx, rx) = channel::<int>();
 1155          let (dtx, drx) = channel();
 1156          native::task::spawn(proc() {
 1157              for _ in range(0, 40) {
 1158                  assert_eq!(rx.recv(), 1);
 1159              }
 1160              dtx.send(());
 1161          });
 1162          for _ in range(0, 40) {
 1163              tx.send(1);
 1164          }
 1165          drx.recv();
 1166      }
 1167  
 1168      #[test]
 1169      fn no_runtime() {
 1170          let (tx1, rx1) = channel::<int>();
 1171          let (tx2, rx2) = channel::<int>();
 1172          let (tx3, rx3) = channel::<()>();
 1173          let tx4 = tx3.clone();
 1174          native::task::spawn(proc() {
 1175              assert_eq!(rx1.recv(), 1);
 1176              tx2.send(2);
 1177              tx4.send(());
 1178          });
 1179          native::task::spawn(proc() {
 1180              tx1.send(1);
 1181              assert_eq!(rx2.recv(), 2);
 1182              tx3.send(());
 1183          });
 1184          rx3.recv();
 1185          rx3.recv();
 1186      }
 1187  
 1188      test!(fn oneshot_single_thread_close_port_first() {
 1189          // Simple test of closing without sending
 1190          let (_tx, rx) = channel::<int>();
 1191          drop(rx);
 1192      })
 1193  
 1194      test!(fn oneshot_single_thread_close_chan_first() {
 1195          // Simple test of closing without sending
 1196          let (tx, _rx) = channel::<int>();
 1197          drop(tx);
 1198      })
 1199  
 1200      test!(fn oneshot_single_thread_send_port_close() {
 1201          // Testing that the sender cleans up the payload if receiver is closed
 1202          let (tx, rx) = channel::<Box<int>>();
 1203          drop(rx);
 1204          tx.send(box 0);
 1205      } #[should_fail])
 1206  
 1207      test!(fn oneshot_single_thread_recv_chan_close() {
 1208          // Receiving on a closed chan will fail
 1209          let res = task::try(proc() {
 1210              let (tx, rx) = channel::<int>();
 1211              drop(tx);
 1212              rx.recv();
 1213          });
 1214          // What is our res?
 1215          assert!(res.is_err());
 1216      })
 1217  
 1218      test!(fn oneshot_single_thread_send_then_recv() {
 1219          let (tx, rx) = channel::<Box<int>>();
 1220          tx.send(box 10);
 1221          assert!(rx.recv() == box 10);
 1222      })
 1223  
 1224      test!(fn oneshot_single_thread_try_send_open() {
 1225          let (tx, rx) = channel::<int>();
 1226          assert!(tx.send_opt(10).is_ok());
 1227          assert!(rx.recv() == 10);
 1228      })
 1229  
 1230      test!(fn oneshot_single_thread_try_send_closed() {
 1231          let (tx, rx) = channel::<int>();
 1232          drop(rx);
 1233          assert!(tx.send_opt(10).is_err());
 1234      })
 1235  
 1236      test!(fn oneshot_single_thread_try_recv_open() {
 1237          let (tx, rx) = channel::<int>();
 1238          tx.send(10);
 1239          assert!(rx.recv_opt() == Ok(10));
 1240      })
 1241  
 1242      test!(fn oneshot_single_thread_try_recv_closed() {
 1243          let (tx, rx) = channel::<int>();
 1244          drop(tx);
 1245          assert!(rx.recv_opt() == Err(()));
 1246      })
 1247  
 1248      test!(fn oneshot_single_thread_peek_data() {
 1249          let (tx, rx) = channel::<int>();
 1250          assert_eq!(rx.try_recv(), Err(Empty))
 1251          tx.send(10);
 1252          assert_eq!(rx.try_recv(), Ok(10));
 1253      })
 1254  
 1255      test!(fn oneshot_single_thread_peek_close() {
 1256          let (tx, rx) = channel::<int>();
 1257          drop(tx);
 1258          assert_eq!(rx.try_recv(), Err(Disconnected));
 1259          assert_eq!(rx.try_recv(), Err(Disconnected));
 1260      })
 1261  
 1262      test!(fn oneshot_single_thread_peek_open() {
 1263          let (_tx, rx) = channel::<int>();
 1264          assert_eq!(rx.try_recv(), Err(Empty));
 1265      })
 1266  
 1267      test!(fn oneshot_multi_task_recv_then_send() {
 1268          let (tx, rx) = channel::<Box<int>>();
 1269          spawn(proc() {
 1270              assert!(rx.recv() == box 10);
 1271          });
 1272  
 1273          tx.send(box 10);
 1274      })
 1275  
 1276      test!(fn oneshot_multi_task_recv_then_close() {
 1277          let (tx, rx) = channel::<Box<int>>();
 1278          spawn(proc() {
 1279              drop(tx);
 1280          });
 1281          let res = task::try(proc() {
 1282              assert!(rx.recv() == box 10);
 1283          });
 1284          assert!(res.is_err());
 1285      })
 1286  
 1287      test!(fn oneshot_multi_thread_close_stress() {
 1288          for _ in range(0, stress_factor()) {
 1289              let (tx, rx) = channel::<int>();
 1290              spawn(proc() {
 1291                  drop(rx);
 1292              });
 1293              drop(tx);
 1294          }
 1295      })
 1296  
 1297      test!(fn oneshot_multi_thread_send_close_stress() {
 1298          for _ in range(0, stress_factor()) {
 1299              let (tx, rx) = channel::<int>();
 1300              spawn(proc() {
 1301                  drop(rx);
 1302              });
 1303              let _ = task::try(proc() {
 1304                  tx.send(1);
 1305              });
 1306          }
 1307      })
 1308  
 1309      test!(fn oneshot_multi_thread_recv_close_stress() {
 1310          for _ in range(0, stress_factor()) {
 1311              let (tx, rx) = channel::<int>();
 1312              spawn(proc() {
 1313                  let res = task::try(proc() {
 1314                      rx.recv();
 1315                  });
 1316                  assert!(res.is_err());
 1317              });
 1318              spawn(proc() {
 1319                  spawn(proc() {
 1320                      drop(tx);
 1321                  });
 1322              });
 1323          }
 1324      })
 1325  
 1326      test!(fn oneshot_multi_thread_send_recv_stress() {
 1327          for _ in range(0, stress_factor()) {
 1328              let (tx, rx) = channel();
 1329              spawn(proc() {
 1330                  tx.send(box 10);
 1331              });
 1332              spawn(proc() {
 1333                  assert!(rx.recv() == box 10);
 1334              });
 1335          }
 1336      })
 1337  
 1338      test!(fn stream_send_recv_stress() {
 1339          for _ in range(0, stress_factor()) {
 1340              let (tx, rx) = channel();
 1341  
 1342              send(tx, 0);
 1343              recv(rx, 0);
 1344  
 1345              fn send(tx: Sender<Box<int>>, i: int) {
 1346                  if i == 10 { return }
 1347  
 1348                  spawn(proc() {
 1349                      tx.send(box i);
 1350                      send(tx, i + 1);
 1351                  });
 1352              }
 1353  
 1354              fn recv(rx: Receiver<Box<int>>, i: int) {
 1355                  if i == 10 { return }
 1356  
 1357                  spawn(proc() {
 1358                      assert!(rx.recv() == box i);
 1359                      recv(rx, i + 1);
 1360                  });
 1361              }
 1362          }
 1363      })
 1364  
 1365      test!(fn recv_a_lot() {
 1366          // Regression test that we don't run out of stack in scheduler context
 1367          let (tx, rx) = channel();
 1368          for _ in range(0, 10000) { tx.send(()); }
 1369          for _ in range(0, 10000) { rx.recv(); }
 1370      })
 1371  
 1372      test!(fn shared_chan_stress() {
 1373          let (tx, rx) = channel();
 1374          let total = stress_factor() + 100;
 1375          for _ in range(0, total) {
 1376              let tx = tx.clone();
 1377              spawn(proc() {
 1378                  tx.send(());
 1379              });
 1380          }
 1381  
 1382          for _ in range(0, total) {
 1383              rx.recv();
 1384          }
 1385      })
 1386  
 1387      test!(fn test_nested_recv_iter() {
 1388          let (tx, rx) = channel::<int>();
 1389          let (total_tx, total_rx) = channel::<int>();
 1390  
 1391          spawn(proc() {
 1392              let mut acc = 0;
 1393              for x in rx.iter() {
 1394                  acc += x;
 1395              }
 1396              total_tx.send(acc);
 1397          });
 1398  
 1399          tx.send(3);
 1400          tx.send(1);
 1401          tx.send(2);
 1402          drop(tx);
 1403          assert_eq!(total_rx.recv(), 6);
 1404      })
 1405  
 1406      test!(fn test_recv_iter_break() {
 1407          let (tx, rx) = channel::<int>();
 1408          let (count_tx, count_rx) = channel();
 1409  
 1410          spawn(proc() {
 1411              let mut count = 0;
 1412              for x in rx.iter() {
 1413                  if count >= 3 {
 1414                      break;
 1415                  } else {
 1416                      count += x;
 1417                  }
 1418              }
 1419              count_tx.send(count);
 1420          });
 1421  
 1422          tx.send(2);
 1423          tx.send(2);
 1424          tx.send(2);
 1425          let _ = tx.send_opt(2);
 1426          drop(tx);
 1427          assert_eq!(count_rx.recv(), 4);
 1428      })
 1429  
 1430      test!(fn try_recv_states() {
 1431          let (tx1, rx1) = channel::<int>();
 1432          let (tx2, rx2) = channel::<()>();
 1433          let (tx3, rx3) = channel::<()>();
 1434          spawn(proc() {
 1435              rx2.recv();
 1436              tx1.send(1);
 1437              tx3.send(());
 1438              rx2.recv();
 1439              drop(tx1);
 1440              tx3.send(());
 1441          });
 1442  
 1443          assert_eq!(rx1.try_recv(), Err(Empty));
 1444          tx2.send(());
 1445          rx3.recv();
 1446          assert_eq!(rx1.try_recv(), Ok(1));
 1447          assert_eq!(rx1.try_recv(), Err(Empty));
 1448          tx2.send(());
 1449          rx3.recv();
 1450          assert_eq!(rx1.try_recv(), Err(Disconnected));
 1451      })
 1452  
 1453      // This bug used to end up in a livelock inside of the Receiver destructor
 1454      // because the internal state of the Shared packet was corrupted
 1455      test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
 1456          let (tx, rx) = channel();
 1457          let (tx2, rx2) = channel();
 1458          spawn(proc() {
 1459              rx.recv(); // wait on a oneshot
 1460              drop(rx);  // destroy a shared
 1461              tx2.send(());
 1462          });
 1463          // make sure the other task has gone to sleep
 1464          for _ in range(0, 5000) { task::deschedule(); }
 1465  
 1466          // upgrade to a shared chan and send a message
 1467          let t = tx.clone();
 1468          drop(tx);
 1469          t.send(());
 1470  
 1471          // wait for the child task to exit before we exit
 1472          rx2.recv();
 1473      })
 1474  
 1475      test!(fn sends_off_the_runtime() {
 1476          use rt::thread::Thread;
 1477  
 1478          let (tx, rx) = channel();
 1479          let t = Thread::start(proc() {
 1480              for _ in range(0, 1000) {
 1481                  tx.send(());
 1482              }
 1483          });
 1484          for _ in range(0, 1000) {
 1485              rx.recv();
 1486          }
 1487          t.join();
 1488      })
 1489  
 1490      test!(fn try_recvs_off_the_runtime() {
 1491          use rt::thread::Thread;
 1492  
 1493          let (tx, rx) = channel();
 1494          let (cdone, pdone) = channel();
 1495          let t = Thread::start(proc() {
 1496              let mut hits = 0;
 1497              while hits < 10 {
 1498                  match rx.try_recv() {
 1499                      Ok(()) => { hits += 1; }
 1500                      Err(Empty) => { Thread::yield_now(); }
 1501                      Err(Disconnected) => return,
 1502                  }
 1503              }
 1504              cdone.send(());
 1505          });
 1506          for _ in range(0, 10) {
 1507              tx.send(());
 1508          }
 1509          t.join();
 1510          pdone.recv();
 1511      })
 1512  }
 1513  
 1514  #[cfg(test)]
 1515  mod sync_tests {
 1516      use prelude::*;
 1517      use os;
 1518  
 1519      pub fn stress_factor() -> uint {
 1520          match os::getenv("RUST_TEST_STRESS") {
 1521              Some(val) => from_str::<uint>(val).unwrap(),
 1522              None => 1,
 1523          }
 1524      }
 1525  
 1526      test!(fn smoke() {
 1527          let (tx, rx) = sync_channel(1);
 1528          tx.send(1);
 1529          assert_eq!(rx.recv(), 1);
 1530      })
 1531  
 1532      test!(fn drop_full() {
 1533          let (tx, _rx) = sync_channel(1);
 1534          tx.send(box 1);
 1535      })
 1536  
 1537      test!(fn smoke_shared() {
 1538          let (tx, rx) = sync_channel(1);
 1539          tx.send(1);
 1540          assert_eq!(rx.recv(), 1);
 1541          let tx = tx.clone();
 1542          tx.send(1);
 1543          assert_eq!(rx.recv(), 1);
 1544      })
 1545  
 1546      test!(fn smoke_threads() {
 1547          let (tx, rx) = sync_channel(0);
 1548          spawn(proc() {
 1549              tx.send(1);
 1550          });
 1551          assert_eq!(rx.recv(), 1);
 1552      })
 1553  
 1554      test!(fn smoke_port_gone() {
 1555          let (tx, rx) = sync_channel(0);
 1556          drop(rx);
 1557          tx.send(1);
 1558      } #[should_fail])
 1559  
 1560      test!(fn smoke_shared_port_gone2() {
 1561          let (tx, rx) = sync_channel(0);
 1562          drop(rx);
 1563          let tx2 = tx.clone();
 1564          drop(tx);
 1565          tx2.send(1);
 1566      } #[should_fail])
 1567  
 1568      test!(fn port_gone_concurrent() {
 1569          let (tx, rx) = sync_channel(0);
 1570          spawn(proc() {
 1571              rx.recv();
 1572          });
 1573          loop { tx.send(1) }
 1574      } #[should_fail])
 1575  
 1576      test!(fn port_gone_concurrent_shared() {
 1577          let (tx, rx) = sync_channel(0);
 1578          let tx2 = tx.clone();
 1579          spawn(proc() {
 1580              rx.recv();
 1581          });
 1582          loop {
 1583              tx.send(1);
 1584              tx2.send(1);
 1585          }
 1586      } #[should_fail])
 1587  
 1588      test!(fn smoke_chan_gone() {
 1589          let (tx, rx) = sync_channel::<int>(0);
 1590          drop(tx);
 1591          rx.recv();
 1592      } #[should_fail])
 1593  
 1594      test!(fn smoke_chan_gone_shared() {
 1595          let (tx, rx) = sync_channel::<()>(0);
 1596          let tx2 = tx.clone();
 1597          drop(tx);
 1598          drop(tx2);
 1599          rx.recv();
 1600      } #[should_fail])
 1601  
 1602      test!(fn chan_gone_concurrent() {
 1603          let (tx, rx) = sync_channel(0);
 1604          spawn(proc() {
 1605              tx.send(1);
 1606              tx.send(1);
 1607          });
 1608          loop { rx.recv(); }
 1609      } #[should_fail])
 1610  
 1611      test!(fn stress() {
 1612          let (tx, rx) = sync_channel(0);
 1613          spawn(proc() {
 1614              for _ in range(0, 10000) { tx.send(1); }
 1615          });
 1616          for _ in range(0, 10000) {
 1617              assert_eq!(rx.recv(), 1);
 1618          }
 1619      })
 1620  
 1621      test!(fn stress_shared() {
 1622          static AMT: uint = 1000;
 1623          static NTHREADS: uint = 8;
 1624          let (tx, rx) = sync_channel::<int>(0);
 1625          let (dtx, drx) = sync_channel::<()>(0);
 1626  
 1627          spawn(proc() {
 1628              for _ in range(0, AMT * NTHREADS) {
 1629                  assert_eq!(rx.recv(), 1);
 1630              }
 1631              match rx.try_recv() {
 1632                  Ok(..) => fail!(),
 1633                  _ => {}
 1634              }
 1635              dtx.send(());
 1636          });
 1637  
 1638          for _ in range(0, NTHREADS) {
 1639              let tx = tx.clone();
 1640              spawn(proc() {
 1641                  for _ in range(0, AMT) { tx.send(1); }
 1642              });
 1643          }
 1644          drop(tx);
 1645          drx.recv();
 1646      })
 1647  
 1648      test!(fn oneshot_single_thread_close_port_first() {
 1649          // Simple test of closing without sending
 1650          let (_tx, rx) = sync_channel::<int>(0);
 1651          drop(rx);
 1652      })
 1653  
 1654      test!(fn oneshot_single_thread_close_chan_first() {
 1655          // Simple test of closing without sending
 1656          let (tx, _rx) = sync_channel::<int>(0);
 1657          drop(tx);
 1658      })
 1659  
 1660      test!(fn oneshot_single_thread_send_port_close() {
 1661          // Testing that the sender cleans up the payload if receiver is closed
 1662          let (tx, rx) = sync_channel::<Box<int>>(0);
 1663          drop(rx);
 1664          tx.send(box 0);
 1665      } #[should_fail])
 1666  
 1667      test!(fn oneshot_single_thread_recv_chan_close() {
 1668          // Receiving on a closed chan will fail
 1669          let res = task::try(proc() {
 1670              let (tx, rx) = sync_channel::<int>(0);
 1671              drop(tx);
 1672              rx.recv();
 1673          });
 1674          // What is our res?
 1675          assert!(res.is_err());
 1676      })
 1677  
 1678      test!(fn oneshot_single_thread_send_then_recv() {
 1679          let (tx, rx) = sync_channel::<Box<int>>(1);
 1680          tx.send(box 10);
 1681          assert!(rx.recv() == box 10);
 1682      })
 1683  
 1684      test!(fn oneshot_single_thread_try_send_open() {
 1685          let (tx, rx) = sync_channel::<int>(1);
 1686          assert_eq!(tx.try_send(10), Ok(()));
 1687          assert!(rx.recv() == 10);
 1688      })
 1689  
 1690      test!(fn oneshot_single_thread_try_send_closed() {
 1691          let (tx, rx) = sync_channel::<int>(0);
 1692          drop(rx);
 1693          assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
 1694      })
 1695  
 1696      test!(fn oneshot_single_thread_try_send_closed2() {
 1697          let (tx, _rx) = sync_channel::<int>(0);
 1698          assert_eq!(tx.try_send(10), Err(Full(10)));
 1699      })
 1700  
 1701      test!(fn oneshot_single_thread_try_recv_open() {
 1702          let (tx, rx) = sync_channel::<int>(1);
 1703          tx.send(10);
 1704          assert!(rx.recv_opt() == Ok(10));
 1705      })
 1706  
 1707      test!(fn oneshot_single_thread_try_recv_closed() {
 1708          let (tx, rx) = sync_channel::<int>(0);
 1709          drop(tx);
 1710          assert!(rx.recv_opt() == Err(()));
 1711      })
 1712  
 1713      test!(fn oneshot_single_thread_peek_data() {
 1714          let (tx, rx) = sync_channel::<int>(1);
 1715          assert_eq!(rx.try_recv(), Err(Empty))
 1716          tx.send(10);
 1717          assert_eq!(rx.try_recv(), Ok(10));
 1718      })
 1719  
 1720      test!(fn oneshot_single_thread_peek_close() {
 1721          let (tx, rx) = sync_channel::<int>(0);
 1722          drop(tx);
 1723          assert_eq!(rx.try_recv(), Err(Disconnected));
 1724          assert_eq!(rx.try_recv(), Err(Disconnected));
 1725      })
 1726  
 1727      test!(fn oneshot_single_thread_peek_open() {
 1728          let (_tx, rx) = sync_channel::<int>(0);
 1729          assert_eq!(rx.try_recv(), Err(Empty));
 1730      })
 1731  
 1732      test!(fn oneshot_multi_task_recv_then_send() {
 1733          let (tx, rx) = sync_channel::<Box<int>>(0);
 1734          spawn(proc() {
 1735              assert!(rx.recv() == box 10);
 1736          });
 1737  
 1738          tx.send(box 10);
 1739      })
 1740  
 1741      test!(fn oneshot_multi_task_recv_then_close() {
 1742          let (tx, rx) = sync_channel::<Box<int>>(0);
 1743          spawn(proc() {
 1744              drop(tx);
 1745          });
 1746          let res = task::try(proc() {
 1747              assert!(rx.recv() == box 10);
 1748          });
 1749          assert!(res.is_err());
 1750      })
 1751  
 1752      test!(fn oneshot_multi_thread_close_stress() {
 1753          for _ in range(0, stress_factor()) {
 1754              let (tx, rx) = sync_channel::<int>(0);
 1755              spawn(proc() {
 1756                  drop(rx);
 1757              });
 1758              drop(tx);
 1759          }
 1760      })
 1761  
 1762      test!(fn oneshot_multi_thread_send_close_stress() {
 1763          for _ in range(0, stress_factor()) {
 1764              let (tx, rx) = sync_channel::<int>(0);
 1765              spawn(proc() {
 1766                  drop(rx);
 1767              });
 1768              let _ = task::try(proc() {
 1769                  tx.send(1);
 1770              });
 1771          }
 1772      })
 1773  
 1774      test!(fn oneshot_multi_thread_recv_close_stress() {
 1775          for _ in range(0, stress_factor()) {
 1776              let (tx, rx) = sync_channel::<int>(0);
 1777              spawn(proc() {
 1778                  let res = task::try(proc() {
 1779                      rx.recv();
 1780                  });
 1781                  assert!(res.is_err());
 1782              });
 1783              spawn(proc() {
 1784                  spawn(proc() {
 1785                      drop(tx);
 1786                  });
 1787              });
 1788          }
 1789      })
 1790  
 1791      test!(fn oneshot_multi_thread_send_recv_stress() {
 1792          for _ in range(0, stress_factor()) {
 1793              let (tx, rx) = sync_channel(0);
 1794              spawn(proc() {
 1795                  tx.send(box 10);
 1796              });
 1797              spawn(proc() {
 1798                  assert!(rx.recv() == box 10);
 1799              });
 1800          }
 1801      })
 1802  
 1803      test!(fn stream_send_recv_stress() {
 1804          for _ in range(0, stress_factor()) {
 1805              let (tx, rx) = sync_channel(0);
 1806  
 1807              send(tx, 0);
 1808              recv(rx, 0);
 1809  
 1810              fn send(tx: SyncSender<Box<int>>, i: int) {
 1811                  if i == 10 { return }
 1812  
 1813                  spawn(proc() {
 1814                      tx.send(box i);
 1815                      send(tx, i + 1);
 1816                  });
 1817              }
 1818  
 1819              fn recv(rx: Receiver<Box<int>>, i: int) {
 1820                  if i == 10 { return }
 1821  
 1822                  spawn(proc() {
 1823                      assert!(rx.recv() == box i);
 1824                      recv(rx, i + 1);
 1825                  });
 1826              }
 1827          }
 1828      })
 1829  
 1830      test!(fn recv_a_lot() {
 1831          // Regression test that we don't run out of stack in scheduler context
 1832          let (tx, rx) = sync_channel(10000);
 1833          for _ in range(0, 10000) { tx.send(()); }
 1834          for _ in range(0, 10000) { rx.recv(); }
 1835      })
 1836  
 1837      test!(fn shared_chan_stress() {
 1838          let (tx, rx) = sync_channel(0);
 1839          let total = stress_factor() + 100;
 1840          for _ in range(0, total) {
 1841              let tx = tx.clone();
 1842              spawn(proc() {
 1843                  tx.send(());
 1844              });
 1845          }
 1846  
 1847          for _ in range(0, total) {
 1848              rx.recv();
 1849          }
 1850      })
 1851  
 1852      test!(fn test_nested_recv_iter() {
 1853          let (tx, rx) = sync_channel::<int>(0);
 1854          let (total_tx, total_rx) = sync_channel::<int>(0);
 1855  
 1856          spawn(proc() {
 1857              let mut acc = 0;
 1858              for x in rx.iter() {
 1859                  acc += x;
 1860              }
 1861              total_tx.send(acc);
 1862          });
 1863  
 1864          tx.send(3);
 1865          tx.send(1);
 1866          tx.send(2);
 1867          drop(tx);
 1868          assert_eq!(total_rx.recv(), 6);
 1869      })
 1870  
 1871      test!(fn test_recv_iter_break() {
 1872          let (tx, rx) = sync_channel::<int>(0);
 1873          let (count_tx, count_rx) = sync_channel(0);
 1874  
 1875          spawn(proc() {
 1876              let mut count = 0;
 1877              for x in rx.iter() {
 1878                  if count >= 3 {
 1879                      break;
 1880                  } else {
 1881                      count += x;
 1882                  }
 1883              }
 1884              count_tx.send(count);
 1885          });
 1886  
 1887          tx.send(2);
 1888          tx.send(2);
 1889          tx.send(2);
 1890          let _ = tx.try_send(2);
 1891          drop(tx);
 1892          assert_eq!(count_rx.recv(), 4);
 1893      })
 1894  
 1895      test!(fn try_recv_states() {
 1896          let (tx1, rx1) = sync_channel::<int>(1);
 1897          let (tx2, rx2) = sync_channel::<()>(1);
 1898          let (tx3, rx3) = sync_channel::<()>(1);
 1899          spawn(proc() {
 1900              rx2.recv();
 1901              tx1.send(1);
 1902              tx3.send(());
 1903              rx2.recv();
 1904              drop(tx1);
 1905              tx3.send(());
 1906          });
 1907  
 1908          assert_eq!(rx1.try_recv(), Err(Empty));
 1909          tx2.send(());
 1910          rx3.recv();
 1911          assert_eq!(rx1.try_recv(), Ok(1));
 1912          assert_eq!(rx1.try_recv(), Err(Empty));
 1913          tx2.send(());
 1914          rx3.recv();
 1915          assert_eq!(rx1.try_recv(), Err(Disconnected));
 1916      })
 1917  
 1918      // This bug used to end up in a livelock inside of the Receiver destructor
 1919      // because the internal state of the Shared packet was corrupted
 1920      test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
 1921          let (tx, rx) = sync_channel(0);
 1922          let (tx2, rx2) = sync_channel(0);
 1923          spawn(proc() {
 1924              rx.recv(); // wait on a oneshot
 1925              drop(rx);  // destroy a shared
 1926              tx2.send(());
 1927          });
 1928          // make sure the other task has gone to sleep
 1929          for _ in range(0, 5000) { task::deschedule(); }
 1930  
 1931          // upgrade to a shared chan and send a message
 1932          let t = tx.clone();
 1933          drop(tx);
 1934          t.send(());
 1935  
 1936          // wait for the child task to exit before we exit
 1937          rx2.recv();
 1938      })
 1939  
 1940      test!(fn try_recvs_off_the_runtime() {
 1941          use std::rt::thread::Thread;
 1942  
 1943          let (tx, rx) = sync_channel(0);
 1944          let (cdone, pdone) = channel();
 1945          let t = Thread::start(proc() {
 1946              let mut hits = 0;
 1947              while hits < 10 {
 1948                  match rx.try_recv() {
 1949                      Ok(()) => { hits += 1; }
 1950                      Err(Empty) => { Thread::yield_now(); }
 1951                      Err(Disconnected) => return,
 1952                  }
 1953              }
 1954              cdone.send(());
 1955          });
 1956          for _ in range(0, 10) {
 1957              tx.send(());
 1958          }
 1959          t.join();
 1960          pdone.recv();
 1961      })
 1962  
 1963      test!(fn send_opt1() {
 1964          let (tx, rx) = sync_channel(0);
 1965          spawn(proc() { rx.recv(); });
 1966          assert_eq!(tx.send_opt(1), Ok(()));
 1967      })
 1968  
 1969      test!(fn send_opt2() {
 1970          let (tx, rx) = sync_channel(0);
 1971          spawn(proc() { drop(rx); });
 1972          assert_eq!(tx.send_opt(1), Err(1));
 1973      })
 1974  
 1975      test!(fn send_opt3() {
 1976          let (tx, rx) = sync_channel(1);
 1977          assert_eq!(tx.send_opt(1), Ok(()));
 1978          spawn(proc() { drop(rx); });
 1979          assert_eq!(tx.send_opt(1), Err(1));
 1980      })
 1981  
 1982      test!(fn send_opt4() {
 1983          let (tx, rx) = sync_channel(0);
 1984          let tx2 = tx.clone();
 1985          let (done, donerx) = channel();
 1986          let done2 = done.clone();
 1987          spawn(proc() {
 1988              assert_eq!(tx.send_opt(1), Err(1));
 1989              done.send(());
 1990          });
 1991          spawn(proc() {
 1992              assert_eq!(tx2.send_opt(2), Err(2));
 1993              done2.send(());
 1994          });
 1995          drop(rx);
 1996          donerx.recv();
 1997          donerx.recv();
 1998      })
 1999  
 2000      test!(fn try_send1() {
 2001          let (tx, _rx) = sync_channel(0);
 2002          assert_eq!(tx.try_send(1), Err(Full(1)));
 2003      })
 2004  
 2005      test!(fn try_send2() {
 2006          let (tx, _rx) = sync_channel(1);
 2007          assert_eq!(tx.try_send(1), Ok(()));
 2008          assert_eq!(tx.try_send(1), Err(Full(1)));
 2009      })
 2010  
 2011      test!(fn try_send3() {
 2012          let (tx, rx) = sync_channel(1);
 2013          assert_eq!(tx.try_send(1), Ok(()));
 2014          drop(rx);
 2015          assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
 2016      })
 2017  
 2018      test!(fn try_send4() {
 2019          let (tx, rx) = sync_channel(0);
 2020          spawn(proc() {
 2021              for _ in range(0, 1000) { task::deschedule(); }
 2022              assert_eq!(tx.try_send(1), Ok(()));
 2023          });
 2024          assert_eq!(rx.recv(), 1);
 2025      } #[ignore(reason = "flaky on libnative")])
 2026  }


libstd/comm/mod.rs:374:29-374:29 -enum- definition:
pub enum TrySendError<T> {
    /// The data could not be sent on the channel because it would require that
    /// the callee block to send the data.
references:- 8
373: /// `SyncSender::try_send` method.
375: pub enum TrySendError<T> {
--
705:     /// This function cannot fail
706:     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
707:         unsafe { (*self.inner.get()).try_send(t) }
libstd/comm/sync.rs:
205:     pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
206:         let (guard, state) = self.lock();
libstd/comm/mod.rs:
373: /// `SyncSender::try_send` method.
375: pub enum TrySendError<T> {


libstd/comm/mod.rs:436:8-436:8 -fn- definition:
/// ```
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
    let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
references:- 4
libstd/io/process.rs:
392:         fn read(stream: Option<io::PipeStream>) -> Receiver<IoResult<Vec<u8>>> {
393:             let (tx, rx) = channel();
394:             match stream {
libstd/io/signal.rs:
99:     pub fn new() -> Listener {
100:         let (tx, rx) = channel();
101:         Listener {
libstd/task.rs:
196:                -> Result<T, Box<Any:Send>> {
197:         let (tx, rx) = channel();


libstd/comm/mod.rs:395:15-395:15 -trait- definition:
trait UnsafeFlavor<T> {
    fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>>;
    unsafe fn mut_inner<'a>(&'a self) -> &'a mut Flavor<T> {
references:- 2
404: }
405: impl<T> UnsafeFlavor<T> for Sender<T> {
406:     fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
--
409: }
410: impl<T> UnsafeFlavor<T> for Receiver<T> {
411:     fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {


libstd/comm/mod.rs:362:29-362:29 -enum- definition:
pub enum TryRecvError {
    /// This channel is currently empty, but the sender(s) have not yet
    /// disconnected, so data may yet become available.
references:- 7
769:     /// This function cannot fail.
770:     pub fn try_recv(&self) -> Result<T, TryRecvError> {
771:         // If a thread is spinning in try_recv, we should take the opportunity


libstd/comm/mod.rs:387:1-387:1 -enum- definition:
enum Flavor<T> {
    Oneshot(UnsafeArc<oneshot::Packet<T>>),
    Stream(UnsafeArc<stream::Packet<T>>),
references:- 9
405: impl<T> UnsafeFlavor<T> for Sender<T> {
406:     fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
407:         &self.inner
--
410: impl<T> UnsafeFlavor<T> for Receiver<T> {
411:     fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
412:         &self.inner
--
482: impl<T: Send> Sender<T> {
483:     fn new(inner: Flavor<T>) -> Sender<T> {
484:         Sender { inner: Unsafe::new(inner), sends: Cell::new(0), marker: marker::NoShare }
--
729: impl<T: Send> Receiver<T> {
730:     fn new(inner: Flavor<T>) -> Receiver<T> {
731:         Receiver { inner: Unsafe::new(inner), receives: Cell::new(0), marker: marker::NoShare }


libstd/comm/mod.rs:338:57-338:57 -struct- definition:
/// returned when the corresponding channel has hung up.
pub struct Messages<'a, T> {
    rx: &'a Receiver<T>
references:- 3
870:     pub fn iter<'a>(&'a self) -> Messages<'a, T> {
871:         Messages { rx: self }
872:     }
--
962: impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
963:     fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }


libstd/comm/mod.rs:353:68-353:68 -struct- definition:
/// owned by one task, but it can be cloned to send to other tasks.
pub struct SyncSender<T> {
    inner: UnsafeArc<sync::Packet<T>>,
references:- 7
648:     fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
649:         SyncSender { inner: inner, marker: marker::NoShare }
650:     }
--
719: impl<T: Send> Drop for SyncSender<T> {
720:     fn drop(&mut self) {


libstd/comm/mod.rs:328:13-328:13 -struct- definition:
/// one task
pub struct Receiver<T> {
    inner: Unsafe<Flavor<T>>,
references:- 32
libstd/comm/select.rs:
libstd/comm/oneshot.rs:
libstd/comm/stream.rs:
libstd/io/comm_adapters.rs:
libstd/io/process.rs:
libstd/io/signal.rs:
libstd/io/timer.rs:
libstd/rt/rtio.rs:
libstd/task.rs:
libstd/comm/oneshot.rs:


libstd/comm/mod.rs:344:68-344:68 -struct- definition:
/// owned by one task, but it can be cloned to send to other tasks.
pub struct Sender<T> {
    inner: Unsafe<Flavor<T>>,
references:- 14
483:     fn new(inner: Flavor<T>) -> Sender<T> {
484:         Sender { inner: Unsafe::new(inner), sends: Cell::new(0), marker: marker::NoShare }
485:     }
--
597: impl<T: Send> Clone for Sender<T> {
598:     fn clone(&self) -> Sender<T> {
--
632: impl<T: Send> Drop for Sender<T> {
633:     fn drop(&mut self) {
libstd/io/comm_adapters.rs:
106:     /// Wraps a channel in a `ChanWriter` structure
107:     pub fn new(tx: Sender<~[u8]>) -> ChanWriter {
108:         ChanWriter { tx: tx }
libstd/io/signal.rs:
87:     /// the clients from the receiver.
88:     tx: Sender<Signum>,
libstd/rt/task.rs:
76:     /// A channel to send the result of the task on when the task exits
77:     SendMessage(Sender<TaskResult>),
78: }
libstd/rt/rtio.rs:
198:             -> IoResult<Box<RtioTTY:Send>>;
199:     fn signal(&mut self, signal: Signum, channel: Sender<Signum>)
200:         -> IoResult<Box<RtioSignal:Send>>;
libstd/task.rs:
65:     /// Enable lifecycle notifications on the given channel
66:     pub notify_chan: Option<Sender<TaskResult>>,
67:     /// A name for the task-to-be, for identification in failure messages
libstd/comm/mod.rs:
482: impl<T: Send> Sender<T> {
483:     fn new(inner: Flavor<T>) -> Sender<T> {
484:         Sender { inner: Unsafe::new(inner), sends: Cell::new(0), marker: marker::NoShare }