System.Reactive 6.0.0-preview.16

This is a prerelease version of System.Reactive.
There is a newer version of this package available.
See the version list below for details.

Requires NuGet 2.12 or higher.

.NET CLI
dotnet add package System.Reactive --version 6.0.0-preview.16

Package Manager
NuGet\Install-Package System.Reactive -Version 6.0.0-preview.16
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.

PackageReference
<PackageReference Include="System.Reactive" Version="6.0.0-preview.16" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.

Paket CLI
paket add System.Reactive --version 6.0.0-preview.16

Script & Interactive
#r "nuget: System.Reactive, 6.0.0-preview.16"
The #r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.

Cake
// Install System.Reactive as a Cake Addin
#addin nuget:?package=System.Reactive&version=6.0.0-preview.16&prerelease

// Install System.Reactive as a Cake Tool
#tool nuget:?package=System.Reactive&version=6.0.0-preview.16&prerelease

Rx (Reactive Extensions for .NET)

Rx enables event-driven programming with a composable, declarative model.

Getting started

Run the following at a command line:

mkdir TryRx
cd TryRx
dotnet new console
dotnet add package System.Reactive

Alternatively, if you have Visual Studio installed, create a new .NET Console project, and then use the NuGet package manager to add a reference to System.Reactive.

You can then add this code to your Program.cs. This creates an observable source (ticks) that produces an event once every second. It also adds a handler to that source that writes a message to the console for each event:

using System.Reactive.Linq;

IObservable<long> ticks = Observable.Timer(
    dueTime: TimeSpan.Zero,
    period: TimeSpan.FromSeconds(1));

ticks.Subscribe(
    tick => Console.WriteLine($"Tick {tick}"));

Console.ReadLine();
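
Because the source is an ordinary IObservable<T>, standard operators can be chained onto it before subscribing. The following is a minimal sketch of that composition, not part of the original sample: the 100 ms period, the evenTicks name, and the seen list are illustrative assumptions chosen so that the program finishes quickly.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Same kind of source as above, but ticking every 100 ms (an assumption for a quick demo).
IObservable<long> ticks = Observable.Timer(
    dueTime: TimeSpan.Zero,
    period: TimeSpan.FromMilliseconds(100));

// Operators compose declaratively: keep only even ticks, and complete after three of them.
IObservable<long> evenTicks = ticks
    .Where(tick => tick % 2 == 0)
    .Take(3);

var seen = new List<long>();

// ForEachAsync returns a Task that completes when the sequence completes.
await evenTicks.ForEachAsync(tick =>
{
    seen.Add(tick);
    Console.WriteLine($"Even tick {tick}");
});
```

This prints ticks 0, 2, and 4 and then exits, because Take(3) completes the sequence and so no Console.ReadLine() is needed to keep the process alive.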

Examples

Wrapping an existing event source as an Rx IObservable<T>

If you have an existing source of events that does not support Rx directly, but which does offer .NET events, you can bring this into the world of Rx using the Observable.FromEventPattern method:

using System.Reactive.Linq;

FileSystemWatcher fsw = new FileSystemWatcher(@"C:\temp");

IObservable<FileSystemEventArgs> changeEvents = Observable
    .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        h => fsw.Changed += h,
        h => fsw.Changed -= h)
    .Select(e => e.EventArgs);

fsw.EnableRaisingEvents = true;
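
To see the same adaptation pattern without depending on filesystem timing, it can be tried on any class that exposes a standard .NET event. The sketch below uses ObservableCollection<T> and its CollectionChanged event as a stand-in; the items, changes, and seen names are illustrative and not part of the original example. It also shows that disposing the subscription detaches the underlying event handler.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Reactive.Linq;

var items = new ObservableCollection<string>();

// Adapt the CollectionChanged event in the same way as FileSystemWatcher.Changed above.
IObservable<NotifyCollectionChangedEventArgs> changes = Observable
    .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
        h => items.CollectionChanged += h,
        h => items.CollectionChanged -= h)
    .Select(e => e.EventArgs);

var seen = new List<NotifyCollectionChangedAction>();

items.Add("ignored");   // no subscriber yet, so nothing is recorded

// Subscribing attaches the event handler.
IDisposable subscription = changes.Subscribe(e => seen.Add(e.Action));
items.Add("a");         // recorded as Add
items.Remove("a");      // recorded as Remove

// Disposing the subscription removes the handler again.
subscription.Dispose();
items.Add("b");         // not recorded

Console.WriteLine(string.Join(", ", seen)); // Add, Remove
```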

Waiting for inactivity

It can sometimes be useful to wait for a period of inactivity before taking action. For example, code that monitors a directory in a filesystem, processing modified or added files, will often see flurries of activity. If a user copies multiple files into a folder that you're observing, there will be multiple changes, and it could be more efficient to wait until those stop and then process all the changes in one batch than to attempt to process each one immediately.

This example defines a custom Rx operator that can be attached to any source. It waits for that source to start producing events, and then waits for it to stop for the specified period. Each time that happens, it reports all of the activity that occurred since the previous period of inactivity:

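// These using directives belong at the top of the file.
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class RxExt
{
    public static IObservable<IList<T>> Quiescent<T>(
        this IObservable<T> src,
        TimeSpan minimumInactivityPeriod,
        IScheduler scheduler)
    {
        // For each source event, emit +1 immediately and -1 once the inactivity period has elapsed.
        IObservable<int> onoffs =
            from _ in src
            from delta in Observable.Return(1, scheduler)
                .Concat(Observable.Return(-1, scheduler)
                    .Delay(minimumInactivityPeriod, scheduler))
            select delta;
        // Running total: the number of events seen within the last minimumInactivityPeriod.
        IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
        // Produces a value each time the running total drops back to zero.
        IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);
        // Slice the source into batches, closing one batch at each zero crossing.
        return src.Buffer(zeroCrossings);
    }
}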

This works by creating a sequence (onoffs) that produces a value of 1 each time activity occurs, and then a corresponding -1 after the specified time has elapsed. It then uses Scan to produce the outstanding sequence, which is a running total of those onoffs. This is effectively a count of the number of events that have happened recently (where 'recently' is defined as 'less than minimumInactivityPeriod ago'). Every new event raises this running total by 1, and each time the specified timespan has passed for a particular event, the total drops by 1. So when it drops back to 0, no events have occurred within the last minimumInactivityPeriod.

The zeroCrossings sequence picks out just the events in which outstanding drops back to zero. This has the effect that zeroCrossings raises an event every time there has been some activity followed by minimumInactivityPeriod of inactivity.

Finally, we plug this into the Buffer operator, which slices the input events (src) into chunks. By passing it the zeroCrossings source, we tell Buffer to deliver a new slice every time the source becomes inactive. The effect is that the source returned by Quiescent does nothing until there has been some activity followed by the specified period of inactivity, at which point it produces a single event reporting all of the source events that have occurred since the previous period of inactivity or, in the initial case, all of the source events so far.
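
The running-total mechanism can be traced by hand without Rx at all. The sketch below is a plain C# simulation of the same arithmetic, not the operator itself: the event times and the 2-second period are hypothetical inputs, and it assumes no -1 falls at exactly the same instant as a new event.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical input: times (in seconds) at which source events occur.
double[] eventTimes = { 0.0, 0.5, 1.0, 5.0 };
double inactivity = 2.0; // stands in for minimumInactivityPeriod

// onoffs: +1 when an event occurs, -1 once the inactivity period has elapsed after it.
var onoffs = eventTimes
    .SelectMany(t => new[] { (Time: t, Delta: 1), (Time: t + inactivity, Delta: -1) })
    .OrderBy(x => x.Time)
    .ToList();

// outstanding is the running total; zeroCrossings records when it returns to 0.
var zeroCrossings = new List<double>();
int outstanding = 0;
foreach (var (time, delta) in onoffs)
{
    outstanding += delta;
    if (outstanding == 0) zeroCrossings.Add(time);
}

// Buffer: count the events that fall between consecutive zero crossings.
var batchSizes = new List<int>();
double previous = double.NegativeInfinity;
foreach (double crossing in zeroCrossings)
{
    batchSizes.Add(eventTimes.Count(t => t > previous && t <= crossing));
    previous = crossing;
}

Console.WriteLine("Zero crossings at: " + string.Join(", ", zeroCrossings)); // 3, 7
Console.WriteLine("Batch sizes: " + string.Join(", ", batchSizes));          // 3, 1
```

With these inputs the total climbs to 3 during the first flurry, falls back to 0 at t = 3 (two seconds after the last event of the flurry), and the lone event at t = 5 produces a second batch at t = 7.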

You could use this in conjunction with the adapted FileSystemWatcher from the preceding example:

IObservable<IList<FileSystemEventArgs>> fileActivityStopped = changeEvents
    .Quiescent(TimeSpan.FromSeconds(2), Scheduler.Default);

await fileActivityStopped.ForEachAsync(
    a => Console.WriteLine($"File changes stopped after {a.Count} changes"));

(Note: this only uses the Changed event. A real application might also need to look at the FileSystemWatcher's Created, Renamed, and Deleted events.)
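
One possible way to cover the other events is to adapt each of them and combine the results with Observable.Merge. This is a sketch under assumptions rather than a recommended design: the TryRxWatch directory name and the Adapt helper are hypothetical, and Renamed needs its own adaptation because it uses RenamedEventHandler and RenamedEventArgs (which derives from FileSystemEventArgs).

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

// Hypothetical directory to watch; created if it does not exist.
string dir = Path.Combine(Path.GetTempPath(), "TryRxWatch");
Directory.CreateDirectory(dir);
var fsw = new FileSystemWatcher(dir);

// Hypothetical helper: adapts any event that uses FileSystemEventHandler.
IObservable<FileSystemEventArgs> Adapt(
    Action<FileSystemEventHandler> add,
    Action<FileSystemEventHandler> remove) =>
    Observable
        .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(add, remove)
        .Select(e => e.EventArgs);

// Renamed has a different delegate type, so it is adapted separately and upcast.
IObservable<FileSystemEventArgs> renamed = Observable
    .FromEventPattern<RenamedEventHandler, RenamedEventArgs>(
        h => fsw.Renamed += h,
        h => fsw.Renamed -= h)
    .Select(e => (FileSystemEventArgs)e.EventArgs);

// Merge interleaves all four sources into a single stream.
IObservable<FileSystemEventArgs> allEvents = Observable.Merge(
    Adapt(h => fsw.Changed += h, h => fsw.Changed -= h),
    Adapt(h => fsw.Created += h, h => fsw.Created -= h),
    Adapt(h => fsw.Deleted += h, h => fsw.Deleted -= h),
    renamed);

using IDisposable subscription = allEvents.Subscribe(
    e => Console.WriteLine($"{e.ChangeType}: {e.Name}"));

fsw.EnableRaisingEvents = true;

Console.ReadLine(); // keep the process alive, as in the first example
```

The merged allEvents source has the same element type as changeEvents, so it could be passed to Quiescent in the same way.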

Feedback

You can create issues at the https://github.com/dotnet/reactive repository.

Product: Compatible and additional computed target framework versions
.NET: net5.0 was computed. net5.0-windows was computed. net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net6.0-windows10.0.19041 is compatible. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed.
.NET Core: netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed.
.NET Standard: netstandard2.0 is compatible. netstandard2.1 was computed.
.NET Framework: net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 is compatible. net48 was computed. net481 was computed.
MonoAndroid: monoandroid was computed.
MonoMac: monomac was computed.
MonoTouch: monotouch was computed.
Tizen: tizen40 was computed. tizen60 was computed.
Universal Windows Platform: uap10.0.18362 is compatible.
Xamarin.iOS: xamarinios was computed.
Xamarin.Mac: xamarinmac was computed.
Xamarin.TVOS: xamarintvos was computed.
Xamarin.WatchOS: xamarinwatchos was computed.

NuGet packages (1.5K)

Showing the top 5 NuGet packages that depend on System.Reactive:

System.Reactive.Linq: Legacy facade for Reactive Extensions (Rx) for .NET
System.Reactive.Core: Legacy facade for Reactive Extensions (Rx) for .NET
System.Reactive.Interfaces: Legacy facade for Reactive Extensions (Rx) for .NET
System.Reactive.PlatformServices: Legacy facade for Reactive Extensions (Rx) for .NET
GraphQL.Client: A GraphQL Client for .NET Standard

GitHub repositories (282)

Showing the top 5 popular GitHub repositories that depend on System.Reactive:

microsoft/PowerToys: Windows system utilities to maximize productivity
shadowsocks/shadowsocks-windows: A C# port of shadowsocks
AvaloniaUI/Avalonia: Develop Desktop, Embedded, Mobile and WebAssembly apps with C# and XAML. The most popular .NET UI client technology
BeyondDimension/SteamTools: 🛠 "Watt Toolkit" is an open-source, cross-platform, multi-purpose Steam toolbox.
chocolatey/choco: Chocolatey - the package manager for Windows
Version Downloads Last updated
6.0.1 3,365,676 5/22/2024
6.0.1-preview.1 146,098 6/14/2023
6.0.0 16,632,758 5/19/2023
6.0.0-preview.16 766 5/17/2023
6.0.0-preview.13 8,017 4/20/2023
6.0.0-preview.9 32,553 3/31/2023
6.0.0-preview.1 7,679 3/10/2023
5.0.0 71,991,968 11/10/2020
5.0.0-preview.220 17,951 10/15/2020
5.0.0-preview.16 37,596 9/26/2020
4.4.1 44,618,670 4/2/2020
4.3.2 22,219,322 12/24/2019
4.3.1 961,326 12/4/2019
4.2.2 8,284 12/24/2019
4.2.0 1,849,290 10/10/2019
4.2.0-preview.625 36,491 8/19/2019
4.2.0-preview.566 12,521 6/14/2019
4.2.0-preview.102 26,643 2/24/2019
4.2.0-preview.63 140,088 12/12/2018
4.1.6 3,902,367 8/1/2019
4.1.5 4,766,244 4/10/2019
4.1.4 31,013 4/8/2019
4.1.3 1,748,788 2/18/2019
4.1.2 10,417,791 10/10/2018
4.1.1 221,016 10/1/2018
4.1.0 1,323,637 8/13/2018
4.1.0-preview.330 1,707 8/3/2018
4.1.0-preview.84 37,115 6/18/2018
4.0.0 23,495,042 5/25/2018
4.0.0-preview.4.build.391 34,511 5/23/2018
4.0.0-preview.3.build.380 12,207 5/16/2018
4.0.0-preview.2.build.379 11,825 5/12/2018
3.1.1 9,805,579 11/21/2016
3.1.0 161,800 11/5/2016
3.1.0-rc 7,620 9/23/2016
3.0.0 2,197,073 6/27/2016