Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/3.0 #463

Merged
merged 20 commits into from
Nov 27, 2023
Merged
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
e42417d
chore!: upgrade to .net 6
jose-sousa-8 Jul 29, 2023
682c842
fix!: update dashboard and client app routes to kafkaflow
jose-sousa-8 Jul 26, 2023
9c43c28
feat!: async support on message type and schema registry resolvers
jose-sousa-8 Sep 12, 2023
ce60755
feat: enable workers count calculation in runtime
filipeesch Jul 4, 2023
efd9f8b
refactor: core simplification and better thread synchronization
filipeesch Jul 11, 2023
619cbee
feat: creates worker context and worker events
filipeesch Jul 13, 2023
c15b0b2
feat: creates the ConsumerLagWorkerBalancer
filipeesch Sep 13, 2023
0eb5c6c
refactor: rename StoreOffset to Complete
filipeesch Sep 13, 2023
dea630c
refactor: remove unused namespaces
filipeesch Oct 2, 2023
569fb99
docs: add dynamic workers documentation
filipeesch Sep 19, 2023
8c01d8c
feat: add event notification feature
ailtonguitar Oct 3, 2023
fc8a0b3
refactor!: merge projects into core framework
jose-sousa-8 Sep 19, 2023
e51f120
feat: adapt open telemetry to release 3.0
simaoribeiro Oct 23, 2023
e7a2e37
feat: evolve worker distribution strategy
filipeesch Oct 10, 2023
e64e9fc
docs: update documentation with changes from v3
joelfoliveira Nov 22, 2023
4e46c35
docs: added migration guide from v2 to v3
joelfoliveira Nov 20, 2023
f3366d0
refactor: adopt common .NET conventions
JoaoRodriguesGithub Nov 21, 2023
f89ac3d
docs: add v2 documentation to website docs
joelfoliveira Nov 23, 2023
3bee56d
docs: fix versioned docs routing
Nov 27, 2023
c5417f9
docs: update v2 documentation
joelfoliveira Nov 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: creates worker context and worker events
  • Loading branch information
