diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index d03c80f..8b8b2a0 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -45,6 +45,9 @@ jobs: - name: Test run: dotnet test --collect:"XPlat Code Coverage" --no-restore --no-build working-directory: ./src/Fluss.UnitTest + - name: Test HotChocolate + run: dotnet test --collect:"XPlat Code Coverage" --no-restore --no-build + working-directory: ./src/Fluss.HotChocolate.IntegrationTest - name: Start PostgreSQL run: docker compose up -d working-directory: ./src/Fluss.PostgreSQL.IntegrationTest diff --git a/src/Fluss.HotChocolate.IntegrationTest/HotChocolateTest.cs b/src/Fluss.HotChocolate.IntegrationTest/HotChocolateTest.cs index dc40be4..caab805 100644 --- a/src/Fluss.HotChocolate.IntegrationTest/HotChocolateTest.cs +++ b/src/Fluss.HotChocolate.IntegrationTest/HotChocolateTest.cs @@ -34,12 +34,11 @@ public async Task Setup() .AddQueryType() .AddMutationType() .AddTypeExtension() - .AddTypeExtension() .AddTypeExtension() .AddLiveEventSourcing(); - hostBuilder.WebHost.ConfigureKestrel(serveroptions => - serveroptions.Configure().Endpoint(IPAddress.Loopback, 0)); + hostBuilder.WebHost.ConfigureKestrel(serverOptions => + serverOptions.Configure().Endpoint(IPAddress.Loopback, 0)); Host = hostBuilder.Build(); Host.UseWebSockets(); @@ -53,21 +52,9 @@ public async Task Setup() Address = addressFeature!.Addresses.First(); } - private readonly Guid _aGuid = Guid.NewGuid(); - protected UnitOfWorkFactory ArrangeUnitOfWorkFactory => Host.Services.GetUserUnitOfWorkFactory(_aGuid); - [Test] - public async Task JustWorks() + public async Task SubscriberReceivesLiveUpdates() { - const int initialTodos = 5; - for (var i = 0; i < initialTodos; i++) - { - await ArrangeUnitOfWorkFactory.Commit(async unitOfWork => - { - await TodoWrite.Create(unitOfWork, "Todo " + i); - }); - } - var tokenSource = new CancellationTokenSource(); tokenSource.CancelAfter(30000); @@ -76,24 +63,21 @@ await ArrangeUnitOfWorkFactory.Commit(async unitOfWork => // Receive initial response await channel.Reader.WaitToReadAsync(tokenSource.Token); var ids = await channel.Reader.ReadAsync(tokenSource.Token); - Assert.That(ids, Has.Count.EqualTo(initialTodos)); + Assert.That(ids, Is.Empty); for (var i = 0; i < 10; i++) { - if (i % 2 == 0) - { - await SubscribeToTodos(default); - } - - var newId = await CreateTodo("Todo"); + var newId = await CreateTodo($"Todo {i}"); await channel.Reader.WaitToReadAsync(tokenSource.Token); ids = await channel.Reader.ReadAsync(tokenSource.Token); - Assert.That(ids, Has.Count.EqualTo(i + initialTodos + 1)); - Assert.That(ids, Contains.Item(Guid.Parse(newId))); + Assert.That(ids, Has.Count.EqualTo(i + 1)); + Assert.That(ids, Contains.Item(newId)); } + + await tokenSource.CancelAsync(); } - private async Task CreateTodo(string todo) + private async Task CreateTodo(string todo) { var httpClient = new HttpClient(); httpClient.BaseAddress = new Uri(Address); @@ -110,7 +94,7 @@ private async Task CreateTodo(string todo) var responseBody = await response.Content.ReadFromJsonAsync(); var id = responseBody.GetProperty("data").GetProperty("createTodo").GetProperty("id").GetString(); - return id ?? ""; + return Guid.Parse(id ?? ""); } private async Task>> SubscribeToTodos(CancellationToken ct) @@ -135,26 +119,17 @@ private async Task>> SubscribeToTodos(CancellationToken ct) .Select(t => t.GetProperty("id").GetGuid()) .ToList(); - var messageId = document.RootElement.GetProperty("id").GetString(); - if (messageId == "0") - { - await channel.Writer.WriteAsync(ids, ct); - } - else - { - await socket.SendAsync(Encoding.UTF8.GetBytes($$"""{"id":"{{messageId}}","type":"complete"}""").ToArray().AsMemory(), WebSocketMessageType.Text, true, ct); - break; - } + await channel.Writer.WriteAsync(ids, ct); if (ct.IsCancellationRequested) break; } }, ct); - var init = """{"type":"connection_init"}"""; + const string init = """{"type":"connection_init"}"""; await socket.SendAsync(Encoding.UTF8.GetBytes(init).AsMemory(), WebSocketMessageType.Text, true, ct); - var x = """{"id":"0","type":"subscribe","payload":{"query":"\n query Todos {\n todos { id \n index } }\n","operationName":"Todos","variables":{}}}"""; - await socket.SendAsync(Encoding.UTF8.GetBytes(x).AsMemory(), WebSocketMessageType.Text, true, ct); + const string subscription = """{"id":"0","type":"subscribe","payload":{"query":"\n query Todos {\n todos { id } }\n","operationName":"Todos","variables":{}}}"""; + await socket.SendAsync(Encoding.UTF8.GetBytes(subscription).AsMemory(), WebSocketMessageType.Text, true, ct); return channel; } @@ -284,16 +259,6 @@ public async Task> GetTodos(IUnitOfWork unitOfWork) } } -[ExtendObjectType] -public class TodoExtensions -{ - public async Task Index([Parent] TodoRead todo, IUnitOfWork unitOfWork) - { - var indexModel = await unitOfWork.GetReadModel(todo.Id); - return indexModel.Index; - } -} - [MutationType] public class TodoMutations { @@ -309,4 +274,4 @@ public async Task CreateTodo([Service] IServiceProvider serviceProvide } } -# endregion HotChocolate \ No newline at end of file +# endregion HotChocolate diff --git a/src/Fluss.HotChocolate/AddExtensionMiddleware.cs b/src/Fluss.HotChocolate/AddExtensionMiddleware.cs index 30eca4f..4cda49e 100644 --- a/src/Fluss.HotChocolate/AddExtensionMiddleware.cs +++ b/src/Fluss.HotChocolate/AddExtensionMiddleware.cs @@ -54,18 +54,36 @@ public async ValueTask InvokeAsync(IRequestContext context) } } - private async IAsyncEnumerable LiveResults(IReadOnlyDictionary? contextData, QueryResult firstResult, IQueryRequest originalRequest) + private IReadOnlyCollection GetCurrentListeners(IReadOnlyDictionary? contextData) { - yield return firstResult; - - using var serviceScope = rootServiceProvider.CreateScope(); - var serviceProvider = serviceScope.ServiceProvider; + if (contextData == null) + { + logger.LogWarning("Trying to add live results but {ContextData} is null!", nameof(contextData)); + throw new InvalidOperationException("Cannot fetch ReadModels from null context."); + } + + if (!contextData.TryGetValue(nameof(IUnitOfWork), out var value) || value is not IUnitOfWork unitOfWork) + { + logger.LogWarning("Trying to add live results but {ContextData} does not contain UnitOfWork!", nameof(contextData)); + throw new InvalidOperationException("Cannot fetch ReadModels when no UnitOfWork is present."); + } + return unitOfWork.ReadModels.ToList().AsReadOnly(); + } + + private async IAsyncEnumerable LiveResults(IReadOnlyDictionary? contextData, QueryResult firstResult, IQueryRequest originalRequest) + { if (contextData == null) { logger.LogWarning("Trying to add live results but {ContextData} is null!", nameof(contextData)); yield break; } + + var listeners = GetCurrentListeners(contextData); + yield return firstResult; + + using var serviceScope = rootServiceProvider.CreateScope(); + var serviceProvider = serviceScope.ServiceProvider; contextData.TryGetValue(nameof(HttpContext), out var httpContext); var isWebsocket = (httpContext as HttpContext)?.WebSockets.IsWebSocketRequest ?? false; @@ -84,20 +102,7 @@ private async IAsyncEnumerable LiveResults(IReadOnlyDictionary operationSession.Id != operationId?.ToString())) { @@ -119,8 +124,8 @@ private async IAsyncEnumerable LiveResults(IReadOnlyDictionary 0) { diff --git a/src/Fluss.HotChocolate/UnitOfWorkParameterExpressionBuilder.cs b/src/Fluss.HotChocolate/UnitOfWorkParameterExpressionBuilder.cs index 99e85de..4565799 100644 --- a/src/Fluss.HotChocolate/UnitOfWorkParameterExpressionBuilder.cs +++ b/src/Fluss.HotChocolate/UnitOfWorkParameterExpressionBuilder.cs @@ -15,10 +15,11 @@ private static IUnitOfWork CreateIUnitOfWork(IResolverContext context) var createdUnitOfWork = context .Service() .WithPrefilledVersion( - context.GetGlobalState(PrefillUnitOfWorkVersion) + context.GetGlobalStateOrDefault(PrefillUnitOfWorkVersion) ); - ((IMiddlewareContext)context).RegisterForCleanup(createdUnitOfWork.Return); + // When uncommenting this, switch to copying lists in the middleware (= undesirable?) + ((IMiddlewareContext)context).RegisterForCleanup(createdUnitOfWork.Return, CleanAfter.Request); return createdUnitOfWork; }); diff --git a/src/Fluss/UnitOfWork/UnitOfWork.cs b/src/Fluss/UnitOfWork/UnitOfWork.cs index bec2438..a0814bc 100644 --- a/src/Fluss/UnitOfWork/UnitOfWork.cs +++ b/src/Fluss/UnitOfWork/UnitOfWork.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using Collections.Pooled; using Fluss.Authentication; using Fluss.Events; diff --git a/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs b/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs index c087b7e..cd727e7 100644 --- a/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs +++ b/src/Fluss/UnitOfWork/UnitOfWorkFactory.cs @@ -24,10 +24,16 @@ await RetryPolicy .ExecuteAsync(async () => { var unitOfWork = serviceProvider.GetRequiredService(); - await action(unitOfWork); - await unitOfWork.CommitInternal(); - await unitOfWork.Return(); + try + { + await action(unitOfWork); + await unitOfWork.CommitInternal(); + } + finally + { + await unitOfWork.Return(); + } }); } @@ -39,11 +45,17 @@ public async ValueTask Commit(Func> action) .ExecuteAsync(async () => { var unitOfWork = serviceProvider.GetRequiredService(); - var result = await action(unitOfWork); - - await unitOfWork.CommitInternal(); - await unitOfWork.Return(); - return result; + try + { + var result = await action(unitOfWork); + await unitOfWork.CommitInternal(); + + return result; + } + finally + { + await unitOfWork.Return(); + } }); } -} +} \ No newline at end of file