// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_cast::pretty::pretty_format_batches;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;

/// This example demonstrates advanced usage of the Parquet metadata APIs.
///
/// It is sometimes desirable to copy the metadata of Parquet files stored on
/// remote object storage (e.g. S3) to a local file or an in-memory cache, use a
/// query engine like DataFusion to analyze the metadata to determine which
/// files to read, and then read any matching files with a single subsequent
/// object store request.
///
/// # Example Overview
///
/// 1. Reads the metadata of a Parquet file using [`ParquetMetaDataReader`]
///
/// 2. Removes column statistics from the metadata (to make it smaller)
///
/// 3. Stores the metadata in a separate file using [`ParquetMetaDataWriter`]
///
/// 4. Reads the metadata from the separate file and uses that to read the
///    Parquet file, thus avoiding a second IO to read metadata or reparsing
///    the footer.
///
#[tokio::main(flavor = "current_thread")]
async fn main() -> parquet::errors::Result<()> {
    let tempdir = TempDir::new().unwrap();
    let parquet_path = create_parquet_file(&tempdir);
    let metadata_path = tempdir.path().join("thrift_metadata.dat");

    // In this example, we use a tokio file to mimic an async remote data source
    let mut remote_parquet_file = tokio::fs::File::open(&parquet_path).await?;

    let metadata = get_metadata_from_remote_parquet_file(&mut remote_parquet_file).await;
    println!(
        "Read metadata from 'remote' Parquet file into memory: {} bytes",
        metadata.memory_size()
    );

    // now slim down the metadata and write it to a "local" file
    let metadata = prepare_metadata(metadata);
    write_metadata_to_local_file(metadata, &metadata_path);

    // now read the metadata from the local file and use it to read the "remote" Parquet file
    let metadata = read_metadata_from_local_file(&metadata_path);
    println!("Read metadata from local file");

    let batches = read_remote_parquet_file_with_metadata(remote_parquet_file, metadata).await;

    // display the results
    let batches_string = pretty_format_batches(&batches).unwrap().to_string();
    let batches_lines: Vec<_> = batches_string.split('\n').collect();

    assert_eq!(
        batches_lines,
        [
            "+-----+-------------+",
            "| id  | description |",
            "+-----+-------------+",
            "| 100 | oranges     |",
            "| 200 | apples      |",
            "| 201 | grapefruit  |",
            "| 300 | bananas     |",
            "| 102 | grapes      |",
            "| 33  | pears       |",
            "+-----+-------------+",
        ],
        "actual output:\n\n{batches_lines:#?}"
    );

    Ok(())
}

/// Reads the metadata from a "remote" parquet file
///
/// Note that this function models reading from a remote file source using a
/// tokio file. In a real application, you would implement [`MetadataFetch`] for
/// your own remote source.
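///
/// A minimal sketch of such an implementation is shown below. `MyRemoteSource`
/// is a hypothetical type, and the exact trait signature may vary between
/// `parquet` versions:
///
/// ```rust,ignore
/// use bytes::Bytes;
/// use futures::future::BoxFuture;
/// use parquet::arrow::async_reader::MetadataFetch;
/// use parquet::errors::Result;
/// use std::ops::Range;
///
/// /// Hypothetical handle to a remote object (e.g. an object store client + path)
/// struct MyRemoteSource {}
///
/// impl MetadataFetch for MyRemoteSource {
///     fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
///         // issue a range request (e.g. an HTTP GET with a Range header)
///         // for `range` against the remote object and return the bytes
///         todo!()
///     }
/// }
/// ```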
///
/// [`MetadataFetch`]: parquet::arrow::async_reader::MetadataFetch
async fn get_metadata_from_remote_parquet_file(
    remote_file: &mut tokio::fs::File,
) -> ParquetMetaData {
    // the remote source must know the total file size (e.g. from an object store LIST operation)
    let file_size = remote_file.metadata().await.unwrap().len();
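    // (with a real remote source, a hypothetical `store` client could supply
    // this instead, e.g. `let file_size = store.head(&path).await?.size;` with
    // the object_store crate; shown here only as a hedged sketch)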

    // tell the reader to read the page index
    ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(remote_file, file_size as usize)
        .await
        .unwrap()
}

/// Modifies the metadata to reduce its size
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
    let orig_size = metadata.memory_size();

    let mut builder = metadata.into_builder();

    // remove column statistics to reduce the size of the metadata by converting
    // the various structures into their respective builders and modifying them
    // as needed.
    for row_group in builder.take_row_groups() {
        let mut row_group_builder = row_group.into_builder();
        for column in row_group_builder.take_columns() {
            let column = column.into_builder().clear_statistics().build().unwrap();
            row_group_builder = row_group_builder.add_column_metadata(column);
        }
        let row_group = row_group_builder.build().unwrap();
        builder = builder.add_row_group(row_group);
    }
    let metadata = builder.build();

    // verify that the size has indeed been reduced
    let new_size = metadata.memory_size();
    assert!(new_size < orig_size, "metadata size did not decrease");
    println!("Reduced metadata size from {} to {}", orig_size, new_size);
    metadata
}

/// Writes the metadata to a file
///
/// The data is stored using the same Thrift format as the Parquet file metadata
fn write_metadata_to_local_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
    let file = File::create(file).unwrap();
    ParquetMetaDataWriter::new(file, &metadata)
        .finish()
        .unwrap()
}

/// Reads the metadata from a file
///
/// This function reads the format written by `write_metadata_to_local_file`
fn read_metadata_from_local_file(file: impl AsRef<Path>) -> ParquetMetaData {
    let file = File::open(file).unwrap();
    ParquetMetaDataReader::new()
        .with_column_indexes(true)
        .with_offset_indexes(true)
        .parse_and_finish(&file)
        .unwrap()
}

/// Reads the "remote" Parquet file using the metadata
///
/// This shows how to read the Parquet file using previously read metadata
/// instead of the metadata in the Parquet file itself. This avoids an
/// additional IO to fetch and decode the metadata from the Parquet file
/// before beginning to read it.
///
/// Note that this function models reading from a remote file source using a
/// tokio file. In a real application, you would implement [`AsyncFileReader`]
/// for your own remote source.
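///
/// A minimal sketch is shown below, reusing the hypothetical `MyRemoteSource`
/// type from above; the exact trait signature may vary between `parquet`
/// versions:
///
/// ```rust,ignore
/// use bytes::Bytes;
/// use futures::future::BoxFuture;
/// use parquet::arrow::async_reader::AsyncFileReader;
/// use parquet::errors::Result;
/// use parquet::file::metadata::ParquetMetaData;
/// use std::ops::Range;
/// use std::sync::Arc;
///
/// impl AsyncFileReader for MyRemoteSource {
///     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
///         // fetch the requested byte range from the remote object
///         todo!()
///     }
///
///     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
///         // return previously loaded metadata (e.g. from a local cache),
///         // avoiding another round trip to the remote source
///         todo!()
///     }
/// }
/// ```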
///
/// In this example, we simply buffer the results in memory as Arrow record
/// batches but a real application would likely process the batches as they are
/// read.
///
/// [`AsyncFileReader`]: parquet::arrow::async_reader::AsyncFileReader
async fn read_remote_parquet_file_with_metadata(
    remote_file: tokio::fs::File,
    metadata: ParquetMetaData,
) -> Vec<RecordBatch> {
    let options = ArrowReaderOptions::new()
        // tell the reader to read the page index
        .with_page_index(true);
    // create a reader with pre-existing metadata
    let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
    let reader =
        ParquetRecordBatchStreamBuilder::new_with_metadata(remote_file, arrow_reader_metadata)
            .build()
            .unwrap();

    reader.try_collect::<Vec<_>>().await.unwrap()
}

/// Makes a new Parquet file in the temporary directory and returns its path
fn create_parquet_file(tmpdir: &TempDir) -> PathBuf {
    let path = tmpdir.path().join("example.parquet");
    let new_file = File::create(&path).unwrap();

    let batch = RecordBatch::try_from_iter(vec![
        (
            "id",
            Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33])) as ArrayRef,
        ),
        (
            "description",
            Arc::new(StringArray::from(vec![
                "oranges",
                "apples",
                "grapefruit",
                "bananas",
                "grapes",
                "pears",
            ])),
        ),
    ])
    .unwrap();

    let props = WriterProperties::builder()
        // ensure we write the page index level statistics
        .set_statistics_enabled(EnabledStatistics::Page)
        .build();

    let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap();

    writer.write(&batch).unwrap();
    writer.finish().unwrap();

    path
}