Aspire.Confluent.Kafka 8.2.1

Prefix Reserved
There is a newer version of this package available.
See the version list below for details.
dotnet add package Aspire.Confluent.Kafka --version 8.2.1                
NuGet\Install-Package Aspire.Confluent.Kafka -Version 8.2.1                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Aspire.Confluent.Kafka" Version="8.2.1" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add Aspire.Confluent.Kafka --version 8.2.1                
#r "nuget: Aspire.Confluent.Kafka, 8.2.1"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install Aspire.Confluent.Kafka as a Cake Addin
#addin nuget:?package=Aspire.Confluent.Kafka&version=8.2.1

// Install Aspire.Confluent.Kafka as a Cake Tool
#tool nuget:?package=Aspire.Confluent.Kafka&version=8.2.1                

Aspire.Confluent.Kafka library

Provides ability to registers an IProducer<TKey, TValue> and an IConsumer<TKey, TValue> in the DI container for producing and consuming messages to an Apache Kafka broker. Enables corresponding health check, logging and metrics. This library wraps Confluent.Kafka binaries.

Getting started

Prerequisites

  • An Apache Kafka broker.

Install the package

Install the .NET Aspire Confluent Kafka library with NuGet:

dotnet add package Aspire.Confluent.Kafka

Usage example

In the Program.cs file of your project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container. The method takes two generic parameters corresponding to the type of the key and the type of the message to send to the broker. These generic parameters will be used to new an instance of ProducerBuilder<TKey, TValue>. This method also take connection name parameter.

builder.AddKafkaProducer<string, string>("messaging");

You can then retrieve the IProducer<TKey, TValue> instance using dependency injection. For example, to retrieve the producer from an IHostedService:

internal sealed class MyWorker(IProducer<string, string> producer) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(10));
        long i = 0;
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            var message = new Message<string, string>
            {
              Key = Guid.NewGuid.ToString(),
              Value = $"Hello, World! {i}"
            };
            producer.Produce("topic", message);
            logger.LogInformation($"{producer.Name} sent message '{message.Value}'");
            i++;
        }
    }
}

You can refer to Confluent's Apache Kafka .NET Client documentatoin for more information about how to use the IProducer<TKey, TValue> efficiently.

Configuration

The .NET Aspire Confluent Kafka component provides multiple options to configure the connection based on the requirements and conventions of your project.

Use a connection string

When using a connection string from the ConnectionStrings configuration section, you can provide the name of the connection string when calling builder.AddKafkaProducer() or builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("myConnection");

And then the connection string will be retrieved from the ConnectionStrings configuration section:

{
  "ConnectionStrings": {
    "myConnection": "broker:9092"
  }
}

The value provided as connection string will be set to the BootstrapServers property of the produced IProducer<TKey, TValue> or IConsumer<TKey, TValue> instance. Refer to BootstrapServers for more information.

Use configuration providers

The .NET Aspire Confluent Kafka component supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration by respectively using the Aspire:Confluent:Kafka:Producer and Aspire.Confluent:Kafka:Consumer keys. Example appsettings.json that configures some of the options:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

The Config properties of both Aspire:Confluent:Kafka:Producer and Aspire.Confluent:Kafka:Consumer configuration sections respectively bind to instances of ProducerConfig and ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> requires the ClientId property to be set to let the broker track consumed message offsets.

Use inline delegates to configure KafkaProducerSettings and KafkaConsumerSettings.

Also you can pass the Action<KafkaProducerSettings> configureSettings delegate to set up some or all the options inline, for example to disable health checks from code:

    builder.AddKafkaProducer<string, string>("messaging", settings => settings.DisableHealthChecks = true);

Similarly you can configure inline a consumer from code:

    builder.AddKafkaConsumer<string, string>("messaging", settings => settings.DisableHealthChecks = true);

Use inline delegates to configure ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue>.

To configure Confluent.Kafka builders (for example to setup custom serializers/deserializers for message key and value) you can pass an Action<ProducerBuilder<TKey, TValue>> (or Action<ConsumerBuilder<TKey, TValue>>) from code:

    builder.AddKafkaProducer<string, MyMessage>("messaging", producerBuilder => {
      producerBuilder.SetValueSerializer(new MyMessageSerializer());
    })

You can refer to ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue> api documentation for more information.

AppHost extensions

In your AppHost project, install the Aspire.Hosting.Kafka library with NuGet:

dotnet add package Aspire.Hosting.Kafka

Then, in the Program.cs file of AppHost, register an Apache Kafka container and consume the connection using the following methods:

var messaging = builder.AddKafka("messaging");

var myService = builder.AddProject<Projects.MyService>()
                       .WithReference(messaging);

The WithReference method configures a connection in the MyService project named messaging. In the Program.cs file of MyService, the Apache Kafka broker connection can be consumed using:

builder.AddKafkaProducer<string, string>("messaging");

or

builder.AddKafkaConsumer<string, string>("messaging");

Additional documentation

Feedback & contributing

https://github.com/dotnet/aspire

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories (1)

Showing the top 1 popular GitHub repositories that depend on Aspire.Confluent.Kafka:

Repository Stars
oskardudycz/EventSourcing.NetCore
Examples and Tutorials of Event Sourcing in .NET
Version Downloads Last updated
9.0.0 1,419 11/12/2024
9.0.0-rc.1.24511.1 113 10/15/2024
8.2.2 1,624 10/24/2024
8.2.1 5,427 9/26/2024
8.2.0 5,386 8/29/2024
8.1.0 5,765 7/23/2024
8.0.2 904 6/28/2024
8.0.1 2,311 5/21/2024
8.0.0 629 5/21/2024
8.0.0-preview.7.24251.11 641 5/7/2024
8.0.0-preview.6.24214.1 102 4/23/2024
8.0.0-preview.5.24201.12 102 4/9/2024
8.0.0-preview.4.24156.9 2,173 3/12/2024
8.0.0-preview.3.24105.21 620 2/13/2024