vhatsura/confluent-kafka-extensions-opentelemetry

Confluent.Kafka.Extensions.OpenTelemetry

The Confluent.Kafka.Extensions.OpenTelemetry package enables OpenTelemetry to collect instrumentation data from the Confluent.Kafka library. The instrumentation itself is not part of this package: it must be set up with Confluent.Kafka.Extensions.Diagnostics.

Installation

Install-Package Confluent.Kafka.Extensions.OpenTelemetry

Usage

Confluent.Kafka configuration

Because Confluent.Kafka does not expose any instrumentation data on its own, additional configuration is required. Full documentation is available in the Confluent.Kafka.Extensions.Diagnostics docs. There is also an example of how to use the package in a real-world application.

Producer

using Confluent.Kafka;
using Confluent.Kafka.Extensions.Diagnostics;


using var producer =
    new ProducerBuilder<Null, string>(new ProducerConfig(new ClientConfig { BootstrapServers = "localhost:9092" }))
        .SetKeySerializer(Serializers.Null)
        .SetValueSerializer(Serializers.Utf8)
        .BuildWithInstrumentation();

await producer.ProduceAsync("topic", new Message<Null, string> { Value = "Hello World!" });

Consumer

using Confluent.Kafka;
using Confluent.Kafka.Extensions.Diagnostics;

using var consumer = new ConsumerBuilder<Ignore, string>(
        new ConsumerConfig(new ClientConfig { BootstrapServers = "localhost:9092" })
        {
            GroupId = "group", AutoOffsetReset = AutoOffsetReset.Earliest
        })
    .SetValueDeserializer(Deserializers.Utf8)
    .Build();

consumer.Subscribe("topic");

consumer.ConsumeWithInstrumentation((result) =>
{
    Console.WriteLine(result.Message.Value);
});

OpenTelemetry configuration

using Confluent.Kafka.Extensions.OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

builder.Services.AddOpenTelemetry().WithTracing(traceBuilder =>
{
    traceBuilder
        .AddInMemoryExporter()
        .AddHttpClientInstrumentation()
        .AddAspNetCoreInstrumentation()
        .AddConfluentKafkaInstrumentation();  // <-- Add Confluent.Kafka OpenTelemetry support
});
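
For applications that do not use the generic host (for example, a plain console app), the same registration can probably be done directly on a tracer provider. The following is a minimal sketch, not taken from the package documentation. It assumes the OpenTelemetry and OpenTelemetry.Exporter.Console packages are referenced, and the choice of the console exporter is purely illustrative. Only AddConfluentKafkaInstrumentation comes from this package.

```csharp
using Confluent.Kafka.Extensions.OpenTelemetry;
using OpenTelemetry;
using OpenTelemetry.Trace;

// Assumption: OpenTelemetry and OpenTelemetry.Exporter.Console are installed.
// The provider must stay alive for as long as spans should be collected.
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddConfluentKafkaInstrumentation() // same extension method as in the hosted setup above
    .AddConsoleExporter()               // illustrative exporter: writes finished spans to stdout
    .Build();

// Produce or consume here with clients built as in the Producer and Consumer sections.
```

Disposing the provider at the end of the scope flushes any spans still waiting to be exported.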
