./libgreen/lib.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! The "green scheduling" library
12 //!
13 //! This library provides M:N threading for Rust programs. Internally, it
14 //! implements a green scheduler along with context switching and a
15 //! stack-allocation strategy. It can optionally be linked into Rust
16 //! programs in order to provide M:N functionality inside of 1:1 programs.
17 //!
18 //! # Architecture
19 //!
20 //! An M:N scheduling library implies that there are N OS threads upon which M
21 //! "green threads" are multiplexed. In other words, a set of green threads are
22 //! all run inside a pool of OS threads.
23 //!
24 //! With this design, you can achieve _concurrency_ by spawning many green
25 //! threads, and you can achieve _parallelism_ by running the green threads
26 //! simultaneously on multiple OS threads. Each OS thread is a candidate for
27 //! being scheduled on a different core (the source of parallelism), and then
28 //! all of the green threads cooperatively schedule amongst one another (the
29 //! source of concurrency).
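//!
//! As a small illustrative sketch using the `SchedPool` API described below
//! (the thread and task counts here are arbitrary):
//!
//! ```rust
//! use std::task::TaskOpts;
//! use green::{SchedPool, PoolConfig};
//!
//! let mut config = PoolConfig::new();
//! config.threads = 4; // parallelism: N = 4 OS threads
//! let mut pool = SchedPool::new(config);
//!
//! // concurrency: M = 100 green tasks multiplexed over those threads
//! for _ in range(0, 100u) {
//!     pool.spawn(TaskOpts::new(), proc() {
//!         // a cooperatively scheduled green task
//!     });
//! }
//!
//! pool.shutdown();
//! ```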
30 //!
31 //! ## Schedulers
32 //!
33 //! In order to coordinate among green threads, each OS thread is primarily
34 //! running something which we call a Scheduler. Whenever a reference to a
35 //! Scheduler is made, it is synonymous with referencing one OS thread. Each
36 //! scheduler is bound to exactly one OS thread, and the thread that it is
37 //! bound to never changes.
38 //!
39 //! Each scheduler is connected to a pool of other schedulers (a `SchedPool`),
40 //! which is the thread pool mentioned above. The schedulers in a pool all share
41 //! the work that they create. Furthermore, whenever a green thread (also
42 //! synonymously referred to as a green task) is created, it is associated with a
43 //! `SchedPool` forevermore. A green thread cannot leave its scheduler pool.
44 //!
45 //! Schedulers can have at most one green thread running on them at a time. When
46 //! a scheduler is asleep on its event loop, there are no green tasks running on
47 //! the OS thread or the scheduler. The term "context switch" is used for when
48 //! the running green thread is swapped out, but this simply changes the one
49 //! green thread which is running on the scheduler.
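//!
//! As a brief sketch (the same operations appear in the fuller example at the
//! end of this documentation), adding a scheduler to a pool adds exactly one OS
//! thread, and the returned handle can be used to pin a green task to it:
//!
//! ```rust
//! use std::task::TaskOpts;
//! use green::{SchedPool, PoolConfig};
//! use green::sched::PinnedTask;
//!
//! let mut pool = SchedPool::new(PoolConfig::new());
//!
//! // One new scheduler == one new OS thread in the pool.
//! let mut handle = pool.spawn_sched();
//!
//! // This green task will only ever run on that scheduler's OS thread.
//! let task = pool.task(TaskOpts::new(), proc() { /* ... */ });
//! handle.send(PinnedTask(task));
//!
//! drop(handle); // handles keep schedulers alive
//! pool.shutdown();
//! ```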
50 //!
51 //! ## Green Threads
52 //!
53 //! A green thread can largely be summarized by a stack and a register context.
54 //! Whenever a green thread is spawned, it allocates a stack, and then prepares
55 //! a register context for execution. The green task may be executed across
56 //! multiple OS threads, but it will always use the same stack and it will carry
57 //! its register context across OS threads.
58 //!
59 //! Each green thread is cooperatively scheduled with other green threads.
60 //! Primarily, this means that there is no pre-emption of a green thread. The
61 //! major consequence of this design is that a green thread stuck in an infinite
62 //! loop will prevent all other green threads from running on that particular
63 //! scheduler.
64 //!
65 //! Scheduling events for green threads occur on communication and I/O
66 //! boundaries. For example, if a green task blocks waiting for a message on a
67 //! channel, some other green thread can now run on the scheduler. This also has
68 //! the consequence that until a green thread performs any form of scheduling
69 //! event, it will be running on the same OS thread (unconditionally).
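//!
//! As a sketch of such a scheduling point (assuming this code is already
//! running as a green task inside a pool), blocking on a channel yields the OS
//! thread to other green tasks until a message arrives:
//!
//! ```rust
//! let (tx, rx) = channel();
//!
//! spawn(proc() {
//!     // another green task in the same pool
//!     tx.send("hello");
//! });
//!
//! // Blocking here is a scheduling event: this green task is descheduled and
//! // the scheduler is free to run other green tasks until the message arrives.
//! let message = rx.recv();
//! assert_eq!(message, "hello");
//! ```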
70 //!
71 //! ## Work Stealing
72 //!
73 //! With a pool of schedulers, a new green task has a number of options when
74 //! deciding where to run initially. The current implementation uses a concept
75 //! called work stealing in order to spread out work among schedulers.
76 //!
77 //! In a work-stealing model, each scheduler maintains a local queue of tasks to
78 //! run, and this queue is stolen from by other schedulers. Implementation-wise,
79 //! work stealing has some hairy parts, but from a user's perspective, work
80 //! stealing simply implies that with M green threads and N schedulers, where
81 //! M > N, it is very likely that all schedulers will be busy executing work.
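//!
//! A minimal sketch of the deque plumbing that `SchedPool::new` sets up (the
//! actual stealing happens inside each `Scheduler`; the variable names here are
//! purely illustrative):
//!
//! ```rust
//! use std::sync::deque;
//! use green::task::GreenTask;
//!
//! // Each scheduler owns the `Worker` end of its own deque; clones of the
//! // `Stealer` end are handed to every other scheduler in the pool so that
//! // idle schedulers can steal work.
//! let mut buf_pool: deque::BufferPool<Box<GreenTask>> = deque::BufferPool::new();
//! let (worker, stealer) = buf_pool.deque();
//! let stealer_for_another_scheduler = stealer.clone();
//! ```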
82 //!
83 //! # Considerations when using libgreen
84 //!
85 //! An M:N runtime has both pros and cons, and there is no one answer as to
86 //! whether M:N or 1:1 is appropriate to use. As always, there are many
87 //! advantages and disadvantages between the two. Regardless of the workload,
88 //! however, there are some aspects of using green threads which you should be
89 //! aware of:
90 //!
91 //! * The largest concern when using libgreen is interoperating with native
92 //! code. Care should be taken when calling native code that will block the OS
93 //! thread, as it will prevent further green tasks from being scheduled on the
94 //! OS thread.
95 //!
96 //! * Native code using thread-local-storage should be approached
97 //! with care. Green threads may migrate among OS threads at any time, so
98 //! native libraries using thread-local state may not always work.
99 //!
100 //! * Native synchronization primitives (e.g. pthread mutexes) will also not
101 //! work for green threads. This is because native primitives often operate at
102 //! an _OS thread_ granularity, whereas green threads operate on a
103 //! finer-grained unit of work.
104 //!
105 //! * A green threading runtime is not fork-safe. If the process calls fork(), it
106 //! cannot expect to make reasonable progress by continuing to use green
107 //! threads.
108 //!
109 //! Note that these concerns do not mean that operating with native code is a
110 //! lost cause. These are simply concerns which should be considered when
111 //! invoking native code.
112 //!
113 //! # Starting with libgreen
114 //!
115 //! ```rust
116 //! extern crate green;
117 //!
118 //! #[start]
119 //! fn start(argc: int, argv: **u8) -> int {
120 //!     green::start(argc, argv, green::basic::event_loop, main)
121 //! }
122 //!
123 //! fn main() {
124 //!     // this code is running in a pool of schedulers
125 //! }
126 //! ```
127 //!
128 //! > **Note**: The `main` function in this example does *not* have I/O
129 //! > support. The basic event loop does not provide any I/O support.
130 //!
131 //! # Starting with I/O support in libgreen
132 //!
133 //! ```rust
134 //! extern crate green;
135 //! extern crate rustuv;
136 //!
137 //! #[start]
138 //! fn start(argc: int, argv: **u8) -> int {
139 //!     green::start(argc, argv, rustuv::event_loop, main)
140 //! }
141 //!
142 //! fn main() {
143 //!     // this code is running in a pool of schedulers all powered by libuv
144 //! }
145 //! ```
146 //!
147 //! The above code can also be shortened with a macro from libgreen.
148 //!
149 //! ```
150 //! #![feature(phase)]
151 //! #[phase(syntax)] extern crate green;
152 //!
153 //! green_start!(main)
154 //!
155 //! fn main() {
156 //!     // run inside of a green pool
157 //! }
158 //! ```
159 //!
160 //! # Using a scheduler pool
161 //!
162 //! ```rust
163 //! use std::task::TaskOpts;
164 //! use green::{SchedPool, PoolConfig};
165 //! use green::sched::{PinnedTask, TaskFromFriend};
166 //!
167 //! let config = PoolConfig::new();
168 //! let mut pool = SchedPool::new(config);
169 //!
170 //! // Spawn tasks into the pool of schedulers
171 //! pool.spawn(TaskOpts::new(), proc() {
172 //!     // this code is running inside the pool of schedulers
173 //!
174 //!     spawn(proc() {
175 //!         // this code is also running inside the same scheduler pool
176 //!     });
177 //! });
178 //!
179 //! // Dynamically add a new scheduler to the scheduler pool. This adds another
180 //! // OS thread that green threads can be multiplexed on to.
181 //! let mut handle = pool.spawn_sched();
182 //!
183 //! // Pin a task to the spawned scheduler
184 //! let task = pool.task(TaskOpts::new(), proc() { /* ... */ });
185 //! handle.send(PinnedTask(task));
186 //!
187 //! // Schedule a task on this new scheduler
188 //! let task = pool.task(TaskOpts::new(), proc() { /* ... */ });
189 //! handle.send(TaskFromFriend(task));
190 //!
191 //! // Handles keep schedulers alive, so be sure to drop all handles before
192 //! // destroying the sched pool
193 //! drop(handle);
194 //!
195 //! // Required to shut down this scheduler pool.
196 //! // The task will fail if `shutdown` is not called.
197 //! pool.shutdown();
198 //! ```
199
200 #![crate_id = "green#0.11-pre"]
201 #![license = "MIT/ASL2"]
202 #![crate_type = "rlib"]
203 #![crate_type = "dylib"]
204 #![doc(html_logo_url = "http://www.rust-lang.org/logos/rust-logo-128x128-blk-v2.png",
205 html_favicon_url = "http://www.rust-lang.org/favicon.ico",
206 html_root_url = "http://static.rust-lang.org/doc/master")]
207
208 // NB this does *not* include globs, please keep it that way.
209 #![feature(macro_rules, phase)]
210 #![allow(visible_private_types)]
211 #![deny(deprecated_owned_vector)]
212
213 #[cfg(test)] #[phase(syntax, link)] extern crate log;
214 #[cfg(test)] extern crate rustuv;
215 extern crate rand;
216 extern crate libc;
217
218 use std::mem::replace;
219 use std::os;
220 use std::rt::rtio;
221 use std::rt::thread::Thread;
222 use std::rt;
223 use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
224 use std::sync::deque;
225 use std::task::TaskOpts;
226 use std::sync::arc::UnsafeArc;
227
228 use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
229 use sleeper_list::SleeperList;
230 use stack::StackPool;
231 use task::GreenTask;
232
233 mod macros;
234 mod simple;
235 mod message_queue;
236
237 pub mod basic;
238 pub mod context;
239 pub mod coroutine;
240 pub mod sched;
241 pub mod sleeper_list;
242 pub mod stack;
243 pub mod task;
244
245 /// A helper macro for booting a program with libgreen
246 ///
247 /// # Example
248 ///
249 /// ```
250 /// #![feature(phase)]
251 /// #[phase(syntax)] extern crate green;
252 ///
253 /// green_start!(main)
254 ///
255 /// fn main() {
256 ///     // running with libgreen
257 /// }
258 /// ```
259 #[macro_export]
260 macro_rules! green_start( ($f:ident) => (
261 mod __start {
262 extern crate green;
263 extern crate rustuv;
264
265 #[start]
266 fn start(argc: int, argv: **u8) -> int {
267 green::start(argc, argv, rustuv::event_loop, super::$f)
268 }
269 }
270 ) )
271
272 /// Set up a default runtime configuration, given compiler-supplied arguments.
273 ///
274 /// This function will block until the entire pool of M:N schedulers has
275 /// exited. This function also requires a local task to be available.
276 ///
277 /// # Arguments
278 ///
279 /// * `argc` & `argv` - The argument vector. On Unix this information is used
280 /// by os::args.
281 /// * `main` - The initial procedure to run inside of the M:N scheduling pool.
282 /// Once this procedure exits, the scheduling pool will begin to shut
283 /// down. The entire pool (and this function) will only return once
284 /// all child tasks have finished executing.
285 ///
286 /// # Return value
287 ///
288 /// The return value is used as the process return code. 0 on success, 101 on
289 /// error.
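///
/// # Example
///
/// A minimal sketch, mirroring the module-level example above:
///
/// ```rust
/// extern crate green;
///
/// #[start]
/// fn start(argc: int, argv: **u8) -> int {
///     green::start(argc, argv, green::basic::event_loop, main)
/// }
///
/// fn main() {
///     // running inside the pool of green schedulers
/// }
/// ```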
290 pub fn start(argc: int, argv: **u8,
291 event_loop_factory: fn() -> Box<rtio::EventLoop:Send>,
292 main: proc():Send) -> int {
293 rt::init(argc, argv);
294 let mut main = Some(main);
295 let mut ret = None;
296 simple::task().run(|| {
297 ret = Some(run(event_loop_factory, main.take_unwrap()));
298 });
299 // unsafe is ok b/c we're sure that the runtime is gone
300 unsafe { rt::cleanup() }
301 ret.unwrap()
302 }
303
304 /// Execute the main function in a pool of M:N schedulers.
305 ///
306 /// Configures the runtime according to the environment, by default using a task
307 /// scheduler with the same number of threads as cores. Returns a process exit
308 /// code.
309 ///
310 /// This function will not return until all schedulers in the associated pool
311 /// have returned.
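///
/// # Example
///
/// A sketch of calling `run` directly rather than `green::start` (this assumes
/// the runtime has already been initialized for the calling task, which
/// `green::start` normally takes care of):
///
/// ```rust
/// extern crate green;
/// extern crate rustuv;
///
/// use std::os;
///
/// fn main() {
///     let exit_code = green::run(rustuv::event_loop, proc() {
///         // the "real" main, running inside a pool of green schedulers
///     });
///     os::set_exit_status(exit_code);
/// }
/// ```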
312 pub fn run(event_loop_factory: fn() -> Box<rtio::EventLoop:Send>,
313 main: proc():Send) -> int {
314 // Create a scheduler pool and spawn the main task into this pool. We will
315 // get notified over a channel when the main task exits.
316 let mut cfg = PoolConfig::new();
317 cfg.event_loop_factory = event_loop_factory;
318 let mut pool = SchedPool::new(cfg);
319 let (tx, rx) = channel();
320 let mut opts = TaskOpts::new();
321 opts.notify_chan = Some(tx);
322 opts.name = Some("<main>".into_maybe_owned());
323 pool.spawn(opts, main);
324
325 // Wait for the main task to return, and set the process error code
326 // appropriately.
327 if rx.recv().is_err() {
328 os::set_exit_status(rt::DEFAULT_ERROR_CODE);
329 }
330
331 // Now that we're sure all tasks are dead, shut down the pool of schedulers,
332 // waiting for them all to return.
333 pool.shutdown();
334 os::get_exit_status()
335 }
336
337 /// Configuration of how an M:N pool of schedulers is spawned.
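///
/// # Example
///
/// A sketch of overriding the defaults before creating a pool (the thread count
/// and the use of `rustuv` here are arbitrary choices):
///
/// ```rust
/// extern crate green;
/// extern crate rustuv;
///
/// fn main() {
///     let mut config = green::PoolConfig::new();
///     config.threads = 8;
///     config.event_loop_factory = rustuv::event_loop;
///
///     let pool = green::SchedPool::new(config);
///     // ... spawn green tasks into `pool` ...
///     pool.shutdown();
/// }
/// ```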
338 pub struct PoolConfig {
339 /// The number of schedulers (OS threads) to spawn into this M:N pool.
340 pub threads: uint,
341 /// A factory function used to create new event loops. If this is not
342 /// specified then the default event loop factory is used.
343 pub event_loop_factory: fn() -> Box<rtio::EventLoop:Send>,
344 }
345
346 impl PoolConfig {
347 /// Returns the default configuration, as determined by the environment
348 /// variables of this process.
349 pub fn new() -> PoolConfig {
350 PoolConfig {
351 threads: rt::default_sched_threads(),
352 event_loop_factory: basic::event_loop,
353 }
354 }
355 }
356
357 /// A structure representing a handle to a pool of schedulers. This handle is
358 /// used to keep the pool alive and also reap the status from the pool.
359 pub struct SchedPool {
360 id: uint,
361 threads: Vec<Thread<()>>,
362 handles: Vec<SchedHandle>,
363 stealers: Vec<deque::Stealer<Box<task::GreenTask>>>,
364 next_friend: uint,
365 stack_pool: StackPool,
366 deque_pool: deque::BufferPool<Box<task::GreenTask>>,
367 sleepers: SleeperList,
368 factory: fn() -> Box<rtio::EventLoop:Send>,
369 task_state: TaskState,
370 tasks_done: Receiver<()>,
371 }
372
373 /// This is internal state shared among a pool of schedulers. It is used to
374 /// keep track of how many tasks are currently running in the pool, and to
375 /// send on a channel once the entire pool has been drained of all tasks.
376 #[deriving(Clone)]
377 struct TaskState {
378 cnt: UnsafeArc<AtomicUint>,
379 done: Sender<()>,
380 }
381
382 impl SchedPool {
383 /// Creates a new pool of M:N schedulers.
384 ///
385 /// This will configure the pool according to the `config` parameter. Tasks
386 /// are subsequently spawned into the pool via the `spawn` and `task` methods.
387 pub fn new(config: PoolConfig) -> SchedPool {
388 static mut POOL_ID: AtomicUint = INIT_ATOMIC_UINT;
389
390 let PoolConfig {
391 threads: nscheds,
392 event_loop_factory: factory
393 } = config;
394 assert!(nscheds > 0);
395
396 // The pool of schedulers that will be returned from this function
397 let (p, state) = TaskState::new();
398 let mut pool = SchedPool {
399 threads: vec![],
400 handles: vec![],
401 stealers: vec![],
402 id: unsafe { POOL_ID.fetch_add(1, SeqCst) },
403 sleepers: SleeperList::new(),
404 stack_pool: StackPool::new(),
405 deque_pool: deque::BufferPool::new(),
406 next_friend: 0,
407 factory: factory,
408 task_state: state,
409 tasks_done: p,
410 };
411
412 // Create a work queue (and matching stealer) for each scheduler in the
413 // pool. The stealers are then shared with every scheduler below.
414 let mut workers = Vec::with_capacity(nscheds);
415 let mut stealers = Vec::with_capacity(nscheds);
416
417 for _ in range(0, nscheds) {
418 let (w, s) = pool.deque_pool.deque();
419 workers.push(w);
420 stealers.push(s);
421 }
422 pool.stealers = stealers;
423
424 // Now that we've got all our work queues, create one scheduler per
425 // queue, spawn the scheduler into a thread, and be sure to keep a
426 // handle to the scheduler and the thread to keep them alive.
427 for worker in workers.move_iter() {
428 rtdebug!("inserting a regular scheduler");
429
430 let mut sched = box Scheduler::new(pool.id,
431 (pool.factory)(),
432 worker,
433 pool.stealers.clone(),
434 pool.sleepers.clone(),
435 pool.task_state.clone());
436 pool.handles.push(sched.make_handle());
437 let sched = sched;
438 pool.threads.push(Thread::start(proc() { sched.bootstrap(); }));
439 }
440
441 return pool;
442 }
443
444 /// Creates a new task configured to run inside of this pool of schedulers.
445 /// This is useful to create a task which can then be sent to a specific
446 /// scheduler created by `spawn_sched` (and possibly pinned to that
447 /// scheduler).
448 pub fn task(&mut self, opts: TaskOpts, f: proc():Send) -> Box<GreenTask> {
449 GreenTask::configure(&mut self.stack_pool, opts, f)
450 }
451
452 /// Spawns a new task into this pool of schedulers, using the specified
453 /// options to configure the new task which is spawned.
454 ///
455 /// New tasks are spawned in a round-robin fashion to the schedulers in this
456 /// pool, but tasks can certainly migrate among schedulers once they're in
457 /// the pool.
458 pub fn spawn(&mut self, opts: TaskOpts, f: proc():Send) {
459 let task = self.task(opts, f);
460
461 // Figure out someone to send this task to
462 let idx = self.next_friend;
463 self.next_friend += 1;
464 if self.next_friend >= self.handles.len() {
465 self.next_friend = 0;
466 }
467
468 // Jettison the task away!
469 self.handles.get_mut(idx).send(TaskFromFriend(task));
470 }
471
472 /// Spawns a new scheduler into this M:N pool. A handle to the new scheduler
473 /// is returned for later use. The scheduler will not exit as long as this
474 /// handle is active.
475 ///
476 /// The scheduler spawned will participate in work stealing with all of the
477 /// other schedulers currently in the scheduler pool.
478 pub fn spawn_sched(&mut self) -> SchedHandle {
479 let (worker, stealer) = self.deque_pool.deque();
480 self.stealers.push(stealer.clone());
481
482 // Tell all existing schedulers about this new scheduler so they can all
483 // steal work from it
484 for handle in self.handles.mut_iter() {
485 handle.send(NewNeighbor(stealer.clone()));
486 }
487
488 // Create the new scheduler, using the same sleeper list as all the
489 // other schedulers as well as having a stealer handle to all other
490 // schedulers.
491 let mut sched = box Scheduler::new(self.id,
492 (self.factory)(),
493 worker,
494 self.stealers.clone(),
495 self.sleepers.clone(),
496 self.task_state.clone());
497 let ret = sched.make_handle();
498 self.handles.push(sched.make_handle());
499 let sched = sched;
500 self.threads.push(Thread::start(proc() { sched.bootstrap() }));
501
502 return ret;
503 }
504
505 /// Consumes the pool of schedulers, waiting for all tasks to exit and all
506 /// schedulers to shut down.
507 ///
508 /// This function is required to be called in order to drop a pool of
509 /// schedulers; it is considered an error to drop a pool without calling
510 /// this method.
511 ///
512 /// This only waits for all tasks in *this pool* of schedulers to exit; any
513 /// native tasks or tasks in other pools will not be waited on.
514 pub fn shutdown(mut self) {
515 self.stealers = vec![];
516
517 // Wait for everyone to exit. We may have reached a 0-task count
518 // multiple times in the past, meaning there could be several buffered
519 // messages on the `tasks_done` port. We're guaranteed that after *some*
520 // message the current task count will be 0, so we just receive in a
521 // loop until everything is totally dead.
522 while self.task_state.active() {
523 self.tasks_done.recv();
524 }
525
526 // Now that everyone's gone, tell everything to shut down.
527 for mut handle in replace(&mut self.handles, vec![]).move_iter() {
528 handle.send(Shutdown);
529 }
530 for thread in replace(&mut self.threads, vec![]).move_iter() {
531 thread.join();
532 }
533 }
534 }
535
536 impl TaskState {
537 fn new() -> (Receiver<()>, TaskState) {
538 let (tx, rx) = channel();
539 (rx, TaskState {
540 cnt: UnsafeArc::new(AtomicUint::new(0)),
541 done: tx,
542 })
543 }
544
545 fn increment(&mut self) {
546 unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); }
547 }
548
549 fn active(&self) -> bool {
550 unsafe { (*self.cnt.get()).load(SeqCst) != 0 }
551 }
552
553 fn decrement(&mut self) {
554 let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) };
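// `fetch_sub` returns the value before the subtraction, so `prev == 1` means
// this was the last task in the pool; notify `shutdown` over the `done` channel.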
555 if prev == 1 {
556 self.done.send(());
557 }
558 }
559 }
560
561 impl Drop for SchedPool {
562 fn drop(&mut self) {
563 if self.threads.len() > 0 {
564 fail!("dropping a M:N scheduler pool that wasn't shut down");
565 }
566 }
567 }