diff --git a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs index fd91ea1cc538..855a8d0dbf40 100644 --- a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs @@ -1536,12 +1536,10 @@ mod test { .unwrap() .resolve(&schema) .unwrap(); - let r4 = apache_avro::to_value(serde_json::json!({ - "col1": null - })) - .unwrap() - .resolve(&schema) - .unwrap(); + let r4 = apache_avro::to_value(serde_json::json!({ "col1": null })) + .unwrap() + .resolve(&schema) + .unwrap(); let mut w = apache_avro::Writer::new(&schema, vec![]); w.append(r1).unwrap(); @@ -1600,12 +1598,10 @@ mod test { }"#, ) .unwrap(); - let r1 = apache_avro::to_value(serde_json::json!({ - "col1": null - })) - .unwrap() - .resolve(&schema) - .unwrap(); + let r1 = apache_avro::to_value(serde_json::json!({ "col1": null })) + .unwrap() + .resolve(&schema) + .unwrap(); let r2 = apache_avro::to_value(serde_json::json!({ "col1": { "col2": "hello" diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 64550aabf424..a7dc45b929c0 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -577,58 +577,6 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result { ) } -macro_rules! append { - ($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{ - let mut offsets: Vec = vec![0]; - let mut values = - downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone(); - - let element = downcast_arg!($ELEMENT, $ARRAY_TYPE); - for (arr, el) in $ARRAY.iter().zip(element.iter()) { - let last_offset: i32 = offsets.last().copied().ok_or_else(|| { - DataFusionError::Internal(format!("offsets should not be empty")) - })?; - match arr { - Some(arr) => { - let child_array = downcast_arg!(arr, $ARRAY_TYPE); - values = downcast_arg!( - compute::concat(&[ - &values, - child_array, - &$ARRAY_TYPE::from(vec![el]) - ])? - .clone(), - $ARRAY_TYPE - ) - .clone(); - offsets.push(last_offset + child_array.len() as i32 + 1i32); - } - None => { - values = downcast_arg!( - compute::concat(&[ - &values, - &$ARRAY_TYPE::from(vec![el.clone()]) - ])? - .clone(), - $ARRAY_TYPE - ) - .clone(); - offsets.push(last_offset + 1i32); - } - } - } - - let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true)); - - Arc::new(ListArray::try_new( - field, - OffsetBuffer::new(offsets.into()), - Arc::new(values), - None, - )?) - }}; -} - /// Array_append SQL function pub fn array_append(args: &[ArrayRef]) -> Result { let arr = as_list_array(&args[0])?; @@ -639,68 +587,51 @@ pub fn array_append(args: &[ArrayRef]) -> Result { DataType::List(_) => concat_internal(args)?, DataType::Null => return make_array(&[element.to_owned()]), data_type => { - macro_rules! array_function { - ($ARRAY_TYPE:ident) => { - append!(arr, element, $ARRAY_TYPE) + let mut new_values = vec![]; + let mut offsets = vec![0]; + + let elem_data = element.to_data(); + for (row_index, arr) in arr.iter().enumerate() { + let new_array = if let Some(arr) = arr { + let original_data = arr.to_data(); + let capacity = Capacities::Array(original_data.len() + 1); + let mut mutable = MutableArrayData::with_capacities( + vec![&original_data, &elem_data], + false, + capacity, + ); + mutable.extend(0, 0, original_data.len()); + mutable.extend(1, row_index, row_index + 1); + let data = mutable.freeze(); + arrow_array::make_array(data) + } else { + let capacity = Capacities::Array(1); + let mut mutable = MutableArrayData::with_capacities( + vec![&elem_data], + false, + capacity, + ); + mutable.extend(0, row_index, row_index + 1); + let data = mutable.freeze(); + arrow_array::make_array(data) }; + offsets.push(offsets[row_index] + new_array.len() as i32); + new_values.push(new_array); } - call_array_function!(data_type, false) - } - }; - - Ok(res) -} -macro_rules! prepend { - ($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident) => {{ - let mut offsets: Vec = vec![0]; - let mut values = - downcast_arg!(new_empty_array($ELEMENT.data_type()), $ARRAY_TYPE).clone(); + let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect(); + let values = arrow::compute::concat(&new_values)?; - let element = downcast_arg!($ELEMENT, $ARRAY_TYPE); - for (arr, el) in $ARRAY.iter().zip(element.iter()) { - let last_offset: i32 = offsets.last().copied().ok_or_else(|| { - DataFusionError::Internal(format!("offsets should not be empty")) - })?; - match arr { - Some(arr) => { - let child_array = downcast_arg!(arr, $ARRAY_TYPE); - values = downcast_arg!( - compute::concat(&[ - &values, - &$ARRAY_TYPE::from(vec![el]), - child_array - ])? - .clone(), - $ARRAY_TYPE - ) - .clone(); - offsets.push(last_offset + child_array.len() as i32 + 1i32); - } - None => { - values = downcast_arg!( - compute::concat(&[ - &values, - &$ARRAY_TYPE::from(vec![el.clone()]) - ])? - .clone(), - $ARRAY_TYPE - ) - .clone(); - offsets.push(last_offset + 1i32); - } - } + Arc::new(ListArray::try_new( + Arc::new(Field::new("item", data_type.to_owned(), true)), + OffsetBuffer::new(offsets.into()), + values, + None, + )?) } + }; - let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), true)); - - Arc::new(ListArray::try_new( - field, - OffsetBuffer::new(offsets.into()), - Arc::new(values), - None, - )?) - }}; + Ok(res) } /// Array_prepend SQL function @@ -713,12 +644,47 @@ pub fn array_prepend(args: &[ArrayRef]) -> Result { DataType::List(_) => concat_internal(args)?, DataType::Null => return make_array(&[element.to_owned()]), data_type => { - macro_rules! array_function { - ($ARRAY_TYPE:ident) => { - prepend!(arr, element, $ARRAY_TYPE) + let mut new_values = vec![]; + let mut offsets = vec![0]; + + let elem_data = element.to_data(); + for (row_index, arr) in arr.iter().enumerate() { + let new_array = if let Some(arr) = arr { + let original_data = arr.to_data(); + let capacity = Capacities::Array(original_data.len() + 1); + let mut mutable = MutableArrayData::with_capacities( + vec![&original_data, &elem_data], + false, + capacity, + ); + mutable.extend(1, row_index, row_index + 1); + mutable.extend(0, 0, original_data.len()); + let data = mutable.freeze(); + arrow_array::make_array(data) + } else { + let capacity = Capacities::Array(1); + let mut mutable = MutableArrayData::with_capacities( + vec![&elem_data], + false, + capacity, + ); + mutable.extend(0, row_index, row_index + 1); + let data = mutable.freeze(); + arrow_array::make_array(data) }; + offsets.push(offsets[row_index] + new_array.len() as i32); + new_values.push(new_array); } - call_array_function!(data_type, false) + + let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect(); + let values = arrow::compute::concat(&new_values)?; + + Arc::new(ListArray::try_new( + Arc::new(Field::new("item", data_type.to_owned(), true)), + OffsetBuffer::new(offsets.into()), + values, + None, + )?) } };