Skip to content

Commit

Permalink
feat: implement game cache and add client trait
Browse files Browse the repository at this point in the history
  • Loading branch information
hsnks100 committed Dec 7, 2024
1 parent 2109165 commit 085424f
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 53 deletions.
137 changes: 87 additions & 50 deletions src/game_cache.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;

const CACHE_SIZE: usize = 256;

/// Represents a cache for storing game data packets.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct GameCache {
/// A deque to hold cached game data.
cache: VecDeque<Vec<u8>>,
Expand Down Expand Up @@ -43,34 +44,44 @@ impl GameCache {
}
}

/// Represents a connected client.
#[derive(Debug)]
pub struct Client {
pub id: u32,
/// Cache for received data (client's send cache).
receive_cache: GameCache,
/// Queue of pending inputs.
pending_inputs: VecDeque<Vec<u8>>,
}
/// Trait representing a connected client.
pub trait ClientTrait: Debug {
fn id(&self) -> u32;

impl Client {
/// Creates a new client with empty caches.
pub fn new(id: u32) -> Self {
Self {
id,
receive_cache: GameCache::new(),
pending_inputs: VecDeque::new(),
}
}
/// Gets the receive cache of the client.
fn get_receive_cache(&mut self) -> &mut GameCache;

/// Adds an input to the pending inputs queue.
pub fn add_input(&mut self, data: Vec<u8>) {
self.pending_inputs.push_back(data);
}
fn add_input(&mut self, data: Vec<u8>);

/// Checks if the client has pending inputs.
fn has_pending_input(&self) -> bool;

/// Retrieves and removes the next input from the pending queue.
pub fn get_next_input(&mut self) -> Option<Vec<u8>> {
self.pending_inputs.pop_front()
fn get_next_input(&mut self) -> Option<Vec<u8>>;

/// Handles incoming data from the client.
/// Stores the data in the client's pending inputs.
fn handle_incoming(&mut self, data: Vec<u8>) {
let actual_data = if data.len() == 1 {
let position = data[0];
if let Some(cached_data) = self.get_receive_cache().get(position) {
cached_data.clone()
} else {
panic!(
"Invalid cache position {} received from client {}",
position,
self.id()
);
}
} else {
// Add data to client's receive cache.
let cache = self.get_receive_cache();
cache.add(data.clone());
data
};

self.add_input(actual_data);
}
}

Expand All @@ -88,7 +99,7 @@ pub struct FrameResult {
#[derive(Debug)]
pub struct GameDataProcessor {
/// Map of connected clients.
clients: HashMap<u32, Client>,
clients: HashMap<u32, Box<dyn ClientTrait>>,
/// Global cache for aggregated game data.
aggregated_cache: GameCache,
/// Current frame index.
Expand All @@ -109,31 +120,15 @@ impl GameDataProcessor {
}

/// Adds a new client to the processor.
pub fn add_client(&mut self, client_id: u32) {
self.clients.insert(client_id, Client::new(client_id));
fn add_client(&mut self, client_id: u32, client: Box<dyn ClientTrait>) {
self.clients.insert(client_id, client);
}

/// Processes incoming game data from a client.
/// Stores the data in the client's pending inputs.
pub fn process_incoming(&mut self, client_id: u32, data: Vec<u8>) {
fn process_incoming(&mut self, client_id: u32, data: Vec<u8>) {
let client = self.clients.get_mut(&client_id).expect("Client not found");

let actual_data = if data.len() == 1 {
let position = data[0];
if let Some(cached_data) = client.receive_cache.get(position) {
cached_data.clone()
} else {
panic!(
"Invalid cache position {} received from client {}",
position, client_id
);
}
} else {
client.receive_cache.add(data.clone());
data
};

client.add_input(actual_data);
client.handle_incoming(data);
}

/// Processes a frame if inputs from all clients are available.
Expand All @@ -144,7 +139,7 @@ impl GameDataProcessor {
let all_clients_have_input = self
.clients
.values()
.all(|client| !client.pending_inputs.is_empty());
.all(|client| client.has_pending_input());

if !all_clients_have_input {
return None;
Expand Down Expand Up @@ -188,20 +183,62 @@ impl GameDataProcessor {
mod tests {
use super::*;

/// Mock client for testing purposes.
#[derive(Debug)]
struct MockClient {
id: u32,
pending_inputs: VecDeque<Vec<u8>>,
receive_cache: crate::game_cache::GameCache,
}

impl MockClient {
fn new(id: u32) -> Self {
Self {
id,
pending_inputs: VecDeque::new(),
receive_cache: crate::game_cache::GameCache::new(),
}
}
}

impl ClientTrait for MockClient {
fn id(&self) -> u32 {
self.id
}

fn get_receive_cache(&mut self) -> &mut GameCache {
&mut self.receive_cache
}

fn add_input(&mut self, data: Vec<u8>) {
self.pending_inputs.push_back(data);
}

fn has_pending_input(&self) -> bool {
!self.pending_inputs.is_empty()
}

fn get_next_input(&mut self) -> Option<Vec<u8>> {
self.pending_inputs.pop_front()
}
}

#[test]
fn test_game_data_processing_async() {
let mut processor = GameDataProcessor::new();

// Simulate two clients connecting.
processor.add_client(1); // Client ID 1
processor.add_client(2); // Client ID 2
processor.add_client(1, Box::new(MockClient::new(1))); // Client ID 1
processor.add_client(2, Box::new(MockClient::new(2))); // Client ID 2

// Expected cache usage and positions per frame.
let expected_results = [(false, 0), // Frame 1: New data, cache position 0
let expected_results = [
(false, 0), // Frame 1: New data, cache position 0
(true, 0), // Frame 2: Using cache position 0
(false, 1), // Frame 3: New data, cache position 1
(false, 2), // Frame 4: New data, cache position 2
(true, 1)];
(true, 1),
];

// Test inputs from clients at different times.
let inputs = vec![
Expand Down
46 changes: 44 additions & 2 deletions src/handlers/data.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::{collections::HashSet, time::Instant};
use std::{
collections::{HashSet, VecDeque},
time::Instant,
};

type PlayerStatus = u8;
pub const PLAYER_STATUS_PLAYING: PlayerStatus = 0;
pub const PLAYER_STATUS_IDLE: PlayerStatus = 1;
pub const PLAYER_STATUS_NET_SYNC: PlayerStatus = 2;
// ClientInfo and GameInfo structs need to be accessible in both files
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ClientInfo {
pub username: String,
pub emulator_name: String,
Expand All @@ -16,6 +19,45 @@ pub struct ClientInfo {
pub game_id: Option<u32>,
pub last_ping_time: Option<Instant>,
pub ack_count: u16,
//////////////////
/// Cache for received data (client's send cache).
pub receive_cache: crate::game_cache::GameCache,
/// Queue of pending inputs.
pub pending_inputs: VecDeque<Vec<u8>>,
}

impl ClientInfo {
/// Adds an input to the pending inputs queue.
fn add_input(&mut self, data: Vec<u8>) {
self.pending_inputs.push_back(data);
}

/// Retrieves and removes the next input from the pending queue.
fn get_next_input(&mut self) -> Option<Vec<u8>> {
self.pending_inputs.pop_front()
}
}

impl crate::game_cache::ClientTrait for ClientInfo {
fn id(&self) -> u32 {
self.user_id as u32
}

fn get_receive_cache(&mut self) -> &mut crate::game_cache::GameCache {
&mut self.receive_cache
}

fn add_input(&mut self, data: Vec<u8>) {
self.pending_inputs.push_back(data);
}

fn has_pending_input(&self) -> bool {
!self.pending_inputs.is_empty()
}

fn get_next_input(&mut self) -> Option<Vec<u8>> {
self.get_next_input()
}
}

#[derive(Clone)]
Expand Down
10 changes: 10 additions & 0 deletions src/handlers/handlerf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub async fn handle_message(
"[{}] Received Game Cache",
Local::now().format("%Y-%m-%d %H:%M:%S%.3f")
);
//
// handle_game_cache(message, src, tx, clients, games, packet_history).await?
}
0x15 => {
handle_ready_to_play_signal(message, src, tx, clients, games, packet_history).await?
Expand Down Expand Up @@ -333,3 +335,11 @@ pub async fn handle_game_data(
}
Ok(())
}

pub async fn handle_game_cache(
message: kaillera::protocol::ParsedMessage,
src: &std::net::SocketAddr,
tx: mpsc::Sender<Message>,
) -> Result<(), Box<dyn Error>> {
Ok(())
}
4 changes: 3 additions & 1 deletion src/handlers/user_login.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::{Buf, BufMut, BytesMut};
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::error::Error;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
Expand Down Expand Up @@ -54,6 +54,8 @@ pub async fn handle_user_login(
game_id: None,
last_ping_time: None,
ack_count: 0,
receive_cache: game_cache::GameCache::new(),
pending_inputs: VecDeque::new(),
},
);
drop(clients_lock);
Expand Down

0 comments on commit 085424f

Please sign in to comment.