February 2012 - Posts - Bnaya Eshet

Bnaya Eshet



Parallel.ForEach behavior


This post is a direct continuation of the previous post, "Real-life story: Blocking Collection".

(Real-life story: Blocking Collection).ContinueWith (t => this post);
or
await (Real-life story: Blocking Collection);
this post;


My colleague Bram Veldhoen suggested demonstrating Parallel.ForEach's hunger for threads in a purer fashion, without BlockingCollection<T> or any other high-level enumerable.

The following code demonstrates the issue using a slow enumerable:

Code Snippet
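  using System;
  using System.Collections.Generic;
  using System.Diagnostics;
  using System.Threading;
  using System.Threading.Tasks;

  internal static class Program
  {
      private static int _getInt = 0;

      // A deliberately slow enumerable: it blocks for 150 seconds
      // before producing its single item.
      public static IEnumerable<int> GetInts()
      {
          Thread.Sleep(30 * 5000);
          yield return Interlocked.Increment(ref _getInt);
      }

      // Keep a reference so the timer is not garbage collected.
      private static Timer _tmr;

      private static void Main()
      {
          // Every second, print the process thread count and the number
          // of available ThreadPool worker threads.
          _tmr = new Timer(state =>
              {
                  var p = Process.GetCurrentProcess();
                  int workerThreads, ioThreads;
                  ThreadPool.GetAvailableThreads(out workerThreads, out ioThreads);
                  Console.WriteLine("Thread Count = {0}, Available ThreadPool Threads = {1}",
                      p.Threads.Count, workerThreads);
              }, null, 0, 1000);

          // The body is empty: all the time is spent inside the enumerator.
          Parallel.ForEach(GetInts(), i => { });
          Console.ReadKey();
      }
  }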

When running this code, we see the following output:

(screenshot of the console output)

The thread count rises constantly, while the number of available ThreadPool worker threads declines.

You can read the previous post for solutions to this issue.


Real-life story: Blocking Collection


This post discusses a real-life story that uncovered a non-trivial (yet logical) behavior related to Parallel.ForEach and BlockingCollection<T>.

I will explain why it happens and how we can handle it correctly.


It all started when Guy Eden from ITG found that the following code seemed to leak memory:

Code Snippet
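  using System;
  using System.Collections.Concurrent;
  using System.Threading.Tasks;
  internal static class Program
  {
      private static void Main()
      {
          // Nothing is ever added, so the consuming enumerable blocks forever.
          var bc = new BlockingCollection<int>();
          Task.Factory.StartNew(() =>
              Parallel.ForEach(bc.GetConsumingEnumerable(), i => { }));
          Console.ReadKey();
      }
  }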

It kept leaking even though the blocking collection was empty (nothing was adding items to it).

While monitoring this code, we could see a steep memory curve rising constantly.

(screenshot of the memory usage graph)

I found this very surprising, because it seems like quite a reasonable scenario, and BlockingCollection<T> is a prime member of the new TPL concurrent collections family.

So I decided to ask Stephen Toub about this behavior, and as always, Stephen enlightened me about what really happens in this scenario.

The following is a snippet from Stephen's response:

"The Parallel.ForEach uses the ThreadPool.  ForEach doesn’t use a fixed number of threads, but instead will use whatever threads the pool will make available to it.  And the ThreadPool has a starvation mechanism, which will introduce additional threads if all threads in the pool are blocked and not making forward progress."

There are a couple of options for coping with this issue:

The first is fairly simple: set MaxDegreeOfParallelism, as shown in the following snippet:

Code Snippet
  var bc = new BlockingCollection<int>();
  Task.Factory.StartNew(() =>
      {
          // Cap the number of concurrent workers Parallel.ForEach will use at 30.
          ParallelOptions options = new ParallelOptions
              {
                  MaxDegreeOfParallelism = 30
              };
          Parallel.ForEach(bc.GetConsumingEnumerable(), options, i => { });
      });

However, this solution uses threads inefficiently: we are obviously targeting a long-running scenario, where it is better not to consume ThreadPool threads at all.
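To see the cap in action without a BlockingCollection, here is a minimal sketch that runs a deliberately blocking body and records how many bodies overlap. MaxDopDemo and PeakConcurrency are hypothetical names, not part of the original post, and the 50 ms Thread.Sleep merely stands in for a long-running action. Whatever threads the pool would otherwise inject, the observed peak should never exceed the configured MaxDegreeOfParallelism.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MaxDopDemo
{
    // Runs a blocking body over `count` items and returns the largest number
    // of bodies that were observed executing at the same time.
    public static int PeakConcurrency(int count, int maxDegreeOfParallelism)
    {
        int current = 0;
        int peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        Parallel.ForEach(Enumerable.Range(0, count), options, i =>
        {
            int now = Interlocked.Increment(ref current);

            // Publish a new peak with a compare-and-swap loop.
            int seen = Volatile.Read(ref peak);
            while (now > seen)
            {
                int prior = Interlocked.CompareExchange(ref peak, now, seen);
                if (prior == seen) break;
                seen = prior;
            }

            Thread.Sleep(50); // simulate a blocking, long-running body
            Interlocked.Decrement(ref current);
        });

        return peak;
    }

    public static void Main()
    {
        Console.WriteLine("Peak concurrency with MaxDegreeOfParallelism = 4: {0}",
            PeakConcurrency(40, 4));
    }
}
```

The cap bounds concurrency, but every one of those workers is still a ThreadPool thread, which is exactly the inefficiency described above.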

The other option is to use a custom scheduler (this is actually one of the few scenarios where a custom scheduler seems right).

The following snippet shows a simplified version of the custom scheduler:

Code Snippet
  using System;
  using System.Collections.Concurrent;
  using System.Collections.Generic;
  using System.Threading;
  using System.Threading.Tasks;

  public sealed class LongRunningTaskScheduler : TaskScheduler, IDisposable
  {
      private readonly ConcurrentQueue<Task> _requests = new ConcurrentQueue<Task>();
      private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
      private readonly ManualResetEventSlim _sync = new ManualResetEventSlim(false);
      private int _counter = 0;

      public LongRunningTaskScheduler(int maxDegreeOfParallelism)
      {
          // LongRunning on the default scheduler gives each worker a dedicated,
          // non-ThreadPool thread. TaskScheduler.Default is passed explicitly so the
          // workers are not queued to whatever scheduler happens to be current.
          for (int i = 0; i < maxDegreeOfParallelism; i++)
          {
              Task.Factory.StartNew(Execute, CancellationToken.None,
                  TaskCreationOptions.LongRunning, TaskScheduler.Default);
          }
      }

      private void Execute()
      {
          var cancelToken = _cancellation.Token;
          try
          {
              while (!cancelToken.IsCancellationRequested)
              {
                  _sync.Wait(cancelToken);
                  // Reset before draining, so a task queued while we drain
                  // sets the event again and is not missed.
                  _sync.Reset();
                  Task t;
                  while (_requests.TryDequeue(out t))
                      TryExecuteTask(t);
              }
          }
          catch (OperationCanceledException)
          {
              // Dispose was called: let the worker exit quietly.
          }
      }

      protected override void QueueTask(Task task)
      {
          _requests.Enqueue(task);
          _sync.Set();
          // Diagnostic only: shows how many tasks Parallel.ForEach queues.
          int c = Interlocked.Increment(ref _counter);
          Console.Write("{0}, ", c);
      }

      protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
      {
          // Never inline, so every task runs on one of the dedicated workers.
          return false;
      }

      protected override IEnumerable<Task> GetScheduledTasks()
      {
          return _requests.ToArray();
      }

      public void Dispose()
      {
          _cancellation.Cancel();
      }
  }

The scheduler takes maxDegreeOfParallelism as a constructor parameter (poolSize might be a better name) and uses it to start a pool of long-running tasks.

The worker tasks are coordinated with a ManualResetEventSlim, whose Wait method accepts a CancellationToken; AutoResetEvent has no slim version and its wait methods do not accept a cancellation token (see the _cancellation and _sync fields and the _sync.Wait(cancelToken) call in Execute).

The scheduler executes the tasks on non-ThreadPool threads, so it does not interfere with the ThreadPool's heuristics.

The following snippet shows how to use this scheduler:

Code Snippet
  var bc = new BlockingCollection<int>();
  Task.Factory.StartNew(() =>
      {
          ParallelOptions options = new ParallelOptions
          {
              TaskScheduler = new LongRunningTaskScheduler(30),
          };
          Parallel.ForEach(bc.GetConsumingEnumerable(), options, i => { Console.WriteLine("."); });
      }, TaskCreationOptions.LongRunning);
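If you prefer not to hand-roll the signaling with ManualResetEventSlim, one possible alternative is to let a BlockingCollection<Task> do the blocking and feed a fixed set of dedicated Thread objects. The sketch below is a hypothetical variant (DedicatedThreadTaskScheduler is not from the original post). It also overrides MaximumConcurrencyLevel, which Parallel.ForEach takes into account when deciding how many workers to use. Do not call Dispose from one of the scheduler's own threads, because Dispose joins them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical alternative to LongRunningTaskScheduler: a fixed pool of
// dedicated (non-ThreadPool) threads fed by a BlockingCollection<Task>.
public sealed class DedicatedThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();
    private readonly List<Thread> _threads = new List<Thread>();

    public DedicatedThreadTaskScheduler(int poolSize)
    {
        for (int i = 0; i < poolSize; i++)
        {
            var thread = new Thread(Work) { IsBackground = true };
            _threads.Add(thread);
            thread.Start();
        }
    }

    // Each worker blocks until a task is available and exits once
    // CompleteAdding has been called and the queue is drained.
    private void Work()
    {
        foreach (Task task in _queue.GetConsumingEnumerable())
        {
            TryExecuteTask(task);
        }
    }

    // Parallel.ForEach will not use more workers than this.
    public override int MaximumConcurrencyLevel => _threads.Count;

    protected override void QueueTask(Task task) => _queue.Add(task);

    // Never inline, so every task runs on one of the dedicated threads.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray();

    public void Dispose()
    {
        _queue.CompleteAdding();
        foreach (Thread thread in _threads) thread.Join();
        _queue.Dispose();
    }
}

public static class Program
{
    public static void Main()
    {
        using (var scheduler = new DedicatedThreadTaskScheduler(4))
        {
            var options = new ParallelOptions { TaskScheduler = scheduler };
            Parallel.ForEach(Enumerable.Range(0, 8), options, i =>
                Console.WriteLine("item {0}, ThreadPool thread: {1}",
                    i, Thread.CurrentThread.IsThreadPoolThread));
        }
    }
}
```

The trade-off: BlockingCollection hides the wake-up logic, at the cost of one more concurrent collection; both designs keep long-running work off the ThreadPool.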
Summary

You should be aware that Parallel.ForEach assumes short-running actions and tries to use whatever threads the pool makes available to it.

When you want to use it for potentially long-running actions, you had better use a scheduler that moves the load to non-ThreadPool threads, or at least set MaxDegreeOfParallelism.



.NET 4.5 and C# 5 are approaching maturity


I have been playing with the VS 11 preview for a while and found it very stable and exciting.


Now the release date is getting much closer:
the beta will be released next week, on February 29th, and it will come with a “Go Live” license.

For small teams and start-ups, there is a very interesting announcement about Team Foundation Server Express Beta, which is great news and should make TFS practices much more common than they are today.

I'm certainly excited to download the new version as soon as it becomes available.

You can read more about it on Jason Zander's blog.

Rx and Time related operators


This post focuses on Rx's time-related operators.


Rx has a lot to do with the notion of time.

Actually, Rx is a kind of time machine, where each datum is scheduled for processing at a specific time
(I will talk about scheduling in a future post).

Within the System.Reactive and System.Reactive.Linq namespaces, we can find the following extension methods and services:

Window, Buffer and Sample, which were covered here, here and here. All of these operators have time-based overloads.

Replay and Scheduler, which I will discuss in future posts.

Delay, Timeout, Timestamp, TimeInterval, Generate, Interval and Timer, which I present below.

Besides the APIs above, the notion of time appears in overloads of many other Rx APIs (either explicitly, or implicitly through the scheduler).

Interval and Timer

Rx comes with a lot of factories for constructing IObservable<T> streams, and some of them produce time-based streams.

The Interval factory constructs a stream that produces a sequential value (of type long) after each period.

The following snippet produces a sequential value every second:

Code Snippet
  IObservable<long> source =
      Observable.Interval(TimeSpan.FromSeconds(1));

The Timer factory is a more fine-grained version of the Interval factory: it can produce a single value after a due time, or a sequence of sequential values, one per period, starting after a due time.

The following snippet produces a single value after a due time of 1 minute.

Code Snippet
  IObservable<long> singleDueTime =
      Observable.Timer(TimeSpan.FromMinutes(1));
  singleDueTime.Subscribe(item => Console.WriteLine("Elapsed once"));

The following snippet produces a stream of sequential values, one per second, after a due time of 1 minute.

Code Snippet
  IObservable<long> periodic =
      Observable.Timer(
          TimeSpan.FromMinutes(1),   // due time
          TimeSpan.FromSeconds(1));  // period
  periodic.Subscribe(item => Console.WriteLine("Elapsed"));
Generate

A more structured stream can be defined with the Generate factory.

The following snippet produces a formatted string for each of a sequence of numeric values, at intervals that get slower and slower.

Code Snippet
  var xs = Observable.Generate(
      0, // initial value
      i => i < 10, // condition
      i => i + 1, // iterate
      i => string.Format("** {0} **", i), // selector
      i => TimeSpan.FromSeconds(i)); // time selector
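The Generate call above produces each value timeSelector(state) after the previous one, so the gaps grow by one second per value. The following Rx-free sketch replays the same initial value / condition / iterate / selector / time selector steps on a virtual clock. GenerateSchedule is a hypothetical helper, and it assumes the relative scheduling just described. Value i lands at 0 + 1 + ... + i = i(i + 1) / 2 seconds, so the last string appears about 45 seconds after subscription. These 0, 1, 2, ... 9 second gaps are roughly what the TimeInterval snippet later in the post reports.

```csharp
using System;
using System.Collections.Generic;

public static class GenerateSchedule
{
    // Emulates the Generate call above on a virtual clock, without Rx:
    // each value is produced timeSelector(state) after the previous one.
    public static List<(string Value, TimeSpan At)> Compute()
    {
        var result = new List<(string Value, TimeSpan At)>();
        TimeSpan clock = TimeSpan.Zero;
        for (int i = 0; i < 10; i = i + 1)                      // initial value, condition, iterate
        {
            clock += TimeSpan.FromSeconds(i);                   // time selector: delay before this value
            result.Add((string.Format("** {0} **", i), clock)); // selector
        }
        return result;
    }

    public static void Main()
    {
        foreach (var item in Compute())
            Console.WriteLine("{0} at t = {1}s", item.Value, item.At.TotalSeconds);
    }
}
```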
Timestamp

The Timestamp operator records the timestamp of each value in an observable sequence.

This is a very common task, and we no longer have to reinvent the wheel for it.

The operator wraps each original value in a Timestamped<T>, which exposes the Value and its Timestamp.

The following snippet stamps each value produced by the Interval factory.

Code Snippet
  IObservable<Timestamped<long>> source =
      Observable.Interval(TimeSpan.FromSeconds(1))
      .Timestamp();
  source.Subscribe(item =>
      Console.WriteLine("Time = {0}, Value = {1}",
          item.Timestamp, item.Value));
TimeInterval

The TimeInterval operator records the time interval between consecutive values in an observable sequence.

The following snippet measures the duration between the produced values.

Code Snippet
  var xs = Observable.Generate(
      0, // initial value
      i => i < 10, // condition
      i => i + 1, // iterate
      i => string.Format("** {0} **", i), // selector
      i => TimeSpan.FromSeconds(i)); // time selector

  xs.TimeInterval()
      .Subscribe(item => Console.WriteLine("Time = {0}, Value = {1}",
          item.Interval, item.Value));
Timeout

The Timeout operator either propagates the original values, or produces an OnError with a TimeoutException when the next value does not arrive within the specified duration after the previous one.

The following snippet produces values as long as the gap between consecutive values does not exceed the 4-second threshold.

Code Snippet
  var xs = Observable.Generate(
      0, // initial value
      i => i < 10, // condition
      i => i + 1, // iterate
      i => string.Format("** {0} **", i), // selector
      i => TimeSpan.FromSeconds(i)); // time selector

  xs.Timeout(TimeSpan.FromSeconds(4))
      .Subscribe(item => Console.WriteLine(item),
      ex => Console.WriteLine(ex));
Delay

The Delay operator creates a delayed stream, which shifts the propagation of each datum by a specific duration.


The following snippet delays the original stream before the values reach the subscriber.
Each value is stamped twice: before and after the delay.

Code Snippet
  var source =
      Observable.Interval(TimeSpan.FromSeconds(1))
      .Timestamp();
  var delayed =
      source.Delay(TimeSpan.FromSeconds(1))
          .Timestamp();

  delayed.Subscribe(item => Console.WriteLine(
      "Datum {0}, created at {1:ss} and propagated at {2:ss}",
      item.Value.Value, item.Value.Timestamp, item.Timestamp));
Summary

As I said, Rx is a kind of time machine, and there are other time-related operators which I didn't cover in this post.

One of the core concepts of Rx is the scheduler, which is capable of some very interesting time manipulation, but that is a topic for a future post.

What I do want to mention about schedulers is that many Rx operators have overloads that accept a scheduler, and when you do use a scheduler it is highly recommended to use those overloads.



Rx - DistinctUntilChanged


This post focuses on the simple yet very useful DistinctUntilChanged operator.


Sometimes a data stream produces the same value for a while; you can see this in a stock exchange stream, where the price of a specific stock may stay steady for some time.

The observer can reduce its computation by ignoring repeated values (this applies to consecutive repeats; for non-consecutive repeats you can use the Distinct operator).
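The difference between consecutive-only and global de-duplication is easy to see on a plain IEnumerable<T>. The sketch below is a hypothetical LINQ-to-Objects analogue that mirrors the key selector / comparer overload shape. DistinctUntilChangedSketch is not an Rx API; Rx ships its own DistinctUntilChanged for IObservable<T>.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SequenceExtensions
{
    // Drops an element only when its key equals the key of the element
    // immediately before it (consecutive repeats), like DistinctUntilChanged.
    public static IEnumerable<TSource> DistinctUntilChangedSketch<TSource, TKey>(
        this IEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        IEqualityComparer<TKey> comparer = null)
    {
        comparer = comparer ?? EqualityComparer<TKey>.Default;
        bool hasPrevious = false;
        TKey previous = default(TKey);
        foreach (TSource item in source)
        {
            TKey key = keySelector(item);
            if (!hasPrevious || !comparer.Equals(previous, key))
            {
                yield return item;
            }
            hasPrevious = true;
            previous = key;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        int[] prices = { 10, 10, 11, 11, 11, 10, 12, 12 };

        // Consecutive repeats only: 10, 11, 10, 12
        Console.WriteLine(string.Join(", ", prices.DistinctUntilChangedSketch(p => p)));

        // Distinct removes every repeat: 10, 11 and 12 appear once each
        Console.WriteLine(string.Join(", ", prices.Distinct()));
    }
}
```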

DistinctUntilChanged has the following overloads:

Code Snippet
  IObservable<TSource> DistinctUntilChanged<TSource>();
  IObservable<TSource> DistinctUntilChanged<TSource, TKey>(
      Func<TSource, TKey> keySelector);
  IObservable<TSource> DistinctUntilChanged<TSource>(
      IEqualityComparer<TSource> comparer);
  IObservable<TSource> DistinctUntilChanged<TSource, TKey>(
      Func<TSource, TKey> keySelector,
      IEqualityComparer<TKey> comparer);

I have hidden the first parameter of these extension methods, which is: this IObservable<TSource> source.

You can use the first overload when dealing with a simple data stream (of primitive or simple value types),
but real-life data streams are often more complex.
For complex scenarios you may prefer one of the other three overloads, where you can define the comparison.

A stock exchange scenario may look something like the following snippet:

Code Snippet
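  // Requires System.Reactive (using System; using System.Reactive.Linq;).
  // Stock and FACTOR are hypothetical stand-ins; the original post does not define them.
  public class Stock
  {
      public decimal Price { get; set; }
      public long Volume { get; set; }
  }
  private const long FACTOR = 1000;

  public IObservable<Stock> AlertStream(IObservable<Stock> provider)
  {
      return from oldAndCurrent in provider
                 .DistinctUntilChanged(stock => stock.Price) // ignore steady prices
                 .Buffer(2, 1)                               // sliding [previous, current] window
             where oldAndCurrent.Count == 2  // Buffer emits a trailing 1-item buffer on completion
             let old = oldAndCurrent[0]
             let current = oldAndCurrent[1]
             where old.Price < current.Price &&
                   old.Volume + FACTOR < current.Volume
             select current;
  }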

The sample uses a sliding buffer of 2;
you can read more about the Buffer operator here.

The AlertStream method receives a stream of stocks.

It filters out steady prices by using the DistinctUntilChanged operator with a lambda that selects the Price property as the comparison key.

After that, it buffers the current value together with the previous one (without the distinct step, the buffer would not skip steady pairs).

The last thing the snippet does before returning the filtered stream is to filter out old/current pairs that do not match the criteria.
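The pairing step can be pictured on a plain array. The sketch below is an Rx-free stand-in that mimics Buffer(2, 1) and shows why the distinct step matters: without it, steady pairs such as (10, 10) reach the price comparison. Pairwise and DropConsecutiveRepeats are hypothetical helpers, not Rx operators. Unlike Rx's Buffer, which also emits a trailing one-element buffer when the source completes, Pairwise yields only full pairs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SlidingPairs
{
    // Pairs every element with its predecessor, like Buffer(2, 1),
    // but yields only full pairs.
    public static IEnumerable<(T Old, T Current)> Pairwise<T>(IEnumerable<T> source)
    {
        bool hasOld = false;
        T old = default(T);
        foreach (T current in source)
        {
            if (hasOld)
            {
                yield return (old, current);
            }
            old = current;
            hasOld = true;
        }
    }

    // Drops consecutive repeats (the DistinctUntilChanged idea) from an array.
    public static int[] DropConsecutiveRepeats(int[] values) =>
        values.Where((v, i) => i == 0 || values[i - 1] != v).ToArray();

    public static void Main()
    {
        int[] prices = { 10, 10, 11, 11, 12 };

        // Without the distinct step: (10, 10), (10, 11), (11, 11), (11, 12)
        foreach (var pair in Pairwise(prices))
            Console.WriteLine("({0}, {1})", pair.Old, pair.Current);

        // With it: (10, 11), (11, 12)
        foreach (var pair in Pairwise(DropConsecutiveRepeats(prices)))
            Console.WriteLine("({0}, {1})", pair.Old, pair.Current);
    }
}
```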

Summary

As simple as DistinctUntilChanged is, it is a useful operator that you should not ignore.

