The TPL Dataflow library, available from the NuGet Package Manager, is a great way to manage in-memory asynchronous data processing. It’s especially useful in producer/consumer situations because it greatly reduces the amount of ceremonial code required to manage and synchronize multiple asynchronous processes running in parallel. Last but not least, the library is built on top of the Task Parallel Library introduced in .NET 4.0, so it makes judicious use of Task and Task<T>, which as of .NET 4.5 can be awaited using the new async/await keywords. You could just say that it’s awesome-sauce in a NuGet Package. While the library offers a number of classes with different specialties, there is one class that I believe to be the rockstar of the group: ActionBlock<T>.

ActionBlock Jackson

Basically, the ActionBlock<T> class takes an Action<T>, which is the code that runs for every item passed to the block. So if I wanted to email “Hello” to a number of email addresses, I could represent that like so:

// Requires: using System; using System.Net.Mail; using System.Threading.Tasks.Dataflow;
var emailer = new ActionBlock<string>(email =>
{
    using (var smtpClient = new SmtpClient("myhostserver"))
    {
        var message = new MailMessage
        {
            From = new MailAddress("dave@somecompany.com"),
            Subject = "Hi",
            Body = String.Format("Hello {0}!", email)
        };
        message.To.Add(new MailAddress(email));
        smtpClient.Send(message);
    }
});
emailer.Post("person1@place.com");
emailer.Post("person2@place.com");
emailer.Complete();

Here I’m defining my ActionBlock and passing in a lambda that sends an email to the provided email address. After the block is initialized, I can call the Post method to send items into the block to be processed. The Post method is synchronous and returns a boolean that indicates whether the item was accepted by the block for processing. If you want to send data to the ActionBlock asynchronously, you can call the SendAsync method, which returns a Task<bool>, and await its result. When we’re done posting data to the ActionBlock and we don’t expect to post any more, we call the Complete method on the ActionBlock. This method signals that no more data will be posted, so the ActionBlock can finish the items it has already accepted and then end itself gracefully.
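To make the difference between Post and SendAsync concrete, here is a minimal sketch. It swaps the SMTP code for a Console.WriteLine stand-in so it runs anywhere; the class name and the addresses are only for illustration, and the async Main assumes C# 7.1 or later.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class PostVersusSendAsync
{
    static async Task Main()
    {
        // Stand-in delegate (an assumption for this sketch): prints instead of sending mail.
        var printer = new ActionBlock<string>(email => Console.WriteLine("Processing " + email));

        // Post returns right away; true means the block accepted the item.
        bool accepted = printer.Post("person1@place.com");
        Console.WriteLine("Post accepted: " + accepted);

        // SendAsync returns a Task<bool> that completes once the block
        // has either accepted or declined the item.
        bool sent = await printer.SendAsync("person2@place.com");
        Console.WriteLine("SendAsync accepted: " + sent);

        // After Complete, the block declines anything new, so this Post returns false.
        printer.Complete();
        Console.WriteLine("Post after Complete: " + printer.Post("person3@place.com"));

        // Completion is the block's Task; it finishes once every accepted item has been processed.
        await printer.Completion;
    }
}
```

The order of the "Processing" lines relative to the other output can vary from run to run, because the block does its work on its own task.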

Last ActionBlock Hero

Because the ActionBlock is running a Task under the hood to do its processing, it exposes a Task property called Completion that allows us to wait for all items to be processed before proceeding with other code. This is very useful when the ActionBlock is running on its own and the calling application needs to shut down gracefully. If waiting is not your thing, you can also attach a continuation delegate that is called when the ActionBlock completes its work.
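Both approaches to Completion can be sketched in a few lines. This is just an illustration with a stand-in delegate and made-up names, not the email example itself:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class WaitForCompletion
{
    static void Main()
    {
        // Stand-in delegate (an assumption for this sketch).
        var worker = new ActionBlock<int>(n => Console.WriteLine("Processed " + n));

        for (int i = 1; i <= 3; i++)
        {
            worker.Post(i);
        }
        worker.Complete();

        // Option 1: a continuation that runs once the block has finished.
        Task continuation = worker.Completion.ContinueWith(t =>
            Console.WriteLine("Block finished with status " + t.Status));

        // Option 2: block the calling thread until every item has been processed.
        worker.Completion.Wait();

        // Wait for the continuation too, so the process does not exit before it prints.
        continuation.Wait();
    }
}
```

Inside an async method you would normally write await worker.Completion instead of calling Wait, so the calling thread is not blocked.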

By default, the ActionBlock will process each data element posted to it one at a time. Well, let’s say I’m impatient and I have thousands of people to email hello to, and I want to send 10 emails at once. With a very minor modification to the declaration of the ActionBlock, thy will be done:

var emailer = new ActionBlock<string>(email =>
{
    using (var smtpClient = new SmtpClient("myhostserver"))
    {
        var message = new MailMessage
        {
            From = new MailAddress("dave@somecompany.com"),
            Subject = "Hi",
            Body = String.Format("Hello {0}!", email)
        };
        message.To.Add(new MailAddress(email));
        smtpClient.Send(message);
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

The only difference here is that we’re passing an instance of the ExecutionDataflowBlockOptions class to the ActionBlock in the constructor. The MaxDegreeOfParallelism property tells the block the maximum number of instances of the delegate to run simultaneously. It’s that simple. There are a few other properties on the options object that are useful in tweaking the behavior of the ActionBlock:

  • BoundedCapacity: Indicates how many data elements the ActionBlock will allow to be held in its internal queue awaiting processing. By default this is -1, meaning it will store any data you post to it, so you may want to consider bounding it if you expect your delegate to lag behind your producer.
  • MaxMessagesPerTask: Indicates how many messages to process before returning the current thread to the pool and acquiring a new one. Useful if you have heavy thread contention and want to ensure thread fairness. By default this is -1, or infinite.
  • CancellationToken: Allows you to pass in a cancellation token so that the ActionBlock can be cancelled from outside code. Cancelling an action block results in all currently executing delegates running to completion and all remaining queued items being purged. The ActionBlock will also refuse any further data posted to it.
  • TaskScheduler: Remember the days when you had to marshal to the UI thread to make changes in an event handler? By passing in TaskScheduler.FromCurrentSynchronizationContext(), the delegate will run on the UI thread and you won’t have to think twice about it. You can also write your own TaskScheduler, but that’s genius school and we’re just getting acquainted here.
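Here is a rough sketch of two of these options working together: a bounded queue fed with SendAsync, plus a CancellationToken. The delay, the capacity of 2 and the names are assumptions picked only for illustration, and the async Main assumes C# 7.1 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BoundedAndCancellable
{
    static async Task Main()
    {
        var cts = new CancellationTokenSource();

        var slowWorker = new ActionBlock<int>(async n =>
        {
            await Task.Delay(100);                  // simulated slow work (assumption)
            Console.WriteLine("Processed " + n);
        },
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 2,                    // hold at most 2 items at a time
            CancellationToken = cts.Token
        });

        for (int i = 1; i <= 5; i++)
        {
            // With a bounded block, SendAsync waits until there is room,
            // so a fast producer is slowed down to the consumer's pace.
            await slowWorker.SendAsync(i);
        }

        // Cancel from the outside: items still queued are dropped.
        cts.Cancel();

        // The block refuses further data once it has been cancelled.
        Console.WriteLine("Post after cancel accepted: " + slowWorker.Post(6));

        try
        {
            await slowWorker.Completion;
        }
        catch (OperationCanceledException)
        {
            // Awaiting the Completion task of a cancelled block throws.
            Console.WriteLine("Block was cancelled");
        }
    }
}
```

How many "Processed" lines appear before the cancellation depends on timing. Also note that on a bounded block Post simply returns false when the queue is full, which is why the loop uses SendAsync.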

Executive ActionBlock

Sending emails can be a slow process. Even running 10 at a time, if each email takes 10 seconds to send and we have thousands of emails to process, we’re looking at some serious processing time. Each instance of our delegate above is holding onto a thread for 10 seconds while the email is sent, which to a machine is an eternity. Now, if sending emails is the only asynchronous processing being done, maybe that’s not a big deal. But if you are trying to maximize thread efficiency, why not release the thread while the email is being sent, and then re-acquire a thread to continue processing when the email completes?

It just so happens the ActionBlock supports async delegates too, so let’s take a look at how we can convert our existing example to use async/await and send these emails more efficiently:

var emailer = new ActionBlock<string>(new Func<string, Task>(async email =>
{
    using (var smtpClient = new SmtpClient("myhostserver"))
    {
        var message = new MailMessage
        {
            From = new MailAddress("dave@somecompany.com"),
            Subject = "Hi",
            Body = String.Format("Hello {0}!", email)
        };
        message.To.Add(new MailAddress(email));
        await smtpClient.SendMailAsync(message);
    }
}), new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10
});

First, notice that this delegate uses the async keyword to indicate that we will be awaiting within the lambda. As of .NET 4.5, many new methods have been added that return a Task that can be awaited, and the SmtpClient is no exception. Using the SendMailAsync method we can await the sending of the email. When the compiler sees the await expression, it takes all the code after the await and turns it into a callback. When the program runs and the executing thread reaches the await on a send that has not finished yet, the thread is released back to the pool so it can be used for other work being done in your app. When the email has been sent, a thread is acquired from the pool (not necessarily the same one) and execution continues on the line after the await. Seamless and efficient!
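If you want to watch the thread hand-off happen, a small sketch like the following prints the managed thread ID before and after the await. Task.Delay stands in for the email send here (an assumption, so no SMTP server is needed), and the async Main assumes C# 7.1 or later.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class AwaitInsideBlock
{
    static async Task Main()
    {
        var block = new ActionBlock<int>(async n =>
        {
            Console.WriteLine("Item " + n + " started on thread " + Environment.CurrentManagedThreadId);

            // Stand-in for SendMailAsync: no thread is held while the delay is pending.
            await Task.Delay(500);

            Console.WriteLine("Item " + n + " resumed on thread " + Environment.CurrentManagedThreadId);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

        for (int i = 1; i <= 5; i++)
        {
            block.Post(i);
        }

        block.Complete();
        await block.Completion;
    }
}
```

In a console app the "resumed" line will often show a different thread ID than the "started" line, but that is not guaranteed. The pool is free to hand back the same thread.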

Cut!

The ActionBlock gives us a very easy way to process data concurrently and asynchronously, and it’s only one piece of the Dataflow library. I didn’t go in depth about the rest of Dataflow and the power it can offer, but if you’re interested, here are some other blog posts I and others have written around the internets about Dataflow and Async/Await: