From 879573d5f6222f982b423da592025dcd6d2052d3 Mon Sep 17 00:00:00 2001 From: Thorsten Thiel Date: Wed, 4 Sep 2024 23:28:24 +0200 Subject: [PATCH 1/2] Add Benchmark --- src/Benchmark/Bench.cs | 139 ++++++++++++++++++ src/Benchmark/Benchmark.csproj | 19 +++ src/Benchmark/Program.cs | 4 + src/Fluss.sln | 6 + .../UnitOfWork/UnitOfWork.Authorization.cs | 19 ++- 5 files changed, 182 insertions(+), 5 deletions(-) create mode 100644 src/Benchmark/Bench.cs create mode 100644 src/Benchmark/Benchmark.csproj create mode 100644 src/Benchmark/Program.cs diff --git a/src/Benchmark/Bench.cs b/src/Benchmark/Bench.cs new file mode 100644 index 0000000..f1d7c3c --- /dev/null +++ b/src/Benchmark/Bench.cs @@ -0,0 +1,139 @@ +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet.Jobs; +using Fluss; +using Fluss.Authentication; +using Fluss.Events; +using Fluss.ReadModel; +using Microsoft.Extensions.DependencyInjection; + +namespace Benchmark; + +[SimpleJob(baseline: true)] +[SimpleJob(RuntimeMoniker.Net90)] +[MemoryDiagnoser] +public class Bench +{ + [IterationSetup] + public void Setup() + { + var sc = new ServiceCollection(); + sc.AddEventSourcing(); + sc.AddPoliciesFrom(typeof(Bench).Assembly); + + _sp = sc.BuildServiceProvider(); + } + + ServiceProvider _sp = null!; + + [Benchmark] + public async Task PublishEventsAndReadMixedReadWrite() + { + var sum = 0; + for (var j = 0; j < 1000; j++) + { + await _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => { + for (var i = 0; i < 50; i++) + { + await unitOfWork.Publish(new TestEvent(1)); + await unitOfWork.Publish(new TestEvent(2)); + } + }); + + var unitOfWork = _sp.GetSystemUserUnitOfWork(); + var readModel1 = await unitOfWork.GetReadModel(1); + var readModel2 = await unitOfWork.GetReadModel(2); + sum += readModel1.GotEvents + readModel2.GotEvents; + } + + return sum; + } + + [IterationSetup(Targets = [nameof(PublishEventsAndReadReadHeavySingleReadModel), nameof(PublishEventsAndReadReadHeavyMultipleReadModel)])] + public void SetupHeavyRead() + { + var sc = new ServiceCollection(); + sc.AddEventSourcing(); + sc.AddPoliciesFrom(typeof(Bench).Assembly); + + _sp = sc.BuildServiceProvider(); + + _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => { + for (var i = 0; i < 10000; i++) + { + await unitOfWork.Publish(new TestEvent(i)); + } + }).AsTask().Wait(); + } + + + [Benchmark] + public async Task PublishEventsAndReadReadHeavySingleReadModel() + { + var sum = 0; + for (var j = 0; j < 50000; j++) + { + var unitOfWork = _sp.GetSystemUserUnitOfWork(); + var readModel1 = await unitOfWork.GetReadModel(3); + sum += readModel1.GotEvents; + } + + return sum; + } + + [Benchmark] + public async Task PublishEventsAndReadReadHeavyMultipleReadModel() + { + var sum = 0; + for (var j = 1; j < 5000; j++) + { + var unitOfWork = _sp.GetSystemUserUnitOfWork(); + var readModel1 = await unitOfWork.GetReadModel(j); + sum += readModel1.GotEvents; + } + + return sum; + } +} + +public class AllowAllPolicy : Policy { + public ValueTask AuthenticateEvent(EventEnvelope envelope, IAuthContext authContext) + { + return ValueTask.FromResult(true); + } + + public ValueTask AuthenticateReadModel(IReadModel readModel, IAuthContext authContext) + { + return ValueTask.FromResult(true); + } +} + +public record TestEvent(int Id) : Event; + +public record EventsEqualReadModel : ReadModelWithKey +{ + public int GotEvents { get; private init; } + + protected override EventsEqualReadModel When(EventEnvelope envelope) + { + return envelope.Event switch + { + TestEvent testEvent when testEvent.Id == Id => this with { GotEvents = GotEvents + 1 }, + _ => this, + }; + } +} + +public record EventsModEqualReadModel : ReadModelWithKey +{ + public int GotEvents { get; private init; } + + protected override EventsModEqualReadModel When(EventEnvelope envelope) + { + return envelope.Event switch + { + TestEvent testEvent when testEvent.Id % Id == 0 => this with { GotEvents = GotEvents + 1 }, + _ => this, + }; + } +} \ No newline at end of file diff --git a/src/Benchmark/Benchmark.csproj b/src/Benchmark/Benchmark.csproj new file mode 100644 index 0000000..6aac7ea --- /dev/null +++ b/src/Benchmark/Benchmark.csproj @@ -0,0 +1,19 @@ + + + + Exe + net9.0;net8.0 + enable + enable + + + + + + + + + + + + diff --git a/src/Benchmark/Program.cs b/src/Benchmark/Program.cs new file mode 100644 index 0000000..d7bd483 --- /dev/null +++ b/src/Benchmark/Program.cs @@ -0,0 +1,4 @@ +using Benchmark; +using BenchmarkDotNet.Running; + +BenchmarkRunner.Run(); \ No newline at end of file diff --git a/src/Fluss.sln b/src/Fluss.sln index 1154d39..0ae3d82 100644 --- a/src/Fluss.sln +++ b/src/Fluss.sln @@ -12,6 +12,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluss.Testing", "Fluss.Test EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluss.Regen", "Fluss.Regen\Fluss.Regen.csproj", "{6E250321-6993-4B94-8600-85E15BF713FA}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmark", "Benchmark\Benchmark.csproj", "{970AB461-4D44-4B99-AACA-7DAA569D4C34}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -42,5 +44,9 @@ Global {6E250321-6993-4B94-8600-85E15BF713FA}.Debug|Any CPU.Build.0 = Debug|Any CPU {6E250321-6993-4B94-8600-85E15BF713FA}.Release|Any CPU.ActiveCfg = Release|Any CPU {6E250321-6993-4B94-8600-85E15BF713FA}.Release|Any CPU.Build.0 = Release|Any CPU + {970AB461-4D44-4B99-AACA-7DAA569D4C34}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {970AB461-4D44-4B99-AACA-7DAA569D4C34}.Debug|Any CPU.Build.0 = Debug|Any CPU + {970AB461-4D44-4B99-AACA-7DAA569D4C34}.Release|Any CPU.ActiveCfg = Release|Any CPU + {970AB461-4D44-4B99-AACA-7DAA569D4C34}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/src/Fluss/UnitOfWork/UnitOfWork.Authorization.cs b/src/Fluss/UnitOfWork/UnitOfWork.Authorization.cs index fc0672a..c0e027c 100644 --- a/src/Fluss/UnitOfWork/UnitOfWork.Authorization.cs +++ b/src/Fluss/UnitOfWork/UnitOfWork.Authorization.cs @@ -1,9 +1,6 @@ using Fluss.Authentication; using Fluss.Events; using Fluss.ReadModel; -#if !DEBUG -using Fluss.Extensions; -#endif namespace Fluss; @@ -21,7 +18,7 @@ private async ValueTask AuthorizeUsage(EventEnvelope envelope) return accepted.Count > 0; #else - return await _policies.Select(p => p.AuthenticateEvent(envelope, ac)).AnyAsync(); + return await AnyAsync(_policies.Select(p => p.AuthenticateEvent(envelope, ac))); #endif } @@ -37,7 +34,19 @@ private async ValueTask AuthorizeUsage(IReadModel eventListener) return accepted.Count > 0; #else - return await _policies.Select(p => p.AuthenticateReadModel(eventListener, ac)).AnyAsync(); + return await AnyAsync(_policies.Select(p => p.AuthenticateReadModel(eventListener, ac))); #endif } + + private static async ValueTask AnyAsync(IEnumerable> valueTasks) + { + foreach (var valueTask in valueTasks) + { + if (await valueTask) + { + return true; + } + } + return false; + } } From 6431291ee7e1a1a09d600d65f41d8d22910cbeb7 Mon Sep 17 00:00:00 2001 From: Thorsten Thiel Date: Sat, 7 Sep 2024 16:30:06 +0200 Subject: [PATCH 2/2] Improve benchmarks --- src/.gitignore | 4 +- src/Benchmark/Bench.cs | 50 ++++---- src/Benchmark/Benchmark.csproj | 2 +- src/Benchmark/Program.cs | 2 +- .../AddExtensionMiddleware.cs | 6 +- .../PostgreSQLEventRepository.cs | 4 +- src/Fluss.Testing/AggregateTestBed.cs | 4 +- src/Fluss.Testing/EventRepositoryTestBase.cs | 4 +- src/Fluss.Testing/EventTestBed.cs | 2 +- src/Fluss.Testing/ReadModelTestBed.cs | 4 +- .../Core/Authentication/AuthContextTest.cs | 2 +- .../Core/Events/InMemoryListenerCacheTest.cs | 14 +-- ...sientEventAwareEventListenerFactoryTest.cs | 2 +- .../UnitOfWorkAndAuthorizationTest.cs | 7 +- .../Core/UnitOfWork/UnitOfWorkTest.cs | 77 ++++++------ .../Core/Validation/RootValidatorTests.cs | 8 +- src/Fluss.sln.DotSettings.user | 15 --- .../ArbitraryUserUnitOfWorkExtension.cs | 22 ++-- src/Fluss/Authentication/AuthContext.cs | 59 ++++++++-- src/Fluss/Events/EventEnvelope.cs | 3 +- src/Fluss/Events/EventListener.cs | 35 +++--- src/Fluss/Events/EventListenerFactory.cs | 2 +- .../Events/EventMemoryArrayExtensions.cs | 18 +-- src/Fluss/Events/EventRepository.cs | 4 +- src/Fluss/Events/InMemoryEventCache.cs | 7 +- .../Events/InMemoryEventListenerCache.cs | 24 ++-- src/Fluss/Events/InMemoryEventRepository.cs | 14 ++- .../TransientEventAwareEventRepository.cs | 110 +++++++++--------- src/Fluss/Fluss.csproj | 2 + src/Fluss/ServiceCollectionExtensions.cs | 7 +- src/Fluss/SideEffects/SideEffectDispatcher.cs | 4 +- src/Fluss/UnitOfWork/IUnitOfWork.cs | 3 +- src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs | 41 ++++--- .../UnitOfWork/UnitOfWork.Authorization.cs | 66 ++++++----- src/Fluss/UnitOfWork/UnitOfWork.ReadModels.cs | 21 +++- src/Fluss/UnitOfWork/UnitOfWork.cs | 90 ++++++++++---- src/Fluss/UnitOfWork/UnitOfWorkFactory.cs | 4 +- .../UnitOfWork/UnitOfWorkRecordingProxy.cs | 12 +- src/Fluss/Validation/RootValidator.cs | 17 +-- 39 files changed, 445 insertions(+), 327 deletions(-) delete mode 100644 src/Fluss.sln.DotSettings.user diff --git a/src/.gitignore b/src/.gitignore index 606d593..89b9cdb 100644 --- a/src/.gitignore +++ b/src/.gitignore @@ -15,4 +15,6 @@ msbuild.wrn .idea -*.received.* \ No newline at end of file +*.received.* + +*.user \ No newline at end of file diff --git a/src/Benchmark/Bench.cs b/src/Benchmark/Bench.cs index f1d7c3c..8e6e9b2 100644 --- a/src/Benchmark/Bench.cs +++ b/src/Benchmark/Bench.cs @@ -1,5 +1,5 @@ -using BenchmarkDotNet.Attributes; -using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet_GitCompare; +using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Jobs; using Fluss; using Fluss.Authentication; @@ -9,8 +9,9 @@ namespace Benchmark; -[SimpleJob(baseline: true)] -[SimpleJob(RuntimeMoniker.Net90)] +[GitJob("879573d", baseline: true, id: "0_before")] +[SimpleJob(id: "1_after")] +[RPlotExporter] [MemoryDiagnoser] public class Bench { @@ -20,10 +21,10 @@ public void Setup() var sc = new ServiceCollection(); sc.AddEventSourcing(); sc.AddPoliciesFrom(typeof(Bench).Assembly); - + _sp = sc.BuildServiceProvider(); } - + ServiceProvider _sp = null!; [Benchmark] @@ -32,7 +33,8 @@ public async Task PublishEventsAndReadMixedReadWrite() var sum = 0; for (var j = 0; j < 1000; j++) { - await _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => { + await _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => + { for (var i = 0; i < 50; i++) { await unitOfWork.Publish(new TestEvent(1)); @@ -40,63 +42,67 @@ await _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => { } }); - var unitOfWork = _sp.GetSystemUserUnitOfWork(); + using var unitOfWork = _sp.GetSystemUserUnitOfWork(); var readModel1 = await unitOfWork.GetReadModel(1); var readModel2 = await unitOfWork.GetReadModel(2); sum += readModel1.GotEvents + readModel2.GotEvents; } - + return sum; } - + [IterationSetup(Targets = [nameof(PublishEventsAndReadReadHeavySingleReadModel), nameof(PublishEventsAndReadReadHeavyMultipleReadModel)])] public void SetupHeavyRead() { var sc = new ServiceCollection(); sc.AddEventSourcing(); sc.AddPoliciesFrom(typeof(Bench).Assembly); - + _sp = sc.BuildServiceProvider(); - _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => { + _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => + { for (var i = 0; i < 10000; i++) { await unitOfWork.Publish(new TestEvent(i)); } }).AsTask().Wait(); } - - + + [Benchmark] public async Task PublishEventsAndReadReadHeavySingleReadModel() { var sum = 0; + for (var j = 0; j < 50000; j++) { - var unitOfWork = _sp.GetSystemUserUnitOfWork(); + using var unitOfWork = _sp.GetSystemUserUnitOfWork(); var readModel1 = await unitOfWork.GetReadModel(3); sum += readModel1.GotEvents; } - + return sum; } - + [Benchmark] public async Task PublishEventsAndReadReadHeavyMultipleReadModel() { var sum = 0; + for (var j = 1; j < 5000; j++) { - var unitOfWork = _sp.GetSystemUserUnitOfWork(); + using var unitOfWork = _sp.GetSystemUserUnitOfWork(); var readModel1 = await unitOfWork.GetReadModel(j); sum += readModel1.GotEvents; } - + return sum; } } -public class AllowAllPolicy : Policy { +public class AllowAllPolicy : Policy +{ public ValueTask AuthenticateEvent(EventEnvelope envelope, IAuthContext authContext) { return ValueTask.FromResult(true); @@ -122,7 +128,7 @@ protected override EventsEqualReadModel When(EventEnvelope envelope) _ => this, }; } -} +} public record EventsModEqualReadModel : ReadModelWithKey { @@ -136,4 +142,4 @@ protected override EventsModEqualReadModel When(EventEnvelope envelope) _ => this, }; } -} \ No newline at end of file +} diff --git a/src/Benchmark/Benchmark.csproj b/src/Benchmark/Benchmark.csproj index 6aac7ea..a884b6e 100644 --- a/src/Benchmark/Benchmark.csproj +++ b/src/Benchmark/Benchmark.csproj @@ -2,7 +2,7 @@ Exe - net9.0;net8.0 + net8.0 enable enable diff --git a/src/Benchmark/Program.cs b/src/Benchmark/Program.cs index d7bd483..73da8b6 100644 --- a/src/Benchmark/Program.cs +++ b/src/Benchmark/Program.cs @@ -1,4 +1,4 @@ using Benchmark; using BenchmarkDotNet.Running; -BenchmarkRunner.Run(); \ No newline at end of file +BenchmarkRunner.Run(); diff --git a/src/Fluss.HotChocolate/AddExtensionMiddleware.cs b/src/Fluss.HotChocolate/AddExtensionMiddleware.cs index e4d3fc1..484c70f 100644 --- a/src/Fluss.HotChocolate/AddExtensionMiddleware.cs +++ b/src/Fluss.HotChocolate/AddExtensionMiddleware.cs @@ -142,8 +142,8 @@ private static async ValueTask WaitForChange(IServiceProvider serviceProvi var cancellationTokenSource = new CancellationTokenSource(); - var latestPersistedEventVersion = currentEventListener.Min(el => el.Tag.LastSeen); - var latestTransientEventVersion = currentEventListener.Min(el => el.Tag.LastSeenTransient); + var latestPersistedEventVersion = currentEventListener.Min(el => el.LastSeenEvent); + var latestTransientEventVersion = currentEventListener.Min(el => el.LastSeenTransientEvent); var persistedEventTask = Task.Run(async () => { @@ -156,7 +156,7 @@ private static async ValueTask WaitForChange(IServiceProvider serviceProvi var eventListener = currentEventListener[index]; var updatedEventListener = await eventListenerFactory.UpdateTo(eventListener, latestPersistedEventVersion); - if (updatedEventListener.Tag.LastAccepted > eventListener.Tag.LastAccepted) + if (updatedEventListener.LastAcceptedEvent > eventListener.LastAcceptedEvent) { return; } diff --git a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs index c11e431..ae760ef 100644 --- a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs +++ b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs @@ -27,7 +27,7 @@ public PostgreSQLEventRepository(PostgreSQLConfig config) dataSource = dataSourceBuilder.Build(); } - private async ValueTask Publish(IEnumerable envelopes, Func eventExtractor, + private async ValueTask Publish(IReadOnlyList envelopes, Func eventExtractor, NpgsqlConnection? conn = null) where TEnvelope : Envelope { using var activity = ActivitySource.Source.StartActivity(); @@ -81,7 +81,7 @@ private async ValueTask Publish(IEnumerable envelopes, Fun NotifyNewEvents(); } - public async ValueTask Publish(IEnumerable envelopes) + public async ValueTask Publish(IReadOnlyList envelopes) { await Publish(envelopes, e => e.Event); } diff --git a/src/Fluss.Testing/AggregateTestBed.cs b/src/Fluss.Testing/AggregateTestBed.cs index 19a340f..8e75bf6 100644 --- a/src/Fluss.Testing/AggregateTestBed.cs +++ b/src/Fluss.Testing/AggregateTestBed.cs @@ -16,12 +16,12 @@ namespace Fluss.Testing; public AggregateTestBed() { var validator = new Mock(); - validator.Setup(v => v.ValidateEvent(It.IsAny(), It.IsAny?>())) + validator.Setup(v => v.ValidateEvent(It.IsAny(), It.IsAny())) .Returns?>((_, _) => Task.CompletedTask); validator.Setup(v => v.ValidateAggregate(It.IsAny(), It.IsAny())) .Returns((_, _) => Task.CompletedTask); - _unitOfWork = new UnitOfWork(EventRepository, EventListenerFactory, [new AllowAllPolicy()], + _unitOfWork = UnitOfWork.Create(EventRepository, EventListenerFactory, [new AllowAllPolicy()], new UserIdProvider(_ => Guid.Empty, null!), validator.Object); } diff --git a/src/Fluss.Testing/EventRepositoryTestBase.cs b/src/Fluss.Testing/EventRepositoryTestBase.cs index af42303..5a402eb 100644 --- a/src/Fluss.Testing/EventRepositoryTestBase.cs +++ b/src/Fluss.Testing/EventRepositoryTestBase.cs @@ -37,8 +37,8 @@ public async Task ReturnsPublishedEvents() public async Task ReturnsMultiplePublishedEvents() { var envelopes = GetMockEnvelopes(0, 1).ToList(); - await Repository.Publish(envelopes.Take(1)); - await Repository.Publish(envelopes.Skip(1)); + await Repository.Publish(envelopes.Take(1).ToList()); + await Repository.Publish(envelopes.Skip(1).ToList()); var gottenEnvelopes = await Repository.GetEvents(-1, 1).ToFlatEventList(); diff --git a/src/Fluss.Testing/EventTestBed.cs b/src/Fluss.Testing/EventTestBed.cs index 890467c..373f845 100644 --- a/src/Fluss.Testing/EventTestBed.cs +++ b/src/Fluss.Testing/EventTestBed.cs @@ -22,7 +22,7 @@ public virtual EventTestBed WithEvents(params Event[] events) At = DateTimeOffset.Now, By = null, Event = @event - })).AsTask().Wait(); + }).ToList()).AsTask().Wait(); return this; } diff --git a/src/Fluss.Testing/ReadModelTestBed.cs b/src/Fluss.Testing/ReadModelTestBed.cs index 4b2904f..4805cd6 100644 --- a/src/Fluss.Testing/ReadModelTestBed.cs +++ b/src/Fluss.Testing/ReadModelTestBed.cs @@ -11,7 +11,7 @@ public class ReadModelTestBed : EventTestBed var eventSourced = EventListenerFactory .UpdateTo(new TReadModel(), EventRepository.GetLatestVersion().AsTask().Result).AsTask().Result; - Assert.Equal(readModel with { Tag = eventSourced.Tag }, eventSourced); + Assert.Equal(readModel, eventSourced); AssertReadModelDoesNotReactToCanary(eventSourced); @@ -24,7 +24,7 @@ public ReadModelTestBed ResultsIn(TReadModel readModel) var eventSourced = EventListenerFactory .UpdateTo(new TReadModel { Id = readModel.Id }, EventRepository.GetLatestVersion().AsTask().Result).AsTask().Result; - Assert.Equal(readModel with { Tag = eventSourced.Tag }, eventSourced); + Assert.Equal(readModel, eventSourced); AssertReadModelDoesNotReactToCanary(eventSourced); diff --git a/src/Fluss.UnitTest/Core/Authentication/AuthContextTest.cs b/src/Fluss.UnitTest/Core/Authentication/AuthContextTest.cs index fcb7e5d..c7cd976 100644 --- a/src/Fluss.UnitTest/Core/Authentication/AuthContextTest.cs +++ b/src/Fluss.UnitTest/Core/Authentication/AuthContextTest.cs @@ -13,7 +13,7 @@ public class AuthContextTest public AuthContextTest() { _unitOfWork = new Mock(); - _authContext = new AuthContext(_unitOfWork.Object, Guid.NewGuid()); + _authContext = AuthContext.Get(_unitOfWork.Object, Guid.NewGuid()); } [Fact] diff --git a/src/Fluss.UnitTest/Core/Events/InMemoryListenerCacheTest.cs b/src/Fluss.UnitTest/Core/Events/InMemoryListenerCacheTest.cs index 14d171d..1f3dd25 100644 --- a/src/Fluss.UnitTest/Core/Events/InMemoryListenerCacheTest.cs +++ b/src/Fluss.UnitTest/Core/Events/InMemoryListenerCacheTest.cs @@ -40,7 +40,7 @@ public async Task PassesUpdatesToNext() public async Task ReturnsCachedEventListener() { _baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny(), 100)) - .Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) })); + .Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 })); _ = await _listenerCache.UpdateTo(new TestEventListener(), 100); _ = await _listenerCache.UpdateTo(new TestEventListener(), 100); @@ -58,7 +58,7 @@ public async Task ReturnsCachedEventListener() public async Task ReturnsCachedKeyedEventListener() { _baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny(), 100)) - .Returns(ValueTask.FromResult(new KeyedTestEventListener { Id = 1, Tag = new EventListenerVersionTag(100) })); + .Returns(ValueTask.FromResult(new KeyedTestEventListener { Id = 1, LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 })); await _listenerCache.UpdateTo(new KeyedTestEventListener { Id = 1 }, 100); await _listenerCache.UpdateTo(new KeyedTestEventListener { Id = 1 }, 100); @@ -76,9 +76,9 @@ public async Task ReturnsCachedKeyedEventListener() public async Task ForwardsIfCacheContainsNewer() { _baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny(), 100)) - .Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) })); + .Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 })); _baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny(), 90)) - .Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(90) })); + .Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 90, LastSeenEvent = 90, LastSeenTransientEvent = -1 })); await _listenerCache.UpdateTo(new TestEventListener(), 100); await _listenerCache.UpdateTo(new TestEventListener(), 90); @@ -106,9 +106,9 @@ public async Task UpdatesStoreWithNewerVersion() var otherTestEventList = new TestEventListener(); _baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny(), 100)) - .Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) })); + .Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 })); _baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny(), 110)) - .Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(110) })); + .Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 110, LastSeenEvent = 110, LastSeenTransientEvent = -1 })); await _listenerCache.UpdateTo(testEventListener, 100); @@ -128,7 +128,7 @@ public async Task UpdatesStoreWithNewerVersion() public async Task ForwardsAgainIfCleaned() { _baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny(), 100)) - .Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) })); + .Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 })); await _listenerCache.UpdateTo(new TestEventListener(), 100); _listenerCache.Clean(); diff --git a/src/Fluss.UnitTest/Core/TransientEvents/TransientEventAwareEventListenerFactoryTest.cs b/src/Fluss.UnitTest/Core/TransientEvents/TransientEventAwareEventListenerFactoryTest.cs index fa40e32..432f238 100644 --- a/src/Fluss.UnitTest/Core/TransientEvents/TransientEventAwareEventListenerFactoryTest.cs +++ b/src/Fluss.UnitTest/Core/TransientEvents/TransientEventAwareEventListenerFactoryTest.cs @@ -39,7 +39,7 @@ public async Task AppliesTransientEvents() var updatedReadModel = await _transientFactory.UpdateTo(readModel, -1); - Assert.Equal(0, updatedReadModel.Tag.LastSeenTransient); + Assert.Equal(0, updatedReadModel.LastSeenTransientEvent); } } diff --git a/src/Fluss.UnitTest/Core/UnitOfWork/UnitOfWorkAndAuthorizationTest.cs b/src/Fluss.UnitTest/Core/UnitOfWork/UnitOfWorkAndAuthorizationTest.cs index bd33ca8..6e12393 100644 --- a/src/Fluss.UnitTest/Core/UnitOfWork/UnitOfWorkAndAuthorizationTest.cs +++ b/src/Fluss.UnitTest/Core/UnitOfWork/UnitOfWorkAndAuthorizationTest.cs @@ -11,13 +11,14 @@ public async Task DoesNotReuseCacheWhenNewEventIsAdded() { _policies.Add(new AllowReadAfterEventPolicy()); + var unitOfWork = GetUnitOfWork(); await Assert.ThrowsAsync(async () => { - await _unitOfWork.GetReadModel(1); + await unitOfWork.GetReadModel(1); }); - await _unitOfWork.Publish(new AllowEvent()); - await _unitOfWork.GetReadModel(1); + await unitOfWork.Publish(new AllowEvent()); + await unitOfWork.GetReadModel(1); } private record AllowEvent : Event; diff --git a/src/Fluss.UnitTest/Core/UnitOfWork/UnitOfWorkTest.cs b/src/Fluss.UnitTest/Core/UnitOfWork/UnitOfWorkTest.cs index e27c188..53dd938 100644 --- a/src/Fluss.UnitTest/Core/UnitOfWork/UnitOfWorkTest.cs +++ b/src/Fluss.UnitTest/Core/UnitOfWork/UnitOfWorkTest.cs @@ -12,28 +12,20 @@ public partial class UnitOfWorkTest { private readonly InMemoryEventRepository _eventRepository; private readonly List _policies; - private readonly Fluss.UnitOfWork _unitOfWork; private readonly UnitOfWorkFactory _unitOfWorkFactory; + private Mock _validator; public UnitOfWorkTest() { _eventRepository = new InMemoryEventRepository(); _policies = []; - Mock validator = new(MockBehavior.Strict); - validator.Setup(v => v.ValidateEvent(It.IsAny(), It.IsAny?>())) - .Returns?>((_, _) => Task.CompletedTask); - validator.Setup(v => v.ValidateAggregate(It.IsAny(), It.IsAny())) + _validator = new(MockBehavior.Strict); + _validator.Setup(v => v.ValidateEvent(It.IsAny(), It.IsAny())) + .Returns((_, _) => Task.CompletedTask); + _validator.Setup(v => v.ValidateAggregate(It.IsAny(), It.IsAny())) .Returns((_, _) => Task.CompletedTask); - _unitOfWork = new Fluss.UnitOfWork( - _eventRepository, - new EventListenerFactory(_eventRepository), - _policies, - new UserIdProvider(_ => Guid.NewGuid(), null!), - validator.Object - ); - _eventRepository.Publish([ new EventEnvelope { Event = new TestEvent(1), Version = 0 }, new EventEnvelope { Event = new TestEvent(2), Version = 1 }, @@ -42,23 +34,34 @@ public UnitOfWorkTest() _unitOfWorkFactory = new UnitOfWorkFactory( new ServiceCollection() - .AddScoped(_ => _unitOfWork) + .AddScoped(_ => GetUnitOfWork()) .BuildServiceProvider()); } + private Fluss.UnitOfWork GetUnitOfWork() + { + return Fluss.UnitOfWork.Create( + _eventRepository, + new EventListenerFactory(_eventRepository), + _policies, + new UserIdProvider(_ => Guid.NewGuid(), null!), + _validator.Object + ); + } + [Fact] public async Task CanGetConsistentVersion() { - Assert.Equal(2, await _unitOfWork.ConsistentVersion()); + Assert.Equal(2, await GetUnitOfWork().ConsistentVersion()); } [Fact] public async Task CanGetAggregate() { - var existingAggregate = await _unitOfWork.GetAggregate(1); + var existingAggregate = await GetUnitOfWork().GetAggregate(1); Assert.True(existingAggregate.Exists); - var notExistingAggregate = await _unitOfWork.GetAggregate(100); + var notExistingAggregate = await GetUnitOfWork().GetAggregate(100); Assert.False(notExistingAggregate.Exists); } @@ -66,9 +69,10 @@ public async Task CanGetAggregate() public async Task CanPublish() { _policies.Add(new AllowAllPolicy()); - var notExistingAggregate = await _unitOfWork.GetAggregate(100); + var unitOfWork = GetUnitOfWork(); + var notExistingAggregate = await unitOfWork.GetAggregate(100); await notExistingAggregate.Create(); - await _unitOfWork.CommitInternal(); + await unitOfWork.CommitInternal(); var latestVersion = await _eventRepository.GetLatestVersion(); var newEvent = await _eventRepository.GetEvents(latestVersion - 1, latestVersion).ToFlatEventList(); @@ -80,9 +84,9 @@ public async Task ThrowsWhenPublishNotAllowed() { await Assert.ThrowsAsync(async () => { - var notExistingAggregate = await _unitOfWork.GetAggregate(100); + var notExistingAggregate = await GetUnitOfWork().GetAggregate(100); await notExistingAggregate.Create(); - await _unitOfWork.CommitInternal(); + await GetUnitOfWork().CommitInternal(); }); } @@ -90,9 +94,10 @@ await Assert.ThrowsAsync(async () => public async Task CanGetAggregateTwice() { _policies.Add(new AllowAllPolicy()); - var notExistingAggregate = await _unitOfWork.GetAggregate(100); + var unitOfWork = GetUnitOfWork(); + var notExistingAggregate = await unitOfWork.GetAggregate(100); await notExistingAggregate.Create(); - var existingAggregate = await _unitOfWork.GetAggregate(100); + var existingAggregate = await unitOfWork.GetAggregate(100); Assert.True(existingAggregate.Exists); } @@ -101,14 +106,14 @@ public async Task CanGetRootReadModel() { _policies.Add(new AllowAllPolicy()); - var rootReadModel = await _unitOfWork.GetReadModel(); + var rootReadModel = await GetUnitOfWork().GetReadModel(); Assert.Equal(3, rootReadModel.GotEvents); } [Fact] public async Task CanGetRootReadModelUnsafe() { - var rootReadModel = await _unitOfWork.UnsafeGetReadModelWithoutAuthorization(); + var rootReadModel = await GetUnitOfWork().UnsafeGetReadModelWithoutAuthorization(); Assert.Equal(3, rootReadModel.GotEvents); } @@ -117,7 +122,7 @@ public async Task ThrowsWhenRootReadModelNotAuthorized() { await Assert.ThrowsAsync(async () => { - await _unitOfWork.GetReadModel(); + await GetUnitOfWork().GetReadModel(); }); } @@ -126,14 +131,14 @@ public async Task CanGetReadModel() { _policies.Add(new AllowAllPolicy()); - var readModel = await _unitOfWork.GetReadModel(1); + var readModel = await GetUnitOfWork().GetReadModel(1); Assert.Equal(2, readModel.GotEvents); } [Fact] public async Task CanGetReadModelUnsafe() { - var readModel = await _unitOfWork.UnsafeGetReadModelWithoutAuthorization(1); + var readModel = await GetUnitOfWork().UnsafeGetReadModelWithoutAuthorization(1); Assert.Equal(2, readModel.GotEvents); } @@ -142,7 +147,7 @@ public async Task ThrowsWhenReadModelNotAuthorized() { await Assert.ThrowsAsync(async () => { - await _unitOfWork.GetReadModel(1); + await GetUnitOfWork().GetReadModel(1); }); } @@ -151,28 +156,30 @@ public async Task CanGetMultipleReadModels() { _policies.Add(new AllowAllPolicy()); - var readModels = await _unitOfWork.GetMultipleReadModels([1, 2]); + + var unitOfWork = GetUnitOfWork(); + var readModels = await unitOfWork.GetMultipleReadModels([1, 2]); Assert.Equal(2, readModels[0].GotEvents); Assert.Equal(1, readModels[1].GotEvents); Assert.Equal(2, readModels.Count); - Assert.Equal(2, _unitOfWork.ReadModels.Count); - Assert.Contains(_unitOfWork.ReadModels, rm => rm == readModels[0]); - Assert.Contains(_unitOfWork.ReadModels, rm => rm == readModels[1]); + Assert.Equal(2, unitOfWork.ReadModels.Count); + Assert.Contains(unitOfWork.ReadModels, rm => rm == readModels[0]); + Assert.Contains(unitOfWork.ReadModels, rm => rm == readModels[1]); } [Fact] public async Task ReturnsNothingWhenMultipleReadModelNotAuthorized() { - var readModels = await _unitOfWork.GetMultipleReadModels([1, 2]); + var readModels = await GetUnitOfWork().GetMultipleReadModels([1, 2]); Assert.Equal(0, readModels.Count(rm => rm != null)); } [Fact] public async Task CanGetMultipleReadModelsUnsafe() { - var readModels = await _unitOfWork.UnsafeGetMultipleReadModelsWithoutAuthorization([1, 2]); + var readModels = await GetUnitOfWork().UnsafeGetMultipleReadModelsWithoutAuthorization([1, 2]); Assert.Equal(2, readModels[0].GotEvents); Assert.Equal(1, readModels[1].GotEvents); Assert.Equal(2, readModels.Count); diff --git a/src/Fluss.UnitTest/Core/Validation/RootValidatorTests.cs b/src/Fluss.UnitTest/Core/Validation/RootValidatorTests.cs index 783982b..5e1eb54 100644 --- a/src/Fluss.UnitTest/Core/Validation/RootValidatorTests.cs +++ b/src/Fluss.UnitTest/Core/Validation/RootValidatorTests.cs @@ -29,7 +29,7 @@ [new AggregateValidatorAlwaysValid()], [new EventValidatorAlwaysValid()] ); - await validator.ValidateEvent(new EventEnvelope { Event = new TestEvent() }); + await validator.ValidateEvent(_unitOfWorkMock.Object, new EventEnvelope { Event = new TestEvent() }); } [Fact] @@ -43,7 +43,7 @@ [new EventValidatorAlwaysInvalid()] await Assert.ThrowsAsync(async () => { - await validator.ValidateEvent(new EventEnvelope { Event = new TestEvent() }); + await validator.ValidateEvent(_unitOfWorkMock.Object, new EventEnvelope { Event = new TestEvent() }); }); } @@ -56,7 +56,7 @@ [new AggregateValidatorAlwaysValid()], [new EventValidatorAlwaysValid()] ); - await validator.ValidateAggregate(new TestAggregate(), new Fluss.UnitOfWork(null!, null!, null!, null!, null!)); + await validator.ValidateAggregate(new TestAggregate(), Fluss.UnitOfWork.Create(null!, null!, [], null!, null!)); } [Fact] @@ -70,7 +70,7 @@ [new EventValidatorAlwaysValid()] await Assert.ThrowsAsync(async () => { - await validator.ValidateAggregate(new TestAggregate(), new Fluss.UnitOfWork(null!, null!, null!, null!, null!)); + await validator.ValidateAggregate(new TestAggregate(), Fluss.UnitOfWork.Create(null!, null!, [], null!, null!)); }); } diff --git a/src/Fluss.sln.DotSettings.user b/src/Fluss.sln.DotSettings.user deleted file mode 100644 index 448c27b..0000000 --- a/src/Fluss.sln.DotSettings.user +++ /dev/null @@ -1,15 +0,0 @@ - - ForceIncluded - ForceIncluded - C:\Users\Enterprize1\AppData\Local\JetBrains\Rider2024.2\resharper-host\temp\Rider\vAny\CoverageData\_Fluss.-842573491\Snapshot\snapshot.utdcvr - - <SessionState ContinuousTestingMode="0" IsActive="True" Name="Session" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"> - <Project Location="C:\Users\Enterprize1\Code\fluss\src\Fluss.UnitTest" Presentation="&lt;Fluss.UnitTest&gt;" /> -</SessionState> - DoNothing - - - - - - True \ No newline at end of file diff --git a/src/Fluss/Authentication/ArbitraryUserUnitOfWorkExtension.cs b/src/Fluss/Authentication/ArbitraryUserUnitOfWorkExtension.cs index a7f5a06..399d1a5 100644 --- a/src/Fluss/Authentication/ArbitraryUserUnitOfWorkExtension.cs +++ b/src/Fluss/Authentication/ArbitraryUserUnitOfWorkExtension.cs @@ -1,4 +1,6 @@ using System.Collections.Concurrent; +using Fluss.Events; +using Fluss.Validation; using Microsoft.Extensions.DependencyInjection; namespace Fluss.Authentication; @@ -27,23 +29,25 @@ public IUnitOfWork GetUserUnitOfWork(Guid userId) private IServiceProvider GetCachedServiceProvider(Guid userId) { - return _cache.GetOrAdd(userId, CreateUserServiceProvider); + return _cache.TryGetValue(userId, out var value) ? value : _cache.GetOrAdd(userId, CreateUserServiceProvider); } private IServiceProvider CreateUserServiceProvider(Guid providedId) { var collection = new ServiceCollection(); - var constructorArgumentTypes = typeof(UnitOfWork).GetConstructors().Single().GetParameters() - .Select(p => p.ParameterType); - foreach (var type in constructorArgumentTypes) - { - if (type == typeof(UserIdProvider)) continue; - collection.AddSingleton(type, serviceProvider.GetRequiredService(type)); - } + collection.AddSingleton(_ => serviceProvider.GetRequiredService()); + collection.AddSingleton(_ => serviceProvider.GetRequiredService()); + collection.AddSingleton>(_ => serviceProvider.GetRequiredService>()); + collection.AddSingleton(_ => serviceProvider.GetRequiredService()); collection.ProvideUserIdFrom(_ => providedId); - collection.AddTransient(); + collection.AddTransient(sp => UnitOfWork.Create( + sp.GetRequiredService(), + sp.GetRequiredService(), + sp.GetServices(), + sp.GetRequiredService(), + sp.GetRequiredService())); collection.AddTransient(sp => sp.GetRequiredService()); collection.AddTransient(); diff --git a/src/Fluss/Authentication/AuthContext.cs b/src/Fluss/Authentication/AuthContext.cs index a1d01e1..0058d51 100644 --- a/src/Fluss/Authentication/AuthContext.cs +++ b/src/Fluss/Authentication/AuthContext.cs @@ -1,5 +1,6 @@ using Fluss.Events; using Fluss.ReadModel; +using Microsoft.Extensions.ObjectPool; namespace Fluss.Authentication; @@ -20,14 +21,34 @@ public ValueTask> public Guid UserId { get; } } -internal class AuthContext(IUnitOfWork unitOfWork, Guid userId) : IAuthContext +internal class AuthContext : IAuthContext { - private readonly Dictionary Data = new(); - public Guid UserId { get; private set; } = userId; + private static readonly ObjectPool Pool = new DefaultObjectPool(new DefaultPooledObjectPolicy()); + + internal static AuthContext Get(IUnitOfWork unitOfWork, Guid userId) + { + var authContext = Pool.Get(); + authContext._unitOfWork = unitOfWork; + authContext._userId = userId; + return authContext; + } + + private IUnitOfWork _unitOfWork = null!; + private readonly Dictionary _data = new(); + private Guid? _userId; + public Guid UserId + { + get + { + EnsureInitialized(); + return _userId!.Value; + } + } public async ValueTask CacheAndGet(string key, Func> func) { - var o = Data.GetValueOrDefault(key); + EnsureInitialized(); + var o = _data.GetValueOrDefault(key); switch (o) { @@ -38,9 +59,9 @@ public async ValueTask CacheAndGet(string key, Func> func) } var newTask = func(); - Data[key] = newTask; + _data[key] = newTask; var result = await newTask; - Data[key] = result!; + _data[key] = result!; return result; } @@ -48,19 +69,39 @@ public async ValueTask CacheAndGet(string key, Func> func) public ValueTask GetReadModel() where TReadModel : EventListener, IRootEventListener, IReadModel, new() { - return unitOfWork.UnsafeGetReadModelWithoutAuthorization(); + EnsureInitialized(); + return _unitOfWork.UnsafeGetReadModelWithoutAuthorization(); } public ValueTask GetReadModel(TKey key) where TReadModel : EventListener, IEventListenerWithKey, IReadModel, new() { - return unitOfWork.UnsafeGetReadModelWithoutAuthorization(key); + EnsureInitialized(); + return _unitOfWork.UnsafeGetReadModelWithoutAuthorization(key); } public ValueTask> GetMultipleReadModels(IEnumerable keys) where TKey : notnull where TReadModel : EventListener, IReadModel, IEventListenerWithKey, new() { - return unitOfWork.UnsafeGetMultipleReadModelsWithoutAuthorization(keys); + EnsureInitialized(); + return _unitOfWork.UnsafeGetMultipleReadModelsWithoutAuthorization(keys); + } + + private void EnsureInitialized() + { + if (_unitOfWork == null || _userId == null) + { + throw new InvalidOperationException("AuthContext is uninitialized"); + } + } + + internal void Return() + { + _userId = null; + _unitOfWork = null!; + _data.Clear(); + + Pool.Return(this); } } diff --git a/src/Fluss/Events/EventEnvelope.cs b/src/Fluss/Events/EventEnvelope.cs index a1e1822..d464065 100644 --- a/src/Fluss/Events/EventEnvelope.cs +++ b/src/Fluss/Events/EventEnvelope.cs @@ -1,3 +1,4 @@ +using System.Runtime.InteropServices; using Newtonsoft.Json.Linq; namespace Fluss.Events; @@ -10,7 +11,7 @@ public abstract record Envelope public Guid? By { get; init; } } -public record RawEventEnvelope : Envelope +public sealed record RawEventEnvelope : Envelope { public required JObject RawEvent { get; init; } } diff --git a/src/Fluss/Events/EventListener.cs b/src/Fluss/Events/EventListener.cs index ad289dc..343c24d 100644 --- a/src/Fluss/Events/EventListener.cs +++ b/src/Fluss/Events/EventListener.cs @@ -4,16 +4,22 @@ namespace Fluss.Events; public abstract record EventListener { - internal EventListenerVersionTag Tag = new(-1); + /// Last Event that was consumed by this EventListener + internal long LastSeenEvent = -1; + /// Last Event that mutated this EventListener + internal long LastAcceptedEvent = -1; + /// Last TransientEvent that was consumed by this EventListener + internal long LastSeenTransientEvent = -1; + protected abstract EventListener When(EventEnvelope envelope); internal EventListener WhenInt(EventEnvelope envelope) { #if DEBUG - if (envelope.Version != Tag.LastSeen + 1 && envelope is not TransientEventEnvelope) + if (envelope.Version != LastSeenEvent + 1 && envelope is not TransientEventEnvelope) { throw new InvalidOperationException( - $"Event envelope version {envelope.Version} is not the next expected version {Tag.LastSeen + 1}."); + $"Event envelope version {envelope.Version} is not the next expected version {LastSeenEvent + 1}."); } #endif @@ -23,29 +29,24 @@ internal EventListener WhenInt(EventEnvelope envelope) if (envelope.Event is TransientEvent) { - if (newEventListener.Tag.HasTaint()) - { - newEventListener.Tag.LastSeenTransient = envelope.Version; - } - else - { - newEventListener = newEventListener with { Tag = Tag with { LastSeenTransient = envelope.Version } }; - } - + newEventListener.LastSeenTransientEvent = envelope.Version; return newEventListener; } if (changed) { - newEventListener = newEventListener with { Tag = new EventListenerVersionTag(envelope.Version) }; - } - else - { - newEventListener.Tag.LastSeen = envelope.Version; + newEventListener.LastAcceptedEvent = envelope.Version; } + newEventListener.LastSeenEvent = envelope.Version; + return newEventListener; } + + internal bool HasTaint() + { + return LastSeenTransientEvent > -1; + } } public record EventListenerVersionTag diff --git a/src/Fluss/Events/EventListenerFactory.cs b/src/Fluss/Events/EventListenerFactory.cs index f35350e..e85c8b8 100644 --- a/src/Fluss/Events/EventListenerFactory.cs +++ b/src/Fluss/Events/EventListenerFactory.cs @@ -7,7 +7,7 @@ public sealed class EventListenerFactory(IEventRepository eventRepository) : IEv { public async ValueTask UpdateTo(TEventListener eventListener, long to) where TEventListener : EventListener { - var events = await eventRepository.GetEvents(eventListener.Tag.LastSeen, to); + var events = await eventRepository.GetEvents(eventListener.LastSeenEvent, to); return UpdateWithEvents(eventListener, events); } diff --git a/src/Fluss/Events/EventMemoryArrayExtensions.cs b/src/Fluss/Events/EventMemoryArrayExtensions.cs index 39d47b6..cd131cb 100644 --- a/src/Fluss/Events/EventMemoryArrayExtensions.cs +++ b/src/Fluss/Events/EventMemoryArrayExtensions.cs @@ -1,26 +1,26 @@ using System.Collections.ObjectModel; +using Collections.Pooled; namespace Fluss.Events; public static class EventMemoryArrayExtensions { - public static ReadOnlyCollection> ToPagedMemory(this List envelopes) where T : EventEnvelope + private static readonly ReadOnlyCollection> EmptyPagedMemory = new([]); + public static ReadOnlyCollection> ToPagedMemory(this IReadOnlyList envelopes) { - if (envelopes is List casted) + if (envelopes.Count == 0) { - return new[] { - casted.ToArray().AsMemory().AsReadOnly() - }.AsReadOnly(); + return EmptyPagedMemory.AsReadOnly(); } return new[] { - envelopes.Cast().ToArray().AsMemory().AsReadOnly() + envelopes.ToArray().AsMemory().AsReadOnly() }.AsReadOnly(); } - public static IReadOnlyList ToFlatEventList(this ReadOnlyCollection> pagedMemory) + public static PooledList ToFlatEventList(this ReadOnlyCollection> pagedMemory) { - var result = new List(); + var result = new PooledList(); foreach (var memory in pagedMemory) { @@ -30,7 +30,7 @@ public static IReadOnlyList ToFlatEventList(this ReadOnlyCollecti return result; } - public static async ValueTask> ToFlatEventList(this ValueTask>> pagedMemory) + public static async ValueTask> ToFlatEventList(this ValueTask>> pagedMemory) { return (await pagedMemory).ToFlatEventList(); } diff --git a/src/Fluss/Events/EventRepository.cs b/src/Fluss/Events/EventRepository.cs index 9b7283f..0bef2a4 100644 --- a/src/Fluss/Events/EventRepository.cs +++ b/src/Fluss/Events/EventRepository.cs @@ -5,7 +5,7 @@ namespace Fluss.Events; public interface IEventRepository { event EventHandler NewEvents; - ValueTask Publish(IEnumerable events); + ValueTask Publish(IReadOnlyList events); ValueTask>> GetEvents(long fromExclusive, long toInclusive); ValueTask> GetRawEvents(); ValueTask ReplaceEvent(long version, IEnumerable newEvents); @@ -45,7 +45,7 @@ public virtual event EventHandler NewEvents remove => Next.NewEvents -= value; } - public virtual ValueTask Publish(IEnumerable events) + public virtual ValueTask Publish(IReadOnlyList events) { return Next.Publish(events); } diff --git a/src/Fluss/Events/InMemoryEventCache.cs b/src/Fluss/Events/InMemoryEventCache.cs index 513fcc0..ab3a5af 100644 --- a/src/Fluss/Events/InMemoryEventCache.cs +++ b/src/Fluss/Events/InMemoryEventCache.cs @@ -70,18 +70,17 @@ private async Task EnsureEventsLoaded(long to) } } - public override async ValueTask Publish(IEnumerable events) + public override async ValueTask Publish(IReadOnlyList events) { using var activity = FlussActivitySource.Source.StartActivity(); activity?.SetTag("EventSourcing.EventRepository", nameof(InMemoryEventCache)); - var eventEnvelopes = events.ToList(); - await base.Publish(eventEnvelopes); + await base.Publish(events); await _loadLock.WaitAsync(); try { - AddEvents(new[] { eventEnvelopes.ToArray().AsMemory().AsReadOnly() }.AsReadOnly()); + AddEvents(events.ToPagedMemory()); } finally { diff --git a/src/Fluss/Events/InMemoryEventListenerCache.cs b/src/Fluss/Events/InMemoryEventListenerCache.cs index 7ea2059..3aa1310 100644 --- a/src/Fluss/Events/InMemoryEventListenerCache.cs +++ b/src/Fluss/Events/InMemoryEventListenerCache.cs @@ -10,13 +10,13 @@ public override async ValueTask UpdateTo(TEventL { var cached = Retrieve(eventListener, to); - if (cached.Tag.LastSeen == to) + if (cached.LastSeenEvent == to) { return cached; } var newEventListener = await Next.UpdateTo(cached, to); - if (!newEventListener.Tag.HasTaint()) + if (!newEventListener.HasTaint()) { Store(newEventListener); } @@ -35,12 +35,12 @@ private TEventListener Retrieve(TEventListener eventListener, lo } var cachedEntry = (TEventListener)cached; - if (cachedEntry.Tag.LastAccepted > before) + if (cachedEntry.LastAcceptedEvent > before) { return eventListener; } - return CloneTag(cachedEntry); + return cachedEntry; } private void Store(TEventListener newEventListener) where TEventListener : EventListener @@ -54,27 +54,19 @@ private void Store(TEventListener newEventListener) where TEvent } var cachedEntry = (TEventListener)cached; - if (newEventListener.Tag.LastSeen <= cachedEntry.Tag.LastSeen) + if (newEventListener.LastSeenEvent <= cachedEntry.LastSeenEvent) { return; } - _cache.Set(key, CloneTag(newEventListener), new MemoryCacheEntryOptions { Size = 1 }); - } - - private TEventListener CloneTag(TEventListener eventListener) - where TEventListener : EventListener - { - return eventListener with { Tag = eventListener.Tag with { } }; + _cache.Set(key, newEventListener, new MemoryCacheEntryOptions { Size = 1 }); } private object GetKey(EventListener eventListener) { - if (eventListener.GetType().GetInterfaces().Any(x => - x.IsGenericType && - x.GetGenericTypeDefinition() == typeof(IEventListenerWithKey<>))) + if (eventListener is IEventListenerWithKey eventListenerWithKey) { - return (eventListener.GetType(), eventListener.GetType().GetProperty("Id")?.GetValue(eventListener)); + return (eventListener.GetType(), eventListenerWithKey.Id); } return eventListener.GetType(); diff --git a/src/Fluss/Events/InMemoryEventRepository.cs b/src/Fluss/Events/InMemoryEventRepository.cs index be6db6e..aa423e4 100644 --- a/src/Fluss/Events/InMemoryEventRepository.cs +++ b/src/Fluss/Events/InMemoryEventRepository.cs @@ -1,15 +1,16 @@ using System.Collections.ObjectModel; +using Collections.Pooled; using Fluss.Exceptions; using Newtonsoft.Json; namespace Fluss.Events; -public class InMemoryEventRepository : IBaseEventRepository +public class InMemoryEventRepository : IBaseEventRepository, IDisposable { - private readonly List _events = []; + private readonly PooledList _events = []; public event EventHandler? NewEvents; - public ValueTask Publish(IEnumerable eventEnvelopes) + public ValueTask Publish(IReadOnlyList eventEnvelopes) { foreach (var eventEnvelope in eventEnvelopes) { @@ -98,4 +99,11 @@ private void NotifyNewEvents() NewEvents?.Invoke(this, EventArgs.Empty); }); } + + public void Dispose() + { + GC.SuppressFinalize(this); + + _events.Dispose(); + } } diff --git a/src/Fluss/Events/TransientEvents/TransientEventAwareEventRepository.cs b/src/Fluss/Events/TransientEvents/TransientEventAwareEventRepository.cs index dcac24f..31556ad 100644 --- a/src/Fluss/Events/TransientEvents/TransientEventAwareEventRepository.cs +++ b/src/Fluss/Events/TransientEvents/TransientEventAwareEventRepository.cs @@ -1,43 +1,75 @@ using System.Collections.ObjectModel; using System.Reflection; +using Collections.Pooled; namespace Fluss.Events.TransientEvents; -public sealed class TransientEventAwareEventRepository : EventRepositoryPipeline +public sealed class TransientEventAwareEventRepository : EventRepositoryPipeline, IDisposable { private readonly List _transientEvents = []; private long _transientEventVersion; - private bool _cleanTaskIsRunning; - private bool _anotherCleanTaskRequired; + private readonly Timer _timer; + private readonly object _lock = new(); + private static readonly TimeSpan CleanInterval = TimeSpan.FromMilliseconds(100); public event EventHandler? NewTransientEvents; + public TransientEventAwareEventRepository() + { + _timer = new Timer(CleanEvents, null, CleanInterval, CleanInterval); + } + public ReadOnlyCollection> GetCurrentTransientEvents() { - lock (this) + lock (_lock) { - var result = _transientEvents.ToPagedMemory(); - CleanEvents(); - return result; + return _transientEvents.ToPagedMemory(); } } - public override async ValueTask Publish(IEnumerable events) + public override async ValueTask Publish(IReadOnlyList events) { - var eventEnvelopes = events.ToList(); - if (eventEnvelopes.Count == 0) return; + if (events.Count == 0) return; - var transientEventEnvelopes = eventEnvelopes.Where(e => e.Event is TransientEvent); + using var transientEventEnvelopes = new PooledList(); + + foreach (var eventEnvelope in events) + { + if (eventEnvelope.Event is TransientEvent) + { + transientEventEnvelopes.Add(eventEnvelope); + } + } + + if (transientEventEnvelopes.Count == 0) + { + await base.Publish(events); + return; + } // Reset version of persisted events to ensure cache functionality using the first Version received as baseline // We can safely fall back to -1 here, since the value will not be used as no events are being published - var firstPersistedVersion = eventEnvelopes.FirstOrDefault()?.Version ?? -1; + var firstPersistedVersion = events.Count > 0 ? events[0].Version : -1; + + var regularEventEnvelopes = new List(); + + foreach (var eventEnvelope in events) + { + if (eventEnvelope.Event is TransientEvent) continue; - var persistedEventEnvelopes = eventEnvelopes - .Where(e => e.Event is not TransientEvent) - .Select((e, i) => e with { Version = firstPersistedVersion + i }); + var rightVersion = firstPersistedVersion + regularEventEnvelopes.Count; - await base.Publish(persistedEventEnvelopes); + if (eventEnvelope.Version != rightVersion) + { + regularEventEnvelopes.Add(eventEnvelope with { Version = rightVersion }); + } + else + { + regularEventEnvelopes.Add(eventEnvelope); + } + } + + await base.Publish(regularEventEnvelopes.ToArray()); PublishTransientEvents(transientEventEnvelopes); } @@ -48,7 +80,7 @@ private void PublishTransientEvents(IEnumerable events) var now = DateTimeOffset.Now; - lock (this) + lock (_lock) { var newEvents = eventList.Select(e => { @@ -77,44 +109,18 @@ private void PublishTransientEvents(IEnumerable events) // cancelled after 100ms will the transient events be cleared. This does not invalidate expired events as close // to their expiration time as possible however this is the lesser worry compared to not all listeners getting // all events before they are cleaned up. - private void CleanEvents() + private void CleanEvents(object? _) { - lock (this) - { - if (_cleanTaskIsRunning) - { - _anotherCleanTaskRequired = true; - return; - } - - // Starting the clean task after the lock - _cleanTaskIsRunning = true; - } + var cleanUntil = DateTimeOffset.Now - CleanInterval; - var now = DateTimeOffset.Now; - - // ReSharper disable once AsyncVoidLambda - new Task(async () => + lock (_lock) { - while (true) - { - await Task.Delay(100); + _transientEvents.RemoveAll(e => e.ExpiresAt < cleanUntil); + } + } - lock (this) - { - if (_anotherCleanTaskRequired) - { - _anotherCleanTaskRequired = false; - now = DateTimeOffset.Now; - } - else - { - _transientEvents.RemoveAll(e => e.ExpiresAt < now); - _cleanTaskIsRunning = false; - return; - } - } - } - }).Start(); + public void Dispose() + { + _timer.Dispose(); } } diff --git a/src/Fluss/Fluss.csproj b/src/Fluss/Fluss.csproj index 1bbda25..20b76db 100644 --- a/src/Fluss/Fluss.csproj +++ b/src/Fluss/Fluss.csproj @@ -10,10 +10,12 @@ + + diff --git a/src/Fluss/ServiceCollectionExtensions.cs b/src/Fluss/ServiceCollectionExtensions.cs index dbeb120..f8f2222 100644 --- a/src/Fluss/ServiceCollectionExtensions.cs +++ b/src/Fluss/ServiceCollectionExtensions.cs @@ -38,7 +38,12 @@ public static IServiceCollection AddEventSourcing(this IServiceCollection servic return eventListenerFactory; }) .AddSingleton() - .AddTransient() + .AddTransient(sp => UnitOfWork.Create( + sp.GetRequiredService(), + sp.GetRequiredService(), + sp.GetServices(), + sp.GetRequiredService(), + sp.GetRequiredService())) .AddTransient(sp => sp.GetRequiredService()) .AddTransient() .AddSingleton() diff --git a/src/Fluss/SideEffects/SideEffectDispatcher.cs b/src/Fluss/SideEffects/SideEffectDispatcher.cs index c353517..905b4e1 100644 --- a/src/Fluss/SideEffects/SideEffectDispatcher.cs +++ b/src/Fluss/SideEffects/SideEffectDispatcher.cs @@ -46,7 +46,7 @@ private async void HandleNewEvents(object? sender, EventArgs eventArgs) await _dispatchLock.WaitAsync(); var latestVersion = await _transientEventRepository.GetLatestVersion(); - var newEvents = await _transientEventRepository.GetEvents(_persistedVersion, latestVersion).ToFlatEventList(); + using var newEvents = await _transientEventRepository.GetEvents(_persistedVersion, latestVersion).ToFlatEventList(); try { @@ -63,7 +63,7 @@ private async void HandleNewEvents(object? sender, EventArgs eventArgs) private async void HandleNewTransientEvents(object? sender, EventArgs eventArgs) { // We want to fetch the events before hitting the lock to avoid missing events. - var currentEvents = _transientEventRepository.GetCurrentTransientEvents().ToFlatEventList(); + using var currentEvents = _transientEventRepository.GetCurrentTransientEvents().ToFlatEventList(); await _dispatchLock.WaitAsync(); var newEvents = currentEvents.Where(e => e.Version > _transientVersion); diff --git a/src/Fluss/UnitOfWork/IUnitOfWork.cs b/src/Fluss/UnitOfWork/IUnitOfWork.cs index abfb966..9e14389 100644 --- a/src/Fluss/UnitOfWork/IUnitOfWork.cs +++ b/src/Fluss/UnitOfWork/IUnitOfWork.cs @@ -4,11 +4,10 @@ namespace Fluss; -public interface IUnitOfWork +public interface IUnitOfWork : IDisposable { ValueTask ConsistentVersion(); IReadOnlyCollection ReadModels { get; } - ConcurrentQueue PublishedEventEnvelopes { get; } ValueTask GetReadModel(Type tReadModel, object? key, long? at = null); diff --git a/src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs b/src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs index 6ea222a..d71116e 100644 --- a/src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs +++ b/src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using Collections.Pooled; using Fluss.Aggregates; using Fluss.Events; // ReSharper disable LoopCanBeConvertedToQuery @@ -7,16 +8,17 @@ namespace Fluss; public partial class UnitOfWork { - public ConcurrentQueue PublishedEventEnvelopes { get; } = new(); + internal readonly PooledList PublishedEventEnvelopes = []; public async ValueTask GetAggregate() where TAggregate : AggregateRoot, new() { + EnsureInstantiated(); using var activity = FlussActivitySource.Source.StartActivity(); activity?.SetTag("EventSourcing.Aggregate", typeof(TAggregate).FullName); var aggregate = new TAggregate(); - aggregate = await _eventListenerFactory.UpdateTo(aggregate, await ConsistentVersion()); + aggregate = await _eventListenerFactory!.UpdateTo(aggregate, await ConsistentVersion()); aggregate = aggregate with { UnitOfWork = this }; @@ -31,12 +33,13 @@ public partial class UnitOfWork public async ValueTask GetAggregate(TKey key) where TAggregate : AggregateRoot, new() { + EnsureInstantiated(); using var activity = FlussActivitySource.Source.StartActivity(); activity?.SetTag("EventSourcing.Aggregate", typeof(TAggregate).FullName); var aggregate = new TAggregate { Id = key }; - aggregate = await _eventListenerFactory.UpdateTo(aggregate, await ConsistentVersion()); + aggregate = await _eventListenerFactory!.UpdateTo(aggregate, await ConsistentVersion()); aggregate = aggregate with { UnitOfWork = this }; @@ -57,7 +60,7 @@ public async ValueTask Publish(Event @event, AggregateRoot? aggregate = null) At = DateTimeOffset.UtcNow, By = CurrentUserId(), Event = @event, - Version = await ConsistentVersion() + PublishedEventEnvelopes.Count + 1 + Version = (_consistentVersion ?? await ConsistentVersion()) + PublishedEventEnvelopes.Count + 1 }; if (!await AuthorizeUsage(eventEnvelope)) @@ -66,13 +69,23 @@ public async ValueTask Publish(Event @event, AggregateRoot? aggregate = null) $"Cannot add event {eventEnvelope.Event.GetType()} as the current user."); } + await ValidateEvent(eventEnvelope); await ValidateEventResult(eventEnvelope, aggregate); - PublishedEventEnvelopes.Enqueue(eventEnvelope); + PublishedEventEnvelopes.Add(eventEnvelope); + } + + private async Task ValidateEvent(EventEnvelope eventEnvelope) + { + EnsureInstantiated(); + + await _validator!.ValidateEvent(this, eventEnvelope); } private async ValueTask ValidateEventResult(EventEnvelope envelope, T? aggregate) where T : AggregateRoot { + EnsureInstantiated(); + if (aggregate is null) return; // It's possible that the given aggregate does not have all necessary events applied yet. @@ -80,21 +93,15 @@ private async ValueTask ValidateEventResult(EventEnvelope envelope, T? aggreg if (aggregate.WhenInt(envelope) is not T result || result == aggregate) return; - await _validator.ValidateAggregate(result, this); + await _validator!.ValidateAggregate(result, this); } internal async ValueTask CommitInternal() { using var activity = FlussActivitySource.Source.StartActivity(); - var validatedEnvelopes = new List(); - foreach (var envelope in PublishedEventEnvelopes) - { - await _validator.ValidateEvent(envelope, validatedEnvelopes); - validatedEnvelopes.Add(envelope); - } - - await _eventRepository.Publish(PublishedEventEnvelopes); + EnsureInstantiated(); + await _eventRepository!.Publish(PublishedEventEnvelopes); _consistentVersion += PublishedEventEnvelopes.Count; PublishedEventEnvelopes.Clear(); } @@ -102,11 +109,13 @@ internal async ValueTask CommitInternal() private async ValueTask UpdateAndApplyPublished(TEventListener eventListener, long? at) where TEventListener : EventListener { - eventListener = await _eventListenerFactory.UpdateTo(eventListener, at ?? await ConsistentVersion()); + EnsureInstantiated(); + + eventListener = await _eventListenerFactory!.UpdateTo(eventListener, at ?? await ConsistentVersion()); foreach (var publishedEventEnvelope in PublishedEventEnvelopes) { - if (eventListener.Tag.LastSeen < publishedEventEnvelope.Version) + if (eventListener.LastSeenEvent < publishedEventEnvelope.Version) { eventListener = (TEventListener)eventListener.WhenInt(publishedEventEnvelope); } diff --git a/src/Fluss/UnitOfWork/UnitOfWork.Authorization.cs b/src/Fluss/UnitOfWork/UnitOfWork.Authorization.cs index c0e027c..d658fc3 100644 --- a/src/Fluss/UnitOfWork/UnitOfWork.Authorization.cs +++ b/src/Fluss/UnitOfWork/UnitOfWork.Authorization.cs @@ -8,45 +8,49 @@ public partial class UnitOfWork { private async ValueTask AuthorizeUsage(EventEnvelope envelope) { - var ac = new AuthContext(this, CurrentUserId()); -#if DEBUG - var all = -await Task.WhenAll(_policies.Select(async policy => (policy, await policy.AuthenticateEvent(envelope, ac))).ToList()); - - var rejected = all.Where(a => !a.Item2).ToList(); - var accepted = all.Where(a => a.Item2).ToList(); - - return accepted.Count > 0; -#else - return await AnyAsync(_policies.Select(p => p.AuthenticateEvent(envelope, ac))); -#endif + EnsureInstantiated(); + + var ac = AuthContext.Get(this, CurrentUserId()); + + try + { + foreach (var policy in _policies!) + { + if (await policy.AuthenticateEvent(envelope, ac)) + { + return true; + } + } + + return false; + } + finally + { + ac.Return(); + } } private async ValueTask AuthorizeUsage(IReadModel eventListener) { - var ac = new AuthContext(this, CurrentUserId()); -#if DEBUG - var all = -await Task.WhenAll(_policies.Select(async policy => (policy, await policy.AuthenticateReadModel(eventListener, ac))).ToList()); - - var rejected = all.Where(a => !a.Item2).ToList(); - var accepted = all.Where(a => a.Item2).ToList(); - - return accepted.Count > 0; -#else - return await AnyAsync(_policies.Select(p => p.AuthenticateReadModel(eventListener, ac))); -#endif - } + EnsureInstantiated(); - private static async ValueTask AnyAsync(IEnumerable> valueTasks) - { - foreach (var valueTask in valueTasks) + var ac = AuthContext.Get(this, CurrentUserId()); + + try { - if (await valueTask) + foreach (var policy in _policies!) { - return true; + if (await policy.AuthenticateReadModel(eventListener, ac)) + { + return true; + } } + + return false; + } + finally + { + ac.Return(); } - return false; } } diff --git a/src/Fluss/UnitOfWork/UnitOfWork.ReadModels.cs b/src/Fluss/UnitOfWork/UnitOfWork.ReadModels.cs index 492c3c1..464ca68 100644 --- a/src/Fluss/UnitOfWork/UnitOfWork.ReadModels.cs +++ b/src/Fluss/UnitOfWork/UnitOfWork.ReadModels.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using Collections.Pooled; using Fluss.Events; using Fluss.ReadModel; @@ -6,7 +7,7 @@ namespace Fluss; public partial class UnitOfWork { - private readonly ConcurrentBag _readModels = []; + private readonly PooledList _readModels = new(); public IReadOnlyCollection ReadModels => _readModels; public async ValueTask GetReadModel(Type tReadModel, object? key, long? at = null) @@ -38,7 +39,7 @@ public async ValueTask GetReadModel(Type tReadModel, object? key, lo if (at is null) { - _readModels.Add(eventListener); + RegisterReadModel(eventListener); } return readModel; @@ -59,7 +60,7 @@ public async ValueTask GetReadModel(long? at = null) if (at is null) { - _readModels.Add(readModel); + RegisterReadModel(readModel); } return readModel; @@ -72,7 +73,7 @@ public async ValueTask UnsafeGetReadModelWithoutAuthorization GetReadModel(TKey key, long if (at is null) { - _readModels.Add(readModel); + RegisterReadModel(readModel); } return readModel; @@ -107,7 +108,7 @@ public async ValueTask if (at is null) { - _readModels.Add(readModel); + RegisterReadModel(readModel); } return readModel; @@ -158,4 +159,12 @@ await Parallel.ForEachAsync(keysList, async (key, _) => return keysList.Select(k => dictionary[k]).ToList(); } + + private void RegisterReadModel(EventListener eventListener) + { + lock (_readModels) + { + _readModels.Add(eventListener); + } + } } diff --git a/src/Fluss/UnitOfWork/UnitOfWork.cs b/src/Fluss/UnitOfWork/UnitOfWork.cs index 8b76dc7..82c6b61 100644 --- a/src/Fluss/UnitOfWork/UnitOfWork.cs +++ b/src/Fluss/UnitOfWork/UnitOfWork.cs @@ -1,34 +1,58 @@ +using Collections.Pooled; using Fluss.Authentication; using Fluss.Events; using Fluss.Validation; +using Microsoft.Extensions.ObjectPool; namespace Fluss; public partial class UnitOfWork : IWriteUnitOfWork { - private readonly IEventListenerFactory _eventListenerFactory; - private readonly IEventRepository _eventRepository; - private readonly IEnumerable _policies; - private readonly IRootValidator _validator; - private readonly UserIdProvider _userIdProvider; + private class UnitOfWorkObjectPolicy : IPooledObjectPolicy + { + public UnitOfWork Create() => new(); + public bool Return(UnitOfWork obj) => true; + } + + private static readonly ObjectPool Pool = new DefaultObjectPool(new UnitOfWorkObjectPolicy()); + + private IEventListenerFactory? _eventListenerFactory; + private IEventRepository? _eventRepository; + private readonly PooledList _policies = new(); + private IRootValidator? _validator; + private UserIdProvider? _userIdProvider; private long? _consistentVersion; + private bool _isInstantiated; - private Task? _latestVersionLoader; + private UnitOfWork() + { + } - public UnitOfWork(IEventRepository eventRepository, IEventListenerFactory eventListenerFactory, + public static UnitOfWork Create(IEventRepository eventRepository, IEventListenerFactory eventListenerFactory, IEnumerable policies, UserIdProvider userIdProvider, IRootValidator validator) { - using var activity = FlussActivitySource.Source.StartActivity(); + var unitOfWork = Pool.Get(); + unitOfWork._eventRepository = eventRepository; + unitOfWork._eventListenerFactory = eventListenerFactory; + unitOfWork._policies.AddRange(policies); + unitOfWork._userIdProvider = userIdProvider; + unitOfWork._validator = validator; + unitOfWork._isInstantiated = true; + return unitOfWork; + } - _eventRepository = eventRepository; - _eventListenerFactory = eventListenerFactory; - _policies = policies; - _userIdProvider = userIdProvider; - _validator = validator; + private void EnsureInstantiated() + { + if (!_isInstantiated) + { + throw new InvalidOperationException("UnitOfWork is not properly instantiated."); + } } public async ValueTask ConsistentVersion() { + EnsureInstantiated(); + if (_consistentVersion.HasValue) { return _consistentVersion.Value; @@ -42,26 +66,25 @@ public async ValueTask ConsistentVersion() { return _consistentVersion.Value; } + } - _latestVersionLoader ??= Task.Run(async () => - { - var version = await _eventRepository.GetLatestVersion(); - _consistentVersion = version; - return version; - }); + var version = await _eventRepository!.GetLatestVersion(); + + lock (this) + { + _consistentVersion ??= version; } - return await _latestVersionLoader; + return _consistentVersion.Value; } public IUnitOfWork WithPrefilledVersion(long? version) { + EnsureInstantiated(); + lock (this) { - if (!_consistentVersion.HasValue && _latestVersionLoader == null) - { - _consistentVersion = version; - } + _consistentVersion ??= version; } return this; @@ -69,6 +92,23 @@ public IUnitOfWork WithPrefilledVersion(long? version) private Guid CurrentUserId() { - return _userIdProvider.Get(); + EnsureInstantiated(); + return _userIdProvider!.Get(); + } + + public void Dispose() + { + _eventListenerFactory = null; + _eventRepository = null; + _policies.Clear(); + _validator = null; + _userIdProvider = null; + _consistentVersion = null; + _readModels.Clear(); + _isInstantiated = false; + + Pool.Return(this); + + GC.SuppressFinalize(this); } } diff --git a/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs b/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs index 21869ce..aac3afe 100644 --- a/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs +++ b/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs @@ -23,7 +23,7 @@ public async ValueTask Commit(Func action) await RetryPolicy .ExecuteAsync(async () => { - var unitOfWork = serviceProvider.GetRequiredService(); + using var unitOfWork = serviceProvider.GetRequiredService(); await action(unitOfWork); await unitOfWork.CommitInternal(); }); @@ -36,7 +36,7 @@ public async ValueTask Commit(Func> action) return await RetryPolicy .ExecuteAsync(async () => { - var unitOfWork = serviceProvider.GetRequiredService(); + using var unitOfWork = serviceProvider.GetRequiredService(); var result = await action(unitOfWork); await unitOfWork.CommitInternal(); return result; diff --git a/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs b/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs index b5a9001..aad4a3b 100644 --- a/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs +++ b/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs @@ -12,7 +12,6 @@ public ValueTask ConsistentVersion() } public IReadOnlyCollection ReadModels => impl.ReadModels; - public ConcurrentQueue PublishedEventEnvelopes => impl.PublishedEventEnvelopes; public List RecordedListeners { get; } = []; @@ -79,7 +78,7 @@ public IReadOnlyList GetRecordedListeners() eventListenerTypeWithKeyAndVersions.Add(new EventListenerTypeWithKeyAndVersion( recordedListener.GetType(), recordedListener is IEventListenerWithKey keyListener ? keyListener.Id : null, - recordedListener.Tag.LastAccepted + recordedListener.LastAcceptedEvent )); } @@ -94,10 +93,17 @@ public async ValueTask IsStillUpToDate(IUnitOfWork unitOfWork, long? at = if (readModel is EventListener eventListener) { - return eventListener.Tag.LastAccepted <= Version; + return eventListener.LastAcceptedEvent <= Version; } return false; } } + + public void Dispose() + { + impl.Dispose(); + + GC.SuppressFinalize(this); + } } \ No newline at end of file diff --git a/src/Fluss/Validation/RootValidator.cs b/src/Fluss/Validation/RootValidator.cs index 7fe8757..7a1fc46 100644 --- a/src/Fluss/Validation/RootValidator.cs +++ b/src/Fluss/Validation/RootValidator.cs @@ -1,4 +1,5 @@ using System.Reflection; +using Collections.Pooled; using Fluss.Aggregates; using Fluss.Authentication; using Fluss.Events; @@ -7,7 +8,7 @@ namespace Fluss.Validation; public interface IRootValidator { - public Task ValidateEvent(EventEnvelope envelope, IReadOnlyList? PreviousEnvelopes = null); + public Task ValidateEvent(IUnitOfWork unitOfWork, EventEnvelope envelope); public Task ValidateAggregate(AggregateRoot aggregate, UnitOfWork unitOfWork); } @@ -56,18 +57,8 @@ private void CacheEventValidators(IEnumerable validators) } } - public async Task ValidateEvent(EventEnvelope envelope, IReadOnlyList? previousEnvelopes = null) + public async Task ValidateEvent(IUnitOfWork unitOfWork, EventEnvelope envelope) { - var unitOfWork = _arbitraryUserUnitOfWorkCache.GetUserUnitOfWork(envelope.By ?? SystemUser.SystemUserGuid); - - var willBePublishedEnvelopes = previousEnvelopes ?? new List(); - - var versionedUnitOfWork = unitOfWork.WithPrefilledVersion(envelope.Version - willBePublishedEnvelopes.Count - 1); - foreach (var willBePublishedEnvelope in willBePublishedEnvelopes) - { - versionedUnitOfWork.PublishedEventEnvelopes.Enqueue(willBePublishedEnvelope); - } - var type = envelope.Event.GetType(); if (!_eventValidators.TryGetValue(type, out var validators)) return; @@ -75,7 +66,7 @@ public async Task ValidateEvent(EventEnvelope envelope, IReadOnlyList - v.handler.Invoke(v.validator, [envelope.Event, versionedUnitOfWork])); + v.handler.Invoke(v.validator, [envelope.Event, unitOfWork])); await Task.WhenAll(invocations.Cast().Select(async x => await x)); }