Skip to content

Commit

Permalink
fix: all things wrong with the parameterbuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
mvarendorff2 committed Nov 5, 2024
1 parent d7c9a18 commit a4079e7
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 82 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ jobs:
- name: Test
run: dotnet test --collect:"XPlat Code Coverage" --no-restore --no-build
working-directory: ./src/Fluss.UnitTest
- name: Test HotChocolate
run: dotnet test --collect:"XPlat Code Coverage" --no-restore --no-build
working-directory: ./src/Fluss.HotChocolate.IntegrationTest
- name: Start PostgreSQL
run: docker compose up -d
working-directory: ./src/Fluss.PostgreSQL.IntegrationTest
Expand Down
67 changes: 16 additions & 51 deletions src/Fluss.HotChocolate.IntegrationTest/HotChocolateTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ public async Task Setup()
.AddQueryType()
.AddMutationType()
.AddTypeExtension<TodoQueries>()
.AddTypeExtension<TodoExtensions>()
.AddTypeExtension<TodoMutations>()
.AddLiveEventSourcing();

hostBuilder.WebHost.ConfigureKestrel(serveroptions =>
serveroptions.Configure().Endpoint(IPAddress.Loopback, 0));
hostBuilder.WebHost.ConfigureKestrel(serverOptions =>
serverOptions.Configure().Endpoint(IPAddress.Loopback, 0));

Host = hostBuilder.Build();
Host.UseWebSockets();
Expand All @@ -53,21 +52,9 @@ public async Task Setup()
Address = addressFeature!.Addresses.First();
}

private readonly Guid _aGuid = Guid.NewGuid();
protected UnitOfWorkFactory ArrangeUnitOfWorkFactory => Host.Services.GetUserUnitOfWorkFactory(_aGuid);

[Test]
public async Task JustWorks()
public async Task SubscriberReceivesLiveUpdates()
{
const int initialTodos = 5;
for (var i = 0; i < initialTodos; i++)
{
await ArrangeUnitOfWorkFactory.Commit(async unitOfWork =>
{
await TodoWrite.Create(unitOfWork, "Todo " + i);
});
}

var tokenSource = new CancellationTokenSource();
tokenSource.CancelAfter(30000);

Expand All @@ -76,24 +63,21 @@ await ArrangeUnitOfWorkFactory.Commit(async unitOfWork =>
// Receive initial response
await channel.Reader.WaitToReadAsync(tokenSource.Token);
var ids = await channel.Reader.ReadAsync(tokenSource.Token);
Assert.That(ids, Has.Count.EqualTo(initialTodos));
Assert.That(ids, Is.Empty);

for (var i = 0; i < 10; i++)
{
if (i % 2 == 0)
{
await SubscribeToTodos(default);
}

var newId = await CreateTodo("Todo");
var newId = await CreateTodo($"Todo {i}");
await channel.Reader.WaitToReadAsync(tokenSource.Token);
ids = await channel.Reader.ReadAsync(tokenSource.Token);
Assert.That(ids, Has.Count.EqualTo(i + initialTodos + 1));
Assert.That(ids, Contains.Item(Guid.Parse(newId)));
Assert.That(ids, Has.Count.EqualTo(i + 1));
Assert.That(ids, Contains.Item(newId));
}

await tokenSource.CancelAsync();
}

private async Task<string> CreateTodo(string todo)
private async Task<Guid> CreateTodo(string todo)
{
var httpClient = new HttpClient();
httpClient.BaseAddress = new Uri(Address);
Expand All @@ -110,7 +94,7 @@ private async Task<string> CreateTodo(string todo)
var responseBody = await response.Content.ReadFromJsonAsync<JsonElement>();
var id = responseBody.GetProperty("data").GetProperty("createTodo").GetProperty("id").GetString();

return id ?? "";
return Guid.Parse(id ?? "");
}

private async Task<Channel<List<Guid>>> SubscribeToTodos(CancellationToken ct)
Expand All @@ -135,26 +119,17 @@ private async Task<Channel<List<Guid>>> SubscribeToTodos(CancellationToken ct)
.Select(t => t.GetProperty("id").GetGuid())
.ToList();

var messageId = document.RootElement.GetProperty("id").GetString();
if (messageId == "0")
{
await channel.Writer.WriteAsync(ids, ct);
}
else
{
await socket.SendAsync(Encoding.UTF8.GetBytes($$"""{"id":"{{messageId}}","type":"complete"}""").ToArray().AsMemory(), WebSocketMessageType.Text, true, ct);
break;
}
await channel.Writer.WriteAsync(ids, ct);

if (ct.IsCancellationRequested) break;
}
}, ct);

var init = """{"type":"connection_init"}""";
const string init = """{"type":"connection_init"}""";
await socket.SendAsync(Encoding.UTF8.GetBytes(init).AsMemory(), WebSocketMessageType.Text, true, ct);

var x = """{"id":"0","type":"subscribe","payload":{"query":"\n query Todos {\n todos { id \n index } }\n","operationName":"Todos","variables":{}}}""";
await socket.SendAsync(Encoding.UTF8.GetBytes(x).AsMemory(), WebSocketMessageType.Text, true, ct);
const string subscription = """{"id":"0","type":"subscribe","payload":{"query":"\n query Todos {\n todos { id } }\n","operationName":"Todos","variables":{}}}""";
await socket.SendAsync(Encoding.UTF8.GetBytes(subscription).AsMemory(), WebSocketMessageType.Text, true, ct);

return channel;
}
Expand Down Expand Up @@ -284,16 +259,6 @@ public async Task<IEnumerable<TodoRead>> GetTodos(IUnitOfWork unitOfWork)
}
}

