From df6a0f66acb757d8b6e973716d6f06bcac5751a8 Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 2 Jan 2025 23:53:48 +0100 Subject: [PATCH 1/3] simplify stream capture and remove thread --- crates/kornia-io/src/stream/capture.rs | 83 +++++--------------------- 1 file changed, 14 insertions(+), 69 deletions(-) diff --git a/crates/kornia-io/src/stream/capture.rs b/crates/kornia-io/src/stream/capture.rs index cdb16787..91508454 100644 --- a/crates/kornia-io/src/stream/capture.rs +++ b/crates/kornia-io/src/stream/capture.rs @@ -1,15 +1,11 @@ -use std::sync::{Arc, Mutex}; - use crate::stream::error::StreamCaptureError; use gst::prelude::*; -use kornia_image::{Image, ImageSize}; +use kornia_image::Image; /// Represents a stream capture pipeline using GStreamer. pub struct StreamCapture { pipeline: gst::Pipeline, - last_frame: Arc>>>, - running: bool, - handle: Option>, + appsink: gst_app::AppSink, } impl StreamCapture { @@ -35,64 +31,20 @@ impl StreamCapture { .dynamic_cast::() .map_err(StreamCaptureError::DowncastPipelineError)?; - let last_frame = Arc::new(Mutex::new(None)); - - appsink.set_callbacks( - gst_app::AppSinkCallbacks::builder() - .new_sample({ - let last_frame = last_frame.clone(); - move |sink| match Self::extract_image_frame(sink) { - Ok(frame) => { - // SAFETY: we have a lock on the last_frame - *last_frame.lock().unwrap() = Some(frame); - Ok(gst::FlowSuccess::Ok) - } - Err(_) => Err(gst::FlowError::Error), - } - }) - .build(), - ); - - Ok(Self { - pipeline, - last_frame, - running: false, - handle: None, - }) + Ok(Self { pipeline, appsink }) } /// Starts the stream capture pipeline and processes messages on the bus. pub fn start(&mut self) -> Result<(), StreamCaptureError> { self.pipeline.set_state(gst::State::Playing)?; - self.running = true; let bus = self .pipeline .bus() .ok_or_else(|| StreamCaptureError::BusError)?; - let handle = std::thread::spawn(move || { - for msg in bus.iter_timed(gst::ClockTime::NONE) { - use gst::MessageView; - match msg.view() { - MessageView::Eos(..) => { - break; - } - MessageView::Error(err) => { - eprintln!( - "Error from {:?}: {} ({:?})", - msg.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - break; - } - _ => (), - } - } - }); - - self.handle = Some(handle); + // handle bus messages + bus.set_sync_handler(|_bus, _msg| gst::BusSyncReply::Pass); Ok(()) } @@ -103,12 +55,11 @@ impl StreamCapture { /// /// An Option containing the last captured Image or None if no image has been captured yet. pub fn grab(&self) -> Result>, StreamCaptureError> { - if !self.running { - return Err(StreamCaptureError::PipelineNotRunning); - } - - // SAFETY: we have a lock on the last_frame - Ok(self.last_frame.lock().unwrap().take()) + // NOTE: this is a blocking call with a timeout of 1ns, so it should not block the thread at all + self.appsink + .try_pull_sample(gst::ClockTime::from_nseconds(1)) + .map(Self::extract_image_frame) + .transpose() } /// Closes the stream capture pipeline. @@ -118,12 +69,8 @@ impl StreamCapture { return Err(StreamCaptureError::SendEosError); } - if let Some(handle) = self.handle.take() { - handle.join().expect("Failed to join thread"); - } - self.pipeline.set_state(gst::State::Null)?; - self.running = false; + Ok(()) } @@ -131,14 +78,12 @@ impl StreamCapture { /// /// # Arguments /// - /// * `appsink` - The AppSink to extract the frame from. + /// * `sample` - The sample to extract the frame from. /// /// # Returns /// /// A Result containing the extracted Image or a StreamCaptureError. - fn extract_image_frame(appsink: &gst_app::AppSink) -> Result, StreamCaptureError> { - let sample = appsink.pull_sample()?; - + fn extract_image_frame(sample: gst::Sample) -> Result, StreamCaptureError> { let caps = sample .caps() .ok_or_else(|| StreamCaptureError::GetCapsError)?; @@ -160,7 +105,7 @@ impl StreamCapture { .ok_or_else(|| StreamCaptureError::GetBufferError)? .map_readable()?; - Image::::new(ImageSize { width, height }, buffer.as_slice().to_vec()) + Image::::new([width, height].into(), buffer.to_owned()) .map_err(|_| StreamCaptureError::CreateImageFrameError) } } From 934498d29e6c3c5f1f7859e3ab80460b678ab509 Mon Sep 17 00:00:00 2001 From: edgar Date: Thu, 2 Jan 2025 23:56:12 +0100 Subject: [PATCH 2/3] remove mutable self methods --- crates/kornia-io/src/stream/capture.rs | 4 ++-- examples/features/src/main.rs | 2 +- examples/filters/src/main.rs | 2 +- examples/rtspcam/src/main.rs | 2 +- examples/video_write/src/main.rs | 2 +- examples/webcam/src/main.rs | 2 +- kornia-viz/src/bin/app.rs | 2 +- 7 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/kornia-io/src/stream/capture.rs b/crates/kornia-io/src/stream/capture.rs index 91508454..c0cf3c70 100644 --- a/crates/kornia-io/src/stream/capture.rs +++ b/crates/kornia-io/src/stream/capture.rs @@ -35,7 +35,7 @@ impl StreamCapture { } /// Starts the stream capture pipeline and processes messages on the bus. - pub fn start(&mut self) -> Result<(), StreamCaptureError> { + pub fn start(&self) -> Result<(), StreamCaptureError> { self.pipeline.set_state(gst::State::Playing)?; let bus = self @@ -63,7 +63,7 @@ impl StreamCapture { } /// Closes the stream capture pipeline. - pub fn close(&mut self) -> Result<(), StreamCaptureError> { + pub fn close(&self) -> Result<(), StreamCaptureError> { let res = self.pipeline.send_event(gst::event::Eos::new()); if !res { return Err(StreamCaptureError::SendEosError); diff --git a/examples/features/src/main.rs b/examples/features/src/main.rs index 2e13e709..d5310e69 100644 --- a/examples/features/src/main.rs +++ b/examples/features/src/main.rs @@ -18,7 +18,7 @@ fn main() -> Result<(), Box> { // create a webcam capture object with camera id 0 // and force the image size to 640x480 - let mut webcam = V4L2CameraConfig::new().with_size(size).build()?; + let webcam = V4L2CameraConfig::new().with_size(size).build()?; // start the background pipeline webcam.start()?; diff --git a/examples/filters/src/main.rs b/examples/filters/src/main.rs index eae37c2d..dd8b918a 100644 --- a/examples/filters/src/main.rs +++ b/examples/filters/src/main.rs @@ -45,7 +45,7 @@ fn main() -> Result<(), Box> { // create a webcam capture object with camera id 0 // and force the image size to 640x480 - let mut webcam = V4L2CameraConfig::new().with_size(size).build()?; + let webcam = V4L2CameraConfig::new().with_size(size).build()?; // start the background pipeline webcam.start()?; diff --git a/examples/rtspcam/src/main.rs b/examples/rtspcam/src/main.rs index 9c2db100..c5e2612a 100644 --- a/examples/rtspcam/src/main.rs +++ b/examples/rtspcam/src/main.rs @@ -34,7 +34,7 @@ fn main() -> Result<(), Box> { let rec = rerun::RecordingStreamBuilder::new("Kornia Rtsp Stream Capture App").spawn()?; //// create a stream capture object - let mut capture = RTSPCameraConfig::new() + let capture = RTSPCameraConfig::new() .with_settings( &args.username, &args.password, diff --git a/examples/video_write/src/main.rs b/examples/video_write/src/main.rs index 90f2ef95..651fca75 100644 --- a/examples/video_write/src/main.rs +++ b/examples/video_write/src/main.rs @@ -46,7 +46,7 @@ fn main() -> Result<(), Box> { // create a webcam capture object with camera id 0 // and force the image size to 640x480 - let mut webcam = V4L2CameraConfig::new() + let webcam = V4L2CameraConfig::new() .with_camera_id(args.camera_id) .with_fps(args.fps as u32) .with_size(frame_size) diff --git a/examples/webcam/src/main.rs b/examples/webcam/src/main.rs index ff3af0c6..1c2b801b 100644 --- a/examples/webcam/src/main.rs +++ b/examples/webcam/src/main.rs @@ -30,7 +30,7 @@ fn main() -> Result<(), Box> { // create a webcam capture object with camera id 0 // and force the image size to 640x480 - let mut webcam = V4L2CameraConfig::new() + let webcam = V4L2CameraConfig::new() .with_camera_id(args.camera_id) .with_fps(args.fps) .with_size(ImageSize { diff --git a/kornia-viz/src/bin/app.rs b/kornia-viz/src/bin/app.rs index a1a83621..54491725 100644 --- a/kornia-viz/src/bin/app.rs +++ b/kornia-viz/src/bin/app.rs @@ -27,7 +27,7 @@ struct KorniaApp { impl Default for KorniaApp { fn default() -> Self { - let mut capture = kornia::io::stream::V4L2CameraConfig::new() + let capture = kornia::io::stream::V4L2CameraConfig::new() .with_camera_id(0) .build() .unwrap(); From fa62fefe9ca43f1798f71bbfd8b5a60ac4d8ae96 Mon Sep 17 00:00:00 2001 From: edgar Date: Fri, 3 Jan 2025 14:11:50 +0100 Subject: [PATCH 3/3] try_pull_sample timeout ZERO --- crates/kornia-io/src/stream/capture.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/kornia-io/src/stream/capture.rs b/crates/kornia-io/src/stream/capture.rs index c0cf3c70..93ebc317 100644 --- a/crates/kornia-io/src/stream/capture.rs +++ b/crates/kornia-io/src/stream/capture.rs @@ -55,9 +55,8 @@ impl StreamCapture { /// /// An Option containing the last captured Image or None if no image has been captured yet. pub fn grab(&self) -> Result>, StreamCaptureError> { - // NOTE: this is a blocking call with a timeout of 1ns, so it should not block the thread at all self.appsink - .try_pull_sample(gst::ClockTime::from_nseconds(1)) + .try_pull_sample(gst::ClockTime::ZERO) .map(Self::extract_image_frame) .transpose() }