Skip to content

Commit

Permalink
Implement codec for types supporting serde serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasb committed Dec 15, 2019
1 parent b9ae4c7 commit ba2e737
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 2 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ categories = ["asynchronous", "network-programming"]
bytes = "0.5.2"
futures = "0.3.1"
memchr = "2.2.1"
pin-project = "0.4.6"
pin-project = "0.4.6"
serde = "1.0.103"
bincode = "1.2.1"

[dev-dependencies]
serde_derive = "1.0.103"
3 changes: 3 additions & 0 deletions src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ pub use self::length::LengthCodec;

mod lines;
pub use self::lines::LinesCodec;

mod serde;
pub use self::serde::SerdeCodec;
60 changes: 60 additions & 0 deletions src/codec/serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::io;
use std::marker::PhantomData;
use bytes::{BytesMut, Bytes};
use serde::Serialize;
use serde::de::DeserializeOwned;
use bincode;

use crate::encoder::Encoder;
use crate::decoder::Decoder;
use crate::codec::LengthCodec;

/// Encodes/decodes types implementing Serde Serialize/Deserialize traits.
/// It is built on top of `LengthCodec`.
pub struct SerdeCodec<T: Serialize + DeserializeOwned> {
inner: LengthCodec,
phantom: PhantomData<T>,
}

impl<T: Serialize + DeserializeOwned> Default for SerdeCodec<T> {
fn default() -> Self {
Self {
inner: LengthCodec {},
phantom: PhantomData,
}
}
}

impl<T: Serialize + DeserializeOwned> Encoder for SerdeCodec<T> {
type Item = T;
type Error = io::Error;

fn encode(&mut self, src: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let data = bincode::serialize(&src).map_err(to_io_err)?;
let bytes = Bytes::from(data);
self.inner.encode(bytes, dst)
}
}

impl<T: Serialize + DeserializeOwned> Decoder for SerdeCodec<T> {
type Item = T;
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.inner.decode(src)? {
Some(bytes) => {
bincode::deserialize(&bytes)
.map_err(to_io_err)
.map(|item| Some(item))
}
None => Ok(None),
}
}
}

fn to_io_err(err: Box<bincode::ErrorKind>) -> io::Error {
match *err {
bincode::ErrorKind::Io(e) => e,
other => io::Error::new(io::ErrorKind::Other, other),
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! ```
mod codec;
pub use codec::{BytesCodec, LengthCodec, LinesCodec};
pub use codec::{BytesCodec, LengthCodec, LinesCodec, SerdeCodec};

mod decoder;
pub use decoder::Decoder;
Expand Down
47 changes: 47 additions & 0 deletions tests/serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use futures::io::Cursor;
use futures::{executor, SinkExt, StreamExt};
use futures_codec::{Framed, SerdeCodec};
use serde_derive::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Person {
name: String,
age: u8,
}

impl Person {
fn new(name: &str, age: u8) -> Self {
Self {
name: name.into(),
age,
}
}
}

#[test]
fn serializes_serde_enabled_structures() {
let cur = Cursor::new(vec![0; 4096]);
let mut framed = Framed::new(cur, SerdeCodec::default());

let send_msgs = async {
framed.send(Person::new("John", 11)).await.unwrap();
framed.send(Person::new("Paul", 12)).await.unwrap();
framed.send(Person::new("Mike", 13)).await.unwrap();
};
executor::block_on(send_msgs);

let (mut cur, _) = framed.release();
cur.set_position(0);
let framed = Framed::new(cur, SerdeCodec::default());

let recv_msgs = framed.take(3)
.map(|res| res.unwrap())
.collect::<Vec<_>>();
let items: Vec<Person> = executor::block_on(recv_msgs);

assert!(items == vec![
Person::new("John", 11),
Person::new("Paul", 12),
Person::new("Mike", 13),
])
}

0 comments on commit ba2e737

Please # to comment.