Skip to content

Commit fd092e0

Browse files
authored
Move filtered SMJ Full filtered join out of join_partial phase (#13369)
* Move filtered SMJ Full filtered join out of `join_partial` phase
* Move filtered SMJ Full filtered join out of `join_partial` phase
* Move filtered SMJ Full filtered join out of `join_partial` phase
1 parent 5467a28 commit fd092e0

File tree

3 files changed

+254
-158
lines changed

3 files changed

+254
-158
lines changed

datafusion/core/tests/fuzz_cases/join_fuzz.rs

Lines changed: 21 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ use datafusion::physical_plan::joins::{
4141
};
4242
use datafusion::physical_plan::memory::MemoryExec;
4343

44+
use crate::fuzz_cases::join_fuzz::JoinTestType::{HjSmj, NljHj};
4445
use datafusion::prelude::{SessionConfig, SessionContext};
4546
use test_utils::stagger_batch_with_seed;
4647

@@ -96,7 +97,7 @@ async fn test_inner_join_1k_filtered() {
9697
JoinType::Inner,
9798
Some(Box::new(col_lt_col_filter)),
9899
)
99-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
100+
.run_test(&[HjSmj, NljHj], false)
100101
.await
101102
}
102103

@@ -108,7 +109,7 @@ async fn test_inner_join_1k() {
108109
JoinType::Inner,
109110
None,
110111
)
111-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
112+
.run_test(&[HjSmj, NljHj], false)
112113
.await
113114
}
114115

@@ -120,7 +121,7 @@ async fn test_left_join_1k() {
120121
JoinType::Left,
121122
None,
122123
)
123-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
124+
.run_test(&[HjSmj, NljHj], false)
124125
.await
125126
}
126127

@@ -132,7 +133,7 @@ async fn test_left_join_1k_filtered() {
132133
JoinType::Left,
133134
Some(Box::new(col_lt_col_filter)),
134135
)
135-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
136+
.run_test(&[HjSmj, NljHj], false)
136137
.await
137138
}
138139

@@ -144,7 +145,7 @@ async fn test_right_join_1k() {
144145
JoinType::Right,
145146
None,
146147
)
147-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
148+
.run_test(&[HjSmj, NljHj], false)
148149
.await
149150
}
150151

@@ -156,7 +157,7 @@ async fn test_right_join_1k_filtered() {
156157
JoinType::Right,
157158
Some(Box::new(col_lt_col_filter)),
158159
)
159-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
160+
.run_test(&[HjSmj, NljHj], false)
160161
.await
161162
}
162163

@@ -168,21 +169,19 @@ async fn test_full_join_1k() {
168169
JoinType::Full,
169170
None,
170171
)
171-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
172+
.run_test(&[HjSmj, NljHj], false)
172173
.await
173174
}
174175

175176
#[tokio::test]
176-
// flaky for HjSmj case
177-
// https://github.com/apache/datafusion/issues/12359
178177
async fn test_full_join_1k_filtered() {
179178
JoinFuzzTestCase::new(
180179
make_staggered_batches(1000),
181180
make_staggered_batches(1000),
182181
JoinType::Full,
183182
Some(Box::new(col_lt_col_filter)),
184183
)
185-
.run_test(&[JoinTestType::NljHj], false)
184+
.run_test(&[NljHj, HjSmj], false)
186185
.await
187186
}
188187

@@ -194,7 +193,7 @@ async fn test_semi_join_1k() {
194193
JoinType::LeftSemi,
195194
None,
196195
)
197-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
196+
.run_test(&[HjSmj, NljHj], false)
198197
.await
199198
}
200199

@@ -206,7 +205,7 @@ async fn test_semi_join_1k_filtered() {
206205
JoinType::LeftSemi,
207206
Some(Box::new(col_lt_col_filter)),
208207
)
209-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
208+
.run_test(&[HjSmj, NljHj], false)
210209
.await
211210
}
212211

