Beckett 0.12.2

There is a newer version of this package available.
See the version list below for details.
dotnet add package Beckett --version 0.12.2                
NuGet\Install-Package Beckett -Version 0.12.2                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Beckett" Version="0.12.2" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add Beckett --version 0.12.2                
#r "nuget: Beckett, 0.12.2"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install Beckett as a Cake Addin
#addin nuget:?package=Beckett&version=0.12.2

// Install Beckett as a Cake Tool
#tool nuget:?package=Beckett&version=0.12.2                

Beckett

Event sourcing is a powerful pattern for building applications but reading and writing events using an event store is only half of the equation. Beckett aims to fill in the gaps:

  • Subscriptions - subscribe to messages and process them in order by stream
    • Projections, read models, event handlers - add asynchronous, event-driven behavior to your applications
    • Horizontal scalability - use auto-scaling to have as many workers as needed processing messages in parallel where the work is distributed automatically across all available nodes without needing to manage the distribution by way of consumer groups or similar mechanisms
    • Retries - built-in retry support for failed messages - since messages are processed in order by stream per subscription, a failed message only blocks a single stream for a subscription at a time and the rest of the streams can continue processing for that subscription
  • Scheduled / recurring messages - schedule messages to be sent at a future time with cancellation support, or create a recurring schedule to send messages at a regular interval using cron expressions for scheduled jobs, etc...
  • Open Telemetry - built-in support to provide tracing and metrics
  • Dashboard - browse messages, retry failed subscription checkpoints
  • Bring Your Own Event Store - Beckett provides a simple Postgres-based message store or use your own by implementing the IMessageStorage interface

Example

We are building a warehouse management system and we need to allocate inventory to orders. The requirements are that allocation occurs when an item is added to an order:

using Beckett;
using Beckett.Database;

var builder = Host.CreateApplicationBuilder(args);

var connectionString = builder.Configuration.GetConnectionString("InventoryAllocation")!;

//ensure the Beckett database schema is up to date
await BeckettDatabase.Migrate(connectionString);

//configure the data source with support for Beckett
builder.Services.AddNpgsqlDataSource(connectionString, options => options.AddBeckett());

//register the subscription handler in the container
builder.Services.AddTransient<OrderItemAddedHandler>();

//add Beckett support to the host for the InventoryAllocation subscription group
var beckett = builder.AddBeckett(
    options => { options.WithSubscriptionGroup("InventoryAllocation"); }
);

//map message types
beckett.Map<OrderItemAdded>("order_item_added");
beckett.Map<InventoryAllocated>("inventory_allocated");

//add subscription handler
beckett.AddSubscription("order-item-inventory-allocation")
    .Message<OrderItemAdded>()
    .Handler<OrderItemAddedHandler>((handler, message, token) => handler.Handle(message, token));

var host = builder.Build();

host.Run();

public record OrderItemAdded(Guid OrderId, Guid ProductId, int Quantity);

public record InventoryAllocated(Guid ProductId, Guid OrderId, int Quantity);

public class OrderItemAddedHandler(IMessageStore messageStore)
{
    public async Task Handle(OrderItemAdded message, CancellationToken cancellationToken)
    {
        await messageStore.AppendToStream(
            $"inventory-{message.ProductId}",
            ExpectedVersion.Any,
            new InventoryAllocated(message.ProductId, message.OrderId, message.Quantity),
            cancellationToken
        );
    }
}

In this example application we are handling the OrderItemAdded event with the OrderItemAddedHandler class. The host has been configured to use the InventoryAllocation subscription group, and there can be as many instances of this host running as necessary and the work will be divided among them automatically allowing you to take advantage of auto scaling without limits. The handler will receive all OrderItemAdded messages written to the message store since it is subscribed to that type in the AddSubscription call. When a message is received it is dispatched to the handler which then writes an InventoryAllocated event to an Inventory stream to track allocated product inventory.

One of the guiding design principles of Beckett is keeping a minimal footprint - there should be as few references to Beckett-provided types in application code as possible. Subscription handlers are registered as inline delegates that can refer to handler instances that are resolved from the container or static functions. The only type from Beckett used in the application code in this sample is IMessageStore which itself is optional if you're using your own message store.

The call to BeckettDatabase.Migrate in the example is applying any outstanding migrations to the database that are required by Beckett. If you wish to run the migrations separately using Flyway or similar tools then you can use the dump-migrations shell script supplied in the root of the directory to create a single SQL file:

./dump-migrations.sh beckett 001.sql

In this case beckett is the schema you'd like to use in your database for the tables, functions, and types that Beckett uses and 001.sql is the path of the file you'd like to create.

How It Works

