1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Named pipes implementation for windows
12 //!
13 //! If you are unfortunate enough to be reading this code, I would like to first
14 //! apologize. This was my first encounter with windows named pipes, and it
15 //! didn't exactly turn out very cleanly. If you, too, are new to named pipes,
16 //! read on as I'll try to explain some fun things that I ran into.
17 //!
18 //! # Unix pipes vs Named pipes
19 //!
20 //! As with everything else, named pipes on windows are pretty different from
21 //! unix pipes on unix. On unix, you use one "server pipe" to accept new client
22 //! pipes. So long as this server pipe is active, new children pipes can
23 //! connect. On windows, you instead have a number of "server pipes", and each
24 //! of these server pipes can throughout their lifetime be attached to a client
25 //! or not. Once attached to a client, a server pipe may then disconnect at a
26 //! later date.
27 //!
28 //! # Accepting clients
29 //!
30 //! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces
31 //! are built around the unix flavors. This means that we have one "server
32 //! pipe" to which many clients can connect. In order to make this compatible
33 //! with the windows model, each connected client consumes ownership of a server
34 //! pipe, and then a new server pipe is created for the next client.
35 //!
36 //! Note that the server pipes attached to clients are never given back to the
37 //! listener for recycling. This could possibly be implemented with a channel so
38 //! the listener half can re-use server pipes, but for now I err'd on the simple
39 //! side of things. Each stream accepted by a listener will destroy the server
40 //! pipe after the stream is dropped.
41 //!
42 //! This model ends up having a small race or two, and you can find more details
43 //! on the `native_accept` method.
44 //!
45 //! # Simultaneous reads and writes
46 //!
47 //! In testing, I found that two simultaneous writes and two simultaneous reads
48 //! on a pipe ended up working out just fine, but problems were encountered when
49 //! a read was executed simultaneously with a write. After some googling around,
50 //! it sounded like named pipes just weren't built for this kind of interaction,
51 //! and the suggested solution was to use overlapped I/O.
52 //!
53 //! I don't really know what overlapped I/O is, but my basic understanding after
54 //! reading about it is that you have an external Event which is used to signal
55 //! I/O completion, passed around in some OVERLAPPED structures. As to what this
56 //! is, I'm not exactly sure.
57 //!
58 //! This problem implies that all named pipes are created with the
59 //! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is
60 //! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and
61 //! inside of this structure is a HANDLE from CreateEvent. After the I/O is
62 //! determined to be pending (may complete in the future), the
63 //! GetOverlappedResult function is used to block on the event, waiting for the
64 //! I/O to finish.
65 //!
66 //! This scheme ended up working well enough. There were two snags that I ran
67 //! into, however:
68 //!
69 //! * Each UnixStream instance needs its own read/write events to wait on. These
70 //! can't be shared among clones of the same stream because the documentation
71 //! states that it unsets the event when the I/O is started (would possibly
72 //! corrupt other events simultaneously waiting). For convenience's sake,
73 //! these events are lazily initialized.
74 //!
75 //! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition
76 //! to all pipes created through `connect`. Notably this means that the
77 //! ConnectNamedPipe function is nonblocking, implying that the Listener needs
78 //! to have yet another event to do the actual blocking.
79 //!
80 //! # Conclusion
81 //!
82 //! The conclusion here is that I probably don't know the best way to work with
83 //! windows named pipes, but the solution here seems to work well enough to get
84 //! the test suite passing (the suite is in libstd), and that's good enough for
85 //! me!
86
87 use libc;
88 use std::c_str::CString;
89 use std::intrinsics;
90 use std::io;
91 use std::os::win32::as_utf16_p;
92 use std::os;
93 use std::ptr;
94 use std::rt::rtio;
95 use std::sync::arc::UnsafeArc;
96 use std::sync::atomics;
97 use std::unstable::mutex;
98
99 use super::IoResult;
100 use super::c;
101 use super::util;
102
103 struct Event(libc::HANDLE);
104
105 impl Event {
106 fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
107 let event = unsafe {
108 libc::CreateEventW(ptr::mut_null(),
109 manual_reset as libc::BOOL,
110 initial_state as libc::BOOL,
111 ptr::null())
112 };
113 if event as uint == 0 {
114 Err(super::last_error())
115 } else {
116 Ok(Event(event))
117 }
118 }
119
120 fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
121 }
122
123 impl Drop for Event {
124 fn drop(&mut self) {
125 unsafe { let _ = libc::CloseHandle(self.handle()); }
126 }
127 }
128
129 struct Inner {
130 handle: libc::HANDLE,
131 lock: mutex::NativeMutex,
132 read_closed: atomics::AtomicBool,
133 write_closed: atomics::AtomicBool,
134 }
135
136 impl Inner {
137 fn new(handle: libc::HANDLE) -> Inner {
138 Inner {
139 handle: handle,
140 lock: unsafe { mutex::NativeMutex::new() },
141 read_closed: atomics::AtomicBool::new(false),
142 write_closed: atomics::AtomicBool::new(false),
143 }
144 }
145 }
146
147 impl Drop for Inner {
148 fn drop(&mut self) {
149 unsafe {
150 let _ = libc::FlushFileBuffers(self.handle);
151 let _ = libc::CloseHandle(self.handle);
152 }
153 }
154 }
155
156 unsafe fn pipe(name: *u16, init: bool) -> libc::HANDLE {
157 libc::CreateNamedPipeW(
158 name,
159 libc::PIPE_ACCESS_DUPLEX |
160 if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} |
161 libc::FILE_FLAG_OVERLAPPED,
162 libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE |
163 libc::PIPE_WAIT,
164 libc::PIPE_UNLIMITED_INSTANCES,
165 65536,
166 65536,
167 0,
168 ptr::mut_null()
169 )
170 }
171
172 pub fn await(handle: libc::HANDLE, deadline: u64,
173 overlapped: &mut libc::OVERLAPPED) -> bool {
174 if deadline == 0 { return true }
175
176 // If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
177 // to figure out if we should indeed get the result.
178 let now = ::io::timer::now();
179 let timeout = deadline < now || unsafe {
180 let ms = (deadline - now) as libc::DWORD;
181 let r = libc::WaitForSingleObject(overlapped.hEvent,
182 ms);
183 r != libc::WAIT_OBJECT_0
184 };
185 if timeout {
186 unsafe { let _ = c::CancelIo(handle); }
187 false
188 } else {
189 true
190 }
191 }
192
193 ////////////////////////////////////////////////////////////////////////////////
194 // Unix Streams
195 ////////////////////////////////////////////////////////////////////////////////
196
197 pub struct UnixStream {
198 inner: UnsafeArc<Inner>,
199 write: Option<Event>,
200 read: Option<Event>,
201 read_deadline: u64,
202 write_deadline: u64,
203 }
204
205 impl UnixStream {
206 fn try_connect(p: *u16) -> Option<libc::HANDLE> {
207 // Note that most of this is lifted from the libuv implementation.
208 // The idea is that if we fail to open a pipe in read/write mode
209 // that we try afterwards in just read or just write
210 let mut result = unsafe {
211 libc::CreateFileW(p,
212 libc::GENERIC_READ | libc::GENERIC_WRITE,
213 0,
214 ptr::mut_null(),
215 libc::OPEN_EXISTING,
216 libc::FILE_FLAG_OVERLAPPED,
217 ptr::mut_null())
218 };
219 if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE {
220 return Some(result)
221 }
222
223 let err = unsafe { libc::GetLastError() };
224 if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
225 result = unsafe {
226 libc::CreateFileW(p,
227 libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES,
228 0,
229 ptr::mut_null(),
230 libc::OPEN_EXISTING,
231 libc::FILE_FLAG_OVERLAPPED,
232 ptr::mut_null())
233 };
234 if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE {
235 return Some(result)
236 }
237 }
238 let err = unsafe { libc::GetLastError() };
239 if err == libc::ERROR_ACCESS_DENIED as libc::DWORD {
240 result = unsafe {
241 libc::CreateFileW(p,
242 libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES,
243 0,
244 ptr::mut_null(),
245 libc::OPEN_EXISTING,
246 libc::FILE_FLAG_OVERLAPPED,
247 ptr::mut_null())
248 };
249 if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE {
250 return Some(result)
251 }
252 }
253 None
254 }
255
256 pub fn connect(addr: &CString, timeout: Option<u64>) -> IoResult<UnixStream> {
257 as_utf16_p(addr.as_str().unwrap(), |p| {
258 let start = ::io::timer::now();
259 loop {
260 match UnixStream::try_connect(p) {
261 Some(handle) => {
262 let inner = Inner::new(handle);
263 let mut mode = libc::PIPE_TYPE_BYTE |
264 libc::PIPE_READMODE_BYTE |
265 libc::PIPE_WAIT;
266 let ret = unsafe {
267 libc::SetNamedPipeHandleState(inner.handle,
268 &mut mode,
269 ptr::mut_null(),
270 ptr::mut_null())
271 };
272 return if ret == 0 {
273 Err(super::last_error())
274 } else {
275 Ok(UnixStream {
276 inner: UnsafeArc::new(inner),
277 read: None,
278 write: None,
279 read_deadline: 0,
280 write_deadline: 0,
281 })
282 }
283 }
284 None => {}
285 }
286
287 // On windows, if you fail to connect, you may need to call the
288 // `WaitNamedPipe` function, and this is indicated with an error
289 // code of ERROR_PIPE_BUSY.
290 let code = unsafe { libc::GetLastError() };
291 if code as int != libc::ERROR_PIPE_BUSY as int {
292 return Err(super::last_error())
293 }
294
295 match timeout {
296 Some(timeout) => {
297 let now = ::io::timer::now();
298 let timed_out = (now - start) >= timeout || unsafe {
299 let ms = (timeout - (now - start)) as libc::DWORD;
300 libc::WaitNamedPipeW(p, ms) == 0
301 };
302 if timed_out {
303 return Err(util::timeout("connect timed out"))
304 }
305 }
306
307 // An example I found on microsoft's website used 20
308 // seconds, libuv uses 30 seconds, hence we make the
309 // obvious choice of waiting for 25 seconds.
310 None => {
311 if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 {
312 return Err(super::last_error())
313 }
314 }
315 }
316 }
317 })
318 }
319
320 fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } }
321
322 fn read_closed(&self) -> bool {
323 unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) }
324 }
325
326 fn write_closed(&self) -> bool {
327 unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) }
328 }
329
330 fn cancel_io(&self) -> IoResult<()> {
331 match unsafe { c::CancelIoEx(self.handle(), ptr::mut_null()) } {
332 0 if os::errno() == libc::ERROR_NOT_FOUND as uint => {
333 Ok(())
334 }
335 0 => Err(super::last_error()),
336 _ => Ok(())
337 }
338 }
339 }
340
341 impl rtio::RtioPipe for UnixStream {
342 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
343 if self.read.is_none() {
344 self.read = Some(try!(Event::new(true, false)));
345 }
346
347 let mut bytes_read = 0;
348 let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
349 overlapped.hEvent = self.read.get_ref().handle();
350
351 // Pre-flight check to see if the reading half has been closed. This
352 // must be done before issuing the ReadFile request, but after we
353 // acquire the lock.
354 //
355 // See comments in close_read() about why this lock is necessary.
356 let guard = unsafe { (*self.inner.get()).lock.lock() };
357 if self.read_closed() {
358 return Err(io::standard_error(io::EndOfFile))
359 }
360
361 // Issue a nonblocking requests, succeeding quickly if it happened to
362 // succeed.
363 let ret = unsafe {
364 libc::ReadFile(self.handle(),
365 buf.as_ptr() as libc::LPVOID,
366 buf.len() as libc::DWORD,
367 &mut bytes_read,
368 &mut overlapped)
369 };
370 if ret != 0 { return Ok(bytes_read as uint) }
371
372 // If our errno doesn't say that the I/O is pending, then we hit some
373 // legitimate error and reeturn immediately.
374 if os::errno() != libc::ERROR_IO_PENDING as uint {
375 return Err(super::last_error())
376 }
377
378 // Now that we've issued a successful nonblocking request, we need to
379 // wait for it to finish. This can all be done outside the lock because
380 // we'll see any invocation of CancelIoEx. We also call this in a loop
381 // because we're woken up if the writing half is closed, we just need to
382 // realize that the reading half wasn't closed and we go right back to
383 // sleep.
384 drop(guard);
385 loop {
386 // Process a timeout if one is pending
387 let succeeded = await(self.handle(), self.read_deadline,
388 &mut overlapped);
389
390 let ret = unsafe {
391 libc::GetOverlappedResult(self.handle(),
392 &mut overlapped,
393 &mut bytes_read,
394 libc::TRUE)
395 };
396 // If we succeeded, or we failed for some reason other than
397 // CancelIoEx, return immediately
398 if ret != 0 { return Ok(bytes_read as uint) }
399 if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
400 return Err(super::last_error())
401 }
402
403 // If the reading half is now closed, then we're done. If we woke up
404 // because the writing half was closed, keep trying.
405 if !succeeded {
406 return Err(io::standard_error(io::TimedOut))
407 }
408 if self.read_closed() {
409 return Err(io::standard_error(io::EndOfFile))
410 }
411 }
412 }
413
414 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
415 if self.write.is_none() {
416 self.write = Some(try!(Event::new(true, false)));
417 }
418
419 let mut offset = 0;
420 let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
421 overlapped.hEvent = self.write.get_ref().handle();
422
423 while offset < buf.len() {
424 let mut bytes_written = 0;
425
426 // This sequence below is quite similar to the one found in read().
427 // Some careful looping is done to ensure that if close_write() is
428 // invoked we bail out early, and if close_read() is invoked we keep
429 // going after we woke up.
430 //
431 // See comments in close_read() about why this lock is necessary.
432 let guard = unsafe { (*self.inner.get()).lock.lock() };
433 if self.write_closed() {
434 return Err(io::standard_error(io::BrokenPipe))
435 }
436 let ret = unsafe {
437 libc::WriteFile(self.handle(),
438 buf.slice_from(offset).as_ptr() as libc::LPVOID,
439 (buf.len() - offset) as libc::DWORD,
440 &mut bytes_written,
441 &mut overlapped)
442 };
443 let err = os::errno();
444 drop(guard);
445
446 if ret == 0 {
447 if err != libc::ERROR_IO_PENDING as uint {
448 return Err(io::IoError::from_errno(err, true));
449 }
450 // Process a timeout if one is pending
451 let succeeded = await(self.handle(), self.write_deadline,
452 &mut overlapped);
453 let ret = unsafe {
454 libc::GetOverlappedResult(self.handle(),
455 &mut overlapped,
456 &mut bytes_written,
457 libc::TRUE)
458 };
459 // If we weren't aborted, this was a legit error, if we were
460 // aborted, then check to see if the write half was actually
461 // closed or whether we woke up from the read half closing.
462 if ret == 0 {
463 if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
464 return Err(super::last_error())
465 }
466 if !succeeded {
467 let amt = offset + bytes_written as uint;
468 return if amt > 0 {
469 Err(io::IoError {
470 kind: io::ShortWrite(amt),
471 desc: "short write during write",
472 detail: None,
473 })
474 } else {
475 Err(util::timeout("write timed out"))
476 }
477 }
478 if self.write_closed() {
479 return Err(io::standard_error(io::BrokenPipe))
480 }
481 continue // retry
482 }
483 }
484 offset += bytes_written as uint;
485 }
486 Ok(())
487 }
488
489 fn clone(&self) -> Box<rtio::RtioPipe:Send> {
490 box UnixStream {
491 inner: self.inner.clone(),
492 read: None,
493 write: None,
494 read_deadline: 0,
495 write_deadline: 0,
496 } as Box<rtio::RtioPipe:Send>
497 }
498
499 fn close_read(&mut self) -> IoResult<()> {
500 // On windows, there's no actual shutdown() method for pipes, so we're
501 // forced to emulate the behavior manually at the application level. To
502 // do this, we need to both cancel any pending requests, as well as
503 // prevent all future requests from succeeding. These two operations are
504 // not atomic with respect to one another, so we must use a lock to do
505 // so.
506 //
507 // The read() code looks like:
508 //
509 // 1. Make sure the pipe is still open
510 // 2. Submit a read request
511 // 3. Wait for the read request to finish
512 //
513 // The race this lock is preventing is if another thread invokes
514 // close_read() between steps 1 and 2. By atomically executing steps 1
515 // and 2 with a lock with respect to close_read(), we're guaranteed that
516 // no thread will erroneously sit in a read forever.
517 let _guard = unsafe { (*self.inner.get()).lock.lock() };
518 unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) }
519 self.cancel_io()
520 }
521
522 fn close_write(&mut self) -> IoResult<()> {
523 // see comments in close_read() for why this lock is necessary
524 let _guard = unsafe { (*self.inner.get()).lock.lock() };
525 unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) }
526 self.cancel_io()
527 }
528
529 fn set_timeout(&mut self, timeout: Option<u64>) {
530 let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
531 self.read_deadline = deadline;
532 self.write_deadline = deadline;
533 }
534 fn set_read_timeout(&mut self, timeout: Option<u64>) {
535 self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
536 }
537 fn set_write_timeout(&mut self, timeout: Option<u64>) {
538 self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
539 }
540 }
541
542 ////////////////////////////////////////////////////////////////////////////////
543 // Unix Listener
544 ////////////////////////////////////////////////////////////////////////////////
545
546 pub struct UnixListener {
547 handle: libc::HANDLE,
548 name: CString,
549 }
550
551 impl UnixListener {
552 pub fn bind(addr: &CString) -> IoResult<UnixListener> {
553 // Although we technically don't need the pipe until much later, we
554 // create the initial handle up front to test the validity of the name
555 // and such.
556 as_utf16_p(addr.as_str().unwrap(), |p| {
557 let ret = unsafe { pipe(p, true) };
558 if ret == libc::INVALID_HANDLE_VALUE as libc::HANDLE {
559 Err(super::last_error())
560 } else {
561 Ok(UnixListener { handle: ret, name: addr.clone() })
562 }
563 })
564 }
565
566 pub fn native_listen(self) -> IoResult<UnixAcceptor> {
567 Ok(UnixAcceptor {
568 listener: self,
569 event: try!(Event::new(true, false)),
570 deadline: 0,
571 })
572 }
573 }
574
575 impl Drop for UnixListener {
576 fn drop(&mut self) {
577 unsafe { let _ = libc::CloseHandle(self.handle); }
578 }
579 }
580
581 impl rtio::RtioUnixListener for UnixListener {
582 fn listen(~self) -> IoResult<Box<rtio::RtioUnixAcceptor:Send>> {
583 self.native_listen().map(|a| {
584 box a as Box<rtio::RtioUnixAcceptor:Send>
585 })
586 }
587 }
588
589 pub struct UnixAcceptor {
590 listener: UnixListener,
591 event: Event,
592 deadline: u64,
593 }
594
595 impl UnixAcceptor {
596 pub fn native_accept(&mut self) -> IoResult<UnixStream> {
597 // This function has some funky implementation details when working with
598 // unix pipes. On windows, each server named pipe handle can be
599 // connected to a one or zero clients. To the best of my knowledge, a
600 // named server is considered active and present if there exists at
601 // least one server named pipe for it.
602 //
603 // The model of this function is to take the current known server
604 // handle, connect a client to it, and then transfer ownership to the
605 // UnixStream instance. The next time accept() is invoked, it'll need a
606 // different server handle to connect a client to.
607 //
608 // Note that there is a possible race here. Once our server pipe is
609 // handed off to a `UnixStream` object, the stream could be closed,
610 // meaning that there would be no active server pipes, hence even though
611 // we have a valid `UnixAcceptor`, no one can connect to it. For this
612 // reason, we generate the next accept call's server pipe at the end of
613 // this function call.
614 //
615 // This provides us an invariant that we always have at least one server
616 // connection open at a time, meaning that all connects to this acceptor
617 // should succeed while this is active.
618 //
619 // The actual implementation of doing this is a little tricky. Once a
620 // server pipe is created, a client can connect to it at any time. I
621 // assume that which server a client connects to is nondeterministic, so
622 // we also need to guarantee that the only server able to be connected
623 // to is the one that we're calling ConnectNamedPipe on. This means that
624 // we have to create the second server pipe *after* we've already
625 // accepted a connection. In order to at least somewhat gracefully
626 // handle errors, this means that if the second server pipe creation
627 // fails that we disconnect the connected client and then just keep
628 // using the original server pipe.
629 let handle = self.listener.handle;
630
631 // Once we've got a "server handle", we need to wait for a client to
632 // connect. The ConnectNamedPipe function will block this thread until
633 // someone on the other end connects. This function can "fail" if a
634 // client connects after we created the pipe but before we got down
635 // here. Thanks windows.
636 let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
637 overlapped.hEvent = self.event.handle();
638 if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } {
639 let mut err = unsafe { libc::GetLastError() };
640
641 if err == libc::ERROR_IO_PENDING as libc::DWORD {
642 // Process a timeout if one is pending
643 let _ = await(handle, self.deadline, &mut overlapped);
644
645 // This will block until the overlapped I/O is completed. The
646 // timeout was previously handled, so this will either block in
647 // the normal case or succeed very quickly in the timeout case.
648 let ret = unsafe {
649 let mut transfer = 0;
650 libc::GetOverlappedResult(handle,
651 &mut overlapped,
652 &mut transfer,
653 libc::TRUE)
654 };
655 if ret == 0 {
656 err = unsafe { libc::GetLastError() };
657 } else {
658 // we succeeded, bypass the check below
659 err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
660 }
661 }
662 if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD {
663 return Err(super::last_error())
664 }
665 }
666
667 // Now that we've got a connected client to our handle, we need to
668 // create a second server pipe. If this fails, we disconnect the
669 // connected client and return an error (see comments above).
670 let new_handle = as_utf16_p(self.listener.name.as_str().unwrap(), |p| {
671 unsafe { pipe(p, false) }
672 });
673 if new_handle == libc::INVALID_HANDLE_VALUE as libc::HANDLE {
674 let ret = Err(super::last_error());
675 // If our disconnection fails, then there's not really a whole lot
676 // that we can do, so fail the task.
677 let err = unsafe { libc::DisconnectNamedPipe(handle) };
678 assert!(err != 0);
679 return ret;
680 } else {
681 self.listener.handle = new_handle;
682 }
683
684 // Transfer ownership of our handle into this stream
685 Ok(UnixStream {
686 inner: UnsafeArc::new(Inner::new(handle)),
687 read: None,
688 write: None,
689 read_deadline: 0,
690 write_deadline: 0,
691 })
692 }
693 }
694
695 impl rtio::RtioUnixAcceptor for UnixAcceptor {
696 fn accept(&mut self) -> IoResult<Box<rtio::RtioPipe:Send>> {
697 self.native_accept().map(|s| box s as Box<rtio::RtioPipe:Send>)
698 }
699 fn set_timeout(&mut self, timeout: Option<u64>) {
700 self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0);
701 }
702 }
703