(index<- )        ./libgreen/lib.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! The "green scheduling" library
  12  //!
  13  //! This library provides M:N threading for rust programs. Internally this has
  14  //! the implementation of a green scheduler along with context switching and a
  15  //! stack-allocation strategy. This can be optionally linked in to rust
  16  //! programs in order to provide M:N functionality inside of 1:1 programs.
  17  //!
  18  //! # Architecture
  19  //!
  20  //! An M:N scheduling library implies that there are N OS thread upon which M
  21  //! "green threads" are multiplexed. In other words, a set of green threads are
  22  //! all run inside a pool of OS threads.
  23  //!
  24  //! With this design, you can achieve _concurrency_ by spawning many green
  25  //! threads, and you can achieve _parallelism_ by running the green threads
  26  //! simultaneously on multiple OS threads. Each OS thread is a candidate for
  27  //! being scheduled on a different core (the source of parallelism), and then
  28  //! all of the green threads cooperatively schedule amongst one another (the
  29  //! source of concurrency).
  30  //!
  31  //! ## Schedulers
  32  //!
  33  //! In order to coordinate among green threads, each OS thread is primarily
  34  //! running something which we call a Scheduler. Whenever a reference to a
  35  //! Scheduler is made, it is synonymous to referencing one OS thread. Each
  36  //! scheduler is bound to one and exactly one OS thread, and the thread that it
  37  //! is bound to never changes.
  38  //!
  39  //! Each scheduler is connected to a pool of other schedulers (a `SchedPool`)
  40  //! which is the thread pool term from above. A pool of schedulers all share the
  41  //! work that they create. Furthermore, whenever a green thread is created (also
  42  //! synonymously referred to as a green task), it is associated with a
  43  //! `SchedPool` forevermore. A green thread cannot leave its scheduler pool.
  44  //!
  45  //! Schedulers can have at most one green thread running on them at a time. When
  46  //! a scheduler is asleep on its event loop, there are no green tasks running on
  47  //! the OS thread or the scheduler. The term "context switch" is used for when
  48  //! the running green thread is swapped out, but this simply changes the one
  49  //! green thread which is running on the scheduler.
  50  //!
  51  //! ## Green Threads
  52  //!
  53  //! A green thread can largely be summarized by a stack and a register context.
  54  //! Whenever a green thread is spawned, it allocates a stack, and then prepares
  55  //! a register context for execution. The green task may be executed across
  56  //! multiple OS threads, but it will always use the same stack and it will carry
  57  //! its register context across OS threads.
  58  //!
  59  //! Each green thread is cooperatively scheduled with other green threads.
  60  //! Primarily, this means that there is no pre-emption of a green thread. The
  61  //! major consequence of this design is that a green thread stuck in an infinite
  62  //! loop will prevent all other green threads from running on that particular
  63  //! scheduler.
  64  //!
  65  //! Scheduling events for green threads occur on communication and I/O
  66  //! boundaries. For example, if a green task blocks waiting for a message on a
  67  //! channel some other green thread can now run on the scheduler. This also has
  68  //! the consequence that until a green thread performs any form of scheduling
  69  //! event, it will be running on the same OS thread (unconditionally).
  70  //!
  71  //! ## Work Stealing
  72  //!
  73  //! With a pool of schedulers, a new green task has a number of options when
  74  //! deciding where to run initially. The current implementation uses a concept
  75  //! called work stealing in order to spread out work among schedulers.
  76  //!
  77  //! In a work-stealing model, each scheduler maintains a local queue of tasks to
  78  //! run, and this queue is stolen from by other schedulers. Implementation-wise,
  79  //! work stealing has some hairy parts, but from a user-perspective, work
  80  //! stealing simply implies what with M green threads and N schedulers where
  81  //! M > N it is very likely that all schedulers will be busy executing work.
  82  //!
  83  //! # Considerations when using libgreen
  84  //!
  85  //! An M:N runtime has both pros and cons, and there is no one answer as to
  86  //! whether M:N or 1:1 is appropriate to use. As always, there are many
  87  //! advantages and disadvantages between the two. Regardless of the workload,
  88  //! however, there are some aspects of using green thread which you should be
  89  //! aware of:
  90  //!
  91  //! * The largest concern when using libgreen is interoperating with native
  92  //!   code. Care should be taken when calling native code that will block the OS
  93  //!   thread as it will prevent further green tasks from being scheduled on the
  94  //!   OS thread.
  95  //!
  96  //! * Native code using thread-local-storage should be approached
  97  //!   with care. Green threads may migrate among OS threads at any time, so
  98  //!   native libraries using thread-local state may not always work.
  99  //!
 100  //! * Native synchronization primitives (e.g. pthread mutexes) will also not
 101  //!   work for green threads. The reason for this is because native primitives
 102  //!   often operate on a _os thread_ granularity whereas green threads are
 103  //!   operating on a more granular unit of work.
 104  //!
 105  //! * A green threading runtime is not fork-safe. If the process forks(), it
 106  //!   cannot expect to make reasonable progress by continuing to use green
 107  //!   threads.
 108  //!
 109  //! Note that these concerns do not mean that operating with native code is a
 110  //! lost cause. These are simply just concerns which should be considered when
 111  //! invoking native code.
 112  //!
 113  //! # Starting with libgreen
 114  //!
 115  //! ```rust
 116  //! extern crate green;
 117  //!
 118  //! #[start]
 119  //! fn start(argc: int, argv: **u8) -> int {
 120  //!     green::start(argc, argv, green::basic::event_loop, main)
 121  //! }
 122  //!
 123  //! fn main() {
 124  //!     // this code is running in a pool of schedulers
 125  //! }
 126  //! ```
 127  //!
 128  //! > **Note**: This `main` funciton in this example does *not* have I/O
 129  //! >           support. The basic event loop does not provide any support
 130  //!
 131  //! # Starting with I/O support in libgreen
 132  //!
 133  //! ```rust
 134  //! extern crate green;
 135  //! extern crate rustuv;
 136  //!
 137  //! #[start]
 138  //! fn start(argc: int, argv: **u8) -> int {
 139  //!     green::start(argc, argv, rustuv::event_loop, main)
 140  //! }
 141  //!
 142  //! fn main() {
 143  //!     // this code is running in a pool of schedulers all powered by libuv
 144  //! }
 145  //! ```
 146  //!
 147  //! The above code can also be shortened with a macro from libgreen.
 148  //!
 149  //! ```
 150  //! #![feature(phase)]
 151  //! #[phase(syntax)] extern crate green;
 152  //!
 153  //! green_start!(main)
 154  //!
 155  //! fn main() {
 156  //!     // run inside of a green pool
 157  //! }
 158  //! ```
 159  //!
 160  //! # Using a scheduler pool
 161  //!
 162  //! ```rust
 163  //! use std::task::TaskOpts;
 164  //! use green::{SchedPool, PoolConfig};
 165  //! use green::sched::{PinnedTask, TaskFromFriend};
 166  //!
 167  //! let config = PoolConfig::new();
 168  //! let mut pool = SchedPool::new(config);
 169  //!
 170  //! // Spawn tasks into the pool of schedulers
 171  //! pool.spawn(TaskOpts::new(), proc() {
 172  //!     // this code is running inside the pool of schedulers
 173  //!
 174  //!     spawn(proc() {
 175  //!         // this code is also running inside the same scheduler pool
 176  //!     });
 177  //! });
 178  //!
 179  //! // Dynamically add a new scheduler to the scheduler pool. This adds another
 180  //! // OS thread that green threads can be multiplexed on to.
 181  //! let mut handle = pool.spawn_sched();
 182  //!
 183  //! // Pin a task to the spawned scheduler
 184  //! let task = pool.task(TaskOpts::new(), proc() { /* ... */ });
 185  //! handle.send(PinnedTask(task));
 186  //!
 187  //! // Schedule a task on this new scheduler
 188  //! let task = pool.task(TaskOpts::new(), proc() { /* ... */ });
 189  //! handle.send(TaskFromFriend(task));
 190  //!
 191  //! // Handles keep schedulers alive, so be sure to drop all handles before
 192  //! // destroying the sched pool
 193  //! drop(handle);
 194  //!
 195  //! // Required to shut down this scheduler pool.
 196  //! // The task will fail if `shutdown` is not called.
 197  //! pool.shutdown();
 198  //! ```
 199  
 200  #![crate_id = "green#0.11-pre"]
 201  #![license = "MIT/ASL2"]
 202  #![crate_type = "rlib"]
 203  #![crate_type = "dylib"]
 204  #![doc(html_logo_url = "http://www.rust-lang.org/logos/rust-logo-128x128-blk-v2.png",
 205         html_favicon_url = "http://www.rust-lang.org/favicon.ico",
 206         html_root_url = "http://static.rust-lang.org/doc/master")]
 207  
 208  // NB this does *not* include globs, please keep it that way.
 209  #![feature(macro_rules, phase)]
 210  #![allow(visible_private_types)]
 211  #![deny(deprecated_owned_vector)]
 212  
 213  #[cfg(test)] #[phase(syntax, link)] extern crate log;
 214  #[cfg(test)] extern crate rustuv;
 215  extern crate rand;
 216  extern crate libc;
 217  
 218  use std::mem::replace;
 219  use std::os;
 220  use std::rt::rtio;
 221  use std::rt::thread::Thread;
 222  use std::rt;
 223  use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
 224  use std::sync::deque;
 225  use std::task::TaskOpts;
 226  use std::sync::arc::UnsafeArc;
 227  
 228  use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
 229  use sleeper_list::SleeperList;
 230  use stack::StackPool;
 231  use task::GreenTask;
 232  
 233  mod macros;
 234  mod simple;
 235  mod message_queue;
 236  
 237  pub mod basic;
 238  pub mod context;
 239  pub mod coroutine;
 240  pub mod sched;
 241  pub mod sleeper_list;
 242  pub mod stack;
 243  pub mod task;
 244  
 245  /// A helper macro for booting a program with libgreen
 246  ///
 247  /// # Example
 248  ///
 249  /// ```
 250  /// #![feature(phase)]
 251  /// #[phase(syntax)] extern crate green;
 252  ///
 253  /// green_start!(main)
 254  ///
 255  /// fn main() {
 256  ///     // running with libgreen
 257  /// }
 258  /// ```
 259  #[macro_export]
 260  macro_rules! green_start( ($f:ident) => (
 261      mod __start {
 262          extern crate green;
 263          extern crate rustuv;
 264  
 265          #[start]
 266          fn start(argc: int, argv: **u8) -> int {
 267              green::start(argc, argv, rustuv::event_loop, super::$f)
 268          }
 269      }
 270  ) )
 271  
 272  /// Set up a default runtime configuration, given compiler-supplied arguments.
 273  ///
 274  /// This function will block until the entire pool of M:N schedulers have
 275  /// exited. This function also requires a local task to be available.
 276  ///
 277  /// # Arguments
 278  ///
 279  /// * `argc` & `argv` - The argument vector. On Unix this information is used
 280  ///   by os::args.
 281  /// * `main` - The initial procedure to run inside of the M:N scheduling pool.
 282  ///            Once this procedure exits, the scheduling pool will begin to shut
 283  ///            down. The entire pool (and this function) will only return once
 284  ///            all child tasks have finished executing.
 285  ///
 286  /// # Return value
 287  ///
 288  /// The return value is used as the process return code. 0 on success, 101 on
 289  /// error.
 290  pub fn start(argc: int, argv: **u8,
 291               event_loop_factoryfn() -> Box<rtio::EventLoop:Send>,
 292               main: proc():Send) -> int {
 293      rt::init(argc, argv);
 294      let mut main = Some(main);
 295      let mut ret = None;
 296      simple::task().run(|| {
 297          ret = Some(run(event_loop_factory, main.take_unwrap()));
 298      });
 299      // unsafe is ok b/c we're sure that the runtime is gone
 300      unsafe { rt::cleanup() }
 301      ret.unwrap()
 302  }
 303  
 304  /// Execute the main function in a pool of M:N schedulers.
 305  ///
 306  /// Configures the runtime according to the environment, by default using a task
 307  /// scheduler with the same number of threads as cores.  Returns a process exit
 308  /// code.
 309  ///
 310  /// This function will not return until all schedulers in the associated pool
 311  /// have returned.
 312  pub fn run(event_loop_factoryfn() -> Box<rtio::EventLoop:Send>,
 313             main: proc():Send) -> int {
 314      // Create a scheduler pool and spawn the main task into this pool. We will
 315      // get notified over a channel when the main task exits.
 316      let mut cfg = PoolConfig::new();
 317      cfg.event_loop_factory = event_loop_factory;
 318      let mut pool = SchedPool::new(cfg);
 319      let (tx, rx) = channel();
 320      let mut opts = TaskOpts::new();
 321      opts.notify_chan = Some(tx);
 322      opts.name = Some("<main>".into_maybe_owned());
 323      pool.spawn(opts, main);
 324  
 325      // Wait for the main task to return, and set the process error code
 326      // appropriately.
 327      if rx.recv().is_err() {
 328          os::set_exit_status(rt::DEFAULT_ERROR_CODE);
 329      }
 330  
 331      // Now that we're sure all tasks are dead, shut down the pool of schedulers,
 332      // waiting for them all to return.
 333      pool.shutdown();
 334      os::get_exit_status()
 335  }
 336  
 337  /// Configuration of how an M:N pool of schedulers is spawned.
 338  pub struct PoolConfig {
 339      /// The number of schedulers (OS threads) to spawn into this M:N pool.
 340      pub threads: uint,
 341      /// A factory function used to create new event loops. If this is not
 342      /// specified then the default event loop factory is used.
 343      pub event_loop_factory: fn() -> Box<rtio::EventLoop:Send>,
 344  }
 345  
 346  impl PoolConfig {
 347      /// Returns the default configuration, as determined the environment
 348      /// variables of this process.
 349      pub fn new() -> PoolConfig {
 350          PoolConfig {
 351              threads: rt::default_sched_threads(),
 352              event_loop_factory: basic::event_loop,
 353          }
 354      }
 355  }
 356  
 357  /// A structure representing a handle to a pool of schedulers. This handle is
 358  /// used to keep the pool alive and also reap the status from the pool.
 359  pub struct SchedPool {
 360      id: uint,
 361      threads: Vec<Thread<()>>,
 362      handles: Vec<SchedHandle>,
 363      stealers: Vec<deque::Stealer<Box<task::GreenTask>>>,
 364      next_friend: uint,
 365      stack_pool: StackPool,
 366      deque_pool: deque::BufferPool<Box<task::GreenTask>>,
 367      sleepers: SleeperList,
 368      factory: fn() -> Box<rtio::EventLoop:Send>,
 369      task_state: TaskState,
 370      tasks_done: Receiver<()>,
 371  }
 372  
 373  /// This is an internal state shared among a pool of schedulers. This is used to
 374  /// keep track of how many tasks are currently running in the pool and then
 375  /// sending on a channel once the entire pool has been drained of all tasks.
 376  #[deriving(Clone)]
 377  struct TaskState {
 378      cnt: UnsafeArc<AtomicUint>,
 379      done: Sender<()>,
 380  }
 381  
 382  impl SchedPool {
 383      /// Execute the main function in a pool of M:N schedulers.
 384      ///
 385      /// This will configure the pool according to the `config` parameter, and
 386      /// initially run `main` inside the pool of schedulers.
 387      pub fn new(configPoolConfig) -> SchedPool {
 388          static mut POOL_ID: AtomicUint = INIT_ATOMIC_UINT;
 389  
 390          let PoolConfig {
 391              threads: nscheds,
 392              event_loop_factory: factory
 393          } = config;
 394          assert!(nscheds > 0);
 395  
 396          // The pool of schedulers that will be returned from this function
 397          let (p, state) = TaskState::new();
 398          let mut pool = SchedPool {
 399              threads: vec![],
 400              handles: vec![],
 401              stealers: vec![],
 402              id: unsafe { POOL_ID.fetch_add(1, SeqCst) },
 403              sleepers: SleeperList::new(),
 404              stack_pool: StackPool::new(),
 405              deque_pool: deque::BufferPool::new(),
 406              next_friend: 0,
 407              factory: factory,
 408              task_state: state,
 409              tasks_done: p,
 410          };
 411  
 412          // Create a work queue for each scheduler, ntimes. Create an extra
 413          // for the main thread if that flag is set. We won't steal from it.
 414          let mut workers = Vec::with_capacity(nscheds);
 415          let mut stealers = Vec::with_capacity(nscheds);
 416  
 417          for _ in range(0, nscheds) {
 418              let (w, s) = pool.deque_pool.deque();
 419              workers.push(w);
 420              stealers.push(s);
 421          }
 422          pool.stealers = stealers;
 423  
 424          // Now that we've got all our work queues, create one scheduler per
 425          // queue, spawn the scheduler into a thread, and be sure to keep a
 426          // handle to the scheduler and the thread to keep them alive.
 427          for worker in workers.move_iter() {
 428              rtdebug!("inserting a regular scheduler");
 429  
 430              let mut sched = box Scheduler::new(pool.id,
 431                                              (pool.factory)(),
 432                                              worker,
 433                                              pool.stealers.clone(),
 434                                              pool.sleepers.clone(),
 435                                              pool.task_state.clone());
 436              pool.handles.push(sched.make_handle());
 437              let sched = sched;
 438              pool.threads.push(Thread::start(proc() { sched.bootstrap(); }));
 439          }
 440  
 441          return pool;
 442      }
 443  
 444      /// Creates a new task configured to run inside of this pool of schedulers.
 445      /// This is useful to create a task which can then be sent to a specific
 446      /// scheduler created by `spawn_sched` (and possibly pin it to that
 447      /// scheduler).
 448      pub fn task(&mut self, optsTaskOpts, fproc():Send) -> Box<GreenTask> {
 449          GreenTask::configure(&mut self.stack_pool, opts, f)
 450      }
 451  
 452      /// Spawns a new task into this pool of schedulers, using the specified
 453      /// options to configure the new task which is spawned.
 454      ///
 455      /// New tasks are spawned in a round-robin fashion to the schedulers in this
 456      /// pool, but tasks can certainly migrate among schedulers once they're in
 457      /// the pool.
 458      pub fn spawn(&mut self, optsTaskOpts, fproc():Send) {
 459          let task = self.task(opts, f);
 460  
 461          // Figure out someone to send this task to
 462          let idx = self.next_friend;
 463          self.next_friend += 1;
 464          if self.next_friend >= self.handles.len() {
 465              self.next_friend = 0;
 466          }
 467  
 468          // Jettison the task away!
 469          self.handles.get_mut(idx).send(TaskFromFriend(task));
 470      }
 471  
 472      /// Spawns a new scheduler into this M:N pool. A handle is returned to the
 473      /// scheduler for use. The scheduler will not exit as long as this handle is
 474      /// active.
 475      ///
 476      /// The scheduler spawned will participate in work stealing with all of the
 477      /// other schedulers currently in the scheduler pool.
 478      pub fn spawn_sched(&mut self) -> SchedHandle {
 479          let (worker, stealer) = self.deque_pool.deque();
 480          self.stealers.push(stealer.clone());
 481  
 482          // Tell all existing schedulers about this new scheduler so they can all
 483          // steal work from it
 484          for handle in self.handles.mut_iter() {
 485              handle.send(NewNeighbor(stealer.clone()));
 486          }
 487  
 488          // Create the new scheduler, using the same sleeper list as all the
 489          // other schedulers as well as having a stealer handle to all other
 490          // schedulers.
 491          let mut sched = box Scheduler::new(self.id,
 492                                          (self.factory)(),
 493                                          worker,
 494                                          self.stealers.clone(),
 495                                          self.sleepers.clone(),
 496                                          self.task_state.clone());
 497          let ret = sched.make_handle();
 498          self.handles.push(sched.make_handle());
 499          let sched = sched;
 500          self.threads.push(Thread::start(proc() { sched.bootstrap() }));
 501  
 502          return ret;
 503      }
 504  
 505      /// Consumes the pool of schedulers, waiting for all tasks to exit and all
 506      /// schedulers to shut down.
 507      ///
 508      /// This function is required to be called in order to drop a pool of
 509      /// schedulers, it is considered an error to drop a pool without calling
 510      /// this method.
 511      ///
 512      /// This only waits for all tasks in *this pool* of schedulers to exit, any
 513      /// native tasks or extern pools will not be waited on
 514      pub fn shutdown(mut self) {
 515          self.stealers = vec![];
 516  
 517          // Wait for everyone to exit. We may have reached a 0-task count
 518          // multiple times in the past, meaning there could be several buffered
 519          // messages on the `tasks_done` port. We're guaranteed that after *some*
 520          // message the current task count will be 0, so we just receive in a
 521          // loop until everything is totally dead.
 522          while self.task_state.active() {
 523              self.tasks_done.recv();
 524          }
 525  
 526          // Now that everyone's gone, tell everything to shut down.
 527          for mut handle in replace(&mut self.handles, vec![]).move_iter() {
 528              handle.send(Shutdown);
 529          }
 530          for thread in replace(&mut self.threads, vec![]).move_iter() {
 531              thread.join();
 532          }
 533      }
 534  }
 535  
 536  impl TaskState {
 537      fn new() -> (Receiver<()>, TaskState) {
 538          let (tx, rx) = channel();
 539          (rx, TaskState {
 540              cnt: UnsafeArc::new(AtomicUint::new(0)),
 541              done: tx,
 542          })
 543      }
 544  
 545      fn increment(&mut self) {
 546          unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); }
 547      }
 548  
 549      fn active(&self) -> bool {
 550          unsafe { (*self.cnt.get()).load(SeqCst) != 0 }
 551      }
 552  
 553      fn decrement(&mut self) {
 554          let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) };
 555          if prev == 1 {
 556              self.done.send(());
 557          }
 558      }
 559  }
 560  
 561  impl Drop for SchedPool {
 562      fn drop(&mut self) {
 563          if self.threads.len() > 0 {
 564              fail!("dropping a M:N scheduler pool that wasn't shut down");
 565          }
 566      }
 567  }


