Skip to content

Commit

Permalink
Health check circuit breaker
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Jan 1, 2025
1 parent 841bdc9 commit 87835fb
Show file tree
Hide file tree
Showing 30 changed files with 435 additions and 297 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck.Consumers;

public class AddConsumer : IConsumer<Add>
{
private readonly ILogger<AddConsumer> _logger;

public AddConsumer(ILogger<AddConsumer> logger)
{
_logger = logger;
}

public class AddConsumer(ILogger<AddConsumer> logger) : IConsumer<Add>
{
public Task OnHandle(Add message, CancellationToken cancellationToken)
{
_logger.LogInformation("{A} + {B} = {C}", message.a, message.b, message.a + message.b);
logger.LogInformation("{A} + {B} = {C}", message.A, message.B, message.A + message.B);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck.Consumers;

public class SubtractConsumer : IConsumer<Subtract>
{
private readonly ILogger<SubtractConsumer> _logger;

public SubtractConsumer(ILogger<SubtractConsumer> logger)
{
_logger = logger;
}

public class SubtractConsumer(ILogger<SubtractConsumer> logger) : IConsumer<Subtract>
{
public Task OnHandle(Subtract message, CancellationToken cancellationToken)
{
_logger.LogInformation("{A} - {B} = {C}", message.a, message.b, message.a - message.b);
logger.LogInformation("{A} - {B} = {C}", message.A, message.B, message.A - message.B);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
global using SlimMessageBus;
global using SlimMessageBus.Host;
global using SlimMessageBus.Host.RabbitMQ;
global using SlimMessageBus.Host.Serialization.SystemTextJson;
global using SlimMessageBus.Host.Serialization.SystemTextJson;
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;

using Microsoft.Extensions.Logging;

public class AddRandomHealthCheck : RandomHealthCheck
public class AddRandomHealthCheck(ILogger<AddRandomHealthCheck> logger) : RandomHealthCheck(logger)
{
public AddRandomHealthCheck(ILogger<AddRandomHealthCheck> logger)
: base(logger)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,12 @@

using Microsoft.Extensions.Diagnostics.HealthChecks;

public abstract class RandomHealthCheck : IHealthCheck
{
private readonly ILogger _logger;

protected RandomHealthCheck(ILogger logger)
{
_logger = logger;
}

public abstract class RandomHealthCheck(ILogger logger) : IHealthCheck
{
public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
var value = (HealthStatus)Random.Shared.Next(3);
_logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", this.GetType(), value);
logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", GetType(), value);
return Task.FromResult(new HealthCheckResult(value, value.ToString()));
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;

using Microsoft.Extensions.Logging;

public class SubtractRandomHealthCheck : RandomHealthCheck
public class SubtractRandomHealthCheck(ILogger<SubtractRandomHealthCheck> logger) : RandomHealthCheck(logger)
{
public SubtractRandomHealthCheck(ILogger<SubtractRandomHealthCheck> logger)
: base(logger)
{
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,20 @@
namespace Sample.CircuitBreaker.HealthCheck;
public class IntermittentMessagePublisher : BackgroundService

public class IntermittentMessagePublisher(ILogger<IntermittentMessagePublisher> logger, IMessageBus messageBus)
: BackgroundService
{
private readonly ILogger _logger;
private readonly IMessageBus _messageBus;

public IntermittentMessagePublisher(ILogger<IntermittentMessagePublisher> logger, IMessageBus messageBus)
{
_logger = logger;
_messageBus = messageBus;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var a = Random.Shared.Next(10);
var b = Random.Shared.Next(10);

//_logger.LogInformation("Emitting {A} +- {B} = ?", a, b);
logger.LogInformation("Emitting {A} +- {B} = ?", a, b);

await Task.WhenAll(
_messageBus.Publish(new Add(a, b)),
_messageBus.Publish(new Subtract(a, b)),
messageBus.Publish(new Add(a, b), cancellationToken: stoppingToken),
messageBus.Publish(new Subtract(a, b), cancellationToken: stoppingToken),
Task.Delay(1000, stoppingToken));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
namespace Sample.CircuitBreaker.HealthCheck.Models;

public record Add(int a, int b);
public record Add(int A, int B);
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
namespace Sample.CircuitBreaker.HealthCheck.Models;

public record Subtract(int a, int b);
public record Subtract(int A, int B);
3 changes: 2 additions & 1 deletion src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck;

using Microsoft.Extensions.Diagnostics.HealthChecks;

using Sample.CircuitBreaker.HealthCheck.HealthChecks;

using SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config;
using SlimMessageBus.Host.CircuitBreaker.HealthCheck;

public static class Program
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

static internal class AbstractConsumerProperties
{
static readonly internal ProviderExtensionProperty<bool> IsPaused = new("CircuitBreaker_IsPaused");
static readonly internal ProviderExtensionProperty<List<IConsumerCircuitBreaker>?> Breakers = new("CircuitBreaker_Breakers");
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config;
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

using Microsoft.Extensions.DependencyInjection.Extensions;

Expand Down Expand Up @@ -32,16 +32,18 @@ public static T PauseOnDegradedHealthCheck<T>(this T builder, params string[] ta

private static void RegisterHealthServices(AbstractConsumerBuilder builder)
{
builder.ConsumerSettings.CircuitBreakers.TryAdd<HealthCheckCircuitBreaker>();
builder.PostConfigurationActions.Add(
services =>
{
services.TryAddSingleton<HealthCheckBackgroundService>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHealthCheckPublisher, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>()));
services.TryAdd(ServiceDescriptor.Singleton<IHealthCheckHostBreaker, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>()));
services.AddHostedService(sp => sp.GetRequiredService<HealthCheckBackgroundService>());

services.TryAddSingleton<HealthCheckCircuitBreaker>();
});
var circuitBreakers = builder.ConsumerSettings.GetOrCreate(ConsumerSettingsProperties.CircuitBreakerTypes, () => []);
circuitBreakers.TryAdd<HealthCheckCircuitBreaker>();

builder.PostConfigurationActions.Add(services =>
{
services.TryAddSingleton<HealthCheckBackgroundService>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHealthCheckPublisher, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>()));
services.TryAdd(ServiceDescriptor.Singleton<IHealthCheckHostBreaker, HealthCheckBackgroundService>(sp => sp.GetRequiredService<HealthCheckBackgroundService>()));
services.AddHostedService(sp => sp.GetRequiredService<HealthCheckBackgroundService>());

services.TryAddTransient<HealthCheckCircuitBreaker>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IAbstractConsumerInterceptor, HealthCheckCircuitBreakerAbstractConsumerInterceptor>());
});
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

static internal class SettingsExtensions
static internal class ConsumerSettingsExtensions
{
private const string _key = nameof(HealthCheckCircuitBreaker);

public static T PauseOnDegraded<T>(this T consumerSettings, params string[] tags)
where T : AbstractConsumerSettings
{
Expand All @@ -15,7 +13,6 @@ public static T PauseOnDegraded<T>(this T consumerSettings, params string[] tags
dict[tag] = HealthStatus.Degraded;
}
}

return consumerSettings;
}

Expand All @@ -30,18 +27,9 @@ public static T PauseOnUnhealthy<T>(this T consumerSettings, params string[] tag
dict[tag] = HealthStatus.Unhealthy;
}
}

return consumerSettings;
}

static internal IDictionary<string, HealthStatus> HealthBreakerTags(this AbstractConsumerSettings consumerSettings)
{
if (!consumerSettings.Properties.TryGetValue(_key, out var rawValue) || rawValue is not IDictionary<string, HealthStatus> value)
{
value = new Dictionary<string, HealthStatus>();
consumerSettings.Properties[_key] = value;
}

return value;
}
static internal IDictionary<string, HealthStatus> HealthBreakerTags(this AbstractConsumerSettings consumerSettings)
=> consumerSettings.GetOrCreate(ConsumerSettingsProperties.HealthStatusTags, () => []);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

static internal class ConsumerSettingsProperties
{
/// <summary>
/// <see cref="IConsumerCircuitBreaker"/> to be used with the consumer.
/// </summary>
static readonly internal ProviderExtensionProperty<TypeCollection<IConsumerCircuitBreaker>> CircuitBreakerTypes = new("CircuitBreakerTypes");

static readonly internal ProviderExtensionProperty<Dictionary<string, HealthStatus>> HealthStatusTags = new("CircuitBreaker_HealthStatusTags");
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ public async Task PublishAsync(HealthReport report, CancellationToken cancellati
}
}

public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task StartAsync(CancellationToken cancellationToken)
=> Task.CompletedTask;

public Task StopAsync(CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

internal sealed class HealthCheckCircuitBreaker : IConsumerCircuitBreaker
{
private readonly IEnumerable<AbstractConsumerSettings> _settings;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

internal sealed class HealthCheckCircuitBreakerAbstractConsumerInterceptor : IAbstractConsumerInterceptor
{
public int Order => 100;

public async Task<bool> CanStart(AbstractConsumer consumer)
{
var breakerTypes = consumer.Settings.SelectMany(x => x.GetOrDefault(ConsumerSettingsProperties.CircuitBreakerTypes, [])).ToHashSet();
if (breakerTypes.Count == 0)
{
// no breakers, allow to pass
return true;
}

var breakers = consumer.GetOrCreate(AbstractConsumerProperties.Breakers, () => [])!;

async Task BreakerChanged(Circuit state)
{
if (!consumer.IsStarted)
{
return;
}

var isPaused = consumer.IsPaused();
var shouldPause = state == Circuit.Closed || breakers.Exists(x => x.State == Circuit.Closed);
if (shouldPause != isPaused)
{
var settings = consumer.Settings.Count > 0 ? consumer.Settings[0] : null;
var path = settings?.Path ?? "[unknown path]";
var bus = settings?.MessageBusSettings?.Name ?? "default";
if (shouldPause)
{
consumer.Logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus);
await consumer.DoStop().ConfigureAwait(false);
}
else
{
consumer.Logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus);
await consumer.DoStart().ConfigureAwait(false);
}
consumer.SetIsPaused(shouldPause);
}
}

var sp = consumer.Settings.Select(x => x.MessageBusSettings.ServiceProvider).First(x => x != null);
foreach (var breakerType in breakerTypes)
{
var breaker = (IConsumerCircuitBreaker)ActivatorUtilities.CreateInstance(sp, breakerType, consumer.Settings);
breakers.Add(breaker);

await breaker.Subscribe(BreakerChanged);
}

var isPaused = breakers.Exists(x => x.State == Circuit.Closed);
consumer.SetIsPaused(isPaused);
return !isPaused;
}

public async Task<bool> CanStop(AbstractConsumer consumer)
{
var breakers = consumer.GetOrDefault(AbstractConsumerProperties.Breakers, null);
if (breakers == null || breakers.Count == 0)
{
// no breakers, allow to pass
return true;
}

foreach (var breaker in breakers)
{
breaker.Unsubscribe();

if (breaker is IAsyncDisposable asyncDisposable)
{
await asyncDisposable.DisposeAsync();
}
else if (breaker is IDisposable disposable)
{
disposable.Dispose();
}
}
breakers.Clear();

return !consumer.IsPaused();
}
}

internal static class AbstractConsumerExtensions
{
public static bool IsPaused(this AbstractConsumer consumer) => consumer.GetOrDefault(AbstractConsumerProperties.IsPaused, false);
public static void SetIsPaused(this AbstractConsumer consumer, bool isPaused) => AbstractConsumerProperties.IsPaused.Set(consumer, isPaused);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus;

namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;

/// <summary>
/// Circuit breaker to toggle consumer status on an external event.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@
</ItemGroup>

<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>DynamicProxyGenAssembly2</_Parameter1>
</AssemblyAttribute>
<InternalsVisibleTo Include="SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test"/>
<InternalsVisibleTo Include="DynamicProxyGenAssembly2"/>
</ItemGroup>

</Project>
Loading

0 comments on commit 87835fb

Please sign in to comment.