Skip to content

Commit

Permalink
Added separate redis client to work with infras cache!
Browse files Browse the repository at this point in the history
  • Loading branch information
SviatoslavBoichuk committed Dec 16, 2024
1 parent 66625ee commit 6d29992
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 61 deletions.
2 changes: 1 addition & 1 deletion src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1901,7 +1901,7 @@ impl CGWConnectionServer {
self: Arc<Self>,
socket: TcpStream,
tls_acceptor: tokio_rustls::TlsAcceptor,
addr: SocketAddr
addr: SocketAddr,
) {
// Only ACK connection. We will either drop it or accept it once processor starts
// (we'll handle it via "mailbox" notify handle in process_internal_mbox)
Expand Down
124 changes: 64 additions & 60 deletions src/cgw_remote_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ static REDIS_KEY_GID_VALUE_SHARD_ID: &str = "shard_id";
static REDIS_KEY_GID_VALUE_INFRAS_CAPACITY: &str = "infras_capacity";
static REDIS_KEY_GID_VALUE_INFRAS_ASSIGNED: &str = "infras_assigned";

const CGW_REDIS_DEFAULT_DB: u32 = 0;
const CGW_REDIS_DEVICES_CACHE_DB: u32 = 1;

#[derive(Clone, Debug, Default, PartialEq)]
Expand Down Expand Up @@ -152,6 +151,7 @@ pub struct CGWRemoteIface {
pub struct CGWRemoteDiscovery {
db_accessor: Arc<CGWDBAccessor>,
redis_client: MultiplexedConnection,
redis_infra_cache_client: MultiplexedConnection,
gid_to_cgw_cache: Arc<RwLock<HashMap<i32, i32>>>,
remote_cgws_map: Arc<RwLock<HashMap<i32, CGWRemoteIface>>>,
local_shard_id: i32,
Expand Down Expand Up @@ -236,6 +236,58 @@ impl CGWRemoteDiscovery {
}
};

/* Start Redis Infra Cache Client */
let redis_infra_cache_client = match cgw_create_redis_client(&app_args.redis_args).await {
Ok(c) => c,
Err(e) => {
error!(
"Can't create CGW Remote Discovery client! Redis infra cache client create failed! Error: {e}"
);
return Err(Error::RemoteDiscovery(
"Redis infra cache client create failed",
));
}
};

let mut redis_infra_cache_client = match redis_infra_cache_client
.get_multiplexed_tokio_connection_with_response_timeouts(
Duration::from_secs(1),
Duration::from_secs(5),
)
.await
{
Ok(conn) => conn,
Err(e) => {
error!(
"Can't create CGW Remote Discovery client! Get Redis infra cache async connection failed! Error: {e}"
);
return Err(Error::RemoteDiscovery(
"Redis infra cache client create failed",
));
}
};

let res: RedisResult<()> = redis::cmd("SELECT")
.arg(CGW_REDIS_DEVICES_CACHE_DB.to_string())
.query_async(&mut redis_infra_cache_client)
.await;
match res {
Ok(_) => debug!(
"Switched Redis infra cache client to Redis Database {CGW_REDIS_DEVICES_CACHE_DB}"
),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!(
"Failed to switch to Redis Database {CGW_REDIS_DEVICES_CACHE_DB}! Error: {e}"
);
return Err(Error::RemoteDiscovery("Failed to switch Redis Database"));
}
};

/* End Redis Infra Cache Client */

let db_accessor = match CGWDBAccessor::new(&app_args.db_args).await {
Ok(c) => c,
Err(e) => {
Expand All @@ -249,6 +301,7 @@ impl CGWRemoteDiscovery {
let rc = CGWRemoteDiscovery {
db_accessor: Arc::new(db_accessor),
redis_client,
redis_infra_cache_client,
gid_to_cgw_cache: Arc::new(RwLock::new(HashMap::new())),
local_shard_id: app_args.cgw_id,
remote_cgws_map: Arc::new(RwLock::new(HashMap::new())),
Expand Down Expand Up @@ -1295,29 +1348,12 @@ impl CGWRemoteDiscovery {
Ok(infras_assigned)
}

async fn switch_database(&self, database_id: u32) -> Result<()> {
let mut con = self.redis_client.clone();

let res: RedisResult<()> = redis::cmd("SELECT")
.arg(database_id.to_string())
.query_async(&mut con)
.await;
match res {
Ok(_) => debug!("Switched to Redis Database {database_id}"),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to switch to Redis Database {database_id}! Error: {e}");
return Err(Error::RemoteDiscovery("Failed to switch Redis Database"));
}
};

Ok(())
}

async fn add_device_to_redis(&self, device_mac: &MacAddress, device_json: &str) -> Result<()> {
let mut con = self.redis_client.clone();
pub async fn add_device_to_redis_cache(
&self,
device_mac: &MacAddress,
device_json: &str,
) -> Result<()> {
let mut con = self.redis_infra_cache_client.clone();

let key = format!("shard_id_{}|{}", self.local_shard_id, device_mac);
let res: RedisResult<()> = redis::cmd("SET")
Expand All @@ -1342,8 +1378,8 @@ impl CGWRemoteDiscovery {
Ok(())
}

async fn del_device_from_redis(&self, device_mac: &MacAddress) -> Result<()> {
let mut con = self.redis_client.clone();
pub async fn del_device_from_redis_cache(&self, device_mac: &MacAddress) -> Result<()> {
let mut con = self.redis_infra_cache_client.clone();

let key = format!("shard_id_{}|{}", self.local_shard_id, device_mac);
let res: RedisResult<()> = redis::cmd("DEL").arg(&key).query_async(&mut con).await;
Expand All @@ -1367,30 +1403,6 @@ impl CGWRemoteDiscovery {
Ok(())
}

pub async fn add_device_to_redis_cache(
&self,
device_mac: &MacAddress,
device_json: &str,
) -> Result<()> {
self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?;

self.add_device_to_redis(device_mac, device_json).await?;

self.switch_database(CGW_REDIS_DEFAULT_DB).await?;

Ok(())
}

pub async fn del_device_from_redis_cache(&self, device_mac: &MacAddress) -> Result<()> {
self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?;

self.del_device_from_redis(device_mac).await?;

self.switch_database(CGW_REDIS_DEFAULT_DB).await?;

Ok(())
}

pub async fn sync_devices_cache_with_redis(
&self,
cache: Arc<RwLock<CGWDevicesCache>>,
Expand All @@ -1399,9 +1411,7 @@ impl CGWRemoteDiscovery {
let mut devices_cache = cache.write().await;
devices_cache.flush_all();

self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?;

let mut con = self.redis_client.clone();
let mut con = self.redis_infra_cache_client.clone();
let key = format!("shard_id_{}|*", self.local_shard_id);
let redis_keys: Vec<String> = match redis::cmd("KEYS").arg(&key).query_async(&mut con).await
{
Expand Down Expand Up @@ -1483,16 +1493,12 @@ impl CGWRemoteDiscovery {
};
}

self.switch_database(CGW_REDIS_DEFAULT_DB).await?;

Ok(())
}

pub async fn sync_devices_cache(&self) -> Result<()> {
if let Some(infras_list) = self.db_accessor.get_all_infras().await {
self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?;

let mut con = self.redis_client.clone();
let mut con = self.redis_infra_cache_client.clone();
let mut redis_keys: Vec<String> = match redis::cmd("KEYS")
.arg(&format!("shard_id_{}|*", self.local_shard_id))
.query_async(&mut con)
Expand Down Expand Up @@ -1529,8 +1535,6 @@ impl CGWRemoteDiscovery {
);
}
}

self.switch_database(CGW_REDIS_DEFAULT_DB).await?;
}

Ok(())
Expand Down

0 comments on commit 6d29992

Please sign in to comment.