Skip to content

Commit

Permalink
fix: InternalFtpReply session overwriting
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilya committed Oct 21, 2024
1 parent bef2b72 commit c0dadd3
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 24 deletions.
23 changes: 10 additions & 13 deletions src/Asv.Mavlink.Test/Microservices/Ftp/FtpMicroservice.Test.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ public async Task CreateFile_WithProperInput_Success(string filePath, byte sessi
server.CreateFile = (path, cancellationToken) => Task.FromResult(session);

var result = await client.CreateFile(filePath);
Assert.Equal(session, result.ReadSession());
Assert.Equal(FtpOpcode.Ack, result.ReadOpcode());
Assert.Equal(session, result.Session);
}

[Theory]
Expand Down Expand Up @@ -393,41 +392,39 @@ public async Task Client_Call_UploadFile_And_Server_Catch_It()
const byte expectedSession = 1;
var expectedFileSize = (uint)testData.Length;

var openFileWriteCalled = 0;
var writeFileCalled = 0;
var terminateSessionCalled = 0;

server.OpenFileWrite = (path, cancel) =>
{
openFileWriteCalled++;
Assert.Equal(testFilePath, path);
return Task.FromResult(new WriteHandle(expectedSession, expectedFileSize));
};

var receivedData = new byte[testData.Length];
server.WriteFile = (request, data, cancel) =>
{
writeFileCalled++;
Assert.Equal(expectedSession, request.Session);
var expectedDataLength = request.Take;
_output.WriteLine($"Write exceeds buffer size. Written={request.Skip + data.Length}. To be write={receivedData.Length}");
data[..expectedDataLength].CopyTo(receivedData.AsMemory((int)request.Skip, expectedDataLength));
return Task.CompletedTask;
};

server.CreateFile = (path, cancel) =>
{
Assert.Equal(testFilePath, path);
return Task.FromResult(expectedSession);
};

server.TerminateSession = (session, cancel) =>
{
terminateSessionCalled++;
Assert.Equal(expectedSession, session);
return Task.CompletedTask;
};

var ftpClientEx = new FtpClientEx(client);
await ftpClientEx.UploadFile(testFilePath, streamToUpload);

Assert.Equal(1, openFileWriteCalled);
Assert.True(writeFileCalled > 0);
Assert.Equal(1, terminateSessionCalled);

