Rivulet.Core
1.0.1
See the version list below for details.
dotnet add package Rivulet.Core --version 1.0.1
NuGet\Install-Package Rivulet.Core -Version 1.0.1
<PackageReference Include="Rivulet.Core" Version="1.0.1" />
<PackageVersion Include="Rivulet.Core" Version="1.0.1" />
<PackageReference Include="Rivulet.Core" />
paket add Rivulet.Core --version 1.0.1
#r "nuget: Rivulet.Core, 1.0.1"
#:package Rivulet.Core@1.0.1
#addin nuget:?package=Rivulet.Core&version=1.0.1
#tool nuget:?package=Rivulet.Core&version=1.0.1
Rivulet.Core
Safe, async-first parallel operators with bounded concurrency, retries, and backpressure for I/O-heavy workloads.
Transform collections in parallel while maintaining control over concurrency, errors, and resource usage.
Installation
dotnet add package Rivulet.Core
Quick Start
Parallel Transformation
using Rivulet.Core;
var urls = new[] { "https://api.example.com/1", "https://api.example.com/2", /* ... */ };
var results = await urls.SelectParallelAsync(
async (url, ct) =>
{
using var response = await httpClient.GetAsync(url, ct);
response.EnsureSuccessStatusCode();
return await response.Content.ReadAsStringAsync(ct);
},
new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 32,
MaxRetries = 3,
IsTransient = ex => ex is HttpRequestException or TaskCanceledException,
ErrorMode = ErrorMode.CollectAndContinue
});
Streaming Results
Process results as they complete instead of waiting for all:
await foreach (var result in source.SelectParallelStreamAsync(
async (item, ct) => await ProcessAsync(item, ct),
new ParallelOptionsRivulet { MaxDegreeOfParallelism = 16 }))
{
// Handle result immediately as it completes
Console.WriteLine(result);
}
Parallel Side Effects
Execute actions in parallel without collecting results:
await items.ForEachParallelAsync(
async (item, ct) => await SaveToDbAsync(item, ct),
new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10,
ErrorMode = ErrorMode.FailFast
});
Key Features
- ✅ Bounded Concurrency - Control max parallel operations with backpressure
- ✅ Retry Policies - Automatic retries with exponential backoff for transient errors
- ✅ Error Handling Modes - FailFast, CollectAndContinue, or BestEffort
- ✅ Streaming Support - Process results incrementally via
IAsyncEnumerable<T> - ✅ Cancellation - Full
CancellationTokensupport throughout - ✅ Lifecycle Hooks - OnStart, OnComplete, OnError, OnThrottle callbacks
- ✅ Per-Item Timeouts - Enforce timeouts for individual operations
- ✅ Works with both
IEnumerable<T>andIAsyncEnumerable<T>
Configuration Options
new ParallelOptionsRivulet
{
// Concurrency control
MaxDegreeOfParallelism = 32, // Max concurrent operations (default: CPU cores)
ChannelCapacity = 1024, // Backpressure buffer size (streaming only)
// Error handling
ErrorMode = ErrorMode.CollectAndContinue, // How to handle failures
OnErrorAsync = async (index, ex) => { /* ... */ return true; },
// Retry policy
MaxRetries = 3, // Number of retry attempts
IsTransient = ex => ex is HttpRequestException, // Which errors to retry
BaseDelay = TimeSpan.FromMilliseconds(100), // Exponential backoff base
// Timeouts
PerItemTimeout = TimeSpan.FromSeconds(30), // Timeout per item
// Lifecycle hooks
OnStartItemAsync = async (index) => { /* ... */ },
OnCompleteItemAsync = async (index) => { /* ... */ },
OnThrottleAsync = async (count) => { /* ... */ },
OnDrainAsync = async (count) => { /* ... */ }
}
Error Modes
- FailFast - Stop immediately on first error and throw
- CollectAndContinue - Continue processing, collect all errors, throw
AggregateExceptionat end - BestEffort - Continue processing, return successful results only, suppress errors
Framework Support
- .NET 8.0
- .NET 9.0
Documentation & Source
- GitHub Repository: https://github.com/Jeffeek/Rivulet
- Report Issues: https://github.com/Jeffeek/Rivulet/issues
- License: MIT
Performance Tips
- Choose appropriate parallelism - Start with
MaxDegreeOfParallelism = 32for I/O-bound work - Use streaming for large datasets -
SelectParallelStreamAsyncreduces memory usage - Set per-item timeouts - Prevent hung operations from blocking the pipeline
- Configure backpressure - Adjust
ChannelCapacitybased on memory constraints - Handle transient errors - Use
IsTransientandMaxRetriesfor flaky APIs
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- System.Linq.Async (>= 6.0.3)
- System.Threading.Channels (>= 8.0.0)
-
net9.0
- System.Linq.Async (>= 6.0.3)
- System.Threading.Channels (>= 8.0.0)
NuGet packages (9)
Showing the top 5 NuGet packages that depend on Rivulet.Core:
| Package | Downloads |
|---|---|
|
Rivulet.Diagnostics
Enterprise observability for Rivulet.Core with EventListener wrappers, metric aggregators, and health check integration. |
|
|
Rivulet.Testing
Testing utilities for Rivulet parallel operations including deterministic schedulers, virtual time, fake channels, and chaos injection. |
|
|
Rivulet.Diagnostics.OpenTelemetry
OpenTelemetry integration for Rivulet.Core with distributed tracing, metrics, and logging support. |
|
|
Rivulet.Hosting
Integration with Microsoft.Extensions.Hosting for dependency injection, configuration, and hosted services using Rivulet parallel operations. |
|
|
Rivulet.Sql
Safe parallel SQL operations with connection pooling awareness and bulk operations for Rivulet.Core |
GitHub repositories
This package is not used by any popular GitHub repositories.