Skip to content

Commit

Permalink
feat: upgrade exception tags in otel (#476)
Browse files Browse the repository at this point in the history
Change how the exceptions are being recorded in otel
activities to be in accordance with the official documentation.

Make the KafkaFlow ActivitySource name public so the users of
manual instrumentation can easily refer to without having
to type it.
  • Loading branch information
simaoribeiro authored Nov 22, 2023
1 parent 21db5e3 commit 06fcddf
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 29 deletions.
5 changes: 3 additions & 2 deletions src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Middlewares;
using KafkaFlow.IntegrationTests.Core.Producers;
using KafkaFlow.OpenTelemetry;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -45,7 +46,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow.OpenTelemetry")
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
.AddInMemoryExporter(this.exportedItems)
.Build();

Expand Down Expand Up @@ -78,7 +79,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
var baggageValue2 = "TestBaggageValue2";

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow.OpenTelemetry")
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
.AddSource(kafkaFlowTestString)
.AddInMemoryExporter(this.exportedItems)
.Build();
Expand Down
24 changes: 3 additions & 21 deletions src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,22 @@

namespace KafkaFlow.OpenTelemetry
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection;
using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions;

internal static class ActivitySourceAccessor
{
internal const string ActivityString = "otel_activity";
internal const string ExceptionEventKey = "exception";
internal const string MessagingSystemId = "kafka";
internal const string AttributeMessagingOperation = "messaging.operation";
internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key";
internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset";
internal static readonly AssemblyName AssemblyName = typeof(ActivitySourceAccessor).Assembly.GetName();
internal static readonly string ActivitySourceName = AssemblyName.Name;
internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version);

public static void SetGenericTags(Activity activity)
{
activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId);
}
internal static readonly ActivitySource ActivitySource = new(KafkaFlowInstrumentation.ActivitySourceName, KafkaFlowInstrumentation.Version);

public static ActivityEvent CreateExceptionEvent(Exception exception)
internal static void SetGenericTags(Activity activity)
{
var activityTagCollection = new ActivityTagsCollection(
new[]
{
new KeyValuePair<string, object>(Conventions.AttributeExceptionMessage, exception.Message),
new KeyValuePair<string, object>(Conventions.AttributeExceptionStacktrace, exception.StackTrace),
});

return new ActivityEvent(ExceptionEventKey, DateTimeOffset.UtcNow, activityTagCollection);
activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId);
}
}
}
19 changes: 19 additions & 0 deletions src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace KafkaFlow.OpenTelemetry

Check warning on line 1 in src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentation.cs

View workflow job for this annotation

GitHub Actions / build

'public' members should come before 'internal' members (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1202.md) [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj]
{
using System.Reflection;

/// <summary>
/// KafkaFlow OTEL instrumentation properties
/// </summary>
public static class KafkaFlowInstrumentation
{
internal static readonly AssemblyName AssemblyName = typeof(KafkaFlowInstrumentation).Assembly.GetName();
internal static readonly string Version = AssemblyName.Version.ToString();

/// <summary>
/// ActivitySource name to be used when adding
/// KafkaFlow as source to an OTEL listener
/// </summary>
public static readonly string ActivitySourceName = AssemblyName.Name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;
using global::OpenTelemetry.Trace;

internal static class OpenTelemetryConsumerEventsHandler
{
Expand Down Expand Up @@ -67,9 +68,8 @@ public static Task OnConsumeError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);

activity?.AddEvent(exceptionEvent);
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.RecordException(ex);

activity?.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;
using global::OpenTelemetry.Trace;

internal static class OpenTelemetryProducerEventsHandler
{
Expand Down Expand Up @@ -77,9 +78,8 @@ public static Task OnProducerError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);

activity?.AddEvent(exceptionEvent);
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.RecordException(ex);

activity?.Dispose();
}
Expand Down

0 comments on commit 06fcddf

Please sign in to comment.