EasyRabbitFlow 5.0.0

dotnet add package EasyRabbitFlow --version 5.0.0
                    
NuGet\Install-Package EasyRabbitFlow -Version 5.0.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="EasyRabbitFlow" Version="5.0.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="EasyRabbitFlow" Version="5.0.0" />
                    
Directory.Packages.props
<PackageReference Include="EasyRabbitFlow" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add EasyRabbitFlow --version 5.0.0
                    
#r "nuget: EasyRabbitFlow, 5.0.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package EasyRabbitFlow@5.0.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=EasyRabbitFlow&version=5.0.0
                    
Install as a Cake Addin
#tool nuget:?package=EasyRabbitFlow&version=5.0.0
                    
Install as a Cake Tool

<p align="center"> <img src="icon.png" alt="EasyRabbitFlow" width="120" /> </p>

<h1 align="center">EasyRabbitFlow</h1>

<p align="center"> <strong>High-performance RabbitMQ client for .NET — fluent configuration, automatic topology, zero-ceremony consumers.</strong> </p>

<p align="center"> <a href="https://www.nuget.org/packages/EasyRabbitFlow"><img src="https://img.shields.io/nuget/v/EasyRabbitFlow" alt="NuGet" /></a> <a href="https://www.nuget.org/packages/EasyRabbitFlow"><img src="https://img.shields.io/nuget/dt/EasyRabbitFlow" alt="Downloads" /></a> <a href="LICENSE.txt"><img src="https://img.shields.io/badge/license-MIT-blue.svg" alt="License" /></a> </p>


⚠️ Breaking Changes (v5.0.0)

IRabbitFlowConsumer<TEvent>.HandleAsync signature changed

A new RabbitFlowMessageContext parameter was added to provide AMQP metadata (MessageId, CorrelationId, headers, delivery info) directly to each consumer.

- Task HandleAsync(TEvent message, CancellationToken cancellationToken);
+ Task HandleAsync(TEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken);

How to migrate: Add the RabbitFlowMessageContext context parameter to every HandleAsync implementation. If you don't need the context, simply ignore it:

public Task HandleAsync(MyEvent message, RabbitFlowMessageContext context, CancellationToken ct)
{
    // context is available but not required to use
    return ProcessAsync(message, ct);
}

Other changes in v5.0.0:

  • PublishAsync / PublishBatchAsync now accept an optional correlationId parameter.
  • PublishAsync returns PublishResult (instead of bool). Use result.Success instead of the raw return value.
  • PublishBatchAsync is new — publish multiple messages atomically (Transactional) or individually confirmed (Confirm).
  • ChannelMode was removed from PublisherOptions — it is now a per-call parameter on PublishBatchAsync (defaults to Transactional). Single-message publishes always use publisher confirms.
  • PublisherOptions.IdempotencyEnabled is new — auto-assigns a unique MessageId to every published message.

Table of Contents


Why EasyRabbitFlow?

Feature EasyRabbitFlow
Fluent, strongly-typed configuration
Automatic queue / exchange / dead-letter generation
Reflection-free per-message processing
Configurable retry with exponential backoff
Temporary batch processing with auto-cleanup
Queue state & purge utilities
Full DI integration (scoped/transient/singleton)
Publisher confirms (single) & transactional batch
Built-in idempotency support (auto MessageId)
CorrelationId support (end-to-end tracing)
RabbitFlowMessageContext per-message metadata
Rich PublishResult / BatchPublishResult types
Thread-safe channel operations
.NET Standard 2.1 (works with .NET 6, 7, 8, 9+)

Architecture Overview

