Skip to content

Commit

Permalink
docs: sample otel (#481)
Browse files Browse the repository at this point in the history
* docs: sample otel

* chore: upgrade sample to v3
  • Loading branch information
Gui Ferreira authored Dec 6, 2023
1 parent 453996e commit 0697853
Show file tree
Hide file tree
Showing 13 changed files with 13,388 additions and 422 deletions.
7 changes: 7 additions & 0 deletions KafkaFlow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
.editorconfig = .editorconfig
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.OpenTelemetry", "samples\KafkaFlow.Sample.OpenTelemetry\KafkaFlow.Sample.OpenTelemetry.csproj", "{E9E8B374-4165-45F2-8DF5-F141E141AC1D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -216,6 +218,10 @@ Global
{80080C1D-579E-4AB2-935D-5CFFC51843D8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{80080C1D-579E-4AB2-935D-5CFFC51843D8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{80080C1D-579E-4AB2-935D-5CFFC51843D8}.Release|Any CPU.Build.0 = Release|Any CPU
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -258,6 +264,7 @@ Global
{0C98213A-A553-40DC-BEA9-137BDE4A7398} = {96F5D441-B8DE-4ABC-BEF2-F758D1B2BA39}
{1755E8DB-970C-4A24-8B7C-A2BEC1410BEE} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068}
{80080C1D-579E-4AB2-935D-5CFFC51843D8} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068}
{E9E8B374-4165-45F2-8DF5-F141E141AC1D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<IsPackable>false</IsPackable>
<GenerateDocumentationFile>false</GenerateDocumentationFile>
<InvariantGlobalization>true</InvariantGlobalization>
</PropertyGroup>


<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="OpenTelemetry" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.6.0" />
</ItemGroup>


<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
</ItemGroup>

</Project>
18 changes: 18 additions & 0 deletions samples/KafkaFlow.Sample.OpenTelemetry/PrintConsoleHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Threading.Tasks;

namespace KafkaFlow.Sample.OpenTelemetry;

public class PrintConsoleHandler : IMessageHandler<TestMessage>
{
public Task Handle(IMessageContext context, TestMessage message)
{
Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2}",
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
message.Text);

return Task.CompletedTask;
}
}
94 changes: 94 additions & 0 deletions samples/KafkaFlow.Sample.OpenTelemetry/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using System;
using System.Threading.Tasks;
using KafkaFlow.Configuration;
using KafkaFlow.OpenTelemetry;
using KafkaFlow.Producers;
using KafkaFlow.Serializer;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

namespace KafkaFlow.Sample.OpenTelemetry;

public class Program
{
public static async Task Main()
{
var services = new ServiceCollection();

const string producerName = "PrintConsole";
const string topicName = "otel-sample-topic";

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(
serviceName: "DemoApp",
serviceVersion: "1.0.0"))
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
.AddConsoleExporter()
.AddOtlpExporter()
.Build();

services.AddKafka(
kafka => kafka
.UseConsoleLog()
.AddOpenTelemetryInstrumentation()
.AddCluster(
cluster => cluster
.WithBrokers(new[]
{
"localhost:9092"
})
.CreateTopicIfNotExists(topicName, 6, 1)
.AddProducer(
producerName,
producer => producer
.DefaultTopic(topicName)
.AddMiddlewares(m => m.AddSerializer<ProtobufNetSerializer>())
)
.AddConsumer(
consumer => consumer
.Topic(topicName)
.WithGroupId("print-console-handler")
.WithBufferSize(100)
.WithWorkersCount(3)
.AddMiddlewares(
middlewares => middlewares
.AddDeserializer<ProtobufNetDeserializer>()
.AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
)
)
)
);

var provider = services.BuildServiceProvider();

var bus = provider.CreateKafkaBus();

await bus.StartAsync();

var producer = provider
.GetRequiredService<IProducerAccessor>()
.GetProducer(producerName);

while (true)
{
Console.WriteLine("Type the message you want to send or 'exit' to quit:");
var input = Console.ReadLine();


if (input!.Equals("exit", StringComparison.OrdinalIgnoreCase))
{
await bus.StopAsync();
break;
}

await producer.ProduceAsync(
topicName,
Guid.NewGuid().ToString(),
new TestMessage { Text = $"Message: {input}" });
}

await Task.Delay(3000);
}
}
52 changes: 52 additions & 0 deletions samples/KafkaFlow.Sample.OpenTelemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# KafkaFlow.Sample.OpenTelemetry

