diff --git a/src/Fluss.HotChocolate/Fluss.HotChocolate.csproj b/src/Fluss.HotChocolate/Fluss.HotChocolate.csproj index bd24d8e..b2bd8dc 100644 --- a/src/Fluss.HotChocolate/Fluss.HotChocolate.csproj +++ b/src/Fluss.HotChocolate/Fluss.HotChocolate.csproj @@ -11,7 +11,6 @@ https://github.com/atmina/fluss git MIT - true diff --git a/src/Fluss.Regen/FileBuilders/SelectorFileBuilder.cs b/src/Fluss.Regen/FileBuilders/SelectorFileBuilder.cs index 5910c81..574259b 100644 --- a/src/Fluss.Regen/FileBuilders/SelectorFileBuilder.cs +++ b/src/Fluss.Regen/FileBuilders/SelectorFileBuilder.cs @@ -39,11 +39,14 @@ public void WriteClassHeader() public void WriteEndNamespace() { - _writer.WriteIndentedLine("private record CacheEntryValue(object Value, global::System.Collections.Generic.IReadOnlyList? EventListeners);"); + _writer.WriteIndentedLine("private record CacheEntryValue(object Value, global::System.Collections.Generic.IReadOnlyList? EventListeners, long CreatedAtVersion);"); _writer.WriteLine(); _writer.WriteIndented("private static async global::System.Threading.Tasks.ValueTask MatchesEventListenerState(global::Fluss.IUnitOfWork unitOfWork, CacheEntryValue value) "); using (_writer.WriteBraces()) { + _writer.WriteIndentedLine("var version = await unitOfWork.ConsistentVersion();"); + _writer.WriteIndentedLine("if (value.CreatedAtVersion == version) return true;"); + _writer.WriteIndented("foreach (var eventListenerData in value.EventListeners ?? global::System.Array.Empty()) "); using (_writer.WriteBraces()) { @@ -152,7 +155,7 @@ public void WriteMethodCacheMiss(string returnType) using (_writer.WriteBraces()) { - _writer.WriteIndentedLine("entry.Value = new CacheEntryValue(result, recordingUnitOfWork.GetRecordedListeners());"); + _writer.WriteIndentedLine("entry.Value = new CacheEntryValue(result, recordingUnitOfWork.GetRecordedListeners(), await unitOfWork.ConsistentVersion());"); _writer.WriteIndentedLine("entry.Size = 1;"); } diff --git a/src/Fluss.UnitTest/.gitignore b/src/Fluss.UnitTest/.gitignore new file mode 100644 index 0000000..bd49926 --- /dev/null +++ b/src/Fluss.UnitTest/.gitignore @@ -0,0 +1,2 @@ +TestResults/ + diff --git a/src/Fluss/UnitOfWork/IUnitOfWork.cs b/src/Fluss/UnitOfWork/IUnitOfWork.cs index 4a11411..2cda0ad 100644 --- a/src/Fluss/UnitOfWork/IUnitOfWork.cs +++ b/src/Fluss/UnitOfWork/IUnitOfWork.cs @@ -34,4 +34,5 @@ ValueTask UnsafeGetReadModelWithoutAuthorization(long? a where TKey : notnull where TReadModel : EventListener, IReadModel, IEventListenerWithKey, new(); IUnitOfWork WithPrefilledVersion(long? version); + IUnitOfWork CopyWithVersion(long version); } diff --git a/src/Fluss/UnitOfWork/UnitOfWork.cs b/src/Fluss/UnitOfWork/UnitOfWork.cs index 82c6b61..61fa86a 100644 --- a/src/Fluss/UnitOfWork/UnitOfWork.cs +++ b/src/Fluss/UnitOfWork/UnitOfWork.cs @@ -90,6 +90,14 @@ public IUnitOfWork WithPrefilledVersion(long? version) return this; } + public IUnitOfWork CopyWithVersion(long version) + { + EnsureInstantiated(); + + var unitOfWork = Create(_eventRepository!, _eventListenerFactory!, _policies, _userIdProvider!, _validator!); + return unitOfWork.WithPrefilledVersion(version); + } + private Guid CurrentUserId() { EnsureInstantiated(); diff --git a/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs b/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs index 6cfaef2..96ea262 100644 --- a/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs +++ b/src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs @@ -1,10 +1,11 @@ -using System.Collections.Concurrent; -using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.CodeAnalysis; using Fluss.Events; using Fluss.ReadModel; namespace Fluss; +public class AtNotAllowedInSelectorException : Exception; + public class UnitOfWorkRecordingProxy(IUnitOfWork impl) : IUnitOfWork { public ValueTask ConsistentVersion() @@ -18,42 +19,54 @@ public ValueTask ConsistentVersion() public ValueTask GetReadModel([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicParameterlessConstructor)] Type tReadModel, object? key, long? at = null) { + if (at.HasValue) throw new AtNotAllowedInSelectorException(); return impl.GetReadModel(tReadModel, key, at); } public ValueTask GetReadModel(long? at = null) where TReadModel : EventListener, IRootEventListener, IReadModel, new() { + if (at.HasValue) throw new AtNotAllowedInSelectorException(); return Record(impl.GetReadModel(at)); } public ValueTask GetReadModel(TKey key, long? at = null) where TReadModel : EventListener, IEventListenerWithKey, IReadModel, new() { + if (at.HasValue) throw new AtNotAllowedInSelectorException(); return Record(impl.GetReadModel(key, at)); } public ValueTask UnsafeGetReadModelWithoutAuthorization(long? at = null) where TReadModel : EventListener, IRootEventListener, IReadModel, new() { + if (at.HasValue) throw new AtNotAllowedInSelectorException(); return Record(impl.UnsafeGetReadModelWithoutAuthorization(at)); } public ValueTask UnsafeGetReadModelWithoutAuthorization(TKey key, long? at = null) where TReadModel : EventListener, IEventListenerWithKey, IReadModel, new() { + if (at.HasValue) throw new AtNotAllowedInSelectorException(); return Record(impl.UnsafeGetReadModelWithoutAuthorization(key, at)); } public ValueTask> GetMultipleReadModels(IEnumerable keys, long? at = null) where TReadModel : EventListener, IReadModel, IEventListenerWithKey, new() where TKey : notnull { + if (at.HasValue) throw new AtNotAllowedInSelectorException(); return Record(impl.GetMultipleReadModels(keys, at)); } public ValueTask> UnsafeGetMultipleReadModelsWithoutAuthorization(IEnumerable keys, long? at = null) where TReadModel : EventListener, IReadModel, IEventListenerWithKey, new() where TKey : notnull { + if (at.HasValue) throw new AtNotAllowedInSelectorException(); return Record(impl.UnsafeGetMultipleReadModelsWithoutAuthorization(keys, at)); } public IUnitOfWork WithPrefilledVersion(long? version) { - return impl.WithPrefilledVersion(version); + throw new AtNotAllowedInSelectorException(); + } + + public IUnitOfWork CopyWithVersion(long version) + { + throw new AtNotAllowedInSelectorException(); } private async ValueTask Record(ValueTask readModel) where TReadModel : EventListener