sequenceDiagram
    participant Message Store
    participant Global Stream Consumer
    participant Subscription Checkpoints
    participant Global Checkpoint
    participant Checkpoint Consumer
    participant Subscription Handler
    Global Stream Consumer->>Message Store: poll for new messages
    Message Store-->>Global Stream Consumer: message batch
    Global Stream Consumer->>Subscription Checkpoints: create or update checkpoints
    Global Stream Consumer->>Global Checkpoint: update global checkpoint
    Checkpoint Consumer->>Subscription Checkpoints: poll for available checkpoints
    Subscription Checkpoints-->>Checkpoint Consumer: checkpoints to process
    Checkpoint Consumer->>Message Store: read stream at checkpoint position
    Message Store-->>Checkpoint Consumer: stream messages
    Checkpoint Consumer->>Subscription Handler: dispatch messages to handler
    alt is Error
        Checkpoint Consumer->>Subscription Checkpoints: mark checkpoint as pending retry
    else is Success
        Checkpoint Consumer->>Subscription Checkpoints: update checkpoint to latest processed position
    end
  • Subscriptions subscribe to stream categories and/or message types
  • Subscriptions belong to subscription groups - InventoryAllocation
  • Each host is assigned to a subscription group - options.WithSubscriptionGroup("InventoryAllocation")
  • Global checkpoints keep track of what messages Beckett has read from the message store per subscription group
  • The global stream consumer reads new messages from the message store based on the global checkpoint
  • Using the subscription configuration registered in the host it determines what checkpoints to create or update in terms of work to be done per subscription
  • Checkpoints are per group + subscription + stream
  • Checkpoints track the stream version - the current version of the stream being tracked - as well as the stream position, which is the position in the stream that Beckett has processed up to
  • If the stream version of a checkpoint is greater than the stream position it is considered to be "lagging"
  • The checkpoint consumer watches for lagging checkpoints - once it sees one it attempts to reserve it so it can be processed
  • Checkpoint reservations last a configurable amount of time (default 5 minutes) and if they are not processed before then (process killed, etc...) then the reservations can expire at which point they can be recovered and processed again
  • Checkpoint expired reservation recovery runs every minute by default
  • Checkpoints are processed by handling the messages in stream order, dispatching them to the handler for the subscription one at a time
  • Checkpoints are processed in parallel, with the allowed concurrency configured via options.Subscriptions.Concurrency (defaults to the number of CPUs in the host * 5 or 20, whichever number is smaller)
  • If a subscription handler encounters an error processing a message the checkpoint will be retried in a separate process up to a max number of retries using exponential backoff, at which point it changes to the failed status
  • Once a checkpoint is failed it can be manually retried at any point in time
  • If a retry succeeds at any point the checkpoint becomes active again and can process new messages

Dashboard

Beckett comes batteries-included with a dashboard to provide visibility into your system while it's running, retry failed checkpoints, and so on:

<img width="1575" alt="Beckett Dashboard" src="https://github.com/user-attachments/assets/0dc5a445-111b-4552-a639-36b37779d094">

Adding the Beckett dashboard to an ASP.NET Core application is simple:

var builder = WebApplication.CreateBuilder(args);

var app = builder.Build();

app.MapBeckettDashboard("/beckett");

app.Run();

In this example, the dashboard will be available at http://localhost:<port>/beckett and can be further configured using standard ASP.NET Core route group configuration options - authorization, etc...

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Beckett:

Package Downloads
Beckett.Dashboard

Messaging and event sourcing library

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
0.13.1 349 11/15/2024
0.13.0 91 11/8/2024
0.12.6 446 11/1/2024
0.12.5 125 10/31/2024
0.12.4 179 10/30/2024
0.12.3 115 10/29/2024
0.12.2 125 10/29/2024
0.12.1 341 10/28/2024
0.12.0 100 10/28/2024
0.11.12 129 10/25/2024
0.11.11 205 10/22/2024
0.11.10 334 10/18/2024
0.11.9 142 10/18/2024
0.11.8 148 10/18/2024
0.11.7 141 10/18/2024
0.11.6 162 10/17/2024
0.11.5 101 10/17/2024
0.11.4 95 10/17/2024
0.11.3 94 10/17/2024
0.11.2 229 10/13/2024
0.11.1 127 10/12/2024
0.11.0 129 10/11/2024
0.10.4 100 10/8/2024
0.10.3 109 10/7/2024
0.10.2 79 10/7/2024
0.10.1 80 10/7/2024
0.10.0 81 10/7/2024
0.9.17 1,725 9/14/2024
0.9.16 129 9/14/2024
0.9.15 212 9/13/2024
0.9.14 112 9/13/2024
0.9.13 287 9/7/2024
0.9.12 102 9/6/2024
0.9.11 268 9/5/2024
0.9.10 351 9/4/2024
0.9.9 396 8/29/2024
0.9.8 93 8/29/2024
0.9.7 228 8/28/2024
0.9.6 193 8/27/2024
0.9.5 100 8/27/2024
0.9.4 566 8/16/2024
0.9.3 584 8/14/2024
0.9.2 402 8/12/2024
0.9.1 264 8/8/2024
0.9.0 297 8/5/2024
0.8.13 61 8/2/2024
0.8.12 62 8/2/2024
0.8.11 433 7/26/2024
0.8.10 102 7/25/2024
0.8.9 102 7/25/2024
0.8.8 284 7/21/2024
0.8.7 140 7/19/2024
0.8.6 89 7/19/2024
0.8.5 184 7/16/2024
0.8.4 113 7/16/2024
0.8.3 103 7/16/2024
0.8.2 189 7/13/2024
0.8.1 100 7/13/2024
0.8.0 94 7/13/2024
0.7.9 115 7/12/2024
0.7.8 99 7/12/2024
0.7.7 115 7/11/2024
0.7.6 103 7/10/2024
0.7.5 94 7/10/2024
0.7.4 120 7/10/2024
0.7.3 104 7/10/2024
0.7.2 94 7/9/2024
0.7.1 101 7/9/2024
0.7.0 101 7/9/2024