diff --git a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
index e20bf6258..03744d531 100644
--- a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
@@ -136,19 +136,19 @@ IConsumerConfigurationBuilder WithWorkersCount(
///
/// Sets the strategy to choose a worker when a message arrives
///
- /// A class that implements the interface
+ /// A class that implements the interface
/// A factory to create the instance
///
IConsumerConfigurationBuilder WithWorkDistributionStrategy(Factory factory)
- where T : class, IDistributionStrategy;
+ where T : class, IWorkerDistributionStrategy;
///
/// Sets the strategy to choose a worker when a message arrives
///
- /// A class that implements the interface
+ /// A class that implements the interface
///
IConsumerConfigurationBuilder WithWorkDistributionStrategy()
- where T : class, IDistributionStrategy;
+ where T : class, IWorkerDistributionStrategy;
///
/// Configures the consumer for manual message completion.
diff --git a/src/KafkaFlow.Abstractions/IDistributionStrategy.cs b/src/KafkaFlow.Abstractions/IDistributionStrategy.cs
deleted file mode 100644
index 591ade02d..000000000
--- a/src/KafkaFlow.Abstractions/IDistributionStrategy.cs
+++ /dev/null
@@ -1,26 +0,0 @@
-namespace KafkaFlow
-{
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Tasks;
-
- ///
- /// An interface used to create a distribution strategy
- ///
- public interface IDistributionStrategy
- {
- ///
- /// Initializes the distribution strategy, this method is called when a consumer is started
- ///
- /// List of workers to be initialized
- void Init(IReadOnlyList workers);
-
- ///
- /// Gets an available worker to process the message
- ///
- /// Message partition key
- /// A that is cancelled when the consumers stops
- ///
- Task GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken);
- }
-}
diff --git a/src/KafkaFlow.Abstractions/IWorkerDistributionStrategy.cs b/src/KafkaFlow.Abstractions/IWorkerDistributionStrategy.cs
new file mode 100644
index 000000000..bd5a2fee6
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/IWorkerDistributionStrategy.cs
@@ -0,0 +1,23 @@
+namespace KafkaFlow;
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+///
+/// An interface used to create a distribution strategy
+///
+public interface IWorkerDistributionStrategy
+{
+ ///
+ /// Initializes the distribution strategy, this method is called when a consumer is started
+ ///
+ /// List of workers to be initialized
+ void Init(IReadOnlyList workers);
+
+ ///
+ /// Retrieves an available worker based on the provided distribution strategy context.
+ ///
+ /// The distribution strategy context containing message and consumer details.
+ /// The selected instance.
+ ValueTask GetWorkerAsync(WorkerDistributionStrategy context);
+}
diff --git a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
index 50e3dd839..283ef9c6b 100644
--- a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
+++ b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
@@ -7,4 +7,9 @@
Contains all KafkaFlow extendable interfaces
+
+
+
+
+
diff --git a/src/KafkaFlow.Abstractions/WorkerDistributionStrategy.cs b/src/KafkaFlow.Abstractions/WorkerDistributionStrategy.cs
new file mode 100644
index 000000000..3cf2c1449
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/WorkerDistributionStrategy.cs
@@ -0,0 +1,57 @@
+namespace KafkaFlow;
+
+using System;
+using System.Threading;
+
+///
+/// Represents a strategy context for distributing workers based on specific message and consumer details.
+///
+public ref struct WorkerDistributionStrategy
+{
+ ///
+ /// Initializes a new instance of the struct.
+ ///
+ /// Name of the consumer.
+ /// Topic associated with the message.
+ /// Partition of the topic.
+ /// Raw key of the message.
+ /// A cancellation token that is cancelled when the consumer has stopped
+ public WorkerDistributionStrategy(
+ string consumerName,
+ string topic,
+ int partition,
+ ReadOnlyMemory? rawMessageKey,
+ CancellationToken consumerStoppedCancellationToken)
+ {
+ this.ConsumerName = consumerName;
+ this.Topic = topic;
+ this.Partition = partition;
+ this.RawMessageKey = rawMessageKey;
+ this.ConsumerStoppedCancellationToken = consumerStoppedCancellationToken;
+ }
+
+ ///
+ /// Gets the name of the consumer.
+ ///
+ public string ConsumerName { get; }
+
+ ///
+ /// Gets the topic associated with the message.
+ ///
+ public string Topic { get; }
+
+ ///
+ /// Gets the partition number of the topic.
+ ///
+ public int Partition { get; }
+
+ ///
+ /// Gets the raw key of the message.
+ ///
+ public ReadOnlyMemory? RawMessageKey { get; }
+
+ ///
+ /// Gets the cancellation token that is cancelled when the consumer has stopped
+ ///
+ public CancellationToken ConsumerStoppedCancellationToken { get; }
+}
diff --git a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
index a74de9d43..01eca3090 100644
--- a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
+++ b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
@@ -20,7 +20,7 @@ public ConsumerConfiguration(
TimeSpan workersCountEvaluationInterval,
int bufferSize,
TimeSpan workerStopTimeout,
- Factory distributionStrategyFactory,
+ Factory distributionStrategyFactory,
IReadOnlyList middlewaresConfigurations,
bool autoMessageCompletion,
bool noStoreOffsets,
@@ -69,7 +69,7 @@ public ConsumerConfiguration(
"The value must be greater than 0");
}
- public Factory DistributionStrategyFactory { get; }
+ public Factory DistributionStrategyFactory { get; }
public IReadOnlyList MiddlewaresConfigurations { get; }
diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
index e1c027151..8bde7f3b9 100644
--- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
+++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
@@ -36,7 +36,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
private ConsumerInitialState initialState = ConsumerInitialState.Running;
private int statisticsInterval;
- private Factory distributionStrategyFactory = _ => new BytesSumDistributionStrategy();
+ private Factory distributionStrategyFactory = _ => new BytesSumDistributionStrategy();
private TimeSpan autoCommitInterval = TimeSpan.FromSeconds(5);
private ConsumerCustomFactory customFactory = (consumer, _) => consumer;
@@ -158,14 +158,14 @@ public IConsumerConfigurationBuilder WithWorkerStopTimeout(TimeSpan timeout)
}
public IConsumerConfigurationBuilder WithWorkDistributionStrategy(Factory factory)
- where T : class, IDistributionStrategy
+ where T : class, IWorkerDistributionStrategy
{
this.distributionStrategyFactory = factory;
return this;
}
public IConsumerConfigurationBuilder WithWorkDistributionStrategy()
- where T : class, IDistributionStrategy
+ where T : class, IWorkerDistributionStrategy
{
this.DependencyConfigurator.AddTransient();
this.distributionStrategyFactory = resolver => resolver.Resolve();
diff --git a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
index 4099867b9..21f5b6896 100644
--- a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
+++ b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
@@ -13,7 +13,7 @@ public interface IConsumerConfiguration
///
/// Gets the consumer worker distribution strategy
///
- Factory DistributionStrategyFactory { get; }
+ Factory DistributionStrategyFactory { get; }
///
/// Gets the consumer middlewares configurations
diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
index 5774d2c9e..aeeb8e0f5 100644
--- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
+++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
@@ -14,7 +14,7 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool
private readonly IDependencyResolver consumerDependencyResolver;
private readonly IMiddlewareExecutor middlewareExecutor;
private readonly ILogHandler logHandler;
- private readonly Factory distributionStrategyFactory;
+ private readonly Factory distributionStrategyFactory;
private readonly IOffsetCommitter offsetCommitter;
private readonly Event workerPoolStoppedSubject;
@@ -22,7 +22,7 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool
private TaskCompletionSource