(index<- )        ./libstd/io/comm_adapters.rs

    git branch:    * master           5200215 auto merge of #14035 : alexcrichton/rust/experimental, r=huonw
    modified:    Fri May  9 13:02:28 2014
   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  use clone::Clone;
  12  use cmp;
  13  use container::Container;
  14  use comm::{Sender, Receiver};
  15  use io;
  16  use option::{None, Option, Some};
  17  use result::{Ok, Err};
  18  use super::{Reader, Writer, IoResult};
  19  use str::StrSlice;
  20  use slice::{bytes, CloneableVector, MutableVector, ImmutableVector};
  21  
  22  /// Allows reading from a rx.
  23  ///
  24  /// # Example
  25  ///
  26  /// ```
  27  /// use std::io::ChanReader;
  28  ///
  29  /// let (tx, rx) = channel();
  30  /// # drop(tx);
  31  /// let mut reader = ChanReader::new(rx);
  32  ///
  33  /// let mut buf = ~[0u8, ..100];
  34  /// match reader.read(buf) {
  35  ///     Ok(nread) => println!("Read {} bytes", nread),
  36  ///     Err(e) => println!("read error: {}", e),
  37  /// }
  38  /// ```
  39  pub struct ChanReader {
  40      buf: Option<~[u8]>,  // A buffer of bytes received but not consumed.
  41      pos: uint,           // How many of the buffered bytes have already be consumed.
  42      rx: Receiver<~[u8]>,   // The rx to pull data from.
  43      closed: bool,        // Whether the pipe this rx connects to has been closed.
  44  }
  45  
  46  impl ChanReader {
  47      /// Wraps a `Port` in a `ChanReader` structure
  48      pub fn new(rxReceiver<~[u8]>) -> ChanReader {
  49          ChanReader {
  50              buf: None,
  51              pos: 0,
  52              rx: rx,
  53              closed: false,
  54          }
  55      }
  56  }
  57  
  58  impl Reader for ChanReader {
  59      fn read(&mut self, buf&mut [u8]) -> IoResult<uint> {
  60          let mut num_read = 0;
  61          loop {
  62              match self.buf {
  63                  Some(ref prev) => {
  64                      let dst = buf.mut_slice_from(num_read);
  65                      let src = prev.slice_from(self.pos);
  66                      let count = cmp::min(dst.len(), src.len());
  67                      bytes::copy_memory(dst, src.slice_to(count));
  68                      num_read += count;
  69                      self.pos += count;
  70                  },
  71                  None => (),
  72              };
  73              if num_read == buf.len() || self.closed {
  74                  break;
  75              }
  76              self.pos = 0;
  77              self.buf = self.rx.recv_opt().ok();
  78              self.closed = self.buf.is_none();
  79          }
  80          if self.closed && num_read == 0 {
  81              Err(io::standard_error(io::EndOfFile))
  82          } else {
  83              Ok(num_read)
  84          }
  85      }
  86  }
  87  
  88  /// Allows writing to a tx.
  89  ///
  90  /// # Example
  91  ///
  92  /// ```
  93  /// # #![allow(unused_must_use)]
  94  /// use std::io::ChanWriter;
  95  ///
  96  /// let (tx, rx) = channel();
  97  /// # drop(rx);
  98  /// let mut writer = ChanWriter::new(tx);
  99  /// writer.write("hello, world".as_bytes());
 100  /// ```
 101  pub struct ChanWriter {
 102      tx: Sender<~[u8]>,
 103  }
 104  
 105  impl ChanWriter {
 106      /// Wraps a channel in a `ChanWriter` structure
 107      pub fn new(txSender<~[u8]>) -> ChanWriter {
 108          ChanWriter { tx: tx }
 109      }
 110  }
 111  
 112  impl Clone for ChanWriter {
 113      fn clone(&self) -> ChanWriter {
 114          ChanWriter { tx: self.tx.clone() }
 115      }
 116  }
 117  
 118  impl Writer for ChanWriter {
 119      fn write(&mut self, buf&[u8]) -> IoResult<()> {
 120          self.tx.send_opt(buf.to_owned()).map_err(|_| {
 121              io::IoError {
 122                  kind: io::BrokenPipe,
 123                  desc: "Pipe closed",
 124                  detail: None
 125              }
 126          })
 127      }
 128  }
 129  
 130  
 131  #[cfg(test)]
 132  mod test {
 133      use prelude::*;
 134      use super::*;
 135      use io;
 136      use task;
 137  
 138      #[test]
 139      fn test_rx_reader() {
 140          let (tx, rx) = channel();
 141          task::spawn(proc() {
 142            tx.send(box [1u8, 2u8]);
 143            tx.send(box []);
 144            tx.send(box [3u8, 4u8]);
 145            tx.send(box [5u8, 6u8]);
 146            tx.send(box [7u8, 8u8]);
 147          });
 148  
 149          let mut reader = ChanReader::new(rx);
 150          let mut buf = box [0u8, ..3];
 151  
 152  
 153          assert_eq!(Ok(0), reader.read([]));
 154  
 155          assert_eq!(Ok(3), reader.read(buf));
 156          assert_eq!(box [1,2,3], buf);
 157  
 158          assert_eq!(Ok(3), reader.read(buf));
 159          assert_eq!(box [4,5,6], buf);
 160  
 161          assert_eq!(Ok(2), reader.read(buf));
 162          assert_eq!(box [7,8,6], buf);
 163  
 164          match reader.read(buf) {
 165              Ok(..) => fail!(),
 166              Err(e) => assert_eq!(e.kind, io::EndOfFile),
 167          }
 168          assert_eq!(box [7,8,6], buf);
 169  
 170          // Ensure it continues to fail in the same way.
 171          match reader.read(buf) {
 172              Ok(..) => fail!(),
 173              Err(e) => assert_eq!(e.kind, io::EndOfFile),
 174          }
 175          assert_eq!(box [7,8,6], buf);
 176      }
 177  
 178      #[test]
 179      fn test_chan_writer() {
 180          let (tx, rx) = channel();
 181          let mut writer = ChanWriter::new(tx);
 182          writer.write_be_u32(42).unwrap();
 183  
 184          let wanted = box [0u8, 0u8, 0u8, 42u8];
 185          let got = task::try(proc() { rx.recv() }).unwrap();
 186          assert_eq!(wanted, got);
 187  
 188          match writer.write_u8(1) {
 189              Ok(..) => fail!(),
 190              Err(e) => assert_eq!(e.kind, io::BrokenPipe),
 191          }
 192      }
 193  }