Skip to content

Commit

Permalink
Improve benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
ThorstenThiel authored and Enterprize1 committed Sep 15, 2024
1 parent a4d9111 commit bdb3507
Show file tree
Hide file tree
Showing 39 changed files with 445 additions and 327 deletions.
4 changes: 3 additions & 1 deletion src/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ msbuild.wrn

.idea

*.received.*
*.received.*

*.user
50 changes: 28 additions & 22 deletions src/Benchmark/Bench.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet_GitCompare;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using Fluss;
using Fluss.Authentication;
Expand All @@ -9,8 +9,9 @@

namespace Benchmark;

[SimpleJob(baseline: true)]
[SimpleJob(RuntimeMoniker.Net90)]
[GitJob("879573d", baseline: true, id: "0_before")]
[SimpleJob(id: "1_after")]
[RPlotExporter]
[MemoryDiagnoser]
public class Bench
{
Expand All @@ -20,10 +21,10 @@ public void Setup()
var sc = new ServiceCollection();
sc.AddEventSourcing();
sc.AddPoliciesFrom(typeof(Bench).Assembly);

_sp = sc.BuildServiceProvider();
}

ServiceProvider _sp = null!;

[Benchmark]
Expand All @@ -32,71 +33,76 @@ public async Task<int> PublishEventsAndReadMixedReadWrite()
var sum = 0;
for (var j = 0; j < 1000; j++)
{
await _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => {
await _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork =>
{
for (var i = 0; i < 50; i++)
{
await unitOfWork.Publish(new TestEvent(1));
await unitOfWork.Publish(new TestEvent(2));
}
});

var unitOfWork = _sp.GetSystemUserUnitOfWork();
using var unitOfWork = _sp.GetSystemUserUnitOfWork();
var readModel1 = await unitOfWork.GetReadModel<EventsEqualReadModel, int>(1);
var readModel2 = await unitOfWork.GetReadModel<EventsEqualReadModel, int>(2);
sum += readModel1.GotEvents + readModel2.GotEvents;
}

return sum;
}

[IterationSetup(Targets = [nameof(PublishEventsAndReadReadHeavySingleReadModel), nameof(PublishEventsAndReadReadHeavyMultipleReadModel)])]
public void SetupHeavyRead()
{
var sc = new ServiceCollection();
sc.AddEventSourcing();
sc.AddPoliciesFrom(typeof(Bench).Assembly);

_sp = sc.BuildServiceProvider();

_sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork => {
_sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork =>
{
for (var i = 0; i < 10000; i++)
{
await unitOfWork.Publish(new TestEvent(i));
}
}).AsTask().Wait();
}


[Benchmark]
public async Task<int> PublishEventsAndReadReadHeavySingleReadModel()
{
var sum = 0;

for (var j = 0; j < 50000; j++)
{
var unitOfWork = _sp.GetSystemUserUnitOfWork();
using var unitOfWork = _sp.GetSystemUserUnitOfWork();
var readModel1 = await unitOfWork.GetReadModel<EventsModEqualReadModel, int>(3);
sum += readModel1.GotEvents;
}

return sum;
}

[Benchmark]
public async Task<int> PublishEventsAndReadReadHeavyMultipleReadModel()
{
var sum = 0;

for (var j = 1; j < 5000; j++)
{
var unitOfWork = _sp.GetSystemUserUnitOfWork();
using var unitOfWork = _sp.GetSystemUserUnitOfWork();
var readModel1 = await unitOfWork.GetReadModel<EventsModEqualReadModel, int>(j);
sum += readModel1.GotEvents;
}

return sum;
}
}

public class AllowAllPolicy : Policy {
public class AllowAllPolicy : Policy
{
public ValueTask<bool> AuthenticateEvent(EventEnvelope envelope, IAuthContext authContext)
{
return ValueTask.FromResult(true);
Expand All @@ -122,7 +128,7 @@ protected override EventsEqualReadModel When(EventEnvelope envelope)
_ => this,
};
}
}
}

public record EventsModEqualReadModel : ReadModelWithKey<int>
{
Expand All @@ -136,4 +142,4 @@ protected override EventsModEqualReadModel When(EventEnvelope envelope)
_ => this,
};
}
}
}
2 changes: 1 addition & 1 deletion src/Benchmark/Benchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Benchmark/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Benchmark;
using BenchmarkDotNet.Running;

