BbQ.Events 1.0.13

BbQ.Events

Event-driven architecture support with strongly-typed pub/sub and projections for BbQ libraries.

✨ Features

  • Type-safe event publishing with IEventPublisher
  • Event handlers (IEventHandler<TEvent>) for processing events one-by-one
  • Event subscribers (IEventSubscriber<TEvent>) for consuming event streams
  • Projection support for building read models and materialized views
  • Partitioned projections for parallel event processing
  • Backpressure & flow control - bounded channels with configurable strategies (Block, DropNewest, DropOldest)
  • Projection monitoring via IProjectionMonitor for observability (events/sec, lag, worker count, checkpoints, queue depth)
  • Projection replay API via IProjectionRebuilder for resetting and rebuilding projections
  • Advanced replay service via IReplayService for controlled replay with event ranges, dry runs, and checkpoint strategies
  • In-memory event bus for single-process applications
  • Thread-safe implementation using System.Threading.Channels
  • Storage-agnostic design - extend for distributed scenarios
  • Source generator support - automatic handler/subscriber/projection discovery via BbQ.Events.SourceGenerators

📦 Installation

dotnet add package BbQ.Events

🚀 Quick Start

1. Register the Event Bus

using BbQ.Events.DependencyInjection;

services.AddInMemoryEventBus();

2. Publish Events

using BbQ.Events;

public class CreateUserHandler
{
    private readonly IEventPublisher _publisher;

    public CreateUserHandler(IEventPublisher publisher)
    {
        _publisher = publisher;
    }

    public async Task Handle(CreateUserCommand command)
    {
        // Domain logic...
        var user = new User(command.Id, command.Name);

        // Publish event
        await _publisher.Publish(new UserCreated(user.Id, user.Name));
    }
}

3. Handle Events

public class SendWelcomeEmailHandler : IEventHandler<UserCreated>
{
    public Task Handle(UserCreated @event, CancellationToken ct)
    {
        Console.WriteLine($"Sending welcome email to {@event.Name}");
        return Task.CompletedTask;
    }
}

// Register manually
services.AddScoped<IEventHandler<UserCreated>, SendWelcomeEmailHandler>();

4. Subscribe to Event Streams

public class UserAnalyticsSubscriber : IEventSubscriber<UserCreated>
{
    private readonly IEventBus _eventBus;

    public UserAnalyticsSubscriber(IEventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public IAsyncEnumerable<UserCreated> Subscribe(CancellationToken ct)
        => _eventBus.Subscribe<UserCreated>(ct);
}

// Consume the stream
await foreach (var evt in subscriber.Subscribe(cancellationToken))
{
    // Process event
}

🎯 Projections (NEW)

Projections transform events into queryable read models for event-sourced systems.

Define a Projection

[Projection]
public class UserProfileProjection :
    IProjectionHandler<UserCreated>,
    IProjectionHandler<UserUpdated>
{
    private readonly IUserRepository _repository;
    
    public UserProfileProjection(IUserRepository repository)
    {
        _repository = repository;
    }
    
    public async ValueTask ProjectAsync(UserCreated evt, CancellationToken ct)
    {
        var profile = new UserProfile(evt.UserId, evt.Name, evt.Email);
        await _repository.UpsertAsync(profile, ct);
    }
    
    public async ValueTask ProjectAsync(UserUpdated evt, CancellationToken ct)
    {
        var profile = await _repository.GetAsync(evt.UserId, ct);
        if (profile != null)
        {
            profile.Name = evt.Name;
            await _repository.UpsertAsync(profile, ct);
        }
    }
}

Register and Run Projections

// Register event bus and projections
services.AddInMemoryEventBus();
services.AddProjectionsFromAssembly(typeof(Program).Assembly);
services.AddProjectionEngine();

// Run projection engine (as hosted service or manually)
var engine = serviceProvider.GetRequiredService<IProjectionEngine>();
await engine.RunAsync(cancellationToken);

Partitioned Projections

Use IPartitionedProjectionHandler<TEvent> to specify partition keys for ordering guarantees:

[Projection]
public class UserStatisticsProjection : IPartitionedProjectionHandler<UserActivity>
{
    public string GetPartitionKey(UserActivity evt) => evt.UserId.ToString();
    
    public ValueTask ProjectAsync(UserActivity evt, CancellationToken ct)
    {
        // Process event - partition key can be used for custom parallelization
        return ValueTask.CompletedTask;
    }
}

Error Handling & Retry Policies (NEW)

Configure how projections handle errors during event processing:

services.AddProjection<UserProfileProjection>(options =>
{
    // Configure error handling strategy
    options.ErrorHandling.Strategy = ProjectionErrorHandlingStrategy.Retry;
    options.ErrorHandling.MaxRetryAttempts = 3;
    options.ErrorHandling.InitialRetryDelayMs = 1000; // 1 second
    options.ErrorHandling.MaxRetryDelayMs = 30000;    // 30 seconds (cap)
    options.ErrorHandling.FallbackStrategy = ProjectionErrorHandlingStrategy.Skip;
});

Available Strategies:

  1. Retry (Default) - Retries with exponential backoff for transient failures

    • Attempts: MaxRetryAttempts (default: 3)
    • Delay: Starts at InitialRetryDelayMs (default: 1000ms), doubles each retry up to MaxRetryDelayMs (default: 30000ms)
    • After exhausting retries, uses FallbackStrategy (default: Skip)
  2. Skip - Logs error and continues processing (event is marked as processed)

    • Best for non-critical events or when availability is more important than consistency
    • Event is checkpointed and won't be reprocessed
  3. Stop - Halts the projection worker for manual intervention

    • Best when data consistency is critical
    • Worker must be manually restarted
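
The Retry timing described above can be sketched without the library. This is a minimal illustration, assuming the delay doubles on each attempt and is capped at the maximum; RetryBackoff is a hypothetical helper and not part of BbQ.Events.

```csharp
using System;
using System.Collections.Generic;

// Print the delay schedule for the documented defaults (3 attempts, 1000ms, 30000ms cap).
foreach (var delay in RetryBackoff.Schedule(maxRetryAttempts: 3, initialDelayMs: 1000, maxDelayMs: 30000))
{
    Console.WriteLine($"{delay} ms");
}

// Hypothetical helper for illustration only; not a BbQ.Events type.
public static class RetryBackoff
{
    // Delay before retry number `attempt` (1-based): initial * 2^(attempt - 1), capped at maxDelayMs.
    public static int DelayMs(int attempt, int initialDelayMs, int maxDelayMs)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));

        long delay = initialDelayMs;
        for (var i = 1; i < attempt && delay < maxDelayMs; i++)
        {
            delay *= 2;
        }
        return (int)Math.Min(delay, maxDelayMs);
    }

    // Delays for attempts 1..maxRetryAttempts.
    public static IReadOnlyList<int> Schedule(int maxRetryAttempts, int initialDelayMs, int maxDelayMs)
    {
        var delays = new List<int>();
        for (var attempt = 1; attempt <= maxRetryAttempts; attempt++)
        {
            delays.Add(DelayMs(attempt, initialDelayMs, maxDelayMs));
        }
        return delays;
    }
}
```

With the defaults this prints 1000, 2000 and 4000 ms. A sixth attempt would be capped at 30000 ms rather than reaching 32000 ms.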

Examples:

// Retry transient failures, skip after exhaustion
services.AddProjection<UserProfileProjection>(options =>
{
    options.ErrorHandling.Strategy = ProjectionErrorHandlingStrategy.Retry;
    options.ErrorHandling.MaxRetryAttempts = 5;
    options.ErrorHandling.FallbackStrategy = ProjectionErrorHandlingStrategy.Skip;
});

// Skip failed events immediately
services.AddProjection<AnalyticsProjection>(options =>
{
    options.ErrorHandling.Strategy = ProjectionErrorHandlingStrategy.Skip;
});

// Stop on any error for critical projections
services.AddProjection<FinancialProjection>(options =>
{
    options.ErrorHandling.Strategy = ProjectionErrorHandlingStrategy.Stop;
});

Structured Logging:

All error handling strategies provide structured logging with event details:

  • Event type and handler information
  • Attempt count and retry delays
  • Error messages and stack traces
  • Projection and partition context

// Example log output
// [Warning] Error processing event for UserProjection:user-123 at position 456. 
//           Attempt 2 of 3. Retrying in 2000ms
// [Error] Skipping failed event for AnalyticsProjection:_default at position 789. 
//         Event type: UserActivity, Handler: UserActivityProjection
// [Critical] Stopping projection worker for FinancialProjection:_default at position 1011. 
//            Manual intervention required.

Backpressure & Flow Control (NEW)

Prevent unbounded memory growth when event ingestion outpaces projection processing:

services.AddProjection<UserProfileProjection>(options =>
{
    // Configure channel capacity (default: 1000)
    options.ChannelCapacity = 500;
    
    // Configure backpressure strategy (default: Block)
    options.BackpressureStrategy = BackpressureStrategy.Block;
});

Available Strategies:

  1. Block (Default) - Applies backpressure to event publishers

    • Most reliable: no events are dropped
    • May slow down event ingestion when queue reaches capacity
    • Best for critical projections where data loss is unacceptable
  2. DropNewest - Drops incoming events when queue is full

    • Preserves older events in the queue
    • ⚠️ Recommended for debugging only - incoming events are discarded, which causes data loss
  3. DropOldest - Drops oldest queued events when full

    • Always processes most recent events
    • Good for real-time dashboards and monitoring
    • ⚠️ May skip important state transitions
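
The three strategies can be tried out directly on System.Threading.Channels, which the library states it builds on. The sketch below assumes that Block, DropNewest and DropOldest correspond to BoundedChannelFullMode.Wait, DropWrite and DropOldest respectively; that mapping is a guess made for illustration, and BackpressureDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

Console.WriteLine(string.Join(",", BackpressureDemo.WriteFiveReadAll(BoundedChannelFullMode.Wait)));       // 1,2,3
Console.WriteLine(string.Join(",", BackpressureDemo.WriteFiveReadAll(BoundedChannelFullMode.DropWrite)));  // 1,2,3
Console.WriteLine(string.Join(",", BackpressureDemo.WriteFiveReadAll(BoundedChannelFullMode.DropOldest))); // 3,4,5

// Hypothetical demo type; it uses only the BCL channel API, not BbQ.Events.
public static class BackpressureDemo
{
    // Writes 1..5 into a channel of capacity 3 without reading, then drains what is left.
    public static List<int> WriteFiveReadAll(BoundedChannelFullMode mode)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3) { FullMode = mode });

        for (var i = 1; i <= 5; i++)
        {
            // In Wait mode TryWrite returns false once the channel is full; an async
            // producer would await WriteAsync instead, which is where blocking occurs.
            // In the drop modes TryWrite succeeds and an item is silently discarded.
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        var remaining = new List<int>();
        while (channel.Reader.TryRead(out var item))
        {
            remaining.Add(item);
        }
        return remaining;
    }
}
```

Wait and DropWrite leave the same items in this synchronous demo, but only Wait lets the producer notice that the queue is full and hold the event back instead of losing it.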

Examples:

// Critical projection - block on backpressure
services.AddProjection<FinancialProjection>(options =>
{
    options.ChannelCapacity = 100;
    options.BackpressureStrategy = BackpressureStrategy.Block;
});

// Real-time dashboard - prefer recent events
services.AddProjection<DashboardProjection>(options =>
{
    options.ChannelCapacity = 50;
    options.BackpressureStrategy = BackpressureStrategy.DropOldest;
});

// High-throughput projection - large buffer
services.AddProjection<AnalyticsProjection>(options =>
{
    options.ChannelCapacity = 5000;
    options.BackpressureStrategy = BackpressureStrategy.Block;
});

Monitoring Queue Depth:

The projection monitor tracks queue depth and dropped events:

var monitor = serviceProvider.GetRequiredService<IProjectionMonitor>();
var metrics = monitor.GetMetrics("UserProjection", "_default");

if (metrics != null)
{
    Console.WriteLine($"Queue Depth: {metrics.QueueDepth}");
    Console.WriteLine($"Events Dropped: {metrics.EventsDropped}");
    Console.WriteLine($"Events Processed: {metrics.EventsProcessed}");
}

Best Practices:

  • Start with default settings (1000 capacity, Block strategy)
  • Monitor queue depth metrics to detect backpressure
  • Tune ChannelCapacity based on ingestion rate and processing speed
  • Use MaxDegreeOfParallelism to increase throughput for partitioned projections
  • Consider Block strategy for critical projections
  • Use Drop strategies only when data loss is acceptable

Note: The default projection engine processes events sequentially. Implement a custom IProjectionEngine to leverage partition keys for parallel processing.
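
One way a custom engine might use partition keys is to route every event with the same key to the same worker, so per-key ordering is kept while different keys run in parallel. The sketch below shows only that routing step. PartitionRouter is a hypothetical helper, and FNV-1a is used here simply because it is stable across processes, unlike string.GetHashCode().

```csharp
using System;
using System.Text;

// The same key always maps to the same worker index in the range [0, workerCount).
Console.WriteLine(PartitionRouter.WorkerIndex("user-123", workerCount: 4));
Console.WriteLine(PartitionRouter.WorkerIndex("user-123", workerCount: 4));

// Hypothetical helper for illustration only; not a BbQ.Events type.
public static class PartitionRouter
{
    public static int WorkerIndex(string partitionKey, int workerCount)
    {
        if (partitionKey is null) throw new ArgumentNullException(nameof(partitionKey));
        if (workerCount <= 0) throw new ArgumentOutOfRangeException(nameof(workerCount));

        // 32-bit FNV-1a over the UTF-8 bytes of the key.
        unchecked
        {
            uint hash = 2166136261;
            foreach (var b in Encoding.UTF8.GetBytes(partitionKey))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return (int)(hash % (uint)workerCount);
        }
    }
}
```

Each worker would then process its own queue sequentially, for example one bounded channel per worker index.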

📖 See PROJECTION_SAMPLE.md for complete examples and best practices.

🔗 Automatic Handler Registration

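Event handlers, subscribers, and projections are automatically discovered by the BbQ.Events.SourceGenerators source generator. The generated registration methods embed your assembly name (YourAssemblyName below is a placeholder):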

services.AddInMemoryEventBus();
services.AddYourAssemblyNameEventHandlers();  // Auto-discovers event handlers/subscribers
services.AddYourAssemblyNameProjections();    // Auto-discovers projections

🔗 Integration with BbQ.Cqrs (Optional)

BbQ.Events works standalone, and it also integrates with BbQ.Cqrs for complete event-driven CQRS:

// Command handler that publishes events
public class CreateUserCommandHandler : IRequestHandler<CreateUser, Outcome<User>>
{
    private readonly IEventPublisher _publisher;

    public CreateUserCommandHandler(IEventPublisher publisher)
    {
        _publisher = publisher;
    }

    public async Task<Outcome<User>> Handle(CreateUser command, CancellationToken ct)
    {
        var user = new User(command.Id, command.Name);
        
        // Publish event after state change
        await _publisher.Publish(new UserCreated(user.Id, user.Name), ct);
        
        return Outcome<User>.From(user);
    }
}

🏗️ Architecture

In-Memory Event Bus

The default InMemoryEventBus implementation:

  • Uses System.Threading.Channels for thread-safe pub/sub
  • Supports multiple concurrent handlers per event type
  • Supports multiple concurrent subscribers per event type
  • Handlers are executed and awaited before publish completes
  • Slow subscribers don't block publishers (their oldest buffered messages are dropped instead)
  • Suitable for single-process applications
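
The subscriber behaviour listed above (an independent stream per subscriber, with the oldest messages dropped for slow readers) can be approximated with one bounded channel per subscriber. This is a simplified sketch, not the InMemoryEventBus implementation. FanOutBus and its members are hypothetical names, and unsubscription is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var bus = new FanOutBus<string>(capacityPerSubscriber: 2);
var a = bus.Subscribe();
var b = bus.Subscribe();

bus.Publish("e1");
bus.Publish("e2");
bus.Publish("e3"); // both buffers are full, so each subscriber loses its oldest item ("e1")
bus.Complete();

await foreach (var evt in a.ReadAllAsync()) Console.WriteLine($"a: {evt}"); // a: e2, a: e3
await foreach (var evt in b.ReadAllAsync()) Console.WriteLine($"b: {evt}"); // b: e2, b: e3

// Hypothetical type for illustration only; not a BbQ.Events type.
public sealed class FanOutBus<T>
{
    private readonly object _gate = new();
    private readonly List<Channel<T>> _subscribers = new();
    private readonly int _capacity;

    public FanOutBus(int capacityPerSubscriber) => _capacity = capacityPerSubscriber;

    // Each subscriber gets its own bounded channel, so streams are independent.
    public ChannelReader<T> Subscribe()
    {
        var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(_capacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });
        lock (_gate) _subscribers.Add(channel);
        return channel.Reader;
    }

    // TryWrite never blocks: with DropOldest a full buffer evicts its oldest item.
    public void Publish(T evt)
    {
        lock (_gate)
        {
            foreach (var channel in _subscribers) channel.Writer.TryWrite(evt);
        }
    }

    public void Complete()
    {
        lock (_gate)
        {
            foreach (var channel in _subscribers) channel.Writer.TryComplete();
        }
    }
}
```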

Projection Engine

The default projection engine:

  • Subscribes to live event streams and dispatches to projection handlers
  • Processes events sequentially as they arrive
  • Provides checkpoint storage infrastructure (IProjectionCheckpointStore)
  • Handles errors gracefully and continues processing
  • Can be extended for batch processing, parallel processing, and automatic checkpointing
  • Tracks metrics and health via IProjectionMonitor (events/sec, lag, worker count, checkpoints)

Projection Monitoring

The projection monitoring system tracks:

  • Events processed per second - throughput metrics
  • Per-partition lag - how far behind the projection is
  • Active worker count - number of concurrent workers
  • Checkpoint frequency - how often checkpoints are written
  • Queue depth - number of events waiting to be processed (NEW)
  • Events dropped - total events dropped due to backpressure (NEW)

Example usage:

// Monitor is automatically registered with AddProjectionEngine()
services.AddProjectionEngine();

// Query metrics at runtime
var monitor = serviceProvider.GetRequiredService<IProjectionMonitor>();
var metrics = monitor.GetMetrics("UserProjection", "_default");

if (metrics != null)
{
    Console.WriteLine($"Lag: {metrics.Lag} events");
    Console.WriteLine($"Throughput: {metrics.EventsPerSecond:F2} events/sec");
    Console.WriteLine($"Workers: {metrics.WorkerCount}");
    Console.WriteLine($"Checkpoints written: {metrics.CheckpointsWritten}");
    Console.WriteLine($"Queue depth: {metrics.QueueDepth}");
    Console.WriteLine($"Events dropped: {metrics.EventsDropped}");
}

For production monitoring, implement a custom IProjectionMonitor:

public class PrometheusProjectionMonitor : IProjectionMonitor
{
    private readonly Counter _eventsProcessed = Metrics.CreateCounter(
        "projection_events_processed_total", 
        "Total events processed by projection",
        new CounterConfiguration { LabelNames = new[] { "projection", "partition" } });
    
    private readonly Gauge _lag = Metrics.CreateGauge(
        "projection_lag", 
        "Lag between current and latest position",
        new GaugeConfiguration { LabelNames = new[] { "projection", "partition" } });
    
    public void RecordEventProcessed(string projectionName, string partitionKey, long currentPosition)
    {
        _eventsProcessed.WithLabels(projectionName, partitionKey).Inc();
    }
    
    public void RecordLag(string projectionName, string partitionKey, long currentPosition, long? latestPosition)
    {
        if (latestPosition.HasValue)
        {
            var lag = Math.Max(0, latestPosition.Value - currentPosition);
            _lag.WithLabels(projectionName, partitionKey).Set(lag);
        }
    }
    
    // Implement other methods...
}

// Register custom monitor
services.AddSingleton<IProjectionMonitor, PrometheusProjectionMonitor>();
services.AddProjectionEngine();

Projection Replay API

The projection rebuilder provides APIs to reset projection checkpoints and rebuild projections from scratch. This is useful for:

  • Rebuilding projections after schema changes
  • Recovering from corrupted projection state
  • Testing projection logic
  • Migrating to new projection implementations

The IProjectionRebuilder is automatically registered when you call AddProjectionEngine().

Reset All Projections

var rebuilder = serviceProvider.GetRequiredService<IProjectionRebuilder>();

// Reset all registered projections
await rebuilder.ResetAllProjectionsAsync(cancellationToken);

// Restart projection engine to begin rebuild
await engine.RunAsync(cancellationToken);

Reset a Single Projection

var rebuilder = serviceProvider.GetRequiredService<IProjectionRebuilder>();

// Reset a specific projection
await rebuilder.ResetProjectionAsync("UserProfileProjection", cancellationToken);

// Restart projection engine to begin rebuild
await engine.RunAsync(cancellationToken);

Reset a Single Partition

For partitioned projections, you can reset individual partitions without affecting others:

var rebuilder = serviceProvider.GetRequiredService<IProjectionRebuilder>();

// Reset a specific partition
await rebuilder.ResetPartitionAsync("UserStatisticsProjection", "user-123", cancellationToken);

// Restart projection engine to begin rebuild
await engine.RunAsync(cancellationToken);

List Registered Projections

Useful for CLI tools and management UIs:

var rebuilder = serviceProvider.GetRequiredService<IProjectionRebuilder>();

// Get all registered projection names
var projections = rebuilder.GetRegisteredProjections();
foreach (var projection in projections)
{
    Console.WriteLine($"Projection: {projection}");
}

CLI-Friendly Usage

The rebuilder API is designed to be easily integrated into CLI applications:

using BbQ.Events;
using BbQ.Events.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;

// Configure services
var services = new ServiceCollection();
services.AddLogging();
services.AddInMemoryEventBus();
services.AddProjectionsFromAssembly(typeof(Program).Assembly);
services.AddProjectionEngine();

var provider = services.BuildServiceProvider();
var rebuilder = provider.GetRequiredService<IProjectionRebuilder>();

// Parse command-line arguments
if (args.Length == 0 || args[0] == "list")
{
    // List all projections
    Console.WriteLine("Registered projections:");
    foreach (var projection in rebuilder.GetRegisteredProjections())
    {
        Console.WriteLine($"  - {projection}");
    }
}
else if (args[0] == "reset-all")
{
    // Reset all projections
    Console.WriteLine("Resetting all projections...");
    await rebuilder.ResetAllProjectionsAsync();
    Console.WriteLine("All projections reset. Restart the projection engine to rebuild.");
}
else if (args[0] == "reset" && args.Length >= 2)
{
    // Reset a specific projection
    var projectionName = args[1];
    Console.WriteLine($"Resetting projection: {projectionName}...");
    await rebuilder.ResetProjectionAsync(projectionName);
    Console.WriteLine($"Projection {projectionName} reset. Restart the projection engine to rebuild.");
}
else if (args[0] == "reset-partition" && args.Length >= 3)
{
    // Reset a specific partition
    var projectionName = args[1];
    var partitionKey = args[2];
    Console.WriteLine($"Resetting partition {partitionKey} of projection {projectionName}...");
    await rebuilder.ResetPartitionAsync(projectionName, partitionKey);
    Console.WriteLine($"Partition {partitionKey} reset. Restart the projection engine to rebuild.");
}
else
{
    Console.WriteLine("Usage:");
    Console.WriteLine("  list                           - List all registered projections");
    Console.WriteLine("  reset-all                      - Reset all projections");
    Console.WriteLine("  reset <projection-name>        - Reset a specific projection");
    Console.WriteLine("  reset-partition <projection-name> <partition-key> - Reset a specific partition");
}

Note: After resetting checkpoints, you must restart the projection engine (or projections) for the changes to take effect. The rebuilder only manages checkpoints - it does not modify projection state or read models directly.
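
To make the note above concrete, here is a minimal sketch of why resetting a checkpoint causes a rebuild on the next run. It assumes a checkpoint stores the last processed position and that a missing checkpoint means "start from position 0". InMemoryCheckpoints is a hypothetical stand-in and does not implement IProjectionCheckpointStore.

```csharp
using System;
using System.Collections.Concurrent;

var store = new InMemoryCheckpoints();
store.Save("UserStatisticsProjection", "user-123", 456);
store.Save("UserStatisticsProjection", "user-456", 789);

// Reset one partition; the other partition keeps its checkpoint.
store.Reset("UserStatisticsProjection", "user-123");

Console.WriteLine(store.StartPosition("UserStatisticsProjection", "user-123")); // 0
Console.WriteLine(store.StartPosition("UserStatisticsProjection", "user-456")); // 790

// Hypothetical type for illustration only; not a BbQ.Events type.
public sealed class InMemoryCheckpoints
{
    private readonly ConcurrentDictionary<(string Projection, string Partition), long> _positions = new();

    public void Save(string projection, string partition, long position)
        => _positions[(projection, partition)] = position;

    // Resetting only forgets the position; read models are untouched.
    public void Reset(string projection, string partition)
        => _positions.TryRemove((projection, partition), out _);

    // Assumed convention: resume after the checkpoint, or from 0 when none exists.
    public long StartPosition(string projection, string partition)
        => _positions.TryGetValue((projection, partition), out var position) ? position + 1 : 0;
}
```

Because the read model itself is not cleared, projection handlers that upsert (as in the UserProfileProjection example) tolerate being replayed over existing data.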

Replay Service (NEW)

The replay service provides comprehensive control over replaying projections from historical events. Unlike simple checkpoint resets, replay offers fine-grained control over replay boundaries, batch processing, and checkpoint strategies.

The IReplayService is automatically registered when you call AddProjectionEngine().

What is Replay?

Replay is the process of rebuilding a projection by reprocessing historical events. This is useful for:

  • Rebuilding projections after schema or logic changes
  • Validating projection logic with historical data
  • Recovering from corrupted read models
  • Testing with specific event ranges
  • Incremental replay in stages

Replay vs. Checkpoint Reset

Feature             | Checkpoint Reset (IProjectionRebuilder) | Replay (IReplayService)
Purpose             | Reset checkpoint and restart engine     | Orchestrate controlled replay
Event Range         | All events from beginning               | Configurable (FromPosition, ToPosition)
Batch Control       | Uses projection defaults                | Configurable per replay
Dry Run             | No                                      | Yes (test without checkpoint writes)
Checkpoint Strategy | Normal                                  | Configurable (Normal, FinalOnly, None)
Partition Support   | Yes                                     | Yes

Basic Replay from Scratch

var replayService = serviceProvider.GetRequiredService<IReplayService>();

// Replay from the beginning
await replayService.ReplayAsync(
    "UserProfileProjection",
    new ReplayOptions { FromPosition = 0 },
    cancellationToken);

Replay a Specific Range

// Replay events 1000-2000
await replayService.ReplayAsync(
    "OrderProjection",
    new ReplayOptions
    {
        FromPosition = 1000,
        ToPosition = 2000,
        BatchSize = 100  // Checkpoint every 100 events
    },
    cancellationToken);

Resume from Last Checkpoint

// Continue from last checkpoint position
await replayService.ReplayAsync(
    "UserProfileProjection",
    new ReplayOptions
    {
        FromCheckpoint = true,  // Resume from saved position
        ToPosition = 5000       // Process up to position 5000
    },
    cancellationToken);

Dry Run Replay

Test replay without modifying checkpoints:

// Test replay without checkpoint writes
await replayService.ReplayAsync(
    "InventoryProjection",
    new ReplayOptions
    {
        FromPosition = 0,
        DryRun = true  // Process events but don't save checkpoints
    },
    cancellationToken);

Replay a Specific Partition

For partitioned projections, replay only one partition:

// Replay a single partition
await replayService.ReplayAsync(
    "UserStatisticsProjection",
    new ReplayOptions
    {
        Partition = "user-123",  // Only this partition
        FromPosition = 0
    },
    cancellationToken);

Checkpoint Strategies

Control when checkpoints are written during replay:

// Write checkpoint only after replay completes
await replayService.ReplayAsync(
    "UserProfileProjection",
    new ReplayOptions
    {
        FromPosition = 0,
        CheckpointMode = CheckpointMode.FinalOnly  // Single checkpoint at end
    },
    cancellationToken);

// Never write checkpoints (similar to dry run)
await replayService.ReplayAsync(
    "AnalyticsProjection",
    new ReplayOptions
    {
        FromPosition = 0,
        CheckpointMode = CheckpointMode.None  // No checkpoints written
    },
    cancellationToken);

// Normal checkpoint strategy (default)
await replayService.ReplayAsync(
    "OrderProjection",
    new ReplayOptions
    {
        FromPosition = 0,
        BatchSize = 100,
        CheckpointMode = CheckpointMode.Normal  // Checkpoint every BatchSize events
    },
    cancellationToken);

ReplayOptions Reference

public class ReplayOptions
{
    // Resume from last saved checkpoint (default: false)
    public bool FromCheckpoint { get; set; }
    
    // Start replay from this position (overrides FromCheckpoint)
    public long? FromPosition { get; set; }
    
    // Stop replay at this position (inclusive)
    public long? ToPosition { get; set; }
    
    // Number of events before checkpointing (null = use projection default)
    public int? BatchSize { get; set; }
    
    // Replay only this partition (for partitioned projections)
    public string? Partition { get; set; }
    
    // Process events without persisting checkpoints (default: false)
    public bool DryRun { get; set; }
    
    // Control when checkpoints are written (default: Normal)
    public CheckpointMode CheckpointMode { get; set; }
}

public enum CheckpointMode
{
    Normal,      // Write checkpoints according to BatchSize (default)
    FinalOnly,   // Write checkpoint only after replay completes
    None         // Never write checkpoints
}
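
How CheckpointMode, BatchSize and DryRun interact can be sketched as a single decision function. This is an assumption-laden illustration and not the replay service's code: it assumes DryRun suppresses all writes and that Normal mode also writes once at the end for a trailing partial batch. SketchCheckpointMode and CheckpointPlanner are hypothetical names, chosen so they do not clash with the real types.

```csharp
using System;

// Replay 250 events with BatchSize = 100 and count checkpoint writes per mode.
foreach (var mode in new[] { SketchCheckpointMode.Normal, SketchCheckpointMode.FinalOnly, SketchCheckpointMode.None })
{
    Console.WriteLine($"{mode}: {CheckpointPlanner.CountWrites(250, 100, mode, dryRun: false)}");
}
// Normal: 3, FinalOnly: 1, None: 0

// Mirrors the CheckpointMode values above; hypothetical, not the library enum.
public enum SketchCheckpointMode { Normal, FinalOnly, None }

public static class CheckpointPlanner
{
    // Decides whether to write a checkpoint after `processed` of `total` events.
    public static bool ShouldWrite(long processed, long total, int batchSize, SketchCheckpointMode mode, bool dryRun)
    {
        if (dryRun || mode == SketchCheckpointMode.None) return false;

        var isLast = processed == total;
        if (mode == SketchCheckpointMode.FinalOnly) return isLast;

        // Normal: after every full batch, plus (assumed) once at the very end.
        return processed % batchSize == 0 || isLast;
    }

    public static int CountWrites(long total, int batchSize, SketchCheckpointMode mode, bool dryRun)
    {
        var writes = 0;
        for (long processed = 1; processed <= total; processed++)
        {
            if (ShouldWrite(processed, total, batchSize, mode, dryRun)) writes++;
        }
        return writes;
    }
}
```

In this model a larger BatchSize means fewer writes but more events to reprocess after a crash, which is the trade-off to weigh when tuning it.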

Validation

Replay options are validated automatically:

var options = new ReplayOptions
{
    FromPosition = 100,
    ToPosition = 50  // Error: FromPosition > ToPosition
};

// Throws InvalidOperationException
await replayService.ReplayAsync("UserProjection", options, cancellationToken);

Validation rules:

  • FromPosition and ToPosition must be non-negative
  • FromPosition cannot be greater than ToPosition
  • BatchSize must be positive (if specified)
  • Projection must be registered
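
The first three rules can be expressed as a small standalone check. The sketch below is not the library's validator: ReplayOptionsValidator is a hypothetical name, the messages are made up, and the "projection must be registered" rule is omitted because it needs the projection registry.

```csharp
using System;

try
{
    // Same invalid range as in the example above.
    ReplayOptionsValidator.Validate(fromPosition: 100, toPosition: 50, batchSize: null);
}
catch (InvalidOperationException ex)
{
    Console.WriteLine(ex.Message); // FromPosition cannot be greater than ToPosition.
}

// Hypothetical helper for illustration only; not a BbQ.Events type.
public static class ReplayOptionsValidator
{
    // Null values mean "not specified"; comparisons against null are false, so they pass.
    public static void Validate(long? fromPosition, long? toPosition, int? batchSize)
    {
        if (fromPosition < 0)
            throw new InvalidOperationException("FromPosition must be non-negative.");
        if (toPosition < 0)
            throw new InvalidOperationException("ToPosition must be non-negative.");
        if (fromPosition > toPosition)
            throw new InvalidOperationException("FromPosition cannot be greater than ToPosition.");
        if (batchSize <= 0)
            throw new InvalidOperationException("BatchSize must be positive.");
    }
}
```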

CLI-Friendly Usage

// Parse command-line arguments for replay: replay <projection-name> <from> [to]
if (args.Length >= 3 && args[0] == "replay")
{
    var projectionName = args[1];
    var fromPos = long.Parse(args[2]);
    var toPos = args.Length > 3 ? long.Parse(args[3]) : (long?)null;
    
    Console.WriteLine($"Replaying {projectionName} from {fromPos} to {toPos?.ToString() ?? "end"}...");
    
    await replayService.ReplayAsync(
        projectionName,
        new ReplayOptions
        {
            FromPosition = fromPos,
            ToPosition = toPos,
            BatchSize = 100
        },
        cancellationToken);
    
    Console.WriteLine("Replay complete!");
}

Best Practices

  1. Use Dry Run First: Test replay with DryRun = true before committing
  2. Start Small: Replay a limited range first (ToPosition)
  3. Monitor Progress: Check logs for replay progress and errors
  4. Batch Size: Tune BatchSize for performance vs. recovery granularity
  5. Partition Replay: Replay specific partitions to reduce scope
  6. FinalOnly for Large Replays: Reduce checkpoint overhead with CheckpointMode.FinalOnly
  7. Backup Read Models: Back up read models before replay if needed

Event Store Integration

Event streaming replay is fully implemented and works with any IEventStore implementation (for example InMemoryEventStore or SqlServerEventStore).

Important: The replay service reads events from a stream named after the projection. Ensure your events are appended to streams using the projection name as the stream identifier.

// Example: Appending events to the correct stream for replay
var eventStore = serviceProvider.GetRequiredService<IEventStore>();
var projectionName = "UserProfileProjection";

// Append events to stream named after projection
await eventStore.AppendAsync(projectionName, new UserCreated(...));
await eventStore.AppendAsync(projectionName, new UserUpdated(...));

// Now replay will work correctly
var replayService = serviceProvider.GetRequiredService<IReplayService>();
await replayService.ReplayAsync(projectionName, new ReplayOptions { FromPosition = 0 }, ct);

Alternative: Use ProjectionStartupMode.Replay to automatically replay from checkpoints when the projection engine starts:

services.AddProjection<UserProfileProjection>(options =>
{
    options.StartupMode = ProjectionStartupMode.Replay;
});

Distributed Systems

For multi-process or distributed systems, implement IEventBus with your preferred message broker:

  • RabbitMQ
  • Azure Service Bus
  • Apache Kafka
  • AWS SQS/SNS

📚 Key Concepts

Event Publisher

Publishes events to all registered handlers and active subscribers.

Event Handler

Processes events one-by-one as they're published. Multiple handlers can process the same event.

Event Subscriber

Provides a stream of events for reactive programming patterns. Each subscriber gets an independent stream.

Projection Handler

Transforms events into read models. Can handle multiple event types and optionally support partitioning.

Projection Engine

Orchestrates projection execution from live event streams. Provides infrastructure for checkpointing via IProjectionCheckpointStore.

Projection Rebuilder

Manages projection checkpoints for rebuilding projections from scratch. Supports resetting all projections, single projections, or individual partitions.

Replay Service

Orchestrates controlled replay of projections with fine-grained control over event ranges, batch processing, dry runs, and checkpoint strategies. Useful for testing, validation, and incremental replay scenarios.

Event Bus

Central hub combining publishing and subscribing capabilities.

⚙️ Configuration

// Default configuration
services.AddInMemoryEventBus();

// Manual handler registration
services.AddScoped<IEventHandler<MyEvent>, MyEventHandler>();
services.AddScoped<IEventSubscriber<MyEvent>, MyEventSubscriber>();

// Projection registration
services.AddProjection<MyProjection>();
services.AddProjectionsFromAssembly(typeof(Program).Assembly);
services.AddProjectionEngine();

// Custom checkpoint store for production
services.AddSingleton<IProjectionCheckpointStore, SqlCheckpointStore>();

🎯 Design Principles

  • Optional consumers: Events can be published without handlers or subscribers
  • Type safety: Compile-time checking for all event types
  • Explicit: Clear separation between publishing, handling, subscribing, and projecting
  • Storage-agnostic: Interfaces can be implemented for any storage/messaging backend
  • Extensible: Default implementations can be extended for production features (checkpointing, batching, parallelism)
  • Compatible: Works standalone or integrates with BbQ.Cqrs

📄 License

This project is licensed under the MIT License.

Target frameworks: net8.0 is compatible. The platform-specific net8.0 targets (android, browser, ios, maccatalyst, macos, tvos, windows) and all net9.0 and net10.0 targets were computed.

NuGet packages (2)

Showing the top 2 NuGet packages that depend on BbQ.Events:

BbQ.Events.SqlServer

SQL Server implementation for BbQ.Events, providing both IEventStore for event sourcing and IProjectionCheckpointStore for projection checkpoints. Features durable event persistence with sequential positions, atomic operations, JSON serialization, and thread-safe parallel processing support.

BbQ.Events.PostgreSql

PostgreSQL implementation for BbQ.Events, providing both IEventStore for event sourcing and IProjectionCheckpointStore for projection checkpoints. Features durable event persistence with sequential positions, atomic operations, JSON serialization, and thread-safe parallel processing support.

Version | Downloads | Last Updated
1.0.13  | 0         | 3/4/2026
1.0.12  | 153       | 1/27/2026
1.0.11  | 199       | 1/26/2026