Skip to content

Commit

Permalink
feat(io): add helper methods for connection
Browse files Browse the repository at this point in the history
  • Loading branch information
asvol committed Nov 27, 2024
1 parent 3320dc7 commit 420d3a6
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 116 deletions.
117 changes: 1 addition & 116 deletions src/Asv.IO/Protocol/Connection/IProtocolConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,122 +20,7 @@ public interface IProtocolConnection:ISupportTag,ISupportStatistic, IDisposable,

public static class ProtocolConnectionHelper
{
public static Observable<TMessage> RxFilter<TMessage, TMessageId>(this IProtocolConnection connection)
where TMessage : IProtocolMessage<TMessageId>, new()
{
var messageId = new TMessage().Id;
return connection.OnRxMessage.Where(messageId, (raw, id) =>
{
if (raw is TMessage message)
{
return message.Id != null && message.Id.Equals(id);
}

return false;

}).Cast<IProtocolMessage, TMessage>();
}

public static Observable<TMessage> RxFilter<TMessage, TMessageId>(this IProtocolConnection connection,
Func<TMessage, bool> filter)
where TMessage : IProtocolMessage<TMessageId>, new()
{
var messageId = new TMessage().Id;
return connection.OnRxMessage.Where(messageId, (raw, id) =>
{
if (raw is TMessage message)
{
return message.Id != null && message.Id.Equals(id) && filter(message);
}

return false;

}).Cast<IProtocolMessage, TMessage>();
}

public static async Task<TResult> SendAndWaitAnswer<TResult, TMessage, TMessageId>(
this IProtocolConnection connection,
IProtocolMessage request,
FilterDelegate<TResult, TMessage, TMessageId> filterAndGetResult,
CancellationToken cancel = default)
where TMessage : IProtocolMessage<TMessageId>, new()
{
cancel.ThrowIfCancellationRequested();
var tcs = new TaskCompletionSource<TResult>();
await using var c1 = cancel.Register(() => tcs.TrySetCanceled());
using var c2 = connection.RxFilter<TMessage, TMessageId>().Subscribe(filterAndGetResult, (res, f) =>
{
if (filterAndGetResult(res, out var result))
{
tcs.TrySetResult(result);
}
});
await connection.Send(request, cancel).ConfigureAwait(false);
return await tcs.Task.ConfigureAwait(false);
}

public static async Task<TResult> SendAndWaitAnswer<TResult, TMessage, TMessageId>(
this IProtocolConnection connection,
IProtocolMessage request,
FilterDelegate<TResult, TMessage, TMessageId> filterAndGetResult,
TimeSpan timeout,
CancellationToken cancel = default,
TimeProvider? timeProvider = null)
where TMessage : IProtocolMessage<TMessageId>, new()
{
timeProvider ??= TimeProvider.System;
using var linkedCancel = CancellationTokenSource.CreateLinkedTokenSource(cancel);
linkedCancel.CancelAfter(timeout, timeProvider);
return await connection.SendAndWaitAnswer(request, filterAndGetResult, cancel);
}

public static async Task<TResult> SendAndWaitAnswer<TResult, TRequestMessage, TResultMessage, TMessageId>(
this IProtocolConnection connection,
TRequestMessage request,
FilterDelegate<TResult, TResultMessage, TMessageId> filterAndGetResult,
TimeSpan timeout,
int maxAttemptCount,
ResendMessageModifyDelegate<TRequestMessage, TMessageId>? modifyRequestOnResend = null,
CancellationToken cancel = default,
TimeProvider? timeProvider = null,
IProgress<int>? progress = null)
where TResultMessage : IProtocolMessage<TMessageId>, new()
where TRequestMessage : IProtocolMessage<TMessageId>
{
cancel.ThrowIfCancellationRequested();
TResult? result = default;
byte currentAttempt = 0;
progress ??= new Progress<int>();
while (IsRetryCondition())
{
progress.Report(currentAttempt);
if (currentAttempt != 0)
{
modifyRequestOnResend?.Invoke(request, currentAttempt);
}

++currentAttempt;
try
{
result = await connection.SendAndWaitAnswer(request, filterAndGetResult, timeout, cancel, timeProvider)
.ConfigureAwait(false);
break;
}
catch (OperationCanceledException)
{
if (IsRetryCondition())
{
continue;
}

cancel.ThrowIfCancellationRequested();
}
}
if (result != null) return result;
throw new TimeoutException($"Timeout to execute '{request}' with {maxAttemptCount} x {timeout}'");

bool IsRetryCondition() => currentAttempt < maxAttemptCount;
}

}

public delegate bool FilterDelegate<TResult, in TMessage,TMessageId>(TMessage input, out TResult result)
Expand Down
1 change: 1 addition & 0 deletions src/Asv.IO/Protocol/IProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ public interface IProtocolFactory
IProtocolRouter CreateRouter(string id);
IVirtualConnection CreateVirtualConnection(Func<IProtocolMessage, bool>? clientToServerFilter = null,
Func<IProtocolMessage, bool>? serverToClientFilter = null);

}
138 changes: 138 additions & 0 deletions src/Asv.IO/Protocol/ProtocolHelper.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
using System;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Asv.Common;
using R3;

namespace Asv.IO;

Expand All @@ -12,6 +17,139 @@ public static partial class ProtocolHelper
private static readonly Regex IdNormailizeRegex = MyRegex();