This is a simple sample that shows how to enable [OpenTelemetry](https://opentelemetry.io/) instrumentation when using KafkaFlow.

## How to run

### Requirements

- [.NET 6.0 SDK](https://dotnet.microsoft.com/en-us/download/dotnet/6.0)
- [Docker Desktop](https://www.docker.com/products/docker-desktop/)

### Start the cluster

Using your terminal of choice, start the cluster.
You can find a docker-compose file at the root of this repository.
Position the terminal in that folder and run the following command.

```bash
docker-compose up -d
```

### Start Jaeger

Using your terminal of choice, start the [Jaeger](https://www.jaegertracing.io/).

```
docker run --rm --name jaeger \
-e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 4317:4317 \
-p 4318:4318 \
-p 14250:14250 \
-p 14268:14268 \
-p 14269:14269 \
-p 9411:9411 \
jaegertracing/all-in-one:1.51
```

### Run the Sample

Using your terminal of choice, start the sample for the sample folder.

```bash
dotnet run
```

### See the traces collected

Go to http://localhost:16686/ and observe the traces collected for your application.
10 changes: 10 additions & 0 deletions samples/KafkaFlow.Sample.OpenTelemetry/TestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Runtime.Serialization;

namespace KafkaFlow.Sample.OpenTelemetry;

[DataContract]
public class TestMessage
{
[DataMember(Order = 1)]
public string Text { get; set; }
}
8 changes: 7 additions & 1 deletion website/docs/getting-started/samples.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ You can find the code here: [/samples/KafkaFlow.Sample.PauseConsumerOnError](htt

## Consumer Throttling

This is a sample that shows how to throttle a consumer based on others consumers lag
This is a sample that shows how to throttle a consumer based on others consumers lag.

You can find the code here: [/samples/KafkaFlow.Sample.ConsumerThrottling](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.ConsumerThrottling)

Expand All @@ -59,3 +59,9 @@ You can find the code here: [/samples/KafkaFlow.Sample.ConsumerThrottling](https
This sample shows how to use a consumer to handle all the topics according to a naming convention. This is not a feature of KafkaFlow, but a demonstration of how to use the pattern conventions exposed by [librdkafka](https://github.com/confluentinc/librdkafka/tree/95a542c87c61d2c45b445f91c73dd5442eb04f3c) ([here](https://github.com/confluentinc/librdkafka/blob/95a542c87c61d2c45b445f91c73dd5442eb04f3c/src-cpp/rdkafkacpp.h#L2681)).

You can find the code here: [/samples/KafkaFlow.Sample.WildcardConsumer](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.WildcardConsumer)

## Open Telemetry

This is a sample that shows how to enable OpenTelemetry instrumentation when using KafkaFlow.

You can find the code here: [/samples/KafkaFlow.Sample.OpenTelemetry](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.OpenTelemetry)
2 changes: 1 addition & 1 deletion website/docs/guides/dependency-injection.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
sidebar_position: 20
sidebar_position: 11
---

# Dependency Injection
Expand Down
2 changes: 1 addition & 1 deletion website/docs/guides/migration/_category_.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"label": "Migration",
"position": 11,
"position": 12,
"link": {
"type": "generated-index"
}
Expand Down
18 changes: 16 additions & 2 deletions website/docs/guides/open-telemetry.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
---
sidebar_position: 10
sidebar_label: OpenTelemetry
---

# OpenTelemetry instrumentation

In this section, we will explore how to enable OpenTelemetry instrumentation when using KafkaFlow.

KafkaFlow includes support for [Traces](https://opentelemetry.io/docs/concepts/signals/traces/) and [Baggage](https://opentelemetry.io/docs/concepts/signals/baggage/) signals using [OpenTelemetry instrumentation](https://opentelemetry.io/docs/instrumentation/net/).

:::tip
You can find a sample on how to enable OpenTelemetry [here](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.OpenTelemetry).
:::

## Including OpenTelemetry instrumentation in your code

Add the package [KafkaFlow.OpenTelemetry](https://www.nuget.org/packages/KafkaFlow.OpenTelemetry/) to the project and add the extension method `AddOpenTelemetryInstrumentation` in your Startup:
Add the package [KafkaFlow.OpenTelemetry](https://www.nuget.org/packages/KafkaFlow.OpenTelemetry/) to the project and add the extension method `AddOpenTelemetryInstrumentation` in your [configuration](./configuration.md):

```csharp
services.AddKafka(
Expand All @@ -18,6 +25,13 @@ services.AddKafka(
);
```

## Using KafkaFlow instrumentation with .NET Automatic Instrumentation
Once you have your .NET application instrumentation configured ([see here](https://opentelemetry.io/docs/instrumentation/net/getting-started/)), you just need to subscribe to the source `KafkaFlow.OpenTelemetry` that is accessible through a constant at `KafkaFlowInstrumentation.ActivitySourceName`.

## Using .NET Automatic Instrumentation

When using [.NET automatic instrumentation](https://github.com/open-telemetry/opentelemetry-dotnet-instrumentation), the KafkaFlow activity can be captured by including the ActivitySource name `KafkaFlow.OpenTelemetry` as a parameter to the variable `OTEL_DOTNET_AUTO_TRACES_ADDITIONAL_SOURCES`.

## Propagation

KafkaFlow uses [Propagation](https://opentelemetry.io/docs/specs/otel/context/api-propagators/), the mechanism that moves context information data between services and processes.
When a message is produced using a KafkaFlow producer and consumed by a KafkaFlow consumer, the context will automatically be propagated.
2 changes: 1 addition & 1 deletion website/docs/introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ To do that, KafkaFlow gives you access to features like:
- [Schema Registry](guides/middlewares/serializer-middleware.md#adding-schema-registry-support) support.
- [Compression](guides/compression.md) using native Confluent Kafka client compression or compressor middleware.
- [Global Events Subcription](guides/global-events.md) for message production and consumption.
- [Open Telemetry Instrumentation](guides/open-telemetry.md) for traces and baggage signals.
- [OpenTelemetry Instrumentation](guides/open-telemetry.md) for traces and baggage signals.
- Graceful shutdown (wait to finish processing to shutdown).
- Store offset when processing ends, avoiding message loss.
- Supports .NET Core and .NET Framework.
Expand Down
Loading

0 comments on commit 0697853

Please sign in to comment.