Skip to content

Commit

Permalink
Merge pull request #55 from Telecominfraproject/fix/fix_deadlock
Browse files Browse the repository at this point in the history
CGW: Fix deadlock occuring in msg queue
  • Loading branch information
Cahb authored Jun 28, 2024
2 parents 408ee25 + 0058c82 commit 16e63e9
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 38 deletions.
49 changes: 32 additions & 17 deletions src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl CGWConnectionProcessor {
}
}

debug!("Done Parse Connect Event");
debug!("Done Parse Connect Event {}", evt.serial.to_hex_string());

let mut caps: CGWDeviceCapabilities = Default::default();
match evt.evt_type {
Expand All @@ -177,34 +177,20 @@ impl CGWConnectionProcessor {
self.serial = evt.serial;
let device_type = CGWDeviceType::from_str(caps.platform.as_str())?;

// Check if device queue already exist
// If yes - it could mean that we have device reconnection event
// The possible reconnect reason could be: FW Upgrade or Factory reset
// Need to make sure queue is unlocked to process requests
// If no - create new message queue for device
{
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
if queue_lock.check_messages_queue_exists(&evt.serial).await {
queue_lock
.set_device_queue_state(&evt.serial, CGWUCentralMessagesQueueState::RxTx)
.await;
} else {
queue_lock.create_device_messages_queue(&evt.serial).await;
}
}

// TODO: we accepted tls stream and split the WS into RX TX part,
// now we have to ASK cgw_connection_server's permission whether
// we can proceed on with this underlying connection.
// cgw_connection_server has an authorative decision whether
// we can proceed.
debug!("Sending ACK req for {}", self.serial);
let (mbox_tx, mut mbox_rx) = unbounded_channel::<CGWConnectionProcessorReqMsg>();
let msg = CGWConnectionServerReqMsg::AddNewConnection(evt.serial, caps, mbox_tx);
self.cgw_server
.enqueue_mbox_message_to_cgw_server(msg)
.await;

let ack = mbox_rx.recv().await;
debug!("GOT ACK resp for {}", self.serial);
if let Some(m) = ack {
match m {
CGWConnectionProcessorReqMsg::AddNewConnectionAck => {
Expand All @@ -225,6 +211,35 @@ impl CGWConnectionProcessor {
return Err(Error::ConnectionProcessor("Websocker connection declined"));
}

// Remove device from disconnected device list
// Only connection processor can <know> that connection's established,
// however ConnServer knows about <disconnects>, hence
// we handle connect here, while it's up to ConnServer to <remove>
// connection (add it to <disconnected> list)
// NOTE: this most like also would require a proper DISCONNECT_ACK
// synchronization between processor / server, as there could be still
// race conditions in case if duplicate connection occurs, for example.
{
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
queue_lock.device_connected(&self.serial).await;
}

// Check if device queue already exist
// If yes - it could mean that we have device reconnection event
// The possible reconnect reason could be: FW Upgrade or Factory reset
// Need to make sure queue is unlocked to process requests
// If no - create new message queue for device
{
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
if queue_lock.check_messages_queue_exists(&evt.serial).await {
queue_lock
.set_device_queue_state(&evt.serial, CGWUCentralMessagesQueueState::RxTx)
.await;
} else {
queue_lock.create_device_messages_queue(&evt.serial).await;
}
}

self.process_connection(stream, sink, mbox_rx, device_type)
.await;

Expand Down
24 changes: 9 additions & 15 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ impl CGWConnectionServer {
.cgw_remote_discovery
.sync_device_to_gid_cache(server.devices_cache.clone())
.await;
server.devices_cache.write().await.dump_devices_cache();

tokio::spawn(async move {
CGWMetrics::get_ref()
Expand Down Expand Up @@ -981,8 +980,10 @@ impl CGWConnectionServer {
CGWUCentralMessagesQueueItem::new(parsed_cmd, msg);

// 2. Add message to queue
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
let _ = queue_lock.push_device_message(device_mac, queue_msg).await;
{
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
let _ = queue_lock.push_device_message(device_mac, queue_msg).await;
}
} else {
error!("Failed to parse UCentral command");
}
Expand Down Expand Up @@ -1098,9 +1099,6 @@ impl CGWConnectionServer {
conn_processor_mbox_tx,
) = msg
{
// Remove device from disconnected device list
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
queue_lock.device_connected(&device_mac).await;

// if connection is unique: simply insert new conn
//
Expand Down Expand Up @@ -1248,9 +1246,6 @@ impl CGWConnectionServer {

let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.insert_device(&device_mac).await;
topo_map.debug_dump_map().await;

devices_cache.dump_devices_cache();

connmap_w_lock.insert(device_mac, conn_processor_mbox_tx);

Expand All @@ -1260,30 +1255,29 @@ impl CGWConnectionServer {
let _ = conn_processor_mbox_tx_clone.send(msg);
});
} else if let CGWConnectionServerReqMsg::ConnectionClosed(device_mac) = msg {
// Insert device to disconnected device list
{
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
queue_lock.device_disconnected(&device_mac).await;
}
info!(
"connmap: removed {} serial from connmap, new num_of_connections:{}",
device_mac,
connmap_w_lock.len() - 1
);
connmap_w_lock.remove(&device_mac);

// Insert device to disconnected device list
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
queue_lock.device_disconnected(&device_mac).await;

let mut devices_cache = self.devices_cache.write().await;
if let Some(device) = devices_cache.get_device(&device_mac) {
if device.get_device_remains_in_db() {
device.set_device_state(CGWDeviceState::CGWDeviceDisconnected);
} else {
devices_cache.del_device(&device_mac);
}
devices_cache.dump_devices_cache();
}

let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.remove_device(&device_mac).await;
topo_map.debug_dump_map().await;

CGWMetrics::get_ref().change_counter(
CGWMetricsCounterType::ConnectionsNum,
Expand Down
4 changes: 0 additions & 4 deletions src/cgw_remote_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,6 @@ impl CGWRemoteDiscovery {
device_cache.del_device(key);
}

device_cache.dump_devices_cache();

Ok(())
}

Expand Down Expand Up @@ -681,7 +679,6 @@ impl CGWRemoteDiscovery {
),
);
}
devices_cache.dump_devices_cache();
}
}
Err(_) => {
Expand Down Expand Up @@ -734,7 +731,6 @@ impl CGWRemoteDiscovery {
} else {
devices_cache.del_device(&device_mac);
}
devices_cache.dump_devices_cache();
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/cgw_ucentral_messages_queue_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,13 @@ impl CGWUCentralMessagesQueueManager {
) -> Option<CGWUCentralMessagesQueueItem> {
let ret_msg: CGWUCentralMessagesQueueItem;
let default_msg = CGWUCentralCommand::default();
let container_lock = self.queue.read().await;

if self.get_device_messages_queue_len(device_mac).await == 0 {
return None;
}

let container_lock = self.queue.read().await;

let mut device_msg_queue = container_lock.get(device_mac)?.write().await;
let reboot_msg = device_msg_queue
.get_item(MESSAGE_QUEUE_REBOOT_MSG_INDEX)?
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ async fn server_loop(app_core: Arc<AppCore>) -> Result<()> {
}
};

info!("Started WSS server.");
info!("ACK conn: {}", conn_idx);

app_core_clone.conn_ack_runtime_handle.spawn(async move {
cgw_server_clone
Expand Down

0 comments on commit 16e63e9

Please sign in to comment.