Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

21 add benchmarks #32

Merged
merged 2 commits into from
Sep 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ msbuild.wrn

.idea

*.received.*
*.received.*

*.user
145 changes: 145 additions & 0 deletions src/Benchmark/Bench.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
using BenchmarkDotNet_GitCompare;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using Fluss;
using Fluss.Authentication;
using Fluss.Events;
using Fluss.ReadModel;
using Microsoft.Extensions.DependencyInjection;

namespace Benchmark;

[GitJob("879573d", baseline: true, id: "0_before")]
[SimpleJob(id: "1_after")]
[RPlotExporter]
[MemoryDiagnoser]
public class Bench
{
[IterationSetup]
public void Setup()
{
var sc = new ServiceCollection();
sc.AddEventSourcing();
sc.AddPoliciesFrom(typeof(Bench).Assembly);

_sp = sc.BuildServiceProvider();
}

ServiceProvider _sp = null!;

[Benchmark]
public async Task<int> PublishEventsAndReadMixedReadWrite()
{
var sum = 0;
for (var j = 0; j < 1000; j++)
{
await _sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork =>
{
for (var i = 0; i < 50; i++)
{
await unitOfWork.Publish(new TestEvent(1));
await unitOfWork.Publish(new TestEvent(2));
}
});

using var unitOfWork = _sp.GetSystemUserUnitOfWork();
var readModel1 = await unitOfWork.GetReadModel<EventsEqualReadModel, int>(1);
var readModel2 = await unitOfWork.GetReadModel<EventsEqualReadModel, int>(2);
sum += readModel1.GotEvents + readModel2.GotEvents;
}

return sum;
}

[IterationSetup(Targets = [nameof(PublishEventsAndReadReadHeavySingleReadModel), nameof(PublishEventsAndReadReadHeavyMultipleReadModel)])]
public void SetupHeavyRead()
{
var sc = new ServiceCollection();
sc.AddEventSourcing();
sc.AddPoliciesFrom(typeof(Bench).Assembly);

_sp = sc.BuildServiceProvider();

_sp.GetSystemUserUnitOfWorkFactory().Commit(async unitOfWork =>
{
for (var i = 0; i < 10000; i++)
{
await unitOfWork.Publish(new TestEvent(i));
}
}).AsTask().Wait();
}


[Benchmark]
public async Task<int> PublishEventsAndReadReadHeavySingleReadModel()
{
var sum = 0;

for (var j = 0; j < 50000; j++)
{
using var unitOfWork = _sp.GetSystemUserUnitOfWork();
var readModel1 = await unitOfWork.GetReadModel<EventsModEqualReadModel, int>(3);
sum += readModel1.GotEvents;
}

return sum;
}

[Benchmark]
public async Task<int> PublishEventsAndReadReadHeavyMultipleReadModel()
{
var sum = 0;

for (var j = 1; j < 5000; j++)
{
using var unitOfWork = _sp.GetSystemUserUnitOfWork();
var readModel1 = await unitOfWork.GetReadModel<EventsModEqualReadModel, int>(j);
sum += readModel1.GotEvents;
}

return sum;
}
}

public class AllowAllPolicy : Policy
{
public ValueTask<bool> AuthenticateEvent(EventEnvelope envelope, IAuthContext authContext)
{
return ValueTask.FromResult(true);
}

public ValueTask<bool> AuthenticateReadModel(IReadModel readModel, IAuthContext authContext)
{
return ValueTask.FromResult(true);
}
}

public record TestEvent(int Id) : Event;

public record EventsEqualReadModel : ReadModelWithKey<int>
{
public int GotEvents { get; private init; }

protected override EventsEqualReadModel When(EventEnvelope envelope)
{
return envelope.Event switch
{
TestEvent testEvent when testEvent.Id == Id => this with { GotEvents = GotEvents + 1 },
_ => this,
};
}
}

public record EventsModEqualReadModel : ReadModelWithKey<int>
{
public int GotEvents { get; private init; }

protected override EventsModEqualReadModel When(EventEnvelope envelope)
{
return envelope.Event switch
{
TestEvent testEvent when testEvent.Id % Id == 0 => this with { GotEvents = GotEvents + 1 },
_ => this,
};
}
}
19 changes: 19 additions & 0 deletions src/Benchmark/Benchmark.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
<PackageReference Include="Enterprize1.BenchmarkDotNet.GitCompare" Version="0.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Fluss\Fluss.csproj" />
</ItemGroup>

</Project>
4 changes: 4 additions & 0 deletions src/Benchmark/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
using Benchmark;
using BenchmarkDotNet.Running;

