Real-life story: Blocking Collection

2012/02/26


This post discusses a real-life story that uncovered a non-trivial (yet logical) behavior related to Parallel.ForEach and BlockingCollection&lt;T&gt;.

I will explain why it happens and how to handle it correctly.

Tags: BlockingCollection, Parallel.ForEach, TPL, TAP, thread pool, async

It all started when Guy Eden from ITG found that the following code seemed to leak memory:

Code Snippet

private static void Main()
{
    var bc = new BlockingCollection<int>();
    Task.Factory.StartNew(() =>
        Parallel.ForEach(bc.GetConsumingEnumerable(), i => { }));
    Console.ReadKey();
}

It was leaking even though the blocking collection was empty (nothing was adding items to the collection).

While monitoring this code, we can see a steep memory curve constantly rising.

[Screenshot: memory profiler showing the steadily rising memory curve]

I found it very surprising, because it seems like quite a reasonable scenario, and BlockingCollection&lt;T&gt; is a prime member of the new TPL concurrent collection family.

So I decided to ask Stephen Toub about this behavior, and as always, Stephen enlightened me about what is really happening in this scenario.

The following is a snippet from Stephen's response:

"The Parallel.ForEach uses the ThreadPool.  ForEach doesn’t use a fixed number of threads, but instead will use whatever threads the pool will make available to it.  And the ThreadPool has a starvation mechanism, which will introduce additional threads if all threads in the pool are blocked and not making forward progress."
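You can watch this starvation mechanism in action without Parallel.ForEach at all. The sketch below (my own illustration, not from Stephen's response) queues more blocking work items than the pool has threads; because every worker is blocked and making no forward progress, the pool keeps injecting additional threads, and the process thread count climbs steadily:

```csharp
using System;
using System.Diagnostics;
using System.Threading;

class ThreadInjectionDemo
{
    static void Main()
    {
        // Queue far more blocking work items than the pool has threads.
        // Since every worker blocks forever, the ThreadPool's starvation
        // heuristic injects extra threads (roughly one per second).
        for (int i = 0; i < 100; i++)
            ThreadPool.QueueUserWorkItem(_ => Thread.Sleep(Timeout.Infinite));

        for (int i = 0; i < 5; i++)
        {
            Thread.Sleep(1000);
            Console.WriteLine("Threads: {0}",
                Process.GetCurrentProcess().Threads.Count);
        }
    }
}
```

The same dynamic drives the leak above: each thread the pool hands to Parallel.ForEach blocks inside GetConsumingEnumerable waiting for an item, so the pool injects yet another thread, forever.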

There are a couple of options to cope with this issue.

The first is fairly simple: use MaxDegreeOfParallelism, as shown in the following snippet:

Code Snippet

var bc = new BlockingCollection<int>();
Task.Factory.StartNew(() =>
    {
        ParallelOptions options = new ParallelOptions
            {
                MaxDegreeOfParallelism = 30
            };
        Parallel.ForEach(bc.GetConsumingEnumerable(), options, i => { });
    });

But this solution suffers from inefficient use of threads, because we are obviously targeting a long-running scenario, where it is better not to consume threads from the ThreadPool at all.

The other option is to use a custom scheduler (actually, this is one of the few scenarios where a custom scheduler seems right).

The following snippet shows a simplified version of the custom scheduler:

Code Snippet

public sealed class LongRunningTaskScheduler : TaskScheduler, IDisposable
{
    private readonly ConcurrentQueue<Task> _requests = new ConcurrentQueue<Task>();
    private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
    private readonly ManualResetEventSlim _sync = new ManualResetEventSlim(false);
    private int _counter = 0;

    public LongRunningTaskScheduler(int maxDegreeOfParallelism)
    {
        for (int i = 0; i < maxDegreeOfParallelism; i++)
        {
            Task.Factory.StartNew(Execute, TaskCreationOptions.LongRunning);
        }
    }

    private void Execute()
    {
        var cancelToken = _cancellation.Token;
        try
        {
            while (!cancelToken.IsCancellationRequested)
            {
                _sync.Wait(cancelToken);
                // Reset before draining, so a Set that races with the drain
                // is not lost (the next Wait will then return immediately).
                _sync.Reset();
                Task t;
                while (_requests.TryDequeue(out t))
                    base.TryExecuteTask(t);
            }
        }
        catch (OperationCanceledException)
        {
            // Wait throws when the token is cancelled on Dispose; exit cleanly.
        }
    }

    protected override void QueueTask(Task task)
    {
        _requests.Enqueue(task);
        _sync.Set();
        int c = Interlocked.Increment(ref _counter);
        Console.Write("{0}, ", c);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _requests.ToArray();
    }

    public void Dispose()
    {
        _cancellation.Cancel();
    }
}

The scheduler takes maxDegreeOfParallelism as a constructor parameter (a better name may be poolSize) and constructs a pool of long-running tasks.

The tasks are synchronized using a ManualResetEventSlim, whose Wait API accepts a cancellation token (AutoResetEvent has no slim version and does not accept a cancellation token); see the _cancellation and _sync fields and the Wait call in Execute.

The scheduler executes the tasks on non-ThreadPool threads, and therefore won't interfere with the ThreadPool heuristics.
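This works because TaskCreationOptions.LongRunning hints the default scheduler to run the task on a dedicated thread rather than a pool thread. A minimal check (my own sketch, not from the original post) makes this visible:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class LongRunningDemo
{
    static void Main()
    {
        // LongRunning asks the default scheduler for a dedicated thread,
        // so the body should NOT observe a thread-pool thread.
        var t = Task.Factory.StartNew(
            () => Console.WriteLine("Pool thread? {0}",
                Thread.CurrentThread.IsThreadPoolThread),
            TaskCreationOptions.LongRunning);
        t.Wait();
        // prints "Pool thread? False"
    }
}
```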

The following snippet shows how to use this scheduler:

Code Snippet

var bc = new BlockingCollection<int>();
Task.Factory.StartNew(() =>
    {
        ParallelOptions options = new ParallelOptions
        {
            TaskScheduler = new LongRunningTaskScheduler(30),
        };
        Parallel.ForEach(bc.GetConsumingEnumerable(), options, i => { Console.WriteLine("."); });
    }, TaskCreationOptions.LongRunning);

Summary

You should be aware that Parallel.ForEach assumes short-running actions and tries to use whatever threads the pool will make available to it.

In cases where you want to use it for potentially long or blocking actions, you had better use a scheduler that moves the load onto non-thread-pool threads, or at least set MaxDegreeOfParallelism.
