(index<- )        ./librustuv/stream.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  use std::cast;
  12  use libc::{c_int, size_t, ssize_t};
  13  use std::ptr;
  14  use std::rt::task::BlockedTask;
  15  
  16  use Loop;
  17  use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
  18              ForbidUnwind, wakeup};
  19  use uvll;
  20  
  21  // This is a helper structure which is intended to get embedded into other
  22  // Watcher structures. This structure will retain a handle to the underlying
  23  // uv_stream_t instance, and all I/O operations assume that it's already located
  24  // on the appropriate scheduler.
  25  pub struct StreamWatcher {
  26      pub handle: *uvll::uv_stream_t,
  27  
  28      // Cache the last used uv_write_t so we don't have to allocate a new one on
  29      // every call to uv_write(). Ideally this would be a stack-allocated
  30      // structure, but currently we don't have mappings for all the structures
  31      // defined in libuv, so we're foced to malloc this.
  32      last_write_req: Option<Request>,
  33  
  34      blocked_writer: Option<BlockedTask>,
  35  }
  36  
  37  struct ReadContext {
  38      buf: Option<Buf>,
  39      result: ssize_t,
  40      task: Option<BlockedTask>,
  41  }
  42  
  43  struct WriteContext {
  44      result: c_int,
  45      stream: *mut StreamWatcher,
  46      data: Option<Vec<u8>>,
  47  }
  48  
  49  impl StreamWatcher {
  50      // Creates a new helper structure which should be then embedded into another
  51      // watcher. This provides the generic read/write methods on streams.
  52      //
  53      // This structure will *not* close the stream when it is dropped. It is up
  54      // to the enclosure structure to be sure to call the close method (which
  55      // will block the task). Note that this is also required to prevent memory
  56      // leaks.
  57      //
  58      // It should also be noted that the `data` field of the underlying uv handle
  59      // will be manipulated on each of the methods called on this watcher.
  60      // Wrappers should ensure to always reset the field to an appropriate value
  61      // if they rely on the field to perform an action.
  62      pub fn new(stream*uvll::uv_stream_t) -> StreamWatcher {
  63          unsafe { uvll::set_data_for_uv_handle(stream, 0 as *int) }
  64          StreamWatcher {
  65              handle: stream,
  66              last_write_req: None,
  67              blocked_writer: None,
  68          }
  69      }
  70  
  71      pub fn read(&mut self, buf&mut [u8]) -> Result<uint, UvError> {
  72          // This read operation needs to get canceled on an unwind via libuv's
  73          // uv_read_stop function
  74          let _f = ForbidUnwind::new("stream read");
  75  
  76          let mut rcx = ReadContext {
  77              buf: Some(slice_to_uv_buf(buf)),
  78              // if the read is canceled, we'll see eof, otherwise this will get
  79              // overwritten
  80              result: 0,
  81              task: None,
  82          };
  83          // When reading a TTY stream on windows, libuv will invoke alloc_cb
  84          // immediately as part of the call to alloc_cb. What this means is that
  85          // we must be ready for this to happen (by setting the data in the uv
  86          // handle). In theory this otherwise doesn't need to happen until after
  87          // the read is succesfully started.
  88          unsafe { uvll::set_data_for_uv_handle(self.handle, &rcx) }
  89  
  90          // Send off the read request, but don't block until we're sure that the
  91          // read request is queued.
  92          let ret = match unsafe {
  93              uvll::uv_read_start(self.handle, alloc_cb, read_cb)
  94          } {
  95              0 => {
  96                  let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
  97                  wait_until_woken_after(&mut rcx.task, &Loop::wrap(loop_), || {});
  98                  match rcx.result {
  99                      n if n < 0 => Err(UvError(n as c_int)),
 100                      n => Ok(n as uint),
 101                  }
 102              }
 103              n => Err(UvError(n))
 104          };
 105          // Make sure a read cancellation sees that there's no pending read
 106          unsafe { uvll::set_data_for_uv_handle(self.handle, 0 as *int) }
 107          return ret;
 108      }
 109  
 110      pub fn cancel_read(&mut self, reasonssize_t) -> Option<BlockedTask> {
 111          // When we invoke uv_read_stop, it cancels the read and alloc
 112          // callbacks. We need to manually wake up a pending task (if one was
 113          // present).
 114          assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0);
 115          let data = unsafe {
 116              let data = uvll::get_data_for_uv_handle(self.handle);
 117              if data.is_null() { return None }
 118              uvll::set_data_for_uv_handle(self.handle, 0 as *int);
 119              &mut *(data as *mut ReadContext)
 120          };
 121          data.result = reason;
 122          data.task.take()
 123      }
 124  
 125      pub fn write(&mut self, buf&[u8], may_timeoutbool) -> Result<(), UvError> {
 126          // The ownership of the write request is dubious if this function
 127          // unwinds. I believe that if the write_cb fails to re-schedule the task
 128          // then the write request will be leaked.
 129          let _f = ForbidUnwind::new("stream write");
 130  
 131          // Prepare the write request, either using a cached one or allocating a
 132          // new one
 133          let mut req = match self.last_write_req.take() {
 134              Some(req) => req, None => Request::new(uvll::UV_WRITE),
 135          };
 136          req.set_data(ptr::null::<()>());
 137  
 138          // And here's where timeouts get a little interesting. Currently, libuv
 139          // does not support canceling an in-flight write request. Consequently,
 140          // when a write timeout expires, there's not much we can do other than
 141          // detach the sleeping task from the write request itself. Semantically,
 142          // this means that the write request will complete asynchronously, but
 143          // the calling task will return error (because the write timed out).
 144          //
 145          // There is special wording in the documentation of set_write_timeout()
 146          // indicating that this is a plausible failure scenario, and this
 147          // function is why that wording exists.
 148          //
 149          // Implementation-wise, we must be careful when passing a buffer down to
 150          // libuv. Most of this implementation avoids allocations becuase of the
 151          // blocking guarantee (all stack local variables are valid for the
 152          // entire read/write request). If our write request can be timed out,
 153          // however, we must heap allocate the data and pass that to the libuv
 154          // functions instead. The reason for this is that if we time out and
 155          // return, there's no guarantee that `buf` is a valid buffer any more.
 156          //
 157          // To do this, the write context has an optionally owned vector of
 158          // bytes.
 159          let data = if may_timeout {Some(Vec::from_slice(buf))} else {None};
 160          let uv_buf = if may_timeout {
 161              slice_to_uv_buf(data.get_ref().as_slice())
 162          } else {
 163              slice_to_uv_buf(buf)
 164          };
 165  
 166          // Send off the request, but be careful to not block until we're sure
 167          // that the write reqeust is queued. If the reqeust couldn't be queued,
 168          // then we should return immediately with an error.
 169          match unsafe {
 170              uvll::uv_write(req.handle, self.handle, [uv_buf], write_cb)
 171          } {
 172              0 => {
 173                  let mut wcx = WriteContext {
 174                      result: uvll::ECANCELED,
 175                      stream: self as *mut _,
 176                      data: data,
 177                  };
 178                  req.defuse(); // uv callback now owns this request
 179  
 180                  let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
 181                  wait_until_woken_after(&mut self.blocked_writer,
 182                                         &Loop::wrap(loop_), || {
 183                      req.set_data(&wcx);
 184                  });
 185  
 186                  if wcx.result != uvll::ECANCELED {
 187                      self.last_write_req = Some(Request::wrap(req.handle));
 188                      return match wcx.result {
 189                          0 => Ok(()),
 190                          n => Err(UvError(n)),
 191                      }
 192                  }
 193  
 194                  // This is the second case where canceling an in-flight write
 195                  // gets interesting. If we've been canceled (no one reset our
 196                  // result), then someone still needs to free the request, and
 197                  // someone still needs to free the allocate buffer.
 198                  //
 199                  // To take care of this, we swap out the stack-allocated write
 200                  // context for a heap-allocated context, transferring ownership
 201                  // of everything to the write_cb. Libuv guarantees that this
 202                  // callback will be invoked at some point, and the callback will
 203                  // be responsible for deallocating these resources.
 204                  //
 205                  // Note that we don't cache this write request back in the
 206                  // stream watcher because we no longer have ownership of it, and
 207                  // we never will.
 208                  let new_wcx = box WriteContext {
 209                      result: 0,
 210                      stream: 0 as *mut StreamWatcher,
 211                      data: wcx.data.take(),
 212                  };
 213                  unsafe {
 214                      req.set_data(&*new_wcx);
 215                      cast::forget(new_wcx);
 216                  }
 217                  Err(UvError(wcx.result))
 218              }
 219              n => Err(UvError(n)),
 220          }
 221      }
 222  
 223      pub fn cancel_write(&mut self) -> Option<BlockedTask> {
 224          self.blocked_writer.take()
 225      }
 226  }
 227  
 228  // This allocation callback expects to be invoked once and only once. It will
 229  // unwrap the buffer in the ReadContext stored in the stream and return it. This
 230  // will fail if it is called more than once.
 231  extern fn alloc_cb(stream: *uvll::uv_stream_t, _hintsize_t, buf: *mut Buf) {
 232      uvdebug!("alloc_cb");
 233      unsafe {
 234          let rcx&mut ReadContext =
 235              cast::transmute(uvll::get_data_for_uv_handle(stream));
 236          *buf = rcx.buf.take().expect("stream alloc_cb called more than once");
 237      }
 238  }
 239  
 240  // When a stream has read some data, we will always forcibly stop reading and
 241  // return all the data read (even if it didn't fill the whole buffer).
 242  extern fn read_cb(handle: *uvll::uv_stream_t, nreadssize_t, _buf: *Buf) {
 243      uvdebug!("read_cb {}", nread);
 244      assert!(nread != uvll::ECANCELED as ssize_t);
 245      let rcx&mut ReadContext = unsafe {
 246          cast::transmute(uvll::get_data_for_uv_handle(handle))
 247      };
 248      // Stop reading so that no read callbacks are
 249      // triggered before the user calls `read` again.
 250      // FIXME: Is there a performance impact to calling
 251      // stop here?
 252      unsafe { assert_eq!(uvll::uv_read_stop(handle), 0); }
 253      rcx.result = nread;
 254  
 255      wakeup(&mut rcx.task);
 256  }
 257  
 258  // Unlike reading, the WriteContext is stored in the uv_write_t request. Like
 259  // reading, however, all this does is wake up the blocked task after squirreling
 260  // away the error code as a result.
 261  extern fn write_cb(req: *uvll::uv_write_t, statusc_int) {
 262      let mut req = Request::wrap(req);
 263      // Remember to not free the request because it is re-used between writes on
 264      // the same stream.
 265      let wcx&mut WriteContext = unsafe { req.get_data() };
 266      wcx.result = status;
 267  
 268      // If the stream is present, we haven't timed out, otherwise we acquire
 269      // ownership of everything and then deallocate it all at once.
 270      if wcx.stream as uint != 0 {
 271          req.defuse();
 272          let stream&mut StreamWatcher = unsafe { &mut *wcx.stream };
 273          wakeup(&mut stream.blocked_writer);
 274      } else {
 275          let _wcxBox<WriteContext> = unsafe { cast::transmute(wcx) };
 276      }
 277  }


