Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
refactor: API (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
simbleau authored Jan 13, 2024
1 parent f65d155 commit 81156c5
Show file tree
Hide file tree
Showing 17 changed files with 283 additions and 271 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ bevy-silk is a simple, multi-platform WebRTC networking library for client<->ser
- Bevy system parameters for reading and writing packets
- Derive macros for creating protocols
- Easily introspect latency
- Easily throttle sending and receiving packets
- Support for unbounded and bounded network buffers

## Quickstart

Expand Down
2 changes: 1 addition & 1 deletion bevy-silk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ mod systems;

pub use events::{ConnectionRequest, SilkClientEvent};
pub use plugin::SilkClientPlugin;
pub use router::AddNetworkMessageExt;
pub use router::AddProtocolExt;
pub use state::{SilkClientStatus, SilkState};
pub use system_params::{NetworkReader, NetworkWriter};
15 changes: 10 additions & 5 deletions bevy-silk/src/client/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
use super::{
systems, AddNetworkMessageExt, ConnectionRequest, SilkClientEvent,
systems, AddProtocolExt, ConnectionRequest, SilkClientEvent,
SilkClientStatus, SilkState,
};
use crate::{
events::SocketRecvEvent,
latency::LatencyTracerPayload,
socket::{common_socket_reader, SilkSocket},
};
use bevy::prelude::*;
use bevy::{prelude::*, time::common_conditions::on_timer};
use instant::Duration;

/// The socket client abstraction
/// A plugin to connect to a WebRTC server.
pub struct SilkClientPlugin;

impl Plugin for SilkClientPlugin {
fn build(&self, app: &mut App) {
app.add_event::<SocketRecvEvent>()
.insert_resource(SilkState::default())
.add_network_message::<LatencyTracerPayload>()
.add_bounded_protocol::<LatencyTracerPayload>(1)
.add_state::<SilkClientStatus>()
.add_event::<ConnectionRequest>()
.add_event::<SilkClientEvent>()
Expand Down Expand Up @@ -45,7 +46,11 @@ impl Plugin for SilkClientPlugin {
)
.add_systems(
Update,
(systems::read_latency_tracers, systems::send_latency_tracers)
(
systems::read_latency_tracers,
systems::send_latency_tracers
.run_if(on_timer(Duration::from_millis(100))),
)
.run_if(state_exists_and_equals(
SilkClientStatus::Connected,
)),
Expand Down
56 changes: 32 additions & 24 deletions bevy-silk/src/client/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,25 @@ use crate::{
socket::{common_socket_reader, SilkSocket},
};
use bevy::prelude::*;
use std::collections::VecDeque;

pub use receive::IncomingMessages;
pub use send::OutgoingMessages;

pub trait AddNetworkMessageExt {
fn add_network_message<M: Payload>(&mut self) -> &mut Self;
pub trait AddProtocolExt {
/// Register a protocol, dually allocating a sized buffer for
/// payloads received, per peer.
fn add_bounded_protocol<M: Payload>(&mut self, bound: usize) -> &mut Self;
/// Register a protocol, with a growable buffer.
fn add_unbounded_protocol<M: Payload>(&mut self) -> &mut Self;
}

impl AddNetworkMessageExt for App {
fn add_network_message<M>(&mut self) -> &mut Self
impl AddProtocolExt for App {
fn add_unbounded_protocol<M: Payload>(&mut self) -> &mut Self {
self.add_bounded_protocol::<M>(usize::MAX)
}

fn add_bounded_protocol<M>(&mut self, bound: usize) -> &mut Self
where
M: Payload,
{
Expand All @@ -24,26 +33,25 @@ impl AddNetworkMessageExt for App {
{
panic!("client already contains resource: {}", M::reflect_name());
}
self.insert_resource(IncomingMessages::<M> { messages: vec![] })
.insert_resource(OutgoingMessages::<M> {
reliable_to_host: vec![],
unreliable_to_host: vec![],
})
.add_systems(
First,
(
IncomingMessages::<M>::flush,
IncomingMessages::<M>::receive_payloads,
)
.chain()
.after(common_socket_reader)
.run_if(resource_exists::<SilkSocket>()),
)
.add_systems(
Last,
OutgoingMessages::<M>::send_payloads
.run_if(resource_exists::<SilkSocket>()),
);
self.insert_resource(IncomingMessages::<M> {
bound,
messages: VecDeque::new(),
})
.insert_resource(OutgoingMessages::<M> {
reliable_to_host: vec![],
unreliable_to_host: vec![],
})
.add_systems(
First,
IncomingMessages::<M>::receive_payloads
.after(common_socket_reader)
.run_if(resource_exists::<SilkSocket>()),
)
.add_systems(
Last,
OutgoingMessages::<M>::send_payloads
.run_if(resource_exists::<SilkSocket>()),
);
self
}
}
28 changes: 16 additions & 12 deletions bevy-silk/src/client/router/receive.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
use crate::{events::SocketRecvEvent, protocol::Payload};
use bevy::prelude::*;
use std::collections::VecDeque;

#[derive(Default, Debug, Resource)]
pub struct IncomingMessages<M: Payload> {
pub messages: Vec<M>,
pub bound: usize,
pub messages: VecDeque<M>,
}

impl<M: Payload> IncomingMessages<M> {
/// Swaps the event buffers and clears the oldest event buffer. In general,
/// this should be called once per frame/update.
pub fn flush(mut incoming: ResMut<Self>) {
if !incoming.messages.is_empty() {
trace!("flushing {} messages", incoming.messages.len());
}
incoming.messages.clear();
}

pub fn receive_payloads(
mut incoming: ResMut<Self>,
mut events: EventReader<SocketRecvEvent>,
) {
let mut read = 0;
for SocketRecvEvent((_peer_id, packet)) in events.read() {
let bound = incoming.bound;
for SocketRecvEvent((peer_id, packet)) in events.read() {
if let Some(message) = M::from_packet(packet) {
incoming.messages.push(message);
// Insert the new message
incoming.messages.push_back(message);
// Ensure only the last BOUND messages are kept
while incoming.messages.len() > bound {
incoming.messages.pop_front();
warn!(
"The `{}` protocol is overflowing its bounded buffer ({bound}) and dropping packets! The payloads may not being read fast enough, or {peer_id} is exceeding rate!",
M::reflect_name()
);
}

read += 1;
}
}
Expand Down
26 changes: 20 additions & 6 deletions bevy-silk/src/client/system_params.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,43 @@
use crate::protocol::Payload;

use super::router::{IncomingMessages, OutgoingMessages};
use crate::protocol::Payload;
use bevy::{ecs::system::SystemParam, prelude::*};

#[derive(SystemParam, Debug)]
pub struct NetworkReader<'w, M: Payload> {
incoming: Res<'w, IncomingMessages<M>>,
incoming: ResMut<'w, IncomingMessages<M>>,
}

impl<'w, M: Payload> NetworkReader<'w, M> {
pub fn iter(&mut self) -> std::slice::Iter<'_, M> {
self.incoming.messages.iter()
/// Consumes all messages in the buffer and iterate on them.
pub fn read(&mut self) -> std::collections::vec_deque::Drain<'_, M> {
self.incoming.messages.drain(..)
}
}

#[derive(SystemParam, Debug)]
pub struct NetworkWriter<'w, M: Payload> {
outgoing: ResMut<'w, OutgoingMessages<M>>,
pub(crate) outgoing: ResMut<'w, OutgoingMessages<M>>,
}

impl<'w, M: Payload> NetworkWriter<'w, M> {
/// Send a payload to the host with reliability. The payload is created with
/// lazy behavior, only when the send rate allows.
pub fn reliable_to_host_with(&mut self, message_fn: impl Fn() -> M) {
self.outgoing.reliable_to_host.push(message_fn());
}

/// Send a payload to the host with no expectation of delivery. The payload
/// is created with lazy behavior, only when the send rate allows.
pub fn unreliable_to_host_with(&mut self, message_fn: impl Fn() -> M) {
self.outgoing.unreliable_to_host.push(message_fn());
}

/// Send a payload to the host with reliability.
pub fn reliable_to_host(&mut self, message: M) {
self.outgoing.reliable_to_host.push(message);
}

/// Send a payload to the host with no expectation of delivery.
pub fn unreliable_to_host(&mut self, message: M) {
self.outgoing.unreliable_to_host.push(message);
}
Expand Down
33 changes: 9 additions & 24 deletions bevy-silk/src/client/systems.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use super::{
use crate::{
latency::{LatencyTracer, LatencyTracerPayload},
socket::{SilkSocket, SilkSocketPlurality},
system_param::NetworkThrottle,
};
use bevy::prelude::*;
use bevy_matchbox::{
Expand Down Expand Up @@ -147,12 +146,9 @@ pub(crate) fn client_event_writer(
pub fn send_latency_tracers(
state: Res<SilkState>,
mut writer: NetworkWriter<LatencyTracerPayload>,
throttle: NetworkThrottle<100>,
) {
if throttle.ready() {
let peer_id = state.id.expect("expected peer id");
writer.unreliable_to_host(LatencyTracerPayload::new(peer_id));
}
let peer_id = state.id.expect("expected peer id");
writer.unreliable_to_host(LatencyTracerPayload::new(peer_id));
}

pub fn read_latency_tracers(
Expand All @@ -165,32 +161,21 @@ pub fn read_latency_tracers(
let peer_id = state.id.expect("expected peer id");
let mut tracer = trace_query.single_mut();

// Only collect the most recent payloads that happens this tick.
let mut most_recent_payload: Option<LatencyTracerPayload> = None;

for payload in reader.iter() {
// Server time payloads get sent right back to the server
if payload.from == host_id {
let mrp = most_recent_payload.get_or_insert(payload.clone());
if mrp.age() > payload.age() {
*mrp = payload.clone();
}
for payload in reader.read() {
if payload.from == peer_id {
tracer.process(payload);
} else if payload.from == host_id {
// Server time payloads get sent right back to the server
writer.unreliable_to_host(payload);
}
// Process payloads we sent out
else if payload.from == peer_id {
tracer.process(payload.clone());
} else {
else {
warn!(
"Invalid latency tracer from address: {}, ignoring",
payload.from
);
}
}

// Send all server requests
if let Some(payload) = most_recent_payload.take() {
writer.unreliable_to_host(payload);
}
}

pub fn calculate_latency(
Expand Down
1 change: 0 additions & 1 deletion bevy-silk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub(crate) mod events;
pub(crate) mod latency;
pub mod protocol;
pub(crate) mod socket;
pub mod system_param;

// Re-exports
pub use bevy_matchbox;
Expand Down
2 changes: 1 addition & 1 deletion bevy-silk/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ mod systems;

pub use events::SilkServerEvent;
pub use plugin::SilkServerPlugin;
pub use router::AddNetworkMessageExt;
pub use router::AddProtocolExt;
pub use state::{SilkServerStatus, SilkState};
pub use system_params::{NetworkReader, NetworkWriter};
13 changes: 9 additions & 4 deletions bevy-silk/src/server/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use crate::{
latency::LatencyTracerPayload,
socket::{common_socket_reader, SilkSocket},
};
use bevy::prelude::*;
use bevy::{prelude::*, time::common_conditions::on_timer};
use instant::Duration;
use std::net::Ipv4Addr;

use super::{
systems, AddNetworkMessageExt, SilkServerEvent, SilkServerStatus, SilkState,
systems, AddProtocolExt, SilkServerEvent, SilkServerStatus, SilkState,
};

/// A plugin to serve a WebRTC server.
Expand All @@ -21,7 +22,7 @@ impl Plugin for SilkServerPlugin {
// Initialize the schedule for silk
app.add_event::<SocketRecvEvent>()
.add_event::<SilkServerEvent>()
.add_network_message::<LatencyTracerPayload>()
.add_bounded_protocol::<LatencyTracerPayload>(1)
.add_state::<SilkServerStatus>()
.insert_resource(SilkState::new(
(Ipv4Addr::UNSPECIFIED, self.port).into(),
Expand All @@ -45,7 +46,11 @@ impl Plugin for SilkServerPlugin {
)
.add_systems(
Update,
(systems::read_latency_tracers, systems::send_latency_tracers)
(
systems::read_latency_tracers,
systems::send_latency_tracers
.run_if(on_timer(Duration::from_millis(100))),
)
.run_if(state_exists_and_equals(SilkServerStatus::Ready)),
);
}
Expand Down
Loading

0 comments on commit 81156c5

Please sign in to comment.