@@ -218,7 +217,7 @@ async fn test_anti_join_1k() {
218217
JoinType::LeftAnti,
219218
None,
220219
)
221-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
220+
.run_test(&[HjSmj, NljHj], false)
222221
.await
223222
}
224223

@@ -230,7 +229,7 @@ async fn test_anti_join_1k_filtered() {
230229
JoinType::LeftAnti,
231230
Some(Box::new(col_lt_col_filter)),
232231
)
233-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
232+
.run_test(&[HjSmj, NljHj], false)
234233
.await
235234
}
236235

@@ -242,7 +241,7 @@ async fn test_left_mark_join_1k() {
242241
JoinType::LeftMark,
243242
None,
244243
)
245-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
244+
.run_test(&[HjSmj, NljHj], false)
246245
.await
247246
}
248247

@@ -254,7 +253,7 @@ async fn test_left_mark_join_1k_filtered() {
254253
JoinType::LeftMark,
255254
Some(Box::new(col_lt_col_filter)),
256255
)
257-
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
256+
.run_test(&[HjSmj, NljHj], false)
258257
.await
259258
}
260259

@@ -512,8 +511,8 @@ impl JoinFuzzTestCase {
512511
nlj_formatted_sorted.sort_unstable();
513512

514513
if debug
515-
&& ((join_tests.contains(&JoinTestType::NljHj) && nlj_rows != hj_rows)
516-
|| (join_tests.contains(&JoinTestType::HjSmj) && smj_rows != hj_rows))
514+
&& ((join_tests.contains(&NljHj) && nlj_rows != hj_rows)
515+
|| (join_tests.contains(&HjSmj) && smj_rows != hj_rows))
517516
{
518517
let fuzz_debug = "fuzz_test_debug";
519518
std::fs::remove_dir_all(fuzz_debug).unwrap_or(());
@@ -533,7 +532,7 @@ impl JoinFuzzTestCase {
533532
"input2",
534533
);
535534

536-
if join_tests.contains(&JoinTestType::NljHj) && nlj_rows != hj_rows {
535+
if join_tests.contains(&NljHj) && nlj_rows != hj_rows {
537536
println!("=============== HashJoinExec ==================");
538537
hj_formatted_sorted.iter().for_each(|s| println!("{}", s));
539538
println!("=============== NestedLoopJoinExec ==================");
@@ -551,7 +550,7 @@ impl JoinFuzzTestCase {
551550
);
552551
}
553552

554-
if join_tests.contains(&JoinTestType::HjSmj) && smj_rows != hj_rows {
553+
if join_tests.contains(&HjSmj) && smj_rows != hj_rows {
555554
println!("=============== HashJoinExec ==================");
556555
hj_formatted_sorted.iter().for_each(|s| println!("{}", s));
557556
println!("=============== SortMergeJoinExec ==================");
@@ -570,7 +569,7 @@ impl JoinFuzzTestCase {
570569
}
571570
}
572571

573-
if join_tests.contains(&JoinTestType::NljHj) {
572+
if join_tests.contains(&NljHj) {
574573
let err_msg_rowcnt = format!("NestedLoopJoinExec and HashJoinExec produced different row counts, batch_size: {}", batch_size);
575574
assert_eq!(nlj_rows, hj_rows, "{}", err_msg_rowcnt.as_str());
576575

@@ -591,7 +590,7 @@ impl JoinFuzzTestCase {
591590
}
592591
}
593592

594-
if join_tests.contains(&JoinTestType::HjSmj) {
593+
if join_tests.contains(&HjSmj) {
595594
let err_msg_row_cnt = format!("HashJoinExec and SortMergeJoinExec produced different row counts, batch_size: {}", &batch_size);
596595
assert_eq!(hj_rows, smj_rows, "{}", err_msg_row_cnt.as_str());
597596

0 commit comments

Comments
 (0)