AsyncEnumerator 4.0.1

Introduces IAsyncEnumerable, IAsyncEnumerator, ForEachAsync(), and ParallelForEachAsync()
GitHub: https://github.com/Dasync/AsyncEnumerable

PROBLEM SPACE

Helps to (a) create an element provider, where producing an element can take a lot of time
due to dependency on other asynchronous events (e.g. wait handles, network streams), and
(b) a consumer that processes those element as soon as they are ready without blocking
the thread (the processing is scheduled on a worker thread instead).


EXAMPLE

using Dasync.Collections;

static IAsyncEnumerable<int> ProduceAsyncNumbers(int start, int end)
{
 return new AsyncEnumerable<int>(async yield => {

   // Just to show that ReturnAsync can be used multiple times
   await yield.ReturnAsync(start);

   for (int number = start + 1; number <= end; number++)
     await yield.ReturnAsync(number);

   // You can break the enumeration loop with the following call:
   yield.Break();

   // This won't be executed due to the loop break above
   await yield.ReturnAsync(12345);
 });
}

// Just to compare with synchronous version of enumerator
static IEnumerable<int> ProduceNumbers(int start, int end)
{
 yield return start;

 for (int number = start + 1; number <= end; number++)
   yield return number;

 yield break;

 yield return 12345;
}

static async Task ConsumeNumbersAsync()
{
 var asyncEnumerableCollection = ProduceAsyncNumbers(start: 1, end: 10);
 await asyncEnumerableCollection.ForEachAsync(async number => {
   await Console.Out.WriteLineAsync($"{number}");
 });
}

// Just to compare with synchronous version of enumeration
static void ConsumeNumbers()
{
 var enumerableCollection = ProduceNumbers(start: 1, end: 10);
 foreach (var number in enumerableCollection) {
   Console.Out.WriteLine($"{number}");
 }
}

Install-Package AsyncEnumerator -Version 4.0.1
dotnet add package AsyncEnumerator --version 4.0.1
<PackageReference Include="AsyncEnumerator" Version="4.0.1" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add AsyncEnumerator --version 4.0.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.

1: How to use this library?

See examples above. You can the core code and lots of useful extension methods in the Dasync.Collections namespace.

2: Using CancellationToken

    using Dasync.Collections;
    
    IAsyncEnumerable<int> ProduceNumbers()
    {
      return new AsyncEnumerable<int>(async yield => {

        await FooAsync(yield.CancellationToken);
      });
    }

3: Always remember about ConfigureAwait(false)

To avoid performance degradation and possible dead-locks in ASP.NET or WPF applications (or any SynchronizationContext-dependent environment), you should always put ConfigureAwait(false) in your await statements:

    using Dasync.Collections;
    
    IAsyncEnumerable<int> GetValues()
    {
      return new AsyncEnumerable<int>(async yield =>
      {
        await FooAsync().ConfigureAwait(false);

        // Yes, it's even needed for 'yield.ReturnAsync'
        await yield.ReturnAsync(123).ConfigureAwait(false);
      });
    }

4: Clean-up on incomplete enumeration

Imagine such situation:

    using Dasync.Collections;

    IAsyncEnumerable<int> ReadValuesFromQueue()
    {
      return new AsyncEnumerable<int>(async yield =>
      {
        using (var queueClient = CreateQueueClient())
        {
          while (true)
          {
            var message = queueClient.DequeueMessageAsync();
            if (message == null)
              break;
            
            await yield.ReturnAsync(message.Value);
          }
        }
      });
    }

    Task<int> ReadFirstValueOrDefaultAsync()
    {
      return ReadValuesFromQueue().FirstOrDefaultAsync();
    }

The FirstOrDefaultAsync method will try to read first value from the IAsyncEnumerator, and then will just dispose it. However, disposing AsyncEnumerator does not mean that the queueClient in the lambda function will be disposed automatically as well, because async methods are just state machines which need somehow to go to a particular state to do the clean-up.
To provide such behavior, when you dispose an AsyncEnumerator before you reach the end of enumeration, it will tell to resume your async lambda function (at await yield.ReturnAsync()) with the AsyncEnumerationCanceledException (derives from OperationCanceledException). Having such exception in your lambda method will break normal flow of enumeration and will go to terminal state of the underlying state machine, what will do the clean-up, i.e. dispose the queueClient in this case. You don't need (and shouldn't) catch that exception type, because it's handled internally by AsyncEnumerator. The same exception is thrown when you call yield.Break().

There is another option to do the cleanup on Dispose:

    using Dasync.Collections;

    IAsyncEnumerator<int> GetQueueEnumerator()
    {
      var queueClient = CreateQueueClient();

      return new AsyncEnumerable<int>(async yield =>
      {
        while (true)
        {
          var message = queueClient.DequeueMessageAsync();
          if (message == null)
            break;
            
          await yield.ReturnAsync(message.Value);
        }
      },
      onDispose: () => queueClient.Dispose());
    }

5: Why is GetAsyncEnumeratorAsync async?

