MhLabs.AwsLambdaSqsRetry 2.0.0

.NET CLI:
dotnet add package MhLabs.AwsLambdaSqsRetry --version 2.0.0

Package Manager (intended for the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package):
NuGet\Install-Package MhLabs.AwsLambdaSqsRetry -Version 2.0.0

PackageReference (for projects that support it, copy this XML node into the project file):
<PackageReference Include="MhLabs.AwsLambdaSqsRetry" Version="2.0.0" />

Paket CLI:
paket add MhLabs.AwsLambdaSqsRetry --version 2.0.0

Script & Interactive (the #r directive works in F# Interactive and Polyglot Notebooks; copy it into the interactive tool or the script source):
#r "nuget: MhLabs.AwsLambdaSqsRetry, 2.0.0"

Cake:
// Install MhLabs.AwsLambdaSqsRetry as a Cake Addin
#addin nuget:?package=MhLabs.AwsLambdaSqsRetry&version=2.0.0

// Install MhLabs.AwsLambdaSqsRetry as a Cake Tool
#tool nuget:?package=MhLabs.AwsLambdaSqsRetry&version=2.0.0

MhLabs.AwsLambdaSqsRetry

This package is for cases where we want multiple subscribers to events and where Kinesis doesn't make sense because of its fixed shard configuration. For sporadic messages or infrequent message bursts, the pricing model for SNS is much cheaper and it scales more flexibly. However, contrary to Kinesis, SNS doesn't provide any retry logic apart from AWS Lambda's default of three invocation attempts for asynchronous events. If the third Lambda invocation attempt fails, the message is lost.

Ideally we'd want an SQS queue to subscribe to the SNS topic and trigger the Lambda function, but when this package was first written AWS did not support SQS as a Lambda event source.

This package lets us consume from an SNS topic and solve the retry issue by specifying an SQS queue as the Dead Letter Queue (DLQ) for the Lambda. Any failed Lambda events get sent to the DLQ, which can be configured with a delivery delay. For example, say we have a DynamoDB table with a low write capacity but an auto scaling policy enabled. If a burst of messages comes through, writes will start getting rejected because the burst exceeds the provisioned write capacity. All these messages are put on the SQS DLQ with a 5 minute delivery delay.

The SQS DLQ is polled every minute by an additional Lambda, which effectively invokes the same method as the SNS consumer. The five minute delay gives auto scaling time to kick in, and once the write capacity has been increased the messages in the DLQ are likely to be accepted by DynamoDB. If they fail again, they go back to SQS and are retried on a later poll. The default retention time in SQS is 4 days.
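The polling step can be sketched as below. This is a minimal in-memory illustration, not the package's actual implementation: `QueuedMessage`, `IRetryQueue`, `InMemoryQueue` and `RetryRunner` are hypothetical names that stand in for SQS and the scheduled Lambda. What it is meant to show is the ordering: a message is deleted only after the handler succeeds, so a failed message stays on the queue for the next poll.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical message shape: an id (playing the role of a receipt handle) plus the payload.
public sealed record QueuedMessage(string Id, string Body);

// Hypothetical stand-in for the SQS dead letter queue.
public interface IRetryQueue
{
    Task<IReadOnlyList<QueuedMessage>> ReceiveAsync(int max);
    Task DeleteAsync(string id);
}

// In-memory queue so the sketch runs without AWS.
public sealed class InMemoryQueue : IRetryQueue
{
    private readonly List<QueuedMessage> _messages = new();

    public int Count => _messages.Count;

    public void Send(string body) =>
        _messages.Add(new QueuedMessage(Guid.NewGuid().ToString(), body));

    // Returns a copy, so callers can delete while iterating over the result.
    public Task<IReadOnlyList<QueuedMessage>> ReceiveAsync(int max) =>
        Task.FromResult<IReadOnlyList<QueuedMessage>>(_messages.Take(max).ToList());

    public Task DeleteAsync(string id)
    {
        _messages.RemoveAll(m => m.Id == id);
        return Task.CompletedTask;
    }
}

public static class RetryRunner
{
    // One scheduled poll: pass each message to the same handler the SNS
    // consumer uses, and delete it only if the handler did not throw.
    public static async Task<int> RetryBatchAsync(
        IRetryQueue queue, Func<string, Task> handler, int max = 10)
    {
        var succeeded = 0;
        foreach (var message in await queue.ReceiveAsync(max))
        {
            try
            {
                await handler(message.Body);
                await queue.DeleteAsync(message.Id);
                succeeded++;
            }
            catch (Exception)
            {
                // Leave the message on the queue; the next poll retries it.
            }
        }
        return succeeded;
    }
}
```

With a real queue, the delivery delay and retention period would be queue settings rather than anything this loop has to track.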

Lambda code:

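    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using Amazon.Lambda.Core;
    using Amazon.Lambda.SNSEvents;
    using Newtonsoft.Json;
    // Also import the namespace that provides MessageProcessorBase from this package.

    public class SnsProcessor : MessageProcessorBase<SNSEvent, List<Product>>
    {
        private readonly ProductRepository _repo;

        public SnsProcessor()
        {
            _repo = new ProductRepository();
        }

        // Handles the extracted products, both for SNS deliveries and for DLQ retries
        protected override async Task HandleEvent(List<Product> products, ILambdaContext context)
        {
            // Skip null entries before writing to the repository
            var records = new List<Product>();
            foreach (var product in products)
            {
                if (product != null)
                {
                    records.Add(product);
                }
            }
            await _repo.Add(records);
        }

        // Deserializes the message of the first SNS record into a list of products
        protected override List<Product> ExtractEventBody(SNSEvent ev)
        {
            return JsonConvert.DeserializeObject<List<Product>>(ev.Records.FirstOrDefault()?.Sns?.Message);
        }
    }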

Serverless.template

{
    "AWSTemplateFormatVersion": "2010-09-09",
    "Transform": "AWS::Serverless-2016-10-31",
    "Resources": {
        "SnsConsumer": {
            "Type": "AWS::Serverless::Function",
            "Properties": {
                "Handler": "product_service::product_service.SnsProcessor::Process",
                "Runtime": "dotnetcore1.0",
                "CodeUri": "",
                "MemorySize": 128,
                "Timeout": 30,
                "Role": null,
                "Policies": [
                    "AWSLambdaFullAccess",
                    "AmazonDynamoDBFullAccess"
                ],
                "DeadLetterQueue": {
                    "Type": "SQS",
                    "TargetArn": {
                        "Fn::GetAtt": [
                            "DeadLetterQueue",
                            "Arn"
                        ]
                    }
                },
                "Events": {
                    "PutResource": {
                        "Type": "SNS",
                        "Properties": {
                            "Topic": {
                                "Fn::ImportValue": "product-TopicArn"
                            }
                        }
                    }
                }
            }
        },
        "ProcessRetries": {
            "Type": "AWS::Serverless::Function",
            "Properties": {
                "Handler": "product_service::product_service.SnsProcessor::RetryBatch",
                "Runtime": "dotnetcore1.0",
                "CodeUri": "",
                "MemorySize": 128,
                "Timeout": 30,
                "Role": null,
                "Policies": [
                    "AWSLambdaFullAccess",
                    "AmazonDynamoDBFullAccess",
                    "AmazonSQSFullAccess"
                ],
                "Environment": {
                    "Variables": {
                        "RetryQueueUrl": {
                            "Ref": "DeadLetterQueue"
                        }
                    }
                },
                "Events": {
                    "PutResource": {
                        "Type": "Schedule",
                        "Properties": {
                            "Schedule": "rate(1 minute)"
                        }
                    }
                }
            }
        },       
        "DeadLetterQueue": {
            "Type": "AWS::SQS::Queue",
            "Properties": {
                "DelaySeconds": 300
            }
        }
    },
    "Outputs": {}
}

Flow:

[Figure: flow diagram]

Target frameworks: net6.0 is compatible (included in the package). net7.0 and net8.0 were computed as compatible, along with the platform-specific variants of net6.0, net7.0 and net8.0 (android, ios, maccatalyst, macos, tvos, windows, plus browser for net8.0).

Version  Downloads  Last updated
2.0.0    658        1/22/2023
1.0.7    20,635     9/12/2018
1.0.6    1,072      9/7/2018
1.0.5    13,549     9/1/2017
1.0.4    3,145      8/11/2017
1.0.3    966        8/11/2017
1.0.2    1,024      8/10/2017
1.0.1    969        8/9/2017
1.0.0    959        8/9/2017