Aspire.Confluent.Kafka
9.0.0
Aspire.Confluent.Kafka library
Registers an IProducer<TKey, TValue> and an IConsumer<TKey, TValue> in the DI container for producing messages to, and consuming messages from, an Apache Kafka broker. Enables the corresponding health checks, logging, and metrics. This library wraps the Confluent.Kafka binaries.
Getting started
Prerequisites
- An Apache Kafka broker.
Install the package
Install the .NET Aspire Confluent Kafka library with NuGet:
dotnet add package Aspire.Confluent.Kafka
Usage example
In the Program.cs file of your project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container. The method takes two generic parameters: the type of the message key and the type of the message value to send to the broker. These generic parameters are used to create an instance of ProducerBuilder<TKey, TValue>. The method also takes a connection name parameter.
builder.AddKafkaProducer<string, string>("messaging");
You can then retrieve the IProducer<TKey, TValue> instance using dependency injection. For example, to retrieve the producer from an IHostedService:
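using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// The logger is injected alongside the producer so that it can be used below.
internal sealed class MyWorker(IProducer<string, string> producer, ILogger<MyWorker> logger) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Send one message every 10 ms until the host shuts down.
        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(10));
        long i = 0;
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = $"Hello, World! {i}"
            };
            producer.Produce("topic", message);
            logger.LogInformation("{ProducerName} sent message '{MessageValue}'", producer.Name, message.Value);
            i++;
        }
    }
}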
Refer to Confluent's Apache Kafka .NET Client documentation for more information about how to use the IProducer<TKey, TValue> efficiently.
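The consuming side can be sketched in the same way. The following is a minimal, illustrative worker, not taken from the library's documentation: MyConsumerWorker is a hypothetical name, "topic" mirrors the producer example, and it assumes that a consumer has been registered with AddKafkaConsumer<string, string> and that a consumer group ID has been configured, which Confluent.Kafka needs before a consumer can subscribe.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Hypothetical worker that reads the messages written by the producer example.
internal sealed class MyConsumerWorker(IConsumer<string, string> consumer, ILogger<MyConsumerWorker> logger) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // IConsumer.Consume blocks the calling thread, so the loop runs on a
        // thread pool thread instead of blocking host startup.
        return Task.Run(() =>
        {
            consumer.Subscribe("topic");
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var result = consumer.Consume(stoppingToken);
                    logger.LogInformation(
                        "{ConsumerName} received message '{MessageValue}' with key '{MessageKey}'",
                        consumer.Name, result.Message.Value, result.Message.Key);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the host is shutting down.
            }
            finally
            {
                // Leave the consumer group cleanly before the container disposes the consumer.
                consumer.Close();
            }
        }, stoppingToken);
    }
}
```

Such a worker would typically be registered with builder.Services.AddHostedService<MyConsumerWorker>() after the call to builder.AddKafkaConsumer<string, string>("messaging").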
Configuration
The .NET Aspire Confluent Kafka component provides multiple options to configure the connection based on the requirements and conventions of your project.
Use a connection string
When using a connection string from the ConnectionStrings configuration section, you can provide the name of the connection string when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():
builder.AddKafkaProducer<string, string>("myConnection");
And then the connection string will be retrieved from the ConnectionStrings
configuration section:
{
  "ConnectionStrings": {
    "myConnection": "broker:9092"
  }
}
The value provided as connection string will be set to the BootstrapServers
property of the produced IProducer<TKey, TValue>
or IConsumer<TKey, TValue>
instance. Refer to BootstrapServers for more information.
Use configuration providers
The .NET Aspire Confluent Kafka component supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration by using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys, respectively. Example appsettings.json that configures some of the options:
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}
The Config properties of the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer configuration sections bind to instances of ProducerConfig and ConsumerConfig, respectively.
Confluent.Kafka.Consumer<TKey, TValue>
requires the ClientId
property to be set to let the broker track consumed message offsets.
Use inline delegates to configure KafkaProducerSettings and KafkaConsumerSettings
You can also pass the Action<KafkaProducerSettings> configureSettings delegate to set up some or all of the options inline, for example to disable health checks from code:
builder.AddKafkaProducer<string, string>("messaging", settings => settings.DisableHealthChecks = true);
Similarly, you can configure a consumer inline from code:
builder.AddKafkaConsumer<string, string>("messaging", settings => settings.DisableHealthChecks = true);
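The same delegate can be used to set the underlying Confluent.Kafka consumer options through the Config property, which binds to a ConsumerConfig. The sketch below is illustrative only: the group and client identifiers are hypothetical, and the lambda parameter is typed explicitly so that the settings overload is selected unambiguously.

```csharp
using Aspire.Confluent.Kafka;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

builder.AddKafkaConsumer<string, string>("messaging", (KafkaConsumerSettings settings) =>
{
    // Hypothetical identifiers; choose values that suit your application.
    settings.Config.GroupId = "my-consumer-group";
    settings.Config.ClientId = "my-service";

    // Start from the oldest available message when the group has no committed offset.
    settings.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
});

builder.Build().Run();
```

Values set this way are applied in code, so they are a reasonable place for options that should not vary between environments; environment-specific values are usually better kept in configuration.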
Use inline delegates to configure ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue>
To configure the Confluent.Kafka builders (for example, to set up custom serializers/deserializers for the message key and value), you can pass an Action<ProducerBuilder<TKey, TValue>> (or Action<ConsumerBuilder<TKey, TValue>>) from code:
builder.AddKafkaProducer<string, MyMessage>("messaging", producerBuilder =>
{
    producerBuilder.SetValueSerializer(new MyMessageSerializer());
});
Refer to the ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue> API documentation for more information.
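The MyMessage and MyMessageSerializer types used in the example are not defined in this document. One possible shape, assuming JSON encoding with System.Text.Json, is sketched below; the record's fields and the MyMessageDeserializer class are hypothetical and only illustrate the Confluent.Kafka ISerializer<T> and IDeserializer<T> interfaces.

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical message type; the fields are placeholders.
public sealed record MyMessage(string Id, string Text);

// Encodes MyMessage values as UTF-8 JSON.
public sealed class MyMessageSerializer : ISerializer<MyMessage>
{
    public byte[] Serialize(MyMessage data, SerializationContext context)
        => JsonSerializer.SerializeToUtf8Bytes(data);
}

// Decodes UTF-8 JSON back into MyMessage values.
public sealed class MyMessageDeserializer : IDeserializer<MyMessage>
{
    public MyMessage Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull)
        {
            throw new InvalidOperationException("Expected a message value but received null.");
        }

        return JsonSerializer.Deserialize<MyMessage>(data)
            ?? throw new InvalidOperationException("The message value deserialized to null.");
    }
}
```

On the consuming side, the matching deserializer could be attached in the same way, for example with consumerBuilder.SetValueDeserializer(new MyMessageDeserializer()) inside the delegate passed to AddKafkaConsumer<string, MyMessage>.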
AppHost extensions
In your AppHost project, install the Aspire.Hosting.Kafka
library with NuGet:
dotnet add package Aspire.Hosting.Kafka
Then, in the Program.cs file of AppHost
, register an Apache Kafka container and consume the connection using the following methods:
var messaging = builder.AddKafka("messaging");
// "myservice" is an illustrative resource name; AddProject requires one.
var myService = builder.AddProject<Projects.MyService>("myservice")
                       .WithReference(messaging);
The WithReference
method configures a connection in the MyService
project named messaging
. In the Program.cs file of MyService
, the Apache Kafka broker connection can be consumed using:
builder.AddKafkaProducer<string, string>("messaging");
or
builder.AddKafkaConsumer<string, string>("messaging");
Additional documentation
- https://docs.confluent.io/kafka-clients/dotnet/current/overview.html
- https://github.com/dotnet/aspire/tree/main/src/Components/README.md
Feedback & contributing
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
- net8.0
  - AspNetCore.HealthChecks.Kafka (>= 8.0.1)
  - Confluent.Kafka (>= 2.6.0)
  - Microsoft.Extensions.Configuration.Abstractions (>= 8.0.0)
  - Microsoft.Extensions.Configuration.Binder (>= 8.0.2)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.2)
  - Microsoft.Extensions.Diagnostics.HealthChecks (>= 8.0.11)
  - Microsoft.Extensions.Hosting.Abstractions (>= 8.0.1)
  - Microsoft.Extensions.Logging.Abstractions (>= 8.0.2)
  - Microsoft.Extensions.Options (>= 8.0.2)
  - Microsoft.Extensions.Primitives (>= 8.0.0)
  - OpenTelemetry.Extensions.Hosting (>= 1.9.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories (1)
Showing the top 1 popular GitHub repositories that depend on Aspire.Confluent.Kafka:
Repository | Stars |
---|---|
oskardudycz/EventSourcing.NetCore: Examples and Tutorials of Event Sourcing in .NET | |
Version | Downloads | Last updated |
---|---|---|
9.0.0 | 1,169 | 11/12/2024 |
9.0.0-rc.1.24511.1 | 113 | 10/15/2024 |
8.2.2 | 1,411 | 10/24/2024 |
8.2.1 | 5,123 | 9/26/2024 |
8.2.0 | 5,308 | 8/29/2024 |
8.1.0 | 5,740 | 7/23/2024 |
8.0.2 | 901 | 6/28/2024 |
8.0.1 | 2,309 | 5/21/2024 |
8.0.0 | 624 | 5/21/2024 |
8.0.0-preview.7.24251.11 | 641 | 5/7/2024 |
8.0.0-preview.6.24214.1 | 102 | 4/23/2024 |
8.0.0-preview.5.24201.12 | 102 | 4/9/2024 |
8.0.0-preview.4.24156.9 | 2,173 | 3/12/2024 |
8.0.0-preview.3.24105.21 | 620 | 2/13/2024 |