Skip to content

Commit 44476d1

Browse files
committed
rewrite array_append to remove duplicate code
Signed-off-by: veeupup <code@tanweime.com>
1 parent 4512805 commit 44476d1

File tree

2 files changed

+45
-69
lines changed

2 files changed

+45
-69
lines changed

datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs

Lines changed: 8 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -1536,12 +1536,10 @@ mod test {
15361536
.unwrap()
15371537
.resolve(&schema)
15381538
.unwrap();
1539-
let r4 = apache_avro::to_value(serde_json::json!({
1540-
"col1": null
1541-
}))
1542-
.unwrap()
1543-
.resolve(&schema)
1544-
.unwrap();
1539+
let r4 = apache_avro::to_value(serde_json::json!({ "col1": null }))
1540+
.unwrap()
1541+
.resolve(&schema)
1542+
.unwrap();
15451543

15461544
let mut w = apache_avro::Writer::new(&schema, vec![]);
15471545
w.append(r1).unwrap();
@@ -1600,12 +1598,10 @@ mod test {
16001598
}"#,
16011599
)
16021600
.unwrap();
1603-
let r1 = apache_avro::to_value(serde_json::json!({
1604-
"col1": null
1605-
}))
1606-
.unwrap()
1607-
.resolve(&schema)
1608-
.unwrap();
1601+
let r1 = apache_avro::to_value(serde_json::json!({ "col1": null }))
1602+
.unwrap()
1603+
.resolve(&schema)
1604+
.unwrap();
16091605
let r2 = apache_avro::to_value(serde_json::json!({
16101606
"col1": {
16111607
"col2": "hello"

datafusion/physical-expr/src/array_expressions.rs

Lines changed: 37 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ use arrow::array::*;
2424
use arrow::buffer::OffsetBuffer;
2525
use arrow::compute;
2626
use arrow::datatypes::{DataType, Field, UInt64Type};
27+
use arrow::row::{RowConverter, SortField};
2728
use arrow_buffer::NullBuffer;
2829

2930
use datafusion_common::cast::{
@@ -577,58 +578,6 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
577578
)
578579
}
579580

580-
macro_rules! append {
581-
($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{
582-
let mut offsets: Vec<i32> = vec![0];
583-
let mut values =
584-
downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone();
585-
586-
let element = downcast_arg!($ELEMENT, $ARRAY_TYPE);
587-
for (arr, el) in $ARRAY.iter().zip(element.iter()) {
588-
let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
589-
DataFusionError::Internal(format!("offsets should not be empty"))
590-
})?;
591-
match arr {
592-
Some(arr) => {
593-
let child_array = downcast_arg!(arr, $ARRAY_TYPE);
594-
values = downcast_arg!(
595-
compute::concat(&[
596-
&values,
597-
child_array,
598-
&$ARRAY_TYPE::from(vec![el])
599-
])?
600-
.clone(),
601-
$ARRAY_TYPE
602-
)
603-
.clone();
604-
offsets.push(last_offset + child_array.len() as i32 + 1i32);
605-
}
606-
None => {
607-
values = downcast_arg!(
608-
compute::concat(&[
609-
&values,
610-
&$ARRAY_TYPE::from(vec![el.clone()])
611-
])?
612-
.clone(),
613-
$ARRAY_TYPE
614-
)
615-
.clone();
616-
offsets.push(last_offset + 1i32);
617-
}
618-
}
619-
}
620-
621-
let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true));
622-
623-
Arc::new(ListArray::try_new(
624-
field,
625-
OffsetBuffer::new(offsets.into()),
626-
Arc::new(values),
627-
None,
628-
)?)
629-
}};
630-
}
631-
632581
/// Array_append SQL function
633582
pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
634583
let arr = as_list_array(&args[0])?;
@@ -638,13 +587,44 @@ pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
638587
let res = match arr.value_type() {
639588
DataType::List(_) => concat_internal(args)?,
640589
DataType::Null => return make_array(&[element.to_owned()]),
641-
data_type => {
642-
macro_rules! array_function {
643-
($ARRAY_TYPE:ident) => {
644-
append!(arr, element, $ARRAY_TYPE)
590+
dt => {
591+
let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
592+
let r_rows = converter.convert_columns(&[element.clone()])?;
593+
let mut offsets = vec![0];
594+
let mut new_arrays = vec![];
595+
for (i, arr) in arr.iter().enumerate() {
596+
let r_row = r_rows.row(i);
597+
let rows = if let Some(arr) = arr {
598+
let mut l_rows = converter.convert_columns(&[arr])?;
599+
l_rows.push(r_row);
600+
l_rows
601+
} else {
602+
let mut rows = converter.empty_rows(1, 1);
603+
rows.push(r_row);
604+
rows
645605
};
606+
let last_offset: i32 = match offsets.last().copied() {
607+
Some(offset) => offset,
608+
None => return internal_err!("offsets should not be empty"),
609+
};
610+
offsets.push(last_offset + rows.num_rows() as i32);
611+
let arrays = converter.convert_rows(rows.iter())?;
612+
let array = match arrays.get(0) {
613+
Some(array) => array.clone(),
614+
None => {
615+
return internal_err!(
616+
"array_append: failed to get value from rows"
617+
)
618+
}
619+
};
620+
new_arrays.push(array);
646621
}
647-
call_array_function!(data_type, false)
622+
let field = Arc::new(Field::new("item", dt, true));
623+
let offsets = OffsetBuffer::new(offsets.into());
624+
let new_arrays_ref =
625+
new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
626+
let values = compute::concat(&new_arrays_ref)?;
627+
Arc::new(ListArray::try_new(field, offsets, values, None)?)
648628
}
649629
};
650630

0 commit comments

Comments
 (0)