From 565a66046bb298fde896c5ee5deb8bd9b7359627 Mon Sep 17 00:00:00 2001
From: Ryan Johnson
Date: Thu, 6 Mar 2025 12:38:39 -0800
Subject: [PATCH 1/2] initial prototype

---
 arrow-json/Cargo.toml         |   3 +
 arrow-json/src/reader/tape.rs | 542 ++++++++++++++++++++++++++++++++++
 2 files changed, 545 insertions(+)

diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml
index 892238fe6828..8097a4ad6d0a 100644
--- a/arrow-json/Cargo.toml
+++ b/arrow-json/Cargo.toml
@@ -50,6 +50,9 @@ memchr = "2.7.4"
 simdutf8 = "0.1.5"
 
 [dev-dependencies]
+bytebuffer = "2.3"
+num_enum = "0.7"
+static_assertions = "1.1"
 flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
 serde = { version = "1.0", default-features = false, features = ["derive"] }
 futures = "0.3"
diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs
index ed65baab9f2b..baef1bfef837 100644
--- a/arrow-json/src/reader/tape.rs
+++ b/arrow-json/src/reader/tape.rs
@@ -951,4 +951,546 @@ mod tests {
         let err = decoder.finish().unwrap_err().to_string();
         assert_eq!(err, "Json error: Encountered truncated UTF-8 sequence");
     }
+
+    use bytebuffer::{ByteBuffer, Endian};
+    use num_enum::IntoPrimitive;
+    use static_assertions::const_assert;
+    use std::collections::HashMap;
+
+    #[derive(IntoPrimitive)]
+    #[repr(u64)]
+    enum BasicType {
+        Primitive = 0,
+        ShortString,
+        Object,
+        Array,
+    }
+    pub const BASIC_TYPE_BITS: u8 = 2;
+
+    /// For efficiency, JSON parsing only supports a subset of Variant types that can be easily and
+    /// directly inferred from the JSON type, with some concessions for JSON numerics to avoid
+    /// information loss. Fancy values like Date and Uuid just come back as strings.
+    #[derive(IntoPrimitive)]
+    #[repr(u64)]
+    enum PrimitiveType {
+        Null = 0,
+        True,
+        False,
+        Int8,
+        Int16,
+        Int32,
+        Int64,
+        Double,
+        Decimal4,
+        Decimal8,
+        Decimal16 = 10,
+        Float = 14,
+        String = 16,
+    }
+    pub const VALUE_HEADER_BITS: u8 = 6;
+
+    fn write_value_header(
+        values: &mut ByteBuffer,
+        basic_type: impl Into<u64>,
+        header: impl Into<u64>,
+    ) {
+        // WARNING: `ByteBuffer::write_bits` puts the highest bits first, even in little-endian
+        // mode, so we have to add the bitfields in reverse order.
+        values.write_bits(header.into(), VALUE_HEADER_BITS);
+        values.write_bits(basic_type.into(), BASIC_TYPE_BITS);
+    }
+
+    /// Attempts to parse `n` as an integer no larger than i64. If successful, writes out the value
+    /// as the smallest int type that can hold it and returns true. Otherwise, returns false.
+    fn try_decode_int(n: &str, scale: u8, values: &mut ByteBuffer) -> bool {
+        let Ok(val): Result<i64, _> = n.parse() else {
+            return false;
+        };
+
+        if let Ok(val) = val.try_into() {
+            write_value_header(values, BasicType::Primitive, PrimitiveType::Int8);
+            values.write_u8(scale);
+            values.write_i8(val);
+        } else if let Ok(val) = val.try_into() {
+            write_value_header(values, BasicType::Primitive, PrimitiveType::Int16);
+            values.write_u8(scale);
+            values.write_i16(val);
+        } else if let Ok(val) = val.try_into() {
+            write_value_header(values, BasicType::Primitive, PrimitiveType::Int32);
+            values.write_u8(scale);
+            values.write_i32(val);
+        } else {
+            write_value_header(values, BasicType::Primitive, PrimitiveType::Int64);
+            values.write_u8(scale);
+            values.write_i64(val);
+        }
+        true
+    }
+
+    /// Attempts to parse `n` as a decimal with precision no larger than 38 (backed by i128). If
+    /// successful, writes out the value as the smallest decimal type that can hold it and returns
+    /// true. Otherwise, returns false.
+    ///
+    /// NOTE: Variant decimals only store their scale (required for correctness). The precision
+    /// (always 9, 18, or 38) is determined by the size of the backing int.
+    fn try_decode_decimal(n: &str, scale: u8, values: &mut ByteBuffer) -> bool {
+        let n: String = n.chars().filter(|c| *c != '.').collect();
+        let Ok(n) = n.parse() else {
+            return false; // too big even for i128
+        };
+
+        // WARNING: Each decimal type has a smaller range than its backing integer, based on the
+        // number of base-10 digits the backing integer can fully represent. For example, decimal4
+        // (backed by i32) is capped at precision=9 because the number 999999999 fits in 31 bits
+        // while 9999999999 (precision 10) needs 34 bits. We must exclude the subset of precision=10
+        // numbers like 1234567890 that happen to fit in 31 bits.
+        match n {
+            -999999999..=999999999 => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::Decimal4);
+                values.write_u8(scale);
+                values.write_i32(n as _); // the match clause proved it fits
+            }
+            -999999999999999999..=999999999999999999 => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::Decimal8);
+                values.write_u8(scale);
+                values.write_i64(n as _); // the match clause proved it fits
+            }
+            -99999999999999999999999999999999999999..=99999999999999999999999999999999999999 => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::Decimal16);
+                values.write_u8(scale);
+                values.write_i128(n);
+            }
+            _ => return false, // too big for decimal16
+        }
+
+        true
+    }
+
+    fn decode_number(n: &str, values: &mut ByteBuffer) {
+        // See the `number` grammar rule: https://www.json.org/json-en.html
+        if !n.contains(['e', 'E']) {
+            if let Some(dot_pos) = n.find('.') {
+                let scale = n.len() - dot_pos - 1; // e.g. -1234.567 => 9-5-1 = 3
+                if try_decode_decimal(n, u8::try_from(scale).expect("TODO"), values) {
+                    return; // else fall out and parse as float
+                }
+            } else {
+                // No decimal point, so prefer int but use decimal if too large for i64.
+                if try_decode_int(n, 0, values) || try_decode_decimal(n, 0, values) {
+                    return; // else fall out and parse as float
+                }
+            }
+        }
+
+        // Floating point. No way to know the best size, so always parse it as double.
+        write_value_header(values, BasicType::Primitive, PrimitiveType::Double);
+        values.write_f64(n.parse().expect("TODO"));
+    }
+
+    // Visits all top-level elements between start_idx and end_idx
+    fn for_each_element(tape: &Tape, start_idx: u32, end_idx: u32, mut f: impl FnMut(u32)) {
+        let mut idx = start_idx + 1;
+        while idx < end_idx {
+            f(idx);
+            idx = tape.next(idx, "value").expect("TODO");
+        }
+    }
+
+    /// Collects the strings we eventually need to store in a variant `metadata` column. We don't
+    /// know in advance how many strings there will be, and we prefer to dedup for space savings.
+    /// As each new string is inserted, we assign it an index, which the value column will use to
+    /// refer to it. We also write out the byte offset, since that can be computed easily up front.
+    /// After all strings are collected, the `build` method appends their actual bytes in order.
+    struct MetadataBuilder<'a> {
+        metadata: ByteBuffer,
+        strings: HashMap<&'a str, u32>,
+        num_string_bytes: u32,
+    }
+    impl<'a> MetadataBuilder<'a> {
+        fn new() -> Self {
+            let mut metadata = ByteBuffer::new();
+            metadata.set_endian(Endian::LittleEndian);
+            metadata.write_bits(0b_11_0_0, 4); // 4-byte offsets, unused bit, not sorted
+            metadata.write_bits(1, 4); // version
+            metadata.write_u32(0); // placeholder for dictionary_size
+            Self {
+                metadata,
+                strings: HashMap::new(),
+                num_string_bytes: 0,
+            }
+        }
+
+        fn add_string(&mut self, s: &'a str) -> u32 {
+            if let Some(idx) = self.strings.get(s) {
+                return *idx;
+            }
+
+            let idx = u32::try_from(self.strings.len()).expect("TODO");
+            self.strings.insert(s, idx);
+
+            self.num_string_bytes += u32::try_from(s.len()).expect("TODO");
+            self.metadata.write_u32(self.num_string_bytes);
+            idx
+        }
+
+        fn build(self) -> Vec<u8> {
+            let mut strings: Vec<_> = self.strings.into_iter().collect();
+            let dictionary_size = strings.len();
+            strings.sort_by_key(|(_, idx)| *idx);
+
+            // Append the string bytes now that the offset list is complete
+            let mut metadata = self.metadata;
+            for (name, _) in strings {
+                metadata.write_bytes(name.as_bytes());
+            }
+
+            // Fix up the dictionary_size field now that we know its value
+            metadata.set_wpos(1);
+            metadata.write_u32(u32::try_from(dictionary_size).expect("TODO"));
+            metadata.into_vec()
+        }
+    }
+
+    fn decode_list<'a>(
+        start_idx: u32,
+        end_idx: u32,
+        tape: &Tape<'a>,
+        metadata: &mut MetadataBuilder<'a>,
+        values: &mut ByteBuffer,
+    ) {
+        // is_large=true, 4-byte offsets (the tape can only handle 32-bit offsets anyway)
+        write_value_header(values, BasicType::Array, 0b_1_11_u8);
+
+        // Write a placeholder for the number of elements
+        let num_elements_wpos = values.len();
+        values.write_u32(0);
+
+        // The first field offset is always 0, and each array element adds another offset.
+        values.write_u32(0);
+
+        // Reserve space for the rest of the field offset array while computing the array
+        // length. Use the exact same loop here and below, to avoid any risk of bad array math.
+        let mut field_offset_array_wpos = values.len();
+        let mut num_elements = 0;
+        for_each_element(tape, start_idx, end_idx, |_| {
+            values.write_u32(0);
+            num_elements += 1;
+        });
+
+        // Go back and set num_elements now that it's known
+        values.set_wpos(num_elements_wpos);
+        values.write_u32(num_elements);
+        values.set_wpos(values.len());
+
+        let value_bytes_start = values.len();
+
+        for_each_element(tape, start_idx, end_idx, |idx| {
+            // Write out the value of this array element
+            decode_one(idx, tape, metadata, values);
+
+            // Write out the end offset of this array element's value
+            let end_offset = values.len() - value_bytes_start;
+            values.set_wpos(field_offset_array_wpos);
+            values.write_u32(u32::try_from(end_offset).expect("TODO"));
+            field_offset_array_wpos = values.get_wpos();
+            values.set_wpos(values.len());
+        });
+    }
+
+    fn decode_object<'a>(
+        start_idx: u32,
+        end_idx: u32,
+        tape: &Tape<'a>,
+        metadata: &mut MetadataBuilder<'a>,
+        values: &mut ByteBuffer,
+    ) {
+        // is_large=true, 4-byte name ids, 4-byte value offsets
+        write_value_header(values, BasicType::Object, 0b_1_11_11_u8);
+
+        // WARNING: The fields of an object must be laid out in lexicographical name order. Make an
+        // initial pass in order to extract and sort the field names, and to reserve space for the
+        // name ids and value offsets. There are n+1 value offsets, so reserve the extra slot now.
+        let mut field_id_wpos = values.len();
+        values.write_u32(0);
+        let mut value_offset_wpos = values.len();
+
+        let mut field_indexes = Vec::new();
+        let mut idx = start_idx;
+        while idx < end_idx {
+            // Each field consists of a name (string) immediately followed by a value (any type)
+            let TapeElement::String(name_idx) = tape.get(idx) else {
+                panic!("TODO");
+            };
+            let name = tape.get_string(name_idx);
+            field_indexes.push((name, idx + 1));
+
+            // Reserve space for both the field id and the value offset. Each field id we write
+            // increments the value_offset_wpos so that it remains accurate at all times.
+            //
+            // NOTE: It's important to write zeros here because one of these calls initializes the
+            // first value offset, which must always be zero. We just don't know in advance which one.
+            values.write_u32(0);
+            values.write_u32(0);
+
+            // advance past both name and value
+            idx = tape.next(idx + 1, "value").expect("TODO");
+        }
+        field_indexes.sort_by_key(|(name, _)| -> &'a str { name });
+
+        let value_bytes_start = values.len();
+        for (name, idx) in field_indexes {
+            // Append the value bytes
+            decode_one(idx, tape, metadata, values);
+            let end_offset = values.len() - value_bytes_start;
+
+            let name_idx = metadata.add_string(name);
+            values.set_wpos(field_id_wpos);
+            values.write_u32(name_idx);
+            field_id_wpos = values.get_wpos();
+
+            values.set_wpos(value_offset_wpos);
+            values.write_u32(u32::try_from(end_offset).expect("TODO"));
+            value_offset_wpos = values.get_wpos();
+            values.set_wpos(values.len());
+        }
+    }
+
+    fn decode_one<'a>(
+        idx: u32,
+        tape: &Tape<'a>,
+        metadata: &mut MetadataBuilder<'a>,
+        values: &mut ByteBuffer,
+    ) -> u32 {
+        let element = tape.get(idx);
+        match element {
+            TapeElement::Null => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::Null);
+                idx + 1
+            }
+            TapeElement::True => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::True);
+                idx + 1
+            }
+            TapeElement::False => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::False);
+                idx + 1
+            }
+            TapeElement::String(idx) => {
+                let s = tape.get_string(idx);
+                if s.len() <= (1 << VALUE_HEADER_BITS) - 1 {
+                    // There is no `From<usize> for u64`. 128-bit architectures rejoice?
+                    const_assert!(size_of::<usize>() <= size_of::<u64>());
+                    write_value_header(values, BasicType::ShortString, s.len() as u64);
+                    values.write_bytes(s.as_bytes());
+                } else {
+                    write_value_header(values, BasicType::Primitive, PrimitiveType::String);
+                    values.write_string(s);
+                }
+                idx + 1
+            }
+            TapeElement::Number(idx) => {
+                decode_number(tape.get_string(idx), values);
+                idx + 1
+            }
+            TapeElement::StartObject(end_idx) => {
+                decode_object(idx, end_idx, tape, metadata, values);
+                end_idx + 1
+            }
+            TapeElement::StartList(end_idx) => {
+                decode_list(idx, end_idx, tape, metadata, values);
+                end_idx + 1
+            }
+            TapeElement::I32(val) => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::Int32);
+                values.write_i32(val);
+                idx + 1
+            }
+            TapeElement::I64(hi) => {
+                let TapeElement::I32(lo) = tape.get(idx + 1) else {
+                    panic!("TODO");
+                };
+                write_value_header(values, BasicType::Primitive, PrimitiveType::Int64);
+                values.write_i32(lo);
+                values.write_i32(hi);
+                idx + 2
+            }
+            TapeElement::F32(val) => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::Float);
+                values.write_u32(val);
+                idx + 1
+            }
+            TapeElement::F64(hi) => {
+                let TapeElement::F32(lo) = tape.get(idx + 1) else {
+                    panic!("TODO");
+                };
+                write_value_header(values, BasicType::Primitive, PrimitiveType::Double);
+                values.write_u32(lo);
+                values.write_u32(hi);
+                idx + 2
+            }
+            TapeElement::EndObject(_) | TapeElement::EndList(_) => {
+                panic!("TODO: unexpected element {element:?}");
+            }
+        }
+    }
+
+    #[test]
+    fn test_variant_decoding() {
+        let a = r#"
+        {"hello": "world", "foo": 2, "bar": 45}
+
+        {"foo": "bar"}
+
+        {"fiz": null}
+
+        {"a": true, "b": false, "c": null}
+
+        {"a": "", "": "a"}
+
+        {"a": "b", "object": {"nested": "hello", "foo": 23}, "b": {}, "c": {"foo": null }}
+
+        {"a": ["", "foo", ["bar", "c"]], "b": {"1": []}, "c": {"2": [1, 2.71, 3e3]} }
+        "#;
+        let mut decoder = TapeDecoder::new(16, 2);
+        decoder.decode(a.as_bytes()).unwrap();
+
+        let finished = decoder.finish().unwrap();
+        assert_eq!(
+            finished.elements,
+            &[
+                TapeElement::Null,
+                TapeElement::StartObject(8), // {"hello": "world", "foo": 2, "bar": 45}
+                TapeElement::String(0), // "hello"
+                TapeElement::String(1), // "world"
+                TapeElement::String(2), // "foo"
+                TapeElement::Number(3), // 2
+                TapeElement::String(4), // "bar"
+                TapeElement::Number(5), // 45
+                TapeElement::EndObject(1),
+                TapeElement::StartObject(12), // {"foo": "bar"}
+                TapeElement::String(6), // "foo"
+                TapeElement::String(7), // "bar"
+                TapeElement::EndObject(9),
+                TapeElement::StartObject(16), // {"fiz": null}
+                TapeElement::String(8), // "fiz"
+                TapeElement::Null, // null
+                TapeElement::EndObject(13),
+                TapeElement::StartObject(24), // {"a": true, "b": false, "c": null}
+                TapeElement::String(9), // "a"
+                TapeElement::True, // true
+                TapeElement::String(10), // "b"
+                TapeElement::False, // false
+                TapeElement::String(11), // "c"
+                TapeElement::Null, // null
+                TapeElement::EndObject(17),
+                TapeElement::StartObject(30), // {"a": "", "": "a"}
+                TapeElement::String(12), // "a"
+                TapeElement::String(13), // ""
+                TapeElement::String(14), // ""
+                TapeElement::String(15), // "a"
+                TapeElement::EndObject(25),
+                TapeElement::StartObject(49), // {"a": "b", "object": {"nested": "hello", "foo": 23}, "b": {}, "c": {"foo": null }}
+                TapeElement::String(16), // "a"
+                TapeElement::String(17), // "b"
+                TapeElement::String(18), // "object"
+                TapeElement::StartObject(40), // {"nested": "hello", "foo": 23}
+                TapeElement::String(19), // "nested"
+                TapeElement::String(20), // "hello"
+                TapeElement::String(21), // "foo"
+                TapeElement::Number(22), // 23
+                TapeElement::EndObject(35),
+                TapeElement::String(23), // "b"
+                TapeElement::StartObject(43), // {}
+                TapeElement::EndObject(42),
+                TapeElement::String(24), // "c"
+                TapeElement::StartObject(48), // {"foo": null }
+                TapeElement::String(25), // "foo"
+                TapeElement::Null, // null
+                TapeElement::EndObject(45),
+                TapeElement::EndObject(31),
+                TapeElement::StartObject(75), // {"a": ["", "foo", ["bar", "c"]], "b": {"1": []}, "c": {"2": [1, 2.71, 3e3]} }
+                TapeElement::String(26), // "a"
+                TapeElement::StartList(59), // ["", "foo", ["bar", "c"]]
+                TapeElement::String(27), // ""
+                TapeElement::String(28), // "foo"
+                TapeElement::StartList(58), // ["bar", "c"]
+                TapeElement::String(29), // "bar"
+                TapeElement::String(30), // "c"
+                TapeElement::EndList(55),
+                TapeElement::EndList(52),
+                TapeElement::String(31), // "b"
+                TapeElement::StartObject(65), // {"1": []}
+                TapeElement::String(32), // "1"
+                TapeElement::StartList(64), // []
+                TapeElement::EndList(63),
+                TapeElement::EndObject(61),
+                TapeElement::String(33), // "c"
+                TapeElement::StartObject(74), // {"2": [1, 2.71, 3e3]}
+                TapeElement::String(34), // "2"
+                TapeElement::StartList(73), // [1, 2.71, 3e3]
+                TapeElement::Number(35), // 1
+                TapeElement::Number(36), // 2.71
+                TapeElement::Number(37), // 3e3
+                TapeElement::EndList(69),
+                TapeElement::EndObject(67),
+                TapeElement::EndObject(50)
+            ]
+        );
+
+        // null
+        let mut values = ByteBuffer::new();
+        values.set_endian(Endian::LittleEndian);
+        let mut metadata = MetadataBuilder::new();
+        decode_one(0, &finished, &mut metadata, &mut values);
+        let metadata = metadata.build();
+        let values = values.into_vec();
+        assert_eq!(&values, &[0b_000000_00]);
+        assert_eq!(&metadata, &[0b_11_0_0_0001, 0, 0, 0, 0]);
+
+        // true
+        let mut values = ByteBuffer::new();
+        values.set_endian(Endian::LittleEndian);
+        let mut metadata = MetadataBuilder::new();
+        decode_one(19, &finished, &mut metadata, &mut values);
+        let metadata = metadata.build();
+        let values = values.into_vec();
+        assert_eq!(&values, &[0b_000001_00]);
+        assert_eq!(&metadata, &[0b_11_0_0_0001, 0, 0, 0, 0]);
+
+        // false
+        let mut values = ByteBuffer::new();
+        values.set_endian(Endian::LittleEndian);
+        let mut metadata = MetadataBuilder::new();
+        decode_one(21, &finished, &mut metadata, &mut values);
+        let metadata = metadata.build();
+        let values = values.into_vec();
+        assert_eq!(&values, &[0b_000010_00]);
+        assert_eq!(&metadata, &[0b_11_0_0_0001, 0, 0, 0, 0]);
+
+        // short string
+        let mut values = ByteBuffer::new();
+        values.set_endian(Endian::LittleEndian);
+        let mut metadata = MetadataBuilder::new();
+        decode_one(2, &finished, &mut metadata, &mut values);
+        let metadata = metadata.build();
+        let values = values.into_vec();
+        assert_eq!(values[0], 0b_000101_01);
+        assert_eq!(&values[1..], b"hello");
+        assert_eq!(&metadata, &[0b_11_0_0_0001, 0, 0, 0, 0]);
+
+        panic!("boom");
+
+        assert_eq!(
+            finished.strings,
+            "helloworldfoo2bar45foobarfizabcaaabobjectnestedhellofoo23bcfooafoobarcb1c2123"
+        );
+        assert_eq!(
+            &finished.string_offsets,
+            &[
+                0, 5, 10, 13, 14, 17, 19, 22, 25, 28, 29, 30, 31, 32, 32, 32, 33, 34, 35, 41, 47,
+                52, 55, 57, 58, 59, 62, 63, 63, 66, 69, 70, 71, 72, 73, 74, 75, 76, 77
+            ]
+        )
+    }
 }
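
The header bytes asserted above follow directly from the bit layout the patch describes: write_value_header packs the 6-bit type-specific header above the 2-bit basic type, and MetadataBuilder::new packs the offset size, an unused bit, the sorted flag, and the version into the first metadata byte. A minimal standalone sketch of that arithmetic (the test name and constants below are illustrative and not part of the patch):

#[test]
fn header_byte_math_sketch() {
    // Short string "hello": length (5) in the upper 6 bits, BasicType::ShortString (1)
    // in the lower 2 bits.
    let short_string_header = (5u8 << 2) | 1;
    assert_eq!(short_string_header, 0b_000101_01);

    // Metadata header: offset_size_minus_one = 3 (4-byte offsets) in the upper 2 bits, an
    // unused bit, sorted = 0, and version = 1 in the low nibble.
    let metadata_header = (0b11u8 << 6) | (0 << 4) | 1;
    assert_eq!(metadata_header, 0b_11_0_0_0001);
}
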
From 62afa5fe47b38955d7122a5e19207dbdca4a0da1 Mon Sep 17 00:00:00 2001
From: Ryan Johnson
Date: Wed, 12 Mar 2025 13:30:57 -0700
Subject: [PATCH 2/2] also try serde_json

---
 arrow-json/src/reader/tape.rs | 255 +++++++++++++++++++++++++++++++++-
 1 file changed, 249 insertions(+), 6 deletions(-)

diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs
index baef1bfef837..12f042382593 100644
--- a/arrow-json/src/reader/tape.rs
+++ b/arrow-json/src/reader/tape.rs
@@ -1002,26 +1002,22 @@ mod tests {
 
     /// Attempts to parse `n` as an integer no larger than i64. If successful, writes out the value
     /// as the smallest int type that can hold it and returns true. Otherwise, returns false.
-    fn try_decode_int(n: &str, scale: u8, values: &mut ByteBuffer) -> bool {
+    fn try_decode_int(n: &str, values: &mut ByteBuffer) -> bool {
         let Ok(val): Result<i64, _> = n.parse() else {
             return false;
         };
 
         if let Ok(val) = val.try_into() {
             write_value_header(values, BasicType::Primitive, PrimitiveType::Int8);
-            values.write_u8(scale);
             values.write_i8(val);
         } else if let Ok(val) = val.try_into() {
             write_value_header(values, BasicType::Primitive, PrimitiveType::Int16);
-            values.write_u8(scale);
             values.write_i16(val);
         } else if let Ok(val) = val.try_into() {
             write_value_header(values, BasicType::Primitive, PrimitiveType::Int32);
-            values.write_u8(scale);
             values.write_i32(val);
         } else {
             write_value_header(values, BasicType::Primitive, PrimitiveType::Int64);
-            values.write_u8(scale);
             values.write_i64(val);
         }
         true
@@ -1076,7 +1072,7 @@ mod tests {
                 }
             } else {
                 // No decimal point, so prefer int but use decimal if too large for i64.
-                if try_decode_int(n, 0, values) || try_decode_decimal(n, 0, values) {
+                if try_decode_int(n, values) || try_decode_decimal(n, 0, values) {
                     return; // else fall out and parse as float
                 }
             }
@@ -1494,3 +1490,250 @@ mod tests {
         )
     }
 }
+
+#[cfg(test)]
+mod serde_json_variant {
+    use bytebuffer::{ByteBuffer, Endian};
+    use num_enum::IntoPrimitive;
+    use static_assertions::const_assert;
+    use std::collections::HashMap;
+
+    use super::*;
+
+    #[derive(IntoPrimitive)]
+    #[repr(u64)]
+    enum BasicType {
+        Primitive = 0,
+        ShortString,
+        Object,
+        Array,
+    }
+    pub const BASIC_TYPE_BITS: u8 = 2;
+
+    /// For efficiency, JSON parsing only supports a subset of Variant types that can be easily and
+    /// directly inferred from the JSON type, with some concessions for JSON numerics to avoid
+    /// information loss. Fancy values like Date and Uuid just come back as strings.
+    #[derive(IntoPrimitive)]
+    #[repr(u64)]
+    enum PrimitiveType {
+        Null = 0,
+        True,
+        False,
+        Int8,
+        Int16,
+        Int32,
+        Int64,
+        Double,
+        Decimal4,
+        Decimal8,
+        Decimal16 = 10,
+        Float = 14,
+        String = 16,
+    }
+    pub const VALUE_HEADER_BITS: u8 = 6;
+
+    fn write_value_header(
+        values: &mut ByteBuffer,
+        basic_type: impl Into<u64>,
+        header: impl Into<u64>,
+    ) {
+        // WARNING: `ByteBuffer::write_bits` puts the highest bits first, even in little-endian
+        // mode, so we have to add the bitfields in reverse order.
+        values.write_bits(header.into(), VALUE_HEADER_BITS);
+        values.write_bits(basic_type.into(), BASIC_TYPE_BITS);
+    }
+
+    /// Writes out `val` as the smallest int type that can hold it.
+    fn decode_int(val: i64, values: &mut ByteBuffer) {
+        if let Ok(val) = val.try_into() {
+            write_value_header(values, BasicType::Primitive, PrimitiveType::Int8);
+            values.write_i8(val);
+        } else if let Ok(val) = val.try_into() {
+            write_value_header(values, BasicType::Primitive, PrimitiveType::Int16);
+            values.write_i16(val);
+        } else if let Ok(val) = val.try_into() {
+            write_value_header(values, BasicType::Primitive, PrimitiveType::Int32);
+            values.write_i32(val);
+        } else {
+            write_value_header(values, BasicType::Primitive, PrimitiveType::Int64);
+            values.write_i64(val);
+        }
+    }
+
+    fn decode_number(n: &serde_json::Number, values: &mut ByteBuffer) {
+        if let Some(i) = n.as_i64() {
+            return decode_int(i, values);
+        }
+
+        // Floating point. No way to know the best size, so always parse it as double.
+        write_value_header(values, BasicType::Primitive, PrimitiveType::Double);
+        values.write_f64(n.as_f64().expect("TODO"));
+    }
+
+    /// Collects the strings we eventually need to store in a variant `metadata` column. We don't
+    /// know in advance how many strings there will be, and we prefer to dedup for space savings.
+    /// As each new string is inserted, we assign it an index, which the value column will use to
+    /// refer to it. We also write out the byte offset, since that can be computed easily up front.
+    /// After all strings are collected, the `build` method appends their actual bytes in order.
+    struct MetadataBuilder<'a> {
+        metadata: ByteBuffer,
+        strings: HashMap<&'a str, u32>,
+        num_string_bytes: u32,
+    }
+    impl<'a> MetadataBuilder<'a> {
+        fn new() -> Self {
+            let mut metadata = ByteBuffer::new();
+            metadata.set_endian(Endian::LittleEndian);
+            metadata.write_bits(0b_11_0_0, 4); // 4-byte offsets, unused bit, not sorted
+            metadata.write_bits(1, 4); // version
+            metadata.write_u32(0); // placeholder for dictionary_size
+            Self {
+                metadata,
+                strings: HashMap::new(),
+                num_string_bytes: 0,
+            }
+        }
+
+        fn add_string(&mut self, s: &'a str) -> u32 {
+            if let Some(idx) = self.strings.get(s) {
+                return *idx;
+            }
+
+            let idx = u32::try_from(self.strings.len()).expect("TODO");
+            self.strings.insert(s, idx);
+
+            self.num_string_bytes += u32::try_from(s.len()).expect("TODO");
+            self.metadata.write_u32(self.num_string_bytes);
+            idx
+        }
+
+        fn build(self) -> Vec<u8> {
+            let mut strings: Vec<_> = self.strings.into_iter().collect();
+            let dictionary_size = strings.len();
+            strings.sort_by_key(|(_, idx)| *idx);
+
+            // Append the string bytes now that the offset list is complete
+            let mut metadata = self.metadata;
+            for (name, _) in strings {
+                metadata.write_bytes(name.as_bytes());
+            }
+
+            // Fix up the dictionary_size field now that we know its value
+            metadata.set_wpos(1);
+            metadata.write_u32(u32::try_from(dictionary_size).expect("TODO"));
+            metadata.into_vec()
+        }
+    }
+
+    fn decode_array<'a>(
+        array: &'a Vec<serde_json::Value>,
+        metadata: &mut MetadataBuilder<'a>,
+        values: &mut ByteBuffer,
+    ) {
+        // is_large=true, 4-byte offsets (the tape can only handle 32-bit offsets anyway)
+        write_value_header(values, BasicType::Array, 0b_1_11_u8);
+
+        // Write out the number of elements
+        values.write_u32(array.len().try_into().unwrap());
+
+        // The first field offset is always 0, and each array element adds another offset.
+        values.write_u32(0);
+
+        // Reserve space for the rest of the field offset array up front
+        let mut field_offset_array_wpos = values.len();
+        for _ in 0..array.len() {
+            values.write_u32(0);
+        }
+
+        let value_bytes_start = values.len();
+        for item in array {
+            // Write out the value of this array element
+            decode_one(item, metadata, values);
+
+            // Write out the end offset of this array element's value
+            let end_offset = values.len() - value_bytes_start;
+            values.set_wpos(field_offset_array_wpos);
+            values.write_u32(u32::try_from(end_offset).expect("TODO"));
+            field_offset_array_wpos = values.get_wpos();
+            values.set_wpos(values.len());
+        }
+    }
+
+    fn decode_object<'a>(
+        items: &'a serde_json::Map<String, serde_json::Value>,
+        metadata: &mut MetadataBuilder<'a>,
+        values: &mut ByteBuffer,
+    ) {
+        // is_large=true, 4-byte name ids, 4-byte value offsets
+        write_value_header(values, BasicType::Object, 0b_1_11_11_u8);
+
+        // WARNING: The fields of an object must be laid out in lexicographical name order. The
+        // caller must _not_ enable the serde_json "preserve_order" feature, which keeps keys in
+        // arrival order instead.
+
+        // Make an initial pass in order to reserve space for the name ids and value offsets. There
+        // are n+1 value offsets, so reserve the extra slot as well.
+        let mut field_id_wpos = values.len();
+        values.write_u32(0);
+        let mut value_offset_wpos = values.len();
+        for _ in 0..items.len() {
+            // Reserve space for both the field id and the value offset. Each field id we write
+            // increments the value_offset_wpos so that it remains accurate at all times.
+            //
+            // NOTE: It's important to write zeros here because one of these calls initializes the
+            // first value offset, which must always be zero. We just don't know in advance which one.
+            values.write_u32(0);
+            values.write_u32(0);
+        }
+
+        let value_bytes_start = values.len();
+        for (name, value) in items {
+            // Append the value bytes
+            decode_one(value, metadata, values);
+            let end_offset = values.len() - value_bytes_start;
+
+            let name_idx = metadata.add_string(name);
+            values.set_wpos(field_id_wpos);
+            values.write_u32(name_idx);
+            field_id_wpos = values.get_wpos();
+
+            values.set_wpos(value_offset_wpos);
+            values.write_u32(u32::try_from(end_offset).expect("TODO"));
+            value_offset_wpos = values.get_wpos();
+            values.set_wpos(values.len());
+        }
+    }
+
+    fn decode_one<'a>(
+        value: &'a serde_json::Value,
+        metadata: &mut MetadataBuilder<'a>,
+        values: &mut ByteBuffer,
+    ) {
+        match value {
+            serde_json::Value::Null => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::Null);
+            }
+            serde_json::Value::Bool(true) => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::True);
+            }
+            serde_json::Value::Bool(false) => {
+                write_value_header(values, BasicType::Primitive, PrimitiveType::False);
+            }
+            serde_json::Value::String(s) => {
+                if s.len() <= (1 << VALUE_HEADER_BITS) - 1 {
+                    // There is no `From<usize> for u64`. 128-bit architectures rejoice?
+                    const_assert!(size_of::<usize>() <= size_of::<u64>());
+                    write_value_header(values, BasicType::ShortString, s.len() as u64);
+                    values.write_bytes(s.as_bytes());
+                } else {
+                    write_value_header(values, BasicType::Primitive, PrimitiveType::String);
+                    values.write_string(s);
+                }
+            }
+            serde_json::Value::Number(n) => decode_number(n, values),
+            serde_json::Value::Object(o) => decode_object(o, metadata, values),
+            serde_json::Value::Array(a) => decode_array(a, metadata, values),
+        }
+    }
+}
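
The second patch defines the serde_json-based encoder but does not yet exercise it. A rough usage sketch, assuming it sits inside the same module so MetadataBuilder, decode_one, ByteBuffer, and Endian are in scope (the test name and the expected bytes below are illustrative, derived from the bit layout the module writes rather than from an existing test):

#[test]
fn encode_object_via_serde_json_sketch() {
    let json: serde_json::Value =
        serde_json::from_str(r#"{"hello": "world", "foo": 2, "bar": 45}"#).unwrap();

    let mut values = ByteBuffer::new();
    values.set_endian(Endian::LittleEndian);
    let mut metadata = MetadataBuilder::new();
    decode_one(&json, &mut metadata, &mut values);

    let metadata = metadata.build();
    let values = values.into_vec();

    // Object value header: is_large + 4-byte ids/offsets (0b_1_11_11) in the upper 6 bits,
    // BasicType::Object (2) in the lower 2 bits.
    assert_eq!(values[0], 0b_011111_10);
    // Three field names ("bar", "foo", "hello") end up in the metadata dictionary, whose
    // size is stored as a little-endian u32 starting at byte 1.
    assert_eq!(metadata[1], 3);
}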