Skip to content

Commit

Permalink
Handle group infras capacity overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
SviatoslavBoichuk committed Aug 22, 2024
1 parent 546e8fc commit 43b30e2
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 9 deletions.
8 changes: 6 additions & 2 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,11 @@ impl CGWConnectionServer {
});

let server_clone = server.clone();
let ifras_capacity = app_args.cgw_group_infras_capacity;
server.mbox_nb_api_runtime_handle.spawn(async move {
server_clone.process_internal_nb_api_mbox(nb_api_rx).await;
server_clone
.process_internal_nb_api_mbox(nb_api_rx, ifras_capacity)
.await;
});

server.queue_timeout_handle.spawn(async move {
Expand Down Expand Up @@ -584,6 +587,7 @@ impl CGWConnectionServer {
async fn process_internal_nb_api_mbox(
self: Arc<Self>,
mut rx_mbox: CGWConnectionServerNBAPIMboxRx,
infras_capacity: i32,
) {
debug!("process_nb_api_mbox entry");

Expand Down Expand Up @@ -691,7 +695,7 @@ impl CGWConnectionServer {
// DB stuff - create group for remote shards to be aware of change
let group = CGWDBInfrastructureGroup {
id: gid,
reserved_size: 1000i32,
reserved_size: infras_capacity,
actual_size: 0i32,
};
match self.cgw_remote_discovery.create_infra_group(&group).await {
Expand Down
178 changes: 171 additions & 7 deletions src/cgw_remote_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ static REDIS_KEY_SHARD_VALUE_ASSIGNED_G_NUM: &str = "assigned_groups_num";
static REDIS_KEY_GID: &str = "group_id_";
static REDIS_KEY_GID_VALUE_GID: &str = "gid";
static REDIS_KEY_GID_VALUE_SHARD_ID: &str = "shard_id";
static REDIS_KEY_GID_VALUE_INFRAS_CAPACITY: &str = "infras_capacity";
static REDIS_KEY_GID_VALUE_INFRAS_ASSIGNED: &str = "infras_assigned";

#[derive(Clone, Debug, Default, PartialEq)]
pub struct CGWREDISDBShard {
Expand Down Expand Up @@ -434,6 +436,13 @@ impl CGWRemoteDiscovery {
Default::default(),
),
);
CGWMetrics::get_ref()
.change_group_counter(
item.infra_group_id,
CGWMetricsCounterType::GroupInfrasAssignedNum,
CGWMetricsCounterOpType::Inc,
)
.await;
}
}
}
Expand Down Expand Up @@ -525,11 +534,11 @@ impl CGWRemoteDiscovery {
.await;
if res.is_err() {
error!(
"Failed to increment assigned group number:\n{}",
"Failed to increment assigned groups number:\n{}",
res.err().unwrap()
);
return Err(Error::RemoteDiscovery(
"Failed to increment assigned group number",
"Failed to increment assigned groups number",
));
}

Expand All @@ -554,7 +563,7 @@ impl CGWRemoteDiscovery {
.await;
if res.is_err() {
error!(
"Failed to decrement assigned group number:\n{}",
"Failed to decrement assigned groups number:\n{}",
res.err().unwrap()
);
return Err(Error::RemoteDiscovery(
Expand All @@ -572,6 +581,68 @@ impl CGWRemoteDiscovery {
Ok(())
}

async fn increment_group_assigned_infras_num(&self, gid: i32) -> Result<()> {
debug!("Incrementing assigned infras num group_id_{gid}");

let mut con = self.redis_client.clone();
let res: RedisResult<()> = redis::cmd("HINCRBY")
.arg(format!("{}{gid}", REDIS_KEY_GID))
.arg(REDIS_KEY_GID_VALUE_INFRAS_ASSIGNED)
.arg("1")
.query_async(&mut con)
.await;
if res.is_err() {
error!(
"Failed to increment assigned infras number:\n{}",
res.err().unwrap()
);
return Err(Error::RemoteDiscovery(
"Failed to increment assigned infras number",
));
}

CGWMetrics::get_ref()
.change_group_counter(
gid,
CGWMetricsCounterType::GroupInfrasAssignedNum,
CGWMetricsCounterOpType::Inc,
)
.await;

Ok(())
}

async fn decrement_group_assigned_infras_num(&self, gid: i32) -> Result<()> {
debug!("Decrementing assigned infras num group_id_{gid}");

let mut con = self.redis_client.clone();
let res: RedisResult<()> = redis::cmd("HINCRBY")
.arg(format!("{}{gid}", REDIS_KEY_GID))
.arg(REDIS_KEY_GID_VALUE_INFRAS_ASSIGNED)
.arg("-1")
.query_async(&mut con)
.await;
if res.is_err() {
error!(
"Failed to decrement assigned infras number:\n{}",
res.err().unwrap()
);
return Err(Error::RemoteDiscovery(
"Failed to decrement assigned infras number",
));
}

CGWMetrics::get_ref()
.change_group_counter(
gid,
CGWMetricsCounterType::GroupInfrasAssignedNum,
CGWMetricsCounterOpType::Dec,
)
.await;

Ok(())
}

async fn get_infra_group_cgw_assignee(&self) -> Result<i32> {
let lock = self.remote_cgws_map.read().await;
let mut hash_vec: Vec<(&i32, &CGWRemoteIface)> = lock.iter().collect();
Expand Down Expand Up @@ -611,7 +682,12 @@ impl CGWRemoteDiscovery {
))
}

async fn assign_infra_group_to_cgw(&self, gid: i32) -> Result<i32> {
async fn assign_infra_group_to_cgw(
&self,
gid: i32,
infras_capacity: i32,
infras_assigned: i32,
) -> Result<i32> {
// Delete key (if exists), recreate with new owner
let _ = self.deassign_infra_group_to_cgw(gid).await;

Expand All @@ -624,6 +700,10 @@ impl CGWRemoteDiscovery {
.arg(gid.to_string())
.arg(REDIS_KEY_GID_VALUE_SHARD_ID)
.arg(dst_cgw_id.to_string())
.arg(REDIS_KEY_GID_VALUE_INFRAS_CAPACITY)
.arg(infras_capacity.to_string())
.arg(REDIS_KEY_GID_VALUE_INFRAS_ASSIGNED)
.arg(infras_assigned.to_string())
.query_async(&mut con)
.await;

Expand Down Expand Up @@ -675,7 +755,10 @@ impl CGWRemoteDiscovery {
//TODO: transaction-based insert/assigned_group_num update (DB)
self.db_accessor.insert_new_infra_group(g).await?;

let shard_id: i32 = match self.assign_infra_group_to_cgw(g.id).await {
let shard_id: i32 = match self
.assign_infra_group_to_cgw(g.id, g.reserved_size, g.actual_size)
.await
{
Ok(v) => v,
Err(_e) => {
let _ = self.db_accessor.delete_infra_group(g.id).await;
Expand Down Expand Up @@ -734,6 +817,28 @@ impl CGWRemoteDiscovery {
// TODO: assign list to shards; currently - only created bulk, no assignment
let mut futures = Vec::with_capacity(infras.len());
// Results store vec of MACs we failed to add

let infras_capacity = match self.get_group_infras_capacity(gid).await {
Ok(capacity) => capacity,
Err(e) => {
error!("Failed to create infreas list: {}", e.to_string());
return Err(Error::RemoteDiscoveryFailedInfras(infras));
}
};

let infras_assigned = match self.get_group_infras_assigned_num(gid).await {
Ok(assigned) => assigned,
Err(e) => {
error!("Failed to create infreas list: {}", e.to_string());
return Err(Error::RemoteDiscoveryFailedInfras(infras));
}
};

if infras.len() as i32 + infras_assigned > infras_capacity {
error!("Failed to create infras list - GID {gid} has no enough capacity");
return Err(Error::RemoteDiscoveryFailedInfras(infras));
}

let mut failed_infras: Vec<MacAddress> = Vec::with_capacity(futures.len());
for x in infras.iter() {
let db_accessor_clone = self.db_accessor.clone();
Expand Down Expand Up @@ -775,6 +880,9 @@ impl CGWRemoteDiscovery {
),
);
}

// Update assigned infras num
let _ = self.increment_group_assigned_infras_num(gid).await;
}
}
Err(_) => {
Expand All @@ -792,7 +900,7 @@ impl CGWRemoteDiscovery {

pub async fn destroy_ifras_list(
&self,
_gid: i32,
gid: i32,
infras: Vec<MacAddress>,
cache: Arc<RwLock<CGWDevicesCache>>,
) -> Result<()> {
Expand Down Expand Up @@ -828,6 +936,8 @@ impl CGWRemoteDiscovery {
devices_cache.del_device(&device_mac);
}
}
// Update assigned infras num
let _ = self.decrement_group_assigned_infras_num(gid).await;
}
}
Err(_) => {
Expand Down Expand Up @@ -913,7 +1023,21 @@ impl CGWRemoteDiscovery {
for i in groups.iter() {
let _ = self.sync_remote_cgw_map().await;
let _ = self.sync_gid_to_cgw_map().await;
match self.assign_infra_group_to_cgw(i.id).await {

let infras_assigned: i32 = match self.get_group_infras_assigned_num(i.id).await {
Ok(infras_num) => infras_num,
Err(e) => {
warn!("Cannot execute rebalancing: {}", e.to_string());
return Err(Error::RemoteDiscovery(
"Cannot do rebalancing due to absence of any groups created in DB",
));
}
};

match self
.assign_infra_group_to_cgw(i.id, i.reserved_size, infras_assigned)
.await
{
Ok(shard_id) => {
debug!("Rebalancing: assigned {} to shard {}", i.id, shard_id);
let _ = self.increment_cgw_assigned_groups_num(shard_id).await;
Expand All @@ -940,4 +1064,44 @@ impl CGWRemoteDiscovery {
.query_async(&mut con)
.await;
}

pub async fn get_group_infras_capacity(&self, gid: i32) -> Result<i32> {
let mut con = self.redis_client.clone();

let capacity: i32 = match redis::cmd("HGET")
.arg(format!("{}{gid}", REDIS_KEY_GID))
.arg(REDIS_KEY_GID_VALUE_INFRAS_CAPACITY)
.query_async(&mut con)
.await
{
Ok(cap) => cap,
Err(e) => {
warn!("Failed to get infras capacity for GID {gid}:\n{e}");
return Err(Error::RemoteDiscovery("Failed to get infras capacity"));
}
};

Ok(capacity)
}

pub async fn get_group_infras_assigned_num(&self, gid: i32) -> Result<i32> {
let mut con = self.redis_client.clone();

let infras_assigned: i32 = match redis::cmd("HGET")
.arg(format!("{}{gid}", REDIS_KEY_GID))
.arg(REDIS_KEY_GID_VALUE_INFRAS_ASSIGNED)
.query_async(&mut con)
.await
{
Ok(cap) => cap,
Err(e) => {
warn!("Failed to get infras assigned number for GID {gid}:\n{e}");
return Err(Error::RemoteDiscovery(
"Failed to get group infras assigned number",
));
}
};

Ok(infras_assigned)
}
}

0 comments on commit 43b30e2

Please sign in to comment.