Skip to content

Commit

Permalink
Reliable fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrakena committed Dec 12, 2023
1 parent c3f43e4 commit 5b63b1e
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 106 deletions.
48 changes: 32 additions & 16 deletions Common/Networking/Channels/ReliableChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ internal sealed class ReliableChannel : ChannelBase

private readonly SemaphoreSlim _pendingPacketsSem = new(1, 1);

private readonly SemaphoreSlim _pendingPacketsSemaphore = new(1, 1);
private readonly NetworkPacket?[]? _receivedPackets; //for order
private readonly int _windowSize;

Expand Down Expand Up @@ -83,19 +82,15 @@ private async ValueTask ProcessAckAsync(NetworkPacket packet)
return;
}

await _pendingPacketsSemaphore.WaitAsync();
await _pendingPacketsSem.WaitAsync();
try
{
for (var pendingSeq = _localWindowStart;
pendingSeq != _localSequence;
pendingSeq = (pendingSeq + 1) % NetConstants.MaxSequence)
{
var rel = NetUtils.RelativeSequenceNumber(pendingSeq, ackWindowStart);
if (rel >= _windowSize)
{
NetDebug.Write("[PA]REL: " + rel);
break;
}
if (rel >= _windowSize) break;

var pendingIdx = pendingSeq % _windowSize;
var currentByte = NetConstants.ChanneledHeaderSize + pendingIdx / BitsInByte;
Expand All @@ -109,7 +104,7 @@ private async ValueTask ProcessAckAsync(NetworkPacket packet)
}

//Skip false ack
NetDebug.Write($"[PA]False ack: {pendingSeq}");
Peer.LogDebug($"[Packet {packet.Sequence}] False ack for {pendingSeq}");
continue;
}

Expand All @@ -118,13 +113,18 @@ private async ValueTask ProcessAckAsync(NetworkPacket packet)
_localWindowStart = (_localWindowStart + 1) % NetConstants.MaxSequence;

//clear packet
if (await _pendingPackets[pendingIdx].ClearAsync(Peer))
NetDebug.Write($"[PA]Received ack for sequence: {pendingSeq}");
var p = _pendingPackets[pendingIdx];
var r = await p.ClearAsync(Peer);
_pendingPackets[pendingIdx] = p;
if (r)
{
Peer.LogDebug($"[Packet {packet.Sequence}] Correct ack for {pendingSeq}");
}
}
}
finally
{
_pendingPacketsSemaphore.Release();
_pendingPacketsSem.Release();
}
}

