diff --git a/src/generic/SendReliabilityLayer.php b/src/generic/SendReliabilityLayer.php index c1bc254..5234655 100644 --- a/src/generic/SendReliabilityLayer.php +++ b/src/generic/SendReliabilityLayer.php @@ -23,6 +23,8 @@ use raklib\protocol\PacketReliability; use raklib\protocol\SplitPacketInfo; use function array_fill; +use function array_push; +use function assert; use function count; use function str_split; use function strlen; @@ -41,12 +43,23 @@ final class SendReliabilityLayer{ private int $messageIndex = 0; + private int $reliableWindowStart; + private int $reliableWindowEnd; + /** + * @var bool[] message index => acked + * @phpstan-var array + */ + private array $reliableWindow = []; + /** @var int[] */ private array $sendOrderedIndex; /** @var int[] */ private array $sendSequencedIndex; - /** @var ReliableCacheEntry[] */ + /** @var EncapsulatedPacket[] */ + private array $reliableBacklog = []; + + /** @var EncapsulatedPacket[] */ private array $resendQueue = []; /** @var ReliableCacheEntry[] */ @@ -60,18 +73,22 @@ final class SendReliabilityLayer{ /** * @phpstan-param int $mtuSize - * @phpstan-param \Closure(Datagram) : void $sendDatagramCallback - * @phpstan-param \Closure(int) : void $onACK + * @phpstan-param \Closure(Datagram) : void $sendDatagramCallback + * @phpstan-param \Closure(int) : void $onACK */ public function __construct( private int $mtuSize, private \Closure $sendDatagramCallback, - private \Closure $onACK + private \Closure $onACK, + private int $reliableWindowSize = 512, ){ $this->sendOrderedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0); $this->sendSequencedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0); $this->maxDatagramPayloadSize = $this->mtuSize - self::DATAGRAM_MTU_OVERHEAD; + + $this->reliableWindowStart = 0; + $this->reliableWindowEnd = $this->reliableWindowSize; } /** @@ -102,6 +119,19 @@ public function sendQueue() : void{ } private function addToQueue(EncapsulatedPacket $pk, bool $immediate) : void{ + if(PacketReliability::isReliable($pk->reliability)){ + if($pk->messageIndex === null || $pk->messageIndex < $this->reliableWindowStart){ + throw new \InvalidArgumentException("Cannot send a reliable packet with message index less than the window start ($pk->messageIndex < $this->reliableWindowStart)"); + } + if($pk->messageIndex >= $this->reliableWindowEnd){ + //If we send this now, the client's reliable window may overflow, causing the packet to need redelivery + $this->reliableBacklog[$pk->messageIndex] = $pk; + return; + } + + $this->reliableWindow[$pk->messageIndex] = false; + } + if($pk->identifierACK !== null and $pk->messageIndex !== null){ $this->needACK[$pk->identifierACK][$pk->messageIndex] = $pk->messageIndex; } @@ -172,11 +202,26 @@ public function addEncapsulatedToQueue(EncapsulatedPacket $packet, bool $immedia } } + private function updateReliableWindow() : void{ + while( + isset($this->reliableWindow[$this->reliableWindowStart]) && //this messageIndex has been used + $this->reliableWindow[$this->reliableWindowStart] === true //we received an ack for this messageIndex + ){ + $this->reliableWindowStart++; + $this->reliableWindowEnd++; + unset($this->reliableWindow[$this->reliableWindowStart]); + } + } + public function onACK(ACK $packet) : void{ foreach($packet->packets as $seq){ if(isset($this->reliableCache[$seq])){ foreach($this->reliableCache[$seq]->getPackets() as $pk){ - if($pk->identifierACK !== null and $pk->messageIndex !== null){ + assert($pk->messageIndex !== null && $pk->messageIndex >= $this->reliableWindowStart && $pk->messageIndex < $this->reliableWindowEnd); + $this->reliableWindow[$pk->messageIndex] = true; + $this->updateReliableWindow(); + + if($pk->identifierACK !== null){ unset($this->needACK[$pk->identifierACK][$pk->messageIndex]); if(count($this->needACK[$pk->identifierACK]) === 0){ unset($this->needACK[$pk->identifierACK]); @@ -192,8 +237,9 @@ public function onACK(ACK $packet) : void{ public function onNACK(NACK $packet) : void{ foreach($packet->packets as $seq){ if(isset($this->reliableCache[$seq])){ - //TODO: group resends if the resulting datagram is below the MTU - $this->resendQueue[] = $this->reliableCache[$seq]; + foreach($this->reliableCache[$seq]->getPackets() as $pk){ + $this->resendQueue[] = $pk; + } unset($this->reliableCache[$seq]); } } @@ -202,37 +248,44 @@ public function onNACK(NACK $packet) : void{ public function needsUpdate() : bool{ return ( count($this->sendQueue) !== 0 or + count($this->reliableBacklog) !== 0 or count($this->resendQueue) !== 0 or count($this->reliableCache) !== 0 ); } public function update() : void{ - if(count($this->resendQueue) > 0){ - $limit = 16; - foreach($this->resendQueue as $k => $pk){ - $this->sendDatagram($pk->getPackets()); - unset($this->resendQueue[$k]); - - if(--$limit <= 0){ - break; - } - } - - if(count($this->resendQueue) > ReceiveReliabilityLayer::$WINDOW_SIZE){ - $this->resendQueue = []; - } - } - foreach($this->reliableCache as $seq => $pk){ if($pk->getTimestamp() < (time() - 8)){ - $this->resendQueue[] = $pk; + //behave as if a NACK was received + array_push($this->resendQueue, ...$pk->getPackets()); unset($this->reliableCache[$seq]); }else{ break; } } + if(count($this->resendQueue) > 0){ + foreach($this->resendQueue as $pk){ + //resends should always be within the reliable window + $this->addToQueue($pk, false); + } + $this->resendQueue = []; + } + + if(count($this->reliableBacklog) > 0){ + foreach($this->reliableBacklog as $k => $pk){ + assert($pk->messageIndex !== null && $pk->messageIndex >= $this->reliableWindowStart); + if($pk->messageIndex >= $this->reliableWindowEnd){ + //we can't send this packet yet, the client's reliable window will drop it + break; + } + + $this->addToQueue($pk, false); + unset($this->reliableBacklog[$k]); + } + } + $this->sendQueue(); } }