Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

remove tokio from video capture and simplify pipeline #164

Merged
merged 1 commit into from
Oct 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions crates/kornia-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,17 @@ log = "0.4"
thiserror = "1"

# optional dependencies
futures = { version = "0.3.1", optional = true }
gst = { version = "0.23.0", package = "gstreamer", optional = true }
gst-app = { version = "0.23.0", package = "gstreamer-app", optional = true }
memmap2 = "0.9.4"
tokio = { version = "1", features = [
"sync",
"rt-multi-thread",
"macros",
], optional = true }
turbojpeg = { version = "1.0.0", optional = true }

[dev-dependencies]
criterion = "0.5"
tempfile = "3.10"

[features]
gstreamer = ["futures", "gst", "gst-app", "tokio"]
gstreamer = ["gst", "gst-app"]
jpegturbo = ["turbojpeg"]

[[bench]]
Expand Down
10 changes: 10 additions & 0 deletions crates/kornia-io/src/stream/camera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,13 @@ impl std::ops::Deref for CameraCapture {
&self.0
}
}

/// Allows `CameraCapture` to be dereferenced to `StreamCapture`.
///
/// This implementation enables direct access to `StreamCapture` methods
/// on a `CameraCapture` instance.
impl std::ops::DerefMut for CameraCapture {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
234 changes: 57 additions & 177 deletions crates/kornia-io/src/stream/capture.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use crate::stream::error::StreamCaptureError;
use futures::prelude::*;
use gst::prelude::*;
use kornia_image::{Image, ImageSize};

/// Represents a stream capture pipeline using GStreamer.
pub struct StreamCapture {
pipeline: gst::Pipeline,
last_frame: Arc<Mutex<Option<Image<u8, 3>>>>,
running: bool,
handle: Option<std::thread::JoinHandle<()>>,
}

impl StreamCapture {
Expand All @@ -27,194 +29,53 @@ impl StreamCapture {
.dynamic_cast::<gst::Pipeline>()
.map_err(StreamCaptureError::DowncastPipelineError)?;

Ok(Self { pipeline })
}

/// Runs the stream capture pipeline and processes each frame with the provided function.
///
/// # Arguments
///
/// * `f` - A function that processes each captured image frame.
///
/// # Returns
///
/// A Result indicating success or a StreamCaptureError.
pub async fn run<F>(&self, mut f: F) -> Result<(), StreamCaptureError>
where
F: FnMut(Image<u8, 3>) -> Result<(), Box<dyn std::error::Error + Send + Sync>>,
{
self.run_internal(
|img| futures::future::ready(f(img)),
None::<futures::future::Ready<()>>,
)
.await
}

/// Runs the stream capture pipeline with a termination signal and processes each frame.
///
/// # Arguments
///
/// * `f` - A function that processes each captured image frame.
/// * `signal` - A future that, when resolved, will terminate the capture.
///
/// # Returns
///
/// A Result indicating success or a StreamCaptureError.
pub async fn run_with_termination<F, Fut, S>(
&self,
f: F,
signal: S,
) -> Result<(), StreamCaptureError>
where
F: FnMut(Image<u8, 3>) -> Fut,
Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>,
S: Future<Output = ()>,
{
self.run_internal(f, Some(signal)).await
}

/// Internal method to run the stream capture pipeline.
///
/// # Arguments
///
/// * `f` - A function that processes each captured image frame.
/// * `signal` - An optional future that, when resolved, will terminate the capture.
///
/// # Returns
///
/// A Result indicating success or a StreamCaptureError.
async fn run_internal<F, Fut, S>(
&self,
mut f: F,
signal: Option<S>,
) -> Result<(), StreamCaptureError>
where
F: FnMut(Image<u8, 3>) -> Fut,
Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>,
S: Future<Output = ()>,
{
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let (signal_tx, mut signal_rx) = tokio::sync::watch::channel(());
let signal_tx = Arc::new(signal_tx);

self.setup_pipeline(&tx)?;

// Start the pipeline
self.pipeline.set_state(gst::State::Playing)?;

// Set up bus message handling
let bus = self
.pipeline
.bus()
.ok_or_else(|| StreamCaptureError::BusError)?;
self.spawn_bus_handler(bus, signal_tx.clone());

let mut sig = signal.map(|s| Box::pin(s.fuse()));

loop {
tokio::select! {
img = rx.recv() => {
if let Some(img) = img {
f(img).await?;
} else {
break;
}
}
_ = signal_rx.changed() => {
self.close()?;
break;
}
_ = async { if let Some(ref mut s) = sig { s.as_mut().await } }, if sig.is_some() => {
self.close()?;
break;
}
else => break,
}
}

Ok(())
}

/// Sets up the GStreamer pipeline for capturing.
///
/// # Arguments
///
/// * `tx` - A channel sender for transmitting captured image frames.
///
/// # Returns
///
/// A Result indicating success or a StreamCaptureError.
fn setup_pipeline(
&self,
tx: &tokio::sync::mpsc::Sender<Image<u8, 3>>,
) -> Result<(), StreamCaptureError> {
let appsink = self.get_appsink()?;
self.set_appsink_callbacks(appsink, tx.clone());
Ok(())
}

/// Retrieves the AppSink element from the pipeline.
///
/// # Returns
///
/// A Result containing the AppSink or a StreamCaptureError.
fn get_appsink(&self) -> Result<gst_app::AppSink, StreamCaptureError> {
self.pipeline
let appsink = pipeline
.by_name("sink")
.ok_or_else(|| StreamCaptureError::GetElementByNameError)?
.dynamic_cast::<gst_app::AppSink>()
.map_err(StreamCaptureError::DowncastPipelineError)
}
.map_err(StreamCaptureError::DowncastPipelineError)?;

let last_frame = Arc::new(Mutex::new(None));

/// Sets up callbacks for the AppSink element.
///
/// # Arguments
///
/// * `appsink` - The AppSink element to set callbacks for.
/// * `tx` - A channel sender for transmitting captured image frames.
fn set_appsink_callbacks(
&self,
appsink: gst_app::AppSink,
tx: tokio::sync::mpsc::Sender<Image<u8, 3>>,
) {
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| match Self::extract_image_frame(sink) {
Ok(frame) => {
if tx.blocking_send(frame).is_err() {
Err(gst::FlowError::Error)
} else {
.new_sample({
let last_frame = last_frame.clone();
move |sink| match Self::extract_image_frame(sink) {
Ok(frame) => {
// SAFETY: we have a lock on the last_frame
*last_frame.lock().unwrap() = Some(frame);
Ok(gst::FlowSuccess::Ok)
}
Err(_) => Err(gst::FlowError::Error),
}
Err(_) => Err(gst::FlowError::Error),
})
.build(),
);

Ok(Self {
pipeline,
last_frame,
running: false,
handle: None,
})
}

/// Spawns a task to handle GStreamer bus messages.
///
/// This method creates an asynchronous task that listens for messages on the GStreamer bus.
/// It handles End of Stream (EOS) and Error messages, and signals any errors to the main loop.
///
/// # Arguments
///
/// * `bus` - The GStreamer bus to listen for messages on.
/// * `signal_tx` - A shared sender to signal when an error occurs or the stream ends.
///
/// # Notes
///
/// This method spawns a Tokio task that runs until an EOS or Error message is received,
/// or until the bus is closed.
fn spawn_bus_handler(&self, bus: gst::Bus, signal_tx: Arc<tokio::sync::watch::Sender<()>>) {
let mut messages = bus.stream();
tokio::spawn(async move {
while let Some(msg) = messages.next().await {
/// Starts the stream capture pipeline and processes messages on the bus.
pub fn start(&mut self) -> Result<(), StreamCaptureError> {
self.pipeline.set_state(gst::State::Playing)?;
self.running = true;

let bus = self
.pipeline
.bus()
.ok_or_else(|| StreamCaptureError::BusError)?;

let handle = std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
println!("EOS");
break;
}
MessageView::Error(err) => {
Expand All @@ -224,26 +85,45 @@ impl StreamCapture {
err.error(),
err.debug()
);
let _ = signal_tx.send(());
break;
}
_ => (),
}
}
});

self.handle = Some(handle);

Ok(())
}

/// Closes the stream capture pipeline.
/// Grabs the last captured image frame.
///
/// # Returns
///
/// A Result indicating success or a StreamCaptureError.
pub fn close(&self) -> Result<(), StreamCaptureError> {
/// An Option containing the last captured Image or None if no image has been captured yet.
pub fn grab(&self) -> Result<Option<Image<u8, 3>>, StreamCaptureError> {
if !self.running {
return Err(StreamCaptureError::PipelineNotRunning);
}

// SAFETY: we have a lock on the last_frame
Ok(self.last_frame.lock().unwrap().take())
}

/// Closes the stream capture pipeline.
pub fn close(&mut self) -> Result<(), StreamCaptureError> {
let res = self.pipeline.send_event(gst::event::Eos::new());
if !res {
return Err(StreamCaptureError::SendEosError);
}

if let Some(handle) = self.handle.take() {
handle.join().expect("Failed to join thread");
}

self.pipeline.set_state(gst::State::Null)?;
self.running = false;
Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions crates/kornia-io/src/stream/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ pub enum StreamCaptureError {
/// An error occurred during checking the image format.
#[error("Invalid image format: {0}")]
InvalidImageFormat(String),

/// An error occurred when the pipeline is not running.
#[error("Pipeline is not running")]
PipelineNotRunning,
}

// ensure that can be sent over threads
Expand Down
10 changes: 5 additions & 5 deletions crates/kornia-io/src/stream/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ impl VideoWriter {
Ok(())
}

/// Stop the video writer.
/// Close the video writer.
///
/// Set the pipeline to null and join the thread.
///
pub fn stop(&mut self) -> Result<(), StreamCaptureError> {
pub fn close(&mut self) -> Result<(), StreamCaptureError> {
// send end of stream to the appsrc
self.appsrc.end_of_stream()?;

Expand Down Expand Up @@ -217,7 +217,7 @@ impl VideoWriter {
impl Drop for VideoWriter {
fn drop(&mut self) {
if self.handle.is_some() {
self.stop().expect("Failed to stop video writer");
self.close().expect("Failed to close video writer");
}
}
}
Expand Down Expand Up @@ -246,7 +246,7 @@ mod tests {

let img = Image::<u8, 3>::new(size, vec![0; size.width * size.height * 3])?;
writer.write(&img)?;
writer.stop()?;
writer.close()?;

assert!(file_path.exists(), "File does not exist: {:?}", file_path);

Expand All @@ -272,7 +272,7 @@ mod tests {

let img = Image::<u8, 1>::new(size, vec![0; size.width * size.height])?;
writer.write(&img)?;
writer.stop()?;
writer.close()?;

assert!(file_path.exists(), "File does not exist: {:?}", file_path);

Expand Down
1 change: 0 additions & 1 deletion examples/rtspcam/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@ clap = { version = "4.5.4", features = ["derive"] }
ctrlc = "3.4.4"
kornia = { workspace = true, features = ["gstreamer"] }
rerun = "0.18"
tokio = { version = "1" }
Loading
Loading