public static Observable<TMessage> RxFilter<TMessage>(this IProtocolConnection connection)
where TMessage : IProtocolMessage => connection.OnRxMessage.RxFilter<TMessage>();

public static Observable<TMessage> RxFilter<TMessage>(this IProtocolConnection connection, Func<TMessage, bool> filter)
where TMessage : IProtocolMessage => connection.OnRxMessage.RxFilter<TMessage>(filter);

public static Observable<TMessage> RxFilter<TMessage>(this Observable<IProtocolMessage> src)
where TMessage : IProtocolMessage
{
return src.Where(raw => raw is TMessage).Cast<IProtocolMessage, TMessage>();
}

public static Observable<TMessage> RxFilter<TMessage>(this Observable<IProtocolMessage> src, Func<TMessage, bool> filter)
where TMessage : IProtocolMessage
{
return src.Where(raw => raw is TMessage)
.Cast<IProtocolMessage, TMessage>()
.Where(filter);
}

public static Observable<TMessage> RxFilterById<TMessage, TMessageId>(this Observable<IProtocolMessage> src)
where TMessage : IProtocolMessage<TMessageId>, new()
{
var messageId = new TMessage().Id;
return src.Where(messageId, (raw, id) =>
{
if (raw is TMessage message)
{
return message.Id != null && message.Id.Equals(id);
}
return false;

}).Cast<IProtocolMessage, TMessage>();
}
public static Observable<TMessage> RxFilterById<TMessage, TMessageId>(this Observable<IProtocolMessage> src,
Func<TMessage, bool> filter)
where TMessage : IProtocolMessage<TMessageId>, new()
{
return src.RxFilterById<TMessage, TMessageId>().Where(filter);
}

public static Observable<TMessage> RxFilterById<TMessage, TMessageId>(this IProtocolConnection connection)
where TMessage : IProtocolMessage<TMessageId>, new()
=> connection.OnRxMessage.RxFilterById<TMessage, TMessageId>();

public static Observable<TMessage> RxFilterById<TMessage, TMessageId>(this IProtocolConnection connection,
Func<TMessage, bool> filter)
where TMessage : IProtocolMessage<TMessageId>, new()
=> connection.OnRxMessage.RxFilterById<TMessage, TMessageId>(filter);

public static async Task<TResult> SendAndWaitAnswer<TResult, TMessage, TMessageId>(
this IProtocolConnection connection,
IProtocolMessage request,
FilterDelegate<TResult, TMessage, TMessageId> filterAndGetResult,
CancellationToken cancel = default)
where TMessage : IProtocolMessage<TMessageId>, new()
{
cancel.ThrowIfCancellationRequested();
var tcs = new TaskCompletionSource<TResult>();
await using var c1 = cancel.Register(() => tcs.TrySetCanceled());
using var c2 = connection.RxFilterById<TMessage, TMessageId>()
.Subscribe(filterAndGetResult, (res, f) =>
{
if (filterAndGetResult(res, out var result))
{
tcs.TrySetResult(result);
}
});
await connection.Send(request, cancel).ConfigureAwait(false);
return await tcs.Task.ConfigureAwait(false);
}

public static async Task<TResult> SendAndWaitAnswer<TResult, TMessage, TMessageId>(
this IProtocolConnection connection,
IProtocolMessage request,
FilterDelegate<TResult, TMessage, TMessageId> filterAndGetResult,
TimeSpan timeout,
CancellationToken cancel = default,
TimeProvider? timeProvider = null)
where TMessage : IProtocolMessage<TMessageId>, new()
{
timeProvider ??= TimeProvider.System;
using var linkedCancel = CancellationTokenSource.CreateLinkedTokenSource(cancel);
linkedCancel.CancelAfter(timeout, timeProvider);
return await connection.SendAndWaitAnswer(request, filterAndGetResult, cancel);
}

public static async Task<TResult> SendAndWaitAnswer<TResult, TRequestMessage, TResultMessage, TMessageId>(
this IProtocolConnection connection,
TRequestMessage request,
FilterDelegate<TResult, TResultMessage, TMessageId> filterAndGetResult,
TimeSpan timeout,
int maxAttemptCount,
ResendMessageModifyDelegate<TRequestMessage, TMessageId>? modifyRequestOnResend = null,
CancellationToken cancel = default,
TimeProvider? timeProvider = null,
IProgress<int>? progress = null)
where TResultMessage : IProtocolMessage<TMessageId>, new()
where TRequestMessage : IProtocolMessage<TMessageId>
{
cancel.ThrowIfCancellationRequested();
TResult? result = default;
byte currentAttempt = 0;
progress ??= new Progress<int>();
while (IsRetryCondition())
{
progress.Report(currentAttempt);
if (currentAttempt != 0)
{
modifyRequestOnResend?.Invoke(request, currentAttempt);
}

++currentAttempt;
try
{
result = await connection.SendAndWaitAnswer(request, filterAndGetResult, timeout, cancel, timeProvider)
.ConfigureAwait(false);
break;
}
catch (OperationCanceledException)
{
if (IsRetryCondition())
{
continue;
}

cancel.ThrowIfCancellationRequested();
}
}
if (result != null) return result;
throw new TimeoutException($"Timeout to execute '{request}' with {maxAttemptCount} x {timeout}'");

bool IsRetryCondition() => currentAttempt < maxAttemptCount;
}

}

0 comments on commit 420d3a6

Please sign in to comment.