Asos.ServiceBus.MessageSiphon 1.0.19

Prefix Reserved
There is a newer version of this package available.
See the version list below for details.
dotnet tool install --global Asos.ServiceBus.MessageSiphon --version 1.0.19                
This package contains a .NET tool you can call from the shell/command line.
dotnet new tool-manifest # if you are setting up this repo
dotnet tool install --local Asos.ServiceBus.MessageSiphon --version 1.0.19                
This package contains a .NET tool you can call from the shell/command line.
#tool dotnet:?package=Asos.ServiceBus.MessageSiphon&version=1.0.19                
nuke :add-package Asos.ServiceBus.MessageSiphon --version 1.0.19                

azure-servicebus-message-siphon

A dotnet tool for message siphon scenarios when working with Azure Service Bus, allowing you to purge and replay messages, within a single namespace or across namespaces

What's it for?

You may have cases such as

  • Messages have dead lettered and you want to replay them back onto the main queue\topic
  • You want to clone messages from one entity to another in the same namespace
  • You want to clone and remove messages from one namespace to another
  • You want to purge messages, deleting from a queue, topic or dead-letter sub entity.
  • You want to use connection strings, RBAC or both for the source and targets. (i.e. connection string for source, RBAC for target)

How does it work?

A configuration file is used that determines the set of work that will be performed. The configuration file is made up of service bus connection details, and a list of siphon work that needs to be performed. The siphon work then references the connections by Name

As the consumer of this package, you can define some standard work you'd like to perform as a set of configuration files, then use them to execute the work against one or many service bus namespaces - just provide the appropriate ConnectionString or FullyQualifiedNamespace at runtime

Configuration details

ReplayMessagesJob - A replay messages just has the following

  • JobType - the type of work to perform. Can be one of DeadLettersSameEntity, DeadLettersToDifferentEntity or SourceToTarget
    • DeadLettersSameEntity - replays messages from the sub-queue back onto the parent entity
    • DeadLettersToDifferentEntity - replays messages from the entity sub-queue, to a different Target entity
    • SourceToTarget - siphons messages from any source to any target entity
    • PurgeSource - purges messages from the source entity, a delete operation
    • PurgeDeadLetters - as per PurgeSource but for dead letter entities
  • JobName - Give the work a useful name, used in log messages etc
  • NumberOfConcurrentProcesses - When performing a siphon job, copying messages from source to target, messages are received in batches as per the SourceBatchReceiveSize setting. Once messages are received, they need to be sent to the target and NumberOfConcurrentProcesses controls how many concurrent operations attempt to work through the batch. E.g. if your batch size is 100 and you set NumberOfConcurrentProcesses to 10, then 10 operations will each attempt to process 10 messages in parallel. Experiment with these settings to find the best combination to achieve high throughput.

ServiceBusDetails - a list of service bus connections. A ServiceBusDetail instance has the following

  • Name - give the item a useful name, you'll use this to refer to it from any siphon work you define
  • ConnectionMode - how to connect to the service bus, can be one of ConnectionString or Rbac
  • ConnectionString - if ConnectionMode is ConnectionString, then define the SAS connection string to use
  • FullyQualifiedNamespace = if ConnectionMode is Rbac, then define the fully qualified namespace, e.g. namespace.servicebus.windows.net

SiphonWork - a list of work to perform, Siphon work requires the following

  • SiphonMode - the type of operation to perform. Can be Clone or CloneAndDelete.

    • When Clone, the worker will use PeekMessagesAsync so not to increment the delivery count of messages, and the message will not be received or completed. A copy of the message is created
    • When CloneAndDelete, the worker will use ReceiveMessagesAsync, and will attempt to complete the message once the send operation to target has completed OK
  • SourceConnectionName - the Name of the ServiceBusDetails to use as the source

  • SourceEntity - the name of the entity, can be either a queue or a topic

  • SourceSubscriptions - if SourceEntity is a topic, then define a list of subscription names for the Topic

  • TargetConnectionName - the Name of the ServiceBusDetails to use as the target

  • TargetEntity - the name of the entity to send to, can be either a queue or a topic

  • SourceBatchReceiveSize - the size of the batch to recieve messages from the source. Defaults to 20.

  • MessagesOlderThan - An ISO8601 date span, as per the duration specification. E.g. PT72H (72 hours), P30D (30 days). When performing a purge, only messages that have an older enqueue time than this value are considered, allowing you to perform work such as purge messages older than 72 hours

Example configurations

Replaying service bus messages from dead letters on the subscriptions

In this configuration, we only need to define a single service bus connection, since we're replaying onto the same namespace/entity.

{
    "Logging": {
        "LogLevel": {
            "Default": "Information"
        }
    },    
    "ReplayMessagesJob": {
        "JobType": "DeadLettersSameEntity",
        "JobName": "Replay-Messages-From-DeadLetter-Subqueue",
        "NumberOfConcurrentProcesses": 1,
        "ServiceBusDetails": [
            {
                "Name": "Source",
                "ConnectionMode": "ConnectionString",
                "ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-sas-key"
            }
        ],
        "SiphonWork": [
            {
                "SiphonMode": "CloneAndDelete",
                "SourceConnectionName": "Source",
                "SourceEntity": "snapshot",
                "SourceSubscriptions": [ "test", "test1" ]
            }
        ]
    }
}

