(index<- ) ./libextra/flatpipes.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12
13 Generic communication channels for things that can be represented as,
14 or transformed to and from, byte vectors.
15
16 The `FlatPort` and `FlatChan` types implement the generic channel and
17 port interface for arbitrary types and transport strategies. It can
18 particularly be used to send and receive serializable types over I/O
19 streams.
20
21 `FlatPort` and `FlatChan` implement the same comm traits as pipe-based
22 ports and channels.
23
24 # Example
25
26 This example sends boxed integers across tasks using serialization.
27
28 ~~~ {.rust}
29 let (port, chan) = serial::pipe_stream();
30
31 do task::spawn || {
32 for i in range(0, 10) {
33 chan.send(@i)
34 }
35 }
36
37 for i in range(0, 10) {
38 assert @i == port.recv()
39 }
40 ~~~
41
42 # Safety Note
43
44 Flat pipes created from `io::Reader`s and `io::Writer`s share the same
45 blocking properties as the underlying stream. Since some implementations
46 block the scheduler thread, so will their pipes.
47
48 */
49
50 #[allow(missing_doc)];
51
52
53 // The basic send/recv interface FlatChan and PortChan will implement
54 use std::io;
55 use std::comm::GenericChan;
56 use std::comm::GenericPort;
57 use std::sys::size_of;
58
59 /**
60 A FlatPort, consisting of a `BytePort` that receives byte vectors,
61 and an `Unflattener` that converts the bytes to a value.
62
63 Create using the constructors in the `serial` and `pod` modules.
64 */
65 pub struct FlatPort<T, U, P> {
66 unflattener: U,
67 byte_port: P
68 }
69
70 /**
71 A FlatChan, consisting of a `Flattener` that converts values to
72 byte vectors, and a `ByteChan` that transmits the bytes.
73
74 Create using the constructors in the `serial` and `pod` modules.
75 */
76 pub struct FlatChan<T, F, C> {
77 flattener: F,
78 byte_chan: C
79 }
80
81 /**
82 Constructors for flat pipes that using serialization-based flattening.
83 */
84 pub mod serial {
85 pub use DefaultEncoder = ebml::writer::Encoder;
86 pub use DefaultDecoder = ebml::reader::Decoder;
87
88 use serialize::{Decodable, Encodable};
89 use flatpipes::flatteners::{DeserializingUnflattener,
90 SerializingFlattener};
91 use flatpipes::flatteners::{deserialize_buffer, serialize_value};
92 use flatpipes::bytepipes::{ReaderBytePort, WriterByteChan};
93 use flatpipes::bytepipes::{PipeBytePort, PipeByteChan};
94 use flatpipes::{FlatPort, FlatChan};
95
96 use std::io::{Reader, Writer};
97 use std::comm::{Port, Chan};
98 use std::comm;
99
100 pub type ReaderPort<T, R> = FlatPort<
101 T, DeserializingUnflattener<DefaultDecoder, T>,
102 ReaderBytePort<R>>;
103 pub type WriterChan<T, W> = FlatChan<
104 T, SerializingFlattener<DefaultEncoder, T>, WriterByteChan<W>>;
105 pub type PipePort<T> = FlatPort<
106 T, DeserializingUnflattener<DefaultDecoder, T>, PipeBytePort>;
107 pub type PipeChan<T> = FlatChan<
108 T, SerializingFlattener<DefaultEncoder, T>, PipeByteChan>;
109
110 /// Create a `FlatPort` from a `Reader`
111 pub fn reader_port<T: Decodable<DefaultDecoder>,
112 R: Reader>(reader: R) -> ReaderPort<T, R> {
113 let unflat: DeserializingUnflattener<DefaultDecoder, T> =
114 DeserializingUnflattener::new(
115 deserialize_buffer::<DefaultDecoder, T>);
116 let byte_port = ReaderBytePort::new(reader);
117 FlatPort::new(unflat, byte_port)
118 }
119
120 /// Create a `FlatChan` from a `Writer`
121 pub fn writer_chan<T: Encodable<DefaultEncoder>,
122 W: Writer>(writer: W) -> WriterChan<T, W> {
123 let flat: SerializingFlattener<DefaultEncoder, T> =
124 SerializingFlattener::new(
125 serialize_value::<DefaultEncoder, T>);
126 let byte_chan = WriterByteChan::new(writer);
127 FlatChan::new(flat, byte_chan)
128 }
129
130 /// Create a `FlatPort` from a `Port<~[u8]>`
131 pub fn pipe_port<T:Decodable<DefaultDecoder>>(
132 port: Port<~[u8]>
133 ) -> PipePort<T> {
134 let unflat: DeserializingUnflattener<DefaultDecoder, T> =
135 DeserializingUnflattener::new(
136 deserialize_buffer::<DefaultDecoder, T>);
137 let byte_port = PipeBytePort::new(port);
138 FlatPort::new(unflat, byte_port)
139 }
140
141 /// Create a `FlatChan` from a `Chan<~[u8]>`
142 pub fn pipe_chan<T:Encodable<DefaultEncoder>>(
143 chan: Chan<~[u8]>
144 ) -> PipeChan<T> {
145 let flat: SerializingFlattener<DefaultEncoder, T> =
146 SerializingFlattener::new(
147 serialize_value::<DefaultEncoder, T>);
148 let byte_chan = PipeByteChan::new(chan);
149 FlatChan::new(flat, byte_chan)
150 }
151
152 /// Create a pair of `FlatChan` and `FlatPort`, backed by pipes
153 pub fn pipe_stream<T: Encodable<DefaultEncoder> +
154 Decodable<DefaultDecoder>>(
155 ) -> (PipePort<T>, PipeChan<T>) {
156 let (port, chan) = comm::stream();
157 return (pipe_port(port), pipe_chan(chan));
158 }
159 }
160
161 // FIXME #4074 this doesn't correctly enforce POD bounds
162 /**
163 Constructors for flat pipes that send POD types using memcpy.
164
165 # Safety Note
166
167 This module is currently unsafe because it uses `Clone + Send` as a type
168 parameter bounds meaning POD (plain old data), but `Clone + Send` and
169 POD are not equivalent.
170
171 */
172 pub mod pod {
173
174 use flatpipes::flatteners::{PodUnflattener, PodFlattener};
175 use flatpipes::bytepipes::{ReaderBytePort, WriterByteChan};
176 use flatpipes::bytepipes::{PipeBytePort, PipeByteChan};
177 use flatpipes::{FlatPort, FlatChan};
178
179 use std::io::{Reader, Writer};
180 use std::comm::{Port, Chan};
181 use std::comm;
182
183 pub type ReaderPort<T, R> =
184 FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
185 pub type WriterChan<T, W> =
186 FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;
187 pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>;
188 pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
189
190 /// Create a `FlatPort` from a `Reader`
191 pub fn reader_port<T:Clone + Send,R:Reader>(
192 reader: R
193 ) -> ReaderPort<T, R> {
194 let unflat: PodUnflattener<T> = PodUnflattener::new();
195 let byte_port = ReaderBytePort::new(reader);
196 FlatPort::new(unflat, byte_port)
197 }
198
199 /// Create a `FlatChan` from a `Writer`
200 pub fn writer_chan<T:Clone + Send,W:Writer>(
201 writer: W
202 ) -> WriterChan<T, W> {
203 let flat: PodFlattener<T> = PodFlattener::new();
204 let byte_chan = WriterByteChan::new(writer);
205 FlatChan::new(flat, byte_chan)
206 }
207
208 /// Create a `FlatPort` from a `Port<~[u8]>`
209 pub fn pipe_port<T:Clone + Send>(port: Port<~[u8]>) -> PipePort<T> {
210 let unflat: PodUnflattener<T> = PodUnflattener::new();
211 let byte_port = PipeBytePort::new(port);
212 FlatPort::new(unflat, byte_port)
213 }
214
215 /// Create a `FlatChan` from a `Chan<~[u8]>`
216 pub fn pipe_chan<T:Clone + Send>(chan: Chan<~[u8]>) -> PipeChan<T> {
217 let flat: PodFlattener<T> = PodFlattener::new();
218 let byte_chan = PipeByteChan::new(chan);
219 FlatChan::new(flat, byte_chan)
220 }
221
222 /// Create a pair of `FlatChan` and `FlatPort`, backed by pipes
223 pub fn pipe_stream<T:Clone + Send>() -> (PipePort<T>, PipeChan<T>) {
224 let (port, chan) = comm::stream();
225 return (pipe_port(port), pipe_chan(chan));
226 }
227
228 }
229
230 /**
231 Flatteners present a value as a byte vector
232 */
233 pub trait Flattener<T> {
234 fn flatten(&self, val: T) -> ~[u8];
235 }
236
237 /**
238 Unflatteners convert a byte vector to a value
239 */
240 pub trait Unflattener<T> {
241 fn unflatten(&self, buf: ~[u8]) -> T;
242 }
243
244 /**
245 BytePorts are a simple interface for receiving a specified number
246 */
247 pub trait BytePort {
248 fn try_recv(&self, count: uint) -> Option<~[u8]>;
249 }
250
251 /**
252 ByteChans are a simple interface for sending bytes
253 */
254 pub trait ByteChan {
255 fn send(&self, val: ~[u8]);
256 }
257
258 static CONTINUE: [u8, ..4] = [0xAA, 0xBB, 0xCC, 0xDD];
259
260 impl<T,U:Unflattener<T>,P:BytePort> GenericPort<T> for FlatPort<T, U, P> {
261 fn recv(&self) -> T {
262 match self.try_recv() {
263 Some(val) => val,
264 None => fail!("port is closed")
265 }
266 }
267 fn try_recv(&self) -> Option<T> {
268 let command = match self.byte_port.try_recv(CONTINUE.len()) {
269 Some(c) => c,
270 None => {
271 warn!("flatpipe: broken pipe");
272 return None;
273 }
274 };
275
276 if CONTINUE.as_slice() == command {
277 let msg_len = match self.byte_port.try_recv(size_of::<u64>()) {
278 Some(bytes) => {
279 io::u64_from_be_bytes(bytes, 0, size_of::<u64>())
280 },
281 None => {
282 warn!("flatpipe: broken pipe");
283 return None;
284 }
285 };
286
287 let msg_len = msg_len as uint;
288
289 match self.byte_port.try_recv(msg_len) {
290 Some(bytes) => {
291 Some(self.unflattener.unflatten(bytes))
292 }
293 None => {
294 warn!("flatpipe: broken pipe");
295 return None;
296 }
297 }
298 }
299 else {
300 fail!("flatpipe: unrecognized command");
301 }
302 }
303 }
304
305 impl<T,F:Flattener<T>,C:ByteChan> GenericChan<T> for FlatChan<T, F, C> {
306 fn send(&self, val: T) {
307 self.byte_chan.send(CONTINUE.to_owned());
308 let bytes = self.flattener.flatten(val);
309 let len = bytes.len() as u64;
310 do io::u64_to_be_bytes(len, size_of::<u64>()) |len_bytes| {
311 self.byte_chan.send(len_bytes.to_owned());
312 }
313 self.byte_chan.send(bytes);
314 }
315 }
316
317 impl<T,U:Unflattener<T>,P:BytePort> FlatPort<T, U, P> {
318 pub fn new(u: U, p: P) -> FlatPort<T, U, P> {
319 FlatPort {
320 unflattener: u,
321 byte_port: p
322 }
323 }
324 }
325
326 impl<T,F:Flattener<T>,C:ByteChan> FlatChan<T, F, C> {
327 pub fn new(f: F, c: C) -> FlatChan<T, F, C> {
328 FlatChan {
329 flattener: f,
330 byte_chan: c
331 }
332 }
333 }
334
335
336 pub mod flatteners {
337
338 use ebml;
339 use flatpipes::{Flattener, Unflattener};
340 use io_util::BufReader;
341 use json;
342 use serialize::{Encoder, Decoder, Encodable, Decodable};
343
344 use std::cast;
345 use std::io::{Writer, Reader, ReaderUtil};
346 use std::io;
347 use std::ptr;
348 use std::sys::size_of;
349 use std::vec;
350
351 // FIXME #4074: Clone + Send != POD
352 pub struct PodUnflattener<T> {
353 bogus: ()
354 }
355
356 pub struct PodFlattener<T> {
357 bogus: ()
358 }
359
360 impl<T:Clone + Send> Unflattener<T> for PodUnflattener<T> {
361 fn unflatten(&self, buf: ~[u8]) -> T {
362 assert!(size_of::<T>() != 0);
363 assert_eq!(size_of::<T>(), buf.len());
364 let addr_of_init: &u8 = unsafe { &*vec::raw::to_ptr(buf) };
365 let addr_of_value: &T = unsafe { cast::transmute(addr_of_init) };
366 (*addr_of_value).clone()
367 }
368 }
369
370 impl<T:Clone + Send> Flattener<T> for PodFlattener<T> {
371 fn flatten(&self, val: T) -> ~[u8] {
372 assert!(size_of::<T>() != 0);
373 let val: *T = ptr::to_unsafe_ptr(&val);
374 let byte_value = val as *u8;
375 unsafe { vec::from_buf(byte_value, size_of::<T>()) }
376 }
377 }
378
379 impl<T:Clone + Send> PodUnflattener<T> {
380 pub fn new() -> PodUnflattener<T> {
381 PodUnflattener {
382 bogus: ()
383 }
384 }
385 }
386
387 impl<T:Clone + Send> PodFlattener<T> {
388 pub fn new() -> PodFlattener<T> {
389 PodFlattener {
390 bogus: ()
391 }
392 }
393 }
394
395
396 pub type DeserializeBuffer<T> = ~fn(buf: &[u8]) -> T;
397
398 pub struct DeserializingUnflattener<D, T> {
399 deserialize_buffer: DeserializeBuffer<T>
400 }
401
402 pub type SerializeValue<T> = ~fn(val: &T) -> ~[u8];
403
404 pub struct SerializingFlattener<S, T> {
405 serialize_value: SerializeValue<T>
406 }
407
408 impl<D:Decoder,T:Decodable<D>> Unflattener<T>
409 for DeserializingUnflattener<D, T> {
410 fn unflatten(&self, buf: ~[u8]) -> T {
411 (self.deserialize_buffer)(buf)
412 }
413 }
414
415 impl<S:Encoder,T:Encodable<S>> Flattener<T>
416 for SerializingFlattener<S, T> {
417 fn flatten(&self, val: T) -> ~[u8] {
418 (self.serialize_value)(&val)
419 }
420 }
421
422 impl<D:Decoder,T:Decodable<D>> DeserializingUnflattener<D, T> {
423 pub fn new(deserialize_buffer: DeserializeBuffer<T>)
424 -> DeserializingUnflattener<D, T> {
425 DeserializingUnflattener {
426 deserialize_buffer: deserialize_buffer
427 }
428 }
429 }
430
431 impl<S:Encoder,T:Encodable<S>> SerializingFlattener<S, T> {
432 pub fn new(serialize_value: SerializeValue<T>)
433 -> SerializingFlattener<S, T> {
434 SerializingFlattener {
435 serialize_value: serialize_value
436 }
437 }
438 }
439
440 /*
441 Implementations of the serialization functions required by
442 SerializingFlattener
443 */
444
445 pub fn deserialize_buffer<D: Decoder + FromReader,
446 T: Decodable<D>>(
447 buf: &[u8])
448 -> T {
449 let buf = buf.to_owned();
450 let buf_reader = @BufReader::new(buf);
451 let reader = buf_reader as @Reader;
452 let mut deser: D = FromReader::from_reader(reader);
453 Decodable::decode(&mut deser)
454 }
455
456 pub fn serialize_value<D: Encoder + FromWriter,
457 T: Encodable<D>>(
458 val: &T)
459 -> ~[u8] {
460 do io::with_bytes_writer |writer| {
461 let mut ser = FromWriter::from_writer(writer);
462 val.encode(&mut ser);
463 }
464 }
465
466 pub trait FromReader {
467 fn from_reader(r: @Reader) -> Self;
468 }
469
470 pub trait FromWriter {
471 fn from_writer(w: @Writer) -> Self;
472 }
473
474 impl FromReader for json::Decoder {
475 fn from_reader(r: @Reader) -> json::Decoder {
476 match json::from_reader(r) {
477 Ok(json) => {
478 json::Decoder(json)
479 }
480 Err(e) => fail!("flatpipe: can't parse json: %?", e)
481 }
482 }
483 }
484
485 impl FromWriter for json::Encoder {
486 fn from_writer(w: @Writer) -> json::Encoder {
487 json::Encoder(w)
488 }
489 }
490
491 impl FromReader for ebml::reader::Decoder {
492 fn from_reader(r: @Reader) -> ebml::reader::Decoder {
493 let buf = @r.read_whole_stream();
494 let doc = ebml::reader::Doc(buf);
495 ebml::reader::Decoder(doc)
496 }
497 }
498
499 impl FromWriter for ebml::writer::Encoder {
500 fn from_writer(w: @Writer) -> ebml::writer::Encoder {
501 ebml::writer::Encoder(w)
502 }
503 }
504
505 }
506
507 pub mod bytepipes {
508
509 use flatpipes::{ByteChan, BytePort};
510
511 use std::comm::{Port, Chan};
512 use std::comm;
513 use std::io::{Writer, Reader, ReaderUtil};
514
515 pub struct ReaderBytePort<R> {
516 reader: R
517 }
518
519 pub struct WriterByteChan<W> {
520 writer: W
521 }
522
523 impl<R:Reader> BytePort for ReaderBytePort<R> {
524 fn try_recv(&self, count: uint) -> Option<~[u8]> {
525 let mut left = count;
526 let mut bytes = ~[];
527 while !self.reader.eof() && left > 0 {
528 assert!(left <= count);
529 assert!(left > 0);
530 let new_bytes = self.reader.read_bytes(left);
531 bytes.push_all(new_bytes);
532 assert!(new_bytes.len() <= left);
533 left -= new_bytes.len();
534 }
535
536 if left == 0 {
537 return Some(bytes);
538 } else {
539 warn!("flatpipe: dropped %? broken bytes", left);
540 return None;
541 }
542 }
543 }
544
545 impl<W:Writer> ByteChan for WriterByteChan<W> {
546 fn send(&self, val: ~[u8]) {
547 self.writer.write(val);
548 }
549 }
550
551 impl<R:Reader> ReaderBytePort<R> {
552 pub fn new(r: R) -> ReaderBytePort<R> {
553 ReaderBytePort {
554 reader: r
555 }
556 }
557 }
558
559 impl<W:Writer> WriterByteChan<W> {
560 pub fn new(w: W) -> WriterByteChan<W> {
561 WriterByteChan {
562 writer: w
563 }
564 }
565 }
566
567 // FIXME #6850: Remove `@mut` when this module is ported to the new I/O traits,
568 // which use `&mut self` properly. (For example, util::comm::GenericPort's try_recv
569 // method doesn't use `&mut self`, so the `try_recv` method in the impl of `BytePort`
570 // for `PipeBytePort` can't have `&mut self` either.)
571 pub struct PipeBytePort {
572 port: comm::Port<~[u8]>,
573 buf: @mut ~[u8]
574 }
575
576 pub struct PipeByteChan {
577 chan: comm::Chan<~[u8]>
578 }
579
580 impl BytePort for PipeBytePort {
581 fn try_recv(&self, count: uint) -> Option<~[u8]> {
582 if self.buf.len() >= count {
583 let mut bytes = ::std::util::replace(&mut *self.buf, ~[]);
584 *self.buf = bytes.slice(count, bytes.len()).to_owned();
585 bytes.truncate(count);
586 return Some(bytes);
587 } else if !self.buf.is_empty() {
588 let mut bytes = ::std::util::replace(&mut *self.buf, ~[]);
589 assert!(count > bytes.len());
590 match self.try_recv(count - bytes.len()) {
591 Some(rest) => {
592 bytes.push_all(rest);
593 return Some(bytes);
594 }
595 None => return None
596 }
597 } else /* empty */ {
598 match self.port.try_recv() {
599 Some(buf) => {
600 assert!(!buf.is_empty());
601 *self.buf = buf;
602 return self.try_recv(count);
603 }
604 None => return None
605 }
606 }
607 }
608 }
609
610 impl ByteChan for PipeByteChan {
611 fn send(&self, val: ~[u8]) {
612 self.chan.send(val)
613 }
614 }
615
616 impl PipeBytePort {
617 pub fn new(p: Port<~[u8]>) -> PipeBytePort {
618 PipeBytePort {
619 port: p,
620 buf: @mut ~[]
621 }
622 }
623 }
624
625 impl PipeByteChan {
626 pub fn new(c: Chan<~[u8]>) -> PipeByteChan {
627 PipeByteChan {
628 chan: c
629 }
630 }
631 }
632
633 }
634
635 #[cfg(test)]
636 mod test {
637
638 use flatpipes::BytePort;
639 use flatpipes::pod;
640 use flatpipes::serial;
641 use io_util::BufReader;
642
643 use std::io::BytesWriter;
644 use std::task;
645
646 #[test]
647 #[ignore(reason = "ebml failure")]
648 fn test_serializing_memory_stream() {
649 let writer = BytesWriter::new();
650 let chan = serial::writer_chan(writer);
651
652 chan.send(10);
653
654 let bytes = (*chan.byte_chan.writer.bytes).clone();
655
656 let reader = BufReader::new(bytes);
657 let port = serial::reader_port(reader);
658
659 let res: int = port.recv();
660 assert_eq!(res, 10i);
661 }
662
663 #[test]
664 #[ignore(reason = "FIXME #6211 failing on linux snapshot machine")]
665 fn test_serializing_pipes() {
666 let (port, chan) = serial::pipe_stream();
667
668 do task::spawn || {
669 for i in range(0, 10) {
670 chan.send(i)
671 }
672 }
673
674 for i in range(0, 10) {
675 assert!(i == port.recv())
676 }
677 }
678
679 #[test]
680 #[ignore(reason = "ebml failure")]
681 fn test_serializing_boxes() {
682 let (port, chan) = serial::pipe_stream();
683
684 do task::spawn || {
685 for i in range(0, 10) {
686 chan.send(@i)
687 }
688 }
689
690 for i in range(0, 10) {
691 assert!(@i == port.recv())
692 }
693 }
694
695 #[test]
696 fn test_pod_memory_stream() {
697 let writer = BytesWriter::new();
698 let chan = pod::writer_chan(writer);
699
700 chan.send(10);
701
702 let bytes = (*chan.byte_chan.writer.bytes).clone();
703
704 let reader = BufReader::new(bytes);
705 let port = pod::reader_port(reader);
706
707 let res: int = port.recv();
708 assert_eq!(res, 10);
709 }
710
711 #[test]
712 fn test_pod_pipes() {
713 let (port, chan) = pod::pipe_stream();
714
715 do task::spawn || {
716 for i in range(0, 10) {
717 chan.send(i)
718 }
719 }
720
721 for i in range(0, 10) {
722 assert!(i == port.recv())
723 }
724 }
725
726 // FIXME #2064: Networking doesn't work on x86
727 // XXX Broken until networking support is added back
728 /*
729 use flatpipes::{Flattener, Unflattener, FlatChan, FlatPort};
730 use flatpipes::bytepipes::*;
731
732 #[test]
733 #[cfg(target_arch = "x86_64")]
734 fn test_pod_tcp_stream() {
735 fn reader_port(buf: TcpSocketBuf
736 ) -> pod::ReaderPort<int, TcpSocketBuf> {
737 pod::reader_port(buf)
738 }
739 fn writer_chan(buf: TcpSocketBuf
740 ) -> pod::WriterChan<int, TcpSocketBuf> {
741 pod::writer_chan(buf)
742 }
743 test_some_tcp_stream(reader_port, writer_chan, 9666);
744 }
745
746 #[test]
747 #[cfg(target_arch = "x86_64")]
748 fn test_serializing_tcp_stream() {
749 // XXX Broken until networking support is added back
750 fn reader_port(buf: TcpSocketBuf
751 ) -> serial::ReaderPort<int, TcpSocketBuf> {
752 serial::reader_port(buf)
753 }
754 fn writer_chan(buf: TcpSocketBuf
755 ) -> serial::WriterChan<int, TcpSocketBuf> {
756 serial::writer_chan(buf)
757 }
758 test_some_tcp_stream(reader_port, writer_chan, 9667);
759 }
760
761 type ReaderPortFactory<U> =
762 ~fn(TcpSocketBuf) -> FlatPort<int, U, ReaderBytePort<TcpSocketBuf>>;
763 type WriterChanFactory<F> =
764 ~fn(TcpSocketBuf) -> FlatChan<int, F, WriterByteChan<TcpSocketBuf>>;
765
766 fn test_some_tcp_stream<U:Unflattener<int>,F:Flattener<int>>(
767 reader_port: ReaderPortFactory<U>,
768 writer_chan: WriterChanFactory<F>,
769 port: uint) {
770
771 use std::cell::Cell;
772 use std::comm;
773 use std::result;
774 use net::ip;
775 use net::tcp;
776 use uv;
777
778 // Indicate to the client task that the server is listening
779 let (begin_connect_port, begin_connect_chan) = comm::stream();
780 // The connection is sent from the server task to the receiver task
781 // to handle the connection
782 let (accept_port, accept_chan) = comm::stream();
783 // The main task will wait until the test is over to proceed
784 let (finish_port, finish_chan) = comm::stream();
785
786 let addr0 = ip::v4::parse_addr("127.0.0.1");
787
788 let begin_connect_chan = Cell::new(begin_connect_chan);
789 let accept_chan = Cell::new(accept_chan);
790
791 // The server task
792 let addr = addr0.clone();
793 do task::spawn || {
794 let iotask = &uv::global_loop::get();
795 let begin_connect_chan = begin_connect_chan.take();
796 let accept_chan = accept_chan.take();
797 let listen_res = do tcp::listen(
798 addr.clone(), port, 128, iotask, |_kill_ch| {
799 // Tell the sender to initiate the connection
800 debug!("listening");
801 begin_connect_chan.send(())
802 }) |new_conn, kill_ch| {
803
804 // Incoming connection. Send it to the receiver task to accept
805 let (res_port, res_chan) = comm::stream();
806 accept_chan.send((new_conn, res_chan));
807 // Wait until the connection is accepted
808 res_port.recv();
809
810 // Stop listening
811 kill_ch.send(None)
812 };
813
814 assert!(listen_res.is_ok());
815 }
816
817 // Client task
818 let addr = addr0.clone();
819 do task::spawn || {
820 // Wait for the server to start listening
821 begin_connect_port.recv();
822
823 debug!("connecting");
824 let iotask = &uv::global_loop::get();
825 let connect_result = tcp::connect(addr.clone(), port, iotask);
826 assert!(connect_result.is_ok());
827 let sock = result::unwrap(connect_result);
828 let socket_buf: tcp::TcpSocketBuf = tcp::socket_buf(sock);
829
830 // TcpSocketBuf is a Writer!
831 let chan = writer_chan(socket_buf);
832
833 for i in range(0, 10) {
834 debug!("sending %?", i);
835 chan.send(i)
836 }
837 }
838
839 // Receiver task
840 do task::spawn || {
841 // Wait for a connection
842 let (conn, res_chan) = accept_port.recv();
843
844 debug!("accepting connection");
845 let accept_result = tcp::accept(conn);
846 debug!("accepted");
847 assert!(accept_result.is_ok());
848 let sock = result::unwrap(accept_result);
849 res_chan.send(());
850
851 let socket_buf: tcp::TcpSocketBuf = tcp::socket_buf(sock);
852
853 // TcpSocketBuf is a Reader!
854 let port = reader_port(socket_buf);
855
856 for i in range(0, 10) {
857 let j = port.recv();
858 debug!("received %?", j);
859 assert_eq!(i, j);
860 }
861
862 // The test is over!
863 finish_chan.send(());
864 }
865
866 finish_port.recv();
867 }*/
868
869 // Tests that the different backends behave the same when the
870 // binary streaming protocol is broken
871 mod broken_protocol {
872
873 use flatpipes::{BytePort, FlatPort};
874 use flatpipes::flatteners::PodUnflattener;
875 use flatpipes::pod;
876 use io_util::BufReader;
877
878 use std::comm;
879 use std::io;
880 use std::sys;
881 use std::task;
882
883 type PortLoader<P> =
884 ~fn(~[u8]) -> FlatPort<int, PodUnflattener<int>, P>;
885
886 fn reader_port_loader(bytes: ~[u8]
887 ) -> pod::ReaderPort<int, BufReader> {
888 let reader = BufReader::new(bytes);
889 pod::reader_port(reader)
890 }
891
892 fn pipe_port_loader(bytes: ~[u8]
893 ) -> pod::PipePort<int> {
894 let (port, chan) = comm::stream();
895 if !bytes.is_empty() {
896 chan.send(bytes);
897 }
898 pod::pipe_port(port)
899 }
900
901 fn test_try_recv_none1<P:BytePort>(loader: PortLoader<P>) {
902 let bytes = ~[];
903 let port = loader(bytes);
904 let res: Option<int> = port.try_recv();
905 assert!(res.is_none());
906 }
907
908 #[test]
909 fn test_try_recv_none1_reader() {
910 test_try_recv_none1(reader_port_loader);
911 }
912 #[test]
913 fn test_try_recv_none1_pipe() {
914 test_try_recv_none1(pipe_port_loader);
915 }
916
917 fn test_try_recv_none2<P:BytePort>(loader: PortLoader<P>) {
918 // The control word in the protocol is interrupted
919 let bytes = ~[0];
920 let port = loader(bytes);
921 let res: Option<int> = port.try_recv();
922 assert!(res.is_none());
923 }
924
925 #[test]
926 fn test_try_recv_none2_reader() {
927 test_try_recv_none2(reader_port_loader);
928 }
929 #[test]
930 fn test_try_recv_none2_pipe() {
931 test_try_recv_none2(pipe_port_loader);
932 }
933
934 fn test_try_recv_none3<P:BytePort>(loader: PortLoader<P>) {
935 static CONTINUE: [u8, ..4] = [0xAA, 0xBB, 0xCC, 0xDD];
936 // The control word is followed by garbage
937 let bytes = CONTINUE.to_owned() + &[0u8];
938 let port = loader(bytes);
939 let res: Option<int> = port.try_recv();
940 assert!(res.is_none());
941 }
942
943 #[test]
944 fn test_try_recv_none3_reader() {
945 test_try_recv_none3(reader_port_loader);
946 }
947 #[test]
948 fn test_try_recv_none3_pipe() {
949 test_try_recv_none3(pipe_port_loader);
950 }
951
952 fn test_try_recv_none4<P:BytePort>(loader: PortLoader<P>) {
953 assert!(do task::try || {
954 static CONTINUE: [u8, ..4] = [0xAA, 0xBB, 0xCC, 0xDD];
955 // The control word is followed by a valid length,
956 // then undeserializable garbage
957 let len_bytes = do io::u64_to_be_bytes(
958 1, sys::size_of::<u64>()) |len_bytes| {
959 len_bytes.to_owned()
960 };
961 let bytes = CONTINUE.to_owned() + len_bytes + &[0u8, 0, 0, 0];
962
963 let port = loader(bytes);
964
965 let _res: Option<int> = port.try_recv();
966 }.is_err());
967 }
968
969 #[test]
970 fn test_try_recv_none4_reader() {
971 test_try_recv_none4(reader_port_loader);
972 }
973 #[test]
974 fn test_try_recv_none4_pipe() {
975 test_try_recv_none4(pipe_port_loader);
976 }
977 }
978
979 }
libextra/flatpipes.rs:75:3-75:3 -struct- definition:
*/
pub struct FlatChan<T, F, C> {
references:-328: FlatChan {
327: pub fn new(f: F, c: C) -> FlatChan<T, F, C> {
103: pub type WriterChan<T, W> = FlatChan<
188: pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
107: pub type PipeChan<T> = FlatChan<
326: impl<T,F:Flattener<T>,C:ByteChan> FlatChan<T, F, C> {
305: impl<T,F:Flattener<T>,C:ByteChan> GenericChan<T> for FlatChan<T, F, C> {
186: FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;
libextra/flatpipes.rs:466:4-466:4 -trait- definition:
pub trait FromReader {
fn from_reader(r: @Reader) -> Self;
references:-491: impl FromReader for ebml::reader::Decoder {
467: fn from_reader(r: @Reader) -> Self;
445: pub fn deserialize_buffer<D: Decoder + FromReader,
474: impl FromReader for json::Decoder {
libextra/flatpipes.rs:183:4-183:4 -ty- definition:
pub type ReaderPort<T, R> =
FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
references:-193: ) -> ReaderPort<T, R> {
libextra/flatpipes.rs:396:4-396:4 -ty- definition:
pub type DeserializeBuffer<T> = ~fn(buf: &[u8]) -> T;
references:-399: deserialize_buffer: DeserializeBuffer<T>
423: pub fn new(deserialize_buffer: DeserializeBuffer<T>)
libextra/flatpipes.rs:253:3-253:3 -trait- definition:
*/
pub trait ByteChan {
references:-545: impl<W:Writer> ByteChan for WriterByteChan<W> {
326: impl<T,F:Flattener<T>,C:ByteChan> FlatChan<T, F, C> {
610: impl ByteChan for PipeByteChan {
305: impl<T,F:Flattener<T>,C:ByteChan> GenericChan<T> for FlatChan<T, F, C> {
libextra/flatpipes.rs:398:4-398:4 -struct- definition:
pub struct DeserializingUnflattener<D, T> {
deserialize_buffer: DeserializeBuffer<T>
references:-425: DeserializingUnflattener {
101: T, DeserializingUnflattener<DefaultDecoder, T>,
424: -> DeserializingUnflattener<D, T> {
409: for DeserializingUnflattener<D, T> {
134: let unflat: DeserializingUnflattener<DefaultDecoder, T> =
422: impl<D:Decoder,T:Decodable<D>> DeserializingUnflattener<D, T> {
106: T, DeserializingUnflattener<DefaultDecoder, T>, PipeBytePort>;
113: let unflat: DeserializingUnflattener<DefaultDecoder, T> =
libextra/flatpipes.rs:246:3-246:3 -trait- definition:
*/
pub trait BytePort {
references:-523: impl<R:Reader> BytePort for ReaderBytePort<R> {
260: impl<T,U:Unflattener<T>,P:BytePort> GenericPort<T> for FlatPort<T, U, P> {
580: impl BytePort for PipeBytePort {
317: impl<T,U:Unflattener<T>,P:BytePort> FlatPort<T, U, P> {
libextra/flatpipes.rs:232:3-232:3 -trait- definition:
*/
pub trait Flattener<T> {
references:-370: impl<T:Clone + Send> Flattener<T> for PodFlattener<T> {
305: impl<T,F:Flattener<T>,C:ByteChan> GenericChan<T> for FlatChan<T, F, C> {
415: impl<S:Encoder,T:Encodable<S>> Flattener<T>
326: impl<T,F:Flattener<T>,C:ByteChan> FlatChan<T, F, C> {
libextra/flatpipes.rs:188:4-188:4 -ty- definition:
pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
references:-223: pub fn pipe_stream<T:Clone + Send>() -> (PipePort<T>, PipeChan<T>) {
216: pub fn pipe_chan<T:Clone + Send>(chan: Chan<~[u8]>) -> PipeChan<T> {
libextra/flatpipes.rs:103:4-103:4 -ty- definition:
pub type WriterChan<T, W> = FlatChan<
T, SerializingFlattener<DefaultEncoder, T>, WriterByteChan<W>>;
references:-122: W: Writer>(writer: W) -> WriterChan<T, W> {
libextra/flatpipes.rs:576:4-576:4 -struct- definition:
pub struct PipeByteChan {
chan: comm::Chan<~[u8]>
references:-188: pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
625: impl PipeByteChan {
627: PipeByteChan {
626: pub fn new(c: Chan<~[u8]>) -> PipeByteChan {
108: T, SerializingFlattener<DefaultEncoder, T>, PipeByteChan>;
610: impl ByteChan for PipeByteChan {
libextra/flatpipes.rs:142:4-142:4 -fn- definition:
pub fn pipe_chan<T:Encodable<DefaultEncoder>>(
chan: Chan<~[u8]>
references:-157: return (pipe_port(port), pipe_chan(chan));
libextra/flatpipes.rs:515:4-515:4 -struct- definition:
pub struct ReaderBytePort<R> {
reader: R
references:-551: impl<R:Reader> ReaderBytePort<R> {
102: ReaderBytePort<R>>;
523: impl<R:Reader> BytePort for ReaderBytePort<R> {
184: FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
553: ReaderBytePort {
552: pub fn new(r: R) -> ReaderBytePort<R> {
libextra/flatpipes.rs:456:4-456:4 -fn- definition:
pub fn serialize_value<D: Encoder + FromWriter,
T: Encodable<D>>(
references:-147: serialize_value::<DefaultEncoder, T>);
125: serialize_value::<DefaultEncoder, T>);
libextra/flatpipes.rs:131:4-131:4 -fn- definition:
pub fn pipe_port<T:Decodable<DefaultDecoder>>(
port: Port<~[u8]>
references:-157: return (pipe_port(port), pipe_chan(chan));
libextra/flatpipes.rs:356:4-356:4 -struct- definition:
pub struct PodFlattener<T> {
bogus: ()
references:-203: let flat: PodFlattener<T> = PodFlattener::new();
387: impl<T:Clone + Send> PodFlattener<T> {
186: FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;
389: PodFlattener {
370: impl<T:Clone + Send> Flattener<T> for PodFlattener<T> {
188: pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
217: let flat: PodFlattener<T> = PodFlattener::new();
388: pub fn new() -> PodFlattener<T> {
libextra/flatpipes.rs:571:4-571:4 -struct- definition:
pub struct PipeBytePort {
port: comm::Port<~[u8]>,
references:-187: pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>;
616: impl PipeBytePort {
618: PipeBytePort {
580: impl BytePort for PipeBytePort {
106: T, DeserializingUnflattener<DefaultDecoder, T>, PipeBytePort>;
617: pub fn new(p: Port<~[u8]>) -> PipeBytePort {
libextra/flatpipes.rs:239:3-239:3 -trait- definition:
*/
pub trait Unflattener<T> {
references:-317: impl<T,U:Unflattener<T>,P:BytePort> FlatPort<T, U, P> {
408: impl<D:Decoder,T:Decodable<D>> Unflattener<T>
360: impl<T:Clone + Send> Unflattener<T> for PodUnflattener<T> {
260: impl<T,U:Unflattener<T>,P:BytePort> GenericPort<T> for FlatPort<T, U, P> {
libextra/flatpipes.rs:64:3-64:3 -struct- definition:
*/
pub struct FlatPort<T, U, P> {
references:-318: pub fn new(u: U, p: P) -> FlatPort<T, U, P> {
317: impl<T,U:Unflattener<T>,P:BytePort> FlatPort<T, U, P> {
184: FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
100: pub type ReaderPort<T, R> = FlatPort<
105: pub type PipePort<T> = FlatPort<
319: FlatPort {
260: impl<T,U:Unflattener<T>,P:BytePort> GenericPort<T> for FlatPort<T, U, P> {
187: pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>;
libextra/flatpipes.rs:404:4-404:4 -struct- definition:
pub struct SerializingFlattener<S, T> {
serialize_value: SerializeValue<T>
references:-104: T, SerializingFlattener<DefaultEncoder, T>, WriterByteChan<W>>;
416: for SerializingFlattener<S, T> {
145: let flat: SerializingFlattener<DefaultEncoder, T> =
108: T, SerializingFlattener<DefaultEncoder, T>, PipeByteChan>;
431: impl<S:Encoder,T:Encodable<S>> SerializingFlattener<S, T> {
433: -> SerializingFlattener<S, T> {
123: let flat: SerializingFlattener<DefaultEncoder, T> =
434: SerializingFlattener {
libextra/flatpipes.rs:352:4-352:4 -struct- definition:
pub struct PodUnflattener<T> {
bogus: ()
references:-380: pub fn new() -> PodUnflattener<T> {
184: FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
381: PodUnflattener {
210: let unflat: PodUnflattener<T> = PodUnflattener::new();
194: let unflat: PodUnflattener<T> = PodUnflattener::new();
187: pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>;
379: impl<T:Clone + Send> PodUnflattener<T> {
360: impl<T:Clone + Send> Unflattener<T> for PodUnflattener<T> {
libextra/flatpipes.rs:402:4-402:4 -ty- definition:
pub type SerializeValue<T> = ~fn(val: &T) -> ~[u8];
references:-405: serialize_value: SerializeValue<T>
432: pub fn new(serialize_value: SerializeValue<T>)
libextra/flatpipes.rs:187:4-187:4 -ty- definition:
pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>;
pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>;
references:-209: pub fn pipe_port<T:Clone + Send>(port: Port<~[u8]>) -> PipePort<T> {
223: pub fn pipe_stream<T:Clone + Send>() -> (PipePort<T>, PipeChan<T>) {
libextra/flatpipes.rs:100:4-100:4 -ty- definition:
pub type ReaderPort<T, R> = FlatPort<
T, DeserializingUnflattener<DefaultDecoder, T>,
references:-112: R: Reader>(reader: R) -> ReaderPort<T, R> {
libextra/flatpipes.rs:216:4-216:4 -fn- definition:
pub fn pipe_chan<T:Clone + Send>(chan: Chan<~[u8]>) -> PipeChan<T> {
let flat: PodFlattener<T> = PodFlattener::new();
references:-225: return (pipe_port(port), pipe_chan(chan));
libextra/flatpipes.rs:209:4-209:4 -fn- definition:
pub fn pipe_port<T:Clone + Send>(port: Port<~[u8]>) -> PipePort<T> {
let unflat: PodUnflattener<T> = PodUnflattener::new();
references:-225: return (pipe_port(port), pipe_chan(chan));
libextra/flatpipes.rs:519:4-519:4 -struct- definition:
pub struct WriterByteChan<W> {
writer: W
references:-559: impl<W:Writer> WriterByteChan<W> {
186: FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;
545: impl<W:Writer> ByteChan for WriterByteChan<W> {
104: T, SerializingFlattener<DefaultEncoder, T>, WriterByteChan<W>>;
560: pub fn new(w: W) -> WriterByteChan<W> {
561: WriterByteChan {
libextra/flatpipes.rs:185:4-185:4 -ty- definition:
pub type WriterChan<T, W> =
FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;
references:-202: ) -> WriterChan<T, W> {
libextra/flatpipes.rs:470:4-470:4 -trait- definition:
pub trait FromWriter {
fn from_writer(w: @Writer) -> Self;
references:-485: impl FromWriter for json::Encoder {
471: fn from_writer(w: @Writer) -> Self;
456: pub fn serialize_value<D: Encoder + FromWriter,
499: impl FromWriter for ebml::writer::Encoder {
libextra/flatpipes.rs:105:4-105:4 -ty- definition:
pub type PipePort<T> = FlatPort<
T, DeserializingUnflattener<DefaultDecoder, T>, PipeBytePort>;
references:-155: ) -> (PipePort<T>, PipeChan<T>) {
133: ) -> PipePort<T> {
libextra/flatpipes.rs:107:4-107:4 -ty- definition:
pub type PipeChan<T> = FlatChan<
T, SerializingFlattener<DefaultEncoder, T>, PipeByteChan>;
references:-155: ) -> (PipePort<T>, PipeChan<T>) {
144: ) -> PipeChan<T> {
libextra/flatpipes.rs:445:4-445:4 -fn- definition:
pub fn deserialize_buffer<D: Decoder + FromReader,
T: Decodable<D>>(
references:-136: deserialize_buffer::<DefaultDecoder, T>);
115: deserialize_buffer::<DefaultDecoder, T>);