Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fixed config template rendering for ops defined in triggers #427

Merged
merged 2 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ crates/wick-rpc/src/generated/wick.rs
crates/**/src/generated/mod.rs
/crates/integration/*/build
docs/static/rustdoc/
/dump

# Test & scratch files
.wick-test.sqlite3.db
Expand Down
6 changes: 3 additions & 3 deletions crates/components/wick-http-client/src/component.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use flow_component::{BoxFuture, Component, ComponentError, RuntimeCallback};
use flow_component::{BoxFuture, Component, ComponentError, IntoComponentResult, RuntimeCallback};
use futures::{Stream, StreamExt, TryStreamExt};
use reqwest::header::CONTENT_TYPE;
use reqwest::{ClientBuilder, Method, Request, RequestBuilder};
Expand Down Expand Up @@ -44,8 +44,8 @@ impl HttpClientComponent {
validate(&config, resolver)?;
let addr: UrlResource = resolver(config.resource())
.and_then(|r| r.try_resource())
.map_err(ComponentError::new)?
.into();
.and_then(|r| r.try_url())
.into_component_error()?;

let mut sig = ComponentSignature::new("wick/component/http");
sig.metadata.version = metadata.map(|v| v.version().to_owned());
Expand Down
2 changes: 1 addition & 1 deletion crates/components/wick-sql/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub(crate) fn gen_signature(
pub(crate) fn convert_url_resource(resolver: &Resolver, id: &str) -> Result<Url> {
let addr = resolver(id).and_then(|r| r.try_resource())?;

let resource: UrlResource = addr.into();
let resource: UrlResource = addr.try_into()?;
resource.url().value().cloned().ok_or(Error::InvalidResourceConfig)
}

Expand Down
38 changes: 20 additions & 18 deletions crates/components/wick-sql/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use once_cell::sync::Lazy;
use regex::{Captures, Regex};
use tracing::Span;
use url::Url;
use wick_config::config::components::{ComponentConfig, OperationConfig, SqlComponentConfig, SqlOperationKind};
use wick_config::config::components::{ComponentConfig, OperationConfig, SqlComponentConfig, SqlOperationDefinition};
use wick_config::config::{ErrorBehavior, Metadata};
use wick_config::Resolver;
use wick_interface_types::{ComponentSignature, Field, OperationSignatures, Type};
Expand Down Expand Up @@ -228,7 +228,7 @@ fn validate(config: &SqlComponentConfig, _resolver: &Resolver) -> Result<(), Err

async fn handle_call<'a, 'b, 'c>(
connection: &'a mut Connection<'c>,
opdef: SqlOperationKind,
opdef: SqlOperationDefinition,
input_streams: Vec<PacketStream>,
tx: PacketSender,
stmt: &'b str,
Expand All @@ -255,7 +255,7 @@ where

async fn handle_stream<'a, 'b, 'c>(
connection: &'a mut Connection<'c>,
opdef: SqlOperationKind,
opdef: SqlOperationDefinition,
mut input_streams: Vec<PacketStream>,
tx: PacketSender,
stmt: &'b str,
Expand Down Expand Up @@ -304,10 +304,12 @@ where

let start = SystemTime::now();
let result = match &opdef {
SqlOperationKind::Query(_) => {
SqlOperationDefinition::Query(_) => {
query(connection, tx.clone(), opdef.clone(), type_wrappers, stmt, span.clone()).await
}
SqlOperationKind::Exec(_) => exec(connection, tx.clone(), opdef.clone(), type_wrappers, stmt, span.clone()).await,
SqlOperationDefinition::Exec(_) => {
exec(connection, tx.clone(), opdef.clone(), type_wrappers, stmt, span.clone()).await
}
};
let duration = SystemTime::now().duration_since(start).unwrap();

Expand All @@ -331,7 +333,7 @@ where
async fn query<'a, 'b, 'c>(
client: &'a mut Connection<'c>,
tx: PacketSender,
def: SqlOperationKind,
def: SqlOperationDefinition,
args: Vec<(Type, Packet)>,
stmt: &'b str,
_span: Span,
Expand Down Expand Up @@ -360,7 +362,7 @@ where
async fn exec<'a, 'b, 'c>(
connection: &'a mut Connection<'c>,
tx: PacketSender,
def: SqlOperationKind,
def: SqlOperationDefinition,
args: Vec<(Type, Packet)>,
stmt: &'b str,
_span: Span,
Expand All @@ -386,10 +388,10 @@ where
static POSITIONAL_ARGS: Lazy<Regex> = Lazy::new(|| Regex::new(r"\$(?<id>\d+)\b").unwrap());
static WICK_ID_ARGS: Lazy<Regex> = Lazy::new(|| Regex::new(r"\$\{(?<id>\w+)\}").unwrap());

fn normalize_operations(ops: &mut Vec<SqlOperationKind>, db: DbKind) {
fn normalize_operations(ops: &mut Vec<SqlOperationDefinition>, db: DbKind) {
for operations in ops {
match operations {
wick_config::config::components::SqlOperationKind::Query(ref mut op) => {
wick_config::config::components::SqlOperationDefinition::Query(ref mut op) => {
let (mut query, args) = normalize_inline_ids(op.query(), op.arguments().to_vec());
if db == DbKind::Mssql {
query = normalize_mssql_query(query);
Expand All @@ -398,7 +400,7 @@ fn normalize_operations(ops: &mut Vec<SqlOperationKind>, db: DbKind) {
op.set_query(query);
op.set_arguments(args);
}
wick_config::config::components::SqlOperationKind::Exec(ref mut op) => {
wick_config::config::components::SqlOperationDefinition::Exec(ref mut op) => {
let (mut query, args) = normalize_inline_ids(op.exec(), op.arguments().to_vec());
if db == DbKind::Mssql {
query = normalize_mssql_query(query);
Expand Down Expand Up @@ -500,8 +502,8 @@ mod integration_test {
use wick_config::config::components::{
ComponentConfig,
SqlComponentConfigBuilder,
SqlOperationDefinitionBuilder,
SqlOperationKind,
SqlOperationDefinition,
SqlQueryOperationDefinitionBuilder,
};
use wick_config::config::ResourceDefinition;
use wick_interface_types::{Field, Type};
Expand All @@ -522,7 +524,7 @@ mod integration_test {
.tls(false)
.build()
.unwrap();
let op = SqlOperationDefinitionBuilder::default()
let op = SqlQueryOperationDefinitionBuilder::default()
.name("test")
.query("select id,name from users where id=$1;")
.inputs([Field::new("input", Type::I32)])
Expand All @@ -531,7 +533,7 @@ mod integration_test {
.build()
.unwrap();

config.operations_mut().push(SqlOperationKind::Query(op));
config.operations_mut().push(SqlOperationDefinition::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource(
"db",
Expand Down Expand Up @@ -578,7 +580,7 @@ mod integration_test {
.tls(false)
.build()
.unwrap();
let op = SqlOperationDefinitionBuilder::default()
let op = SqlQueryOperationDefinitionBuilder::default()
.name("test")
.query("select id,name from users where id=$1;")
.inputs([Field::new("input", Type::I32)])
Expand All @@ -587,7 +589,7 @@ mod integration_test {
.build()
.unwrap();

config.operations_mut().push(SqlOperationKind::Query(op));
config.operations_mut().push(SqlOperationDefinition::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource(
"db",
Expand Down Expand Up @@ -629,7 +631,7 @@ mod integration_test {
.tls(false)
.build()
.unwrap();
let op = SqlOperationDefinitionBuilder::default()
let op = SqlQueryOperationDefinitionBuilder::default()
.name("test")
.query("select id,name from users where id=$1;")
.inputs([Field::new("input", Type::I32)])
Expand All @@ -638,7 +640,7 @@ mod integration_test {
.build()
.unwrap();

config.operations_mut().push(SqlOperationKind::Query(op));
config.operations_mut().push(SqlOperationDefinition::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource(
"db",
Expand Down
10 changes: 7 additions & 3 deletions crates/components/wick-sql/src/mssql_tiberius/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,11 @@ fn row_to_json(row: &Row) -> Value {
#[cfg(test)]
mod test {
use anyhow::Result;
use wick_config::config::components::{SqlComponentConfigBuilder, SqlOperationDefinitionBuilder, SqlOperationKind};
use wick_config::config::components::{
SqlComponentConfigBuilder,
SqlOperationDefinition,
SqlQueryOperationDefinitionBuilder,
};
use wick_config::config::{ResourceDefinition, TcpPort};
use wick_interface_types::{Field, Type};

Expand All @@ -234,7 +238,7 @@ mod test {
.tls(false)
.build()
.unwrap();
let op = SqlOperationDefinitionBuilder::default()
let op = SqlQueryOperationDefinitionBuilder::default()
.name("test")
.query("select * from users where user_id = $1;")
.inputs([Field::new("input", Type::I32)])
Expand All @@ -243,7 +247,7 @@ mod test {
.build()
.unwrap();

config.operations_mut().push(SqlOperationKind::Query(op));
config.operations_mut().push(SqlOperationDefinition::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource("db", ResourceDefinition::TcpPort(TcpPort::new("0.0.0.0", 11111)));

Expand Down
10 changes: 7 additions & 3 deletions crates/components/wick-sql/src/sqlx/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,11 @@ async fn init_context(config: &SqlComponentConfig, addr: &Url) -> Result<Context
#[cfg(test)]
mod test {
use anyhow::Result;
use wick_config::config::components::{SqlComponentConfigBuilder, SqlOperationDefinitionBuilder, SqlOperationKind};
use wick_config::config::components::{
SqlComponentConfigBuilder,
SqlOperationDefinition,
SqlQueryOperationDefinitionBuilder,
};
use wick_config::config::{ResourceDefinition, TcpPort};
use wick_interface_types::{Field, Type};

Expand All @@ -240,7 +244,7 @@ mod test {
.tls(false)
.build()
.unwrap();
let op = SqlOperationDefinitionBuilder::default()
let op = SqlQueryOperationDefinitionBuilder::default()
.name("test")
.query("select * from users where user_id = $1;")
.inputs([Field::new("input", Type::I32)])
Expand All @@ -249,7 +253,7 @@ mod test {
.build()
.unwrap();

config.operations_mut().push(SqlOperationKind::Query(op));
config.operations_mut().push(SqlOperationDefinition::Query(op));
let mut app_config = wick_config::config::AppConfiguration::default();
app_config.add_resource("db", ResourceDefinition::TcpPort(TcpPort::new("0.0.0.0", 11111)));

Expand Down
19 changes: 19 additions & 0 deletions crates/wick/flow-component/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub use serde_json::Value;
pub struct ComponentError {
source: Box<dyn std::error::Error + Send + Sync>,
}

impl std::error::Error for ComponentError {}
impl std::fmt::Display for ComponentError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down Expand Up @@ -142,6 +143,24 @@ impl From<anyhow::Error> for ComponentError {
}
}

/// Trait that allows for conversion of a result into a component error.
pub trait IntoComponentResult<T, E>
where
E: std::error::Error + Send + Sync + 'static,
{
/// Convert a Result<T,E> into a Result<T, ComponentError>.
fn into_component_error(self) -> Result<T, ComponentError>;
}

impl<T, E> IntoComponentResult<T, E> for Result<T, E>
where
E: std::error::Error + Send + Sync + 'static,
{
fn into_component_error(self) -> Result<T, ComponentError> {
self.map_err(ComponentError::new)
}
}

#[derive(Debug)]
struct GenericError(String);
impl std::error::Error for GenericError {}
Expand Down
91 changes: 81 additions & 10 deletions crates/wick/wick-config/src/config/app_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,27 +214,25 @@ impl AppConfiguration {
pub(super) fn initialize(&mut self) -> Result<&Self> {
// This pre-renders the application config's resources with access to the environment
// so they're resulting value is intuitively based on where it was initially defined.
let root_config = self.root_config.clone();
let root_config = self.root_config.as_ref();
trace!(
num_resources = self.resources.len(),
num_imports = self.import.len(),
?root_config,
"initializing app resources"
);
let env = self.env.clone();
let env = self.env.as_ref();

let mut bindings = Vec::new();
for (i, trigger) in self.triggers.iter_mut().enumerate() {
trigger.expand_imports(&mut bindings, i)?;
}
self.import.extend(bindings);

for resource in self.resources.iter_mut() {
resource.kind.render_config(root_config.as_ref(), env.as_ref())?;
}
for import in self.import.iter_mut() {
import.kind.render_config(root_config.as_ref(), env.as_ref())?;
}
self.resources.render_config(root_config, env)?;
self.import.render_config(root_config, env)?;
self.triggers.render_config(root_config, env)?;

Ok(self)
}

Expand Down Expand Up @@ -334,9 +332,82 @@ impl OwnedConfigurationItem {

impl AppConfigurationBuilder {
/// Build the configuration.
pub fn build(self) -> Result<AppConfiguration> {
let config = self.build_internal()?;
pub fn build(&self) -> Result<AppConfiguration> {
let config = self.clone();
let config = config.build_internal()?;
config.validate()?;
Ok(config)
}
}

#[cfg(test)]
mod test {
use anyhow::Result;
use serde_json::json;

use super::*;
use crate::config::components::ManifestComponentBuilder;
use crate::config::{Codec, ComponentOperationExpressionBuilder, LiquidJsonConfig, MiddlewareBuilder};

#[test]
fn test_trigger_render() -> Result<()> {
let op = ComponentOperationExpressionBuilder::default()
.component(ComponentDefinition::Manifest(
ManifestComponentBuilder::default()
.reference("this/that:0.0.1")
.build()?,
))
.config(LiquidJsonConfig::try_from(
json!({"op_config_field": "{{ctx.env.CARGO_MANIFEST_DIR}}"}),
)?)
.name("test")
.build()?;
let trigger = HttpTriggerConfigBuilder::default()
.resource("URL")
.routers(vec![HttpRouterConfig::RawRouter(RawRouterConfig {
path: "/".to_owned(),
middleware: Some(
MiddlewareBuilder::default()
.request(vec![op.clone()])
.response(vec![op.clone()])
.build()?,
),
codec: Some(Codec::Json),
operation: op,
})])
.build()?;
let mut config = AppConfigurationBuilder::default()
.name("test")
.resources(vec![ResourceBinding::new("PORT", TcpPort::new("0.0.0.0", 90))])
.triggers(vec![TriggerDefinition::Http(trigger)])
.build()?;

config.env = Some(std::env::vars().collect());
config.root_config = Some(json!({"config_val": "from_config"}).try_into()?);

config.initialize()?;

let TriggerDefinition::Http(mut trigger) = config.triggers.pop().unwrap() else {
unreachable!();
};

let HttpRouterConfig::RawRouter(mut router) = trigger.routers.pop().unwrap() else {
unreachable!();
};

let cargo_manifest_dir = json!(env!("CARGO_MANIFEST_DIR"));

let config = router.operation.config.take().unwrap().value.unwrap();
assert_eq!(config.get("op_config_field"), Some(&cargo_manifest_dir));

let mut mw = router.middleware.take().unwrap();

let mw_req_config = mw.request[0].config.take().unwrap().value.unwrap();
assert_eq!(mw_req_config.get("op_config_field"), Some(&cargo_manifest_dir));

let mw_res_config = mw.response[0].config.take().unwrap().value.unwrap();
assert_eq!(mw_res_config.get("op_config_field"), Some(&cargo_manifest_dir));

Ok(())
}
}
Loading