Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
simaoribeiro committed Nov 10, 2023
1 parent 6ba75d8 commit acfdf81
Showing 12 changed files with 103 additions and 36 deletions.
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/ActivityOperationType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow
{
/// <summary>Activity operation names enum values</summary>
public enum ActivityOperationType
{
/// <summary>PUBLISH</summary>
Publish,

/// <summary>PROCESS</summary>
Process,
}
}
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow
{
using System.Diagnostics;
using System.Reflection;

public static class ActivitySourceAccessor

Check warning on line 6 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'ActivitySourceAccessor'

Check warning on line 6 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

GitHub Actions / Test deployment

{
public const string ActivityString = "otel_activity";

Check warning on line 8 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'ActivitySourceAccessor.ActivityString'

Check failure on line 8 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs#L8

Change this constant to a 'static' read-only property.
public static readonly ActivitySource ActivitySource = new("KafkaFlow.OpenTelemetry", Version);

Check warning on line 9 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'ActivitySourceAccessor.ActivitySource'

Check warning on line 9 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs#L9

Move this field's initializer into a static constructor.
internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
}
}
4 changes: 4 additions & 0 deletions src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
Original file line number Diff line number Diff line change
@@ -7,4 +7,8 @@
<Description>Contains all KafkaFlow extendable interfaces</Description>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -5,21 +5,15 @@ namespace KafkaFlow.OpenTelemetry
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection;
using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions;

internal static class ActivitySourceAccessor
internal static class ActivityAccessor
{
internal const string ActivityString = "otel_activity";
internal const string ExceptionEventKey = "exception";
internal const string MessagingSystemId = "kafka";
internal const string AttributeMessagingOperation = "messaging.operation";
internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key";
internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset";
internal static readonly AssemblyName AssemblyName = typeof(ActivitySourceAccessor).Assembly.GetName();
internal static readonly string ActivitySourceName = AssemblyName.Name;
internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version);

public static void SetGenericTags(Activity activity)
{
26 changes: 12 additions & 14 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
Original file line number Diff line number Diff line change
@@ -20,25 +20,23 @@ public static Task OnConsumeStarted(IMessageContext context)
{
try
{
var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context?.ConsumerContext.Topic} {ProcessString}" : ProcessString;

// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, ExtractTraceContextIntoBasicProperties);
Baggage.Current = parentContext.Baggage;

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);
var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity;

if (parentContext.ActivityContext.IsValid())
{
activity.SetParentId(parentContext.ActivityContext.TraceId, parentContext.ActivityContext.SpanId, parentContext.ActivityContext.TraceFlags);
}

foreach (var item in Baggage.Current)
{
activity?.AddBaggage(item.Key, item.Value);
}

context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);

ActivitySourceAccessor.SetGenericTags(activity);
ActivityAccessor.SetGenericTags(activity);

if (activity != null && activity.IsAllDataRequested)
{
@@ -57,7 +55,7 @@ public static Task OnConsumeCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
activity?.Stop();
activity?.Dispose();
}

return Task.CompletedTask;
@@ -67,7 +65,7 @@ public static Task OnConsumeError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);
var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex);

activity?.AddEvent(exceptionEvent);

@@ -86,11 +84,11 @@ private static void SetConsumerTags(IMessageContext context, Activity activity)
{
var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]);

activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString);
activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Process);
activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic);
activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, messageKey);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, messageKey);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset);
activity.SetTag(AttributeMessagingKafkaSourcePartition, context.ConsumerContext.Partition);
}
}
22 changes: 7 additions & 15 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
Original file line number Diff line number Diff line change
@@ -10,7 +10,6 @@

internal static class OpenTelemetryProducerEventsHandler
{
private const string PublishString = "publish";
private const string AttributeMessagingDestinationName = "messaging.destination.name";
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
@@ -19,12 +18,7 @@ public static Task OnProducerStarted(IMessageContext context)
{
try
{
var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context?.ProducerContext.Topic} {PublishString}" : PublishString;

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Producer);
var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity;

// Depending on Sampling (and whether a listener is registered or not), the
// activity above may not be created.
@@ -34,8 +28,6 @@ public static Task OnProducerStarted(IMessageContext context)

if (activity != null)
{
context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);

contextToInject = activity.Context;
}
else if (Activity.Current != null)
@@ -48,7 +40,7 @@ public static Task OnProducerStarted(IMessageContext context)
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties);

ActivitySourceAccessor.SetGenericTags(activity);
ActivityAccessor.SetGenericTags(activity);

if (activity != null && activity.IsAllDataRequested)
{
@@ -67,7 +59,7 @@ public static Task OnProducerCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
activity?.Stop();
activity?.Dispose();
}

return Task.CompletedTask;
@@ -77,7 +69,7 @@ public static Task OnProducerError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);
var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex);

activity?.AddEvent(exceptionEvent);

