Eventso.KafkaProducer
2.6.1
dotnet add package Eventso.KafkaProducer --version 2.6.1
NuGet\Install-Package Eventso.KafkaProducer -Version 2.6.1
<PackageReference Include="Eventso.KafkaProducer" Version="2.6.1" />
paket add Eventso.KafkaProducer --version 2.6.1
#r "nuget: Eventso.KafkaProducer, 2.6.1"
// Install Eventso.KafkaProducer as a Cake Addin #addin nuget:?package=Eventso.KafkaProducer&version=2.6.1 // Install Eventso.KafkaProducer as a Cake Tool #tool nuget:?package=Eventso.KafkaProducer&version=2.6.1
Binary low-allocating Kafka producer for .Net
Binary Kafka producer with a unified api that accepts ReadOnlySpan<byte>
instead of Message<TKey,TValue>
and avoids creating intermediate byte arrays during serialization. Uses low-level api of Confluent.Kafka library and is fully compatible with it. A single producer instance is enough for your entire application.
Features
- Binary producer api accepts
ReadOnlySpan<byte>
for key and value - Only single producer instance in application is enough
- Effectively producing message batch
- Ready to use overloads for common key types: short, int, long, string, Guid
- Support for Protobuf, System.Text.Json, SpanJson serialization (separate packages)
- Confluent.Kafka compatibility
- Tested in heavy-load production environment
Registration
There are 3 ways to create binary producer:
// 1. Create from Confluent.Kafka ProducerBuilder<TAnyKey, TAnyValue>
ProducerBuilder<byte[], byte[]> confluentBuilder = ...
IProducer binaryProducer = confluentBuilder.BuildBinary();
// 2. Using non-generic ProducerBuilder with same api as ProducerBuilder<TKey, TValue>
ProducerBuilder producerBuilder = new ProducerBuilder(producerConfig);
producerBuilder
.SetDefaultPartitioner(...)
.SetErrorHandler(...);
IProducer binaryProducer = producerBuilder.Build();
// 3. Create binary producer from generic Confluent.Kafka producer
IProducer<TAnyKey, TAnyValue> confluentProducer = ...
IProducer binaryProducer = confluentProducer.CreateBinary();
How to use the producer:
There are two basic methods in IProducer interface
Task<DeliveryResult> ProduceAsync(
string topic,
ReadOnlySpan<byte> key,
ReadOnlySpan<byte> value,
CancellationToken cancellationToken = default(CancellationToken),
Headers? headers = null,
Timestamp timestamp = default,
Partition? partition = null);
void Produce(
string topic,
ReadOnlySpan<byte> key,
ReadOnlySpan<byte> value,
Headers? headers = null,
Timestamp timestamp = default,
Action<DeliveryReport>? deliveryHandler = null,
Partition? partition = null);
Library contains extension methods for frequently used key types: short, int, long, string, Guid
MessageBatch
MessageBatch creates only one TaskCompletionSource and Task per batch while original producer api creates TaskCompletionSource per message. Batch Produce supports all producer features.
var batch = producer.CreateBatch(topicName);
foreach(var record in records)
{
batch.Produce(record.Key, record.Value)
}
//wait for delivery all messages
await batch.Complete(token);
Protobuf and Json values
Additional packages contain method overloads that accepts Google.Protobuf.IMessage or typed object as message value. They use stack or ArrayPool for non-allocating serialization.
- Protobuf
- System.Text.Json (methods also accepts optional
JsonSerializerOptions
orJsonSerializerContext
) - SpanJson
Performance and memory allocation benchmark
Method | Messages | Mean | Gen0 | Gen1 | Gen2 | Allocated |
---|---|---|---|---|---|---|
Binary_Proto_Buffer_MessageBatch | 5000 | 29.038 ms | 218.7500 | - | - | 1406.83 KB |
Binary_Proto_WhenAll | 5000 | 33.454 ms | 400.0000 | 200.0000 | - | 2481.26 KB |
Confluent_Proto_WhenAll | 5000 | 37.963 ms | 1000.0000 | 666.6667 | - | 6652.58 KB |
Binary_Proto_Buffer_MessageBatch | 10000 | 52.968 ms | 400.0000 | - | - | 2813.2 KB |
Binary_SpanJson_MessageBatch | 5000 | 57.299 ms | - | - | - | 1407.25 KB |
Binary_Json_Buffer_MessageBatch | 5000 | 59.708 ms | 500.0000 | - | - | 3516.85 KB |
Binary_Proto_WhenAll | 10000 | 61.909 ms | 666.6667 | 444.4444 | 222.2222 | 4870.61 KB |
Confluent_Proto_WhenAll | 10000 | 75.649 ms | 2000.0000 | 1000.0000 | 666.6667 | 13304.45 KB |
Confluent_Json_WhenAll | 5000 | 82.425 ms | 1000.0000 | - | - | 9583.16 KB |
Binary_SpanJson_MessageBatch | 10000 | 107.249 ms | - | - | - | 2814.13 KB |
Binary_Json_Buffer_MessageBatch | 10000 | 113.456 ms | 1000.0000 | - | - | 7033.18 KB |
Confluent_Json_WhenAll | 10000 | 145.569 ms | 3000.0000 | 1000.0000 | - | 19164.77 KB |
Binary_Proto_Buffer_MessageBatch | 30000 | 147.694 ms | 1000.0000 | - | - | 8438.85 KB |
Binary_Proto_WhenAll | 30000 | 173.801 ms | 2000.0000 | 1000.0000 | - | 14752.95 KB |
Confluent_Proto_WhenAll | 30000 | 219.556 ms | 6000.0000 | 2000.0000 | 1000.0000 | 39656.1 KB |
Binary_Json_Buffer_MessageBatch | 30000 | 335.746 ms | 3000.0000 | - | - | 21096.23 KB |
Binary_SpanJson_MessageBatch | 30000 | 339.693 ms | 1000.0000 | - | - | 8438.8 KB |
Confluent_Json_WhenAll | 30000 | 433.557 ms | 9500.0000 | 3000.0000 | 1500.0000 | 57237.13 KB |
Thanks to @xeromorph for the great ideas and help.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net6.0
- CommunityToolkit.HighPerformance (>= 8.3.2)
- Confluent.Kafka (>= 2.6.1)
-
net8.0
- CommunityToolkit.HighPerformance (>= 8.3.2)
- Confluent.Kafka (>= 2.6.1)
NuGet packages (3)
Showing the top 3 NuGet packages that depend on Eventso.KafkaProducer:
Package | Downloads |
---|---|
Eventso.KafkaProducer.Protobuf
Kafka producer with Protobuf serialization. Low memory allocation and effective batching api. |
|
Eventso.KafkaProducer.Json
Kafka producer with System.Text.Json serialization. Low memory allocation and effective batching api. |
|
Eventso.KafkaProducer.SpanJson
Kafka producer with SpanJson serialization. Low memory allocation and effective batching api. |
GitHub repositories
This package is not used by any popular GitHub repositories.