┌─────────────────────────────────────────────────────────────────────┐
│                         Your Application                            │
├────────────┬──────────────┬──────────────┬──────────────────────────┤
│ Publisher  │  Consumers   │   State /    │  Temporary Batch         │
│            │  (Hosted)    │   Purger     │  Processing              │
├────────────┴──────────────┴──────────────┴──────────────────────────┤
│                     EasyRabbitFlow Library                           │
│                                                                     │
│  ┌──────────────┐  ┌────────────────┐  ┌─────────────────────────┐ │
│  │ IRabbitFlow   │  │ ConsumerHosted │  │ IRabbitFlowTemporary    │ │
│  │ Publisher     │  │ Service        │  │ (batch processing)      │ │
│  └──────┬───────┘  └───────┬────────┘  └────────────┬────────────┘ │
│         │                  │                         │              │
│  ┌──────┴──────────────────┴─────────────────────────┴────────┐    │
│  │              RabbitMQ.Client (ConnectionFactory)            │    │
│  └────────────────────────────┬────────────────────────────────┘    │
└───────────────────────────────┼─────────────────────────────────────┘
                                │
                    ┌───────────┴───────────┐
                    │     RabbitMQ Broker    │
                    │                       │
                    │  ┌─────────────────┐  │
                    │  │   Exchanges     │  │
                    │  └────────┬────────┘  │
                    │           │            │
                    │  ┌────────▼────────┐  │
                    │  │     Queues      │  │
                    │  └────────┬────────┘  │
                    │           │            │
                    │  ┌────────▼────────┐  │
                    │  │  Dead-Letter    │  │
                    │  │    Queues       │  │
                    │  └─────────────────┘  │
                    └───────────────────────┘

Message Flow

  Publisher                    RabbitMQ                        Consumer
  ────────                    ────────                        ────────

  PublishAsync() ──────►  Exchange ──routing──► Queue ──────► HandleAsync()
                                                  │               │
                                                  │          (on failure)
                                                  │               │
                                                  │          Retry Policy
                                                  │          (exponential
                                                  │           backoff)
                                                  │               │
                                                  │          (exhausted)
                                                  │               │
                                                  └──► Dead-Letter Queue

Getting Started

Installation

dotnet add package EasyRabbitFlow

Quick Start

1. Define your event model:

public class OrderCreatedEvent
{
    public string OrderId { get; set; } = string.Empty;
    public decimal Total { get; set; }
    public DateTime CreatedAt { get; set; }
}

2. Implement a consumer:

public class OrderConsumer : IRabbitFlowConsumer<OrderCreatedEvent>
{
    private readonly ILogger<OrderConsumer> _logger;

    public OrderConsumer(ILogger<OrderConsumer> logger)
    {
        _logger = logger;
    }

    public async Task HandleAsync(OrderCreatedEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Processing order {OrderId}, total: {Total}, correlationId: {CorrelationId}",
            message.OrderId, message.Total, context.CorrelationId);

        // Your business logic here: save to DB, send email, call API, etc.
        await Task.CompletedTask;
    }
}

3. Configure in Program.cs:

builder.Services
    .AddRabbitFlow(cfg =>
    {
        cfg.ConfigureHost(host =>
        {
            host.Host = "localhost";
            host.Port = 5672;
            host.Username = "guest";
            host.Password = "guest";
        });

        cfg.AddConsumer<OrderConsumer>("orders-queue", c =>
        {
            c.AutoGenerate = true;
            c.PrefetchCount = 10;
            c.Timeout = TimeSpan.FromSeconds(30);
            c.ConfigureRetryPolicy(r =>
            {
                r.MaxRetryCount = 3;
                r.ExponentialBackoff = true;
                r.ExponentialBackoffFactor = 2;
            });
        });
    })
    .UseRabbitFlowConsumers(); // Starts background consumer automatically

4. Publish from an endpoint or service:

app.MapPost("/orders", async (OrderCreatedEvent order, IRabbitFlowPublisher publisher) =>
{
    var result = await publisher.PublishAsync(order, "orders-queue");
    return result.Success
        ? Results.Ok(new { result.Destination, result.MessageId, result.TimestampUtc })
        : Results.Problem(result.Error?.Message);
});

