Picton.Messaging 9.0.0


Picton


About

Picton.Messaging is a C# library containing a high performance message processor (also known as a message "pump") designed to process messages from an Azure storage queue as efficiently as possible.

I created Picton.Messaging because I needed a way to process a large volume of messages from Azure storage queues as quickly and efficiently as possible. I searched for a long time, but I could never find a solution that met all my requirements.

In March 2016 I attended three webinars by Daniel Marbach on "Async/Await & Task Parallel Library" (part 1, part 2 and part 3). Part 2 was particularly interesting to me because Daniel presented a generic message pump that meets most (but not all) of my requirements. Specifically, Daniel's message pump meets the following criteria:

  • Concurrent message handling. This means that multiple messages can be processed at the same time. This is critical, especially if your message processing logic is I/O bound.
  • Limiting concurrency. This means that we can decide the maximum number of messages that can be processed at the same time.
  • Cancelling and graceful shutdown. This means that our message pump can be notified that we want to stop processing additional messages, and we can decide what to do with messages that are still being processed when we decide to stop.
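
The three criteria above can be sketched in a few lines of C#. This is a minimal illustration, not Daniel's sample code and not Picton's implementation: the SimplePump class, the in-memory ConcurrentQueue standing in for a real queue, and the 50 ms idle delay are all assumptions made for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: processes an in-memory queue with bounded concurrency.
public static class SimplePump
{
    public static async Task RunAsync(
        ConcurrentQueue<string> queue,
        Func<string, CancellationToken, Task> handler,
        int maxConcurrency,
        CancellationToken cancellationToken)
    {
        // The semaphore is the "gatekeeper": one slot per message being processed.
        using var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        var inFlight = new List<Task>();

        async Task ProcessAsync(string message)
        {
            try { await handler(message, cancellationToken).ConfigureAwait(false); }
            catch (Exception) { /* a real pump would report this through an error callback */ }
            finally { gate.Release(); } // always give the slot back
        }

        try
        {
            while (true)
            {
                // Wait for a free slot; throws OperationCanceledException once cancelled.
                await gate.WaitAsync(cancellationToken).ConfigureAwait(false);

                if (queue.TryDequeue(out var message))
                {
                    inFlight.RemoveAll(t => t.IsCompleted);
                    inFlight.Add(ProcessAsync(message)); // not awaited: handlers run concurrently
                }
                else
                {
                    gate.Release();
                    await Task.Delay(50, cancellationToken).ConfigureAwait(false);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested: stop picking up new messages.
        }

        // Graceful shutdown: let the handlers that already started run to completion.
        await Task.WhenAll(inFlight).ConfigureAwait(false);
    }
}
```

Here the shutdown policy is "finish what was started". A handler that wants to abort early can observe the token it receives.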

The sample code that Daniel shared during his webinars was very generic and not specific to Azure, so I made the following enhancements:

  • The messages are fetched from an Azure storage queue. We can specify which queue and it can even be a queue in the storage emulator.
  • Backoff. When no messages are found in the queue, it's important to scale back and query the queue less often. As soon as new messages are available in the queue, we want to scale back up in order to process these messages as quickly as possible. I made two improvements to implement this logic:
    • first, introduce a pause when no message is found in the queue in order to reduce the number of queries to the storage queue. By default we pause for one second, but it's configurable. You could even eliminate the pause altogether, but I do not recommend it.
    • second, reduce the number of concurrent message processing tasks. This is the most important improvement introduced in the Picton library (in my humble opinion!). Daniel's sample code uses SemaphoreSlim to act as a "gatekeeper" and to limit the number of tasks that can be executed concurrently. However, the number of "slots" permitted by the semaphore must be predetermined and is fixed. Picton eliminates this restriction and allows this number to be dynamically increased and decreased based on the presence or absence of messages in the queue.
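
One way to get a semaphore whose effective limit can move is to "park" slots: shrinking takes a free slot out of circulation and growing puts one back. The sketch below only illustrates that idea. AdjustableGate and its members are hypothetical names, and nothing here is taken from Picton's source.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical gate whose concurrency limit can be lowered and raised at runtime.
public sealed class AdjustableGate
{
    private readonly object sync = new object();
    private readonly SemaphoreSlim semaphore;
    private readonly int maxSlots;
    private int parked; // slots currently taken out of circulation

    public AdjustableGate(int maxSlots)
    {
        if (maxSlots < 1) throw new ArgumentOutOfRangeException(nameof(maxSlots));
        this.maxSlots = maxSlots;
        this.semaphore = new SemaphoreSlim(maxSlots, maxSlots);
    }

    // Number of messages that may currently be processed at the same time.
    public int Limit { get { lock (sync) { return maxSlots - parked; } } }

    public Task WaitAsync(CancellationToken ct) => semaphore.WaitAsync(ct);

    public void Release() => semaphore.Release();

    // Lowers the limit by one if a slot is free right now; never goes below one.
    public bool TryShrink()
    {
        lock (sync)
        {
            if (maxSlots - parked <= 1) return false;
            if (!semaphore.Wait(0)) return false; // every slot is busy, try again later
            parked++;
            return true;
        }
    }

    // Raises the limit by one, up to the original maximum.
    public bool TryGrow()
    {
        lock (sync)
        {
            if (parked == 0) return false;
            parked--;
            semaphore.Release();
            return true;
        }
    }

    // Call after each poll: ramp up when a message was found,
    // otherwise scale down and pause before querying the queue again.
    public async Task AdjustAsync(bool messageFound, TimeSpan emptyQueuePause, CancellationToken ct)
    {
        if (messageFound) { TryGrow(); return; }
        TryShrink();
        await Task.Delay(emptyQueuePause, ct).ConfigureAwait(false);
    }
}
```

With a limit of 10 and an empty queue, repeated calls to AdjustAsync(false, TimeSpan.FromSeconds(1), token) walk the limit down to 1, one pause at a time. Each poll that finds a message raises the limit again by one.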

In December 2017, version 2.0 was released with a much more efficient method of fetching messages from the Azure queue: there is now a dedicated task for this purpose instead of allowing each individual concurrent task to fetch its own messages. This means that the logic to increase/decrease the number of available slots in the SemaphoreSlim is no longer necessary and has been removed.
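
The "dedicated fetch task" arrangement can be sketched as a producer/consumer pair: one task polls the queue and fills a bounded buffer, and a fixed set of workers drains it. The example below assumes .NET Core 3.0 or later (for System.Threading.Channels and await foreach). FetcherPump, fetchBatchAsync and handleAsync are hypothetical names, and this is not how Picton itself is written.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical pump: a single fetcher feeds a bounded buffer shared by all workers.
public static class FetcherPump
{
    public static async Task RunAsync(
        Func<CancellationToken, Task<IReadOnlyList<string>>> fetchBatchAsync,
        Func<string, Task> handleAsync,
        int workerCount,
        TimeSpan emptyQueuePause,
        CancellationToken cancellationToken)
    {
        // A bounded buffer keeps the fetcher from running far ahead of the workers.
        var buffer = Channel.CreateBounded<string>(workerCount * 2);

        var fetcher = Task.Run(async () =>
        {
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    var batch = await fetchBatchAsync(cancellationToken).ConfigureAwait(false);
                    if (batch.Count == 0)
                    {
                        // Nothing to do: pause instead of hammering the queue.
                        await Task.Delay(emptyQueuePause, cancellationToken).ConfigureAwait(false);
                        continue;
                    }

                    // If cancellation interrupts this loop, the rest of the batch is not
                    // buffered; with a real queue those messages would become visible again.
                    foreach (var message in batch)
                    {
                        await buffer.Writer.WriteAsync(message, cancellationToken).ConfigureAwait(false);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation requested: stop fetching.
            }
            finally
            {
                buffer.Writer.Complete(); // lets the workers finish once the buffer is empty
            }
        });

        // The workers never query the queue themselves; they only drain the buffer.
        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(async () =>
        {
            await foreach (var message in buffer.Reader.ReadAllAsync().ConfigureAwait(false))
            {
                await handleAsync(message).ConfigureAwait(false);
            }
        }));

        await Task.WhenAll(workers.Append(fetcher)).ConfigureAwait(false);
    }
}
```

Because the number of workers is fixed and only the fetcher pauses when the queue is empty, there is no semaphore whose slot count would need adjusting.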

NuGet

Picton.Messaging is available as a NuGet package.


Installation

The easiest way to include Picton.Messaging in your C# project is by grabbing the NuGet package:

PM> Install-Package Picton.Messaging

Once you have the Picton.Messaging library properly referenced in your project, modify your RoleEntryPoint like this example:

using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.RetryPolicies;
using Picton.Messaging;
using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace WorkerRole1
{
    public class MyWorkerRole : RoleEntryPoint
    {
        private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

        public override void Run()
        {
            Trace.TraceInformation("WorkerRole is running");

            try
            {
                this.RunAsync(this.cancellationTokenSource.Token).Wait();
            }
            finally
            {
                this.runCompleteEvent.Set();
            }
        }

        public override bool OnStart()
        {
            // Use TLS 1.2 for Service Bus connections
            ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;

            // Set the maximum number of concurrent connections
            ServicePointManager.DefaultConnectionLimit = 12;

            // For information on handling configuration changes
            // see the MSDN topic at https://go.microsoft.com/fwlink/?LinkId=166357.

            bool result = base.OnStart();

            Trace.TraceInformation("WorkerRole has been started");

            return result;
        }

        public override void OnStop()
        {
            Trace.TraceInformation("WorkerRole is stopping");

            // Invoking "Cancel()" will cause the AsyncMessagePump to stop
            this.cancellationTokenSource.Cancel();
            this.runCompleteEvent.WaitOne();

            base.OnStop();

            Trace.TraceInformation("WorkerRole has stopped");
        }

        private async Task RunAsync(CancellationToken cancellationToken)
        {
            var connectionString = "<-- insert connection string for your Azure account -->";
            var concurrentTasks = 10; // <-- this is the max number of messages that can be processed at a time

            // Configure the message pump
            var options = new MessagePumpOptions(connectionString, concurrentTasks);
            var messagePump = new AsyncMessagePump(options)
            {
                OnMessage = (queueName, message, token) =>
                {
                    // This is where you insert your custom logic to process a message
                },
                OnError = (queueName, message, exception, isPoison) =>
                {
                    // Insert your custom error handling

                    // ==========================================================================
                    // Important note regarding "isPoison":
                    // --------------------------------------------------------------------------
                    // this parameter indicates whether this message has exceeded the maximum
                    // number of retries. 
                    //
                    // When you have configured the "poison queue name" for the given queue and 
                    // this parameter is "true", the message is automatically copied to the poison
                    // queue and removed from the original queue.
                    // 
                    // If you have not configured the "poison queue name" for the given queue and
                    // this parameter is "true", the message is automatically removed from the
                    // original queue and you are responsible for storing the message. If you don't,
                    // this message will be lost.
                    // ==========================================================================
                }
            };

            // Replace the following samples with the queues you want to monitor
            messagePump.AddQueue("queue01", "queue01-poison", TimeSpan.FromMinutes(1), 3, "queue01-oversize-messages");
            messagePump.AddQueue("queue02", "queue02-poison", TimeSpan.FromMinutes(1), 3, "queue02-oversize-messages");
            messagePump.AddQueue("queue03", "queue03-poison", TimeSpan.FromMinutes(1), 3, "queue03-oversize-messages");

            // Queues can share the same poison queue
            messagePump.AddQueue("queue04", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue04-oversize-messages");
            messagePump.AddQueue("queue05", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue05-oversize-messages");

            // Queues can also share the same blob storage for messages that exceed the max size
            messagePump.AddQueue("queue06", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");
            messagePump.AddQueue("queue07", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");

            // Start the message pump
            await messagePump.StartAsync(cancellationToken);
        }
    }
}
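
The "isPoison" note in the sample above boils down to a three-way decision each time a message fails. The sketch below restates that decision as plain code. PoisonPolicy and FailureOutcome are hypothetical names, and the comparison used to detect a poison message (dequeue count greater than or equal to the configured maximum) is an assumption; the library's exact rule may differ.

```csharp
using System;

// Hypothetical outcome of a failed attempt to process a message.
public enum FailureOutcome
{
    RetryLater,             // not poison yet: the message stays in the original queue
    MovedToPoisonQueue,     // poison, and a poison queue is configured for this queue
    RemovedCallerMustStore  // poison, no poison queue: the error handler must keep a copy
}

public static class PoisonPolicy
{
    public static FailureOutcome Decide(int dequeueCount, int maxDequeueCount, string poisonQueueName)
    {
        // Assumption: a message counts as poison once it has been dequeued
        // maxDequeueCount times without being processed successfully.
        var isPoison = dequeueCount >= maxDequeueCount;
        if (!isPoison) return FailureOutcome.RetryLater;

        return string.IsNullOrEmpty(poisonQueueName)
            ? FailureOutcome.RemovedCallerMustStore
            : FailureOutcome.MovedToPoisonQueue;
    }
}
```

For example, Decide(3, 3, "queue01-poison") yields MovedToPoisonQueue, while Decide(3, 3, null) yields RemovedCallerMustStore, which is the case where your OnError handler has to save the message itself.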

License

FOSSA Status

Compatible and additional computed target framework versions

  • .NET: net7.0 is compatible. Computed: net5.0 and net5.0-windows; net6.0 plus its -android, -ios, -maccatalyst, -macos, -tvos, and -windows variants; the same six platform variants of net7.0; net8.0 plus its -android, -browser, -ios, -maccatalyst, -macos, -tvos, and -windows variants.
  • .NET Core: netcoreapp3.0 and netcoreapp3.1 were computed.
  • .NET Standard: netstandard2.1 is compatible.
  • .NET Framework: net48 is compatible; net481 was computed.
  • Mono and Xamarin: monoandroid, monomac, monotouch, xamarinios, xamarinmac, xamarintvos, and xamarinwatchos were computed.
  • Tizen: tizen60 was computed.


Version   Downloads   Last updated
9.1.0     94          2/24/2024
9.0.0     97          1/30/2024
8.0.0     152         1/1/2024
7.0.0     628         3/27/2020
6.0.0     641         5/22/2019
5.0.0     680         2/8/2019
4.0.0     974         5/31/2018
3.2.0     968         5/24/2018
3.1.0     982         5/8/2018
3.0.0     1,049       2/13/2018
2.0.0     1,067       12/27/2017
1.1.1     1,674       7/14/2017