./libgreen/sched.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::cast;
12 use std::rt::local::Local;
13 use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop};
14 use std::rt::task::BlockedTask;
15 use std::rt::task::Task;
16 use std::sync::deque;
17 use std::unstable::mutex::NativeMutex;
18 use std::raw;
19
20 use rand::{XorShiftRng, Rng, Rand};
21
22 use TaskState;
23 use context::Context;
24 use coroutine::Coroutine;
25 use sleeper_list::SleeperList;
26 use stack::StackPool;
27 use task::{TypeSched, GreenTask, HomeSched, AnySched};
28 use msgq = message_queue;
29
30 /// A scheduler is responsible for coordinating the execution of Tasks
31 /// on a single thread. The scheduler runs inside a slightly modified
32 /// Rust Task. When not running this task is stored in the scheduler
33 /// struct. The scheduler struct acts like a baton, all scheduling
34 /// actions are transfers of the baton.
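///
/// A rough usage sketch (mirroring the tests at the bottom of this file):
/// a scheduler is normally created and driven indirectly through a
/// `SchedPool` rather than constructed by hand.
///
/// ```ignore
/// let mut pool = SchedPool::new(PoolConfig {
///     threads: 1,
///     event_loop_factory: basic::event_loop,
/// });
/// pool.spawn(TaskOpts::new(), proc() {
///     // green task body runs on one of the pool's schedulers
/// });
/// pool.shutdown();
/// ```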
35 ///
36 /// FIXME: This creates too many callbacks to run_sched_once, resulting
37 /// in too much allocation and too many events.
38 pub struct Scheduler {
39 /// ID number of the pool that this scheduler is a member of. When
40 /// reawakening green tasks, this is used to ensure that tasks aren't
41 /// reawoken on the wrong pool of schedulers.
42 pub pool_id: uint,
43 /// The pool of stacks that this scheduler has cached
44 pub stack_pool: StackPool,
45 /// Bookkeeping for the number of tasks which are currently running around
46 /// inside this pool of schedulers
47 pub task_state: TaskState,
48 /// There are N work queues, one per scheduler.
49 work_queue: deque::Worker<Box<GreenTask>>,
50 /// Work queues for the other schedulers. These are created by
51 /// cloning the core work queues.
52 work_queues: Vec<deque::Stealer<Box<GreenTask>>>,
53 /// The queue of incoming messages from other schedulers.
54 /// These are enqueued by SchedHandles after which a remote callback
55 /// is triggered to handle the message.
56 message_queue: msgq::Consumer<SchedMessage>,
57 /// Producer used to clone sched handles from
58 message_producer: msgq::Producer<SchedMessage>,
59 /// A shared list of sleeping schedulers. We'll use this to wake
60 /// up schedulers when pushing work onto the work queue.
61 sleeper_list: SleeperList,
62 /// Indicates that we have previously pushed a handle onto the
63 /// SleeperList but have not yet received the Wake message.
64 /// Being `true` does not necessarily mean that the scheduler is
65 /// not active since there are multiple event sources that may
66 /// wake the scheduler. It just prevents the scheduler from pushing
67 /// multiple handles onto the sleeper list.
68 sleepy: bool,
69 /// A flag to indicate we've received the shutdown message and should
70 /// no longer try to go to sleep, but exit instead.
71 no_sleep: bool,
72 /// The scheduler runs on a special task. When it is not running
73 /// it is stored here instead of the work queue.
74 sched_task: Option<Box<GreenTask>>,
75 /// An action performed after a context switch on behalf of the
76 /// code running before the context switch
77 cleanup_job: Option<CleanupJob>,
78 /// If the scheduler shouldn't run some tasks, a friend to send
79 /// them to.
80 friend_handle: Option<SchedHandle>,
81 /// Should this scheduler run any task, or only pinned tasks?
82 run_anything: bool,
83 /// A fast XorShift rng for scheduler use
84 rng: XorShiftRng,
85 /// A togglable idle callback
86 idle_callback: Option<Box<PausableIdleCallback:Send>>,
87 /// A countdown that starts at a random value and is decremented
88 /// every time a yield check is performed. When it hits 0 a task
89 /// will yield.
90 yield_check_count: uint,
91 /// A flag to tell the scheduler loop it needs to do some stealing
92 /// in order to introduce randomness as part of a yield
93 steal_for_yield: bool,
94
95 // n.b. currently destructors of an object are run top-to-bottom in order
96 // of field declaration. Due to its nature, the pausable idle callback
97 // must have some sort of handle to the event loop, so it needs to get
98 // destroyed before the event loop itself. For this reason, we destroy
99 // the event loop last to ensure that any unsafe references to it are
100 // destroyed before it's actually destroyed.
101
102 /// The event loop used to drive the scheduler and perform I/O
103 pub event_loop: Box<EventLoop:Send>,
104 }
105
106 /// An indication of how hard to work on a given operation, the difference
107 /// mainly being whether memory is synchronized or not
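/// (a `DontTryTooHard` check only does a `casual_pop` of the message queue,
/// while `GiveItYourBest` performs a full `pop`)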
108 #[deriving(Eq)]
109 enum EffortLevel {
110 DontTryTooHard,
111 GiveItYourBest
112 }
113
114 static MAX_YIELD_CHECKS: uint = 20000;
115
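// Running green tasks decrement `yield_check_count` in `maybe_yield` (called
// from operations such as channel sends); when it reaches zero the task
// yields. The `+ 1` below keeps a freshly reset count in the range
// [1, MAX_YIELD_CHECKS], so it is always positive right after a reset.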
116 fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
117 let r: uint = Rand::rand(rng);
118 r % MAX_YIELD_CHECKS + 1
119 }
120
121 impl Scheduler {
122
123 // * Initialization Functions
124
125 pub fn new(pool_id: uint,
126 event_loop: Box<EventLoop:Send>,
127 work_queue: deque::Worker<Box<GreenTask>>,
128 work_queues: Vec<deque::Stealer<Box<GreenTask>>>,
129 sleeper_list: SleeperList,
130 state: TaskState)
131 -> Scheduler {
132
133 Scheduler::new_special(pool_id, event_loop, work_queue, work_queues,
134 sleeper_list, true, None, state)
135
136 }
137
138 pub fn new_special(pool_id: uint,
139 event_loop: Box<EventLoop:Send>,
140 work_queue: deque::Worker<Box<GreenTask>>,
141 work_queues: Vec<deque::Stealer<Box<GreenTask>>>,
142 sleeper_list: SleeperList,
143 run_anything: bool,
144 friend: Option<SchedHandle>,
145 state: TaskState)
146 -> Scheduler {
147
148 let (consumer, producer) = msgq::queue();
149 let mut sched = Scheduler {
150 pool_id: pool_id,
151 sleeper_list: sleeper_list,
152 message_queue: consumer,
153 message_producer: producer,
154 sleepy: false,
155 no_sleep: false,
156 event_loop: event_loop,
157 work_queue: work_queue,
158 work_queues: work_queues,
159 stack_pool: StackPool::new(),
160 sched_task: None,
161 cleanup_job: None,
162 run_anything: run_anything,
163 friend_handle: friend,
164 rng: new_sched_rng(),
165 idle_callback: None,
166 yield_check_count: 0,
167 steal_for_yield: false,
168 task_state: state,
169 };
170
171 sched.yield_check_count = reset_yield_check(&mut sched.rng);
172
173 return sched;
174 }
175
176 // FIXME: This may eventually need to be refactored so that
177 // the scheduler itself doesn't have to call event_loop.run.
178 // That will be important for embedding the runtime into external
179 // event loops.
180
181 // Create the scheduler task for this scheduler and bootstrap into it,
182 // driving the event loop until it no longer has any references.
183 pub fn bootstrap(mut ~self) {
184
185 // Build an Idle callback.
186 let cb = box SchedRunner as Box<Callback:Send>;
187 self.idle_callback = Some(self.event_loop.pausable_idle_callback(cb));
188
189 // Create a task for the scheduler with an empty context.
190 let sched_task = GreenTask::new_typed(Some(Coroutine::empty()),
191 TypeSched);
192
193 // Before starting our first task, make sure the idle callback
194 // is active. As we do not start in the sleep state this is
195 // important.
196 self.idle_callback.get_mut_ref().resume();
197
198 // Now, as far as all the scheduler state is concerned, we are inside
199 // the "scheduler" context. The scheduler immediately hands over control
200 // to the event loop, and this will only exit once the event loop no
201 // longer has any references (handles or I/O objects).
202 rtdebug!("starting scheduler {}", self.sched_id());
203 let mut sched_task = self.run(sched_task);
204
205 // Close the idle callback.
206 let mut sched = sched_task.sched.take_unwrap();
207 sched.idle_callback.take();
208 // Make one go through the loop to run the close callback.
209 let mut stask = sched.run(sched_task);
210
211 // Now that we are done with the scheduler, clean up the
212 // scheduler task. Do so by removing it from TLS and manually
213 // cleaning up the memory it uses. As we didn't actually call
214 // task.run() on the scheduler task we never get through all
215 // the cleanup code it runs.
216 rtdebug!("stopping scheduler {}", stask.sched.get_ref().sched_id());
217
218 // Should not have any messages
219 let message = stask.sched.get_mut_ref().message_queue.pop();
220 rtassert!(match message { msgq::Empty => true, _ => false });
221
222 stask.task.get_mut_ref().destroyed = true;
223 }
224
225 // This does not return a scheduler, as the scheduler is placed
226 // inside the task.
227 pub fn run(mut ~self, stask: Box<GreenTask>) -> Box<GreenTask> {
228
229 // This is unsafe because we need to place the scheduler, with
230 // the event_loop inside, inside our task. But we still need a
231 // mutable reference to the event_loop to give it the "run"
232 // command.
233 unsafe {
234 let event_loop: *mut Box<EventLoop:Send> = &mut self.event_loop;
235 // Our scheduler must be in the task before the event loop
236 // is started.
237 stask.put_with_sched(self);
238 (*event_loop).run();
239 }
240
241 // This is a serious code smell, but this function could be done away
242 // with if necessary. The ownership of `stask` was transferred into
243 // local storage just before the event loop ran, so it is possible to
244 // transmute `stask` as a uint across the running of the event loop to
245 // re-acquire ownership here.
246 //
247 // This would involve removing the Task from TLS, removing the runtime,
248 // forgetting the runtime, and then putting the task into `stask`. For
249 // now, because we have `GreenTask::convert`, I chose to take this
251 // method for cleanliness. That function is *not* a fundamental reason
252 // why this approach has to exist.
252 GreenTask::convert(Local::take())
253 }
254
255 // * Execution Functions - Core Loop Logic
256
257 // This function is run from the idle callback on the uv loop, indicating
258 // that there are no I/O events pending. When this function returns, we will
259 // fall back to epoll() in the uv event loop, waiting for more things to
260 // happen. We may come right back off epoll() if the idle callback is still
261 // active, in which case we're truly just polling to see if I/O events are
262 // complete.
263 //
264 // The model for this function is to execute as much work as possible while
265 // still fairly considering I/O tasks. Falling back to epoll() frequently is
266 // often quite expensive, so we attempt to avoid it as much as possible. If
267 // we have any active I/O on the event loop, then we're forced to fall back
268 // to epoll() in order to provide fairness, but as long as we're doing work
269 // and there's no active I/O, we can continue to do work.
270 //
271 // If we try really hard to do some work, but no work is available to be
272 // done, then we fall back to epoll() to block this thread waiting for more
273 // work (instead of busy waiting).
274 fn run_sched_once(mut ~self, stask: Box<GreenTask>) {
275 // Make sure that we're not lying in that the `stask` argument is indeed
276 // the scheduler task for this scheduler.
277 assert!(self.sched_task.is_none());
278
279 // Assume that we need to continue idling unless we reach the
280 // end of this function without performing an action.
281 self.idle_callback.get_mut_ref().resume();
282
283 // First we check for scheduler messages, these are higher
284 // priority than regular tasks.
285 let (mut sched, mut stask, mut did_work) =
286 self.interpret_message_queue(stask, DontTryTooHard);
287
288 // After processing a message, we consider doing some more work on the
289 // event loop. The "keep going" condition changes after the first
290 // iteration because we don't want to spin here infinitely.
291 //
292 // Once we start doing work we can keep doing work so long as the
293 // iteration does something. Note that we don't want to starve the
294 // message queue here, so each iteration when we're done working we
295 // check the message queue regardless of whether we did work or not.
296 let mut keep_going = !did_work || !sched.event_loop.has_active_io();
297 while keep_going {
298 let (a, b, c) = match sched.do_work(stask) {
299 (sched, task, false) => {
300 sched.interpret_message_queue(task, GiveItYourBest)
301 }
302 (sched, task, true) => {
303 let (sched, task, _) =
304 sched.interpret_message_queue(task, GiveItYourBest);
305 (sched, task, true)
306 }
307 };
308 sched = a;
309 stask = b;
310 did_work = c;
311
312 // We only keep going if we managed to do something productive and
313 // also don't have any active I/O. If we didn't do anything, we
314 // should consider going to sleep, and if we have active I/O we need
315 // to poll for completion.
316 keep_going = did_work && !sched.event_loop.has_active_io();
317 }
318
319 // If we ever did some work, then we shouldn't put our scheduler
320 // entirely to sleep just yet. Leave the idle callback active and fall
321 // back to epoll() to see what's going on.
322 if did_work {
323 return stask.put_with_sched(sched);
324 }
325
326 // If we got here then there was no work to do.
327 // Generate a SchedHandle and push it to the sleeper list so
328 // somebody can wake us up later.
329 if !sched.sleepy && !sched.no_sleep {
330 rtdebug!("scheduler has no work to do, going to sleep");
331 sched.sleepy = true;
332 let handle = sched.make_handle();
333 sched.sleeper_list.push(handle);
334 // Since we are sleeping, deactivate the idle callback.
335 sched.idle_callback.get_mut_ref().pause();
336 } else {
337 rtdebug!("not sleeping, already doing so or no_sleep set");
338 // We may not be sleeping, but we still need to deactivate
339 // the idle callback.
340 sched.idle_callback.get_mut_ref().pause();
341 }
342
343 // Finished a cycle without using the Scheduler. Place it back
344 // in TLS.
345 stask.put_with_sched(sched);
346 }
347
348 // Check the message queue for a message to handle. Returns the
349 // scheduler, the scheduler task, and a flag reporting whether handling
350 // a message counted as a turn of work (most messages do; learning of a
351 // new neighbor or finding no message does not).
352 fn interpret_message_queue(mut ~self, stask: Box<GreenTask>,
353 effort: EffortLevel)
354 -> (Box<Scheduler>, Box<GreenTask>, bool)
355 {
356
357 let msg = if effort == DontTryTooHard {
358 self.message_queue.casual_pop()
359 } else {
360 // When popping our message queue, we could see an "inconsistent"
361 // state which means that we *should* be able to pop data, but we
362 // are unable to at this time. Our options are:
363 //
364 // 1. Spin waiting for data
365 // 2. Ignore this and pretend we didn't find a message
366 //
367 // If we choose route 1, then if the pusher in question is currently
368 // pre-empted, we're going to take up our entire time slice just
369 // spinning on this queue. If we choose route 2, then the pusher in
370 // question is still guaranteed to make a send() on its async
371 // handle, so we will guaranteed wake up and see its message at some
372 // point.
373 //
374 // I have chosen to take route #2.
375 match self.message_queue.pop() {
376 msgq::Data(t) => Some(t),
377 msgq::Empty | msgq::Inconsistent => None
378 }
379 };
380
381 match msg {
382 Some(PinnedTask(task)) => {
383 let mut task = task;
384 task.give_home(HomeSched(self.make_handle()));
385 let (sched, task) = self.resume_task_immediately(stask, task);
386 (sched, task, true)
387 }
388 Some(TaskFromFriend(task)) => {
389 rtdebug!("got a task from a friend. lovely!");
390 let (sched, task) =
391 self.process_task(stask, task,
392 Scheduler::resume_task_immediately_cl);
393 (sched, task, true)
394 }
395 Some(RunOnce(task)) => {
396 // bypass the process_task logic to force running this task once
397 // on this home scheduler. This is often used for I/O (homing).
398 let (sched, task) = self.resume_task_immediately(stask, task);
399 (sched, task, true)
400 }
401 Some(Wake) => {
402 self.sleepy = false;
403 (self, stask, true)
404 }
405 Some(Shutdown) => {
406 rtdebug!("shutting down");
407 if self.sleepy {
408 // There may be an outstanding handle on the
409 // sleeper list. Pop them all to make sure that's
410 // not the case.
411 loop {
412 match self.sleeper_list.pop() {
413 Some(handle) => {
414 let mut handle = handle;
415 handle.send(Wake);
416 }
417 None => break
418 }
419 }
420 }
421 // No more sleeping. After there are no outstanding
422 // event loop references we will shut down.
423 self.no_sleep = true;
424 self.sleepy = false;
425 (self, stask, true)
426 }
427 Some(NewNeighbor(neighbor)) => {
428 self.work_queues.push(neighbor);
429 (self, stask, false)
430 }
431 None => (self, stask, false)
432 }
433 }
434
435 fn do_work(mut ~self, stask: Box<GreenTask>)
436 -> (Box<Scheduler>, Box<GreenTask>, bool) {
437 rtdebug!("scheduler calling do work");
438 match self.find_work() {
439 Some(task) => {
440 rtdebug!("found some work! running the task");
441 let (sched, task) =
442 self.process_task(stask, task,
443 Scheduler::resume_task_immediately_cl);
444 (sched, task, true)
445 }
446 None => {
447 rtdebug!("no work was found, returning the scheduler struct");
448 (self, stask, false)
449 }
450 }
451 }
452
453 // Workstealing: In this iteration of the runtime each scheduler
454 // thread has a distinct work queue. When no work is available
455 // locally, make a few attempts to steal work from the queues of
456 // other scheduler threads. If a few steals fail we end up in the
457 // old "no work" path which is fine.
458
459 // First step in the process is to find a task. This function does
460 // that by first checking the local queue, and if there is no work
461 // there, trying to steal from the remote work queues.
462 fn find_work(&mut self) -> Option<Box<GreenTask>> {
463 rtdebug!("scheduler looking for work");
464 if !self.steal_for_yield {
465 match self.work_queue.pop() {
466 Some(task) => {
467 rtdebug!("found a task locally");
468 return Some(task)
469 }
470 None => {
471 rtdebug!("scheduler trying to steal");
472 return self.try_steals();
473 }
474 }
475 } else {
476 // During execution of the last task, it performed a 'yield',
477 // so we're doing some work stealing in order to introduce some
478 // scheduling randomness. Otherwise we would just end up popping
479 // that same task again. This is pretty lame and is to work around
480 // the problem that work stealing is not designed for 'non-strict'
481 // (non-fork-join) task parallelism.
482 self.steal_for_yield = false;
483 match self.try_steals() {
484 Some(task) => {
485 rtdebug!("stole a task after yielding");
486 return Some(task);
487 }
488 None => {
489 rtdebug!("did not steal a task after yielding");
490 // Back to business
491 return self.find_work();
492 }
493 }
494 }
495 }
496
497 // Try stealing from all queues the scheduler knows about. This
498 // naive implementation can steal from our own queue or from other
499 // special schedulers.
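// Starting at a random index spreads steal attempts across the pool
// instead of always hitting the same queue first.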
500 fn try_steals(&mut self) -> Option<Box<GreenTask>> {
501 let work_queues = &mut self.work_queues;
502 let len = work_queues.len();
503 let start_index = self.rng.gen_range(0, len);
504 for index in range(0, len).map(|i| (i + start_index) % len) {
505 match work_queues.get_mut(index).steal() {
506 deque::Data(task) => {
507 rtdebug!("found task by stealing");
508 return Some(task)
509 }
510 _ => ()
511 }
512 };
513 rtdebug!("giving up on stealing");
514 return None;
515 }
516
517 // * Task Routing Functions - Make sure tasks end up in the right
518 // place.
519
520 fn process_task(mut ~self,
521 cur: Box<GreenTask>,
522 mut next: Box<GreenTask>,
523 schedule_fn: SchedulingFn)
524 -> (Box<Scheduler>, Box<GreenTask>) {
525 rtdebug!("processing a task");
526
527 match next.take_unwrap_home() {
528 HomeSched(home_handle) => {
529 if home_handle.sched_id != self.sched_id() {
530 rtdebug!("sending task home");
531 next.give_home(HomeSched(home_handle));
532 Scheduler::send_task_home(next);
533 (self, cur)
534 } else {
535 rtdebug!("running task here");
536 next.give_home(HomeSched(home_handle));
537 schedule_fn(self, cur, next)
538 }
539 }
540 AnySched if self.run_anything => {
541 rtdebug!("running anysched task here");
542 next.give_home(AnySched);
543 schedule_fn(self, cur, next)
544 }
545 AnySched => {
546 rtdebug!("sending task to friend");
547 next.give_home(AnySched);
548 self.send_to_friend(next);
549 (self, cur)
550 }
551 }
552 }
553
554 fn send_task_home(task: Box<GreenTask>) {
555 let mut task = task;
556 match task.take_unwrap_home() {
557 HomeSched(mut home_handle) => home_handle.send(PinnedTask(task)),
558 AnySched => rtabort!("error: cannot send anysched task home"),
559 }
560 }
561
562 /// Take a non-homed task we aren't allowed to run here and send
563 /// it to the designated friend scheduler to execute.
564 fn send_to_friend(&mut self, task: Box<GreenTask>) {
565 rtdebug!("sending a task to friend");
566 match self.friend_handle {
567 Some(ref mut handle) => {
568 handle.send(TaskFromFriend(task));
569 }
570 None => {
571 rtabort!("tried to send task to a friend but scheduler has no friends");
572 }
573 }
574 }
575
576 /// Schedule a task to be executed later.
577 ///
578 /// Pushes the task onto the work stealing queue and tells the
579 /// event loop to run it later. Always use this instead of pushing
580 /// to the work queue directly.
581 pub fn enqueue_task(&mut self, task: Box<GreenTask>) {
582
583 // We push the task onto our local queue clone.
584 assert!(!task.is_sched());
585 self.work_queue.push(task);
586 match self.idle_callback {
587 Some(ref mut idle) => idle.resume(),
588 None => {} // allow enqueuing before the scheduler starts
589 }
590
591 // We've made work available. Notify a
592 // sleeping scheduler.
593
594 match self.sleeper_list.casual_pop() {
595 Some(handle) => {
596 let mut handle = handle;
597 handle.send(Wake)
598 }
599 None => { (/* pass */) }
600 };
601 }
602
603 // * Core Context Switching Functions
604
605 // The primary function for changing contexts. In the current
606 // design the scheduler is just a slightly modified GreenTask, so
607 // all context swaps are from GreenTask to GreenTask. The only difference
608 // between the various cases is where the inputs come from, and
609 // what is done with the resulting task. That is specified by the
610 // cleanup function f, which takes the scheduler and the
611 // old task as inputs.
612
613 pub fn change_task_context(mut ~self,
614 current_task: Box<GreenTask>,
615 mut next_task: Box<GreenTask>,
616 f: |&mut Scheduler, Box<GreenTask>|)
617 -> Box<GreenTask> {
618 let f_opaque = ClosureConverter::from_fn(f);
619
620 let current_task_dupe = &*current_task as *GreenTask;
621
622 // The current task is placed inside an enum with the cleanup
623 // function. This enum is then placed inside the scheduler.
624 self.cleanup_job = Some(CleanupJob::new(current_task, f_opaque));
625
626 // The scheduler is then placed inside the next task.
627 next_task.sched = Some(self);
628
629 // However we still need an internal mutable pointer to the
630 // original task. The strategy here was "arrange memory, then
631 // get pointers", so we crawl back up the chain using
632 // transmute to eliminate borrowck errors.
633 unsafe {
634
635 let sched: &mut Scheduler =
636 cast::transmute_mut_lifetime(*next_task.sched.get_mut_ref());
637
638 let current_task: &mut GreenTask = match sched.cleanup_job {
639 Some(CleanupJob { task: ref mut task, .. }) => &mut **task,
640 None => rtabort!("no cleanup job")
641 };
642
643 let (current_task_context, next_task_context) =
644 Scheduler::get_contexts(current_task, next_task);
645
646 // Done with everything - put the next task in TLS. This
647 // works because due to transmute the borrow checker
648 // believes that we have no internal pointers to
649 // next_task.
650 cast::forget(next_task);
651
652 // The raw context swap operation. The next action taken
653 // will be running the cleanup job from the context of the
654 // next task.
655 Context::swap(current_task_context, next_task_context);
656 }
657
658 // When the context swaps back to this task we immediately
659 // run the cleanup job, as expected by the previously called
660 // swap_contexts function.
661 let mut current_task: Box<GreenTask> = unsafe {
662 cast::transmute(current_task_dupe)
663 };
664 current_task.sched.get_mut_ref().run_cleanup_job();
665
666 // See the comments in switch_running_tasks_and_then for why a lock
667 // is acquired here. This is the resumption point and the "bounce"
668 // that those comments refer to.
669 unsafe {
670 let _guard = current_task.nasty_deschedule_lock.lock();
671 }
672 return current_task;
673 }
674
675 // Returns mutable references to both contexts involved in this
676 // swap. This is unsafe - we are getting mutable internal
677 // references to keep even when we don't own the tasks. It looks
678 // kinda safe because we are doing transmutes before passing in
679 // the arguments.
680 pub fn get_contexts<'a>(current_task: &mut GreenTask, next_task: &mut GreenTask) ->
681 (&'a mut Context, &'a mut Context) {
682 let current_task_context =
683 &mut current_task.coroutine.get_mut_ref().saved_context;
684 let next_task_context =
685 &mut next_task.coroutine.get_mut_ref().saved_context;
686 unsafe {
687 (cast::transmute_mut_lifetime(current_task_context),
688 cast::transmute_mut_lifetime(next_task_context))
689 }
690 }
691
692 // * Context Swapping Helpers - Here be ugliness!
693
694 pub fn resume_task_immediately(~self,
695 cur: Box<GreenTask>,
696 next: Box<GreenTask>)
697 -> (Box<Scheduler>, Box<GreenTask>) {
698 assert!(cur.is_sched());
699 let mut cur = self.change_task_context(cur, next, |sched, stask| {
700 assert!(sched.sched_task.is_none());
701 sched.sched_task = Some(stask);
702 });
703 (cur.sched.take_unwrap(), cur)
704 }
705
706 fn resume_task_immediately_cl(sched: Box<Scheduler>,
707 cur: Box<GreenTask>,
708 next: Box<GreenTask>)
709 -> (Box<Scheduler>, Box<GreenTask>) {
710 sched.resume_task_immediately(cur, next)
711 }
712
713 /// Block a running task, context switch to the scheduler, then pass the
714 /// blocked task to a closure.
715 ///
716 /// # Safety note
717 ///
718 /// The closure here is a *stack* closure that lives in the
719 /// running task. It gets transmuted to the scheduler's lifetime
720 /// and called while the task is blocked.
721 ///
722 /// This passes a Scheduler pointer to the fn after the context switch
723 /// in order to prevent that fn from performing further scheduling operations.
724 /// Doing further scheduling could easily result in infinite recursion.
725 ///
726 /// Note that if the closure provided relinquishes ownership of the
727 /// BlockedTask, then it is possible for the task to resume execution before
728 /// the closure has finished executing. This would naturally introduce a
729 /// race if the closure and task shared portions of the environment.
730 ///
731 /// This situation is currently prevented, or in other words it is
732 /// guaranteed that this function will not return before the given closure
733 /// has returned.
734 pub fn deschedule_running_task_and_then(mut ~self,
735 cur: Box<GreenTask>,
736 f: |&mut Scheduler, BlockedTask|) {
737 // Trickier - we need to get the scheduler task out of self
738 // and use it as the destination.
739 let stask = self.sched_task.take_unwrap();
740 // Otherwise this is the same as below.
741 self.switch_running_tasks_and_then(cur, stask, f)
742 }
743
744 pub fn switch_running_tasks_and_then(~self,
745 cur: Box<GreenTask>,
746 next: Box<GreenTask>,
747 f: |&mut Scheduler, BlockedTask|) {
748 // And here comes one of the sad moments in which a lock is used in a
749 // core portion of the rust runtime. As always, this is highly
750 // undesirable, so there's a good reason behind it.
751 //
752 // There is an excellent outline of the problem in issue #8132, and it's
753 // summarized in that `f` is executed on a sched task, but its
754 // environment is on the previous task. If `f` relinquishes ownership of
755 // the BlockedTask, then it may introduce a race where `f` is using the
756 // environment as well as the code after the 'deschedule' block.
757 //
758 // The solution we have chosen to adopt for now is to acquire a
759 // task-local lock around this block. The resumption of the task in
760 // context switching will bounce on the lock, thereby waiting for this
761 // block to finish, eliminating the race mentioned above.
763 //
764 // To actually maintain a handle to the lock, we use an unsafe pointer
765 // to it, but we're guaranteed that the task won't exit until we've
766 // unlocked the lock so there's no worry of this memory going away.
767 let cur = self.change_task_context(cur, next, |sched, mut task| {
768 let lock: *mut NativeMutex = &mut task.nasty_deschedule_lock;
769 unsafe {
770 let _guard = (*lock).lock();
771 f(sched, BlockedTask::block(task.swap()));
772 }
773 });
774 cur.put();
775 }
776
777 fn switch_task(sched: Box<Scheduler>,
778 cur: Box<GreenTask>,
779 next: Box<GreenTask>)
780 -> (Box<Scheduler>, Box<GreenTask>) {
781 let mut cur = sched.change_task_context(cur, next, |sched, last_task| {
782 if last_task.is_sched() {
783 assert!(sched.sched_task.is_none());
784 sched.sched_task = Some(last_task);
785 } else {
786 sched.enqueue_task(last_task);
787 }
788 });
789 (cur.sched.take_unwrap(), cur)
790 }
791
792 // * Task Context Helpers
793
794 /// Called by a running task to end execution, after which it will
795 /// be recycled by the scheduler for reuse in a new task.
796 pub fn terminate_current_task(mut ~self, cur: Box<GreenTask>) -> ! {
797 // Similar to deschedule running task and then, but cannot go through
798 // the task-blocking path. The task is already dying.
799 let stask = self.sched_task.take_unwrap();
800 let _cur = self.change_task_context(cur, stask, |sched, mut dead_task| {
801 let coroutine = dead_task.coroutine.take_unwrap();
802 coroutine.recycle(&mut sched.stack_pool);
803 sched.task_state.decrement();
804 });
805 fail!("should never return!");
806 }
807
808 pub fn run_task(~self, cur: Box<GreenTask>, next: Box<GreenTask>) {
809 let (sched, task) =
810 self.process_task(cur, next, Scheduler::switch_task);
811 task.put_with_sched(sched);
812 }
813
814 pub fn run_task_later(mut cur: Box<GreenTask>, next: Box<GreenTask>) {
815 let mut sched = cur.sched.take_unwrap();
816 sched.enqueue_task(next);
817 cur.put_with_sched(sched);
818 }
819
820 /// Yield control to the scheduler, executing another task. This is guaranteed
821 /// to introduce some amount of randomness to the scheduler. Currently the
822 /// randomness is a result of performing a round of work stealing (which
823 /// may end up stealing from the current scheduler).
824 pub fn yield_now(mut ~self, cur: Box<GreenTask>) {
825 // Async handles trigger the scheduler by calling yield_now on the local
826 // task, which eventually gets us to here. See comments in SchedRunner
827 // for more info on this.
828 if cur.is_sched() {
829 assert!(self.sched_task.is_none());
830 self.run_sched_once(cur);
831 } else {
832 self.yield_check_count = reset_yield_check(&mut self.rng);
833 // Tell the scheduler to start stealing on the next iteration
834 self.steal_for_yield = true;
835 let stask = self.sched_task.take_unwrap();
836 let cur = self.change_task_context(cur, stask, |sched, task| {
837 sched.enqueue_task(task);
838 });
839 cur.put()
840 }
841 }
842
843 pub fn maybe_yield(mut ~self, cur: Box<GreenTask>) {
844 // It's possible for sched tasks to call this function, and it
845 // just means that they're likely sending on channels (which
846 // occasionally call this function). Sched tasks follow different paths
847 // when executing yield_now(), which may trip the assertion
848 // below. For this reason, we just have sched tasks bail out early.
849 //
850 // Sched tasks have no need to yield anyway because as soon as they
851 // return they'll yield to other threads by falling back to the event
852 // loop. Additionally, we completely control sched tasks, so we can make
853 // sure that they never execute more than enough code.
854 if cur.is_sched() {
855 return cur.put_with_sched(self)
856 }
857
858 // The number of times to do the yield check before yielding, chosen
859 // arbitrarily.
860 rtassert!(self.yield_check_count > 0);
861 self.yield_check_count -= 1;
862 if self.yield_check_count == 0 {
863 self.yield_now(cur);
864 } else {
865 cur.put_with_sched(self);
866 }
867 }
868
869
870 // * Utility Functions
871
872 pub fn sched_id(&self) -> uint { self as *Scheduler as uint }
873
874 pub fn run_cleanup_job(&mut self) {
875 let cleanup_job = self.cleanup_job.take_unwrap();
876 cleanup_job.run(self)
877 }
878
879 pub fn make_handle(&mut self) -> SchedHandle {
880 let remote = self.event_loop.remote_callback(box SchedRunner);
881
882 return SchedHandle {
883 remote: remote,
884 queue: self.message_producer.clone(),
885 sched_id: self.sched_id()
886 }
887 }
888 }
889
890 // Supporting types
891
892 type SchedulingFn = fn(Box<Scheduler>, Box<GreenTask>, Box<GreenTask>)
893 -> (Box<Scheduler>, Box<GreenTask>);
894
895 pub enum SchedMessage {
896 Wake,
897 Shutdown,
898 NewNeighbor(deque::Stealer<Box<GreenTask>>),
899 PinnedTask(Box<GreenTask>),
900 TaskFromFriend(Box<GreenTask>),
901 RunOnce(Box<GreenTask>),
902 }
903
904 pub struct SchedHandle {
905 remote: Box<RemoteCallback:Send>,
906 queue: msgq::Producer<SchedMessage>,
907 pub sched_id: uint
908 }
909
910 impl SchedHandle {
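// Enqueue a message for the scheduler behind this handle and fire its
// remote callback; that wakes the target scheduler's event loop so that
// `run_sched_once` gets a chance to drain the message queue.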
911 pub fn send(&mut self, msg: SchedMessage) {
912 self.queue.push(msg);
913 self.remote.fire();
914 }
915 }
916
917 struct SchedRunner;
918
919 impl Callback for SchedRunner {
920 fn call(&mut self) {
921 // In theory, this function needs to invoke the `run_sched_once`
922 // function on the scheduler. Sadly, we have no context here, except for
923 // knowledge of the local `Task`. In order to avoid a call to
924 // `GreenTask::convert`, we just call `yield_now` and the scheduler will
925 // detect when a sched task performs a yield vs a green task performing
926 // a yield (and act accordingly).
927 //
928 // This function could be converted to `GreenTask::convert` if
929 // absolutely necessary, but for cleanliness it is much better to not
930 // use the conversion function.
931 let task: Box<Task> = Local::take();
932 task.yield_now();
933 }
934 }
935
936 struct CleanupJob {
937 task: Box<GreenTask>,
938 f: UnsafeTaskReceiver
939 }
940
941 impl CleanupJob {
942 pub fn new(task: Box<GreenTask>, f: UnsafeTaskReceiver) -> CleanupJob {
943 CleanupJob {
944 task: task,
945 f: f
946 }
947 }
948
949 pub fn run(self, sched: &mut Scheduler) {
950 let CleanupJob { task: task, f: f } = self;
951 f.to_fn()(sched, task)
952 }
953 }
954
955 // FIXME: Some hacks to put a || closure in Scheduler without borrowck
956 // complaining
957 type UnsafeTaskReceiver = raw::Closure;
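// (a `||` closure is represented as a (code pointer, environment pointer)
// pair, which is exactly what `raw::Closure` spells out, so the transmutes
// below are between same-sized types)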
958 trait ClosureConverter {
959 fn from_fn(|&mut Scheduler, Box<GreenTask>|) -> Self;
960 fn to_fn(self) -> |&mut Scheduler, Box<GreenTask>|;
961 }
962 impl ClosureConverter for UnsafeTaskReceiver {
963 fn from_fn(f: |&mut Scheduler, Box<GreenTask>|) -> UnsafeTaskReceiver {
964 unsafe { cast::transmute(f) }
965 }
966 fn to_fn(self) -> |&mut Scheduler, Box<GreenTask>| {
967 unsafe { cast::transmute(self) }
968 }
969 }
970
971 // On unix, we read randomness straight from /dev/urandom, but the
972 // default constructor of an XorShiftRng does this via io::fs, which
973 // relies on the scheduler existing, so we have to manually load
974 // randomness. Windows has its own C API for this, so we don't need to
975 // worry there.
976 #[cfg(windows)]
977 fn new_sched_rng() -> XorShiftRng {
978 match XorShiftRng::new() {
979 Ok(r) => r,
980 Err(e) => {
981 rtabort!("sched: failed to create seeded RNG: {}", e)
982 }
983 }
984 }
985 #[cfg(unix)]
986 fn new_sched_rng() -> XorShiftRng {
987 use libc;
988 use std::mem;
989 use rand::SeedableRng;
990
991 let fd = "/dev/urandom".with_c_str(|name| {
992 unsafe { libc::open(name, libc::O_RDONLY, 0) }
993 });
994 if fd == -1 {
995 rtabort!("could not open /dev/urandom for reading.")
996 }
997
998 let mut seeds = [0u32, .. 4];
999 let size = mem::size_of_val(&seeds);
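// An all-zero seed would leave the xorshift generator stuck emitting
// zeroes, so keep reading until at least one word is nonzero.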
1000 loop {
1001 let nbytes = unsafe {
1002 libc::read(fd,
1003 seeds.as_mut_ptr() as *mut libc::c_void,
1004 size as libc::size_t)
1005 };
1006 rtassert!(nbytes as uint == size);
1007
1008 if !seeds.iter().all(|x| *x == 0) {
1009 break;
1010 }
1011 }
1012
1013 unsafe {libc::close(fd);}
1014
1015 SeedableRng::from_seed(seeds)
1016 }
1017
1018 #[cfg(test)]
1019 mod test {
1020 use rustuv;
1021
1022 use std::task::TaskOpts;
1023 use std::rt::task::Task;
1024 use std::rt::local::Local;
1025
1026 use {TaskState, PoolConfig, SchedPool};
1027 use basic;
1028 use sched::{TaskFromFriend, PinnedTask};
1029 use task::{GreenTask, HomeSched};
1030
1031 fn pool() -> SchedPool {
1032 SchedPool::new(PoolConfig {
1033 threads: 1,
1034 event_loop_factory: basic::event_loop,
1035 })
1036 }
1037
1038 fn run(f: proc():Send) {
1039 let mut pool = pool();
1040 pool.spawn(TaskOpts::new(), f);
1041 pool.shutdown();
1042 }
1043
1044 fn sched_id() -> uint {
1045 let mut task = Local::borrow(None::<Task>);
1046 match task.maybe_take_runtime::<GreenTask>() {
1047 Some(green) => {
1048 let ret = green.sched.get_ref().sched_id();
1049 task.put_runtime(green);
1050 return ret;
1051 }
1052 None => fail!()
1053 }
1054 }
1055
1056 #[test]
1057 fn trivial_run_in_newsched_task_test() {
1058 let mut task_ran = false;
1059 let task_ran_ptr: *mut bool = &mut task_ran;
1060 run(proc() {
1061 unsafe { *task_ran_ptr = true };
1062 rtdebug!("executed from the new scheduler")
1063 });
1064 assert!(task_ran);
1065 }
1066
1067 #[test]
1068 fn multiple_task_test() {
1069 let total = 10;
1070 let mut task_run_count = 0;
1071 let task_run_count_ptr: *mut uint = &mut task_run_count;
1072 // with only one thread this is safe to run in without worries of
1073 // contention.
1074 run(proc() {
1075 for _ in range(0u, total) {
1076 spawn(proc() {
1077 unsafe { *task_run_count_ptr = *task_run_count_ptr + 1};
1078 });
1079 }
1080 });
1081 assert!(task_run_count == total);
1082 }
1083
1084 #[test]
1085 fn multiple_task_nested_test() {
1086 let mut task_run_count = 0;
1087 let task_run_count_ptr: *mut uint = &mut task_run_count;
1088 run(proc() {
1089 spawn(proc() {
1090 unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
1091 spawn(proc() {
1092 unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
1093 spawn(proc() {
1094 unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 };
1095 })
1096 })
1097 })
1098 });
1099 assert!(task_run_count == 3);
1100 }
1101
1102 // A very simple test that confirms that a task executing on the
1103 // home scheduler notices that it is home.
1104 #[test]
1105 fn test_home_sched() {
1106 let mut pool = pool();
1107
1108 let (dtx, drx) = channel();
1109 {
1110 let (tx, rx) = channel();
1111 let mut handle1 = pool.spawn_sched();
1112 let mut handle2 = pool.spawn_sched();
1113
1114 handle1.send(TaskFromFriend(pool.task(TaskOpts::new(), proc() {
1115 tx.send(sched_id());
1116 })));
1117 let sched1_id = rx.recv();
1118
1119 let mut task = pool.task(TaskOpts::new(), proc() {
1120 assert_eq!(sched_id(), sched1_id);
1121 dtx.send(());
1122 });
1123 task.give_home(HomeSched(handle1));
1124 handle2.send(TaskFromFriend(task));
1125 }
1126 drx.recv();
1127
1128 pool.shutdown();
1129 }
1130
1131 // An advanced test that checks all four possible states that a
1132 // (task,sched) can be in regarding homes.
1133
1134 #[test]
1135 fn test_schedule_home_states() {
1136 use sleeper_list::SleeperList;
1137 use super::{Shutdown, Scheduler, SchedHandle};
1138 use std::unstable::run_in_bare_thread;
1139 use std::rt::thread::Thread;
1140 use std::sync::deque::BufferPool;
1141
1142 run_in_bare_thread(proc() {
1143 let sleepers = SleeperList::new();
1144 let mut pool = BufferPool::new();
1145 let (normal_worker, normal_stealer) = pool.deque();
1146 let (special_worker, special_stealer) = pool.deque();
1147 let queues = vec![normal_stealer, special_stealer];
1148 let (_p, state) = TaskState::new();
1149
1150 // Our normal scheduler
1151 let mut normal_sched = box Scheduler::new(
1152 1,
1153 basic::event_loop(),
1154 normal_worker,
1155 queues.clone(),
1156 sleepers.clone(),
1157 state.clone());
1158
1159 let normal_handle = normal_sched.make_handle();
1160 let friend_handle = normal_sched.make_handle();
1161
1162 // Our special scheduler
1163 let mut special_sched = box Scheduler::new_special(
1164 1,
1165 basic::event_loop(),
1166 special_worker,
1167 queues.clone(),
1168 sleepers.clone(),
1169 false,
1170 Some(friend_handle),
1171 state);
1172
1173 let special_handle = special_sched.make_handle();
1174
1175 let t1_handle = special_sched.make_handle();
1176 let t4_handle = special_sched.make_handle();
1177
1178 // Four test tasks:
1179 // 1) task is home on special
1180 // 2) task not homed, sched doesn't care
1181 // 3) task not homed, sched requeues
1182 // 4) task not home, send home
1183
1184 // Grab both the scheduler and the task from TLS and check if the
1185 // task is executing on an appropriate scheduler.
1186 fn on_appropriate_sched() -> bool {
1187 use task::{TypeGreen, TypeSched, HomeSched};
1188 let task = GreenTask::convert(Local::take());
1189 let sched_id = task.sched.get_ref().sched_id();
1190 let run_any = task.sched.get_ref().run_anything;
1191 let ret = match task.task_type {
1192 TypeGreen(Some(AnySched)) => {
1193 run_any
1194 }
1195 TypeGreen(Some(HomeSched(SchedHandle {
1196 sched_id: ref id,
1197 ..
1198 }))) => {
1199 *id == sched_id
1200 }
1201 TypeGreen(None) => { fail!("task without home"); }
1202 TypeSched => { fail!("expected green task"); }
1203 };
1204 task.put();
1205 ret
1206 }
1207
1208 let task1 = GreenTask::new_homed(&mut special_sched.stack_pool,
1209 None, HomeSched(t1_handle), proc() {
1210 rtassert!(on_appropriate_sched());
1211 });
1212
1213 let task2 = GreenTask::new(&mut normal_sched.stack_pool, None, proc() {
1214 rtassert!(on_appropriate_sched());
1215 });
1216
1217 let task3 = GreenTask::new(&mut normal_sched.stack_pool, None, proc() {
1218 rtassert!(on_appropriate_sched());
1219 });
1220
1221 let task4 = GreenTask::new_homed(&mut special_sched.stack_pool,
1222 None, HomeSched(t4_handle), proc() {
1223 rtassert!(on_appropriate_sched());
1224 });
1225
1226 // Signal from the special task that we are done.
1227 let (tx, rx) = channel::<()>();
1228
1229 fn run(next: Box<GreenTask>) {
1230 let mut task = GreenTask::convert(Local::take());
1231 let sched = task.sched.take_unwrap();
1232 sched.run_task(task, next)
1233 }
1234
1235 let normal_task = GreenTask::new(&mut normal_sched.stack_pool, None, proc() {
1236 run(task2);
1237 run(task4);
1238 rx.recv();
1239 let mut nh = normal_handle;
1240 nh.send(Shutdown);
1241 let mut sh = special_handle;
1242 sh.send(Shutdown);
1243 });
1244 normal_sched.enqueue_task(normal_task);
1245
1246 let special_task = GreenTask::new(&mut special_sched.stack_pool, None, proc() {
1247 run(task1);
1248 run(task3);
1249 tx.send(());
1250 });
1251 special_sched.enqueue_task(special_task);
1252
1253 let normal_sched = normal_sched;
1254 let normal_thread = Thread::start(proc() { normal_sched.bootstrap() });
1255
1256 let special_sched = special_sched;
1257 let special_thread = Thread::start(proc() { special_sched.bootstrap() });
1258
1259 normal_thread.join();
1260 special_thread.join();
1261 });
1262 }
1263
1264 //#[test]
1265 //fn test_stress_schedule_task_states() {
1266 // if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
1267 // let n = stress_factor() * 120;
1268 // for _ in range(0, n as int) {
1269 // test_schedule_home_states();
1270 // }
1271 //}
1272
1273 #[test]
1274 fn test_io_callback() {
1275 use std::io::timer;
1276
1277 let mut pool = SchedPool::new(PoolConfig {
1278 threads: 2,
1279 event_loop_factory: rustuv::event_loop,
1280 });
1281
1282 // This is a regression test: when there are no schedulable tasks in
1283 // the work queue but we are performing I/O, once we put something in
1284 // the work queue again the scheduler should pick it up and not exit
1285 // before emptying the work queue.
1286 pool.spawn(TaskOpts::new(), proc() {
1287 spawn(proc() {
1288 timer::sleep(10);
1289 });
1290 });
1291
1292 pool.shutdown();
1293 }
1294
1295 #[test]
1296 fn wakeup_across_scheds() {
1297 let (tx1, rx1) = channel();
1298 let (tx2, rx2) = channel();
1299
1300 let mut pool1 = pool();
1301 let mut pool2 = pool();
1302
1303 pool1.spawn(TaskOpts::new(), proc() {
1304 let id = sched_id();
1305 tx1.send(());
1306 rx2.recv();
1307 assert_eq!(id, sched_id());
1308 });
1309
1310 pool2.spawn(TaskOpts::new(), proc() {
1311 let id = sched_id();
1312 rx1.recv();
1313 assert_eq!(id, sched_id());
1314 tx2.send(());
1315 });
1316
1317 pool1.shutdown();
1318 pool2.shutdown();
1319 }
1320
1321 // A regression test that the final message is always handled.
1322 // Used to deadlock because Shutdown was never recvd.
1323 #[test]
1324 fn no_missed_messages() {
1325 let mut pool = pool();
1326
1327 let task = pool.task(TaskOpts::new(), proc()());
1328 pool.spawn_sched().send(TaskFromFriend(task));
1329
1330 pool.shutdown();
1331 }
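
// A small companion sketch to the test above (a hypothetical addition, not
// in the original suite): a task handed over with TaskFromFriend should not
// only let the final Shutdown through, it should also get to run itself.
#[test]
fn task_from_friend_runs() {
let mut pool = pool();
let (tx, rx) = channel();

let task = pool.task(TaskOpts::new(), proc() { tx.send(()); });
pool.spawn_sched().send(TaskFromFriend(task));
rx.recv();

pool.shutdown();
}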
1332
1333 #[test]
1334 fn multithreading() {
1335 run(proc() {
1336 let mut rxs = vec![];
1337 for _ in range(0, 10) {
1338 let (tx, rx) = channel();
1339 spawn(proc() {
1340 tx.send(());
1341 });
1342 rxs.push(rx);
1343 }
1344
1345 loop {
1346 match rxs.pop() {
1347 Some(rx) => rx.recv(),
1348 None => break,
1349 }
1350 }
1351 });
1352 }
1353
1354 #[test]
1355 fn thread_ring() {
1356 run(proc() {
1357 let (end_tx, end_rx) = channel();
1358
1359 let n_tasks = 10;
1360 let token = 2000;
1361
1362 let (tx1, mut rx) = channel();
1363 tx1.send((token, end_tx));
1364 let mut i = 2;
1365 while i <= n_tasks {
1366 let (tx, next_rx) = channel();
1367 let imm_i = i;
1368 let imm_rx = rx;
1369 spawn(proc() {
1370 roundtrip(imm_i, n_tasks, &imm_rx, &tx);
1371 });
1372 rx = next_rx;
1373 i += 1;
1374 }
1375 let rx = rx;
1376 spawn(proc() {
1377 roundtrip(1, n_tasks, &rx, &tx1);
1378 });
1379
1380 end_rx.recv();
1381 });
1382
1383 fn roundtrip(id: int, n_tasks: int,
1384 rx: &Receiver<(int, Sender<()>)>,
1385 tx: &Sender<(int, Sender<()>)>) {
1386 loop {
1387 match rx.recv() {
1388 (1, end_tx) => {
1389 debug!("{}\n", id);
1390 end_tx.send(());
1391 return;
1392 }
1393 (token, end_tx) => {
1394 debug!("thread: {} got token: {}", id, token);
1395 tx.send((token - 1, end_tx));
1396 if token <= n_tasks {
1397 return;
1398 }
1399 }
1400 }
1401 }
1402 }
1403 }
1404
1405 #[test]
1406 fn start_closure_dtor() {
1407 // Regression test that the `start` task entrypoint can
1408 // contain dtors that use task resources
1409 run(proc() {
1410 struct S { field: () }
1411
1412 impl Drop for S {
1413 fn drop(&mut self) {
1414 let _foo = box 0;
1415 }
1416 }
1417
1418 let s = S { field: () };
1419
1420 spawn(proc() {
1421 let _ss = &s;
1422 });
1423 });
1424 }
1425
1426 #[test]
1427 fn dont_starve_1() {
1428 let mut pool = SchedPool::new(PoolConfig {
1429 threads: 2, // this must be > 1
1430 event_loop_factory: basic::event_loop,
1431 });
1432 pool.spawn(TaskOpts::new(), proc() {
1433 let (tx, rx) = channel();
1434
1435 // This task should not be able to starve the sender;
1436 // The sender should get stolen to another thread.
1437 spawn(proc() {
1438 while rx.try_recv().is_err() { }
1439 });
1440
1441 tx.send(());
1442 });
1443 pool.shutdown();
1444 }
1445
1446 #[test]
1447 fn dont_starve_2() {
1448 run(proc() {
1449 let (tx1, rx1) = channel();
1450 let (tx2, _rx2) = channel();
1451
1452 // This task should not be able to starve the other task.
1453 // The sends should eventually yield.
1454 spawn(proc() {
1455 while rx1.try_recv().is_err() {
1456 tx2.send(());
1457 }
1458 });
1459
1460 tx1.send(());
1461 });
1462 }
1463
1464 // Regression test for a logic bug that would cause single-threaded
1465 // schedulers to sleep forever after yielding and stealing another task.
1466 #[test]
1467 fn single_threaded_yield() {
1468 use std::task::deschedule;
1469 run(proc() {
1470 for _ in range(0, 5) { deschedule(); }
1471 });
1472 }
1473
1474 #[test]
1475 fn test_spawn_sched_blocking() {
1476 use std::unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
1477 static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
1478
1479 // Testing that a task in one scheduler can block in foreign code
1480 // without affecting other schedulers
1481 for _ in range(0, 20) {
1482 let mut pool = pool();
1483 let (start_tx, start_rx) = channel();
1484 let (fin_tx, fin_rx) = channel();
1485
1486 let mut handle = pool.spawn_sched();
1487 handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() {
1488 unsafe {
1489 let guard = LOCK.lock();
1490
1491 start_tx.send(());
1492 guard.wait(); // block the scheduler thread
1493 guard.signal(); // let them know we have the lock
1494 }
1495
1496 fin_tx.send(());
1497 })));
1498 drop(handle);
1499
1500 let mut handle = pool.spawn_sched();
1501 handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() {
1502 // Wait until the other task has its lock
1503 start_rx.recv();
1504
1505 fn pingpong(po: &Receiver<int>, ch: &Sender<int>) {
1506 let mut val = 20;
1507 while val > 0 {
1508 val = po.recv();
1509 let _ = ch.send_opt(val - 1);
1510 }
1511 }
1512
1513 let (setup_tx, setup_rx) = channel();
1514 let (parent_tx, parent_rx) = channel();
1515 spawn(proc() {
1516 let (child_tx, child_rx) = channel();
1517 setup_tx.send(child_tx);
1518 pingpong(&child_rx, &parent_tx);
1519 });
1520
1521 let child_tx = setup_rx.recv();
1522 child_tx.send(20);
1523 pingpong(&parent_rx, &child_tx);
1524 unsafe {
1525 let guard = LOCK.lock();
1526 guard.signal(); // wakeup waiting scheduler
1527 guard.wait(); // wait for them to grab the lock
1528 }
1529 })));
1530 drop(handle);
1531
1532 fin_rx.recv();
1533 pool.shutdown();
1534 }
1535 unsafe { LOCK.destroy(); }
1536 }
1537 }
libgreen/sched.rs:37:48-37:48 -struct- definition:
/// in too much allocation and too many events.
pub struct Scheduler {
/// ID number of the pool that this scheduler is a member of. When
references:- 27
148: let (consumer, producer) = msgq::queue();
149: let mut sched = Scheduler {
150: pool_id: pool_id,
--
735: cur: Box<GreenTask>,
736: f: |&mut Scheduler, BlockedTask|) {
737: // Trickier - we need to get the scheduler task out of self
--
958: trait ClosureConverter {
959: fn from_fn(|&mut Scheduler, Box<GreenTask>|) -> Self;
960: fn to_fn(self) -> |&mut Scheduler, Box<GreenTask>|;
--
962: impl ClosureConverter for UnsafeTaskReceiver {
963: fn from_fn(f: |&mut Scheduler, Box<GreenTask>|) -> UnsafeTaskReceiver {
964: unsafe { cast::transmute(f) }
libgreen/task.rs:
279: pub fn put_with_sched(mut ~self, sched: Box<Scheduler>) {
280: assert!(self.sched.is_none());
libgreen/sched.rs:
965: }
966: fn to_fn(self) -> |&mut Scheduler, Box<GreenTask>| {
967: unsafe { cast::transmute(self) }
libgreen/sched.rs:935:1-935:1 -struct- definition:
struct CleanupJob {
task: Box<GreenTask>,
f: UnsafeTaskReceiver
references:- 6
942: pub fn new(task: Box<GreenTask>, f: UnsafeTaskReceiver) -> CleanupJob {
943: CleanupJob {
944: task: task,
--
949: pub fn run(self, sched: &mut Scheduler) {
950: let CleanupJob { task: task, f: f } = self;
951: f.to_fn()(sched, task)
libgreen/sched.rs:894:1-894:1 -enum- definition:
pub enum SchedMessage {
Wake,
Shutdown,
references:- 4
910: impl SchedHandle {
911: pub fn send(&mut self, msg: SchedMessage) {
912: self.queue.push(msg);
libgreen/sched.rs:956:15-956:15 -NK_AS_STR_TODO- definition:
// complaining
type UnsafeTaskReceiver = raw::Closure;
trait ClosureConverter {
references:- 4
941: impl CleanupJob {
942: pub fn new(task: Box<GreenTask>, f: UnsafeTaskReceiver) -> CleanupJob {
943: CleanupJob {
--
961: }
962: impl ClosureConverter for UnsafeTaskReceiver {
963: fn from_fn(f: |&mut Scheduler, Box<GreenTask>|) -> UnsafeTaskReceiver {
964: unsafe { cast::transmute(f) }
libgreen/sched.rs:108:16-108:16 -enum- definition:
enum EffortLevel {
DontTryTooHard,
GiveItYourBest
references:- 4
107: /// mainly being whether memory is synchronized or not
109: enum EffortLevel {
--
352: fn interpret_message_queue(mut ~self, stask: Box<GreenTask>,
353: effort: EffortLevel)
354: -> (Box<Scheduler>, Box<GreenTask>, bool)
libgreen/sched.rs:957:40-957:40 -trait- definition:
type UnsafeTaskReceiver = raw::Closure;
trait ClosureConverter {
fn from_fn(|&mut Scheduler, Box<GreenTask>|) -> Self;
references:- 2
961: }
962: impl ClosureConverter for UnsafeTaskReceiver {
963: fn from_fn(f: |&mut Scheduler, Box<GreenTask>|) -> UnsafeTaskReceiver {
libgreen/sched.rs:903:1-903:1 -struct- definition:
pub struct SchedHandle {
remote: Box<RemoteCallback:Send>,
queue: msgq::Producer<SchedMessage>,
references:- 16
882: return SchedHandle {
883: remote: remote,
--
910: impl SchedHandle {
911: pub fn send(&mut self, msg: SchedMessage) {
libgreen/sleeper_list.rs:
31: pub fn pop(&mut self) -> Option<SchedHandle> {
32: self.q.pop()
--
35: pub fn casual_pop(&mut self) -> Option<SchedHandle> {
36: self.q.pop()
libgreen/task.rs:
79: AnySched,
80: HomeSched(SchedHandle),
81: }
--
327: let mtx = &mut self.nasty_deschedule_lock as *mut NativeMutex;
328: let handle = self.handle.get_mut_ref() as *mut SchedHandle;
329: let _guard = (*mtx).lock();
libgreen/lib.rs:
477: /// other schedulers currently in the scheduler pool.
478: pub fn spawn_sched(&mut self) -> SchedHandle {
479: let (worker, stealer) = self.deque_pool.deque();
libgreen/sleeper_list.rs:
18: pub struct SleeperList {
19: q: Queue<SchedHandle>,
20: }
libgreen/sched.rs:115:1-115:1 -fn- definition:
fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
let r: uint = Rand::rand(rng);
r % MAX_YIELD_CHECKS + 1
references:- 2
831: } else {
832: self.yield_check_count = reset_yield_check(&mut self.rng);
833: // Tell the scheduler to start stealing on the next iteration