Skip to content

Commit

Permalink
feat: address review comments
Browse files Browse the repository at this point in the history
1. account for binary message type
2. remove unused and redundant broadcast_to_all method
3. gracefully close websocket connection if a Close message is recieved
  • Loading branch information
Extheoisah committed Sep 12, 2024
1 parent ae3f14f commit 42d1b9f
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 64 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,3 @@ futures-util = "0.3.30"
rand = "0.8.5"
actix-ws = "0.3.0"
tokio = { version = "1.35.0", features = ["full", "macros"] }
bytestring = "1.3.1"
30 changes: 19 additions & 11 deletions src/app/websockets/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub async fn websocket_handler(

log::info!("WebSocket connected: Connection ID {:?}", conn_id);

let session = Some(session);
let session_clone = session.clone();
actix_web::rt::spawn(async move {
let mut last_heartbeat = Instant::now();
Expand Down Expand Up @@ -61,8 +62,10 @@ pub async fn websocket_handler(
log::info!("Client timeout: Connection ID {:?}", conn_id);
break;
}
if session.ping(b"").await.is_err() {
break;
if let Some(session) = session.as_mut() {
if session.ping(b"").await.is_err() {
break;
}
}
}
else => break,
Expand All @@ -78,17 +81,19 @@ pub async fn websocket_handler(

async fn handle_message(
msg: Message,
session: &mut Session,
session: &mut Option<Session>,
manager_handle: &WebSocketManagerHandle,
conn_id: ConnId,
session_token: &SessionToken,
) -> Result<(), Error> {
match msg {
Message::Ping(bytes) => {
session
.pong(&bytes)
.await
.map_err(|e| Error::from(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
if let Some(session) = session.as_mut() {
session
.pong(&bytes)
.await
.map_err(|e| Error::from(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
}
}
Message::Pong(_) => {
manager_handle.update_heartbeat(conn_id).await?;
Expand All @@ -109,10 +114,13 @@ async fn handle_message(
}
Message::Close(reason) => {
log::info!("Close message received: {:?}", reason);
return Err(Error::from(std::io::Error::new(
std::io::ErrorKind::Other,
"WebSocket closed",
)));
manager_handle.disconnect(conn_id).await?;
if let Some(sess) = session.take() {
sess.close(reason)
.await
.map_err(|e| Error::from(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
}
return Ok(());
}
_ => {}
}
Expand Down
106 changes: 64 additions & 42 deletions src/app/websockets/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, io, sync::Arc, time::Instant};
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;

use crate::shared::utils::{clone_websocket_message, websocket_message_to_bytestring};
use crate::shared::utils::clone_websocket_message;

pub type ConnId = Uuid;
pub type SessionToken = String;
Expand Down Expand Up @@ -101,15 +101,28 @@ impl WebSocketManager {
conn_id
);
if let Some(conn) = connections.get(&conn_id) {
let cloned_message = websocket_message_to_bytestring(&message);
log::info!(
"Sending message {:?} to connection: {:?}",
cloned_message,
conn_id
);
log::info!("Sending message {:?} to connection: {:?}", message, conn_id);
let mut session = conn.session.clone();
if let Err(e) = session.text(cloned_message.to_string()).await {
log::error!("Failed to send message to connection {}: {:?}", conn_id, e);
match message {
Message::Text(ref text) => {
session.text(text.clone()).await.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Failed to send message: {:?}", e),
)
})?;
}
Message::Binary(ref binary) => {
session.binary(binary.clone()).await.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Failed to send message: {:?}", e),
)
})?;
}
_ => {
log::error!("Received unsupported message type: {:?}", message);
}
}
}
}
Expand All @@ -131,15 +144,27 @@ impl WebSocketManager {
) -> io::Result<()> {
let mut connections = connections.write().await;
if let Some(conn) = connections.get_mut(&conn_id) {
conn.session
.text(websocket_message_to_bytestring(&message).to_string())
.await
.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Failed to send message: {:?}", e),
)
})?;
match message {
Message::Text(text) => {
conn.session.text(text).await.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Failed to send message: {:?}", e),
)
})?;
}
Message::Binary(binary) => {
conn.session.binary(binary).await.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Failed to send message: {:?}", e),
)
})?;
}
_ => {
log::error!("Received unsupported message type: {:?}", message);
}
}
}
Ok(())
}
Expand All @@ -157,32 +182,33 @@ impl WebSocketManager {
if let Some(conn) = connections.get(&conn_id) {
let cloned_message = clone_websocket_message(&message);
let mut session = conn.session.clone();
if let Err(e) = session
.text(websocket_message_to_bytestring(&cloned_message).to_string())
.await
{
log::error!("Failed to send message to connection {}: {:?}", conn_id, e);

match cloned_message {
Message::Text(text) => {
session.text(text).await.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Failed to send message: {:?}", e),
)
})?;
}
Message::Binary(binary) => {
session.binary(binary).await.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Failed to send message: {:?}", e),
)
})?;
}
_ => {
log::error!("Received unsupported message type: {:?}", cloned_message);
}
}
}
}
}
Ok(())
}

pub async fn broadcast_to_all(&self, message: Message) -> io::Result<()> {
let connections = self.connections.read().await;
for (conn_id, conn) in connections.iter() {
let cloned_message = clone_websocket_message(&message);
let mut session = conn.session.clone();
if let Err(e) = session
.text(websocket_message_to_bytestring(&cloned_message).to_string())
.await
{
log::error!("Failed to send message to connection {}: {:?}", conn_id, e);
}
}
Ok(())
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -233,8 +259,4 @@ impl WebSocketManagerHandle {
.broadcast_to_session(session_token, message)
.await
}

pub async fn broadcast_to_all(&self, message: Message) -> io::Result<()> {
self.manager.broadcast_to_all(message).await
}
}
7 changes: 6 additions & 1 deletion src/service/repository/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ impl Repository {
}
Ok(repo)
}
(Some(_), Some(_), None) | (Some(_), None, Some(_)) | (None, Some(_), Some(_)) => Err(
anyhow::anyhow!("Only one of id, user_id, or repo_url should be provided"),
),
(Some(_), Some(_), Some(_)) => Err(anyhow::anyhow!(
"Cannot provide id, user_id, and repo_url simultaneously"
)),
(None, None, None) => Err(anyhow::anyhow!("No input provided")),
_ => Err(anyhow::anyhow!("Invalid input")),
}
}
}
8 changes: 0 additions & 8 deletions src/shared/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use actix_ws::{Item, Message};
use bytestring::ByteString;
use rand::{distributions::Alphanumeric, Rng};
use std::str::FromStr;
use uuid::Uuid;
Expand All @@ -18,13 +17,6 @@ pub fn generate_session_token() -> String {
format!("hxckr_{}", random_string)
}

pub fn websocket_message_to_bytestring(msg: &Message) -> ByteString {
match msg {
Message::Text(text) => text.to_string().into(),
_ => "".to_string().into(),
}
}

pub fn clone_websocket_message(msg: &Message) -> Message {
match msg {
Message::Text(text) => Message::Text(text.clone()),
Expand Down

0 comments on commit 42d1b9f

Please sign in to comment.