BenchmarkRunner.Run<Bench>();
BenchmarkRunner.Run<Bench>();
6 changes: 3 additions & 3 deletions src/Fluss.HotChocolate/AddExtensionMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ private static async ValueTask<long> WaitForChange(IServiceProvider serviceProvi

var cancellationTokenSource = new CancellationTokenSource();

var latestPersistedEventVersion = currentEventListener.Min(el => el.Tag.LastSeen);
var latestTransientEventVersion = currentEventListener.Min(el => el.Tag.LastSeenTransient);
var latestPersistedEventVersion = currentEventListener.Min(el => el.LastSeenEvent);
var latestTransientEventVersion = currentEventListener.Min(el => el.LastSeenTransientEvent);

var persistedEventTask = Task.Run(async () =>
{
Expand All @@ -156,7 +156,7 @@ private static async ValueTask<long> WaitForChange(IServiceProvider serviceProvi
var eventListener = currentEventListener[index];
var updatedEventListener = await eventListenerFactory.UpdateTo(eventListener, latestPersistedEventVersion);

if (updatedEventListener.Tag.LastAccepted > eventListener.Tag.LastAccepted)
if (updatedEventListener.LastAcceptedEvent > eventListener.LastAcceptedEvent)
{
return;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public PostgreSQLEventRepository(PostgreSQLConfig config)
dataSource = dataSourceBuilder.Build();
}

private async ValueTask Publish<TEnvelope>(IEnumerable<TEnvelope> envelopes, Func<TEnvelope, object> eventExtractor,
private async ValueTask Publish<TEnvelope>(IReadOnlyList<TEnvelope> envelopes, Func<TEnvelope, object> eventExtractor,
NpgsqlConnection? conn = null) where TEnvelope : Envelope
{
using var activity = ActivitySource.Source.StartActivity();
Expand Down Expand Up @@ -81,7 +81,7 @@ private async ValueTask Publish<TEnvelope>(IEnumerable<TEnvelope> envelopes, Fun
NotifyNewEvents();
}

public async ValueTask Publish(IEnumerable<EventEnvelope> envelopes)
public async ValueTask Publish(IReadOnlyList<EventEnvelope> envelopes)
{
await Publish(envelopes, e => e.Event);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Fluss.Testing/AggregateTestBed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ namespace Fluss.Testing;
public AggregateTestBed()
{
var validator = new Mock<IRootValidator>();
validator.Setup(v => v.ValidateEvent(It.IsAny<EventEnvelope>(), It.IsAny<IReadOnlyList<EventEnvelope>?>()))
validator.Setup(v => v.ValidateEvent(It.IsAny<IUnitOfWork>(), It.IsAny<EventEnvelope>()))
.Returns<EventEnvelope, IReadOnlyList<EventEnvelope>?>((_, _) => Task.CompletedTask);
validator.Setup(v => v.ValidateAggregate(It.IsAny<AggregateRoot>(), It.IsAny<UnitOfWork>()))
.Returns<AggregateRoot, UnitOfWork>((_, _) => Task.CompletedTask);

_unitOfWork = new UnitOfWork(EventRepository, EventListenerFactory, [new AllowAllPolicy()],
_unitOfWork = UnitOfWork.Create(EventRepository, EventListenerFactory, [new AllowAllPolicy()],
new UserIdProvider(_ => Guid.Empty, null!), validator.Object);
}

Expand Down
4 changes: 2 additions & 2 deletions src/Fluss.Testing/EventRepositoryTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public async Task ReturnsPublishedEvents()
public async Task ReturnsMultiplePublishedEvents()
{
var envelopes = GetMockEnvelopes(0, 1).ToList();
await Repository.Publish(envelopes.Take(1));
await Repository.Publish(envelopes.Skip(1));
await Repository.Publish(envelopes.Take(1).ToList());
await Repository.Publish(envelopes.Skip(1).ToList());

var gottenEnvelopes = await Repository.GetEvents(-1, 1).ToFlatEventList();

Expand Down
2 changes: 1 addition & 1 deletion src/Fluss.Testing/EventTestBed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public virtual EventTestBed WithEvents(params Event[] events)
At = DateTimeOffset.Now,
By = null,
Event = @event
})).AsTask().Wait();
}).ToList()).AsTask().Wait();

return this;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Fluss.Testing/ReadModelTestBed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class ReadModelTestBed : EventTestBed
var eventSourced = EventListenerFactory
.UpdateTo(new TReadModel(), EventRepository.GetLatestVersion().AsTask().Result).AsTask().Result;

Assert.Equal(readModel with { Tag = eventSourced.Tag }, eventSourced);
Assert.Equal(readModel, eventSourced);

AssertReadModelDoesNotReactToCanary(eventSourced);

Expand All @@ -24,7 +24,7 @@ public ReadModelTestBed ResultsIn<TReadModel, TKey>(TReadModel readModel)
var eventSourced = EventListenerFactory
.UpdateTo(new TReadModel { Id = readModel.Id }, EventRepository.GetLatestVersion().AsTask().Result).AsTask().Result;

Assert.Equal(readModel with { Tag = eventSourced.Tag }, eventSourced);
Assert.Equal(readModel, eventSourced);

AssertReadModelDoesNotReactToCanary(eventSourced);

Expand Down
2 changes: 1 addition & 1 deletion src/Fluss.UnitTest/Core/Authentication/AuthContextTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class AuthContextTest
public AuthContextTest()
{
_unitOfWork = new Mock<IUnitOfWork>();
_authContext = new AuthContext(_unitOfWork.Object, Guid.NewGuid());
_authContext = AuthContext.Get(_unitOfWork.Object, Guid.NewGuid());
}

[Fact]
Expand Down
14 changes: 7 additions & 7 deletions src/Fluss.UnitTest/Core/Events/InMemoryListenerCacheTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task PassesUpdatesToNext()
public async Task ReturnsCachedEventListener()
{
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 100))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 }));