Cloning messages from a subscription in one namespace using a connection string, to another namespace using RBAC.

In this example, messages will be receieved in batches of 40, and 5 concurrent tasks will be used to to send them to the target in parallel (i.e. each task processes 8 messages each)

{
    "Logging": {
        "LogLevel": {
        }
    },    
    "ReplayMessagesJob": {
        "JobType": "SourceToTarget",
        "JobName": "Local Debug",
        "NumberOfConcurrentProcesses": 5,
        "ServiceBusDetails": [
            {
                "Name": "Source",
                "ConnectionMode": "ConnectionString",
                "ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=access-key"
            },
            {
                "Name": "Target",
                "ConnectionMode": "Rbac",
                "FullyQualifiedNamespace": "namespace1.servicebus.windows.net"
            }
        ],
        "SiphonWork": [
            {
                "SiphonMode": "Clone",
                "SourceConnectionName": "Source",
                "SourceEntity": "topic-entity",
                "SourceSubscriptions": [ "test-subscription" ],
		"SourceBatchReceiveSize" : 40,
                "TargetConnectionName": "Target",
                "TargetEntity": "copy-of-topic"
            }
        ]
    }
}

Using RBAC, receive messages from a dead letter subscription in batches of 100 and purge messsages that are older than 1 hour

{
    "Logging": {
        "LogLevel": {
            "Default": "Information"
        }
    },   
    "ReplayMessagesJob": {
        "JobType": "PurgeDeadLetters",
        "JobName": "Purge Debug",
        "ServiceBusDetails": [
            {
                "Name": "Source",
                "ConnectionMode": "Rbac",
                "FullyQualifiedNamespace": "namespace.servicebus.windows.net"
            }
        ],
        "SiphonWork": [
            {
                "SiphonMode": "Delete",
                "SourceConnectionName": "Source",
                "SourceEntity": "demo",
                "SourceSubscriptions": [ "test" ],
                "SourceBatchReceiveSize" : 100,
                "MessagesOlderThan" : "PT1H"
            }
        ]
    }
}

How to use?

The package is available as a dotnet tool, so you can install from nuget.org by running the following command

dotnet tool update --global Asos.ServiceBus.MessageSiphon ---version x.x.x

Once installed, you have a command available to you siphon-asb-messages. This requires a single parameter, the path to the configuration file, e.g.

siphon-asb-messages -n D:\temp\replay-config.json
siphon-asb-messages -n usr/tmp/replay-config.json

Azure Pipelines

Since this is a dotnet tool, you can run it from your Azure Pipeline. This lets you take advantage of Managed Identity from Service Connections, and interact with namespaces that may be IP Restricted by running the work from a build pool that is allow-listed on the namespace.

This gives you a way to allow supporting team members to run jobs, replaying or purging messages, in a secure and repeatable manner & without requring any particular tools or permissions.

For example, another team may have a dependency on a service bus that you own and maintain. The namespace is IP rescricted and access is via RBAC, they need to replay some dead letters on a topic-subscription. You can define an Azure pipeline to do this work and define a particular build pool with a known IP, then grant their team members permissions to run the pipeline. In this way, the consuming team can self-serve without sharing connection strings or having to use Bastion services

To use this, you'd need to define a configuration file in your source repository. Note, we use a variable placeholder here for $ServiceBusConnectionString

{
  "Logging": {
    "LogLevel": {
      "Default": "Information"
    }
  },  
  "ReplayMessagesJob": {
    "JobType": "DeadLettersSameEntity",
    "JobName": "Dead Letters Replay",
    "ServiceBusDetails": [
      {
        "Name": "Source",
        "ConnectionMode": "ConnectionString",
        "FullyQualifiedNamespace": "$(ServiceBusConnectionString)"
      }
    ],
    "SiphonWork": [
      {
        "SiphonMode": "CloneAndDelete",
        "SourceConnectionName": "Source",
        "SourceEntity": "some-topic-orders",
        "SourceSubscriptions": [
          "the-subscription-name"
        ]
      }
    ]
  }
}

An example pipeline might look like so. In this example, we support either RBAC using a fully qualified namespace, or connection strings by retrieving a secret from a vault. Replace Tokens will update the config file with the appropriate values

parameters:
  - name: StageName
    displayName: Service Bus Connection String Secret
    type: string
    default: "ReplayPurgeMessages"

  - name: AzureSubscription
    displayName: The name of the service connection that's used to connect to Azure
    type: string

  - name: ConfigPath
    displayName: Path to config
    type: string

  - name: ConfigFileName
    displayName: Message replay config
    type: string

  - name: Environment
    displayName: Environment
    type: string

  - name: ServiceBusFullyQualifiedName
    displayName: ServiceBusFullyQualifiedName
    type: string
    default: ""
  
  - name: KeyVaultSubscriptionName
    displayName: Name of the subscription that contains the key vault, if using it to retrieve connection strings
    type: string
    default: ""
  
  - name: KeyVaultName
    displayName: Key Vault Name
    type: string
    default: ""
  
  - name: SecretName
    displayName: Service Bus Connection String Secret
    type: string
    default: ""

