SentinelKafka
A resilient .NET SDK for Apache Kafka built on Confluent.Kafka, with easy configuration, Amazon MSK support, structured resilience policies via Polly, and Dependency Injection (DI) support.
Features
- Resilient Producer: Built-in retries, circuit breakers, and timeouts via Polly.
- AWS MSK Support: Natively integrates with Amazon MSK using `AWS.MSK.Auth` and IAM credentials.
- Easy DI Setup: Use `AddSentinelKafka()` to integrate cleanly into `IServiceCollection`.
- SOLID Design: Clean separation of Extensions and Messaging logic.
Installation
Install via NuGet:
dotnet add package SentinelKafka
Requires the following dependencies, which are brought in automatically:
- Confluent.Kafka
- Polly
- AWS.MSK.Auth
- AWSSDK.Core
Setup & Configuration
1. appsettings.json Configuration
Add the Kafka settings to your configuration file (or map them directly via the DI configuration action):
{
"SentinelKafka": {
"BootstrapServers": "localhost:9092",
"Ack": 1,
"LingerMs": 5,
"MessageSendMaxRetries": 3,
"IsMsk": true,
"AccessKey": "your-aws-access-key",
"SecretKey": "your-aws-secret-key",
"Region": "us-east-1",
"Topics": {
"orders": "prod-orders-topic"
}
}
}
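For reference, here is a minimal sketch of the options class these keys bind to. This is an illustrative mirror, not the SDK's actual source: property names and defaults are inferred from the JSON keys above and from the `IOptions<KafkaOptions>` constructor parameter used in the consumer examples below.

```csharp
// Hypothetical mirror of the SDK's KafkaOptions type; property names
// are inferred from the appsettings.json keys shown above.
public class KafkaOptions
{
    public string BootstrapServers { get; set; } = "localhost:9092";
    public int Ack { get; set; } = 1;                    // acks: 0, 1, or -1 (all)
    public int LingerMs { get; set; } = 5;
    public int MessageSendMaxRetries { get; set; } = 3;
    public bool IsMsk { get; set; }
    public string? AccessKey { get; set; }
    public string? SecretKey { get; set; }
    public string? Region { get; set; }
    public Dictionary<string, string> Topics { get; set; } = new();
}
```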
2. Dependency Injection
In your Program.cs or Startup.cs, add:
using SentinelKafka.Extensions;
var builder = WebApplication.CreateBuilder(args);
// Option A: Configuration via appsettings.json
builder.Services.AddSentinelKafka(options =>
builder.Configuration.GetSection("SentinelKafka").Bind(options));
// Option B: Manual Code-Based Configuration
builder.Services.AddSentinelKafka(options =>
{
options.IsMsk = true;
options.AccessKey = "your-access-key";
options.SecretKey = "your-secret-key";
options.Region = "us-east-1";
options.BootstrapServers = "localhost:9092";
options.MessageSendMaxRetries = 3;
options.Topics = new Dictionary<string, string> { { "OrdersTopicAlias", "prod-orders-topic" } };
});
Usage
Inject `IResilientKafkaProducer` into your services. DI resolves it to a thread-safe, internally managed producer wrapped in Polly policies.
using SentinelKafka.Messaging;
public class OrderService
{
private readonly IResilientKafkaProducer _producer;
public OrderService(IResilientKafkaProducer producer)
{
_producer = producer;
}
public async Task CreateOrderAsync(string orderId, OrderData data)
{
// Produce message using Polly policies (Retry, Timeout, Circuit Breaker)
bool success = await _producer.ProduceWithRetryAsync("orders", data, key: orderId);
if (success)
{
Console.WriteLine("Message successfully produced to Kafka.");
}
else
{
Console.WriteLine("Failed to produce message due to timeout or open circuit.");
}
}
}
3. Consuming Messages (with Dead Letter Queue Support)
The SentinelKafka SDK provides an enterprise-ready base consumer out of the box (`KafkaConsumerBase<TMessage>`). It handles:
- Strongly Typed Deserialization: Automatically parses Kafka JSON values into C# objects.
- Robust Exception Handling: Catches unhandled exceptions so a poison-pill message cannot crash the consumer loop.
- Dead Letter Queue (DLQ): Exposes an overridable `SendToDlqAsync` hook so downstream services can route failed messages elsewhere without stalling partition offsets indefinitely.
Create a robust consuming service by inheriting this generic SDK class and overriding the processing hooks:
using SentinelKafka.Messaging;
public class OrderMessage { public string OrderId { get; set; } }
public class OrderValidationConsumer : KafkaConsumerBase<OrderMessage>
{
protected override string TopicName => "OrdersTopicAlias";
protected override string GroupIdName => "OrdersGroupAlias";
public OrderValidationConsumer(
ILogger<OrderValidationConsumer> logger,
IOptions<KafkaOptions> options,
IHostEnvironment env,
IServiceScopeFactory scopeFactory)
: base(logger, options, env, scopeFactory) { }
protected override async Task ProcessMessageAsync(OrderMessage message, CancellationToken ct)
{
// Business logic runs here for each polled message.
Console.WriteLine($"Order processed: {message.OrderId}");
}
// Persist failing messages and exception details to a database or a dedicated DLQ topic.
protected override async Task SendToDlqAsync(ConsumeResult<string, string> result, Exception? ex, string reason, CancellationToken ct)
{
using var scope = _scopeFactory.CreateScope();
var myDb = scope.ServiceProvider.GetRequiredService<MyDbContext>();
myDb.DeadLetters.Add(new DeadLetterRecord {
Payload = result.Message.Value,
FailureReason = reason,
Exception = ex?.Message
});
await myDb.SaveChangesAsync(ct);
}
}
Register it as a Hosted Service so it starts automatically with the ASP.NET Core host:
builder.Services.AddHostedService<OrderValidationConsumer>();
4. Advanced Telemetry & Distributed Tracing (OpenTelemetry)
The SentinelKafka SDK is designed for decoupled microservices architectures. Both `KafkaProducer` and `KafkaConsumerBase` propagate the standard W3C `traceparent` and `tracestate` values as Kafka message headers.
This means that when an upstream HTTP API receives a request, the current `Activity` trace context is injected into the outgoing Kafka message headers, and a correlated child `Activity` is started on the consumer side. Tools such as Datadog or AWS X-Ray can then visualize the entire transaction: HTTP → AWS MSK → background consumer.
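The header propagation described above can be sketched roughly as follows. This is an illustrative sketch using `System.Diagnostics` and the Confluent.Kafka types, not the SDK's actual internals:

```csharp
using System.Diagnostics;
using System.Linq;
using System.Text;
using Confluent.Kafka;

static class TraceHeaderSketch
{
    // Producer side: inject the current trace context into Kafka headers.
    public static void Inject(Message<string, string> message)
    {
        var activity = Activity.Current;
        if (activity is null) return;
        message.Headers ??= new Headers();
        // W3C traceparent format: 00-{trace-id}-{span-id}-{flags}
        var traceparent = $"00-{activity.TraceId}-{activity.SpanId}-01";
        message.Headers.Add("traceparent", Encoding.UTF8.GetBytes(traceparent));
        if (activity.TraceStateString is { } state)
            message.Headers.Add("tracestate", Encoding.UTF8.GetBytes(state));
    }

    // Consumer side: start a child activity parented to the incoming context.
    public static Activity? StartChild(Headers headers, ActivitySource source)
    {
        var header = headers.FirstOrDefault(h => h.Key == "traceparent");
        if (header is null) return source.StartActivity("kafka.consume");
        var traceparent = Encoding.UTF8.GetString(header.GetValueBytes());
        return source.StartActivity("kafka.consume", ActivityKind.Consumer,
            ActivityContext.Parse(traceparent, traceState: null));
    }
}
```

The SDK performs this wiring for you; the sketch is only meant to show what "propagating `traceparent` via headers" amounts to in practice.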
5. High-Throughput & Confluent Schema Registry (Protobuf/Avro)
While `KafkaConsumerBase<TMessage>` paired with `System.Text.Json` text serialization works well for most workloads, binary schema formats such as Protobuf produce smaller payloads and typically deserialize significantly faster.
Just as importantly, a Confluent Schema Registry enforces rigid contracts between producers and consumers, so an incompatible payload change is rejected at publish time instead of breaking downstream applications.
To use this in the SDK, set your `SchemaRegistryUrl` in appsettings.json, then inject the registered `IProtobufKafkaProducer<>` abstraction for producing, or inherit the abstract `ProtobufKafkaConsumerBase<TMessage>` for consuming:
public class MyProtobufOrderConsumer : ProtobufKafkaConsumerBase<MyGeneratedProtoClass>
{
protected override string TopicName => "OrdersTopicProto";
protected override string GroupIdName => "GroupProto";
public MyProtobufOrderConsumer(ILogger<MyProtobufOrderConsumer> logger, IOptions<KafkaOptions> options, IHostEnvironment env, IServiceScopeFactory scopeFactory)
: base(logger, options, env, scopeFactory) { }
protected override async Task ProcessMessageAsync(MyGeneratedProtoClass message, CancellationToken ct)
{
// Message is deserialized from Protobuf binary against the registered schema.
Console.WriteLine($"Protobuf message received: {message.Id}");
}
}
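The producer side can be wired up the same way. A hedged sketch follows: the `SchemaRegistryUrl` key and `IProtobufKafkaProducer<>` abstraction are named in the section above, but the exact method name and signature (`ProduceAsync` here) are assumptions, so check the SDK surface before copying this:

```csharp
// Assumed appsettings.json addition under the "SentinelKafka" section:
// "SchemaRegistryUrl": "http://localhost:8081"

public class OrderPublisher
{
    private readonly IProtobufKafkaProducer<MyGeneratedProtoClass> _producer;

    public OrderPublisher(IProtobufKafkaProducer<MyGeneratedProtoClass> producer)
        => _producer = producer;

    public Task PublishAsync(MyGeneratedProtoClass order, CancellationToken ct)
        // Hypothetical call: the payload is serialized via the Schema Registry.
        => _producer.ProduceAsync("OrdersTopicProto", order, ct);
}
```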
License
MIT
| Product | Compatible and computed target frameworks |
|---|---|
| .NET | net8.0 is compatible. net9.0, net10.0, and their platform-specific TFMs (android, browser, ios, maccatalyst, macos, tvos, windows) were computed. |
Dependencies (net8.0):
- AWS.MSK.Auth (>= 1.1.2)
- AWSSDK.Core (>= 4.0.3.18)
- Confluent.Kafka (>= 2.13.2)
- Confluent.SchemaRegistry (>= 2.13.2)
- Confluent.SchemaRegistry.Serdes.Avro (>= 2.13.2)
- Confluent.SchemaRegistry.Serdes.Protobuf (>= 2.13.2)
- Google.Protobuf (>= 3.29.3)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.5)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.5)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.5)
- Polly (>= 8.6.6)
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0 | 34 | 3/18/2026 |
Initial open-source release with robust Producer and Consumer policies, MSK token handling and DLQ automation hooks.