Skip to content

Commit

Permalink
unify frame socket to cap-media
Browse files Browse the repository at this point in the history
  • Loading branch information
Brendonovich committed Dec 16, 2024
1 parent 210359d commit 1585a75
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 268 deletions.
22 changes: 14 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apps/desktop/src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ cap-media = { path = "../../../crates/media" }
cap-flags = { path = "../../../crates/flags" }
cap-recording = { path = "../../../crates/recording" }
cap-export = { path = "../../../crates/export" }
flume.workspace = true

[target.'cfg(target_os = "macos")'.dependencies]
core-graphics = "0.24.0"
Expand Down
140 changes: 76 additions & 64 deletions apps/desktop/src-tauri/src/camera.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,87 @@
use std::sync::Arc;
use std::{pin::pin, sync::Arc, task::Poll};

use cap_media::feeds::CameraFrameReceiver;
use futures::{FutureExt, SinkExt, StreamExt};

// TODO: Possibly replace this with ffmpeg's network outputs in the pipeline somehow?
pub async fn create_camera_ws(frame_rx: CameraFrameReceiver) -> u16 {
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
response::IntoResponse,
routing::get,
};
use tokio::sync::Mutex;
// pub async fn create_camera_ws(frame_rx: CameraFrameReceiver) -> u16 {
// use axum::{
// extract::{
// ws::{Message, WebSocket, WebSocketUpgrade},
// State,
// },
// response::IntoResponse,
// routing::get,
// };
// use tokio::sync::Mutex;

type RouterState = Arc<Mutex<CameraFrameReceiver>>;
// type RouterState = Arc<Mutex<CameraFrameReceiver>>;

async fn ws_handler(
ws: WebSocketUpgrade,
State(state): State<RouterState>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_socket(socket, state))
}
// async fn ws_handler(
// ws: WebSocketUpgrade,
// State(state): State<RouterState>,
// ) -> impl IntoResponse {
// ws.on_upgrade(move |socket| handle_socket(socket, state))
// }

async fn handle_socket(mut socket: WebSocket, state: RouterState) {
let camera_rx = state.lock().await;
println!("socket connection established");
tracing::info!("Socket connection established");
let now = std::time::Instant::now();
// async fn handle_socket(socket: WebSocket, state: RouterState) {
// let camera_rx = state.lock().await;
// println!("socket connection established");
// tracing::info!("Socket connection established");
// let now = std::time::Instant::now();

loop {
tokio::select! {
_ = socket.recv() => {
tracing::info!("Received message from socket");
break;
}
incoming_frame = camera_rx.recv_async() => {
match incoming_frame {
Ok(data) => {
tracing::info!("Received frame from camera");
if let Err(e) = socket.send(Message::Binary(data)).await {
tracing::error!("Failed to send frame to socket: {:?}", e);
break;
}
},
Err(e) => {
tracing::warn!("Connection has been lost! Shutting down camera server: {:?}", e);
break;
},
}
}
}
}
// let (mut socket_sink, mut socket_stream) = socket.split();

let elapsed = now.elapsed();
println!("Websocket closing after {elapsed:.2?}");
tracing::info!("Websocket closing after {elapsed:.2?}");
}
// let mut stream = futures::stream::poll_fn(|cx| {
// if let Poll::Ready(_) = socket_stream.poll_next_unpin(cx) {
// tracing::info!("Received message from socket");
// return Poll::Ready(None);
// };

let router = axum::Router::new()
.route("/", get(ws_handler))
.with_state(Arc::new(Mutex::new(frame_rx)));
// camera_rx.recv_async().poll_unpin(cx).map(|v| Some(v))
// });

let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tracing::info!("WebSocket server listening on port {}", port);
tokio::spawn(async move {
axum::serve(listener, router.into_make_service())
.await
.unwrap();
});
// while let Some(incoming_frame) = stream.next().await {
// match incoming_frame {
// Ok(mut frame) => {
// frame
// .data
// .extend_from_slice(&(frame.width * 4).to_le_bytes());
// frame.data.extend_from_slice(&frame.height.to_le_bytes());
// frame.data.extend_from_slice(&frame.width.to_le_bytes());

port
}
// tracing::info!("Received frame from camera");
// if let Err(e) = socket_sink.send(Message::Binary(frame.data)).await {
// tracing::error!("Failed to send frame to socket: {:?}", e);
// break;
// }
// }
// Err(e) => {
// tracing::warn!(
// "Connection has been lost! Shutting down camera server: {:?}",
// e
// );
// break;
// }
// }
// }

// let elapsed = now.elapsed();
// println!("Websocket closing after {elapsed:.2?}");
// tracing::info!("Websocket closing after {elapsed:.2?}");
// }

// let router = axum::Router::new()
// .route("/", get(ws_handler))
// .with_state(Arc::new(Mutex::new(frame_rx)));

// let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
// let port = listener.local_addr().unwrap().port();
// tracing::info!("WebSocket server listening on port {}", port);
// tokio::spawn(async move {
// axum::serve(listener, router.into_make_service())
// .await
// .unwrap();
// });

// port
// }
11 changes: 5 additions & 6 deletions apps/desktop/src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ use auth::{AuthStore, AuthenticationInvalid};
use cap_editor::EditorState;
use cap_editor::{EditorInstance, FRAMES_WS_PATH};
use cap_media::feeds::{AudioInputFeed, AudioInputSamplesSender};
use cap_media::frame_ws::WSFrame;
use cap_media::sources::CaptureScreen;
use cap_media::{
feeds::{CameraFeed, CameraFrameSender},
sources::ScreenCaptureTarget,
};
use cap_media::{feeds::CameraFeed, sources::ScreenCaptureTarget};
use cap_project::{Content, ProjectConfiguration, RecordingMeta, SharingMeta};
use cap_recording::RecordingOptions;
use cap_rendering::ProjectRecordings;
Expand Down Expand Up @@ -68,7 +66,7 @@ use windows::{CapWindowId, ShowCapWindow};
pub struct App {
start_recording_options: RecordingOptions,
#[serde(skip)]
camera_tx: CameraFrameSender,
camera_tx: flume::Sender<WSFrame>,
camera_ws_port: u16,
#[serde(skip)]
camera_feed: Option<Arc<Mutex<CameraFeed>>>,
Expand Down Expand Up @@ -1882,7 +1880,8 @@ pub async fn run() {
.expect("Failed to export typescript bindings");

let (camera_tx, camera_rx) = CameraFeed::create_channel();
let camera_ws_port = camera::create_camera_ws(camera_rx.clone()).await;
// _shutdown needs to be kept alive to keep the camera ws running
let (camera_ws_port, _shutdown) = cap_media::frame_ws::create_frame_ws(camera_rx.clone()).await;

let (audio_input_tx, audio_input_rx) = AudioInputFeed::create_channel();

Expand Down
2 changes: 2 additions & 0 deletions apps/desktop/src/utils/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ export function createImageDataWS(
(strideArr[3] << 24)) /
4;

console.log({ width, height, stride });

const imageData = new ImageData(
clamped.slice(0, clamped.length - 12),
stride,
Expand Down
3 changes: 3 additions & 0 deletions crates/editor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ ffmpeg.workspace = true
specta.workspace = true
serde = { version = "1", features = ["derive"] }
sentry.workspace = true
futures = "0.3.31"
tracing = "0.1.41"
flume.workspace = true
Loading

1 comment on commit 1585a75

@vercel
Copy link

@vercel vercel bot commented on 1585a75 Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.