(index<- ) ./librustuv/stream.rs
git branch: * master 5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
modified: Fri May 9 13:02:28 2014
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use std::cast;
12 use libc::{c_int, size_t, ssize_t};
13 use std::ptr;
14 use std::rt::task::BlockedTask;
15
16 use Loop;
17 use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
18 ForbidUnwind, wakeup};
19 use uvll;
20
21 // This is a helper structure which is intended to get embedded into other
22 // Watcher structures. This structure will retain a handle to the underlying
23 // uv_stream_t instance, and all I/O operations assume that it's already located
24 // on the appropriate scheduler.
25 pub struct StreamWatcher {
26 pub handle: *uvll::uv_stream_t,
27
28 // Cache the last used uv_write_t so we don't have to allocate a new one on
29 // every call to uv_write(). Ideally this would be a stack-allocated
30 // structure, but currently we don't have mappings for all the structures
31 // defined in libuv, so we're forced to malloc this.
32 last_write_req: Option<Request>,
33
34 blocked_writer: Option<BlockedTask>,
35 }
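// Illustrative sketch, not part of this file: an enclosing watcher embeds a
// StreamWatcher next to its own typed uv handle and forwards read/write calls
// to it. The watcher in tty.rs (see the cross-references at the end of this
// listing) is laid out roughly like this; the struct name below is only for
// illustration.
//
//     pub struct TtyWatcher {
//         tty: *uvll::uv_tty_t,
//         stream: StreamWatcher,
//         home: HomeHandle,
//     }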
36
37 struct ReadContext {
38 buf: Option<Buf>,
39 result: ssize_t,
40 task: Option<BlockedTask>,
41 }
42
43 struct WriteContext {
44 result: c_int,
45 stream: *mut StreamWatcher,
46 data: Option<Vec<u8>>,
47 }
48
49 impl StreamWatcher {
50 // Creates a new helper structure which should then be embedded into another
51 // watcher. This provides the generic read/write methods on streams.
52 //
53 // This structure will *not* close the stream when it is dropped. It is up
54 // to the enclosing structure to be sure to call the close method (which
55 // will block the task). Note that this is also required to prevent memory
56 // leaks.
57 //
58 // It should also be noted that the `data` field of the underlying uv handle
59 // will be manipulated on each of the methods called on this watcher.
60 // Wrappers should always reset the field to an appropriate value
61 // if they rely on the field to perform an action.
62 pub fn new(stream: *uvll::uv_stream_t) -> StreamWatcher {
63 unsafe { uvll::set_data_for_uv_handle(stream, 0 as *int) }
64 StreamWatcher {
65 handle: stream,
66 last_write_req: None,
67 blocked_writer: None,
68 }
69 }
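// Hedged usage sketch (hypothetical wrapper code; `handle` is assumed to be a
// freshly created *uvll::uv_stream_t): the wrapper keeps ownership of the
// handle and must close it itself later; dropping the StreamWatcher alone
// would leak the uv handle.
//
//     let mut stream = StreamWatcher::new(handle);
//     // ... reads and writes go through `stream` ...
//     // the enclosing watcher still has to uv_close() `handle` on teardown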
70
71 pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
72 // This read operation needs to get canceled on an unwind via libuv's
73 // uv_read_stop function
74 let _f = ForbidUnwind::new("stream read");
75
76 let mut rcx = ReadContext {
77 buf: Some(slice_to_uv_buf(buf)),
78 // if the read is canceled, we'll see eof, otherwise this will get
79 // overwritten
80 result: 0,
81 task: None,
82 };
83 // When reading a TTY stream on Windows, libuv will invoke alloc_cb
84 // immediately as part of the call to uv_read_start. What this means is
85 // that we must be ready for this to happen (by setting the data in the
86 // uv handle). In theory this otherwise doesn't need to happen until
87 // after the read is successfully started.
88 unsafe { uvll::set_data_for_uv_handle(self.handle, &rcx) }
89
90 // Send off the read request, but don't block until we're sure that the
91 // read request is queued.
92 let ret = match unsafe {
93 uvll::uv_read_start(self.handle, alloc_cb, read_cb)
94 } {
95 0 => {
96 let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
97 wait_until_woken_after(&mut rcx.task, &Loop::wrap(loop_), || {});
98 match rcx.result {
99 n if n < 0 => Err(UvError(n as c_int)),
100 n => Ok(n as uint),
101 }
102 }
103 n => Err(UvError(n))
104 };
105 // Make sure a read cancellation sees that there's no pending read
106 unsafe { uvll::set_data_for_uv_handle(self.handle, 0 as *int) }
107 return ret;
108 }
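// Hedged caller sketch (hypothetical): read() blocks the current task until
// read_cb fires, so an ordinary buffer owned by the caller is valid for the
// whole operation.
//
//     let mut buf = Vec::from_elem(2048, 0u8);
//     match stream.read(buf.as_mut_slice()) {
//         Ok(n) => { /* the first n bytes of buf now hold data */ }
//         Err(e) => { /* negative libuv status wrapped in UvError */ }
//     }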
109
110 pub fn cancel_read(&mut self, reason: ssize_t) -> Option<BlockedTask> {
111 // When we invoke uv_read_stop, it cancels the read and alloc
112 // callbacks. We need to manually wake up a pending task (if one was
113 // present).
114 assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0);
115 let data = unsafe {
116 let data = uvll::get_data_for_uv_handle(self.handle);
117 if data.is_null() { return None }
118 uvll::set_data_for_uv_handle(self.handle, 0 as *int);
119 &mut *(data as *mut ReadContext)
120 };
121 data.result = reason;
122 data.task.take()
123 }
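// Hedged sketch of the waking side: net.rs's read-timeout path (quoted in the
// cross-references at the end of this listing) does roughly the following,
// then reawakens the returned task so the blocked read() observes ECANCELED.
//
//     fn cancel_read(stream: uint) -> Option<BlockedTask> {
//         let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
//         stream.cancel_read(uvll::ECANCELED as ssize_t)
//     }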
124
125 pub fn write(&mut self, buf: &[u8], may_timeout: bool) -> Result<(), UvError> {
126 // The ownership of the write request is dubious if this function
127 // unwinds. I believe that if the write_cb fails to re-schedule the task
128 // then the write request will be leaked.
129 let _f = ForbidUnwind::new("stream write");
130
131 // Prepare the write request, either using a cached one or allocating a
132 // new one
133 let mut req = match self.last_write_req.take() {
134 Some(req) => req, None => Request::new(uvll::UV_WRITE),
135 };
136 req.set_data(ptr::null::<()>());
137
138 // And here's where timeouts get a little interesting. Currently, libuv
139 // does not support canceling an in-flight write request. Consequently,
140 // when a write timeout expires, there's not much we can do other than
141 // detach the sleeping task from the write request itself. Semantically,
142 // this means that the write request will complete asynchronously, but
143 // the calling task will return an error (because the write timed out).
144 //
145 // There is special wording in the documentation of set_write_timeout()
146 // indicating that this is a plausible failure scenario, and this
147 // function is why that wording exists.
148 //
149 // Implementation-wise, we must be careful when passing a buffer down to
150 // libuv. Most of this implementation avoids allocations because of the
151 // blocking guarantee (all stack local variables are valid for the
152 // entire read/write request). If our write request can be timed out,
153 // however, we must heap allocate the data and pass that to the libuv
154 // functions instead. The reason for this is that if we time out and
155 // return, there's no guarantee that `buf` is a valid buffer any more.
156 //
157 // To do this, the write context has an optionally owned vector of
158 // bytes.
159 let data = if may_timeout {Some(Vec::from_slice(buf))} else {None};
160 let uv_buf = if may_timeout {
161 slice_to_uv_buf(data.get_ref().as_slice())
162 } else {
163 slice_to_uv_buf(buf)
164 };
165
166 // Send off the request, but be careful to not block until we're sure
167 // that the write request is queued. If the request couldn't be queued,
168 // then we should return immediately with an error.
169 match unsafe {
170 uvll::uv_write(req.handle, self.handle, [uv_buf], write_cb)
171 } {
172 0 => {
173 let mut wcx = WriteContext {
174 result: uvll::ECANCELED,
175 stream: self as *mut _,
176 data: data,
177 };
178 req.defuse(); // uv callback now owns this request
179
180 let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
181 wait_until_woken_after(&mut self.blocked_writer,
182 &Loop::wrap(loop_), || {
183 req.set_data(&wcx);
184 });
185
186 if wcx.result != uvll::ECANCELED {
187 self.last_write_req = Some(Request::wrap(req.handle));
188 return match wcx.result {
189 0 => Ok(()),
190 n => Err(UvError(n)),
191 }
192 }
193
194 // This is the second case where canceling an in-flight write
195 // gets interesting. If we've been canceled (no one reset our
196 // result), then someone still needs to free the request, and
197 // someone still needs to free the allocated buffer.
198 //
199 // To take care of this, we swap out the stack-allocated write
200 // context for a heap-allocated context, transferring ownership
201 // of everything to the write_cb. Libuv guarantees that this
202 // callback will be invoked at some point, and the callback will
203 // be responsible for deallocating these resources.
204 //
205 // Note that we don't cache this write request back in the
206 // stream watcher because we no longer have ownership of it, and
207 // we never will.
208 let new_wcx = box WriteContext {
209 result: 0,
210 stream: 0 as *mut StreamWatcher,
211 data: wcx.data.take(),
212 };
213 unsafe {
214 req.set_data(&*new_wcx);
215 cast::forget(new_wcx);
216 }
217 Err(UvError(wcx.result))
218 }
219 n => Err(UvError(n)),
220 }
221 }
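// Hedged caller sketch (hypothetical): `may_timeout` decides whether the
// bytes are copied. With no write timeout armed the caller's slice is handed
// straight to libuv; with a timeout armed the bytes are copied into a Vec so
// the request can outlive this stack frame if the task stops waiting.
//
//     let res = stream.write(bytes, false); // zero-copy, blocks to completion
//     let res = stream.write(bytes, true);  // copies `bytes` first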
222
223 pub fn cancel_write(&mut self) -> Option<BlockedTask> {
224 self.blocked_writer.take()
225 }
226 }
227
228 // This allocation callback expects to be invoked once and only once. It will
229 // unwrap the buffer in the ReadContext stored in the stream and return it. This
230 // will fail if it is called more than once.
231 extern fn alloc_cb(stream: *uvll::uv_stream_t, _hint: size_t, buf: *mut Buf) {
232 uvdebug!("alloc_cb");
233 unsafe {
234 let rcx: &mut ReadContext =
235 cast::transmute(uvll::get_data_for_uv_handle(stream));
236 *buf = rcx.buf.take().expect("stream alloc_cb called more than once");
237 }
238 }
239
240 // When a stream has read some data, we will always forcibly stop reading and
241 // return all the data read (even if it didn't fill the whole buffer).
242 extern fn read_cb(handle: *uvll::uv_stream_t, nread: ssize_t, _buf: *Buf) {
243 uvdebug!("read_cb {}", nread);
244 assert!(nread != uvll::ECANCELED as ssize_t);
245 let rcx: &mut ReadContext = unsafe {
246 cast::transmute(uvll::get_data_for_uv_handle(handle))
247 };
248 // Stop reading so that no read callbacks are
249 // triggered before the user calls `read` again.
250 // FIXME: Is there a performance impact to calling
251 // stop here?
252 unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
253 rcx.result = nread;
254
255 wakeup(&mut rcx.task);
256 }
257
258 // Unlike reading, the WriteContext is stored in the uv_write_t request. Like
259 // reading, however, all this does is wake up the blocked task after squirreling
260 // away the error code as a result.
261 extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
262 let mut req = Request::wrap(req);
263 // Remember to not free the request because it is re-used between writes on
264 // the same stream.
265 let wcx: &mut WriteContext = unsafe { req.get_data() };
266 wcx.result = status;
267
268 // If the stream is present, we haven't timed out, otherwise we acquire
269 // ownership of everything and then deallocate it all at once.
270 if wcx.stream as uint != 0 {
271 req.defuse();
272 let stream: &mut StreamWatcher = unsafe { &mut *wcx.stream };
273 wakeup(&mut stream.blocked_writer);
274 } else {
275 let _wcx: Box<WriteContext> = unsafe { cast::transmute(wcx) };
276 }
277 }
librustuv/stream.rs:24:33-24:33 -struct- definition:
// on the appropriate scheduler.
pub struct StreamWatcher {
pub handle: *uvll::uv_stream_t,
references:- 13
63: unsafe { uvll::set_data_for_uv_handle(stream, 0 as *int) }
64: StreamWatcher {
65: handle: stream,
--
271: req.defuse();
272: let stream: &mut StreamWatcher = unsafe { &mut *wcx.stream };
273: wakeup(&mut stream.blocked_writer);
librustuv/net.rs:
318: fn cancel_read(stream: uint) -> Option<BlockedTask> {
319: let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
320: stream.cancel_read(uvll::ECANCELED as ssize_t)
--
330: fn cancel_write(stream: uint) -> Option<BlockedTask> {
331: let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
332: stream.cancel_write()
librustuv/pipe.rs:
199: fn cancel_write(stream: uint) -> Option<BlockedTask> {
200: let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
201: stream.cancel_write()
librustuv/tty.rs:
23: tty: *uvll::uv_tty_t,
24: stream: StreamWatcher,
25: home: HomeHandle,
librustuv/stream.rs:36:1-36:1 -struct- definition:
struct ReadContext {
buf: Option<Buf>,
result: ssize_t,
references:- 4
244: assert!(nread != uvll::ECANCELED as ssize_t);
245: let rcx: &mut ReadContext = unsafe {
246: cast::transmute(uvll::get_data_for_uv_handle(handle))
librustuv/stream.rs:42:1-42:1 -struct- definition:
struct WriteContext {
result: c_int,
stream: *mut StreamWatcher,
references:- 4
207: // we never will.
208: let new_wcx = box WriteContext {
209: result: 0,
--
264: // the same stream.
265: let wcx: &mut WriteContext = unsafe { req.get_data() };
266: wcx.result = status;
--
274: } else {
275: let _wcx: Box<WriteContext> = unsafe { cast::transmute(wcx) };
276: }