Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into ct/feat-statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
c-thiel committed Dec 16, 2024
2 parents 71051d2 + 97f8a79 commit f1c166c
Show file tree
Hide file tree
Showing 20 changed files with 1,087 additions and 573 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci_typos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Check typos
uses: crate-ci/typos@v1.28.2
uses: crate-ci/typos@v1.28.3
6 changes: 3 additions & 3 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ mod tests {
use std::iter::FromIterator;

use iceberg::io::FileIOBuilder;
use iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type};
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use regex::Regex;
use tempfile::TempDir;

Expand Down Expand Up @@ -357,7 +357,7 @@ mod tests {

assert_eq!(metadata.current_schema().as_ref(), expected_schema);

let expected_partition_spec = BoundPartitionSpec::builder((*expected_schema).clone())
let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
.with_spec_id(0)
.build()
.unwrap();
Expand All @@ -367,7 +367,7 @@ mod tests {
.partition_specs_iter()
.map(|p| p.as_ref())
.collect_vec(),
vec![&expected_partition_spec.into_schemaless()]
vec![&expected_partition_spec]
);

let expected_sorted_order = SortOrder::builder()
Expand Down
17 changes: 10 additions & 7 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,10 @@ impl RestCatalog {
async fn context(&self) -> Result<&RestContext> {
self.ctx
.get_or_try_init(|| async {
let catalog_config = RestCatalog::load_config(&self.user_config).await?;
let client = HttpClient::new(&self.user_config)?;
let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
let config = self.user_config.clone().merge_with_config(catalog_config);
let client = HttpClient::new(&config)?;
let client = client.update_with(&config)?;

Ok(RestContext { config, client })
})
Expand All @@ -268,9 +269,10 @@ impl RestCatalog {
/// Load the runtime config from the server by user_config.
///
/// It's required for a rest catalog to update it's config after creation.
async fn load_config(user_config: &RestCatalogConfig) -> Result<CatalogConfig> {
let client = HttpClient::new(user_config)?;

async fn load_config(
client: &HttpClient,
user_config: &RestCatalogConfig,
) -> Result<CatalogConfig> {
let mut request = client.request(Method::GET, user_config.config_endpoint());

if let Some(warehouse_location) = &user_config.warehouse {
Expand All @@ -280,6 +282,7 @@ impl RestCatalog {
let config = client
.query::<CatalogConfig, ErrorResponse, OK>(request.build()?)
.await?;

Ok(config)
}

Expand Down Expand Up @@ -777,7 +780,7 @@ mod tests {
"expires_in": 86400
}"#,
)
.expect(2)
.expect(1)
.create_async()
.await
}
Expand Down Expand Up @@ -831,7 +834,7 @@ mod tests {
"expires_in": 86400
}"#,
)
.expect(2)
.expect(1)
.create_async()
.await;

Expand Down
85 changes: 76 additions & 9 deletions crates/catalog/rest/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl Debug for HttpClient {
}

impl HttpClient {
/// Create a new http client.
pub fn new(cfg: &RestCatalogConfig) -> Result<Self> {
Ok(HttpClient {
client: Client::new(),
Expand All @@ -66,6 +67,32 @@ impl HttpClient {
})
}

/// Update the http client with new configuration.
///
/// If cfg carries new value, we will use cfg instead.
/// Otherwise, we will keep the old value.
pub fn update_with(self, cfg: &RestCatalogConfig) -> Result<Self> {
Ok(HttpClient {
client: self.client,

token: Mutex::new(
cfg.token()
.or_else(|| self.token.into_inner().ok().flatten()),
),
token_endpoint: (!cfg.get_token_endpoint().is_empty())
.then(|| cfg.get_token_endpoint())
.unwrap_or(self.token_endpoint),
credential: cfg.credential().or(self.credential),
extra_headers: (!cfg.extra_headers()?.is_empty())
.then(|| cfg.extra_headers())
.transpose()?
.unwrap_or(self.extra_headers),
extra_oauth_params: (!cfg.extra_oauth_params().is_empty())
.then(|| cfg.extra_oauth_params())
.unwrap_or(self.extra_oauth_params),
})
}

/// This API is testing only to assert the token.
#[cfg(test)]
pub(crate) async fn token(&self) -> Option<String> {
Expand Down Expand Up @@ -134,28 +161,39 @@ impl HttpClient {
.request(Method::POST, &self.token_endpoint)
.form(&params)
.build()?;
let auth_url = auth_req.url().clone();
let auth_resp = self.client.execute(auth_req).await?;

let auth_res: TokenResponse = if auth_resp.status().as_u16() == OK {
let text = auth_resp.bytes().await?;
let text = auth_resp
.bytes()
.await
.map_err(|err| err.with_url(auth_url.clone()))?;
Ok(serde_json::from_slice(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("operation", "auth")
.with_context("url", auth_url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?)
} else {
let code = auth_resp.status();
let text = auth_resp.bytes().await?;
let text = auth_resp
.bytes()
.await
.map_err(|err| err.with_url(auth_url.clone()))?;
let e: ErrorResponse = serde_json::from_slice(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("json", String::from_utf8_lossy(&text))
.with_context("code", code.to_string())
.with_context("operation", "auth")
.with_context("url", auth_url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?;
Err(Error::from(e))
Expand Down Expand Up @@ -193,28 +231,41 @@ impl HttpClient {
) -> Result<R> {
self.authenticate(&mut request).await?;

let method = request.method().clone();
let url = request.url().clone();

let resp = self.client.execute(request).await?;

if resp.status().as_u16() == SUCCESS_CODE {
let text = resp.bytes().await?;
let text = resp
.bytes()
.await
.map_err(|err| err.with_url(url.clone()))?;
Ok(serde_json::from_slice::<R>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("method", method.to_string())
.with_context("url", url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?)
} else {
let code = resp.status();
let text = resp.bytes().await?;
let text = resp
.bytes()
.await
.map_err(|err| err.with_url(url.clone()))?;
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("json", String::from_utf8_lossy(&text))
.with_context("code", code.to_string())
.with_context("method", method.to_string())
.with_context("url", url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?;
Err(e.into())
Expand All @@ -227,20 +278,28 @@ impl HttpClient {
) -> Result<()> {
self.authenticate(&mut request).await?;

let method = request.method().clone();
let url = request.url().clone();

let resp = self.client.execute(request).await?;

if resp.status().as_u16() == SUCCESS_CODE {
Ok(())
} else {
let code = resp.status();
let text = resp.bytes().await?;
let text = resp
.bytes()
.await
.map_err(|err| err.with_url(url.clone()))?;
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("json", String::from_utf8_lossy(&text))
.with_context("code", code.to_string())
.with_context("method", method.to_string())
.with_context("url", url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?;
Err(e.into())
Expand All @@ -255,19 +314,27 @@ impl HttpClient {
) -> Result<R> {
self.authenticate(&mut request).await?;

let method = request.method().clone();
let url = request.url().clone();

let resp = self.client.execute(request).await?;

if let Some(ret) = handler(&resp) {
Ok(ret)
} else {
let code = resp.status();
let text = resp.bytes().await?;
let text = resp
.bytes()
.await
.map_err(|err| err.with_url(url.clone()))?;
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("code", code.to_string())
.with_context("method", method.to_string())
.with_context("url", url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?;
Expand Down
7 changes: 3 additions & 4 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ mod tests {
use std::hash::Hash;

use iceberg::io::FileIOBuilder;
use iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type};
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use iceberg::table::Table;
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
Expand Down Expand Up @@ -876,11 +876,10 @@ mod tests {

assert_eq!(metadata.current_schema().as_ref(), expected_schema);

let expected_partition_spec = BoundPartitionSpec::builder(expected_schema.clone())
let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
.with_spec_id(0)
.build()
.unwrap()
.into_schemaless();
.unwrap();

assert_eq!(
metadata
Expand Down
Loading

0 comments on commit f1c166c

Please # to comment.