The IAsyncEnumerable.GetAsyncEnumeratorAsync() method is async and returns a Task&lt;IAsyncEnumerator&gt;, where the current implementation of AsyncEnumerable always runs that method synchronously and just returns an instance of AsyncEnumerator. Having interfaces allows you to do your own implementation, where classes mentioned above are just helpers. The initial idea was to be able to support database-like scenarios, where GetAsyncEnumeratorAsync() executes a query first (what internally returns a pointer), and the MoveNextAsync() enumerates through rows (by using that pointer).

6: Returning IAsyncEnumerable vs IAsyncEnumerator

When you implement a method that returns an async-enumerable collection you have a choice to return either IAsyncEnumerable or IAsyncEnumerator - the constructors of the helper classes AsyncEnumerable and AsyncEnumerator are absolutely identical. Both interfaces have same set of useful extension methods, like ForEachAsync.

When you create an 'enumerable', you create a factory that produces 'enumerators', i.e. you can enumerate through a collection many times. On the other hand, creating an 'enumerator' is needed when you can through a collection only once.

7: Where is Reset or ResetAsync?

The Reset method must not be on the IEnumerator interface, and should be considered as deprecated. Create a new enumerator instead. This is the reason why the 'oneTimeUse' flag was removed in version 2 of this library.

8: How can I do synchronous for-each enumeration through IAsyncEnumerable?

You can use extension methods like IAsyncEnumerable.ToEnumerable() to use built-in foreach enumeration, BUT you should never do that! The general idea of this library is to avoid thread-blocking calls on worker threads, where converting an IAsyncEnumerable to IEnumerable will just defeat the whole purpose of this library. This is the reason why such synchronous extension methods are marked with [Obsolete] attribute.

9: What's the difference between ForEachAsync and ParallelForEachAsync?

The ForEachAsync allows you to go through a collection and perform an action on every single item in sequential manner. On the other hand, ParallelForEachAsync allows you to run the action on multiple items at the same time where the sequential
order of completion is not guaranteed. For the latter, the degree of the parallelism is controlled by the maxDegreeOfParalellism
argument, however it does not guarantee to spin up the exact amount of threads, because it depends on the thread pool size and its occupancy at a moment of time. Such parallel approach is much better than trying to create a task for an action for every single item on the collection and then awaiting on all of them with Task.WhenAll, because it adds less overhead to the runtime, better with memory usage, and helps with throttling-sensitive scenarios.

1: How to use this library?

See examples above. You can the core code and lots of useful extension methods in the Dasync.Collections namespace.

2: Using CancellationToken

    using Dasync.Collections;
    
    IAsyncEnumerable<int> ProduceNumbers()
    {
      return new AsyncEnumerable<int>(async yield => {

        await FooAsync(yield.CancellationToken);
      });
    }

3: Always remember about ConfigureAwait(false)

To avoid performance degradation and possible dead-locks in ASP.NET or WPF applications (or any SynchronizationContext-dependent environment), you should always put ConfigureAwait(false) in your await statements:

    using Dasync.Collections;
    
    IAsyncEnumerable<int> GetValues()
    {
      return new AsyncEnumerable<int>(async yield =>
      {
        await FooAsync().ConfigureAwait(false);

        // Yes, it's even needed for 'yield.ReturnAsync'
        await yield.ReturnAsync(123).ConfigureAwait(false);
      });
    }

4: Clean-up on incomplete enumeration

Imagine such situation:

    using Dasync.Collections;

    IAsyncEnumerable<int> ReadValuesFromQueue()
    {
      return new AsyncEnumerable<int>(async yield =>
      {
        using (var queueClient = CreateQueueClient())
        {
          while (true)
          {
            var message = queueClient.DequeueMessageAsync();
            if (message == null)
              break;
            
            await yield.ReturnAsync(message.Value);
          }
        }
      });
    }

    Task<int> ReadFirstValueOrDefaultAsync()
    {
      return ReadValuesFromQueue().FirstOrDefaultAsync();
    }

The FirstOrDefaultAsync method will try to read first value from the IAsyncEnumerator, and then will just dispose it. However, disposing AsyncEnumerator does not mean that the queueClient in the lambda function will be disposed automatically as well, because async methods are just state machines which need somehow to go to a particular state to do the clean-up.
To provide such behavior, when you dispose an AsyncEnumerator before you reach the end of enumeration, it will tell to resume your async lambda function (at await yield.ReturnAsync()) with the AsyncEnumerationCanceledException (derives from OperationCanceledException). Having such exception in your lambda method will break normal flow of enumeration and will go to terminal state of the underlying state machine, what will do the clean-up, i.e. dispose the queueClient in this case. You don't need (and shouldn't) catch that exception type, because it's handled internally by AsyncEnumerator. The same exception is thrown when you call yield.Break().

There is another option to do the cleanup on Dispose:

    using Dasync.Collections;

    IAsyncEnumerator<int> GetQueueEnumerator()
    {
      var queueClient = CreateQueueClient();

      return new AsyncEnumerable<int>(async yield =>
      {
        while (true)
        {
          var message = queueClient.DequeueMessageAsync();
          if (message == null)
            break;
            
          await yield.ReturnAsync(message.Value);
        }
      },
      onDispose: () => queueClient.Dispose());
    }

