BbQ.Events.PostgreSql
PostgreSQL implementation for BbQ.Events, providing both event store and checkpoint persistence.
This package provides production-ready, durable implementations for:
- Event Store: Full event sourcing with IEventStore for PostgreSQL
- Checkpoint Store: Projection checkpoint persistence with IProjectionCheckpointStore
Features
- ✅ Durable Event Store: Sequential event persistence with stream isolation
- ✅ Durable Checkpoint Store: Persistent projection checkpoints
- ✅ Atomic Operations: INSERT ... ON CONFLICT upserts prevent race conditions
- ✅ Thread-safe: Safe for parallel processing and multiple instances
- ✅ Automatic Schema Creation: Optional schema initialization on startup
- ✅ Minimal Dependencies: Uses Npgsql (PostgreSQL ADO.NET provider) for performance
- ✅ JSON Serialization: Flexible event data serialization
- ✅ Partitioned Projections: Support for partition-based checkpointing (schema-ready)
- ✅ Feature-Based Architecture: Organized by capability (Events, Checkpointing, Schema, Configuration)
Installation
dotnet add package BbQ.Events.PostgreSql
Database Schema
The package includes SQL schema files in the Schema/ folder.
Automatic Schema Creation (Recommended)
The simplest way to set up the database schema is to enable automatic schema creation:
services.UsePostgreSqlEventStore(options =>
{
    options.ConnectionString = "Host=localhost;Database=myapp;Username=myuser;Password=mypass";
    options.AutoCreateSchema = true; // Automatically create tables if they don't exist
});
When AutoCreateSchema is enabled:
- ✅ Tables are created automatically on application startup if they don't exist
- ✅ The schema creation is idempotent (safe to run multiple times)
- ✅ Existing tables are not modified
- ✅ Uses the embedded SQL scripts to ensure consistency
Note: For production environments, you may prefer to run the SQL scripts manually for more control. Set AutoCreateSchema = false (the default) and execute the scripts during deployment.
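The idempotency described above can be pictured with CREATE TABLE IF NOT EXISTS. The following is a minimal sketch under stated assumptions, not the package's actual implementation: the SchemaSketch class and its method are hypothetical names, the DDL is copied from the streams table shown later in this README, and only plain Npgsql calls are used. Raw string literals require C# 11 or later.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Npgsql;

// Hypothetical helper for illustration; not part of BbQ.Events.PostgreSql.
public static class SchemaSketch
{
    // IF NOT EXISTS turns the statement into a no-op when the table is already
    // present, so it can run on every startup without touching existing tables.
    public const string CreateStreamsTable = """
        CREATE TABLE IF NOT EXISTS bbq_streams (
            stream_name      VARCHAR(200) PRIMARY KEY,
            current_position BIGINT NOT NULL DEFAULT -1,
            version          INT NOT NULL DEFAULT 0,
            created_utc      TIMESTAMP NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'),
            last_updated_utc TIMESTAMP NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
        );
        """;

    public static async Task EnsureStreamsTableAsync(
        string connectionString, CancellationToken ct = default)
    {
        // Open a pooled connection, run the DDL once, and dispose the connection.
        await using var connection = new NpgsqlConnection(connectionString);
        await connection.OpenAsync(ct);
        await using var command = new NpgsqlCommand(CreateStreamsTable, connection);
        await command.ExecuteNonQueryAsync(ct);
    }
}
```

Calling EnsureStreamsTableAsync twice in a row against the same database should leave the table unchanged after the first call.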
Manual Schema Creation
Alternatively, you can manually run the schema scripts:
Events Table (for Event Store)
-- See Schema/CreateEventsTable.sql for full script
CREATE TABLE bbq_events (
    event_id BIGSERIAL PRIMARY KEY,
    stream_name VARCHAR(200) NOT NULL,
    position BIGINT NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    event_data TEXT NOT NULL,
    metadata TEXT NULL,
    created_utc TIMESTAMP NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'),
    CONSTRAINT uq_bbq_events_stream_position UNIQUE (stream_name, position)
);
Streams Table (for Event Store)
-- See Schema/CreateStreamsTable.sql for full script
CREATE TABLE bbq_streams (
    stream_name VARCHAR(200) PRIMARY KEY,
    current_position BIGINT NOT NULL DEFAULT -1,
    version INT NOT NULL DEFAULT 0,
    created_utc TIMESTAMP NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'),
    last_updated_utc TIMESTAMP NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
);
Checkpoints Table (for Projection Checkpoints)
-- See Schema/CreateCheckpointTable.sql for full script
CREATE TABLE bbq_projection_checkpoints (
    projection_name TEXT NOT NULL,
    partition_key TEXT NULL DEFAULT NULL,
    position BIGINT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT pk_bbq_projection_checkpoints PRIMARY KEY (projection_name, partition_key) NULLS NOT DISTINCT
);
Note: The partition_key column is nullable and defaults to NULL for non-partitioned projections. The intent is that at most one row with a NULL partition_key exists per projection_name, so a non-partitioned projection has exactly one checkpoint row. In stock PostgreSQL, NULLS NOT DISTINCT (PostgreSQL 15+) is documented for UNIQUE constraints and indexes, while PRIMARY KEY columns are always NOT NULL. Treat the statement above as an outline and rely on Schema/CreateCheckpointTable.sql for the exact DDL the package ships.
Explicit Schema Initialization
You can also manually trigger schema initialization at any time:
var eventStore = provider.GetRequiredService<IEventStore>();
// Ensure the schema exists (creates tables if they don't exist)
await eventStore.EnsureSchemaAsync();
This is useful for:
- Testing environments where you want to set up the database on-demand
- Initialization logic in application startup
- Migration scenarios where you're adding the event store to an existing application
Usage
Event Store
Use the PostgreSQL event store for durable event persistence:
using BbQ.Events.PostgreSql.Configuration;
using Microsoft.Extensions.DependencyInjection;
var services = new ServiceCollection();
// Register PostgreSQL event store
services.UsePostgreSqlEventStore("Host=localhost;Database=myapp;Username=myuser;Password=mypass");
var provider = services.BuildServiceProvider();
var eventStore = provider.GetRequiredService<IEventStore>();
// Append events to a stream
var userId = Guid.NewGuid();
await eventStore.AppendAsync("users", new UserCreated(userId, "Alice", "alice@example.com"));
await eventStore.AppendAsync("users", new UserUpdated(userId, "Alice Smith"));
// Read events from a stream
await foreach (var storedEvent in eventStore.ReadAsync<UserCreated>("users"))
{
    Console.WriteLine($"Position {storedEvent.Position}: {storedEvent.Event.Name}");
}
// Get current stream position
var position = await eventStore.GetStreamPositionAsync("users");
Console.WriteLine($"Stream at position: {position}");
Event Store with Options
Configure advanced options:
services.UsePostgreSqlEventStore(options =>
{
    options.ConnectionString = "Host=localhost;Database=myapp;Username=myuser;Password=mypass";
    options.IncludeMetadata = true; // Include metadata (timestamp, server, etc.)
    options.AutoCreateSchema = true; // Automatically create schema if missing
    options.JsonSerializerOptions = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };
});
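To make the effect of the JsonSerializerOptions setting concrete, here is a small sketch that serializes an event with the camel-case naming policy using System.Text.Json directly. The UserCreated record shape (UserId, Name, Email) is an assumption made for illustration, and how the package itself invokes the serializer is not shown in this README.

```csharp
using System;
using System.Text.Json;

// Hypothetical event type; the property names are assumptions for illustration.
public record UserCreated(Guid UserId, string Name, string Email);

public static class SerializationSketch
{
    // Options are created once and reused; building them per call is wasteful.
    private static readonly JsonSerializerOptions Options = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    // Serializes the event the way a camel-case policy would shape event_data.
    public static string ToJson(UserCreated evt) => JsonSerializer.Serialize(evt, Options);

    // Round-trips the JSON back into the event type with the same options.
    public static UserCreated? FromJson(string json) =>
        JsonSerializer.Deserialize<UserCreated>(json, Options);
}
```

With this policy the PascalCase properties UserId, Name, and Email are written as userId, name, and email, which matters if other consumers read the event_data column directly.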
Checkpoint Store (for Projections)
Use PostgreSQL checkpoint store for durable projection checkpoints:
using BbQ.Events.Configuration;
using BbQ.Events.PostgreSql.Configuration;
using Microsoft.Extensions.DependencyInjection;
var services = new ServiceCollection();
// Register event bus and projections
services.AddInMemoryEventBus();
services.AddProjection<UserProfileProjection>();
// Register PostgreSQL checkpoint store
services.UsePostgreSqlCheckpoints(
    "Host=localhost;Database=myapp;Username=myuser;Password=mypass");
// Register projection engine
services.AddProjectionEngine();
var provider = services.BuildServiceProvider();
// Get the projection engine
var engine = provider.GetRequiredService<IProjectionEngine>();
// Run projections (blocks until cancelled)
await engine.RunAsync(cancellationToken);
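The snippet above leaves cancellationToken undefined. One way to supply it in a console host is sketched below. The RunLoopSketch class is hypothetical, the run delegate stands in for engine.RunAsync, and the sketch assumes the engine signals cancellation by throwing OperationCanceledException, which this README does not confirm.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; `run` stands in for engine.RunAsync.
public static class RunLoopSketch
{
    // Awaits the long-running delegate and treats cancellation as a clean exit.
    public static async Task RunUntilCancelledAsync(
        Func<CancellationToken, Task> run, CancellationToken stopToken)
    {
        try
        {
            await run(stopToken);
        }
        catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
        {
            // Expected on shutdown; any other exception still propagates.
        }
    }

    // Wires Ctrl+C to a CancellationTokenSource and runs until it fires.
    public static async Task RunUntilCtrlCAsync(Func<CancellationToken, Task> run)
    {
        using var cts = new CancellationTokenSource();
        ConsoleCancelEventHandler handler = (_, e) =>
        {
            e.Cancel = true; // keep the process alive so the loop can stop cleanly
            cts.Cancel();
        };
        Console.CancelKeyPress += handler;
        try
        {
            await RunUntilCancelledAsync(run, cts.Token);
        }
        finally
        {
            Console.CancelKeyPress -= handler;
        }
    }
}
```

A console host could then call await RunLoopSketch.RunUntilCtrlCAsync(token => engine.RunAsync(token)); hosted applications would more likely pass the stopping token provided by the host.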
Complete Example with Event Store and Checkpoints
using BbQ.Events.Configuration;
using BbQ.Events.PostgreSql.Configuration;
using Microsoft.Extensions.DependencyInjection;
var services = new ServiceCollection();
// Register PostgreSQL event store for event sourcing
services.UsePostgreSqlEventStore(options =>
{
    options.ConnectionString = "Host=localhost;Database=myapp;Username=myuser;Password=mypass";
    options.AutoCreateSchema = true; // Automatically create schema
});
// Register event bus for pub/sub
services.AddInMemoryEventBus();
// Register projections
services.AddProjection<UserProfileProjection>();
// Register PostgreSQL checkpoint store for projections
services.UsePostgreSqlCheckpoints("Host=localhost;Database=myapp;Username=myuser;Password=mypass");
// Register projection engine
services.AddProjectionEngine();
var provider = services.BuildServiceProvider();
Connection String Configuration
Recommended: Store connection strings in configuration:
var checkpointConnection = builder.Configuration.GetConnectionString("Checkpoints");
services.UsePostgreSqlCheckpoints(checkpointConnection);
appsettings.json:
{
  "ConnectionStrings": {
    "Checkpoints": "Host=localhost;Database=myapp;Username=myuser;Password=mypass"
  }
}
Connection String Examples
Local Development:
Host=localhost;Database=myapp;Username=myuser;Password=mypass
Docker Container:
Host=postgres-container;Port=5432;Database=myapp;Username=myuser;Password=mypass
Production with SSL:
Host=prod-server.example.com;Database=myapp;Username=myuser;Password=mypass;SSL Mode=Require
Connection Pooling (default enabled):
Host=localhost;Database=myapp;Username=myuser;Password=mypass;Maximum Pool Size=100
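Instead of concatenating these strings by hand, the same settings can be assembled with Npgsql's NpgsqlConnectionStringBuilder, which quotes values correctly when they contain characters such as semicolons. This is a sketch only; the ConnectionStringSketch class and its parameters are hypothetical names.

```csharp
using Npgsql;

// Hypothetical helper for illustration; not part of BbQ.Events.PostgreSql.
public static class ConnectionStringSketch
{
    // Builds a connection string equivalent to the "Production with SSL"
    // example, with an explicit pool size.
    public static string Build(string host, string database, string username, string password)
    {
        var builder = new NpgsqlConnectionStringBuilder
        {
            Host = host,
            Database = database,
            Username = username,
            Password = password,
            SslMode = SslMode.Require, // refuse unencrypted connections
            MaxPoolSize = 100          // Npgsql's default, stated explicitly
        };
        return builder.ConnectionString;
    }
}
```

The resulting string can be passed to UsePostgreSqlCheckpoints or assigned to options.ConnectionString in place of a literal.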
Architecture
The package follows a feature-based folder structure:
BbQ.Events.PostgreSql/
    Checkpointing/      # Projection checkpoint store
        PostgreSqlProjectionCheckpointStore.cs
    Schema/             # SQL schema scripts
        CreateCheckpointTable.sql
    Configuration/      # DI extensions
        ServiceCollectionExtensions.cs
This structure:
- Aligns with the BbQ.Events core library architecture
- Makes it easy to find related functionality
- Separates concerns cleanly
- Provides clear separation between public API and internal implementation
Concurrency and Idempotency
Checkpoint Atomicity
The checkpoint store uses PostgreSQL's INSERT ... ON CONFLICT statement for atomic upsert operations:
INSERT INTO bbq_projection_checkpoints (projection_name, partition_key, position, updated_at)
VALUES (@projection_name, NULL, @position, NOW())
ON CONFLICT (projection_name, partition_key)
DO UPDATE SET position = EXCLUDED.position, updated_at = NOW()
This ensures that:
- Concurrent writes are safe: Multiple processes can write checkpoints simultaneously
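- Last write wins: The checkpoint written last is the one persisted, whether its position is higher or lower than the stored value
- No duplicate rows: The insert-or-update runs as a single statement, so concurrent writers cannot race between a check and a write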
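For readers who want to see the upsert in context, the sketch below executes the statement shown above through Npgsql. It is not the package's PostgreSqlProjectionCheckpointStore; the CheckpointUpsertSketch class and its method are hypothetical, and it covers only the non-partitioned case (NULL partition_key).

```csharp
using System.Threading;
using System.Threading.Tasks;
using Npgsql;

// Hypothetical helper for illustration; not the package's checkpoint store.
public static class CheckpointUpsertSketch
{
    // The upsert statement from this README, for a non-partitioned projection.
    public const string UpsertSql = """
        INSERT INTO bbq_projection_checkpoints (projection_name, partition_key, position, updated_at)
        VALUES (@projection_name, NULL, @position, NOW())
        ON CONFLICT (projection_name, partition_key)
        DO UPDATE SET position = EXCLUDED.position, updated_at = NOW()
        """;

    public static async Task SaveAsync(
        string connectionString, string projectionName, long position,
        CancellationToken ct = default)
    {
        // Open and dispose per operation; the Npgsql pool reuses the physical connection.
        await using var connection = new NpgsqlConnection(connectionString);
        await connection.OpenAsync(ct);
        await using var command = new NpgsqlCommand(UpsertSql, connection);
        command.Parameters.AddWithValue("projection_name", projectionName);
        command.Parameters.AddWithValue("position", position);
        await command.ExecuteNonQueryAsync(ct);
    }
}
```

Because the statement has no guard on the position value, two concurrent SaveAsync calls for the same projection leave whichever position was written last.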
Parallel Processing
The implementation is safe for parallel processing scenarios:
- ✅ Multiple instances can run concurrently
- ✅ Different projections maintain independent state
- ✅ Connection pooling handles concurrent database access
- ✅ Proper async/await patterns for scalability
Performance Considerations
Connection Pooling
The implementation relies on Npgsql's built-in connection pooling:
- Each operation opens and closes a connection
- Connection pooling handles reuse automatically
- No manual connection management required
- Default pool size is 100 connections (configurable in connection string)
Recommended Indexes
The schema file includes recommended indexes:
Checkpoints Table:
-- Primary key for fast checkpoint lookups
PRIMARY KEY (projection_name, partition_key)
-- Index for projection health monitoring
CREATE INDEX idx_bbq_projection_checkpoints_updated_at
    ON bbq_projection_checkpoints(updated_at);
-- Index for querying all checkpoints of a projection
CREATE INDEX idx_bbq_projection_checkpoints_projection_name
    ON bbq_projection_checkpoints(projection_name);
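As an example of the health monitoring that the updated_at index is meant to support, the sketch below lists checkpoints that have not advanced within a given time window. The CheckpointHealthSketch class, the StaleCheckpoint record, and the maxAge parameter are hypothetical; the query uses only the columns defined in the checkpoints table above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Npgsql;

// Hypothetical result type for illustration.
public record StaleCheckpoint(string ProjectionName, string? PartitionKey, long Position, DateTime UpdatedAt);

// Hypothetical helper for illustration; not part of BbQ.Events.PostgreSql.
public static class CheckpointHealthSketch
{
    // A range filter on updated_at, which the updated_at index can serve.
    public const string StaleSql = """
        SELECT projection_name, partition_key, position, updated_at
        FROM bbq_projection_checkpoints
        WHERE updated_at < @cutoff
        ORDER BY updated_at
        """;

    public static async Task<List<StaleCheckpoint>> FindStaleAsync(
        string connectionString, TimeSpan maxAge, CancellationToken ct = default)
    {
        var results = new List<StaleCheckpoint>();
        await using var connection = new NpgsqlConnection(connectionString);
        await connection.OpenAsync(ct);
        await using var command = new NpgsqlCommand(StaleSql, connection);
        // A UTC DateTime maps to timestamptz, matching the updated_at column.
        command.Parameters.AddWithValue("cutoff", DateTime.UtcNow - maxAge);
        await using var reader = await command.ExecuteReaderAsync(ct);
        while (await reader.ReadAsync(ct))
        {
            results.Add(new StaleCheckpoint(
                reader.GetString(0),
                reader.IsDBNull(1) ? null : reader.GetString(1), // NULL for non-partitioned projections
                reader.GetInt64(2),
                reader.GetDateTime(3)));
        }
        return results;
    }
}
```

A monitoring job might call FindStaleAsync(connectionString, TimeSpan.FromMinutes(5)) and alert when the list is non-empty; note that an idle projection with no new events also appears stale under this check.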
Migration Guides
From In-Memory Checkpoint Store to PostgreSQL
Replace the in-memory checkpoint store with PostgreSQL:
Before:
services.AddProjectionEngine(); // Uses InMemoryProjectionCheckpointStore
After:
services.UsePostgreSqlCheckpoints(connectionString);
services.AddProjectionEngine();
All existing projection code remains unchanged.
From SQL Server to PostgreSQL
Replace the SQL Server checkpoint store with PostgreSQL:
Before:
services.UseSqlServerCheckpoints(connectionString);
services.AddProjectionEngine();
After:
services.UsePostgreSqlCheckpoints(connectionString);
services.AddProjectionEngine();
Note: You'll need to migrate the checkpoint data from SQL Server to PostgreSQL. The schema is very similar.
Troubleshooting
Connection Issues
If you encounter connection errors:
Verify PostgreSQL is accessible:
psql -h localhost -U myuser -d myapp
Check connection string: Ensure it's valid and includes necessary parameters
Verify tables exist:
SELECT * FROM information_schema.tables WHERE table_name = 'bbq_projection_checkpoints';
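The first and last checks can also be run from application code, which helps when psql is not available on the host. The sketch below opens a connection and asks information_schema whether a table exists. The DiagnosticsSketch class is a hypothetical name, and only standard Npgsql calls are used.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Npgsql;

// Hypothetical helper for illustration; not part of BbQ.Events.PostgreSql.
public static class DiagnosticsSketch
{
    // Returns a single boolean: true when a table with the given name is visible.
    public const string TableExistsSql =
        "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = @table_name)";

    public static async Task<bool> TableExistsAsync(
        string connectionString, string tableName, CancellationToken ct = default)
    {
        await using var connection = new NpgsqlConnection(connectionString);
        // OpenAsync throws if the server is unreachable or the credentials are
        // rejected, which separates connection problems from schema problems.
        await connection.OpenAsync(ct);
        await using var command = new NpgsqlCommand(TableExistsSql, connection);
        command.Parameters.AddWithValue("table_name", tableName);
        var result = await command.ExecuteScalarAsync(ct);
        return result is true;
    }
}
```

For example, await DiagnosticsSketch.TableExistsAsync(connectionString, "bbq_projection_checkpoints") returning false suggests the schema scripts have not been run against that database.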
Checkpoint Not Persisting
If checkpoints aren't being saved:
Verify projection engine is running:
await engine.RunAsync(cancellationToken);
Check projection registration:
services.AddProjection<YourProjection>();
Query the checkpoint table directly:
SELECT * FROM bbq_projection_checkpoints;
Permission Issues
If you get permission errors:
Ensure the database user has necessary permissions:
GRANT SELECT, INSERT, UPDATE, DELETE ON bbq_projection_checkpoints TO myuser;
Verify connection credentials in your connection string
Testing with Docker
For local development and testing, you can use Docker to run PostgreSQL:
docker run --name postgres-bbq \
  -e POSTGRES_PASSWORD=mypassword \
  -e POSTGRES_USER=myuser \
  -e POSTGRES_DB=myapp \
  -p 5432:5432 \
  -d postgres:16
Then use this connection string:
Host=localhost;Database=myapp;Username=myuser;Password=mypassword
Future Enhancements
The schema and architecture are designed to support future features:
Checkpoint Store
- Partitioned Projections: The partition_key column enables per-partition checkpointing
- Partition Metadata: Additional columns can be added without breaking changes
- Monitoring: The updated_at column enables projection health monitoring
- Checkpoint batching: Batch checkpoint updates for improved throughput
General
- Replay features: Integration with projection replay and rebuilding
- Parallelism: Parallel projection processing support
- Event Store: PostgreSQL event store implementation (future package)
License
MIT License - see LICENSE.txt for details
Contributing
Contributions are welcome! Please open an issue or pull request at: https://github.com/JeanMarcMbouma/Outcome
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0
  - BbQ.Events (>= 1.0.0)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
  - Microsoft.Extensions.Hosting.Abstractions (>= 10.0.0)
  - Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
  - Npgsql (>= 8.0.5)
| Version | Downloads | Last Updated |
|---|---|---|
| 0.1.0 | 150 | 1/26/2026 |