(index<- ) ./libgreen/task.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! The Green Task implementation
12 //!
13 //! This module contains the glue to the libstd runtime necessary to integrate
14 //! M:N scheduling. This GreenTask structure is hidden as a trait object in all
15 //! rust tasks and virtual calls are made in order to interface with it.
16 //!
17 //! Each green task contains a scheduler if it is currently running, and it also
18 //! contains the rust task itself in order to juggle around ownership of the
19 //! values.
20
21 use std::any::Any;
22 use std::cast;
23 use std::raw;
24 use std::rt::Runtime;
25 use std::rt::env;
26 use std::rt::local::Local;
27 use std::rt::rtio;
28 use std::rt::stack;
29 use std::rt::task::{Task, BlockedTask, SendMessage};
30 use std::task::TaskOpts;
31 use std::unstable::mutex::NativeMutex;
32
33 use context::Context;
34 use coroutine::Coroutine;
35 use sched::{Scheduler, SchedHandle, RunOnce};
36 use stack::StackPool;
37
38 /// The necessary fields needed to keep track of a green task (as opposed to a
39 /// 1:1 task).
40 pub struct GreenTask {
41 /// Coroutine that this task is running on, otherwise known as the register
42 /// context and the stack that this task owns. This field is optional to
43 /// relinquish ownership back to a scheduler to recycle stacks at a later
44 /// date.
45 pub coroutine: Option<Coroutine>,
46
47 /// Optional handle back into the home sched pool of this task. This field
48 /// is lazily initialized.
49 pub handle: Option<SchedHandle>,
50
51 /// Slot for maintaining ownership of a scheduler. If a task is running,
52 /// this value will be Some(sched) where the task is running on "sched".
53 pub sched: Option<Box<Scheduler>>,
54
55 /// Temporary ownership slot of a std::rt::task::Task object. This is used
56 /// to squirrel that libstd task away while we're performing green task
57 /// operations.
58 pub task: Option<Box<Task>>,
59
60 /// Dictates whether this is a sched task or a normal green task
61 pub task_type: TaskType,
62
63 /// Home pool that this task was spawned into. This field is lazily
64 /// initialized until when the task is initially scheduled, and is used to
65 /// make sure that tasks are always woken up in the correct pool of
66 /// schedulers.
67 pub pool_id: uint,
68
69 // See the comments in the scheduler about why this is necessary
70 pub nasty_deschedule_lock: NativeMutex,
71 }
72
73 pub enum TaskType {
74 TypeGreen(Option<Home>),
75 TypeSched,
76 }
77
78 pub enum Home {
79 AnySched,
80 HomeSched(SchedHandle),
81 }
82
83 /// Trampoline code for all new green tasks which are running around. This
84 /// function is passed through to Context::new as the initial rust landing pad
85 /// for all green tasks. This code is actually called after the initial context
86 /// switch onto a green thread.
87 ///
88 /// The first argument to this function is the `Box<GreenTask>` pointer, and
89 /// the next two arguments are the user-provided procedure for running code.
90 ///
91 /// The goal for having this weird-looking function is to reduce the number of
92 /// allocations done on a green-task startup as much as possible.
93 extern fn bootstrap_green_task(task: uint, code: *(), env: *()) -> ! {
94 // Acquire ownership of the `proc()`
95 let start: proc() = unsafe {
96 cast::transmute(raw::Procedure { code: code, env: env })
97 };
98
99 // Acquire ownership of the `Box<GreenTask>`
100 let mut task: Box<GreenTask> = unsafe { cast::transmute(task) };
101
102 // First code after swap to this new context. Run our cleanup job
103 task.pool_id = {
104 let sched = task.sched.get_mut_ref();
105 sched.run_cleanup_job();
106 sched.task_state.increment();
107 sched.pool_id
108 };
109
110 // Convert our green task to a libstd task and then execute the code
111 // requested. This is the "try/catch" block for this green task and
112 // is the wrapper for *all* code run in the task.
113 let mut start = Some(start);
114 let task = task.swap().run(|| start.take_unwrap()());
115
116 // Once the function has exited, it's time to run the termination
117 // routine. This means we need to context switch one more time but
118 // clean ourselves up on the other end. Since we have no way of
119 // preserving a handle to the GreenTask down to this point, this
120 // unfortunately must call `GreenTask::convert`. In order to avoid
121 // this we could add a `terminate` function to the `Runtime` trait
122 // in libstd, but that seems less appropriate since the coversion
123 // method exists.
124 GreenTask::convert(task).terminate()
125 }
126
127 impl GreenTask {
128 /// Creates a new green task which is not homed to any particular scheduler
129 /// and will not have any contained Task structure.
130 pub fn new(stack_pool: &mut StackPool,
131 stack_size: Option<uint>,
132 start: proc():Send) -> Box<GreenTask> {
133 GreenTask::new_homed(stack_pool, stack_size, AnySched, start)
134 }
135
136 /// Creates a new task (like `new`), but specifies the home for new task.
137 pub fn new_homed(stack_pool: &mut StackPool,
138 stack_size: Option<uint>,
139 home: Home,
140 start: proc():Send) -> Box<GreenTask> {
141 // Allocate ourselves a GreenTask structure
142 let mut ops = GreenTask::new_typed(None, TypeGreen(Some(home)));
143
144 // Allocate a stack for us to run on
145 let stack_size = stack_size.unwrap_or_else(|| env::min_stack());
146 let mut stack = stack_pool.take_stack(stack_size);
147 let context = Context::new(bootstrap_green_task, ops.as_uint(), start,
148 &mut stack);
149
150 // Package everything up in a coroutine and return
151 ops.coroutine = Some(Coroutine {
152 current_stack_segment: stack,
153 saved_context: context,
154 });
155 return ops;
156 }
157
158 /// Creates a new green task with the specified coroutine and type, this is
159 /// useful when creating scheduler tasks.
160 pub fn new_typed(coroutine: Option<Coroutine>,
161 task_type: TaskType) -> Box<GreenTask> {
162 box GreenTask {
163 pool_id: 0,
164 coroutine: coroutine,
165 task_type: task_type,
166 sched: None,
167 handle: None,
168 nasty_deschedule_lock: unsafe { NativeMutex::new() },
169 task: Some(box Task::new()),
170 }
171 }
172
173 /// Creates a new green task with the given configuration options for the
174 /// contained Task object. The given stack pool is also used to allocate a
175 /// new stack for this task.
176 pub fn configure(pool: &mut StackPool,
177 opts: TaskOpts,
178 f: proc():Send) -> Box<GreenTask> {
179 let TaskOpts {
180 notify_chan, name, stack_size,
181 stderr, stdout,
182 } = opts;
183
184 let mut green = GreenTask::new(pool, stack_size, f);
185 {
186 let task = green.task.get_mut_ref();
187 task.name = name;
188 task.stderr = stderr;
189 task.stdout = stdout;
190 match notify_chan {
191 Some(chan) => {
192 task.death.on_exit = Some(SendMessage(chan));
193 }
194 None => {}
195 }
196 }
197 return green;
198 }
199
200 /// Just like the `maybe_take_runtime` function, this function should *not*
201 /// exist. Usage of this function is _strongly_ discouraged. This is an
202 /// absolute last resort necessary for converting a libstd task to a green
203 /// task.
204 ///
205 /// This function will assert that the task is indeed a green task before
206 /// returning (and will kill the entire process if this is wrong).
207 pub fn convert(mut task: Box<Task>) -> Box<GreenTask> {
208 match task.maybe_take_runtime::<GreenTask>() {
209 Some(mut green) => {
210 green.put_task(task);
211 green
212 }
213 None => rtabort!("not a green task any more?"),
214 }
215 }
216
217 pub fn give_home(&mut self, new_home: Home) {
218 match self.task_type {
219 TypeGreen(ref mut home) => { *home = Some(new_home); }
220 TypeSched => rtabort!("type error: used SchedTask as GreenTask"),
221 }
222 }
223
224 pub fn take_unwrap_home(&mut self) -> Home {
225 match self.task_type {
226 TypeGreen(ref mut home) => home.take_unwrap(),
227 TypeSched => rtabort!("type error: used SchedTask as GreenTask"),
228 }
229 }
230
231 // New utility functions for homes.
232
233 pub fn is_home_no_tls(&self, sched: &Scheduler) -> bool {
234 match self.task_type {
235 TypeGreen(Some(AnySched)) => { false }
236 TypeGreen(Some(HomeSched(SchedHandle { sched_id: ref id, .. }))) => {
237 *id == sched.sched_id()
238 }
239 TypeGreen(None) => { rtabort!("task without home"); }
240 TypeSched => {
241 // Awe yea
242 rtabort!("type error: expected: TypeGreen, found: TaskSched");
243 }
244 }
245 }
246
247 pub fn homed(&self) -> bool {
248 match self.task_type {
249 TypeGreen(Some(AnySched)) => { false }
250 TypeGreen(Some(HomeSched(SchedHandle { .. }))) => { true }
251 TypeGreen(None) => {
252 rtabort!("task without home");
253 }
254 TypeSched => {
255 rtabort!("type error: expected: TypeGreen, found: TaskSched");
256 }
257 }
258 }
259
260 pub fn is_sched(&self) -> bool {
261 match self.task_type {
262 TypeGreen(..) => false, TypeSched => true,
263 }
264 }
265
266 // Unsafe functions for transferring ownership of this GreenTask across
267 // context switches
268
269 pub fn as_uint(&self) -> uint {
270 self as *GreenTask as uint
271 }
272
273 pub unsafe fn from_uint(val: uint) -> Box<GreenTask> {
274 cast::transmute(val)
275 }
276
277 // Runtime glue functions and helpers
278
279 pub fn put_with_sched(mut ~self, sched: Box<Scheduler>) {
280 assert!(self.sched.is_none());
281 self.sched = Some(sched);
282 self.put();
283 }
284
285 pub fn put_task(&mut self, task: Box<Task>) {
286 assert!(self.task.is_none());
287 self.task = Some(task);
288 }
289
290 pub fn swap(mut ~self) -> Box<Task> {
291 let mut task = self.task.take_unwrap();
292 task.put_runtime(self);
293 return task;
294 }
295
296 pub fn put(~self) {
297 assert!(self.sched.is_some());
298 Local::put(self.swap());
299 }
300
301 fn terminate(mut ~self) -> ! {
302 let sched = self.sched.take_unwrap();
303 sched.terminate_current_task(self)
304 }
305
306 // This function is used to remotely wakeup this green task back on to its
307 // original pool of schedulers. In order to do so, each tasks arranges a
308 // SchedHandle upon descheduling to be available for sending itself back to
309 // the original pool.
310 //
311 // Note that there is an interesting transfer of ownership going on here. We
312 // must relinquish ownership of the green task, but then also send the task
313 // over the handle back to the original scheduler. In order to safely do
314 // this, we leverage the already-present "nasty descheduling lock". The
315 // reason for doing this is that each task will bounce on this lock after
316 // resuming after a context switch. By holding the lock over the enqueueing
317 // of the task, we're guaranteed that the SchedHandle's memory will be valid
318 // for this entire function.
319 //
320 // An alternative would include having incredibly cheaply cloneable handles,
321 // but right now a SchedHandle is something like 6 allocations, so it is
322 // *not* a cheap operation to clone a handle. Until the day comes that we
323 // need to optimize this, a lock should do just fine (it's completely
324 // uncontended except for when the task is rescheduled).
325 fn reawaken_remotely(mut ~self) {
326 unsafe {
327 let mtx = &mut self.nasty_deschedule_lock as *mut NativeMutex;
328 let handle = self.handle.get_mut_ref() as *mut SchedHandle;
329 let _guard = (*mtx).lock();
330 (*handle).send(RunOnce(self));
331 }
332 }
333 }
334
335 impl Runtime for GreenTask {
336 fn yield_now(mut ~self, cur_task: Box<Task>) {
337 self.put_task(cur_task);
338 let sched = self.sched.take_unwrap();
339 sched.yield_now(self);
340 }
341
342 fn maybe_yield(mut ~self, cur_task: Box<Task>) {
343 self.put_task(cur_task);
344 let sched = self.sched.take_unwrap();
345 sched.maybe_yield(self);
346 }
347
348 fn deschedule(mut ~self, times: uint, cur_task: Box<Task>,
349 f: |BlockedTask| -> Result<(), BlockedTask>) {
350 self.put_task(cur_task);
351 let mut sched = self.sched.take_unwrap();
352
353 // In order for this task to be reawoken in all possible contexts, we
354 // may need a handle back in to the current scheduler. When we're woken
355 // up in anything other than the local scheduler pool, this handle is
356 // used to send this task back into the scheduler pool.
357 if self.handle.is_none() {
358 self.handle = Some(sched.make_handle());
359 self.pool_id = sched.pool_id;
360 }
361
362 // This code is pretty standard, except for the usage of
363 // `GreenTask::convert`. Right now if we use `reawaken` directly it will
364 // expect for there to be a task in local TLS, but that is not true for
365 // this deschedule block (because the scheduler must retain ownership of
366 // the task while the cleanup job is running). In order to get around
367 // this for now, we invoke the scheduler directly with the converted
368 // Task => GreenTask structure.
369 if times == 1 {
370 sched.deschedule_running_task_and_then(self, |sched, task| {
371 match f(task) {
372 Ok(()) => {}
373 Err(t) => {
374 t.wake().map(|t| {
375 sched.enqueue_task(GreenTask::convert(t))
376 });
377 }
378 }
379 });
380 } else {
381 sched.deschedule_running_task_and_then(self, |sched, task| {
382 for task in task.make_selectable(times) {
383 match f(task) {
384 Ok(()) => {},
385 Err(task) => {
386 task.wake().map(|t| {
387 sched.enqueue_task(GreenTask::convert(t))
388 });
389 break
390 }
391 }
392 }
393 });
394 }
395 }
396
397 fn reawaken(mut ~self, to_wake: Box<Task>) {
398 self.put_task(to_wake);
399 assert!(self.sched.is_none());
400
401 // Optimistically look for a local task, but if one's not available to
402 // inspect (in order to see if it's in the same sched pool as we are),
403 // then just use our remote wakeup routine and carry on!
404 let mut running_task: Box<Task> = match Local::try_take() {
405 Some(task) => task,
406 None => return self.reawaken_remotely()
407 };
408
409 // Waking up a green thread is a bit of a tricky situation. We have no
410 // guarantee about where the current task is running. The options we
411 // have for where this current task is running are:
412 //
413 // 1. Our original scheduler pool
414 // 2. Some other scheduler pool
415 // 3. Something that isn't a scheduler pool
416 //
417 // In order to figure out what case we're in, this is the reason that
418 // the `maybe_take_runtime` function exists. Using this function we can
419 // dynamically check to see which of these cases is the current
420 // situation and then dispatch accordingly.
421 //
422 // In case 1, we just use the local scheduler to resume ourselves
423 // immediately (if a rescheduling is possible).
424 //
425 // In case 2 and 3, we need to remotely reawaken ourself in order to be
426 // transplanted back to the correct scheduler pool.
427 match running_task.maybe_take_runtime::<GreenTask>() {
428 Some(mut running_green_task) => {
429 running_green_task.put_task(running_task);
430 let sched = running_green_task.sched.take_unwrap();
431
432 if sched.pool_id == self.pool_id {
433 sched.run_task(running_green_task, self);
434 } else {
435 self.reawaken_remotely();
436
437 // put that thing back where it came from!
438 running_green_task.put_with_sched(sched);
439 }
440 }
441 None => {
442 self.reawaken_remotely();
443 Local::put(running_task);
444 }
445 }
446 }
447
448 fn spawn_sibling(mut ~self,
449 cur_task: Box<Task>,
450 opts: TaskOpts,
451 f: proc():Send) {
452 self.put_task(cur_task);
453
454 // Spawns a task into the current scheduler. We allocate the new task's
455 // stack from the scheduler's stack pool, and then configure it
456 // accordingly to `opts`. Afterwards we bootstrap it immediately by
457 // switching to it.
458 //
459 // Upon returning, our task is back in TLS and we're good to return.
460 let mut sched = self.sched.take_unwrap();
461 let sibling = GreenTask::configure(&mut sched.stack_pool, opts, f);
462 sched.run_task(self, sibling)
463 }
464
465 // Local I/O is provided by the scheduler's event loop
466 fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
467 match self.sched.get_mut_ref().event_loop.io() {
468 Some(io) => Some(rtio::LocalIo::new(io)),
469 None => None,
470 }
471 }
472
473 fn stack_bounds(&self) -> (uint, uint) {
474 let c = self.coroutine.as_ref()
475 .expect("GreenTask.stack_bounds called without a coroutine");
476
477 // Don't return the red zone as part of the usable stack of this task,
478 // it's essentially an implementation detail.
479 (c.current_stack_segment.start() as uint + stack::RED_ZONE,
480 c.current_stack_segment.end() as uint)
481 }
482
483 fn can_block(&self) -> bool { false }
484
485 fn wrap(~self) -> Box<Any> { self as Box<Any> }
486 }
487
488 #[cfg(test)]
489 mod tests {
490 use std::rt::local::Local;
491 use std::rt::task::Task;
492 use std::task;
493 use std::task::TaskOpts;
494
495 use super::super::{PoolConfig, SchedPool};
496 use super::GreenTask;
497
498 fn spawn_opts(opts: TaskOpts, f: proc():Send) {
499 let mut pool = SchedPool::new(PoolConfig {
500 threads: 1,
501 event_loop_factory: ::rustuv::event_loop,
502 });
503 pool.spawn(opts, f);
504 pool.shutdown();
505 }
506
507 #[test]
508 fn smoke() {
509 let (tx, rx) = channel();
510 spawn_opts(TaskOpts::new(), proc() {
511 tx.send(());
512 });
513 rx.recv();
514 }
515
516 #[test]
517 fn smoke_fail() {
518 let (tx, rx) = channel::<int>();
519 spawn_opts(TaskOpts::new(), proc() {
520 let _tx = tx;
521 fail!()
522 });
523 assert_eq!(rx.recv_opt(), Err(()));
524 }
525
526 #[test]
527 fn smoke_opts() {
528 let mut opts = TaskOpts::new();
529 opts.name = Some("test".into_maybe_owned());
530 opts.stack_size = Some(20 * 4096);
531 let (tx, rx) = channel();
532 opts.notify_chan = Some(tx);
533 spawn_opts(opts, proc() {});
534 assert!(rx.recv().is_ok());
535 }
536
537 #[test]
538 fn smoke_opts_fail() {
539 let mut opts = TaskOpts::new();
540 let (tx, rx) = channel();
541 opts.notify_chan = Some(tx);
542 spawn_opts(opts, proc() { fail!() });
543 assert!(rx.recv().is_err());
544 }
545
546 #[test]
547 fn yield_test() {
548 let (tx, rx) = channel();
549 spawn_opts(TaskOpts::new(), proc() {
550 for _ in range(0, 10) { task::deschedule(); }
551 tx.send(());
552 });
553 rx.recv();
554 }
555
556 #[test]
557 fn spawn_children() {
558 let (tx1, rx) = channel();
559 spawn_opts(TaskOpts::new(), proc() {
560 let (tx2, rx) = channel();
561 spawn(proc() {
562 let (tx3, rx) = channel();
563 spawn(proc() {
564 tx3.send(());
565 });
566 rx.recv();
567 tx2.send(());
568 });
569 rx.recv();
570 tx1.send(());
571 });
572 rx.recv();
573 }
574
575 #[test]
576 fn spawn_inherits() {
577 let (tx, rx) = channel();
578 spawn_opts(TaskOpts::new(), proc() {
579 spawn(proc() {
580 let mut task: Box<Task> = Local::take();
581 match task.maybe_take_runtime::<GreenTask>() {
582 Some(ops) => {
583 task.put_runtime(ops);
584 }
585 None => fail!(),
586 }
587 Local::put(task);
588 tx.send(());
589 });
590 });
591 rx.recv();
592 }
593 }