Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
fix: remove RATE_MS
Browse files Browse the repository at this point in the history
  • Loading branch information
simbleau committed Jan 13, 2024
1 parent fbf07eb commit a5210b4
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 273 deletions.
2 changes: 1 addition & 1 deletion bevy-silk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ mod systems;

pub use events::{ConnectionRequest, SilkClientEvent};
pub use plugin::SilkClientPlugin;
pub use router::AddNetworkMessageExt;
pub use router::AddProtocolExt;
pub use state::{SilkClientStatus, SilkState};
pub use system_params::{NetworkReader, NetworkWriter};
15 changes: 10 additions & 5 deletions bevy-silk/src/client/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
use super::{
systems, AddNetworkMessageExt, ConnectionRequest, SilkClientEvent,
systems, AddProtocolExt, ConnectionRequest, SilkClientEvent,
SilkClientStatus, SilkState,
};
use crate::{
events::SocketRecvEvent,
latency::LatencyTracerPayload,
socket::{common_socket_reader, SilkSocket},
};
use bevy::prelude::*;
use bevy::{prelude::*, time::common_conditions::on_timer};
use instant::Duration;

/// The socket client abstraction
/// A plugin to connect to a WebRTC server.
pub struct SilkClientPlugin;

impl Plugin for SilkClientPlugin {
fn build(&self, app: &mut App) {
app.add_event::<SocketRecvEvent>()
.insert_resource(SilkState::default())
.add_network_message::<LatencyTracerPayload>()
.add_bounded_protocol::<LatencyTracerPayload>(1)
.add_state::<SilkClientStatus>()
.add_event::<ConnectionRequest>()
.add_event::<SilkClientEvent>()
Expand Down Expand Up @@ -45,7 +46,11 @@ impl Plugin for SilkClientPlugin {
)
.add_systems(
Update,
(systems::read_latency_tracers, systems::send_latency_tracers)
(
systems::read_latency_tracers,
systems::send_latency_tracers
.run_if(on_timer(Duration::from_millis(100))),
)
.run_if(state_exists_and_equals(
SilkClientStatus::Connected,
)),
Expand Down
54 changes: 30 additions & 24 deletions bevy-silk/src/client/router/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod receive;
mod send;

use std::collections::VecDeque;

use crate::{
protocol::Payload,
socket::{common_socket_reader, SilkSocket},
Expand All @@ -10,12 +12,17 @@ use bevy::prelude::*;
pub use receive::IncomingMessages;
pub use send::OutgoingMessages;

pub trait AddNetworkMessageExt {
fn add_network_message<M: Payload>(&mut self) -> &mut Self;
pub trait AddProtocolExt {
fn add_unbounded_protocol<M: Payload>(&mut self) -> &mut Self;
fn add_bounded_protocol<M: Payload>(&mut self, bound: usize) -> &mut Self;
}

impl AddNetworkMessageExt for App {
fn add_network_message<M>(&mut self) -> &mut Self
impl AddProtocolExt for App {
fn add_unbounded_protocol<M: Payload>(&mut self) -> &mut Self {
self.add_bounded_protocol::<M>(usize::MAX)
}

fn add_bounded_protocol<M>(&mut self, bound: usize) -> &mut Self
where
M: Payload,
{
Expand All @@ -24,26 +31,25 @@ impl AddNetworkMessageExt for App {
{
panic!("client already contains resource: {}", M::reflect_name());
}
self.insert_resource(IncomingMessages::<M> { messages: vec![] })
.insert_resource(OutgoingMessages::<M> {
reliable_to_host: vec![],
unreliable_to_host: vec![],
})
.add_systems(
First,
(
IncomingMessages::<M>::flush,
IncomingMessages::<M>::receive_payloads,
)
.chain()
.after(common_socket_reader)
.run_if(resource_exists::<SilkSocket>()),
)
.add_systems(
Last,
OutgoingMessages::<M>::send_payloads
.run_if(resource_exists::<SilkSocket>()),
);
self.insert_resource(IncomingMessages::<M> {
bound,
messages: VecDeque::new(),
})
.insert_resource(OutgoingMessages::<M> {
reliable_to_host: vec![],
unreliable_to_host: vec![],
})
.add_systems(
First,
IncomingMessages::<M>::receive_payloads
.after(common_socket_reader)
.run_if(resource_exists::<SilkSocket>()),
)
.add_systems(
Last,
OutgoingMessages::<M>::send_payloads
.run_if(resource_exists::<SilkSocket>()),
);
self
}
}
28 changes: 16 additions & 12 deletions bevy-silk/src/client/router/receive.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
use crate::{events::SocketRecvEvent, protocol::Payload};
use bevy::prelude::*;
use std::collections::VecDeque;

#[derive(Default, Debug, Resource)]
pub struct IncomingMessages<M: Payload> {
pub messages: Vec<M>,
pub bound: usize,
pub messages: VecDeque<M>,
}

impl<M: Payload> IncomingMessages<M> {
/// Swaps the event buffers and clears the oldest event buffer. In general,
/// this should be called once per frame/update.
pub fn flush(mut incoming: ResMut<Self>) {
if !incoming.messages.is_empty() {
trace!("flushing {} messages", incoming.messages.len());
}
incoming.messages.clear();
}

pub fn receive_payloads(
mut incoming: ResMut<Self>,
mut events: EventReader<SocketRecvEvent>,
) {
let mut read = 0;
for SocketRecvEvent((_peer_id, packet)) in events.read() {
let bound = incoming.bound;
for SocketRecvEvent((peer_id, packet)) in events.read() {
if let Some(message) = M::from_packet(packet) {
incoming.messages.push(message);
// Insert the new message
incoming.messages.push_back(message);
// Ensure only the last BOUND messages are kept
while incoming.messages.len() > bound {
incoming.messages.pop_front();
warn!(
"The `{}` protocol is overflowing its bounded buffer ({bound}) and dropping packets! The payloads may not being read fast enough, or {peer_id} is exceeding rate!",
M::reflect_name()
);
}

read += 1;
}
}
Expand Down
57 changes: 8 additions & 49 deletions bevy-silk/src/client/system_params.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use super::router::{IncomingMessages, OutgoingMessages};
use crate::protocol::Payload;
use bevy::{
ecs::system::{SystemChangeTick, SystemParam},
prelude::*,
};
use instant::Duration;
use bevy::{ecs::system::SystemParam, prelude::*};

#[derive(SystemParam, Debug)]
pub struct NetworkReader<'w, M: Payload> {
Expand All @@ -13,73 +9,36 @@ pub struct NetworkReader<'w, M: Payload> {

impl<'w, M: Payload> NetworkReader<'w, M> {
/// Consumes all messages in the buffer and iterate on them.
pub fn drain(&mut self) -> std::vec::Drain<M> {
pub fn read(&mut self) -> std::collections::vec_deque::Drain<'_, M> {
self.incoming.messages.drain(..)
}
}

#[derive(SystemParam, Debug)]
pub struct NetworkWriter<'w, 's, M: Payload, const RATE_MS: u64 = 0> {
pub(crate) tick: SystemChangeTick,
pub(crate) timer: Local<'s, Option<Timer>>,
pub(crate) last_tick: Local<'s, u32>,
pub(crate) last_instant: Local<'s, Option<instant::Instant>>,
pub struct NetworkWriter<'w, M: Payload> {
pub(crate) outgoing: ResMut<'w, OutgoingMessages<M>>,
}

impl<'w, 's, M: Payload, const RATE_MS: u64> NetworkWriter<'w, 's, M, RATE_MS> {
/// Returns true if the time since the last tick has passed the rate
/// duration.
#[inline]
pub(crate) fn ready(&mut self) -> bool {
if RATE_MS == 0 {
true
} else {
let tick = self.tick.this_run().get();
let timer = self.timer.get_or_insert(Timer::new(
Duration::from_millis(RATE_MS),
TimerMode::Repeating,
));
if *self.last_tick != tick {
let now = instant::Instant::now();
let last_instant = self.last_instant.get_or_insert(now);
timer.tick(now - *last_instant);
*last_instant = now;
*self.last_tick = tick;
}
timer.finished()
}
}

impl<'w, M: Payload> NetworkWriter<'w, M> {
/// Send a payload to the host with reliability. The payload is created with
/// lazy behavior, only when the send rate allows.
pub fn reliable_to_host_with(&mut self, message_fn: impl Fn() -> M) {
if self.ready() {
self.outgoing.reliable_to_host.push(message_fn());
}
self.outgoing.reliable_to_host.push(message_fn());
}

/// Send a payload to the host with no expectation of delivery. The payload
/// is created with lazy behavior, only when the send rate allows.
pub fn unreliable_to_host_with(&mut self, message_fn: impl Fn() -> M) {
if self.ready() {
self.outgoing.unreliable_to_host.push(message_fn());
}
self.outgoing.unreliable_to_host.push(message_fn());
}
}

impl<'w, 's, M: Payload> NetworkWriter<'w, 's, M, 0> {
/// Send a payload to the host with reliability.
pub fn reliable_to_host(&mut self, message: M) {
if self.ready() {
self.outgoing.reliable_to_host.push(message);
}
self.outgoing.reliable_to_host.push(message);
}

/// Send a payload to the host with no expectation of delivery.
pub fn unreliable_to_host(&mut self, message: M) {
if self.ready() {
self.outgoing.unreliable_to_host.push(message);
}
self.outgoing.unreliable_to_host.push(message);
}
}
32 changes: 9 additions & 23 deletions bevy-silk/src/client/systems.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ pub(crate) fn client_event_writer(

pub fn send_latency_tracers(
state: Res<SilkState>,
mut writer: NetworkWriter<LatencyTracerPayload, 100>,
mut writer: NetworkWriter<LatencyTracerPayload>,
) {
let peer_id = state.id.expect("expected peer id");
writer.unreliable_to_host_with(|| LatencyTracerPayload::new(peer_id));
writer.unreliable_to_host(LatencyTracerPayload::new(peer_id));
}

pub fn read_latency_tracers(
Expand All @@ -161,35 +161,21 @@ pub fn read_latency_tracers(
let peer_id = state.id.expect("expected peer id");
let mut tracer = trace_query.single_mut();

// Only collect the most recent payloads that happens this tick.
let mut most_recent_payload: Option<LatencyTracerPayload> = None;

for payload in reader.drain() {
// Server time payloads get sent right back to the server
if payload.from == host_id {
if let Some(ref mrp) = most_recent_payload {
if mrp.age() > payload.age() {
most_recent_payload.replace(payload);
}
} else {
most_recent_payload.replace(payload);
}
for payload in reader.read() {
if payload.from == peer_id {
tracer.process(payload);
} else if payload.from == host_id {
// Server time payloads get sent right back to the server
writer.unreliable_to_host(payload);
}
// Process payloads we sent out
else if payload.from == peer_id {
tracer.process(payload.clone());
} else {
else {
warn!(
"Invalid latency tracer from address: {}, ignoring",
payload.from
);
}
}

// Send all server requests
if let Some(payload) = most_recent_payload.take() {
writer.unreliable_to_host(payload);
}
}

pub fn calculate_latency(
Expand Down
2 changes: 1 addition & 1 deletion bevy-silk/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ mod systems;

pub use events::SilkServerEvent;
pub use plugin::SilkServerPlugin;
pub use router::AddNetworkMessageExt;
pub use router::AddProtocolExt;
pub use state::{SilkServerStatus, SilkState};
pub use system_params::{NetworkReader, NetworkWriter};
13 changes: 9 additions & 4 deletions bevy-silk/src/server/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use crate::{
latency::LatencyTracerPayload,
socket::{common_socket_reader, SilkSocket},
};
use bevy::prelude::*;
use bevy::{prelude::*, time::common_conditions::on_timer};
use instant::Duration;
use std::net::Ipv4Addr;

use super::{
systems, AddNetworkMessageExt, SilkServerEvent, SilkServerStatus, SilkState,
systems, AddProtocolExt, SilkServerEvent, SilkServerStatus, SilkState,
};

/// A plugin to serve a WebRTC server.
Expand All @@ -21,7 +22,7 @@ impl Plugin for SilkServerPlugin {
// Initialize the schedule for silk
app.add_event::<SocketRecvEvent>()
.add_event::<SilkServerEvent>()
.add_network_message::<LatencyTracerPayload>()
.add_bounded_protocol::<LatencyTracerPayload>(1)
.add_state::<SilkServerStatus>()
.insert_resource(SilkState::new(
(Ipv4Addr::UNSPECIFIED, self.port).into(),
Expand All @@ -45,7 +46,11 @@ impl Plugin for SilkServerPlugin {
)
.add_systems(
Update,
(systems::read_latency_tracers, systems::send_latency_tracers)
(
systems::read_latency_tracers,
systems::send_latency_tracers
.run_if(on_timer(Duration::from_millis(100))),
)
.run_if(state_exists_and_equals(SilkServerStatus::Ready)),
);
}
Expand Down
Loading

0 comments on commit a5210b4

Please sign in to comment.