StreamFlow.RabbitMq
10.0.0
StreamFlow - Just another RabbitMQ handler
Justification
I know there are multiple cool RabbitMQ wrappers (MassTransit, EasyNetQ and many more) in the wild which do the job much better than this library. However, I was never able to use them as-is without overriding multiple methods or classes. So I decided to make my own wrapper for my own needs.
Install via NuGet
If you want to include StreamFlow.RabbitMq in your project, you can install it directly from NuGet.
To install the package, run the following command in the Package Manager Console:
PM> Install-Package StreamFlow.RabbitMq
Concepts
All names in the RabbitMQ server can be defined by implementing IRabbitMqConventions and registering the implementation in dependency injection. However, the default behavior works like this:
- exchange name is created using request class name
- queue name is created using
<request class name>:<request handler name>[:<service id>][:<consumer group name>]
- error queue name is created using
<request class name>:<request handler name>[:<service id>][:<consumer group name>]:Error
A consumer group is a group of consumers that share the same group id.
When a topic is consumed by consumers in the same group, every record
will be delivered to only one consumer.
If all the consumer instances have the same consumer group, then the
records will effectively be load-balanced over the consumer instances.
Service id identifies a service implementation. For example, if we have
multiple services like customers and orders, they will have different
service ids. The simplest approach is to use a short, memorable
identifier such as "customers" or "orders".
The purpose of such an id is to distinguish message handlers which can have
exactly the same name and can handle exactly the same message. Since these
handlers live in different services, load balancing between them is not acceptable.
It can also be treated as a consumer group, but one that applies to the whole service.
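The default naming rules above can be sketched as a few string-building helpers. This is a minimal illustration only, not the library's code: the class DefaultNamingSketch, its method names, the two placeholder types, and the use of Type.Name as the "class name" are all assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;

// Standalone illustration of the default naming rules; not part of StreamFlow.
public static class DefaultNamingSketch
{
    // Exchange name: the request class name (assumed here to be Type.Name).
    public static string ExchangeName(Type requestType) => requestType.Name;

    // Queue name: <request class name>:<request handler name>[:<service id>][:<consumer group name>]
    public static string QueueName(Type requestType, Type handlerType,
        string serviceId = "", string consumerGroup = "")
    {
        var parts = new List<string> { requestType.Name, handlerType.Name };
        if (!string.IsNullOrWhiteSpace(serviceId)) parts.Add(serviceId);         // optional part
        if (!string.IsNullOrWhiteSpace(consumerGroup)) parts.Add(consumerGroup); // optional part
        return string.Join(":", parts);
    }

    // Error queue name: the queue name followed by ":Error".
    public static string ErrorQueueName(Type requestType, Type handlerType,
        string serviceId = "", string consumerGroup = "")
        => QueueName(requestType, handlerType, serviceId, consumerGroup) + ":Error";
}

// Hypothetical placeholder types used only to produce names.
public class SketchPingRequest { }
public class SketchPingConsumer { }
```

With these placeholders, QueueName(typeof(SketchPingRequest), typeof(SketchPingConsumer), "orders", "gr1") yields "SketchPingRequest:SketchPingConsumer:orders:gr1". Two services with different service ids therefore get separate queues, while instances sharing the same service id and consumer group share one queue and are load-balanced.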
Component Lifetimes
Connection
The connection object is registered as a singleton. However, consumer and publisher connections are kept separate, which means a single service instance can have at most two connections to the RabbitMQ server.
IPublisher
Uses singleton lifetime. Internally it uses a Channel collection with a bounded size of 100_000 items. If publications in the code happen much faster than this collection is processed, you might receive a RabbitMqPublisherException with Reason: InternalQueueIsFull.
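To make the "queue is full" situation concrete, here is a minimal sketch of how a bounded channel from System.Threading.Channels behaves when nothing drains it. It does not reproduce the publisher's internals; the class BoundedQueueSketch and the tiny capacity are assumptions chosen so the effect is easy to see.

```csharp
using System;
using System.Threading.Channels;

// Standalone illustration of a bounded channel; not the library's publisher.
public static class BoundedQueueSketch
{
    // Writes `count` items into a bounded channel that nobody reads from
    // and returns how many writes were rejected because the channel was full.
    public static int CountRejected(int capacity, int count)
    {
        var channel = Channel.CreateBounded<int>(capacity);
        var rejected = 0;
        for (var i = 0; i < count; i++)
        {
            // TryWrite returns false once the channel already holds `capacity` items.
            if (!channel.Writer.TryWrite(i))
            {
                rejected++;
            }
        }
        return rejected;
    }
}
```

For example, CountRejected(3, 5) returns 2: the first three items fit and the remaining two are rejected. A producer that consistently outpaces the reader hits the same limit, only at a much larger capacity.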
Under the hood, the publisher uses asynchronous publisher acknowledgments, which means every PublishAsync call actually waits (asynchronously) for server confirmation. For a more detailed explanation, see the RabbitMQ documentation: https://www.rabbitmq.com/confirms.html#publisher-confirms More detailed examples of the behavior are available here: https://www.rabbitmq.com/tutorials/tutorial-seven-dotnet.html In summary: if you await each PublishAsync call, you wait for the confirmation of each publication, as in this example:
public async Task PublishMessagesAsync<T>(T[] messages)
{
    foreach (var message in messages)
    {
        try
        {
            // awaiting here waits for the server confirmation of each message
            await _publisher.PublishAsync(message);
        }
        catch (Exception e)
        {
            // handle publication errors
        }
    }
}
The example above might be relatively slow, but it is very simple to implement. To improve publishing performance, you can start all publications first and await their confirmations afterwards, as in this example:
public async Task PublishMessagesAsync<T>(T[] messages)
{
    // start all publications without waiting for individual confirmations
    var tasks = new Task[messages.Length];
    for (var i = 0; i < messages.Length; ++i)
    {
        var message = messages[i];
        tasks[i] = _publisher.PublishAsync(message);
    }

    // then observe the result of each publication
    foreach (var task in tasks)
    {
        try
        {
            await task;
        }
        catch (Exception e)
        {
            // handle publication errors
        }
    }
}
NOTE: if you do not specify a cancellation token (or specify the default one), or no timeout is specified in the publish options, the library automatically adds a 60-second timeout. If the application does not receive a response within this time, the message is marked as cancelled.
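The timeout rule in the note can be pictured with a small helper. This is a sketch under a stated assumption, not the library's implementation: it assumes the 60-second default applies only when the caller passed neither a cancellable token nor an explicit timeout. The class DefaultTimeoutSketch and its members are hypothetical.

```csharp
using System;
using System.Threading;

// Standalone illustration of a "default timeout" rule; not the library's code.
public static class DefaultTimeoutSketch
{
    public static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(60);

    // Picks the timeout to apply:
    // - an explicit timeout always wins,
    // - a cancellable token without explicit timeout gets no extra timeout (assumption),
    // - otherwise the 60-second default is used.
    public static TimeSpan? EffectiveTimeout(CancellationToken token, TimeSpan? explicitTimeout)
    {
        if (explicitTimeout.HasValue) return explicitTimeout;
        return token.CanBeCanceled ? (TimeSpan?)null : DefaultTimeout;
    }

    // Creates a token source linked to the caller's token that is also
    // cancelled when the effective timeout elapses. The caller disposes it.
    public static CancellationTokenSource CreateLinkedSource(CancellationToken token, TimeSpan? explicitTimeout)
    {
        var source = CancellationTokenSource.CreateLinkedTokenSource(token);
        var timeout = EffectiveTimeout(token, explicitTimeout);
        if (timeout.HasValue) source.CancelAfter(timeout.Value);
        return source;
    }
}
```

In practice this means that passing your own token with a shorter deadline is the simplest way to avoid waiting the full default when a confirmation never arrives.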
Middleware
Each middleware is registered as transient. A new middleware instance is created for each received or published message.
Consumers
Every consumer is a singleton. However, for each received message a separate scope is created and disposed at the end of processing.
Handlers
Handlers are registered as scoped instances.
Super simple to use
Define a message:
public class PingRequest
{
    public DateTime Timestamp { get; set; }
}
Publish message:
public class PingController
{
    private readonly IPublisher _publisher;

    public PingController(IPublisher publisher)
    {
        _publisher = publisher;
    }

    public async Task SendPing()
    {
        await _publisher.PublishAsync(new PingRequest { Timestamp = DateTime.UtcNow });
    }
}
Define message consumer:
public class PingRequestConsumer : IConsumer<PingRequest>
{
    public Task Handle(IMessage<PingRequest> message, CancellationToken cancellation)
    {
        Console.WriteLine(message.Body.Timestamp);
        return Task.CompletedTask;
    }
}
Configure using ASP.NET Core:
services.AddStreamFlow(transport =>
{
    transport
        .UsingRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .StartConsumerHostedService()
        )
        .Consumers(builder => builder
            .Add<PingRequest, PingRequestConsumer>(options => options
                .ConsumerCount(5)
                .ConsumerGroup("gr1"))
            .Add<PingRequest, PingRequestConsumer>(options => options
                .ConsumerCount(5)
                .ConsumerGroup("gr2"))
        )
        .ConfigureConsumerPipe(builder => builder
            .Use<LogAppIdMiddleware>()
        )
        .ConfigurePublisherPipe(builder => builder
            .Use(_ => new SetAppIdMiddleware("Published from StreamFlow.Tests.AspNetCore"))
        );
});
The code above registers the StreamFlow classes, configures the RabbitMQ connection, and instructs ASP.NET Core to start a hosted service which starts the configured consumers. It configures 2 consumer groups, launching 5 consumer instances for each group.
ConfigureConsumerPipe and ConfigurePublisherPipe register middleware-like actions which are executed when a message is received or published, respectively. The code of these middlewares:
// used at consumer side to log received AppId
public class LogAppIdMiddleware : IStreamFlowMiddleware
{
    private readonly ILogger<LogAppIdMiddleware> _logger;

    public LogAppIdMiddleware(ILogger<LogAppIdMiddleware> logger)
    {
        _logger = logger;
    }

    public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        if (!string.IsNullOrWhiteSpace(context.AppId))
        {
            _logger.LogInformation("AppId: {AppId}", context.AppId);
        }
        return next(context);
    }
}

// used at publisher side to set custom AppId
public class SetAppIdMiddleware : IStreamFlowMiddleware
{
    private readonly string _appId;

    public SetAppIdMiddleware(string appId)
    {
        _appId = appId;
    }

    public Task Invoke(IMessageContext context, Func<IMessageContext, Task> next)
    {
        context.WithAppId(_appId);
        return next(context);
    }
}
Even though these example middlewares are very simple, in real-world scenarios middleware can enable very powerful implementations:
- retries
- exception handling
- logging
- metrics
- ...
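As an illustration of the first item, here is a minimal sketch of a retry middleware using the same (context, next) shape as the Invoke methods shown earlier. To stay self-contained it does not use the library's IStreamFlowMiddleware or IMessageContext: SketchContext, SketchMiddleware, and RetrySketch are hypothetical stand-ins, and the retry policy (immediate retry, fixed attempt count) is an assumption chosen for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the library's message context.
public sealed class SketchContext
{
    public int Attempts { get; set; }
}

// Same shape as the Invoke method of a middleware: (context, next) => Task.
public delegate Task SketchMiddleware(SketchContext context, Func<SketchContext, Task> next);

public static class RetrySketch
{
    // Builds a middleware that re-runs the rest of the pipeline up to maxAttempts times.
    public static SketchMiddleware Retry(int maxAttempts) => async (context, next) =>
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                context.Attempts = attempt;
                await next(context);
                return; // success, stop retrying
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // swallow and retry; the failure of the last attempt propagates
            }
        }
    };

    // Composes middlewares around a final handler; the first middleware is outermost.
    public static Func<SketchContext, Task> Build(
        IReadOnlyList<SketchMiddleware> middlewares, Func<SketchContext, Task> handler)
    {
        var next = handler;
        for (var i = middlewares.Count - 1; i >= 0; i--)
        {
            var middleware = middlewares[i];
            var inner = next;
            next = ctx => middleware(ctx, inner);
        }
        return next;
    }
}
```

A real retry middleware would usually add a delay between attempts and retry only on exceptions known to be transient; once the attempts are exhausted, the exception propagates out of the pipeline.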
MediatR
There is an integration for the MediatR library which can greatly simplify consumer implementation.
Simply install package:
Install-Package StreamFlow.RabbitMq.MediatR
The library expects that MediatR itself is already configured and available in the dependency injection container.
MediatR Notifications
The request class must implement the INotification interface, for example:
public class PingNotification : INotification
{
}
And then it can be used in StreamFlow:
....
.Consumers(builder => builder
.AddNotification<PingNotification>()
)
....
MediatR Request without response
The request class must implement the IRequest interface, for example:
public class PingRequest : IRequest
{
}
And then it can be used in StreamFlow:
....
.Consumers(builder => builder
.AddRequest<PingRequest>()
)
....
MediatR Request with response
The request class must implement the IRequest<TResponse> interface, for example:
public class PongResponse
{
}
public class PingPongRequest : IRequest<PongResponse>
{
}
And then it can be used in StreamFlow:
....
.Consumers(builder => builder
.AddRequest<PingPongRequest, PongResponse>()
)
....
However, in this case you get additional behavior: the response is sent back using the RabbitMQ publisher bus. For this to work you also need to enable the publisher host (see the EnablePublisherHost call):
services.AddStreamFlow(transport =>
{
    transport
        .UseRabbitMq(mq => mq
            .Connection("localhost", "guest", "guest")
            .EnableConsumerHost(consumer => consumer.Prefetch(5))
            .WithPrometheusMetrics()
            .WithPublisherOptions(publisher => publisher
                .EnablePublisherHost()
            )
        );
});
Metrics
The library supports various metrics which provide insight into publishers and consumers. Out of the box there is an implementation for Prometheus metrics, but it is definitely not limited to it.
In order to enable metrics, add the Prometheus package:
PM> Install-Package StreamFlow.RabbitMq.Prometheus
And enable metrics when configuring RabbitMQ (see: WithPrometheusMetrics):
services.AddStreamFlow(streamFlowOptions, transport =>
{
transport
.UseRabbitMq(mq => mq
.Connection(options.Host, options.Username, options.Password, options.VirtualHost)
.WithPrometheusMetrics()
....
);
});
Prometheus metrics
streamflow_messages_published
histogram, represents published message counts and durations (seconds)
labels: exchange, state [completed or failed]
streamflow_bus_messages_published
histogram, represents published message counts and durations (seconds) when using publisher host
labels: exchange, state [completed or failed]
streamflow_messages_publishing_events
histogram, represents various events happening during publishing process, like channel creation, preparations, serialization, transaction commit and so on, helps to see if there are any bottlenecks in publisher process
labels: exchange, event
streamflow_messages_publishing_errors
counter, represents exceptions that happened during message publishing
labels: exchange
streamflow_bus_messages_publishing_errors
counter, represents exceptions that happened during message publishing when publisher host is used
labels: exchange
streamflow_messages_consumed
histogram, consumed message count and consumer durations (seconds)
labels: exchange, queue, state
streamflow_messages_consumed_errors
counter, represents exceptions that occurred during the consumer process
labels: exchange, queue
streamflow_messages_bus_publishing
counter, shows how many messages are published using publisher host
streamflow_messages_bus_publishing_errors
counter, represents errors when publishing over publisher host; since the publisher host channel is bounded, it will start increasing when publisher host receives more messages than it is able to actually send to RabbitMQ
streamflow_publisher_pool_size
counter, when using publisher pooling, shows pool size (publishers created but currently not used)
streamflow_publisher_pool_in_use
counter, when using publisher pooling, shows how many publisher instances are currently in use
Error Handling
Every application deals with errors, and RabbitMQ handlers can run into many exceptional cases. In case of an unhandled exception, an IRabbitMqErrorHandler instance is created which by default sends the message to an error queue. Even though you can define your own error queue name by implementing the IRabbitMqConventions interface, the default behavior simply adds the ":Error" suffix to the end of the consumer group queue name and sends the message to that queue with the exception details in the message header. Application developers or support can later decide what to do with those messages.
Product | Compatible and additional computed target framework versions |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. |
Dependencies
- net8.0
  - RabbitMQ.Client (>= 6.8.1)
  - StreamFlow (>= 10.0.0)
- net9.0
  - RabbitMQ.Client (>= 6.8.1)
  - StreamFlow (>= 10.0.0)
Version | Downloads | Last updated |
---|---|---|
10.0.0 | 260 | 12/11/2024 |
9.1.0 | 9,925 | 4/19/2023 |
9.0.3 | 705 | 3/30/2023 |
9.0.2 | 269 | 3/29/2023 |
9.0.1 | 285 | 3/29/2023 |
9.0.0 | 302 | 3/29/2023 |
8.0.0 | 1,277 | 2/18/2023 |
7.4.0 | 454 | 2/11/2023 |
7.3.1 | 2,837 | 11/2/2022 |
7.3.0 | 781 | 10/28/2022 |
7.2.1 | 662 | 10/25/2022 |
7.2.0 | 578 | 10/25/2022 |
7.1.4 | 1,035 | 10/11/2022 |
7.1.3 | 624 | 10/11/2022 |
7.1.2 | 635 | 10/11/2022 |
7.1.1 | 632 | 10/10/2022 |
7.1.0 | 691 | 10/9/2022 |
7.0.0 | 622 | 10/9/2022 |
6.8.1 | 3,594 | 5/20/2022 |
6.8.0 | 620 | 5/20/2022 |
6.7.2 | 927 | 2/24/2022 |
6.7.1 | 814 | 1/23/2022 |
6.7.0 | 792 | 1/17/2022 |
6.6.1 | 730 | 1/13/2022 |
6.6.0 | 797 | 1/12/2022 |
6.5.1 | 393 | 12/20/2021 |
6.5.0 | 388 | 12/20/2021 |
6.4.0 | 375 | 12/20/2021 |
6.3.4 | 353 | 12/14/2021 |
6.3.3 | 344 | 12/14/2021 |
6.3.2 | 351 | 12/14/2021 |
6.3.1 | 380 | 12/14/2021 |
6.3.0 | 430 | 12/14/2021 |
6.2.1 | 391 | 12/9/2021 |
6.2.0 | 356 | 12/9/2021 |
6.1.2 | 368 | 12/8/2021 |
6.1.1 | 391 | 12/8/2021 |
6.1.0 | 402 | 12/8/2021 |
6.0.0 | 651 | 12/6/2021 |
5.2.0 | 393 | 11/23/2021 |
5.1.0 | 401 | 11/23/2021 |
5.0.0 | 414 | 11/14/2021 |
4.1.0 | 561 | 10/14/2021 |
4.0.3 | 423 | 9/20/2021 |
4.0.2 | 480 | 9/19/2021 |
4.0.1 | 396 | 9/19/2021 |
4.0.0 | 417 | 9/19/2021 |
3.9.0 | 430 | 9/19/2021 |
3.8.0 | 529 | 9/12/2021 |
3.7.0 | 480 | 9/12/2021 |
3.5.0 | 417 | 9/7/2021 |
3.4.0 | 413 | 9/7/2021 |
3.3.0 | 338 | 9/7/2021 |
3.2.0 | 332 | 9/7/2021 |
3.1.0 | 343 | 9/2/2021 |
3.0.0 | 334 | 9/1/2021 |
2.5.0 | 495 | 5/4/2021 |
2.3.0 | 324 | 5/4/2021 |
2.2.0 | 357 | 4/27/2021 |
2.1.0 | 360 | 4/26/2021 |
2.0.0 | 325 | 4/26/2021 |