Yarkool.RedisMQ 1.0.2

dotnet add package Yarkool.RedisMQ --version 1.0.2

Yarkool.RedisMQ

A queue service built on Redis Streams, with a publisher and consumers.

Usage

Register in Program.cs

var cli = new RedisClient("127.0.0.1:6379,password=");
services.AddRedisMQ(cli, config =>
{
    config.UseErrorQueue = true;  // Whether to push a message to the error queue when consuming it fails
    config.RedisPrefix = "Test:";  // Prefix for the Redis keys
    config.RegisterConsumerService = false;  // Whether to start the queue consumer service
    config.RepublishNonAckTimeOutMessage = true;  // Whether to republish messages that were not acked in time; requires `RegisterConsumerService` to be enabled
});
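The RepublishNonAckTimeOutMessage option and the consumer attribute's PendingTimeOut both deal with messages that were delivered but never acknowledged. The sketch below is a conceptual in-memory model of that idea only. It is not how Yarkool.RedisMQ is implemented, and PendingModel and all of its members are hypothetical names introduced for illustration. It also assumes the timeout is measured in seconds, which the README does not state.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual model only: an in-memory stand-in for a queue plus its pending list.
// PendingModel is a hypothetical type, not part of Yarkool.RedisMQ.
// Messages are plain strings and are assumed to be unique.
public sealed class PendingModel
{
    private readonly Queue<string> _queue = new();
    private readonly Dictionary<string, DateTime> _pending = new();

    public int QueuedCount => _queue.Count;
    public int PendingCount => _pending.Count;

    public void Publish(string message) => _queue.Enqueue(message);

    // Hand out the next message and remember when it was delivered.
    public string? Deliver(DateTime now)
    {
        if (_queue.Count == 0) return null;
        var message = _queue.Dequeue();
        _pending[message] = now;
        return message;
    }

    // A consumer that finished successfully acknowledges the message,
    // which clears it from the pending list.
    public bool Ack(string message) => _pending.Remove(message);

    // Move every message that has been pending longer than the timeout
    // back onto the queue, and report how many were moved.
    public int RepublishTimedOut(DateTime now, TimeSpan timeout)
    {
        var expired = _pending
            .Where(p => now - p.Value > timeout)
            .Select(p => p.Key)
            .ToList();

        foreach (var message in expired)
        {
            _pending.Remove(message);
            _queue.Enqueue(message);
        }
        return expired.Count;
    }
}
```

In this model, a message that is delivered and acked disappears for good, while one that is delivered and left unacked past the timeout returns to the queue on the next RepublishTimedOut call and can be delivered again.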

Create a consumer by adding the RedisMQConsumer attribute and setting the QueueName, the number of consumers, and so on. A delay-queue consumer must set IsDelayQueueConsumer = true.

[RedisMQConsumer("Test")]
public class TestRedisMqConsumer : IRedisMQConsumer<TestMessage>
{
    public Task OnMessageAsync(TestMessage message, CancellationToken cancellationToken = default)
    {
        System.Console.WriteLine(message.Input);

        return Task.CompletedTask;
    }
}

[RedisMQConsumer("Delay", ConsumerCount = 1, PendingTimeOut = 10, IsDelayQueueConsumer = true)]
public class DelayConsumer(ILogger<DelayConsumer> logger) : IRedisMQConsumer<TestMessage>
{
    public Task OnMessageAsync(TestMessage message, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("message from delay queue: {Input}", message.Input);
        return Task.CompletedTask;
    }
}
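Both consumers above receive a TestMessage, which the README never defines. A minimal sketch, assuming it is a plain class with a single string property named Input (the only member the samples read or set). The real type in the author's project may differ.

```csharp
// Assumed shape of the message type used by the samples.
// Only the Input property is implied by the README; everything else is a guess.
public class TestMessage
{
    public string Input { get; set; } = string.Empty;
}
```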

To publish a message, inject IRedisMQPublisher and call PublishMessageAsync. The QueueName argument must match the consumer's QueueName.

private readonly IRedisMQPublisher _publisher;
public WeatherForecastController(IRedisMQPublisher publisher)
{
    _publisher = publisher;
}

// Publish a message to a regular queue
[HttpPost("PublishMessage")]
public async Task<string> PublishMessage()
{
    var input = Guid.NewGuid().ToString("N");
    var messageId = await _publisher.PublishMessageAsync("Test", new TestMessage
    {
        Input = input
    });
    return $"{messageId}-{input}";
}

// Publish a message to a delay queue
[HttpPost("PublishDelayMessage")]
public async Task<string> PublishDelayMessage()
{
    var input = Guid.NewGuid().ToString("N");
    var messageId = await _publisher.PublishMessageAsync("Delay", new TestMessage
    {
        Input = input
    }, TimeSpan.FromSeconds(10));
    return $"{messageId}-{input}";
}
Compatible and additional computed target framework versions
.NET: net8.0 is compatible. Computed: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows.

Version   Downloads   Last updated
1.0.2     70          4/18/2024