diff --git a/Cargo.toml b/Cargo.toml index b09514e..51af92d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,9 @@ categories = ["asynchronous", "network-programming"] bytes = "0.5.2" futures = "0.3.1" memchr = "2.2.1" -pin-project = "0.4.6" \ No newline at end of file +pin-project = "0.4.6" +serde = "1.0.103" +bincode = "1.2.1" + +[dev-dependencies] +serde_derive = "1.0.103" diff --git a/src/codec/mod.rs b/src/codec/mod.rs index f97b666..47a144f 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -6,3 +6,6 @@ pub use self::length::LengthCodec; mod lines; pub use self::lines::LinesCodec; + +mod serde; +pub use self::serde::SerdeCodec; diff --git a/src/codec/serde.rs b/src/codec/serde.rs new file mode 100644 index 0000000..840df2f --- /dev/null +++ b/src/codec/serde.rs @@ -0,0 +1,60 @@ +use std::io; +use std::marker::PhantomData; +use bytes::{BytesMut, Bytes}; +use serde::Serialize; +use serde::de::DeserializeOwned; +use bincode; + +use crate::encoder::Encoder; +use crate::decoder::Decoder; +use crate::codec::LengthCodec; + +/// Encodes/decodes types implementing Serde Serialize/Deserialize traits. +/// It is built on top of `LengthCodec`. +pub struct SerdeCodec { + inner: LengthCodec, + phantom: PhantomData, +} + +impl Default for SerdeCodec { + fn default() -> Self { + Self { + inner: LengthCodec {}, + phantom: PhantomData, + } + } +} + +impl Encoder for SerdeCodec { + type Item = T; + type Error = io::Error; + + fn encode(&mut self, src: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + let data = bincode::serialize(&src).map_err(to_io_err)?; + let bytes = Bytes::from(data); + self.inner.encode(bytes, dst) + } +} + +impl Decoder for SerdeCodec { + type Item = T; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self.inner.decode(src)? { + Some(bytes) => { + bincode::deserialize(&bytes) + .map_err(to_io_err) + .map(|item| Some(item)) + } + None => Ok(None), + } + } +} + +fn to_io_err(err: Box) -> io::Error { + match *err { + bincode::ErrorKind::Io(e) => e, + other => io::Error::new(io::ErrorKind::Other, other), + } +} diff --git a/src/lib.rs b/src/lib.rs index c9ec45c..8094084 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,7 @@ //! ``` mod codec; -pub use codec::{BytesCodec, LengthCodec, LinesCodec}; +pub use codec::{BytesCodec, LengthCodec, LinesCodec, SerdeCodec}; mod decoder; pub use decoder::Decoder; diff --git a/tests/serde.rs b/tests/serde.rs new file mode 100644 index 0000000..2e2ea23 --- /dev/null +++ b/tests/serde.rs @@ -0,0 +1,47 @@ +use futures::io::Cursor; +use futures::{executor, SinkExt, StreamExt}; +use futures_codec::{Framed, SerdeCodec}; +use serde_derive::{Serialize, Deserialize}; + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +struct Person { + name: String, + age: u8, +} + +impl Person { + fn new(name: &str, age: u8) -> Self { + Self { + name: name.into(), + age, + } + } +} + +#[test] +fn serializes_serde_enabled_structures() { + let cur = Cursor::new(vec![0; 4096]); + let mut framed = Framed::new(cur, SerdeCodec::default()); + + let send_msgs = async { + framed.send(Person::new("John", 11)).await.unwrap(); + framed.send(Person::new("Paul", 12)).await.unwrap(); + framed.send(Person::new("Mike", 13)).await.unwrap(); + }; + executor::block_on(send_msgs); + + let (mut cur, _) = framed.release(); + cur.set_position(0); + let framed = Framed::new(cur, SerdeCodec::default()); + + let recv_msgs = framed.take(3) + .map(|res| res.unwrap()) + .collect::>(); + let items: Vec = executor::block_on(recv_msgs); + + assert!(items == vec![ + Person::new("John", 11), + Person::new("Paul", 12), + Person::new("Mike", 13), + ]) +}