Skip to content

Commit

Permalink
feat: Table metadata (#29)
Browse files Browse the repository at this point in the history
* serde schemav1 & schemav2

* fix default schema id

* implement snapshot

* add partition spec

* add license

* add sortorder

* fix initial & write default

* serialize/deserialize table metadata

* impl table metadata

* fix docs

* fix clippy warnings

* change visibility

* fix rebase

* fix clippy warnings

* fix transform

* introduce static

* fix typo

* change spec export style

* improve table metadata v1 test

* improve table metadata v2 test

* delete temp file

* export all submodule types at spec module

* rename snapshotreference

* rename snapshot retention

* remove option from properties

* remove option from snapshot log

* improve builder

* introduce enum for manifest list files and manifests

* keep retention

* use arc for schema

* use arc for snapshot

* current snapshot returns option

* remove panic from snapshot conversion

* check if current_snapshot_id is -1

* fix schema

* use schema field as fallback in v1 table metadata

* use partition spec as fallback in v1 metadata

* fix parition spec

* introduce _serde module for schema

* introduce _serde module for snapshot

* introduce _serde module for table_metadata

* fix docs

* fix typo

* use minimal table metadata for v1 test
  • Loading branch information
JanKaul authored Aug 21, 2023
1 parent a31fba6 commit bdc66a0
Show file tree
Hide file tree
Showing 9 changed files with 1,783 additions and 19 deletions.
2 changes: 2 additions & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ chrono = "0.4"
uuid = "1.4.1"
ordered-float = "3.7.0"
bitvec = "1.0.1"
serde_repr = "0.1.16"
itertools = "0.11"
bimap = "0.6"
derive_builder = "0.12.0"


[dev-dependencies]
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#![deny(missing_docs)]

#[macro_use]
extern crate derive_builder;

mod error;
pub use error::Error;
pub use error::ErrorKind;
Expand Down
94 changes: 79 additions & 15 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
use ::serde::de::{MapAccess, Visitor};
use serde::de::{Error, IntoDeserializer};
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value as JsonValue;
use std::cell::OnceCell;
use std::convert::identity;
use std::sync::Arc;
use std::{collections::HashMap, fmt, ops::Index};

use super::values::Literal;

/// Field name for list type.
pub(crate) const LIST_FILED_NAME: &str = "element";
pub(crate) const MAP_KEY_FIELD_NAME: &str = "key";
Expand Down Expand Up @@ -341,8 +345,8 @@ impl fmt::Display for StructType {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
#[serde(from = "SerdeNestedField", into = "SerdeNestedField")]
/// A struct is a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema.
/// Each field can be either optional or required, meaning that values can (or cannot) be null. Fields may be any type.
/// Fields may have an optional comment or doc string. Fields can have default values.
Expand All @@ -354,17 +358,65 @@ pub struct NestedField {
/// Optional or required
pub required: bool,
/// Datatype
#[serde(rename = "type")]
pub field_type: Box<Type>,
/// Fields may have an optional comment or doc string.
#[serde(skip_serializing_if = "Option::is_none")]
pub doc: Option<String>,
/// Used to populate the field’s value for all records that were written before the field was added to the schema
#[serde(skip_serializing_if = "Option::is_none")]
pub initial_default: Option<String>,
pub initial_default: Option<Literal>,
/// Used to populate the field’s value for any records written after the field was added to the schema, if the writer does not supply the field’s value
pub write_default: Option<Literal>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
struct SerdeNestedField {
pub id: i32,
pub name: String,
pub required: bool,
#[serde(rename = "type")]
pub field_type: Box<Type>,
#[serde(skip_serializing_if = "Option::is_none")]
pub write_default: Option<String>,
pub doc: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub initial_default: Option<JsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub write_default: Option<JsonValue>,
}

impl From<SerdeNestedField> for NestedField {
fn from(value: SerdeNestedField) -> Self {
NestedField {
id: value.id,
name: value.name,
required: value.required,
initial_default: value.initial_default.and_then(|x| {
Literal::try_from_json(x, &value.field_type)
.ok()
.and_then(identity)
}),
write_default: value.write_default.and_then(|x| {
Literal::try_from_json(x, &value.field_type)
.ok()
.and_then(identity)
}),
field_type: value.field_type,
doc: value.doc,
}
}
}

impl From<NestedField> for SerdeNestedField {
fn from(value: NestedField) -> Self {
SerdeNestedField {
id: value.id,
name: value.name,
required: value.required,
field_type: value.field_type,
doc: value.doc,
initial_default: value.initial_default.map(|x| (&x).into()),
write_default: value.write_default.map(|x| (&x).into()),
}
}
}

/// Reference to nested field.
Expand Down Expand Up @@ -427,14 +479,14 @@ impl NestedField {
}

/// Set the field's initial default value.
pub fn with_initial_default(mut self, value: impl ToString) -> Self {
self.initial_default = Some(value.to_string());
pub fn with_initial_default(mut self, value: Literal) -> Self {
self.initial_default = Some(value);
self
}

/// Set the field's initial default value.
pub fn with_write_default(mut self, value: impl ToString) -> Self {
self.write_default = Some(value.to_string());
pub fn with_write_default(mut self, value: Literal) -> Self {
self.write_default = Some(value);
self
}
}
Expand Down Expand Up @@ -581,6 +633,10 @@ pub struct MapType {

#[cfg(test)]
mod tests {
use uuid::Uuid;

use crate::spec::values::PrimitiveLiteral;

use super::*;

fn check_type_serde(json: &str, expected_type: Type) {
Expand Down Expand Up @@ -685,8 +741,12 @@ mod tests {
Type::Struct(StructType {
fields: vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Uuid))
.with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb")
.with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae")
.with_initial_default(Literal::Primitive(PrimitiveLiteral::UUID(
Uuid::parse_str("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb").unwrap(),
)))
.with_write_default(Literal::Primitive(PrimitiveLiteral::UUID(
Uuid::parse_str("ec5911be-b0a7-458c-8438-c9a3e53cffae").unwrap(),
)))
.into(),
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)).into(),
],
Expand Down Expand Up @@ -749,8 +809,12 @@ mod tests {

let struct_type = Type::Struct(StructType::new(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Uuid))
.with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb")
.with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae")
.with_initial_default(Literal::Primitive(PrimitiveLiteral::UUID(
Uuid::parse_str("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb").unwrap(),
)))
.with_write_default(Literal::Primitive(PrimitiveLiteral::UUID(
Uuid::parse_str("ec5911be-b0a7-458c-8438-c9a3e53cffae").unwrap(),
)))
.into(),
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(
Expand Down
21 changes: 17 additions & 4 deletions crates/iceberg/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,20 @@

//! Spec for Iceberg.

pub mod datatypes;
pub mod schema;
pub mod transform;
pub mod values;
mod datatypes;
mod partition;
mod schema;
mod snapshot;
mod sort;
mod table_metadata;
mod transform;
mod values;

pub use datatypes::*;
pub use partition::*;
pub use schema::*;
pub use snapshot::*;
pub use sort::*;
pub use table_metadata::*;
pub use transform::*;
pub use values::*;
103 changes: 103 additions & 0 deletions crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/*!
* Partitioning
*/
use serde::{Deserialize, Serialize};

use super::transform::Transform;

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Partition fields capture the transform from table data to partition values.
pub struct PartitionField {
/// A source column id from the table’s schema
pub source_id: i32,
/// A partition field id that is used to identify a partition field and is unique within a partition spec.
/// In v2 table metadata, it is unique across all partition specs.
pub field_id: i32,
/// A partition name.
pub name: String,
/// A transform that is applied to the source column to produce a partition value.
pub transform: Transform,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(rename_all = "kebab-case")]
#[builder(setter(prefix = "with"))]
/// Partition spec that defines how to produce a tuple of partition values from a record.
pub struct PartitionSpec {
/// Identifier for PartitionSpec
pub spec_id: i32,
/// Details of the partition spec
#[builder(setter(each(name = "with_partition_field")))]
pub fields: Vec<PartitionField>,
}

impl PartitionSpec {
/// Create partition spec builer
pub fn builder() -> PartitionSpecBuilder {
PartitionSpecBuilder::default()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn partition_spec() {
let sort_order = r#"
{
"spec-id": 1,
"fields": [ {
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}, {
"source-id": 1,
"field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
} ]
}
"#;

let partition_spec: PartitionSpec = serde_json::from_str(sort_order).unwrap();
assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(1000, partition_spec.fields[0].field_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
assert_eq!(Transform::Day, partition_spec.fields[0].transform);

assert_eq!(1, partition_spec.fields[1].source_id);
assert_eq!(1001, partition_spec.fields[1].field_id);
assert_eq!("id_bucket", partition_spec.fields[1].name);
assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);

assert_eq!(2, partition_spec.fields[2].source_id);
assert_eq!(1002, partition_spec.fields[2].field_id);
assert_eq!("id_truncate", partition_spec.fields[2].name);
assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
}
}
Loading

0 comments on commit bdc66a0

Please # to comment.