From 6d2999298e477ad99b568cffa8d88fcb683b7f04 Mon Sep 17 00:00:00 2001 From: Sviatoslav Boichuk Date: Mon, 16 Dec 2024 12:24:22 +0200 Subject: [PATCH] Added separate redis client to work with infras cache! --- src/cgw_connection_server.rs | 2 +- src/cgw_remote_discovery.rs | 124 ++++++++++++++++++----------------- 2 files changed, 65 insertions(+), 61 deletions(-) diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index 9442ecc..24b5ddd 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -1901,7 +1901,7 @@ impl CGWConnectionServer { self: Arc, socket: TcpStream, tls_acceptor: tokio_rustls::TlsAcceptor, - addr: SocketAddr + addr: SocketAddr, ) { // Only ACK connection. We will either drop it or accept it once processor starts // (we'll handle it via "mailbox" notify handle in process_internal_mbox) diff --git a/src/cgw_remote_discovery.rs b/src/cgw_remote_discovery.rs index 8d4fe60..000febf 100644 --- a/src/cgw_remote_discovery.rs +++ b/src/cgw_remote_discovery.rs @@ -44,7 +44,6 @@ static REDIS_KEY_GID_VALUE_SHARD_ID: &str = "shard_id"; static REDIS_KEY_GID_VALUE_INFRAS_CAPACITY: &str = "infras_capacity"; static REDIS_KEY_GID_VALUE_INFRAS_ASSIGNED: &str = "infras_assigned"; -const CGW_REDIS_DEFAULT_DB: u32 = 0; const CGW_REDIS_DEVICES_CACHE_DB: u32 = 1; #[derive(Clone, Debug, Default, PartialEq)] @@ -152,6 +151,7 @@ pub struct CGWRemoteIface { pub struct CGWRemoteDiscovery { db_accessor: Arc, redis_client: MultiplexedConnection, + redis_infra_cache_client: MultiplexedConnection, gid_to_cgw_cache: Arc>>, remote_cgws_map: Arc>>, local_shard_id: i32, @@ -236,6 +236,58 @@ impl CGWRemoteDiscovery { } }; + /* Start Redis Infra Cache Client */ + let redis_infra_cache_client = match cgw_create_redis_client(&app_args.redis_args).await { + Ok(c) => c, + Err(e) => { + error!( + "Can't create CGW Remote Discovery client! Redis infra cache client create failed! Error: {e}" + ); + return Err(Error::RemoteDiscovery( + "Redis infra cache client create failed", + )); + } + }; + + let mut redis_infra_cache_client = match redis_infra_cache_client + .get_multiplexed_tokio_connection_with_response_timeouts( + Duration::from_secs(1), + Duration::from_secs(5), + ) + .await + { + Ok(conn) => conn, + Err(e) => { + error!( + "Can't create CGW Remote Discovery client! Get Redis infra cache async connection failed! Error: {e}" + ); + return Err(Error::RemoteDiscovery( + "Redis infra cache client create failed", + )); + } + }; + + let res: RedisResult<()> = redis::cmd("SELECT") + .arg(CGW_REDIS_DEVICES_CACHE_DB.to_string()) + .query_async(&mut redis_infra_cache_client) + .await; + match res { + Ok(_) => debug!( + "Switched Redis infra cache client to Redis Database {CGW_REDIS_DEVICES_CACHE_DB}" + ), + Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + warn!( + "Failed to switch to Redis Database {CGW_REDIS_DEVICES_CACHE_DB}! Error: {e}" + ); + return Err(Error::RemoteDiscovery("Failed to switch Redis Database")); + } + }; + + /* End Redis Infra Cache Client */ + let db_accessor = match CGWDBAccessor::new(&app_args.db_args).await { Ok(c) => c, Err(e) => { @@ -249,6 +301,7 @@ impl CGWRemoteDiscovery { let rc = CGWRemoteDiscovery { db_accessor: Arc::new(db_accessor), redis_client, + redis_infra_cache_client, gid_to_cgw_cache: Arc::new(RwLock::new(HashMap::new())), local_shard_id: app_args.cgw_id, remote_cgws_map: Arc::new(RwLock::new(HashMap::new())), @@ -1295,29 +1348,12 @@ impl CGWRemoteDiscovery { Ok(infras_assigned) } - async fn switch_database(&self, database_id: u32) -> Result<()> { - let mut con = self.redis_client.clone(); - - let res: RedisResult<()> = redis::cmd("SELECT") - .arg(database_id.to_string()) - .query_async(&mut con) - .await; - match res { - Ok(_) => debug!("Switched to Redis Database {database_id}"), - Err(e) => { - if e.is_io_error() { - Self::set_redis_health_state_not_ready(e.to_string()).await; - } - warn!("Failed to switch to Redis Database {database_id}! Error: {e}"); - return Err(Error::RemoteDiscovery("Failed to switch Redis Database")); - } - }; - - Ok(()) - } - - async fn add_device_to_redis(&self, device_mac: &MacAddress, device_json: &str) -> Result<()> { - let mut con = self.redis_client.clone(); + pub async fn add_device_to_redis_cache( + &self, + device_mac: &MacAddress, + device_json: &str, + ) -> Result<()> { + let mut con = self.redis_infra_cache_client.clone(); let key = format!("shard_id_{}|{}", self.local_shard_id, device_mac); let res: RedisResult<()> = redis::cmd("SET") @@ -1342,8 +1378,8 @@ impl CGWRemoteDiscovery { Ok(()) } - async fn del_device_from_redis(&self, device_mac: &MacAddress) -> Result<()> { - let mut con = self.redis_client.clone(); + pub async fn del_device_from_redis_cache(&self, device_mac: &MacAddress) -> Result<()> { + let mut con = self.redis_infra_cache_client.clone(); let key = format!("shard_id_{}|{}", self.local_shard_id, device_mac); let res: RedisResult<()> = redis::cmd("DEL").arg(&key).query_async(&mut con).await; @@ -1367,30 +1403,6 @@ impl CGWRemoteDiscovery { Ok(()) } - pub async fn add_device_to_redis_cache( - &self, - device_mac: &MacAddress, - device_json: &str, - ) -> Result<()> { - self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?; - - self.add_device_to_redis(device_mac, device_json).await?; - - self.switch_database(CGW_REDIS_DEFAULT_DB).await?; - - Ok(()) - } - - pub async fn del_device_from_redis_cache(&self, device_mac: &MacAddress) -> Result<()> { - self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?; - - self.del_device_from_redis(device_mac).await?; - - self.switch_database(CGW_REDIS_DEFAULT_DB).await?; - - Ok(()) - } - pub async fn sync_devices_cache_with_redis( &self, cache: Arc>, @@ -1399,9 +1411,7 @@ impl CGWRemoteDiscovery { let mut devices_cache = cache.write().await; devices_cache.flush_all(); - self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?; - - let mut con = self.redis_client.clone(); + let mut con = self.redis_infra_cache_client.clone(); let key = format!("shard_id_{}|*", self.local_shard_id); let redis_keys: Vec = match redis::cmd("KEYS").arg(&key).query_async(&mut con).await { @@ -1483,16 +1493,12 @@ impl CGWRemoteDiscovery { }; } - self.switch_database(CGW_REDIS_DEFAULT_DB).await?; - Ok(()) } pub async fn sync_devices_cache(&self) -> Result<()> { if let Some(infras_list) = self.db_accessor.get_all_infras().await { - self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?; - - let mut con = self.redis_client.clone(); + let mut con = self.redis_infra_cache_client.clone(); let mut redis_keys: Vec = match redis::cmd("KEYS") .arg(&format!("shard_id_{}|*", self.local_shard_id)) .query_async(&mut con) @@ -1529,8 +1535,6 @@ impl CGWRemoteDiscovery { ); } } - - self.switch_database(CGW_REDIS_DEFAULT_DB).await?; } Ok(())