[ExtendObjectType<TodoRead>]
public class TodoExtensions
{
public async Task<int> Index([Parent] TodoRead todo, IUnitOfWork unitOfWork)
{
var indexModel = await unitOfWork.GetReadModel<TodoIndex, Guid>(todo.Id);
return indexModel.Index;
}
}

[MutationType]
public class TodoMutations
{
Expand All @@ -309,4 +274,4 @@ public async Task<TodoRead> CreateTodo([Service] IServiceProvider serviceProvide
}
}

# endregion HotChocolate
# endregion HotChocolate
45 changes: 25 additions & 20 deletions src/Fluss.HotChocolate/AddExtensionMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,36 @@ public async ValueTask InvokeAsync(IRequestContext context)
}
}

private async IAsyncEnumerable<IQueryResult> LiveResults(IReadOnlyDictionary<string, object?>? contextData, QueryResult firstResult, IQueryRequest originalRequest)
private IReadOnlyCollection<EventListener> GetCurrentListeners(IReadOnlyDictionary<string, object?>? contextData)
{
yield return firstResult;

using var serviceScope = rootServiceProvider.CreateScope();
var serviceProvider = serviceScope.ServiceProvider;
if (contextData == null)
{
logger.LogWarning("Trying to add live results but {ContextData} is null!", nameof(contextData));
throw new InvalidOperationException("Cannot fetch ReadModels from null context.");
}

if (!contextData.TryGetValue(nameof(IUnitOfWork), out var value) || value is not IUnitOfWork unitOfWork)
{
logger.LogWarning("Trying to add live results but {ContextData} does not contain UnitOfWork!", nameof(contextData));
throw new InvalidOperationException("Cannot fetch ReadModels when no UnitOfWork is present.");
}

return unitOfWork.ReadModels.ToList().AsReadOnly();
}

private async IAsyncEnumerable<IQueryResult> LiveResults(IReadOnlyDictionary<string, object?>? contextData, QueryResult firstResult, IQueryRequest originalRequest)
{
if (contextData == null)
{
logger.LogWarning("Trying to add live results but {ContextData} is null!", nameof(contextData));
yield break;
}

var listeners = GetCurrentListeners(contextData);
yield return firstResult;

using var serviceScope = rootServiceProvider.CreateScope();
var serviceProvider = serviceScope.ServiceProvider;

contextData.TryGetValue(nameof(HttpContext), out var httpContext);
var isWebsocket = (httpContext as HttpContext)?.WebSockets.IsWebSocketRequest ?? false;
Expand All @@ -84,20 +102,7 @@ private async IAsyncEnumerable<IQueryResult> LiveResults(IReadOnlyDictionary<str

while (true)
{
if (contextData == null || !contextData.TryGetValue(nameof(IUnitOfWork), out var value))
{
break;
}

if (value is not UnitOfWork unitOfWork)
{
break;
}

var latestPersistedEventVersion = await WaitForChange(
serviceProvider,
unitOfWork.ReadModels
);
var latestPersistedEventVersion = await WaitForChange(serviceProvider, listeners);

if (isWebsocket && contextSocketSession is ISocketSession socketSession && socketSession.Operations.All(operationSession => operationSession.Id != operationId?.ToString()))
{
Expand All @@ -119,8 +124,8 @@ private async IAsyncEnumerable<IQueryResult> LiveResults(IReadOnlyDictionary<str
break;
}

listeners = GetCurrentListeners(executionResult.ContextData);
yield return result;
contextData = executionResult.ContextData;

if (result.Errors?.Count > 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ private static IUnitOfWork CreateIUnitOfWork(IResolverContext context)
var createdUnitOfWork = context
.Service<IUnitOfWork>()
.WithPrefilledVersion(
context.GetGlobalState<long>(PrefillUnitOfWorkVersion)
context.GetGlobalStateOrDefault<long?>(PrefillUnitOfWorkVersion)
);

((IMiddlewareContext)context).RegisterForCleanup(createdUnitOfWork.Return);
// When uncommenting this, switch to copying lists in the middleware (= undesirable?)
((IMiddlewareContext)context).RegisterForCleanup(createdUnitOfWork.Return, CleanAfter.Request);

return createdUnitOfWork;
});
Expand Down
1 change: 1 addition & 0 deletions src/Fluss/UnitOfWork/UnitOfWork.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using Collections.Pooled;
using Fluss.Authentication;
using Fluss.Events;
Expand Down
30 changes: 21 additions & 9 deletions src/Fluss/UnitOfWork/UnitOfWorkFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,16 @@ await RetryPolicy
.ExecuteAsync(async () =>
{
var unitOfWork = serviceProvider.GetRequiredService<UnitOfWork>();
await action(unitOfWork);

await unitOfWork.CommitInternal();
await unitOfWork.Return();
try
{
await action(unitOfWork);
await unitOfWork.CommitInternal();
}
finally
{
await unitOfWork.Return();
}
});
}

Expand All @@ -39,11 +45,17 @@ public async ValueTask<T> Commit<T>(Func<IWriteUnitOfWork, ValueTask<T>> action)
.ExecuteAsync(async () =>
{
var unitOfWork = serviceProvider.GetRequiredService<UnitOfWork>();
var result = await action(unitOfWork);

await unitOfWork.CommitInternal();
await unitOfWork.Return();
return result;
try
{
var result = await action(unitOfWork);
await unitOfWork.CommitInternal();

return result;
}
finally
{
await unitOfWork.Return();
}
});
}
}
}

0 comments on commit a4079e7

Please sign in to comment.