That's it — four steps from zero to a working publish/consume pipeline.


Configuration

All configuration is done through the AddRabbitFlow extension method:

builder.Services.AddRabbitFlow(cfg =>
{
    cfg.ConfigureHost(...);                    // Connection settings
    cfg.ConfigureJsonSerializerOptions(...);   // Serialization (optional)
    cfg.ConfigurePublisher(...);               // Publisher behavior (optional)
    cfg.AddConsumer<T>(...);                   // Register consumers
});

Host Settings

cfg.ConfigureHost(host =>
{
    host.Host = "rabbitmq.example.com";
    host.Port = 5672;
    host.Username = "admin";
    host.Password = "secret";
    host.VirtualHost = "/";
    host.AutomaticRecoveryEnabled = true;
    host.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
    host.RequestedHeartbeat = TimeSpan.FromSeconds(30);
});
Property Type Default Description
Host string "localhost" RabbitMQ server hostname or IP
Port int 5672 AMQP port
Username string "guest" Authentication username
Password string "guest" Authentication password
VirtualHost string "/" RabbitMQ virtual host
AutomaticRecoveryEnabled bool true Auto-reconnect after failures
TopologyRecoveryEnabled bool true Auto-recover queues/exchanges after reconnect
NetworkRecoveryInterval TimeSpan 10s Wait time between recovery attempts
RequestedHeartbeat TimeSpan 30s Heartbeat interval for connection health

JSON Serialization

Optionally customize how messages are serialized/deserialized:

cfg.ConfigureJsonSerializerOptions(json =>
{
    json.PropertyNameCaseInsensitive = true;
    json.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
    json.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
});

If not configured, the default JsonSerializerOptions are used.

Publisher Options

cfg.ConfigurePublisher(pub =>
{
    pub.DisposePublisherConnection = false; // Keep connection alive (default)
    pub.IdempotencyEnabled = true;          // Auto-assign MessageId to every message
});
Property Type Default Description
DisposePublisherConnection bool false Dispose connection after each publish
IdempotencyEnabled bool false Auto-assign a unique MessageId to every published message for deduplication

Consumers

Implementing a Consumer

Every consumer implements IRabbitFlowConsumer<TEvent>:

public interface IRabbitFlowConsumer<TEvent>
{
    Task HandleAsync(TEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken);
}

The RabbitFlowMessageContext parameter provides AMQP metadata (MessageId, CorrelationId, headers, etc.) for the message being processed. See Message Context for details.

Consumers support full dependency injection — you can inject any service (scoped, transient, singleton):

public class EmailConsumer : IRabbitFlowConsumer<NotificationEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<EmailConsumer> _logger;

    public EmailConsumer(IEmailService emailService, ILogger<EmailConsumer> logger)
    {
        _emailService = emailService;
        _logger = logger;
    }

    public async Task HandleAsync(NotificationEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Sending email to {Recipient}, MessageId={MessageId}", message.Email, context.MessageId);
        await _emailService.SendAsync(message.Email, message.Subject, message.Body, cancellationToken);
    }
}

Registering Consumers

cfg.AddConsumer<EmailConsumer>("email-queue", c =>
{
    c.Enable = true;                              // Enable/disable this consumer
    c.PrefetchCount = 5;                          // Messages fetched in parallel
    c.Timeout = TimeSpan.FromSeconds(30);         // Per-message processing timeout
    c.AutoAckOnError = false;                     // Don't ack failed messages
    c.ExtendDeadletterMessage = true;             // Add error details to DLQ messages
    c.ConsumerId = "email-consumer-1";            // Custom connection identifier
});
Property Type Default Description
Enable bool true Whether this consumer is active
QueueName string (set in constructor) Queue to consume from
ConsumerId string? null Custom connection ID (falls back to queue name)
PrefetchCount ushort 1 How many messages to prefetch
Timeout TimeSpan 30s Processing timeout per message
AutoAckOnError bool false Auto-acknowledge on error (message lost)
AutoGenerate bool false Auto-create queue/exchange/DLQ
ExtendDeadletterMessage bool false Enrich dead-letter messages with error details