var writeHandle = await client.OpenFileWrite(testFilePath);
Assert.Equal(expectedSession, writeHandle.Session);
Assert.Equal(testData.Length, receivedData.Length);
Assert.Equal(testData.ToArray(), receivedData);
}
Expand Down
10 changes: 6 additions & 4 deletions src/Asv.Mavlink/Microservices/Ftp/Client/Ex/FtpClientEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,20 @@ public async Task DownloadFile(string filePath,Stream streamToSave, IProgress<do
public async Task UploadFile(string filePath, Stream streamToUpload, IProgress<double>? progress = null, CancellationToken cancel = default)
{
progress ??= new Progress<double>();
var file = await Base.OpenFileWrite(filePath, cancel).ConfigureAwait(false);
var writeHandle = await Base.OpenFileWrite(filePath, cancel).ConfigureAwait(false);
var session = writeHandle.Session;
var totalWritten = 0L;
var buffer = ArrayPool<byte>.Shared.Rent(MavlinkFtpHelper.MaxDataSize);

try
{
while (true)
{
var bytesRead = await streamToUpload.ReadAsync(buffer.AsMemory(0, MavlinkFtpHelper.MaxDataSize), cancel).ConfigureAwait(false);
int maxChunkSize = Math.Min(MavlinkFtpHelper.MaxDataSize, byte.MaxValue);
var bytesRead = await streamToUpload.ReadAsync(buffer.AsMemory(0, maxChunkSize), cancel).ConfigureAwait(false);
if (bytesRead == 0) break;

var request = new WriteRequest(file.Session, (uint)totalWritten, (byte)bytesRead);
var request = new WriteRequest(session, (uint)totalWritten, (byte)bytesRead);
var memory = new Memory<byte>(buffer, 0, bytesRead);

await Base.WriteFile(request, memory, cancel).ConfigureAwait(false);
Expand All @@ -272,7 +274,7 @@ public async Task UploadFile(string filePath, Stream streamToUpload, IProgress<d
finally
{
ArrayPool<byte>.Shared.Return(buffer);
await Base.TerminateSession(file.Session, cancel).ConfigureAwait(false);
await Base.TerminateSession(session, cancel).ConfigureAwait(false);
}
}
}
13 changes: 8 additions & 5 deletions src/Asv.Mavlink/Microservices/Ftp/Client/FtpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public async Task<FileTransferProtocolPacket> CreateDirectory(string path, Cance
return result;
}

public async Task<FileTransferProtocolPacket> CreateFile(string path, CancellationToken cancellationToken = default)
public async Task<CreateHandle> CreateFile(string path, CancellationToken cancellationToken = default)
{
MavlinkFtpHelper.CheckFilePath(path);
_logger.ZLogInformation($"{LogSend} {FtpOpcode.CreateFile:G}({path})");
Expand All @@ -202,7 +202,9 @@ public async Task<FileTransferProtocolPacket> CreateFile(string path, Cancellati
p => p.WriteDataAsString(path),
cancellationToken)
.ConfigureAwait(false);
return result;
var sessionId = result.ReadSession();
_logger.ZLogInformation($"{LogRecv} {FtpOpcode.CreateFile:G}({path}): session={sessionId}");
return new CreateHandle(sessionId, path);
}

public Task TerminateSession(byte session, CancellationToken cancel = default)
Expand Down Expand Up @@ -247,15 +249,16 @@ public async Task<ReadHandle> OpenFileRead(string path,CancellationToken cancel
_logger.ZLogInformation($"{LogSend} {FtpOpcode.OpenFileRO:G}({path})");
var result = await InternalFtpCall(FtpOpcode.OpenFileRO,p => p.WriteDataAsString(path), cancel).ConfigureAwait(false);
InternalCheckNack(result, _logger);
var resultSize = result.ReadSize();
// ACK on success. The payload must specify fields: session = file session id, size = 4, data = length of file that has been opened.


var sessionId = result.ReadSession();
var resultSize = result.ReadSize();
if (resultSize != 4)
{
_logger.ZLogError($"Unexpected error to {FtpOpcode.OpenFileRO:G}: ACK must be {4} byte length");
throw new FtpException($"Unexpected error to {FtpOpcode.OpenFileRO:G}: ACK must be {4} byte length");
}

var sessionId = result.ReadSession();
var fileSize = result.ReadDataAsUint();
_logger.ZLogInformation($"{LogRecv} {FtpOpcode.OpenFileRO:G}({path}): session={sessionId}, size={fileSize}, '{path}'={fileSize}");
return new ReadHandle(sessionId,fileSize);
Expand Down
4 changes: 2 additions & 2 deletions src/Asv.Mavlink/Microservices/Ftp/Client/IFtpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IFtpClient
Task<WriteHandle> OpenFileWrite(string path, CancellationToken cancel = default);

public Task<FileTransferProtocolPacket> CreateDirectory(string path, CancellationToken cancellationToken = default);
public Task<FileTransferProtocolPacket> CreateFile(string path, CancellationToken cancellationToken = default);
public Task<CreateHandle> CreateFile(string path, CancellationToken cancellationToken = default);
public Task<FileTransferProtocolPacket> ResetSessions(CancellationToken cancellationToken = default);
public Task<FileTransferProtocolPacket> RemoveDirectory(string path, CancellationToken cancellationToken = default);
public Task<FileTransferProtocolPacket> RemoveFile(string path, CancellationToken cancellationToken = default);
Expand Down Expand Up @@ -117,7 +117,7 @@ public readonly struct CreateHandle(byte session, string path)
{
public readonly byte Session = session;
public readonly string Path = path;
public override string ToString() => $"CREATE_DIR(session: {Session}, path: {path})";
public override string ToString() => $"CREATE(session: {Session}, path: {Path})";
}

public readonly struct TruncateRequest(string path, uint offset)
Expand Down
2 changes: 2 additions & 0 deletions src/Asv.Mavlink/Microservices/Ftp/Server/FtpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ private async Task InternalCreateFile(FileTransferProtocolPacket input)
await InternalFtpReply(input, FtpOpcode.Ack, p =>
{
p.WriteSession(result);
p.WriteSize(0);
}).ConfigureAwait(false);
}

Expand Down Expand Up @@ -419,6 +420,7 @@ private async Task InternalOpenFileWrite(FileTransferProtocolPacket input)
await InternalFtpReply(input, FtpOpcode.Ack, p =>
{
p.WriteSession(_lastWriteHandle.Session);
p.WriteSize(4);
p.WriteDataAsUint(_lastWriteHandle.Size);
}).ConfigureAwait(false);
}
Expand Down

0 comments on commit c0dadd3

Please sign in to comment.