From 2d716010289f842e3c098ac338536b8ed2cc597b Mon Sep 17 00:00:00 2001 From: Mark Cilia Vincenti Date: Sat, 20 Jan 2024 09:24:12 +0100 Subject: [PATCH 1/5] Fixes key locking issues --- .../AsyncDuplicateLock.cs | 88 ------------------- .../BTCPayServer.Lightning.LNDhub.csproj | 1 + .../LndHubClient.cs | 71 ++++++++------- .../LndHubInvoiceListener.cs | 36 +++++--- 4 files changed, 64 insertions(+), 132 deletions(-) delete mode 100644 src/BTCPayServer.Lightning.LNDhub/AsyncDuplicateLock.cs diff --git a/src/BTCPayServer.Lightning.LNDhub/AsyncDuplicateLock.cs b/src/BTCPayServer.Lightning.LNDhub/AsyncDuplicateLock.cs deleted file mode 100644 index 0483e99b..00000000 --- a/src/BTCPayServer.Lightning.LNDhub/AsyncDuplicateLock.cs +++ /dev/null @@ -1,88 +0,0 @@ -#nullable enable -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace BTCPayServer.Lightning.LndHub; - -///from https://stackoverflow.com/a/31194647/275504 -public sealed class AsyncDuplicateLock -{ - private sealed class RefCounted - { - public RefCounted(T value) - { - RefCount = 1; - Value = value; - } - - public int RefCount { get; set; } - public T Value { get; private set; } - } - - private readonly ConcurrentDictionary?> _semaphoreSlims = new(); - - private SemaphoreSlim GetOrCreate(object key) - { - RefCounted? item; - lock (_semaphoreSlims) - { - if (_semaphoreSlims.TryGetValue(key, out item) && item is { }) - { - ++item.RefCount; - } - else - { - item = new RefCounted(new SemaphoreSlim(1, 1)); - _semaphoreSlims[key] = item; - } - } - return item.Value; - } - - // get a lock for a specific key, and wait until it is available - public async Task LockAsync(object key, CancellationToken cancellationToken = default) - { - await GetOrCreate(key).WaitAsync(cancellationToken).ConfigureAwait(false); - return new Releaser(_semaphoreSlims, key); - } - - // get a lock for a specific key if it is available, or return null if it is currently locked - public async Task LockOrBustAsync(object key, CancellationToken cancellationToken = default) - { - var semaphore = GetOrCreate(key); - if (semaphore.CurrentCount == 0) - return null; - await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - return new Releaser(_semaphoreSlims, key); - } - private sealed class Releaser : IDisposable - { - private readonly ConcurrentDictionary?> _semaphoreSlims; - - public Releaser(ConcurrentDictionary?> semaphoreSlims, object key) - { - _semaphoreSlims = semaphoreSlims; - Key = key; - } - - private object Key { get; set; } - - public void Dispose() - { - RefCounted? item; - lock (_semaphoreSlims) - { - if (_semaphoreSlims.TryGetValue(Key, out item) && item is { }) - { - --item.RefCount; - if (item.RefCount == 0) - _semaphoreSlims.TryRemove(Key, out _); - } - } - item?.Value.Release(); - } - } -} diff --git a/src/BTCPayServer.Lightning.LNDhub/BTCPayServer.Lightning.LNDhub.csproj b/src/BTCPayServer.Lightning.LNDhub/BTCPayServer.Lightning.LNDhub.csproj index 27ca4004..52e313b9 100644 --- a/src/BTCPayServer.Lightning.LNDhub/BTCPayServer.Lightning.LNDhub.csproj +++ b/src/BTCPayServer.Lightning.LNDhub/BTCPayServer.Lightning.LNDhub.csproj @@ -18,6 +18,7 @@ + diff --git a/src/BTCPayServer.Lightning.LNDhub/LndHubClient.cs b/src/BTCPayServer.Lightning.LNDhub/LndHubClient.cs index 4be3e222..9b231ce8 100644 --- a/src/BTCPayServer.Lightning.LNDhub/LndHubClient.cs +++ b/src/BTCPayServer.Lightning.LNDhub/LndHubClient.cs @@ -8,6 +8,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using AsyncKeyedLock; using BTCPayServer.Lightning.LNDhub.JsonConverters; using BTCPayServer.Lightning.LNDhub.Models; using NBitcoin; @@ -32,7 +33,11 @@ public class LndHubClient private static readonly HttpClient _sharedClient = new (); private static readonly ConcurrentDictionary _cache = new(); public readonly string CacheKey; - private static readonly AsyncDuplicateLock _locker = new(); + private static readonly AsyncKeyedLocker _locker = new(o => + { + o.PoolSize = 20; + o.PoolInitialFill = 1; + }); public LndHubClient(Uri baseUri, string login, string password, Network network, HttpClient httpClient) { @@ -202,49 +207,53 @@ public async Task CreateInvoiceSession(CancellationTo private async Task ClearAccessToken() { - using var release = await _locker.LockAsync(CacheKey); - _cache.TryRemove(CacheKey, out _); + using (await _locker.LockAsync(CacheKey)) + { + _cache.TryRemove(CacheKey, out _); + } } private async Task GetAccessToken(CancellationToken cancellation = default) { - using var release = await _locker.LockAsync(CacheKey, cancellation); - AuthResponse response; - if (_cache.TryGetValue(CacheKey, out var cached)) + using (await _locker.LockAsync(CacheKey, cancellation)) { - if (cached.Expiry <= DateTimeOffset.UtcNow) + AuthResponse response; + if (_cache.TryGetValue(CacheKey, out var cached)) { - _cache.TryRemove(CacheKey, out _); + if (cached.Expiry <= DateTimeOffset.UtcNow) + { + _cache.TryRemove(CacheKey, out _); + } + else if (cached.Expiry - DateTimeOffset.UtcNow > TimeSpan.FromMinutes(5)) + { + return cached.AccessToken; + } + + response = await Post("auth?type=refresh_token", + new AuthRequest { RefreshToken = cached.RefreshToken }, cancellation); } - else if (cached.Expiry - DateTimeOffset.UtcNow > TimeSpan.FromMinutes(5)) + else { - return cached.AccessToken; + response = await Post("auth?type=auth", + new AuthRequest { Login = _login, Password = _password }, cancellation); } - response = await Post("auth?type=refresh_token", - new AuthRequest {RefreshToken = cached.RefreshToken}, cancellation); - } - else - { - response = await Post("auth?type=auth", - new AuthRequest {Login = _login, Password = _password}, cancellation); - } - - if (response.Expiry is null) - { - try + if (response.Expiry is null) { - response.Expiry = DateTimeOffset.FromUnixTimeSeconds( - long.Parse(ParseClaimsFromJwt(response.AccessToken).First(claim => claim.Type == "exp").Value)); - } - catch (Exception) - { - //it's ok if we dont parse it, once auth fails we try again + try + { + response.Expiry = DateTimeOffset.FromUnixTimeSeconds( + long.Parse(ParseClaimsFromJwt(response.AccessToken).First(claim => claim.Type == "exp").Value)); + } + catch (Exception) + { + //it's ok if we dont parse it, once auth fails we try again + } } - } - _cache.AddOrReplace(CacheKey, response); - return response.AccessToken; + _cache.AddOrReplace(CacheKey, response); + return response.AccessToken; + } } private static IEnumerable ParseClaimsFromJwt(string jwt) { diff --git a/src/BTCPayServer.Lightning.LNDhub/LndHubInvoiceListener.cs b/src/BTCPayServer.Lightning.LNDhub/LndHubInvoiceListener.cs index a59d9c8b..ee173c15 100644 --- a/src/BTCPayServer.Lightning.LNDhub/LndHubInvoiceListener.cs +++ b/src/BTCPayServer.Lightning.LNDhub/LndHubInvoiceListener.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; +using AsyncKeyedLock; using BTCPayServer.Lightning.LNDhub.Models; using NBitcoin; @@ -54,31 +55,36 @@ public void Dispose() Dispose(true); } - static readonly AsyncDuplicateLock _locker = new(); + static readonly AsyncKeyedLocker _locker = new(o => + { + o.PoolSize = 20; + o.PoolInitialFill = 1; + }); static readonly ConcurrentDictionary _activeListeners = new(); private async Task ListenLoop() { try { - var releaser = await _locker.LockOrBustAsync(_client.CacheKey, _cts.Token); - if (releaser is null) + AsyncKeyedLockTimeoutReleaser releaser = null; + try { - while (!_cts.IsCancellationRequested &&releaser is null) + releaser = await _locker.LockAsync(_client.CacheKey, 0, _cts.Token); + if (!releaser.EnteredSemaphore) { - if (_activeListeners.TryGetValue(_client.CacheKey, out var invoicesData)) + while (!_cts.IsCancellationRequested && !releaser.EnteredSemaphore) { - await HandleInvoicesData(invoicesData); + if (_activeListeners.TryGetValue(_client.CacheKey, out var invoicesData)) + { + await HandleInvoicesData(invoicesData); + } + releaser = await _locker.LockAsync(_client.CacheKey, 0, _cts.Token); + + if (!releaser.EnteredSemaphore) + await Task.Delay(2500, _cts.Token); } - releaser = await _locker.LockOrBustAsync(_client.CacheKey, _cts.Token); - - if(releaser is null) - await Task.Delay(2500, _cts.Token); } - } - using (releaser) - { while (!_cts.IsCancellationRequested) { var invoicesData = await _client.GetInvoices(_cts.Token); @@ -88,6 +94,10 @@ private async Task ListenLoop() await Task.Delay(2500, _cts.Token); } } + finally + { + releaser.Dispose(); + } } catch when (_cts.IsCancellationRequested) { From c08799179cc2209854fa2f278f8650f350465702 Mon Sep 17 00:00:00 2001 From: Mark Cilia Vincenti Date: Tue, 23 Jan 2024 12:28:36 +0100 Subject: [PATCH 2/5] Bump AsyncKeyedLock to 6.3.4; performance improvement --- .../BTCPayServer.Lightning.LNDhub.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BTCPayServer.Lightning.LNDhub/BTCPayServer.Lightning.LNDhub.csproj b/src/BTCPayServer.Lightning.LNDhub/BTCPayServer.Lightning.LNDhub.csproj index 52e313b9..f057da18 100644 --- a/src/BTCPayServer.Lightning.LNDhub/BTCPayServer.Lightning.LNDhub.csproj +++ b/src/BTCPayServer.Lightning.LNDhub/BTCPayServer.Lightning.LNDhub.csproj @@ -18,7 +18,7 @@ - + From 987441a5a1d9419f758a249f533352b43e296cbf Mon Sep 17 00:00:00 2001 From: Mark Cilia Vincenti Date: Tue, 23 Jan 2024 12:33:12 +0100 Subject: [PATCH 3/5] Fixed tests to test on AsyncKeyedLocker --- tests/AsycLockTests.cs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/tests/AsycLockTests.cs b/tests/AsycLockTests.cs index 673ba88c..ed91cedb 100644 --- a/tests/AsycLockTests.cs +++ b/tests/AsycLockTests.cs @@ -1,7 +1,8 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; +using AsyncKeyedLock; using BTCPayServer.Lightning.LndHub; using Xunit; using Xunit.Abstractions; @@ -19,7 +20,11 @@ public AsyncDuplicateLockTests(ITestOutputHelper outputHelper) public class Wallet { - private static AsyncDuplicateLock _lock = new(); + private static AsyncKeyedLocker _lock = new(o => + { + o.PoolSize = 20; + o.PoolInitialFill = 1; + }); public string Id { get; set; } public decimal Balance { get; private set; } @@ -85,7 +90,11 @@ public async Task Spend_WhenConcurrentlyExceedingBalance_ShouldPreventOverdraw() [Fact] public async Task LockAsync_MultipleParallelForeach_ShouldNotDuplicateEntries() { - var lockObj = new AsyncDuplicateLock(); + var lockObj = new AsyncKeyedLocker(o => + { + o.PoolSize = 20; + o.PoolInitialFill = 1; + }); var alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; var resultList = new ConcurrentDictionary(); int iterationsPerLetter = 100; // Number of iterations per letter @@ -95,7 +104,7 @@ async Task WriteToList(char letter) while (true) { - var release = await lockObj.LockOrBustAsync(letter); + var release = await lockObj.LockAsync(letter, 0); if (release is null) { continue; From 88e155fdeace4ae9896786d12cea457cb8f2df24 Mon Sep 17 00:00:00 2001 From: Mark Cilia Vincenti Date: Tue, 30 Jan 2024 20:13:11 +0100 Subject: [PATCH 4/5] Fixed test --- tests/AsycLockTests.cs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/AsycLockTests.cs b/tests/AsycLockTests.cs index ed91cedb..51fb6fef 100644 --- a/tests/AsycLockTests.cs +++ b/tests/AsycLockTests.cs @@ -102,16 +102,13 @@ public async Task LockAsync_MultipleParallelForeach_ShouldNotDuplicateEntries() async Task WriteToList(char letter) { while (true) - { - var release = await lockObj.LockAsync(letter, 0); - if (release is null) - { - continue; - } - - using (release) + using (var releaser = await lockObj.LockAsync(letter, 0)) { + if (releaser.EnteredSemaphore) + { + continue; + } try { if (resultList.TryGetValue(letter.ToString(), out var count) && From 379844e7607779a9b1f6a4c89229abdc9ff6bd61 Mon Sep 17 00:00:00 2001 From: Mark Cilia Vincenti Date: Tue, 30 Jan 2024 20:17:24 +0100 Subject: [PATCH 5/5] Fixed test. --- tests/AsycLockTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/AsycLockTests.cs b/tests/AsycLockTests.cs index 51fb6fef..4b25b6c0 100644 --- a/tests/AsycLockTests.cs +++ b/tests/AsycLockTests.cs @@ -105,7 +105,7 @@ async Task WriteToList(char letter) { using (var releaser = await lockObj.LockAsync(letter, 0)) { - if (releaser.EnteredSemaphore) + if (!releaser.EnteredSemaphore) { continue; }