Auto-Generate Topology

When AutoGenerate = true, EasyRabbitFlow creates queues, exchanges, and dead-letter queues automatically:

cfg.AddConsumer<OrderConsumer>("orders-queue", c =>
{
    c.AutoGenerate = true;
    c.ConfigureAutoGenerate(ag =>
    {
        ag.ExchangeName = "orders-exchange";      // Custom exchange name
        ag.ExchangeType = ExchangeType.Fanout;    // Direct | Fanout | Topic | Headers
        ag.RoutingKey = "orders-routing-key";      // Routing key for binding
        ag.GenerateExchange = true;                // Create the exchange
        ag.GenerateDeadletterQueue = true;         // Create a dead-letter queue
        ag.DurableQueue = true;                    // Queue survives broker restart
        ag.DurableExchange = true;                 // Exchange survives broker restart
        ag.ExclusiveQueue = false;                 // Not limited to one connection
        ag.AutoDeleteQueue = false;                // Don't delete when last consumer disconnects
    });
});

Generated topology when AutoGenerate = true:

                        ┌──────────────────────┐
                        │  orders-exchange      │
                        │  (fanout)             │
                        └──────────┬───────────┘
                                   │ routing-key
                        ┌──────────▼───────────┐
                        │  orders-queue         │──── args: x-dead-letter-exchange
                        │  (durable)            │         x-dead-letter-routing-key
                        └──────────┬───────────┘
                                   │ (on failure)
                  ┌────────────────▼─────────────────┐
                  │  orders-queue-deadletter-exchange │
                  │  (direct)                         │
                  └────────────────┬─────────────────┘
                                   │
                  ┌────────────────▼─────────────────┐
                  │  orders-queue-deadletter          │
                  └──────────────────────────────────┘
Property Type Default Description
GenerateExchange bool true Create the exchange
GenerateDeadletterQueue bool true Create dead-letter queue and exchange
ExchangeType ExchangeType Direct Direct, Fanout, Topic, or Headers
ExchangeName string? null Custom name (defaults to {queue}-exchange)
RoutingKey string? null Custom routing key (defaults to {queue}-routing-key)
DurableExchange bool true Exchange survives broker restart
DurableQueue bool true Queue survives broker restart
ExclusiveQueue bool false Queue limited to declaring connection
AutoDeleteQueue bool false Delete queue when last consumer disconnects
Args IDictionary? null Additional RabbitMQ arguments

Retry Policies

Configure how failed messages are retried:

c.ConfigureRetryPolicy(r =>
{
    r.MaxRetryCount = 5;               // Number of attempts
    r.RetryInterval = 1000;            // Base delay in ms
    r.ExponentialBackoff = true;        // Enable exponential backoff
    r.ExponentialBackoffFactor = 2;     // Multiply delay by this factor
    r.MaxRetryDelay = 30_000;           // Cap delay at 30 seconds
});

Example: retry timeline with exponential backoff (factor=2, base=1000ms):

Attempt 1 → fail → wait 1000ms
Attempt 2 → fail → wait 2000ms
Attempt 3 → fail → wait 4000ms
Attempt 4 → fail → wait 8000ms
Attempt 5 → fail → sent to dead-letter queue
Property Type Default Description
MaxRetryCount int 1 Total retry attempts
RetryInterval int 1000 Base delay between retries (ms)
ExponentialBackoff bool false Enable exponential backoff
ExponentialBackoffFactor int 1 Multiplier for exponential growth
MaxRetryDelay int 60000 Upper bound for delay (ms)

Custom Dead-Letter Queues

Route failed messages to a specific dead-letter queue:

c.ConfigureCustomDeadletter(dl =>
{
    dl.DeadletterQueueName = "custom-errors-queue";
});

When ExtendDeadletterMessage = true, the dead-letter message includes full error details:

{
  "dateUtc": "2026-01-15T10:30:00Z",
  "messageType": "OrderCreatedEvent",
  "messageData": { "orderId": "ORD-123", "total": 99.99 },
  "exceptionType": "TimeoutException",
  "errorMessage": "The operation was canceled.",
  "stackTrace": "...",
  "source": "OrderService",
  "innerExceptions": []
}

Publishing Messages

Inject IRabbitFlowPublisher to publish messages. All single-message publishes use publisher confirms — the await only completes after the broker confirms receipt.

Single Message

Returns a PublishResult with Success, Destination, RoutingKey, MessageId, TimestampUtc, and Error:

public class OrderService
{
    private readonly IRabbitFlowPublisher _publisher;

    public OrderService(IRabbitFlowPublisher publisher)
    {
        _publisher = publisher;
    }

    // Publish directly to a queue
    public async Task CreateOrderAsync(OrderCreatedEvent order)
    {
        var result = await _publisher.PublishAsync(order, queueName: "orders-queue");

        if (!result.Success)
            throw new Exception($"Publish failed: {result.Error?.Message}");
    }

    // Publish to an exchange with routing key and correlation ID
    public async Task BroadcastNotificationAsync(NotificationEvent notification, string correlationId)
    {
        var result = await _publisher.PublishAsync(
            notification,
            exchangeName: "notifications",
            routingKey: "user.created",
            correlationId: correlationId);

        Console.WriteLine($"Published to {result.Destination} at {result.TimestampUtc}");
    }
}

Method signatures:

// Publish to exchange (with routing key) — always uses publisher confirms
Task<PublishResult> PublishAsync<TEvent>(TEvent message, string exchangeName, string routingKey,
    string? correlationId = null, string publisherId = "",
    JsonSerializerOptions? jsonOptions = null,
    CancellationToken cancellationToken = default) where TEvent : class;

// Publish directly to queue — always uses publisher confirms
Task<PublishResult> PublishAsync<TEvent>(TEvent message, string queueName,
    string? correlationId = null, string publisherId = "",
    JsonSerializerOptions? jsonOptions = null,
    CancellationToken cancellationToken = default) where TEvent : class;

PublishResult properties:

Property Type Description
Success bool Whether the broker confirmed the message
MessageId string? Unique ID (when IdempotencyEnabled = true)
Destination string Target exchange or queue name
RoutingKey string Routing key used (empty for queue publishes)
TimestampUtc DateTime When the publish was executed
Error Exception? The exception if Success is false

Batch Publishing

Use PublishBatchAsync to publish multiple messages in a single operation. The channelMode parameter controls atomicity:

  • Transactional (default): All-or-nothing — if any message fails, the entire batch is rolled back.
  • Confirm: Each message is individually confirmed — a mid-batch failure does not roll back previous messages.
// Atomic batch (Transactional — default)
var result = await publisher.PublishBatchAsync(
    orders,
    exchangeName: "orders-exchange",
    routingKey: "new-order");

Console.WriteLine($"Batch: {result.MessageCount} messages, success={result.Success}");

// Non-atomic batch (Confirm mode — higher throughput)
var result = await publisher.PublishBatchAsync(
    orders,
    queueName: "orders-queue",
    channelMode: ChannelMode.Confirm);

Method signatures:

// Batch to exchange
Task<BatchPublishResult> PublishBatchAsync<TEvent>(IReadOnlyList<TEvent> messages,
    string exchangeName, string routingKey,
    ChannelMode channelMode = ChannelMode.Transactional,
    string? correlationId = null, string publisherId = "",
    JsonSerializerOptions? jsonOptions = null,
    CancellationToken cancellationToken = default) where TEvent : class;

// Batch to queue
Task<BatchPublishResult> PublishBatchAsync<TEvent>(IReadOnlyList<TEvent> messages,
    string queueName,
    ChannelMode channelMode = ChannelMode.Transactional,
    string? correlationId = null, string publisherId = "",
    JsonSerializerOptions? jsonOptions = null,
    CancellationToken cancellationToken = default) where TEvent : class;

BatchPublishResult properties:

Property Type Description
Success bool Whether all messages were published
MessageCount int Number of messages in the batch
MessageIds IReadOnlyList<string> IDs per message (when IdempotencyEnabled = true)
Destination string Target exchange or queue name
RoutingKey string Routing key used
ChannelMode ChannelMode Mode used (Transactional or Confirm)
TimestampUtc DateTime When the batch was executed
Error Exception? The exception if Success is false

Idempotency

Enable automatic MessageId generation so consumers can deduplicate:

cfg.ConfigurePublisher(pub => pub.IdempotencyEnabled = true);

When enabled, every published message (single or batch) gets a unique MessageId set in BasicProperties.MessageId. The ID is also returned in PublishResult.MessageId or BatchPublishResult.MessageIds.

Correlation

Pass a correlationId when publishing to trace related messages end-to-end:

// Single message with correlation
await publisher.PublishAsync(order, "orders-queue", correlationId: "req-abc-123");

// Exchange publish with correlation
await publisher.PublishAsync(event, "notifications", routingKey: "new", correlationId: requestId);

// Batch — same correlationId shared across all messages
await publisher.PublishBatchAsync(events, "orders-queue", correlationId: batchId);

The correlationId is set on BasicProperties.CorrelationId and received by consumers via RabbitFlowMessageContext.CorrelationId.


Message Context

Every consumer receives a RabbitFlowMessageContext as a parameter of HandleAsync, providing access to AMQP metadata of the message being processed:

public class OrderConsumer : IRabbitFlowConsumer<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent message, RabbitFlowMessageContext context, CancellationToken ct)
    {
        // Idempotency check using MessageId
        if (context.MessageId != null && await _db.ExistsAsync(context.MessageId))
        {
            _logger.LogWarning("Duplicate message {Id}, skipping", context.MessageId);
            return;
        }

        // Trace correlation across services
        _logger.LogInformation("Processing order. CorrelationId={CorrelationId}", context.CorrelationId);

        // Check if this is a redelivery
        if (context.Redelivered)
        {
            _logger.LogWarning("Redelivered message, DeliveryTag={Tag}", context.DeliveryTag);
        }

        await ProcessOrderAsync(message, ct);
    }
}

RabbitFlowMessageContext properties:

Property Type Description
MessageId string? Unique ID from BasicProperties.MessageId (set when IdempotencyEnabled = true)
CorrelationId string? Correlation ID from BasicProperties.CorrelationId (set via correlationId parameter)
Exchange string? Exchange that delivered the message (empty for direct queue publishes)
RoutingKey string? Routing key used when the message was published
Headers IDictionary? Custom AMQP headers from BasicProperties.Headers
DeliveryTag ulong Broker-assigned delivery tag for this message
Redelivered bool Whether the broker redelivered this message

Note: RabbitFlowMessageContext is immutable — all properties are read-only and populated automatically from BasicDeliverEventArgs before HandleAsync is invoked.


Queue State Inspection

Use IRabbitFlowState to query queue metadata at runtime:

public class HealthCheckService
{
    private readonly IRabbitFlowState _state;

    public HealthCheckService(IRabbitFlowState state)
    {
        _state = state;
    }

    public async Task<object> GetQueueHealthAsync(string queueName)
    {
        return new
        {
            IsEmpty = await _state.IsEmptyQueueAsync(queueName),
            MessageCount = await _state.GetQueueLengthAsync(queueName),
            ConsumerCount = await _state.GetConsumersCountAsync(queueName),
            HasConsumers = await _state.HasConsumersAsync(queueName)
        };
    }
}
Method Returns Description
IsEmptyQueueAsync(queueName) Task<bool> Is the queue empty?
GetQueueLengthAsync(queueName) Task<uint> Number of messages in the queue
GetConsumersCountAsync(queueName) Task<uint> Number of active consumers
HasConsumersAsync(queueName) Task<bool> Does the queue have any consumers?

Queue Purging

Use IRabbitFlowPurger to remove all messages from queues:

// Purge a single queue
await purger.PurgeMessagesAsync("orders-queue");

// Purge multiple queues at once
await purger.PurgeMessagesAsync(new[] { "orders-queue", "emails-queue", "notifications-queue" });

Temporary Batch Processing

IRabbitFlowTemporary is designed for fire-and-forget batch workflows — process a collection of messages through RabbitMQ with automatic queue creation and cleanup.

Ideal for:

  • Background jobs (PDF generation, email sending, report calculation)
  • One-time batch processing (database cleanup, data migration)
  • Parallel processing with configurable concurrency

Basic Usage

public class InvoiceService
{
    private readonly IRabbitFlowTemporary _temporary;

    public InvoiceService(IRabbitFlowTemporary temporary)
    {
        _temporary = temporary;
    }

    public async Task ProcessInvoiceBatchAsync(List<Invoice> invoices)
    {
        int processed = await _temporary.RunAsync(
            invoices,
            onMessageReceived: async (invoice, ct) =>
            {
                Console.WriteLine($"Processing invoice {invoice.Id}...");
                await Task.Delay(500, ct); // Simulate work
            },
            onCompleted: (total, errors) =>
            {
                Console.WriteLine($"Done! Processed: {total}, Errors: {errors}");
            },
            options: new RunTemporaryOptions
            {
                PrefetchCount = 10,
                Timeout = TimeSpan.FromSeconds(30),
                CorrelationId = Guid.NewGuid().ToString()
            });
    }
}

With Result Collection

int processed = await _temporary.RunAsync<Invoice, InvoiceResult>(
    invoices,
    onMessageReceived: async (invoice, ct) =>
    {
        var result = await ProcessInvoiceAsync(invoice, ct);
        return new InvoiceResult { InvoiceId = invoice.Id, Status = "Completed" };
    },
    onCompletedAsync: async (count, results) =>
    {
        // results is a ConcurrentQueue<InvoiceResult> with all collected results
        Console.WriteLine($"Processed {count} invoices, collected {results.Count} results");
        await SaveResultsAsync(results);
    });

How It Works

  Your Code                   RabbitMQ (Temporary)            Handler
  ─────────                   ──────────────────              ───────

  RunAsync(messages) ─────►  Create temp queue  ───────────►  onMessageReceived()
       │                     Publish all msgs                      │
       │                           │                               │
       │                     Consume & process  ◄──────────────────┘
       │                           │
       │                     All processed?
       │                           │ yes
       ◄──────────────────── Delete temp queue
       │                     Call onCompleted()
       │
  return count
Option Type Default Description
PrefetchCount ushort 1 Parallel message processing (>0)
Timeout TimeSpan? null Per-message timeout
QueuePrefixName string? null Custom prefix for the temp queue name
CorrelationId string? Guid Correlation ID for tracing/logging

Transient Exceptions & Custom Retry Logic

By default, timeouts and RabbitFlowTransientException trigger retries. Throw RabbitFlowTransientException from your consumer to signal a retryable error:

using EasyRabbitFlow.Exceptions;

public class PaymentConsumer : IRabbitFlowConsumer<PaymentEvent>
{
    public async Task HandleAsync(PaymentEvent message, RabbitFlowMessageContext context, CancellationToken cancellationToken)
    {
        try
        {
            await ProcessPaymentAsync(message);
        }
        catch (HttpRequestException ex) when (ex.StatusCode == HttpStatusCode.ServiceUnavailable)
        {
            // This will trigger the retry policy
            throw new RabbitFlowTransientException("Payment gateway temporarily unavailable", ex);
        }
        // Any other exception → no retry, sent to dead-letter queue
    }
}

Exception types:

Exception Purpose
RabbitFlowTransientException Signals a retryable error — triggers retry policy
RabbitFlowException General library error
RabbitFlowOverRetriesException Thrown internally when all retry attempts are exhausted

Full API Reference

Registered Services

Interface Lifetime Description
IRabbitFlowPublisher Singleton Publish messages to exchanges or queues
IRabbitFlowState Singleton Query queue metadata
IRabbitFlowTemporary Singleton Temporary batch processing
IRabbitFlowPurger Singleton Purge queue messages
ConsumerHostedService Hosted Background consumer lifecycle (via UseRabbitFlowConsumers)

Extension Methods

// Register all EasyRabbitFlow services
IServiceCollection AddRabbitFlow(this IServiceCollection services,
    Action<RabbitFlowConfigurator>? configurator = null);

// Start background consumer processing
IServiceCollection UseRabbitFlowConsumers(this IServiceCollection services);

RabbitFlowConfigurator Methods

Method Description
ConfigureHost(Action<HostSettings>) Set RabbitMQ connection details
ConfigureJsonSerializerOptions(Action<JsonSerializerOptions>) Customize JSON serialization
ConfigurePublisher(Action<PublisherOptions>?) Configure publisher behavior
AddConsumer<TConsumer>(string queueName, Action<ConsumerSettings<TConsumer>>) Register a consumer

Performance Notes

EasyRabbitFlow is designed for high-throughput scenarios:

  • Zero per-message reflection — consumer handlers are compiled via expression trees at startup, not resolved per message.
  • Connection pooling — publisher reuses a single connection by default.
  • Prefetch control — tune PrefetchCount for optimal throughput vs. memory usage.
  • Thread-safe channel operations — all channel I/O (ACK/NACK/Publish) is serialized via per-channel semaphores, preventing race conditions when PrefetchCount > 1.
  • Semaphore-based concurrency — internal semaphores prevent consumer overload.
  • Automatic recovery — connections auto-recover after network failures with configurable intervals.

Recommended settings for high throughput:

cfg.AddConsumer<MyConsumer>("high-volume-queue", c =>
{
    c.PrefetchCount = 50;                          // Process 50 messages concurrently
    c.Timeout = TimeSpan.FromSeconds(60);          // Generous timeout for heavy processing
    c.ConfigureRetryPolicy(r =>
    {
        r.MaxRetryCount = 3;
        r.RetryInterval = 500;
        r.ExponentialBackoff = true;
        r.ExponentialBackoffFactor = 2;
        r.MaxRetryDelay = 10_000;                  // Cap at 10 seconds
    });
});

cfg.ConfigurePublisher(pub =>
{
    pub.DisposePublisherConnection = false;         // Reuse connection
    pub.IdempotencyEnabled = true;                  // Auto-assign MessageId
});

License

This project is licensed under the MIT License.

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.1 is compatible. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
5.0.0 196 2/15/2026
4.2.0 93 2/15/2026
4.1.0 90 2/15/2026
4.0.2 649 10/31/2025
4.0.1 272 10/13/2025
4.0.0 208 10/12/2025
3.0.9 154 10/10/2025
3.0.8 157 10/10/2025
3.0.7 446 8/10/2025
3.0.6 401 6/19/2025
3.0.5 464 5/1/2025
3.0.4 207 4/25/2025
3.0.3 239 4/25/2025
3.0.2 208 4/25/2025
3.0.1 313 4/17/2025
3.0.0 270 4/17/2025
2.2.7 324 4/14/2025
2.2.6 287 4/13/2025
2.2.5 287 4/13/2025
2.2.4 207 4/11/2025
Loading failed