
Dissecting the ActionBlock: a Short Story About a Nasty Deadlock

I think almost every project in the real world uses some form of a producer-consumer queue. It could be something totally custom, with business logic intermixed with threading. It could be something semi-custom, like a list of tasks that work off a shared BlockingCollection. Or it could be a simple solution based on existing components, like the TPL Dataflow ActionBlock.

The idea of the problem is very simple: in many systems, you want to separate a set of types that schedules some work from another set of types that processes it. Add some concurrency, and you need to solve the producer-consumer problem. Today we’re going to explore some internals of ActionBlock. We will also discuss some interesting questions along the way, like why frameworks are so hard to design and how to avoid leaky abstractions.

Ready? Let’s go.

In my current project, we have different ways of dealing with concurrent dataflows, and in order to unify them, we moved some of our code to ActionBlock. The problem that we’re trying to solve is pretty common: we have a parser and a custom interpreter for a TypeScript-like language. Without digging too deep into the details, you can assume that we just need to parse a set of, let’s say, TypeScript files and build what is called the ‘transitive closure’ of all dependencies along the way.

Roughly, we have the following logic:

  1. Parse the file.
  2. Analyze the file to understand what its dependencies are.
  3. Resolve the dependencies (i.e., figure out which TypeScript files are required by this one).
  4. Schedule all the dependencies for parsing.

Pretty simple, right? Actually, it is, and here you can see how this logic can be implemented using TPL Dataflow and ActionBlock:

private static Task<ParsedFile> ParseFileAsync(string path)
{
    Console.WriteLine($"Parsing {path}. Thread Id - {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(10);

    return Task.FromResult(
        new ParsedFile()
        {
            FileName = path,
            Dependencies = GetFileDependencies(path),
        });
}

static void Main(string[] args)
{
    long numberOfProcessedFiles = 0;
    ActionBlock<string> actionBlock = null;
    Func<string, Task> processFile = async path =>
    {
        Interlocked.Increment(ref numberOfProcessedFiles);
        ParsedFile parsedFile = await ParseFileAsync(path);

        foreach (var dependency in parsedFile.Dependencies)
        {
            // Need to check that we haven't processed this file yet
            await actionBlock.SendAsync(dependency);
        }

        if (parsedFile.Dependencies.Count == 0 && actionBlock.InputCount == 0)
        {
            // This is a marker that this is the last file and there
            // is nothing to process
            actionBlock.Complete();
        }
    };

    actionBlock = new ActionBlock<string>(processFile,
        new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 8
        });

    actionBlock.SendAsync("FooBar.ts").GetAwaiter().GetResult();
    Console.WriteLine("Waiting for an action block to finish...");

    actionBlock.Completion.GetAwaiter().GetResult();
    Console.WriteLine($"Done. Processed {numberOfProcessedFiles}");
    Console.ReadLine();
}

Let’s discuss what is going on here. First, we have a processing callback, processFile, that takes a path to a file. It parses the content and schedules parsing of all the dependencies using the SendAsync extension method. (An alternative would be the synchronous Post method, which adds an item to the queue synchronously and returns false if the operation fails.) And when the given file doesn’t have any dependencies and the input queue is empty, the callback calls the Complete method to signal that data processing is finished.

The Main method schedules parsing of the first file, and ParseFileAsync fakes the parsing process by doing nothing except computing dependencies using the following logic: file ‘foo.ts’ relies on ‘fo.ts’, which relies on ‘f.ts’, etc. Basically, each file depends on the same file with a shorter name. This is unlikely to be true in the real world, but it serves the purpose here.
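The post doesn’t show GetFileDependencies or ParsedFile; here is a minimal sketch that matches the ‘shorter name’ rule described above (the exact types and implementation are assumptions, and it requires using System.IO and System.Collections.Generic):

// Hypothetical helper: 'foo.ts' depends on 'fo.ts', which depends on 'f.ts', and so on.
private static List<string> GetFileDependencies(string path)
{
    var name = Path.GetFileNameWithoutExtension(path);

    // A single-character name has no dependencies, which terminates the chain.
    if (name.Length <= 1)
    {
        return new List<string>();
    }

    // Each file depends on the same file with a one-character-shorter name.
    return new List<string> { name.Substring(0, name.Length - 1) + ".ts" };
}

// Hypothetical result type used by ParseFileAsync.
private class ParsedFile
{
    public string FileName { get; set; }
    public List<string> Dependencies { get; set; }
}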

ActionBlock provides a good API and handles concurrency for us. The only caveat is that the default ‘degree of parallelism’ is 1, so you should provide ExecutionDataflowBlockOptions with a MaxDegreeOfParallelism greater than 1 when you create an ActionBlock instance. In this case, the ActionBlock will call the callback function in parallel to process multiple incoming items at the same time.

Post vs. SendAsync or what to use when

Everyone who has tried to solve the producer-consumer problem has faced a dilemma: what to do when the incoming flow exceeds the system’s ability to process it? How do you ‘throttle’ back? Keep every possible object in memory and let the queue grow indefinitely? Throw an exception? Return false? Block the ‘Enqueue’ method when the queue is full?

To solve this problem, the authors of ActionBlock decided to use the following well-known pattern:

  1. The client of an action block may provide a queue size (BoundedCapacity).
  2. When the queue is full, the Post method returns false, and the task returned by SendAsync completes only when the queue gets a free spot.
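A minimal sketch of these two behaviors (the Task.Delay callback and the capacity of 1 are assumptions made just for the demonstration; the snippet runs inside an async method):

var block = new ActionBlock<int>(
    async item => await Task.Delay(1000), // keep the single slot busy for a while
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

block.Post(1);                 // true: the item is accepted
bool accepted = block.Post(2); // false: the block is at capacity
await block.SendAsync(2);      // the returned task completes once the slot frees up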

As you may see in the previous example, the current code doesn’t specify any limit for the queue, which means that if the producer pushes hard enough, the application can end up with an OutOfMemoryException.

Let’s change one line and provide a queue size. For the sake of this example, let’s set it to some ridiculously low number, like … 1.

actionBlock = new ActionBlock<string>(processFile,
    new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

Now, when we run this code, we get … a deadlock!


Deadlock

Let’s think about this problem a little bit. Suppose you’re building your own custom implementation of a producer-consumer queue that takes a callback for processing elements. First, you may provide a synchronous API for adding elements, say, a Post method. If the queue is full, this method may simply return false, leaving the problem of dealing with a full queue to the client. You may also provide an asynchronous API, say, SendAsync, that ‘blocks’ the execution until the queue has a free spot. Of course, because the method is async, this operation won’t block per se, but it gives the client the ability to await the result of the operation.

So you’ll end up with the following logic inside your queue: call the callback, wait for it to finish, and then remove the item from the queue. You could remove the item from the queue before calling the callback instead, but as we’ll see in a moment, that won’t remove the possibility of a deadlock.

Within several minutes we have designed the API that ActionBlock is using: a Post method that adds an element to the queue synchronously and returns false if the block is full or can’t process elements any more (because Complete() was called), and a SendAsync method that returns a task that completes only when the item has been added to the queue.
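Expressed as a hypothetical interface, the design we just sketched looks roughly like this (this is our illustration, not ActionBlock’s actual surface):

// A hypothetical sketch of the API described above.
public interface IBoundedWorkQueue<T>
{
    bool Post(T item);        // false if the queue is full or already completed
    Task SendAsync(T item);   // completes once the item has been added to the queue
    void Complete();          // signals that no more items will be accepted
    Task Completion { get; }  // completes when all accepted items are processed
}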

This design is clean and simple, but it can cause a deadlock in a very simple situation. Suppose the queue is full and the queue calls the callback to process an element. And instead of quickly processing the element, this callback tries to schedule more items by calling SendAsync.


Because the queue is full, the queue is waiting for the processing function to finish. But the function itself is stuck, because in order to finish its execution it needs to add more elements! A classic deadlock.
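Here is a minimal, self-contained repro of that situation, a sketch assuming the callback feeds the block from inside itself:

// A bounded block whose callback posts back into the same block.
ActionBlock<int> block = null;
block = new ActionBlock<int>(async item =>
{
    // The item being processed still counts against BoundedCapacity,
    // so this await can never complete: a deadlock.
    await block.SendAsync(item + 1);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

block.Post(0);
block.Completion.Wait(); // hangs forever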

OK. We’re getting a deadlock because an ActionBlock removes elements only *after* the callback is finished. Let’s consider an alternative: what if the ActionBlock removed an item *before* the callback finishes?

Actually, a deadlock would still be possible. Consider an ActionBlock with a bounded capacity of 1 and a degree of parallelism of 2.

  • Thread T1 adds an element to the queue. The ActionBlock removes the item and calls the callback.
  • Thread T2 adds an element to the queue. The ActionBlock removes the item and calls the callback.
  • Thread T1 adds an element to the queue. The ActionBlock can’t call the callback, because both workers are busy (the degree of parallelism is 2). The queue is full.
  • Callback 1 tries to add an item to the queue, but gets stuck because the queue is full.
  • Callback 2 tries to add an item to the queue, but gets stuck because the queue is full.


This means that removing elements up front won’t solve our problem. In fact, it would make things even worse, because the probability of a deadlock would be lower and the bug would therefore be much harder to hit and diagnose (we would need N callbacks that schedule additional work, where N is the degree of parallelism).

Another drawback of this solution is subtler: ActionBlock is not a general-purpose producer-consumer queue. The class implements ITargetBlock<TInput> and can be used in more complicated dataflows. For instance, you may link one BufferBlock to more than one action block to process items in parallel. With the existing behavior, a full action block won’t accept more elements from the source, leaving other blocks the chance to process an item immediately instead of letting it wait in just one action block’s queue.

If items were removed from the queue before the callback is called, the actual ‘bound’ of the queue would be BoundedCapacity + MaxDegreeOfParallelism (for example, with BoundedCapacity = 1 and MaxDegreeOfParallelism = 8, up to 9 items could be owned by the block at once), which is much harder to reason about.

How to solve the problem?

There are two aspects that contribute to the deadlock:

  1. Awaiting the SendAsync method when the queue is full.
  2. Checking that all items are processed based on the InputCount of the action block.

We can easily address both of them: if the queue is full, we add the new item *asynchronously*, and we use our own counter to track the number of pending items instead of InputCount:

static void Main(string[] args)
{
    // Holds the number of files in progress.
    long numberOfPendingFiles = 1;
    long numberOfProcessedFiles = 0;

    ActionBlock<string> actionBlock = null;
    Func<string, Task> processFile = async path =>
    {
        Interlocked.Decrement(ref numberOfPendingFiles);
        Interlocked.Increment(ref numberOfProcessedFiles);

        //Console.WriteLine($"Parsing '{path}'");
        ParsedFile parsedFile = await ParseFileAsync(path);

        Interlocked.Add(ref numberOfPendingFiles, parsedFile.Dependencies.Count);

        foreach (var dependency in parsedFile.Dependencies)
        {
            // Need to check that we haven't processed this file yet

            // Fast check first: if the queue is not full
            // let's add the item synchronously
            if (!actionBlock.Post(dependency))
            {
                // The queue is full (or at least, it was full a moment ago)
                // Let's add new work asynchronously to avoid blocking this operation
                Task.Run(() => actionBlock.SendAsync(dependency));
            }
        }

        if (parsedFile.Dependencies.Count == 0 &&
            Interlocked.Read(ref numberOfPendingFiles) == 0)
        {
            // This is a marker that this is the last file and
            // there is nothing to process
            actionBlock.Complete();
        }
    };

    actionBlock = new ActionBlock<string>(processFile,
        new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 1,
            MaxDegreeOfParallelism = 8
        });

    actionBlock.SendAsync("FooBar.ts").GetAwaiter().GetResult();
    Console.WriteLine("Waiting for an action block to finish...");

    actionBlock.Completion.GetAwaiter().GetResult();
    Console.WriteLine($"Done. Processed {numberOfProcessedFiles}");
    Console.ReadLine();
}

Degree of Parallelism

I intentionally didn’t cover anything related to the ‘multithreaded’ or parallel aspects of ActionBlock, but there is one thing you should know if you’re using, or are going to use, any types from TPL Dataflow. Unlike other primitives from the TPL, all the blocks that control parallel execution are single-threaded by default. This means that ActionBlock will process items one by one with one thread, TransformBlock will transform items one by one with one thread, and so on. The reason for this behavior is simplicity: it is much easier to reason about queue-like behavior (first in, first out) when there is no concurrency involved.

To enable parallelism, you should provide an instance of ExecutionDataflowBlockOptions in the constructor with the MaxDegreeOfParallelism property set to a value greater than 1. By the way, setting this property to DataflowBlockOptions.Unbounded (-1) enables ‘unbounded’ mode, in which the ActionBlock creates as many tasks as possible, their number limited only by the TaskScheduler that you may also provide at construction time.
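For instance (a minimal sketch; the callback is just a stand-in):

var options = new ExecutionDataflowBlockOptions
{
    // DataflowBlockOptions.Unbounded is -1: the block itself imposes no limit.
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler = TaskScheduler.Default // the scheduler becomes the only limit
};

var block = new ActionBlock<string>(path => Console.WriteLine($"Processing {path}"), options);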

Conclusion

Designing an easy-to-use component is hard. Designing an easy-to-use component that deals with concurrency for you is even harder. The best way to use such a component correctly is to know how it is implemented and what restrictions the design team had in mind.

ActionBlock is a great type that drastically simplifies the most common producer-consumer scenarios. But even so, in order to use it correctly, you should know some key aspects of TPL Dataflow, like the default degree of parallelism, the behavior of bounded blocks, and the idea of work-item ownership.
