WebSocket.Rx 0.1.8

This package has a SemVer 2.0.0 package version: 0.1.8+6.
.NET CLI:
dotnet add package WebSocket.Rx --version 0.1.8

Package Manager Console in Visual Studio (uses the NuGet module's version of Install-Package):
NuGet\Install-Package WebSocket.Rx -Version 0.1.8

PackageReference (for projects that support it, copy this XML node into the project file):
<PackageReference Include="WebSocket.Rx" Version="0.1.8" />

Central Package Management (CPM): for projects that support it, version the package in the solution's Directory.Packages.props file and reference it from the project file.
Directory.Packages.props:
<PackageVersion Include="WebSocket.Rx" Version="0.1.8" />
Project file:
<PackageReference Include="WebSocket.Rx" />

Paket:
paket add WebSocket.Rx --version 0.1.8

#r directive (F# Interactive and Polyglot Notebooks; copy into the interactive tool or the script's source code):
#r "nuget: WebSocket.Rx, 0.1.8"

#:package directive (C# file-based apps starting in .NET 10 preview 4; place in a .cs file before any lines of code):
#:package WebSocket.Rx@0.1.8

Cake Addin:
#addin nuget:?package=WebSocket.Rx&version=0.1.8

Cake Tool:
#tool nuget:?package=WebSocket.Rx&version=0.1.8

WebSocket.Rx

<div align="center">

<img alt="WebSocket.Rx logo" height="128" src="https://raw.githubusercontent.com/st0o0/WebSocket.Rx/refs/heads/main/docs/logo/logo.png" width="128"/>

A powerful .NET library for reactive WebSocket communication using R3 (Reactive Extensions)

</div>


✨ Features

Client Features

  • 🔄 Automatic Reconnection - Built-in reconnection logic with configurable strategies
  • 📡 Reactive Streams - Observable sequences for messages, connections, and disconnections
  • 🧵 Thread-Safe - Safe concurrent message sending and receiving
  • 📦 Message Queuing - Automatic buffering with channel-based send queue
  • ⚡ High Performance - Built on System.Threading.Channels and ArrayPool<byte>
  • 🎯 Type-Safe - Strong typing with text/binary message support
  • 🔒 Proper Resource Management - Full IAsyncDisposable support with graceful shutdown
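
The feature list names ArrayPool<byte> as one of the building blocks. As a rough illustration of that general pattern (not WebSocket.Rx's actual internals), the sketch below encodes a text message into a rented buffer and returns the buffer afterwards. The helper name `EncodePooled` and the `consume` callback are hypothetical and exist only for this example.

```csharp
using System;
using System.Buffers;
using System.Text;

// Hypothetical helper: encodes text into a pooled buffer, hands the encoded
// bytes to `consume`, and always returns the buffer to the shared pool.
static int EncodePooled(string text, Action<ReadOnlyMemory<byte>> consume)
{
    var encoding = Encoding.UTF8;

    // Rent may hand back a larger array than requested, so the number of
    // bytes actually written has to be tracked separately.
    byte[] buffer = ArrayPool<byte>.Shared.Rent(encoding.GetMaxByteCount(text.Length));
    try
    {
        int written = encoding.GetBytes(text, 0, text.Length, buffer, 0);
        consume(buffer.AsMemory(0, written));
        return written;
    }
    finally
    {
        // The buffer must not be used after this point.
        ArrayPool<byte>.Shared.Return(buffer);
    }
}

int count = EncodePooled(
    "Hello WebSocket!",
    bytes => Console.WriteLine($"{bytes.Length} bytes ready to send"));
```

Renting avoids allocating a fresh array per message, at the cost of having to respect the buffer's lifetime: the callback must finish with the bytes before the buffer is returned.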

Server Features

  • 👥 Multi-Client Support - Handle multiple WebSocket connections simultaneously
  • 📊 Client Tracking - Built-in client metadata and connection management
  • 🔔 Event Streams - Observables for client connect/disconnect events
  • 🎯 Selective Messaging - Send to specific clients or broadcast to all
  • 🛡️ Robust Cleanup - Automatic client cleanup on disconnect

📦 Installation

dotnet add package WebSocket.Rx

Requirements: .NET 10.0 or higher

🚀 Quick Start

Client Example

using WebSocket.Rx;
using R3;

// Create and configure client
await using var client = new ReactiveWebSocketClient(new Uri("wss://echo.websocket.org"))
{
    IsReconnectionEnabled = true,
    KeepAliveInterval = TimeSpan.FromSeconds(30),
    IsTextMessageConversionEnabled = true
};

// Subscribe to messages
client.MessageReceived
    .Subscribe(msg => Console.WriteLine($"Received: {msg.Text}"));

// Subscribe to connection events
client.ConnectionHappened
    .Subscribe(info => Console.WriteLine($"Connected: {info.Reason}"));

client.DisconnectionHappened
    .Subscribe(info => Console.WriteLine($"Disconnected: {info.Reason}"));

// Connect and send messages
await client.StartAsync();
await client.SendAsTextAsync("Hello WebSocket!");

Server Example

using WebSocket.Rx;
using R3;

// Create and start server
await using var server = new ReactiveWebSocketServer("http://localhost:8080/")
{
    IsTextMessageConversionEnabled = true
};

// Subscribe to client events
server.ClientConnected
    .Subscribe(client => Console.WriteLine($"Client connected: {client.Metadata.Id}"));

server.Messages
    .Subscribe(msg => 
    {
        Console.WriteLine($"From {msg.Metadata.Id}: {msg.Message.Text}");
        // Echo back to sender (fire-and-forget: the returned task is not awaited,
        // so a failed send goes unobserved here)
        _ = server.SendAsTextAsync(msg.Metadata.Id, $"Echo: {msg.Message.Text}");
    });

await server.StartAsync();
Console.WriteLine($"Server running with {server.ClientCount} clients");

🎓 Core Concepts

Observable Streams

WebSocket.Rx is built around reactive streams using R3:

// Filter and transform messages
client.MessageReceived
    .Where(msg => msg.MessageType == MessageType.Text)
    .Select(msg => msg.Text.ToUpper())
    .Subscribe(text => Console.WriteLine(text));

// Debounce reconnection events (R3 names this operator Debounce; Rx.NET calls it Throttle)
client.ConnectionHappened
    .Debounce(TimeSpan.FromSeconds(1))
    .Subscribe(info => Console.WriteLine("Stable connection established"));

Message Types

// Send text message (queued)
await client.SendAsTextAsync("Hello");

// Send binary message (queued)
await client.SendAsBinaryAsync(new byte[] { 0x01, 0x02 });

// Send instant (bypasses queue)
await client.SendInstantAsync("Urgent message");

// Try send (non-blocking)
bool sent = client.TrySendAsText("Optional message");
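
To make the difference between a queued send and a non-blocking "try" send more concrete, here is a minimal sketch built directly on System.Threading.Channels. It is an assumption-laden stand-in, not the library's send queue: the capacity of 2, the string payloads, and the `sent` list that replaces the socket write are all made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded queue with a single consumer, standing in for a send queue.
var queue = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity: 2)
{
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait
});

var sent = new List<string>();

// Queued send: completes once the item is accepted, and would wait
// asynchronously if the queue were full.
await queue.Writer.WriteAsync("first");
await queue.Writer.WriteAsync("second");

// Non-blocking send: the queue is full, so this returns false immediately
// instead of waiting.
bool accepted = queue.Writer.TryWrite("third");

// Mark the queue as finished, then drain it in FIFO order.
// A real consumer would write each message to the socket instead.
queue.Writer.Complete();
await foreach (var message in queue.Reader.ReadAllAsync())
{
    sent.Add(message);
}

Console.WriteLine($"accepted={accepted}, sent={string.Join(",", sent)}");
```

The trade-off is the usual one: an awaited write applies back-pressure to the caller, while a try-write lets the caller drop or reroute a message when the queue cannot take it.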

Connection Lifecycle

// Start connection
await client.StartAsync();

// Reconnect manually
await client.ReconnectAsync();

// Stop gracefully (WebSocketCloseStatus lives in System.Net.WebSockets)
await client.StopAsync(WebSocketCloseStatus.NormalClosure, "Goodbye");

// Dispose (automatic cleanup)
await client.DisposeAsync();
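
When an instance is declared with `await using`, as in the Quick Start, the explicit DisposeAsync call is not needed: disposal runs when the variable goes out of scope. The sketch below shows that scoping behaviour with a MemoryStream, chosen only because it is a readily available IAsyncDisposable type; it stands in for a client and says nothing about what WebSocket.Rx does during its own cleanup.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

MemoryStream captured;

{
    // `await using` schedules DisposeAsync for the end of the enclosing block.
    await using var stream = new MemoryStream();
    captured = stream;

    stream.WriteByte(0x01);
    Console.WriteLine($"inside block: CanWrite={stream.CanWrite}");
} // DisposeAsync runs here, even if the block exits through an exception.

// The stream has been disposed, so it no longer accepts writes.
Console.WriteLine($"after block: CanWrite={captured.CanWrite}");
```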

🔧 Advanced Usage

Custom Configuration

// Encoding.UTF8 below requires: using System.Text;
var client = new ReactiveWebSocketClient(new Uri("wss://example.com"))
{
    // Connection settings
    ConnectTimeout = TimeSpan.FromSeconds(10),
    KeepAliveInterval = TimeSpan.FromSeconds(30),
    KeepAliveTimeout = TimeSpan.FromSeconds(10),
    
    // Reconnection
    IsReconnectionEnabled = true,
    
    // Message handling
    IsTextMessageConversionEnabled = true,
    MessageEncoding = Encoding.UTF8
};

Server Broadcasting

// Broadcast to all clients
foreach (var clientId in server.ConnectedClients.Keys)
{
    await server.SendAsTextAsync(clientId, "Broadcast message");
}

// Send to specific clients
var targetClients = server.ConnectedClients
    .Where(c => c.Value.CustomData?.Contains("premium") == true)
    .Select(c => c.Key);

foreach (var clientId in targetClients)
{
    await server.SendAsTextAsync(clientId, "Premium feature alert!");
}
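
The loops above await each send before starting the next one, so a slow client delays everyone after it. One possible alternative is to start all sends and await them together with Task.WhenAll. The sketch below uses a hypothetical `SendAsync` stand-in with simulated latency rather than the real server; whether concurrent calls to the server's send method are safe is an assumption to verify before using this pattern with WebSocket.Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a per-client send. Like the server method in the
// API reference, it reports success as a bool.
static async Task<bool> SendAsync(Guid clientId, string message, ISet<Guid> offline)
{
    await Task.Delay(10); // simulated network latency
    return !offline.Contains(clientId);
}

var clients = Enumerable.Range(0, 5).Select(_ => Guid.NewGuid()).ToList();
var offline = new HashSet<Guid> { clients[3] }; // one client that cannot be reached

// Start every send first, then await them as a group.
var sends = clients.Select(id => SendAsync(id, "Broadcast message", offline)).ToList();
bool[] results = await Task.WhenAll(sends);

int delivered = results.Count(ok => ok);
Console.WriteLine($"Delivered to {delivered} of {clients.Count} clients");
```

Collecting the bool results also makes partial failures visible, which the sequential loops silently discard.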

Error Handling

client.DisconnectionHappened
    .Subscribe(info => 
    {
        Console.WriteLine($"Disconnect reason: {info.Reason}");
        if (info.Exception != null)
        {
            Console.WriteLine($"Error: {info.Exception.Message}");
        }
    });

Combining Observables

// Wait for connection before sending
client.ConnectionHappened
    .Take(1)
    .Subscribe(_ => client.SendAsTextAsync("Connected!"));

// Process messages in batches (R3 names this operator Chunk; Rx.NET calls it Buffer)
client.MessageReceived
    .Chunk(TimeSpan.FromSeconds(1))
    .Where(batch => batch.Length > 0)
    .Subscribe(batch => Console.WriteLine($"Processed {batch.Length} messages"));

📚 API Reference

ReactiveWebSocketClient

| Property | Type | Description |
| --- | --- | --- |
| Url | Uri | WebSocket server URL |
| IsStarted | bool | Client started state |
| IsRunning | bool | Client running state |
| IsReconnectionEnabled | bool | Enable auto-reconnect |
| MessageReceived | Observable<ReceivedMessage> | Message stream |
| ConnectionHappened | Observable<Connected> | Connection stream |
| DisconnectionHappened | Observable<Disconnected> | Disconnection stream |

Key Methods:

  • Task StartAsync() - Start the client
  • Task StopAsync(status, description) - Stop gracefully
  • Task ReconnectAsync() - Manual reconnect
  • Task SendAsTextAsync(message) - Send text (queued)
  • Task SendAsBinaryAsync(data) - Send binary (queued)
  • ValueTask DisposeAsync() - Clean up resources

ReactiveWebSocketServer

| Property | Type | Description |
| --- | --- | --- |
| IsRunning | bool | Server running state |
| ClientCount | int | Number of connected clients |
| ConnectedClients | IReadOnlyDictionary<Guid, Metadata> | Client metadata |
| ClientConnected | Observable<ClientConnected> | Client connect stream |
| ClientDisconnected | Observable<ClientDisconnected> | Client disconnect stream |
| Messages | Observable<ServerReceivedMessage> | Server message stream |

Key Methods:

  • Task StartAsync() - Start the server
  • Task<bool> StopAsync(status, description) - Stop server
  • Task<bool> SendAsTextAsync(clientId, message) - Send to client
  • ValueTask DisposeAsync() - Clean up resources

💡 Inspiration

This library is inspired by and builds upon the excellent work of:

Websocket.Client by Marfusios

WebSocket.Rx takes inspiration from Websocket.Client's elegant reactive approach to WebSocket communication. Key influences include:

  • Reactive-First Design - Using observables for all events and messages
  • Automatic Reconnection - Built-in reconnection logic for robust connections
  • Clean API - Intuitive and easy-to-use interface

What's Different?

While honoring the spirit of Websocket.Client, WebSocket.Rx offers:

  • ✅ R3 Integration - Built on the modern R3 reactive library (successor to Rx.NET)
  • ✅ Server Support - Full-featured WebSocket server implementation
  • ✅ Modern .NET - Built for .NET 10+ with latest performance optimizations
  • ✅ IAsyncDisposable - Proper async resource cleanup
  • ✅ Channel-Based Queuing - High-performance message queue using System.Threading.Channels
  • ✅ Enhanced Memory Management - Uses ArrayPool<byte> and RecyclableMemoryStream

Both libraries share the same core philosophy: WebSocket communication should be simple, reactive, and reliable.

🤝 Contributing

Contributions are welcome! This library grows with the community's needs.

How to Contribute

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/amazing-feature
  3. Write tests for your changes
  4. Ensure all tests pass: dotnet test
  5. Submit a Pull Request

Guidelines

  • ✅ Follow existing code style and conventions
  • ✅ Include unit tests for new features
  • ✅ Update documentation for API changes
  • ✅ Keep PRs focused and atomic
  • ✅ Write meaningful commit messages

Development Setup

git clone https://github.com/st0o0/WebSocket.Rx.git
cd WebSocket.Rx
dotnet restore
dotnet build
dotnet test

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.


<div align="center">

Built with ❤️ for the .NET community

Report Bug · Request Feature · Documentation

</div>

Compatible and additional computed target framework versions

.NET: net10.0 is compatible. Computed: net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows.

NuGet packages (1)

The top NuGet package that depends on WebSocket.Rx is Signal.Bot.

GitHub repositories

This package is not used by any popular GitHub repositories.

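| Version | Downloads | Last Updated |
| --- | --- | --- |
| 0.1.8 | 105 | 2/20/2026 |
| 0.1.7 | 51 | 2/17/2026 |
| 0.1.6 | 220 | 2/13/2026 |
| 0.1.5 | 122 | 2/9/2026 |
| 0.1.4 | 110 | 2/7/2026 |
| 0.1.3 | 37 | 2/6/2026 |
| 0.1.2 | 42 | 2/6/2026 |
| 0.1.1 | 33 | 2/6/2026 |
| 0.1.0 | 39 | 2/3/2026 |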

🎯 WebSocket.Rx v0.1.8

📦 NuGet: https://nuget.org/packages/WebSocket.Rx
🔗 Release: https://github.com/st0o0/WebSocket.Rx/releases/v0.1.8

See release page for full changelog and details!

Built with ❤️