From 9278f9367929061640b115db615ee96419019786 Mon Sep 17 00:00:00 2001 From: Xwg Date: Fri, 27 Sep 2024 10:41:40 +0800 Subject: [PATCH] feat: enable SQL query support in datafusion example Added support for executing SQL queries using DataFusion's parser and physical plan execution. This enhancement allows querying the "music" table with SQL statements, improving flexibility and functionality. --- examples/datafusion.rs | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/examples/datafusion.rs b/examples/datafusion.rs index d7127c3e..6f0a1012 100644 --- a/examples/datafusion.rs +++ b/examples/datafusion.rs @@ -17,8 +17,11 @@ use datafusion::{ error::{DataFusionError, Result}, execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}, physical_expr::EquivalenceProperties, - physical_plan::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties}, + physical_plan::{ + execute_stream, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, + }, prelude::*, + sql::parser::DFParser, }; use fusio::{local::TokioFs, path::Path}; use futures_core::Stream; @@ -225,9 +228,29 @@ async fn main() -> Result<()> { let provider = MusicProvider { db: Arc::new(db) }; ctx.register_table("music", Arc::new(provider))?; - let df = ctx.table("music").await?; - let df = df.select(vec![col("name")])?; - let batches = df.collect().await?; - pretty::print_batches(&batches).unwrap(); + { + let df = ctx.table("music").await?; + let df = df.select(vec![col("name")])?; + let batches = df.collect().await?; + pretty::print_batches(&batches).unwrap(); + } + + { + // support sql query for tonbo + let statements = DFParser::parse_sql("select * from music")?; + let plan = ctx + .state() + .statement_to_plan(statements.front().cloned().unwrap()) + .await?; + ctx.execute_logical_plan(plan).await?; + let df = ctx.table("music").await?; + let physical_plan = df.create_physical_plan().await?; + let mut stream = execute_stream(physical_plan, ctx.task_ctx())?; + while let Some(maybe_batch) = stream.next().await { + let batch = maybe_batch?; + pretty::print_batches(&[batch]).unwrap(); + } + } + Ok(()) }