HoneyDrunk.Transport.StorageQueue
0.4.0
HoneyDrunk.Transport
Reliable messaging and outbox infrastructure for the Hive. Transport unifies brokers, queues, and event buses under one contract, ensuring delivery, order, and idempotence. It powers communication between Nodes (Data, Pulse, Vault, and beyond) so every message finds its way.
What Is This?
HoneyDrunk.Transport is the messaging backbone of HoneyDrunk.OS ("the Hive"). It provides a transport-agnostic abstraction layer over different message brokers with built-in resilience, observability, and exactly-once semantics.
What This Package Provides
- Transport Abstraction - Unified ITransportPublisher and ITransportConsumer over Azure Service Bus, Azure Storage Queue, and InMemory
- Middleware Pipeline - Onion-style message processing with logging, telemetry, correlation, and retry
- Envelope Pattern - Immutable ITransportEnvelope with correlation/causation tracking and Grid context propagation
- Grid Context Integration - Uses IGridContext from HoneyDrunk.Kernel for distributed context propagation
- Transactional Outbox - IOutboxStore and IOutboxDispatcher contracts for exactly-once processing
- Health Contributors - ITransportHealthContributor for Kubernetes probe integration
- Observability - OpenTelemetry spans via ITransportMetrics and built-in telemetry middleware
- Blob Fallback - Persist failed Service Bus publishes to Azure Blob Storage for later replay
What This Package Does Not Provide
- Automatic message routing - Application responsibility; no built-in routing conventions
- Message schema registry - No schema validation or evolution support in v0.1.0
- Distributed transactions - Outbox provides eventual consistency, not two-phase commit
- Non-Azure provider implementations - RabbitMQ and Kafka are planned but not available
v0.1.0 Limitations
The following features exist as contracts only or have limitations in v0.1.0:
| Feature | Contract | v0.1.0 Status |
|---|---|---|
| Transport publishing | ITransportPublisher | Implemented for Service Bus, Storage Queue, InMemory |
| Transport consuming | ITransportConsumer | Implemented for Service Bus, Storage Queue, InMemory |
| Transactional outbox | IOutboxStore | Contract only; application must implement against its database |
| Outbox dispatching | IOutboxDispatcher | DefaultOutboxDispatcher provided |
| Health aggregation | ITransportHealthContributor | Contributors exist; application wires them into its health system |
| Message serialization | IMessageSerializer | JsonMessageSerializer provided as default |
Bottom line: v0.1.0 provides complete transport abstraction with Azure providers. Applications must implement:
- IOutboxStore if using transactional outbox pattern (database-specific)
- Health endpoint wiring for contributor aggregation
- Custom IMessageSerializer if JSON is not suitable
Quick Start
Installation
# Azure Service Bus transport
dotnet add package HoneyDrunk.Transport.AzureServiceBus
# Or Azure Storage Queue transport
dotnet add package HoneyDrunk.Transport.StorageQueue
# Or InMemory transport (for testing)
dotnet add package HoneyDrunk.Transport.InMemory
# Or just the core abstractions (contracts only)
dotnet add package HoneyDrunk.Transport
Web API Setup
This example shows a web application with Kernel and Azure Service Bus. Simpler setups are possible; see the package-specific documentation.
Registration order matters. Kernel must be registered before Transport. See DependencyInjection.md for details.
using HoneyDrunk.Kernel.DependencyInjection;
using HoneyDrunk.Transport.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// 1. Kernel (required for Grid context)
// nodeDescriptor: this node's descriptor, created elsewhere in the application
builder.Services.AddHoneyDrunkCoreNode(nodeDescriptor);

// 2. Transport core (middleware pipeline, envelope factory)
builder.Services.AddHoneyDrunkTransportCore(options =>
{
    options.EnableTelemetry = true;
    options.EnableLogging = true;
});

// 3. Azure Service Bus provider
builder.Services.AddHoneyDrunkServiceBusTransport(options =>
{
    options.FullyQualifiedNamespace = "mynamespace.servicebus.windows.net";
    options.Address = "orders";
    options.EntityType = ServiceBusEntityType.Topic;
    options.SubscriptionName = "order-processor";
    options.MaxConcurrency = 10;
});

// 4. Register message handlers
builder.Services.AddMessageHandler<OrderCreatedEvent, OrderCreatedHandler>();

var app = builder.Build();
app.Run();
Abstractions-Only Usage
For libraries that only need contracts without runtime dependencies:
// Reference only HoneyDrunk.Transport
// No Kernel runtime, no broker SDK dependencies
// Dependencies arrive through the primary constructor (resolved by DI)
public class OrderService(
    ITransportPublisher publisher,
    EnvelopeFactory envelopeFactory,
    IMessageSerializer serializer)
{
    public async Task PublishOrderCreatedAsync(Order order, CancellationToken ct)
    {
        var @event = new OrderCreatedEvent { OrderId = order.Id };
        var payload = serializer.Serialize(@event);
        var envelope = envelopeFactory.CreateEnvelope<OrderCreatedEvent>(payload);

        await publisher.PublishAsync(
            envelope,
            EndpointAddress.Create("orders", "orders-topic"),
            ct);
    }
}
Key Features (v0.1.0)
Transport Envelope Pattern
All messages are wrapped in immutable ITransportEnvelope for distributed tracing:
public interface ITransportEnvelope
{
    string MessageId { get; }
    string? CorrelationId { get; }
    string? CausationId { get; }
    string MessageType { get; }
    ReadOnlyMemory<byte> Payload { get; }
    IReadOnlyDictionary<string, string> Headers { get; }
    DateTimeOffset Timestamp { get; }
}

// Create envelopes via factory (integrates with TimeProvider and Grid context)
var envelope = envelopeFactory.CreateEnvelopeWithGridContext<OrderCreatedEvent>(
    payload, gridContext);
Note: Always use EnvelopeFactory to create envelopes. It integrates with TimeProvider for deterministic timestamps and IGridContext for distributed context propagation.
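To illustrate why routing timestamps through TimeProvider makes them deterministic, here is a minimal, self-contained sketch. FixedTimeProvider and SketchEnvelope are hypothetical stand-ins written for this example; they are not the library's EnvelopeFactory or TransportEnvelope, whose internals are not shown in this README.

```csharp
using System;

// A clock pinned to one instant, as a test might configure it.
var clock = new FixedTimeProvider(new DateTimeOffset(2025, 1, 1, 12, 0, 0, TimeSpan.Zero));

var first = SketchEnvelope.Create("OrderCreatedEvent", clock);
var second = SketchEnvelope.Create("OrderCreatedEvent", clock);

// Both envelopes carry the pinned timestamp, so assertions on it are stable.
Console.WriteLine(first.Timestamp == second.Timestamp); // True

// Hypothetical: a TimeProvider whose GetUtcNow() always returns the same instant.
sealed class FixedTimeProvider(DateTimeOffset now) : TimeProvider
{
    public override DateTimeOffset GetUtcNow() => now;
}

// Hypothetical stand-in for an envelope; only the fields needed for the demo.
sealed record SketchEnvelope(string MessageId, string MessageType, DateTimeOffset Timestamp)
{
    // The timestamp comes from the injected clock, never from DateTimeOffset.UtcNow.
    public static SketchEnvelope Create(string messageType, TimeProvider timeProvider) =>
        new(Guid.NewGuid().ToString("N"), messageType, timeProvider.GetUtcNow());
}
```

In production code the same factory shape would receive TimeProvider.System, so only tests need the fixed clock.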
Grid Context Integration
Transport is fully integrated with Kernel's IGridContext for distributed context propagation:
// The logger is injected through the primary constructor
public class OrderCreatedHandler(ILogger<OrderCreatedHandler> logger)
    : IMessageHandler<OrderCreatedEvent>
{
    public async Task<MessageProcessingResult> HandleAsync(
        OrderCreatedEvent message,
        MessageContext context,
        CancellationToken ct)
    {
        // Access Grid context directly from MessageContext
        var grid = context.GridContext;

        logger.LogInformation(
            "Processing order {OrderId} with CorrelationId {CorrelationId} on Node {NodeId}",
            message.OrderId,
            grid?.CorrelationId,
            grid?.NodeId);

        return MessageProcessingResult.Success;
    }
}
Note: Grid context is extracted from envelope headers by GridContextPropagationMiddleware and populated in MessageContext automatically.
Middleware Pipeline
Message processing follows an onion-style middleware pattern:
// Built-in middleware (executed in order)
// 1. GridContextPropagationMiddleware - Extracts IGridContext from envelope
// 2. TelemetryMiddleware - Distributed tracing via OpenTelemetry
// 3. LoggingMiddleware - Structured logging of message processing
// Custom middleware registration
builder.Services.AddHoneyDrunkTransportCore()
    .AddMiddleware<CustomRetryMiddleware>()
    .AddMiddleware<CustomValidationMiddleware>();
Note: Middleware order matters. GridContextPropagation must run before telemetry to ensure correlation IDs are available for tracing.
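The onion-style ordering can be sketched without the library. In the example below, the Handler and Middleware delegates and the Pipeline.Build helper are hypothetical names invented for illustration; the real IMessagePipeline contract may look different. The point is only that the first registered middleware wraps all the others, so it runs first on the way in and last on the way out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var trace = new List<string>();

// Middleware listed first becomes the outermost layer of the onion.
var pipeline = Pipeline.Build(
    ctx => { trace.Add($"handler:{ctx}"); return Task.CompletedTask; },
    Named("context", trace),
    Named("telemetry", trace),
    Named("logging", trace));

await pipeline("order-42");

// Prints: context:before > telemetry:before > logging:before > handler:order-42
//         > logging:after > telemetry:after > context:after (on one line)
Console.WriteLine(string.Join(" > ", trace));

// Builds a middleware that records when it is entered and exited.
static Middleware Named(string name, List<string> trace) => async (ctx, next) =>
{
    trace.Add($"{name}:before");
    await next(ctx);
    trace.Add($"{name}:after");
};

// Hypothetical shapes: a terminal handler and a middleware that wraps the next step.
delegate Task Handler(string context);
delegate Task Middleware(string context, Handler next);

static class Pipeline
{
    // Wraps the handler from the inside out so middleware[0] ends up outermost.
    public static Handler Build(Handler handler, params Middleware[] middleware)
    {
        var next = handler;
        for (var i = middleware.Length - 1; i >= 0; i--)
        {
            var current = middleware[i];
            var inner = next;
            next = ctx => current(ctx, inner);
        }
        return next;
    }
}
```

Because the outermost layer sees the message first, anything it sets (such as a correlation ID) is available to every layer inside it.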
Transactional Outbox
For exactly-once processing with database transactions:
public class OrderService(
    IOutboxStore outboxStore,
    EnvelopeFactory factory,
    IMessageSerializer serializer,
    IDbContext dbContext)
{
    public async Task CreateOrderAsync(CreateOrderCommand command, CancellationToken ct)
    {
        await using var transaction = await dbContext.BeginTransactionAsync(ct);

        // Save order to database
        var order = new Order { /* ... */ };
        await dbContext.Orders.AddAsync(order, ct);

        // Save message to outbox (same transaction)
        var payload = serializer.Serialize(new OrderCreatedEvent { OrderId = order.Id });
        var envelope = factory.CreateEnvelope<OrderCreatedEvent>(payload);
        await outboxStore.SaveAsync(
            EndpointAddress.Create("orders", "orders-topic"),
            envelope, ct);

        await dbContext.SaveChangesAsync(ct);
        await transaction.CommitAsync(ct);

        // DefaultOutboxDispatcher publishes from outbox in background
    }
}
Note: IOutboxStore is a contract only; the application must implement it against its own database. DefaultOutboxDispatcher polls the store and publishes pending messages.
Health Contributors
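The poll-and-publish cycle can be sketched roughly as follows. Everything here is hypothetical: OutboxRecord, InMemoryOutboxSketch, and OutboxDispatchSketch are stand-ins invented for this example, and the real IOutboxStore and DefaultOutboxDispatcher signatures are not shown in this README. A real store would sit on the application's database rather than in memory.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var store = new InMemoryOutboxSketch();
store.Save("orders-topic", "order-created:42");

var published = new List<string>();
var attempts = 0;

// Fake publisher that fails on the first call to show that the record stays pending.
Task Publish(OutboxRecord record, CancellationToken ct)
{
    if (++attempts == 1) throw new InvalidOperationException("broker unavailable");
    published.Add(record.Payload);
    return Task.CompletedTask;
}

// Two polling passes: the first fails, the second succeeds.
await OutboxDispatchSketch.DispatchPendingAsync(store, Publish, CancellationToken.None);
await OutboxDispatchSketch.DispatchPendingAsync(store, Publish, CancellationToken.None);

Console.WriteLine($"published={published.Count}, pending={store.Pending().Count}");
// published=1, pending=0

sealed record OutboxRecord(Guid Id, string Destination, string Payload);

// Hypothetical store: pending records keyed by id.
sealed class InMemoryOutboxSketch
{
    private readonly Dictionary<Guid, OutboxRecord> _pending = new();

    public Guid Save(string destination, string payload)
    {
        var record = new OutboxRecord(Guid.NewGuid(), destination, payload);
        _pending[record.Id] = record;
        return record.Id;
    }

    // Returns a snapshot so callers can mark records while iterating.
    public IReadOnlyList<OutboxRecord> Pending() => _pending.Values.ToList();

    public void MarkDispatched(Guid id) => _pending.Remove(id);
}

static class OutboxDispatchSketch
{
    public static async Task DispatchPendingAsync(
        InMemoryOutboxSketch store,
        Func<OutboxRecord, CancellationToken, Task> publish,
        CancellationToken ct)
    {
        foreach (var record in store.Pending())
        {
            try
            {
                await publish(record, ct);
                store.MarkDispatched(record.Id); // only after a successful publish
            }
            catch (Exception) when (!ct.IsCancellationRequested)
            {
                // Leave the record pending; the next pass retries it.
            }
        }
    }
}
```

Note that in this sketch a crash between publishing and marking would cause the record to be published again on the next pass, which is why handlers benefit from being idempotent.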
Transport providers include health monitoring for Kubernetes probes:
public interface ITransportHealthContributor
{
    string Name { get; }
    ValueTask<TransportHealthResult> CheckHealthAsync(CancellationToken ct);
}
// Each transport registers its own contributor
// - ServiceBusHealthContributor
// - StorageQueueHealthContributor
// - InMemoryHealthContributor
Note: Health contributors are passive: the host health system invokes them on demand. Applications wire contributors into their health check infrastructure.
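One way an application might aggregate contributors is sketched below. IHealthContributorSketch is a simplified, hypothetical stand-in for ITransportHealthContributor: it returns a bool because the shape of TransportHealthResult is not shown in this README. HealthAggregator and FixedContributor are likewise invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var contributors = new IHealthContributorSketch[]
{
    new FixedContributor("InMemory", healthy: true),
    new FixedContributor("StorageQueue", healthy: false),
};

var (allHealthy, failing) = await HealthAggregator.CheckAllAsync(contributors, CancellationToken.None);
Console.WriteLine($"healthy={allHealthy}, failing={string.Join(",", failing)}");
// healthy=False, failing=StorageQueue

// Hypothetical, simplified contributor contract (bool instead of TransportHealthResult).
interface IHealthContributorSketch
{
    string Name { get; }
    ValueTask<bool> CheckHealthAsync(CancellationToken ct);
}

// Test double that always reports the configured state.
sealed class FixedContributor(string name, bool healthy) : IHealthContributorSketch
{
    public string Name => name;
    public ValueTask<bool> CheckHealthAsync(CancellationToken ct) => ValueTask.FromResult(healthy);
}

static class HealthAggregator
{
    // Invokes every contributor on demand and collects the names of failing ones.
    public static async Task<(bool Healthy, IReadOnlyList<string> Failing)> CheckAllAsync(
        IEnumerable<IHealthContributorSketch> contributors, CancellationToken ct)
    {
        var failing = new List<string>();
        foreach (var contributor in contributors)
        {
            bool ok;
            try
            {
                ok = await contributor.CheckHealthAsync(ct);
            }
            catch (Exception) when (!ct.IsCancellationRequested)
            {
                ok = false; // a throwing probe counts as unhealthy
            }
            if (!ok) failing.Add(contributor.Name);
        }
        return (failing.Count == 0, failing);
    }
}
```

A health endpoint or probe handler could call an aggregator like this and map the result to its own status codes.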
Storage Queue vs Service Bus
| Scenario | Storage Queue | Service Bus |
|---|---|---|
| Cost optimization | $0.0004/10K ops | Higher cost |
| High volume (millions/day) | Excellent | Good |
| Simple queue semantics | Yes | Yes |
| Message size < 64KB | Yes | Up to 100MB (Premium tier) |
| Topics/subscriptions (fan-out) | No | Yes |
| Sessions (ordered processing) | No | Yes |
| Transactions | No | Yes |
| Duplicate detection | No | Yes |
Choose Storage Queue for cost-effective, high-volume, simple queue scenarios.
Choose Service Bus for enterprise messaging with topics, sessions, or transactions.
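Because the Storage Queue publisher is described as base64-encoding messages, the 64KB limit applies to the encoded text, which is about a third larger than the raw bytes. The sketch below works through that arithmetic. StorageQueueSizing is a hypothetical helper written for this example, and it ignores any envelope metadata the library may add on top of the payload.

```csharp
using System;

Console.WriteLine(StorageQueueSizing.Base64Length(48 * 1024));        // 65536
Console.WriteLine(StorageQueueSizing.FitsAfterBase64(48 * 1024));     // True
Console.WriteLine(StorageQueueSizing.FitsAfterBase64(48 * 1024 + 1)); // False

// Hypothetical helper: checks whether raw bytes still fit once base64-encoded.
static class StorageQueueSizing
{
    // Azure Storage Queue message size limit: 64 KiB.
    public const int MaxMessageBytes = 64 * 1024;

    // Base64 emits 4 characters for every 3 input bytes, padding the last group.
    public static int Base64Length(int rawBytes) => 4 * ((rawBytes + 2) / 3);

    public static bool FitsAfterBase64(int rawBytes) =>
        Base64Length(rawBytes) <= MaxMessageBytes;
}
```

In practice this means a raw payload of roughly 48 KiB is the ceiling before any headers are counted, so larger messages are a reason to prefer Service Bus.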
Documentation
Package Documentation
- Architecture - High-level architecture and design principles
- Abstractions - Core contracts: ITransportEnvelope, IMessageHandler, MessageContext
- Pipeline - Middleware pipeline and built-in middleware
- Configuration - All options: TransportCoreOptions, RetryOptions, error strategies
- Context - Grid context propagation and IGridContextFactory
- Primitives - EnvelopeFactory, TransportEnvelope, serialization
- Outbox - Transactional outbox pattern
Transport Providers
- AzureServiceBus - Service Bus transport: sessions, topics, blob fallback
- StorageQueue - Storage Queue transport: concurrency model, poison queues
- InMemory - InMemory transport for testing
Runtime & Observability
- Runtime - ITransportRuntime and consumer lifecycle
- Health - Health monitoring with ITransportHealthContributor
- Metrics - ITransportMetrics and OpenTelemetry integration
- Testing - Test patterns and helpers
Project Structure
HoneyDrunk.Transport/
├── HoneyDrunk.Transport/                  # Core abstractions & pipeline
│   ├── Abstractions/                      # Publisher, consumer, handler contracts
│   ├── Pipeline/                          # Middleware execution engine
│   ├── Configuration/                     # TransportCoreOptions, RetryOptions
│   ├── Context/                           # Grid context factory and propagation
│   ├── Primitives/                        # EnvelopeFactory, TransportEnvelope
│   ├── Outbox/                            # IOutboxStore, DefaultOutboxDispatcher
│   ├── Runtime/                           # ITransportRuntime host
│   ├── Health/                            # ITransportHealthContributor
│   ├── Metrics/                           # ITransportMetrics
│   ├── Telemetry/                         # OpenTelemetry integration
│   └── DependencyInjection/               # AddHoneyDrunkTransportCore()
│
├── HoneyDrunk.Transport.AzureServiceBus/  # Azure Service Bus provider
│   ├── Publishing/                        # ServiceBusTransportPublisher
│   ├── Consuming/                         # ServiceBusTransportConsumer
│   ├── BlobFallback/                      # Blob storage for failed publishes
│   ├── Health/                            # ServiceBusHealthContributor
│   └── DependencyInjection/               # AddHoneyDrunkServiceBusTransport()
│
├── HoneyDrunk.Transport.StorageQueue/     # Azure Storage Queue provider
│   ├── Publishing/                        # StorageQueueTransportPublisher
│   ├── Consuming/                         # StorageQueueTransportConsumer
│   ├── Health/                            # StorageQueueHealthContributor
│   └── DependencyInjection/               # AddHoneyDrunkTransportStorageQueue()
│
├── HoneyDrunk.Transport.InMemory/         # In-memory provider (testing)
│   ├── InMemoryBroker                     # Thread-safe in-memory message store
│   ├── Health/                            # InMemoryHealthContributor
│   └── DependencyInjection/               # AddHoneyDrunkInMemoryTransport()
│
└── HoneyDrunk.Transport.Tests/            # xUnit test suite
What's New in v0.1.0
Core Transport
- ITransportPublisher and ITransportConsumer transport abstraction
- ITransportEnvelope immutable message wrapper with Grid context
- EnvelopeFactory integrating TimeProvider and IGridContext
- IMessagePipeline with onion-style middleware execution
- IMessageHandler<T> and MessageProcessingResult for handler contracts
- GridContextPropagationMiddleware, TelemetryMiddleware, LoggingMiddleware
- IOutboxStore and DefaultOutboxDispatcher for transactional outbox
- ITransportHealthContributor for health check participation
- ITransportMetrics for OpenTelemetry integration
Azure Service Bus Provider
- ServiceBusTransportPublisher with topic and queue support
- ServiceBusTransportConsumer with session and subscription support
- BlobFallbackPublisher for failed publish persistence
- ServiceBusHealthContributor for connectivity health checks
- AzureServiceBusOptions with retry and prefetch configuration
Azure Storage Queue Provider
- StorageQueueTransportPublisher with base64 encoding
- StorageQueueTransportConsumer with concurrent polling
- StorageQueueHealthContributor for connectivity health checks
- StorageQueueOptions with dequeue count and visibility timeout
InMemory Provider
- InMemoryBroker thread-safe message store for testing
- InMemoryTransportPublisher and InMemoryTransportConsumer
- InMemoryHealthContributor always-healthy contributor
Related Projects
| Project | Relationship |
|---|---|
| HoneyDrunk.Kernel | Transport depends on Kernel for IGridContext and TimeProvider |
| HoneyDrunk.Standards | Analyzers and coding conventions |
| HoneyDrunk.Data | Data access and persistence (in development) |
| HoneyDrunk.Auth | Authentication and authorization (in development) |
Note: HoneyDrunk.Transport depends only on HoneyDrunk.Kernel.Abstractions (contracts, no runtime). Transport providers depend on their respective Azure SDKs.
License
This project is licensed under the MIT License.
<div align="center">
Built with 🍯 by HoneyDrunk Studios
</div>
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - Azure.Storage.Queues (>= 12.25.0)
  - HoneyDrunk.Transport (>= 0.4.0)
  - Microsoft.Extensions.Hosting.Abstractions (>= 10.0.2)
  - Microsoft.Extensions.Logging.Abstractions (>= 10.0.2)
  - Microsoft.Extensions.Options (>= 10.0.2)
  - Microsoft.Extensions.Options.DataAnnotations (>= 10.0.2)
v0.4.0: Kernel v0.4.0 upgrade with IGridContext.AddBaggage() API change, fail-fast envelope validation, and updated Azure.Storage.Queues to 12.25.0.