(index<- )        ./libextra/flatpipes.rs

   1  // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
   2  // file at the top-level directory of this distribution and at
   3  // http://rust-lang.org/COPYRIGHT.
   4  //
   5  // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
   6  // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
   7  // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
   8  // option. This file may not be copied, modified, or distributed
   9  // except according to those terms.
  10  
  11  /*!
  12  
  13  Generic communication channels for things that can be represented as,
  14  or transformed to and from, byte vectors.
  15  
  16  The `FlatPort` and `FlatChan` types implement the generic channel and
  17  port interface for arbitrary types and transport strategies. It can
  18  particularly be used to send and receive serializable types over I/O
  19  streams.
  20  
  21  `FlatPort` and `FlatChan` implement the same comm traits as pipe-based
  22  ports and channels.
  23  
  24  # Example
  25  
  26  This example sends boxed integers across tasks using serialization.
  27  
  28  ~~~ {.rust}
  29  let (port, chan) = serial::pipe_stream();
  30  
  31  do task::spawn || {
  32      for i in range(0, 10) {
  33          chan.send(@i)
  34      }
  35  }
  36  
  37  for i in range(0, 10) {
  38      assert @i == port.recv()
  39  }
  40  ~~~
  41  
  42  # Safety Note
  43  
  44  Flat pipes created from `io::Reader`s and `io::Writer`s share the same
  45  blocking properties as the underlying stream. Since some implementations
  46  block the scheduler thread, so will their pipes.
  47  
  48  */
  49  
  50  #[allow(missing_doc)];
  51  
  52  
  53  // The basic send/recv interface FlatChan and PortChan will implement
  54  use std::io;
  55  use std::comm::GenericChan;
  56  use std::comm::GenericPort;
  57  use std::sys::size_of;
  58  
  59  /**
  60  A FlatPort, consisting of a `BytePort` that receives byte vectors,
  61  and an `Unflattener` that converts the bytes to a value.
  62  
  63  Create using the constructors in the `serial` and `pod` modules.
  64  */
  65  pub struct FlatPort<T, U, P> {
  66      unflattener: U,
  67      byte_port: P
  68  }
  69  
  70  /**
  71  A FlatChan, consisting of a `Flattener` that converts values to
  72  byte vectors, and a `ByteChan` that transmits the bytes.
  73  
  74  Create using the constructors in the `serial` and `pod` modules.
  75  */
  76  pub struct FlatChan<T, F, C> {
  77      flattener: F,
  78      byte_chan: C
  79  }
  80  
  81  /**
  82  Constructors for flat pipes that using serialization-based flattening.
  83  */
  84  pub mod serial {
  85      pub use DefaultEncoder = ebml::writer::Encoder;
  86      pub use DefaultDecoder = ebml::reader::Decoder;
  87  
  88      use serialize::{Decodable, Encodable};
  89      use flatpipes::flatteners::{DeserializingUnflattener,
  90                                  SerializingFlattener};
  91      use flatpipes::flatteners::{deserialize_buffer, serialize_value};
  92      use flatpipes::bytepipes::{ReaderBytePort, WriterByteChan};
  93      use flatpipes::bytepipes::{PipeBytePort, PipeByteChan};
  94      use flatpipes::{FlatPort, FlatChan};
  95  
  96      use std::io::{Reader, Writer};
  97      use std::comm::{Port, Chan};
  98      use std::comm;
  99  
 100      pub type ReaderPort<T, R> = FlatPort<
 101          T, DeserializingUnflattener<DefaultDecoder, T>,
 102          ReaderBytePort<R>>;
 103      pub type WriterChan<T, W> = FlatChan<
 104          T, SerializingFlattener<DefaultEncoder, T>, WriterByteChan<W>>;
 105      pub type PipePort<T> = FlatPort<
 106          T, DeserializingUnflattener<DefaultDecoder, T>, PipeBytePort>;
 107      pub type PipeChan<T> = FlatChan<
 108          T, SerializingFlattener<DefaultEncoder, T>, PipeByteChan>;
 109  
 110      /// Create a `FlatPort` from a `Reader`
 111      pub fn reader_port<T: Decodable<DefaultDecoder>,
 112                         R: Reader>(readerR) -> ReaderPort<T, R> {
 113          let unflatDeserializingUnflattener<DefaultDecoder, T> =
 114              DeserializingUnflattener::new(
 115                  deserialize_buffer::<DefaultDecoder, T>);
 116          let byte_port = ReaderBytePort::new(reader);
 117          FlatPort::new(unflat, byte_port)
 118      }
 119  
 120      /// Create a `FlatChan` from a `Writer`
 121      pub fn writer_chan<T: Encodable<DefaultEncoder>,
 122                         W: Writer>(writerW) -> WriterChan<T, W> {
 123          let flatSerializingFlattener<DefaultEncoder, T> =
 124              SerializingFlattener::new(
 125                  serialize_value::<DefaultEncoder, T>);
 126          let byte_chan = WriterByteChan::new(writer);
 127          FlatChan::new(flat, byte_chan)
 128      }
 129  
 130      /// Create a `FlatPort` from a `Port<~[u8]>`
 131      pub fn pipe_port<T:Decodable<DefaultDecoder>>(
 132          portPort<~[u8]>
 133      ) -> PipePort<T> {
 134          let unflatDeserializingUnflattener<DefaultDecoder, T> =
 135              DeserializingUnflattener::new(
 136                  deserialize_buffer::<DefaultDecoder, T>);
 137          let byte_port = PipeBytePort::new(port);
 138          FlatPort::new(unflat, byte_port)
 139      }
 140  
 141      /// Create a `FlatChan` from a `Chan<~[u8]>`
 142      pub fn pipe_chan<T:Encodable<DefaultEncoder>>(
 143          chanChan<~[u8]>
 144      ) -> PipeChan<T> {
 145          let flatSerializingFlattener<DefaultEncoder, T> =
 146              SerializingFlattener::new(
 147                  serialize_value::<DefaultEncoder, T>);
 148          let byte_chan = PipeByteChan::new(chan);
 149          FlatChan::new(flat, byte_chan)
 150      }
 151  
 152      /// Create a pair of `FlatChan` and `FlatPort`, backed by pipes
 153      pub fn pipe_stream<T: Encodable<DefaultEncoder> +
 154                            Decodable<DefaultDecoder>>(
 155                            ) -> (PipePort<T>, PipeChan<T>) {
 156          let (port, chan) = comm::stream();
 157          return (pipe_port(port), pipe_chan(chan));
 158      }
 159  }
 160  
 161  // FIXME #4074 this doesn't correctly enforce POD bounds
 162  /**
 163  Constructors for flat pipes that send POD types using memcpy.
 164  
 165  # Safety Note
 166  
 167  This module is currently unsafe because it uses `Clone + Send` as a type
 168  parameter bounds meaning POD (plain old data), but `Clone + Send` and
 169  POD are not equivalent.
 170  
 171  */
 172  pub mod pod {
 173  
 174      use flatpipes::flatteners::{PodUnflattener, PodFlattener};
 175      use flatpipes::bytepipes::{ReaderBytePort, WriterByteChan};
 176      use flatpipes::bytepipes::{PipeBytePort, PipeByteChan};
 177      use flatpipes::{FlatPort, FlatChan};
 178  
 179      use std::io::{Reader, Writer};
 180      use std::comm::{Port, Chan};
 181      use std::comm;
 182  
 183      pub type ReaderPort<T, R> =
 184          FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
 185      pub type WriterChan<T, W> =
 186          FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;
 187      pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>;
 188      pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
 189  
 190      /// Create a `FlatPort` from a `Reader`
 191      pub fn reader_port<T:Clone + Send,R:Reader>(
 192          readerR
 193      ) -> ReaderPort<T, R> {
 194          let unflatPodUnflattener<T> = PodUnflattener::new();
 195          let byte_port = ReaderBytePort::new(reader);
 196          FlatPort::new(unflat, byte_port)
 197      }
 198  
 199      /// Create a `FlatChan` from a `Writer`
 200      pub fn writer_chan<T:Clone + Send,W:Writer>(
 201          writerW
 202      ) -> WriterChan<T, W> {
 203          let flatPodFlattener<T> = PodFlattener::new();
 204          let byte_chan = WriterByteChan::new(writer);
 205          FlatChan::new(flat, byte_chan)
 206      }
 207  
 208      /// Create a `FlatPort` from a `Port<~[u8]>`
 209      pub fn pipe_port<T:Clone + Send>(portPort<~[u8]>) -> PipePort<T> {
 210          let unflatPodUnflattener<T> = PodUnflattener::new();
 211          let byte_port = PipeBytePort::new(port);
 212          FlatPort::new(unflat, byte_port)
 213      }
 214  
 215      /// Create a `FlatChan` from a `Chan<~[u8]>`
 216      pub fn pipe_chan<T:Clone + Send>(chanChan<~[u8]>) -> PipeChan<T> {
 217          let flatPodFlattener<T> = PodFlattener::new();
 218          let byte_chan = PipeByteChan::new(chan);
 219          FlatChan::new(flat, byte_chan)
 220      }
 221  
 222      /// Create a pair of `FlatChan` and `FlatPort`, backed by pipes
 223      pub fn pipe_stream<T:Clone + Send>() -> (PipePort<T>, PipeChan<T>) {
 224          let (port, chan) = comm::stream();
 225          return (pipe_port(port), pipe_chan(chan));
 226      }
 227  
 228  }
 229  
 230  /**
 231  Flatteners present a value as a byte vector
 232  */
 233  pub trait Flattener<T> {
 234      fn flatten(&self, val: T) -> ~[u8];
 235  }
 236  
 237  /**
 238  Unflatteners convert a byte vector to a value
 239  */
 240  pub trait Unflattener<T> {
 241      fn unflatten(&self, buf: ~[u8]) -> T;
 242  }
 243  
 244  /**
 245  BytePorts are a simple interface for receiving a specified number
 246  */
 247  pub trait BytePort {
 248      fn try_recv(&self, count: uint) -> Option<~[u8]>;
 249  }
 250  
 251  /**
 252  ByteChans are a simple interface for sending bytes
 253  */
 254  pub trait ByteChan {
 255      fn send(&self, val: ~[u8]);
 256  }
 257  
 258  static CONTINUE: [u8, ..4] = [0xAA, 0xBB, 0xCC, 0xDD];
 259  
 260  impl<T,U:Unflattener<T>,P:BytePort> GenericPort<T> for FlatPort<T, U, P> {
 261      fn recv(&self) -> T {
 262          match self.try_recv() {
 263              Some(val) => val,
 264              None => fail!("port is closed")
 265          }
 266      }
 267      fn try_recv(&self) -> Option<T> {
 268          let command = match self.byte_port.try_recv(CONTINUE.len()) {
 269              Some(c) => c,
 270              None => {
 271                  warn!("flatpipe: broken pipe");
 272                  return None;
 273              }
 274          };
 275  
 276          if CONTINUE.as_slice() == command {
 277              let msg_len = match self.byte_port.try_recv(size_of::<u64>()) {
 278                  Some(bytes) => {
 279                      io::u64_from_be_bytes(bytes, 0, size_of::<u64>())
 280                  },
 281                  None => {
 282                      warn!("flatpipe: broken pipe");
 283                      return None;
 284                  }
 285              };
 286  
 287              let msg_len = msg_len as uint;
 288  
 289              match self.byte_port.try_recv(msg_len) {
 290                  Some(bytes) => {
 291                      Some(self.unflattener.unflatten(bytes))
 292                  }
 293                  None => {
 294                      warn!("flatpipe: broken pipe");
 295                      return None;
 296                  }
 297              }
 298          }
 299          else {
 300              fail!("flatpipe: unrecognized command");
 301          }
 302      }
 303  }
 304  
 305  impl<T,F:Flattener<T>,C:ByteChan> GenericChan<T> for FlatChan<T, F, C> {
 306      fn send(&self, valT) {
 307          self.byte_chan.send(CONTINUE.to_owned());
 308          let bytes = self.flattener.flatten(val);
 309          let len = bytes.len() as u64;
 310          do io::u64_to_be_bytes(len, size_of::<u64>()) |len_bytes| {
 311              self.byte_chan.send(len_bytes.to_owned());
 312          }
 313          self.byte_chan.send(bytes);
 314      }
 315  }
 316  
 317  impl<T,U:Unflattener<T>,P:BytePort> FlatPort<T, U, P> {
 318      pub fn new(uU, pP) -> FlatPort<T, U, P> {
 319          FlatPort {
 320              unflattener: u,
 321              byte_port: p
 322          }
 323      }
 324  }
 325  
 326  impl<T,F:Flattener<T>,C:ByteChan> FlatChan<T, F, C> {
 327      pub fn new(fF, cC) -> FlatChan<T, F, C> {
 328          FlatChan {
 329              flattener: f,
 330              byte_chan: c
 331          }
 332      }
 333  }
 334  
 335  
 336  pub mod flatteners {
 337  
 338      use ebml;
 339      use flatpipes::{Flattener, Unflattener};
 340      use io_util::BufReader;
 341      use json;
 342      use serialize::{Encoder, Decoder, Encodable, Decodable};
 343  
 344      use std::cast;
 345      use std::io::{Writer, Reader, ReaderUtil};
 346      use std::io;
 347      use std::ptr;
 348      use std::sys::size_of;
 349      use std::vec;
 350  
 351      // FIXME #4074: Clone + Send != POD
 352      pub struct PodUnflattener<T> {
 353          bogus: ()
 354      }
 355  
 356      pub struct PodFlattener<T> {
 357          bogus: ()
 358      }
 359  
 360      impl<T:Clone + Send> Unflattener<T> for PodUnflattener<T> {
 361          fn unflatten(&self, buf~[u8]) -> T {
 362              assert!(size_of::<T>() != 0);
 363              assert_eq!(size_of::<T>(), buf.len());
 364              let addr_of_init&u8 = unsafe { &*vec::raw::to_ptr(buf) };
 365              let addr_of_value&T = unsafe { cast::transmute(addr_of_init) };
 366              (*addr_of_value).clone()
 367          }
 368      }
 369  
 370      impl<T:Clone + Send> Flattener<T> for PodFlattener<T> {
 371          fn flatten(&self, valT) -> ~[u8] {
 372              assert!(size_of::<T>() != 0);
 373              let val*T = ptr::to_unsafe_ptr(&val);
 374              let byte_value = val as *u8;
 375              unsafe { vec::from_buf(byte_value, size_of::<T>()) }
 376          }
 377      }
 378  
 379      impl<T:Clone + Send> PodUnflattener<T> {
 380          pub fn new() -> PodUnflattener<T> {
 381              PodUnflattener {
 382                  bogus: ()
 383              }
 384          }
 385      }
 386  
 387      impl<T:Clone + Send> PodFlattener<T> {
 388          pub fn new() -> PodFlattener<T> {
 389              PodFlattener {
 390                  bogus: ()
 391              }
 392          }
 393      }
 394  
 395  
 396      pub type DeserializeBuffer<T> = ~fn(buf: &[u8]) -> T;
 397  
 398      pub struct DeserializingUnflattener<D, T> {
 399          deserialize_buffer: DeserializeBuffer<T>
 400      }
 401  
 402      pub type SerializeValue<T> = ~fn(val: &T) -> ~[u8];
 403  
 404      pub struct SerializingFlattener<S, T> {
 405          serialize_value: SerializeValue<T>
 406      }
 407  
 408      impl<D:Decoder,T:Decodable<D>> Unflattener<T>
 409              for DeserializingUnflattener<D, T> {
 410          fn unflatten(&self, buf~[u8]) -> T {
 411              (self.deserialize_buffer)(buf)
 412          }
 413      }
 414  
 415      impl<S:Encoder,T:Encodable<S>> Flattener<T>
 416              for SerializingFlattener<S, T> {
 417          fn flatten(&self, valT) -> ~[u8] {
 418              (self.serialize_value)(&val)
 419          }
 420      }
 421  
 422      impl<D:Decoder,T:Decodable<D>> DeserializingUnflattener<D, T> {
 423          pub fn new(deserialize_bufferDeserializeBuffer<T>)
 424                     -> DeserializingUnflattener<D, T> {
 425              DeserializingUnflattener {
 426                  deserialize_buffer: deserialize_buffer
 427              }
 428          }
 429      }
 430  
 431      impl<S:Encoder,T:Encodable<S>> SerializingFlattener<S, T> {
 432          pub fn new(serialize_valueSerializeValue<T>)
 433                     -> SerializingFlattener<S, T> {
 434              SerializingFlattener {
 435                  serialize_value: serialize_value
 436              }
 437          }
 438      }
 439  
 440      /*
 441      Implementations of the serialization functions required by
 442      SerializingFlattener
 443      */
 444  
 445      pub fn deserialize_buffer<D: Decoder + FromReader,
 446                                T: Decodable<D>>(
 447                                buf&[u8])
 448                                -> T {
 449          let buf = buf.to_owned();
 450          let buf_reader = @BufReader::new(buf);
 451          let reader = buf_reader as @Reader;
 452          let mut deserD = FromReader::from_reader(reader);
 453          Decodable::decode(&mut deser)
 454      }
 455  
 456      pub fn serialize_value<D: Encoder + FromWriter,
 457                             T: Encodable<D>>(
 458                             val&T)
 459                             -> ~[u8] {
 460          do io::with_bytes_writer |writer| {
 461              let mut ser = FromWriter::from_writer(writer);
 462              val.encode(&mut ser);
 463          }
 464      }
 465  
 466      pub trait FromReader {
 467          fn from_reader(r: @Reader) -> Self;
 468      }
 469  
 470      pub trait FromWriter {
 471          fn from_writer(w: @Writer) -> Self;
 472      }
 473  
 474      impl FromReader for json::Decoder {
 475          fn from_reader(r@Reader) -> json::Decoder {
 476              match json::from_reader(r) {
 477                  Ok(json) => {
 478                      json::Decoder(json)
 479                  }
 480                  Err(e) => fail!("flatpipe: can't parse json: %?", e)
 481              }
 482          }
 483      }
 484  
 485      impl FromWriter for json::Encoder {
 486          fn from_writer(w@Writer) -> json::Encoder {
 487              json::Encoder(w)
 488          }
 489      }
 490  
 491      impl FromReader for ebml::reader::Decoder {
 492          fn from_reader(r@Reader) -> ebml::reader::Decoder {
 493              let buf = @r.read_whole_stream();
 494              let doc = ebml::reader::Doc(buf);
 495              ebml::reader::Decoder(doc)
 496          }
 497      }
 498  
 499      impl FromWriter for ebml::writer::Encoder {
 500          fn from_writer(w@Writer) -> ebml::writer::Encoder {
 501              ebml::writer::Encoder(w)
 502          }
 503      }
 504  
 505  }
 506  
 507  pub mod bytepipes {
 508  
 509      use flatpipes::{ByteChan, BytePort};
 510  
 511      use std::comm::{Port, Chan};
 512      use std::comm;
 513      use std::io::{Writer, Reader, ReaderUtil};
 514  
 515      pub struct ReaderBytePort<R> {
 516          reader: R
 517      }
 518  
 519      pub struct WriterByteChan<W> {
 520          writer: W
 521      }
 522  
 523      impl<R:Reader> BytePort for ReaderBytePort<R> {
 524          fn try_recv(&self, countuint) -> Option<~[u8]> {
 525              let mut left = count;
 526              let mut bytes = ~[];
 527              while !self.reader.eof() && left > 0 {
 528                  assert!(left <= count);
 529                  assert!(left > 0);
 530                  let new_bytes = self.reader.read_bytes(left);
 531                  bytes.push_all(new_bytes);
 532                  assert!(new_bytes.len() <= left);
 533                  left -= new_bytes.len();
 534              }
 535  
 536              if left == 0 {
 537                  return Some(bytes);
 538              } else {
 539                  warn!("flatpipe: dropped %? broken bytes", left);
 540                  return None;
 541              }
 542          }
 543      }
 544  
 545      impl<W:Writer> ByteChan for WriterByteChan<W> {
 546          fn send(&self, val~[u8]) {
 547              self.writer.write(val);
 548          }
 549      }
 550  
 551      impl<R:Reader> ReaderBytePort<R> {
 552          pub fn new(rR) -> ReaderBytePort<R> {
 553              ReaderBytePort {
 554                  reader: r
 555              }
 556          }
 557      }
 558  
 559      impl<W:Writer> WriterByteChan<W> {
 560          pub fn new(wW) -> WriterByteChan<W> {
 561              WriterByteChan {
 562                  writer: w
 563              }
 564          }
 565      }
 566  
 567      // FIXME #6850: Remove `@mut` when this module is ported to the new I/O traits,
 568      // which use `&mut self` properly. (For example, util::comm::GenericPort's try_recv
 569      // method doesn't use `&mut self`, so the `try_recv` method in the impl of `BytePort`
 570      // for `PipeBytePort` can't have `&mut self` either.)
 571      pub struct PipeBytePort {
 572          port: comm::Port<~[u8]>,
 573          buf: @mut ~[u8]
 574      }
 575  
 576      pub struct PipeByteChan {
 577          chan: comm::Chan<~[u8]>
 578      }
 579  
 580      impl BytePort for PipeBytePort {
 581          fn try_recv(&self, countuint) -> Option<~[u8]> {
 582              if self.buf.len() >= count {
 583                  let mut bytes = ::std::util::replace(&mut *self.buf, ~[]);
 584                  *self.buf = bytes.slice(count, bytes.len()).to_owned();
 585                  bytes.truncate(count);
 586                  return Some(bytes);
 587              } else if !self.buf.is_empty() {
 588                  let mut bytes = ::std::util::replace(&mut *self.buf, ~[]);
 589                  assert!(count > bytes.len());
 590                  match self.try_recv(count - bytes.len()) {
 591                      Some(rest) => {
 592                          bytes.push_all(rest);
 593                          return Some(bytes);
 594                      }
 595                      None => return None
 596                  }
 597              } else /* empty */ {
 598                  match self.port.try_recv() {
 599                      Some(buf) => {
 600                          assert!(!buf.is_empty());
 601                          *self.buf = buf;
 602                          return self.try_recv(count);
 603                      }
 604                      None => return None
 605                  }
 606              }
 607          }
 608      }
 609  
 610      impl ByteChan for PipeByteChan {
 611          fn send(&self, val~[u8]) {
 612              self.chan.send(val)
 613          }
 614      }
 615  
 616      impl PipeBytePort {
 617          pub fn new(pPort<~[u8]>) -> PipeBytePort {
 618              PipeBytePort {
 619                  port: p,
 620                  buf: @mut ~[]
 621              }
 622          }
 623      }
 624  
 625      impl PipeByteChan {
 626          pub fn new(cChan<~[u8]>) -> PipeByteChan {
 627              PipeByteChan {
 628                  chan: c
 629              }
 630          }
 631      }
 632  
 633  }
 634  
 635  #[cfg(test)]
 636  mod test {
 637  
 638      use flatpipes::BytePort;
 639      use flatpipes::pod;
 640      use flatpipes::serial;
 641      use io_util::BufReader;
 642  
 643      use std::io::BytesWriter;
 644      use std::task;
 645  
 646      #[test]
 647      #[ignore(reason = "ebml failure")]
 648      fn test_serializing_memory_stream() {
 649          let writer = BytesWriter::new();
 650          let chan = serial::writer_chan(writer);
 651  
 652          chan.send(10);
 653  
 654          let bytes = (*chan.byte_chan.writer.bytes).clone();
 655  
 656          let reader = BufReader::new(bytes);
 657          let port = serial::reader_port(reader);
 658  
 659          let res: int = port.recv();
 660          assert_eq!(res, 10i);
 661      }
 662  
 663      #[test]
 664      #[ignore(reason = "FIXME #6211 failing on linux snapshot machine")]
 665      fn test_serializing_pipes() {
 666          let (port, chan) = serial::pipe_stream();
 667  
 668          do task::spawn || {
 669              for i in range(0, 10) {
 670                  chan.send(i)
 671              }
 672          }
 673  
 674          for i in range(0, 10) {
 675              assert!(i == port.recv())
 676          }
 677      }
 678  
 679      #[test]
 680      #[ignore(reason = "ebml failure")]
 681      fn test_serializing_boxes() {
 682          let (port, chan) = serial::pipe_stream();
 683  
 684          do task::spawn || {
 685              for i in range(0, 10) {
 686                  chan.send(@i)
 687              }
 688          }
 689  
 690          for i in range(0, 10) {
 691              assert!(@i == port.recv())
 692          }
 693      }
 694  
 695      #[test]
 696      fn test_pod_memory_stream() {
 697          let writer = BytesWriter::new();
 698          let chan = pod::writer_chan(writer);
 699  
 700          chan.send(10);
 701  
 702          let bytes = (*chan.byte_chan.writer.bytes).clone();
 703  
 704          let reader = BufReader::new(bytes);
 705          let port = pod::reader_port(reader);
 706  
 707          let res: int = port.recv();
 708          assert_eq!(res, 10);
 709      }
 710  
 711      #[test]
 712      fn test_pod_pipes() {
 713          let (port, chan) = pod::pipe_stream();
 714  
 715          do task::spawn || {
 716              for i in range(0, 10) {
 717                  chan.send(i)
 718              }
 719          }
 720  
 721          for i in range(0, 10) {
 722              assert!(i == port.recv())
 723          }
 724      }
 725  
 726      // FIXME #2064: Networking doesn't work on x86
 727      // XXX Broken until networking support is added back
 728      /*
 729      use flatpipes::{Flattener, Unflattener, FlatChan, FlatPort};
 730      use flatpipes::bytepipes::*;
 731  
 732      #[test]
 733      #[cfg(target_arch = "x86_64")]
 734      fn test_pod_tcp_stream() {
 735          fn reader_port(buf: TcpSocketBuf
 736                        ) -> pod::ReaderPort<int, TcpSocketBuf> {
 737              pod::reader_port(buf)
 738          }
 739          fn writer_chan(buf: TcpSocketBuf
 740                        ) -> pod::WriterChan<int, TcpSocketBuf> {
 741              pod::writer_chan(buf)
 742          }
 743          test_some_tcp_stream(reader_port, writer_chan, 9666);
 744      }
 745  
 746      #[test]
 747      #[cfg(target_arch = "x86_64")]
 748      fn test_serializing_tcp_stream() {
 749          // XXX Broken until networking support is added back
 750          fn reader_port(buf: TcpSocketBuf
 751                        ) -> serial::ReaderPort<int, TcpSocketBuf> {
 752              serial::reader_port(buf)
 753          }
 754          fn writer_chan(buf: TcpSocketBuf
 755                        ) -> serial::WriterChan<int, TcpSocketBuf> {
 756              serial::writer_chan(buf)
 757          }
 758          test_some_tcp_stream(reader_port, writer_chan, 9667);
 759      }
 760  
 761      type ReaderPortFactory<U> =
 762          ~fn(TcpSocketBuf) -> FlatPort<int, U, ReaderBytePort<TcpSocketBuf>>;
 763      type WriterChanFactory<F> =
 764          ~fn(TcpSocketBuf) -> FlatChan<int, F, WriterByteChan<TcpSocketBuf>>;
 765  
 766      fn test_some_tcp_stream<U:Unflattener<int>,F:Flattener<int>>(
 767          reader_port: ReaderPortFactory<U>,
 768          writer_chan: WriterChanFactory<F>,
 769          port: uint) {
 770  
 771          use std::cell::Cell;
 772          use std::comm;
 773          use std::result;
 774          use net::ip;
 775          use net::tcp;
 776          use uv;
 777  
 778          // Indicate to the client task that the server is listening
 779          let (begin_connect_port, begin_connect_chan) = comm::stream();
 780          // The connection is sent from the server task to the receiver task
 781          // to handle the connection
 782          let (accept_port, accept_chan) = comm::stream();
 783          // The main task will wait until the test is over to proceed
 784          let (finish_port, finish_chan) = comm::stream();
 785  
 786          let addr0 = ip::v4::parse_addr("127.0.0.1");
 787  
 788          let begin_connect_chan = Cell::new(begin_connect_chan);
 789          let accept_chan = Cell::new(accept_chan);
 790  
 791          // The server task
 792          let addr = addr0.clone();
 793          do task::spawn || {
 794              let iotask = &uv::global_loop::get();
 795              let begin_connect_chan = begin_connect_chan.take();
 796              let accept_chan = accept_chan.take();
 797              let listen_res = do tcp::listen(
 798                  addr.clone(), port, 128, iotask, |_kill_ch| {
 799                      // Tell the sender to initiate the connection
 800                      debug!("listening");
 801                      begin_connect_chan.send(())
 802                  }) |new_conn, kill_ch| {
 803  
 804                  // Incoming connection. Send it to the receiver task to accept
 805                  let (res_port, res_chan) = comm::stream();
 806                  accept_chan.send((new_conn, res_chan));
 807                  // Wait until the connection is accepted
 808                  res_port.recv();
 809  
 810                  // Stop listening
 811                  kill_ch.send(None)
 812              };
 813  
 814              assert!(listen_res.is_ok());
 815          }
 816  
 817          // Client task
 818          let addr = addr0.clone();
 819          do task::spawn || {
 820              // Wait for the server to start listening
 821              begin_connect_port.recv();
 822  
 823              debug!("connecting");
 824              let iotask = &uv::global_loop::get();
 825              let connect_result = tcp::connect(addr.clone(), port, iotask);
 826              assert!(connect_result.is_ok());
 827              let sock = result::unwrap(connect_result);
 828              let socket_buf: tcp::TcpSocketBuf = tcp::socket_buf(sock);
 829  
 830              // TcpSocketBuf is a Writer!
 831              let chan = writer_chan(socket_buf);
 832  
 833              for i in range(0, 10) {
 834                  debug!("sending %?", i);
 835                  chan.send(i)
 836              }
 837          }
 838  
 839          // Receiver task
 840          do task::spawn || {
 841              // Wait for a connection
 842              let (conn, res_chan) = accept_port.recv();
 843  
 844              debug!("accepting connection");
 845              let accept_result = tcp::accept(conn);
 846              debug!("accepted");
 847              assert!(accept_result.is_ok());
 848              let sock = result::unwrap(accept_result);
 849              res_chan.send(());
 850  
 851              let socket_buf: tcp::TcpSocketBuf = tcp::socket_buf(sock);
 852  
 853              // TcpSocketBuf is a Reader!
 854              let port = reader_port(socket_buf);
 855  
 856              for i in range(0, 10) {
 857                  let j = port.recv();
 858                  debug!("received %?", j);
 859                  assert_eq!(i, j);
 860              }
 861  
 862              // The test is over!
 863              finish_chan.send(());
 864          }
 865  
 866          finish_port.recv();
 867      }*/
 868  
 869      // Tests that the different backends behave the same when the
 870      // binary streaming protocol is broken
 871      mod broken_protocol {
 872  
 873          use flatpipes::{BytePort, FlatPort};
 874          use flatpipes::flatteners::PodUnflattener;
 875          use flatpipes::pod;
 876          use io_util::BufReader;
 877  
 878          use std::comm;
 879          use std::io;
 880          use std::sys;
 881          use std::task;
 882  
 883          type PortLoader<P> =
 884              ~fn(~[u8]) -> FlatPort<int, PodUnflattener<int>, P>;
 885  
 886          fn reader_port_loader(bytes: ~[u8]
 887                               ) -> pod::ReaderPort<int, BufReader> {
 888              let reader = BufReader::new(bytes);
 889              pod::reader_port(reader)
 890          }
 891  
 892          fn pipe_port_loader(bytes: ~[u8]
 893                             ) -> pod::PipePort<int> {
 894              let (port, chan) = comm::stream();
 895              if !bytes.is_empty() {
 896                  chan.send(bytes);
 897              }
 898              pod::pipe_port(port)
 899          }
 900  
 901          fn test_try_recv_none1<P:BytePort>(loader: PortLoader<P>) {
 902              let bytes = ~[];
 903              let port = loader(bytes);
 904              let res: Option<int> = port.try_recv();
 905              assert!(res.is_none());
 906          }
 907  
 908          #[test]
 909          fn test_try_recv_none1_reader() {
 910              test_try_recv_none1(reader_port_loader);
 911          }
 912          #[test]
 913          fn test_try_recv_none1_pipe() {
 914              test_try_recv_none1(pipe_port_loader);
 915          }
 916  
 917          fn test_try_recv_none2<P:BytePort>(loader: PortLoader<P>) {
 918              // The control word in the protocol is interrupted
 919              let bytes = ~[0];
 920              let port = loader(bytes);
 921              let res: Option<int> = port.try_recv();
 922              assert!(res.is_none());
 923          }
 924  
 925          #[test]
 926          fn test_try_recv_none2_reader() {
 927              test_try_recv_none2(reader_port_loader);
 928          }
 929          #[test]
 930          fn test_try_recv_none2_pipe() {
 931              test_try_recv_none2(pipe_port_loader);
 932          }
 933  
 934          fn test_try_recv_none3<P:BytePort>(loader: PortLoader<P>) {
 935              static CONTINUE: [u8, ..4] = [0xAA, 0xBB, 0xCC, 0xDD];
 936              // The control word is followed by garbage
 937              let bytes = CONTINUE.to_owned() + &[0u8];
 938              let port = loader(bytes);
 939              let res: Option<int> = port.try_recv();
 940              assert!(res.is_none());
 941          }
 942  
 943          #[test]
 944          fn test_try_recv_none3_reader() {
 945              test_try_recv_none3(reader_port_loader);
 946          }
 947          #[test]
 948          fn test_try_recv_none3_pipe() {
 949              test_try_recv_none3(pipe_port_loader);
 950          }
 951  
 952          fn test_try_recv_none4<P:BytePort>(loader: PortLoader<P>) {
 953              assert!(do task::try || {
 954                  static CONTINUE: [u8, ..4] = [0xAA, 0xBB, 0xCC, 0xDD];
 955                  // The control word is followed by a valid length,
 956                  // then undeserializable garbage
 957                  let len_bytes = do io::u64_to_be_bytes(
 958                      1, sys::size_of::<u64>()) |len_bytes| {
 959                      len_bytes.to_owned()
 960                  };
 961                  let bytes = CONTINUE.to_owned() + len_bytes + &[0u8, 0, 0, 0];
 962  
 963                  let port = loader(bytes);
 964  
 965                  let _res: Option<int> = port.try_recv();
 966              }.is_err());
 967          }
 968  
 969          #[test]
 970          fn test_try_recv_none4_reader() {
 971              test_try_recv_none4(reader_port_loader);
 972          }
 973          #[test]
 974          fn test_try_recv_none4_pipe() {
 975              test_try_recv_none4(pipe_port_loader);
 976          }
 977      }
 978  
 979  }