filipeesch authored and joelfoliveira committed Nov 27, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 619cbee1e1033dec60a1f398d488e8db4778c645
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace KafkaFlow
{
using System;
using KafkaFlow.Configuration;
using KafkaFlow.Observer;

/// <summary>
/// Represents the interface of a internal worker
@@ -17,5 +19,15 @@ public interface IWorker
/// </summary>
/// <param name="handler"><see cref="Action"/> to be executed</param>
void OnTaskCompleted(Action handler);

/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppingSubject, VoidObject> WorkerStopping { get; }

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppedSubject, VoidObject> WorkerStopped { get; }
}
}
19 changes: 19 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace KafkaFlow
{
using KafkaFlow.Observer;

/// <summary>
/// Represents a subject specific to worker stopped events where observers can subscribe to receive notifications.
/// </summary>
public class WorkerStoppedSubject : Subject<WorkerStoppedSubject, VoidObject>
{
/// <summary>
/// Initializes a new instance of the <see cref="WorkerStoppedSubject"/> class.
/// </summary>
/// <param name="logHandler">The log handler object to be used</param>
public WorkerStoppedSubject(ILogHandler logHandler)
: base(logHandler)
{
}
}
}
19 changes: 19 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace KafkaFlow
{
using KafkaFlow.Observer;

/// <summary>
/// Represents a subject specific to worker stopping events where observers can subscribe to receive notifications.
/// </summary>
public class WorkerStoppingSubject : Subject<WorkerStoppingSubject, VoidObject>
{
/// <summary>
/// Initializes a new instance of the <see cref="WorkerStoppingSubject"/> class.
/// </summary>
/// <param name="logHandler">The log handler object to be used</param>
public WorkerStoppingSubject(ILogHandler logHandler)
: base(logHandler)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -65,6 +65,35 @@ public static IDependencyConfigurator AddSingleton<TService>(
InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(this IDependencyConfigurator configurator)
where TService : class
{
return configurator.Add<TService>(InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <param name="factory">A factory to create new instances of the service implementation</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(
this IDependencyConfigurator configurator,
Func<IDependencyResolver, TService> factory)
{
return configurator.Add(
typeof(TService),
factory,
InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a transient type mapping
/// </summary>
17 changes: 17 additions & 0 deletions src/KafkaFlow.Abstractions/Observer/ISubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace KafkaFlow.Observer
{
/// <summary>
/// Represents a subject in the observer design pattern that can be observed by observers.
/// </summary>
/// <typeparam name="TSubject">The type of the subject.</typeparam>
/// <typeparam name="TArg">An argument type that will be passed to the observers</typeparam>
public interface ISubject<TSubject, TArg>
where TSubject : Subject<TSubject, TArg>
{
/// <summary>
/// Subscribes an observer to the subject.
/// </summary>
/// <param name="observer">The observer to subscribe.</param>
void Subscribe(ISubjectObserver<TSubject, TArg> observer);
}
}
18 changes: 18 additions & 0 deletions src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace KafkaFlow.Observer
{
using System.Threading.Tasks;

/// <summary>
/// Represents an observer in the observer design pattern that can receive notifications from a subject.
/// </summary>
/// <typeparam name="TSubject">The type of the subject.</typeparam>
/// <typeparam name="TArg">An argument type that will be passed to the observers</typeparam>
public interface ISubjectObserver<in TSubject, in TArg>
{
/// <summary>
/// Called when a notification is received from the subject.
/// </summary>
/// <returns>A task representing the asynchronous notification handling.</returns>
Task OnNotification(TSubject subject, TArg arg);
}
}
53 changes: 53 additions & 0 deletions src/KafkaFlow.Abstractions/Observer/Subject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
namespace KafkaFlow.Observer
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// A generic implementation that should be extended to help the use of the notification system.
/// </summary>
/// <typeparam name="TSubject">The type of the subject.</typeparam>
/// <typeparam name="TArg">An argument type that will be passed to the observers</typeparam>
public abstract class Subject<TSubject, TArg> : ISubject<TSubject, TArg>
where TSubject : Subject<TSubject, TArg>
{
private readonly ILogHandler logHandler;
private readonly List<ISubjectObserver<TSubject, TArg>> observers = new();

/// <summary>
/// Initializes a new instance of the <see cref="Subject{TSubject, TArg}"/> class.
/// </summary>
/// <param name="logHandler">The log handler object to be used</param>
protected Subject(ILogHandler logHandler)
{
this.logHandler = logHandler;
}

/// <summary>
/// Subscribes an observer to the subject, allowing it to receive notifications.
/// </summary>
/// <param name="observer">The observer to subscribe.</param>
public void Subscribe(ISubjectObserver<TSubject, TArg> observer) => this.observers.Add(observer);

/// <summary>
/// Notifies all subscribed observers asynchronously.
/// </summary>
/// <param name="arg">The parameter passed by the client.</param>
/// <returns>A task representing the asynchronous notification operation.</returns>
public async Task NotifyAsync(TArg arg)
{
foreach (var observer in this.observers)
{
try
{
await observer.OnNotification((TSubject)this, arg);
}
catch (Exception e)
{
this.logHandler.Error("Error notifying observer", e, new { Subject = this.GetType().Name });
}
}
}
}
}
17 changes: 17 additions & 0 deletions src/KafkaFlow.Abstractions/VoidObject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace KafkaFlow;

/// <summary>
/// A type that represents an empty object that should be ignored
/// </summary>
public class VoidObject
{
/// <summary>
/// Gets the unique instance value
/// </summary>
public static readonly VoidObject Value = new();

private VoidObject()
{
// Empty
}
}
4 changes: 3 additions & 1 deletion src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ namespace KafkaFlow.BatchConsume
using System;
using System.Collections.Generic;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;

/// <summary>
/// no needed
@@ -23,6 +24,7 @@ public static IConsumerMiddlewareConfigurationBuilder BatchConsume(
{
return builder.Add(
resolver => new BatchConsumeMiddleware(
resolver.Resolve<IWorkerLifetimeContext>(),
batchSize,
batchTimeout,
resolver.Resolve<ILogHandler>()),
@@ -38,7 +40,7 @@ public static IReadOnlyCollection<IMessageContext> GetMessagesBatch(this IMessag
{
if (context is BatchConsumeMessageContext ctx)
{
return (IReadOnlyCollection<IMessageContext>) ctx.Message.Value;
return (IReadOnlyCollection<IMessageContext>)ctx.Message.Value;
}

throw new InvalidOperationException($"This method can only be used on {nameof(BatchConsumeMessageContext)}");
78 changes: 53 additions & 25 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
@@ -5,19 +5,28 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable, IAsyncDisposable
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using KafkaFlow.Observer;

internal class BatchConsumeMiddleware
: IMessageMiddleware,
ISubjectObserver<WorkerStoppedSubject, VoidObject>,
IDisposable
{
private readonly SemaphoreSlim dispatchSemaphore = new(1, 1);

private readonly int batchSize;
private readonly TimeSpan batchTimeout;
private readonly ILogHandler logHandler;
private readonly IConsumerConfiguration consumerConfiguration;

private readonly List<IMessageContext> batch;
private CancellationTokenSource dispatchTokenSource;
private Task<Task> dispatchTask;

public BatchConsumeMiddleware(
IWorkerLifetimeContext workerContext,
int batchSize,
TimeSpan batchTimeout,
ILogHandler logHandler)
@@ -26,6 +35,9 @@ public BatchConsumeMiddleware(
this.batchTimeout = batchTimeout;
this.logHandler = logHandler;
this.batch = new(batchSize);
this.consumerConfiguration = workerContext.Consumer.Configuration;

workerContext.Worker.WorkerStopped.Subscribe(this);
}

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
@@ -35,49 +47,62 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
try
{
context.ConsumerContext.ShouldStoreOffset = false;
context.ConsumerContext.WorkerStopped.ThrowIfCancellationRequested();

this.batch.Add(context);

if (this.batch.Count == 1)
{
this.dispatchTokenSource = CancellationTokenSource.CreateLinkedTokenSource(context.ConsumerContext.WorkerStopped);

this.dispatchTokenSource.CancelAfter(this.batchTimeout);

this.dispatchTokenSource.Token.Register(
async _ =>
{
this.dispatchTokenSource.Dispose();
await this.DispatchAsync(context, next);
},
null);
}

if (this.batch.Count >= this.batchSize)
{
this.dispatchTokenSource.Cancel();
this.ScheduleExecution(context, next);
return;
}
}
finally
{
this.dispatchSemaphore.Release();
}

if (this.batch.Count >= this.batchSize)
{
await this.TriggerDispatchAndWaitAsync();
}
}

public async ValueTask DisposeAsync()
public async Task OnNotification(WorkerStoppedSubject subject, VoidObject arg) => await this.TriggerDispatchAndWaitAsync();

public void Dispose()
{
this.dispatchTokenSource.Dispose();
this.dispatchTask?.Dispose();
this.dispatchTokenSource?.Dispose();
this.dispatchSemaphore.Dispose();
}

private async Task TriggerDispatchAndWaitAsync()
{
await this.dispatchSemaphore.WaitAsync();
this.dispatchSemaphore.Dispose();
this.dispatchTokenSource?.Cancel();
this.dispatchSemaphore.Release();

await (this.dispatchTask ?? Task.CompletedTask);
}

public void Dispose() => this.DisposeAsync().GetAwaiter().GetResult();
private void ScheduleExecution(IMessageContext context, MiddlewareDelegate next)
{
this.dispatchTokenSource = CancellationTokenSource.CreateLinkedTokenSource(context.ConsumerContext.WorkerStopped);

this.dispatchTask = Task
.Delay(this.batchTimeout, this.dispatchTokenSource.Token)
.ContinueWith(
_ => this.DispatchAsync(context, next),
CancellationToken.None);
}

private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate next)
{
await this.dispatchSemaphore.WaitAsync();

this.dispatchTokenSource.Dispose();
this.dispatchTokenSource = null;

var localBatch = this.batch.ToList();

try
@@ -113,9 +138,12 @@ private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate nex
this.dispatchSemaphore.Release();
}

foreach (var messageContext in localBatch)
if (this.consumerConfiguration.AutoStoreOffsets)
{
messageContext.ConsumerContext.StoreOffset();
foreach (var messageContext in localBatch)
{
messageContext.ConsumerContext.StoreOffset();
}
}
}
}
Loading