_ = await _listenerCache.UpdateTo(new TestEventListener(), 100);
_ = await _listenerCache.UpdateTo(new TestEventListener(), 100);
Expand All @@ -58,7 +58,7 @@ public async Task ReturnsCachedEventListener()
public async Task ReturnsCachedKeyedEventListener()
{
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<KeyedTestEventListener>(), 100))
.Returns(ValueTask.FromResult(new KeyedTestEventListener { Id = 1, Tag = new EventListenerVersionTag(100) }));
.Returns(ValueTask.FromResult(new KeyedTestEventListener { Id = 1, LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 }));

await _listenerCache.UpdateTo(new KeyedTestEventListener { Id = 1 }, 100);
await _listenerCache.UpdateTo(new KeyedTestEventListener { Id = 1 }, 100);
Expand All @@ -76,9 +76,9 @@ public async Task ReturnsCachedKeyedEventListener()
public async Task ForwardsIfCacheContainsNewer()
{
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 100))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 }));
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 90))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(90) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 90, LastSeenEvent = 90, LastSeenTransientEvent = -1 }));

await _listenerCache.UpdateTo(new TestEventListener(), 100);
await _listenerCache.UpdateTo(new TestEventListener(), 90);
Expand Down Expand Up @@ -106,9 +106,9 @@ public async Task UpdatesStoreWithNewerVersion()
var otherTestEventList = new TestEventListener();

_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 100))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 }));
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 110))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(110) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 110, LastSeenEvent = 110, LastSeenTransientEvent = -1 }));

await _listenerCache.UpdateTo(testEventListener, 100);

Expand All @@ -128,7 +128,7 @@ public async Task UpdatesStoreWithNewerVersion()
public async Task ForwardsAgainIfCleaned()
{
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 100))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 }));

await _listenerCache.UpdateTo(new TestEventListener(), 100);
_listenerCache.Clean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public async Task AppliesTransientEvents()

var updatedReadModel = await _transientFactory.UpdateTo(readModel, -1);

Assert.Equal(0, updatedReadModel.Tag.LastSeenTransient);
Assert.Equal(0, updatedReadModel.LastSeenTransientEvent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ public async Task DoesNotReuseCacheWhenNewEventIsAdded()
{
_policies.Add(new AllowReadAfterEventPolicy());

var unitOfWork = GetUnitOfWork();
await Assert.ThrowsAsync<UnauthorizedAccessException>(async () =>
{
await _unitOfWork.GetReadModel<TestReadModel, int>(1);
await unitOfWork.GetReadModel<TestReadModel, int>(1);
});

await _unitOfWork.Publish(new AllowEvent());
await _unitOfWork.GetReadModel<TestReadModel, int>(1);
await unitOfWork.Publish(new AllowEvent());
await unitOfWork.GetReadModel<TestReadModel, int>(1);
}

private record AllowEvent : Event;
Expand Down
Loading

0 comments on commit bdb3507

Please sign in to comment.