Rivulet.Sql 2.0.0

Rivulet.Sql

Safe parallel SQL operations with connection pooling awareness and bulk operations.

Built on top of Rivulet.Core, this package provides SQL-aware parallel operators that automatically handle transient database failures, respect connection pooling limits, and support efficient bulk operations.

Installation

dotnet add package Rivulet.Sql

Depends on Rivulet.Core, which is installed automatically as a package dependency.

Features

  • Works with any ADO.NET provider
  • Connection pooling awareness: respects database connection pool limits
  • Automatic retry of transient database failures
  • Batched bulk operations
  • Transaction support
  • Parameterized queries

API

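  • ExecuteQueriesParallelAsync - Execute multiple queries
  • ExecuteCommandsParallelAsync - Execute multiple commands
  • ExecuteScalarParallelAsync - Execute multiple scalar queries
  • BulkInsertAsync - Provider-agnostic bulk insert
  • BulkUpdateAsync - Provider-agnostic batched update
  • BulkDeleteAsync - Provider-agnostic batched delete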

Quick Start

Parallel SQL Queries

Execute multiple queries in parallel with automatic retry for transient SQL errors:

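using System.Collections.Generic;
using System.Linq;
using Microsoft.Data.SqlClient;
using Rivulet.Sql;

var userIds = new[] { 1, 2, 3, 4, 5 };
// Interpolation is tolerable here only because the ids are integers defined in code;
// use the parameterized form (see Parameterized Queries) for any external input.
// Columns are listed explicitly so the ordinals used in the reader stay stable.
var queries = userIds.Select(id => $"SELECT Id, Name, Email FROM Users WHERE Id = {id}");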

var results = await queries.ExecuteQueriesParallelAsync(
    () => new SqlConnection(connectionString),
    reader =>
    {
        var users = new List<User>();
        while (reader.Read())
        {
            users.Add(new User
            {
                Id = reader.GetInt32(0),
                Name = reader.GetString(1),
                Email = reader.GetString(2)
            });
        }
        return users;
    },
    new SqlOptions
    {
        CommandTimeout = 30,
        ParallelOptions = new ParallelOptionsRivulet
        {
            MaxDegreeOfParallelism = 10,
            MaxRetries = 3
        }
    });

foreach (var userList in results)
{
    foreach (var user in userList)
    {
        Console.WriteLine($"{user.Id}: {user.Name}");
    }
}
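
The mapping delegate above calls reader.GetString on every column, which most providers reject when the column holds NULL. The following is a minimal sketch of a null-tolerant mapper, assuming the delegate receives something compatible with IDataReader (the source does not state the exact type). UserRow, UserMapper, and SampleReader are hypothetical names introduced for illustration; SampleReader uses an in-memory DataTable so the mapper can be exercised without a database.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical row type; Email is nullable because the column may hold NULL.
public sealed record UserRow(int Id, string Name, string? Email);

public static class UserMapper
{
    // Reads every row; assumes the columns are ordered Id, Name, Email.
    public static List<UserRow> MapUsers(IDataReader reader)
    {
        var users = new List<UserRow>();
        while (reader.Read())
        {
            users.Add(new UserRow(
                reader.GetInt32(0),
                reader.GetString(1),
                // Check for DBNull first: GetString typically throws on NULL values.
                reader.IsDBNull(2) ? null : reader.GetString(2)));
        }
        return users;
    }

    // Builds an in-memory reader with one NULL email for demonstration.
    public static IDataReader SampleReader()
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Columns.Add("Name", typeof(string));
        table.Columns.Add("Email", typeof(string));
        table.Rows.Add(1, "Ada", "ada@example.com");
        table.Rows.Add(2, "Grace", DBNull.Value);
        return table.CreateDataReader();
    }
}
```

If the assumption about the reader type holds, UserMapper.MapUsers could be passed in place of the inline lambda.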

Parameterized Queries

Use parameters to prevent SQL injection:

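// Requires: using System.Data; (for IDbCommand)
var userIds = new[] { 1, 2, 3 };
var queriesWithParams = userIds.Select(id => (
    // Columns are listed explicitly so the ordinals read below stay stable.
    query: "SELECT Id, Name FROM Users WHERE Id = @id",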
    configureParams: (Action<IDbCommand>)((cmd) =>
    {
        var param = cmd.CreateParameter();
        param.ParameterName = "@id";
        param.Value = id;
        cmd.Parameters.Add(param);
    })
));

var results = await queriesWithParams.ExecuteQueriesParallelAsync(
    () => new SqlConnection(connectionString),
    reader =>
    {
        var user = new User();
        if (reader.Read())
        {
            user.Id = reader.GetInt32(0);
            user.Name = reader.GetString(1);
        }
        return user;
    });

Parallel SQL Commands

Execute multiple INSERT, UPDATE, or DELETE commands in parallel:

var updates = new[]
{
    "UPDATE Users SET LastLogin = GETDATE() WHERE Id = 1",
    "UPDATE Users SET LastLogin = GETDATE() WHERE Id = 2",
    "UPDATE Users SET LastLogin = GETDATE() WHERE Id = 3"
};

var affectedRows = await updates.ExecuteCommandsParallelAsync(
    () => new SqlConnection(connectionString),
    new SqlOptions
    {
        ParallelOptions = new ParallelOptionsRivulet
        {
            MaxDegreeOfParallelism = 5,
            ErrorMode = ErrorMode.CollectAndContinue
        }
    });

Console.WriteLine($"Total rows affected: {affectedRows.Sum()}");

Parallel Scalar Queries

Execute scalar queries (COUNT, MAX, MIN, etc.) in parallel:

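// Identifiers such as table names cannot be passed as parameters, so only
// interpolate names that are fixed in code, never user input.
var tableNames = new[] { "Users", "Products", "Orders" };
var queries = tableNames.Select(table => $"SELECT COUNT(*) FROM {table}");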

var counts = await queries.ExecuteScalarParallelAsync<int>(
    () => new SqlConnection(connectionString));

for (int i = 0; i < tableNames.Length; i++)
{
    Console.WriteLine($"{tableNames[i]}: {counts[i]} rows");
}
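
Because table names are interpolated into the query text, it helps to validate them against a fixed list before building the SQL. The following is a minimal sketch of that idea; CountQueries and its allow-list are hypothetical and not part of Rivulet.Sql.

```csharp
using System;
using System.Collections.Generic;

public static class CountQueries
{
    // Hypothetical allow-list: the only tables this code is permitted to count.
    private static readonly HashSet<string> AllowedTables =
        new(StringComparer.OrdinalIgnoreCase) { "Users", "Products", "Orders" };

    public static string BuildCountQuery(string table)
    {
        // TryGetValue returns the canonical spelling stored in the set,
        // so the caller's text never reaches the SQL string.
        if (!AllowedTables.TryGetValue(table, out var canonical))
            throw new ArgumentException($"Table '{table}' is not on the allow-list.", nameof(table));

        return $"SELECT COUNT(*) FROM {canonical}";
    }
}
```

With this in place, tableNames.Select(CountQueries.BuildCountQuery) would produce the same queries as the example above while rejecting unexpected names.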

Bulk Operations

Bulk Insert

Efficiently insert thousands of records using batched operations:

var users = Enumerable.Range(1, 10000)
    .Select(i => new User { Name = $"User{i}", Email = $"user{i}@example.com" })
    .ToList();

var totalInserted = await users.BulkInsertAsync(
    () => new SqlConnection(connectionString),
    async (batch, cmd, ct) =>
    {
        var sb = new StringBuilder();
        int paramIndex = 0;

        foreach (var user in batch)
        {
            if (sb.Length > 0) sb.Append("; ");
            sb.Append($"INSERT INTO Users (Name, Email) VALUES (@name{paramIndex}, @email{paramIndex})");

            var nameParam = cmd.CreateParameter();
            nameParam.ParameterName = $"@name{paramIndex}";
            nameParam.Value = user.Name;
            cmd.Parameters.Add(nameParam);

            var emailParam = cmd.CreateParameter();
            emailParam.ParameterName = $"@email{paramIndex}";
            emailParam.Value = user.Email;
            cmd.Parameters.Add(emailParam);

            paramIndex++;
        }

        cmd.CommandText = sb.ToString();
        await Task.CompletedTask;
    },
    new BulkOperationOptions
    {
        BatchSize = 1000,
        UseTransaction = true,
        SqlOptions = new SqlOptions
        {
            ParallelOptions = new ParallelOptionsRivulet
            {
                MaxDegreeOfParallelism = 4
            }
        }
    });

Console.WriteLine($"Inserted {totalInserted} users");
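
The batch size interacts with the number of parameters each row adds. SQL Server accepts at most 2,100 parameters per command, so the example above (1,000 rows with 2 parameters each) sends 2,000 parameters per batch. The sketch below shows the arithmetic; BatchPlanner is a hypothetical helper, and the default headroom of 2 is a conservative assumption rather than a documented Rivulet.Sql setting.

```csharp
using System;

public static class BatchPlanner
{
    // SQL Server's documented maximum number of parameters per command.
    public const int SqlServerMaxParameters = 2100;

    // Largest batch whose total parameter count stays within the limit.
    // 'headroom' reserves a few slots as a safety margin (an assumption of this sketch).
    public static int MaxBatchSize(
        int parametersPerRow,
        int maxParameters = SqlServerMaxParameters,
        int headroom = 2)
    {
        if (parametersPerRow <= 0)
            throw new ArgumentOutOfRangeException(nameof(parametersPerRow));
        return (maxParameters - headroom) / parametersPerRow;
    }

    // Number of batches needed for itemCount items (ceiling division).
    public static int BatchCount(int itemCount, int batchSize)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        return (itemCount + batchSize - 1) / batchSize;
    }
}
```

For the insert above, BatchPlanner.MaxBatchSize(2) yields 1049 and BatchPlanner.BatchCount(10000, 1000) yields 10, so a BatchSize of 1000 fits within the limit.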

Bulk Update

Update multiple records efficiently:

var users = await GetUsersToUpdate();

var totalUpdated = await users.BulkUpdateAsync(
    () => new SqlConnection(connectionString),
    async (batch, cmd, ct) =>
    {
        var sb = new StringBuilder();
        int paramIndex = 0;

        foreach (var user in batch)
        {
            if (sb.Length > 0) sb.Append("; ");
            sb.Append($"UPDATE Users SET Name = @name{paramIndex}, Email = @email{paramIndex} WHERE Id = @id{paramIndex}");

            var idParam = cmd.CreateParameter();
            idParam.ParameterName = $"@id{paramIndex}";
            idParam.Value = user.Id;
            cmd.Parameters.Add(idParam);

            var nameParam = cmd.CreateParameter();
            nameParam.ParameterName = $"@name{paramIndex}";
            nameParam.Value = user.Name;
            cmd.Parameters.Add(nameParam);

            var emailParam = cmd.CreateParameter();
            emailParam.ParameterName = $"@email{paramIndex}";
            emailParam.Value = user.Email;
            cmd.Parameters.Add(emailParam);

            paramIndex++;
        }

        cmd.CommandText = sb.ToString();
        await Task.CompletedTask;
    },
    new BulkOperationOptions
    {
        BatchSize = 500,
        UseTransaction = true
    });

Bulk Delete

Delete multiple records in batches:

var userIdsToDelete = await GetInactiveUserIds();

var totalDeleted = await userIdsToDelete.BulkDeleteAsync(
    () => new SqlConnection(connectionString),
    async (batch, cmd, ct) =>
    {
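        // Joining values into the SQL text is safe only because the batch contains
        // integer ids; parameterize the IN list for any other value type.
        cmd.CommandText = $"DELETE FROM Users WHERE Id IN ({string.Join(",", batch)})";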
        await Task.CompletedTask;
    },
    new BulkOperationOptions
    {
        BatchSize = 1000,
        UseTransaction = true
    });

Console.WriteLine($"Deleted {totalDeleted} inactive users");
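
When the values being deleted are not guaranteed to be integers, the IN list can be parameterized instead of joined into the SQL text. The sketch below uses only System.Data interfaces; DeleteCommandBuilder is a hypothetical helper, and treating the batch as an IReadOnlyList<int> is an assumption, since the source does not state the batch type.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;

public static class DeleteCommandBuilder
{
    // Produces "@id0, @id1, ..." for the given number of ids.
    public static string BuildPlaceholders(int count) =>
        string.Join(", ", Enumerable.Range(0, count).Select(i => $"@id{i}"));

    // Sets the command text and binds one parameter per id.
    public static void Configure(IDbCommand cmd, IReadOnlyList<int> ids)
    {
        if (ids.Count == 0)
            throw new ArgumentException("Batch must not be empty.", nameof(ids));

        cmd.CommandText =
            $"DELETE FROM Users WHERE Id IN ({BuildPlaceholders(ids.Count)})";

        for (var i = 0; i < ids.Count; i++)
        {
            var p = cmd.CreateParameter();
            p.ParameterName = $"@id{i}";
            p.Value = ids[i];
            cmd.Parameters.Add(p);
        }
    }
}
```

Inside the BulkDeleteAsync callback, the body would then reduce to a call such as DeleteCommandBuilder.Configure(cmd, batch), provided the batch type matches. Each id consumes one parameter, so the provider's per-command parameter limit also bounds the batch size.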

Automatic Retry Handling

Rivulet.Sql automatically retries transient SQL errors:

SQL Server Transient Errors

  • -2, -1: Connection timeout/broken
  • 53: Connection does not exist
  • 64: Error on server
  • 40197, 40501, 40613: Azure SQL transient errors

PostgreSQL (Npgsql) Transient Errors

  • 08000-08006: Connection exceptions
  • 53300: Too many connections
  • 57P03: Cannot connect now

MySQL Transient Errors

  • 1040: Too many connections
  • 1205: Lock wait timeout
  • 1213: Deadlock found
  • 2006, 2013: Server gone away/lost connection
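
The lists above amount to a lookup from provider error code to "worth retrying". The sketch below encodes exactly the codes listed in this section; TransientErrors is a hypothetical class written for illustration and is not Rivulet.Sql's internal classifier, which may inspect exceptions differently.

```csharp
using System;
using System.Collections.Generic;

public static class TransientErrors
{
    // SQL Server error numbers listed above.
    private static readonly HashSet<int> SqlServer =
        new() { -2, -1, 53, 64, 40197, 40501, 40613 };

    // MySQL error numbers listed above.
    private static readonly HashSet<int> MySql =
        new() { 1040, 1205, 1213, 2006, 2013 };

    // PostgreSQL reports five-character SQLSTATE strings rather than integers.
    private static readonly HashSet<string> Postgres = new(StringComparer.Ordinal)
    {
        "08000", "08001", "08002", "08003", "08004", "08005", "08006",
        "53300", "57P03"
    };

    public static bool IsTransientSqlServer(int errorNumber) => SqlServer.Contains(errorNumber);

    public static bool IsTransientMySql(int errorNumber) => MySql.Contains(errorNumber);

    public static bool IsTransientPostgres(string sqlState) => Postgres.Contains(sqlState);
}
```

A constraint violation such as PostgreSQL's 23505 (unique violation) is deliberately absent: retrying it would fail the same way every time.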
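
Retry behavior is configured through ParallelOptions, and OnSqlErrorAsync is invoked for each SQL error:

var options = new SqlOptions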
{
    ParallelOptions = new ParallelOptionsRivulet
    {
        MaxRetries = 5,
        BaseDelay = TimeSpan.FromMilliseconds(100),
        BackoffStrategy = BackoffStrategy.ExponentialJitter
    },
    OnSqlErrorAsync = (item, exception, retryAttempt) =>
    {
        Console.WriteLine($"SQL error on retry {retryAttempt}: {exception.Message}");
        return ValueTask.CompletedTask;
    }
};

var results = await queries.ExecuteQueriesParallelAsync(
    () => new SqlConnection(connectionString),
    reader => MapToUser(reader),
    options);
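
To give a feel for what BaseDelay and an exponential strategy with jitter imply, the sketch below computes delays using the common "full jitter" scheme: the upper bound doubles on each attempt and the actual delay is drawn uniformly between zero and that bound. This is an assumption about the general technique; the exact formula behind BackoffStrategy.ExponentialJitter is not stated in this document. Backoff and its maxDelay parameter are hypothetical.

```csharp
using System;

public static class Backoff
{
    // Upper bound for a 1-based attempt: baseDelay * 2^(attempt - 1), capped at maxDelay.
    public static TimeSpan ExponentialCap(TimeSpan baseDelay, int attempt, TimeSpan maxDelay)
    {
        if (attempt < 1)
            throw new ArgumentOutOfRangeException(nameof(attempt));

        var factor = Math.Pow(2, attempt - 1);
        var ms = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(ms);
    }

    // Full jitter: a uniformly random delay between zero and the exponential cap.
    public static TimeSpan WithJitter(TimeSpan baseDelay, int attempt, TimeSpan maxDelay, Random rng)
    {
        var cap = ExponentialCap(baseDelay, attempt, maxDelay);
        return TimeSpan.FromMilliseconds(rng.NextDouble() * cap.TotalMilliseconds);
    }
}
```

With a 100 ms base delay, the caps for attempts 1 through 5 are 100, 200, 400, 800, and 1600 ms. Randomizing within the cap spreads retries out so that many failed operations do not hit the database again at the same instant.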

Connection Pool Management

Rivulet.Sql is designed to work with ADO.NET connection pooling:

// Connection string with pooling configuration
var connectionString = "Server=localhost;Database=MyDb;User Id=sa;Password=***;" +
                      "Max Pool Size=100;Min Pool Size=10;";

var options = new SqlOptions
{
    AutoManageConnection = true,  // Automatically opens and closes connections
    ParallelOptions = new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 20  // Don't exceed connection pool size
    }
};

// The factory function creates new connections that participate in pooling
var results = await queries.ExecuteQueriesParallelAsync(
    () => new SqlConnection(connectionString),
    reader => MapToUser(reader),
    options);

Best Practice: Set MaxDegreeOfParallelism to be less than your connection pool's Max Pool Size to avoid pool exhaustion.
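
One way to keep the two settings aligned is to derive the degree of parallelism from the connection string itself. The sketch below does this with DbConnectionStringBuilder from System.Data.Common; PoolSizing, SuggestParallelism, and the headroom of 5 connections are hypothetical choices, and the fallback of 100 is SqlClient's default Max Pool Size (other providers may differ).

```csharp
using System;
using System.Data.Common;
using System.Globalization;

public static class PoolSizing
{
    // SqlClient's default when "Max Pool Size" is absent from the connection string.
    private const int DefaultMaxPoolSize = 100;

    // Returns a degree of parallelism that leaves 'headroom' pooled connections
    // free for other work in the same process.
    public static int SuggestParallelism(string connectionString, int headroom = 5)
    {
        var builder = new DbConnectionStringBuilder { ConnectionString = connectionString };

        var maxPool = builder.TryGetValue("Max Pool Size", out var raw)
            ? Convert.ToInt32(raw, CultureInfo.InvariantCulture)
            : DefaultMaxPoolSize;

        // Never suggest fewer than one concurrent operation.
        return Math.Max(1, maxPool - headroom);
    }
}
```

The result could then be assigned to MaxDegreeOfParallelism, for example PoolSizing.SuggestParallelism(connectionString) in place of the literal 20 above.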

Batch Operation Callbacks

Monitor bulk operation progress:

var totalProcessed = 0;
var options = new BulkOperationOptions
{
    BatchSize = 1000,
    OnBatchStartAsync = (batch, batchNum) =>
    {
        Console.WriteLine($"Starting batch {batchNum} with {batch.Count} items");
        return ValueTask.CompletedTask;
    },
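    OnBatchCompleteAsync = (batch, batchNum, affectedRows) =>
    {
        // Batches may complete concurrently, so update the shared counter atomically
        // (Interlocked lives in System.Threading).
        var runningTotal = Interlocked.Add(ref totalProcessed, affectedRows);
        Console.WriteLine($"Batch {batchNum} complete: {affectedRows} rows affected");
        Console.WriteLine($"Total processed so far: {runningTotal}");
        return ValueTask.CompletedTask;
    },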
    OnBatchErrorAsync = (batch, batchNum, exception) =>
    {
        Console.WriteLine($"Batch {batchNum} failed: {exception.Message}");
        return ValueTask.CompletedTask;
    }
};

await items.BulkInsertAsync(
    () => new SqlConnection(connectionString),
    BuildInsertCommand,
    options);

Advanced Features

Transaction Isolation Levels

Control transaction isolation for bulk operations:

var options = new BulkOperationOptions
{
    UseTransaction = true,
    SqlOptions = new SqlOptions
    {
        IsolationLevel = IsolationLevel.Serializable  // Highest isolation
    }
};

await users.BulkInsertAsync(
    () => new SqlConnection(connectionString),
    BuildInsertCommand,
    options);

Custom Command Timeout

Set per-operation timeouts:

var options = new SqlOptions
{
    CommandTimeout = 120,  // 2 minutes for long-running queries
    ParallelOptions = new ParallelOptionsRivulet
    {
        PerItemTimeout = TimeSpan.FromSeconds(130)  // Overall timeout per item; kept above CommandTimeout so the command's own timeout fires first
    }
};

Provider-Agnostic Code

Works with any ADO.NET provider (SQL Server, PostgreSQL, MySQL, SQLite, etc.):

// SQL Server
var results1 = await queries.ExecuteQueriesParallelAsync(
    () => new SqlConnection(sqlServerConnectionString),
    MapToUser);

// PostgreSQL
var results2 = await queries.ExecuteQueriesParallelAsync(
    () => new NpgsqlConnection(postgresConnectionString),
    MapToUser);

// MySQL
var results3 = await queries.ExecuteQueriesParallelAsync(
    () => new MySqlConnection(mysqlConnectionString),
    MapToUser);

Configuration Options

SqlOptions

SQL-specific configuration:

var options = new SqlOptions
{
    CommandTimeout = 30,              // Command timeout in seconds
    AutoManageConnection = true,       // Auto open/close connections
    IsolationLevel = IsolationLevel.ReadCommitted,  // Transaction isolation
    OnSqlErrorAsync = async (item, ex, retry) => { /* custom logging */ },
    ParallelOptions = new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 10,
        MaxRetries = 3,
        BaseDelay = TimeSpan.FromMilliseconds(100),
        BackoffStrategy = BackoffStrategy.ExponentialJitter,
        ErrorMode = ErrorMode.CollectAndContinue
    }
};

BulkOperationOptions

Bulk operation configuration:

var options = new BulkOperationOptions
{
    BatchSize = 1000,                  // Items per batch
    UseTransaction = true,              // Wrap each batch in transaction
    SqlOptions = new SqlOptions { /* ... */ },
    OnBatchStartAsync = async (batch, num) => { /* ... */ },
    OnBatchCompleteAsync = async (batch, num, affected) => { /* ... */ },
    OnBatchErrorAsync = async (batch, num, ex) => { /* ... */ }
};

Best Practices

  1. Use Parameterized Queries: Always use parameters to prevent SQL injection
  2. Set Appropriate Parallelism: Keep MaxDegreeOfParallelism below your connection pool's Max Pool Size
  3. Enable AutoManageConnection: Let Rivulet handle connection lifecycle unless you have specific needs
  4. Use Transactions for Bulk Operations: Enable UseTransaction = true for data consistency
  5. Monitor Progress: Use callbacks for long-running bulk operations
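  6. Tune Batch Size: Experiment with batch sizes (100-2000) for optimal performance, keeping the total parameter count per command within the provider's limit (SQL Server allows at most 2,100)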
  7. Handle Provider Differences: Be aware of provider-specific SQL syntax and error codes

Performance

Rivulet.Sql is designed for high-throughput database operations:

  • Connection Pooling Aware: Respects connection pool limits to avoid exhaustion
  • Batched Operations: Reduces round-trips for bulk operations
  • Bounded Concurrency: Prevents overwhelming the database
  • Automatic Retries: Handles transient failures without manual intervention
  • Low Allocations: Uses ValueTask<T> in hot paths, which avoids allocating a Task when an operation completes synchronously
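
Bounded concurrency, as listed above, means that no more than a fixed number of operations are in flight at once regardless of how many items are queued. The sketch below illustrates the general technique with SemaphoreSlim; it is not Rivulet's implementation, and BoundedRunner is a hypothetical name. It returns the peak number of simultaneous operations it observed so the bound can be checked.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedRunner
{
    // Runs 'work' for every item while allowing at most maxConcurrency at a time.
    // Returns the highest number of simultaneous operations observed.
    public static async Task<int> RunAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);
        var sync = new object();
        int inFlight = 0, peak = 0;

        var tasks = items.Select(async item =>
        {
            // Wait for a free slot before starting the operation.
            await gate.WaitAsync();
            try
            {
                lock (sync)
                {
                    inFlight++;
                    peak = Math.Max(peak, inFlight);
                }
                await work(item);
            }
            finally
            {
                // Update the counter before freeing the slot for the next item.
                lock (sync) { inFlight--; }
                gate.Release();
            }
        }).ToList();

        await Task.WhenAll(tasks);
        return peak;
    }
}
```

Applied to database work, the semaphore count plays the role of MaxDegreeOfParallelism: each in-flight operation holds one pooled connection, so the bound caps connection usage as well.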

Examples

See the samples directory for complete working examples including:

  • Parallel report generation from multiple queries
  • Bulk data migration between databases
  • ETL pipelines with SQL sources
  • Database maintenance operations

Multi-Database Support

Works seamlessly with:

  • SQL Server (System.Data.SqlClient, Microsoft.Data.SqlClient)
  • PostgreSQL (Npgsql)
  • MySQL (MySql.Data, MySqlConnector)
  • SQLite (System.Data.SQLite, Microsoft.Data.Sqlite)
  • Oracle (Oracle.ManagedDataAccess)
  • Any ADO.NET provider implementing IDbConnection

License

MIT License - see LICENSE file for details


Made with ❤️ by Jeffeek | NuGet | GitHub

Target Frameworks

Compatible: net8.0, net9.0.
Computed: net10.0, plus the android, browser, ios, maccatalyst, macos, tvos, and windows variants of net8.0, net9.0, and net10.0.

Dependent NuGet Packages

The top 3 NuGet packages that depend on Rivulet.Sql:

Rivulet.Sql.SqlServer

SQL Server-specific optimizations for Rivulet.Sql including SqlBulkCopy integration for 10-100x faster bulk inserts

Rivulet.Sql.MySql

MySQL-specific optimizations for Rivulet.Sql including LOAD DATA INFILE integration for 10-100x faster bulk inserts

Rivulet.Sql.PostgreSql

PostgreSQL-specific optimizations for Rivulet.Sql including COPY command integration for 10-100x faster bulk inserts


Version History

Version        Downloads    Last Updated
2.0.0          133          3/24/2026
1.3.0          209          12/13/2025
1.3.0-beta     463          12/8/2025
1.3.0-alpha    338          12/8/2025