1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use clone::Clone;
12 use cmp;
13 use container::Container;
14 use comm::{Sender, Receiver};
15 use io;
16 use option::{None, Option, Some};
17 use result::{Ok, Err};
18 use super::{Reader, Writer, IoResult};
19 use str::StrSlice;
20 use slice::{bytes, CloneableVector, MutableVector, ImmutableVector};
21
22 /// Allows reading from a rx.
23 ///
24 /// # Example
25 ///
26 /// ```
27 /// use std::io::ChanReader;
28 ///
29 /// let (tx, rx) = channel();
30 /// # drop(tx);
31 /// let mut reader = ChanReader::new(rx);
32 ///
33 /// let mut buf = ~[0u8, ..100];
34 /// match reader.read(buf) {
35 /// Ok(nread) => println!("Read {} bytes", nread),
36 /// Err(e) => println!("read error: {}", e),
37 /// }
38 /// ```
39 pub struct ChanReader {
40 buf: Option<~[u8]>, // A buffer of bytes received but not consumed.
41 pos: uint, // How many of the buffered bytes have already be consumed.
42 rx: Receiver<~[u8]>, // The rx to pull data from.
43 closed: bool, // Whether the pipe this rx connects to has been closed.
44 }
45
46 impl ChanReader {
47 /// Wraps a `Port` in a `ChanReader` structure
48 pub fn new(rx: Receiver<~[u8]>) -> ChanReader {
49 ChanReader {
50 buf: None,
51 pos: 0,
52 rx: rx,
53 closed: false,
54 }
55 }
56 }
57
58 impl Reader for ChanReader {
59 fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
60 let mut num_read = 0;
61 loop {
62 match self.buf {
63 Some(ref prev) => {
64 let dst = buf.mut_slice_from(num_read);
65 let src = prev.slice_from(self.pos);
66 let count = cmp::min(dst.len(), src.len());
67 bytes::copy_memory(dst, src.slice_to(count));
68 num_read += count;
69 self.pos += count;
70 },
71 None => (),
72 };
73 if num_read == buf.len() || self.closed {
74 break;
75 }
76 self.pos = 0;
77 self.buf = self.rx.recv_opt().ok();
78 self.closed = self.buf.is_none();
79 }
80 if self.closed && num_read == 0 {
81 Err(io::standard_error(io::EndOfFile))
82 } else {
83 Ok(num_read)
84 }
85 }
86 }
87
88 /// Allows writing to a tx.
89 ///
90 /// # Example
91 ///
92 /// ```
93 /// # #![allow(unused_must_use)]
94 /// use std::io::ChanWriter;
95 ///
96 /// let (tx, rx) = channel();
97 /// # drop(rx);
98 /// let mut writer = ChanWriter::new(tx);
99 /// writer.write("hello, world".as_bytes());
100 /// ```
101 pub struct ChanWriter {
102 tx: Sender<~[u8]>,
103 }
104
105 impl ChanWriter {
106 /// Wraps a channel in a `ChanWriter` structure
107 pub fn new(tx: Sender<~[u8]>) -> ChanWriter {
108 ChanWriter { tx: tx }
109 }
110 }
111
112 impl Clone for ChanWriter {
113 fn clone(&self) -> ChanWriter {
114 ChanWriter { tx: self.tx.clone() }
115 }
116 }
117
118 impl Writer for ChanWriter {
119 fn write(&mut self, buf: &[u8]) -> IoResult<()> {
120 self.tx.send_opt(buf.to_owned()).map_err(|_| {
121 io::IoError {
122 kind: io::BrokenPipe,
123 desc: "Pipe closed",
124 detail: None
125 }
126 })
127 }
128 }
129
130
131 #[cfg(test)]
132 mod test {
133 use prelude::*;
134 use super::*;
135 use io;
136 use task;
137
138 #[test]
139 fn test_rx_reader() {
140 let (tx, rx) = channel();
141 task::spawn(proc() {
142 tx.send(box [1u8, 2u8]);
143 tx.send(box []);
144 tx.send(box [3u8, 4u8]);
145 tx.send(box [5u8, 6u8]);
146 tx.send(box [7u8, 8u8]);
147 });
148
149 let mut reader = ChanReader::new(rx);
150 let mut buf = box [0u8, ..3];
151
152
153 assert_eq!(Ok(0), reader.read([]));
154
155 assert_eq!(Ok(3), reader.read(buf));
156 assert_eq!(box [1,2,3], buf);
157
158 assert_eq!(Ok(3), reader.read(buf));
159 assert_eq!(box [4,5,6], buf);
160
161 assert_eq!(Ok(2), reader.read(buf));
162 assert_eq!(box [7,8,6], buf);
163
164 match reader.read(buf) {
165 Ok(..) => fail!(),
166 Err(e) => assert_eq!(e.kind, io::EndOfFile),
167 }
168 assert_eq!(box [7,8,6], buf);
169
170 // Ensure it continues to fail in the same way.
171 match reader.read(buf) {
172 Ok(..) => fail!(),
173 Err(e) => assert_eq!(e.kind, io::EndOfFile),
174 }
175 assert_eq!(box [7,8,6], buf);
176 }
177
178 #[test]
179 fn test_chan_writer() {
180 let (tx, rx) = channel();
181 let mut writer = ChanWriter::new(tx);
182 writer.write_be_u32(42).unwrap();
183
184 let wanted = box [0u8, 0u8, 0u8, 42u8];
185 let got = task::try(proc() { rx.recv() }).unwrap();
186 assert_eq!(wanted, got);
187
188 match writer.write_u8(1) {
189 Ok(..) => fail!(),
190 Err(e) => assert_eq!(e.kind, io::BrokenPipe),
191 }
192 }
193 }