(index<- )        ./libstd/rt/comm.rs

    1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
    2  // file at the top-level directory of this distribution and at
    3  // http://rust-lang.org/COPYRIGHT.
    4  //
    5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
    6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
    7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
    8  // option. This file may not be copied, modified, or distributed
    9  // except according to those terms.
   10  
   11  //! Ports and channels.
   12  
   13  use option::*;
   14  use cast;
   15  use ops::Drop;
   16  use rt::kill::BlockedTask;
   17  use kinds::Send;
   18  use rt;
   19  use rt::sched::Scheduler;
   20  use rt::local::Local;
   21  use rt::select::{SelectInner, SelectPortInner};
   22  use select::{Select, SelectPort};
   23  use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
   24  use unstable::sync::UnsafeArc;
   25  use util::Void;
   26  use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
   27  use cell::Cell;
   28  use clone::Clone;
   29  use tuple::ImmutableTuple;
   30  
   31  /// A combined refcount / BlockedTask-as-uint pointer.
   32  ///
   33  /// Can be equal to the following values:
   34  ///
   35  /// * 2 - both endpoints are alive
   36  /// * 1 - either the sender or the receiver is dead, determined by context
   37  /// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
   38  type State = uint;
   39  
   40  static STATE_BOTH: State = 2;
   41  static STATE_ONE: State = 1;
   42  
   43  /// The heap-allocated structure shared between two endpoints.
   44  struct Packet<T> {
   45      state: AtomicUint,
   46      payload: Option<T>,
   47  }
   48  
   49  // A one-shot channel.
   50  pub struct ChanOne<T> {
   51      void_packet: *mut Void,
   52      suppress_finalize: bool
   53  }
   54  
   55  /// A one-shot port.
   56  pub struct PortOne<T> {
   57      void_packet: *mut Void,
   58      suppress_finalize: bool
   59  }
   60  
   61  pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
   62      let packet~Packet<T> = ~Packet {
   63          state: AtomicUint::new(STATE_BOTH),
   64          payload: None
   65      };
   66  
   67      unsafe {
   68          let packet*mut Void = cast::transmute(packet);
   69          let port = PortOne {
   70              void_packet: packet,
   71              suppress_finalize: false
   72          };
   73          let chan = ChanOne {
   74              void_packet: packet,
   75              suppress_finalize: false
   76          };
   77          return (port, chan);
   78      }
   79  }
   80  
   81  impl<T> ChanOne<T> {
   82      #[inline]
   83      fn packet(&self) -> *mut Packet<T> {
   84          unsafe {
   85              let p*mut ~Packet<T> = cast::transmute(&self.void_packet);
   86              let p*mut Packet<T> = &mut **p;
   87              return p;
   88          }
   89      }
   90  
   91      /// Send a message on the one-shot channel. If a receiver task is blocked
   92      /// waiting for the message, will wake it up and reschedule to it.
   93      pub fn send(self, valT) {
   94          self.try_send(val);
   95      }
   96  
   97      /// As `send`, but also returns whether or not the receiver endpoint is still open.
   98      pub fn try_send(self, valT) -> bool {
   99          self.try_send_inner(val, true)
  100      }
  101  
  102      /// Send a message without immediately rescheduling to a blocked receiver.
  103      /// This can be useful in contexts where rescheduling is forbidden, or to
  104      /// optimize for when the sender expects to still have useful work to do.
  105      pub fn send_deferred(self, valT) {
  106          self.try_send_deferred(val);
  107      }
  108  
  109      /// As `send_deferred` and `try_send` together.
  110      pub fn try_send_deferred(self, valT) -> bool {
  111          self.try_send_inner(val, false)
  112      }
  113  
  114      // 'do_resched' configures whether the scheduler immediately switches to
  115      // the receiving task, or leaves the sending task still running.
  116      fn try_send_inner(self, valT, do_reschedbool) -> bool {
  117          if do_resched {
  118              rtassert!(!rt::in_sched_context());
  119          }
  120  
  121          // In order to prevent starvation of other tasks in situations
  122          // where a task sends repeatedly without ever receiving, we
  123          // occassionally yield instead of doing a send immediately.
  124          // Only doing this if we're doing a rescheduling send,
  125          // otherwise the caller is expecting not to context switch.
  126          if do_resched {
  127              // XXX: This TLS hit should be combined with other uses of the scheduler below
  128              let sched~Scheduler = Local::take();
  129              sched.maybe_yield();
  130          }
  131  
  132          let mut this = self;
  133          let mut recvr_active = true;
  134          let packet = this.packet();
  135  
  136          unsafe {
  137  
  138              // Install the payload
  139              rtassert!((*packet).payload.is_none());
  140              (*packet).payload = Some(val);
  141  
  142              // Atomically swap out the old state to figure out what
  143              // the port's up to, issuing a release barrier to prevent
  144              // reordering of the payload write. This also issues an
  145              // acquire barrier that keeps the subsequent access of the
  146              // ~Task pointer from being reordered.
  147              let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
  148  
  149              // Suppress the synchronizing actions in the finalizer. We're
  150              // done with the packet. NB: In case of do_resched, this *must*
  151              // happen before waking up a blocked task (or be unkillable),
  152              // because we might get a kill signal during the reschedule.
  153              this.suppress_finalize = true;
  154  
  155              match oldstate {
  156                  STATE_BOTH => {
  157                      // Port is not waiting yet. Nothing to do
  158                  }
  159                  STATE_ONE => {
  160                      // Port has closed. Need to clean up.
  161                      let _packet~Packet<T> = cast::transmute(this.void_packet);
  162                      recvr_active = false;
  163                  }
  164                  task_as_state => {
  165                      // Port is blocked. Wake it up.
  166                      let recvr = BlockedTask::cast_from_uint(task_as_state);
  167                      if do_resched {
  168                          do recvr.wake().map |woken_task| {
  169                              Scheduler::run_task(woken_task);
  170                          };
  171                      } else {
  172                          let recvr = Cell::new(recvr);
  173                          do Local::borrow |sched&mut Scheduler{
  174                              sched.enqueue_blocked_task(recvr.take());
  175                          }
  176                      }
  177                  }
  178              }
  179          }
  180  
  181          return recvr_active;
  182      }
  183  }
  184  
  185  impl<T> PortOne<T> {
  186      fn packet(&self) -> *mut Packet<T> {
  187          unsafe {
  188              let p*mut ~Packet<T> = cast::transmute(&self.void_packet);
  189              let p*mut Packet<T> = &mut **p;
  190              return p;
  191          }
  192      }
  193  
  194      /// Wait for a message on the one-shot port. Fails if the send end is closed.
  195      pub fn recv(self) -> T {
  196          match self.try_recv() {
  197              Some(val) => val,
  198              None => {
  199                  fail2!("receiving on closed channel");
  200              }
  201          }
  202      }
  203  
  204      /// As `recv`, but returns `None` if the send end is closed rather than failing.
  205      pub fn try_recv(self) -> Option<T> {
  206          let mut this = self;
  207  
  208          // Optimistic check. If data was sent already, we don't even need to block.
  209          // No release barrier needed here; we're not handing off our task pointer yet.
  210          if !this.optimistic_check() {
  211              // No data available yet.
  212              // Switch to the scheduler to put the ~Task into the Packet state.
  213              let sched~Scheduler = Local::take();
  214              do sched.deschedule_running_task_and_then |sched, task| {
  215                  this.block_on(sched, task);
  216              }
  217          }
  218  
  219          // Task resumes.
  220          this.recv_ready()
  221      }
  222  }
  223  
  224  impl<T> SelectInner for PortOne<T> {
  225      #[inline] #[cfg(not(test))]
  226      fn optimistic_check(&mut self) -> bool {
  227          unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
  228      }
  229  
  230      #[inline] #[cfg(test)]
  231      fn optimistic_check(&mut self) -> bool {
  232          // The optimistic check is never necessary for correctness. For testing
  233          // purposes, making it randomly return false simulates a racing sender.
  234          use rand::{Rand};
  235          let actually_check = do Local::borrow |sched: &mut Scheduler| {
  236              Rand::rand(&mut sched.rng)
  237          };
  238          if actually_check {
  239              unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
  240          } else {
  241              false
  242          }
  243      }
  244  
  245      fn block_on(&mut self, sched&mut Scheduler, taskBlockedTask) -> bool {
  246          unsafe {
  247              // Atomically swap the task pointer into the Packet state, issuing
  248              // an acquire barrier to prevent reordering of the subsequent read
  249              // of the payload. Also issues a release barrier to prevent
  250              // reordering of any previous writes to the task structure.
  251              let task_as_state = task.cast_to_uint();
  252              let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
  253              match oldstate {
  254                  STATE_BOTH => {
  255                      // Data has not been sent. Now we're blocked.
  256                      rtdebug!("non-rendezvous recv");
  257                      false
  258                  }
  259                  STATE_ONE => {
  260                      // Re-record that we are the only owner of the packet.
  261                      // No barrier needed, even if the task gets reawoken
  262                      // on a different core -- this is analogous to writing a
  263                      // payload; a barrier in enqueueing the task protects it.
  264                      // NB(#8132). This *must* occur before the enqueue below.
  265                      // FIXME(#6842, #8130) This is usually only needed for the
  266                      // assertion in recv_ready, except in the case of select().
  267                      // This won't actually ever have cacheline contention, but
  268                      // maybe should be optimized out with a cfg(test) anyway?
  269                      (*self.packet()).state.store(STATE_ONE, Relaxed);
  270  
  271                      rtdebug!("rendezvous recv");
  272  
  273                      // Channel is closed. Switch back and check the data.
  274                      // NB: We have to drop back into the scheduler event loop here
  275                      // instead of switching immediately back or we could end up
  276                      // triggering infinite recursion on the scheduler's stack.
  277                      let recvr = BlockedTask::cast_from_uint(task_as_state);
  278                      sched.enqueue_blocked_task(recvr);
  279                      true
  280                  }
  281                  _ => rtabort!("can't block_on; a task is already blocked")
  282              }
  283          }
  284      }
  285  
  286      // This is the only select trait function that's not also used in recv.
  287      fn unblock_from(&mut self) -> bool {
  288          let packet = self.packet();
  289          unsafe {
  290              // In case the data is available, the acquire barrier here matches
  291              // the release barrier the sender used to release the payload.
  292              match (*packet).state.load(Acquire) {
  293                  // Impossible. We removed STATE_BOTH when blocking on it, and
  294                  // no self-respecting sender would put it back.
  295                  STATE_BOTH    => rtabort!("refcount already 2 in unblock_from"),
  296                  // Here, a sender already tried to wake us up. Perhaps they
  297                  // even succeeded! Data is available.
  298                  STATE_ONE     => true,
  299                  // Still registered as blocked. Need to "unblock" the pointer.
  300                  task_as_state => {
  301                      // In the window between the load and the CAS, a sender
  302                      // might take the pointer and set the refcount to ONE. If
  303                      // that happens, we shouldn't clobber that with BOTH!
  304                      // Acquire barrier again for the same reason as above.
  305                      match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
  306                                                             Acquire) {
  307                          STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
  308                          STATE_ONE  => true, // Lost the race. Data available.
  309                          same_ptr   => {
  310                              // We successfully unblocked our task pointer.
  311                              rtassert!(task_as_state == same_ptr);
  312                              let handle = BlockedTask::cast_from_uint(task_as_state);
  313                              // Because we are already awake, the handle we
  314                              // gave to this port shall already be empty.
  315                              handle.assert_already_awake();
  316                              false
  317                          }
  318                      }
  319                  }
  320              }
  321          }
  322      }
  323  }
  324  
  325  impl<T> Select for PortOne<T> { }
  326  
  327  impl<T> SelectPortInner<T> for PortOne<T> {
  328      fn recv_ready(self) -> Option<T> {
  329          let mut this = self;
  330          let packet = this.packet();
  331  
  332          // No further memory barrier is needed here to access the
  333          // payload. Some scenarios:
  334          //
  335          // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
  336          // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
  337          //    and ran on its thread. The sending task issued a read barrier when taking the
  338          //    pointer to the receiving task.
  339          // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
  340          //    is pinned to some other scheduler, so the sending task had to give us to
  341          //    a different scheduler for resuming. That send synchronized memory.
  342          unsafe {
  343              // See corresponding store() above in block_on for rationale.
  344              // FIXME(#8130) This can happen only in test builds.
  345              // This load is not required for correctness and may be compiled out.
  346              rtassert!((*packet).state.load(Relaxed) == STATE_ONE);
  347  
  348              let payload = (*packet).payload.take();
  349  
  350              // The sender has closed up shop. Drop the packet.
  351              let _packet~Packet<T> = cast::transmute(this.void_packet);
  352              // Suppress the synchronizing actions in the finalizer. We're done with the packet.
  353              this.suppress_finalize = true;
  354              return payload;
  355          }
  356      }
  357  }
  358  
  359  impl<T> SelectPort<T> for PortOne<T> { }
  360  
  361  impl<T> Peekable<T> for PortOne<T> {
  362      fn peek(&self) -> bool {
  363          unsafe {
  364              let packet*mut Packet<T> = self.packet();
  365              let oldstate = (*packet).state.load(SeqCst);
  366              match oldstate {
  367                  STATE_BOTH => false,
  368                  STATE_ONE => (*packet).payload.is_some(),
  369                  _ => rtabort!("peeked on a blocked task")
  370              }
  371          }
  372      }
  373  }
  374  
  375  #[unsafe_destructor]
  376  impl<T> Drop for ChanOne<T> {
  377      fn drop(&mut self) {
  378          if self.suppress_finalize { return }
  379  
  380          unsafe {
  381              let this = cast::transmute_mut(self);
  382              let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
  383              match oldstate {
  384                  STATE_BOTH => {
  385                      // Port still active. It will destroy the Packet.
  386                  },
  387                  STATE_ONE => {
  388                      let _packet~Packet<T> = cast::transmute(this.void_packet);
  389                  },
  390                  task_as_state => {
  391                      // The port is blocked waiting for a message we will never send. Wake it.
  392                      rtassert!((*this.packet()).payload.is_none());
  393                      let recvr = BlockedTask::cast_from_uint(task_as_state);
  394                      do recvr.wake().map |woken_task| {
  395                          Scheduler::run_task(woken_task);
  396                      };
  397                  }
  398              }
  399          }
  400      }
  401  }
  402  
  403  #[unsafe_destructor]
  404  impl<T> Drop for PortOne<T> {
  405      fn drop(&mut self) {
  406          if self.suppress_finalize { return }
  407  
  408          unsafe {
  409              let this = cast::transmute_mut(self);
  410              let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
  411              match oldstate {
  412                  STATE_BOTH => {
  413                      // Chan still active. It will destroy the packet.
  414                  },
  415                  STATE_ONE => {
  416                      let _packet~Packet<T> = cast::transmute(this.void_packet);
  417                  }
  418                  task_as_state => {
  419                      // This case occurs during unwinding, when the blocked
  420                      // receiver was killed awake. The task can't still be
  421                      // blocked (we are it), but we need to free the handle.
  422                      let recvr = BlockedTask::cast_from_uint(task_as_state);
  423                      recvr.assert_already_awake();
  424                  }
  425              }
  426          }
  427      }
  428  }
  429  
  430  /// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
  431  pub trait SendDeferred<T> {
  432      fn send_deferred(&self, val: T);
  433      fn try_send_deferred(&self, val: T) -> bool;
  434  }
  435  
  436  struct StreamPayload<T> {
  437      val: T,
  438      next: PortOne<StreamPayload<T>>
  439  }
  440  
  441  type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
  442  type StreamPortOne<T> = PortOne<StreamPayload<T>>;
  443  
  444  /// A channel with unbounded size.
  445  pub struct Chan<T> {
  446      // FIXME #5372. Using Cell because we don't take &mut self
  447      next: Cell<StreamChanOne<T>>
  448  }
  449  
  450  /// An port with unbounded size.
  451  pub struct Port<T> {
  452      // FIXME #5372. Using Cell because we don't take &mut self
  453      next: Cell<StreamPortOne<T>>
  454  }
  455  
  456  pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
  457      let (pone, cone) = oneshot();
  458      let port = Port { next: Cell::new(pone) };
  459      let chan = Chan { next: Cell::new(cone) };
  460      return (port, chan);
  461  }
  462  
  463  impl<T: Send> Chan<T> {
  464      fn try_send_inner(&self, valT, do_reschedbool) -> bool {
  465          let (next_pone, next_cone) = oneshot();
  466          let cone = self.next.take();
  467          self.next.put_back(next_cone);
  468          cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
  469      }
  470  }
  471  
  472  impl<T: Send> GenericChan<T> for Chan<T> {
  473      fn send(&self, valT) {
  474          self.try_send(val);
  475      }
  476  }
  477  
  478  impl<T: Send> GenericSmartChan<T> for Chan<T> {
  479      fn try_send(&self, valT) -> bool {
  480          self.try_send_inner(val, true)
  481      }
  482  }
  483  
  484  impl<T: Send> SendDeferred<T> for Chan<T> {
  485      fn send_deferred(&self, valT) {
  486          self.try_send_deferred(val);
  487      }
  488      fn try_send_deferred(&self, valT) -> bool {
  489          self.try_send_inner(val, false)
  490      }
  491  }
  492  
  493  impl<T> GenericPort<T> for Port<T> {
  494      fn recv(&self) -> T {
  495          match self.try_recv() {
  496              Some(val) => val,
  497              None => {
  498                  fail2!("receiving on closed channel");
  499              }
  500          }
  501      }
  502  
  503      fn try_recv(&self) -> Option<T> {
  504          do self.next.take_opt().map_default(None) |pone| {
  505              match pone.try_recv() {
  506                  Some(StreamPayload { val, next }) => {
  507                      self.next.put_back(next);
  508                      Some(val)
  509                  }
  510                  None => None
  511              }
  512          }
  513      }
  514  }
  515  
  516  impl<T> Peekable<T> for Port<T> {
  517      fn peek(&self) -> bool {
  518          self.next.with_mut_ref(|p| p.peek())
  519      }
  520  }
  521  
  522  // XXX: Kind of gross. A Port<T> should be selectable so you can make an array
  523  // of them, but a &Port<T> should also be selectable so you can select2 on it
  524  // alongside a PortOne<U> without passing the port by value in recv_ready.
  525  
  526  impl<'self, T> SelectInner for &'self Port<T> {
  527      #[inline]
  528      fn optimistic_check(&mut self) -> bool {
  529          do self.next.with_mut_ref |pone| { pone.optimistic_check() }
  530      }
  531  
  532      #[inline]
  533      fn block_on(&mut self, sched&mut Scheduler, taskBlockedTask) -> bool {
  534          let task = Cell::new(task);
  535          do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
  536      }
  537  
  538      #[inline]
  539      fn unblock_from(&mut self) -> bool {
  540          do self.next.with_mut_ref |pone| { pone.unblock_from() }
  541      }
  542  }
  543  
  544  impl<'self, T> Select for &'self Port<T> { }
  545  
  546  impl<T> SelectInner for Port<T> {
  547      #[inline]
  548      fn optimistic_check(&mut self) -> bool {
  549          (&*self).optimistic_check()
  550      }
  551  
  552      #[inline]
  553      fn block_on(&mut self, sched&mut Scheduler, taskBlockedTask) -> bool {
  554          (&*self).block_on(sched, task)
  555      }
  556  
  557      #[inline]
  558      fn unblock_from(&mut self) -> bool {
  559          (&*self).unblock_from()
  560      }
  561  }
  562  
  563  impl<T> Select for Port<T> { }
  564  
  565  impl<'self, T> SelectPortInner<T> for &'self Port<T> {
  566      fn recv_ready(self) -> Option<T> {
  567          match self.next.take().recv_ready() {
  568              Some(StreamPayload { val, next }) => {
  569                  self.next.put_back(next);
  570                  Some(val)
  571              }
  572              None => None
  573          }
  574      }
  575  }
  576  
  577  impl<'self, T> SelectPort<T> for &'self Port<T> { }
  578  
  579  pub struct SharedChan<T> {
  580      // Just like Chan, but a shared AtomicOption instead of Cell
  581      priv next: UnsafeArc<AtomicOption<StreamChanOne<T>>>
  582  }
  583  
  584  impl<T> SharedChan<T> {
  585      pub fn new(chanChan<T>) -> SharedChan<T> {
  586          let next = chan.next.take();
  587          let next = AtomicOption::new(~next);
  588          SharedChan { next: UnsafeArc::new(next) }
  589      }
  590  }
  591  
  592  impl<T: Send> SharedChan<T> {
  593      fn try_send_inner(&self, valT, do_reschedbool) -> bool {
  594          unsafe {
  595              let (next_pone, next_cone) = oneshot();
  596              let cone = (*self.next.get()).swap(~next_cone, SeqCst);
  597              cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
  598                                           do_resched)
  599          }
  600      }
  601  }
  602  
  603  impl<T: Send> GenericChan<T> for SharedChan<T> {
  604      fn send(&self, valT) {
  605          self.try_send(val);
  606      }
  607  }
  608  
  609  impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
  610      fn try_send(&self, valT) -> bool {
  611          self.try_send_inner(val, true)
  612      }
  613  }
  614  
  615  impl<T: Send> SendDeferred<T> for SharedChan<T> {
  616      fn send_deferred(&self, valT) {
  617          self.try_send_deferred(val);
  618      }
  619      fn try_send_deferred(&self, valT) -> bool {
  620          self.try_send_inner(val, false)
  621      }
  622  }
  623  
  624  impl<T> Clone for SharedChan<T> {
  625      fn clone(&self) -> SharedChan<T> {
  626          SharedChan {
  627              next: self.next.clone()
  628          }
  629      }
  630  }
  631  
  632  pub struct SharedPort<T> {
  633      // The next port on which we will receive the next port on which we will receive T
  634      priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>>
  635  }
  636  
  637  impl<T> SharedPort<T> {
  638      pub fn new(portPort<T>) -> SharedPort<T> {
  639          // Put the data port into a new link pipe
  640          let next_data_port = port.next.take();
  641          let (next_link_port, next_link_chan) = oneshot();
  642          next_link_chan.send(next_data_port);
  643          let next_link = AtomicOption::new(~next_link_port);
  644          SharedPort { next_link: UnsafeArc::new(next_link) }
  645      }
  646  }
  647  
  648  impl<T: Send> GenericPort<T> for SharedPort<T> {
  649      fn recv(&self) -> T {
  650          match self.try_recv() {
  651              Some(val) => val,
  652              None => {
  653                  fail2!("receiving on a closed channel");
  654              }
  655          }
  656      }
  657  
  658      fn try_recv(&self) -> Option<T> {
  659          unsafe {
  660              let (next_link_port, next_link_chan) = oneshot();
  661              let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
  662              let link_port = link_port.unwrap();
  663              let data_port = link_port.recv();
  664              let (next_data_port, res) = match data_port.try_recv() {
  665                  Some(StreamPayload { val, next }) => {
  666                      (next, Some(val))
  667                  }
  668                  None => {
  669                      let (next_data_port, _) = oneshot();
  670                      (next_data_port, None)
  671                  }
  672              };
  673              next_link_chan.send(next_data_port);
  674              return res;
  675          }
  676      }
  677  }
  678  
  679  impl<T> Clone for SharedPort<T> {
  680      fn clone(&self) -> SharedPort<T> {
  681          SharedPort {
  682              next_link: self.next_link.clone()
  683          }
  684      }
  685  }
  686  
  687  // FIXME #7760: Need better name
  688  type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
  689  
  690  pub fn megapipe<T: Send>() -> MegaPipe<T> {
  691      let (port, chan) = stream();
  692      (SharedPort::new(port), SharedChan::new(chan))
  693  }
  694  
  695  impl<T: Send> GenericChan<T> for MegaPipe<T> {
  696      fn send(&self, valT) {
  697          self.second_ref().send(val)
  698      }
  699  }
  700  
  701  impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
  702      fn try_send(&self, valT) -> bool {
  703          self.second_ref().try_send(val)
  704      }
  705  }
  706  
  707  impl<T: Send> GenericPort<T> for MegaPipe<T> {
  708      fn recv(&self) -> T {
  709          self.first_ref().recv()
  710      }
  711  
  712      fn try_recv(&self) -> Option<T> {
  713          self.first_ref().try_recv()
  714      }
  715  }
  716  
  717  impl<T: Send> SendDeferred<T> for MegaPipe<T> {
  718      fn send_deferred(&self, valT) {
  719          self.second_ref().send_deferred(val)
  720      }
  721      fn try_send_deferred(&self, valT) -> bool {
  722          self.second_ref().try_send_deferred(val)
  723      }
  724  }
  725  
  726  #[cfg(test)]
  727  mod test {
  728      use super::*;
  729      use option::*;
  730      use rt::test::*;
  731      use cell::Cell;
  732      use num::Times;
  733      use rt::util;
  734  
  735      #[test]
  736      fn oneshot_single_thread_close_port_first() {
  737          // Simple test of closing without sending
  738          do run_in_newsched_task {
  739              let (port, _chan) = oneshot::<int>();
  740              { let _p = port; }
  741          }
  742      }
  743  
  744      #[test]
  745      fn oneshot_single_thread_close_chan_first() {
  746          // Simple test of closing without sending
  747          do run_in_newsched_task {
  748              let (_port, chan) = oneshot::<int>();
  749              { let _c = chan; }
  750          }
  751      }
  752  
  753      #[test]
  754      fn oneshot_single_thread_send_port_close() {
  755          // Testing that the sender cleans up the payload if receiver is closed
  756          do run_in_newsched_task {
  757              let (port, chan) = oneshot::<~int>();
  758              { let _p = port; }
  759              chan.send(~0);
  760          }
  761      }
  762  
  763      #[test]
  764      fn oneshot_single_thread_recv_chan_close() {
  765          // Receiving on a closed chan will fail
  766          do run_in_newsched_task {
  767              let res = do spawntask_try {
  768                  let (port, chan) = oneshot::<~int>();
  769                  { let _c = chan; }
  770                  port.recv();
  771              };
  772              // What is our res?
  773              rtdebug!("res is: {:?}", res.is_err());
  774              assert!(res.is_err());
  775          }
  776      }
  777  
  778      #[test]
  779      fn oneshot_single_thread_send_then_recv() {
  780          do run_in_newsched_task {
  781              let (port, chan) = oneshot::<~int>();
  782              chan.send(~10);
  783              assert!(port.recv() == ~10);
  784          }
  785      }
  786  
  787      #[test]
  788      fn oneshot_single_thread_try_send_open() {
  789          do run_in_newsched_task {
  790              let (port, chan) = oneshot::<int>();
  791              assert!(chan.try_send(10));
  792              assert!(port.recv() == 10);
  793          }
  794      }
  795  
  796      #[test]
  797      fn oneshot_single_thread_try_send_closed() {
  798          do run_in_newsched_task {
  799              let (port, chan) = oneshot::<int>();
  800              { let _p = port; }
  801              assert!(!chan.try_send(10));
  802          }
  803      }
  804  
  805      #[test]
  806      fn oneshot_single_thread_try_recv_open() {
  807          do run_in_newsched_task {
  808              let (port, chan) = oneshot::<int>();
  809              chan.send(10);
  810              assert!(port.try_recv() == Some(10));
  811          }
  812      }
  813  
  814      #[test]
  815      fn oneshot_single_thread_try_recv_closed() {
  816          do run_in_newsched_task {
  817              let (port, chan) = oneshot::<int>();
  818              { let _c = chan; }
  819              assert!(port.try_recv() == None);
  820          }
  821      }
  822  
  823      #[test]
  824      fn oneshot_single_thread_peek_data() {
  825          do run_in_newsched_task {
  826              let (port, chan) = oneshot::<int>();
  827              assert!(!port.peek());
  828              chan.send(10);
  829              assert!(port.peek());
  830          }
  831      }
  832  
  833      #[test]
  834      fn oneshot_single_thread_peek_close() {
  835          do run_in_newsched_task {
  836              let (port, chan) = oneshot::<int>();
  837              { let _c = chan; }
  838              assert!(!port.peek());
  839              assert!(!port.peek());
  840          }
  841      }
  842  
  843      #[test]
  844      fn oneshot_single_thread_peek_open() {
  845          do run_in_newsched_task {
  846              let (port, _) = oneshot::<int>();
  847              assert!(!port.peek());
  848          }
  849      }
  850  
  851      #[test]
  852      fn oneshot_multi_task_recv_then_send() {
  853          do run_in_newsched_task {
  854              let (port, chan) = oneshot::<~int>();
  855              let port_cell = Cell::new(port);
  856              do spawntask {
  857                  assert!(port_cell.take().recv() == ~10);
  858              }
  859  
  860              chan.send(~10);
  861          }
  862      }
  863  
  864      #[test]
  865      fn oneshot_multi_task_recv_then_close() {
  866          do run_in_newsched_task {
  867              let (port, chan) = oneshot::<~int>();
  868              let port_cell = Cell::new(port);
  869              let chan_cell = Cell::new(chan);
  870              do spawntask_later {
  871                  let _cell = chan_cell.take();
  872              }
  873              let res = do spawntask_try {
  874                  assert!(port_cell.take().recv() == ~10);
  875              };
  876              assert!(res.is_err());
  877          }
  878      }
  879  
  880      #[test]
  881      fn oneshot_multi_thread_close_stress() {
  882          if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
  883          do stress_factor().times {
  884              do run_in_newsched_task {
  885                  let (port, chan) = oneshot::<int>();
  886                  let port_cell = Cell::new(port);
  887                  let thread = do spawntask_thread {
  888                      let _p = port_cell.take();
  889                  };
  890                  let _chan = chan;
  891                  thread.join();
  892              }
  893          }
  894      }
  895  
  896      #[test]
  897      fn oneshot_multi_thread_send_close_stress() {
  898          if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
  899          do stress_factor().times {
  900              do run_in_newsched_task {
  901                  let (port, chan) = oneshot::<int>();
  902                  let chan_cell = Cell::new(chan);
  903                  let port_cell = Cell::new(port);
  904                  let thread1 = do spawntask_thread {
  905                      let _p = port_cell.take();
  906                  };
  907                  let thread2 = do spawntask_thread {
  908                      let c = chan_cell.take();
  909                      c.send(1);
  910                  };
  911                  thread1.join();
  912                  thread2.join();
  913              }
  914          }
  915      }
  916  
  917      #[test]
  918      fn oneshot_multi_thread_recv_close_stress() {
  919          if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
  920          do stress_factor().times {
  921              do run_in_newsched_task {
  922                  let (port, chan) = oneshot::<int>();
  923                  let chan_cell = Cell::new(chan);
  924                  let port_cell = Cell::new(port);
  925                  let thread1 = do spawntask_thread {
  926                      let port_cell = Cell::new(port_cell.take());
  927                      let res = do spawntask_try {
  928                          port_cell.take().recv();
  929                      };
  930                      assert!(res.is_err());
  931                  };
  932                  let thread2 = do spawntask_thread {
  933                      let chan_cell = Cell::new(chan_cell.take());
  934                      do spawntask {
  935                          chan_cell.take();
  936                      }
  937                  };
  938                  thread1.join();
  939                  thread2.join();
  940              }
  941          }
  942      }
  943  
  944      #[test]
  945      fn oneshot_multi_thread_send_recv_stress() {
  946          if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
  947          do stress_factor().times {
  948              do run_in_newsched_task {
  949                  let (port, chan) = oneshot::<~int>();
  950                  let chan_cell = Cell::new(chan);
  951                  let port_cell = Cell::new(port);
  952                  let thread1 = do spawntask_thread {
  953                      chan_cell.take().send(~10);
  954                  };
  955                  let thread2 = do spawntask_thread {
  956                      assert!(port_cell.take().recv() == ~10);
  957                  };
  958                  thread1.join();
  959                  thread2.join();
  960              }
  961          }
  962      }
  963  
  964      #[test]
  965      fn stream_send_recv_stress() {
  966          if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
  967          do stress_factor().times {
  968              do run_in_mt_newsched_task {
  969                  let (port, chan) = stream::<~int>();
  970  
  971                  send(chan, 0);
  972                  recv(port, 0);
  973  
  974                  fn send(chan: Chan<~int>, i: int) {
  975                      if i == 10 { return }
  976  
  977                      let chan_cell = Cell::new(chan);
  978                      do spawntask_random {
  979                          let chan = chan_cell.take();
  980                          chan.send(~i);
  981                          send(chan, i + 1);
  982                      }
  983                  }
  984  
  985                  fn recv(port: Port<~int>, i: int) {
  986                      if i == 10 { return }
  987  
  988                      let port_cell = Cell::new(port);
  989                      do spawntask_random {
  990                          let port = port_cell.take();
  991                          assert!(port.recv() == ~i);
  992                          recv(port, i + 1);
  993                      };
  994                  }
  995              }
  996          }
  997      }
  998  
  999      #[test]
 1000      fn recv_a_lot() {
 1001          // Regression test that we don't run out of stack in scheduler context
 1002          do run_in_newsched_task {
 1003              let (port, chan) = stream();
 1004              do 10000.times { chan.send(()) }
 1005              do 10000.times { port.recv() }
 1006          }
 1007      }
 1008  
 1009      #[test]
 1010      fn shared_chan_stress() {
 1011          if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
 1012          do run_in_mt_newsched_task {
 1013              let (port, chan) = stream();
 1014              let chan = SharedChan::new(chan);
 1015              let total = stress_factor() + 100;
 1016              do total.times {
 1017                  let chan_clone = chan.clone();
 1018                  do spawntask_random {
 1019                      chan_clone.send(());
 1020                  }
 1021              }
 1022  
 1023              do total.times {
 1024                  port.recv();
 1025              }
 1026          }
 1027      }
 1028  
 1029      #[test]
 1030      fn shared_port_stress() {
 1031          if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
 1032          do run_in_mt_newsched_task {
 1033              let (end_port, end_chan) = stream();
 1034              let (port, chan) = stream();
 1035              let end_chan = SharedChan::new(end_chan);
 1036              let port = SharedPort::new(port);
 1037              let total = stress_factor() + 100;
 1038              do total.times {
 1039                  let end_chan_clone = end_chan.clone();
 1040                  let port_clone = port.clone();
 1041                  do spawntask_random {
 1042                      port_clone.recv();
 1043                      end_chan_clone.send(());
 1044                  }
 1045              }
 1046  
 1047              do total.times {
 1048                  chan.send(());
 1049              }
 1050  
 1051              do total.times {
 1052                  end_port.recv();
 1053              }
 1054          }
 1055      }
 1056  
 1057      #[test]
 1058      fn shared_port_close_simple() {
 1059          do run_in_mt_newsched_task {
 1060              let (port, chan) = stream::<()>();
 1061              let port = SharedPort::new(port);
 1062              { let _chan = chan; }
 1063              assert!(port.try_recv().is_none());
 1064          }
 1065      }
 1066  
 1067      #[test]
 1068      fn shared_port_close() {
 1069          do run_in_mt_newsched_task {
 1070              let (end_port, end_chan) = stream::<bool>();
 1071              let (port, chan) = stream::<()>();
 1072              let end_chan = SharedChan::new(end_chan);
 1073              let port = SharedPort::new(port);
 1074              let chan = SharedChan::new(chan);
 1075              let send_total = 10;
 1076              let recv_total = 20;
 1077              do spawntask_random {
 1078                  do send_total.times {
 1079                      let chan_clone = chan.clone();
 1080                      do spawntask_random {
 1081                          chan_clone.send(());
 1082                      }
 1083                  }
 1084              }
 1085              let end_chan_clone = end_chan.clone();
 1086              do spawntask_random {
 1087                  do recv_total.times {
 1088                      let port_clone = port.clone();
 1089                      let end_chan_clone = end_chan_clone.clone();
 1090                      do spawntask_random {
 1091                          let recvd = port_clone.try_recv().is_some();
 1092                          end_chan_clone.send(recvd);
 1093                      }
 1094                  }
 1095              }
 1096  
 1097              let mut recvd = 0;
 1098              do recv_total.times {
 1099                  recvd += if end_port.recv() { 1 } else { 0 };
 1100              }
 1101  
 1102              assert!(recvd == send_total);
 1103          }
 1104      }
 1105  
 1106      #[test]
 1107      fn megapipe_stress() {
 1108          use rand;
 1109          use rand::Rng;
 1110  
 1111          if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
 1112  
 1113          do run_in_mt_newsched_task {
 1114              let (end_port, end_chan) = stream::<()>();
 1115              let end_chan = SharedChan::new(end_chan);
 1116              let pipe = megapipe();
 1117              let total = stress_factor() + 10;
 1118              let mut rng = rand::rng();
 1119              do total.times {
 1120                  let msgs = rng.gen_integer_range(0u, 10);
 1121                  let pipe_clone = pipe.clone();
 1122                  let end_chan_clone = end_chan.clone();
 1123                  do spawntask_random {
 1124                      do msgs.times {
 1125                          pipe_clone.send(());
 1126                      }
 1127                      do msgs.times {
 1128                          pipe_clone.recv();
 1129                      }
 1130                  }
 1131  
 1132                  end_chan_clone.send(());
 1133              }
 1134  
 1135              do total.times {
 1136                  end_port.recv();
 1137              }
 1138          }
 1139      }
 1140  
 1141      #[test]
 1142      fn send_deferred() {
 1143          use unstable::sync::atomically;
 1144  
 1145          // Tests no-rescheduling of send_deferred on all types of channels.
 1146          do run_in_newsched_task {
 1147              let (pone, cone) = oneshot();
 1148              let (pstream, cstream) = stream();
 1149              let (pshared, cshared) = stream();
 1150              let cshared = SharedChan::new(cshared);
 1151              let mp = megapipe();
 1152  
 1153              let pone = Cell::new(pone);
 1154              do spawntask { pone.take().recv(); }
 1155              let pstream = Cell::new(pstream);
 1156              do spawntask { pstream.take().recv(); }
 1157              let pshared = Cell::new(pshared);
 1158              do spawntask { pshared.take().recv(); }
 1159              let p_mp = Cell::new(mp.clone());
 1160              do spawntask { p_mp.take().recv(); }
 1161  
 1162              let cs = Cell::new((cone, cstream, cshared, mp));
 1163              unsafe {
 1164                  do atomically {
 1165                      let (cone, cstream, cshared, mp) = cs.take();
 1166                      cone.send_deferred(());
 1167                      cstream.send_deferred(());
 1168                      cshared.send_deferred(());
 1169                      mp.send_deferred(());
 1170                  }
 1171              }
 1172          }
 1173      }
 1174  
 1175  }

