Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify stream capture and remove thread #213

Merged
merged 3 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 15 additions & 71 deletions crates/kornia-io/src/stream/capture.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::sync::{Arc, Mutex};

use crate::stream::error::StreamCaptureError;
use gst::prelude::*;
use kornia_image::{Image, ImageSize};
use kornia_image::Image;

/// Represents a stream capture pipeline using GStreamer.
pub struct StreamCapture {
pipeline: gst::Pipeline,
last_frame: Arc<Mutex<Option<Image<u8, 3>>>>,
running: bool,
handle: Option<std::thread::JoinHandle<()>>,
appsink: gst_app::AppSink,
}

impl StreamCapture {
Expand All @@ -35,64 +31,20 @@ impl StreamCapture {
.dynamic_cast::<gst_app::AppSink>()
.map_err(StreamCaptureError::DowncastPipelineError)?;

let last_frame = Arc::new(Mutex::new(None));

appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let last_frame = last_frame.clone();
move |sink| match Self::extract_image_frame(sink) {
Ok(frame) => {
// SAFETY: we have a lock on the last_frame
*last_frame.lock().unwrap() = Some(frame);
Ok(gst::FlowSuccess::Ok)
}
Err(_) => Err(gst::FlowError::Error),
}
})
.build(),
);

Ok(Self {
pipeline,
last_frame,
running: false,
handle: None,
})
Ok(Self { pipeline, appsink })
}

/// Starts the stream capture pipeline and processes messages on the bus.
pub fn start(&mut self) -> Result<(), StreamCaptureError> {
pub fn start(&self) -> Result<(), StreamCaptureError> {
self.pipeline.set_state(gst::State::Playing)?;
self.running = true;

let bus = self
.pipeline
.bus()
.ok_or_else(|| StreamCaptureError::BusError)?;

let handle = std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
break;
}
MessageView::Error(err) => {
eprintln!(
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
break;
}
_ => (),
}
}
});

self.handle = Some(handle);
// handle bus messages
bus.set_sync_handler(|_bus, _msg| gst::BusSyncReply::Pass);

Ok(())
}
Expand All @@ -103,42 +55,34 @@ impl StreamCapture {
///
/// An Option containing the last captured Image or None if no image has been captured yet.
pub fn grab(&self) -> Result<Option<Image<u8, 3>>, StreamCaptureError> {
if !self.running {
return Err(StreamCaptureError::PipelineNotRunning);
}

// SAFETY: we have a lock on the last_frame
Ok(self.last_frame.lock().unwrap().take())
self.appsink
.try_pull_sample(gst::ClockTime::ZERO)
.map(Self::extract_image_frame)
.transpose()
}

/// Closes the stream capture pipeline.
pub fn close(&mut self) -> Result<(), StreamCaptureError> {
pub fn close(&self) -> Result<(), StreamCaptureError> {
let res = self.pipeline.send_event(gst::event::Eos::new());
if !res {
return Err(StreamCaptureError::SendEosError);
}

if let Some(handle) = self.handle.take() {
handle.join().expect("Failed to join thread");
}

self.pipeline.set_state(gst::State::Null)?;
self.running = false;

Ok(())
}

/// Extracts an image frame from the AppSink.
///
/// # Arguments
///
/// * `appsink` - The AppSink to extract the frame from.
/// * `sample` - The sample to extract the frame from.
///
/// # Returns
///
/// A Result containing the extracted Image or a StreamCaptureError.
fn extract_image_frame(appsink: &gst_app::AppSink) -> Result<Image<u8, 3>, StreamCaptureError> {
let sample = appsink.pull_sample()?;

fn extract_image_frame(sample: gst::Sample) -> Result<Image<u8, 3>, StreamCaptureError> {
let caps = sample
.caps()
.ok_or_else(|| StreamCaptureError::GetCapsError)?;
Expand All @@ -160,7 +104,7 @@ impl StreamCapture {
.ok_or_else(|| StreamCaptureError::GetBufferError)?
.map_readable()?;

Image::<u8, 3>::new(ImageSize { width, height }, buffer.as_slice().to_vec())
Image::<u8, 3>::new([width, height].into(), buffer.to_owned())
.map_err(|_| StreamCaptureError::CreateImageFrameError)
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/features/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let mut webcam = V4L2CameraConfig::new().with_size(size).build()?;
let webcam = V4L2CameraConfig::new().with_size(size).build()?;

// start the background pipeline
webcam.start()?;
Expand Down
2 changes: 1 addition & 1 deletion examples/filters/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let mut webcam = V4L2CameraConfig::new().with_size(size).build()?;
let webcam = V4L2CameraConfig::new().with_size(size).build()?;

// start the background pipeline
webcam.start()?;
Expand Down
2 changes: 1 addition & 1 deletion examples/rtspcam/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let rec = rerun::RecordingStreamBuilder::new("Kornia Rtsp Stream Capture App").spawn()?;

//// create a stream capture object
let mut capture = RTSPCameraConfig::new()
let capture = RTSPCameraConfig::new()
.with_settings(
&args.username,
&args.password,
Expand Down
2 changes: 1 addition & 1 deletion examples/video_write/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let mut webcam = V4L2CameraConfig::new()
let webcam = V4L2CameraConfig::new()
.with_camera_id(args.camera_id)
.with_fps(args.fps as u32)
.with_size(frame_size)
Expand Down
2 changes: 1 addition & 1 deletion examples/webcam/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let mut webcam = V4L2CameraConfig::new()
let webcam = V4L2CameraConfig::new()
.with_camera_id(args.camera_id)
.with_fps(args.fps)
.with_size(ImageSize {
Expand Down
2 changes: 1 addition & 1 deletion kornia-viz/src/bin/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct KorniaApp {

impl Default for KorniaApp {
fn default() -> Self {
let mut capture = kornia::io::stream::V4L2CameraConfig::new()
let capture = kornia::io::stream::V4L2CameraConfig::new()
.with_camera_id(0)
.build()
.unwrap();
Expand Down
Loading