Skip to content

Commit

Permalink
Add new NB API to assigne group to specific CGW
Browse files Browse the repository at this point in the history
  • Loading branch information
SviatoslavBoichuk committed Aug 16, 2024
1 parent add81bb commit 800bfd9
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 28 deletions.
79 changes: 77 additions & 2 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ pub struct CGWConnectionServer {

enum CGWNBApiParsedMsgType {
InfrastructureGroupCreate,
InfrastructureGroupCreateToShard(i32),
InfrastructureGroupDelete,
InfrastructureGroupInfraAdd(Vec<MacAddress>),
InfrastructureGroupInfraDel(Vec<MacAddress>),
Expand Down Expand Up @@ -448,6 +449,13 @@ impl CGWConnectionServer {
fn parse_nbapi_msg(&self, pload: &str) -> Option<CGWNBApiParsedMsg> {
#[derive(Debug, Serialize, Deserialize)]
struct InfraGroupCreate {
r#type: String,
infra_group_id: String,
infra_name: String,
uuid: Uuid,
}
#[derive(Debug, Serialize, Deserialize)]
struct InfraGroupCreateToShard {
r#type: String,
infra_group_id: String,
infra_name: String,
Expand Down Expand Up @@ -503,6 +511,16 @@ impl CGWConnectionServer {
CGWNBApiParsedMsgType::InfrastructureGroupCreate,
));
}
"infrastructure_group_create_to_shard" => {
let json_msg: InfraGroupCreateToShard = serde_json::from_str(pload).ok()?;
return Some(CGWNBApiParsedMsg::new(
json_msg.uuid,
group_id,
CGWNBApiParsedMsgType::InfrastructureGroupCreateToShard(
json_msg.infra_shard_id,
),
));
}
"infrastructure_group_delete" => {
let json_msg: InfraGroupDelete = serde_json::from_str(pload).ok()?;
return Some(CGWNBApiParsedMsg::new(
Expand Down Expand Up @@ -699,7 +717,11 @@ impl CGWConnectionServer {
reserved_size: 1000i32,
actual_size: 0i32,
};
match self.cgw_remote_discovery.create_infra_group(&group).await {
match self
.cgw_remote_discovery
.create_infra_group(&group, None)
.await
{
Ok(_dst_cgw_id) => {
if let Ok(resp) = cgw_construct_infra_group_create_response(
gid,
Expand Down Expand Up @@ -730,8 +752,61 @@ impl CGWConnectionServer {
} else {
error!("Failed to construct infra_group_create message");
}
}
}
// This type of msg is handled in place, not added to buf
// for later processing.
continue;
} else if let CGWNBApiParsedMsg {
uuid,
gid,
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupCreateToShard(shard_id),
} = parsed_msg
{
// DB stuff - create group for remote shards to be aware of change
let group = CGWDBInfrastructureGroup {
id: gid,
reserved_size: 1000i32,
actual_size: 0i32,
};
match self
.cgw_remote_discovery
.create_infra_group(&group, Some(shard_id))
.await
{
Ok(_dst_cgw_id) => {
if let Ok(resp) = cgw_construct_infra_group_create_response(
gid,
String::default(),
uuid,
true,
None,
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
error!("Failed to construct infra_group_create message");
}
}
Err(e) => {
warn!(
"Create group gid {gid}, uuid {uuid} request failed, reason: {:?}",
e
);

warn!("Create group gid {gid} received, but it already exists, uuid {uuid}");
if let Ok(resp) = cgw_construct_infra_group_create_response(
gid,
String::default(),
uuid,
false,
Some(format!(
"Failed to create new group to shard id {}: {:?}",
shard_id, e
)),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
error!("Failed to construct infra_group_create message");
}
}
}
// This type of msg is handled in place, not added to buf
Expand Down
4 changes: 2 additions & 2 deletions src/cgw_nb_api_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub fn cgw_construct_infra_group_create_response(
error_message: Option<String>,
) -> Result<String> {
let group_create = InfraGroupCreateResponse {
r#type: "infrastructure_group_create",
r#type: "infrastructure_group_create_response",
infra_group_id,
infra_name,
uuid,
Expand All @@ -137,7 +137,7 @@ pub fn cgw_construct_infra_group_delete_response(
error_message: Option<String>,
) -> Result<String> {
let group_delete = InfraGroupDeleteResponse {
r#type: "infrastructure_group_delete",
r#type: "infrastructure_group_delete_response",
infra_group_id,
uuid,
success,
Expand Down
70 changes: 47 additions & 23 deletions src/cgw_remote_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,32 +590,51 @@ impl CGWRemoteDiscovery {
}
}

warn!(
"Every available CGW is exceeding capacity+threshold limit, using least loaded one..."
);
if let Some(least_loaded_cgw) = lock
.iter()
.min_by(|a, b| {
a.1.shard
.assigned_groups_num
.cmp(&b.1.shard.assigned_groups_num)
})
.map(|(_k, _v)| _v)
{
warn!("Found least loaded CGW id: {}", least_loaded_cgw.shard.id);
return Ok(least_loaded_cgw.shard.id);
}

Err(Error::RemoteDiscovery(
"Unexpected: Failed to find the least loaded CGW shard",
))
}

async fn assign_infra_group_to_cgw(&self, gid: i32) -> Result<i32> {
async fn validate_infra_group_cgw_assignee(&self, shard_id: i32) -> Result<i32> {
let lock = self.remote_cgws_map.read().await;

match lock.get(&shard_id) {
Some(instance) => {
let max_capacity: i32 = instance.shard.capacity + instance.shard.threshold;
if instance.shard.assigned_groups_num < max_capacity {
debug!("Found CGW shard to assign group to (id {})", shard_id);
Ok(shard_id)
} else {
Err(Error::RemoteDiscovery(
"Unexpected: Failed to find the least loaded CGW shard",
))
}
}
None => Err(Error::RemoteDiscovery(
"Unexpected: Failed to find CGW shard",
)),
}
}

async fn assign_infra_group_to_cgw(&self, gid: i32, shard_id: Option<i32>) -> Result<i32> {
// Delete key (if exists), recreate with new owner
let _ = self.deassign_infra_group_to_cgw(gid).await;

let dst_cgw_id: i32 = self.get_infra_group_cgw_assignee().await?;
// Sync CGWs to get lates data
if let Err(e) = self.sync_remote_cgw_map().await {
error!("Can't create CGW Remote Discovery client: Can't pull records data from REDIS (wrong redis host/port?) ({:?})", e);
return Err(Error::RemoteDiscovery(
"Failed to sync remote CGW info from REDIS",
));
}

let dst_cgw_id: i32 = match shard_id {
Some(dest_shard_id) => {
self.validate_infra_group_cgw_assignee(dest_shard_id)
.await?
}
None => self.get_infra_group_cgw_assignee().await?,
};

let mut con = self.redis_client.clone();
let res: RedisResult<()> = redis::cmd("HSET")
Expand Down Expand Up @@ -671,15 +690,20 @@ impl CGWRemoteDiscovery {
Ok(())
}

pub async fn create_infra_group(&self, g: &CGWDBInfrastructureGroup) -> Result<i32> {
pub async fn create_infra_group(
&self,
g: &CGWDBInfrastructureGroup,
dest_shard_id: Option<i32>,
) -> Result<i32> {
//TODO: transaction-based insert/assigned_group_num update (DB)
self.db_accessor.insert_new_infra_group(g).await?;

let shard_id: i32 = match self.assign_infra_group_to_cgw(g.id).await {
let shard_id: i32 = match self.assign_infra_group_to_cgw(g.id, dest_shard_id).await {
Ok(v) => v,
Err(_e) => {
Err(e) => {
error!("Assign group to CGW shard failed! Err: {}", e.to_string());
let _ = self.db_accessor.delete_infra_group(g.id).await;
return Err(Error::RemoteDiscovery("Assign group to CGW shard failed"));
return Err(e);
}
};

Expand Down Expand Up @@ -911,7 +935,7 @@ impl CGWRemoteDiscovery {
for i in groups.iter() {
let _ = self.sync_remote_cgw_map().await;
let _ = self.sync_gid_to_cgw_map().await;
match self.assign_infra_group_to_cgw(i.id).await {
match self.assign_infra_group_to_cgw(i.id, None).await {
Ok(shard_id) => {
debug!("Rebalancing: assigned {} to shard {}", i.id, shard_id);
let _ = self.increment_cgw_assigned_groups_num(shard_id).await;
Expand Down
1 change: 0 additions & 1 deletion utils/kafka_producer/data/message_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"type": "infrastructure_group_create",
"infra_group_id": "key",
"infra_name": "name",
"infra_shard_id": 0,
"uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
},
"del_group": {
Expand Down

0 comments on commit 800bfd9

Please sign in to comment.