./libstd/comm/mod.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Communication primitives for concurrent tasks
12 //!
13 //! Rust makes it very difficult to share data among tasks to prevent race
14 //! conditions and to improve parallelism, but there is often a need for
15 //! communication between concurrent tasks. The primitives defined in this
16 //! module are the building blocks for synchronization in Rust.
17 //!
18 //! This module provides message-based communication over channels, concretely
19 //! defined among three types:
20 //!
21 //! * `Sender`
22 //! * `SyncSender`
23 //! * `Receiver`
24 //!
25 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
26 //! senders are clone-able such that many tasks can send simultaneously to one
27 //! receiver. These channels are *task blocking*, not *thread blocking*. This
28 //! means that if one task is blocked on a channel, other tasks can continue to
29 //! make progress.
30 //!
31 //! Rust channels come in one of two flavors:
32 //!
33 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
34 //! will return a `(Sender, Receiver)` tuple where all sends will be
35 //! **asynchronous** (they never block). The channel conceptually has an
36 //! infinite buffer.
37 //!
38 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
39 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
40 //! is a pre-allocated buffer of a fixed size. All sends will be
41 //! **synchronous** by blocking until there is buffer space available. Note
42 //! that a bound of 0 is allowed, causing the channel to become a
43 //! "rendezvous" channel where each sender atomically hands off a message to
44 //! a receiver.
45 //!
46 //! ## Failure Propagation
47 //!
48 //! In addition to being a core primitive for communicating in Rust, channels
49 //! are the points at which failure is propagated among tasks. Whenever one
50 //! half of a channel is closed, the next operation on the other half will
51 //! `fail!`. The purpose of this is to allow propagation of failure among tasks
52 //! that are linked to one another via channels.
53 //!
54 //! Both senders and receivers, however, have methods to perform their
55 //! respective operations without failing.
56 //!
57 //! ## Runtime Requirements
58 //!
59 //! The channel types defined in this module generally have very few runtime
60 //! requirements in order to operate. The major requirement they have is for a
61 //! local rust `Task` to be available if any *blocking* operation is performed.
62 //!
63 //! If a local `Task` is not available (for example, in an FFI callback), then
64 //! the `send` and `send_opt` operations on a `Sender`, as well as the
65 //! `try_send` method on a `SyncSender`, are safe to call, but no other
66 //! operations are guaranteed to be safe.
67 //!
68 //! Additionally, channels can interoperate between runtimes. If one task in a
69 //! program is running on libnative and another is running on libgreen, they can
70 //! still communicate with one another using channels.
71 //!
72 //! # Example
73 //!
74 //! Simple usage:
75 //!
76 //! ```
77 //! // Create a simple streaming channel
78 //! let (tx, rx) = channel();
79 //! spawn(proc() {
80 //! tx.send(10);
81 //! });
82 //! assert_eq!(rx.recv(), 10);
83 //! ```
84 //!
85 //! Shared usage:
86 //!
87 //! ```
88 //! // Create a shared channel which can be sent along from many tasks
89 //! let (tx, rx) = channel();
90 //! for i in range(0, 10) {
91 //! let tx = tx.clone();
92 //! spawn(proc() {
93 //! tx.send(i);
94 //! })
95 //! }
96 //!
97 //! for _ in range(0, 10) {
98 //! let j = rx.recv();
99 //! assert!(0 <= j && j < 10);
100 //! }
101 //! ```
102 //!
103 //! Propagating failure:
104 //!
105 //! ```should_fail
106 //! // The call to recv() will fail!() because the channel has already hung
107 //! // up (or been deallocated)
108 //! let (tx, rx) = channel::<int>();
109 //! drop(tx);
110 //! rx.recv();
111 //! ```
112 //!
113 //! Synchronous channels:
114 //!
115 //! ```
116 //! let (tx, rx) = sync_channel(0);
117 //! spawn(proc() {
118 //! // This will wait for the parent task to start receiving
119 //! tx.send(53);
120 //! });
121 //! rx.recv();
122 //! ```
123
124 // A description of how Rust's channel implementation works
125 //
126 // Channels are supposed to be the basic building block for all other
127 // concurrent primitives that are used in Rust. As a result, the channel type
128 // needs to be highly optimized, flexible, and broad enough for use everywhere.
129 //
130 // The choice of implementation of all channels is to be built on lock-free data
131 // structures. The channels themselves are then consequently also lock-free data
132 // structures. As always with lock-free code, this is a very "here be dragons"
133 // territory, especially because I'm unaware of any academic papers which have
134 // gone into great depth about channels of these flavors.
135 //
136 // ## Flavors of channels
137 //
138 // From the perspective of a consumer of this library, there is only one flavor
139 // of channel. This channel can be used as a stream and cloned to allow multiple
140 // senders. Under the hood, however, there are actually three flavors of
141 // channels in play.
142 //
143 // * Oneshots - these channels are highly optimized for the one-send use case.
144 // They contain as few atomics as possible and involve one and
145 // exactly one allocation.
146 // * Streams - these channels are optimized for the non-shared use case. They
147 // use a different concurrent queue which is more tailored for this
148 // use case. The initial allocation of this flavor of channel is not
149 // optimized.
150 // * Shared - this is the most general form of channel that this module offers,
151 // a channel with multiple senders. This type is as optimized as it
152 // can be, but the previous two types mentioned are much faster for
153 // their use-cases.
154 //
155 // ## Concurrent queues
156 //
157 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
158 // recv() obviously blocks. This means that under the hood there must be some
159 // shared and concurrent queue holding all of the actual data.
160 //
161 // With two flavors of channels, two flavors of queues are also used. We have
162 // chosen to use queues from a well-known author which are abbreviated as SPSC
163 // and MPSC (single producer, single consumer and multiple producer, single
164 // consumer). SPSC queues are used for streams while MPSC queues are used for
165 // shared channels.
166 //
167 // ### SPSC optimizations
168 //
169 // The SPSC queue found online is essentially a linked list of nodes where one
170 // half of the nodes are the "queue of data" and the other half of nodes are a
171 // cache of unused nodes. The unused nodes are used such that an allocation is
172 // not required on every push() and a free doesn't need to happen on every
173 // pop().
174 //
175 // As found online, however, the cache of nodes is of an infinite size. This
176 // means that if a channel at one point in its life had 50k items in the queue,
177 // then the queue will always have the capacity for 50k items. I believed that
178 // this was an unnecessary limitation of the implementation, so I have altered
179 // the queue to optionally have a bound on the cache size.
180 //
181 // By default, streams will have an unbounded SPSC queue with a small-ish cache
182 // size. The hope is that the cache is still large enough to have very fast
183 // send() operations while remaining small enough that millions of channels can
184 // coexist at once.
185 //
186 // ### MPSC optimizations
187 //
188 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
189 // a linked list under the hood to earn its unboundedness, but I have not put
190 // much effort into adding a cache of nodes similar to the SPSC queue.
191 //
192 // For now, I believe that this is "ok" because shared channels are not the most
193 // common type, but soon we may wish to revisit this queue choice and determine
194 // another candidate for backend storage of shared channels.
195 //
196 // ## Overview of the Implementation
197 //
198 // Now that there's a little background on the concurrent queues used, it's
199 // worth going into much more detail about the channels themselves. The basic
200 // pseudocode for a send/recv are:
201 //
202 //
203 //      send(t)                             recv()
204 //        queue.push(t)                       return if queue.pop()
205 //        if increment() == -1                deschedule {
206 //          wakeup()                            if decrement() > 0
207 //                                                cancel_deschedule()
208 //                                            }
209 //                                            queue.pop()
210 //
211 // As mentioned before, there are no locks in this implementation; only atomic
212 // instructions are used.
213 //
214 // ### The internal atomic counter
215 //
216 // Every channel has a counter shared by both halves to keep track of the size
217 // of the queue. This counter is used to abort descheduling by the receiver and
218 // to know when to wake up on the sending side.
219 //
220 // As seen in the pseudocode, senders will increment this count and receivers
221 // will decrement the count. The theory behind this is that if a sender sees a
222 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
223 // then it doesn't need to block.
224 //
225 // The recv() method begins with a call to pop(), and if successful, it needs
226 // to decrement the count. It is a crucial implementation detail that this
227 // decrement does *not* happen to the shared counter. If this were the case,
228 // then it would be possible for the counter to be very negative when there were
229 // no receivers waiting, in which case the senders would have to determine when
230 // it was actually appropriate to wake up a receiver.
231 //
232 // Instead, the "steal count" is kept track of separately (not atomically
233 // because it's only used by receivers), and then the decrement() call when
234 // descheduling will lump in all of the recent steals into one large decrement.
235 //
236 // The implication of this is that if a sender sees a -1 count, then there's
237 // guaranteed to be a waiter waiting!
238 //
239 // ## Native Implementation
240 //
241 // A major goal of these channels is to work seamlessly on and off the runtime.
242 // All of the previous race conditions have been worded in terms of
243 // scheduler-isms (which are obviously not available without the runtime).
244 //
245 // For now, native usage of channels (off the runtime) will fall back onto
246 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
247 // is still entirely lock-free; the "deschedule" blocks above are surrounded by
248 // a mutex, and the "wakeup" blocks involve grabbing a mutex and signaling on a
249 // condition variable.
250 //
251 // ## Select
252 //
253 // Being able to support selection over channels has greatly influenced this
254 // design, and not only does selection need to work inside the runtime, but also
255 // outside the runtime.
256 //
257 // The implementation is fairly straightforward. The goal of select() is not to
258 // return some data, but only to return which channel can receive data without
259 // blocking. The implementation is essentially the entire blocking procedure
260 // followed by an increment as soon as it's woken up. The cancellation procedure
261 // involves an increment and swapping out of to_wake to acquire ownership of the
262 // task to unblock.
263 //
264 // Sadly this current implementation requires multiple allocations, so I have
265 // seen the throughput of select() be much worse than it should be. I do not
266 // believe that there is anything fundamental which needs to change about these
267 // channels, however, in order to support a more efficient select().
268 //
269 // # Conclusion
270 //
271 // And now that you've seen all the races that I found and attempted to fix,
272 // here's the code for you to find some more!
273
274 use cell::Cell;
275 use clone::Clone;
276 use iter::Iterator;
277 use kinds::Send;
278 use kinds::marker;
279 use mem;
280 use ops::Drop;
281 use option::{Some, None, Option};
282 use owned::Box;
283 use result::{Ok, Err, Result};
284 use rt::local::Local;
285 use rt::task::{Task, BlockedTask};
286 use sync::arc::UnsafeArc;
287 use ty::Unsafe;
288
289 pub use comm::select::{Select, Handle};
290
291 macro_rules! test (
292 { fn $name:ident() $b:block $(#[$a:meta])*} => (
293 mod $name {
294 #![allow(unused_imports)]
295
296 use native;
297 use comm::*;
298 use prelude::*;
299 use super::*;
300 use super::super::*;
301 use owned::Box;
302 use task;
303
304 fn f() $b
305
306 $(#[$a])* #[test] fn uv() { f() }
307 $(#[$a])* #[test] fn native() {
308 use native;
309 let (tx, rx) = channel();
310 native::task::spawn(proc() { tx.send(f()) });
311 rx.recv();
312 }
313 }
314 )
315 )
316
317 mod select;
318 mod oneshot;
319 mod stream;
320 mod shared;
321 mod sync;
322
323 // Use a power of 2 to allow LLVM to optimize the modulus to something that's
324 // not a division; this check is hit pretty regularly.
325 static RESCHED_FREQ: int = 256;
326
327 /// The receiving-half of Rust's channel type. This half can only be owned by
328 /// one task.
329 pub struct Receiver<T> {
330 inner: Unsafe<Flavor<T>>,
331 receives: Cell<uint>,
332 // can't share in an arc
333 marker: marker::NoShare,
334 }
335
336 /// An iterator over messages on a receiver. This iterator will block
337 /// whenever `next` is called, waiting for a new message, and `None` will be
338 /// returned when the corresponding channel has hung up.
339 pub struct Messages<'a, T> {
340 rx: &'a Receiver<T>
341 }
342
343 /// The sending-half of Rust's asynchronous channel type. This half can only be
344 /// owned by one task, but it can be cloned to send to other tasks.
345 pub struct Sender<T> {
346 inner: Unsafe<Flavor<T>>,
347 sends: Cell<uint>,
348 // can't share in an arc
349 marker: marker::NoShare,
350 }
351
352 /// The sending-half of Rust's synchronous channel type. This half can only be
353 /// owned by one task, but it can be cloned to send to other tasks.
354 pub struct SyncSender<T> {
355 inner: UnsafeArc<sync::Packet<T>>,
356 // can't share in an arc
357 marker: marker::NoShare,
358 }
359
360 /// This enumeration is the list of the possible reasons that try_recv could not
361 /// return data when called.
362 #[deriving(Eq, Clone, Show)]
363 pub enum TryRecvError {
364 /// This channel is currently empty, but the sender(s) have not yet
365 /// disconnected, so data may yet become available.
366 Empty,
367 /// This channel's sending half has become disconnected, and there will
368 /// never be any more data received on this channel
369 Disconnected,
370 }
371
372 /// This enumeration is the list of the possible error outcomes for the
373 /// `SyncSender::try_send` method.
374 #[deriving(Eq, Clone, Show)]
375 pub enum TrySendError<T> {
376 /// The data could not be sent on the channel because it would require that
377 /// the callee block to send the data.
378 ///
379 /// If this is a buffered channel, then the buffer is full at this time. If
380 /// this is not a buffered channel, then there is no receiver available to
381 /// acquire the data.
382 Full(T),
383 /// This channel's receiving half has disconnected, so the data could not be
384 /// sent. The data is returned back to the callee in this case.
385 RecvDisconnected(T),
386 }
387
388 enum Flavor<T> {
389 Oneshot(UnsafeArc<oneshot::Packet<T>>),
390 Stream(UnsafeArc<stream::Packet<T>>),
391 Shared(UnsafeArc<shared::Packet<T>>),
392 Sync(UnsafeArc<sync::Packet<T>>),
393 }
394
395 #[doc(hidden)]
396 trait UnsafeFlavor<T> {
397 fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>>;
398 unsafe fn mut_inner<'a>(&'a self) -> &'a mut Flavor<T> {
399 &mut *self.inner_unsafe().get()
400 }
401 unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
402 &*self.inner_unsafe().get()
403 }
404 }
405 impl<T> UnsafeFlavor<T> for Sender<T> {
406 fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
407 &self.inner
408 }
409 }
410 impl<T> UnsafeFlavor<T> for Receiver<T> {
411 fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
412 &self.inner
413 }
414 }
415
416 /// Creates a new asynchronous channel, returning the sender/receiver halves.
417 ///
418 /// All data sent on the sender will become available on the receiver, and no
419 /// send will block the calling task (this channel has an "infinite buffer").
420 ///
421 /// # Example
422 ///
423 /// ```
424 /// let (tx, rx) = channel();
425 ///
426 /// // Spawn off an expensive computation
427 /// spawn(proc() {
428 /// # fn expensive_computation() {}
429 /// tx.send(expensive_computation());
430 /// });
431 ///
432 /// // Do some useful work for awhile
433 ///
434 /// // Let's see what that answer was
435 /// println!("{}", rx.recv());
436 /// ```
437 pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
438 let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
439 (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a)))
440 }
441
442 /// Creates a new synchronous, bounded channel.
443 ///
444 /// Like asynchronous channels, the `Receiver` will block until a message
445 /// becomes available. These channels differ greatly in the semantics of the
446 /// sender from asynchronous channels, however.
447 ///
448 /// This channel has an internal buffer on which messages will be queued. When
449 /// the internal buffer becomes full, future sends will *block* waiting for the
450 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
451 /// becomes a "rendezvous" channel where each send will not return until a recv
452 /// is paired with it.
453 ///
454 /// As with asynchronous channels, all senders will fail in `send` if the
455 /// `Receiver` has been destroyed.
456 ///
457 /// # Example
458 ///
459 /// ```
460 /// let (tx, rx) = sync_channel(1);
461 ///
462 /// // this returns immediately
463 /// tx.send(1);
464 ///
465 /// spawn(proc() {
466 /// // this will block until the previous message has been received
467 /// tx.send(2);
468 /// });
469 ///
470 /// assert_eq!(rx.recv(), 1);
471 /// assert_eq!(rx.recv(), 2);
472 /// ```
473 pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
474 let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
475 (SyncSender::new(a), Receiver::new(Sync(b)))
476 }
477
478 ////////////////////////////////////////////////////////////////////////////////
479 // Sender
480 ////////////////////////////////////////////////////////////////////////////////
481
482 impl<T: Send> Sender<T> {
483 fn new(inner: Flavor<T>) -> Sender<T> {
484 Sender { inner: Unsafe::new(inner), sends: Cell::new(0), marker: marker::NoShare }
485 }
486
487 /// Sends a value along this channel to be received by the corresponding
488 /// receiver.
489 ///
490 /// Rust channels are infinitely buffered so this method will never block.
491 ///
492 /// # Failure
493 ///
494 /// This function will fail if the other end of the channel has hung up.
495 /// This means that if the corresponding receiver has fallen out of scope,
496 /// this function will trigger a fail message saying that a message is
497 /// being sent on a closed channel.
498 ///
499 /// Note that if this function does *not* fail, it does not mean that the
500 /// data will be successfully received. All sends are placed into a queue,
501 /// so it is possible for a send to succeed (the other end is alive), but
502 /// then the other end could immediately disconnect.
503 ///
504 /// The purpose of this functionality is to propagate failure among tasks.
505 /// If failure is not desired, then consider using the `send_opt` method
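///
/// # Example
///
/// A small sketch of the failure propagation described above, mirroring the
/// module-level "Propagating failure" example:
///
/// ```should_fail
/// let (tx, rx) = channel::<int>();
///
/// // The receiver has been deallocated, so this `send` will `fail!` the
/// // calling task instead of returning an error.
/// drop(rx);
/// tx.send(1);
/// ```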
506 pub fn send(&self, t: T) {
507 if self.send_opt(t).is_err() {
508 fail!("sending on a closed channel");
509 }
510 }
511
512 /// Attempts to send a value on this channel, returning it back if it could
513 /// not be sent.
514 ///
515 /// A successful send occurs when it is determined that the other end of
516 /// the channel has not hung up already. An unsuccessful send would be one
517 /// where the corresponding receiver has already been deallocated. Note
518 /// that a return value of `Err` means that the data will never be
519 /// received, but a return value of `Ok` does *not* mean that the data
520 /// will be received. It is possible for the corresponding receiver to
521 /// hang up immediately after this function returns `Ok`.
522 ///
523 /// Like `send`, this method will never block.
524 ///
525 /// # Failure
526 ///
527 /// This method will never fail, it will return the message back to the
528 /// caller if the other end is disconnected
529 ///
530 /// # Example
531 ///
532 /// ```
533 /// let (tx, rx) = channel();
534 ///
535 /// // This send is always successful
536 /// assert_eq!(tx.send_opt(1), Ok(()));
537 ///
538 /// // This send will fail because the receiver is gone
539 /// drop(rx);
540 /// assert_eq!(tx.send_opt(1), Err(1));
541 /// ```
542 pub fn send_opt(&self, t: T) -> Result<(), T> {
543 // In order to prevent starvation of other tasks in situations where
544 // a task sends repeatedly without ever receiving, we occasionally
545 // yield instead of doing a send immediately.
546 //
547 // Don't unconditionally attempt to yield because the TLS overhead can
548 // be a bit much, and also use `try_take` instead of `take` because
549 // there's no reason that this send shouldn't be usable off the
550 // runtime.
551 let cnt = self.sends.get() + 1;
552 self.sends.set(cnt);
553 if cnt % (RESCHED_FREQ as uint) == 0 {
554 let task: Option<Box<Task>> = Local::try_take();
555 task.map(|t| t.maybe_yield());
556 }
557
558 let (new_inner, ret) = match *unsafe { self.inner() } {
559 Oneshot(ref p) => {
560 let p = p.get();
561 unsafe {
562 if !(*p).sent() {
563 return (*p).send(t);
564 } else {
565 let (a, b) = UnsafeArc::new2(stream::Packet::new());
566 match (*p).upgrade(Receiver::new(Stream(b))) {
567 oneshot::UpSuccess => {
568 let ret = (*a.get()).send(t);
569 (a, ret)
570 }
571 oneshot::UpDisconnected => (a, Err(t)),
572 oneshot::UpWoke(task) => {
573 // This send cannot fail because the task is
574 // asleep (we're looking at it), so the receiver
575 // can't go away.
576 (*a.get()).send(t).ok().unwrap();
577 task.wake().map(|t| t.reawaken());
578 (a, Ok(()))
579 }
580 }
581 }
582 }
583 }
584 Stream(ref p) => return unsafe { (*p.get()).send(t) },
585 Shared(ref p) => return unsafe { (*p.get()).send(t) },
586 Sync(..) => unreachable!(),
587 };
588
589 unsafe {
590 let tmp = Sender::new(Stream(new_inner));
591 mem::swap(self.mut_inner(), tmp.mut_inner());
592 }
593 return ret;
594 }
595 }
596
597 impl<T: Send> Clone for Sender<T> {
598 fn clone(&self) -> Sender<T> {
599 let (packet, sleeper) = match *unsafe { self.inner() } {
600 Oneshot(ref p) => {
601 let (a, b) = UnsafeArc::new2(shared::Packet::new());
602 match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
603 oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
604 oneshot::UpWoke(task) => (b, Some(task))
605 }
606 }
607 Stream(ref p) => {
608 let (a, b) = UnsafeArc::new2(shared::Packet::new());
609 match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
610 stream::UpSuccess | stream::UpDisconnected => (b, None),
611 stream::UpWoke(task) => (b, Some(task)),
612 }
613 }
614 Shared(ref p) => {
615 unsafe { (*p.get()).clone_chan(); }
616 return Sender::new(Shared(p.clone()));
617 }
618 Sync(..) => unreachable!(),
619 };
620
621 unsafe {
622 (*packet.get()).inherit_blocker(sleeper);
623
624 let tmp = Sender::new(Shared(packet.clone()));
625 mem::swap(self.mut_inner(), tmp.mut_inner());
626 }
627 Sender::new(Shared(packet))
628 }
629 }
630
631 #[unsafe_destructor]
632 impl<T: Send> Drop for Sender<T> {
633 fn drop(&mut self) {
634 match *unsafe { self.mut_inner() } {
635 Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
636 Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
637 Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
638 Sync(..) => unreachable!(),
639 }
640 }
641 }
642
643 ////////////////////////////////////////////////////////////////////////////////
644 // SyncSender
645 ////////////////////////////////////////////////////////////////////////////////
646
647 impl<T: Send> SyncSender<T> {
648 fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
649 SyncSender { inner: inner, marker: marker::NoShare }
650 }
651
652 /// Sends a value on this synchronous channel.
653 ///
654 /// This function will *block* until space in the internal buffer becomes
655 /// available or a receiver is available to hand off the message to.
656 ///
657 /// Note that a successful send does *not* guarantee that the receiver will
658 /// ever see the data if there is a buffer on this channel. Messages may be
659 /// enqueued in the internal buffer for the receiver to receive at a later
660 /// time. If the buffer size is 0, however, it can be guaranteed that the
661 /// receiver has indeed received the data if this function returns success.
662 ///
663 /// # Failure
664 ///
665 /// Similarly to `Sender::send`, this function will fail if the
666 /// corresponding `Receiver` for this channel has disconnected. This
667 /// behavior is used to propagate failure among tasks.
668 ///
669 /// If failure is not desired, you can achieve the same semantics with the
670 /// `SyncSender::send_opt` method which will not fail if the receiver
671 /// disconnects.
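///
/// # Example
///
/// A minimal sketch of a rendezvous send (a bound of 0), mirroring the
/// module-level "Synchronous channels" example:
///
/// ```
/// let (tx, rx) = sync_channel(0);
/// spawn(proc() {
///     // This blocks until the parent task starts receiving.
///     tx.send(1);
/// });
/// assert_eq!(rx.recv(), 1);
/// ```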
672 pub fn send(&self, t: T) {
673 if self.send_opt(t).is_err() {
674 fail!("sending on a closed channel");
675 }
676 }
677
678 /// Send a value on a channel, returning it back if the receiver
679 /// disconnected
680 ///
681 /// This method will *block* to send the value `t` on the channel, but if
682 /// the value could not be sent due to the receiver disconnecting, the value
683 /// is returned back to the callee. This function is similar to `try_send`,
684 /// except that it will block if the channel is currently full.
685 ///
686 /// # Failure
687 ///
688 /// This function cannot fail.
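///
/// # Example
///
/// A small sketch of the disconnection case, following the `send_opt` tests
/// below:
///
/// ```
/// let (tx, rx) = sync_channel(1);
///
/// // There is buffer space available, so this succeeds immediately.
/// assert_eq!(tx.send_opt(1), Ok(()));
///
/// // Once the receiver is gone, the value is handed back instead of blocking.
/// drop(rx);
/// assert_eq!(tx.send_opt(2), Err(2));
/// ```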
689 pub fn send_opt(&self, t: T) -> Result<(), T> {
690 unsafe { (*self.inner.get()).send(t) }
691 }
692
693 /// Attempts to send a value on this channel without blocking.
694 ///
695 /// This method differs from `send_opt` by returning immediately if the
696 /// channel's buffer is full or no receiver is waiting to acquire some
697 /// data. Compared with `send_opt`, this function has two failure cases
698 /// instead of one (one for disconnection, one for a full buffer).
699 ///
700 /// See `SyncSender::send` for notes about guarantees of whether the
701 /// receiver has received the data or not if this function is successful.
702 ///
703 /// # Failure
704 ///
705 /// This function cannot fail
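///
/// # Example
///
/// A rough sketch of both error cases; this assumes the `Full` and
/// `RecvDisconnected` variants of `TrySendError` are importable from
/// `std::comm`:
///
/// ```
/// use std::comm::{Full, RecvDisconnected};
///
/// let (tx, rx) = sync_channel(1);
/// assert_eq!(tx.try_send(1), Ok(()));
///
/// // The buffer is now full, so sending again would require blocking.
/// assert_eq!(tx.try_send(2), Err(Full(2)));
///
/// // After the receiver disconnects, the data comes back as well.
/// drop(rx);
/// assert_eq!(tx.try_send(3), Err(RecvDisconnected(3)));
/// ```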
706 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
707 unsafe { (*self.inner.get()).try_send(t) }
708 }
709 }
710
711 impl<T: Send> Clone for SyncSender<T> {
712 fn clone(&self) -> SyncSender<T> {
713 unsafe { (*self.inner.get()).clone_chan(); }
714 return SyncSender::new(self.inner.clone());
715 }
716 }
717
718 #[unsafe_destructor]
719 impl<T: Send> Drop for SyncSender<T> {
720 fn drop(&mut self) {
721 unsafe { (*self.inner.get()).drop_chan(); }
722 }
723 }
724
725 ////////////////////////////////////////////////////////////////////////////////
726 // Receiver
727 ////////////////////////////////////////////////////////////////////////////////
728
729 impl<T: Send> Receiver<T> {
730 fn new(inner: Flavor<T>) -> Receiver<T> {
731 Receiver { inner: Unsafe::new(inner), receives: Cell::new(0), marker: marker::NoShare }
732 }
733
734 /// Blocks waiting for a value on this receiver
735 ///
736 /// This function will block if necessary to wait for a corresponding send
737 /// on the channel from its paired `Sender` structure. This receiver will
738 /// be woken up when data is ready, and the data will be returned.
739 ///
740 /// # Failure
741 ///
742 /// Similar to channels, this method will trigger a task failure if the
743 /// other end of the channel has hung up (been deallocated). The purpose of
744 /// this is to propagate failure among tasks.
745 ///
746 /// If failure is not desired, then there are two options:
747 ///
748 /// * If blocking is still desired, the `recv_opt` method will return `Err(())`
749 /// when the other end hangs up
750 ///
751 /// * If blocking is not desired, then the `try_recv` method will attempt to
752 /// peek at a value on this receiver.
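///
/// # Example
///
/// A minimal sketch, mirroring the module-level example:
///
/// ```
/// let (tx, rx) = channel();
/// spawn(proc() {
///     tx.send(10);
/// });
/// // Blocks until the spawned task has sent its value.
/// assert_eq!(rx.recv(), 10);
/// ```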
753 pub fn recv(&self) -> T {
754 match self.recv_opt() {
755 Ok(t) => t,
756 Err(()) => fail!("receiving on a closed channel"),
757 }
758 }
759
760 /// Attempts to return a pending value on this receiver without blocking
761 ///
762 /// This method will never block the caller in order to wait for data to
763 /// become available. Instead, this will always return immediately, either with
764 /// a pending value from the channel or with the reason that no data was available.
765 ///
766 /// This is useful for a flavor of "optimistic check" before deciding to
767 /// block on a receiver.
768 ///
769 /// This function cannot fail.
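///
/// # Example
///
/// A small sketch of the "optimistic check" pattern; this assumes the `Empty`
/// variant of `TryRecvError` is importable from `std::comm`:
///
/// ```
/// use std::comm::Empty;
///
/// let (tx, rx) = channel();
///
/// // Nothing has been sent yet, so nothing is available.
/// assert_eq!(rx.try_recv(), Err(Empty));
///
/// tx.send(1);
/// assert_eq!(rx.try_recv(), Ok(1));
/// ```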
770 pub fn try_recv(&self) -> Result<T, TryRecvError> {
771 // If a thread is spinning in try_recv, we should take the opportunity
772 // to reschedule things occasionally. See notes above in scheduling on
773 // sends for why this doesn't always hit TLS, and also for why this uses
774 // `try_take` instead of `take`.
775 let cnt = self.receives.get() + 1;
776 self.receives.set(cnt);
777 if cnt % (RESCHED_FREQ as uint) == 0 {
778 let task: Option<Box<Task>> = Local::try_take();
779 task.map(|t| t.maybe_yield());
780 }
781
782 loop {
783 let new_port = match *unsafe { self.inner() } {
784 Oneshot(ref p) => {
785 match unsafe { (*p.get()).try_recv() } {
786 Ok(t) => return Ok(t),
787 Err(oneshot::Empty) => return Err(Empty),
788 Err(oneshot::Disconnected) => return Err(Disconnected),
789 Err(oneshot::Upgraded(rx)) => rx,
790 }
791 }
792 Stream(ref p) => {
793 match unsafe { (*p.get()).try_recv() } {
794 Ok(t) => return Ok(t),
795 Err(stream::Empty) => return Err(Empty),
796 Err(stream::Disconnected) => return Err(Disconnected),
797 Err(stream::Upgraded(rx)) => rx,
798 }
799 }
800 Shared(ref p) => {
801 match unsafe { (*p.get()).try_recv() } {
802 Ok(t) => return Ok(t),
803 Err(shared::Empty) => return Err(Empty),
804 Err(shared::Disconnected) => return Err(Disconnected),
805 }
806 }
807 Sync(ref p) => {
808 match unsafe { (*p.get()).try_recv() } {
809 Ok(t) => return Ok(t),
810 Err(sync::Empty) => return Err(Empty),
811 Err(sync::Disconnected) => return Err(Disconnected),
812 }
813 }
814 };
815 unsafe {
816 mem::swap(self.mut_inner(),
817 new_port.mut_inner());
818 }
819 }
820 }
821
822 /// Attempts to wait for a value on this receiver, but does not fail if the
823 /// corresponding channel has hung up.
824 ///
825 /// This method will always block if there is
826 /// no data available on the receiver, but it will not fail in the case
827 /// that the channel has been deallocated.
828 ///
829 /// In other words, this function has the same semantics as the `recv`
830 /// method except for the failure aspect.
831 ///
832 /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
833 /// the value found on the receiver is returned.
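///
/// # Example
///
/// A minimal sketch of receiving without failing, following the `recv_opt`
/// tests below:
///
/// ```
/// let (tx, rx) = channel();
/// tx.send(1);
/// assert_eq!(rx.recv_opt(), Ok(1));
///
/// // Once the sender is gone and the buffer is drained, `Err` is returned.
/// drop(tx);
/// assert_eq!(rx.recv_opt(), Err(()));
/// ```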
834 pub fn recv_opt(&self) -> Result<T, ()> {
835 loop {
836 let new_port = match *unsafe { self.inner() } {
837 Oneshot(ref p) => {
838 match unsafe { (*p.get()).recv() } {
839 Ok(t) => return Ok(t),
840 Err(oneshot::Empty) => return unreachable!(),
841 Err(oneshot::Disconnected) => return Err(()),
842 Err(oneshot::Upgraded(rx)) => rx,
843 }
844 }
845 Stream(ref p) => {
846 match unsafe { (*p.get()).recv() } {
847 Ok(t) => return Ok(t),
848 Err(stream::Empty) => return unreachable!(),
849 Err(stream::Disconnected) => return Err(()),
850 Err(stream::Upgraded(rx)) => rx,
851 }
852 }
853 Shared(ref p) => {
854 match unsafe { (*p.get()).recv() } {
855 Ok(t) => return Ok(t),
856 Err(shared::Empty) => return unreachable!(),
857 Err(shared::Disconnected) => return Err(()),
858 }
859 }
860 Sync(ref p) => return unsafe { (*p.get()).recv() }
861 };
862 unsafe {
863 mem::swap(self.mut_inner(), new_port.mut_inner());
864 }
865 }
866 }
867
868 /// Returns an iterator which will block waiting for messages, but never
869 /// `fail!`. It will return `None` when the channel has hung up.
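///
/// # Example
///
/// A minimal sketch of draining a channel with the iterator, mirroring the
/// `test_nested_recv_iter` test below:
///
/// ```
/// let (tx, rx) = channel::<int>();
/// for i in range(0, 5) {
///     tx.send(i);
/// }
/// // Dropping the sender hangs up the channel, so the iterator terminates.
/// drop(tx);
///
/// let mut sum = 0;
/// for x in rx.iter() {
///     sum += x;
/// }
/// assert_eq!(sum, 10);
/// ```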
870 pub fn iter<'a>(&'a self) -> Messages<'a, T> {
871 Messages { rx: self }
872 }
873 }
874
875 impl<T: Send> select::Packet for Receiver<T> {
876 fn can_recv(&self) -> bool {
877 loop {
878 let new_port = match *unsafe { self.inner() } {
879 Oneshot(ref p) => {
880 match unsafe { (*p.get()).can_recv() } {
881 Ok(ret) => return ret,
882 Err(upgrade) => upgrade,
883 }
884 }
885 Stream(ref p) => {
886 match unsafe { (*p.get()).can_recv() } {
887 Ok(ret) => return ret,
888 Err(upgrade) => upgrade,
889 }
890 }
891 Shared(ref p) => {
892 return unsafe { (*p.get()).can_recv() };
893 }
894 Sync(ref p) => {
895 return unsafe { (*p.get()).can_recv() };
896 }
897 };
898 unsafe {
899 mem::swap(self.mut_inner(),
900 new_port.mut_inner());
901 }
902 }
903 }
904
905 fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
906 loop {
907 let (t, new_port) = match *unsafe { self.inner() } {
908 Oneshot(ref p) => {
909 match unsafe { (*p.get()).start_selection(task) } {
910 oneshot::SelSuccess => return Ok(()),
911 oneshot::SelCanceled(task) => return Err(task),
912 oneshot::SelUpgraded(t, rx) => (t, rx),
913 }
914 }
915 Stream(ref p) => {
916 match unsafe { (*p.get()).start_selection(task) } {
917 stream::SelSuccess => return Ok(()),
918 stream::SelCanceled(task) => return Err(task),
919 stream::SelUpgraded(t, rx) => (t, rx),
920 }
921 }
922 Shared(ref p) => {
923 return unsafe { (*p.get()).start_selection(task) };
924 }
925 Sync(ref p) => {
926 return unsafe { (*p.get()).start_selection(task) };
927 }
928 };
929 task = t;
930 unsafe {
931 mem::swap(self.mut_inner(),
932 new_port.mut_inner());
933 }
934 }
935 }
936
937 fn abort_selection(&self) -> bool {
938 let mut was_upgrade = false;
939 loop {
940 let result = match *unsafe { self.inner() } {
941 Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
942 Stream(ref p) => unsafe {
943 (*p.get()).abort_selection(was_upgrade)
944 },
945 Shared(ref p) => return unsafe {
946 (*p.get()).abort_selection(was_upgrade)
947 },
948 Sync(ref p) => return unsafe {
949 (*p.get()).abort_selection()
950 },
951 };
952 let new_port = match result { Ok(b) => return b, Err(p) => p };
953 was_upgrade = true;
954 unsafe {
955 mem::swap(self.mut_inner(),
956 new_port.mut_inner());
957 }
958 }
959 }
960 }
961
962 impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
963 fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
964 }
965
966 #[unsafe_destructor]
967 impl<T: Send> Drop for Receiver<T> {
968 fn drop(&mut self) {
969 match *unsafe { self.mut_inner() } {
970 Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
971 Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
972 Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
973 Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
974 }
975 }
976 }
977
978 #[cfg(test)]
979 mod test {
980 use prelude::*;
981
982 use native;
983 use os;
984 use super::*;
985
986 pub fn stress_factor() -> uint {
987 match os::getenv("RUST_TEST_STRESS") {
988 Some(val) => from_str::<uint>(val).unwrap(),
989 None => 1,
990 }
991 }
992
993 test!(fn smoke() {
994 let (tx, rx) = channel();
995 tx.send(1);
996 assert_eq!(rx.recv(), 1);
997 })
998
999 test!(fn drop_full() {
1000 let (tx, _rx) = channel();
1001 tx.send(box 1);
1002 })
1003
1004 test!(fn drop_full_shared() {
1005 let (tx, _rx) = channel();
1006 drop(tx.clone());
1007 drop(tx.clone());
1008 tx.send(box 1);
1009 })
1010
1011 test!(fn smoke_shared() {
1012 let (tx, rx) = channel();
1013 tx.send(1);
1014 assert_eq!(rx.recv(), 1);
1015 let tx = tx.clone();
1016 tx.send(1);
1017 assert_eq!(rx.recv(), 1);
1018 })
1019
1020 test!(fn smoke_threads() {
1021 let (tx, rx) = channel();
1022 spawn(proc() {
1023 tx.send(1);
1024 });
1025 assert_eq!(rx.recv(), 1);
1026 })
1027
1028 test!(fn smoke_port_gone() {
1029 let (tx, rx) = channel();
1030 drop(rx);
1031 tx.send(1);
1032 } #[should_fail])
1033
1034 test!(fn smoke_shared_port_gone() {
1035 let (tx, rx) = channel();
1036 drop(rx);
1037 tx.send(1);
1038 } #[should_fail])
1039
1040 test!(fn smoke_shared_port_gone2() {
1041 let (tx, rx) = channel();
1042 drop(rx);
1043 let tx2 = tx.clone();
1044 drop(tx);
1045 tx2.send(1);
1046 } #[should_fail])
1047
1048 test!(fn port_gone_concurrent() {
1049 let (tx, rx) = channel();
1050 spawn(proc() {
1051 rx.recv();
1052 });
1053 loop { tx.send(1) }
1054 } #[should_fail])
1055
1056 test!(fn port_gone_concurrent_shared() {
1057 let (tx, rx) = channel();
1058 let tx2 = tx.clone();
1059 spawn(proc() {
1060 rx.recv();
1061 });
1062 loop {
1063 tx.send(1);
1064 tx2.send(1);
1065 }
1066 } #[should_fail])
1067
1068 test!(fn smoke_chan_gone() {
1069 let (tx, rx) = channel::<int>();
1070 drop(tx);
1071 rx.recv();
1072 } #[should_fail])
1073
1074 test!(fn smoke_chan_gone_shared() {
1075 let (tx, rx) = channel::<()>();
1076 let tx2 = tx.clone();
1077 drop(tx);
1078 drop(tx2);
1079 rx.recv();
1080 } #[should_fail])
1081
1082 test!(fn chan_gone_concurrent() {
1083 let (tx, rx) = channel();
1084 spawn(proc() {
1085 tx.send(1);
1086 tx.send(1);
1087 });
1088 loop { rx.recv(); }
1089 } #[should_fail])
1090
1091 test!(fn stress() {
1092 let (tx, rx) = channel();
1093 spawn(proc() {
1094 for _ in range(0, 10000) { tx.send(1); }
1095 });
1096 for _ in range(0, 10000) {
1097 assert_eq!(rx.recv(), 1);
1098 }
1099 })
1100
1101 test!(fn stress_shared() {
1102 static AMT: uint = 10000;
1103 static NTHREADS: uint = 8;
1104 let (tx, rx) = channel::<int>();
1105 let (dtx, drx) = channel::<()>();
1106
1107 spawn(proc() {
1108 for _ in range(0, AMT * NTHREADS) {
1109 assert_eq!(rx.recv(), 1);
1110 }
1111 match rx.try_recv() {
1112 Ok(..) => fail!(),
1113 _ => {}
1114 }
1115 dtx.send(());
1116 });
1117
1118 for _ in range(0, NTHREADS) {
1119 let tx = tx.clone();
1120 spawn(proc() {
1121 for _ in range(0, AMT) { tx.send(1); }
1122 });
1123 }
1124 drop(tx);
1125 drx.recv();
1126 })
1127
1128 #[test]
1129 fn send_from_outside_runtime() {
1130 let (tx1, rx1) = channel::<()>();
1131 let (tx2, rx2) = channel::<int>();
1132 let (tx3, rx3) = channel::<()>();
1133 let tx4 = tx3.clone();
1134 spawn(proc() {
1135 tx1.send(());
1136 for _ in range(0, 40) {
1137 assert_eq!(rx2.recv(), 1);
1138 }
1139 tx3.send(());
1140 });
1141 rx1.recv();
1142 native::task::spawn(proc() {
1143 for _ in range(0, 40) {
1144 tx2.send(1);
1145 }
1146 tx4.send(());
1147 });
1148 rx3.recv();
1149 rx3.recv();
1150 }
1151
1152 #[test]
1153 fn recv_from_outside_runtime() {
1154 let (tx, rx) = channel::<int>();
1155 let (dtx, drx) = channel();
1156 native::task::spawn(proc() {
1157 for _ in range(0, 40) {
1158 assert_eq!(rx.recv(), 1);
1159 }
1160 dtx.send(());
1161 });
1162 for _ in range(0, 40) {
1163 tx.send(1);
1164 }
1165 drx.recv();
1166 }
1167
1168 #[test]
1169 fn no_runtime() {
1170 let (tx1, rx1) = channel::<int>();
1171 let (tx2, rx2) = channel::<int>();
1172 let (tx3, rx3) = channel::<()>();
1173 let tx4 = tx3.clone();
1174 native::task::spawn(proc() {
1175 assert_eq!(rx1.recv(), 1);
1176 tx2.send(2);
1177 tx4.send(());
1178 });
1179 native::task::spawn(proc() {
1180 tx1.send(1);
1181 assert_eq!(rx2.recv(), 2);
1182 tx3.send(());
1183 });
1184 rx3.recv();
1185 rx3.recv();
1186 }
1187
1188 test!(fn oneshot_single_thread_close_port_first() {
1189 // Simple test of closing without sending
1190 let (_tx, rx) = channel::<int>();
1191 drop(rx);
1192 })
1193
1194 test!(fn oneshot_single_thread_close_chan_first() {
1195 // Simple test of closing without sending
1196 let (tx, _rx) = channel::<int>();
1197 drop(tx);
1198 })
1199
1200 test!(fn oneshot_single_thread_send_port_close() {
1201 // Testing that the sender cleans up the payload if receiver is closed
1202 let (tx, rx) = channel::<Box<int>>();
1203 drop(rx);
1204 tx.send(box 0);
1205 } #[should_fail])
1206
1207 test!(fn oneshot_single_thread_recv_chan_close() {
1208 // Receiving on a closed chan will fail
1209 let res = task::try(proc() {
1210 let (tx, rx) = channel::<int>();
1211 drop(tx);
1212 rx.recv();
1213 });
1214 // What is our res?
1215 assert!(res.is_err());
1216 })
1217
1218 test!(fn oneshot_single_thread_send_then_recv() {
1219 let (tx, rx) = channel::<Box<int>>();
1220 tx.send(box 10);
1221 assert!(rx.recv() == box 10);
1222 })
1223
1224 test!(fn oneshot_single_thread_try_send_open() {
1225 let (tx, rx) = channel::<int>();
1226 assert!(tx.send_opt(10).is_ok());
1227 assert!(rx.recv() == 10);
1228 })
1229
1230 test!(fn oneshot_single_thread_try_send_closed() {
1231 let (tx, rx) = channel::<int>();
1232 drop(rx);
1233 assert!(tx.send_opt(10).is_err());
1234 })
1235
1236 test!(fn oneshot_single_thread_try_recv_open() {
1237 let (tx, rx) = channel::<int>();
1238 tx.send(10);
1239 assert!(rx.recv_opt() == Ok(10));
1240 })
1241
1242 test!(fn oneshot_single_thread_try_recv_closed() {
1243 let (tx, rx) = channel::<int>();
1244 drop(tx);
1245 assert!(rx.recv_opt() == Err(()));
1246 })
1247
1248 test!(fn oneshot_single_thread_peek_data() {
1249 let (tx, rx) = channel::<int>();
1250 assert_eq!(rx.try_recv(), Err(Empty))
1251 tx.send(10);
1252 assert_eq!(rx.try_recv(), Ok(10));
1253 })
1254
1255 test!(fn oneshot_single_thread_peek_close() {
1256 let (tx, rx) = channel::<int>();
1257 drop(tx);
1258 assert_eq!(rx.try_recv(), Err(Disconnected));
1259 assert_eq!(rx.try_recv(), Err(Disconnected));
1260 })
1261
1262 test!(fn oneshot_single_thread_peek_open() {
1263 let (_tx, rx) = channel::<int>();
1264 assert_eq!(rx.try_recv(), Err(Empty));
1265 })
1266
1267 test!(fn oneshot_multi_task_recv_then_send() {
1268 let (tx, rx) = channel::<Box<int>>();
1269 spawn(proc() {
1270 assert!(rx.recv() == box 10);
1271 });
1272
1273 tx.send(box 10);
1274 })
1275
1276 test!(fn oneshot_multi_task_recv_then_close() {
1277 let (tx, rx) = channel::<Box<int>>();
1278 spawn(proc() {
1279 drop(tx);
1280 });
1281 let res = task::try(proc() {
1282 assert!(rx.recv() == box 10);
1283 });
1284 assert!(res.is_err());
1285 })
1286
1287 test!(fn oneshot_multi_thread_close_stress() {
1288 for _ in range(0, stress_factor()) {
1289 let (tx, rx) = channel::<int>();
1290 spawn(proc() {
1291 drop(rx);
1292 });
1293 drop(tx);
1294 }
1295 })
1296
1297 test!(fn oneshot_multi_thread_send_close_stress() {
1298 for _ in range(0, stress_factor()) {
1299 let (tx, rx) = channel::<int>();
1300 spawn(proc() {
1301 drop(rx);
1302 });
1303 let _ = task::try(proc() {
1304 tx.send(1);
1305 });
1306 }
1307 })
1308
1309 test!(fn oneshot_multi_thread_recv_close_stress() {
1310 for _ in range(0, stress_factor()) {
1311 let (tx, rx) = channel::<int>();
1312 spawn(proc() {
1313 let res = task::try(proc() {
1314 rx.recv();
1315 });
1316 assert!(res.is_err());
1317 });
1318 spawn(proc() {
1319 spawn(proc() {
1320 drop(tx);
1321 });
1322 });
1323 }
1324 })
1325
1326 test!(fn oneshot_multi_thread_send_recv_stress() {
1327 for _ in range(0, stress_factor()) {
1328 let (tx, rx) = channel();
1329 spawn(proc() {
1330 tx.send(box 10);
1331 });
1332 spawn(proc() {
1333 assert!(rx.recv() == box 10);
1334 });
1335 }
1336 })
1337
1338 test!(fn stream_send_recv_stress() {
1339 for _ in range(0, stress_factor()) {
1340 let (tx, rx) = channel();
1341
1342 send(tx, 0);
1343 recv(rx, 0);
1344
1345 fn send(tx: Sender<Box<int>>, i: int) {
1346 if i == 10 { return }
1347
1348 spawn(proc() {
1349 tx.send(box i);
1350 send(tx, i + 1);
1351 });
1352 }
1353
1354 fn recv(rx: Receiver<Box<int>>, i: int) {
1355 if i == 10 { return }
1356
1357 spawn(proc() {
1358 assert!(rx.recv() == box i);
1359 recv(rx, i + 1);
1360 });
1361 }
1362 }
1363 })
1364
1365 test!(fn recv_a_lot() {
1366 // Regression test that we don't run out of stack in scheduler context
1367 let (tx, rx) = channel();
1368 for _ in range(0, 10000) { tx.send(()); }
1369 for _ in range(0, 10000) { rx.recv(); }
1370 })
1371
1372 test!(fn shared_chan_stress() {
1373 let (tx, rx) = channel();
1374 let total = stress_factor() + 100;
1375 for _ in range(0, total) {
1376 let tx = tx.clone();
1377 spawn(proc() {
1378 tx.send(());
1379 });
1380 }
1381
1382 for _ in range(0, total) {
1383 rx.recv();
1384 }
1385 })
1386
1387 test!(fn test_nested_recv_iter() {
1388 let (tx, rx) = channel::<int>();
1389 let (total_tx, total_rx) = channel::<int>();
1390
1391 spawn(proc() {
1392 let mut acc = 0;
1393 for x in rx.iter() {
1394 acc += x;
1395 }
1396 total_tx.send(acc);
1397 });
1398
1399 tx.send(3);
1400 tx.send(1);
1401 tx.send(2);
1402 drop(tx);
1403 assert_eq!(total_rx.recv(), 6);
1404 })
1405
1406 test!(fn test_recv_iter_break() {
1407 let (tx, rx) = channel::<int>();
1408 let (count_tx, count_rx) = channel();
1409
1410 spawn(proc() {
1411 let mut count = 0;
1412 for x in rx.iter() {
1413 if count >= 3 {
1414 break;
1415 } else {
1416 count += x;
1417 }
1418 }
1419 count_tx.send(count);
1420 });
1421
1422 tx.send(2);
1423 tx.send(2);
1424 tx.send(2);
1425 let _ = tx.send_opt(2);
1426 drop(tx);
1427 assert_eq!(count_rx.recv(), 4);
1428 })
1429
1430 test!(fn try_recv_states() {
1431 let (tx1, rx1) = channel::<int>();
1432 let (tx2, rx2) = channel::<()>();
1433 let (tx3, rx3) = channel::<()>();
1434 spawn(proc() {
1435 rx2.recv();
1436 tx1.send(1);
1437 tx3.send(());
1438 rx2.recv();
1439 drop(tx1);
1440 tx3.send(());
1441 });
1442
1443 assert_eq!(rx1.try_recv(), Err(Empty));
1444 tx2.send(());
1445 rx3.recv();
1446 assert_eq!(rx1.try_recv(), Ok(1));
1447 assert_eq!(rx1.try_recv(), Err(Empty));
1448 tx2.send(());
1449 rx3.recv();
1450 assert_eq!(rx1.try_recv(), Err(Disconnected));
1451 })
1452
1453 // This bug used to end up in a livelock inside of the Receiver destructor
1454 // because the internal state of the Shared packet was corrupted
1455 test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1456 let (tx, rx) = channel();
1457 let (tx2, rx2) = channel();
1458 spawn(proc() {
1459 rx.recv(); // wait on a oneshot
1460 drop(rx); // destroy a shared
1461 tx2.send(());
1462 });
1463 // make sure the other task has gone to sleep
1464 for _ in range(0, 5000) { task::deschedule(); }
1465
1466 // upgrade to a shared chan and send a message
1467 let t = tx.clone();
1468 drop(tx);
1469 t.send(());
1470
1471 // wait for the child task to exit before we exit
1472 rx2.recv();
1473 })
1474
1475 test!(fn sends_off_the_runtime() {
1476 use rt::thread::Thread;
1477
1478 let (tx, rx) = channel();
1479 let t = Thread::start(proc() {
1480 for _ in range(0, 1000) {
1481 tx.send(());
1482 }
1483 });
1484 for _ in range(0, 1000) {
1485 rx.recv();
1486 }
1487 t.join();
1488 })
1489
1490 test!(fn try_recvs_off_the_runtime() {
1491 use rt::thread::Thread;
1492
1493 let (tx, rx) = channel();
1494 let (cdone, pdone) = channel();
1495 let t = Thread::start(proc() {
1496 let mut hits = 0;
1497 while hits < 10 {
1498 match rx.try_recv() {
1499 Ok(()) => { hits += 1; }
1500 Err(Empty) => { Thread::yield_now(); }
1501 Err(Disconnected) => return,
1502 }
1503 }
1504 cdone.send(());
1505 });
1506 for _ in range(0, 10) {
1507 tx.send(());
1508 }
1509 t.join();
1510 pdone.recv();
1511 })
1512 }
1513
1514 #[cfg(test)]
1515 mod sync_tests {
1516 use prelude::*;
1517 use os;
1518
1519 pub fn stress_factor() -> uint {
1520 match os::getenv("RUST_TEST_STRESS") {
1521 Some(val) => from_str::<uint>(val).unwrap(),
1522 None => 1,
1523 }
1524 }
1525
1526 test!(fn smoke() {
1527 let (tx, rx) = sync_channel(1);
1528 tx.send(1);
1529 assert_eq!(rx.recv(), 1);
1530 })
1531
1532 test!(fn drop_full() {
1533 let (tx, _rx) = sync_channel(1);
1534 tx.send(box 1);
1535 })
1536
1537 test!(fn smoke_shared() {
1538 let (tx, rx) = sync_channel(1);
1539 tx.send(1);
1540 assert_eq!(rx.recv(), 1);
1541 let tx = tx.clone();
1542 tx.send(1);
1543 assert_eq!(rx.recv(), 1);
1544 })
1545
1546 test!(fn smoke_threads() {
1547 let (tx, rx) = sync_channel(0);
1548 spawn(proc() {
1549 tx.send(1);
1550 });
1551 assert_eq!(rx.recv(), 1);
1552 })
1553
1554 test!(fn smoke_port_gone() {
1555 let (tx, rx) = sync_channel(0);
1556 drop(rx);
1557 tx.send(1);
1558 } #[should_fail])
1559
1560 test!(fn smoke_shared_port_gone2() {
1561 let (tx, rx) = sync_channel(0);
1562 drop(rx);
1563 let tx2 = tx.clone();
1564 drop(tx);
1565 tx2.send(1);
1566 } #[should_fail])
1567
1568 test!(fn port_gone_concurrent() {
1569 let (tx, rx) = sync_channel(0);
1570 spawn(proc() {
1571 rx.recv();
1572 });
1573 loop { tx.send(1) }
1574 } #[should_fail])
1575
1576 test!(fn port_gone_concurrent_shared() {
1577 let (tx, rx) = sync_channel(0);
1578 let tx2 = tx.clone();
1579 spawn(proc() {
1580 rx.recv();
1581 });
1582 loop {
1583 tx.send(1);
1584 tx2.send(1);
1585 }
1586 } #[should_fail])
1587
1588 test!(fn smoke_chan_gone() {
1589 let (tx, rx) = sync_channel::<int>(0);
1590 drop(tx);
1591 rx.recv();
1592 } #[should_fail])
1593
1594 test!(fn smoke_chan_gone_shared() {
1595 let (tx, rx) = sync_channel::<()>(0);
1596 let tx2 = tx.clone();
1597 drop(tx);
1598 drop(tx2);
1599 rx.recv();
1600 } #[should_fail])
1601
1602 test!(fn chan_gone_concurrent() {
1603 let (tx, rx) = sync_channel(0);
1604 spawn(proc() {
1605 tx.send(1);
1606 tx.send(1);
1607 });
1608 loop { rx.recv(); }
1609 } #[should_fail])
1610
1611 test!(fn stress() {
1612 let (tx, rx) = sync_channel(0);
1613 spawn(proc() {
1614 for _ in range(0, 10000) { tx.send(1); }
1615 });
1616 for _ in range(0, 10000) {
1617 assert_eq!(rx.recv(), 1);
1618 }
1619 })
1620
1621 test!(fn stress_shared() {
1622 static AMT: uint = 1000;
1623 static NTHREADS: uint = 8;
1624 let (tx, rx) = sync_channel::<int>(0);
1625 let (dtx, drx) = sync_channel::<()>(0);
1626
1627 spawn(proc() {
1628 for _ in range(0, AMT * NTHREADS) {
1629 assert_eq!(rx.recv(), 1);
1630 }
1631 match rx.try_recv() {
1632 Ok(..) => fail!(),
1633 _ => {}
1634 }
1635 dtx.send(());
1636 });
1637
1638 for _ in range(0, NTHREADS) {
1639 let tx = tx.clone();
1640 spawn(proc() {
1641 for _ in range(0, AMT) { tx.send(1); }
1642 });
1643 }
1644 drop(tx);
1645 drx.recv();
1646 })
1647
1648 test!(fn oneshot_single_thread_close_port_first() {
1649 // Simple test of closing without sending
1650 let (_tx, rx) = sync_channel::<int>(0);
1651 drop(rx);
1652 })
1653
1654 test!(fn oneshot_single_thread_close_chan_first() {
1655 // Simple test of closing without sending
1656 let (tx, _rx) = sync_channel::<int>(0);
1657 drop(tx);
1658 })
1659
1660 test!(fn oneshot_single_thread_send_port_close() {
1661 // Testing that the sender cleans up the payload if receiver is closed
1662 let (tx, rx) = sync_channel::<Box<int>>(0);
1663 drop(rx);
1664 tx.send(box 0);
1665 } #[should_fail])
1666
1667 test!(fn oneshot_single_thread_recv_chan_close() {
1668 // Receiving on a closed chan will fail
1669 let res = task::try(proc() {
1670 let (tx, rx) = sync_channel::<int>(0);
1671 drop(tx);
1672 rx.recv();
1673 });
1674 // What is our res?
1675 assert!(res.is_err());
1676 })
1677
1678 test!(fn oneshot_single_thread_send_then_recv() {
1679 let (tx, rx) = sync_channel::<Box<int>>(1);
1680 tx.send(box 10);
1681 assert!(rx.recv() == box 10);
1682 })
1683
1684 test!(fn oneshot_single_thread_try_send_open() {
1685 let (tx, rx) = sync_channel::<int>(1);
1686 assert_eq!(tx.try_send(10), Ok(()));
1687 assert!(rx.recv() == 10);
1688 })
1689
1690 test!(fn oneshot_single_thread_try_send_closed() {
1691 let (tx, rx) = sync_channel::<int>(0);
1692 drop(rx);
1693 assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
1694 })
1695
1696 test!(fn oneshot_single_thread_try_send_closed2() {
1697 let (tx, _rx) = sync_channel::<int>(0);
1698 assert_eq!(tx.try_send(10), Err(Full(10)));
1699 })
1700
1701 test!(fn oneshot_single_thread_try_recv_open() {
1702 let (tx, rx) = sync_channel::<int>(1);
1703 tx.send(10);
1704 assert!(rx.recv_opt() == Ok(10));
1705 })
1706
1707 test!(fn oneshot_single_thread_try_recv_closed() {
1708 let (tx, rx) = sync_channel::<int>(0);
1709 drop(tx);
1710 assert!(rx.recv_opt() == Err(()));
1711 })
1712
1713 test!(fn oneshot_single_thread_peek_data() {
1714 let (tx, rx) = sync_channel::<int>(1);
1715 assert_eq!(rx.try_recv(), Err(Empty))
1716 tx.send(10);
1717 assert_eq!(rx.try_recv(), Ok(10));
1718 })
1719
1720 test!(fn oneshot_single_thread_peek_close() {
1721 let (tx, rx) = sync_channel::<int>(0);
1722 drop(tx);
1723 assert_eq!(rx.try_recv(), Err(Disconnected));
1724 assert_eq!(rx.try_recv(), Err(Disconnected));
1725 })
1726
1727 test!(fn oneshot_single_thread_peek_open() {
1728 let (_tx, rx) = sync_channel::<int>(0);
1729 assert_eq!(rx.try_recv(), Err(Empty));
1730 })
1731
1732 test!(fn oneshot_multi_task_recv_then_send() {
1733 let (tx, rx) = sync_channel::<Box<int>>(0);
1734 spawn(proc() {
1735 assert!(rx.recv() == box 10);
1736 });
1737
1738 tx.send(box 10);
1739 })
1740
1741 test!(fn oneshot_multi_task_recv_then_close() {
1742 let (tx, rx) = sync_channel::<Box<int>>(0);
1743 spawn(proc() {
1744 drop(tx);
1745 });
1746 let res = task::try(proc() {
1747 assert!(rx.recv() == box 10);
1748 });
1749 assert!(res.is_err());
1750 })
1751
1752 test!(fn oneshot_multi_thread_close_stress() {
1753 for _ in range(0, stress_factor()) {
1754 let (tx, rx) = sync_channel::<int>(0);
1755 spawn(proc() {
1756 drop(rx);
1757 });
1758 drop(tx);
1759 }
1760 })
1761
1762 test!(fn oneshot_multi_thread_send_close_stress() {
1763 for _ in range(0, stress_factor()) {
1764 let (tx, rx) = sync_channel::<int>(0);
1765 spawn(proc() {
1766 drop(rx);
1767 });
1768 let _ = task::try(proc() {
1769 tx.send(1);
1770 });
1771 }
1772 })
1773
1774 test!(fn oneshot_multi_thread_recv_close_stress() {
1775 for _ in range(0, stress_factor()) {
1776 let (tx, rx) = sync_channel::<int>(0);
1777 spawn(proc() {
1778 let res = task::try(proc() {
1779 rx.recv();
1780 });
1781 assert!(res.is_err());
1782 });
1783 spawn(proc() {
1784 spawn(proc() {
1785 drop(tx);
1786 });
1787 });
1788 }
1789 })
1790
1791 test!(fn oneshot_multi_thread_send_recv_stress() {
1792 for _ in range(0, stress_factor()) {
1793 let (tx, rx) = sync_channel(0);
1794 spawn(proc() {
1795 tx.send(box 10);
1796 });
1797 spawn(proc() {
1798 assert!(rx.recv() == box 10);
1799 });
1800 }
1801 })
1802
1803 test!(fn stream_send_recv_stress() {
1804 for _ in range(0, stress_factor()) {
1805 let (tx, rx) = sync_channel(0);
1806
1807 send(tx, 0);
1808 recv(rx, 0);
1809
1810 fn send(tx: SyncSender<Box<int>>, i: int) {
1811 if i == 10 { return }
1812
1813 spawn(proc() {
1814 tx.send(box i);
1815 send(tx, i + 1);
1816 });
1817 }
1818
1819 fn recv(rx: Receiver<Box<int>>, i: int) {
1820 if i == 10 { return }
1821
1822 spawn(proc() {
1823 assert!(rx.recv() == box i);
1824 recv(rx, i + 1);
1825 });
1826 }
1827 }
1828 })
1829
1830 test!(fn recv_a_lot() {
1831 // Regression test that we don't run out of stack in scheduler context
1832 let (tx, rx) = sync_channel(10000);
1833 for _ in range(0, 10000) { tx.send(()); }
1834 for _ in range(0, 10000) { rx.recv(); }
1835 })
1836
1837 test!(fn shared_chan_stress() {
1838 let (tx, rx) = sync_channel(0);
1839 let total = stress_factor() + 100;
1840 for _ in range(0, total) {
1841 let tx = tx.clone();
1842 spawn(proc() {
1843 tx.send(());
1844 });
1845 }
1846
1847 for _ in range(0, total) {
1848 rx.recv();
1849 }
1850 })
1851
1852 test!(fn test_nested_recv_iter() {
1853 let (tx, rx) = sync_channel::<int>(0);
1854 let (total_tx, total_rx) = sync_channel::<int>(0);
1855
1856 spawn(proc() {
1857 let mut acc = 0;
1858 for x in rx.iter() {
1859 acc += x;
1860 }
1861 total_tx.send(acc);
1862 });
1863
1864 tx.send(3);
1865 tx.send(1);
1866 tx.send(2);
1867 drop(tx);
1868 assert_eq!(total_rx.recv(), 6);
1869 })
1870
1871 test!(fn test_recv_iter_break() {
1872 let (tx, rx) = sync_channel::<int>(0);
1873 let (count_tx, count_rx) = sync_channel(0);
1874
1875 spawn(proc() {
1876 let mut count = 0;
1877 for x in rx.iter() {
1878 if count >= 3 {
1879 break;
1880 } else {
1881 count += x;
1882 }
1883 }
1884 count_tx.send(count);
1885 });
1886
1887 tx.send(2);
1888 tx.send(2);
1889 tx.send(2);
1890 let _ = tx.try_send(2);
1891 drop(tx);
1892 assert_eq!(count_rx.recv(), 4);
1893 })
1894
1895 test!(fn try_recv_states() {
1896 let (tx1, rx1) = sync_channel::<int>(1);
1897 let (tx2, rx2) = sync_channel::<()>(1);
1898 let (tx3, rx3) = sync_channel::<()>(1);
1899 spawn(proc() {
1900 rx2.recv();
1901 tx1.send(1);
1902 tx3.send(());
1903 rx2.recv();
1904 drop(tx1);
1905 tx3.send(());
1906 });
1907
1908 assert_eq!(rx1.try_recv(), Err(Empty));
1909 tx2.send(());
1910 rx3.recv();
1911 assert_eq!(rx1.try_recv(), Ok(1));
1912 assert_eq!(rx1.try_recv(), Err(Empty));
1913 tx2.send(());
1914 rx3.recv();
1915 assert_eq!(rx1.try_recv(), Err(Disconnected));
1916 })
1917
1918 // This bug used to result in a livelock inside the Receiver destructor
1919 // because the internal state of the Shared packet was corrupted
1920 test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
1921 let (tx, rx) = sync_channel(0);
1922 let (tx2, rx2) = sync_channel(0);
1923 spawn(proc() {
1924 rx.recv(); // wait on a oneshot
1925 drop(rx); // destroy a shared
1926 tx2.send(());
1927 });
1928 // make sure the other task has gone to sleep
1929 for _ in range(0, 5000) { task::deschedule(); }
1930
1931 // upgrade to a shared chan and send a message
1932 let t = tx.clone();
1933 drop(tx);
1934 t.send(());
1935
1936 // wait for the child task to exit before we exit
1937 rx2.recv();
1938 })
1939
1940 test!(fn try_recvs_off_the_runtime() {
1941 use std::rt::thread::Thread;
1942
1943 let (tx, rx) = sync_channel(0);
1944 let (cdone, pdone) = channel();
1945 let t = Thread::start(proc() {
1946 let mut hits = 0;
1947 while hits < 10 {
1948 match rx.try_recv() {
1949 Ok(()) => { hits += 1; }
1950 Err(Empty) => { Thread::yield_now(); }
1951 Err(Disconnected) => return,
1952 }
1953 }
1954 cdone.send(());
1955 });
1956 for _ in range(0, 10) {
1957 tx.send(());
1958 }
1959 t.join();
1960 pdone.recv();
1961 })
1962
1963 test!(fn send_opt1() {
1964 let (tx, rx) = sync_channel(0);
1965 spawn(proc() { rx.recv(); });
1966 assert_eq!(tx.send_opt(1), Ok(()));
1967 })
1968
1969 test!(fn send_opt2() {
1970 let (tx, rx) = sync_channel(0);
1971 spawn(proc() { drop(rx); });
1972 assert_eq!(tx.send_opt(1), Err(1));
1973 })
1974
1975 test!(fn send_opt3() {
1976 let (tx, rx) = sync_channel(1);
1977 assert_eq!(tx.send_opt(1), Ok(()));
1978 spawn(proc() { drop(rx); });
1979 assert_eq!(tx.send_opt(1), Err(1));
1980 })
1981
1982 test!(fn send_opt4() {
1983 let (tx, rx) = sync_channel(0);
1984 let tx2 = tx.clone();
1985 let (done, donerx) = channel();
1986 let done2 = done.clone();
1987 spawn(proc() {
1988 assert_eq!(tx.send_opt(1), Err(1));
1989 done.send(());
1990 });
1991 spawn(proc() {
1992 assert_eq!(tx2.send_opt(2), Err(2));
1993 done2.send(());
1994 });
1995 drop(rx);
1996 donerx.recv();
1997 donerx.recv();
1998 })
1999
2000 test!(fn try_send1() {
2001 let (tx, _rx) = sync_channel(0);
2002 assert_eq!(tx.try_send(1), Err(Full(1)));
2003 })
2004
2005 test!(fn try_send2() {
2006 let (tx, _rx) = sync_channel(1);
2007 assert_eq!(tx.try_send(1), Ok(()));
2008 assert_eq!(tx.try_send(1), Err(Full(1)));
2009 })
2010
2011 test!(fn try_send3() {
2012 let (tx, rx) = sync_channel(1);
2013 assert_eq!(tx.try_send(1), Ok(()));
2014 drop(rx);
2015 assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
2016 })
2017
2018 test!(fn try_send4() {
2019 let (tx, rx) = sync_channel(0);
2020 spawn(proc() {
2021 for _ in range(0, 1000) { task::deschedule(); }
2022 assert_eq!(tx.try_send(1), Ok(()));
2023 });
2024 assert_eq!(rx.recv(), 1);
2025 } #[ignore(reason = "flaky on libnative")])
2026 }
libstd/comm/mod.rs:374:29-374:29 -enum- definition:
pub enum TrySendError<T> {
/// The data could not be sent on the channel because it would require that
/// the callee block to send the data.
references:- 8
373: /// `SyncSender::try_send` method.
375: pub enum TrySendError<T> {
--
705: /// This function cannot fail
706: pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
707: unsafe { (*self.inner.get()).try_send(t) }
libstd/comm/sync.rs:
205: pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
206: let (guard, state) = self.lock();
libstd/comm/mod.rs:
373: /// `SyncSender::try_send` method.
375: pub enum TrySendError<T> {
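
A minimal usage sketch for the `TrySendError` variants indexed above, assuming `Full` and
`RecvDisconnected` are importable from `std::comm` as the tests in this file suggest:

    use std::comm::{sync_channel, Full, RecvDisconnected};

    fn main() {
        let (tx, rx) = sync_channel::<int>(1);   // one slot of buffer space
        assert_eq!(tx.try_send(1), Ok(()));       // fits in the buffer
        match tx.try_send(2) {
            Err(Full(n)) => println!("buffer full, {} handed back", n),
            Err(RecvDisconnected(n)) => println!("receiver gone, {} handed back", n),
            Ok(()) => {}
        }
        drop(rx);
    }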
libstd/comm/mod.rs:436:8-436:8 -fn- definition:
/// ```
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
references:- 4
libstd/io/process.rs:
392: fn read(stream: Option<io::PipeStream>) -> Receiver<IoResult<Vec<u8>>> {
393: let (tx, rx) = channel();
394: match stream {
libstd/io/signal.rs:
99: pub fn new() -> Listener {
100: let (tx, rx) = channel();
101: Listener {
libstd/task.rs:
196: -> Result<T, Box<Any:Send>> {
197: let (tx, rx) = channel();
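
A minimal sketch of the `channel()` constructor indexed above; it assumes `channel` and
`spawn` are available from the prelude, as the tests in this file use them:

    fn main() {
        let (tx, rx) = channel::<int>();
        spawn(proc() {
            tx.send(10);            // asynchronous send, never blocks
        });
        assert_eq!(rx.recv(), 10);  // blocks until the value arrives
    }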
libstd/comm/mod.rs:395:15-395:15 -trait- definition:
trait UnsafeFlavor<T> {
fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>>;
unsafe fn mut_inner<'a>(&'a self) -> &'a mut Flavor<T> {
references:- 2
404: }
405: impl<T> UnsafeFlavor<T> for Sender<T> {
406: fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
--
409: }
410: impl<T> UnsafeFlavor<T> for Receiver<T> {
411: fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
libstd/comm/mod.rs:362:29-362:29 -enum- definition:
pub enum TryRecvError {
/// This channel is currently empty, but the sender(s) have not yet
/// disconnected, so data may yet become available.
references:- 7
769: /// This function cannot fail.
770: pub fn try_recv(&self) -> Result<T, TryRecvError> {
771: // If a thread is spinning in try_recv, we should take the opportunity
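
A minimal polling sketch for `TryRecvError`, assuming the `Empty` and `Disconnected`
variants are importable from `std::comm` as they are used in the tests above:

    use std::comm::{Empty, Disconnected};

    fn main() {
        let (tx, rx) = channel::<int>();
        assert_eq!(rx.try_recv(), Err(Empty));         // nothing sent yet
        tx.send(5);
        assert_eq!(rx.try_recv(), Ok(5));
        drop(tx);
        assert_eq!(rx.try_recv(), Err(Disconnected));  // all senders are gone
    }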
libstd/comm/mod.rs:387:1-387:1 -enum- definition:
enum Flavor<T> {
Oneshot(UnsafeArc<oneshot::Packet<T>>),
Stream(UnsafeArc<stream::Packet<T>>),
references:- 9
405: impl<T> UnsafeFlavor<T> for Sender<T> {
406: fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
407: &self.inner
--
410: impl<T> UnsafeFlavor<T> for Receiver<T> {
411: fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> {
412: &self.inner
--
482: impl<T: Send> Sender<T> {
483: fn new(inner: Flavor<T>) -> Sender<T> {
484: Sender { inner: Unsafe::new(inner), sends: Cell::new(0), marker: marker::NoShare }
--
729: impl<T: Send> Receiver<T> {
730: fn new(inner: Flavor<T>) -> Receiver<T> {
731: Receiver { inner: Unsafe::new(inner), receives: Cell::new(0), marker: marker::NoShare }
libstd/comm/mod.rs:338:57-338:57 -struct- definition:
/// returned when the corresponding channel has hung up.
pub struct Messages<'a, T> {
rx: &'a Receiver<T>
references:- 3
870: pub fn iter<'a>(&'a self) -> Messages<'a, T> {
871: Messages { rx: self }
872: }
--
962: impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
963: fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
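
A minimal sketch of draining a channel through the `Messages` iterator returned by
`iter()`; the loop ends once every sender has been dropped:

    fn main() {
        let (tx, rx) = channel::<int>();
        spawn(proc() {
            for i in range(0, 5) { tx.send(i); }
            // `tx` is dropped here, which terminates the iteration below
        });
        let mut sum = 0;
        for x in rx.iter() { sum += x; }
        assert_eq!(sum, 10);
    }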
libstd/comm/mod.rs:353:68-353:68 -struct- definition:
/// owned by one task, but it can be cloned to send to other tasks.
pub struct SyncSender<T> {
inner: UnsafeArc<sync::Packet<T>>,
references:- 7
648: fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
649: SyncSender { inner: inner, marker: marker::NoShare }
650: }
--
719: impl<T: Send> Drop for SyncSender<T> {
720: fn drop(&mut self) {
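
A minimal sketch of a `SyncSender` on a zero-capacity (rendezvous) channel, mirroring the
`sync_channel::<int>(0)` pattern used throughout the tests above:

    fn main() {
        let (tx, rx) = sync_channel::<int>(0);  // bound of 0: every send rendezvouses
        spawn(proc() {
            tx.send(1);                         // blocks until the receiver takes it
        });
        assert_eq!(rx.recv(), 1);
    }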
libstd/comm/mod.rs:328:13-328:13 -struct- definition:
/// one task
pub struct Receiver<T> {
inner: Unsafe<Flavor<T>>,
references:- 32
libstd/comm/select.rs:
libstd/comm/oneshot.rs:
libstd/comm/stream.rs:
libstd/io/comm_adapters.rs:
libstd/io/process.rs:
libstd/io/signal.rs:
libstd/io/timer.rs:
libstd/rt/rtio.rs:
libstd/task.rs:
libstd/comm/oneshot.rs:
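
A minimal sketch of a `Receiver` observing a hang-up without failing, assuming
`recv_opt()` returns `Result<T, ()>` as the `Messages` iterator above implies:

    fn main() {
        let (tx, rx) = channel::<int>();
        drop(tx);
        // recv() would fail! here; recv_opt() reports the hang-up as a value instead
        assert_eq!(rx.recv_opt(), Err(()));
    }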
libstd/comm/mod.rs:344:68-344:68 -struct- definition:
/// owned by one task, but it can be cloned to send to other tasks.
pub struct Sender<T> {
inner: Unsafe<Flavor<T>>,
references:- 14
483: fn new(inner: Flavor<T>) -> Sender<T> {
484: Sender { inner: Unsafe::new(inner), sends: Cell::new(0), marker: marker::NoShare }
485: }
--
597: impl<T: Send> Clone for Sender<T> {
598: fn clone(&self) -> Sender<T> {
--
632: impl<T: Send> Drop for Sender<T> {
633: fn drop(&mut self) {
libstd/io/comm_adapters.rs:
106: /// Wraps a channel in a `ChanWriter` structure
107: pub fn new(tx: Sender<~[u8]>) -> ChanWriter {
108: ChanWriter { tx: tx }
libstd/io/signal.rs:
87: /// the clients from the receiver.
88: tx: Sender<Signum>,
libstd/rt/task.rs:
76: /// A channel to send the result of the task on when the task exits
77: SendMessage(Sender<TaskResult>),
78: }
libstd/rt/rtio.rs:
198: -> IoResult<Box<RtioTTY:Send>>;
199: fn signal(&mut self, signal: Signum, channel: Sender<Signum>)
200: -> IoResult<Box<RtioSignal:Send>>;
libstd/task.rs:
65: /// Enable lifecycle notifications on the given channel
66: pub notify_chan: Option<Sender<TaskResult>>,
67: /// A name for the task-to-be, for identification in failure messages
libstd/comm/mod.rs:
482: impl<T: Send> Sender<T> {
483: fn new(inner: Flavor<T>) -> Sender<T> {
484: Sender { inner: Unsafe::new(inner), sends: Cell::new(0), marker: marker::NoShare }
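
A minimal sketch of cloning a `Sender` so that several tasks can feed one `Receiver`,
assuming the prelude exports relied on by the tests in this file:

    fn main() {
        let (tx, rx) = channel::<int>();
        for _ in range(0, 3) {
            let tx = tx.clone();                // each task owns its own handle
            spawn(proc() { tx.send(1); });
        }
        drop(tx);                               // drop the original so iteration ends
        let mut total = 0;
        for x in rx.iter() { total += x; }
        assert_eq!(total, 3);
    }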