
Error writing STRUCT to parquet in parallel: internal error: entered unreachable code: cannot downcast Int64 to byte array #8853


Closed
alamb opened this issue Jan 13, 2024 · 5 comments · Fixed by #8923
Labels
bug (Something isn't working), help wanted (Extra attention is needed)

Comments

@alamb
Contributor

alamb commented Jan 13, 2024

Describe the bug

I can't write a struct to parquet when writing in parallel; instead I get the following error:

internal error: entered unreachable code: cannot downcast Int64 to byte array

To Reproduce

$ datafusion-cli
DataFusion CLI v34.0.0
❯ create table t as values (struct ('foo', 1)), (struct('bar', 2));
0 rows in set. Query took 0.004 seconds.

❯ select * from t;
+------------------+
| column1          |
+------------------+
| {c0: foo, c1: 1} |
| {c0: bar, c1: 2} |
+------------------+
2 rows in set. Query took 0.001 seconds.

❯ copy (select * from t) to '/tmp/foo.parquet';
thread 'tokio-runtime-worker' panicked at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/parquet-49.0.0/src/arrow/arrow_writer/byte_array.rs:441:9:
internal error: entered unreachable code: cannot downcast Int64 to byte array
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Expected behavior

I expect the parquet file to be written successfully. This works fine with JSON:

$ datafusion-cli
DataFusion CLI v34.0.0
❯ create table t as values (struct ('foo', 1)), (struct('bar', 2));
0 rows in set. Query took 0.003 seconds.

❯ select * from t;
+------------------+
| column1          |
+------------------+
| {c0: foo, c1: 1} |
| {c0: bar, c1: 2} |
+------------------+
2 rows in set. Query took 0.001 seconds.

❯ copy (select * from t) to '/tmp/foo.json';
+-------+
| count |
+-------+
| 2     |
+-------+
1 row in set. Query took 0.010 seconds.

❯
\q
$ cat /tmp/foo.json
{"column1":{"c0":"foo","c1":1}}
{"column1":{"c0":"bar","c1":2}}

Additional context

No response

@alamb added the bug (Something isn't working) and help wanted (Extra attention is needed) labels on Jan 13, 2024
@devinjdangelo
Contributor

Similar to #8851, the parallelized parquet writer code is to blame here; something is wrong with how it handles nested types. Disabling datafusion.execution.parquet.allow_single_file_parallelism works around it:

DataFusion CLI v34.0.0
❯ set datafusion.execution.parquet.allow_single_file_parallelism = false;
0 rows in set. Query took 0.010 seconds.

❯ create table t as values (struct ('foo', 1)), (struct('bar', 2));
0 rows in set. Query took 0.002 seconds.

❯ copy (select * from t) to '/tmp/foo.parquet';
+-------+
| count |
+-------+
| 2     |
+-------+
1 row in set. Query took 0.021 seconds.
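
For anyone hitting this from Rust rather than datafusion-cli, the same workaround should be expressible programmatically. A minimal sketch, assuming the datafusion and tokio crates and that SessionConfig::set_bool accepts this config key:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Hypothetical workaround: disable single-file parallelism before running COPY
    let config = SessionConfig::new().set_bool(
        "datafusion.execution.parquet.allow_single_file_parallelism",
        false,
    );
    let ctx = SessionContext::new_with_config(config);

    // Same repro as above, driven through SessionContext::sql
    ctx.sql("create table t as values (struct('foo', 1)), (struct('bar', 2))")
        .await?
        .collect()
        .await?;
    ctx.sql("copy (select * from t) to '/tmp/foo.parquet'")
        .await?
        .collect()
        .await?;
    Ok(())
}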

@devinjdangelo
Contributor

devinjdangelo commented Jan 13, 2024

I tried to make a minimal reproducer with just arrow-rs, but it appears to work fine.

It must be, then, that the issue lies in DataFusion's tokio-based implementation of this logic.

use std::sync::Arc;
use arrow_array::*;
use arrow_schema::*;
use parquet::arrow::arrow_to_parquet_schema;
use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "struct",
        DataType::Struct(
            vec![
                Field::new("b", DataType::Boolean, false),
                Field::new("c", DataType::Int32, false),
            ]
            .into(),
        ),
        false,
    )]));
    
    // Compute the parquet schema
    let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
    let props = Arc::new(WriterProperties::default());
    
    // Create writers for each of the leaf columns
    let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
    
    // Spawn a worker thread for each column
    // This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better
    let mut workers: Vec<_> = col_writers
        .into_iter()
        .map(|mut col_writer| {
            let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
            let handle = std::thread::spawn(move || {
                for col in recv {
                    col_writer.write(&col)?;
                }
                col_writer.close()
            });
            (handle, send)
        })
        .collect();
    
    // Create parquet writer
    let root_schema = parquet_schema.root_schema_ptr();
    let mut out = Vec::with_capacity(1024); // This could be a File
    let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
    
    // Start row group
    let mut row_group = writer.next_row_group().unwrap();
    
    let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true]));
    let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
    
    // Columns to encode
    let to_write = vec![Arc::new(StructArray::from(vec![
        (
            Arc::new(Field::new("b", DataType::Boolean, false)),
            boolean.clone() as ArrayRef,
        ),
        (
            Arc::new(Field::new("c", DataType::Int32, false)),
            int.clone() as ArrayRef,
        ),
    ])) as _];
    
    // Spawn work to encode columns
    let mut worker_iter = workers.iter_mut();
    for (arr, field) in to_write.iter().zip(&schema.fields) {
        for leaves in compute_leaves(field, arr).unwrap() {
            worker_iter.next().unwrap().1.send(leaves).unwrap();
        }
    }
    
    // Finish up parallel column encoding
    for (handle, send) in workers {
        drop(send); // Drop send side to signal termination
        let chunk = handle.join().unwrap().unwrap();
        chunk.append_to_row_group(&mut row_group).unwrap();
    }
    row_group.close().unwrap();
    
    let metadata = writer.close().unwrap();
    assert_eq!(metadata.num_rows, 4);
}
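
The DataFusion version of this fan-out runs on tokio tasks rather than std threads. A rough sketch of what that looks like, assuming tokio unbounded channels (an approximation for discussion, not the code DataFusion actually uses):

use parquet::arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn};
use parquet::errors::Result;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::task::JoinHandle;

// Spawn one tokio task per leaf column writer; each task drains its channel of
// ArrowLeafColumn values and returns the encoded column chunk once the sender drops.
fn spawn_leaf_writers(
    col_writers: Vec<ArrowColumnWriter>,
) -> Vec<(JoinHandle<Result<ArrowColumnChunk>>, UnboundedSender<ArrowLeafColumn>)> {
    col_writers
        .into_iter()
        .map(|mut col_writer| {
            let (send, mut recv) = unbounded_channel::<ArrowLeafColumn>();
            let handle = tokio::spawn(async move {
                while let Some(col) = recv.recv().await {
                    col_writer.write(&col)?;
                }
                col_writer.close()
            });
            (handle, send)
        })
        .collect()
}

The join side then becomes handle.await inside an async context instead of handle.join(), but the data flow is otherwise the same as in the threaded example above.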

@devinjdangelo
Contributor

@tustvold is it apparent to you what the issue is within the DataFusion parallel parquet code?

If not, I propose we disable the feature by default and add many more tests covering nested parquet files and other data types such as dictionaries (#8854). We can then take more time, likely across multiple PRs, to bring DataFusion's parallel parquet writer to feature parity with the non-parallel version.

@tustvold
Contributor

I can take some time to look next week. My guess is that it is something in the logic that performs slicing for row group parallelism.
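
For context, the kind of slicing in question looks roughly like the following (a hypothetical sketch, not the actual DataFusion code): an incoming RecordBatch is split into row-group-sized chunks before its columns are fanned out to the per-leaf writers, and for struct columns the sliced view has to stay consistent with the child arrays.

use arrow_array::RecordBatch;

// Hypothetical helper: split a batch into row-group-sized, zero-copy slices.
// For struct columns the slice must remain consistent with the child arrays,
// which is the kind of place a bug like this could hide.
fn slice_into_row_groups(batch: &RecordBatch, max_row_group_size: usize) -> Vec<RecordBatch> {
    assert!(max_row_group_size > 0);
    let mut slices = Vec::new();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = max_row_group_size.min(batch.num_rows() - offset);
        slices.push(batch.slice(offset, len));
        offset += len;
    }
    slices
}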

@alamb
Contributor Author

alamb commented Jan 16, 2024

I don't think this issue should block the DataFusion release: @devinjdangelo set allow_single_file_parallelism to false by default in #8854 and added test coverage.

I updated this PR's description to mention that.

Once we re-enable single file parallelism by default, we should verify this query still works.

@alamb changed the title from "Error writing STRUCT to parquet: internal error: entered unreachable code: cannot downcast Int64 to byte array" to "Error writing STRUCT to parquet in parallel: internal error: entered unreachable code: cannot downcast Int64 to byte array" on Jan 16, 2024