ChannelMediator 1.1.19
See the version list below for details.
dotnet add package ChannelMediator --version 1.1.19
NuGet\Install-Package ChannelMediator -Version 1.1.19
<PackageReference Include="ChannelMediator" Version="1.1.19" />
<PackageVersion Include="ChannelMediator" Version="1.1.19" />
<PackageReference Include="ChannelMediator" />
paket add ChannelMediator --version 1.1.19
#r "nuget: ChannelMediator, 1.1.19"
#:package ChannelMediator@1.1.19
#addin nuget:?package=ChannelMediator&version=1.1.19
#tool nuget:?package=ChannelMediator&version=1.1.19
๐ ChannelMediator
A modern, high-performance mediator for .NET, built on System.Threading.Channels, with full MediatR compatibility.
Compatible with .NET 8, .NET 9, and .NET 10.
โจ Features
- โ
MediatR Compatible - Familiar API (
Send/Publish/CreateStream) - โ Channel-Based - Asynchronous processing with natural backpressure
- โ Pipeline Behaviors - Global AND specific
- โ
Streaming -
IAsyncEnumerable<T>withIStreamRequest<T>and stream pipeline behaviors - โ Parallel Notifications - Sequential or parallel broadcasting
- โ High Performance - Channel-based with modern optimizations
- โ Azure Service Bus - Distributed messaging with queues and topics
- โ RabbitMQ - Self-hosted distributed messaging with exchanges and queues
- โ Minimal API Generator - Source-generated endpoint mapping from request attributes
- โ
API Client Generator - Source-generated
HttpClienthandlers for consuming generated APIs - โ .NET 8 / 9 / 10 - Multi-targeted packages for current .NET versions
๐ฆ Installation
# Package (coming soon)
dotnet add package ChannelMediator
# Or local reference
<ProjectReference Include="..\ChannelMediator\ChannelMediator.csproj" />
๐ฏ Quick Start
Configuration
using ChannelMediator;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection;
var services = new ServiceCollection();
// Register the mediator
services.AddChannelMediator(
config => config.Strategy = NotificationPublishStrategy.Parallel,
Assembly.GetExecutingAssembly());
var provider = services.BuildServiceProvider();
var mediator = provider.GetRequiredService<IMediator>();
If multiple handlers are found for the same request, only the first registered handler is kept and the others are ignored.
When you pass multiple assemblies to AddChannelMediator, the declaration order determines which handler is selected by default.
Put the assembly containing your preferred default handler first.
services.AddChannelMediator(
assemblies: [
typeof(MyPreferredHandler).Assembly,
typeof(MyFallbackHandler).Assembly
]);
Define a Request
// Request
public record AddToCartRequest(string ProductCode) : IRequest<CartItem>;
// Response
public record CartItem(string ProductCode, int Quantity, decimal Total);
// Handler
public class AddToCartHandler : IRequestHandler<AddToCartRequest, CartItem>
{
public async Task<CartItem> Handle(
AddToCartRequest request,
CancellationToken cancellationToken)
{
// Business logic
return new CartItem(request.ProductCode, 1, 19.99m);
}
}
Usage
var cart = await mediator.Send(new AddToCartRequest("ABC123"));
Notifications
// Notification
public record ProductAddedNotification(string ProductCode, int Quantity) : INotification;
// Handlers (multiple handlers supported)
public class LogHandler : INotificationHandler<ProductAddedNotification>
{
public Task Handle(ProductAddedNotification notification, CancellationToken ct)
{
Console.WriteLine($"LOG: {notification.ProductCode}");
return Task.CompletedTask;
}
}
public class EmailHandler : INotificationHandler<ProductAddedNotification>
{
public async Task Handle(ProductAddedNotification notification, CancellationToken ct)
{
await SendEmailAsync(notification.ProductCode);
}
}
// Publish notification to all handlers
await mediator.Publish(new ProductAddedNotification("ABC123", 1));
๐ญ Pipeline Behaviors
Global Behaviors (for ALL requests)
public class LoggingBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>, IPipelineBehavior
where TRequest : IRequest<TResponse>
{
public async ValueTask<TResponse> HandleAsync(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken cancellationToken)
{
Console.WriteLine($"Before: {typeof(TRequest).Name}");
var response = await next();
Console.WriteLine($"After: {typeof(TRequest).Name}");
return response;
}
}
// Registration
services.AddOpenPipelineBehavior(typeof(LoggingBehavior<,>));
services.AddOpenPipelineBehavior(typeof(PerformanceMonitoringBehavior<,>));
Specific Behaviors (for a specific request type)
public class ValidationBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>
where TRequest : IRequest<TResponse>
{
public async ValueTask<TResponse> HandleAsync(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken cancellationToken)
{
// Specific validation
if (request is AddToCartRequest { ProductCode: null or "" })
throw new ArgumentException("ProductCode required");
return await next();
}
}
// Registration
services.AddPipelineBehavior<AddToCartRequest, CartItem, ValidationBehavior<AddToCartRequest, CartItem>>();
๐ Streaming
Stream large or incremental datasets with IStreamRequest<T> and IAsyncEnumerable<T>. Streaming requests bypass the channel pump and are dispatched directly, so they don't block other requests.
Define a Stream Request
public record GetOrderLinesQuery(int OrderId) : IStreamRequest<OrderLineDto>;
public class GetOrderLinesHandler : IStreamRequestHandler<GetOrderLinesQuery, OrderLineDto>
{
private readonly IOrderRepository _repo;
public GetOrderLinesHandler(IOrderRepository repo) => _repo = repo;
public async IAsyncEnumerable<OrderLineDto> Handle(
GetOrderLinesQuery request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var line in _repo.StreamOrderLinesAsync(request.OrderId, cancellationToken))
{
yield return line;
}
}
}
Usage
await foreach (var line in mediator.CreateStream(new GetOrderLinesQuery(42), cancellationToken))
{
Console.WriteLine(line);
}
Stream Pipeline Behaviors
public class StreamLoggingBehavior<TRequest, TResponse>
: IStreamPipelineBehavior<TRequest, TResponse>
where TRequest : IStreamRequest<TResponse>
{
public async IAsyncEnumerable<TResponse> Handle(
TRequest request,
StreamHandlerDelegate<TResponse> next,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
Console.WriteLine($"Stream starting: {typeof(TRequest).Name}");
await foreach (var item in next().WithCancellation(cancellationToken))
{
yield return item;
}
Console.WriteLine($"Stream completed: {typeof(TRequest).Name}");
}
}
// Registration
services.AddScoped<IStreamPipelineBehavior<GetOrderLinesQuery, OrderLineDto>,
StreamLoggingBehavior<GetOrderLinesQuery, OrderLineDto>>();
๐ Available APIs
| Method | Return Type | Description |
|---|---|---|
Send<TResponse>(IRequest<TResponse>, CancellationToken) |
Task<TResponse> |
Sends a request to a single handler and returns the response |
Send(IRequest, CancellationToken) |
Task |
Sends a request without response (command) |
Publish<TNotification>(TNotification, CancellationToken) |
Task |
Publishes a notification to multiple handlers |
CreateStream<TResponse>(IStreamRequest<TResponse>, CancellationToken) |
IAsyncEnumerable<TResponse> |
Creates an async stream from a streaming handler |
๐ Documentation
- ๐ Azure Service Bus Integration
- ๐ RabbitMQ Integration
- โก Minimal API & Client Generators
- ๐ MediatR Compatibility
- ๐ญ Pipeline Behaviors
- ๐ Sequence Diagram
๐ Minimal API & Client Generators
ChannelMediator provides source generators that eliminate boilerplate for ASP.NET Core Minimal APIs and their HTTP clients.
Server side โ Auto-generate Minimal API endpoints
Decorate your request contracts with [EndpointApi] and a mapper class with [MapApiExtension]. The generator emits all MapGet / MapPost / MapPut / MapDelete calls automatically.
// 1. Shared contracts project โ install ChannelMediator.MinimalApiGenerator.Abstraction
[EndpointApi(GroupName = "Catalog", Path = "products", UseHttpStandardVerbs = true)]
public record GetProductRequest(int Id) : IRequest<Product?>;
[EndpointApi(GroupName = "Catalog", Path = "products")]
public record SaveProductRequest(Product Product) : IRequest<Product>;
// 2. Server project โ install ChannelMediator.MinimalApiGenerator
[MapApiExtension]
public static partial class MyRequestsMapper { }
// 3. Program.cs
app.MapMyRequestsMapper(); // All endpoints are registered!
Client side โ Auto-generate HttpClient handlers
In a client project, add a single assembly attribute. The generator creates IRequestHandler implementations that call the server via HttpClient.
// Install ChannelMediator.ApiClientGenerator
[assembly: ApiClient(typeof(GetProductRequest), HttpClientName = "ApiClient")]
// Register the named HttpClient
services.AddHttpClient("ApiClient")
.ConfigureHttpClient(cfg => cfg.BaseAddress = new Uri("https://localhost:7031/api/"));
// Use the mediator exactly as if the handler were local
var product = await mediator.Send(new GetProductRequest(1));
๐๏ธ Architecture
Client
โ
IMediator (Send / Publish)
โ
Channel (async queue)
โ
RequestHandlerWrapper
โ
Pipeline Behaviors (chain)
โโ Global Behavior 1
โโ Global Behavior 2
โโ Specific Behavior 1
โโ Request Handler (business logic)
๐ Azure Service Bus Integration
In a microservice architecture, a single process cannot handle all requests. You need to distribute workloads across multiple consumer instances and decouple services through asynchronous messaging.
ChannelMediator.AzureBus extends the mediator with two extension methods that transparently route messages through Azure Service Bus:
mediator.Notify(notification)โ Publishes to a Topic (fan-out to all subscribers)mediator.EnqueueRequest(request)โ Enqueues to a Queue (competing consumers, only one processes each message)
var mediator = app.Services.GetRequiredService<IMediator>();
// Fan-out notification to all subscriber services
await mediator.Notify(new ProductAddedNotification("SKU-001", 5, 49.95m));
// Enqueue a request for competing consumer processing
await mediator.EnqueueRequest(new MyRequest("process-order-42"));
Supports Live mode (real Azure Service Bus) and Mock mode (in-process for local development). Queues, topics, and subscriptions are created automatically on first use.
๐ RabbitMQ Integration
For self-hosted or on-premise scenarios, ChannelMediator.RabbitMQ provides the same distributed messaging patterns using RabbitMQ:
mediator.NotifyRabbitMq(notification)โ Publishes to a Fanout Exchange (fan-out to all bound queues)mediator.EnqueueRabbitMqRequest(request)โ Enqueues to a Queue (competing consumers, only one processes each message)
var mediator = app.Services.GetRequiredService<IMediator>();
// Fan-out notification to all subscriber services
await mediator.Notify(new ProductAddedNotification("SKU-001", 5, 49.95m));
// Enqueue a request for competing consumer processing
await mediator.EnqueueRequest(new MyRequest("process-order-42"));
Supports Live mode (real RabbitMQ broker) and Mock mode (in-process for local development). Exchanges, queues, and bindings are created automatically on first use.
๐ฏ Use Cases
Perfect for:
- โ High-load applications (backpressure)
- โ Microservices with CQRS patterns
- โ Migration from MediatR (drop-in replacement)
- โ REST / gRPC APIs with complex orchestration
- โ Event-driven architectures
Examples:
- E-commerce: Orders, cart, checkout
- CMS: Publishing, workflow, notifications
- IoT: Telemetry, commands, events
- Finance: Transactions, audit, reporting
โ๏ธ Advanced Configuration
Parallel Notifications
services.AddChannelMediator(config =>
config.Strategy = NotificationPublishStrategy.Parallel);
// All handlers execute in parallel with Task.WhenAll
await mediator.Publish(notification);
Sequential Notifications
services.AddChannelMediator(config =>
config.Strategy = NotificationPublishStrategy.Sequential);
// Handlers execute one after another
await mediator.Publish(notification);
๐งช Tests
[Fact]
public async Task Should_Handle_Request()
{
// Arrange
var services = new ServiceCollection();
services.AddChannelMediator(Assembly.GetExecutingAssembly());
var provider = services.BuildServiceProvider();
var mediator = provider.GetRequiredService<IMediator>();
// Act
var result = await mediator.Send(new AddToCartRequest("TEST"));
// Assert
Assert.NotNull(result);
Assert.Equal("TEST", result.ProductCode);
}
๐ง Compatibility
- .NET 10 (can be back-ported to .NET 8)
- C# 14 (can be adapted for C# 12)
- Microsoft.Extensions.DependencyInjection 9.0+
๐ License
MIT (to be defined)
๐ฅ Contributing
Contributions are welcome! Open an issue or a PR.
๐ Inspirations
- MediatR - The original and still excellent
- System.Threading.Channels - The foundation of our implementation
โญ Why ChannelMediator?
- Performance - Channel-based asynchronous processing
- Flexibility - MediatR-compatible API with powerful extensions
- Modern - .NET 10, C# 14, modern patterns
- Powerful - Global behaviors, parallel notifications
- Familiar - MediatR compatible, easy migration
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- ChannelMediator.Contracts (>= 1.0.9)
- Microsoft.Extensions.DependencyInjection (>= 10.0.5)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.5)
-
net8.0
- ChannelMediator.Contracts (>= 1.0.9)
- Microsoft.Extensions.DependencyInjection (>= 10.0.5)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.5)
-
net9.0
- ChannelMediator.Contracts (>= 1.0.9)
- Microsoft.Extensions.DependencyInjection (>= 10.0.5)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.5)
NuGet packages (3)
Showing the top 3 NuGet packages that depend on ChannelMediator:
| Package | Downloads |
|---|---|
|
ChannelMediator.AzureBus
Azure Service Bus integration for ChannelMediator โ distributed messaging with queues and topics. |
|
|
ChannelMediator.RabbitMQ
RabbitMQ integration for ChannelMediator โ distributed messaging with queues and exchanges. |
|
|
ChannelMediator.InMemory
In-memory messaging integration for ChannelMediator notifications and queued requests. |
GitHub repositories
This package is not used by any popular GitHub repositories.