(index<- ) ./libstd/rt/tube.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! A very simple unsynchronized channel type for sending buffered data from
12 //! scheduler context to task context.
13 //!
14 //! XXX: This would be safer to use if split into two types like Port/Chan
15
16 use option::*;
17 use clone::Clone;
18 use super::rc::RC;
19 use rt::sched::Scheduler;
20 use rt::kill::BlockedTask;
21 use rt::local::Local;
22 use vec::OwnedVector;
23 use container::Container;
24
25 struct TubeState<T> {
26 blocked_task: Option<BlockedTask>,
27 buf: ~[T]
28 }
29
30 pub struct Tube<T> {
31 p: RC<TubeState<T>>
32 }
33
34 impl<T> Tube<T> {
35 pub fn new() -> Tube<T> {
36 Tube {
37 p: RC::new(TubeState {
38 blocked_task: None,
39 buf: ~[]
40 })
41 }
42 }
43
44 pub fn send(&mut self, val: T) {
45 rtdebug!("tube send");
46 unsafe {
47 let state = self.p.unsafe_borrow_mut();
48 (*state).buf.push(val);
49
50 if (*state).blocked_task.is_some() {
51 // There's a waiting task. Wake it up
52 rtdebug!("waking blocked tube");
53 let task = (*state).blocked_task.take_unwrap();
54 let sched: ~Scheduler = Local::take();
55 sched.resume_blocked_task_immediately(task);
56 }
57 }
58 }
59
60 pub fn recv(&mut self) -> T {
61 unsafe {
62 let state = self.p.unsafe_borrow_mut();
63 if !(*state).buf.is_empty() {
64 return (*state).buf.shift();
65 } else {
66 // Block and wait for the next message
67 rtdebug!("blocking on tube recv");
68 assert!(self.p.refcount() > 1); // There better be somebody to wake us up
69 assert!((*state).blocked_task.is_none());
70 let sched: ~Scheduler = Local::take();
71 do sched.deschedule_running_task_and_then |_, task| {
72 (*state).blocked_task = Some(task);
73 }
74 rtdebug!("waking after tube recv");
75 let buf = &mut (*state).buf;
76 assert!(!buf.is_empty());
77 return buf.shift();
78 }
79 }
80 }
81 }
82
83 impl<T> Clone for Tube<T> {
84 fn clone(&self) -> Tube<T> {
85 Tube { p: self.p.clone() }
86 }
87 }
88
89 #[cfg(test)]
90 mod test {
91 use cell::Cell;
92 use rt::test::*;
93 use rt::rtio::EventLoop;
94 use rt::sched::Scheduler;
95 use rt::local::Local;
96 use super::*;
97 use prelude::*;
98
99 #[test]
100 fn simple_test() {
101 do run_in_newsched_task {
102 let mut tube: Tube<int> = Tube::new();
103 let tube_clone = tube.clone();
104 let tube_clone_cell = Cell::new(tube_clone);
105 let sched: ~Scheduler = Local::take();
106 do sched.deschedule_running_task_and_then |sched, task| {
107 let mut tube_clone = tube_clone_cell.take();
108 tube_clone.send(1);
109 sched.enqueue_blocked_task(task);
110 }
111
112 assert!(tube.recv() == 1);
113 }
114 }
115
116 #[test]
117 fn blocking_test() {
118 do run_in_newsched_task {
119 let mut tube: Tube<int> = Tube::new();
120 let tube_clone = tube.clone();
121 let tube_clone = Cell::new(tube_clone);
122 let sched: ~Scheduler = Local::take();
123 do sched.deschedule_running_task_and_then |sched, task| {
124 let tube_clone = Cell::new(tube_clone.take());
125 do sched.event_loop.callback {
126 let mut tube_clone = tube_clone.take();
127 // The task should be blocked on this now and
128 // sending will wake it up.
129 tube_clone.send(1);
130 }
131 sched.enqueue_blocked_task(task);
132 }
133
134 assert!(tube.recv() == 1);
135 }
136 }
137
138 #[test]
139 fn many_blocking_test() {
140 static MAX: int = 100;
141
142 do run_in_newsched_task {
143 let mut tube: Tube<int> = Tube::new();
144 let tube_clone = tube.clone();
145 let tube_clone = Cell::new(tube_clone);
146 let sched: ~Scheduler = Local::take();
147 do sched.deschedule_running_task_and_then |sched, task| {
148 callback_send(tube_clone.take(), 0);
149
150 fn callback_send(tube: Tube<int>, i: int) {
151 if i == 100 { return; }
152
153 let tube = Cell::new(Cell::new(tube));
154 do Local::borrow |sched: &mut Scheduler| {
155 let tube = tube.take();
156 do sched.event_loop.callback {
157 let mut tube = tube.take();
158 // The task should be blocked on this now and
159 // sending will wake it up.
160 tube.send(i);
161 callback_send(tube, i + 1);
162 }
163 }
164 }
165
166 sched.enqueue_blocked_task(task);
167 }
168
169 for i in range(0, MAX) {
170 let j = tube.recv();
171 assert!(j == i);
172 }
173 }
174 }
175 }