Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

refactor: Implement ArrowAsyncFileWriter directly to remove tokio #427

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
tokio = { workspace = true }
typed-builder = { workspace = true }
url = { workspace = true }
urlencoding = { workspace = true }
Expand All @@ -82,3 +81,4 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
tera = { workspace = true }
tokio = { workspace = true }
106 changes: 21 additions & 85 deletions crates/iceberg/src/writer/file_writer/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

//! The module contains the file writer for parquet file format.

use std::pin::Pin;
use std::task::{Context, Poll};
use std::{
collections::HashMap,
sync::{atomic::AtomicI64, Arc},
Expand All @@ -34,7 +32,10 @@ use crate::{
use arrow_schema::SchemaRef as ArrowSchemaRef;
use bytes::Bytes;
use futures::future::BoxFuture;
use parquet::{arrow::AsyncArrowWriter, format::FileMetaData};
use parquet::{
arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter, arrow::AsyncArrowWriter,
format::FileMetaData,
};
use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties};

use super::{
Expand Down Expand Up @@ -256,97 +257,32 @@ impl CurrentFileStatus for ParquetWriter {
/// # NOTES
///
/// This wrapper is kept for internal use only.
///
/// # TODO
///
/// Maybe we can use the buffer from ArrowWriter directly.
struct AsyncFileWriter<W: FileWrite>(State<W>);

/// Progress of the wrapped writer inside the poll-based `AsyncWrite` adapter.
///
/// The writer is moved into a boxed future while an operation runs and is
/// handed back, together with the operation's result, when that future
/// resolves.
enum State<W: FileWrite> {
/// No operation is in flight. The `Option` lets the writer be taken out
/// with `take()` when the next operation starts.
Idle(Option<W>),
/// A `write` call is pending; the future yields the writer and the write result.
Write(BoxFuture<'static, (W, Result<()>)>),
/// A `close` call is pending; the future yields the writer and the close result.
Close(BoxFuture<'static, (W, Result<()>)>),
}
struct AsyncFileWriter<W: FileWrite>(W);

impl<W: FileWrite> AsyncFileWriter<W> {
/// Create a new `AsyncFileWriter` with the given writer.
pub fn new(writer: W) -> Self {
Self(State::Idle(Some(writer)))
Self(writer)
}
}

impl<W: FileWrite> tokio::io::AsyncWrite for AsyncFileWriter<W> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
let this = self.get_mut();
loop {
match &mut this.0 {
State::Idle(w) => {
let mut writer = w.take().unwrap();
let bs = Bytes::copy_from_slice(buf);
let fut = async move {
let res = writer.write(bs).await;
(writer, res)
};
this.0 = State::Write(Box::pin(fut));
}
State::Write(fut) => {
let (writer, res) = futures::ready!(fut.as_mut().poll(cx));
this.0 = State::Idle(Some(writer));
return Poll::Ready(res.map(|_| buf.len()).map_err(|err| {
std::io::Error::new(std::io::ErrorKind::Other, Box::new(err))
}));
}
State::Close(_) => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"file is closed",
)));
}
}
}
}

fn poll_flush(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async {
self.0
.write(bs)
.await
.map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
})
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
let this = self.get_mut();
loop {
match &mut this.0 {
State::Idle(w) => {
let mut writer = w.take().unwrap();
let fut = async move {
let res = writer.close().await;
(writer, res)
};
this.0 = State::Close(Box::pin(fut));
}
State::Write(_) => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"file is writing",
)));
}
State::Close(fut) => {
let (writer, res) = futures::ready!(fut.as_mut().poll(cx));
this.0 = State::Idle(Some(writer));
return Poll::Ready(res.map_err(|err| {
std::io::Error::new(std::io::ErrorKind::Other, Box::new(err))
}));
}
}
}
fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async {
self.0
.close()
.await
.map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
})
}
}

Expand Down
Loading