Skip to content

Commit

Permalink
Merge branch 'main' into ct/feat-statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
c-thiel authored Dec 15, 2024
2 parents 71051d2 + 54926a2 commit fc26394
Show file tree
Hide file tree
Showing 11 changed files with 657 additions and 99 deletions.
17 changes: 10 additions & 7 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,10 @@ impl RestCatalog {
async fn context(&self) -> Result<&RestContext> {
self.ctx
.get_or_try_init(|| async {
let catalog_config = RestCatalog::load_config(&self.user_config).await?;
let client = HttpClient::new(&self.user_config)?;
let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
let config = self.user_config.clone().merge_with_config(catalog_config);
let client = HttpClient::new(&config)?;
let client = client.update_with(&config)?;

Ok(RestContext { config, client })
})
Expand All @@ -268,9 +269,10 @@ impl RestCatalog {
/// Load the runtime config from the server by user_config.
///
/// It's required for a rest catalog to update it's config after creation.
async fn load_config(user_config: &RestCatalogConfig) -> Result<CatalogConfig> {
let client = HttpClient::new(user_config)?;

async fn load_config(
client: &HttpClient,
user_config: &RestCatalogConfig,
) -> Result<CatalogConfig> {
let mut request = client.request(Method::GET, user_config.config_endpoint());

if let Some(warehouse_location) = &user_config.warehouse {
Expand All @@ -280,6 +282,7 @@ impl RestCatalog {
let config = client
.query::<CatalogConfig, ErrorResponse, OK>(request.build()?)
.await?;

Ok(config)
}

Expand Down Expand Up @@ -777,7 +780,7 @@ mod tests {
"expires_in": 86400
}"#,
)
.expect(2)
.expect(1)
.create_async()
.await
}
Expand Down Expand Up @@ -831,7 +834,7 @@ mod tests {
"expires_in": 86400
}"#,
)
.expect(2)
.expect(1)
.create_async()
.await;

Expand Down
85 changes: 76 additions & 9 deletions crates/catalog/rest/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl Debug for HttpClient {
}

impl HttpClient {
/// Create a new http client.
pub fn new(cfg: &RestCatalogConfig) -> Result<Self> {
Ok(HttpClient {
client: Client::new(),
Expand All @@ -66,6 +67,32 @@ impl HttpClient {
})
}

/// Update the http client with new configuration.
///
/// If cfg carries new value, we will use cfg instead.
/// Otherwise, we will keep the old value.
pub fn update_with(self, cfg: &RestCatalogConfig) -> Result<Self> {
Ok(HttpClient {
client: self.client,

token: Mutex::new(
cfg.token()
.or_else(|| self.token.into_inner().ok().flatten()),
),
token_endpoint: (!cfg.get_token_endpoint().is_empty())
.then(|| cfg.get_token_endpoint())
.unwrap_or(self.token_endpoint),
credential: cfg.credential().or(self.credential),
extra_headers: (!cfg.extra_headers()?.is_empty())
.then(|| cfg.extra_headers())
.transpose()?
.unwrap_or(self.extra_headers),
extra_oauth_params: (!cfg.extra_oauth_params().is_empty())
.then(|| cfg.extra_oauth_params())
.unwrap_or(self.extra_oauth_params),
})
}

/// This API is testing only to assert the token.
#[cfg(test)]
pub(crate) async fn token(&self) -> Option<String> {
Expand Down Expand Up @@ -134,28 +161,39 @@ impl HttpClient {
.request(Method::POST, &self.token_endpoint)
.form(&params)
.build()?;
let auth_url = auth_req.url().clone();
let auth_resp = self.client.execute(auth_req).await?;

let auth_res: TokenResponse = if auth_resp.status().as_u16() == OK {
let text = auth_resp.bytes().await?;
let text = auth_resp
.bytes()
.await
.map_err(|err| err.with_url(auth_url.clone()))?;
Ok(serde_json::from_slice(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("operation", "auth")
.with_context("url", auth_url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?)
} else {
let code = auth_resp.status();
let text = auth_resp.bytes().await?;
let text = auth_resp
.bytes()
.await
.map_err(|err| err.with_url(auth_url.clone()))?;
let e: ErrorResponse = serde_json::from_slice(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("json", String::from_utf8_lossy(&text))
.with_context("code", code.to_string())
.with_context("operation", "auth")
.with_context("url", auth_url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?;
Err(Error::from(e))
Expand Down Expand Up @@ -193,28 +231,41 @@ impl HttpClient {
) -> Result<R> {
self.authenticate(&mut request).await?;

let method = request.method().clone();
let url = request.url().clone();

let resp = self.client.execute(request).await?;

if resp.status().as_u16() == SUCCESS_CODE {
let text = resp.bytes().await?;
let text = resp
.bytes()
.await
.map_err(|err| err.with_url(url.clone()))?;
Ok(serde_json::from_slice::<R>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("method", method.to_string())
.with_context("url", url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?)
} else {
let code = resp.status();
let text = resp.bytes().await?;
let text = resp
.bytes()
.await
.map_err(|err| err.with_url(url.clone()))?;
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("json", String::from_utf8_lossy(&text))
.with_context("code", code.to_string())
.with_context("method", method.to_string())
.with_context("url", url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?;
Err(e.into())
Expand All @@ -227,20 +278,28 @@ impl HttpClient {
) -> Result<()> {
self.authenticate(&mut request).await?;

let method = request.method().clone();
let url = request.url().clone();

let resp = self.client.execute(request).await?;

if resp.status().as_u16() == SUCCESS_CODE {
Ok(())
} else {
let code = resp.status();
let text = resp.bytes().await?;
let text = resp
.bytes()
.await
.map_err(|err| err.with_url(url.clone()))?;
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("json", String::from_utf8_lossy(&text))
.with_context("code", code.to_string())
.with_context("method", method.to_string())
.with_context("url", url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?;
Err(e.into())
Expand All @@ -255,19 +314,27 @@ impl HttpClient {
) -> Result<R> {
self.authenticate(&mut request).await?;

let method = request.method().clone();
let url = request.url().clone();

let resp = self.client.execute(request).await?;

if let Some(ret) = handler(&resp) {
Ok(ret)
} else {
let code = resp.status();
let text = resp.bytes().await?;
let text = resp
.bytes()
.await
.map_err(|err| err.with_url(url.clone()))?;
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to parse response from rest catalog server!",
)
.with_context("code", code.to_string())
.with_context("method", method.to_string())
.with_context("url", url.to_string())
.with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?;
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/expr/visitors/expression_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ mod tests {
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: vec![],
key_metadata: None,
split_offsets: vec![],
equality_ids: vec![],
sort_order_id: None,
Expand All @@ -361,7 +361,7 @@ mod tests {
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: vec![],
key_metadata: None,
split_offsets: vec![],
equality_ids: vec![],
sort_order_id: None,
Expand Down
12 changes: 6 additions & 6 deletions crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1991,7 +1991,7 @@ mod test {
nan_value_counts: Default::default(),
lower_bounds: Default::default(),
upper_bounds: Default::default(),
key_metadata: vec![],
key_metadata: None,
split_offsets: vec![],
equality_ids: vec![],
sort_order_id: None,
Expand All @@ -2012,7 +2012,7 @@ mod test {
nan_value_counts: Default::default(),
lower_bounds: Default::default(),
upper_bounds: Default::default(),
key_metadata: vec![],
key_metadata: None,
split_offsets: vec![],
equality_ids: vec![],
sort_order_id: None,
Expand Down Expand Up @@ -2069,7 +2069,7 @@ mod test {
]),

column_sizes: Default::default(),
key_metadata: vec![],
key_metadata: None,
split_offsets: vec![],
equality_ids: vec![],
sort_order_id: None,
Expand All @@ -2095,7 +2095,7 @@ mod test {
upper_bounds: HashMap::from([(3, Datum::string("dC"))]),

column_sizes: Default::default(),
key_metadata: vec![],
key_metadata: None,
split_offsets: vec![],
equality_ids: vec![],
sort_order_id: None,
Expand All @@ -2122,7 +2122,7 @@ mod test {
upper_bounds: HashMap::from([(3, Datum::string("3str3"))]),

column_sizes: Default::default(),
key_metadata: vec![],
key_metadata: None,
split_offsets: vec![],
equality_ids: vec![],
sort_order_id: None,
Expand All @@ -2149,7 +2149,7 @@ mod test {
upper_bounds: HashMap::from([(3, Datum::string("イロハニホヘト"))]),

column_sizes: Default::default(),
key_metadata: vec![],
key_metadata: None,
split_offsets: vec![],
equality_ids: vec![],
sort_order_id: None,
Expand Down
11 changes: 10 additions & 1 deletion crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,15 @@ impl ObjectCache {
.entry_by_ref(&key)
.or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot, table_metadata))
.await
.map_err(|err| Error::new(ErrorKind::Unexpected, err.as_ref().message()))?
.map_err(|err| {
Arc::try_unwrap(err).unwrap_or_else(|err| {
Error::new(
ErrorKind::Unexpected,
"Failed to load manifest list in cache",
)
.with_source(err)
})
})?
.into_value();

match cache_entry {
Expand Down Expand Up @@ -278,6 +286,7 @@ mod tests {
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.key_metadata(None)
.build()
.unwrap(),
)
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,7 @@ mod tests {
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.key_metadata(None)
.build()
.unwrap(),
)
Expand Down
Loading

0 comments on commit fc26394

Please # to comment.