diff --git a/crates/catalog/inmemory/Cargo.toml b/crates/catalog/inmemory/Cargo.toml new file mode 100644 index 000000000..c62974a15 --- /dev/null +++ b/crates/catalog/inmemory/Cargo.toml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-catalog-memory" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Rust Memory Catalog API" +repository = { workspace = true } +license = { workspace = true } +keywords = ["iceberg", "memory", "catalog"] + +[dependencies] +async-trait = { workspace = true } +futures = { workspace = true } +iceberg = { workspace = true } +itertools = { workspace = true } +serde_json = { workspace = true } +uuid = { workspace = true, features = ["v4"] } + +[dev-dependencies] +tempfile = { workspace = true } +tokio = { workspace = true } diff --git a/crates/catalog/inmemory/README.md b/crates/catalog/inmemory/README.md new file mode 100644 index 000000000..5b04f78ab --- /dev/null +++ b/crates/catalog/inmemory/README.md @@ -0,0 +1,27 @@ + + +# Apache Iceberg Memory Catalog Official Native Rust Implementation + +[![crates.io](https://img.shields.io/crates/v/iceberg-catalog-memory.svg)](https://crates.io/crates/iceberg-catalog-memory) +[![docs.rs](https://img.shields.io/docsrs/iceberg-catalog-memory.svg)](https://docs.rs/iceberg/latest/iceberg-catalog-memory/) + +This crate contains the official Native Rust implementation of Apache Iceberg Memory Catalog. + +See the [API documentation](https://docs.rs/iceberg-catalog-memory/latest) for examples and the full API. diff --git a/crates/catalog/inmemory/src/catalog.rs b/crates/catalog/inmemory/src/catalog.rs new file mode 100644 index 000000000..eb2a545c6 --- /dev/null +++ b/crates/catalog/inmemory/src/catalog.rs @@ -0,0 +1,1466 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains memory catalog implementation. + +use futures::lock::Mutex; +use iceberg::io::FileIO; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; +use itertools::Itertools; +use std::collections::HashMap; +use uuid::Uuid; + +use async_trait::async_trait; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use crate::namespace_state::NamespaceState; + +/// Memory catalog implementation. +#[derive(Debug)] +pub struct MemoryCatalog { + root_namespace_state: Mutex, + file_io: FileIO, +} + +impl MemoryCatalog { + /// Creates an memory catalog. + pub fn new(file_io: FileIO) -> Self { + Self { + root_namespace_state: Mutex::new(NamespaceState::default()), + file_io, + } + } +} + +#[async_trait] +impl Catalog for MemoryCatalog { + /// List namespaces inside the catalog. + async fn list_namespaces( + &self, + maybe_parent: Option<&NamespaceIdent>, + ) -> Result> { + let root_namespace_state = self.root_namespace_state.lock().await; + + match maybe_parent { + None => { + let namespaces = root_namespace_state + .list_top_level_namespaces() + .into_iter() + .map(|str| NamespaceIdent::new(str.to_string())) + .collect_vec(); + + Ok(namespaces) + } + Some(parent_namespace_ident) => { + let namespaces = root_namespace_state + .list_namespaces_under(parent_namespace_ident)? + .into_iter() + .map(|name| NamespaceIdent::new(name.to_string())) + .collect_vec(); + + Ok(namespaces) + } + } + } + + /// Create a new namespace inside the catalog. + async fn create_namespace( + &self, + namespace_ident: &NamespaceIdent, + properties: HashMap, + ) -> Result { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?; + let namespace = Namespace::with_properties(namespace_ident.clone(), properties); + + Ok(namespace) + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result { + let root_namespace_state = self.root_namespace_state.lock().await; + + let namespace = Namespace::with_properties( + namespace_ident.clone(), + root_namespace_state + .get_properties(namespace_ident)? + .clone(), + ); + + Ok(namespace) + } + + /// Check if namespace exists in catalog. + async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result { + let guarded_namespaces = self.root_namespace_state.lock().await; + + Ok(guarded_namespaces.namespace_exists(namespace_ident)) + } + + /// Update a namespace inside the catalog. + /// + /// # Behavior + /// + /// The properties must be the full set of namespace. + async fn update_namespace( + &self, + namespace_ident: &NamespaceIdent, + properties: HashMap, + ) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.replace_properties(namespace_ident, properties) + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.remove_existing_namespace(namespace_ident) + } + + /// List tables from namespace. + async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result> { + let root_namespace_state = self.root_namespace_state.lock().await; + + let table_names = root_namespace_state.list_tables(namespace_ident)?; + let table_idents = table_names + .into_iter() + .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone())) + .collect_vec(); + + Ok(table_idents) + } + + /// Create a new table inside the namespace. + async fn create_table( + &self, + namespace_ident: &NamespaceIdent, + table_creation: TableCreation, + ) -> Result { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + let table_name = table_creation.name.clone(); + let table_ident = TableIdent::new(namespace_ident.clone(), table_name); + + let (table_creation, location) = match table_creation.location.clone() { + Some(location) => (table_creation, location), + None => { + let location = format!( + "{}/{}", + table_ident.namespace().join("/"), + table_ident.name() + ); + + let new_table_creation = TableCreation { + location: Some(location.clone()), + ..table_creation + }; + + (new_table_creation, location) + } + }; + + let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + let metadata_location = format!( + "{}/metadata/{}-{}.metadata.json", + &location, + 0, + Uuid::new_v4() + ); + + self.file_io + .new_output(&metadata_location)? + .write(serde_json::to_vec(&metadata)?.into()) + .await?; + + root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?; + + let table = Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(table_ident) + .build(); + + Ok(table) + } + + /// Load table from the catalog. + async fn load_table(&self, table_ident: &TableIdent) -> Result
{ + let root_namespace_state = self.root_namespace_state.lock().await; + + let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?; + let input_file = self.file_io.new_input(metadata_location)?; + let metadata_content = input_file.read().await?; + let metadata = serde_json::from_slice::(&metadata_content)?; + let table = Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location.clone()) + .metadata(metadata) + .identifier(table_ident.clone()) + .build(); + + Ok(table) + } + + /// Drop a table from the catalog. + async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.remove_existing_table(table_ident) + } + + /// Check if a table exists in the catalog. + async fn table_exists(&self, table_ident: &TableIdent) -> Result { + let root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.table_exists(table_ident) + } + + /// Rename a table in the catalog. + async fn rename_table( + &self, + src_table_ident: &TableIdent, + dst_table_ident: &TableIdent, + ) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + let mut new_root_namespace_state = root_namespace_state.clone(); + let metadata_location = new_root_namespace_state + .get_existing_table_location(src_table_ident)? + .clone(); + new_root_namespace_state.remove_existing_table(src_table_ident)?; + new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?; + *root_namespace_state = new_root_namespace_state; + + Ok(()) + } + + /// Update a table to the catalog. + async fn update_table(&self, _commit: TableCommit) -> Result
{ + Err(Error::new( + ErrorKind::FeatureUnsupported, + "MemoryCatalog does not currently support updating tables.", + )) + } +} + +#[cfg(test)] +mod tests { + use iceberg::io::FileIOBuilder; + use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use std::collections::HashSet; + use std::hash::Hash; + use std::iter::FromIterator; + use tempfile::TempDir; + + use super::*; + + fn new_memory_catalog() -> impl Catalog { + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + MemoryCatalog::new(file_io) + } + + async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { + let _ = catalog + .create_namespace(namespace_ident, HashMap::new()) + .await + .unwrap(); + } + + async fn create_namespaces(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) { + for namespace_ident in namespace_idents { + let _ = create_namespace(catalog, namespace_ident).await; + } + } + + fn to_set(vec: Vec) -> HashSet { + HashSet::from_iter(vec) + } + + fn simple_table_schema() -> Schema { + Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "foo", + Type::Primitive(PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap() + } + + async fn create_table(catalog: &C, table_ident: &TableIdent) { + let tmp_dir = TempDir::new().unwrap(); + let location = tmp_dir.path().to_str().unwrap().to_string(); + + let _ = catalog + .create_table( + &table_ident.namespace, + TableCreation::builder() + .name(table_ident.name().into()) + .schema(simple_table_schema()) + .location(location) + .build(), + ) + .await + .unwrap(); + } + + async fn create_tables(catalog: &C, table_idents: Vec<&TableIdent>) { + for table_ident in table_idents { + create_table(catalog, table_ident).await; + } + } + + fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) { + assert_eq!(table.identifier(), expected_table_ident); + + let metadata = table.metadata(); + + assert_eq!(metadata.current_schema().as_ref(), expected_schema); + + let expected_partition_spec = PartitionSpec::builder() + .with_spec_id(0) + .with_fields(vec![]) + .build() + .unwrap(); + + assert_eq!( + metadata + .partition_specs_iter() + .map(|p| p.as_ref()) + .collect_vec(), + vec![&expected_partition_spec] + ); + + let expected_sorted_order = SortOrder::builder() + .with_order_id(0) + .with_fields(vec![]) + .build(expected_schema.clone()) + .unwrap(); + + assert_eq!( + metadata + .sort_orders_iter() + .map(|s| s.as_ref()) + .collect_vec(), + vec![&expected_sorted_order] + ); + + assert_eq!(metadata.properties(), &HashMap::new()); + + assert!(!table.readonly()); + } + + #[tokio::test] + async fn test_list_namespaces_returns_empty_vector() { + let catalog = new_memory_catalog(); + + assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn test_list_namespaces_returns_single_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("abc".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!( + catalog.list_namespaces(None).await.unwrap(), + vec![namespace_ident] + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_multiple_namespaces() { + let catalog = new_memory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1, namespace_ident_2]) + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_only_top_level_namespaces() { + let catalog = new_memory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::new("b".into()); + create_namespaces( + &catalog, + &vec![&namespace_ident_1, &namespace_ident_2, &namespace_ident_3], + ) + .await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1, namespace_ident_3]) + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_no_namespaces_under_parent() { + let catalog = new_memory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + + assert_eq!( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap(), + vec![] + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_namespace_under_parent() { + let catalog = new_memory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::new("c".into()); + create_namespaces( + &catalog, + &vec![&namespace_ident_1, &namespace_ident_2, &namespace_ident_3], + ) + .await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1.clone(), namespace_ident_3]) + ); + + assert_eq!( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap(), + vec![NamespaceIdent::new("b".into())] + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_multiple_namespaces_under_parent() { + let catalog = new_memory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("a".to_string()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(); + let namespace_ident_5 = NamespaceIdent::new("b".into()); + create_namespaces( + &catalog, + &vec![ + &namespace_ident_1, + &namespace_ident_2, + &namespace_ident_3, + &namespace_ident_4, + &namespace_ident_5, + ], + ) + .await; + + assert_eq!( + to_set( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap() + ), + to_set(vec![ + NamespaceIdent::new("a".into()), + NamespaceIdent::new("b".into()), + NamespaceIdent::new("c".into()), + ]) + ); + } + + #[tokio::test] + async fn test_namespace_exists_returns_false() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert!(!catalog + .namespace_exists(&NamespaceIdent::new("b".into())) + .await + .unwrap()); + } + + #[tokio::test] + async fn test_namespace_exists_returns_true() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert!(catalog.namespace_exists(&namespace_ident).await.unwrap()); + } + + #[tokio::test] + async fn test_create_namespace_with_empty_properties() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + + assert_eq!( + catalog + .create_namespace(&namespace_ident, HashMap::new()) + .await + .unwrap(), + Namespace::new(namespace_ident.clone()) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_create_namespace_with_properties() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("abc".into()); + + let mut properties: HashMap = HashMap::new(); + properties.insert("k".into(), "v".into()); + + assert_eq!( + catalog + .create_namespace(&namespace_ident, properties.clone()) + .await + .unwrap(), + Namespace::with_properties(namespace_ident.clone(), properties.clone()) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, properties) + ); + } + + #[tokio::test] + async fn test_create_namespace_throws_error_if_namespace_already_exists() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!( + catalog + .create_namespace(&namespace_ident, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Cannot create namespace {:?}. Namespace already exists.", + &namespace_ident + ) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_create_nested_namespace() { + let catalog = new_memory_catalog(); + let parent_namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &parent_namespace_ident).await; + + let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&child_namespace_ident, HashMap::new()) + .await + .unwrap(), + Namespace::new(child_namespace_ident.clone()) + ); + + assert_eq!( + catalog.get_namespace(&child_namespace_ident).await.unwrap(), + Namespace::with_properties(child_namespace_ident, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_create_deeply_nested_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&namespace_ident_a_b_c, HashMap::new()) + .await + .unwrap(), + Namespace::new(namespace_ident_a_b_c.clone()) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b_c, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() { + let catalog = new_memory_catalog(); + + let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&nested_namespace_ident, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + NamespaceIdent::new("a".into()) + ) + ); + + assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist( + ) { + let catalog = new_memory_catalog(); + + let namespace_ident_a = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident_a).await; + + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&namespace_ident_a_b_c, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + NamespaceIdent::from_strs(vec!["a", "b"]).unwrap() + ) + ); + + assert_eq!( + catalog.list_namespaces(None).await.unwrap(), + vec![namespace_ident_a.clone()] + ); + + assert_eq!( + catalog + .list_namespaces(Some(&namespace_ident_a)) + .await + .unwrap(), + vec![] + ); + } + + #[tokio::test] + async fn test_get_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("abc".into()); + + let mut properties: HashMap = HashMap::new(); + properties.insert("k".into(), "v".into()); + let _ = catalog + .create_namespace(&namespace_ident, properties.clone()) + .await + .unwrap(); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, properties) + ) + } + + #[tokio::test] + async fn test_get_nested_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_get_deeply_nested_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces( + &catalog, + &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ], + ) + .await; + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b_c, HashMap::new()) + ); + } + + #[tokio::test] + async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() { + let catalog = new_memory_catalog(); + create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; + + let non_existent_namespace_ident = NamespaceIdent::new("b".into()); + assert_eq!( + catalog + .get_namespace(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + async fn test_update_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("abc".into()); + create_namespace(&catalog, &namespace_ident).await; + + let mut new_properties: HashMap = HashMap::new(); + new_properties.insert("k".into(), "v".into()); + + catalog + .update_namespace(&namespace_ident, new_properties.clone()) + .await + .unwrap(); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, new_properties) + ) + } + + #[tokio::test] + async fn test_update_nested_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let mut new_properties = HashMap::new(); + new_properties.insert("k".into(), "v".into()); + + catalog + .update_namespace(&namespace_ident_a_b, new_properties.clone()) + .await + .unwrap(); + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b, new_properties) + ); + } + + #[tokio::test] + async fn test_update_deeply_nested_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces( + &catalog, + &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ], + ) + .await; + + let mut new_properties = HashMap::new(); + new_properties.insert("k".into(), "v".into()); + + catalog + .update_namespace(&namespace_ident_a_b_c, new_properties.clone()) + .await + .unwrap(); + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b_c, new_properties) + ); + } + + #[tokio::test] + async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() { + let catalog = new_memory_catalog(); + create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await; + + let non_existent_namespace_ident = NamespaceIdent::new("def".into()); + assert_eq!( + catalog + .update_namespace(&non_existent_namespace_ident, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + async fn test_drop_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("abc".into()); + create_namespace(&catalog, &namespace_ident).await; + + catalog.drop_namespace(&namespace_ident).await.unwrap(); + + assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap()) + } + + #[tokio::test] + async fn test_drop_nested_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + catalog.drop_namespace(&namespace_ident_a_b).await.unwrap(); + + assert!(!catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); + + assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap()); + } + + #[tokio::test] + async fn test_drop_deeply_nested_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces( + &catalog, + &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ], + ) + .await; + + catalog + .drop_namespace(&namespace_ident_a_b_c) + .await + .unwrap(); + + assert!(!catalog + .namespace_exists(&namespace_ident_a_b_c) + .await + .unwrap()); + + assert!(catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); + + assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap()); + } + + #[tokio::test] + async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() { + let catalog = new_memory_catalog(); + + let non_existent_namespace_ident = NamespaceIdent::new("abc".into()); + assert_eq!( + catalog + .drop_namespace(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() { + let catalog = new_memory_catalog(); + create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; + + let non_existent_namespace_ident = + NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap(); + assert_eq!( + catalog + .drop_namespace(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + catalog.drop_namespace(&namespace_ident_a).await.unwrap(); + + assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap()); + + assert!(!catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); + } + + #[tokio::test] + async fn test_create_table_with_location() { + let tmp_dir = TempDir::new().unwrap(); + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_name = "abc"; + let location = tmp_dir.path().to_str().unwrap().to_string(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + + assert_table_eq( + &catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(), + &expected_table_ident, + &simple_table_schema(), + ); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + + assert!(table + .metadata_location() + .unwrap() + .to_string() + .starts_with(&location)); + + assert_table_eq( + &catalog.load_table(&expected_table_ident).await.unwrap(), + &expected_table_ident, + &simple_table_schema(), + ) + } + + #[tokio::test] + async fn test_create_table_throws_error_if_table_with_same_name_already_exists() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_table(&catalog, &table_ident).await; + + let tmp_dir = TempDir::new().unwrap(); + let location = tmp_dir.path().to_str().unwrap().to_string(); + + assert_eq!( + catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + .location(location) + .build() + ) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Cannot create table {:?}. Table already exists.", + &table_ident + ) + ); + } + + #[tokio::test] + async fn test_list_tables_returns_empty_vector() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn test_list_tables_returns_a_single_table() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + assert_eq!( + catalog.list_tables(&namespace_ident).await.unwrap(), + vec![table_ident] + ); + } + + #[tokio::test] + async fn test_list_tables_returns_multiple_tables() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await; + + assert_eq!( + to_set(catalog.list_tables(&namespace_ident).await.unwrap()), + to_set(vec![table_ident_1, table_ident_2]) + ); + } + + #[tokio::test] + async fn test_list_tables_returns_tables_from_correct_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_1 = NamespaceIdent::new("n1".into()); + let namespace_ident_2 = NamespaceIdent::new("n2".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + + let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into()); + let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into()); + let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into()); + let _ = create_tables( + &catalog, + vec![&table_ident_1, &table_ident_2, &table_ident_3], + ) + .await; + + assert_eq!( + to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()), + to_set(vec![table_ident_1, table_ident_2]) + ); + + assert_eq!( + to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()), + to_set(vec![table_ident_3]) + ); + } + + #[tokio::test] + async fn test_list_tables_returns_table_under_nested_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + assert_eq!( + catalog.list_tables(&namespace_ident_a_b).await.unwrap(), + vec![table_ident] + ); + } + + #[tokio::test] + async fn test_list_tables_throws_error_if_namespace_doesnt_exist() { + let catalog = new_memory_catalog(); + + let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); + + assert_eq!( + catalog + .list_tables(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_drop_table() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + catalog.drop_table(&table_ident).await.unwrap(); + } + + #[tokio::test] + async fn test_drop_table_drops_table_under_nested_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + catalog.drop_table(&table_ident).await.unwrap(); + + assert_eq!( + catalog.list_tables(&namespace_ident_a_b).await.unwrap(), + vec![] + ); + } + + #[tokio::test] + async fn test_drop_table_throws_error_if_namespace_doesnt_exist() { + let catalog = new_memory_catalog(); + + let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); + let non_existent_table_ident = + TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into()); + + assert_eq!( + catalog + .drop_table(&non_existent_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_drop_table_throws_error_if_table_doesnt_exist() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + + let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + + assert_eq!( + catalog + .drop_table(&non_existent_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such table: {:?}", + non_existent_table_ident + ), + ); + } + + #[tokio::test] + async fn test_table_exists_returns_true() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + assert!(catalog.table_exists(&table_ident).await.unwrap()); + } + + #[tokio::test] + async fn test_table_exists_returns_false() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + + assert!(!catalog + .table_exists(&non_existent_table_ident) + .await + .unwrap()); + } + + #[tokio::test] + async fn test_table_exists_under_nested_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + assert!(catalog.table_exists(&table_ident).await.unwrap()); + + let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into()); + assert!(!catalog + .table_exists(&non_existent_table_ident) + .await + .unwrap()); + } + + #[tokio::test] + async fn test_table_exists_throws_error_if_namespace_doesnt_exist() { + let catalog = new_memory_catalog(); + + let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); + let non_existent_table_ident = + TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into()); + + assert_eq!( + catalog + .table_exists(&non_existent_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_rename_table_in_same_namespace() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + create_table(&catalog, &src_table_ident).await; + + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(); + + assert_eq!( + catalog.list_tables(&namespace_ident).await.unwrap(), + vec![dst_table_ident], + ); + } + + #[tokio::test] + async fn test_rename_table_across_namespaces() { + let catalog = new_memory_catalog(); + let src_namespace_ident = NamespaceIdent::new("a".into()); + let dst_namespace_ident = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await; + let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into()); + create_table(&catalog, &src_table_ident).await; + + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(); + + assert_eq!( + catalog.list_tables(&src_namespace_ident).await.unwrap(), + vec![], + ); + + assert_eq!( + catalog.list_tables(&dst_namespace_ident).await.unwrap(), + vec![dst_table_ident], + ); + } + + #[tokio::test] + async fn test_rename_table_src_table_is_same_as_dst_table() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into()); + create_table(&catalog, &table_ident).await; + + catalog + .rename_table(&table_ident, &table_ident) + .await + .unwrap(); + + assert_eq!( + catalog.list_tables(&namespace_ident).await.unwrap(), + vec![table_ident], + ); + } + + #[tokio::test] + async fn test_rename_table_across_nested_namespaces() { + let catalog = new_memory_catalog(); + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces( + &catalog, + &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ], + ) + .await; + + let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into()); + create_tables(&catalog, vec![&src_table_ident]).await; + + let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(); + + assert!(!catalog.table_exists(&src_table_ident).await.unwrap()); + + assert!(catalog.table_exists(&dst_table_ident).await.unwrap()); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() { + let catalog = new_memory_catalog(); + + let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into()); + let src_table_ident = + TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into()); + + let dst_namespace_ident = NamespaceIdent::new("n2".into()); + create_namespace(&catalog, &dst_namespace_ident).await; + let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into()); + + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_src_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() { + let catalog = new_memory_catalog(); + let src_namespace_ident = NamespaceIdent::new("n1".into()); + let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into()); + create_namespace(&catalog, &src_namespace_ident).await; + create_table(&catalog, &src_table_ident).await; + + let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into()); + let dst_table_ident = + TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into()); + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_dst_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_src_table_doesnt_exist() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!("Unexpected => No such table: {:?}", src_table_ident), + ); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_dst_table_already_exists() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await; + + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Cannot create table {:? }. Table already exists.", + &dst_table_ident + ), + ); + } +} diff --git a/crates/catalog/inmemory/src/lib.rs b/crates/catalog/inmemory/src/lib.rs new file mode 100644 index 000000000..8988ac7b2 --- /dev/null +++ b/crates/catalog/inmemory/src/lib.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg memory Catalog API implementation. + +#![deny(missing_docs)] + +mod catalog; +mod namespace_state; + +pub use catalog::*; diff --git a/crates/catalog/inmemory/src/namespace_state.rs b/crates/catalog/inmemory/src/namespace_state.rs new file mode 100644 index 000000000..875e0c7e4 --- /dev/null +++ b/crates/catalog/inmemory/src/namespace_state.rs @@ -0,0 +1,297 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent}; +use itertools::Itertools; +use std::collections::{hash_map, HashMap}; + +// Represents the state of a namespace +#[derive(Debug, Clone, Default)] +pub(crate) struct NamespaceState { + // Properties of this namespace + properties: HashMap, + // Namespaces nested inside this namespace + namespaces: HashMap, + // Mapping of tables to metadata locations in this namespace + table_metadata_locations: HashMap, +} + +fn no_such_namespace_err(namespace_ident: &NamespaceIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!("No such namespace: {:?}", namespace_ident), + )) +} + +fn no_such_table_err(table_ident: &TableIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!("No such table: {:?}", table_ident), + )) +} + +fn namespace_already_exists_err(namespace_ident: &NamespaceIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create namespace {:?}. Namespace already exists.", + namespace_ident + ), + )) +} + +fn table_already_exists_err(table_ident: &TableIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create table {:?}. Table already exists.", + table_ident + ), + )) +} + +impl NamespaceState { + // Returns the state of the given namespace or an error if doesn't exist + fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<&NamespaceState> { + let mut acc_name_parts = vec![]; + let mut namespace_state = self; + for next_name in namespace_ident.iter() { + acc_name_parts.push(next_name); + match namespace_state.namespaces.get(next_name) { + None => { + let namespace_ident = NamespaceIdent::from_strs(acc_name_parts)?; + return no_such_namespace_err(&namespace_ident); + } + Some(intermediate_namespace) => { + namespace_state = intermediate_namespace; + } + } + } + + Ok(namespace_state) + } + + // Returns the state of the given namespace or an error if doesn't exist + fn get_mut_namespace( + &mut self, + namespace_ident: &NamespaceIdent, + ) -> Result<&mut NamespaceState> { + let mut acc_name_parts = vec![]; + let mut namespace_state = self; + for next_name in namespace_ident.iter() { + acc_name_parts.push(next_name); + match namespace_state.namespaces.get_mut(next_name) { + None => { + let namespace_ident = NamespaceIdent::from_strs(acc_name_parts)?; + return no_such_namespace_err(&namespace_ident); + } + Some(intermediate_namespace) => { + namespace_state = intermediate_namespace; + } + } + } + + Ok(namespace_state) + } + + // Returns the state of the parent of the given namespace or an error if doesn't exist + fn get_mut_parent_namespace_of( + &mut self, + namespace_ident: &NamespaceIdent, + ) -> Result<(&mut NamespaceState, String)> { + match namespace_ident.split_last() { + None => Err(Error::new( + ErrorKind::DataInvalid, + "Namespace identifier can't be empty!", + )), + Some((child_namespace_name, parent_name_parts)) => { + let parent_namespace_state = if parent_name_parts.is_empty() { + Ok(self) + } else { + let parent_namespace_ident = NamespaceIdent::from_strs(parent_name_parts)?; + self.get_mut_namespace(&parent_namespace_ident) + }?; + + Ok((parent_namespace_state, child_namespace_name.clone())) + } + } + } + + // Returns any top-level namespaces + pub(crate) fn list_top_level_namespaces(&self) -> Vec<&String> { + self.namespaces.keys().collect_vec() + } + + // Returns any namespaces nested under the given namespace or an error if the given namespace does not exist + pub(crate) fn list_namespaces_under( + &self, + namespace_ident: &NamespaceIdent, + ) -> Result> { + let nested_namespace_names = self + .get_namespace(namespace_ident)? + .namespaces + .keys() + .collect_vec(); + + Ok(nested_namespace_names) + } + + // Returns true if the given namespace exists, otherwise false + pub(crate) fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> bool { + self.get_namespace(namespace_ident).is_ok() + } + + // Inserts the given namespace or returns an error if it already exists + pub(crate) fn insert_new_namespace( + &mut self, + namespace_ident: &NamespaceIdent, + properties: HashMap, + ) -> Result<()> { + let (parent_namespace_state, child_namespace_name) = + self.get_mut_parent_namespace_of(namespace_ident)?; + + match parent_namespace_state + .namespaces + .entry(child_namespace_name) + { + hash_map::Entry::Occupied(_) => namespace_already_exists_err(namespace_ident), + hash_map::Entry::Vacant(entry) => { + let _ = entry.insert(NamespaceState { + properties, + namespaces: HashMap::new(), + table_metadata_locations: HashMap::new(), + }); + + Ok(()) + } + } + } + + // Removes the given namespace or returns an error if doesn't exist + pub(crate) fn remove_existing_namespace( + &mut self, + namespace_ident: &NamespaceIdent, + ) -> Result<()> { + let (parent_namespace_state, child_namespace_name) = + self.get_mut_parent_namespace_of(namespace_ident)?; + + match parent_namespace_state + .namespaces + .remove(&child_namespace_name) + { + None => no_such_namespace_err(namespace_ident), + Some(_) => Ok(()), + } + } + + // Returns the properties of the given namespace or an error if doesn't exist + pub(crate) fn get_properties( + &self, + namespace_ident: &NamespaceIdent, + ) -> Result<&HashMap> { + let properties = &self.get_namespace(namespace_ident)?.properties; + + Ok(properties) + } + + // Returns the properties of this namespace or an error if doesn't exist + fn get_mut_properties( + &mut self, + namespace_ident: &NamespaceIdent, + ) -> Result<&mut HashMap> { + let properties = &mut self.get_mut_namespace(namespace_ident)?.properties; + + Ok(properties) + } + + // Replaces the properties of the given namespace or an error if doesn't exist + pub(crate) fn replace_properties( + &mut self, + namespace_ident: &NamespaceIdent, + new_properties: HashMap, + ) -> Result<()> { + let properties = self.get_mut_properties(namespace_ident)?; + *properties = new_properties; + + Ok(()) + } + + // Returns the list of table names under the given namespace + pub(crate) fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result> { + let table_names = self + .get_namespace(namespace_ident)? + .table_metadata_locations + .keys() + .collect_vec(); + + Ok(table_names) + } + + // Returns true if the given table exists, otherwise false + pub(crate) fn table_exists(&self, table_ident: &TableIdent) -> Result { + let namespace_state = self.get_namespace(table_ident.namespace())?; + let table_exists = namespace_state + .table_metadata_locations + .contains_key(&table_ident.name); + + Ok(table_exists) + } + + // Returns the metadata location of the given table or an error if doesn't exist + pub(crate) fn get_existing_table_location(&self, table_ident: &TableIdent) -> Result<&String> { + let namespace = self.get_namespace(table_ident.namespace())?; + + match namespace.table_metadata_locations.get(table_ident.name()) { + None => no_such_table_err(table_ident), + Some(table_metadadata_location) => Ok(table_metadadata_location), + } + } + + // Inserts the given table or returns an error if it already exists + pub(crate) fn insert_new_table( + &mut self, + table_ident: &TableIdent, + metadata_location: String, + ) -> Result<()> { + let namespace = self.get_mut_namespace(table_ident.namespace())?; + + match namespace + .table_metadata_locations + .entry(table_ident.name().to_string()) + { + hash_map::Entry::Occupied(_) => table_already_exists_err(table_ident), + hash_map::Entry::Vacant(entry) => { + let _ = entry.insert(metadata_location); + + Ok(()) + } + } + } + + // Removes the given table or returns an error if doesn't exist + pub(crate) fn remove_existing_table(&mut self, table_ident: &TableIdent) -> Result<()> { + let namespace = self.get_mut_namespace(table_ident.namespace())?; + + match namespace + .table_metadata_locations + .remove(table_ident.name()) + { + None => no_such_table_err(table_ident), + Some(_) => Ok(()), + } + } +} diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 4d7a47ac7..b49a80c84 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -35,7 +35,7 @@ use uuid::Uuid; /// The catalog API for Iceberg Rust. #[async_trait] pub trait Catalog: Debug + Sync + Send { - /// List namespaces from table. + /// List namespaces inside the catalog. async fn list_namespaces(&self, parent: Option<&NamespaceIdent>) -> Result>;