diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index fe65fb9..b648cb9 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -166,6 +166,7 @@ pub struct CGWConnectionServer { enum CGWNBApiParsedMsgType { InfrastructureGroupCreate, + InfrastructureGroupCreateToShard(i32), InfrastructureGroupDelete, InfrastructureGroupInfraAdd(Vec), InfrastructureGroupInfraDel(Vec), @@ -448,6 +449,13 @@ impl CGWConnectionServer { fn parse_nbapi_msg(&self, pload: &str) -> Option { #[derive(Debug, Serialize, Deserialize)] struct InfraGroupCreate { + r#type: String, + infra_group_id: String, + infra_name: String, + uuid: Uuid, + } + #[derive(Debug, Serialize, Deserialize)] + struct InfraGroupCreateToShard { r#type: String, infra_group_id: String, infra_name: String, @@ -503,6 +511,16 @@ impl CGWConnectionServer { CGWNBApiParsedMsgType::InfrastructureGroupCreate, )); } + "infrastructure_group_create_to_shard" => { + let json_msg: InfraGroupCreateToShard = serde_json::from_str(pload).ok()?; + return Some(CGWNBApiParsedMsg::new( + json_msg.uuid, + group_id, + CGWNBApiParsedMsgType::InfrastructureGroupCreateToShard( + json_msg.infra_shard_id, + ), + )); + } "infrastructure_group_delete" => { let json_msg: InfraGroupDelete = serde_json::from_str(pload).ok()?; return Some(CGWNBApiParsedMsg::new( @@ -699,7 +717,11 @@ impl CGWConnectionServer { reserved_size: 1000i32, actual_size: 0i32, }; - match self.cgw_remote_discovery.create_infra_group(&group).await { + match self + .cgw_remote_discovery + .create_infra_group(&group, None) + .await + { Ok(_dst_cgw_id) => { if let Ok(resp) = cgw_construct_infra_group_create_response( gid, @@ -730,8 +752,61 @@ impl CGWConnectionServer { } else { error!("Failed to construct infra_group_create message"); } + } + } + // This type of msg is handled in place, not added to buf + // for later processing. + continue; + } else if let CGWNBApiParsedMsg { + uuid, + gid, + msg_type: CGWNBApiParsedMsgType::InfrastructureGroupCreateToShard(shard_id), + } = parsed_msg + { + // DB stuff - create group for remote shards to be aware of change + let group = CGWDBInfrastructureGroup { + id: gid, + reserved_size: 1000i32, + actual_size: 0i32, + }; + match self + .cgw_remote_discovery + .create_infra_group(&group, Some(shard_id)) + .await + { + Ok(_dst_cgw_id) => { + if let Ok(resp) = cgw_construct_infra_group_create_response( + gid, + String::default(), + uuid, + true, + None, + ) { + self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); + } else { + error!("Failed to construct infra_group_create message"); + } + } + Err(e) => { + warn!( + "Create group gid {gid}, uuid {uuid} request failed, reason: {:?}", + e + ); - warn!("Create group gid {gid} received, but it already exists, uuid {uuid}"); + if let Ok(resp) = cgw_construct_infra_group_create_response( + gid, + String::default(), + uuid, + false, + Some(format!( + "Failed to create new group to shard id {}: {:?}", + shard_id, e + )), + ) { + self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp); + } else { + error!("Failed to construct infra_group_create message"); + } } } // This type of msg is handled in place, not added to buf diff --git a/src/cgw_nb_api_listener.rs b/src/cgw_nb_api_listener.rs index 815ffee..d82f390 100644 --- a/src/cgw_nb_api_listener.rs +++ b/src/cgw_nb_api_listener.rs @@ -119,7 +119,7 @@ pub fn cgw_construct_infra_group_create_response( error_message: Option, ) -> Result { let group_create = InfraGroupCreateResponse { - r#type: "infrastructure_group_create", + r#type: "infrastructure_group_create_response", infra_group_id, infra_name, uuid, @@ -137,7 +137,7 @@ pub fn cgw_construct_infra_group_delete_response( error_message: Option, ) -> Result { let group_delete = InfraGroupDeleteResponse { - r#type: "infrastructure_group_delete", + r#type: "infrastructure_group_delete_response", infra_group_id, uuid, success, diff --git a/src/cgw_remote_discovery.rs b/src/cgw_remote_discovery.rs index 6c82c5b..a570931 100644 --- a/src/cgw_remote_discovery.rs +++ b/src/cgw_remote_discovery.rs @@ -590,32 +590,51 @@ impl CGWRemoteDiscovery { } } - warn!( - "Every available CGW is exceeding capacity+threshold limit, using least loaded one..." - ); - if let Some(least_loaded_cgw) = lock - .iter() - .min_by(|a, b| { - a.1.shard - .assigned_groups_num - .cmp(&b.1.shard.assigned_groups_num) - }) - .map(|(_k, _v)| _v) - { - warn!("Found least loaded CGW id: {}", least_loaded_cgw.shard.id); - return Ok(least_loaded_cgw.shard.id); - } - Err(Error::RemoteDiscovery( "Unexpected: Failed to find the least loaded CGW shard", )) } - async fn assign_infra_group_to_cgw(&self, gid: i32) -> Result { + async fn validate_infra_group_cgw_assignee(&self, shard_id: i32) -> Result { + let lock = self.remote_cgws_map.read().await; + + match lock.get(&shard_id) { + Some(instance) => { + let max_capacity: i32 = instance.shard.capacity + instance.shard.threshold; + if instance.shard.assigned_groups_num < max_capacity { + debug!("Found CGW shard to assign group to (id {})", shard_id); + Ok(shard_id) + } else { + Err(Error::RemoteDiscovery( + "Unexpected: Failed to find the least loaded CGW shard", + )) + } + } + None => Err(Error::RemoteDiscovery( + "Unexpected: Failed to find CGW shard", + )), + } + } + + async fn assign_infra_group_to_cgw(&self, gid: i32, shard_id: Option) -> Result { // Delete key (if exists), recreate with new owner let _ = self.deassign_infra_group_to_cgw(gid).await; - let dst_cgw_id: i32 = self.get_infra_group_cgw_assignee().await?; + // Sync CGWs to get lates data + if let Err(e) = self.sync_remote_cgw_map().await { + error!("Can't create CGW Remote Discovery client: Can't pull records data from REDIS (wrong redis host/port?) ({:?})", e); + return Err(Error::RemoteDiscovery( + "Failed to sync remote CGW info from REDIS", + )); + } + + let dst_cgw_id: i32 = match shard_id { + Some(dest_shard_id) => { + self.validate_infra_group_cgw_assignee(dest_shard_id) + .await? + } + None => self.get_infra_group_cgw_assignee().await?, + }; let mut con = self.redis_client.clone(); let res: RedisResult<()> = redis::cmd("HSET") @@ -671,15 +690,20 @@ impl CGWRemoteDiscovery { Ok(()) } - pub async fn create_infra_group(&self, g: &CGWDBInfrastructureGroup) -> Result { + pub async fn create_infra_group( + &self, + g: &CGWDBInfrastructureGroup, + dest_shard_id: Option, + ) -> Result { //TODO: transaction-based insert/assigned_group_num update (DB) self.db_accessor.insert_new_infra_group(g).await?; - let shard_id: i32 = match self.assign_infra_group_to_cgw(g.id).await { + let shard_id: i32 = match self.assign_infra_group_to_cgw(g.id, dest_shard_id).await { Ok(v) => v, - Err(_e) => { + Err(e) => { + error!("Assign group to CGW shard failed! Err: {}", e.to_string()); let _ = self.db_accessor.delete_infra_group(g.id).await; - return Err(Error::RemoteDiscovery("Assign group to CGW shard failed")); + return Err(e); } }; @@ -911,7 +935,7 @@ impl CGWRemoteDiscovery { for i in groups.iter() { let _ = self.sync_remote_cgw_map().await; let _ = self.sync_gid_to_cgw_map().await; - match self.assign_infra_group_to_cgw(i.id).await { + match self.assign_infra_group_to_cgw(i.id, None).await { Ok(shard_id) => { debug!("Rebalancing: assigned {} to shard {}", i.id, shard_id); let _ = self.increment_cgw_assigned_groups_num(shard_id).await; diff --git a/utils/kafka_producer/data/message_template.json b/utils/kafka_producer/data/message_template.json index bb1f1fd..468541e 100644 --- a/utils/kafka_producer/data/message_template.json +++ b/utils/kafka_producer/data/message_template.json @@ -3,7 +3,6 @@ "type": "infrastructure_group_create", "infra_group_id": "key", "infra_name": "name", - "infra_shard_id": 0, "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff" }, "del_group": {