From 7d3d7fb492ed3e6cedcc1f2093fa3bbf39db1e26 Mon Sep 17 00:00:00 2001 From: Sviatoslav Boichuk Date: Mon, 28 Oct 2024 15:55:36 +0200 Subject: [PATCH] Change device to infra in NB API --- src/cgw_connection_server.rs | 64 +++++++++---------- src/cgw_nb_api_listener.rs | 50 +++++++-------- .../kafka_producer/data/message_template.json | 12 ++-- utils/kafka_producer/src/utils.py | 4 +- 4 files changed, 64 insertions(+), 66 deletions(-) diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index 2def66d..a7ecda7 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -2,10 +2,10 @@ use crate::cgw_device::{ cgw_detect_device_chages, CGWDevice, CGWDeviceCapabilities, CGWDeviceState, CGWDeviceType, }; use crate::cgw_nb_api_listener::{ - cgw_construct_device_capabilities_changed_msg, cgw_construct_device_enqueue_response, - cgw_construct_foreign_infra_connection_msg, cgw_construct_infra_group_create_response, - cgw_construct_infra_group_delete_response, cgw_construct_infra_group_device_add_response, - cgw_construct_infra_group_device_del_response, cgw_construct_infra_join_msg, + cgw_construct_foreign_infra_connection_msg, cgw_construct_infra_capabilities_changed_msg, + cgw_construct_infra_enqueue_response, cgw_construct_infra_group_create_response, + cgw_construct_infra_group_delete_response, cgw_construct_infra_group_infras_add_response, + cgw_construct_infra_group_infras_del_response, cgw_construct_infra_join_msg, cgw_construct_infra_leave_msg, cgw_construct_rebalance_group_response, cgw_construct_unassigned_infra_connection_msg, }; @@ -174,7 +174,7 @@ enum CGWNBApiParsedMsgType { InfrastructureGroupCreate, InfrastructureGroupCreateToShard(i32), InfrastructureGroupDelete, - InfrastructureGroupInfraAdd(Vec), + InfrastructureGroupInfrasAdd(Vec), InfrastructureGroupInfraDel(Vec), InfrastructureGroupInfraMsg(MacAddress, String), RebalanceGroups, @@ -498,18 +498,18 @@ impl CGWConnectionServer { } #[derive(Debug, Serialize, Deserialize)] - struct InfraGroupInfraAdd { + struct InfraGroupInfrasAdd { r#type: String, infra_group_id: String, - infra_group_infra_devices: Vec, + infra_group_infras: Vec, uuid: Uuid, } #[derive(Debug, Serialize, Deserialize)] - struct InfraGroupInfraDel { + struct InfraGroupInfrasDel { r#type: String, infra_group_id: String, - infra_group_infra_devices: Vec, + infra_group_infras: Vec, uuid: Uuid, } @@ -557,27 +557,25 @@ impl CGWConnectionServer { CGWNBApiParsedMsgType::InfrastructureGroupDelete, )); } - "infrastructure_group_device_add" => { - let json_msg: InfraGroupInfraAdd = serde_json::from_str(pload).ok()?; + "infrastructure_group_infras_add" => { + let json_msg: InfraGroupInfrasAdd = serde_json::from_str(pload).ok()?; return Some(CGWNBApiParsedMsg::new( json_msg.uuid, group_id, - CGWNBApiParsedMsgType::InfrastructureGroupInfraAdd( - json_msg.infra_group_infra_devices, + CGWNBApiParsedMsgType::InfrastructureGroupInfrasAdd( + json_msg.infra_group_infras, ), )); } - "infrastructure_group_device_del" => { - let json_msg: InfraGroupInfraDel = serde_json::from_str(pload).ok()?; + "infrastructure_group_infras_del" => { + let json_msg: InfraGroupInfrasDel = serde_json::from_str(pload).ok()?; return Some(CGWNBApiParsedMsg::new( json_msg.uuid, group_id, - CGWNBApiParsedMsgType::InfrastructureGroupInfraDel( - json_msg.infra_group_infra_devices, - ), + CGWNBApiParsedMsgType::InfrastructureGroupInfraDel(json_msg.infra_group_infras), )); } - "infrastructure_group_device_message" => { + "infrastructure_group_infra_message" => { let json_msg: InfraGroupMsgJSON = serde_json::from_str(pload).ok()?; debug!("{:?}", json_msg); return Some(CGWNBApiParsedMsg::new( @@ -986,7 +984,7 @@ impl CGWConnectionServer { } } None => { - if let Ok(resp) = cgw_construct_device_enqueue_response( + if let Ok(resp) = cgw_construct_infra_enqueue_response( Uuid::default(), false, Some(format!( @@ -1046,7 +1044,7 @@ impl CGWConnectionServer { .await) .is_err() { - if let Ok(resp) = cgw_construct_device_enqueue_response( + if let Ok(resp) = cgw_construct_infra_enqueue_response( Uuid::default(), false, Some(format!("Failed to relay MSG stream to remote CGW{cgw_id}")), @@ -1091,7 +1089,7 @@ impl CGWConnectionServer { CGWNBApiParsedMsg { uuid, gid, - msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfraAdd(mac_list), + msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfrasAdd(mac_list), } => { if (self .cgw_remote_discovery @@ -1099,7 +1097,7 @@ impl CGWConnectionServer { .await) .is_none() { - if let Ok(resp) = cgw_construct_infra_group_device_add_response( + if let Ok(resp) = cgw_construct_infra_group_infras_add_response( gid, mac_list.clone(), uuid, @@ -1126,7 +1124,7 @@ impl CGWConnectionServer { self.clone() .notify_devices_on_gid_change(mac_list.clone(), gid); - if let Ok(resp) = cgw_construct_infra_group_device_add_response( + if let Ok(resp) = cgw_construct_infra_group_infras_add_response( gid, mac_list, uuid, true, None, ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); @@ -1155,7 +1153,7 @@ impl CGWConnectionServer { .notify_devices_on_gid_change(macs_to_notify, gid); } - if let Ok(resp) = cgw_construct_infra_group_device_add_response( + if let Ok(resp) = cgw_construct_infra_group_infras_add_response( gid, mac_addresses, uuid, @@ -1186,7 +1184,7 @@ impl CGWConnectionServer { .await) .is_none() { - if let Ok(resp) = cgw_construct_infra_group_device_del_response( + if let Ok(resp) = cgw_construct_infra_group_infras_del_response( gid, mac_list.clone(), uuid, @@ -1217,7 +1215,7 @@ impl CGWConnectionServer { self.clone() .notify_devices_on_gid_change(mac_list.clone(), 0i32); - if let Ok(resp) = cgw_construct_infra_group_device_del_response( + if let Ok(resp) = cgw_construct_infra_group_infras_del_response( gid, mac_list, uuid, true, None, ) { self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); @@ -1246,7 +1244,7 @@ impl CGWConnectionServer { .notify_devices_on_gid_change(macs_to_notify, 0i32); } - if let Ok(resp) = cgw_construct_infra_group_device_del_response( + if let Ok(resp) = cgw_construct_infra_group_infras_del_response( gid, mac_addresses, uuid, @@ -1278,7 +1276,7 @@ impl CGWConnectionServer { .await) .is_none() { - if let Ok(resp) = cgw_construct_device_enqueue_response( + if let Ok(resp) = cgw_construct_infra_enqueue_response( uuid, false, Some(format!("Failed to sink down msg to device of nonexisting group, gid {gid}, uuid {uuid}: group does not exist")), @@ -1326,7 +1324,7 @@ impl CGWConnectionServer { } Err(e) => { error!("Failed to validate config message! Invalid configure message for device: {device_mac}!"); - if let Ok(resp) = cgw_construct_device_enqueue_response( + if let Ok(resp) = cgw_construct_infra_enqueue_response( uuid, false, Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\nError: {e}")), @@ -1346,7 +1344,7 @@ impl CGWConnectionServer { } } } else { - if let Ok(resp) = cgw_construct_device_enqueue_response( + if let Ok(resp) = cgw_construct_infra_enqueue_response( uuid, false, Some(format!("Failed to parse command message to device: {device_mac}, uuid {uuid}")), @@ -1562,7 +1560,7 @@ impl CGWConnectionServer { cgw_detect_device_chages(&device.get_device_capabilities(), &caps); match changes { Some(diff) => { - if let Ok(resp) = cgw_construct_device_capabilities_changed_msg( + if let Ok(resp) = cgw_construct_infra_capabilities_changed_msg( device_mac, device.get_device_group_id(), &diff, @@ -1604,7 +1602,7 @@ impl CGWConnectionServer { let changes = cgw_detect_device_chages(&default_caps, &caps); match changes { Some(diff) => { - if let Ok(resp) = cgw_construct_device_capabilities_changed_msg( + if let Ok(resp) = cgw_construct_infra_capabilities_changed_msg( device_mac, 0, &diff, ) { self.enqueue_mbox_message_from_cgw_to_nb_api(0, resp); diff --git a/src/cgw_nb_api_listener.rs b/src/cgw_nb_api_listener.rs index 2736094..b1dd552 100644 --- a/src/cgw_nb_api_listener.rs +++ b/src/cgw_nb_api_listener.rs @@ -52,27 +52,27 @@ pub struct InfraGroupDeleteResponse { } #[derive(Debug, Serialize)] -pub struct InfraGroupDeviceAddResponse { +pub struct InfraGroupInfrasAddResponse { pub r#type: &'static str, pub infra_group_id: i32, - pub infra_group_infra_devices: Vec, + pub infra_group_infras: Vec, pub uuid: Uuid, pub success: bool, pub error_message: Option, } #[derive(Debug, Serialize)] -pub struct InfraGroupDeviceDelResponse { +pub struct InfraGroupInfrasDelResponse { pub r#type: &'static str, pub infra_group_id: i32, - pub infra_group_infra_devices: Vec, + pub infra_group_infras: Vec, pub uuid: Uuid, pub success: bool, pub error_message: Option, } #[derive(Debug, Serialize)] -pub struct InfraGroupDeviceMessageEnqueueResponse { +pub struct InfraGroupInfraMessageEnqueueResponse { pub r#type: &'static str, pub uuid: Uuid, pub success: bool, @@ -89,10 +89,10 @@ pub struct RebalanceGroupsResponse { } #[derive(Debug, Serialize)] -pub struct InfraGroupDeviceCapabilitiesChanged { +pub struct InfraGroupInfraCapabilitiesChanged { pub r#type: &'static str, pub infra_group_id: i32, - pub infra_group_infra_device: MacAddress, + pub infra_group_infra: MacAddress, pub changes: Vec, } @@ -192,17 +192,17 @@ pub fn cgw_construct_infra_group_delete_response( Ok(serde_json::to_string(&group_delete)?) } -pub fn cgw_construct_infra_group_device_add_response( +pub fn cgw_construct_infra_group_infras_add_response( infra_group_id: i32, - infra_group_infra_devices: Vec, + infra_group_infras: Vec, uuid: Uuid, success: bool, error_message: Option, ) -> Result { - let dev_add = InfraGroupDeviceAddResponse { - r#type: "infrastructure_group_device_add_response", + let dev_add = InfraGroupInfrasAddResponse { + r#type: "infrastructure_group_infras_add_response", infra_group_id, - infra_group_infra_devices, + infra_group_infras, uuid, success, error_message, @@ -211,17 +211,17 @@ pub fn cgw_construct_infra_group_device_add_response( Ok(serde_json::to_string(&dev_add)?) } -pub fn cgw_construct_infra_group_device_del_response( +pub fn cgw_construct_infra_group_infras_del_response( infra_group_id: i32, - infra_group_infra_devices: Vec, + infra_group_infras: Vec, uuid: Uuid, success: bool, error_message: Option, ) -> Result { - let dev_del = InfraGroupDeviceDelResponse { - r#type: "infrastructure_group_device_del_response", + let dev_del = InfraGroupInfrasDelResponse { + r#type: "infrastructure_group_infras_del_response", infra_group_id, - infra_group_infra_devices, + infra_group_infras, uuid, success, error_message, @@ -230,13 +230,13 @@ pub fn cgw_construct_infra_group_device_del_response( Ok(serde_json::to_string(&dev_del)?) } -pub fn cgw_construct_device_enqueue_response( +pub fn cgw_construct_infra_enqueue_response( uuid: Uuid, success: bool, error_message: Option, ) -> Result { - let dev_enq_resp = InfraGroupDeviceMessageEnqueueResponse { - r#type: "infrastructure_group_device_message_enqueu_response", + let dev_enq_resp = InfraGroupInfraMessageEnqueueResponse { + r#type: "infrastructure_group_infra_message_enqueu_response", uuid, success, error_message, @@ -262,8 +262,8 @@ pub fn cgw_construct_rebalance_group_response( Ok(serde_json::to_string(&rebalanse_resp)?) } -pub fn cgw_construct_device_capabilities_changed_msg( - infra_group_infra_device: MacAddress, +pub fn cgw_construct_infra_capabilities_changed_msg( + infra_group_infra: MacAddress, infra_group_id: i32, diff: &HashMap, ) -> Result { @@ -277,10 +277,10 @@ pub fn cgw_construct_device_capabilities_changed_msg( }); } - let dev_cap_msg = InfraGroupDeviceCapabilitiesChanged { - r#type: "infrastructure_group_device_capabilities_changed", + let dev_cap_msg = InfraGroupInfraCapabilitiesChanged { + r#type: "infrastructure_group_infra_capabilities_changed", infra_group_id, - infra_group_infra_device, + infra_group_infra, changes, }; diff --git a/utils/kafka_producer/data/message_template.json b/utils/kafka_producer/data/message_template.json index 468541e..15ecac3 100644 --- a/utils/kafka_producer/data/message_template.json +++ b/utils/kafka_producer/data/message_template.json @@ -11,19 +11,19 @@ "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff" }, "add_to_group": { - "type": "infrastructure_group_device_add", + "type": "infrastructure_group_infras_add", "infra_group_id": "key", - "infra_group_infra_devices": [], + "infra_group_infras": [], "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff" }, "del_from_group": { - "type": "infrastructure_group_device_del", + "type": "infrastructure_group_infras_del", "infra_group_id": "key", - "infra_group_infra_devices": [], + "infra_group_infras": [], "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff" }, - "message_device": { - "type": "infrastructure_group_device_message", + "message_infra": { + "type": "infrastructure_group_infra_message", "infra_group_id": "key", "mac": "mac", "msg": {}, diff --git a/utils/kafka_producer/src/utils.py b/utils/kafka_producer/src/utils.py index 3f8f442..6d88357 100644 --- a/utils/kafka_producer/src/utils.py +++ b/utils/kafka_producer/src/utils.py @@ -83,11 +83,11 @@ class Message: GROUP_DEL = "del_group" DEV_TO_GROUP = "add_to_group" DEV_FROM_GROUP = "del_from_group" - TO_DEVICE = "message_device" + TO_DEVICE = "message_infra" GROUP_ID = "infra_group_id" GROUP_NAME = "infra_name" SHARD_ID = "infra_shard_id" - DEV_LIST = "infra_group_infra_devices" + DEV_LIST = "infra_group_infras" MAC = "mac" DATA = "msg" MSG_UUID = "uuid"