From 90d8ce314ac59244d738d20313cb3525ab5d63d8 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Fri, 1 Mar 2024 23:19:56 +0000 Subject: [PATCH] SendReliabilityLayer: do not send reliable packets outside the client's reliable window the client implements a window of 512 packets, outside of which packets will be dropped and eventually resent. To avoid this, we buffer packets until their reliable message indexes are low enough to be accepted by the client's window. This significantly improves performance when transmitting large amounts of data. --- src/generic/SendReliabilityLayer.php | 101 ++++++++++++++++++++------- 1 file changed, 77 insertions(+), 24 deletions(-) diff --git a/src/generic/SendReliabilityLayer.php b/src/generic/SendReliabilityLayer.php index c1bc254..5234655 100644 --- a/src/generic/SendReliabilityLayer.php +++ b/src/generic/SendReliabilityLayer.php @@ -23,6 +23,8 @@ use raklib\protocol\PacketReliability; use raklib\protocol\SplitPacketInfo; use function array_fill; +use function array_push; +use function assert; use function count; use function str_split; use function strlen; @@ -41,12 +43,23 @@ final class SendReliabilityLayer{ private int $messageIndex = 0; + private int $reliableWindowStart; + private int $reliableWindowEnd; + /** + * @var bool[] message index => acked + * @phpstan-var array + */ + private array $reliableWindow = []; + /** @var int[] */ private array $sendOrderedIndex; /** @var int[] */ private array $sendSequencedIndex; - /** @var ReliableCacheEntry[] */ + /** @var EncapsulatedPacket[] */ + private array $reliableBacklog = []; + + /** @var EncapsulatedPacket[] */ private array $resendQueue = []; /** @var ReliableCacheEntry[] */ @@ -60,18 +73,22 @@ final class SendReliabilityLayer{ /** * @phpstan-param int $mtuSize - * @phpstan-param \Closure(Datagram) : void $sendDatagramCallback - * @phpstan-param \Closure(int) : void $onACK + * @phpstan-param \Closure(Datagram) : void $sendDatagramCallback + * @phpstan-param \Closure(int) : void $onACK */ public function __construct( private int $mtuSize, private \Closure $sendDatagramCallback, - private \Closure $onACK + private \Closure $onACK, + private int $reliableWindowSize = 512, ){ $this->sendOrderedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0); $this->sendSequencedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0); $this->maxDatagramPayloadSize = $this->mtuSize - self::DATAGRAM_MTU_OVERHEAD; + + $this->reliableWindowStart = 0; + $this->reliableWindowEnd = $this->reliableWindowSize; } /** @@ -102,6 +119,19 @@ public function sendQueue() : void{ } private function addToQueue(EncapsulatedPacket $pk, bool $immediate) : void{ + if(PacketReliability::isReliable($pk->reliability)){ + if($pk->messageIndex === null || $pk->messageIndex < $this->reliableWindowStart){ + throw new \InvalidArgumentException("Cannot send a reliable packet with message index less than the window start ($pk->messageIndex < $this->reliableWindowStart)"); + } + if($pk->messageIndex >= $this->reliableWindowEnd){ + //If we send this now, the client's reliable window may overflow, causing the packet to need redelivery + $this->reliableBacklog[$pk->messageIndex] = $pk; + return; + } + + $this->reliableWindow[$pk->messageIndex] = false; + } + if($pk->identifierACK !== null and $pk->messageIndex !== null){ $this->needACK[$pk->identifierACK][$pk->messageIndex] = $pk->messageIndex; } @@ -172,11 +202,26 @@ public function addEncapsulatedToQueue(EncapsulatedPacket $packet, bool $immedia } } + private function updateReliableWindow() : void{ + while( + isset($this->reliableWindow[$this->reliableWindowStart]) && //this messageIndex has been used + $this->reliableWindow[$this->reliableWindowStart] === true //we received an ack for this messageIndex + ){ + $this->reliableWindowStart++; + $this->reliableWindowEnd++; + unset($this->reliableWindow[$this->reliableWindowStart]); + } + } + public function onACK(ACK $packet) : void{ foreach($packet->packets as $seq){ if(isset($this->reliableCache[$seq])){ foreach($this->reliableCache[$seq]->getPackets() as $pk){ - if($pk->identifierACK !== null and $pk->messageIndex !== null){ + assert($pk->messageIndex !== null && $pk->messageIndex >= $this->reliableWindowStart && $pk->messageIndex < $this->reliableWindowEnd); + $this->reliableWindow[$pk->messageIndex] = true; + $this->updateReliableWindow(); + + if($pk->identifierACK !== null){ unset($this->needACK[$pk->identifierACK][$pk->messageIndex]); if(count($this->needACK[$pk->identifierACK]) === 0){ unset($this->needACK[$pk->identifierACK]); @@ -192,8 +237,9 @@ public function onACK(ACK $packet) : void{ public function onNACK(NACK $packet) : void{ foreach($packet->packets as $seq){ if(isset($this->reliableCache[$seq])){ - //TODO: group resends if the resulting datagram is below the MTU - $this->resendQueue[] = $this->reliableCache[$seq]; + foreach($this->reliableCache[$seq]->getPackets() as $pk){ + $this->resendQueue[] = $pk; + } unset($this->reliableCache[$seq]); } } @@ -202,37 +248,44 @@ public function onNACK(NACK $packet) : void{ public function needsUpdate() : bool{ return ( count($this->sendQueue) !== 0 or + count($this->reliableBacklog) !== 0 or count($this->resendQueue) !== 0 or count($this->reliableCache) !== 0 ); } public function update() : void{ - if(count($this->resendQueue) > 0){ - $limit = 16; - foreach($this->resendQueue as $k => $pk){ - $this->sendDatagram($pk->getPackets()); - unset($this->resendQueue[$k]); - - if(--$limit <= 0){ - break; - } - } - - if(count($this->resendQueue) > ReceiveReliabilityLayer::$WINDOW_SIZE){ - $this->resendQueue = []; - } - } - foreach($this->reliableCache as $seq => $pk){ if($pk->getTimestamp() < (time() - 8)){ - $this->resendQueue[] = $pk; + //behave as if a NACK was received + array_push($this->resendQueue, ...$pk->getPackets()); unset($this->reliableCache[$seq]); }else{ break; } } + if(count($this->resendQueue) > 0){ + foreach($this->resendQueue as $pk){ + //resends should always be within the reliable window + $this->addToQueue($pk, false); + } + $this->resendQueue = []; + } + + if(count($this->reliableBacklog) > 0){ + foreach($this->reliableBacklog as $k => $pk){ + assert($pk->messageIndex !== null && $pk->messageIndex >= $this->reliableWindowStart); + if($pk->messageIndex >= $this->reliableWindowEnd){ + //we can't send this packet yet, the client's reliable window will drop it + break; + } + + $this->addToQueue($pk, false); + unset($this->reliableBacklog[$k]); + } + } + $this->sendQueue(); } }