Skip to content

Commit

Permalink
Change device to infra in NB API
Browse files Browse the repository at this point in the history
  • Loading branch information
SviatoslavBoichuk committed Oct 28, 2024
1 parent ea3a7d1 commit 7d3d7fb
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 66 deletions.
64 changes: 31 additions & 33 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use crate::cgw_device::{
cgw_detect_device_chages, CGWDevice, CGWDeviceCapabilities, CGWDeviceState, CGWDeviceType,
};
use crate::cgw_nb_api_listener::{
cgw_construct_device_capabilities_changed_msg, cgw_construct_device_enqueue_response,
cgw_construct_foreign_infra_connection_msg, cgw_construct_infra_group_create_response,
cgw_construct_infra_group_delete_response, cgw_construct_infra_group_device_add_response,
cgw_construct_infra_group_device_del_response, cgw_construct_infra_join_msg,
cgw_construct_foreign_infra_connection_msg, cgw_construct_infra_capabilities_changed_msg,
cgw_construct_infra_enqueue_response, cgw_construct_infra_group_create_response,
cgw_construct_infra_group_delete_response, cgw_construct_infra_group_infras_add_response,
cgw_construct_infra_group_infras_del_response, cgw_construct_infra_join_msg,
cgw_construct_infra_leave_msg, cgw_construct_rebalance_group_response,
cgw_construct_unassigned_infra_connection_msg,
};
Expand Down Expand Up @@ -174,7 +174,7 @@ enum CGWNBApiParsedMsgType {
InfrastructureGroupCreate,
InfrastructureGroupCreateToShard(i32),
InfrastructureGroupDelete,
InfrastructureGroupInfraAdd(Vec<MacAddress>),
InfrastructureGroupInfrasAdd(Vec<MacAddress>),
InfrastructureGroupInfraDel(Vec<MacAddress>),
InfrastructureGroupInfraMsg(MacAddress, String),
RebalanceGroups,
Expand Down Expand Up @@ -498,18 +498,18 @@ impl CGWConnectionServer {
}

#[derive(Debug, Serialize, Deserialize)]
struct InfraGroupInfraAdd {
struct InfraGroupInfrasAdd {
r#type: String,
infra_group_id: String,
infra_group_infra_devices: Vec<MacAddress>,
infra_group_infras: Vec<MacAddress>,
uuid: Uuid,
}

#[derive(Debug, Serialize, Deserialize)]
struct InfraGroupInfraDel {
struct InfraGroupInfrasDel {
r#type: String,
infra_group_id: String,
infra_group_infra_devices: Vec<MacAddress>,
infra_group_infras: Vec<MacAddress>,
uuid: Uuid,
}

Expand Down Expand Up @@ -557,27 +557,25 @@ impl CGWConnectionServer {
CGWNBApiParsedMsgType::InfrastructureGroupDelete,
));
}
"infrastructure_group_device_add" => {
let json_msg: InfraGroupInfraAdd = serde_json::from_str(pload).ok()?;
"infrastructure_group_infras_add" => {
let json_msg: InfraGroupInfrasAdd = serde_json::from_str(pload).ok()?;
return Some(CGWNBApiParsedMsg::new(
json_msg.uuid,
group_id,
CGWNBApiParsedMsgType::InfrastructureGroupInfraAdd(
json_msg.infra_group_infra_devices,
CGWNBApiParsedMsgType::InfrastructureGroupInfrasAdd(
json_msg.infra_group_infras,
),
));
}
"infrastructure_group_device_del" => {
let json_msg: InfraGroupInfraDel = serde_json::from_str(pload).ok()?;
"infrastructure_group_infras_del" => {
let json_msg: InfraGroupInfrasDel = serde_json::from_str(pload).ok()?;
return Some(CGWNBApiParsedMsg::new(
json_msg.uuid,
group_id,
CGWNBApiParsedMsgType::InfrastructureGroupInfraDel(
json_msg.infra_group_infra_devices,
),
CGWNBApiParsedMsgType::InfrastructureGroupInfraDel(json_msg.infra_group_infras),
));
}
"infrastructure_group_device_message" => {
"infrastructure_group_infra_message" => {
let json_msg: InfraGroupMsgJSON = serde_json::from_str(pload).ok()?;
debug!("{:?}", json_msg);
return Some(CGWNBApiParsedMsg::new(
Expand Down Expand Up @@ -986,7 +984,7 @@ impl CGWConnectionServer {
}
}
None => {
if let Ok(resp) = cgw_construct_device_enqueue_response(
if let Ok(resp) = cgw_construct_infra_enqueue_response(
Uuid::default(),
false,
Some(format!(
Expand Down Expand Up @@ -1046,7 +1044,7 @@ impl CGWConnectionServer {
.await)
.is_err()
{
if let Ok(resp) = cgw_construct_device_enqueue_response(
if let Ok(resp) = cgw_construct_infra_enqueue_response(
Uuid::default(),
false,
Some(format!("Failed to relay MSG stream to remote CGW{cgw_id}")),
Expand Down Expand Up @@ -1091,15 +1089,15 @@ impl CGWConnectionServer {
CGWNBApiParsedMsg {
uuid,
gid,
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfraAdd(mac_list),
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfrasAdd(mac_list),
} => {
if (self
.cgw_remote_discovery
.get_infra_group_owner_id(gid_numeric)
.await)
.is_none()
{
if let Ok(resp) = cgw_construct_infra_group_device_add_response(
if let Ok(resp) = cgw_construct_infra_group_infras_add_response(
gid,
mac_list.clone(),
uuid,
Expand All @@ -1126,7 +1124,7 @@ impl CGWConnectionServer {
self.clone()
.notify_devices_on_gid_change(mac_list.clone(), gid);

if let Ok(resp) = cgw_construct_infra_group_device_add_response(
if let Ok(resp) = cgw_construct_infra_group_infras_add_response(
gid, mac_list, uuid, true, None,
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
Expand Down Expand Up @@ -1155,7 +1153,7 @@ impl CGWConnectionServer {
.notify_devices_on_gid_change(macs_to_notify, gid);
}

if let Ok(resp) = cgw_construct_infra_group_device_add_response(
if let Ok(resp) = cgw_construct_infra_group_infras_add_response(
gid,
mac_addresses,
uuid,
Expand Down Expand Up @@ -1186,7 +1184,7 @@ impl CGWConnectionServer {
.await)
.is_none()
{
if let Ok(resp) = cgw_construct_infra_group_device_del_response(
if let Ok(resp) = cgw_construct_infra_group_infras_del_response(
gid,
mac_list.clone(),
uuid,
Expand Down Expand Up @@ -1217,7 +1215,7 @@ impl CGWConnectionServer {
self.clone()
.notify_devices_on_gid_change(mac_list.clone(), 0i32);

if let Ok(resp) = cgw_construct_infra_group_device_del_response(
if let Ok(resp) = cgw_construct_infra_group_infras_del_response(
gid, mac_list, uuid, true, None,
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
Expand Down Expand Up @@ -1246,7 +1244,7 @@ impl CGWConnectionServer {
.notify_devices_on_gid_change(macs_to_notify, 0i32);
}

if let Ok(resp) = cgw_construct_infra_group_device_del_response(
if let Ok(resp) = cgw_construct_infra_group_infras_del_response(
gid,
mac_addresses,
uuid,
Expand Down Expand Up @@ -1278,7 +1276,7 @@ impl CGWConnectionServer {
.await)
.is_none()
{
if let Ok(resp) = cgw_construct_device_enqueue_response(
if let Ok(resp) = cgw_construct_infra_enqueue_response(
uuid,
false,
Some(format!("Failed to sink down msg to device of nonexisting group, gid {gid}, uuid {uuid}: group does not exist")),
Expand Down Expand Up @@ -1326,7 +1324,7 @@ impl CGWConnectionServer {
}
Err(e) => {
error!("Failed to validate config message! Invalid configure message for device: {device_mac}!");
if let Ok(resp) = cgw_construct_device_enqueue_response(
if let Ok(resp) = cgw_construct_infra_enqueue_response(
uuid,
false,
Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\nError: {e}")),
Expand All @@ -1346,7 +1344,7 @@ impl CGWConnectionServer {
}
}
} else {
if let Ok(resp) = cgw_construct_device_enqueue_response(
if let Ok(resp) = cgw_construct_infra_enqueue_response(
uuid,
false,
Some(format!("Failed to parse command message to device: {device_mac}, uuid {uuid}")),
Expand Down Expand Up @@ -1562,7 +1560,7 @@ impl CGWConnectionServer {
cgw_detect_device_chages(&device.get_device_capabilities(), &caps);
match changes {
Some(diff) => {
if let Ok(resp) = cgw_construct_device_capabilities_changed_msg(
if let Ok(resp) = cgw_construct_infra_capabilities_changed_msg(
device_mac,
device.get_device_group_id(),
&diff,
Expand Down Expand Up @@ -1604,7 +1602,7 @@ impl CGWConnectionServer {
let changes = cgw_detect_device_chages(&default_caps, &caps);
match changes {
Some(diff) => {
if let Ok(resp) = cgw_construct_device_capabilities_changed_msg(
if let Ok(resp) = cgw_construct_infra_capabilities_changed_msg(
device_mac, 0, &diff,
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(0, resp);
Expand Down
50 changes: 25 additions & 25 deletions src/cgw_nb_api_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,27 @@ pub struct InfraGroupDeleteResponse {
}

#[derive(Debug, Serialize)]
pub struct InfraGroupDeviceAddResponse {
pub struct InfraGroupInfrasAddResponse {
pub r#type: &'static str,
pub infra_group_id: i32,
pub infra_group_infra_devices: Vec<MacAddress>,
pub infra_group_infras: Vec<MacAddress>,
pub uuid: Uuid,
pub success: bool,
pub error_message: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct InfraGroupDeviceDelResponse {
pub struct InfraGroupInfrasDelResponse {
pub r#type: &'static str,
pub infra_group_id: i32,
pub infra_group_infra_devices: Vec<MacAddress>,
pub infra_group_infras: Vec<MacAddress>,
pub uuid: Uuid,
pub success: bool,
pub error_message: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct InfraGroupDeviceMessageEnqueueResponse {
pub struct InfraGroupInfraMessageEnqueueResponse {
pub r#type: &'static str,
pub uuid: Uuid,
pub success: bool,
Expand All @@ -89,10 +89,10 @@ pub struct RebalanceGroupsResponse {
}

#[derive(Debug, Serialize)]
pub struct InfraGroupDeviceCapabilitiesChanged {
pub struct InfraGroupInfraCapabilitiesChanged {
pub r#type: &'static str,
pub infra_group_id: i32,
pub infra_group_infra_device: MacAddress,
pub infra_group_infra: MacAddress,
pub changes: Vec<CGWDeviceChange>,
}

Expand Down Expand Up @@ -192,17 +192,17 @@ pub fn cgw_construct_infra_group_delete_response(
Ok(serde_json::to_string(&group_delete)?)
}

pub fn cgw_construct_infra_group_device_add_response(
pub fn cgw_construct_infra_group_infras_add_response(
infra_group_id: i32,
infra_group_infra_devices: Vec<MacAddress>,
infra_group_infras: Vec<MacAddress>,
uuid: Uuid,
success: bool,
error_message: Option<String>,
) -> Result<String> {
let dev_add = InfraGroupDeviceAddResponse {
r#type: "infrastructure_group_device_add_response",
let dev_add = InfraGroupInfrasAddResponse {
r#type: "infrastructure_group_infras_add_response",
infra_group_id,
infra_group_infra_devices,
infra_group_infras,
uuid,
success,
error_message,
Expand All @@ -211,17 +211,17 @@ pub fn cgw_construct_infra_group_device_add_response(
Ok(serde_json::to_string(&dev_add)?)
}

pub fn cgw_construct_infra_group_device_del_response(
pub fn cgw_construct_infra_group_infras_del_response(
infra_group_id: i32,
infra_group_infra_devices: Vec<MacAddress>,
infra_group_infras: Vec<MacAddress>,
uuid: Uuid,
success: bool,
error_message: Option<String>,
) -> Result<String> {
let dev_del = InfraGroupDeviceDelResponse {
r#type: "infrastructure_group_device_del_response",
let dev_del = InfraGroupInfrasDelResponse {
r#type: "infrastructure_group_infras_del_response",
infra_group_id,
infra_group_infra_devices,
infra_group_infras,
uuid,
success,
error_message,
Expand All @@ -230,13 +230,13 @@ pub fn cgw_construct_infra_group_device_del_response(
Ok(serde_json::to_string(&dev_del)?)
}

pub fn cgw_construct_device_enqueue_response(
pub fn cgw_construct_infra_enqueue_response(
uuid: Uuid,
success: bool,
error_message: Option<String>,
) -> Result<String> {
let dev_enq_resp = InfraGroupDeviceMessageEnqueueResponse {
r#type: "infrastructure_group_device_message_enqueu_response",
let dev_enq_resp = InfraGroupInfraMessageEnqueueResponse {
r#type: "infrastructure_group_infra_message_enqueu_response",
uuid,
success,
error_message,
Expand All @@ -262,8 +262,8 @@ pub fn cgw_construct_rebalance_group_response(
Ok(serde_json::to_string(&rebalanse_resp)?)
}

pub fn cgw_construct_device_capabilities_changed_msg(
infra_group_infra_device: MacAddress,
pub fn cgw_construct_infra_capabilities_changed_msg(
infra_group_infra: MacAddress,
infra_group_id: i32,
diff: &HashMap<String, OldNew>,
) -> Result<String> {
Expand All @@ -277,10 +277,10 @@ pub fn cgw_construct_device_capabilities_changed_msg(
});
}

let dev_cap_msg = InfraGroupDeviceCapabilitiesChanged {
r#type: "infrastructure_group_device_capabilities_changed",
let dev_cap_msg = InfraGroupInfraCapabilitiesChanged {
r#type: "infrastructure_group_infra_capabilities_changed",
infra_group_id,
infra_group_infra_device,
infra_group_infra,
changes,
};

Expand Down
12 changes: 6 additions & 6 deletions utils/kafka_producer/data/message_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
"uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
},
"add_to_group": {
"type": "infrastructure_group_device_add",
"type": "infrastructure_group_infras_add",
"infra_group_id": "key",
"infra_group_infra_devices": [],
"infra_group_infras": [],
"uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
},
"del_from_group": {
"type": "infrastructure_group_device_del",
"type": "infrastructure_group_infras_del",
"infra_group_id": "key",
"infra_group_infra_devices": [],
"infra_group_infras": [],
"uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
},
"message_device": {
"type": "infrastructure_group_device_message",
"message_infra": {
"type": "infrastructure_group_infra_message",
"infra_group_id": "key",
"mac": "mac",
"msg": {},
Expand Down
4 changes: 2 additions & 2 deletions utils/kafka_producer/src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ class Message:
GROUP_DEL = "del_group"
DEV_TO_GROUP = "add_to_group"
DEV_FROM_GROUP = "del_from_group"
TO_DEVICE = "message_device"
TO_DEVICE = "message_infra"
GROUP_ID = "infra_group_id"
GROUP_NAME = "infra_name"
SHARD_ID = "infra_shard_id"
DEV_LIST = "infra_group_infra_devices"
DEV_LIST = "infra_group_infras"
MAC = "mac"
DATA = "msg"
MSG_UUID = "uuid"
Expand Down

0 comments on commit 7d3d7fb

Please sign in to comment.