Anthony Chu

Async Streams with IAsyncEnumerable in .NET Core 3

Wednesday, July 31, 2019

One of the most exciting features of .NET Core 3 and C# 8.0 has been the addition of IAsyncEnumerable<T> (aka async streams). But what's so special about it? What can we do now that wasn't possible before?

In this article, we'll look at what challenges IAsyncEnumerable<T> is intended to solve, how to implement it in our own applications, and why IAsyncEnumerable<T> will replace Task<IEnumerable<T>> in many situations.


Life before IAsyncEnumerable<T>

Perhaps the best way to illustrate why IAsyncEnumerable<T> is useful is to take a look at what challenges exist without it.

Imagine we're building a data access library, and we need a method that queries a data store or API for some data. It's pretty common for that method to return Task<IEnumerable<T>>, like this:

public async Task<IEnumerable<Product>> GetAllProducts()

To implement the method, we typically perform some data access asynchronously, then return all the data when it's finished. The problem with this becomes more evident when we need to make multiple asynchronous calls to obtain the data. For example, our database or API could be returning data in pages, like this implementation that uses Azure Cosmos DB:

public async Task<IEnumerable<Product>> GetAllProducts()
{
    Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
    var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
    var products = new List<Product>();
    while (iterator.HasMoreResults)
    {
        foreach (var product in await iterator.ReadNextAsync())
        {
            products.Add(product);
        }
    }
    return products;
}

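Notice we are paging through all the results in a while loop, instantiating all the product objects and placing them into a List<Product>, and only then returning the whole thing. The caller can't process a single product until every page has been fetched, and every result has to sit in memory at once. This is quite inefficient, especially for larger datasets.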

Maybe we can create a more efficient implementation by changing our method to return results one page at a time:

public IEnumerable<Task<IEnumerable<Product>>> GetAllProducts()
{
    Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
    var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
    while (iterator.HasMoreResults)
    {
        yield return iterator.ReadNextAsync().ContinueWith(t => 
        {
            return (IEnumerable<Product>)t.Result;
        });
    }
}

The caller would consume the method like this:

foreach (var productsTask in productsRepository.GetAllProducts())
{
    foreach (var product in await productsTask)
    {
        Console.WriteLine(product.Name);
    }
}

This implementation is more efficient, but the method now returns IEnumerable<Task<IEnumerable<Product>>>. As we can see in the calling code, it's not obvious how to invoke the method and process the data. More importantly, paging is an implementation detail of the data access method that the caller should know nothing about.

IAsyncEnumerable<T> to the rescue

What we really want to do is to retrieve data asynchronously from our database and stream results back to the caller as they become available.

In synchronous code, a method that returns IEnumerable<T> can use the yield return statement to hand each item to the caller as it comes back from the database.

public IEnumerable<Product> GetAllProducts()
{
    Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
    var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
    while (iterator.HasMoreResults)
    {
        foreach (var product in iterator.ReadNextAsync().Result)
        {
            yield return product;
        }
    }
}

However, DO NOT DO THIS! The above code turns the async database call into a blocking call and will not scale.

If only we could use yield return with asynchronous methods! That hasn't been possible... until now.

IAsyncEnumerable<T> was introduced in .NET Core 3 (.NET Standard 2.1). It exposes an enumerator whose MoveNextAsync() method can be awaited. This means the producer can make asynchronous calls in between yielding results.
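To make the MoveNextAsync() mechanics concrete, here's a minimal sketch that drives an async enumerator by hand, which is roughly what the compiler does for you with await foreach. CountAsync is a hypothetical producer (it uses the async yield return syntax described next), and Task.Delay stands in for real I/O.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical producer: yields 1..count, pausing asynchronously before each item.
static async IAsyncEnumerable<int> CountAsync(int count)
{
    for (int i = 1; i <= count; i++)
    {
        await Task.Delay(10); // stands in for an awaited I/O call
        yield return i;
    }
}

var received = new List<int>();

// Drive the enumerator manually: await MoveNextAsync() until it returns false,
// reading Current after each successful move.
IAsyncEnumerator<int> enumerator = CountAsync(3).GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        received.Add(enumerator.Current);
    }
}
finally
{
    // IAsyncEnumerator<T> is IAsyncDisposable, so cleanup is awaited as well.
    await enumerator.DisposeAsync();
}

Console.WriteLine(string.Join(", ", received));
```

Because MoveNextAsync() returns a ValueTask<bool>, the consumer yields control while the producer is waiting on I/O instead of blocking a thread.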

Instead of returning a Task<IEnumerable<T>>, our method can now return IAsyncEnumerable<T> and use yield return to emit data.

public async IAsyncEnumerable<Product> GetAllProducts()
{
    Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
    var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
    while (iterator.HasMoreResults)
    {
        foreach (var product in await iterator.ReadNextAsync())
        {
            yield return product;
        }
    }
}

To consume the results, we need to use the new await foreach syntax available in C# 8:

await foreach (var product in productsRepository.GetAllProducts())
{
    Console.WriteLine(product.Name);
}

This is much nicer. The method produces data as it becomes available, and the calling code consumes the data at its own pace.
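To see the difference in practice, here's a self-contained sketch that swaps Cosmos DB for a hypothetical in-memory paged source (FetchPageAsync, three pages of two items). It records how many pages had been fetched at the moment the caller saw its first item. The Task<IEnumerable<T>>-style method has to fetch every page first, while the IAsyncEnumerable<T> method hands over the first item after a single page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical paged data source standing in for a database: 3 pages of 2 items.
string[][] pages = { new[] { "a", "b" }, new[] { "c", "d" }, new[] { "e", "f" } };
int pagesFetched = 0;

async Task<string[]> FetchPageAsync(int index)
{
    await Task.Delay(10); // simulated network latency
    pagesFetched++;
    return pages[index];
}

// Buffered: read every page into a list, then return it all at once.
async Task<IEnumerable<string>> GetAllBufferedAsync()
{
    var all = new List<string>();
    for (int i = 0; i < pages.Length; i++)
        all.AddRange(await FetchPageAsync(i));
    return all;
}

// Streaming: yield each item as soon as its page arrives.
async IAsyncEnumerable<string> GetAllStreamingAsync()
{
    for (int i = 0; i < pages.Length; i++)
        foreach (var item in await FetchPageAsync(i))
            yield return item;
}

// Buffered: by the time the caller sees any item, every page has been fetched.
pagesFetched = 0;
var buffered = await GetAllBufferedAsync();
string firstBuffered = buffered.First();
int pagesBeforeFirstBuffered = pagesFetched;

// Streaming: the caller sees the first item after only one page.
pagesFetched = 0;
string firstStreamed = "";
int pagesBeforeFirstStreamed = -1;
await foreach (var item in GetAllStreamingAsync())
{
    firstStreamed = item;
    pagesBeforeFirstStreamed = pagesFetched;
    break; // stop early; the remaining pages are never requested
}

Console.WriteLine($"Buffered: {pagesBeforeFirstBuffered} pages fetched before first item");
Console.WriteLine($"Streaming: {pagesBeforeFirstStreamed} page(s) fetched before first item");
```

The early break also shows a side benefit: when the consumer stops, the async iterator is disposed and no further pages are requested.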

IAsyncEnumerable<T> and ASP.NET Core

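Starting with .NET Core 3 Preview 7, ASP.NET Core can return IAsyncEnumerable<T> from an API controller action, so we can return our method's results directly. Note that in ASP.NET Core 3.x, MVC buffers the sequence before handing it to the serializer; unbuffered streaming from the database to the HTTP response arrived in .NET 6, where the System.Text.Json output formatter no longer buffers IAsyncEnumerable<T>.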

[HttpGet]
public IAsyncEnumerable<Product> Get()
    => productsRepository.GetAllProducts();

Replacing Task<IEnumerable<T>> with IAsyncEnumerable<T>

As time goes by and adoption of .NET Core 3 and .NET Standard 2.1 grows, expect to see IAsyncEnumerable<T> used in places where we've typically used Task<IEnumerable<T>>.
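Callers that still need the whole collection at once, such as existing code written against a Task<IEnumerable<T>> signature, can buffer an async stream themselves. Below is a minimal sketch of a hypothetical helper named BufferAsync (the System.Linq.Async NuGet package offers a similar ToListAsync operator); NumbersAsync is a hypothetical stand-in for a repository method.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: drain an async stream into a List<T> for callers
// that need the complete result (the old Task<IEnumerable<T>> shape).
static async Task<List<T>> BufferAsync<T>(IAsyncEnumerable<T> source)
{
    var list = new List<T>();
    await foreach (var item in source)
    {
        list.Add(item);
    }
    return list;
}

// Hypothetical producer standing in for a repository method.
static async IAsyncEnumerable<int> NumbersAsync()
{
    for (int i = 0; i < 5; i++)
    {
        await Task.Yield(); // simulate asynchronous work between items
        yield return i * i;
    }
}

// A Task<IEnumerable<T>>-style wrapper built on top of the stream, so existing
// callers keep working while new callers can consume the stream directly.
static async Task<IEnumerable<int>> GetSquaresLegacyAsync() => await BufferAsync(NumbersAsync());

IEnumerable<int> squares = await GetSquaresLegacyAsync();
Console.WriteLine(string.Join(" ", squares));
```

Buffering gives back the streaming benefit, so it's best reserved for callers that genuinely need the full collection.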

I look forward to seeing libraries support IAsyncEnumerable<T>. Throughout this article, we've seen code like this for querying data using the Azure Cosmos DB 3.0 SDK:

var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
while (iterator.HasMoreResults)
{
    foreach (var product in await iterator.ReadNextAsync())
    {
        Console.WriteLine(product.Name);
    }
}

Like our earlier examples, Cosmos DB's own SDK leaks its paging implementation detail, which makes it awkward to process query results.

To see what it could look like if GetItemQueryIterator<Product>() returned IAsyncEnumerable<Product> instead, we can create an extension method on FeedIterator<T>:

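using System.Collections.Generic;
using Microsoft.Azure.Cosmos;

// Flattens Cosmos DB's paged FeedIterator<T> into a single async stream.
public static class FeedIteratorExtensions
{
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(this FeedIterator<T> iterator)
    {
        // Fetch one page at a time and yield its items before requesting the next page.
        while (iterator.HasMoreResults)
        {
            foreach (var item in await iterator.ReadNextAsync())
            {
                yield return item;
            }
        }
    }
}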
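One thing the extension above doesn't handle is cancellation. In C# 8, an async iterator can receive the token a caller passes to WithCancellation() through a parameter marked [EnumeratorCancellation]. Here's a minimal, self-contained sketch of the same page-flattening idea with cancellation support. Since it can't assume a Cosmos DB account, it models the iterator as two hypothetical delegates (hasMoreResults, readNextAsync) over hypothetical in-memory pages; with the real SDK, the token would be forwarded to FeedIterator<T>.ReadNextAsync, which accepts an optional CancellationToken.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical adapter: flattens any paged source into an async stream.
// The delegates mirror FeedIterator<T>.HasMoreResults and ReadNextAsync.
static async IAsyncEnumerable<T> FlattenPagesAsync<T>(
    Func<bool> hasMoreResults,
    Func<CancellationToken, Task<IEnumerable<T>>> readNextAsync,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (hasMoreResults())
    {
        // Forward the token so an in-flight page read can be cancelled too.
        foreach (var item in await readNextAsync(cancellationToken))
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return item;
        }
    }
}

// Hypothetical in-memory "feed": 3 pages of 2 numbers each.
int[][] pages = { new[] { 1, 2 }, new[] { 3, 4 }, new[] { 5, 6 } };
int nextPage = 0;

async Task<IEnumerable<int>> ReadPageAsync(CancellationToken ct)
{
    await Task.Delay(10, ct); // throws if ct is cancelled during the simulated I/O
    return pages[nextPage++];
}

using var cts = new CancellationTokenSource();
var seen = new List<int>();
bool cancelled = false;

try
{
    // WithCancellation flows cts.Token into the [EnumeratorCancellation] parameter.
    await foreach (var n in FlattenPagesAsync<int>(() => nextPage < pages.Length, ReadPageAsync)
                                .WithCancellation(cts.Token))
    {
        seen.Add(n);
        if (n == 3) cts.Cancel(); // the caller decides it has seen enough
    }
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"Saw {string.Join(", ", seen)}; cancelled: {cancelled}");
```

Here the token is checked before each item is yielded, so the stream stops right after the caller cancels, and the third page is never read.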

Now we can process our query results in a much cleaner way:

var products = container
    .GetItemQueryIterator<Product>("SELECT * FROM c")
    .ToAsyncEnumerable();
await foreach (var product in products)
{
    Console.WriteLine(product.Name);
}

Summary

IAsyncEnumerable<T> is a welcome addition to .NET and will make for much cleaner and more efficient code in many cases. Learn more about it with these resources: