Skip to content

Commit

Permalink
SendReliabilityLayer: do not send reliable packets outside the client…
Browse files Browse the repository at this point in the history
…'s reliable window

the client implements a window of 512 packets, outside of which packets will be dropped and eventually resent.
To avoid this, we buffer packets until their reliable message indexes are low enough to be accepted by the client's window.
This significantly improves performance when transmitting large amounts of data.
  • Loading branch information
dktapps committed Mar 1, 2024
1 parent fd74ba2 commit 90d8ce3
Showing 1 changed file with 77 additions and 24 deletions.
101 changes: 77 additions & 24 deletions src/generic/SendReliabilityLayer.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
use raklib\protocol\PacketReliability;
use raklib\protocol\SplitPacketInfo;
use function array_fill;
use function array_push;
use function assert;
use function count;
use function str_split;
use function strlen;
Expand All @@ -41,12 +43,23 @@ final class SendReliabilityLayer{

private int $messageIndex = 0;

private int $reliableWindowStart;
private int $reliableWindowEnd;
/**
* @var bool[] message index => acked
* @phpstan-var array<int, bool>
*/
private array $reliableWindow = [];

/** @var int[] */
private array $sendOrderedIndex;
/** @var int[] */
private array $sendSequencedIndex;

/** @var ReliableCacheEntry[] */
/** @var EncapsulatedPacket[] */
private array $reliableBacklog = [];

/** @var EncapsulatedPacket[] */
private array $resendQueue = [];

/** @var ReliableCacheEntry[] */
Expand All @@ -60,18 +73,22 @@ final class SendReliabilityLayer{

/**
* @phpstan-param int<Session::MIN_MTU_SIZE, max> $mtuSize
* @phpstan-param \Closure(Datagram) : void $sendDatagramCallback
* @phpstan-param \Closure(int) : void $onACK
* @phpstan-param \Closure(Datagram) : void $sendDatagramCallback
* @phpstan-param \Closure(int) : void $onACK
*/
public function __construct(
private int $mtuSize,
private \Closure $sendDatagramCallback,
private \Closure $onACK
private \Closure $onACK,
private int $reliableWindowSize = 512,
){
$this->sendOrderedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);
$this->sendSequencedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);

$this->maxDatagramPayloadSize = $this->mtuSize - self::DATAGRAM_MTU_OVERHEAD;

$this->reliableWindowStart = 0;
$this->reliableWindowEnd = $this->reliableWindowSize;
}

/**
Expand Down Expand Up @@ -102,6 +119,19 @@ public function sendQueue() : void{
}

private function addToQueue(EncapsulatedPacket $pk, bool $immediate) : void{
if(PacketReliability::isReliable($pk->reliability)){
if($pk->messageIndex === null || $pk->messageIndex < $this->reliableWindowStart){
throw new \InvalidArgumentException("Cannot send a reliable packet with message index less than the window start ($pk->messageIndex < $this->reliableWindowStart)");
}
if($pk->messageIndex >= $this->reliableWindowEnd){
//If we send this now, the client's reliable window may overflow, causing the packet to need redelivery
$this->reliableBacklog[$pk->messageIndex] = $pk;
return;
}

$this->reliableWindow[$pk->messageIndex] = false;
}

if($pk->identifierACK !== null and $pk->messageIndex !== null){
$this->needACK[$pk->identifierACK][$pk->messageIndex] = $pk->messageIndex;
}
Expand Down Expand Up @@ -172,11 +202,26 @@ public function addEncapsulatedToQueue(EncapsulatedPacket $packet, bool $immedia
}
}

private function updateReliableWindow() : void{
while(
isset($this->reliableWindow[$this->reliableWindowStart]) && //this messageIndex has been used
$this->reliableWindow[$this->reliableWindowStart] === true //we received an ack for this messageIndex
){
$this->reliableWindowStart++;
$this->reliableWindowEnd++;
unset($this->reliableWindow[$this->reliableWindowStart]);
}
}

public function onACK(ACK $packet) : void{
foreach($packet->packets as $seq){
if(isset($this->reliableCache[$seq])){
foreach($this->reliableCache[$seq]->getPackets() as $pk){
if($pk->identifierACK !== null and $pk->messageIndex !== null){
assert($pk->messageIndex !== null && $pk->messageIndex >= $this->reliableWindowStart && $pk->messageIndex < $this->reliableWindowEnd);
$this->reliableWindow[$pk->messageIndex] = true;
$this->updateReliableWindow();

if($pk->identifierACK !== null){
unset($this->needACK[$pk->identifierACK][$pk->messageIndex]);
if(count($this->needACK[$pk->identifierACK]) === 0){
unset($this->needACK[$pk->identifierACK]);
Expand All @@ -192,8 +237,9 @@ public function onACK(ACK $packet) : void{
public function onNACK(NACK $packet) : void{
foreach($packet->packets as $seq){
if(isset($this->reliableCache[$seq])){
//TODO: group resends if the resulting datagram is below the MTU
$this->resendQueue[] = $this->reliableCache[$seq];
foreach($this->reliableCache[$seq]->getPackets() as $pk){
$this->resendQueue[] = $pk;
}
unset($this->reliableCache[$seq]);
}
}
Expand All @@ -202,37 +248,44 @@ public function onNACK(NACK $packet) : void{
public function needsUpdate() : bool{
return (
count($this->sendQueue) !== 0 or
count($this->reliableBacklog) !== 0 or
count($this->resendQueue) !== 0 or
count($this->reliableCache) !== 0
);
}

public function update() : void{
if(count($this->resendQueue) > 0){
$limit = 16;
foreach($this->resendQueue as $k => $pk){
$this->sendDatagram($pk->getPackets());
unset($this->resendQueue[$k]);

if(--$limit <= 0){
break;
}
}

if(count($this->resendQueue) > ReceiveReliabilityLayer::$WINDOW_SIZE){
$this->resendQueue = [];
}
}

foreach($this->reliableCache as $seq => $pk){
if($pk->getTimestamp() < (time() - 8)){
$this->resendQueue[] = $pk;
//behave as if a NACK was received
array_push($this->resendQueue, ...$pk->getPackets());
unset($this->reliableCache[$seq]);
}else{
break;
}
}

if(count($this->resendQueue) > 0){
foreach($this->resendQueue as $pk){
//resends should always be within the reliable window
$this->addToQueue($pk, false);
}
$this->resendQueue = [];
}

if(count($this->reliableBacklog) > 0){
foreach($this->reliableBacklog as $k => $pk){
assert($pk->messageIndex !== null && $pk->messageIndex >= $this->reliableWindowStart);
if($pk->messageIndex >= $this->reliableWindowEnd){
//we can't send this packet yet, the client's reliable window will drop it
break;
}

$this->addToQueue($pk, false);
unset($this->reliableBacklog[$k]);
}
}

$this->sendQueue();
}
}

0 comments on commit 90d8ce3

Please sign in to comment.