Skip to content

Commit

Permalink
[Core] Incorporate CancellationToken and IConsumerContext to the Cons…
Browse files Browse the repository at this point in the history
…umer interfaces #246

Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Oct 6, 2024
1 parent 2239152 commit e20f496
Show file tree
Hide file tree
Showing 85 changed files with 1,223 additions and 638 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ Another service (or application layer) handles the message:
```cs
public class SomeMessageConsumer : IConsumer<SomeMessage>
{
public async Task OnHandle(SomeMessage message)
public async Task OnHandle(SomeMessage message, CancellationToken cancellationToken)
{
// handle the message
}
Expand All @@ -134,7 +134,7 @@ The receiving side handles the request and replies:
```cs
public class SomeRequestHandler : IRequestHandler<SomeRequest, SomeResponse>
{
public async Task<SomeResponse> OnHandle(SomeRequest request)
public async Task<SomeResponse> OnHandle(SomeRequest request, CancellationToken cancellationToken)
{
// handle the request message and return a response
return new SomeResponse { /* ... */ };
Expand Down Expand Up @@ -213,7 +213,7 @@ The domain event handler implements the `IConsumer<T>` interface:
// domain event handler
public class OrderSubmittedHandler : IConsumer<OrderSubmittedEvent>
{
public Task OnHandle(OrderSubmittedEvent e)
public Task OnHandle(OrderSubmittedEvent e, CancellationToken cancellationToken)
{
// ...
}
Expand Down
78 changes: 55 additions & 23 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
- [Consume the request message (the request handler)](#consume-the-request-message-the-request-handler)
- [Request without response](#request-without-response)
- [Static accessor](#static-accessor)
- [Dependency resolver](#dependency-resolver)
- [Dependency Resolver](#dependency-resolver)
- [Dependency auto-registration](#dependency-auto-registration)
- [ASP.Net Core](#aspnet-core)
- [Modularization of configuration](#modularization-of-configuration)
Expand Down Expand Up @@ -152,6 +152,9 @@ await bus.Publish(msg);

// OR delivered to the specified topic (or queue)
await bus.Publish(msg, "other-topic");

// pass cancellation token
await bus.Publish(msg, cancellationToken: ct);
```

> The transport plugins might introduce additional configuration options. Please check the relevant provider docs. For example, Azure Service Bus, Azure Event Hub and Kafka allow setting the partitioning key for a given message type.
Expand Down Expand Up @@ -180,7 +183,7 @@ mbb
})
```

Finally, it is possible to specify a headers modifier for the entire bus:
Finally, it is possible to specify a headers modifier for the entire bus (it will apply to all outgoing messages):

```cs
mbb
Expand All @@ -200,7 +203,7 @@ mbb.Consume<SomeMessage>(x => x
.WithConsumer<SomeConsumer>() // (1)
// if you do not want to implement the IConsumer<T> interface
// .WithConsumer<AddCommandConsumer>(nameof(AddCommandConsumer.MyHandleMethod)) // (2) uses reflection
// .WithConsumer<AddCommandConsumer>((consumer, message) => consumer.MyHandleMethod(message)) // (3) uses a delegate
// .WithConsumer<AddCommandConsumer>((consumer, message, consumerContext, cancellationToken) => consumer.MyHandleMethod(message)) // (3) uses a delegate
.Instances(1)
//.KafkaGroup("some-consumer-group")) // Kafka provider specific extensions
```
Expand All @@ -210,7 +213,7 @@ When the consumer implements the `IConsumer<SomeMessage>` interface:
```cs
public class SomeConsumer : IConsumer<SomeMessage>
{
public async Task OnHandle(SomeMessage msg)
public async Task OnHandle(SomeMessage msg, CancellationToken cancellationToken)
{
// handle the msg
}
Expand All @@ -221,7 +224,7 @@ The `SomeConsumer` needs to be registered in the DI container. The SMB runtime w

> When `.WithConsumer<TConsumer>()` is not declared, then a default consumer of type `IConsumer<TMessage>` will be assumed (since v2.0.0).

Alternatively, if you do not want to implement the `IConsumer<SomeMessage>`, then you can provide the method name (2) or a delegate that calls the consumer method (3).
Alternatively, if you do not want to implement the `IConsumer<SomeMessage>`, then you can provide the method name _(2)_ or a delegate that calls the consumer method _(3)_.
`IConsumerContext` and/or `CancellationToken` can optionally be included as parameters to be populated on invocation when taking this approach:

```cs
Expand Down Expand Up @@ -261,27 +264,49 @@ await consumerControl.Stop();

#### Consumer context (additional message information)

> Changed in version 1.15.0

The consumer can access the [`IConsumerContext`](../src/SlimMessageBus/IConsumerContext.cs) object which:

- allows to access additional message information - topic (or queue) name the message arrived on, headers, cancellation token,
- enable the transport provider to pass additional message information specific to the chosen transport.

Examples of such transport specific information are the Azure Service Bus UserProperties, or Kafka Topic-Partition offset.

To use it the consumer has to implement the [`IConsumerWithContext`](../src/SlimMessageBus/IConsumerWithContext.cs) interface:
The recommended (and newer) approach is to define a consumer type that implements `IConsumer<IConsumerContext<TMessage>>`.
For example:

```cs
// The consumer wraps the message type in IConsumerContext<T>
public class PingConsumer : IConsumer<IConsumerContext<PingMessage>>
{
public Task OnHandle(IConsumerContext<PingMessage> context, CancellationToken cancellationToken)
{
var message = context.Message; // the message (here PingMessage)
var topic = context.Path; // the topic or queue name
var headers = context.Headers; // message headers
// Kafka transport specific extension (requires SlimMessageBus.Host.Kafka package):
var transportMessage = context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
}
}

// To declare the consumer type use the .WithConsumerOfContext<TConsumer>() method
mbb.Consume<SomeMessage>(x => x
.Topic("some-topic")
.WithConsumerOfContext<PingConsumer>()
);
```

The other approach is for the consumer to implement the [`IConsumerWithContext`](../src/SlimMessageBus/IConsumerWithContext.cs) interface:

```cs
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
public Task OnHandle(PingMessage message, CancellationToken cancellationToken)
{
var topic = Context.Path; // the topic or queue name
var headers = Context.Headers; // message headers
var cancellationToken = Context.CancellationToken;
// Kafka transport specific extension (requires SlimMessageBus.Host.Kafka package):
var transportMessage = Context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
Expand Down Expand Up @@ -457,7 +482,7 @@ The request handling micro-service needs to have a handler that implements `IReq
```cs
public class SomeRequestHandler : IRequestHandler<SomeRequest, SomeResponse>
{
public async Task<SomeResponse> OnHandle(SomeRequest request)
public async Task<SomeResponse> OnHandle(SomeRequest request, CancellationToken cancellationToken)
{
// handle the request
return new SomeResponse();
Expand Down Expand Up @@ -497,7 +522,7 @@ public class SomeRequest : IRequest
// The handler has to use IRequestHandler<T> interface
public class SomeRequestHandler : IRequestHandler<SomeRequest>
{
public async Task OnHandle(SomeRequest request)
public async Task OnHandle(SomeRequest request, CancellationToken cancellationToken)
{
// no response returned
}
Expand Down Expand Up @@ -530,7 +555,7 @@ This allows to easily look up the `IMessageBus` instance in the domain model lay
See [`DomainEvents`](../src/Samples/Sample.DomainEvents.WebApi/Startup.cs#L79) sample it works per-request scope and how to use it for domain events.

## Dependency resolver
## Dependency Resolver

SMB uses the [`Microsoft.Extensions.DependencyInjection`](https://www.nuget.org/packages/Microsoft.Extensions.DependencyInjection) container to obtain and manage instances of the declared consumers (class instances that implement `IConsumer<>` or `IRequestHandler<>`) or interceptors.

Expand Down Expand Up @@ -623,9 +648,12 @@ services.AddSlimMessageBus(mbb =>

> Since version 2.0.0

The SMB bus configuration can be split into modules. This allows to keep the bus configuration alongside the relevant application module (or layer).
The SMB bus configuration can be split into modules. This allows to keep the bus configuration alongside the relevant application module (or layer):

The `services.AddSlimMessageBus(mbb => { })` can be called multiple times. The end result will be a sum of the configurations (the supplied `MessageBusBuilder` instance will be the same). Consider the example:
- The `services.AddSlimMessageBus(mbb => { })` can be called multiple times.
- The end result will be a sum of the configurations (the supplied `MessageBusBuilder` instance will be the same).

Consider the example:

```cs
// Module 1
Expand Down Expand Up @@ -653,7 +681,8 @@ services.AddSlimMessageBus(mbb =>
});
```

Before version 2.0.0 there was support for modularity using `IMessageBusConfigurator` implementation. However, the interface was deprecated in favor of the `AddSlimMessageBus()` extension method that was made additive.
Before version 2.0.0 there was support for modularity using `IMessageBusConfigurator` implementation.
However, the interface was deprecated in favor of the `AddSlimMessageBus()` extension method that was made additive.

### Auto registration of consumers and interceptors

Expand Down Expand Up @@ -716,12 +745,12 @@ mbb.Produce<OrderEvent>(x => x.DefaultTopic("events"));
public class CustomerEventConsumer : IConsumer<CustomerEvent>
{
public Task OnHandle(CustomerEvent e) { }
public Task OnHandle(CustomerEvent e, CancellationToken cancellationToken) { }
}

public class OrderEventConsumer : IConsumer<OrderEvent>
{
public Task OnHandle(OrderEvent e) { }
public Task OnHandle(OrderEvent e, CancellationToken cancellationToken) { }
}

// which consume from the same topic
Expand Down Expand Up @@ -796,12 +825,12 @@ Given the following consumers:
```cs
public class CustomerEventConsumer : IConsumer<CustomerEvent>
{
public Task OnHandle(CustomerEvent e) { }
public Task OnHandle(CustomerEvent e, CancellationToken cancellationToken) { }
}

public class CustomerCreatedEventConsumer : IConsumer<CustomerCreatedEvent>
{
public Task OnHandle(CustomerCreatedEvent e) { }
public Task OnHandle(CustomerCreatedEvent e, CancellationToken cancellationToken) { }
}
```

Expand Down Expand Up @@ -1081,8 +1110,11 @@ For example, Apache Kafka requires `mbb.KafkaGroup(string)` for consumers to dec
Providers:

- [Apache Kafka](provider_kafka.md)
- [Azure Service Bus](provider_azure_servicebus.md)
- [Azure Event Hubs](provider_azure_eventhubs.md)
- [Redis](provider_redis.md)
- [Azure Service Bus](provider_azure_servicebus.md)
- [Hybrid](provider_hybrid.md)
- [MQTT](provider_mqtt.md)
- [Memory](provider_memory.md)
- [Hybrid](provider_hybrid.md)
- [RabbitMQ](provider_rabbitmq.md)
- [Redis](provider_redis.md)
- [SQL](provider_sql.md)
Loading

0 comments on commit e20f496

Please sign in to comment.