@@ -97,11 +89,11 @@ private static void InjectTraceContextIntoBasicProperties(IMessageContext contex

private static void SetProducerTags(IMessageContext context, Activity activity)
{
activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, PublishString);
activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Publish);
activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic);
activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset);
}
}
}
17 changes: 17 additions & 0 deletions src/KafkaFlow/ActivityFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Diagnostics;

namespace KafkaFlow
{
internal class ActivityFactory : IActivityFactory
{
public Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind)
{
var activityName = !string.IsNullOrEmpty(topicName) ? $"{topicName} {activityOperationType}" : activityOperationType.ToString();

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
return ActivitySourceAccessor.ActivitySource.StartActivity(activityName, activityKind);
}
}
}
1 change: 1 addition & 0 deletions src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ public KafkaConfiguration Build()
.AddSingleton<IConsumerAccessor>(new ConsumerAccessor())
.AddSingleton<IConsumerManagerFactory>(new ConsumerManagerFactory())
.AddSingleton<IClusterManagerAccessor, ClusterManagerAccessor>()
.AddSingleton<IActivityFactory, ActivityFactory>()
.AddSingleton(r =>
{
var logHandler = r.Resolve<ILogHandler>();
7 changes: 7 additions & 0 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Consumers
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@@ -14,6 +15,7 @@ internal class ConsumerWorker : IConsumerWorker
private readonly IMiddlewareExecutor middlewareExecutor;
private readonly ILogHandler logHandler;
private readonly GlobalEvents globalEvents;
private readonly IActivityFactory activityFactory;

private readonly Channel<ConsumeResult<byte[], byte[]>> messagesBuffer;

@@ -37,6 +39,7 @@ public ConsumerWorker(
this.logHandler = logHandler;
this.messagesBuffer = Channel.CreateBounded<ConsumeResult<byte[], byte[]>>(consumer.Configuration.BufferSize);
this.globalEvents = this.dependencyResolver.Resolve<GlobalEvents>();
this.activityFactory = this.dependencyResolver.Resolve<IActivityFactory>();
}

public int Id { get; }
@@ -106,6 +109,8 @@ public void OnTaskCompleted(Action handler)

private async Task ProcessMessageAsync(ConsumeResult<byte[], byte[]> message, CancellationToken cancellationToken)
{
var activity = this.activityFactory.Start(message.Topic, ActivityOperationType.Process, ActivityKind.Consumer);

try
{
var context = new MessageContext(
@@ -119,6 +124,8 @@ private async Task ProcessMessageAsync(ConsumeResult<byte[], byte[]> message, Ca
this.Id),
null);

context.Items.Add(ActivitySourceAccessor.ActivityString, activity);

try
{
var scope = this.dependencyResolver.CreateScope();
10 changes: 10 additions & 0 deletions src/KafkaFlow/IActivityFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Diagnostics;

namespace KafkaFlow
{
internal interface IActivityFactory
{

public Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind);
}
}
1 change: 1 addition & 0 deletions src/KafkaFlow/KafkaFlow.csproj
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.1.1" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.1" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
</ItemGroup>

19 changes: 19 additions & 0 deletions src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Producers
{
using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
@@ -12,6 +13,7 @@ internal class MessageProducer : IMessageProducer, IDisposable
private readonly IProducerConfiguration configuration;
private readonly MiddlewareExecutor middlewareExecutor;
private readonly GlobalEvents globalEvents;
private readonly IActivityFactory activityFactory;

private readonly object producerCreationSync = new();

@@ -25,6 +27,7 @@ public MessageProducer(
this.configuration = configuration;
this.middlewareExecutor = new MiddlewareExecutor(configuration.MiddlewaresConfigurations);
this.globalEvents = this.dependencyResolver.Resolve<GlobalEvents>();
this.activityFactory = this.dependencyResolver.Resolve<IActivityFactory>();
}

public string ProducerName => this.configuration.Name;
@@ -40,8 +43,12 @@ public async Task<DeliveryResult<byte[], byte[]>> ProduceAsync(

using var scope = this.dependencyResolver.CreateScope();

var activity = this.activityFactory.Start(topic, ActivityOperationType.Publish, ActivityKind.Producer);

var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers);

this.AddActivityToMessageContext(messageContext, activity);

await this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext));

try
@@ -99,8 +106,12 @@ public void Produce(
{
var scope = this.dependencyResolver.CreateScope();

var activity = this.activityFactory.Start(topic, ActivityOperationType.Publish, ActivityKind.Producer);

var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers);

this.AddActivityToMessageContext(messageContext, activity);

this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext));

this.middlewareExecutor
@@ -354,5 +365,13 @@ private MessageContext CreateMessageContext(
null,
new ProducerContext(topic));
}

private void AddActivityToMessageContext(MessageContext messageContext, Activity activity)
{
if(activity != null)
{
messageContext.Items.Add(ActivitySourceAccessor.ActivityString, activity);
}
}
}
}

0 comments on commit acfdf81

Please sign in to comment.