Skip to content

Implement TPCH substrait integration teset, support tpch_3 #11298

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 1 commit into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 48 additions & 7 deletions datafusion/substrait/tests/cases/consumer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ mod tests {
.await
}

async fn create_context_tpch1() -> Result<SessionContext> {
let ctx = SessionContext::new();
register_csv(
&ctx,
"FILENAME_PLACEHOLDER_0",
"tests/testdata/tpch/lineitem.csv",
)
.await?;
Ok(ctx)
}

async fn create_context_tpch2() -> Result<SessionContext> {
let ctx = SessionContext::new();

Expand All @@ -63,14 +74,19 @@ mod tests {
Ok(ctx)
}

async fn create_context_tpch1() -> Result<SessionContext> {
async fn create_context_tpch3() -> Result<SessionContext> {
let ctx = SessionContext::new();
register_csv(
&ctx,
"FILENAME_PLACEHOLDER_0",
"tests/testdata/tpch/lineitem.csv",
)
.await?;

let registrations = vec![
("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/customer.csv"),
("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/orders.csv"),
("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/lineitem.csv"),
];

for (table_name, file_path) in registrations {
register_csv(&ctx, table_name, file_path).await?;
}

Ok(ctx)
}

Expand Down Expand Up @@ -139,4 +155,29 @@ mod tests {
);
Ok(())
}

#[tokio::test]
async fn tpch_test_3() -> Result<()> {
let ctx = create_context_tpch3().await?;
let path = "tests/testdata/tpch_substrait_plans/query_3.json";
let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
File::open(path).expect("file not found"),
))
.expect("failed to parse json");

let plan = from_substrait_plan(&ctx, &proto).await?;
let plan_str = format!("{:?}", plan);
assert_eq!(plan_str, "Projection: FILENAME_PLACEHOLDER_2.l_orderkey AS L_ORDERKEY, sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_2.l_discount) AS REVENUE, FILENAME_PLACEHOLDER_1.o_orderdate AS O_ORDERDATE, FILENAME_PLACEHOLDER_1.o_shippriority AS O_SHIPPRIORITY\
\n Limit: skip=0, fetch=10\
\n Sort: sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_2.l_discount) DESC NULLS FIRST, FILENAME_PLACEHOLDER_1.o_orderdate ASC NULLS LAST\
\n Projection: FILENAME_PLACEHOLDER_2.l_orderkey, sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_2.l_discount), FILENAME_PLACEHOLDER_1.o_orderdate, FILENAME_PLACEHOLDER_1.o_shippriority\
\n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_2.l_orderkey, FILENAME_PLACEHOLDER_1.o_orderdate, FILENAME_PLACEHOLDER_1.o_shippriority]], aggr=[[sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_2.l_discount)]]\
\n Projection: FILENAME_PLACEHOLDER_2.l_orderkey, FILENAME_PLACEHOLDER_1.o_orderdate, FILENAME_PLACEHOLDER_1.o_shippriority, FILENAME_PLACEHOLDER_2.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_2.l_discount)\
\n Filter: FILENAME_PLACEHOLDER_0.c_mktsegment = CAST(Utf8(\"HOUSEHOLD\") AS Utf8) AND FILENAME_PLACEHOLDER_0.c_custkey = FILENAME_PLACEHOLDER_1.o_custkey AND FILENAME_PLACEHOLDER_2.l_orderkey = FILENAME_PLACEHOLDER_1.o_orderkey AND FILENAME_PLACEHOLDER_1.o_orderdate < Date32(\"1995-03-25\") AND FILENAME_PLACEHOLDER_2.l_shipdate > Date32(\"1995-03-25\")\
\n Inner Join: Filter: Boolean(true)\
\n Inner Join: Filter: Boolean(true)\
\n TableScan: FILENAME_PLACEHOLDER_0 projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment]\
\n TableScan: FILENAME_PLACEHOLDER_1 projection=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment]\n TableScan: FILENAME_PLACEHOLDER_2 projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]");
Ok(())
}
}
2 changes: 2 additions & 0 deletions datafusion/substrait/tests/testdata/tpch/customer.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment
1,Customer#000000001,Address1,1,123-456-7890,5000.00,BUILDING,No comment
2 changes: 2 additions & 0 deletions datafusion/substrait/tests/testdata/tpch/orders.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment
1,1,O,1000.00,2023-01-01,5-LOW,Clerk#000000001,0,No comment
Loading