Skip to content

Commit

Permalink
Merge pull request #99 from Telecominfraproject/dev-infra-request-result
Browse files Browse the repository at this point in the history
Send request execution result to NB
  • Loading branch information
Cahb authored Nov 12, 2024
2 parents 1117a3a + 388871d commit 6c41c2f
Show file tree
Hide file tree
Showing 7 changed files with 485 additions and 156 deletions.
68 changes: 58 additions & 10 deletions src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
cgw_connection_server::{CGWConnectionServer, CGWConnectionServerReqMsg},
cgw_device::{CGWDeviceCapabilities, CGWDeviceType},
cgw_errors::{Error, Result},
cgw_nb_api_listener::cgw_construct_infra_request_result_msg,
cgw_ucentral_messages_queue_manager::{
CGWUCentralMessagesQueueItem, CGWUCentralMessagesQueueState, CGW_MESSAGES_QUEUE,
MESSAGE_TIMEOUT_DURATION,
Expand All @@ -19,6 +20,7 @@ use futures_util::{
stream::{SplitSink, SplitStream},
FutureExt, SinkExt, StreamExt,
};
use uuid::Uuid;

use std::{net::SocketAddr, str::FromStr, sync::Arc};
use tokio::{
Expand Down Expand Up @@ -270,8 +272,6 @@ impl CGWConnectionProcessor {
queue_lock
.set_device_queue_state(&evt.serial, CGWUCentralMessagesQueueState::RxTx)
.await;
} else {
queue_lock.create_device_messages_queue(&evt.serial).await;
}
}

Expand All @@ -285,6 +285,7 @@ impl CGWConnectionProcessor {
msg: std::result::Result<Message, tungstenite::error::Error>,
fsm_state: &mut CGWUCentralMessageProcessorState,
pending_req_id: u64,
pending_req_uuid: Uuid,
) -> Result<CGWConnectionState> {
// Make sure we always track the as accurate as possible the time
// of receiving of the event (where needed).
Expand Down Expand Up @@ -341,6 +342,18 @@ impl CGWConnectionProcessor {

*fsm_state = CGWUCentralMessageProcessorState::Idle;
debug!("Got reply event for pending request id: {pending_req_id}");
if let Ok(resp) = cgw_construct_infra_request_result_msg(
self.cgw_server.get_local_id(),
pending_req_uuid,
pending_req_id,
true,
None,
) {
self.cgw_server
.enqueue_mbox_message_from_cgw_to_nb_api(self.group_id, resp);
} else {
error!("Failed to construct rebalance_group message!");
}
} else if let CGWUCentralEventType::RealtimeEvent(_) = evt.evt_type {
if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
Expand Down Expand Up @@ -453,6 +466,7 @@ impl CGWConnectionProcessor {
}

let device_mac = self.serial;
let mut pending_req_uuid = Uuid::default();
let mut pending_req_id: u64 = 0;
let mut pending_req_type: CGWUCentralCommandType;
let mut fsm_state = CGWUCentralMessageProcessorState::Idle;
Expand Down Expand Up @@ -485,19 +499,23 @@ impl CGWConnectionProcessor {
if let Some(queue_msg) = queue_lock.dequeue_device_message(&device_mac).await {
// Get message from queue, start measure requet processing time
start_time = Instant::now();

pending_req_id = queue_msg.command.id;
pending_req_type = queue_msg.command.cmd_type.clone();
pending_req_uuid = queue_msg.uuid;

let timeout = match queue_msg.timeout {
Some(secs) => Duration::from_secs(secs),
None => MESSAGE_TIMEOUT_DURATION,
};

wakeup_reason = WakeupReason::MboxRx(Some(
CGWConnectionProcessorReqMsg::SinkRequestToDevice(queue_msg),
));

// Set new pending request timeout value
queue_lock
.set_device_last_req_info(
&device_mac,
pending_req_id,
MESSAGE_TIMEOUT_DURATION,
)
.set_device_last_req_info(&device_mac, pending_req_id, timeout)
.await;

debug!("Got pending request with id: {}", pending_req_id);
Expand Down Expand Up @@ -562,7 +580,7 @@ impl CGWConnectionProcessor {
}

// Doesn't matter if connection was closed or terminated
// Do message queue timeout tick and cleanup queue dut to timeout\
// Do message queue timeout tick and cleanup queue due to timeout
// Or decrease timer value - on connection termination - background task
// is responsible to cleanup queue
if fsm_state == CGWUCentralMessageProcessorState::ResultPending {
Expand All @@ -574,7 +592,25 @@ impl CGWConnectionProcessor {
.await
{
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
queue_lock.clear_device_message_queue(&device_mac).await;
let flushed_reqs = queue_lock.clear_device_message_queue(&device_mac).await;

for req in flushed_reqs {
if let Ok(resp) = cgw_construct_infra_request_result_msg(
self.cgw_server.get_local_id(),
req.uuid,
req.command.id,
false,
Some(format!(
"Reques flushed from infra queue {device_mac} due to previous request timeout!"
)),
) {
// Currently Device Queue Manager does not store infars GID
self.cgw_server
.enqueue_mbox_message_from_cgw_to_nb_api(self.group_id, resp);
} else {
error!("Failed to construct message!");
}
}

// reset request duration, request id and queue state
pending_req_id = 0;
Expand All @@ -585,14 +621,26 @@ impl CGWConnectionProcessor {
.set_device_last_req_info(&device_mac, 0, Duration::ZERO)
.await;
fsm_state = CGWUCentralMessageProcessorState::Idle;
if let Ok(resp) = cgw_construct_infra_request_result_msg(
self.cgw_server.get_local_id(),
pending_req_uuid,
pending_req_id,
false,
Some(format!("Request timed out")),
) {
self.cgw_server
.enqueue_mbox_message_from_cgw_to_nb_api(self.group_id, resp);
} else {
error!("Failed to construct rebalance_group message!");
}
}
}

// Process WakeUp reason
let rc = match wakeup_reason {
WakeupReason::WSSRxMsg(res) => {
last_contact = Instant::now();
self.process_wss_rx_msg(res, &mut fsm_state, pending_req_id)
self.process_wss_rx_msg(res, &mut fsm_state, pending_req_id, pending_req_uuid)
.await
}
WakeupReason::MboxRx(mbox_message) => {
Expand Down
Loading

0 comments on commit 6c41c2f

Please sign in to comment.