libgreen/lib.rs:358:72-358:72 -struct- definition:
/// used to keep the pool alive and also reap the status from the pool.
pub struct SchedPool {
    id: uint,
references:- 4
397:         let (p, state) = TaskState::new();
398:         let mut pool = SchedPool {
399:             threads: vec![],
--
561: impl Drop for SchedPool {
562:     fn drop(&mut self) {


libgreen/lib.rs:376:19-376:19 -struct- definition:
struct TaskState {
    cnt: UnsafeArc<AtomicUint>,
    done: Sender<()>,
references:- 11
375: /// sending on a channel once the entire pool has been drained of all tasks.
377: struct TaskState {
--
538:         let (tx, rx) = channel();
539:         (rx, TaskState {
540:             cnt: UnsafeArc::new(AtomicUint::new(0)),
libgreen/sched.rs:
129:                sleeper_list: SleeperList,
130:                state: TaskState)
131:         -> Scheduler {
--
144:                        friend: Option<SchedHandle>,
145:                        state: TaskState)
146:         -> Scheduler {
libgreen/lib.rs:
375: /// sending on a channel once the entire pool has been drained of all tasks.
377: struct TaskState {


libgreen/lib.rs:337:63-337:63 -struct- definition:
/// Configuration of how an M:N pool of schedulers is spawned.
pub struct PoolConfig {
    /// The number of schedulers (OS threads) to spawn into this M:N pool.
references:- 5
349:     pub fn new() -> PoolConfig {
350:         PoolConfig {
351:             threads: rt::default_sched_threads(),
--
390:         let PoolConfig {
391:             threads: nscheds,