BenchmarkRunner.Run<Bench>();
6 changes: 3 additions & 3 deletions src/Fluss.HotChocolate/AddExtensionMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ private static async ValueTask<long> WaitForChange(IServiceProvider serviceProvi

var cancellationTokenSource = new CancellationTokenSource();

var latestPersistedEventVersion = currentEventListener.Min(el => el.Tag.LastSeen);
var latestTransientEventVersion = currentEventListener.Min(el => el.Tag.LastSeenTransient);
var latestPersistedEventVersion = currentEventListener.Min(el => el.LastSeenEvent);
var latestTransientEventVersion = currentEventListener.Min(el => el.LastSeenTransientEvent);

var persistedEventTask = Task.Run(async () =>
{
Expand All @@ -156,7 +156,7 @@ private static async ValueTask<long> WaitForChange(IServiceProvider serviceProvi
var eventListener = currentEventListener[index];
var updatedEventListener = await eventListenerFactory.UpdateTo(eventListener, latestPersistedEventVersion);

if (updatedEventListener.Tag.LastAccepted > eventListener.Tag.LastAccepted)
if (updatedEventListener.LastAcceptedEvent > eventListener.LastAcceptedEvent)
{
return;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public PostgreSQLEventRepository(PostgreSQLConfig config)
dataSource = dataSourceBuilder.Build();
}

private async ValueTask Publish<TEnvelope>(IEnumerable<TEnvelope> envelopes, Func<TEnvelope, object> eventExtractor,
private async ValueTask Publish<TEnvelope>(IReadOnlyList<TEnvelope> envelopes, Func<TEnvelope, object> eventExtractor,
NpgsqlConnection? conn = null) where TEnvelope : Envelope
{
using var activity = ActivitySource.Source.StartActivity();
Expand Down Expand Up @@ -81,7 +81,7 @@ private async ValueTask Publish<TEnvelope>(IEnumerable<TEnvelope> envelopes, Fun
NotifyNewEvents();
}

public async ValueTask Publish(IEnumerable<EventEnvelope> envelopes)
public async ValueTask Publish(IReadOnlyList<EventEnvelope> envelopes)
{
await Publish(envelopes, e => e.Event);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Fluss.Testing/AggregateTestBed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ namespace Fluss.Testing;
public AggregateTestBed()
{
var validator = new Mock<IRootValidator>();
validator.Setup(v => v.ValidateEvent(It.IsAny<EventEnvelope>(), It.IsAny<IReadOnlyList<EventEnvelope>?>()))
validator.Setup(v => v.ValidateEvent(It.IsAny<IUnitOfWork>(), It.IsAny<EventEnvelope>()))
.Returns<EventEnvelope, IReadOnlyList<EventEnvelope>?>((_, _) => Task.CompletedTask);
validator.Setup(v => v.ValidateAggregate(It.IsAny<AggregateRoot>(), It.IsAny<UnitOfWork>()))
.Returns<AggregateRoot, UnitOfWork>((_, _) => Task.CompletedTask);

_unitOfWork = new UnitOfWork(EventRepository, EventListenerFactory, [new AllowAllPolicy()],
_unitOfWork = UnitOfWork.Create(EventRepository, EventListenerFactory, [new AllowAllPolicy()],
new UserIdProvider(_ => Guid.Empty, null!), validator.Object);
}

Expand Down
4 changes: 2 additions & 2 deletions src/Fluss.Testing/EventRepositoryTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public async Task ReturnsPublishedEvents()
public async Task ReturnsMultiplePublishedEvents()
{
var envelopes = GetMockEnvelopes(0, 1).ToList();
await Repository.Publish(envelopes.Take(1));
await Repository.Publish(envelopes.Skip(1));
await Repository.Publish(envelopes.Take(1).ToList());
await Repository.Publish(envelopes.Skip(1).ToList());

var gottenEnvelopes = await Repository.GetEvents(-1, 1).ToFlatEventList();

Expand Down
2 changes: 1 addition & 1 deletion src/Fluss.Testing/EventTestBed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
At = DateTimeOffset.Now,
By = null,
Event = @event
})).AsTask().Wait();
}).ToList()).AsTask().Wait();

Check warning on line 25 in src/Fluss.Testing/EventTestBed.cs

View check run for this annotation

Codecov / codecov/patch

src/Fluss.Testing/EventTestBed.cs#L25

Added line #L25 was not covered by tests

return this;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Fluss.Testing/ReadModelTestBed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
var eventSourced = EventListenerFactory
.UpdateTo(new TReadModel(), EventRepository.GetLatestVersion().AsTask().Result).AsTask().Result;

Assert.Equal(readModel with { Tag = eventSourced.Tag }, eventSourced);
Assert.Equal(readModel, eventSourced);

Check warning on line 14 in src/Fluss.Testing/ReadModelTestBed.cs

View check run for this annotation

Codecov / codecov/patch

src/Fluss.Testing/ReadModelTestBed.cs#L14

Added line #L14 was not covered by tests

AssertReadModelDoesNotReactToCanary(eventSourced);

Expand All @@ -24,7 +24,7 @@
var eventSourced = EventListenerFactory
.UpdateTo(new TReadModel { Id = readModel.Id }, EventRepository.GetLatestVersion().AsTask().Result).AsTask().Result;

Assert.Equal(readModel with { Tag = eventSourced.Tag }, eventSourced);
Assert.Equal(readModel, eventSourced);

Check warning on line 27 in src/Fluss.Testing/ReadModelTestBed.cs

View check run for this annotation

Codecov / codecov/patch

src/Fluss.Testing/ReadModelTestBed.cs#L27

Added line #L27 was not covered by tests

AssertReadModelDoesNotReactToCanary(eventSourced);

Expand Down
2 changes: 1 addition & 1 deletion src/Fluss.UnitTest/Core/Authentication/AuthContextTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class AuthContextTest
public AuthContextTest()
{
_unitOfWork = new Mock<IUnitOfWork>();
_authContext = new AuthContext(_unitOfWork.Object, Guid.NewGuid());
_authContext = AuthContext.Get(_unitOfWork.Object, Guid.NewGuid());
}

[Fact]
Expand Down
14 changes: 7 additions & 7 deletions src/Fluss.UnitTest/Core/Events/InMemoryListenerCacheTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task PassesUpdatesToNext()
public async Task ReturnsCachedEventListener()
{
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 100))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 }));