stages:
  - stage: ${{parameters.StageName}}
    displayName: ${{parameters.StageName}}

    pool:
      vmImage: ubuntu-latest

    jobs: 
      - job:
        steps:
          - checkout: self

          - task: AzureCLI@2
            displayName: Get Service Bus Connection String From Vault
            condition: and(succeeded(), ne('${{ parameters.KeyVaultName }}', ''))
            inputs:
              azureSubscription: ${{parameters.AzureSubscription}}
              scriptType: pscore
              scriptLocation: inlineScript
              inlineScript: |
                $subscription = ${{parameters.KeyVaultSubscriptionName}}
                $connectionString = az keyvault secret show --name ${{parameters.SecretName}} --subscription $subscription --vault-name ${{parameters.KeyVaultName}} --query value -o tsv                
                echo "##vso[task.setvariable variable=ServiceBusConnectionString]$connectionString"

          - task: AzureCLI@2
            displayName: Set variable to fully qualified namespace parameter
            condition: and(succeeded(), eq('${{ parameters.KeyVaultName }}', ''))
            inputs:
              azureSubscription: ${{parameters.AzureSubscription}}
              scriptType: pscore
              scriptLocation: inlineScript
              inlineScript: |
                  $namespaceName = ${{parameters.ServiceBusFullyQualifiedName}}
                  echo "Connecting to namespace " + $namespaceName
                  echo "##vso[task.setvariable variable=ServiceBusFullyQualifiedName]$namespaceName"

          - task: replacetokens@4
            displayName: Replace Tokens
            inputs:
              targetFiles: '${{parameters.ConfigPath}}/*'
              encoding: 'auto'
              tokenPattern: 'azpipelines'
              writeBOM: true
              actionOnMissing: 'warn'
              keepToken: false
              actionOnNoFiles: 'continue'
              enableTransforms: false
              useLegacyPattern: false
              enableTelemetry: true

          - task: AzureCLI@2
            displayName: Perform Message Work
            inputs:
              azureSubscription: ${{parameters.AzureSubscription}}
              scriptType: pscore
              scriptLocation: inlineScript
              inlineScript: |
                dotnet tool update Asos.ServiceBus.MessageSiphon --tool-path . --version 1.0.0
                & "./siphon-asb-messages" -n ${{parameters.ConfigPath}}/${{parameters.ConfigFileName}}
Product Compatible and additional computed target framework versions.
.NET net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

This package has no dependencies.

Version Downloads Last updated
1.2.32 117 7/8/2024
1.2.31 104 7/1/2024
1.2.30 96 6/24/2024
1.2.29 81 6/24/2024
1.2.28 113 6/17/2024
1.2.27 87 6/12/2024
1.2.26 90 6/10/2024
1.2.25 77 6/4/2024
1.2.24 98 6/3/2024
1.2.23 95 5/28/2024
1.2.22 97 5/20/2024
1.2.21 112 5/20/2024
1.2.20 67 5/13/2024
1.2.19 83 5/13/2024
1.2.18 133 5/6/2024
1.2.17 118 4/29/2024
1.2.16 111 4/22/2024
1.2.15 88 4/22/2024
1.2.14 147 4/17/2024
1.2.12 198 4/12/2024
1.1.35 228 1/30/2024
1.1.34 208 1/26/2024
1.1.33 187 1/8/2024
1.1.26 230 12/6/2023
1.1.24 138 12/4/2023
1.1.23 141 12/4/2023
1.1.22 166 12/4/2023
1.1.21 173 12/4/2023
1.1.20 159 11/27/2023
1.1.19 181 11/20/2023
1.1.18 194 11/13/2023
1.1.17 246 11/6/2023
1.1.16 212 10/23/2023
1.1.15 259 10/17/2023
1.1.14 244 10/17/2023
1.1.13 244 10/16/2023
1.1.12 223 10/16/2023
1.1.11 210 10/2/2023
1.1.10 253 9/25/2023
1.1.9 238 9/14/2023
1.1.8 175 6/28/2023
1.1.7 294 2/16/2023
1.1.6 243 2/13/2023
1.1.5 233 2/7/2023
1.1.4 275 2/2/2023
1.1.2 311 12/13/2022
1.1.1 290 12/13/2022
1.1.0 315 12/13/2022
1.0.24 303 12/13/2022
1.0.23 298 12/12/2022
1.0.22 301 12/9/2022
1.0.21 426 8/5/2022
1.0.21-pr0026-0011 271 8/4/2022
1.0.20 430 8/3/2022
1.0.20-pr0024-0004 260 8/2/2022
1.0.20-ci-message-selec0003 299 8/2/2022
1.0.19 497 1/24/2022