From 0ac41d7e1ad46fbdfea289a3d6ccc692a74da88d Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Sun, 14 Jul 2024 15:02:22 -0400 Subject: [PATCH 01/15] feat: Add in-memory catalog --- Cargo.toml | 1 + crates/catalog/inmemory/Cargo.toml | 41 + crates/catalog/inmemory/README.md | 27 + crates/catalog/inmemory/src/catalog.rs | 1582 +++++++++++++++++ crates/catalog/inmemory/src/lib.rs | 26 + .../catalog/inmemory/src/namespace_state.rs | 296 +++ crates/iceberg/src/catalog/mod.rs | 2 +- 7 files changed, 1974 insertions(+), 1 deletion(-) create mode 100644 crates/catalog/inmemory/Cargo.toml create mode 100644 crates/catalog/inmemory/README.md create mode 100644 crates/catalog/inmemory/src/catalog.rs create mode 100644 crates/catalog/inmemory/src/lib.rs create mode 100644 crates/catalog/inmemory/src/namespace_state.rs diff --git a/Cargo.toml b/Cargo.toml index c4f8482cc..b16e5ff46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ arrow-string = { version = "52" } async-stream = "0.3.5" async-trait = "0.1" async-std = "1.12.0" +async-lock = "3.4.0" aws-config = "1.1.8" aws-sdk-glue = "1.21.0" bimap = "0.6" diff --git a/crates/catalog/inmemory/Cargo.toml b/crates/catalog/inmemory/Cargo.toml new file mode 100644 index 000000000..11139f587 --- /dev/null +++ b/crates/catalog/inmemory/Cargo.toml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-catalog-inmemory" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Rust In-Memory Catalog API" +repository = { workspace = true } +license = { workspace = true } +keywords = ["iceberg", "inmemory", "catalog"] + +[dependencies] +async-trait = { workspace = true } +iceberg = { workspace = true } +itertools = { workspace = true } +serde_json = { workspace = true } +uuid = { workspace = true, features = ["v4"] } +tempfile = { workspace = true } +async-lock = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } diff --git a/crates/catalog/inmemory/README.md b/crates/catalog/inmemory/README.md new file mode 100644 index 000000000..2a8ad6a87 --- /dev/null +++ b/crates/catalog/inmemory/README.md @@ -0,0 +1,27 @@ + + +# Apache Iceberg In-Memory Catalog Official Native Rust Implementation + +[![crates.io](https://img.shields.io/crates/v/iceberg.svg)](https://crates.io/crates/iceberg-catalog-inmemory) +[![docs.rs](https://img.shields.io/docsrs/iceberg.svg)](https://docs.rs/iceberg/latest/iceberg-catalog-inmemory/) + +This crate contains the official Native Rust implementation of Apache Iceberg In-Memory Catalog. + +See the [API documentation](https://docs.rs/iceberg-catalog-inmemory/latest) for examples and the full API. diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs new file mode 100644 index 000000000..3e466654f --- /dev/null +++ b/crates/catalog/inmemory/src/catalog.rs @@ -0,0 +1,1582 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains in-memory catalog implementation. + +use async_lock::Mutex; +use iceberg::io::{FileIO, FileIOBuilder}; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; +use itertools::Itertools; +use std::collections::HashMap; +use uuid::Uuid; + +use async_trait::async_trait; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; +use tempfile::TempDir; + +use crate::namespace_state::NamespaceState; + +/// In-memory catalog implementation. +#[derive(Debug)] +pub struct InMemoryCatalog { + root_namespace_state: Mutex, + file_io: FileIO, +} + +impl InMemoryCatalog { + /// Creates an in-memory catalog. + pub fn new() -> Result { + let root_namespace_state = NamespaceState::new(); + let file_io = FileIOBuilder::new_fs_io().build()?; + let inmemory_catalog = Self { + root_namespace_state: Mutex::new(root_namespace_state), + file_io: file_io, + }; + + Ok(inmemory_catalog) + } +} + +/// Create metadata location from `location` and `version` +fn create_metadata_location(location: impl AsRef, version: i32) -> Result { + if version < 0 { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Table metadata version: '{}' must be a non-negative integer", + version + ), + )) + } else { + let version = format!("{:0>5}", version); + let id = Uuid::new_v4(); + let metadata_location = format!( + "{}/metadata/{}-{}.metadata.json", + location.as_ref(), + version, + id + ); + + Ok(metadata_location) + } +} + +#[async_trait] +impl Catalog for InMemoryCatalog { + /// List namespaces inside the Catalog. + async fn list_namespaces( + &self, + maybe_parent: Option<&NamespaceIdent>, + ) -> Result> { + let root_namespace_state = self.root_namespace_state.lock().await; + + match maybe_parent { + None => Ok(root_namespace_state + .list_top_level_namespaces() + .into_iter() + .map(|str| NamespaceIdent::new(str.to_string())) + .collect_vec()), + Some(parent_namespace_ident) => { + let namespaces = root_namespace_state + .list_namespaces_under(parent_namespace_ident)? + .into_iter() + .map(|name| NamespaceIdent::new(name.to_string())) + .collect_vec(); + + Ok(namespaces) + } + } + } + + /// Create a new namespace inside the catalog. + async fn create_namespace( + &self, + namespace_ident: &NamespaceIdent, + properties: HashMap, + ) -> Result { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?; + let namespace = Namespace::with_properties(namespace_ident.clone(), properties); + + Ok(namespace) + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result { + let root_namespace_state = self.root_namespace_state.lock().await; + + let namespace = Namespace::with_properties( + namespace_ident.clone(), + root_namespace_state + .get_properties(namespace_ident)? + .clone(), + ); + + Ok(namespace) + } + + /// Check if namespace exists in catalog. + async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result { + let guarded_namespaces = self.root_namespace_state.lock().await; + + Ok(guarded_namespaces.namespace_exists(namespace_ident)) + } + + /// Update a namespace inside the catalog. + /// + /// # Behavior + /// + /// The properties must be the full set of namespace. + async fn update_namespace( + &self, + namespace_ident: &NamespaceIdent, + properties: HashMap, + ) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.replace_properties(namespace_ident, properties) + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.remove_existing_namespace(namespace_ident) + } + + /// List tables from namespace. + async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result> { + let root_namespace_state = self.root_namespace_state.lock().await; + + let table_names = root_namespace_state.list_tables(namespace_ident)?; + let table_idents = table_names + .into_iter() + .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone())) + .collect_vec(); + + Ok(table_idents) + } + + /// Create a new table inside the namespace. + async fn create_table( + &self, + namespace_ident: &NamespaceIdent, + table_creation: TableCreation, + ) -> Result { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + let table_name = table_creation.name.clone(); + let table_ident = TableIdent::new(namespace_ident.clone(), table_name); + let table_exists = root_namespace_state.table_exists(&table_ident)?; + + if table_exists { + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create table {:?}. Table already exists", + &table_ident + ), + )) + } else { + let (table_creation, location) = match table_creation.location.clone() { + Some(location) => (table_creation, location), + None => { + let tmp_dir = TempDir::new()?; + let location = tmp_dir.path().to_str().unwrap().to_string(); + let new_table_creation = TableCreation { + location: Some(location.clone()), + ..table_creation + }; + (new_table_creation, location) + } + }; + let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + let metadata_location = create_metadata_location(&location, 0)?; + + self.file_io + .new_output(&metadata_location)? + .write(serde_json::to_vec(&metadata)?.into()) + .await?; + + root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?; + + let table = Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(table_ident) + .build(); + + Ok(table) + } + } + + /// Load table from the catalog. + async fn load_table(&self, table_ident: &TableIdent) -> Result
{ + let root_namespace_state = self.root_namespace_state.lock().await; + + let metadata_location = root_namespace_state.get_existing_table_location(&table_ident)?; + let input_file = self.file_io.new_input(&metadata_location)?; + let metadata_content = input_file.read().await?; + let metadata = serde_json::from_slice::(&metadata_content)?; + let table = Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location.clone()) + .metadata(metadata) + .identifier(table_ident.clone()) + .build(); + + Ok(table) + } + + /// Drop a table from the catalog. + async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.remove_existing_table(table_ident) + } + + /// Check if a table exists in the catalog. + async fn table_exists(&self, table_ident: &TableIdent) -> Result { + let root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.table_exists(table_ident) + } + + /// Rename a table in the catalog. + async fn rename_table( + &self, + src_table_ident: &TableIdent, + dst_table_ident: &TableIdent, + ) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + let mut new_root_namespace_state = root_namespace_state.clone(); + let metadata_location = new_root_namespace_state + .get_existing_table_location(&src_table_ident)? + .clone(); + new_root_namespace_state.remove_existing_table(&src_table_ident)?; + new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?; + Ok(*root_namespace_state = new_root_namespace_state) + } + + /// Update a table to the catalog. + async fn update_table(&self, _commit: TableCommit) -> Result
{ + Err(Error::new( + ErrorKind::FeatureUnsupported, + "In-memory catalog does not currently support updating tables.", + )) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::hash::Hash; + use std::iter::FromIterator; + + use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + + use super::*; + + fn new_inmemory_catalog() -> impl Catalog { + InMemoryCatalog::new().unwrap() + } + + async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) -> () { + let _ = catalog + .create_namespace(&namespace_ident, HashMap::new()) + .await + .unwrap(); + } + + async fn create_namespaces( + catalog: &C, + namespace_idents: &Vec<&NamespaceIdent>, + ) -> () { + for namespace_ident in namespace_idents { + let _ = create_namespace(catalog, namespace_ident).await; + } + } + + fn to_set(vec: Vec) -> HashSet { + HashSet::from_iter(vec) + } + + fn simple_table_schema() -> Schema { + Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "foo", + Type::Primitive(PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap() + } + + async fn create_table(catalog: &C, table_ident: &TableIdent) { + let _ = catalog + .create_table( + &table_ident.namespace, + TableCreation::builder() + .name(table_ident.name().into()) + .schema(simple_table_schema()) + .build(), + ) + .await + .unwrap(); + } + + async fn create_tables(catalog: &C, table_idents: Vec<&TableIdent>) { + for table_ident in table_idents { + create_table(catalog, table_ident).await; + } + } + + fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) { + assert_eq!(table.identifier(), expected_table_ident); + + let metadata = table.metadata(); + + assert_eq!(metadata.current_schema().as_ref(), expected_schema); + + let expected_partition_spec = PartitionSpec::builder() + .with_spec_id(0) + .with_fields(vec![]) + .build() + .unwrap(); + + assert_eq!( + metadata + .partition_specs_iter() + .map(|p| p.as_ref()) + .collect_vec(), + vec![&expected_partition_spec] + ); + + let expected_sorted_order = SortOrder::builder() + .with_order_id(0) + .with_fields(vec![]) + .build(expected_schema.clone()) + .unwrap(); + + assert_eq!( + metadata + .sort_orders_iter() + .map(|s| s.as_ref()) + .collect_vec(), + vec![&expected_sorted_order] + ); + + assert_eq!(metadata.properties(), &HashMap::new()); + + assert_eq!(table.readonly(), false); + } + + #[tokio::test] + async fn test_list_namespaces_returns_empty_vector() { + let catalog = new_inmemory_catalog(); + + assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn test_list_namespaces_returns_single_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("abc".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!( + catalog.list_namespaces(None).await.unwrap(), + vec![namespace_ident] + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_multiple_namespaces() { + let catalog = new_inmemory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1, namespace_ident_2]) + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_only_top_level_namespaces() { + let catalog = new_inmemory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::new("b".into()); + create_namespaces( + &catalog, + &vec![&namespace_ident_1, &namespace_ident_2, &namespace_ident_3], + ) + .await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1, namespace_ident_3]) + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_no_namespaces_under_parent() { + let catalog = new_inmemory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + + assert_eq!( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap(), + vec![] + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_namespace_under_parent() { + let catalog = new_inmemory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::new("c".into()); + create_namespaces( + &catalog, + &vec![&namespace_ident_1, &namespace_ident_2, &namespace_ident_3], + ) + .await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1.clone(), namespace_ident_3]) + ); + + assert_eq!( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap(), + vec![NamespaceIdent::new("b".into())] + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_multiple_namespaces_under_parent() { + let catalog = new_inmemory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("a".to_string()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(); + let namespace_ident_5 = NamespaceIdent::new("b".into()); + create_namespaces( + &catalog, + &vec![ + &namespace_ident_1, + &namespace_ident_2, + &namespace_ident_3, + &namespace_ident_4, + &namespace_ident_5, + ], + ) + .await; + + assert_eq!( + to_set( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap() + ), + to_set(vec![ + NamespaceIdent::new("a".into()), + NamespaceIdent::new("b".into()), + NamespaceIdent::new("c".into()), + ]) + ); + } + + #[tokio::test] + async fn test_namespace_exists_returns_false() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!( + catalog + .namespace_exists(&NamespaceIdent::new("b".into())) + .await + .unwrap(), + false + ); + } + + #[tokio::test] + async fn test_namespace_exists_returns_true() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!( + catalog.namespace_exists(&namespace_ident).await.unwrap(), + true + ); + } + + #[tokio::test] + async fn test_create_namespace_with_empty_properties() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + + assert_eq!( + catalog + .create_namespace(&namespace_ident, HashMap::new()) + .await + .unwrap(), + Namespace::new(namespace_ident.clone()) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_create_namespace_with_properties() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("abc".into()); + + let mut properties: HashMap = HashMap::new(); + properties.insert("k".into(), "v".into()); + + assert_eq!( + catalog + .create_namespace(&namespace_ident, properties.clone()) + .await + .unwrap(), + Namespace::with_properties(namespace_ident.clone(), properties.clone()) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, properties) + ); + } + + #[tokio::test] + async fn test_create_namespace_throws_error_if_namespace_already_exists() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!( + catalog + .create_namespace(&namespace_ident, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Cannot create namespace {:?}. Namespace already exists", + &namespace_ident + ) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_create_nested_namespace() { + let catalog = new_inmemory_catalog(); + let parent_namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &parent_namespace_ident).await; + + let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&child_namespace_ident, HashMap::new()) + .await + .unwrap(), + Namespace::new(child_namespace_ident.clone()) + ); + + assert_eq!( + catalog.get_namespace(&child_namespace_ident).await.unwrap(), + Namespace::with_properties(child_namespace_ident, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_create_deeply_nested_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&namespace_ident_a_b_c, HashMap::new()) + .await + .unwrap(), + Namespace::new(namespace_ident_a_b_c.clone()) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b_c, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() { + let catalog = new_inmemory_catalog(); + + let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&nested_namespace_ident, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + NamespaceIdent::new("a".into()) + ) + ); + + assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist( + ) { + let catalog = new_inmemory_catalog(); + + let namespace_ident_a = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident_a).await; + + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&namespace_ident_a_b_c, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + NamespaceIdent::from_strs(vec!["a", "b"]).unwrap() + ) + ); + + assert_eq!( + catalog.list_namespaces(None).await.unwrap(), + vec![namespace_ident_a.clone()] + ); + + assert_eq!( + catalog + .list_namespaces(Some(&namespace_ident_a)) + .await + .unwrap(), + vec![] + ); + } + + #[tokio::test] + async fn test_get_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("abc".into()); + + let mut properties: HashMap = HashMap::new(); + properties.insert("k".into(), "v".into()); + let _ = catalog + .create_namespace(&namespace_ident, properties.clone()) + .await + .unwrap(); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, properties) + ) + } + + #[tokio::test] + async fn test_get_nested_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_get_deeply_nested_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces( + &catalog, + &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ], + ) + .await; + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b_c, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() { + let catalog = new_inmemory_catalog(); + create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; + + let non_existent_namespace_ident = NamespaceIdent::new("b".into()); + assert_eq!( + catalog + .get_namespace(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + async fn test_update_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("abc".into()); + create_namespace(&catalog, &namespace_ident).await; + + let mut new_properties: HashMap = HashMap::new(); + new_properties.insert("k".into(), "v".into()); + + assert_eq!( + catalog + .update_namespace(&namespace_ident, new_properties.clone()) + .await + .unwrap(), + () + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, new_properties) + ) + } + + #[tokio::test] + async fn test_update_nested_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let mut new_properties = HashMap::new(); + new_properties.insert("k".into(), "v".into()); + + assert_eq!( + catalog + .update_namespace(&namespace_ident_a_b, new_properties.clone()) + .await + .unwrap(), + () + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b, new_properties) + ); + } + + #[tokio::test] + async fn test_update_deeply_nested_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces( + &catalog, + &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ], + ) + .await; + + let mut new_properties = HashMap::new(); + new_properties.insert("k".into(), "v".into()); + + assert_eq!( + catalog + .update_namespace(&namespace_ident_a_b_c, new_properties.clone()) + .await + .unwrap(), + () + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b_c, new_properties) + ); + } + + #[tokio::test] + async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() { + let catalog = new_inmemory_catalog(); + create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await; + + let non_existent_namespace_ident = NamespaceIdent::new("def".into()); + assert_eq!( + catalog + .update_namespace(&non_existent_namespace_ident, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + async fn test_drop_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("abc".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!(catalog.drop_namespace(&namespace_ident).await.unwrap(), ()); + + assert_eq!( + catalog.namespace_exists(&namespace_ident).await.unwrap(), + false + ) + } + + #[tokio::test] + async fn test_drop_nested_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + assert_eq!( + catalog.drop_namespace(&namespace_ident_a_b).await.unwrap(), + () + ); + + assert_eq!( + catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap(), + false + ); + + assert_eq!( + catalog.namespace_exists(&namespace_ident_a).await.unwrap(), + true + ); + } + + #[tokio::test] + async fn test_drop_deeply_nested_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces( + &catalog, + &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ], + ) + .await; + + assert_eq!( + catalog + .drop_namespace(&namespace_ident_a_b_c) + .await + .unwrap(), + () + ); + + assert_eq!( + catalog + .namespace_exists(&namespace_ident_a_b_c) + .await + .unwrap(), + false + ); + + assert_eq!( + catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap(), + true + ); + + assert_eq!( + catalog.namespace_exists(&namespace_ident_a).await.unwrap(), + true + ); + } + + #[tokio::test] + async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() { + let catalog = new_inmemory_catalog(); + + let non_existent_namespace_ident = NamespaceIdent::new("abc".into()); + assert_eq!( + catalog + .drop_namespace(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() { + let catalog = new_inmemory_catalog(); + create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; + + let non_existent_namespace_ident = + NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap(); + assert_eq!( + catalog + .drop_namespace(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + assert_eq!( + catalog.drop_namespace(&namespace_ident_a).await.unwrap(), + () + ); + + assert_eq!( + catalog.namespace_exists(&namespace_ident_a).await.unwrap(), + false + ); + + assert_eq!( + catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap(), + false + ); + } + + #[tokio::test] + async fn test_create_table_without_location() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_name = "tbl1"; + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + + assert_table_eq( + &catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + .build(), + ) + .await + .unwrap(), + &expected_table_ident, + &simple_table_schema(), + ); + + assert_table_eq( + &catalog.load_table(&expected_table_ident).await.unwrap(), + &expected_table_ident, + &simple_table_schema(), + ) + } + + #[tokio::test] + async fn test_create_table_with_location() { + let tmp_dir = TempDir::new().unwrap(); + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_name = "abc"; + let location = tmp_dir.path().to_str().unwrap().to_string(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + + assert_table_eq( + &catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(), + &expected_table_ident, + &simple_table_schema(), + ); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + + assert_eq!( + table + .metadata_location() + .unwrap() + .to_string() + .starts_with(&location), + true + ); + + assert_table_eq( + &catalog.load_table(&expected_table_ident).await.unwrap(), + &expected_table_ident, + &simple_table_schema(), + ) + } + + #[tokio::test] + async fn test_create_table_throws_error_if_table_with_same_name_already_exists() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_table(&catalog, &table_ident).await; + + assert_eq!( + catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + .build() + ) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Cannot create table {:?}. Table already exists", + &table_ident + ) + ); + } + + #[tokio::test] + async fn test_list_tables_returns_empty_vector() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn test_list_tables_returns_a_single_table() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + assert_eq!( + catalog.list_tables(&namespace_ident).await.unwrap(), + vec![table_ident] + ); + } + + #[tokio::test] + async fn test_list_tables_returns_multiple_tables() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await; + + assert_eq!( + to_set(catalog.list_tables(&namespace_ident).await.unwrap()), + to_set(vec![table_ident_1, table_ident_2]) + ); + } + + #[tokio::test] + async fn test_list_tables_returns_tables_from_correct_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("n1".into()); + let namespace_ident_2 = NamespaceIdent::new("n2".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + + let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into()); + let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into()); + let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into()); + let _ = create_tables( + &catalog, + vec![&table_ident_1, &table_ident_2, &table_ident_3], + ) + .await; + + assert_eq!( + to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()), + to_set(vec![table_ident_1, table_ident_2]) + ); + + assert_eq!( + to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()), + to_set(vec![table_ident_3]) + ); + } + + #[tokio::test] + async fn test_list_tables_returns_table_under_nested_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + assert_eq!( + catalog.list_tables(&namespace_ident_a_b).await.unwrap(), + vec![table_ident] + ); + } + + #[tokio::test] + async fn test_list_tables_throws_error_if_namespace_doesnt_exist() { + let catalog = new_inmemory_catalog(); + + let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); + + assert_eq!( + catalog + .list_tables(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_drop_table() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + assert_eq!(catalog.drop_table(&table_ident).await.unwrap(), ()); + } + + #[tokio::test] + async fn test_drop_table_drops_table_under_nested_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + assert_eq!(catalog.drop_table(&table_ident).await.unwrap(), ()); + + assert_eq!( + catalog.list_tables(&namespace_ident_a_b).await.unwrap(), + vec![] + ); + } + + #[tokio::test] + async fn test_drop_table_throws_error_if_namespace_doesnt_exist() { + let catalog = new_inmemory_catalog(); + + let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); + let non_existent_table_ident = + TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into()); + + assert_eq!( + catalog + .drop_table(&non_existent_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_drop_table_throws_error_if_table_doesnt_exist() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + + let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + + assert_eq!( + catalog + .drop_table(&non_existent_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such table: {:?}", + non_existent_table_ident + ), + ); + } + + #[tokio::test] + async fn test_table_exists_returns_true() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + assert_eq!(catalog.table_exists(&table_ident).await.unwrap(), true); + } + + #[tokio::test] + async fn test_table_exists_returns_false() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + + assert_eq!( + catalog + .table_exists(&non_existent_table_ident) + .await + .unwrap(), + false + ); + } + + #[tokio::test] + async fn test_table_exists_under_nested_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + assert_eq!(catalog.table_exists(&table_ident).await.unwrap(), true); + + let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into()); + assert_eq!( + catalog + .table_exists(&non_existent_table_ident) + .await + .unwrap(), + false + ); + } + + #[tokio::test] + async fn test_table_exists_throws_error_if_namespace_doesnt_exist() { + let catalog = new_inmemory_catalog(); + + let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); + let non_existent_table_ident = + TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into()); + + assert_eq!( + catalog + .table_exists(&non_existent_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_rename_table_in_same_namespace() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + create_table(&catalog, &src_table_ident).await; + + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(), + (), + ); + + assert_eq!( + catalog.list_tables(&namespace_ident).await.unwrap(), + vec![dst_table_ident], + ); + } + + #[tokio::test] + async fn test_rename_table_across_namespaces() { + let catalog = new_inmemory_catalog(); + let src_namespace_ident = NamespaceIdent::new("a".into()); + let dst_namespace_ident = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await; + let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into()); + create_table(&catalog, &src_table_ident).await; + + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(), + (), + ); + + assert_eq!( + catalog.list_tables(&src_namespace_ident).await.unwrap(), + vec![], + ); + + assert_eq!( + catalog.list_tables(&dst_namespace_ident).await.unwrap(), + vec![dst_table_ident], + ); + } + + #[tokio::test] + async fn test_rename_table_src_table_is_same_as_dst_table() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into()); + create_table(&catalog, &table_ident).await; + + assert_eq!( + catalog + .rename_table(&table_ident, &table_ident) + .await + .unwrap(), + (), + ); + + assert_eq!( + catalog.list_tables(&namespace_ident).await.unwrap(), + vec![table_ident], + ); + } + + #[tokio::test] + async fn test_rename_table_across_nested_namespaces() { + let catalog = new_inmemory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces( + &catalog, + &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ], + ) + .await; + + let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into()); + create_tables(&catalog, vec![&src_table_ident]).await; + + let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(), + () + ); + + assert_eq!(catalog.table_exists(&src_table_ident).await.unwrap(), false); + + assert_eq!(catalog.table_exists(&dst_table_ident).await.unwrap(), true); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() { + let catalog = new_inmemory_catalog(); + + let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into()); + let src_table_ident = + TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into()); + + let dst_namespace_ident = NamespaceIdent::new("n2".into()); + create_namespace(&catalog, &dst_namespace_ident).await; + let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into()); + + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_src_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() { + let catalog = new_inmemory_catalog(); + let src_namespace_ident = NamespaceIdent::new("n1".into()); + let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into()); + create_namespace(&catalog, &src_namespace_ident).await; + create_table(&catalog, &src_table_ident).await; + + let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into()); + let dst_table_ident = + TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into()); + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_dst_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_src_table_doesnt_exist() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!("Unexpected => No such table: {:?}", src_table_ident), + ); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_dst_table_already_exists() { + let catalog = new_inmemory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await; + + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Cannot insert table {:? } as it already exists.", + &dst_table_ident + ), + ); + } +} diff --git a/crates/catalog/inmemory/src/lib.rs b/crates/catalog/inmemory/src/lib.rs new file mode 100644 index 000000000..928e0e73d --- /dev/null +++ b/crates/catalog/inmemory/src/lib.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg in-memory Catalog API implementation. + +#![deny(missing_docs)] +#![feature(map_try_insert)] + +mod catalog; +mod namespace_state; + +pub use catalog::*; diff --git a/crates/catalog/inmemory/src/namespace_state.rs b/crates/catalog/inmemory/src/namespace_state.rs new file mode 100644 index 000000000..48d24a4de --- /dev/null +++ b/crates/catalog/inmemory/src/namespace_state.rs @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent}; +use itertools::Itertools; +use std::collections::HashMap; + +// Represents the state of a namespace +#[derive(Debug, Clone)] +pub(crate) struct NamespaceState { + // Properties of this namespace + properties: HashMap, + // Namespaces nested inside this namespace + namespaces: HashMap, + // Mapping of tables to metadata locations in this namespace + table_metadata_locations: HashMap, +} + +fn no_such_namespace_err(namespace_ident: &NamespaceIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!("No such namespace: {:?}", namespace_ident), + )) +} + +fn no_such_table_err(table_ident: &TableIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!("No such table: {:?}", table_ident), + )) +} + +impl NamespaceState { + // Creates a new namespace state + pub(crate) fn new() -> Self { + Self { + properties: HashMap::new(), + namespaces: HashMap::new(), + table_metadata_locations: HashMap::new(), + } + } + + // Returns the state of the given namespace or an error if doesn't exist + fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<&NamespaceState> { + let mut acc_name_parts = vec![]; + let mut namespace_state = self; + let mut remaining = namespace_ident.into_iter(); + while let Some(next_name) = remaining.next() { + acc_name_parts.push(next_name); + match namespace_state.namespaces.get(next_name) { + None => { + let namespace_ident = NamespaceIdent::from_strs(acc_name_parts)?; + return no_such_namespace_err(&namespace_ident); + } + Some(intermediate_namespace) => { + namespace_state = intermediate_namespace; + } + } + } + + Ok(namespace_state) + } + + // Returns the state of the given namespace or an error if doesn't exist + fn get_mut_namespace( + &mut self, + namespace_ident: &NamespaceIdent, + ) -> Result<&mut NamespaceState> { + let mut acc_name_parts = vec![]; + let mut namespace_state = self; + let mut remaining = namespace_ident.into_iter(); + while let Some(next_name) = remaining.next() { + acc_name_parts.push(next_name); + match namespace_state.namespaces.get_mut(next_name) { + None => { + let namespace_ident = NamespaceIdent::from_strs(acc_name_parts)?; + return no_such_namespace_err(&namespace_ident); + } + Some(intermediate_namespace) => { + namespace_state = intermediate_namespace; + } + } + } + + Ok(namespace_state) + } + + // Returns the state of the parent of the given namespace or an error if doesn't exist + fn get_mut_parent_namespace_of( + &mut self, + namespace_ident: &NamespaceIdent, + ) -> Result<(&mut NamespaceState, String)> { + match namespace_ident.split_last() { + None => Err(Error::new( + ErrorKind::DataInvalid, + "Namespace identifier can't be empty!", + )), + Some((child_namespace_name, parent_name_parts)) => { + let parent_namespace_state = if parent_name_parts.is_empty() { + Ok(self) + } else { + let parent_namespace_ident = NamespaceIdent::from_strs(parent_name_parts)?; + self.get_mut_namespace(&parent_namespace_ident) + }?; + + Ok((parent_namespace_state, child_namespace_name.clone())) + } + } + } + + // Returns any top-level namespaces + pub(crate) fn list_top_level_namespaces(&self) -> Vec<&String> { + self.namespaces + .keys() + .map(|namespace_name| namespace_name) + .collect_vec() + } + + // Returns any namespaces nested under the given namespace or an error if the given namespace does not exist + pub(crate) fn list_namespaces_under( + &self, + namespace_ident: &NamespaceIdent, + ) -> Result> { + let nested_namespace_names = self + .get_namespace(namespace_ident)? + .namespaces + .keys() + .map(|namespace_name| namespace_name) + .collect_vec(); + + Ok(nested_namespace_names) + } + + // Returns true if the given namespace exists, otherwise false + pub(crate) fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> bool { + self.get_namespace(namespace_ident).is_ok() + } + + // Inserts the given namespace or returns an error if it already exists + pub(crate) fn insert_new_namespace( + &mut self, + namespace_ident: &NamespaceIdent, + properties: HashMap, + ) -> Result<()> { + let (parent_namespace_state, child_namespace_name) = + self.get_mut_parent_namespace_of(namespace_ident)?; + + match parent_namespace_state.namespaces.try_insert( + child_namespace_name, + NamespaceState { + properties, + namespaces: HashMap::new(), + table_metadata_locations: HashMap::new(), + }, + ) { + Err(_) => Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create namespace {:?}. Namespace already exists", + namespace_ident + ), + )), + Ok(_) => Ok(()), + } + } + + // Removes the given namespace or returns an error if doesn't exist + pub(crate) fn remove_existing_namespace( + &mut self, + namespace_ident: &NamespaceIdent, + ) -> Result<()> { + let (parent_namespace_state, child_namespace_name) = + self.get_mut_parent_namespace_of(namespace_ident)?; + + match parent_namespace_state + .namespaces + .remove(&child_namespace_name) + { + None => no_such_namespace_err(namespace_ident), + Some(_) => Ok(()), + } + } + + // Returns the properties of the given namespace or an error if doesn't exist + pub(crate) fn get_properties( + &self, + namespace_ident: &NamespaceIdent, + ) -> Result<&HashMap> { + let properties = &self.get_namespace(namespace_ident)?.properties; + + Ok(properties) + } + + // Returns the properties of this namespace or an error if doesn't exist + fn get_mut_properties( + &mut self, + namespace_ident: &NamespaceIdent, + ) -> Result<&mut HashMap> { + let properties = &mut self.get_mut_namespace(namespace_ident)?.properties; + + Ok(properties) + } + + // Replaces the properties of the given namespace or an error if doesn't exist + pub(crate) fn replace_properties( + &mut self, + namespace_ident: &NamespaceIdent, + new_properties: HashMap, + ) -> Result<()> { + let properties = self.get_mut_properties(namespace_ident)?; + + Ok(*properties = new_properties) + } + + // Returns the list of table names under the given namespace + pub(crate) fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result> { + let table_names = self + .get_namespace(namespace_ident)? + .table_metadata_locations + .keys() + .map(|table_name| table_name) + .collect_vec(); + + Ok(table_names) + } + + // Returns true if the given table exists, otherwise false + pub(crate) fn table_exists(&self, table_ident: &TableIdent) -> Result { + let namespace_state = self.get_namespace(table_ident.namespace())?; + let table_exists = namespace_state + .table_metadata_locations + .contains_key(&table_ident.name); + + Ok(table_exists) + } + + // Returns the metadata location of the given table or an error if doesn't exist + pub(crate) fn get_existing_table_location(&self, table_ident: &TableIdent) -> Result<&String> { + let namespace = self.get_namespace(table_ident.namespace())?; + + match namespace.table_metadata_locations.get(table_ident.name()) { + None => no_such_table_err(&table_ident), + Some(table_metadadata_location) => Ok(table_metadadata_location), + } + } + + // Inserts the given table or returns an error if it already exists + pub(crate) fn insert_new_table( + &mut self, + table_ident: &TableIdent, + metadata_location: String, + ) -> Result<()> { + let namespace = self.get_mut_namespace(table_ident.namespace())?; + + match namespace + .table_metadata_locations + .try_insert(table_ident.name().to_string(), metadata_location) + { + Err(_) => Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot insert table {:?} as it already exists.", + table_ident + ), + )), + Ok(_) => Ok(()), + } + } + + // Removes the given table or returns an error if doesn't exist + pub(crate) fn remove_existing_table(&mut self, table_ident: &TableIdent) -> Result<()> { + let namespace = self.get_mut_namespace(table_ident.namespace())?; + + match namespace + .table_metadata_locations + .remove(table_ident.name()) + { + None => no_such_table_err(&table_ident), + Some(_) => Ok(()), + } + } +} diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 4d7a47ac7..d6311e377 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -35,7 +35,7 @@ use uuid::Uuid; /// The catalog API for Iceberg Rust. #[async_trait] pub trait Catalog: Debug + Sync + Send { - /// List namespaces from table. + /// List namespaces inside the Catalog. async fn list_namespaces(&self, parent: Option<&NamespaceIdent>) -> Result>; From 0cdbbbbe64eb277394f6ff1fb4612749ac41224a Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Tue, 23 Jul 2024 11:37:25 -0400 Subject: [PATCH 02/15] Make clippy happy --- crates/catalog/inmemory/src/catalog.rs | 251 +++++++----------- .../catalog/inmemory/src/namespace_state.rs | 21 +- 2 files changed, 97 insertions(+), 175 deletions(-) diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index 3e466654f..f94a1c96d 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -49,7 +49,7 @@ impl InMemoryCatalog { let file_io = FileIOBuilder::new_fs_io().build()?; let inmemory_catalog = Self { root_namespace_state: Mutex::new(root_namespace_state), - file_io: file_io, + file_io, }; Ok(inmemory_catalog) @@ -235,8 +235,8 @@ impl Catalog for InMemoryCatalog { async fn load_table(&self, table_ident: &TableIdent) -> Result
{ let root_namespace_state = self.root_namespace_state.lock().await; - let metadata_location = root_namespace_state.get_existing_table_location(&table_ident)?; - let input_file = self.file_io.new_input(&metadata_location)?; + let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?; + let input_file = self.file_io.new_input(metadata_location)?; let metadata_content = input_file.read().await?; let metadata = serde_json::from_slice::(&metadata_content)?; let table = Table::builder() @@ -273,11 +273,12 @@ impl Catalog for InMemoryCatalog { let mut new_root_namespace_state = root_namespace_state.clone(); let metadata_location = new_root_namespace_state - .get_existing_table_location(&src_table_ident)? + .get_existing_table_location(src_table_ident)? .clone(); - new_root_namespace_state.remove_existing_table(&src_table_ident)?; + new_root_namespace_state.remove_existing_table(src_table_ident)?; new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?; - Ok(*root_namespace_state = new_root_namespace_state) + *root_namespace_state = new_root_namespace_state; + Ok(()) } /// Update a table to the catalog. @@ -303,17 +304,14 @@ mod tests { InMemoryCatalog::new().unwrap() } - async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) -> () { + async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { let _ = catalog - .create_namespace(&namespace_ident, HashMap::new()) + .create_namespace(namespace_ident, HashMap::new()) .await .unwrap(); } - async fn create_namespaces( - catalog: &C, - namespace_idents: &Vec<&NamespaceIdent>, - ) -> () { + async fn create_namespaces(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) { for namespace_ident in namespace_idents { let _ = create_namespace(catalog, namespace_ident).await; } @@ -391,7 +389,7 @@ mod tests { assert_eq!(metadata.properties(), &HashMap::new()); - assert_eq!(table.readonly(), false); + assert!(!table.readonly()); } #[tokio::test] @@ -527,13 +525,10 @@ mod tests { let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; - assert_eq!( - catalog - .namespace_exists(&NamespaceIdent::new("b".into())) - .await - .unwrap(), - false - ); + assert!(!catalog + .namespace_exists(&NamespaceIdent::new("b".into())) + .await + .unwrap()); } #[tokio::test] @@ -542,10 +537,7 @@ mod tests { let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; - assert_eq!( - catalog.namespace_exists(&namespace_ident).await.unwrap(), - true - ); + assert!(catalog.namespace_exists(&namespace_ident).await.unwrap()); } #[tokio::test] @@ -796,13 +788,10 @@ mod tests { let mut new_properties: HashMap = HashMap::new(); new_properties.insert("k".into(), "v".into()); - assert_eq!( - catalog - .update_namespace(&namespace_ident, new_properties.clone()) - .await - .unwrap(), - () - ); + catalog + .update_namespace(&namespace_ident, new_properties.clone()) + .await + .unwrap(); assert_eq!( catalog.get_namespace(&namespace_ident).await.unwrap(), @@ -820,13 +809,10 @@ mod tests { let mut new_properties = HashMap::new(); new_properties.insert("k".into(), "v".into()); - assert_eq!( - catalog - .update_namespace(&namespace_ident_a_b, new_properties.clone()) - .await - .unwrap(), - () - ); + catalog + .update_namespace(&namespace_ident_a_b, new_properties.clone()) + .await + .unwrap(); assert_eq!( catalog.get_namespace(&namespace_ident_a_b).await.unwrap(), @@ -853,13 +839,10 @@ mod tests { let mut new_properties = HashMap::new(); new_properties.insert("k".into(), "v".into()); - assert_eq!( - catalog - .update_namespace(&namespace_ident_a_b_c, new_properties.clone()) - .await - .unwrap(), - () - ); + catalog + .update_namespace(&namespace_ident_a_b_c, new_properties.clone()) + .await + .unwrap(); assert_eq!( catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(), @@ -892,12 +875,9 @@ mod tests { let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; - assert_eq!(catalog.drop_namespace(&namespace_ident).await.unwrap(), ()); + catalog.drop_namespace(&namespace_ident).await.unwrap(); - assert_eq!( - catalog.namespace_exists(&namespace_ident).await.unwrap(), - false - ) + assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap()) } #[tokio::test] @@ -907,23 +887,14 @@ mod tests { let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; - assert_eq!( - catalog.drop_namespace(&namespace_ident_a_b).await.unwrap(), - () - ); + catalog.drop_namespace(&namespace_ident_a_b).await.unwrap(); - assert_eq!( - catalog - .namespace_exists(&namespace_ident_a_b) - .await - .unwrap(), - false - ); + assert!(!catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); - assert_eq!( - catalog.namespace_exists(&namespace_ident_a).await.unwrap(), - true - ); + assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap()); } #[tokio::test] @@ -942,34 +913,22 @@ mod tests { ) .await; - assert_eq!( - catalog - .drop_namespace(&namespace_ident_a_b_c) - .await - .unwrap(), - () - ); + catalog + .drop_namespace(&namespace_ident_a_b_c) + .await + .unwrap(); - assert_eq!( - catalog - .namespace_exists(&namespace_ident_a_b_c) - .await - .unwrap(), - false - ); + assert!(!catalog + .namespace_exists(&namespace_ident_a_b_c) + .await + .unwrap()); - assert_eq!( - catalog - .namespace_exists(&namespace_ident_a_b) - .await - .unwrap(), - true - ); + assert!(catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); - assert_eq!( - catalog.namespace_exists(&namespace_ident_a).await.unwrap(), - true - ); + assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap()); } #[tokio::test] @@ -1017,23 +976,14 @@ mod tests { let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; - assert_eq!( - catalog.drop_namespace(&namespace_ident_a).await.unwrap(), - () - ); + catalog.drop_namespace(&namespace_ident_a).await.unwrap(); - assert_eq!( - catalog.namespace_exists(&namespace_ident_a).await.unwrap(), - false - ); + assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap()); - assert_eq!( - catalog - .namespace_exists(&namespace_ident_a_b) - .await - .unwrap(), - false - ); + assert!(!catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); } #[tokio::test] @@ -1097,14 +1047,11 @@ mod tests { assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); - assert_eq!( - table - .metadata_location() - .unwrap() - .to_string() - .starts_with(&location), - true - ); + assert!(table + .metadata_location() + .unwrap() + .to_string() + .starts_with(&location)); assert_table_eq( &catalog.load_table(&expected_table_ident).await.unwrap(), @@ -1251,7 +1198,7 @@ mod tests { let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); create_table(&catalog, &table_ident).await; - assert_eq!(catalog.drop_table(&table_ident).await.unwrap(), ()); + catalog.drop_table(&table_ident).await.unwrap(); } #[tokio::test] @@ -1264,7 +1211,7 @@ mod tests { let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); create_table(&catalog, &table_ident).await; - assert_eq!(catalog.drop_table(&table_ident).await.unwrap(), ()); + catalog.drop_table(&table_ident).await.unwrap(); assert_eq!( catalog.list_tables(&namespace_ident_a_b).await.unwrap(), @@ -1322,7 +1269,7 @@ mod tests { let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); create_table(&catalog, &table_ident).await; - assert_eq!(catalog.table_exists(&table_ident).await.unwrap(), true); + assert!(catalog.table_exists(&table_ident).await.unwrap()); } #[tokio::test] @@ -1332,13 +1279,10 @@ mod tests { create_namespace(&catalog, &namespace_ident).await; let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); - assert_eq!( - catalog - .table_exists(&non_existent_table_ident) - .await - .unwrap(), - false - ); + assert!(!catalog + .table_exists(&non_existent_table_ident) + .await + .unwrap()); } #[tokio::test] @@ -1351,16 +1295,13 @@ mod tests { let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); create_table(&catalog, &table_ident).await; - assert_eq!(catalog.table_exists(&table_ident).await.unwrap(), true); + assert!(catalog.table_exists(&table_ident).await.unwrap()); let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into()); - assert_eq!( - catalog - .table_exists(&non_existent_table_ident) - .await - .unwrap(), - false - ); + assert!(!catalog + .table_exists(&non_existent_table_ident) + .await + .unwrap()); } #[tokio::test] @@ -1393,13 +1334,10 @@ mod tests { let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); create_table(&catalog, &src_table_ident).await; - assert_eq!( - catalog - .rename_table(&src_table_ident, &dst_table_ident) - .await - .unwrap(), - (), - ); + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(); assert_eq!( catalog.list_tables(&namespace_ident).await.unwrap(), @@ -1417,13 +1355,10 @@ mod tests { let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into()); create_table(&catalog, &src_table_ident).await; - assert_eq!( - catalog - .rename_table(&src_table_ident, &dst_table_ident) - .await - .unwrap(), - (), - ); + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(); assert_eq!( catalog.list_tables(&src_namespace_ident).await.unwrap(), @@ -1444,13 +1379,10 @@ mod tests { let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into()); create_table(&catalog, &table_ident).await; - assert_eq!( - catalog - .rename_table(&table_ident, &table_ident) - .await - .unwrap(), - (), - ); + catalog + .rename_table(&table_ident, &table_ident) + .await + .unwrap(); assert_eq!( catalog.list_tables(&namespace_ident).await.unwrap(), @@ -1478,17 +1410,14 @@ mod tests { create_tables(&catalog, vec![&src_table_ident]).await; let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); - assert_eq!( - catalog - .rename_table(&src_table_ident, &dst_table_ident) - .await - .unwrap(), - () - ); + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(); - assert_eq!(catalog.table_exists(&src_table_ident).await.unwrap(), false); + assert!(!catalog.table_exists(&src_table_ident).await.unwrap()); - assert_eq!(catalog.table_exists(&dst_table_ident).await.unwrap(), true); + assert!(catalog.table_exists(&dst_table_ident).await.unwrap()); } #[tokio::test] diff --git a/crates/catalog/inmemory/src/namespace_state.rs b/crates/catalog/inmemory/src/namespace_state.rs index 48d24a4de..5a1a3e955 100644 --- a/crates/catalog/inmemory/src/namespace_state.rs +++ b/crates/catalog/inmemory/src/namespace_state.rs @@ -58,8 +58,7 @@ impl NamespaceState { fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<&NamespaceState> { let mut acc_name_parts = vec![]; let mut namespace_state = self; - let mut remaining = namespace_ident.into_iter(); - while let Some(next_name) = remaining.next() { + for next_name in namespace_ident.iter() { acc_name_parts.push(next_name); match namespace_state.namespaces.get(next_name) { None => { @@ -82,8 +81,7 @@ impl NamespaceState { ) -> Result<&mut NamespaceState> { let mut acc_name_parts = vec![]; let mut namespace_state = self; - let mut remaining = namespace_ident.into_iter(); - while let Some(next_name) = remaining.next() { + for next_name in namespace_ident.iter() { acc_name_parts.push(next_name); match namespace_state.namespaces.get_mut(next_name) { None => { @@ -124,10 +122,7 @@ impl NamespaceState { // Returns any top-level namespaces pub(crate) fn list_top_level_namespaces(&self) -> Vec<&String> { - self.namespaces - .keys() - .map(|namespace_name| namespace_name) - .collect_vec() + self.namespaces.keys().collect_vec() } // Returns any namespaces nested under the given namespace or an error if the given namespace does not exist @@ -139,7 +134,6 @@ impl NamespaceState { .get_namespace(namespace_ident)? .namespaces .keys() - .map(|namespace_name| namespace_name) .collect_vec(); Ok(nested_namespace_names) @@ -222,8 +216,8 @@ impl NamespaceState { new_properties: HashMap, ) -> Result<()> { let properties = self.get_mut_properties(namespace_ident)?; - - Ok(*properties = new_properties) + *properties = new_properties; + Ok(()) } // Returns the list of table names under the given namespace @@ -232,7 +226,6 @@ impl NamespaceState { .get_namespace(namespace_ident)? .table_metadata_locations .keys() - .map(|table_name| table_name) .collect_vec(); Ok(table_names) @@ -253,7 +246,7 @@ impl NamespaceState { let namespace = self.get_namespace(table_ident.namespace())?; match namespace.table_metadata_locations.get(table_ident.name()) { - None => no_such_table_err(&table_ident), + None => no_such_table_err(table_ident), Some(table_metadadata_location) => Ok(table_metadadata_location), } } @@ -289,7 +282,7 @@ impl NamespaceState { .table_metadata_locations .remove(table_ident.name()) { - None => no_such_table_err(&table_ident), + None => no_such_table_err(table_ident), Some(_) => Ok(()), } } From 933852cd43e8d2cd7875d833e3ee96244843cc4e Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Tue, 23 Jul 2024 11:51:51 -0400 Subject: [PATCH 03/15] Make cargo sort happy --- crates/catalog/inmemory/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/catalog/inmemory/Cargo.toml b/crates/catalog/inmemory/Cargo.toml index 11139f587..9babd1ec9 100644 --- a/crates/catalog/inmemory/Cargo.toml +++ b/crates/catalog/inmemory/Cargo.toml @@ -29,13 +29,13 @@ license = { workspace = true } keywords = ["iceberg", "inmemory", "catalog"] [dependencies] +async-lock = { workspace = true } async-trait = { workspace = true } iceberg = { workspace = true } itertools = { workspace = true } serde_json = { workspace = true } -uuid = { workspace = true, features = ["v4"] } tempfile = { workspace = true } -async-lock = { workspace = true } +uuid = { workspace = true, features = ["v4"] } [dev-dependencies] tokio = { workspace = true } From c053020328a2bb0f8344a877509f779ac6ee1639 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Wed, 24 Jul 2024 12:12:32 -0400 Subject: [PATCH 04/15] Fix README links --- crates/catalog/inmemory/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/catalog/inmemory/README.md b/crates/catalog/inmemory/README.md index 2a8ad6a87..f631aabc9 100644 --- a/crates/catalog/inmemory/README.md +++ b/crates/catalog/inmemory/README.md @@ -19,8 +19,8 @@ # Apache Iceberg In-Memory Catalog Official Native Rust Implementation -[![crates.io](https://img.shields.io/crates/v/iceberg.svg)](https://crates.io/crates/iceberg-catalog-inmemory) -[![docs.rs](https://img.shields.io/docsrs/iceberg.svg)](https://docs.rs/iceberg/latest/iceberg-catalog-inmemory/) +[![crates.io](https://img.shields.io/crates/v/iceberg-catalog-inmemory.svg)](https://crates.io/crates/iceberg-catalog-inmemory) +[![docs.rs](https://img.shields.io/docsrs/iceberg-catalog-inmemory.svg)](https://docs.rs/iceberg/latest/iceberg-catalog-inmemory/) This crate contains the official Native Rust implementation of Apache Iceberg In-Memory Catalog. From dc50c15d30151eecc2f0f7c4cafcbda38dd2e6a2 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Wed, 24 Jul 2024 17:52:03 -0400 Subject: [PATCH 05/15] Configurable file_io --- crates/catalog/inmemory/src/catalog.rs | 18 ++++++++---------- crates/catalog/inmemory/src/namespace_state.rs | 2 +- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index f94a1c96d..dbe5059f8 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -18,7 +18,7 @@ //! This module contains in-memory catalog implementation. use async_lock::Mutex; -use iceberg::io::{FileIO, FileIOBuilder}; +use iceberg::io::FileIO; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use itertools::Itertools; use std::collections::HashMap; @@ -44,15 +44,11 @@ pub struct InMemoryCatalog { impl InMemoryCatalog { /// Creates an in-memory catalog. - pub fn new() -> Result { - let root_namespace_state = NamespaceState::new(); - let file_io = FileIOBuilder::new_fs_io().build()?; - let inmemory_catalog = Self { - root_namespace_state: Mutex::new(root_namespace_state), + pub fn new(file_io: FileIO) -> Self { + Self { + root_namespace_state: Mutex::new(NamespaceState::new()), file_io, - }; - - Ok(inmemory_catalog) + } } } @@ -292,6 +288,7 @@ impl Catalog for InMemoryCatalog { #[cfg(test)] mod tests { + use iceberg::io::FileIOBuilder; use std::collections::HashSet; use std::hash::Hash; use std::iter::FromIterator; @@ -301,7 +298,8 @@ mod tests { use super::*; fn new_inmemory_catalog() -> impl Catalog { - InMemoryCatalog::new().unwrap() + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + InMemoryCatalog::new(file_io) } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { diff --git a/crates/catalog/inmemory/src/namespace_state.rs b/crates/catalog/inmemory/src/namespace_state.rs index 5a1a3e955..0aad7d091 100644 --- a/crates/catalog/inmemory/src/namespace_state.rs +++ b/crates/catalog/inmemory/src/namespace_state.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use std::collections::HashMap; // Represents the state of a namespace -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub(crate) struct NamespaceState { // Properties of this namespace properties: HashMap, From bcfe18d73b8980ee10722de2c8a2c7b0d6360426 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Wed, 24 Jul 2024 18:37:32 -0400 Subject: [PATCH 06/15] Avoid nightly features --- crates/catalog/inmemory/src/catalog.rs | 2 +- crates/catalog/inmemory/src/lib.rs | 1 - .../catalog/inmemory/src/namespace_state.rs | 36 +++++++++++-------- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index dbe5059f8..8cadbc8dc 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -1501,7 +1501,7 @@ mod tests { .unwrap_err() .to_string(), format!( - "Unexpected => Cannot insert table {:? } as it already exists.", + "Unexpected => Cannot insert table {:? }. Table already exists.", &dst_table_ident ), ); diff --git a/crates/catalog/inmemory/src/lib.rs b/crates/catalog/inmemory/src/lib.rs index 928e0e73d..e06f1fa5b 100644 --- a/crates/catalog/inmemory/src/lib.rs +++ b/crates/catalog/inmemory/src/lib.rs @@ -18,7 +18,6 @@ //! Iceberg in-memory Catalog API implementation. #![deny(missing_docs)] -#![feature(map_try_insert)] mod catalog; mod namespace_state; diff --git a/crates/catalog/inmemory/src/namespace_state.rs b/crates/catalog/inmemory/src/namespace_state.rs index 0aad7d091..706ce8c8a 100644 --- a/crates/catalog/inmemory/src/namespace_state.rs +++ b/crates/catalog/inmemory/src/namespace_state.rs @@ -17,7 +17,7 @@ use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent}; use itertools::Itertools; -use std::collections::HashMap; +use std::collections::{hash_map, HashMap}; // Represents the state of a namespace #[derive(Debug, Clone, Default)] @@ -153,22 +153,25 @@ impl NamespaceState { let (parent_namespace_state, child_namespace_name) = self.get_mut_parent_namespace_of(namespace_ident)?; - match parent_namespace_state.namespaces.try_insert( - child_namespace_name, - NamespaceState { - properties, - namespaces: HashMap::new(), - table_metadata_locations: HashMap::new(), - }, - ) { - Err(_) => Err(Error::new( + match parent_namespace_state + .namespaces + .entry(child_namespace_name) + { + hash_map::Entry::Occupied(_) => Err(Error::new( ErrorKind::Unexpected, format!( "Cannot create namespace {:?}. Namespace already exists", namespace_ident ), )), - Ok(_) => Ok(()), + hash_map::Entry::Vacant(entry) => { + let _ = entry.insert(NamespaceState { + properties, + namespaces: HashMap::new(), + table_metadata_locations: HashMap::new(), + }); + Ok(()) + } } } @@ -261,16 +264,19 @@ impl NamespaceState { match namespace .table_metadata_locations - .try_insert(table_ident.name().to_string(), metadata_location) + .entry(table_ident.name().to_string()) { - Err(_) => Err(Error::new( + hash_map::Entry::Occupied(_) => Err(Error::new( ErrorKind::Unexpected, format!( - "Cannot insert table {:?} as it already exists.", + "Cannot insert table {:?}. Table already exists.", table_ident ), )), - Ok(_) => Ok(()), + hash_map::Entry::Vacant(entry) => { + let _ = entry.insert(metadata_location); + Ok(()) + } } } From 8a6b826691c0a888457eda0f90ea2713fe722ff8 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Wed, 24 Jul 2024 19:55:28 -0400 Subject: [PATCH 07/15] Remove TempFile --- crates/catalog/inmemory/Cargo.toml | 2 +- crates/catalog/inmemory/src/catalog.rs | 26 +++++++++++++++++++------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/crates/catalog/inmemory/Cargo.toml b/crates/catalog/inmemory/Cargo.toml index 9babd1ec9..5a991bb9e 100644 --- a/crates/catalog/inmemory/Cargo.toml +++ b/crates/catalog/inmemory/Cargo.toml @@ -34,8 +34,8 @@ async-trait = { workspace = true } iceberg = { workspace = true } itertools = { workspace = true } serde_json = { workspace = true } -tempfile = { workspace = true } uuid = { workspace = true, features = ["v4"] } [dev-dependencies] +tempfile = { workspace = true } tokio = { workspace = true } diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index 8cadbc8dc..4f64795e5 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -31,7 +31,6 @@ use iceberg::Result; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, }; -use tempfile::TempDir; use crate::namespace_state::NamespaceState; @@ -40,14 +39,16 @@ use crate::namespace_state::NamespaceState; pub struct InMemoryCatalog { root_namespace_state: Mutex, file_io: FileIO, + default_table_root_location: String, } impl InMemoryCatalog { /// Creates an in-memory catalog. - pub fn new(file_io: FileIO) -> Self { + pub fn new(file_io: FileIO, default_table_root_location: String) -> Self { Self { root_namespace_state: Mutex::new(NamespaceState::new()), file_io, + default_table_root_location, } } } @@ -197,15 +198,22 @@ impl Catalog for InMemoryCatalog { let (table_creation, location) = match table_creation.location.clone() { Some(location) => (table_creation, location), None => { - let tmp_dir = TempDir::new()?; - let location = tmp_dir.path().to_str().unwrap().to_string(); + let location = format!( + "{}/{}/{}", + self.default_table_root_location, + table_ident.namespace().join("/"), + table_ident.name() + ); + let new_table_creation = TableCreation { location: Some(location.clone()), ..table_creation }; + (new_table_creation, location) } }; + let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; let metadata_location = create_metadata_location(&location, 0)?; @@ -289,17 +297,21 @@ impl Catalog for InMemoryCatalog { #[cfg(test)] mod tests { use iceberg::io::FileIOBuilder; + use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; use std::collections::HashSet; use std::hash::Hash; use std::iter::FromIterator; - - use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use tempfile::TempDir; use super::*; fn new_inmemory_catalog() -> impl Catalog { + let tmp_dir = TempDir::new().unwrap(); + let default_table_root_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - InMemoryCatalog::new(file_io) + + InMemoryCatalog::new(file_io, default_table_root_location) } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { From cedf7ccf91cb0ab8b77ac328edb481fc52218515 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Wed, 24 Jul 2024 19:59:33 -0400 Subject: [PATCH 08/15] Use futures::lock::Mutex instead --- Cargo.toml | 1 - crates/catalog/inmemory/Cargo.toml | 2 +- crates/catalog/inmemory/src/catalog.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b16e5ff46..c4f8482cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,6 @@ arrow-string = { version = "52" } async-stream = "0.3.5" async-trait = "0.1" async-std = "1.12.0" -async-lock = "3.4.0" aws-config = "1.1.8" aws-sdk-glue = "1.21.0" bimap = "0.6" diff --git a/crates/catalog/inmemory/Cargo.toml b/crates/catalog/inmemory/Cargo.toml index 5a991bb9e..eb8a4f307 100644 --- a/crates/catalog/inmemory/Cargo.toml +++ b/crates/catalog/inmemory/Cargo.toml @@ -29,8 +29,8 @@ license = { workspace = true } keywords = ["iceberg", "inmemory", "catalog"] [dependencies] -async-lock = { workspace = true } async-trait = { workspace = true } +futures = { workspace = true } iceberg = { workspace = true } itertools = { workspace = true } serde_json = { workspace = true } diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index 4f64795e5..347d5979a 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -17,7 +17,7 @@ //! This module contains in-memory catalog implementation. -use async_lock::Mutex; +use futures::lock::Mutex; use iceberg::io::FileIO; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use itertools::Itertools; From a1dcb4a62ceb5b43a2cbc8bcf52f96681993f234 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Wed, 24 Jul 2024 20:11:44 -0400 Subject: [PATCH 09/15] Minor clean up --- crates/catalog/inmemory/src/catalog.rs | 137 +++++++----------- .../catalog/inmemory/src/namespace_state.rs | 39 +++-- 2 files changed, 81 insertions(+), 95 deletions(-) diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index 347d5979a..8428c4a67 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -53,30 +53,6 @@ impl InMemoryCatalog { } } -/// Create metadata location from `location` and `version` -fn create_metadata_location(location: impl AsRef, version: i32) -> Result { - if version < 0 { - Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Table metadata version: '{}' must be a non-negative integer", - version - ), - )) - } else { - let version = format!("{:0>5}", version); - let id = Uuid::new_v4(); - let metadata_location = format!( - "{}/metadata/{}-{}.metadata.json", - location.as_ref(), - version, - id - ); - - Ok(metadata_location) - } -} - #[async_trait] impl Catalog for InMemoryCatalog { /// List namespaces inside the Catalog. @@ -87,11 +63,15 @@ impl Catalog for InMemoryCatalog { let root_namespace_state = self.root_namespace_state.lock().await; match maybe_parent { - None => Ok(root_namespace_state - .list_top_level_namespaces() - .into_iter() - .map(|str| NamespaceIdent::new(str.to_string())) - .collect_vec()), + None => { + let namespaces = root_namespace_state + .list_top_level_namespaces() + .into_iter() + .map(|str| NamespaceIdent::new(str.to_string())) + .collect_vec(); + + Ok(namespaces) + } Some(parent_namespace_ident) => { let namespaces = root_namespace_state .list_namespaces_under(parent_namespace_ident)? @@ -184,55 +164,49 @@ impl Catalog for InMemoryCatalog { let table_name = table_creation.name.clone(); let table_ident = TableIdent::new(namespace_ident.clone(), table_name); - let table_exists = root_namespace_state.table_exists(&table_ident)?; - - if table_exists { - Err(Error::new( - ErrorKind::Unexpected, - format!( - "Cannot create table {:?}. Table already exists", - &table_ident - ), - )) - } else { - let (table_creation, location) = match table_creation.location.clone() { - Some(location) => (table_creation, location), - None => { - let location = format!( - "{}/{}/{}", - self.default_table_root_location, - table_ident.namespace().join("/"), - table_ident.name() - ); - - let new_table_creation = TableCreation { - location: Some(location.clone()), - ..table_creation - }; - - (new_table_creation, location) - } - }; - - let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; - let metadata_location = create_metadata_location(&location, 0)?; - - self.file_io - .new_output(&metadata_location)? - .write(serde_json::to_vec(&metadata)?.into()) - .await?; - - root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?; - - let table = Table::builder() - .file_io(self.file_io.clone()) - .metadata_location(metadata_location) - .metadata(metadata) - .identifier(table_ident) - .build(); - - Ok(table) - } + + let (table_creation, location) = match table_creation.location.clone() { + Some(location) => (table_creation, location), + None => { + let location = format!( + "{}/{}/{}", + self.default_table_root_location, + table_ident.namespace().join("/"), + table_ident.name() + ); + + let new_table_creation = TableCreation { + location: Some(location.clone()), + ..table_creation + }; + + (new_table_creation, location) + } + }; + + let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + let metadata_location = format!( + "{}/metadata/{}-{}.metadata.json", + &location, + 0, + Uuid::new_v4() + ); + + self.file_io + .new_output(&metadata_location)? + .write(serde_json::to_vec(&metadata)?.into()) + .await?; + + root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?; + + let table = Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(table_ident) + .build(); + + Ok(table) } /// Load table from the catalog. @@ -282,6 +256,7 @@ impl Catalog for InMemoryCatalog { new_root_namespace_state.remove_existing_table(src_table_ident)?; new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?; *root_namespace_state = new_root_namespace_state; + Ok(()) } @@ -604,7 +579,7 @@ mod tests { .unwrap_err() .to_string(), format!( - "Unexpected => Cannot create namespace {:?}. Namespace already exists", + "Unexpected => Cannot create namespace {:?}. Namespace already exists.", &namespace_ident ) ); @@ -1092,7 +1067,7 @@ mod tests { .unwrap_err() .to_string(), format!( - "Unexpected => Cannot create table {:?}. Table already exists", + "Unexpected => Cannot create table {:?}. Table already exists.", &table_ident ) ); @@ -1513,7 +1488,7 @@ mod tests { .unwrap_err() .to_string(), format!( - "Unexpected => Cannot insert table {:? }. Table already exists.", + "Unexpected => Cannot create table {:? }. Table already exists.", &dst_table_ident ), ); diff --git a/crates/catalog/inmemory/src/namespace_state.rs b/crates/catalog/inmemory/src/namespace_state.rs index 706ce8c8a..fb70eee89 100644 --- a/crates/catalog/inmemory/src/namespace_state.rs +++ b/crates/catalog/inmemory/src/namespace_state.rs @@ -44,6 +44,26 @@ fn no_such_table_err(table_ident: &TableIdent) -> Result { )) } +fn namespace_already_exists_err(namespace_ident: &NamespaceIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create namespace {:?}. Namespace already exists.", + namespace_ident + ), + )) +} + +fn table_already_exists_err(table_ident: &TableIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create table {:?}. Table already exists.", + table_ident + ), + )) +} + impl NamespaceState { // Creates a new namespace state pub(crate) fn new() -> Self { @@ -157,19 +177,14 @@ impl NamespaceState { .namespaces .entry(child_namespace_name) { - hash_map::Entry::Occupied(_) => Err(Error::new( - ErrorKind::Unexpected, - format!( - "Cannot create namespace {:?}. Namespace already exists", - namespace_ident - ), - )), + hash_map::Entry::Occupied(_) => namespace_already_exists_err(namespace_ident), hash_map::Entry::Vacant(entry) => { let _ = entry.insert(NamespaceState { properties, namespaces: HashMap::new(), table_metadata_locations: HashMap::new(), }); + Ok(()) } } @@ -220,6 +235,7 @@ impl NamespaceState { ) -> Result<()> { let properties = self.get_mut_properties(namespace_ident)?; *properties = new_properties; + Ok(()) } @@ -266,15 +282,10 @@ impl NamespaceState { .table_metadata_locations .entry(table_ident.name().to_string()) { - hash_map::Entry::Occupied(_) => Err(Error::new( - ErrorKind::Unexpected, - format!( - "Cannot insert table {:?}. Table already exists.", - table_ident - ), - )), + hash_map::Entry::Occupied(_) => table_already_exists_err(table_ident), hash_map::Entry::Vacant(entry) => { let _ = entry.insert(metadata_location); + Ok(()) } } From 807dd4cf649b5c367f25afc59f99341d6995c337 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Thu, 25 Jul 2024 12:58:04 -0400 Subject: [PATCH 10/15] Make root configurable in FS FileIO and remove default_table_root_location from Catalog --- crates/catalog/inmemory/src/catalog.rs | 20 ++++++++++---------- crates/iceberg/src/io/mod.rs | 2 +- crates/iceberg/src/io/storage_fs.rs | 22 ++++++++++++++++------ 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index 8428c4a67..c050f037f 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -39,16 +39,14 @@ use crate::namespace_state::NamespaceState; pub struct InMemoryCatalog { root_namespace_state: Mutex, file_io: FileIO, - default_table_root_location: String, } impl InMemoryCatalog { /// Creates an in-memory catalog. - pub fn new(file_io: FileIO, default_table_root_location: String) -> Self { + pub fn new(file_io: FileIO) -> Self { Self { root_namespace_state: Mutex::new(NamespaceState::new()), file_io, - default_table_root_location, } } } @@ -169,8 +167,7 @@ impl Catalog for InMemoryCatalog { Some(location) => (table_creation, location), None => { let location = format!( - "{}/{}/{}", - self.default_table_root_location, + "{}/{}", table_ident.namespace().join("/"), table_ident.name() ); @@ -272,6 +269,7 @@ impl Catalog for InMemoryCatalog { #[cfg(test)] mod tests { use iceberg::io::FileIOBuilder; + use iceberg::io::ROOT_LOCATION; use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; use std::collections::HashSet; use std::hash::Hash; @@ -282,11 +280,13 @@ mod tests { fn new_inmemory_catalog() -> impl Catalog { let tmp_dir = TempDir::new().unwrap(); - let default_table_root_location = tmp_dir.path().to_str().unwrap().to_string(); - - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let root_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIOBuilder::new_fs_io() + .with_prop(ROOT_LOCATION, root_location) + .build() + .unwrap(); - InMemoryCatalog::new(file_io, default_table_root_location) + InMemoryCatalog::new(file_io) } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { @@ -999,7 +999,7 @@ mod tests { &catalog.load_table(&expected_table_ident).await.unwrap(), &expected_table_ident, &simple_table_schema(), - ) + ); } #[tokio::test] diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 914293da3..a1800ecb2 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -59,4 +59,4 @@ pub use storage_s3::*; #[cfg(feature = "storage-fs")] mod storage_fs; #[cfg(feature = "storage-fs")] -use storage_fs::*; +pub use storage_fs::*; diff --git a/crates/iceberg/src/io/storage_fs.rs b/crates/iceberg/src/io/storage_fs.rs index 38c3fa129..e0f583594 100644 --- a/crates/iceberg/src/io/storage_fs.rs +++ b/crates/iceberg/src/io/storage_fs.rs @@ -20,12 +20,17 @@ use opendal::{Operator, Scheme}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; +/// Property root location +pub const ROOT_LOCATION: &str = "root"; + /// # TODO /// /// opendal has a plan to introduce native config support. /// We manually parse the config here and those code will be finally removed. #[derive(Default, Clone)] -pub(crate) struct FsConfig {} +pub(crate) struct FsConfig { + root_location: String, +} impl Debug for FsConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -35,15 +40,20 @@ impl Debug for FsConfig { impl FsConfig { /// Decode from iceberg props. - pub fn new(_: HashMap) -> Self { - Self::default() + pub fn new(m: HashMap) -> Self { + let root_location = match m.get(ROOT_LOCATION) { + Some(root_location) => root_location.clone(), + None => "/".to_string(), + }; + + Self { root_location } } - /// Build new opendal operator from give path. + /// Build new opendal operator from given path. /// - /// fs always build from `/` + /// fs builds from `/` by default pub fn build(&self, _: &str) -> Result { - let m = HashMap::from_iter([("root".to_string(), "/".to_string())]); + let m = HashMap::from_iter([(ROOT_LOCATION.to_string(), self.root_location.clone())]); Ok(Operator::via_map(Scheme::Fs, m)?) } } From 1d010721ee417cc38f56d7580871c2fcbf07b889 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Thu, 25 Jul 2024 13:36:17 -0400 Subject: [PATCH 11/15] Revert "Make root configurable in FS FileIO and remove default_table_root_location from Catalog" This reverts commit 807dd4cf649b5c367f25afc59f99341d6995c337. --- crates/catalog/inmemory/src/catalog.rs | 20 ++++++++++---------- crates/iceberg/src/io/mod.rs | 2 +- crates/iceberg/src/io/storage_fs.rs | 22 ++++++---------------- 3 files changed, 17 insertions(+), 27 deletions(-) diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index c050f037f..8428c4a67 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -39,14 +39,16 @@ use crate::namespace_state::NamespaceState; pub struct InMemoryCatalog { root_namespace_state: Mutex, file_io: FileIO, + default_table_root_location: String, } impl InMemoryCatalog { /// Creates an in-memory catalog. - pub fn new(file_io: FileIO) -> Self { + pub fn new(file_io: FileIO, default_table_root_location: String) -> Self { Self { root_namespace_state: Mutex::new(NamespaceState::new()), file_io, + default_table_root_location, } } } @@ -167,7 +169,8 @@ impl Catalog for InMemoryCatalog { Some(location) => (table_creation, location), None => { let location = format!( - "{}/{}", + "{}/{}/{}", + self.default_table_root_location, table_ident.namespace().join("/"), table_ident.name() ); @@ -269,7 +272,6 @@ impl Catalog for InMemoryCatalog { #[cfg(test)] mod tests { use iceberg::io::FileIOBuilder; - use iceberg::io::ROOT_LOCATION; use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; use std::collections::HashSet; use std::hash::Hash; @@ -280,13 +282,11 @@ mod tests { fn new_inmemory_catalog() -> impl Catalog { let tmp_dir = TempDir::new().unwrap(); - let root_location = tmp_dir.path().to_str().unwrap().to_string(); - let file_io = FileIOBuilder::new_fs_io() - .with_prop(ROOT_LOCATION, root_location) - .build() - .unwrap(); + let default_table_root_location = tmp_dir.path().to_str().unwrap().to_string(); + + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - InMemoryCatalog::new(file_io) + InMemoryCatalog::new(file_io, default_table_root_location) } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { @@ -999,7 +999,7 @@ mod tests { &catalog.load_table(&expected_table_ident).await.unwrap(), &expected_table_ident, &simple_table_schema(), - ); + ) } #[tokio::test] diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index a1800ecb2..914293da3 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -59,4 +59,4 @@ pub use storage_s3::*; #[cfg(feature = "storage-fs")] mod storage_fs; #[cfg(feature = "storage-fs")] -pub use storage_fs::*; +use storage_fs::*; diff --git a/crates/iceberg/src/io/storage_fs.rs b/crates/iceberg/src/io/storage_fs.rs index e0f583594..38c3fa129 100644 --- a/crates/iceberg/src/io/storage_fs.rs +++ b/crates/iceberg/src/io/storage_fs.rs @@ -20,17 +20,12 @@ use opendal::{Operator, Scheme}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; -/// Property root location -pub const ROOT_LOCATION: &str = "root"; - /// # TODO /// /// opendal has a plan to introduce native config support. /// We manually parse the config here and those code will be finally removed. #[derive(Default, Clone)] -pub(crate) struct FsConfig { - root_location: String, -} +pub(crate) struct FsConfig {} impl Debug for FsConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -40,20 +35,15 @@ impl Debug for FsConfig { impl FsConfig { /// Decode from iceberg props. - pub fn new(m: HashMap) -> Self { - let root_location = match m.get(ROOT_LOCATION) { - Some(root_location) => root_location.clone(), - None => "/".to_string(), - }; - - Self { root_location } + pub fn new(_: HashMap) -> Self { + Self::default() } - /// Build new opendal operator from given path. + /// Build new opendal operator from give path. /// - /// fs builds from `/` by default + /// fs always build from `/` pub fn build(&self, _: &str) -> Result { - let m = HashMap::from_iter([(ROOT_LOCATION.to_string(), self.root_location.clone())]); + let m = HashMap::from_iter([("root".to_string(), "/".to_string())]); Ok(Operator::via_map(Scheme::Fs, m)?) } } From d778682361af6bd4fc2dcd3827736f1504fb5e54 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Thu, 25 Jul 2024 13:59:42 -0400 Subject: [PATCH 12/15] Remove default_table_root_location from Catalog and explicitly configure a location for tables in unit tests --- crates/catalog/inmemory/src/catalog.rs | 52 ++++++-------------------- 1 file changed, 11 insertions(+), 41 deletions(-) diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index 8428c4a67..1af358748 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -39,16 +39,14 @@ use crate::namespace_state::NamespaceState; pub struct InMemoryCatalog { root_namespace_state: Mutex, file_io: FileIO, - default_table_root_location: String, } impl InMemoryCatalog { /// Creates an in-memory catalog. - pub fn new(file_io: FileIO, default_table_root_location: String) -> Self { + pub fn new(file_io: FileIO) -> Self { Self { root_namespace_state: Mutex::new(NamespaceState::new()), file_io, - default_table_root_location, } } } @@ -169,8 +167,7 @@ impl Catalog for InMemoryCatalog { Some(location) => (table_creation, location), None => { let location = format!( - "{}/{}/{}", - self.default_table_root_location, + "{}/{}", table_ident.namespace().join("/"), table_ident.name() ); @@ -281,12 +278,8 @@ mod tests { use super::*; fn new_inmemory_catalog() -> impl Catalog { - let tmp_dir = TempDir::new().unwrap(); - let default_table_root_location = tmp_dir.path().to_str().unwrap().to_string(); - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - - InMemoryCatalog::new(file_io, default_table_root_location) + InMemoryCatalog::new(file_io) } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { @@ -319,12 +312,16 @@ mod tests { } async fn create_table(catalog: &C, table_ident: &TableIdent) { + let tmp_dir = TempDir::new().unwrap(); + let location = tmp_dir.path().to_str().unwrap().to_string(); + let _ = catalog .create_table( &table_ident.namespace, TableCreation::builder() .name(table_ident.name().into()) .schema(simple_table_schema()) + .location(location) .build(), ) .await @@ -971,37 +968,6 @@ mod tests { .unwrap()); } - #[tokio::test] - async fn test_create_table_without_location() { - let catalog = new_inmemory_catalog(); - let namespace_ident = NamespaceIdent::new("a".into()); - create_namespace(&catalog, &namespace_ident).await; - - let table_name = "tbl1"; - let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); - - assert_table_eq( - &catalog - .create_table( - &namespace_ident, - TableCreation::builder() - .name(table_name.into()) - .schema(simple_table_schema()) - .build(), - ) - .await - .unwrap(), - &expected_table_ident, - &simple_table_schema(), - ); - - assert_table_eq( - &catalog.load_table(&expected_table_ident).await.unwrap(), - &expected_table_ident, - &simple_table_schema(), - ) - } - #[tokio::test] async fn test_create_table_with_location() { let tmp_dir = TempDir::new().unwrap(); @@ -1054,6 +1020,9 @@ mod tests { let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); create_table(&catalog, &table_ident).await; + let tmp_dir = TempDir::new().unwrap(); + let location = tmp_dir.path().to_str().unwrap().to_string(); + assert_eq!( catalog .create_table( @@ -1061,6 +1030,7 @@ mod tests { TableCreation::builder() .name(table_name.into()) .schema(simple_table_schema()) + .location(location) .build() ) .await From b4c6e9125ec34c28c30f612871b77921c2b9c65e Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Fri, 26 Jul 2024 09:21:46 -0400 Subject: [PATCH 13/15] lowercase catalog --- crates/catalog/inmemory/src/catalog.rs | 2 +- crates/iceberg/src/catalog/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index 1af358748..07381cacd 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -53,7 +53,7 @@ impl InMemoryCatalog { #[async_trait] impl Catalog for InMemoryCatalog { - /// List namespaces inside the Catalog. + /// List namespaces inside the catalog. async fn list_namespaces( &self, maybe_parent: Option<&NamespaceIdent>, diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index d6311e377..b49a80c84 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -35,7 +35,7 @@ use uuid::Uuid; /// The catalog API for Iceberg Rust. #[async_trait] pub trait Catalog: Debug + Sync + Send { - /// List namespaces inside the Catalog. + /// List namespaces inside the catalog. async fn list_namespaces(&self, parent: Option<&NamespaceIdent>) -> Result>; From 055122ee075fac18b3055f94333001bddd301f60 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Fri, 26 Jul 2024 09:22:58 -0400 Subject: [PATCH 14/15] Use default instead of new --- crates/catalog/inmemory/src/catalog.rs | 2 +- crates/catalog/inmemory/src/namespace_state.rs | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index 07381cacd..e16889451 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -45,7 +45,7 @@ impl InMemoryCatalog { /// Creates an in-memory catalog. pub fn new(file_io: FileIO) -> Self { Self { - root_namespace_state: Mutex::new(NamespaceState::new()), + root_namespace_state: Mutex::new(NamespaceState::default()), file_io, } } diff --git a/crates/catalog/inmemory/src/namespace_state.rs b/crates/catalog/inmemory/src/namespace_state.rs index fb70eee89..875e0c7e4 100644 --- a/crates/catalog/inmemory/src/namespace_state.rs +++ b/crates/catalog/inmemory/src/namespace_state.rs @@ -65,15 +65,6 @@ fn table_already_exists_err(table_ident: &TableIdent) -> Result { } impl NamespaceState { - // Creates a new namespace state - pub(crate) fn new() -> Self { - Self { - properties: HashMap::new(), - namespaces: HashMap::new(), - table_metadata_locations: HashMap::new(), - } - } - // Returns the state of the given namespace or an error if doesn't exist fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<&NamespaceState> { let mut acc_name_parts = vec![]; From 505e18e387d7a0348987d1f3be8d65828bef38f6 Mon Sep 17 00:00:00 2001 From: fqaiser94 Date: Fri, 26 Jul 2024 09:28:57 -0400 Subject: [PATCH 15/15] Change references to memory --- crates/catalog/inmemory/Cargo.toml | 6 +- crates/catalog/inmemory/README.md | 10 +- crates/catalog/inmemory/src/catalog.rs | 126 ++++++++++++------------- crates/catalog/inmemory/src/lib.rs | 2 +- 4 files changed, 72 insertions(+), 72 deletions(-) diff --git a/crates/catalog/inmemory/Cargo.toml b/crates/catalog/inmemory/Cargo.toml index eb8a4f307..c62974a15 100644 --- a/crates/catalog/inmemory/Cargo.toml +++ b/crates/catalog/inmemory/Cargo.toml @@ -16,17 +16,17 @@ # under the License. [package] -name = "iceberg-catalog-inmemory" +name = "iceberg-catalog-memory" version = { workspace = true } edition = { workspace = true } homepage = { workspace = true } rust-version = { workspace = true } categories = ["database"] -description = "Apache Iceberg Rust In-Memory Catalog API" +description = "Apache Iceberg Rust Memory Catalog API" repository = { workspace = true } license = { workspace = true } -keywords = ["iceberg", "inmemory", "catalog"] +keywords = ["iceberg", "memory", "catalog"] [dependencies] async-trait = { workspace = true } diff --git a/crates/catalog/inmemory/README.md b/crates/catalog/inmemory/README.md index f631aabc9..5b04f78ab 100644 --- a/crates/catalog/inmemory/README.md +++ b/crates/catalog/inmemory/README.md @@ -17,11 +17,11 @@ ~ under the License. --> -# Apache Iceberg In-Memory Catalog Official Native Rust Implementation +# Apache Iceberg Memory Catalog Official Native Rust Implementation -[![crates.io](https://img.shields.io/crates/v/iceberg-catalog-inmemory.svg)](https://crates.io/crates/iceberg-catalog-inmemory) -[![docs.rs](https://img.shields.io/docsrs/iceberg-catalog-inmemory.svg)](https://docs.rs/iceberg/latest/iceberg-catalog-inmemory/) +[![crates.io](https://img.shields.io/crates/v/iceberg-catalog-memory.svg)](https://crates.io/crates/iceberg-catalog-memory) +[![docs.rs](https://img.shields.io/docsrs/iceberg-catalog-memory.svg)](https://docs.rs/iceberg/latest/iceberg-catalog-memory/) -This crate contains the official Native Rust implementation of Apache Iceberg In-Memory Catalog. +This crate contains the official Native Rust implementation of Apache Iceberg Memory Catalog. -See the [API documentation](https://docs.rs/iceberg-catalog-inmemory/latest) for examples and the full API. +See the [API documentation](https://docs.rs/iceberg-catalog-memory/latest) for examples and the full API. diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs index e16889451..eb2a545c6 100644 --- a/crates/catalog/inmemory/src/catalog.rs +++ b/crates/catalog/inmemory/src/catalog.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! This module contains in-memory catalog implementation. +//! This module contains memory catalog implementation. use futures::lock::Mutex; use iceberg::io::FileIO; @@ -34,15 +34,15 @@ use iceberg::{ use crate::namespace_state::NamespaceState; -/// In-memory catalog implementation. +/// Memory catalog implementation. #[derive(Debug)] -pub struct InMemoryCatalog { +pub struct MemoryCatalog { root_namespace_state: Mutex, file_io: FileIO, } -impl InMemoryCatalog { - /// Creates an in-memory catalog. +impl MemoryCatalog { + /// Creates an memory catalog. pub fn new(file_io: FileIO) -> Self { Self { root_namespace_state: Mutex::new(NamespaceState::default()), @@ -52,7 +52,7 @@ impl InMemoryCatalog { } #[async_trait] -impl Catalog for InMemoryCatalog { +impl Catalog for MemoryCatalog { /// List namespaces inside the catalog. async fn list_namespaces( &self, @@ -261,7 +261,7 @@ impl Catalog for InMemoryCatalog { async fn update_table(&self, _commit: TableCommit) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, - "In-memory catalog does not currently support updating tables.", + "MemoryCatalog does not currently support updating tables.", )) } } @@ -277,9 +277,9 @@ mod tests { use super::*; - fn new_inmemory_catalog() -> impl Catalog { + fn new_memory_catalog() -> impl Catalog { let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - InMemoryCatalog::new(file_io) + MemoryCatalog::new(file_io) } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { @@ -376,14 +376,14 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_empty_vector() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); } #[tokio::test] async fn test_list_namespaces_returns_single_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; @@ -395,7 +395,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_multiple_namespaces() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -408,7 +408,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_only_top_level_namespaces() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_3 = NamespaceIdent::new("b".into()); @@ -426,7 +426,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_no_namespaces_under_parent() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -442,7 +442,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_namespace_under_parent() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_3 = NamespaceIdent::new("c".into()); @@ -468,7 +468,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_multiple_namespaces_under_parent() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_1 = NamespaceIdent::new("a".to_string()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(); let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); @@ -503,7 +503,7 @@ mod tests { #[tokio::test] async fn test_namespace_exists_returns_false() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -515,7 +515,7 @@ mod tests { #[tokio::test] async fn test_namespace_exists_returns_true() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -524,7 +524,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_with_empty_properties() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("a".into()); assert_eq!( @@ -543,7 +543,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_with_properties() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("abc".into()); let mut properties: HashMap = HashMap::new(); @@ -565,7 +565,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_throws_error_if_namespace_already_exists() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -589,7 +589,7 @@ mod tests { #[tokio::test] async fn test_create_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let parent_namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &parent_namespace_ident).await; @@ -611,7 +611,7 @@ mod tests { #[tokio::test] async fn test_create_deeply_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -634,7 +634,7 @@ mod tests { #[tokio::test] async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); @@ -656,7 +656,7 @@ mod tests { #[tokio::test] async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist( ) { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident_a).await; @@ -691,7 +691,7 @@ mod tests { #[tokio::test] async fn test_get_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("abc".into()); let mut properties: HashMap = HashMap::new(); @@ -709,7 +709,7 @@ mod tests { #[tokio::test] async fn test_get_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -722,7 +722,7 @@ mod tests { #[tokio::test] async fn test_get_deeply_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -744,7 +744,7 @@ mod tests { #[tokio::test] async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; let non_existent_namespace_ident = NamespaceIdent::new("b".into()); @@ -763,7 +763,7 @@ mod tests { #[tokio::test] async fn test_update_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; @@ -783,7 +783,7 @@ mod tests { #[tokio::test] async fn test_update_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -804,7 +804,7 @@ mod tests { #[tokio::test] async fn test_update_deeply_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -834,7 +834,7 @@ mod tests { #[tokio::test] async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await; let non_existent_namespace_ident = NamespaceIdent::new("def".into()); @@ -853,7 +853,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; @@ -864,7 +864,7 @@ mod tests { #[tokio::test] async fn test_drop_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -881,7 +881,7 @@ mod tests { #[tokio::test] async fn test_drop_deeply_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -915,7 +915,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let non_existent_namespace_ident = NamespaceIdent::new("abc".into()); assert_eq!( @@ -933,7 +933,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; let non_existent_namespace_ident = @@ -953,7 +953,7 @@ mod tests { #[tokio::test] async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -971,7 +971,7 @@ mod tests { #[tokio::test] async fn test_create_table_with_location() { let tmp_dir = TempDir::new().unwrap(); - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1013,7 +1013,7 @@ mod tests { #[tokio::test] async fn test_create_table_throws_error_if_table_with_same_name_already_exists() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; let table_name = "tbl1"; @@ -1045,7 +1045,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_empty_vector() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1054,7 +1054,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_a_single_table() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1069,7 +1069,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_multiple_tables() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1085,7 +1085,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_tables_from_correct_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_1 = NamespaceIdent::new("n1".into()); let namespace_ident_2 = NamespaceIdent::new("n2".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -1112,7 +1112,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_table_under_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1128,7 +1128,7 @@ mod tests { #[tokio::test] async fn test_list_tables_throws_error_if_namespace_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); @@ -1147,7 +1147,7 @@ mod tests { #[tokio::test] async fn test_drop_table() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1158,7 +1158,7 @@ mod tests { #[tokio::test] async fn test_drop_table_drops_table_under_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1176,7 +1176,7 @@ mod tests { #[tokio::test] async fn test_drop_table_throws_error_if_namespace_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); let non_existent_table_ident = @@ -1197,7 +1197,7 @@ mod tests { #[tokio::test] async fn test_drop_table_throws_error_if_table_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1218,7 +1218,7 @@ mod tests { #[tokio::test] async fn test_table_exists_returns_true() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1229,7 +1229,7 @@ mod tests { #[tokio::test] async fn test_table_exists_returns_false() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1242,7 +1242,7 @@ mod tests { #[tokio::test] async fn test_table_exists_under_nested_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1261,7 +1261,7 @@ mod tests { #[tokio::test] async fn test_table_exists_throws_error_if_namespace_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); let non_existent_table_ident = @@ -1282,7 +1282,7 @@ mod tests { #[tokio::test] async fn test_rename_table_in_same_namespace() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1302,7 +1302,7 @@ mod tests { #[tokio::test] async fn test_rename_table_across_namespaces() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let src_namespace_ident = NamespaceIdent::new("a".into()); let dst_namespace_ident = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await; @@ -1328,7 +1328,7 @@ mod tests { #[tokio::test] async fn test_rename_table_src_table_is_same_as_dst_table() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into()); @@ -1347,7 +1347,7 @@ mod tests { #[tokio::test] async fn test_rename_table_across_nested_namespaces() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -1377,7 +1377,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into()); let src_table_ident = @@ -1402,7 +1402,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let src_namespace_ident = NamespaceIdent::new("n1".into()); let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into()); create_namespace(&catalog, &src_namespace_ident).await; @@ -1426,7 +1426,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_src_table_doesnt_exist() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1444,7 +1444,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_dst_table_already_exists() { - let catalog = new_inmemory_catalog(); + let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); diff --git a/crates/catalog/inmemory/src/lib.rs b/crates/catalog/inmemory/src/lib.rs index e06f1fa5b..8988ac7b2 100644 --- a/crates/catalog/inmemory/src/lib.rs +++ b/crates/catalog/inmemory/src/lib.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Iceberg in-memory Catalog API implementation. +//! Iceberg memory Catalog API implementation. #![deny(missing_docs)]