librustuv/stream.rs:24:33-24:33 -struct- definition:
// on the appropriate scheduler.
pub struct StreamWatcher {
    pub handle: *uvll::uv_stream_t,
references:- 13
63:         unsafe { uvll::set_data_for_uv_handle(stream, 0 as *int) }
64:         StreamWatcher {
65:             handle: stream,
--
271:         req.defuse();
272:         let stream: &mut StreamWatcher = unsafe { &mut *wcx.stream };
273:         wakeup(&mut stream.blocked_writer);
librustuv/net.rs:
318:         fn cancel_read(stream: uint) -> Option<BlockedTask> {
319:             let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
320:             stream.cancel_read(uvll::ECANCELED as ssize_t)
--
330:         fn cancel_write(stream: uint) -> Option<BlockedTask> {
331:             let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
332:             stream.cancel_write()
librustuv/pipe.rs:
199:         fn cancel_write(stream: uint) -> Option<BlockedTask> {
200:             let stream: &mut StreamWatcher = unsafe { cast::transmute(stream) };
201:             stream.cancel_write()
librustuv/tty.rs:
23:     tty: *uvll::uv_tty_t,
24:     stream: StreamWatcher,
25:     home: HomeHandle,


librustuv/stream.rs:36:1-36:1 -struct- definition:
struct ReadContext {
    buf: Option<Buf>,
    result: ssize_t,
references:- 4
244:     assert!(nread != uvll::ECANCELED as ssize_t);
245:     let rcx: &mut ReadContext = unsafe {
246:         cast::transmute(uvll::get_data_for_uv_handle(handle))


librustuv/stream.rs:42:1-42:1 -struct- definition:
struct WriteContext {
    result: c_int,
    stream: *mut StreamWatcher,
references:- 4
207:                 // we never will.
208:                 let new_wcx = box WriteContext {
209:                     result: 0,
--
264:     // the same stream.
265:     let wcx: &mut WriteContext = unsafe { req.get_data() };
266:     wcx.result = status;
--
274:     } else {
275:         let _wcx: Box<WriteContext> = unsafe { cast::transmute(wcx) };
276:     }