Skip to content

Commit 427248f

Browse files
Lordwormsfindepi
authored and committed
adding benchmark for extracting arrow statistics from parquet (apache#10610)
* adding benchmark for extracting arrow statistics from parquet * fix clippy * fix clippy
1 parent 752529f commit 427248f

File tree

2 files changed

+209
-0
lines changed

2 files changed

+209
-0
lines changed

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
[[bench]]
harness = false
name = "topk_aggregate"

# Benchmark for extracting arrow statistics from parquet
[[bench]]
harness = false
name = "parquet_statistic"
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for extracting arrow statistics from parquet
19+
20+
use arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray, UInt64Array};
21+
use arrow_array::{Int32Array, RecordBatch};
22+
use arrow_schema::{
23+
DataType::{self, *},
24+
Field, Schema,
25+
};
26+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
27+
use datafusion::datasource::physical_plan::parquet::{
28+
RequestedStatistics, StatisticsConverter,
29+
};
30+
use parquet::arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter};
31+
use parquet::file::properties::WriterProperties;
32+
use std::sync::Arc;
33+
use tempfile::NamedTempFile;
34+
use std::fmt;

/// Column data types exercised by the statistics-extraction benchmark.
#[derive(Debug, Clone)]
enum TestTypes {
    UInt64,
    F64,
    String,
    Dictionary,
}

impl fmt::Display for TestTypes {
    /// Renders the variant as the label used in benchmark group names.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            TestTypes::UInt64 => "UInt64",
            TestTypes::F64 => "F64",
            TestTypes::String => "String",
            TestTypes::Dictionary => "Dictionary(Int32, String)",
        };
        f.write_str(label)
    }
}
54+
55+
fn create_parquet_file(dtype: TestTypes, row_groups: usize) -> NamedTempFile {
56+
let schema = match dtype {
57+
TestTypes::UInt64 => {
58+
Arc::new(Schema::new(vec![Field::new("col", DataType::UInt64, true)]))
59+
}
60+
TestTypes::F64 => Arc::new(Schema::new(vec![Field::new(
61+
"col",
62+
DataType::Float64,
63+
true,
64+
)])),
65+
TestTypes::String => {
66+
Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, true)]))
67+
}
68+
TestTypes::Dictionary => Arc::new(Schema::new(vec![Field::new(
69+
"col",
70+
DataType::Dictionary(Box::new(Int32), Box::new(Utf8)),
71+
true,
72+
)])),
73+
};
74+
75+
let props = WriterProperties::builder().build();
76+
let file = tempfile::Builder::new()
77+
.suffix(".parquet")
78+
.tempfile()
79+
.unwrap();
80+
let mut writer =
81+
ArrowWriter::try_new(file.reopen().unwrap(), schema.clone(), Some(props))
82+
.unwrap();
83+
84+
for _ in 0..row_groups {
85+
let batch = match dtype {
86+
TestTypes::UInt64 => make_uint64_batch(),
87+
TestTypes::F64 => make_f64_batch(),
88+
TestTypes::String => make_string_batch(),
89+
TestTypes::Dictionary => make_dict_batch(),
90+
};
91+
writer.write(&batch).unwrap();
92+
}
93+
writer.close().unwrap();
94+
file
95+
}
96+
97+
fn make_uint64_batch() -> RecordBatch {
98+
let array: ArrayRef = Arc::new(UInt64Array::from(vec![
99+
Some(1),
100+
Some(2),
101+
Some(3),
102+
Some(4),
103+
Some(5),
104+
]));
105+
RecordBatch::try_new(
106+
Arc::new(arrow::datatypes::Schema::new(vec![
107+
arrow::datatypes::Field::new("col", UInt64, false),
108+
])),
109+
vec![array],
110+
)
111+
.unwrap()
112+
}
113+
114+
fn make_f64_batch() -> RecordBatch {
115+
let array: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]));
116+
RecordBatch::try_new(
117+
Arc::new(arrow::datatypes::Schema::new(vec![
118+
arrow::datatypes::Field::new("col", Float64, false),
119+
])),
120+
vec![array],
121+
)
122+
.unwrap()
123+
}
124+
125+
fn make_string_batch() -> RecordBatch {
126+
let array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
127+
RecordBatch::try_new(
128+
Arc::new(arrow::datatypes::Schema::new(vec![
129+
arrow::datatypes::Field::new("col", Utf8, false),
130+
])),
131+
vec![array],
132+
)
133+
.unwrap()
134+
}
135+
136+
fn make_dict_batch() -> RecordBatch {
137+
let keys = Int32Array::from(vec![0, 1, 2, 3, 4]);
138+
let values = StringArray::from(vec!["a", "b", "c", "d", "e"]);
139+
let array: ArrayRef =
140+
Arc::new(DictionaryArray::try_new(keys, Arc::new(values)).unwrap());
141+
RecordBatch::try_new(
142+
Arc::new(Schema::new(vec![Field::new(
143+
"col",
144+
Dictionary(Box::new(Int32), Box::new(Utf8)),
145+
false,
146+
)])),
147+
vec![array],
148+
)
149+
.unwrap()
150+
}
151+
152+
fn criterion_benchmark(c: &mut Criterion) {
153+
let row_groups = 100;
154+
use TestTypes::*;
155+
let types = vec![UInt64, F64, String, Dictionary];
156+
157+
for dtype in types {
158+
let file = create_parquet_file(dtype.clone(), row_groups);
159+
let file = file.reopen().unwrap();
160+
let reader = ArrowReaderBuilder::try_new(file).unwrap();
161+
let metadata = reader.metadata();
162+
163+
let mut group =
164+
c.benchmark_group(format!("Extract statistics for {}", dtype.clone()));
165+
group.bench_function(
166+
BenchmarkId::new("extract_statistics", dtype.clone()),
167+
|b| {
168+
b.iter(|| {
169+
let _ = StatisticsConverter::try_new(
170+
"col",
171+
RequestedStatistics::Min,
172+
reader.schema(),
173+
)
174+
.unwrap()
175+
.extract(metadata)
176+
.unwrap();
177+
178+
let _ = StatisticsConverter::try_new(
179+
"col",
180+
RequestedStatistics::Max,
181+
reader.schema(),
182+
)
183+
.unwrap()
184+
.extract(reader.metadata())
185+
.unwrap();
186+
187+
let _ = StatisticsConverter::try_new(
188+
"col",
189+
RequestedStatistics::NullCount,
190+
reader.schema(),
191+
)
192+
.unwrap()
193+
.extract(reader.metadata())
194+
.unwrap();
195+
196+
let _ = StatisticsConverter::row_counts(reader.metadata()).unwrap();
197+
})
198+
},
199+
);
200+
group.finish();
201+
}
202+
}
203+
204+
criterion_group!(benches, criterion_benchmark);
205+
criterion_main!(benches);

0 commit comments

Comments
 (0)