Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Enterprize1 committed Sep 6, 2024
1 parent c817835 commit 5c56c7c
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 8 deletions.
4 changes: 3 additions & 1 deletion src/Benchmark/Bench.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet_GitCompare;
using BenchmarkDotNet.Attributes;
using Fluss;
using Fluss.Authentication;
using Fluss.Events;
Expand All @@ -8,6 +9,7 @@
namespace Benchmark;

[SimpleJob]
[GitJob(baseline: true)]
[MemoryDiagnoser]
public class Bench
{
Expand Down
11 changes: 10 additions & 1 deletion src/Benchmark/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
using Benchmark;
using BenchmarkDotNet.Running;

BenchmarkRunner.Run<Bench>();
if (false)
{
BenchmarkRunner.Run<Bench>();

Check warning on line 6 in src/Benchmark/Program.cs

View workflow job for this annotation

GitHub Actions / build

Unreachable code detected

Check warning on line 6 in src/Benchmark/Program.cs

View workflow job for this annotation

GitHub Actions / build

Unreachable code detected
}
else
{
var bench = new Bench();
bench.Setup();
await bench.PublishEventsAndRead();
}
2 changes: 1 addition & 1 deletion src/Fluss/UnitOfWork/IUnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public interface IUnitOfWork
{
ValueTask<long> ConsistentVersion();
IReadOnlyCollection<EventListener> ReadModels { get; }
ConcurrentQueue<EventEnvelope> PublishedEventEnvelopes { get; }
IReadOnlyCollection<EventEnvelope> PublishedEventEnvelopes { get; }

ValueTask<IReadModel> GetReadModel(Type tReadModel, object? key, long? at = null);

Expand Down
8 changes: 5 additions & 3 deletions src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ namespace Fluss;

public partial class UnitOfWork
{
public ConcurrentQueue<EventEnvelope> PublishedEventEnvelopes { get; } = new();
// TODO: Use https://github.com/jtmueller/Collections.Pooled
internal readonly List<EventEnvelope> _publishedEventEnvelopes = [];
public IReadOnlyCollection<EventEnvelope> PublishedEventEnvelopes => _publishedEventEnvelopes;

public async ValueTask<TAggregate> GetAggregate<TAggregate>() where TAggregate : AggregateRoot, new()
{
Expand Down Expand Up @@ -68,7 +70,7 @@ public async ValueTask Publish(Event @event, AggregateRoot? aggregate = null)

await ValidateEventResult(eventEnvelope, aggregate);

PublishedEventEnvelopes.Enqueue(eventEnvelope);
_publishedEventEnvelopes.Add(eventEnvelope);
}

private async ValueTask ValidateEventResult<T>(EventEnvelope envelope, T? aggregate) where T : AggregateRoot
Expand Down Expand Up @@ -96,7 +98,7 @@ internal async ValueTask CommitInternal()

await _eventRepository.Publish(PublishedEventEnvelopes);
_consistentVersion += PublishedEventEnvelopes.Count;
PublishedEventEnvelopes.Clear();
_publishedEventEnvelopes.Clear();
}

private async ValueTask<TEventListener> UpdateAndApplyPublished<TEventListener>(TEventListener eventListener, long? at)
Expand Down
2 changes: 1 addition & 1 deletion src/Fluss/UnitOfWork/UnitOfWorkRecordingProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public ValueTask<long> ConsistentVersion()
}

public IReadOnlyCollection<EventListener> ReadModels => impl.ReadModels;
public ConcurrentQueue<EventEnvelope> PublishedEventEnvelopes => impl.PublishedEventEnvelopes;
public IReadOnlyCollection<EventEnvelope> PublishedEventEnvelopes => impl.PublishedEventEnvelopes;

public List<EventListener> RecordedListeners { get; } = [];

Expand Down
2 changes: 1 addition & 1 deletion src/Fluss/Validation/RootValidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public async Task ValidateEvent(EventEnvelope envelope, IReadOnlyList<EventEnvel
var versionedUnitOfWork = unitOfWork.WithPrefilledVersion(envelope.Version - willBePublishedEnvelopes.Count - 1);
foreach (var willBePublishedEnvelope in willBePublishedEnvelopes)
{
versionedUnitOfWork.PublishedEventEnvelopes.Enqueue(willBePublishedEnvelope);
((List<EventEnvelope>)versionedUnitOfWork.PublishedEventEnvelopes).Add(willBePublishedEnvelope);
}

var type = envelope.Event.GetType();
Expand Down

0 comments on commit 5c56c7c

Please sign in to comment.