libextra/flatpipes.rs:75:3-75:3 -struct- definition:
*/
pub struct FlatChan<T, F, C> {
references:-
328:         FlatChan {
327:     pub fn new(f: F, c: C) -> FlatChan<T, F, C> {
103:     pub type WriterChan<T, W> = FlatChan<
188:     pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
107:     pub type PipeChan<T> = FlatChan<
326: impl<T,F:Flattener<T>,C:ByteChan> FlatChan<T, F, C> {
305: impl<T,F:Flattener<T>,C:ByteChan> GenericChan<T> for FlatChan<T, F, C> {
186:         FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;


libextra/flatpipes.rs:466:4-466:4 -trait- definition:
    pub trait FromReader {
        fn from_reader(r: @Reader) -> Self;
references:-
491:     impl FromReader for ebml::reader::Decoder {
467:         fn from_reader(r: @Reader) -> Self;
445:     pub fn deserialize_buffer<D: Decoder + FromReader,
474:     impl FromReader for json::Decoder {


libextra/flatpipes.rs:183:4-183:4 -ty- definition:
    pub type ReaderPort<T, R> =
        FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
references:-
193:     ) -> ReaderPort<T, R> {


libextra/flatpipes.rs:396:4-396:4 -ty- definition:
    pub type DeserializeBuffer<T> = ~fn(buf: &[u8]) -> T;

references:-
399:         deserialize_buffer: DeserializeBuffer<T>
423:         pub fn new(deserialize_buffer: DeserializeBuffer<T>)


libextra/flatpipes.rs:253:3-253:3 -trait- definition:
*/
pub trait ByteChan {
references:-
545:     impl<W:Writer> ByteChan for WriterByteChan<W> {
326: impl<T,F:Flattener<T>,C:ByteChan> FlatChan<T, F, C> {
610:     impl ByteChan for PipeByteChan {
305: impl<T,F:Flattener<T>,C:ByteChan> GenericChan<T> for FlatChan<T, F, C> {


libextra/flatpipes.rs:398:4-398:4 -struct- definition:
    pub struct DeserializingUnflattener<D, T> {
        deserialize_buffer: DeserializeBuffer<T>
references:-
425:             DeserializingUnflattener {
101:         T, DeserializingUnflattener<DefaultDecoder, T>,
424:                    -> DeserializingUnflattener<D, T> {
409:             for DeserializingUnflattener<D, T> {
134:         let unflat: DeserializingUnflattener<DefaultDecoder, T> =
422:     impl<D:Decoder,T:Decodable<D>> DeserializingUnflattener<D, T> {
106:         T, DeserializingUnflattener<DefaultDecoder, T>, PipeBytePort>;
113:         let unflat: DeserializingUnflattener<DefaultDecoder, T> =


libextra/flatpipes.rs:246:3-246:3 -trait- definition:
*/
pub trait BytePort {
references:-
523:     impl<R:Reader> BytePort for ReaderBytePort<R> {
260: impl<T,U:Unflattener<T>,P:BytePort> GenericPort<T> for FlatPort<T, U, P> {
580:     impl BytePort for PipeBytePort {
317: impl<T,U:Unflattener<T>,P:BytePort> FlatPort<T, U, P> {


libextra/flatpipes.rs:232:3-232:3 -trait- definition:
*/
pub trait Flattener<T> {
references:-
370:     impl<T:Clone + Send> Flattener<T> for PodFlattener<T> {
305: impl<T,F:Flattener<T>,C:ByteChan> GenericChan<T> for FlatChan<T, F, C> {
415:     impl<S:Encoder,T:Encodable<S>> Flattener<T>
326: impl<T,F:Flattener<T>,C:ByteChan> FlatChan<T, F, C> {


libextra/flatpipes.rs:188:4-188:4 -ty- definition:
    pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;

references:-
223:     pub fn pipe_stream<T:Clone + Send>() -> (PipePort<T>, PipeChan<T>) {
216:     pub fn pipe_chan<T:Clone + Send>(chan: Chan<~[u8]>) -> PipeChan<T> {


libextra/flatpipes.rs:103:4-103:4 -ty- definition:
    pub type WriterChan<T, W> = FlatChan<
        T, SerializingFlattener<DefaultEncoder, T>, WriterByteChan<W>>;
references:-
122:                        W: Writer>(writer: W) -> WriterChan<T, W> {


libextra/flatpipes.rs:576:4-576:4 -struct- definition:
    pub struct PipeByteChan {
        chan: comm::Chan<~[u8]>
references:-
188:     pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
625:     impl PipeByteChan {
627:             PipeByteChan {
626:         pub fn new(c: Chan<~[u8]>) -> PipeByteChan {
108:         T, SerializingFlattener<DefaultEncoder, T>, PipeByteChan>;
610:     impl ByteChan for PipeByteChan {


libextra/flatpipes.rs:142:4-142:4 -fn- definition:
    pub fn pipe_chan<T:Encodable<DefaultEncoder>>(
        chan: Chan<~[u8]>
references:-
157:         return (pipe_port(port), pipe_chan(chan));


libextra/flatpipes.rs:515:4-515:4 -struct- definition:
    pub struct ReaderBytePort<R> {
        reader: R
references:-
551:     impl<R:Reader> ReaderBytePort<R> {
102:         ReaderBytePort<R>>;
523:     impl<R:Reader> BytePort for ReaderBytePort<R> {
184:         FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
553:             ReaderBytePort {
552:         pub fn new(r: R) -> ReaderBytePort<R> {


libextra/flatpipes.rs:456:4-456:4 -fn- definition:
    pub fn serialize_value<D: Encoder + FromWriter,
                           T: Encodable<D>>(
references:-
147:                 serialize_value::<DefaultEncoder, T>);
125:                 serialize_value::<DefaultEncoder, T>);


libextra/flatpipes.rs:131:4-131:4 -fn- definition:
    pub fn pipe_port<T:Decodable<DefaultDecoder>>(
        port: Port<~[u8]>
references:-
157:         return (pipe_port(port), pipe_chan(chan));


libextra/flatpipes.rs:356:4-356:4 -struct- definition:
    pub struct PodFlattener<T> {
        bogus: ()
references:-
203:         let flat: PodFlattener<T> = PodFlattener::new();
387:     impl<T:Clone + Send> PodFlattener<T> {
186:         FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;
389:             PodFlattener {
370:     impl<T:Clone + Send> Flattener<T> for PodFlattener<T> {
188:     pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
217:         let flat: PodFlattener<T> = PodFlattener::new();
388:         pub fn new() -> PodFlattener<T> {


libextra/flatpipes.rs:571:4-571:4 -struct- definition:
    pub struct PipeBytePort {
        port: comm::Port<~[u8]>,
references:-
187:     pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>;
616:     impl PipeBytePort {
618:             PipeBytePort {
580:     impl BytePort for PipeBytePort {
106:         T, DeserializingUnflattener<DefaultDecoder, T>, PipeBytePort>;
617:         pub fn new(p: Port<~[u8]>) -> PipeBytePort {


libextra/flatpipes.rs:239:3-239:3 -trait- definition:
*/
pub trait Unflattener<T> {
references:-
317: impl<T,U:Unflattener<T>,P:BytePort> FlatPort<T, U, P> {
408:     impl<D:Decoder,T:Decodable<D>> Unflattener<T>
360:     impl<T:Clone + Send> Unflattener<T> for PodUnflattener<T> {
260: impl<T,U:Unflattener<T>,P:BytePort> GenericPort<T> for FlatPort<T, U, P> {


libextra/flatpipes.rs:64:3-64:3 -struct- definition:
*/
pub struct FlatPort<T, U, P> {
references:-
318:     pub fn new(u: U, p: P) -> FlatPort<T, U, P> {
317: impl<T,U:Unflattener<T>,P:BytePort> FlatPort<T, U, P> {
184:         FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
100:     pub type ReaderPort<T, R> = FlatPort<
105:     pub type PipePort<T> = FlatPort<
319:         FlatPort {
260: impl<T,U:Unflattener<T>,P:BytePort> GenericPort<T> for FlatPort<T, U, P> {
187:     pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>;


libextra/flatpipes.rs:404:4-404:4 -struct- definition:
    pub struct SerializingFlattener<S, T> {
        serialize_value: SerializeValue<T>
references:-
104:         T, SerializingFlattener<DefaultEncoder, T>, WriterByteChan<W>>;
416:             for SerializingFlattener<S, T> {
145:         let flat: SerializingFlattener<DefaultEncoder, T> =
108:         T, SerializingFlattener<DefaultEncoder, T>, PipeByteChan>;
431:     impl<S:Encoder,T:Encodable<S>> SerializingFlattener<S, T> {
433:                    -> SerializingFlattener<S, T> {
123:         let flat: SerializingFlattener<DefaultEncoder, T> =
434:             SerializingFlattener {


libextra/flatpipes.rs:352:4-352:4 -struct- definition:
    pub struct PodUnflattener<T> {
        bogus: ()
references:-
380:         pub fn new() -> PodUnflattener<T> {
184:         FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
381:             PodUnflattener {
210:         let unflat: PodUnflattener<T> = PodUnflattener::new();
194:         let unflat: PodUnflattener<T> = PodUnflattener::new();
187:     pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>;
379:     impl<T:Clone + Send> PodUnflattener<T> {
360:     impl<T:Clone + Send> Unflattener<T> for PodUnflattener<T> {


libextra/flatpipes.rs:402:4-402:4 -ty- definition:
    pub type SerializeValue<T> = ~fn(val: &T) -> ~[u8];

references:-
405:         serialize_value: SerializeValue<T>
432:         pub fn new(serialize_value: SerializeValue<T>)


libextra/flatpipes.rs:187:4-187:4 -ty- definition:
    pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>;
    pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
references:-
209:     pub fn pipe_port<T:Clone + Send>(port: Port<~[u8]>) -> PipePort<T> {
223:     pub fn pipe_stream<T:Clone + Send>() -> (PipePort<T>, PipeChan<T>) {


libextra/flatpipes.rs:100:4-100:4 -ty- definition:
    pub type ReaderPort<T, R> = FlatPort<
        T, DeserializingUnflattener<DefaultDecoder, T>,
references:-
112:                        R: Reader>(reader: R) -> ReaderPort<T, R> {


libextra/flatpipes.rs:216:4-216:4 -fn- definition:
    pub fn pipe_chan<T:Clone + Send>(chan: Chan<~[u8]>) -> PipeChan<T> {
        let flat: PodFlattener<T> = PodFlattener::new();
references:-
225:         return (pipe_port(port), pipe_chan(chan));


libextra/flatpipes.rs:209:4-209:4 -fn- definition:
    pub fn pipe_port<T:Clone + Send>(port: Port<~[u8]>) -> PipePort<T> {
        let unflat: PodUnflattener<T> = PodUnflattener::new();
references:-
225:         return (pipe_port(port), pipe_chan(chan));


libextra/flatpipes.rs:519:4-519:4 -struct- definition:
    pub struct WriterByteChan<W> {
        writer: W
references:-
559:     impl<W:Writer> WriterByteChan<W> {
186:         FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;
545:     impl<W:Writer> ByteChan for WriterByteChan<W> {
104:         T, SerializingFlattener<DefaultEncoder, T>, WriterByteChan<W>>;
560:         pub fn new(w: W) -> WriterByteChan<W> {
561:             WriterByteChan {


libextra/flatpipes.rs:185:4-185:4 -ty- definition:
    pub type WriterChan<T, W> =
        FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;
references:-
202:     ) -> WriterChan<T, W> {


libextra/flatpipes.rs:470:4-470:4 -trait- definition:
    pub trait FromWriter {
        fn from_writer(w: @Writer) -> Self;
references:-
485:     impl FromWriter for json::Encoder {
471:         fn from_writer(w: @Writer) -> Self;
456:     pub fn serialize_value<D: Encoder + FromWriter,
499:     impl FromWriter for ebml::writer::Encoder {


libextra/flatpipes.rs:105:4-105:4 -ty- definition:
    pub type PipePort<T> = FlatPort<
        T, DeserializingUnflattener<DefaultDecoder, T>, PipeBytePort>;
references:-
155:                           ) -> (PipePort<T>, PipeChan<T>) {
133:     ) -> PipePort<T> {


libextra/flatpipes.rs:107:4-107:4 -ty- definition:
    pub type PipeChan<T> = FlatChan<
        T, SerializingFlattener<DefaultEncoder, T>, PipeByteChan>;
references:-
155:                           ) -> (PipePort<T>, PipeChan<T>) {
144:     ) -> PipeChan<T> {


libextra/flatpipes.rs:445:4-445:4 -fn- definition:
    pub fn deserialize_buffer<D: Decoder + FromReader,
                              T: Decodable<D>>(
references:-
136:                 deserialize_buffer::<DefaultDecoder, T>);
115:                 deserialize_buffer::<DefaultDecoder, T>);