Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Redis Health state once detected broken connection with server #96

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 110 additions & 49 deletions src/cgw_remote_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,20 +307,23 @@ impl CGWRemoteDiscovery {
.arg(format!("{REDIS_KEY_SHARD_ID_PREFIX}{}", app_args.cgw_id))
.query_async(&mut con)
.await;
if res.is_err() {
warn!(
"Failed to destroy record about shard in REDIS! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to destroy record about shard in REDIS! Error: {e}");
}

let res: RedisResult<()> = redis::cmd("HSET")
.arg(format!("{REDIS_KEY_SHARD_ID_PREFIX}{}", app_args.cgw_id))
.arg(redis_req_data.to_redis_args())
.query_async(&mut con)
.await;
if res.is_err() {
error!("Can't create CGW Remote Discovery client! Failed to create record about shard in REDIS! Error: {}", res.err().unwrap());
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Can't create CGW Remote Discovery client! Failed to create record about shard in REDIS! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to create record about shard in REDIS",
));
Expand Down Expand Up @@ -385,6 +388,9 @@ impl CGWRemoteDiscovery {
.await
{
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to sync gid to cgw map! Error: {e}");
return Err(Error::RemoteDiscovery("Failed to get KEYS list from REDIS"));
}
Expand All @@ -400,6 +406,9 @@ impl CGWRemoteDiscovery {
{
Ok(gid) => gid,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Found proper key '{key}' entry, but failed to fetch GID from it! Error: {e}");
continue;
}
Expand All @@ -413,6 +422,9 @@ impl CGWRemoteDiscovery {
{
Ok(shard_id) => shard_id,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Found proper key '{key}' entry, but failed to fetch SHARD_ID from it! Error: {e}");
continue;
}
Expand Down Expand Up @@ -457,6 +469,9 @@ impl CGWRemoteDiscovery {
{
Ok(keys) => keys,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Can't sync remote CGW map! Failed to get shard record in REDIS! Error: {e}"
);
Expand Down Expand Up @@ -487,6 +502,9 @@ impl CGWRemoteDiscovery {
lock.insert(cgw_iface.shard.id, cgw_iface);
}
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Found proper key '{key}' entry, but failed to fetch Shard info from it! Error: {e}");
continue;
}
Expand Down Expand Up @@ -529,11 +547,11 @@ impl CGWRemoteDiscovery {
.arg("1")
.query_async(&mut con)
.await;
if res.is_err() {
error!(
"Failed to increment assigned groups number! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to increment assigned groups number! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to increment assigned groups number",
));
Expand All @@ -558,11 +576,11 @@ impl CGWRemoteDiscovery {
.arg("-1")
.query_async(&mut con)
.await;
if res.is_err() {
error!(
"Failed to decrement assigned groups number! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to decrement assigned groups number! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to decrement assigned groups number",
));
Expand Down Expand Up @@ -592,11 +610,11 @@ impl CGWRemoteDiscovery {
.arg(&incremet_value.to_string())
.query_async(&mut con)
.await;
if res.is_err() {
error!(
"Failed to increment assigned infras number! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to increment assigned infras number! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to increment assigned infras number",
));
Expand Down Expand Up @@ -627,11 +645,11 @@ impl CGWRemoteDiscovery {
.arg(&(-decremet_value).to_string())
.query_async(&mut con)
.await;
if res.is_err() {
error!(
"Failed to decrement assigned infras number! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to decrement assigned infras number! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to decrement assigned infras number",
));
Expand Down Expand Up @@ -734,13 +752,11 @@ impl CGWRemoteDiscovery {
.query_async(&mut con)
.await;

if res.is_err() {
error!(
"Failed to assign infra group {} to cgw {}! Error: {}",
gid,
dst_cgw_id,
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to assign infra group {gid} to cgw {dst_cgw_id}! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to assign infra group to cgw",
));
Expand All @@ -760,12 +776,11 @@ impl CGWRemoteDiscovery {
.query_async(&mut con)
.await;

if res.is_err() {
error!(
"Failed to deassign infra group {}! Error: {}",
gid,
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to deassign infra group {gid}! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to deassign infra group to cgw",
));
Expand Down Expand Up @@ -1143,11 +1158,11 @@ impl CGWRemoteDiscovery {
.arg("0")
.query_async(&mut con)
.await;
if res.is_err() {
warn!(
"Failed to reset CGW{cgw_id} assigned group num count! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to reset CGW{cgw_id} assigned group num count! Error: {e}");
}
}

Expand Down Expand Up @@ -1213,10 +1228,15 @@ impl CGWRemoteDiscovery {
"Successfully cleaned up Redis for shard id {}",
self.local_shard_id
),
Err(e) => error!(
"Failed to cleanup Redis for shard id {}! Error: {}",
self.local_shard_id, e
),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Failed to cleanup Redis for shard id {}! Error: {}",
self.local_shard_id, e
);
}
}
}

Expand All @@ -1231,6 +1251,9 @@ impl CGWRemoteDiscovery {
{
Ok(cap) => cap,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to get infras capacity for GID {gid}! Ereor: {e}");
return Err(Error::RemoteDiscovery("Failed to get infras capacity"));
}
Expand All @@ -1250,6 +1273,9 @@ impl CGWRemoteDiscovery {
{
Ok(cap) => cap,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to get infras assigned number for GID {gid}! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to get group infras assigned number",
Expand All @@ -1270,6 +1296,9 @@ impl CGWRemoteDiscovery {
match res {
Ok(_) => debug!("Switched to Redis Database {database_id}"),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to switch to Redis Database {database_id}! Error: {e}");
return Err(Error::RemoteDiscovery("Failed to switch Redis Database"));
}
Expand All @@ -1291,6 +1320,9 @@ impl CGWRemoteDiscovery {
match res {
Ok(_) => debug!("Added device to Redis cache: {device_json}"),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to add device to Redis cache! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to add device to Redis cache",
Expand All @@ -1310,6 +1342,9 @@ impl CGWRemoteDiscovery {
match res {
Ok(_) => debug!("Removed device from Redis cache: {}", device_mac),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!(
"Failed to remove device {} from Redis cache! Error: {e}",
device_mac.to_hex_string()
Expand Down Expand Up @@ -1362,6 +1397,9 @@ impl CGWRemoteDiscovery {
let redis_keys: Vec<String> = match redis::cmd("KEYS").arg(&key).query_async(&mut con).await
{
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Failed to get devices cache from Redis for shard id {}, Error: {}",
self.local_shard_id, e
Expand All @@ -1377,6 +1415,9 @@ impl CGWRemoteDiscovery {
let device_str: String = match redis::cmd("GET").arg(&key).query_async(&mut con).await {
Ok(dev) => dev,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Failed to get devices cache from Redis for shard id {}, Error: {}",
self.local_shard_id, e
Expand Down Expand Up @@ -1449,6 +1490,9 @@ impl CGWRemoteDiscovery {
.await
{
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Failed to get devices cache from Redis for shard id {}, Error: {}",
self.local_shard_id, e
Expand Down Expand Up @@ -1498,6 +1542,9 @@ impl CGWRemoteDiscovery {
match res {
Ok(_) => debug!("Updated Redis timestamp: {timestamp}"),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed update Redis timestamp! Error: {e}");
return Err(Error::RemoteDiscovery("Failed update Redis timestamp"));
}
Expand All @@ -1516,6 +1563,9 @@ impl CGWRemoteDiscovery {
{
Ok(timestamp) => timestamp,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to get Redis last update timestamp! Error: {}", e);
return Err(Error::RemoteDiscovery(
"Failed to get Redis last update timestamp",
Expand All @@ -1525,4 +1575,15 @@ impl CGWRemoteDiscovery {

Ok(last_update_timestamp)
}

/// Marks the Redis connection health component as `NotReady`.
///
/// The status update is handed to a freshly spawned Tokio task and the
/// task handle is discarded, so this function returns without waiting for
/// the metrics update to finish.
///
/// `error_message` is stored inside the `NotReady` status.
pub async fn set_redis_health_state_not_ready(error_message: String) {
    // Build the new status up front so the spawned task only has to own it.
    let not_ready = CGWMetricsHealthComponentStatus::NotReady(error_message);

    tokio::spawn(async move {
        let metrics = CGWMetrics::get_ref();
        metrics
            .change_component_health_status(CGWMetricsHealthComponent::RedisConnection, not_ready)
            .await;
    });
}
}