libstd/rt/comm.rs:60:1-60:1 -fn- definition:

pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
references:-
641:         let (next_link_port, next_link_chan) = oneshot();
595:             let (next_pone, next_cone) = oneshot();
669:                     let (next_data_port, _) = oneshot();
465:         let (next_pone, next_cone) = oneshot();
457:     let (pone, cone) = oneshot();
660:             let (next_link_port, next_link_chan) = oneshot();
libstd/comm.rs:
57:     let (p, c) = rtcomm::oneshot();
libstd/rt/test.rs:
279:     let (port, chan) = oneshot();


libstd/rt/comm.rs:37:81-37:81 -ty- definition:
/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
type State = uint;
references:-
40: static STATE_BOTH: State = 2;
41: static STATE_ONE: State = 1;


libstd/rt/comm.rs:55:21-55:21 -struct- definition:
/// A one-shot port.
pub struct PortOne<T> {
references:-
327: impl<T> SelectPortInner<T> for PortOne<T> {
634:     priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>>
69:         let port = PortOne {
61: pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
359: impl<T> SelectPort<T> for PortOne<T> { }
224: impl<T> SelectInner for PortOne<T> {
361: impl<T> Peekable<T> for PortOne<T> {
404: impl<T> Drop for PortOne<T> {
442: type StreamPortOne<T> = PortOne<StreamPayload<T>>;
438:     next: PortOne<StreamPayload<T>>
325: impl<T> Select for PortOne<T> { }
185: impl<T> PortOne<T> {
libstd/comm.rs:
53: pub struct PortOne<T> { x: rtcomm::PortOne<T> }


libstd/rt/comm.rs:578:1-578:1 -struct- definition:

pub struct SharedChan<T> {
references:-
609: impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
625:     fn clone(&self) -> SharedChan<T> {
624: impl<T> Clone for SharedChan<T> {
615: impl<T: Send> SendDeferred<T> for SharedChan<T> {
592: impl<T: Send> SharedChan<T> {
584: impl<T> SharedChan<T> {
688: type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
585:     pub fn new(chan: Chan<T>) -> SharedChan<T> {
626:         SharedChan {
603: impl<T: Send> GenericChan<T> for SharedChan<T> {
588:         SharedChan { next: UnsafeArc::new(next) }
libstd/comm.rs:
156: pub struct SharedChan<T> { x: rtcomm::SharedChan<T> }


libstd/rt/comm.rs:435:1-435:1 -struct- definition:

struct StreamPayload<T> {
references:-
438:     next: PortOne<StreamPayload<T>>
597:             cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
665:                 Some(StreamPayload { val, next }) => {
468:         cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
506:                 Some(StreamPayload { val, next }) => {
442: type StreamPortOne<T> = PortOne<StreamPayload<T>>;
441: type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
568:             Some(StreamPayload { val, next }) => {


libstd/rt/comm.rs:455:1-455:1 -fn- definition:

pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
references:-
691:     let (port, chan) = stream();
libstd/comm.rs:
65:     let (p, c) = rtcomm::stream();


libstd/rt/comm.rs:43:63-43:63 -struct- definition:
/// The heap-allocated structure shared between two endpoints.
struct Packet<T> {
references:-
364:             let packet: *mut Packet<T> = self.packet();
62:     let packet: ~Packet<T> = ~Packet {
83:     fn packet(&self) -> *mut Packet<T> {
416:                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
188:             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
161:                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
388:                     let _packet: ~Packet<T> = cast::transmute(this.void_packet);
86:             let p: *mut Packet<T> = &mut **p;
186:     fn packet(&self) -> *mut Packet<T> {
351:             let _packet: ~Packet<T> = cast::transmute(this.void_packet);
62:     let packet: ~Packet<T> = ~Packet {
85:             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
189:             let p: *mut Packet<T> = &mut **p;


libstd/rt/comm.rs:440:1-440:1 -ty- definition:

type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
references:-
581:     priv next: UnsafeArc<AtomicOption<StreamChanOne<T>>>
447:     next: Cell<StreamChanOne<T>>


libstd/rt/comm.rs:450:33-450:33 -struct- definition:
/// An port with unbounded size.
pub struct Port<T> {
references:-
458:     let port = Port { next: Cell::new(pone) };
456: pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
563: impl<T> Select for Port<T> { }
544: impl<'self, T> Select for &'self Port<T> { }
546: impl<T> SelectInner for Port<T> {
638:     pub fn new(port: Port<T>) -> SharedPort<T> {
493: impl<T> GenericPort<T> for Port<T> {
526: impl<'self, T> SelectInner for &'self Port<T> {
516: impl<T> Peekable<T> for Port<T> {
577: impl<'self, T> SelectPort<T> for &'self Port<T> { }
565: impl<'self, T> SelectPortInner<T> for &'self Port<T> {
libstd/comm.rs:
61: pub struct Port<T> { x: rtcomm::Port<T> }


libstd/rt/comm.rs:49:23-49:23 -struct- definition:
// A one-shot channel.
pub struct ChanOne<T> {
references:-
441: type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
73:         let chan = ChanOne {
61: pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
376: impl<T> Drop for ChanOne<T> {
81: impl<T> ChanOne<T> {
libstd/comm.rs:
54: pub struct ChanOne<T> { x: rtcomm::ChanOne<T> }


libstd/rt/comm.rs:687:33-687:33 -ty- definition:
// FIXME #7760: Need better name
type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
references:-
707: impl<T: Send> GenericPort<T> for MegaPipe<T> {
690: pub fn megapipe<T: Send>() -> MegaPipe<T> {
695: impl<T: Send> GenericChan<T> for MegaPipe<T> {
717: impl<T: Send> SendDeferred<T> for MegaPipe<T> {
701: impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {


libstd/rt/comm.rs:444:35-444:35 -struct- definition:
/// A channel with unbounded size.
pub struct Chan<T> {
references:-
459:     let chan = Chan { next: Cell::new(cone) };
456: pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
585:     pub fn new(chan: Chan<T>) -> SharedChan<T> {
463: impl<T: Send> Chan<T> {
484: impl<T: Send> SendDeferred<T> for Chan<T> {
472: impl<T: Send> GenericChan<T> for Chan<T> {
478: impl<T: Send> GenericSmartChan<T> for Chan<T> {
libstd/comm.rs:
62: pub struct Chan<T> { x: rtcomm::Chan<T> }


libstd/rt/comm.rs:631:1-631:1 -struct- definition:

pub struct SharedPort<T> {
references:-
637: impl<T> SharedPort<T> {
644:         SharedPort { next_link: UnsafeArc::new(next_link) }
638:     pub fn new(port: Port<T>) -> SharedPort<T> {
688: type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
679: impl<T> Clone for SharedPort<T> {
681:         SharedPort {
680:     fn clone(&self) -> SharedPort<T> {
648: impl<T: Send> GenericPort<T> for SharedPort<T> {
libstd/comm.rs:
198: pub struct SharedPort<T> { x: rtcomm::SharedPort<T> }


libstd/rt/comm.rs:430:87-430:87 -trait- definition:
/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
pub trait SendDeferred<T> {
references:-
615: impl<T: Send> SendDeferred<T> for SharedChan<T> {
484: impl<T: Send> SendDeferred<T> for Chan<T> {
717: impl<T: Send> SendDeferred<T> for MegaPipe<T> {
libstd/comm.rs:
179: impl<T: Send> SendDeferred<T> for SharedChan<T> {
124: impl<T: Send> SendDeferred<T> for Chan<T> {


libstd/rt/comm.rs:441:51-441:51 -ty- definition:
type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
type StreamPortOne<T> = PortOne<StreamPayload<T>>;
references:-
634:     priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>>
453:     next: Cell<StreamPortOne<T>>