Expand All @@ -151,7 +151,7 @@ protected override async Task<bool> FlushQueueAsync()
try
{
await outgoingQueueSem.WaitAsync();
try
try
{
while (OutgoingQueue.Count > 0)
{
Expand All @@ -162,7 +162,12 @@ protected override async Task<bool> FlushQueueAsync()
var netPacket = OutgoingQueue.Dequeue();
netPacket.Sequence = (ushort)_localSequence;
netPacket.ChannelId = _id;
_pendingPackets[_localSequence % _windowSize].Init(netPacket);
var prp = new PendingReliablePacket
{
_packet = netPacket
};
_pendingPackets[_localSequence % _windowSize] = prp;
//_pendingPackets[_localSequence % _windowSize] = new PendingReliablePacket(); .Init(netPacket);
_localSequence = (_localSequence + 1) % NetConstants.MaxSequence;
}
}
Expand All @@ -175,8 +180,17 @@ protected override async Task<bool> FlushQueueAsync()
for (var pendingSeq = _localWindowStart;
pendingSeq != _localSequence;
pendingSeq = (pendingSeq + 1) % NetConstants.MaxSequence)
if (await _pendingPackets[pendingSeq % _windowSize].TrySendAsync(currentTime, Peer))
{
var p = _pendingPackets[pendingSeq % _windowSize];
var sendSeq = await p.TrySendAsync(currentTime, Peer);
_pendingPackets[pendingSeq % _windowSize] = p;
//var sendSeq = await _pendingPackets[pendingSeq % _windowSize].TrySendAsync(currentTime, Peer);
Peer.LogDebug("Trying to send sequence number " + pendingSeq + ": " + sendSeq);
if (sendSeq)
{
hasPendingPackets = true;
}
}
}
finally
{
Expand Down Expand Up @@ -281,7 +295,7 @@ public override async ValueTask<bool> HandlePacketAsync(NetworkPacket packet)
//detailed check
if (seq == _remoteSequence)
{
Peer.LogDebug($"[Receive] {packet.Property} ({packet.Data.Count} bytes)");
Peer.LogDebug($"[Receive] {packet.Property} ({packet.Data.Count} bytes) (sequence {packet.Sequence})");
await Peer.AddReliablePacket(_deliveryMethod, packet);
_remoteSequence = (_remoteSequence + 1) % NetConstants.MaxSequence;

Expand Down Expand Up @@ -326,7 +340,7 @@ public override async ValueTask<bool> HandlePacketAsync(NetworkPacket packet)
private class PendingReliablePacket
{
private bool _isSent;
private NetworkPacket? _packet;
public NetworkPacket? _packet;
private long _timeStamp;

public override string ToString()
Expand All @@ -344,7 +358,9 @@ public void Init(NetworkPacket packet)
public async Task<bool> TrySendAsync(long utcNowTicks, PeerBase peer)
{
if (_packet == null)
{
return false;
}

if (_isSent) //check send time
{
Expand Down
5 changes: 0 additions & 5 deletions Common/Networking/Peers/PeerBase.Mtu.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ private async Task ProcessMtuPacketAsync(NetworkPacket packet)
{
case PacketProperty.MtuCheck:
_mtuCheckAttempts = 0;
NetDebug.Write($"[MTU] Check OK for MTU value {frontCheck}. Sending back MTU OK: " + frontCheck);
packet.Property = PacketProperty.MtuOk;
await PromulManager.RawSendAsync(packet, EndPoint);
break;
Expand All @@ -84,10 +83,6 @@ private async Task ProcessMtuPacketAsync(NetworkPacket packet)
_mtuNegotiationComplete = true;
NetDebug.Write($"[MTU] Negotiation complete. MTU for this session: {MaximumTransferUnit}.");
}
else
{
NetDebug.Write("[MTU] MTU confirm acknowledged. Setting MTU to " + MaximumTransferUnit);
}

break;
}
Expand Down
16 changes: 8 additions & 8 deletions Common/Networking/Peers/PeerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,13 @@ private async Task SendInternal(
var packetDataSize = maxMtuCarryingCapacity - NetConstants.FragmentHeaderSize;
var totalPackets = data.Count / packetDataSize + (data.Count % packetDataSize == 0 ? 0 : 1);

NetDebug.Write($@"Preparing to send {data.Count} bytes of fragmented data.
Complete data size (header + data): {completePackageSize}
Current MTU: {mtu}
Size of header for {property:G}: {headerSize}
Size of fragmentation header: {NetConstants.FragmentHeaderSize}
Maximum possible data per packet (MTU-header-fragment header): {packetDataSize}
That means we must send {totalPackets} total packets.");
// NetDebug.Write($@"Preparing to send {data.Count} bytes of fragmented data.
// Complete data size (header + data): {completePackageSize}
// Current MTU: {mtu}
// Size of header for {property:G}: {headerSize}
// Size of fragmentation header: {NetConstants.FragmentHeaderSize}
// Maximum possible data per packet (MTU-header-fragment header): {packetDataSize}
// That means we must send {totalPackets} total packets.");

if (totalPackets > ushort.MaxValue)
throw new TooBigPacketException("Data was split in " + totalPackets + " fragments, which exceeds " +
Expand Down Expand Up @@ -678,7 +678,7 @@ internal async Task SendUserData(NetworkPacket packet)
//if (mergedPacketSize + splitThreshold >= MaximumTransferUnit)
//{
await PromulManager.RawSendAsync(packet, EndPoint);
LogDebug($"[Send] {packet.Property} ({packet.Data.Count} bytes)");
LogDebug($"[Send] {packet.Property} ({packet.Data.Count} bytes) (sequence {packet.Sequence})");
//}
// if (_mergePos + mergedPacketSize > MaximumTransferUnit) await SendMerged();
//
Expand Down
7 changes: 0 additions & 7 deletions Common/Networking/README.md.meta

This file was deleted.

Loading

0 comments on commit 5b63b1e

Please sign in to comment.