(index<- ) ./libstd/task.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12 * Utilities for managing and scheduling tasks
13 *
14 * An executing Rust program consists of a collection of tasks, each with their
15 * own stack, and sole ownership of their allocated heap data. Tasks communicate
16 * with each other using channels (see `std::comm` for more info about how
17 * communication works).
18 *
19 * Failure in one task does not propagate to any others (not to parent, not to
20 * child). Failure propagation is instead handled by using the channel send()
21 * and recv() methods which will fail if the other end has hung up already.
22 *
23 * Task Scheduling:
24 *
25 * By default, every task is created with the same "flavor" as the calling task.
26 * This flavor refers to the scheduling mode, with two possibilities currently
27 * being 1:1 and M:N modes. Green (M:N) tasks are cooperatively scheduled and
28 * native (1:1) tasks are scheduled by the OS kernel.
29 *
30 * # Example
31 *
32 * ```rust
33 * spawn(proc() {
34 * println!("Hello, World!");
35 * })
36 * ```
37 */
38
39 use any::Any;
40 use comm::{Sender, Receiver, channel};
41 use io::Writer;
42 use kinds::{Send, marker};
43 use option::{None, Some, Option};
44 use owned::Box;
45 use result::{Result, Ok, Err};
46 use rt::local::Local;
47 use rt::task::Task;
48 use str::{Str, SendStr, IntoMaybeOwned};
49
50 #[cfg(test)] use any::{AnyOwnExt, AnyRefExt};
51 #[cfg(test)] use result;
52 #[cfg(test)] use str::StrAllocating;
53 #[cfg(test)] use realstd::result::ResultUnwrap;
54
55 /// Indicates the manner in which a task exited.
56 ///
57 /// A task that completes without failing is considered to exit successfully.
58 ///
59 /// If delivery of this result should block until all child tasks
60 /// complete, using a result future is recommended.
61 pub type TaskResult = Result<(), Box<Any:Send>>;
62
63 /// Task configuration options; `TaskOpts::new` leaves every field as `None`
64 pub struct TaskOpts {
65 /// Enable lifecycle notifications: the sender on which the task's `TaskResult` is reported
66 pub notify_chan: Option<Sender<TaskResult>>,
67 /// A name for the task-to-be, for identification in failure messages
68 pub name: Option<SendStr>,
69 /// The size of the stack for the spawned task
70 pub stack_size: Option<uint>,
71 /// Writer to use as the task-local stdout
72 pub stdout: Option<Box<Writer:Send>>,
73 /// Writer to use as the task-local stderr
74 pub stderr: Option<Box<Writer:Send>>,
75 }
76
77 /**
78 * The task builder type.
79 *
80 * Provides detailed control over the properties and behavior of new tasks.
81 */
82 // NB: Builders are designed to be single-use because they do stateful
83 // things that get weird when reusing - e.g. if you create a result future
84 // it only applies to a single task, so then you have to maintain some
85 // potentially tricky state to ensure that everything behaves correctly
86 // when you try to reuse the builder to spawn a new task. We'll just
87 // sidestep that whole issue by making builders uncopyable and making
88 // `spawn` and `try` take them by value.
89 pub struct TaskBuilder {
90 /// Options to spawn the new task with
91 pub opts: TaskOpts,
92 gen_body: Option<proc(v: proc():Send):Send -> proc():Send>, // body generator built up by `with_wrapper`; `None` until a wrapper is added
93 nocopy: Option<marker::NoCopy>, // presumably the marker that makes builders uncopyable (see NB above) - TODO confirm
94 }
95
96 impl TaskBuilder {
97 /// Generate the base configuration for spawning a task, off of which more
98 /// configuration methods can be chained.
99 pub fn new() -> TaskBuilder {
100 TaskBuilder {
101 opts: TaskOpts::new(),
102 gen_body: None,
103 nocopy: None,
104 }
105 }
106
107 /// Get a future representing the exit status of the task.
108 ///
109 /// Taking the value of the future will block until the child task
110 /// terminates. The future result return value will be created *before* the task is
111 /// spawned; as such, do not invoke .get() on it directly;
112 /// rather, store it in an outer variable/list for later use.
113 ///
114 /// # Failure
115 /// Fails if a future_result was already set for this task.
116 pub fn future_result(&mut self) -> Receiver<TaskResult> {
117 // FIXME (#3725): Once linked failure and notification are
118 // handled in the library, I can imagine implementing this by just
119 // registering an arbitrary number of task::on_exit handlers and
120 // sending out messages.
121
122 if self.opts.notify_chan.is_some() {
123 fail!("Can't set multiple future_results for one task!");
124 }
125
126 // Construct the future and give it to the caller.
127 let (tx, rx) = channel();
128
129 // Reconfigure self to use a notify channel.
130 self.opts.notify_chan = Some(tx);
131
132 rx
133 }
134
135 /// Name the task-to-be. Currently the name is used for identification
136 /// only in failure messages.
137 pub fn named<S: IntoMaybeOwned<'static>>(mut self, name: S) -> TaskBuilder {
138 self.opts.name = Some(name.into_maybe_owned());
139 self
140 }
141
142 /**
143 * Add a wrapper to the body of the spawned task.
144 *
145 * Before the task is spawned it is passed through a 'body generator'
146 * function that may perform local setup operations as well as wrap
147 * the task body in remote setup operations. With this the behavior
148 * of tasks can be extended in simple ways.
149 *
150 * This function augments the current body generator with a new body
151 * generator by applying the task body which results from the
152 * existing body generator to the new body generator.
153 */
154 pub fn with_wrapper(mut self,
155 wrapper: proc(v: proc():Send):Send -> proc():Send)
156 -> TaskBuilder
157 {
158 self.gen_body = match self.gen_body.take() {
159 Some(prev) => Some(proc(body) { wrapper(prev(body)) }),
160 None => Some(wrapper)
161 };
162 self
163 }
164
165 /**
166 * Creates and executes a new child task
167 *
168 * Sets up a new task with its own call stack and schedules it to run
169 * the provided unique closure. The task has the properties and behavior
170 * specified by the task_builder.
171 */
172 pub fn spawn(mut self, f: proc():Send) {
173 let gen_body = self.gen_body.take();
174 let f = match gen_body {
175 Some(gen) => gen(f),
176 None => f
177 };
178 let t: Box<Task> = Local::take();
179 t.spawn_sibling(self.opts, f);
180 }
181
182 /**
183 * Execute a function in another task and return either the return value
184 * of the function or result::err.
185 *
186 * # Return value
187 *
188 * If the function executed successfully then try returns result::ok
189 * containing the value returned by the function. If the function fails
190 * then try returns result::err containing nil.
191 *
192 * # Failure
193 * Fails if a future_result was already set for this task.
194 */
195 pub fn try<T:Send>(mut self, f: proc():Send -> T)
196 -> Result<T, Box<Any:Send>> {
197 let (tx, rx) = channel();
198
199 let result = self.future_result();
200
201 self.spawn(proc() {
202 tx.send(f());
203 });
204
205 match result.recv() {
206 Ok(()) => Ok(rx.recv()),
207 Err(cause) => Err(cause)
208 }
209 }
210 }
211
212 /* Task construction */
213
214 impl TaskOpts {
215 pub fn new() -> TaskOpts {
216 /*!
217 * The default task options
218 */
219
220 TaskOpts {
221 notify_chan: None,
222 name: None,
223 stack_size: None,
224 stdout: None,
225 stderr: None,
226 }
227 }
228 }
229
230 /* Spawn convenience functions */
231
232 /// Creates and executes a new child task
233 ///
234 /// Sets up a new task with its own call stack and schedules it to run
235 /// the provided unique closure.
236 ///
237 /// This function is equivalent to `TaskBuilder::new().spawn(f)`.
238 pub fn spawn(f: proc():Send) {
239 TaskBuilder::new().spawn(f)
240 }
241
242 /// Execute a function in another task and return either the return value of
243 /// the function or an error if the task failed
244 ///
245 /// This is equivalent to TaskBuilder::new().try
246 pub fn try<T:Send>(f: proc():Send -> T) -> Result<T, Box<Any:Send>> {
247 TaskBuilder::new().try(f)
248 }
249
250
251 /* Lifecycle functions */
252
253 /// Read the name of the current task.
254 pub fn with_task_name<U>(blk: |Option<&str>| -> U) -> U {
255 use rt::task::Task;
256
257 let task = Local::borrow(None::<Task>);
258 match task.name {
259 Some(ref name) => blk(Some(name.as_slice())),
260 None => blk(None)
261 }
262 }
263
264 pub fn deschedule() {
265 //! Yield control to the task scheduler
266
267 use rt::local::Local;
268
269 // FIXME(#7544): Optimize this, since we know we won't block.
270 let task: Box<Task> = Local::take();
271 task.yield_now();
272 }
273
274 pub fn failing() -> bool {
275 //! True if the running task has failed
276 use rt::task::Task;
277 Local::borrow(None::<Task>).unwinder.unwinding()
278 }
279
280 // The following 8 tests test the following 2^3 combinations:
281 // {un,}linked {un,}supervised failure propagation {up,down}wards.
282
283 // !!! These tests are dangerous. If something is buggy, they will hang, !!!
284 // !!! instead of exiting cleanly. This might wedge the buildbots. !!!
285
286 #[test]
287 fn test_unnamed_task() {
288 spawn(proc() {
289 with_task_name(|name| {
290 assert!(name.is_none());
291 })
292 })
293 }
294
295 #[test]
296 fn test_owned_named_task() {
297 TaskBuilder::new().named("ada lovelace".to_owned()).spawn(proc() {
298 with_task_name(|name| {
299 assert!(name.unwrap() == "ada lovelace");
300 })
301 })
302 }
303
304 #[test]
305 fn test_static_named_task() {
306 TaskBuilder::new().named("ada lovelace").spawn(proc() {
307 with_task_name(|name| {
308 assert!(name.unwrap() == "ada lovelace");
309 })
310 })
311 }
312
313 #[test]
314 fn test_send_named_task() {
315 TaskBuilder::new().named("ada lovelace".into_maybe_owned()).spawn(proc() {
316 with_task_name(|name| {
317 assert!(name.unwrap() == "ada lovelace");
318 })
319 })
320 }
321
322 #[test]
323 fn test_run_basic() {
324 let (tx, rx) = channel();
325 TaskBuilder::new().spawn(proc() {
326 tx.send(());
327 });
328 rx.recv();
329 }
330
331 #[test]
332 fn test_with_wrapper() {
333 let (tx, rx) = channel();
334 TaskBuilder::new().with_wrapper(proc(body) {
335 let result: proc():Send = proc() {
336 body();
337 tx.send(());
338 };
339 result
340 }).spawn(proc() { });
341 rx.recv();
342 }
343
344 #[test]
345 fn test_future_result() {
346 let mut builder = TaskBuilder::new();
347 let result = builder.future_result();
348 builder.spawn(proc() {});
349 assert!(result.recv().is_ok());
350
351 let mut builder = TaskBuilder::new();
352 let result = builder.future_result();
353 builder.spawn(proc() {
354 fail!();
355 });
356 assert!(result.recv().is_err());
357 }
358
#[test] #[should_fail]
fn test_back_to_the_future_result() {
    // Requesting a second future_result from the same builder must fail.
    let mut builder = TaskBuilder::new();
    let _ = builder.future_result();
    let _ = builder.future_result();
}
365
366 #[test]
367 fn test_try_success() {
368 match try(proc() {
369 "Success!".to_owned()
370 }).as_ref().map(|s| s.as_slice()) {
371 result::Ok("Success!") => (),
372 _ => fail!()
373 }
374 }
375
376 #[test]
377 fn test_try_fail() {
378 match try(proc() {
379 fail!()
380 }) {
381 result::Err(_) => (),
382 result::Ok(()) => fail!()
383 }
384 }
385
386 #[test]
387 fn test_spawn_sched() {
388 use clone::Clone;
389
390 let (tx, rx) = channel();
391
392 fn f(i: int, tx: Sender<()>) {
393 let tx = tx.clone();
394 spawn(proc() {
395 if i == 0 {
396 tx.send(());
397 } else {
398 f(i - 1, tx);
399 }
400 });
401
402 }
403 f(10, tx);
404 rx.recv();
405 }
406
407 #[test]
408 fn test_spawn_sched_childs_on_default_sched() {
409 let (tx, rx) = channel();
410
411 spawn(proc() {
412 spawn(proc() {
413 tx.send(());
414 });
415 });
416
417 rx.recv();
418 }
419
420 #[cfg(test)]
421 fn avoid_copying_the_body(spawnfn: |v: proc():Send|) {
422 let (tx, rx) = channel::<uint>();
423
424 let x = box 1;
425 let x_in_parent = (&*x) as *int as uint;
426
427 spawnfn(proc() {
428 let x_in_child = (&*x) as *int as uint;
429 tx.send(x_in_child);
430 });
431
432 let x_in_child = rx.recv();
433 assert_eq!(x_in_parent, x_in_child);
434 }
435
#[test]
fn test_avoid_copying_the_body_spawn() {
    // Plain `spawn` must not relocate the captured box.
    avoid_copying_the_body(|body| spawn(body));
}
440
441 #[test]
442 fn test_avoid_copying_the_body_task_spawn() {
443 avoid_copying_the_body(|f| {
444 let builder = TaskBuilder::new();
445 builder.spawn(proc() {
446 f();
447 });
448 })
449 }
450
451 #[test]
452 fn test_avoid_copying_the_body_try() {
453 avoid_copying_the_body(|f| {
454 let _ = try(proc() {
455 f()
456 });
457 })
458 }
459
460 #[test]
461 fn test_child_doesnt_ref_parent() {
462 // If the child refcounts the parent task, this will stack overflow when
463 // climbing the task tree to dereference each ancestor. (See #1789)
464 // (well, it would if the constant were 8000+ - I lowered it to be more
465 // valgrind-friendly. try this at home, instead..!)
466 static generations: uint = 16;
467 fn child_no(x: uint) -> proc():Send {
468 return proc() {
469 if x < generations {
470 TaskBuilder::new().spawn(child_no(x+1));
471 }
472 }
473 }
474 TaskBuilder::new().spawn(child_no(0));
475 }
476
477 #[test]
478 fn test_simple_newsched_spawn() {
479 spawn(proc()())
480 }
481
482 #[test]
483 fn test_try_fail_message_static_str() {
484 match try(proc() {
485 fail!("static string");
486 }) {
487 Err(e) => {
488 type T = &'static str;
489 assert!(e.is::<T>());
490 assert_eq!(*e.move::<T>().unwrap(), "static string");
491 }
492 Ok(()) => fail!()
493 }
494 }
495
496 #[test]
497 fn test_try_fail_message_owned_str() {
498 match try(proc() {
499 fail!("owned string".to_owned());
500 }) {
501 Err(e) => {
502 type T = ~str;
503 assert!(e.is::<T>());
504 assert_eq!(*e.move::<T>().unwrap(), "owned string".to_owned());
505 }
506 Ok(()) => fail!()
507 }
508 }
509
510 #[test]
511 fn test_try_fail_message_any() {
512 match try(proc() {
513 fail!(box 413u16 as Box<Any:Send>);
514 }) {
515 Err(e) => {
516 type T = Box<Any:Send>;
517 assert!(e.is::<T>());
518 let any = e.move::<T>().unwrap();
519 assert!(any.is::<u16>());
520 assert_eq!(*any.move::<u16>().unwrap(), 413u16);
521 }
522 Ok(()) => fail!()
523 }
524 }
525
526 #[test]
527 fn test_try_fail_message_unit_struct() {
528 struct Juju;
529
530 match try(proc() {
531 fail!(Juju)
532 }) {
533 Err(ref e) if e.is::<Juju>() => {}
534 Err(_) | Ok(()) => fail!()
535 }
536 }
libstd/task.rs:63:31-63:31 -struct- definition:
/// Task configuration options
pub struct TaskOpts {
/// Enable lifecycle notifications on the given channel
references:- 6220: TaskOpts {
221: notify_chan: None,
libstd/rt/mod.rs:
164: cur_task: Box<Task>,
165: opts: TaskOpts,
166: f: proc():Send);
libstd/rt/task.rs:
240: /// the `opts` structure and will run `f` as the body of its code.
241: pub fn spawn_sibling(mut ~self, opts: TaskOpts, f: proc():Send) {
242: let ops = self.imp.take_unwrap();
libstd/task.rs:
90: /// Options to spawn the new task with
91: pub opts: TaskOpts,
92: gen_body: Option<proc(v: proc():Send):Send -> proc():Send>,
libstd/task.rs:60:62-60:62 -NK_AS_STR_TODO- definition:
/// children tasks complete, recommend using a result future.
pub type TaskResult = Result<(), Box<Any:Send>>;
/// Task configuration options
references:- 6115: /// Fails if a future_result was already set for this task.
116: pub fn future_result(&mut self) -> Receiver<TaskResult> {
117: // FIXME (#3725): Once linked failure and notification are
libstd/rt/task.rs:
74: /// until all its watched children exit before collecting the status.
75: Execute(proc(TaskResult):Send),
76: /// A channel to send the result of the task on when the task exits
77: SendMessage(Sender<TaskResult>),
78: }
libstd/rt/unwind.rs:
164: pub fn result(&mut self) -> TaskResult {
165: if self.unwinding {
libstd/rt/task.rs:
391: /// Collect failure exit codes from children and propagate them to a parent.
392: pub fn collect_failure(&mut self, result: TaskResult) {
393: match self.on_exit.take() {
libstd/task.rs:88:34-88:34 -struct- definition:
// the run function move them in.
pub struct TaskBuilder {
/// Options to spawn the new task with
references:- 599: pub fn new() -> TaskBuilder {
100: TaskBuilder {
101: opts: TaskOpts::new(),
--
155: wrapper: proc(v: proc():Send):Send -> proc():Send)
156: -> TaskBuilder
157: {