Skip to content

Commit

Permalink
Add support for to/from json with extension codec.
Browse files Browse the repository at this point in the history
  • Loading branch information
Omega359 committed Feb 7, 2025
1 parent 4ef7d73 commit d990122
Showing 1 changed file with 31 additions and 12 deletions.
43 changes: 31 additions & 12 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,7 @@ pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
let extension_codec = DefaultLogicalExtensionCodec {};
let protobuf =
protobuf::LogicalPlanNode::try_from_logical_plan(plan, &extension_codec)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
serde_json::to_string(&protobuf)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
logical_plan_to_json_with_extension_codec(plan, &extension_codec)
}

/// Serialize a LogicalPlan as bytes, using the provided extension codec
Expand All @@ -220,13 +216,17 @@ pub fn logical_plan_to_bytes_with_extension_codec(
Ok(buffer.into())
}

/// Deserialize a LogicalPlan from JSON
/// Serialize a LogicalPlan as JSON
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
let extension_codec = DefaultLogicalExtensionCodec {};
back.try_into_logical_plan(ctx, &extension_codec)
pub fn logical_plan_to_json_with_extension_codec(
plan: &LogicalPlan,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
let protobuf =
protobuf::LogicalPlanNode::try_from_logical_plan(plan, &extension_codec)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
serde_json::to_string(&protobuf)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}

/// Deserialize a LogicalPlan from bytes
Expand All @@ -238,6 +238,13 @@ pub fn logical_plan_from_bytes(
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from JSON
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_from_json_with_extension_codec(json, ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from bytes
pub fn logical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
Expand All @@ -249,6 +256,18 @@ pub fn logical_plan_from_bytes_with_extension_codec(
protobuf.try_into_logical_plan(ctx, extension_codec)
}

/// Deserialize a LogicalPlan from JSON
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
json: &str,
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
back.try_into_logical_plan(ctx, &extension_codec)
}

/// Serialize a PhysicalPlan as bytes
pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
let extension_codec = DefaultPhysicalExtensionCodec {};
Expand Down Expand Up @@ -287,7 +306,7 @@ pub fn physical_plan_from_json(
ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
.map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
let extension_codec = DefaultPhysicalExtensionCodec {};
back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec)
}
Expand Down

0 comments on commit d990122

Please # to comment.