(index<- )        ./libstd/rt/io/buffered.rs

   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  //! Buffering wrappers for I/O traits
  12  //!
  13  //! It can be excessively inefficient to work directly with a `Reader` or
  14  //! `Writer`. Every call to `read` or `write` on `TcpStream` results in a
  15  //! system call, for example. This module provides structures that wrap
  16  //! `Readers`, `Writers`, and `Streams` and buffer input and output to them.
  17  //!
  18  //! # Examples
  19  //!
  20  //! ```
  21  //! let tcp_stream = TcpStream::connect(addr);
  22  //! let reader = BufferedReader::new(tcp_stream);
  23  //!
  24  //! let mut buf: ~[u8] = vec::from_elem(100, 0u8);
  25  //! match reader.read(buf.as_slice()) {
  26  //!     Some(nread) => println!("Read {} bytes", nread),
  27  //!     None => println!("At the end of the stream!")
  28  //! }
  29  //! ```
  30  //!
  31  //! ```
  32  //! let tcp_stream = TcpStream::connect(addr);
  33  //! let writer = BufferedWriter::new(tcp_stream);
  34  //!
  35  //! writer.write("hello, world".as_bytes());
  36  //! writer.flush();
  37  //! ```
  38  //!
  39  //! ```
  40  //! let tcp_stream = TcpStream::connect(addr);
  41  //! let stream = BufferedStream::new(tcp_stream);
  42  //!
  43  //! stream.write("hello, world".as_bytes());
  44  //! stream.flush();
  45  //!
  46  //! let mut buf = vec::from_elem(100, 0u8);
  47  //! match stream.read(buf.as_slice()) {
  48  //!     Some(nread) => println!("Read {} bytes", nread),
  49  //!     None => println!("At the end of the stream!")
  50  //! }
  51  //! ```
  52  //!
  53  
  54  use prelude::*;
  55  
  56  use num;
  57  use vec;
  58  use str;
  59  use super::{Reader, Writer, Stream, Decorator};
  60  
  61  // libuv recommends 64k buffers to maximize throughput
  62  // https://groups.google.com/forum/#!topic/libuv/oQO1HJAIDdA
  63  static DEFAULT_CAPACITY: uint = 64 * 1024;
  64  
  65  /// Wraps a Reader and buffers input from it
  66  pub struct BufferedReader<R> {
  67      priv inner: R,
  68      priv buf: ~[u8],
  69      priv pos: uint,
  70      priv cap: uint
  71  }
  72  
  73  impl<R: Reader> BufferedReader<R> {
  74      /// Creates a new `BufferedReader` with with the specified buffer capacity
  75      pub fn with_capacity(capuint, innerR) -> BufferedReader<R> {
  76          BufferedReader {
  77              inner: inner,
  78              buf: vec::from_elem(cap, 0u8),
  79              pos: 0,
  80              cap: 0
  81          }
  82      }
  83  
  84      /// Creates a new `BufferedReader` with a default buffer capacity
  85      pub fn new(innerR) -> BufferedReader<R> {
  86          BufferedReader::with_capacity(DEFAULT_CAPACITY, inner)
  87      }
  88  
  89      /// Reads the next line of input, interpreted as a sequence of utf-8
  90      /// encoded unicode codepoints. If a newline is encountered, then the
  91      /// newline is contained in the returned string.
  92      pub fn read_line(&mut self) -> Option<~str> {
  93          self.read_until('\n' as u8).map(str::from_utf8_owned)
  94      }
  95  
  96      /// Reads a sequence of bytes leading up to a specified delimeter. Once the
  97      /// specified byte is encountered, reading ceases and the bytes up to and
  98      /// including the delimiter are returned.
  99      pub fn read_until(&mut self, byteu8) -> Option<~[u8]> {
 100          let mut res = ~[];
 101          let mut used;
 102          loop {
 103              {
 104                  let available = self.fill_buffer();
 105                  match available.iter().position(|&b| b == byte) {
 106                      Some(i) => {
 107                          res.push_all(available.slice_to(i + 1));
 108                          used = i + 1;
 109                          break
 110                      }
 111                      None => {
 112                          res.push_all(available);
 113                          used = available.len();
 114                      }
 115                  }
 116              }
 117              if used == 0 {
 118                  break
 119              }
 120              self.pos += used;
 121          }
 122          self.pos += used;
 123          return if res.len() == 0 {None} else {Some(res)};
 124      }
 125  
 126      fn fill_buffer<'a>(&'a mut self) -> &'a [u8] {
 127          if self.pos == self.cap {
 128              match self.inner.read(self.buf) {
 129                  Some(cap) => {
 130                      self.pos = 0;
 131                      self.cap = cap;
 132                  }
 133                  None => {}
 134              }
 135          }
 136          return self.buf.slice(self.pos, self.cap);
 137      }
 138  }
 139  
 140  impl<R: Reader> Reader for BufferedReader<R> {
 141      fn read(&mut self, buf&mut [u8]) -> Option<uint> {
 142          let nread = {
 143              let available = self.fill_buffer();
 144              if available.len() == 0 {
 145                  return None;
 146              }
 147              let nread = num::min(available.len(), buf.len());
 148              vec::bytes::copy_memory(buf, available, nread);
 149              nread
 150          };
 151          self.pos += nread;
 152          Some(nread)
 153      }
 154  
 155      fn eof(&mut self) -> bool {
 156          self.pos == self.cap && self.inner.eof()
 157      }
 158  }
 159  
 160  impl<R: Reader> Decorator<R> for BufferedReader<R> {
 161      fn inner(self) -> R {
 162          self.inner
 163      }
 164  
 165      fn inner_ref<'a>(&'a self) -> &'a R {
 166          &self.inner
 167      }
 168  
 169      fn inner_mut_ref<'a>(&'a mut self) -> &'a mut R {
 170          &mut self.inner
 171      }
 172  }
 173  
 174  /// Wraps a Writer and buffers output to it
 175  ///
 176  /// Note that `BufferedWriter` will NOT flush its buffer when dropped.
 177  pub struct BufferedWriter<W> {
 178      priv inner: W,
 179      priv buf: ~[u8],
 180      priv pos: uint
 181  }
 182  
 183  impl<W: Writer> BufferedWriter<W> {
 184      /// Creates a new `BufferedWriter` with with the specified buffer capacity
 185      pub fn with_capacity(capuint, innerW) -> BufferedWriter<W> {
 186          BufferedWriter {
 187              inner: inner,
 188              buf: vec::from_elem(cap, 0u8),
 189              pos: 0
 190          }
 191      }
 192  
 193      /// Creates a new `BufferedWriter` with a default buffer capacity
 194      pub fn new(innerW) -> BufferedWriter<W> {
 195          BufferedWriter::with_capacity(DEFAULT_CAPACITY, inner)
 196      }
 197  }
 198  
 199  impl<W: Writer> Writer for BufferedWriter<W> {
 200      fn write(&mut self, buf&[u8]) {
 201          if self.pos + buf.len() > self.buf.len() {
 202              self.flush();
 203          }
 204  
 205          if buf.len() > self.buf.len() {
 206              self.inner.write(buf);
 207          } else {
 208              let dst = self.buf.mut_slice_from(self.pos);
 209              vec::bytes::copy_memory(dst, buf, buf.len());
 210              self.pos += buf.len();
 211          }
 212      }
 213  
 214      fn flush(&mut self) {
 215          if self.pos != 0 {
 216              self.inner.write(self.buf.slice_to(self.pos));
 217              self.pos = 0;
 218          }
 219          self.inner.flush();
 220      }
 221  }
 222  
 223  impl<W: Writer> Decorator<W> for BufferedWriter<W> {
 224      fn inner(self) -> W {
 225          self.inner
 226      }
 227  
 228      fn inner_ref<'a>(&'a self) -> &'a W {
 229          &self.inner
 230      }
 231  
 232      fn inner_mut_ref<'a>(&'a mut self) -> &'a mut W {
 233          &mut self.inner
 234      }
 235  }
 236  
 237  struct InternalBufferedWriter<W>(BufferedWriter<W>);
 238  
 239  impl<W: Reader> Reader for InternalBufferedWriter<W> {
 240      fn read(&mut self, buf&mut [u8]) -> Option<uint> {
 241          self.inner.read(buf)
 242      }
 243  
 244      fn eof(&mut self) -> bool {
 245          self.inner.eof()
 246      }
 247  }
 248  
 249  /// Wraps a Stream and buffers input and output to and from it
 250  ///
 251  /// Note that `BufferedStream` will NOT flush its output buffer when dropped.
 252  pub struct BufferedStream<S> {
 253      priv inner: BufferedReader<InternalBufferedWriter<S>>
 254  }
 255  
 256  impl<S: Stream> BufferedStream<S> {
 257      pub fn with_capacities(reader_capuint, writer_capuint, innerS)
 258                             -> BufferedStream<S> {
 259          let writer = BufferedWriter::with_capacity(writer_cap, inner);
 260          let internal_writer = InternalBufferedWriter(writer);
 261          let reader = BufferedReader::with_capacity(reader_cap,
 262                                                     internal_writer);
 263          BufferedStream { inner: reader }
 264      }
 265  
 266      pub fn new(innerS) -> BufferedStream<S> {
 267          BufferedStream::with_capacities(DEFAULT_CAPACITY, DEFAULT_CAPACITY,
 268                                          inner)
 269      }
 270  }
 271  
 272  impl<S: Stream> Reader for BufferedStream<S> {
 273      fn read(&mut self, buf&mut [u8]) -> Option<uint> {
 274          self.inner.read(buf)
 275      }
 276  
 277      fn eof(&mut self) -> bool {
 278          self.inner.eof()
 279      }
 280  }
 281  
 282  impl<S: Stream> Writer for BufferedStream<S> {
 283      fn write(&mut self, buf&[u8]) {
 284          self.inner.inner.write(buf)
 285      }
 286  
 287      fn flush(&mut self) {
 288          self.inner.inner.flush()
 289      }
 290  }
 291  
 292  impl<S: Stream> Decorator<S> for BufferedStream<S> {
 293      fn inner(self) -> S {
 294          self.inner.inner.inner()
 295      }
 296  
 297      fn inner_ref<'a>(&'a self) -> &'a S {
 298          self.inner.inner.inner_ref()
 299      }
 300  
 301      fn inner_mut_ref<'a>(&'a mut self) -> &'a mut S {
 302          self.inner.inner.inner_mut_ref()
 303      }
 304  }
 305  
 306  #[cfg(test)]
 307  mod test {
 308      use prelude::*;
 309      use super::*;
 310      use super::super::mem::{MemReader, MemWriter};
 311  
 312      #[test]
 313      fn test_buffered_reader() {
 314          let inner = MemReader::new(~[0, 1, 2, 3, 4]);
 315          let mut reader = BufferedReader::with_capacity(2, inner);
 316  
 317          let mut buf = [0, 0, 0];
 318          let nread = reader.read(buf);
 319          assert_eq!(Some(2), nread);
 320          assert_eq!([0, 1, 0], buf);
 321          assert!(!reader.eof());
 322  
 323          let mut buf = [0];
 324          let nread = reader.read(buf);
 325          assert_eq!(Some(1), nread);
 326          assert_eq!([2], buf);
 327          assert!(!reader.eof());
 328  
 329          let mut buf = [0, 0, 0];
 330          let nread = reader.read(buf);
 331          assert_eq!(Some(1), nread);
 332          assert_eq!([3, 0, 0], buf);
 333          assert!(!reader.eof());
 334  
 335          let nread = reader.read(buf);
 336          assert_eq!(Some(1), nread);
 337          assert_eq!([4, 0, 0], buf);
 338          assert!(reader.eof());
 339  
 340          assert_eq!(None, reader.read(buf));
 341      }
 342  
 343      #[test]
 344      fn test_buffered_writer() {
 345          let inner = MemWriter::new();
 346          let mut writer = BufferedWriter::with_capacity(2, inner);
 347  
 348          writer.write([0, 1]);
 349          assert_eq!([], writer.inner_ref().inner_ref().as_slice());
 350  
 351          writer.write([2]);
 352          assert_eq!([0, 1], writer.inner_ref().inner_ref().as_slice());
 353  
 354          writer.write([3]);
 355          assert_eq!([0, 1], writer.inner_ref().inner_ref().as_slice());
 356  
 357          writer.flush();
 358          assert_eq!([0, 1, 2, 3], writer.inner_ref().inner_ref().as_slice());
 359  
 360          writer.write([4]);
 361          writer.write([5]);
 362          assert_eq!([0, 1, 2, 3], writer.inner_ref().inner_ref().as_slice());
 363  
 364          writer.write([6]);
 365          assert_eq!([0, 1, 2, 3, 4, 5],
 366                     writer.inner_ref().inner_ref().as_slice());
 367  
 368          writer.write([7, 8]);
 369          assert_eq!([0, 1, 2, 3, 4, 5, 6],
 370                     writer.inner_ref().inner_ref().as_slice());
 371  
 372          writer.write([9, 10, 11]);
 373          assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
 374                     writer.inner_ref().inner_ref().as_slice());
 375  
 376          writer.flush();
 377          assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
 378                     writer.inner_ref().inner_ref().as_slice());
 379      }
 380  
 381      // This is just here to make sure that we don't infinite loop in the
 382      // newtype struct autoderef weirdness
 383      #[test]
 384      fn test_buffered_stream() {
 385          use rt;
 386          struct S;
 387  
 388          impl rt::io::Writer for S {
 389              fn write(&mut self, _: &[u8]) {}
 390              fn flush(&mut self) {}
 391          }
 392  
 393          impl rt::io::Reader for S {
 394              fn read(&mut self, _: &mut [u8]) -> Option<uint> { None }
 395              fn eof(&mut self) -> bool { true }
 396          }
 397  
 398          let mut stream = BufferedStream::new(S);
 399          let mut buf = [];
 400          stream.read(buf);
 401          stream.eof();
 402          stream.write(buf);
 403          stream.flush();
 404      }
 405  
 406      #[test]
 407      fn test_read_until() {
 408          let inner = MemReader::new(~[0, 1, 2, 1, 0]);
 409          let mut reader = BufferedReader::with_capacity(2, inner);
 410          assert_eq!(reader.read_until(0), Some(~[0]));
 411          assert_eq!(reader.read_until(2), Some(~[1, 2]));
 412          assert_eq!(reader.read_until(1), Some(~[1]));
 413          assert_eq!(reader.read_until(8), Some(~[0]));
 414          assert_eq!(reader.read_until(9), None);
 415      }
 416  }