5: Why is GetAsyncEnumeratorAsync async?

The IAsyncEnumerable.GetAsyncEnumeratorAsync() method is async and returns a Task&lt;IAsyncEnumerator&gt;, where the current implementation of AsyncEnumerable always runs that method synchronously and just returns an instance of AsyncEnumerator. Having interfaces allows you to do your own implementation, where classes mentioned above are just helpers. The initial idea was to be able to support database-like scenarios, where GetAsyncEnumeratorAsync() executes a query first (what internally returns a pointer), and the MoveNextAsync() enumerates through rows (by using that pointer).

6: Returning IAsyncEnumerable vs IAsyncEnumerator

When you implement a method that returns an async-enumerable collection you have a choice to return either IAsyncEnumerable or IAsyncEnumerator - the constructors of the helper classes AsyncEnumerable and AsyncEnumerator are absolutely identical. Both interfaces have same set of useful extension methods, like ForEachAsync.

When you create an 'enumerable', you create a factory that produces 'enumerators', i.e. you can enumerate through a collection many times. On the other hand, creating an 'enumerator' is needed when you can through a collection only once.

7: Where is Reset or ResetAsync?

The Reset method must not be on the IEnumerator interface, and should be considered as deprecated. Create a new enumerator instead. This is the reason why the 'oneTimeUse' flag was removed in version 2 of this library.

8: How can I do synchronous for-each enumeration through IAsyncEnumerable?

You can use extension methods like IAsyncEnumerable.ToEnumerable() to use built-in foreach enumeration, BUT you should never do that! The general idea of this library is to avoid thread-blocking calls on worker threads, where converting an IAsyncEnumerable to IEnumerable will just defeat the whole purpose of this library. This is the reason why such synchronous extension methods are marked with [Obsolete] attribute.

9: What's the difference between ForEachAsync and ParallelForEachAsync?

The ForEachAsync allows you to go through a collection and perform an action on every single item in sequential manner. On the other hand, ParallelForEachAsync allows you to run the action on multiple items at the same time where the sequential
order of completion is not guaranteed. For the latter, the degree of the parallelism is controlled by the maxDegreeOfParalellism
argument, however it does not guarantee to spin up the exact amount of threads, because it depends on the thread pool size and its occupancy at a moment of time. Such parallel approach is much better than trying to create a task for an action for every single item on the collection and then awaiting on all of them with Task.WhenAll, because it adds less overhead to the runtime, better with memory usage, and helps with throttling-sensitive scenarios.

Release Notes

4.0.0: Use interfaces from Microsoft.Bcl.AsyncInterfaces package in NET Standard 2.0.
3.1.0: Add support for NET Standard 2.1, consolidate interface with Microsoft's implementation.
2.2.0: New extension methods: SelectMany, Append, Prepend, OfType, Concat, Distinct, ToDictionaryAsync, ToLookupAsync, AggregateAsync.
2.1.0: New extension methods: Batch, UnionAll, Single, SingleOrDefault, DefaultIfEmpty, Cast.
2.0.0: Revise design of the library: same features, but slight paradigm shift and interface breaking changes.
1.5.0: Add support for .NET Standard, minor improvements.
1.4.2: Add finalizer to AsyncEnumerator and call Dispose in ForEachAsync and ParallelForEachAsync extension methods.
1.4.0: Add new generic type AsyncEnumeratorWithState for performance optimization.
      Now IAsyncEnumerator<T> is covariant.
      Add ForEachAsync, ParallelForeachAsync, and LINQ-style extension methods for IAsyncEnumerator.
1.2.1: New Linq-style extension methods in System.Collections.Async namespace.
1.1.0: Add ParallelForEachAsync extension methods for IEnumerable<T> and IAsyncEnumerable<T> in System.Collections.Async namespace.

Showing the top 4 GitHub repositories that depend on AsyncEnumerator:

Repository Stars
dotnetcore/CAP
Distributed transaction solution in micro-service base on eventually consistency, also an eventbus with Outbox pattern
RevoLand/Steam-Library-Manager
Open source utility to manage Steam, Origin and Uplay libraries in ease of use with multi library support
planetarium/libplanet
Blockchain core in C#/.NET for persistent peer-to-peer online games
Dasync/Dasync
Every developer deserves the right of creating microservices without using any framework 🤍

Version History

Version Downloads Last updated
4.0.1 5,250 10/22/2019
4.0.0 1,213 10/18/2019
3.1.0 9,666 9/23/2019
2.2.2 316,117 1/27/2019
2.2.1 484,814 5/29/2018
2.2.0 14,962 5/18/2018
2.1.1 84,694 1/20/2018
2.1.0 161,036 5/22/2017
2.0.1 43,495 2/13/2017
1.5.0 3,046 2/12/2017
1.4.2 1,027 2/6/2017
1.3.0 1,295 1/20/2017
1.2.3 6,337 1/6/2017
1.2.2 1,341 12/11/2016
1.2.1 376 12/10/2016
1.2.0 14,269 11/29/2016
1.1.3 395 11/28/2016
1.1.2 17,041 8/29/2016
1.0.3 2,085 4/28/2016