_ = await _listenerCache.UpdateTo(new TestEventListener(), 100);
_ = await _listenerCache.UpdateTo(new TestEventListener(), 100);
Expand All @@ -58,7 +58,7 @@ public async Task ReturnsCachedEventListener()
public async Task ReturnsCachedKeyedEventListener()
{
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<KeyedTestEventListener>(), 100))
.Returns(ValueTask.FromResult(new KeyedTestEventListener { Id = 1, Tag = new EventListenerVersionTag(100) }));
.Returns(ValueTask.FromResult(new KeyedTestEventListener { Id = 1, LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 }));

await _listenerCache.UpdateTo(new KeyedTestEventListener { Id = 1 }, 100);
await _listenerCache.UpdateTo(new KeyedTestEventListener { Id = 1 }, 100);
Expand All @@ -76,9 +76,9 @@ public async Task ReturnsCachedKeyedEventListener()
public async Task ForwardsIfCacheContainsNewer()
{
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 100))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 }));
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 90))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(90) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 90, LastSeenEvent = 90, LastSeenTransientEvent = -1 }));

await _listenerCache.UpdateTo(new TestEventListener(), 100);
await _listenerCache.UpdateTo(new TestEventListener(), 90);
Expand Down Expand Up @@ -106,9 +106,9 @@ public async Task UpdatesStoreWithNewerVersion()
var otherTestEventList = new TestEventListener();

_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 100))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 }));
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 110))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(110) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 110, LastSeenEvent = 110, LastSeenTransientEvent = -1 }));

await _listenerCache.UpdateTo(testEventListener, 100);

Expand All @@ -128,7 +128,7 @@ public async Task UpdatesStoreWithNewerVersion()
public async Task ForwardsAgainIfCleaned()
{
_baseEventListenerFactory.Setup(f => f.UpdateTo(It.IsAny<TestEventListener>(), 100))
.Returns(ValueTask.FromResult(new TestEventListener { Tag = new EventListenerVersionTag(100) }));
.Returns(ValueTask.FromResult(new TestEventListener { LastAcceptedEvent = 100, LastSeenEvent = 100, LastSeenTransientEvent = -1 }));

await _listenerCache.UpdateTo(new TestEventListener(), 100);
_listenerCache.Clean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public async Task AppliesTransientEvents()

var updatedReadModel = await _transientFactory.UpdateTo(readModel, -1);

Assert.Equal(0, updatedReadModel.Tag.LastSeenTransient);
Assert.Equal(0, updatedReadModel.LastSeenTransientEvent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ public async Task DoesNotReuseCacheWhenNewEventIsAdded()
{
_policies.Add(new AllowReadAfterEventPolicy());

var unitOfWork = GetUnitOfWork();
await Assert.ThrowsAsync<UnauthorizedAccessException>(async () =>
{
await _unitOfWork.GetReadModel<TestReadModel, int>(1);
await unitOfWork.GetReadModel<TestReadModel, int>(1);
});

await _unitOfWork.Publish(new AllowEvent());
await _unitOfWork.GetReadModel<TestReadModel, int>(1);
await unitOfWork.Publish(new AllowEvent());
await unitOfWork.GetReadModel<TestReadModel, int>(1);
}

private record AllowEvent : Event;
Expand Down
Loading