libstd/rt/io/buffered.rs:176:71-176:71 -struct- definition:
/// Note that `BufferedWriter` will NOT flush its buffer when dropped.
pub struct BufferedWriter<W> {
references:-
223: impl<W: Writer> Decorator<W> for BufferedWriter<W> {
186:         BufferedWriter {
237: struct InternalBufferedWriter<W>(BufferedWriter<W>);
194:     pub fn new(inner: W) -> BufferedWriter<W> {
199: impl<W: Writer> Writer for BufferedWriter<W> {
185:     pub fn with_capacity(cap: uint, inner: W) -> BufferedWriter<W> {
183: impl<W: Writer> BufferedWriter<W> {


libstd/rt/io/buffered.rs:65:45-65:45 -struct- definition:
/// Wraps a Reader and buffers input from it
pub struct BufferedReader<R> {
references:-
85:     pub fn new(inner: R) -> BufferedReader<R> {
253:     priv inner: BufferedReader<InternalBufferedWriter<S>>
140: impl<R: Reader> Reader for BufferedReader<R> {
75:     pub fn with_capacity(cap: uint, inner: R) -> BufferedReader<R> {
73: impl<R: Reader> BufferedReader<R> {
160: impl<R: Reader> Decorator<R> for BufferedReader<R> {
76:         BufferedReader {


libstd/rt/io/buffered.rs:236:1-236:1 -struct- definition:

struct InternalBufferedWriter<W>(BufferedWriter<W>);
references:-
239: impl<W: Reader> Reader for InternalBufferedWriter<W> {
245:         self.inner.eof()
253:     priv inner: BufferedReader<InternalBufferedWriter<S>>
241:         self.inner.read(buf)


libstd/rt/io/buffered.rs:251:78-251:78 -struct- definition:
/// Note that `BufferedStream` will NOT flush its output buffer when dropped.
pub struct BufferedStream<S> {
references:-
272: impl<S: Stream> Reader for BufferedStream<S> {
256: impl<S: Stream> BufferedStream<S> {
266:     pub fn new(inner: S) -> BufferedStream<S> {
258:                            -> BufferedStream<S> {
263:         BufferedStream { inner: reader }
292: impl<S: Stream> Decorator<S> for BufferedStream<S> {
282: impl<S: Stream> Writer for BufferedStream<S> {