Skip to content

Commit

Permalink
deserialize part
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Feb 25, 2025
1 parent 2990f4b commit c242b7d
Showing 1 changed file with 75 additions and 10 deletions.
85 changes: 75 additions & 10 deletions src/common/src/util/value_encoding/column_aware_row_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@ use rw_iter_util::ZipEqDebug;

use super::error::ValueEncodingError;
use super::{Result, ValueRowDeserializer, ValueRowSerializer};
use crate::catalog::{ColumnDesc, ColumnId};
use crate::catalog::ColumnId;
use crate::row::Row;
use crate::types::{DataType, Datum, ScalarRefImpl, ToDatumRef};

mod serde {
mod new_serde {
use itertools::Itertools;

use super::*;
use crate::array::{ListRef, MapRef, StructRef};
use crate::types::{MapType, StructType};
use crate::util::value_encoding::serialize_scalar as plain_serialize_scalar;
use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
use crate::types::{MapType, ScalarImpl, StructType};
use crate::util::value_encoding::{
deserialize_value as plain_deserialize_scalar, serialize_scalar as plain_serialize_scalar,
};

fn new_serialize_datum(
data_type: &DataType,
Expand All @@ -54,11 +58,12 @@ mod serde {

/// Serializes a struct value into `buf` as a length-prefixed nested row.
///
/// A `Serializer` is built over the field types of `struct_type` (with no column ids yet),
/// `value` is serialized with it, and the result is written to `buf` as a little-endian
/// `u32` byte length followed by the encoded bytes.
///
/// NOTE(review): this span comes from a diff view, so both the removed and the added
/// variants of the `Serializer::new_new` arguments and of the `serialize` call appear below.
/// NOTE(review): `bytes.len() as _` presumably narrows to `u32` for `put_u32_le`; an encoded
/// row longer than `u32::MAX` bytes would then be silently truncated — confirm.
fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
let serializer = super::Serializer::new_new(
&[], // TODO: column ids
struct_type.types().cloned(),
&[], // TODO: column ids
struct_type.types().cloned(), // TODO: avoid this clone
);

let bytes = serializer.serialize(value);
let bytes = serializer.serialize(value); // TODO: serialize into the buf directly
buf.put_u32_le(bytes.len() as _);
buf.put_slice(&bytes);
}

Expand Down Expand Up @@ -96,6 +101,66 @@ mod serde {
_ => plain_serialize_scalar(value, buf),
}
}

// --- deserialize ---

/// Decodes one nullable datum of type `ty` from `data`.
///
/// The datum starts with a one-byte tag: `0` means NULL and nothing else is read, `1` means
/// a scalar follows and is decoded with `new_deserialize_scalar`.
///
/// # Errors
///
/// Returns `ValueEncodingError::InvalidTagEncoding` carrying the offending byte for any other
/// tag, and propagates any error from decoding the scalar.
///
/// NOTE(review): presumably `Buf` is `bytes::Buf`, whose `get_u8` panics on an empty buffer
/// instead of returning an error — confirm callers never pass truncated input.
fn new_inner_deserialize_datum(data: &mut impl Buf, ty: &DataType) -> Result<Datum> {
    match data.get_u8() {
        0 => Ok(None),
        1 => new_deserialize_scalar(ty, data).map(Some),
        invalid_tag => Err(ValueEncodingError::InvalidTagEncoding(invalid_tag)),
    }
}

/// Decodes a struct scalar that was written as a length-prefixed nested row.
///
/// Reads a little-endian `u32` byte length, copies that many bytes out of `data`, and decodes
/// them with a `Deserializer` built over the field types of `struct_def`. The decoded fields
/// are wrapped into a `ScalarImpl::Struct`.
///
/// # Errors
///
/// Propagates any error returned by the nested row deserializer.
fn new_deserialize_struct(struct_def: &StructType, data: &mut impl Buf) -> Result<ScalarImpl> {
    // TODO: avoid cloning the field types for every decoded value.
    let field_types = struct_def.types().cloned().collect_vec();
    let row_deserializer = super::Deserializer::new(
        &[], // TODO: column ids
        field_types.into(),
        std::iter::empty(),
    );

    // The nested row is preceded by its byte length.
    let payload_len = data.get_u32_le() as usize;
    let payload = data.copy_to_bytes(payload_len); // TODO: avoid copy
    let fields = row_deserializer.deserialize(&payload)?;

    Ok(ScalarImpl::Struct(StructValue::new(fields)))
}

/// Decodes a list scalar whose elements have type `item_type`.
///
/// Reads a little-endian `u32` element count, then decodes that many nullable datums with
/// `new_inner_deserialize_datum`, appending each to an array builder created from `item_type`.
///
/// # Errors
///
/// Stops at, and returns, the first error produced while decoding an element.
///
/// NOTE(review): the element count is taken from the input and used directly as the builder
/// capacity — confirm the input is trusted.
fn new_deserialize_list(item_type: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
    let item_count = data.get_u32_le();
    let mut items = item_type.create_array_builder(item_count as usize);

    (0..item_count).try_for_each(|_| -> Result<()> {
        let item = new_inner_deserialize_datum(data, item_type)?;
        items.append(item);
        Ok(())
    })?;

    let array = items.finish();
    Ok(ScalarImpl::List(ListValue::new(array)))
}

/// Decodes a map scalar described by `map_type`.
///
/// Reads a little-endian `u32` entry count. Each entry is a key decoded with
/// `new_deserialize_scalar` (no null tag is read for it) followed by a nullable value decoded
/// with `new_inner_deserialize_datum`. Entries are appended as two-field struct values to a
/// builder created from the map's struct form, and the finished list is wrapped via
/// `MapValue::from_entries`.
///
/// # Errors
///
/// Stops at, and returns, the first error produced while decoding a key or a value.
fn new_deserialize_map(map_type: &MapType, data: &mut impl Buf) -> Result<ScalarImpl> {
    let entry_count = data.get_u32_le();

    // FIXME: cloning the map type on every call is inefficient.
    let entry_type = map_type.clone().into_struct();
    let mut entries = entry_type.create_array_builder(entry_count as usize);

    (0..entry_count).try_for_each(|_| -> Result<()> {
        let key = new_deserialize_scalar(map_type.key(), data)?;
        let value = new_inner_deserialize_datum(data, map_type.value())?;
        let pair = StructValue::new(vec![Some(key), value]);
        entries.append(Some(ScalarImpl::Struct(pair)));
        Ok(())
    })?;

    let entry_list = ListValue::new(entries.finish());
    Ok(ScalarImpl::Map(MapValue::from_entries(entry_list)))
}

/// Decodes a non-null scalar of type `ty` from `data`.
///
/// Struct, list and map types are routed to the dedicated decoders in this module; every
/// other type is handed to `plain_deserialize_scalar`.
///
/// # Errors
///
/// Propagates whatever error the selected decoder returns.
pub fn new_deserialize_scalar(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
    let scalar = match ty {
        // Composite types are decoded by this module's own routines.
        DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
        DataType::List(item_type) => new_deserialize_list(item_type, data)?,
        DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,

        // Everything else is decoded by the plain decoder.
        _ => plain_deserialize_scalar(ty, data)?,
    };
    Ok(scalar)
}
}

/// The width of the offset of the encoded data, i.e., how many bytes are used to represent the offset.
Expand Down Expand Up @@ -177,7 +242,7 @@ where
{
/// Appends the encoding of `self`, interpreted as `data_type`, to `data`.
///
/// Nothing is written when `to_datum_ref()` yields `None`; otherwise the contained scalar is
/// serialized with `new_serialize_scalar`.
///
/// NOTE(review): this span comes from a diff view, so both the removed (`serde::`) and the
/// added (`new_serde::`) call lines appear below.
fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
if let Some(v) = self.to_datum_ref() {
serde::new_serialize_scalar(data_type, v, data);
new_serde::new_serialize_scalar(data_type, v, data);
}
}
}
Expand Down Expand Up @@ -418,7 +483,7 @@ impl ValueRowDeserializer for Deserializer {
let datum = if data.is_empty() {
None
} else {
Some(super::deserialize_value(
Some(new_serde::new_deserialize_scalar(
&self.schema[decoded_idx],
&mut data,
)?)
Expand Down

0 comments on commit c242b7d

Please sign in to comment.