(index<- ) ./libstd/rt/comm.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Ports and channels.
12
13 use option::*;
14 use cast;
15 use ops::Drop;
16 use rt::kill::BlockedTask;
17 use kinds::Send;
18 use rt;
19 use rt::sched::Scheduler;
20 use rt::local::Local;
21 use rt::select::{SelectInner, SelectPortInner};
22 use select::{Select, SelectPort};
23 use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
24 use unstable::sync::UnsafeArc;
25 use util::Void;
26 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
27 use cell::Cell;
28 use clone::Clone;
29 use tuple::ImmutableTuple;
30
31 /// A combined refcount / BlockedTask-as-uint pointer.
32 ///
33 /// Can be equal to the following values:
34 ///
35 /// * 2 - both endpoints are alive
36 /// * 1 - either the sender or the receiver is dead, determined by context
37 /// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
38 type State = uint;
39
// Initial packet state installed by `oneshot`: both endpoints still hold the packet.
40 static STATE_BOTH: State = 2;
// Written by whichever endpoint finishes first (a send, or a drop of either end).
41 static STATE_ONE: State = 1;
42
43 /// The heap-allocated structure shared between two endpoints.
44 struct Packet<T> {
// Holds a `State`: STATE_BOTH, STATE_ONE, or a blocked task cast to uint.
45 state: AtomicUint,
// The message in transit; starts as `None` and is set to `Some` by the sender.
46 payload: Option<T>,
47 }
48
49 /// A one-shot channel.
50 pub struct ChanOne<T> {
// Type-erased pointer to the `~Packet<T>` shared with the matching port.
51 void_packet: *mut Void,
// When true, the destructor returns immediately without touching the packet.
52 suppress_finalize: bool
53 }
54
55 /// A one-shot port.
56 pub struct PortOne<T> {
// Type-erased pointer to the `~Packet<T>` shared with the matching channel.
57 void_packet: *mut Void,
// When true, the destructor returns immediately without touching the packet.
58 suppress_finalize: bool
59 }
60
61 pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
62 let packet: ~Packet<T> = ~Packet {
63 state: AtomicUint::new(STATE_BOTH),
64 payload: None
65 };
66
67 unsafe {
68 let packet: *mut Void = cast::transmute(packet);
69 let port = PortOne {
70 void_packet: packet,
71 suppress_finalize: false
72 };
73 let chan = ChanOne {
74 void_packet: packet,
75 suppress_finalize: false
76 };
77 return (port, chan);
78 }
79 }
80
81 impl<T> ChanOne<T> {
// Reinterpret the type-erased `void_packet` field as a pointer to the shared packet.
82 #[inline]
83 fn packet(&self) -> *mut Packet<T> {
84 unsafe {
85 let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
86 let p: *mut Packet<T> = &mut **p;
87 return p;
88 }
89 }
90
91 /// Send a message on the one-shot channel. If a receiver task is blocked
92 /// waiting for the message, will wake it up and reschedule to it.
93 pub fn send(self, val: T) {
94 self.try_send(val);
95 }
96
97 /// As `send`, but also returns whether or not the receiver endpoint is still open.
98 pub fn try_send(self, val: T) -> bool {
99 self.try_send_inner(val, true)
100 }
101
102 /// Send a message without immediately rescheduling to a blocked receiver.
103 /// This can be useful in contexts where rescheduling is forbidden, or to
104 /// optimize for when the sender expects to still have useful work to do.
105 pub fn send_deferred(self, val: T) {
106 self.try_send_deferred(val);
107 }
108
109 /// As `send_deferred` and `try_send` together.
110 pub fn try_send_deferred(self, val: T) -> bool {
111 self.try_send_inner(val, false)
112 }
113
114 // 'do_resched' configures whether the scheduler immediately switches to
115 // the receiving task, or leaves the sending task still running.
// Returns false only when the port had already closed (old state was STATE_ONE).
116 fn try_send_inner(self, val: T, do_resched: bool) -> bool {
117 if do_resched {
118 rtassert!(!rt::in_sched_context());
119 }
120
121 // In order to prevent starvation of other tasks in situations
122 // where a task sends repeatedly without ever receiving, we
123 // occasionally yield instead of doing a send immediately.
124 // Only doing this if we're doing a rescheduling send,
125 // otherwise the caller is expecting not to context switch.
126 if do_resched {
127 // XXX: This TLS hit should be combined with other uses of the scheduler below
128 let sched: ~Scheduler = Local::take();
129 sched.maybe_yield();
130 }
131
132 let mut this = self;
133 let mut recvr_active = true;
134 let packet = this.packet();
135
136 unsafe {
137
138 // Install the payload
139 rtassert!((*packet).payload.is_none());
140 (*packet).payload = Some(val);
141
142 // Atomically swap out the old state to figure out what
143 // the port's up to, issuing a release barrier to prevent
144 // reordering of the payload write. This also issues an
145 // acquire barrier that keeps the subsequent access of the
146 // ~Task pointer from being reordered.
147 let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
148
149 // Suppress the synchronizing actions in the finalizer. We're
150 // done with the packet. NB: In case of do_resched, this *must*
151 // happen before waking up a blocked task (or be unkillable),
152 // because we might get a kill signal during the reschedule.
153 this.suppress_finalize = true;
154
155 match oldstate {
156 STATE_BOTH => {
157 // Port is not waiting yet. Nothing to do
158 }
159 STATE_ONE => {
160 // Port has closed. Need to clean up.
161 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
162 recvr_active = false;
163 }
164 task_as_state => {
165 // Port is blocked. Wake it up.
166 let recvr = BlockedTask::cast_from_uint(task_as_state);
167 if do_resched {
168 do recvr.wake().map |woken_task| {
169 Scheduler::run_task(woken_task);
170 };
171 } else {
172 let recvr = Cell::new(recvr);
173 do Local::borrow |sched: &mut Scheduler| {
174 sched.enqueue_blocked_task(recvr.take());
175 }
176 }
177 }
178 }
179 }
180
181 return recvr_active;
182 }
183 }
184
185 impl<T> PortOne<T> {
// Reinterpret the type-erased `void_packet` field as a pointer to the shared packet.
186 fn packet(&self) -> *mut Packet<T> {
187 unsafe {
188 let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
189 let p: *mut Packet<T> = &mut **p;
190 return p;
191 }
192 }
193
194 /// Wait for a message on the one-shot port. Fails if the send end is closed.
195 pub fn recv(self) -> T {
196 match self.try_recv() {
197 Some(val) => val,
198 None => {
199 fail2!("receiving on closed channel");
200 }
201 }
202 }
203
204 /// As `recv`, but returns `None` if the send end is closed rather than failing.
205 pub fn try_recv(self) -> Option<T> {
206 let mut this = self;
207
208 // Optimistic check. If data was sent already, we don't even need to block.
209 // No release barrier needed here; we're not handing off our task pointer yet.
210 if !this.optimistic_check() {
211 // No data available yet.
212 // Switch to the scheduler to put the ~Task into the Packet state.
// The boolean returned by block_on is not inspected on this path.
213 let sched: ~Scheduler = Local::take();
214 do sched.deschedule_running_task_and_then |sched, task| {
215 this.block_on(sched, task);
216 }
217 }
218
219 // Task resumes.
220 this.recv_ready()
221 }
222 }
223
224 impl<T> SelectInner for PortOne<T> {
// Non-test build: true exactly when the packet state is already STATE_ONE.
225 #[inline] #[cfg(not(test))]
226 fn optimistic_check(&mut self) -> bool {
227 unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
228 }
229
230 #[inline] #[cfg(test)]
231 fn optimistic_check(&mut self) -> bool {
232 // The optimistic check is never necessary for correctness. For testing
233 // purposes, making it randomly return false simulates a racing sender.
234 use rand::{Rand};
235 let actually_check = do Local::borrow |sched: &mut Scheduler| {
236 Rand::rand(&mut sched.rng)
237 };
238 if actually_check {
239 unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
240 } else {
241 false
242 }
243 }
244
// Registers `task` in the packet state. Returns false when the task is now
// blocked (old state STATE_BOTH), true when the other end had already
// finished (old state STATE_ONE) and the task was re-enqueued instead.
245 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
246 unsafe {
247 // Atomically swap the task pointer into the Packet state, issuing
248 // an acquire barrier to prevent reordering of the subsequent read
249 // of the payload. Also issues a release barrier to prevent
250 // reordering of any previous writes to the task structure.
251 let task_as_state = task.cast_to_uint();
252 let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
253 match oldstate {
254 STATE_BOTH => {
255 // Data has not been sent. Now we're blocked.
256 rtdebug!("non-rendezvous recv");
257 false
258 }
259 STATE_ONE => {
260 // Re-record that we are the only owner of the packet.
261 // No barrier needed, even if the task gets reawoken
262 // on a different core -- this is analogous to writing a
263 // payload; a barrier in enqueueing the task protects it.
264 // NB(#8132). This *must* occur before the enqueue below.
265 // FIXME(#6842, #8130) This is usually only needed for the
266 // assertion in recv_ready, except in the case of select().
267 // This won't actually ever have cacheline contention, but
268 // maybe should be optimized out with a cfg(test) anyway?
269 (*self.packet()).state.store(STATE_ONE, Relaxed);
270
271 rtdebug!("rendezvous recv");
272
273 // Channel is closed. Switch back and check the data.
274 // NB: We have to drop back into the scheduler event loop here
275 // instead of switching immediately back or we could end up
276 // triggering infinite recursion on the scheduler's stack.
277 let recvr = BlockedTask::cast_from_uint(task_as_state);
278 sched.enqueue_blocked_task(recvr);
279 true
280 }
281 _ => rtabort!("can't block_on; a task is already blocked")
282 }
283 }
284 }
285
286 // This is the only select trait function that's not also used in recv.
// Returns true when the state is STATE_ONE (the sender got there first),
// false when our task pointer was successfully swapped back to STATE_BOTH.
287 fn unblock_from(&mut self) -> bool {
288 let packet = self.packet();
289 unsafe {
290 // In case the data is available, the acquire barrier here matches
291 // the release barrier the sender used to release the payload.
292 match (*packet).state.load(Acquire) {
293 // Impossible. We removed STATE_BOTH when blocking on it, and
294 // no self-respecting sender would put it back.
295 STATE_BOTH => rtabort!("refcount already 2 in unblock_from"),
296 // Here, a sender already tried to wake us up. Perhaps they
297 // even succeeded! Data is available.
298 STATE_ONE => true,
299 // Still registered as blocked. Need to "unblock" the pointer.
300 task_as_state => {
301 // In the window between the load and the CAS, a sender
302 // might take the pointer and set the refcount to ONE. If
303 // that happens, we shouldn't clobber that with BOTH!
304 // Acquire barrier again for the same reason as above.
305 match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
306 Acquire) {
307 STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
308 STATE_ONE => true, // Lost the race. Data available.
309 same_ptr => {
310 // We successfully unblocked our task pointer.
311 rtassert!(task_as_state == same_ptr);
312 let handle = BlockedTask::cast_from_uint(task_as_state);
313 // Because we are already awake, the handle we
314 // gave to this port shall already be empty.
315 handle.assert_already_awake();
316 false
317 }
318 }
319 }
320 }
321 }
322 }
323 }
324
// Empty impl body: no Select methods are defined here for PortOne.
325 impl<T> Select for PortOne<T> { }
326
327 impl<T> SelectPortInner<T> for PortOne<T> {
// Takes the payload (if any) out of the packet, frees the packet, and
// suppresses this port's destructor. Asserts that the state is STATE_ONE.
328 fn recv_ready(self) -> Option<T> {
329 let mut this = self;
330 let packet = this.packet();
331
332 // No further memory barrier is needed here to access the
333 // payload. Some scenarios:
334 //
335 // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
336 // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us
337 // and ran on its thread. The sending task issued a read barrier when taking the
338 // pointer to the receiving task.
339 // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
340 // is pinned to some other scheduler, so the sending task had to give us to
341 // a different scheduler for resuming. That send synchronized memory.
342 unsafe {
343 // See corresponding store() above in block_on for rationale.
344 // FIXME(#8130) This can happen only in test builds.
345 // This load is not required for correctness and may be compiled out.
346 rtassert!((*packet).state.load(Relaxed) == STATE_ONE);
347
348 let payload = (*packet).payload.take();
349
350 // The sender has closed up shop. Drop the packet.
351 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
352 // Suppress the synchronizing actions in the finalizer. We're done with the packet.
353 this.suppress_finalize = true;
354 return payload;
355 }
356 }
357 }
358
// Empty impl body: no SelectPort methods are defined here for PortOne.
359 impl<T> SelectPort<T> for PortOne<T> { }
360
361 impl<T> Peekable<T> for PortOne<T> {
// Returns true only when the state is STATE_ONE and a payload is present.
// Returns false while both endpoints are alive, and aborts the runtime if a
// task pointer is currently stored in the state.
362 fn peek(&self) -> bool {
363 unsafe {
364 let packet: *mut Packet<T> = self.packet();
365 let oldstate = (*packet).state.load(SeqCst);
366 match oldstate {
367 STATE_BOTH => false,
368 STATE_ONE => (*packet).payload.is_some(),
369 _ => rtabort!("peeked on a blocked task")
370 }
371 }
372 }
373 }
374
// Destructor for a channel that was dropped without sending: marks the packet
// STATE_ONE and, depending on the previous state, leaves it to the port, frees
// it, or wakes the blocked receiver. Skipped entirely when suppress_finalize is set.
375 #[unsafe_destructor]
376 impl<T> Drop for ChanOne<T> {
377 fn drop(&mut self) {
378 if self.suppress_finalize { return }
379
380 unsafe {
381 let this = cast::transmute_mut(self);
382 let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
383 match oldstate {
384 STATE_BOTH => {
385 // Port still active. It will destroy the Packet.
386 },
387 STATE_ONE => {
// The port is already gone, so this end frees the packet.
388 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
389 },
390 task_as_state => {
391 // The port is blocked waiting for a message we will never send. Wake it.
392 rtassert!((*this.packet()).payload.is_none());
393 let recvr = BlockedTask::cast_from_uint(task_as_state);
394 do recvr.wake().map |woken_task| {
395 Scheduler::run_task(woken_task);
396 };
397 }
398 }
399 }
400 }
401 }
402
// Destructor for a port dropped without completing a receive: marks the packet
// STATE_ONE and, depending on the previous state, leaves it to the channel,
// frees it, or releases a stale task handle. Skipped when suppress_finalize is set.
403 #[unsafe_destructor]
404 impl<T> Drop for PortOne<T> {
405 fn drop(&mut self) {
406 if self.suppress_finalize { return }
407
408 unsafe {
409 let this = cast::transmute_mut(self);
410 let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
411 match oldstate {
412 STATE_BOTH => {
413 // Chan still active. It will destroy the packet.
414 },
415 STATE_ONE => {
// The channel is already gone, so this end frees the packet.
416 let _packet: ~Packet<T> = cast::transmute(this.void_packet);
417 }
418 task_as_state => {
419 // This case occurs during unwinding, when the blocked
420 // receiver was killed awake. The task can't still be
421 // blocked (we are it), but we need to free the handle.
422 let recvr = BlockedTask::cast_from_uint(task_as_state);
423 recvr.assert_already_awake();
424 }
425 }
426 }
427 }
428 }
429
430 /// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
431 pub trait SendDeferred<T> {
// Send `val`; the implementations in this file discard the try_send_deferred result.
432 fn send_deferred(&self, val: T);
// As `send_deferred`, but returns a bool; the implementations in this file
// forward the result of their `try_send_inner(val, false)`.
433 fn try_send_deferred(&self, val: T) -> bool;
434 }
435
// One link of a stream: a value plus the one-shot port for the following link.
436 struct StreamPayload<T> {
// The message carried by this link.
437 val: T,
// Port on which the next StreamPayload will arrive.
438 next: PortOne<StreamPayload<T>>
439 }
440
// One-shot channel carrying a single stream link.
441 type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
// One-shot port receiving a single stream link.
442 type StreamPortOne<T> = PortOne<StreamPayload<T>>;
443
444 /// A channel with unbounded size.
445 pub struct Chan<T> {
446 // FIXME #5372. Using Cell because we don't take &mut self
// Holds the one-shot channel that the next send will consume.
447 next: Cell<StreamChanOne<T>>
448 }
449
450 /// A port with unbounded size.
451 pub struct Port<T> {
452 // FIXME #5372. Using Cell because we don't take &mut self
// Holds the one-shot port that the next receive will consume.
453 next: Cell<StreamPortOne<T>>
454 }
455
456 pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
457 let (pone, cone) = oneshot();
458 let port = Port { next: Cell::new(pone) };
459 let chan = Chan { next: Cell::new(cone) };
460 return (port, chan);
461 }
462
463 impl<T: Send> Chan<T> {
// Sends `val` on the current one-shot channel together with the port of a
// freshly created link, and stores that link's channel for the next send.
// Returns whatever the one-shot `try_send_inner` returns.
464 fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
465 let (next_pone, next_cone) = oneshot();
466 let cone = self.next.take();
467 self.next.put_back(next_cone);
468 cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
469 }
470 }
471
472 impl<T: Send> GenericChan<T> for Chan<T> {
// Sends `val`, discarding the boolean returned by `try_send`.
473 fn send(&self, val: T) {
474 self.try_send(val);
475 }
476 }
477
478 impl<T: Send> GenericSmartChan<T> for Chan<T> {
// Rescheduling send (do_resched = true); returns the result of try_send_inner.
479 fn try_send(&self, val: T) -> bool {
480 self.try_send_inner(val, true)
481 }
482 }
483
484 impl<T: Send> SendDeferred<T> for Chan<T> {
// Sends `val` without rescheduling, discarding the boolean result.
485 fn send_deferred(&self, val: T) {
486 self.try_send_deferred(val);
487 }
// Non-rescheduling send (do_resched = false); returns the result of try_send_inner.
488 fn try_send_deferred(&self, val: T) -> bool {
489 self.try_send_inner(val, false)
490 }
491 }
492
493 impl<T> GenericPort<T> for Port<T> {
// Receives the next value, failing the task when try_recv yields None.
494 fn recv(&self) -> T {
495 match self.try_recv() {
496 Some(val) => val,
497 None => {
498 fail2!("receiving on closed channel");
499 }
500 }
501 }
502
// Takes the current one-shot port out of `next` and receives on it. On
// success the port of the following link is stored back; on None nothing is
// put back, so `next` stays empty and later calls return None via take_opt.
503 fn try_recv(&self) -> Option<T> {
504 do self.next.take_opt().map_default(None) |pone| {
505 match pone.try_recv() {
506 Some(StreamPayload { val, next }) => {
507 self.next.put_back(next);
508 Some(val)
509 }
510 None => None
511 }
512 }
513 }
514 }
515
516 impl<T> Peekable<T> for Port<T> {
517 fn peek(&self) -> bool {
518 self.next.with_mut_ref(|p| p.peek())
519 }
520 }
521
522 // XXX: Kind of gross. A Port<T> should be selectable so you can make an array
523 // of them, but a &Port<T> should also be selectable so you can select2 on it
524 // alongside a PortOne<U> without passing the port by value in recv_ready.
525
// Each method borrows the one-shot port currently stored in `next` and
// forwards to the PortOne implementation of the same name.
526 impl<'self, T> SelectInner for &'self Port<T> {
527 #[inline]
528 fn optimistic_check(&mut self) -> bool {
529 do self.next.with_mut_ref |pone| { pone.optimistic_check() }
530 }
531
532 #[inline]
533 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
// `task` is wrapped in a Cell and taken out again inside the closure.
534 let task = Cell::new(task);
535 do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
536 }
537
538 #[inline]
539 fn unblock_from(&mut self) -> bool {
540 do self.next.with_mut_ref |pone| { pone.unblock_from() }
541 }
542 }
543
// Empty impl body: no Select methods are defined here for &Port.
544 impl<'self, T> Select for &'self Port<T> { }
545
// By-value Port: every method reborrows `self` as `&Port` and calls the
// method of the same name on that reference.
546 impl<T> SelectInner for Port<T> {
547 #[inline]
548 fn optimistic_check(&mut self) -> bool {
549 (&*self).optimistic_check()
550 }
551
552 #[inline]
553 fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
554 (&*self).block_on(sched, task)
555 }
556
557 #[inline]
558 fn unblock_from(&mut self) -> bool {
559 (&*self).unblock_from()
560 }
561 }
562
// Empty impl body: no Select methods are defined here for Port.
563 impl<T> Select for Port<T> { }
564
565 impl<'self, T> SelectPortInner<T> for &'self Port<T> {
// Takes the current one-shot port out of `next` and calls recv_ready on it.
// On Some, the following link's port is stored back and the value returned;
// on None, nothing is put back into `next`.
566 fn recv_ready(self) -> Option<T> {
567 match self.next.take().recv_ready() {
568 Some(StreamPayload { val, next }) => {
569 self.next.put_back(next);
570 Some(val)
571 }
572 None => None
573 }
574 }
575 }
576
// Empty impl body: no SelectPort methods are defined here for &Port.
577 impl<'self, T> SelectPort<T> for &'self Port<T> { }
578
// A cloneable sending endpoint; clones share one `next` slot through the UnsafeArc.
579 pub struct SharedChan<T> {
580 // Just like Chan, but a shared AtomicOption instead of Cell
581 priv next: UnsafeArc<AtomicOption<StreamChanOne<T>>>
582 }
583
584 impl<T> SharedChan<T> {
// Builds a SharedChan from `chan` by moving its current one-shot channel
// into a new shared AtomicOption.
585 pub fn new(chan: Chan<T>) -> SharedChan<T> {
586 let next = chan.next.take();
587 let next = AtomicOption::new(~next);
588 SharedChan { next: UnsafeArc::new(next) }
589 }
590 }
591
592 impl<T: Send> SharedChan<T> {
// Creates a fresh link, atomically swaps its channel into the shared slot,
// and sends `val` plus the fresh link's port on the channel that was swapped
// out. Returns whatever the one-shot `try_send_inner` returns.
593 fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
594 unsafe {
595 let (next_pone, next_cone) = oneshot();
596 let cone = (*self.next.get()).swap(~next_cone, SeqCst);
597 cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
598 do_resched)
599 }
600 }
601 }
602
603 impl<T: Send> GenericChan<T> for SharedChan<T> {
// Sends `val`, discarding the boolean returned by `try_send`.
604 fn send(&self, val: T) {
605 self.try_send(val);
606 }
607 }
608
609 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
// Rescheduling send (do_resched = true); returns the result of try_send_inner.
610 fn try_send(&self, val: T) -> bool {
611 self.try_send_inner(val, true)
612 }
613 }
614
615 impl<T: Send> SendDeferred<T> for SharedChan<T> {
// Sends `val` without rescheduling, discarding the boolean result.
616 fn send_deferred(&self, val: T) {
617 self.try_send_deferred(val);
618 }
// Non-rescheduling send (do_resched = false); returns the result of try_send_inner.
619 fn try_send_deferred(&self, val: T) -> bool {
620 self.try_send_inner(val, false)
621 }
622 }
623
624 impl<T> Clone for SharedChan<T> {
// Clones the UnsafeArc handle, so the new SharedChan refers to the same `next` slot.
625 fn clone(&self) -> SharedChan<T> {
626 SharedChan {
627 next: self.next.clone()
628 }
629 }
630 }
631
// A cloneable receiving endpoint; clones share one `next_link` slot through the UnsafeArc.
632 pub struct SharedPort<T> {
633 // The next port on which we will receive the next port on which we will receive T
634 priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>>
635 }
636
637 impl<T> SharedPort<T> {
// Builds a SharedPort from `port`: its current data port is sent through a
// new one-shot "link" pipe, and the receiving end of that link is stored in
// the shared slot.
638 pub fn new(port: Port<T>) -> SharedPort<T> {
639 // Put the data port into a new link pipe
640 let next_data_port = port.next.take();
641 let (next_link_port, next_link_chan) = oneshot();
642 next_link_chan.send(next_data_port);
643 let next_link = AtomicOption::new(~next_link_port);
644 SharedPort { next_link: UnsafeArc::new(next_link) }
645 }
646 }
647
648 impl<T: Send> GenericPort<T> for SharedPort<T> {
// Receives the next value, failing the task when try_recv yields None.
649 fn recv(&self) -> T {
650 match self.try_recv() {
651 Some(val) => val,
652 None => {
653 fail2!("receiving on a closed channel");
654 }
655 }
656 }
657
// Swaps a fresh link port into the shared slot, receives the current data
// port from the link that was swapped out, tries to receive on it, and then
// sends the data port for the following receiver down the fresh link.
658 fn try_recv(&self) -> Option<T> {
659 unsafe {
660 let (next_link_port, next_link_chan) = oneshot();
661 let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
662 let link_port = link_port.unwrap();
663 let data_port = link_port.recv();
664 let (next_data_port, res) = match data_port.try_recv() {
665 Some(StreamPayload { val, next }) => {
666 (next, Some(val))
667 }
668 None => {
// The stream is closed: pass on a port from a new oneshot whose channel
// half is discarded here.
669 let (next_data_port, _) = oneshot();
670 (next_data_port, None)
671 }
672 };
673 next_link_chan.send(next_data_port);
674 return res;
675 }
676 }
677 }
678
679 impl<T> Clone for SharedPort<T> {
// Clones the UnsafeArc handle, so the new SharedPort refers to the same `next_link` slot.
680 fn clone(&self) -> SharedPort<T> {
681 SharedPort {
682 next_link: self.next_link.clone()
683 }
684 }
685 }
686
687 // FIXME #7760: Need better name
// A shared port paired with a shared channel; the impls below receive through
// the first element and send through the second.
688 type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
689
690 pub fn megapipe<T: Send>() -> MegaPipe<T> {
691 let (port, chan) = stream();
692 (SharedPort::new(port), SharedChan::new(chan))
693 }
694
695 impl<T: Send> GenericChan<T> for MegaPipe<T> {
// Forwards to `send` on the SharedChan (second tuple element).
696 fn send(&self, val: T) {
697 self.second_ref().send(val)
698 }
699 }
700
701 impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
// Forwards to `try_send` on the SharedChan (second tuple element).
702 fn try_send(&self, val: T) -> bool {
703 self.second_ref().try_send(val)
704 }
705 }
706
707 impl<T: Send> GenericPort<T> for MegaPipe<T> {
// Forwards to `recv` on the SharedPort (first tuple element).
708 fn recv(&self) -> T {
709 self.first_ref().recv()
710 }
711
// Forwards to `try_recv` on the SharedPort (first tuple element).
712 fn try_recv(&self) -> Option<T> {
713 self.first_ref().try_recv()
714 }
715 }
716
717 impl<T: Send> SendDeferred<T> for MegaPipe<T> {
// Forwards to `send_deferred` on the SharedChan (second tuple element).
718 fn send_deferred(&self, val: T) {
719 self.second_ref().send_deferred(val)
720 }
// Forwards to `try_send_deferred` on the SharedChan (second tuple element).
721 fn try_send_deferred(&self, val: T) -> bool {
722 self.second_ref().try_send_deferred(val)
723 }
724 }
725
726 #[cfg(test)]
727 mod test {
728 use super::*;
729 use option::*;
730 use rt::test::*;
731 use cell::Cell;
732 use num::Times;
733 use rt::util;
734
735 #[test]
736 fn oneshot_single_thread_close_port_first() {
737 // Simple test of closing without sending
// The port is moved into an inner scope so it is dropped before the channel.
738 do run_in_newsched_task {
739 let (port, _chan) = oneshot::<int>();
740 { let _p = port; }
741 }
742 }
743
744 #[test]
745 fn oneshot_single_thread_close_chan_first() {
746 // Simple test of closing without sending
// The channel is moved into an inner scope so it is dropped before the port.
747 do run_in_newsched_task {
748 let (_port, chan) = oneshot::<int>();
749 { let _c = chan; }
750 }
751 }
752
753 #[test]
754 fn oneshot_single_thread_send_port_close() {
755 // Testing that the sender cleans up the payload if receiver is closed
// The port is dropped first, then an owned box is sent on the channel.
756 do run_in_newsched_task {
757 let (port, chan) = oneshot::<~int>();
758 { let _p = port; }
759 chan.send(~0);
760 }
761 }
762
763 #[test]
764 fn oneshot_single_thread_recv_chan_close() {
765 // Receiving on a closed chan will fail
// The failing recv runs inside spawntask_try so the failure comes back as `res`.
766 do run_in_newsched_task {
767 let res = do spawntask_try {
768 let (port, chan) = oneshot::<~int>();
769 { let _c = chan; }
770 port.recv();
771 };
772 // What is our res?
773 rtdebug!("res is: {:?}", res.is_err());
774 assert!(res.is_err());
775 }
776 }
777
// Send and then receive within one task; recv must return the value sent.
778 #[test]
779 fn oneshot_single_thread_send_then_recv() {
780 do run_in_newsched_task {
781 let (port, chan) = oneshot::<~int>();
782 chan.send(~10);
783 assert!(port.recv() == ~10);
784 }
785 }
786
// try_send must return true while the port is still alive, and the value must arrive.
787 #[test]
788 fn oneshot_single_thread_try_send_open() {
789 do run_in_newsched_task {
790 let (port, chan) = oneshot::<int>();
791 assert!(chan.try_send(10));
792 assert!(port.recv() == 10);
793 }
794 }
795
// try_send must return false once the port has been dropped.
796 #[test]
797 fn oneshot_single_thread_try_send_closed() {
798 do run_in_newsched_task {
799 let (port, chan) = oneshot::<int>();
800 { let _p = port; }
801 assert!(!chan.try_send(10));
802 }
803 }
804
// try_recv must return Some(value) after a value has been sent.
805 #[test]
806 fn oneshot_single_thread_try_recv_open() {
807 do run_in_newsched_task {
808 let (port, chan) = oneshot::<int>();
809 chan.send(10);
810 assert!(port.try_recv() == Some(10));
811 }
812 }
813
// try_recv must return None when the channel was dropped without sending.
814 #[test]
815 fn oneshot_single_thread_try_recv_closed() {
816 do run_in_newsched_task {
817 let (port, chan) = oneshot::<int>();
818 { let _c = chan; }
819 assert!(port.try_recv() == None);
820 }
821 }
822
// peek must be false before the send and true after it.
823 #[test]
824 fn oneshot_single_thread_peek_data() {
825 do run_in_newsched_task {
826 let (port, chan) = oneshot::<int>();
827 assert!(!port.peek());
828 chan.send(10);
829 assert!(port.peek());
830 }
831 }
832
// peek must stay false, on repeated calls, after the channel is dropped without sending.
833 #[test]
834 fn oneshot_single_thread_peek_close() {
835 do run_in_newsched_task {
836 let (port, chan) = oneshot::<int>();
837 { let _c = chan; }
838 assert!(!port.peek());
839 assert!(!port.peek());
840 }
841 }
842
// peek must be false on a fresh pair on which nothing has been sent.
843 #[test]
844 fn oneshot_single_thread_peek_open() {
845 do run_in_newsched_task {
846 let (port, _) = oneshot::<int>();
847 assert!(!port.peek());
848 }
849 }
850
// The receive happens in a spawned task while the parent task performs the send.
851 #[test]
852 fn oneshot_multi_task_recv_then_send() {
853 do run_in_newsched_task {
854 let (port, chan) = oneshot::<~int>();
// The Cell lets the closure move the port out when it runs.
855 let port_cell = Cell::new(port);
856 do spawntask {
857 assert!(port_cell.take().recv() == ~10);
858 }
859
860 chan.send(~10);
861 }
862 }
863
// One task drops the channel without sending; the receiving task must fail.
864 #[test]
865 fn oneshot_multi_task_recv_then_close() {
866 do run_in_newsched_task {
867 let (port, chan) = oneshot::<~int>();
868 let port_cell = Cell::new(port);
869 let chan_cell = Cell::new(chan);
870 do spawntask_later {
871 let _cell = chan_cell.take();
872 }
873 let res = do spawntask_try {
874 assert!(port_cell.take().recv() == ~10);
875 };
876 assert!(res.is_err());
877 }
878 }
879
// Repeatedly drops the port on a spawned thread while the channel is dropped here.
880 #[test]
881 fn oneshot_multi_thread_close_stress() {
882 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
883 do stress_factor().times {
884 do run_in_newsched_task {
885 let (port, chan) = oneshot::<int>();
886 let port_cell = Cell::new(port);
887 let thread = do spawntask_thread {
888 let _p = port_cell.take();
889 };
890 let _chan = chan;
891 thread.join();
892 }
893 }
894 }
895
// Repeatedly runs a port drop on one thread against a send on another thread.
896 #[test]
897 fn oneshot_multi_thread_send_close_stress() {
898 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
899 do stress_factor().times {
900 do run_in_newsched_task {
901 let (port, chan) = oneshot::<int>();
902 let chan_cell = Cell::new(chan);
903 let port_cell = Cell::new(port);
904 let thread1 = do spawntask_thread {
905 let _p = port_cell.take();
906 };
907 let thread2 = do spawntask_thread {
908 let c = chan_cell.take();
909 c.send(1);
910 };
911 thread1.join();
912 thread2.join();
913 }
914 }
915 }
916
// Repeatedly runs a recv on one thread against a channel drop on another thread;
// the recv must fail every time.
917 #[test]
918 fn oneshot_multi_thread_recv_close_stress() {
919 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
920 do stress_factor().times {
921 do run_in_newsched_task {
922 let (port, chan) = oneshot::<int>();
923 let chan_cell = Cell::new(chan);
924 let port_cell = Cell::new(port);
925 let thread1 = do spawntask_thread {
926 let port_cell = Cell::new(port_cell.take());
927 let res = do spawntask_try {
928 port_cell.take().recv();
929 };
930 assert!(res.is_err());
931 };
932 let thread2 = do spawntask_thread {
933 let chan_cell = Cell::new(chan_cell.take());
934 do spawntask {
935 chan_cell.take();
936 }
937 };
938 thread1.join();
939 thread2.join();
940 }
941 }
942 }
943
// Repeatedly sends on one thread and receives on another; the value must arrive intact.
944 #[test]
945 fn oneshot_multi_thread_send_recv_stress() {
946 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
947 do stress_factor().times {
948 do run_in_newsched_task {
949 let (port, chan) = oneshot::<~int>();
950 let chan_cell = Cell::new(chan);
951 let port_cell = Cell::new(port);
952 let thread1 = do spawntask_thread {
953 chan_cell.take().send(~10);
954 };
955 let thread2 = do spawntask_thread {
956 assert!(port_cell.take().recv() == ~10);
957 };
958 thread1.join();
959 thread2.join();
960 }
961 }
962 }
963
// Sends the values 0..9 over a stream and receives them in order. Each send and
// each receive runs in its own spawntask_random task, which then passes the
// endpoint on to the next step via the recursive helpers below.
964 #[test]
965 fn stream_send_recv_stress() {
966 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
967 do stress_factor().times {
968 do run_in_mt_newsched_task {
969 let (port, chan) = stream::<~int>();
970
971 send(chan, 0);
972 recv(port, 0);
973
// Sends `i`, then recurses with i + 1 until i reaches 10.
974 fn send(chan: Chan<~int>, i: int) {
975 if i == 10 { return }
976
977 let chan_cell = Cell::new(chan);
978 do spawntask_random {
979 let chan = chan_cell.take();
980 chan.send(~i);
981 send(chan, i + 1);
982 }
983 }
984
// Receives and checks `i`, then recurses with i + 1 until i reaches 10.
985 fn recv(port: Port<~int>, i: int) {
986 if i == 10 { return }
987
988 let port_cell = Cell::new(port);
989 do spawntask_random {
990 let port = port_cell.take();
991 assert!(port.recv() == ~i);
992 recv(port, i + 1);
993 };
994 }
995 }
996 }
997 }
998
999 #[test]
1000 fn recv_a_lot() {
1001 // Regression test that we don't run out of stack in scheduler context
// Queues 10000 unit messages on a stream and then receives all of them.
1002 do run_in_newsched_task {
1003 let (port, chan) = stream();
1004 do 10000.times { chan.send(()) }
1005 do 10000.times { port.recv() }
1006 }
1007 }
1008
// Many tasks each send once through clones of one SharedChan; the single port
// must receive exactly `total` messages.
1009 #[test]
1010 fn shared_chan_stress() {
1011 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1012 do run_in_mt_newsched_task {
1013 let (port, chan) = stream();
1014 let chan = SharedChan::new(chan);
1015 let total = stress_factor() + 100;
1016 do total.times {
1017 let chan_clone = chan.clone();
1018 do spawntask_random {
1019 chan_clone.send(());
1020 }
1021 }
1022
1023 do total.times {
1024 port.recv();
1025 }
1026 }
1027 }
1028
// Many tasks each receive once through clones of one SharedPort and then report
// on `end_chan`; the parent sends `total` messages and waits for `total` reports.
1029 #[test]
1030 fn shared_port_stress() {
1031 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1032 do run_in_mt_newsched_task {
1033 let (end_port, end_chan) = stream();
1034 let (port, chan) = stream();
1035 let end_chan = SharedChan::new(end_chan);
1036 let port = SharedPort::new(port);
1037 let total = stress_factor() + 100;
1038 do total.times {
1039 let end_chan_clone = end_chan.clone();
1040 let port_clone = port.clone();
1041 do spawntask_random {
1042 port_clone.recv();
1043 end_chan_clone.send(());
1044 }
1045 }
1046
1047 do total.times {
1048 chan.send(());
1049 }
1050
1051 do total.times {
1052 end_port.recv();
1053 }
1054 }
1055 }
1056
// try_recv on a SharedPort must return None after the channel is dropped without sending.
1057 #[test]
1058 fn shared_port_close_simple() {
1059 do run_in_mt_newsched_task {
1060 let (port, chan) = stream::<()>();
1061 let port = SharedPort::new(port);
1062 { let _chan = chan; }
1063 assert!(port.try_recv().is_none());
1064 }
1065 }
1066
// Ten tasks send once each and twenty tasks try_recv once each on a shared
// port; exactly `send_total` of the try_recv calls must yield a value.
1067 #[test]
1068 fn shared_port_close() {
1069 do run_in_mt_newsched_task {
1070 let (end_port, end_chan) = stream::<bool>();
1071 let (port, chan) = stream::<()>();
1072 let end_chan = SharedChan::new(end_chan);
1073 let port = SharedPort::new(port);
1074 let chan = SharedChan::new(chan);
1075 let send_total = 10;
1076 let recv_total = 20;
1077 do spawntask_random {
1078 do send_total.times {
1079 let chan_clone = chan.clone();
1080 do spawntask_random {
1081 chan_clone.send(());
1082 }
1083 }
1084 }
1085 let end_chan_clone = end_chan.clone();
1086 do spawntask_random {
1087 do recv_total.times {
1088 let port_clone = port.clone();
1089 let end_chan_clone = end_chan_clone.clone();
1090 do spawntask_random {
// Report on end_chan whether this receiver obtained a value.
1091 let recvd = port_clone.try_recv().is_some();
1092 end_chan_clone.send(recvd);
1093 }
1094 }
1095 }
1096
1097 let mut recvd = 0;
1098 do recv_total.times {
1099 recvd += if end_port.recv() { 1 } else { 0 };
1100 }
1101
1102 assert!(recvd == send_total);
1103 }
1104 }
1105
// Spawns `total` tasks that each send and then receive a random number (0..10)
// of messages on clones of one megapipe.
1106 #[test]
1107 fn megapipe_stress() {
1108 use rand;
1109 use rand::Rng;
1110
1111 if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1112
1113 do run_in_mt_newsched_task {
1114 let (end_port, end_chan) = stream::<()>();
1115 let end_chan = SharedChan::new(end_chan);
1116 let pipe = megapipe();
1117 let total = stress_factor() + 10;
1118 let mut rng = rand::rng();
1119 do total.times {
1120 let msgs = rng.gen_integer_range(0u, 10);
1121 let pipe_clone = pipe.clone();
1122 let end_chan_clone = end_chan.clone();
1123 do spawntask_random {
1124 do msgs.times {
1125 pipe_clone.send(());
1126 }
1127 do msgs.times {
1128 pipe_clone.recv();
1129 }
1130 }
1131
// NOTE(review): this completion signal is sent by the spawning loop, outside
// the spawned task's closure -- confirm that is intended.
1132 end_chan_clone.send(());
1133 }
1134
1135 do total.times {
1136 end_port.recv();
1137 }
1138 }
1139 }
1140
1141 #[test]
1142 fn send_deferred() {
1143 use unstable::sync::atomically;
1144
1145 // Tests no-rescheduling of send_deferred on all types of channels.
1146 do run_in_newsched_task {
1147 let (pone, cone) = oneshot();
1148 let (pstream, cstream) = stream();
1149 let (pshared, cshared) = stream();
1150 let cshared = SharedChan::new(cshared);
1151 let mp = megapipe();
1152
// One receiver task is spawned per channel kind.
1153 let pone = Cell::new(pone);
1154 do spawntask { pone.take().recv(); }
1155 let pstream = Cell::new(pstream);
1156 do spawntask { pstream.take().recv(); }
1157 let pshared = Cell::new(pshared);
1158 do spawntask { pshared.take().recv(); }
1159 let p_mp = Cell::new(mp.clone());
1160 do spawntask { p_mp.take().recv(); }
1161
// All four deferred sends are issued from inside a single `atomically` block.
1162 let cs = Cell::new((cone, cstream, cshared, mp));
1163 unsafe {
1164 do atomically {
1165 let (cone, cstream, cshared, mp) = cs.take();
1166 cone.send_deferred(());
1167 cstream.send_deferred(());
1168 cshared.send_deferred(());
1169 mp.send_deferred(());
1170 }
1171 }
1172 }
1173 }
1174
1175 }
libstd/rt/comm.rs:60:1-60:1 -fn- definition:
pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
references:-641: let (next_link_port, next_link_chan) = oneshot();
595: let (next_pone, next_cone) = oneshot();
669: let (next_data_port, _) = oneshot();
465: let (next_pone, next_cone) = oneshot();
457: let (pone, cone) = oneshot();
660: let (next_link_port, next_link_chan) = oneshot();
libstd/comm.rs:
57: let (p, c) = rtcomm::oneshot();
libstd/rt/test.rs:
279: let (port, chan) = oneshot();
libstd/rt/comm.rs:37:81-37:81 -ty- definition:
/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
type State = uint;
references:-40: static STATE_BOTH: State = 2;
41: static STATE_ONE: State = 1;
libstd/rt/comm.rs:55:21-55:21 -struct- definition:
/// A one-shot port.
pub struct PortOne<T> {
references:-327: impl<T> SelectPortInner<T> for PortOne<T> {
634: priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>>
69: let port = PortOne {
61: pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
359: impl<T> SelectPort<T> for PortOne<T> { }
224: impl<T> SelectInner for PortOne<T> {
361: impl<T> Peekable<T> for PortOne<T> {
404: impl<T> Drop for PortOne<T> {
442: type StreamPortOne<T> = PortOne<StreamPayload<T>>;
438: next: PortOne<StreamPayload<T>>
325: impl<T> Select for PortOne<T> { }
185: impl<T> PortOne<T> {
libstd/comm.rs:
53: pub struct PortOne<T> { x: rtcomm::PortOne<T> }
libstd/rt/comm.rs:578:1-578:1 -struct- definition:
pub struct SharedChan<T> {
references:-609: impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
625: fn clone(&self) -> SharedChan<T> {
624: impl<T> Clone for SharedChan<T> {
615: impl<T: Send> SendDeferred<T> for SharedChan<T> {
592: impl<T: Send> SharedChan<T> {
584: impl<T> SharedChan<T> {
688: type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
585: pub fn new(chan: Chan<T>) -> SharedChan<T> {
626: SharedChan {
603: impl<T: Send> GenericChan<T> for SharedChan<T> {
588: SharedChan { next: UnsafeArc::new(next) }
libstd/comm.rs:
156: pub struct SharedChan<T> { x: rtcomm::SharedChan<T> }
libstd/rt/comm.rs:435:1-435:1 -struct- definition:
struct StreamPayload<T> {
references:-438: next: PortOne<StreamPayload<T>>
597: cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
665: Some(StreamPayload { val, next }) => {
468: cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
506: Some(StreamPayload { val, next }) => {
442: type StreamPortOne<T> = PortOne<StreamPayload<T>>;
441: type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
568: Some(StreamPayload { val, next }) => {
libstd/rt/comm.rs:455:1-455:1 -fn- definition:
pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
references:-691: let (port, chan) = stream();
libstd/comm.rs:
65: let (p, c) = rtcomm::stream();
libstd/rt/comm.rs:43:63-43:63 -struct- definition:
/// The heap-allocated structure shared between two endpoints.
struct Packet<T> {
references:-364: let packet: *mut Packet<T> = self.packet();
62: let packet: ~Packet<T> = ~Packet {
83: fn packet(&self) -> *mut Packet<T> {
416: let _packet: ~Packet<T> = cast::transmute(this.void_packet);
188: let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
161: let _packet: ~Packet<T> = cast::transmute(this.void_packet);
388: let _packet: ~Packet<T> = cast::transmute(this.void_packet);
86: let p: *mut Packet<T> = &mut **p;
186: fn packet(&self) -> *mut Packet<T> {
351: let _packet: ~Packet<T> = cast::transmute(this.void_packet);
62: let packet: ~Packet<T> = ~Packet {
85: let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
189: let p: *mut Packet<T> = &mut **p;
libstd/rt/comm.rs:440:1-440:1 -ty- definition:
type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
references:-581: priv next: UnsafeArc<AtomicOption<StreamChanOne<T>>>
447: next: Cell<StreamChanOne<T>>
libstd/rt/comm.rs:450:33-450:33 -struct- definition:
/// A port with unbounded size.
pub struct Port<T> {
references:-458: let port = Port { next: Cell::new(pone) };
456: pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
563: impl<T> Select for Port<T> { }
544: impl<'self, T> Select for &'self Port<T> { }
546: impl<T> SelectInner for Port<T> {
638: pub fn new(port: Port<T>) -> SharedPort<T> {
493: impl<T> GenericPort<T> for Port<T> {
526: impl<'self, T> SelectInner for &'self Port<T> {
516: impl<T> Peekable<T> for Port<T> {
577: impl<'self, T> SelectPort<T> for &'self Port<T> { }
565: impl<'self, T> SelectPortInner<T> for &'self Port<T> {
libstd/comm.rs:
61: pub struct Port<T> { x: rtcomm::Port<T> }
libstd/rt/comm.rs:49:23-49:23 -struct- definition:
// A one-shot channel.
pub struct ChanOne<T> {
references:-441: type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
73: let chan = ChanOne {
61: pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
376: impl<T> Drop for ChanOne<T> {
81: impl<T> ChanOne<T> {
libstd/comm.rs:
54: pub struct ChanOne<T> { x: rtcomm::ChanOne<T> }
libstd/rt/comm.rs:687:33-687:33 -ty- definition:
// FIXME #7760: Need better name
type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
references:-707: impl<T: Send> GenericPort<T> for MegaPipe<T> {
690: pub fn megapipe<T: Send>() -> MegaPipe<T> {
695: impl<T: Send> GenericChan<T> for MegaPipe<T> {
717: impl<T: Send> SendDeferred<T> for MegaPipe<T> {
701: impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
libstd/rt/comm.rs:444:35-444:35 -struct- definition:
/// A channel with unbounded size.
pub struct Chan<T> {
references:-459: let chan = Chan { next: Cell::new(cone) };
456: pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
585: pub fn new(chan: Chan<T>) -> SharedChan<T> {
463: impl<T: Send> Chan<T> {
484: impl<T: Send> SendDeferred<T> for Chan<T> {
472: impl<T: Send> GenericChan<T> for Chan<T> {
478: impl<T: Send> GenericSmartChan<T> for Chan<T> {
libstd/comm.rs:
62: pub struct Chan<T> { x: rtcomm::Chan<T> }
libstd/rt/comm.rs:631:1-631:1 -struct- definition:
pub struct SharedPort<T> {
references:-637: impl<T> SharedPort<T> {
644: SharedPort { next_link: UnsafeArc::new(next_link) }
638: pub fn new(port: Port<T>) -> SharedPort<T> {
688: type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
679: impl<T> Clone for SharedPort<T> {
681: SharedPort {
680: fn clone(&self) -> SharedPort<T> {
648: impl<T: Send> GenericPort<T> for SharedPort<T> {
libstd/comm.rs:
198: pub struct SharedPort<T> { x: rtcomm::SharedPort<T> }
libstd/rt/comm.rs:430:87-430:87 -trait- definition:
/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
pub trait SendDeferred<T> {
references:-615: impl<T: Send> SendDeferred<T> for SharedChan<T> {
484: impl<T: Send> SendDeferred<T> for Chan<T> {
717: impl<T: Send> SendDeferred<T> for MegaPipe<T> {
libstd/comm.rs:
179: impl<T: Send> SendDeferred<T> for SharedChan<T> {
124: impl<T: Send> SendDeferred<T> for Chan<T> {
libstd/rt/comm.rs:441:51-441:51 -ty- definition:
type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
type StreamPortOne<T> = PortOne<StreamPayload<T>>;
references:-634: priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>>
453: next: Cell<StreamPortOne<T>>