(index<- )        ./libnative/io/pipe_win32.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! Named pipes implementation for windows
  12  //!
  13  //! If are unfortunate enough to be reading this code, I would like to first
  14  //! apologize. This was my first encounter with windows named pipes, and it
  15  //! didn't exactly turn out very cleanly. If you, too, are new to named pipes,
  16  //! read on as I'll try to explain some fun things that I ran into.
  17  //!
  18  //! # Unix pipes vs Named pipes
  19  //!
  20  //! As with everything else, named pipes on windows are pretty different from
  21  //! unix pipes on unix. On unix, you use one "server pipe" to accept new client
  22  //! pipes. So long as this server pipe is active, new children pipes can
  23  //! connect. On windows, you instead have a number of "server pipes", and each
  24  //! of these server pipes can throughout their lifetime be attached to a client
  25  //! or not. Once attached to a client, a server pipe may then disconnect at a
  26  //! later date.
  27  //!
  28  //! # Accepting clients
  29  //!
  30  //! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces
  31  //! are built around the unix flavors. This means that we have one "server
  32  //! pipe" to which many clients can connect. In order to make this compatible
  33  //! with the windows model, each connected client consumes ownership of a server
  34  //! pipe, and then a new server pipe is created for the next client.
  35  //!
  36  //! Note that the server pipes attached to clients are never given back to the
  37  //! listener for recycling. This could possibly be implemented with a channel so
  38  //! the listener half can re-use server pipes, but for now I err'd on the simple
  39  //! side of things. Each stream accepted by a listener will destroy the server
  40  //! pipe after the stream is dropped.
  41  //!
  42  //! This model ends up having a small race or two, and you can find more details
  43  //! on the `native_accept` method.
  44  //!
  45  //! # Simultaneous reads and writes
  46  //!
  47  //! In testing, I found that two simultaneous writes and two simultaneous reads
  48  //! on a pipe ended up working out just fine, but problems were encountered when
  49  //! a read was executed simultaneously with a write. After some googling around,
  50  //! it sounded like named pipes just weren't built for this kind of interaction,
  51  //! and the suggested solution was to use overlapped I/O.
  52  //!
  53  //! I don't realy know what overlapped I/O is, but my basic understanding after
  54  //! reading about it is that you have an external Event which is used to signal
  55  //! I/O completion, passed around in some OVERLAPPED structures. As to what this
  56  //! is, I'm not exactly sure.
  57  //!
  58  //! This problem implies that all named pipes are created with the
  59  //! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is
  60  //! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and
  61  //! inside of this structure is a HANDLE from CreateEvent. After the I/O is
  62  //! determined to be pending (may complete in the future), the
  63  //! GetOverlappedResult function is used to block on the event, waiting for the
  64  //! I/O to finish.
  65  //!
  66  //! This scheme ended up working well enough. There were two snags that I ran
  67  //! into, however:
  68  //!
  69  //! * Each UnixStream instance needs its own read/write events to wait on. These
  70  //!   can't be shared among clones of the same stream because the documentation
  71  //!   states that it unsets the event when the I/O is started (would possibly
  72  //!   corrupt other events simultaneously waiting). For convenience's sake,
  73  //!   these events are lazily initialized.
  74  //!
  75  //! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition
  76  //!   to all pipes created through `connect`. Notably this means that the
  77  //!   ConnectNamedPipe function is nonblocking, implying that the Listener needs
  78  //!   to have yet another event to do the actual blocking.
  79  //!
  80  //! # Conclusion
  81  //!
  82  //! The conclusion here is that I probably don't know the best way to work with
  83  //! windows named pipes, but the solution here seems to work well enough to get
  84  //! the test suite passing (the suite is in libstd), and that's good enough for
  85  //! me!
  86  
  87  use libc;
  88  use std::c_str::CString;
  89  use std::intrinsics;
  90  use std::io;
  91  use std::os::win32::as_utf16_p;
  92  use std::os;
  93  use std::ptr;
  94  use std::rt::rtio;
  95  use std::sync::arc::UnsafeArc;
  96  use std::sync::atomics;
  97  use std::unstable::mutex;
  98  
  99  use super::IoResult;
 100  use super::c;
 101  use super::util;
 102  
 103  struct Event(libc::HANDLE);
 104  
 105  impl Event {
 106      fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
 107          let event = unsafe {
 108              libc::CreateEventW(ptr::mut_null(),
 109                                 manual_reset as libc::BOOL,
 110                                 initial_state as libc::BOOL,
 111                                 ptr::null())
 112          };
 113          if event as uint == 0 {
 114              Err(super::last_error())
 115          } else {
 116              Ok(Event(event))
 117          }
 118      }
 119  
 120      fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
 121  }
 122  
 123  impl Drop for Event {
 124      fn drop(&mut self) {
 125          unsafe { let _ = libc::CloseHandle(self.handle()); }
 126      }
 127  }
 128  
 129  struct Inner {
 130      handle: libc::HANDLE,
 131      lock: mutex::NativeMutex,
 132      read_closed: atomics::AtomicBool,
 133      write_closed: atomics::AtomicBool,
 134  }
 135  
 136  impl Inner {
 137      fn new(handle: libc::HANDLE) -> Inner {
 138          Inner {
 139              handle: handle,
 140              lock: unsafe { mutex::NativeMutex::new() },
 141              read_closed: atomics::AtomicBool::new(false),
 142              write_closed: atomics::AtomicBool::new(false),
 143          }
 144      }
 145  }
 146  
 147  impl Drop for Inner {
 148      fn drop(&mut self) {
 149          unsafe {
 150              let _ = libc::FlushFileBuffers(self.handle);
 151              let _ = libc::CloseHandle(self.handle);
 152          }
 153      }
 154  }
 155  
 156  unsafe fn pipe(name: *u16, init: bool) -> libc::HANDLE {
 157      libc::CreateNamedPipeW(
 158          name,
 159          libc::PIPE_ACCESS_DUPLEX |
 160              if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} |
 161              libc::FILE_FLAG_OVERLAPPED,
 162          libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE |
 163              libc::PIPE_WAIT,
 164          libc::PIPE_UNLIMITED_INSTANCES,
 165          65536,
 166          65536,
 167          0,
 168          ptr::mut_null()
 169      )
 170  }
 171  
 172  pub fn await(handle: libc::HANDLE, deadline: u64,
 173               overlapped: &mut libc::OVERLAPPED) -> bool {
 174      if deadline == 0 { return true }
 175  
 176      // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
 177      // to figure out if we should indeed get the result.
 178      let now = ::io::timer::now();
 179      let timeout = deadline < now || unsafe {
 180          let ms = (deadline - now) as libc::DWORD;
 181          let r = libc::WaitForSingleObject(overlapped.hEvent,
 182                                            ms);
 183          r != libc::WAIT_OBJECT_0
 184      };
 185      if timeout {
 186          unsafe { let _ = c::CancelIo(handle); }
 187          false
 188      } else {
 189          true
 190      }
 191  }
 192  
 193  ////////////////////////////////////////////////////////////////////////////////
 194  // Unix Streams
 195  ////////////////////////////////////////////////////////////////////////////////
 196  
 197  pub struct UnixStream {
 198      inner: UnsafeArc<Inner>,
 199      write: Option<Event>,
 200      read: Option<Event>,
 201      read_deadline: u64,
 202      write_deadline: u64,
 203  }
 204  
 205  impl UnixStream {
 206      fn try_connect(p: *u16) -> Option<libc::HANDLE> {
 207          // Note that most of this is lifted from the libuv implementation.
 208          // The idea is that if we fail to open a pipe in read/write mode
 209          // that we try afterwards in just read or just write
 210          let mut result = unsafe {
 211              libc::CreateFileW(p,
 212                  libc::GENERIC_READ | libc::GENERIC_WRITE,
 213                  0,
 214                  ptr::mut_null(),
 215                  libc::OPEN_EXISTING,
 216                  libc::FILE_FLAG_OVERLAPPED,
 217                  ptr::mut_null())
 218          };
 219          if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE {
 220              return Some(result)
 221          }
 222  
 223          let err = unsafe { libc::GetLastError() };
 224          if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
 225              result = unsafe {
 226                  libc::CreateFileW(p,
 227                      libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES,
 228                      0,
 229                      ptr::mut_null(),
 230                      libc::OPEN_EXISTING,
 231                      libc::FILE_FLAG_OVERLAPPED,
 232                      ptr::mut_null())
 233              };
 234              if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE {
 235                  return Some(result)
 236              }
 237          }
 238          let err = unsafe { libc::GetLastError() };
 239          if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
 240              result = unsafe {
 241                  libc::CreateFileW(p,
 242                      libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES,
 243                      0,
 244                      ptr::mut_null(),
 245                      libc::OPEN_EXISTING,
 246                      libc::FILE_FLAG_OVERLAPPED,
 247                      ptr::mut_null())
 248              };
 249              if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE {
 250                  return Some(result)
 251              }
 252          }
 253          None
 254      }
 255  
 256      pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
 257          as_utf16_p(addr.as_str().unwrap(), |p| {
 258              let start = ::io::timer::now();
 259              loop {
 260                  match UnixStream::try_connect(p) {
 261                      Some(handle) => {
 262                          let inner = Inner::new(handle);
 263                          let mut mode = libc::PIPE_TYPE_BYTE |
 264                                         libc::PIPE_READMODE_BYTE |
 265                                         libc::PIPE_WAIT;
 266                          let ret = unsafe {
 267                              libc::SetNamedPipeHandleState(inner.handle,
 268                                                            &mut mode,
 269                                                            ptr::mut_null(),
 270                                                            ptr::mut_null())
 271                          };
 272                          return if ret == 0 {
 273                              Err(super::last_error())
 274                          } else {
 275                              Ok(UnixStream {
 276                                  inner: UnsafeArc::new(inner),
 277                                  read: None,
 278                                  write: None,
 279                                  read_deadline: 0,
 280                                  write_deadline: 0,
 281                              })
 282                          }
 283                      }
 284                      None => {}
 285                  }
 286  
 287                  // On windows, if you fail to connect, you may need to call the
 288                  // `WaitNamedPipe` function, and this is indicated with an error
 289                  // code of ERROR_PIPE_BUSY.
 290                  let code = unsafe { libc::GetLastError() };
 291                  if code as int != libc::ERROR_PIPE_BUSY as int {
 292                      return Err(super::last_error())
 293                  }
 294  
 295                  match timeout {
 296                      Some(timeout) => {
 297                          let now = ::io::timer::now();
 298                          let timed_out = (now - start) >= timeout || unsafe {
 299                              let ms = (timeout - (now - start)) as libc::DWORD;
 300                              libc::WaitNamedPipeW(p, ms) == 0
 301                          };
 302                          if timed_out {
 303                              return Err(util::timeout("connect timed out"))
 304                          }
 305                      }
 306  
 307                      // An example I found on microsoft's website used 20
 308                      // seconds, libuv uses 30 seconds, hence we make the
 309                      // obvious choice of waiting for 25 seconds.
 310                      None => {
 311                          if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
 312                              return Err(super::last_error())
 313                          }
 314                      }
 315                  }
 316              }
 317          })
 318      }
 319  
 320      fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } }
 321  
 322      fn read_closed(&self) -> bool {
 323          unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) }
 324      }
 325  
 326      fn write_closed(&self) -> bool {
 327          unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) }
 328      }
 329  
 330      fn cancel_io(&self) -> IoResult<(){
 331          match unsafe { c::CancelIoEx(self.handle(), ptr::mut_null()) } {
 332              0 if os::errno() == libc::ERROR_NOT_FOUND as uint => {
 333                  Ok(())
 334              }
 335              0 => Err(super::last_error()),
 336              _ => Ok(())
 337          }
 338      }
 339  }
 340  
 341  impl rtio::RtioPipe for UnixStream {
 342      fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
 343          if self.read.is_none() {
 344              self.read = Some(try!(Event::new(true, false)));
 345          }
 346  
 347          let mut bytes_read = 0;
 348          let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
 349          overlapped.hEvent = self.read.get_ref().handle();
 350  
 351          // Pre-flight check to see if the reading half has been closed. This
 352          // must be done before issuing the ReadFile request, but after we
 353          // acquire the lock.
 354          //
 355          // See comments in close_read() about why this lock is necessary.
 356          let guard = unsafe { (*self.inner.get()).lock.lock() };
 357          if self.read_closed() {
 358              return Err(io::standard_error(io::EndOfFile))
 359          }
 360  
 361          // Issue a nonblocking requests, succeeding quickly if it happened to
 362          // succeed.
 363          let ret = unsafe {
 364              libc::ReadFile(self.handle(),
 365                             buf.as_ptr() as libc::LPVOID,
 366                             buf.len() as libc::DWORD,
 367                             &mut bytes_read,
 368                             &mut overlapped)
 369          };
 370          if ret != 0 { return Ok(bytes_read as uint) }
 371  
 372          // If our errno doesn't say that the I/O is pending, then we hit some
 373          // legitimate error and reeturn immediately.
 374          if os::errno() != libc::ERROR_IO_PENDING as uint {
 375              return Err(super::last_error())
 376          }
 377  
 378          // Now that we've issued a successful nonblocking request, we need to
 379          // wait for it to finish. This can all be done outside the lock because
 380          // we'll see any invocation of CancelIoEx. We also call this in a loop
 381          // because we're woken up if the writing half is closed, we just need to
 382          // realize that the reading half wasn't closed and we go right back to
 383          // sleep.
 384          drop(guard);
 385          loop {
 386              // Process a timeout if one is pending
 387              let succeeded = await(self.handle(), self.read_deadline,
 388                                    &mut overlapped);
 389  
 390              let ret = unsafe {
 391                  libc::GetOverlappedResult(self.handle(),
 392                                            &mut overlapped,
 393                                            &mut bytes_read,
 394                                            libc::TRUE)
 395              };
 396              // If we succeeded, or we failed for some reason other than
 397              // CancelIoEx, return immediately
 398              if ret != 0 { return Ok(bytes_read as uint) }
 399              if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
 400                  return Err(super::last_error())
 401              }
 402  
 403              // If the reading half is now closed, then we're done. If we woke up
 404              // because the writing half was closed, keep trying.
 405              if !succeeded {
 406                  return Err(io::standard_error(io::TimedOut))
 407              }
 408              if self.read_closed() {
 409                  return Err(io::standard_error(io::EndOfFile))
 410              }
 411          }
 412      }
 413  
 414      fn write(&mut self, buf: &[u8]) -> IoResult<(){
 415          if self.write.is_none() {
 416              self.write = Some(try!(Event::new(true, false)));
 417          }
 418  
 419          let mut offset = 0;
 420          let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
 421          overlapped.hEvent = self.write.get_ref().handle();
 422  
 423          while offset < buf.len() {
 424              let mut bytes_written = 0;
 425  
 426              // This sequence below is quite similar to the one found in read().
 427              // Some careful looping is done to ensure that if close_write() is
 428              // invoked we bail out early, and if close_read() is invoked we keep
 429              // going after we woke up.
 430              //
 431              // See comments in close_read() about why this lock is necessary.
 432              let guard = unsafe { (*self.inner.get()).lock.lock() };
 433              if self.write_closed() {
 434                  return Err(io::standard_error(io::BrokenPipe))
 435              }
 436              let ret = unsafe {
 437                  libc::WriteFile(self.handle(),
 438                                  buf.slice_from(offset).as_ptr() as libc::LPVOID,
 439                                  (buf.len() - offset) as libc::DWORD,
 440                                  &mut bytes_written,
 441                                  &mut overlapped)
 442              };
 443              let err = os::errno();
 444              drop(guard);
 445  
 446              if ret == 0 {
 447                  if err != libc::ERROR_IO_PENDING as uint {
 448                      return Err(io::IoError::from_errno(err, true));
 449                  }
 450                  // Process a timeout if one is pending
 451                  let succeeded = await(self.handle(), self.write_deadline,
 452                                        &mut overlapped);
 453                  let ret = unsafe {
 454                      libc::GetOverlappedResult(self.handle(),
 455                                                &mut overlapped,
 456                                                &mut bytes_written,
 457                                                libc::TRUE)
 458                  };
 459                  // If we weren't aborted, this was a legit error, if we were
 460                  // aborted, then check to see if the write half was actually
 461                  // closed or whether we woke up from the read half closing.
 462                  if ret == 0 {
 463                      if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
 464                          return Err(super::last_error())
 465                      }
 466                      if !succeeded {
 467                          let amt = offset + bytes_written as uint;
 468                          return if amt > 0 {
 469                              Err(io::IoError {
 470                                  kind: io::ShortWrite(amt),
 471                                  desc: "short write during write",
 472                                  detail: None,
 473                              })
 474                          } else {
 475                              Err(util::timeout("write timed out"))
 476                          }
 477                      }
 478                      if self.write_closed() {
 479                          return Err(io::standard_error(io::BrokenPipe))
 480                      }
 481                      continue // retry
 482                  }
 483              }
 484              offset += bytes_written as uint;
 485          }
 486          Ok(())
 487      }
 488  
 489      fn clone(&self) -> Box<rtio::RtioPipe:Send> {
 490          box UnixStream {
 491              inner: self.inner.clone(),
 492              read: None,
 493              write: None,
 494              read_deadline: 0,
 495              write_deadline: 0,
 496          } as Box<rtio::RtioPipe:Send>
 497      }
 498  
 499      fn close_read(&mut self) -> IoResult<(){
 500          // On windows, there's no actual shutdown() method for pipes, so we're
 501          // forced to emulate the behavior manually at the application level. To
 502          // do this, we need to both cancel any pending requests, as well as
 503          // prevent all future requests from succeeding. These two operations are
 504          // not atomic with respect to one another, so we must use a lock to do
 505          // so.
 506          //
 507          // The read() code looks like:
 508          //
 509          //      1. Make sure the pipe is still open
 510          //      2. Submit a read request
 511          //      3. Wait for the read request to finish
 512          //
 513          // The race this lock is preventing is if another thread invokes
 514          // close_read() between steps 1 and 2. By atomically executing steps 1
 515          // and 2 with a lock with respect to close_read(), we're guaranteed that
 516          // no thread will erroneously sit in a read forever.
 517          let _guard = unsafe { (*self.inner.get()).lock.lock() };
 518          unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) }
 519          self.cancel_io()
 520      }
 521  
 522      fn close_write(&mut self) -> IoResult<(){
 523          // see comments in close_read() for why this lock is necessary
 524          let _guard = unsafe { (*self.inner.get()).lock.lock() };
 525          unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) }
 526          self.cancel_io()
 527      }
 528  
 529      fn set_timeout(&mut self, timeout: Option<u64>) {
 530          let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 531          self.read_deadline = deadline;
 532          self.write_deadline = deadline;
 533      }
 534      fn set_read_timeout(&mut self, timeout: Option<u64>) {
 535          self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 536      }
 537      fn set_write_timeout(&mut self, timeout: Option<u64>) {
 538          self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
 539      }
 540  }
 541  
 542  ////////////////////////////////////////////////////////////////////////////////
 543  // Unix Listener
 544  ////////////////////////////////////////////////////////////////////////////////
 545  
 546  pub struct UnixListener {
 547      handle: libc::HANDLE,
 548      name: CString,
 549  }
 550  
 551  impl UnixListener {
 552      pub fn bind(addr: &CString) -> IoResult<UnixListener> {
 553          // Although we technically don't need the pipe until much later, we
 554          // create the initial handle up front to test the validity of the name
 555          // and such.
 556          as_utf16_p(addr.as_str().unwrap(), |p| {
 557              let ret = unsafe { pipe(p, true) };
 558              if ret == libc::INVALID_HANDLE_VALUE as libc::HANDLE {
 559                  Err(super::last_error())
 560              } else {
 561                  Ok(UnixListener { handle: ret, name: addr.clone() })
 562              }
 563          })
 564      }
 565  
 566      pub fn native_listen(self) -> IoResult<UnixAcceptor> {
 567          Ok(UnixAcceptor {
 568              listener: self,
 569              event: try!(Event::new(true, false)),
 570              deadline: 0,
 571          })
 572      }
 573  }
 574  
 575  impl Drop for UnixListener {
 576      fn drop(&mut self) {
 577          unsafe { let _ = libc::CloseHandle(self.handle); }
 578      }
 579  }
 580  
 581  impl rtio::RtioUnixListener for UnixListener {
 582      fn listen(~self) -> IoResult<Box<rtio::RtioUnixAcceptor:Send>> {
 583          self.native_listen().map(|a| {
 584              box a as Box<rtio::RtioUnixAcceptor:Send>
 585          })
 586      }
 587  }
 588  
 589  pub struct UnixAcceptor {
 590      listener: UnixListener,
 591      event: Event,
 592      deadline: u64,
 593  }
 594  
 595  impl UnixAcceptor {
 596      pub fn native_accept(&mut self) -> IoResult<UnixStream> {
 597          // This function has some funky implementation details when working with
 598          // unix pipes. On windows, each server named pipe handle can be
 599          // connected to a one or zero clients. To the best of my knowledge, a
 600          // named server is considered active and present if there exists at
 601          // least one server named pipe for it.
 602          //
 603          // The model of this function is to take the current known server
 604          // handle, connect a client to it, and then transfer ownership to the
 605          // UnixStream instance. The next time accept() is invoked, it'll need a
 606          // different server handle to connect a client to.
 607          //
 608          // Note that there is a possible race here. Once our server pipe is
 609          // handed off to a `UnixStream` object, the stream could be closed,
 610          // meaning that there would be no active server pipes, hence even though
 611          // we have a valid `UnixAcceptor`, no one can connect to it. For this
 612          // reason, we generate the next accept call's server pipe at the end of
 613          // this function call.
 614          //
 615          // This provides us an invariant that we always have at least one server
 616          // connection open at a time, meaning that all connects to this acceptor
 617          // should succeed while this is active.
 618          //
 619          // The actual implementation of doing this is a little tricky. Once a
 620          // server pipe is created, a client can connect to it at any time. I
 621          // assume that which server a client connects to is nondeterministic, so
 622          // we also need to guarantee that the only server able to be connected
 623          // to is the one that we're calling ConnectNamedPipe on. This means that
 624          // we have to create the second server pipe *after* we've already
 625          // accepted a connection. In order to at least somewhat gracefully
 626          // handle errors, this means that if the second server pipe creation
 627          // fails that we disconnect the connected client and then just keep
 628          // using the original server pipe.
 629          let handle = self.listener.handle;
 630  
 631          // Once we've got a "server handle", we need to wait for a client to
 632          // connect. The ConnectNamedPipe function will block this thread until
 633          // someone on the other end connects. This function can "fail" if a
 634          // client connects after we created the pipe but before we got down
 635          // here. Thanks windows.
 636          let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
 637          overlapped.hEvent = self.event.handle();
 638          if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
 639              let mut err = unsafe { libc::GetLastError() };
 640  
 641              if err == libc::ERROR_IO_PENDING as libc::DWORD {
 642                  // Process a timeout if one is pending
 643                  let _ = await(handle, self.deadline, &mut overlapped);
 644  
 645                  // This will block until the overlapped I/O is completed. The
 646                  // timeout was previously handled, so this will either block in
 647                  // the normal case or succeed very quickly in the timeout case.
 648                  let ret = unsafe {
 649                      let mut transfer = 0;
 650                      libc::GetOverlappedResult(handle,
 651                                                &mut overlapped,
 652                                                &mut transfer,
 653                                                libc::TRUE)
 654                  };
 655                  if ret == 0 {
 656                      err = unsafe { libc::GetLastError() };
 657                  } else {
 658                      // we succeeded, bypass the check below
 659                      err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
 660                  }
 661              }
 662              if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD {
 663                  return Err(super::last_error())
 664              }
 665          }
 666  
 667          // Now that we've got a connected client to our handle, we need to
 668          // create a second server pipe. If this fails, we disconnect the
 669          // connected client and return an error (see comments above).
 670          let new_handle = as_utf16_p(self.listener.name.as_str().unwrap(), |p| {
 671              unsafe { pipe(p, false) }
 672          });
 673          if new_handle == libc::INVALID_HANDLE_VALUE as libc::HANDLE {
 674              let ret = Err(super::last_error());
 675              // If our disconnection fails, then there's not really a whole lot
 676              // that we can do, so fail the task.
 677              let err = unsafe { libc::DisconnectNamedPipe(handle) };
 678              assert!(err != 0);
 679              return ret;
 680          } else {
 681              self.listener.handle = new_handle;
 682          }
 683  
 684          // Transfer ownership of our handle into this stream
 685          Ok(UnixStream {
 686              inner: UnsafeArc::new(Inner::new(handle)),
 687              read: None,
 688              write: None,
 689              read_deadline: 0,
 690              write_deadline: 0,
 691          })
 692      }
 693  }
 694  
 695  impl rtio::RtioUnixAcceptor for UnixAcceptor {
 696      fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe:Send>> {
 697          self.native_accept().map(|s| box s as Box<rtio::RtioPipe:Send>)
 698      }
 699      fn set_timeout(&mut self, timeout: Option<u64>) {
 700          self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
 701      }
 702  }
 703