(index<- ) ./libstd/rt/io/buffered.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Buffering wrappers for I/O traits
12 //!
13 //! It can be excessively inefficient to work directly with a `Reader` or
14 //! `Writer`. Every call to `read` or `write` on `TcpStream` results in a
15 //! system call, for example. This module provides structures that wrap
16 //! `Readers`, `Writers`, and `Streams` and buffer input and output to them.
17 //!
18 //! # Examples
19 //!
20 //! ```
21 //! let tcp_stream = TcpStream::connect(addr);
22 //! let mut reader = BufferedReader::new(tcp_stream);
23 //!
24 //! let mut buf: ~[u8] = vec::from_elem(100, 0u8);
25 //! match reader.read(buf) {
26 //! Some(nread) => println!("Read {} bytes", nread),
27 //! None => println!("At the end of the stream!")
28 //! }
29 //! ```
30 //!
31 //! ```
32 //! let tcp_stream = TcpStream::connect(addr);
33 //! let mut writer = BufferedWriter::new(tcp_stream);
34 //!
35 //! writer.write("hello, world".as_bytes());
36 //! writer.flush();
37 //! ```
38 //!
39 //! ```
40 //! let tcp_stream = TcpStream::connect(addr);
41 //! let mut stream = BufferedStream::new(tcp_stream);
42 //!
43 //! stream.write("hello, world".as_bytes());
44 //! stream.flush();
45 //!
46 //! let mut buf = vec::from_elem(100, 0u8);
47 //! match stream.read(buf) {
48 //! Some(nread) => println!("Read {} bytes", nread),
49 //! None => println!("At the end of the stream!")
50 //! }
51 //! ```
52 //!
53
54 use prelude::*;
55
56 use num;
57 use vec;
58 use str;
59 use super::{Reader, Writer, Stream, Decorator};
60
61 // libuv recommends 64k buffers to maximize throughput
62 // https://groups.google.com/forum/#!topic/libuv/oQO1HJAIDdA
63 static DEFAULT_CAPACITY: uint = 64 * 1024;
64
65 /// Wraps a Reader and buffers input from it
66 pub struct BufferedReader<R> {
67 priv inner: R,
68 priv buf: ~[u8],
69 priv pos: uint,
70 priv cap: uint
71 }
72
73 impl<R: Reader> BufferedReader<R> {
74 /// Creates a new `BufferedReader` with with the specified buffer capacity
75 pub fn with_capacity(cap: uint, inner: R) -> BufferedReader<R> {
76 BufferedReader {
77 inner: inner,
78 buf: vec::from_elem(cap, 0u8),
79 pos: 0,
80 cap: 0
81 }
82 }
83
84 /// Creates a new `BufferedReader` with a default buffer capacity
85 pub fn new(inner: R) -> BufferedReader<R> {
86 BufferedReader::with_capacity(DEFAULT_CAPACITY, inner)
87 }
88
89 /// Reads the next line of input, interpreted as a sequence of utf-8
90 /// encoded unicode codepoints. If a newline is encountered, then the
91 /// newline is contained in the returned string.
92 pub fn read_line(&mut self) -> Option<~str> {
93 self.read_until('\n' as u8).map(str::from_utf8_owned)
94 }
95
96 /// Reads a sequence of bytes leading up to a specified delimeter. Once the
97 /// specified byte is encountered, reading ceases and the bytes up to and
98 /// including the delimiter are returned.
99 pub fn read_until(&mut self, byte: u8) -> Option<~[u8]> {
100 let mut res = ~[];
101 let mut used;
102 loop {
103 {
104 let available = self.fill_buffer();
105 match available.iter().position(|&b| b == byte) {
106 Some(i) => {
107 res.push_all(available.slice_to(i + 1));
108 used = i + 1;
109 break
110 }
111 None => {
112 res.push_all(available);
113 used = available.len();
114 }
115 }
116 }
117 if used == 0 {
118 break
119 }
120 self.pos += used;
121 }
122 self.pos += used;
123 return if res.len() == 0 {None} else {Some(res)};
124 }
125
126 fn fill_buffer<'a>(&'a mut self) -> &'a [u8] {
127 if self.pos == self.cap {
128 match self.inner.read(self.buf) {
129 Some(cap) => {
130 self.pos = 0;
131 self.cap = cap;
132 }
133 None => {}
134 }
135 }
136 return self.buf.slice(self.pos, self.cap);
137 }
138 }
139
140 impl<R: Reader> Reader for BufferedReader<R> {
141 fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
142 let nread = {
143 let available = self.fill_buffer();
144 if available.len() == 0 {
145 return None;
146 }
147 let nread = num::min(available.len(), buf.len());
148 vec::bytes::copy_memory(buf, available, nread);
149 nread
150 };
151 self.pos += nread;
152 Some(nread)
153 }
154
155 fn eof(&mut self) -> bool {
156 self.pos == self.cap && self.inner.eof()
157 }
158 }
159
160 impl<R: Reader> Decorator<R> for BufferedReader<R> {
161 fn inner(self) -> R {
162 self.inner
163 }
164
165 fn inner_ref<'a>(&'a self) -> &'a R {
166 &self.inner
167 }
168
169 fn inner_mut_ref<'a>(&'a mut self) -> &'a mut R {
170 &mut self.inner
171 }
172 }
173
174 /// Wraps a Writer and buffers output to it
175 ///
176 /// Note that `BufferedWriter` will NOT flush its buffer when dropped.
177 pub struct BufferedWriter<W> {
178 priv inner: W,
179 priv buf: ~[u8],
180 priv pos: uint
181 }
182
183 impl<W: Writer> BufferedWriter<W> {
184 /// Creates a new `BufferedWriter` with with the specified buffer capacity
185 pub fn with_capacity(cap: uint, inner: W) -> BufferedWriter<W> {
186 BufferedWriter {
187 inner: inner,
188 buf: vec::from_elem(cap, 0u8),
189 pos: 0
190 }
191 }
192
193 /// Creates a new `BufferedWriter` with a default buffer capacity
194 pub fn new(inner: W) -> BufferedWriter<W> {
195 BufferedWriter::with_capacity(DEFAULT_CAPACITY, inner)
196 }
197 }
198
199 impl<W: Writer> Writer for BufferedWriter<W> {
200 fn write(&mut self, buf: &[u8]) {
201 if self.pos + buf.len() > self.buf.len() {
202 self.flush();
203 }
204
205 if buf.len() > self.buf.len() {
206 self.inner.write(buf);
207 } else {
208 let dst = self.buf.mut_slice_from(self.pos);
209 vec::bytes::copy_memory(dst, buf, buf.len());
210 self.pos += buf.len();
211 }
212 }
213
214 fn flush(&mut self) {
215 if self.pos != 0 {
216 self.inner.write(self.buf.slice_to(self.pos));
217 self.pos = 0;
218 }
219 self.inner.flush();
220 }
221 }
222
223 impl<W: Writer> Decorator<W> for BufferedWriter<W> {
224 fn inner(self) -> W {
225 self.inner
226 }
227
228 fn inner_ref<'a>(&'a self) -> &'a W {
229 &self.inner
230 }
231
232 fn inner_mut_ref<'a>(&'a mut self) -> &'a mut W {
233 &mut self.inner
234 }
235 }
236
237 struct InternalBufferedWriter<W>(BufferedWriter<W>);
238
239 impl<W: Reader> Reader for InternalBufferedWriter<W> {
240 fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
241 self.inner.read(buf)
242 }
243
244 fn eof(&mut self) -> bool {
245 self.inner.eof()
246 }
247 }
248
249 /// Wraps a Stream and buffers input and output to and from it
250 ///
251 /// Note that `BufferedStream` will NOT flush its output buffer when dropped.
252 pub struct BufferedStream<S> {
253 priv inner: BufferedReader<InternalBufferedWriter<S>>
254 }
255
256 impl<S: Stream> BufferedStream<S> {
257 pub fn with_capacities(reader_cap: uint, writer_cap: uint, inner: S)
258 -> BufferedStream<S> {
259 let writer = BufferedWriter::with_capacity(writer_cap, inner);
260 let internal_writer = InternalBufferedWriter(writer);
261 let reader = BufferedReader::with_capacity(reader_cap,
262 internal_writer);
263 BufferedStream { inner: reader }
264 }
265
266 pub fn new(inner: S) -> BufferedStream<S> {
267 BufferedStream::with_capacities(DEFAULT_CAPACITY, DEFAULT_CAPACITY,
268 inner)
269 }
270 }
271
272 impl<S: Stream> Reader for BufferedStream<S> {
273 fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
274 self.inner.read(buf)
275 }
276
277 fn eof(&mut self) -> bool {
278 self.inner.eof()
279 }
280 }
281
282 impl<S: Stream> Writer for BufferedStream<S> {
283 fn write(&mut self, buf: &[u8]) {
284 self.inner.inner.write(buf)
285 }
286
287 fn flush(&mut self) {
288 self.inner.inner.flush()
289 }
290 }
291
292 impl<S: Stream> Decorator<S> for BufferedStream<S> {
293 fn inner(self) -> S {
294 self.inner.inner.inner()
295 }
296
297 fn inner_ref<'a>(&'a self) -> &'a S {
298 self.inner.inner.inner_ref()
299 }
300
301 fn inner_mut_ref<'a>(&'a mut self) -> &'a mut S {
302 self.inner.inner.inner_mut_ref()
303 }
304 }
305
306 #[cfg(test)]
307 mod test {
308 use prelude::*;
309 use super::*;
310 use super::super::mem::{MemReader, MemWriter};
311
312 #[test]
313 fn test_buffered_reader() {
314 let inner = MemReader::new(~[0, 1, 2, 3, 4]);
315 let mut reader = BufferedReader::with_capacity(2, inner);
316
317 let mut buf = [0, 0, 0];
318 let nread = reader.read(buf);
319 assert_eq!(Some(2), nread);
320 assert_eq!([0, 1, 0], buf);
321 assert!(!reader.eof());
322
323 let mut buf = [0];
324 let nread = reader.read(buf);
325 assert_eq!(Some(1), nread);
326 assert_eq!([2], buf);
327 assert!(!reader.eof());
328
329 let mut buf = [0, 0, 0];
330 let nread = reader.read(buf);
331 assert_eq!(Some(1), nread);
332 assert_eq!([3, 0, 0], buf);
333 assert!(!reader.eof());
334
335 let nread = reader.read(buf);
336 assert_eq!(Some(1), nread);
337 assert_eq!([4, 0, 0], buf);
338 assert!(reader.eof());
339
340 assert_eq!(None, reader.read(buf));
341 }
342
343 #[test]
344 fn test_buffered_writer() {
345 let inner = MemWriter::new();
346 let mut writer = BufferedWriter::with_capacity(2, inner);
347
348 writer.write([0, 1]);
349 assert_eq!([], writer.inner_ref().inner_ref().as_slice());
350
351 writer.write([2]);
352 assert_eq!([0, 1], writer.inner_ref().inner_ref().as_slice());
353
354 writer.write([3]);
355 assert_eq!([0, 1], writer.inner_ref().inner_ref().as_slice());
356
357 writer.flush();
358 assert_eq!([0, 1, 2, 3], writer.inner_ref().inner_ref().as_slice());
359
360 writer.write([4]);
361 writer.write([5]);
362 assert_eq!([0, 1, 2, 3], writer.inner_ref().inner_ref().as_slice());
363
364 writer.write([6]);
365 assert_eq!([0, 1, 2, 3, 4, 5],
366 writer.inner_ref().inner_ref().as_slice());
367
368 writer.write([7, 8]);
369 assert_eq!([0, 1, 2, 3, 4, 5, 6],
370 writer.inner_ref().inner_ref().as_slice());
371
372 writer.write([9, 10, 11]);
373 assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
374 writer.inner_ref().inner_ref().as_slice());
375
376 writer.flush();
377 assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
378 writer.inner_ref().inner_ref().as_slice());
379 }
380
381 // This is just here to make sure that we don't infinite loop in the
382 // newtype struct autoderef weirdness
383 #[test]
384 fn test_buffered_stream() {
385 use rt;
386 struct S;
387
388 impl rt::io::Writer for S {
389 fn write(&mut self, _: &[u8]) {}
390 fn flush(&mut self) {}
391 }
392
393 impl rt::io::Reader for S {
394 fn read(&mut self, _: &mut [u8]) -> Option<uint> { None }
395 fn eof(&mut self) -> bool { true }
396 }
397
398 let mut stream = BufferedStream::new(S);
399 let mut buf = [];
400 stream.read(buf);
401 stream.eof();
402 stream.write(buf);
403 stream.flush();
404 }
405
406 #[test]
407 fn test_read_until() {
408 let inner = MemReader::new(~[0, 1, 2, 1, 0]);
409 let mut reader = BufferedReader::with_capacity(2, inner);
410 assert_eq!(reader.read_until(0), Some(~[0]));
411 assert_eq!(reader.read_until(2), Some(~[1, 2]));
412 assert_eq!(reader.read_until(1), Some(~[1]));
413 assert_eq!(reader.read_until(8), Some(~[0]));
414 assert_eq!(reader.read_until(9), None);
415 }
416 }