Skip to content

Commit

Permalink
Refactor: use [fusio](https://github.com/tonbo-io/fusio) to implement…
Browse files Browse the repository at this point in the history
… the storage layer and support multiple storage (#163)

* refactor: use [fusio](https://github.com/tonbo-io/fusio) to implement the storage layer and support multiple storage

* chore: remove println macros codegen

* fix: blocking of LevelStream in `open_file` under Unix system

* refactor: use fusio on Benchmark

* fix: benchmark/writes.rs file not found

* test: add unit tests for serdes

* chore: update read & write for fusio

* fix: example datafusion projection error

* fix: `Fs::list` may not sorted

* ci: add exmaples check

* feat: enable SQL query support in datafusion example (#169)

Added support for executing SQL queries using DataFusion's parser and physical plan execution. This enhancement allows querying the "music" table with SQL statements, improving flexibility and functionality.

* chore: resolve review

---------

Co-authored-by: Xwg <loloxwg@gmail.com>
  • Loading branch information
KKould and loloxwg authored Sep 27, 2024
1 parent 2dea4ef commit 6a16796
Show file tree
Hide file tree
Showing 47 changed files with 1,739 additions and 1,142 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@ jobs:
command: fmt
args: -- --check

exmaples:
name: Rust exmaples
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run datafusion example
uses: actions-rs/cargo@v1
with:
command: run
args: --example datafusion --features=datafusion

- name: Run declare example
uses: actions-rs/cargo@v1
with:
command: run
args: --example declare --all-features

benchmark:
name: Rust benchmark
runs-on: self-hosted
Expand Down
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,23 @@ path = "benches/criterion/writes.rs"
required-features = ["sled"]

[dependencies]
arrow = "52"
arrow = "53"
async-lock = "3"
async-stream = "0.3"
async-trait = { version = "0.1", optional = true }
bytes = { version = "1.7", optional = true }
crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "41", optional = true }
datafusion = { version = "42", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { git = "https://github.com/tonbo-io/fusio.git", package = "fusio", rev = "317b1b0621b297f52145b41b90506632f2dc7a1d", features = ["tokio", "dyn"] }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", package = "fusio-parquet", rev = "317b1b0621b297f52145b41b90506632f2dc7a1d" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
lockable = "0.0.8"
once_cell = "1"
parquet = { version = "52", features = ["async"] }
parquet = { version = "53", features = ["async"] }
pin-project-lite = "0.2"
regex = "1"
thiserror = "1"
Expand All @@ -74,6 +76,7 @@ tracing = "0.1"
ulid = "1"

# Only used for benchmarks
log = "0.4.22"
redb = { version = "2", optional = true }
rocksdb = { version = "0.22", optional = true }
sled = { version = "0.34", optional = true }
Expand Down
22 changes: 15 additions & 7 deletions benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ use std::{
fs::File,
io::{BufRead, BufReader},
path::{Path, PathBuf},
sync::Arc,
};

use async_stream::stream;
use fusio::local::TokioFs;
use futures_core::Stream;
use futures_util::StreamExt;
use parquet::data_type::AsBytes;
use redb::TableDefinition;
use rocksdb::{Direction, IteratorMode, TransactionDB};
use tonbo::{
executor::tokio::TokioExecutor, stream, transaction::TransactionEntry, DbOption, Projection,
executor::tokio::TokioExecutor, fs::manager::StoreManager, stream,
transaction::TransactionEntry, DbOption, Projection,
};
use tonbo_macros::Record;

Expand Down Expand Up @@ -222,15 +225,20 @@ impl BenchDatabase for TonboBenchDataBase {
}

async fn build(path: impl AsRef<Path>) -> Self {
let option = DbOption::from(path.as_ref()).disable_wal();
let manager = StoreManager::new(Arc::new(TokioFs), vec![]);
let option =
DbOption::from(fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap())
.disable_wal();

let db = tonbo::DB::new(option, TokioExecutor::new()).await.unwrap();
let db = tonbo::DB::new(option, TokioExecutor::new(), manager)
.await
.unwrap();
TonboBenchDataBase::new(db)
}
}

pub struct TonboBenchReadTransaction<'a> {
txn: tonbo::transaction::Transaction<'a, Customer, TokioExecutor>,
txn: tonbo::transaction::Transaction<'a, Customer>,
}

impl<'db> BenchReadTransaction for TonboBenchReadTransaction<'db> {
Expand All @@ -245,7 +253,7 @@ impl<'db> BenchReadTransaction for TonboBenchReadTransaction<'db> {
}

pub struct TonboBenchReader<'db, 'txn> {
txn: &'txn tonbo::transaction::Transaction<'db, Customer, TokioExecutor>,
txn: &'txn tonbo::transaction::Transaction<'db, Customer>,
}

impl BenchReader for TonboBenchReader<'_, '_> {
Expand Down Expand Up @@ -285,7 +293,7 @@ impl BenchReader for TonboBenchReader<'_, '_> {
}

pub struct TonboBenchWriteTransaction<'a> {
txn: tonbo::transaction::Transaction<'a, Customer, TokioExecutor>,
txn: tonbo::transaction::Transaction<'a, Customer>,
}

impl<'db> BenchWriteTransaction for TonboBenchWriteTransaction<'db> {
Expand All @@ -305,7 +313,7 @@ impl<'db> BenchWriteTransaction for TonboBenchWriteTransaction<'db> {
}

pub struct TonboBenchInserter<'db, 'txn> {
txn: &'txn mut tonbo::transaction::Transaction<'db, Customer, TokioExecutor>,
txn: &'txn mut tonbo::transaction::Transaction<'db, Customer>,
}

impl BenchInserter for TonboBenchInserter<'_, '_> {
Expand Down
15 changes: 12 additions & 3 deletions benches/criterion/writes.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{iter::repeat_with, sync::Arc};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use fusio::local::TokioFs;
use mimalloc::MiMalloc;
use tonbo::{executor::tokio::TokioExecutor, DbOption, Record, DB};
use tonbo::{executor::tokio::TokioExecutor, fs::manager::StoreManager, DbOption, Record, DB};

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand Down Expand Up @@ -55,10 +56,14 @@ fn single_write(c: &mut Criterion) {
let batches = [1, 16, 128];

let _ = std::fs::remove_dir_all("/tmp/tonbo");
let _ = std::fs::create_dir_all("/tmp/tonbo");

for batch in batches {
let option = DbOption::from("/tmp/tonbo").disable_wal();
let manager = StoreManager::new(Arc::new(TokioFs), vec![]);
let option = DbOption::from(fusio::path::Path::from_filesystem_path("/tmp/tonbo").unwrap())
.disable_wal();
let db = runtime
.block_on(async { DB::new(option, TokioExecutor::default()).await })
.block_on(async { DB::new(option, TokioExecutor::default(), manager).await })
.unwrap();

group.bench_with_input(BenchmarkId::new("Tonbo", batch), &batch, |b, batch| {
Expand All @@ -67,9 +72,12 @@ fn single_write(c: &mut Criterion) {
.iter(|| async { tonbo_write(&db, *batch).await });
});
let _ = std::fs::remove_dir_all("/tmp/tonbo");
let _ = std::fs::create_dir_all("/tmp/tonbo");
}

let _ = std::fs::remove_dir_all("/tmp/sled");
let _ = std::fs::create_dir_all("/tmp/sled");

for batch in batches {
let sled = sled::open("/tmp/sled").unwrap();
group.bench_with_input(BenchmarkId::new("Sled", batch), &batch, |b, batch| {
Expand All @@ -78,6 +86,7 @@ fn single_write(c: &mut Criterion) {
.iter(|| async { sled_write(&sled, *batch).await });
});
let _ = std::fs::remove_dir_all("/tmp/sled");
let _ = std::fs::create_dir_all("/tmp/sled");
}

group.finish();
Expand Down
6 changes: 2 additions & 4 deletions benches/read_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ mod common;

use std::{
collections::Bound,
env::current_dir,
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};

use futures_util::{future::join_all, StreamExt};
use tokio::io::AsyncWriteExt;
use tonbo::{executor::tokio::TokioExecutor, fs::FileProvider};
use tokio::{fs, io::AsyncWriteExt};

use crate::common::{
read_tbl, BenchDatabase, BenchReadTransaction, BenchReader, RedbBenchDatabase,
Expand Down Expand Up @@ -181,7 +179,7 @@ async fn main() {
println!();
println!("{table}");

let mut file = TokioExecutor::open("read_benchmark.md").await.unwrap();
let mut file = fs::File::create("read_benchmark.md").await.unwrap();
file.write_all(b"Read: \n```shell\n").await.unwrap();
for line in table.lines() {
file.write_all(line.as_bytes()).await.unwrap();
Expand Down
3 changes: 1 addition & 2 deletions benches/write_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use common::*;
use futures_util::future::join_all;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
use tonbo::{executor::tokio::TokioExecutor, fs::FileProvider};

const WRITE_TIMES: usize = 500_000;
const WRITE_BATCH_TIMES: usize = 5000;
Expand Down Expand Up @@ -227,7 +226,7 @@ async fn main() {
println!();
println!("{table}");

let mut file = TokioExecutor::open("write_benchmark.md").await.unwrap();
let mut file = tokio::fs::File::create("write_benchmark.md").await.unwrap();
file.write_all(b"Write: \n```shell\n").await.unwrap();
for line in table.lines() {
file.write_all(line.as_bytes()).await.unwrap();
Expand Down
48 changes: 41 additions & 7 deletions examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@ use datafusion::{
error::{DataFusionError, Result},
execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
physical_expr::EquivalenceProperties,
physical_plan::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties},
physical_plan::{
execute_stream, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
},
prelude::*,
sql::parser::DFParser,
};
use fusio::{local::TokioFs, path::Path};
use futures_core::Stream;
use futures_util::StreamExt;
use tonbo::{executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DB};
use tokio::fs;
use tonbo::{
executor::tokio::TokioExecutor, fs::manager::StoreManager, inmem::immutable::ArrowArrays,
record::Record, DbOption, DB,
};
use tonbo_macros::Record;

#[derive(Record, Debug)]
Expand Down Expand Up @@ -198,7 +206,13 @@ impl ExecutionPlan for MusicExec {

#[tokio::main]
async fn main() -> Result<()> {
let db = DB::new("./db_path/music".into(), TokioExecutor::default())
// make sure the path exists
let _ = fs::create_dir_all("./db_path/music").await;

let manager = StoreManager::new(Arc::new(TokioFs), vec![]);
let options = DbOption::from(Path::from_filesystem_path("./db_path/music").unwrap());

let db = DB::new(options, TokioExecutor::default(), manager)
.await
.unwrap();
for (id, name, like) in [
Expand All @@ -214,9 +228,29 @@ async fn main() -> Result<()> {
let provider = MusicProvider { db: Arc::new(db) };
ctx.register_table("music", Arc::new(provider))?;

let df = ctx.table("music").await?;
let df = df.select(vec![col("name")])?;
let batches = df.collect().await?;
pretty::print_batches(&batches).unwrap();
{
let df = ctx.table("music").await?;
let df = df.select(vec![col("name")])?;
let batches = df.collect().await?;
pretty::print_batches(&batches).unwrap();
}

{
// support sql query for tonbo
let statements = DFParser::parse_sql("select * from music")?;
let plan = ctx
.state()
.statement_to_plan(statements.front().cloned().unwrap())
.await?;
ctx.execute_logical_plan(plan).await?;
let df = ctx.table("music").await?;
let physical_plan = df.create_physical_plan().await?;
let mut stream = execute_stream(physical_plan, ctx.task_ctx())?;
while let Some(maybe_batch) = stream.next().await {
let batch = maybe_batch?;
pretty::print_batches(&[batch]).unwrap();
}
}

Ok(())
}
15 changes: 12 additions & 3 deletions examples/declare.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::ops::Bound;
use std::{ops::Bound, sync::Arc};

use bytes::Bytes;
use fusio::{local::TokioFs, path::Path};
use futures_util::stream::StreamExt;
use tonbo::{executor::tokio::TokioExecutor, Projection, Record, DB};
use tokio::fs;
use tonbo::{
executor::tokio::TokioExecutor, fs::manager::StoreManager, DbOption, Projection, Record, DB,
};

/// Use macro to define schema of column family just like ORM
/// It provides type-safe read & write API
Expand All @@ -17,8 +21,13 @@ pub struct User {

#[tokio::main]
async fn main() {
// make sure the path exists
let _ = fs::create_dir_all("./db_path/users").await;

let manager = StoreManager::new(Arc::new(TokioFs), vec![]);
let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap());
// pluggable async runtime and I/O
let db = DB::new("./db_path/users".into(), TokioExecutor::default())
let db = DB::new(options, TokioExecutor::default(), manager)
.await
.unwrap();

Expand Down
Loading

0 comments on commit 6a16796

Please # to comment.