December 2011 - Posts
Rx - Window
Continuing with the Rx series, this post discusses the Window operator.

In the previous post I discussed the Buffer operator, which enables buffering an Rx data stream into chunks.
As good and useful as the Buffer operator is, it doesn't fit every scenario.
Consider tracking the highest and lowest values within a time period, for example hourly tracking of a service monitor that produces a value every second.
Technically we can use the Buffer operator for this scenario. The problem is that buffering an hour of data for each of our services leaves 3,600 long-lived items per service.
This leads to:
- items that survive Gen 0 and Gen 1 garbage collections.
- large object heap allocations in case of large chunks.
The point is that in this scenario buffering the data isn't needed at all, because the highest and lowest values can be calculated on the fly by comparing each incoming value against the current min and max.
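To make the memory argument concrete, here is a minimal sketch in plain C# (no Rx) contrasting the two approaches over one period of samples. The names `MinMaxSketch`, `FromBuffer` and `Running` are hypothetical; the point is only that buffering holds every sample until the period ends, while the running comparison keeps two numbers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MinMaxSketch
{
    // Buffer-style: every sample of the period is held in memory
    // until the period ends, then min/max are computed.
    public static (long Min, long Max) FromBuffer(IEnumerable<long> samples)
    {
        List<long> buffer = samples.ToList(); // O(n) memory per period
        return (buffer.Min(), buffer.Max());
    }

    // On-the-fly style: each sample is compared against the running
    // min/max and then discarded, so memory stays O(1) per period.
    public static (long Min, long Max) Running(IEnumerable<long> samples)
    {
        long? min = null, max = null;
        foreach (long value in samples)
        {
            min = min.HasValue ? Math.Min(min.Value, value) : value;
            max = max.HasValue ? Math.Max(max.Value, value) : value;
        }
        if (!min.HasValue)
            throw new InvalidOperationException("The period contained no samples.");
        return (min.Value, max.Value);
    }
}
```

Both methods return the same answer; only the memory profile differs, which is exactly the gap the Window operator closes for push-based streams.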
The right operator for this scenario
So we need some kind of window (a time period, a fixed item count, or a combination of the two) that projects the values within its scope without buffering them.
Rx has such an operator: the Window operator.
In contrast to the Buffer operator, which returns IObservable<IList<T>>, the Window operator returns IObservable<IObservable<T>>.
At first glance it seems a rather odd signature, but I'm about to show how powerful this concept is and how you can take advantage of it.
But before we get into the implementation details, let's look at the differences between the Buffer and Window operators.
The Buffer marble diagram:

The buffer accumulates the items internally until the end of its buffering period and then projects the accumulated values as an IList<T>.
The Window marble diagram:

Unlike Buffer, which caches the items internally, Window does not cache the items at all: each item is pushed immediately (OnNext) to the observers of the current window's IObservable<T>.
Window supports sliding windows and the custom period API, just like the Buffer operator does (read more about that in the Buffer operator post).
A sliding Window shares the same item across multiple windows:

Better memory utilization
Back to our initial goal: the following code uses the Aggregate operator over a Window to calculate the min / max value for each period.
I will use the following aggregation class:
```csharp
using System;

class MinMaxItem
{
    public long? Min { get; private set; }
    public long? Max { get; private set; }

    // Accumulator for Aggregate: folds one value into the running min/max.
    public static MinMaxItem Calc(MinMaxItem instance, long value)
    {
        if (!instance.Min.HasValue)
        {
            // first value in the window
            instance.Min = value;
            instance.Max = value;
        }
        else
        {
            instance.Min = Math.Min(instance.Min.Value, value);
            instance.Max = Math.Max(instance.Max.Value, value);
        }
        return instance;
    }
}
```
The following code demonstrates the plumbing:
```csharp
var xs = Observable.Interval(TimeSpan.FromMilliseconds(0.1));

IObservable<MinMaxItem> minMax =
    from win in xs.Window(10)          // IObservable<IObservable<long>>
    from item in win.Aggregate(        // one MinMaxItem per window
        new MinMaxItem(),
        MinMaxItem.Calc)
    select item;

minMax.Subscribe(item => Console.WriteLine(
    "Min = {0}\tMax = {1}",
    item.Min.Value, item.Max.Value));
```
The above snippet uses the from keyword twice (lines 4 and 5), which the compiler translates into the SelectMany operator. It is used to extract the inner IObservable<T> out of the IObservable<IObservable<T>>.
Another alternative is the Switch operator, which also extracts the inner IObservable<T> out of an IObservable<IObservable<T>>.
In this case the code looks like:
```csharp
IObservable<IObservable<MinMaxItem>> minMax =
    from win in xs.Window(10)
    select win.Aggregate(
        new MinMaxItem(),
        MinMaxItem.Calc);

minMax.Switch().Subscribe(item => Console.WriteLine(
    "Min = {0}\tMax = {1}",
    item.Min.Value, item.Max.Value));
```
Notice the Switch operator at line 7.
Summary
I have shown the Window operator and its capabilities.
Because IObservable<IObservable<T>> is quite a complex API, Window usually comes together with other operators such as Aggregate, SelectMany, or other operators I will discuss in later posts.
The Switch operator is handy when you want to flatten the Window output: it forwards only the most recent inner observable and unsubscribes from the previous one. That works here because each window's Aggregate result is produced when the window closes, before the next window is emitted; with overlapping windows Switch would drop results.
The alternative to Switch is the SelectMany operator, which handles each of the Window streams separately (it can be used directly or through the nested from syntax).
You can read more about Window's memory benefits in James Miles's post about using Rx Window in a stock exchange scenario.
Also be aware of some parallelism issues involving Window, Aggregate and SelectMany, which you can read about in this thread on the Rx forum; it discusses some of the pitfalls, API requests and alternative APIs for the window aggregation scenario.
Finally, you should use the operator that is most appropriate for your scenario: for example, the Buffer operator is great for batching IO operations, and the Window operator is great for reducing memory pressure during aggregation.
The concept of async / await
In this post I will survey the new .NET 4.5 / C# 5 concept of async / await.

I will focus on understanding what really happens behind the new async / await syntax.
What's it all about?
The new async / await syntax lets the C# compiler generate an asynchronous operation from code that looks very much like synchronous code.
But before we start, we should discuss the new C# 5 syntax.
The syntax includes 2 keywords:
- async: a marker for an async method (it enables the await keyword inside it).
- await: indicates a continuation (callback) boundary.
```csharp
static async Task Execute()
{
    Console.WriteLine("run on calling thread");

    await Task.Factory.StartNew(() => Thread.Sleep(1000));

    Console.WriteLine("run on callback thread");
}
```
So how should we understand the above code?
It is actually a different way to express a continuation (you can read more about the continuation concept here).
The above code is roughly equivalent to the following TPL 4 code:
```csharp
static Task Execute()
{
    Console.WriteLine("run on calling thread");
    Task t = Task.Factory.StartNew(() => Thread.Sleep(1000));
    return t.ContinueWith(tsk =>
    {
        Console.WriteLine("run on callback thread");
    });
}
```
The compiler translates the code after the await keyword into a continuation state machine, which is logically (not technically) equivalent to the above code.
Point of interest:
You may have noticed that the async method returns a Task even though there is no return statement in the method body.
Looking at the TPL 4 snippet, we can see that the async method actually returns to the caller as soon as Task.Factory.StartNew has started the task, and the rest of the code runs as a continuation callback.
What we get back from the async method is a task that represents the asynchronous part of the method.
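The claim that an async method hands control back to its caller at the first await (on a task that is not yet complete) can be checked with a small sketch. It uses a `TaskCompletionSource<object>` as a gate so the caller decides exactly when the awaited task finishes; `AwaitOrderSketch`, `RunAsync` and `Demo` are hypothetical names, and the sketch assumes a console-style caller with no SynchronizationContext.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AwaitOrderSketch
{
    // Runs synchronously on the caller's thread up to the await;
    // the rest of the method is a continuation that runs once 'gate' completes.
    static async Task RunAsync(Task gate, List<string> log)
    {
        log.Add("before await");
        await gate;
        log.Add("after await");
    }

    public static List<string> Demo()
    {
        var log = new List<string>();
        var gate = new TaskCompletionSource<object>();

        Task t = RunAsync(gate.Task, log); // returns at the await
        log.Add("caller resumed");         // the caller continues immediately

        gate.SetResult(null);              // completing the gate releases the continuation
        t.Wait();                          // wait for the asynchronous part to finish
        return log;
    }
}
```

The log ends up as "before await", "caller resumed", "after await", which is the same ordering the ContinueWith version would produce.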
async / await with return value
async / await can also express a continuation that receives an asynchronous result.
```csharp
static async Task<DateTime> Execute()
{
    DateTime result = await Task.Factory.StartNew(() => DateTime.Now);

    return result.AddDays(1);
}
```
The above code logically translates to:
```csharp
static Task<DateTime> Execute()
{
    Task<DateTime> t = Task.Factory.StartNew(() => DateTime.Now);
    return t.ContinueWith(tsk =>
    {
        return tsk.Result.AddDays(1);
    });
}
```
You may have noticed that the value on the left side of the await was unwrapped (DateTime instead of Task<DateTime>).
Which thread is running?

Normally, when the method isn't invoked from the UI thread, everything before the await line runs synchronously on the caller's thread.
The task started by Task.Factory.StartNew (or Task.Run) is naturally scheduled on a thread-pool thread, and everything after the await runs on a thread other than the caller's: it may be the same thread that ran the task or any other ThreadPool thread (when there is only a single continuation it will probably be the same thread). The exception is an awaited task that has already completed, in which case the method simply continues synchronously on the caller's thread.
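A sketch that records thread information at each stage, assuming it is called from a thread with no SynchronizationContext (for example a console application's main thread). `ThreadSketch` and `Report` are hypothetical names. It only relies on what is guaranteed: the code before the await runs on the caller's thread, and the delegate passed to `Task.Run` runs on a thread-pool thread. Where the continuation runs is recorded but not relied upon.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class ThreadSketch
{
    public sealed class Report
    {
        public int CallerThreadId;
        public int BeforeAwaitThreadId;
        public bool WorkOnPoolThread;
        public bool AfterAwaitOnPoolThread;
    }

    static async Task ExecuteAsync(Report r)
    {
        // Runs synchronously on the calling thread.
        r.BeforeAwaitThreadId = Thread.CurrentThread.ManagedThreadId;

        await Task.Run(() =>
        {
            // Task.Run always uses the default (thread pool) scheduler.
            r.WorkOnPoolThread = Thread.CurrentThread.IsThreadPoolThread;
        });

        // Usually true (continuation on a pool thread), but if the task had
        // already finished when the await was reached, execution continues
        // synchronously on the caller's thread instead.
        r.AfterAwaitOnPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    }

    public static Report Run()
    {
        var r = new Report { CallerThreadId = Thread.CurrentThread.ManagedThreadId };
        ExecuteAsync(r).Wait();
        return r;
    }
}
```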
Async and UI
Whenever the async method is invoked from the UI thread (or, to be more precise, from a thread that has a SynchronizationContext), the continuation returns to the synchronization context's thread.
This is quite similar to the following TPL code (.NET 4):
```csharp
TaskScheduler scheduler = TaskScheduler.FromCurrentSynchronizationContext();
Task t = Task.Factory.StartNew(() => Trace.WriteLine("in parallel"));
t.ContinueWith(tsk => Trace.WriteLine("UI thread"), scheduler);
```
or to more legacy code that uses the synchronization context directly:
```csharp
Action a = () => Trace.WriteLine("in parallel");
SynchronizationContext sc = SynchronizationContext.Current;
// delegate BeginInvoke is .NET Framework only (not supported on .NET Core)
a.BeginInvoke(ar =>
{
    sc.Post(state => Trace.WriteLine("UI thread"), null);
}, null);
```
async / await is aware of the caller's synchronization context and, if there is one, schedules the code after the await on that context.
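To see the continuation being handed to the captured context, here is a sketch with a hypothetical `CountingContext`: a `SynchronizationContext` subclass that counts `Post` calls and then defers to the base implementation, which queues the callback to the thread pool. A real UI context would marshal the callback to the UI thread instead; the counting is only there to make the capture observable.

```csharp
using System.Threading;
using System.Threading.Tasks;

public sealed class CountingContext : SynchronizationContext
{
    private int _posts;
    public int Posts => Volatile.Read(ref _posts);

    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref _posts);
        base.Post(d, state); // the base implementation queues to the thread pool
    }
}

public static class ContextSketch
{
    static async Task WorkAsync()
    {
        // SynchronizationContext.Current is captured at this await...
        await Task.Delay(50);
        // ...and the rest of the method is resumed through CountingContext.Post.
    }

    public static int CountPosts()
    {
        var previous = SynchronizationContext.Current;
        var ctx = new CountingContext();
        SynchronizationContext.SetSynchronizationContext(ctx);
        Task t;
        try
        {
            t = WorkAsync(); // the synchronous part runs with ctx installed
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
        t.Wait(); // safe: base.Post does not need this thread to be free
        return ctx.Posts;
    }
}
```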

Summary
The compiler translates the async / await syntax into a state machine that handles the continuation flow after the asynchronous operation.
There is much more to it, and I will discuss it in future posts.
You can see its performance characteristics in this post.
TPL - Continuation
This post discusses TPL continuations.

TPL continuations can chain tasks into a pipeline.
When dealing with dependencies between parallel work units, like [encoding -> compression -> encryption], continuation is the API for scheduling a work unit upon the completion of another work unit.
The general idea is quite similar to the callback of the old APM pattern (BeginXxx, EndXxx).
basic completion
The continuation syntax:
```csharp
Task tsk = Task.Factory.StartNew(() => {/* do something */});
tsk.ContinueWith(t => {/* continue when something completes */});
```
The continuation API is fairly straightforward:
we can define a continuation action that runs upon task completion (the t in the lambda represents the completed task).
completion of Task<T>
Continuations also support Task<T>, so we can handle a task's result upon its completion.
The following sample shows the concept of an [encoding -> encryption -> send] pipeline:
```csharp
Task<byte[]> tskEncoding = Task.Factory.StartNew(() =>
{
    return Encoding.UTF8.GetBytes(data);
});
Task<byte[]> tskEncrypt = tskEncoding.ContinueWith(t =>
{
    return Encrypt(t.Result);
});
Task tskSend = tskEncrypt.ContinueWith(t => // chain from the encryption task
{
    Send(t.Result);
});
```
The first task (lines 1-4) returns the encoded data (asynchronously).
The first continuation task (lines 5-8) gets the encoded data from the result of the completed task and encrypts it; the encrypted data is returned as the task's result.
The second continuation (lines 9-12) is scheduled upon the completion of the encryption task.
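A self-contained version of the pipeline, assuming hypothetical `Encrypt` and `Send` helpers: the XOR "encryption" below is a placeholder, not real cryptography, and `Send` just records the bytes in a queue. The important detail is that the send step continues from `tskEncrypt`, so it receives the encrypted bytes rather than the encoded ones.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Hypothetical stand-ins for real encryption and transport.
    static byte[] Encrypt(byte[] input) => input.Select(b => (byte)(b ^ 0x5A)).ToArray();
    public static readonly ConcurrentQueue<byte[]> Sent = new ConcurrentQueue<byte[]>();
    static void Send(byte[] payload) => Sent.Enqueue(payload);

    public static Task Run(string data)
    {
        Task<byte[]> tskEncoding = Task.Factory.StartNew(() => Encoding.UTF8.GetBytes(data));
        Task<byte[]> tskEncrypt = tskEncoding.ContinueWith(t => Encrypt(t.Result));
        // Chain from the encryption task, not the encoding task.
        Task tskSend = tskEncrypt.ContinueWith(t => Send(t.Result));
        return tskSend;
    }
}
```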
multiple completions
The continuation API is not limited to a single continuation per task. A continuation represents a callback, and we can register as many callbacks as we need on any Task (or Task<T>).
```csharp
Task tsk = Task.Factory.StartNew(() => {/* do something */});
tsk.ContinueWith(t => {/* callback 1 */});
tsk.ContinueWith(t => {/* callback 2 */});
```
Continue when all/any
We can also set a continuation that is triggered upon the completion of multiple tasks.
```csharp
Task tsk1 = Task.Factory.StartNew(() => {/* do something */});
Task tsk2 = Task.Factory.StartNew(() => {/* do something else */});
Task[] tsks = new Task[] { tsk1, tsk2 };
Task.Factory.ContinueWhenAll(tsks, tskArr => {/* callback 1 */});
```
Or continue upon the completion of the first among multiple tasks.
```csharp
Task tsk1 = Task.Factory.StartNew(() => {/* do something */});
Task tsk2 = Task.Factory.StartNew(() => {/* do something else */});
Task[] tsks = new Task[] { tsk1, tsk2 };
Task.Factory.ContinueWhenAny(tsks, firstTask => {/* callback 1 */});
```
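When the antecedents return values, the `ContinueWhenAll` callback receives the completed array and can combine their results, while `ContinueWhenAny` receives the first task to finish. A sketch using the generic overloads; `WhenAllSketch` and the squaring work are hypothetical.

```csharp
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    public static Task<int> SumOfSquaresAsync(int[] inputs)
    {
        // One task per input; each computes a square independently.
        Task<int>[] tasks = inputs
            .Select(n => Task.Factory.StartNew(() => n * n))
            .ToArray();

        // Runs once every task in the array has completed.
        return Task.Factory.ContinueWhenAll(tasks,
            completed => completed.Sum(t => t.Result));
    }

    public static Task<int> FirstResultAsync(Task<int>[] tasks)
    {
        // Runs as soon as any one of the tasks completes.
        return Task.Factory.ContinueWhenAny(tasks, first => first.Result);
    }
}
```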
Parent / Child
As you may know, the TPL supports a parent / child execution model.
When you start a task within the scope of an executing task, you can set the new task to follow the parent / child paradigm.
The TPL infrastructure is aware of a task's attached children and behaves accordingly: waiting on the parent waits for the completion of all its children, exceptions thrown by the children propagate to the parent, and the Parallel Tasks debugger window can display the task hierarchy. (Cancellation, however, is cooperative: children are affected only if they observe the same CancellationToken.)
```csharp
Task.Factory.StartNew(() =>
{
    Task.Factory.StartNew(() =>
    {
        // ...
    }, TaskCreationOptions.AttachedToParent);
    // ...
});
```
Parent / Child and continuation
When it comes to continuations, the continuation callback runs only after the completion of all the task's attached children.
```csharp
var t = Task.Factory.StartNew(() =>
{
    Task t1 = Task.Factory.StartNew(() =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("child1");
    }, TaskCreationOptions.AttachedToParent);
});
t.ContinueWith(tsk => Console.WriteLine("Complete !!!"));
```
The above code demonstrates a simple continuation on a parent / child task.
The continuation runs only after t1 completes.
Parent / child with nested continuation
Let's take another scenario, where both the parent and the child task have a continuation.
```csharp
var t = Task.Factory.StartNew(() =>
{
    Task t1 = Task.Factory.StartNew(() =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("child1");
    }, TaskCreationOptions.AttachedToParent);
    t1.ContinueWith(tsk =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("child continuation");
    });
});

t.ContinueWith(tsk => Console.WriteLine("parent continuation"));
```
Let's think about the above code: will the parent continuation run before or after the child continuation?
The answer: the parent continuation ignores the child continuation and (given the one-second sleep in the child continuation) runs first.
The parent becomes aware of the child continuation only if we mark the child continuation with TaskContinuationOptions.AttachedToParent.
```csharp
var t = Task.Factory.StartNew(() =>
{
    Task t1 = Task.Factory.StartNew(() =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("child1");
    }, TaskCreationOptions.AttachedToParent);
    t1.ContinueWith(tsk =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("child continuation");
    }, TaskContinuationOptions.AttachedToParent);
});

t.ContinueWith(tsk => Console.WriteLine("parent continuation"));
```
Now the parent continuation runs only after the child's continuation has completed.
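A variant of the last example that records events in a queue instead of printing them, so the ordering claim can be checked. `AttachedSketch` is a hypothetical name. It deliberately uses `Task.Factory.StartNew`: `Task.Run` creates tasks with `DenyChildAttach`, so children would not attach to it.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class AttachedSketch
{
    public static string[] Run()
    {
        var log = new ConcurrentQueue<string>();

        var parent = Task.Factory.StartNew(() =>
        {
            Task child = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(100);
                log.Enqueue("child");
            }, TaskCreationOptions.AttachedToParent);

            // Attached: the parent cannot complete until this continuation has run.
            child.ContinueWith(_ =>
            {
                Thread.Sleep(100);
                log.Enqueue("child continuation");
            }, TaskContinuationOptions.AttachedToParent);
        });

        parent.ContinueWith(_ => log.Enqueue("parent continuation")).Wait();
        return log.ToArray();
    }
}
```

Removing `TaskContinuationOptions.AttachedToParent` from the child continuation makes "parent continuation" show up before "child continuation", matching the first version of the example.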
Conditional continuation
So far we have seen many continuation scenarios. The last one I want to present is the cool ability to make a continuation run only when the antecedent task ends in a specific state.
You can set the continuation to run only on success, failure or cancellation.
```csharp
var cancellation = new CancellationTokenSource();
Task t = Task.Factory.StartNew(() =>
{
    if (Environment.TickCount % 2 == 0)
        throw new Exception();
    else
        Console.WriteLine("pass");
}, cancellation.Token);

t.ContinueWith(tsk => Console.WriteLine("OK"),
    TaskContinuationOptions.OnlyOnRanToCompletion);
t.ContinueWith(tsk => Console.WriteLine("Cancelled"),
    TaskContinuationOptions.OnlyOnCanceled);
t.ContinueWith(tsk => Console.WriteLine("Failed"),
    TaskContinuationOptions.OnlyOnFaulted);
```
TaskContinuationOptions is a flags enumeration, so you can combine multiple options (with a bitwise OR) on a single continuation.
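A deterministic variant (the antecedent always throws, instead of depending on `Environment.TickCount`) that also shows what happens to a continuation whose condition is not met: it is canceled rather than run. The flag combination `OnlyOnFaulted | ExecuteSynchronously` illustrates the bitwise options; combining two `OnlyOn*` values would exclude every outcome, and ContinueWith rejects that with an `ArgumentOutOfRangeException`. `ConditionalSketch` is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;

public static class ConditionalSketch
{
    public static (TaskStatus Ok, TaskStatus Failed, string Error) Run()
    {
        Action work = () => throw new InvalidOperationException("boom"); // always faults
        Task t = Task.Factory.StartNew(work);

        Task ok = t.ContinueWith(tsk => { },
            TaskContinuationOptions.OnlyOnRanToCompletion);

        string error = null;
        Task failed = t.ContinueWith(tsk =>
        {
            // tsk.Exception is an AggregateException wrapping the original exception.
            error = tsk.Exception.InnerException.Message;
        }, TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously);

        failed.Wait();
        try { ok.Wait(); } catch (AggregateException) { /* the skipped continuation is canceled */ }
        return (ok.Status, failed.Status, error);
    }
}
```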
Summary
Continuation is one of the most powerful features of the new TPL infrastructure.
It has more features and is simpler to use than the old APM pattern.
Using the continuation pattern we can manage complex parallelism with respect to dependencies.
Finally, as I will describe in a later post, the new async feature (of .NET 4.5 / C# 5) is all about the continuation concept.
Rx - SP1
Rx has released its first service pack.

The Service Pack release doesn't include any new API-level functionality and fixes a few minor bugs (all of which were already fixed in the Experimental Releases in the v1.1 band):
- Scheduler.TaskPool now guarantees the use of the task pool. See this forum post for more info.
- SkipUntil now propagates errors of the source sequence, even when the "until" sequence hasn't fired yet.
- ToQbservable now accepts an IScheduler parameter, mirroring its ToObservable brother.
- Take(0) is now supported, resulting in an overload that accepts an IScheduler to produce the OnCompleted message.
In addition to those fixes, this (supported) release includes support for Silverlight 5 and Windows Phone 7.5, so you'll find the Rx assemblies in the Add New Reference dialog for those project types.
When using the MSI installer, you'll notice the installer performs an in-place update of any existing Rx SDK v1.0.10621 installation you may have on your machine. If you don't have the v1.0 SDK installed yet, you can simply use the new MSI to perform a clean install as well.
Assembly version numbers (used by the CLR) continue to be 1.0.10621.0, hence you don't need to recompile applications that use Rx v1.0 but you can simply service the Rx binaries. The file version number (used by installers to upgrade files) of the assemblies has bumped to 1.0.11221.5 (reflecting the build date, i.e. December 21st, precisely six months after the initial release). Also, the version number of the MSI package and NuGet packages will reflect the 11221 build number.
Rx - Buffer
This post is one of a series of posts about Rx (Reactive Extensions). In this one I will discuss the Buffer operator.
No doubt one of the most useful Rx operators is Buffer.

The Buffer operator helps reduce throughput pressure and gain better utilization of our resources.
Take a scenario of monitoring a data stream and persisting the data into a database (or sending it across a network boundary).
Assuming the data rate is one item per millisecond, databases typically aren't designed to work well with round-trips at that frequency,
but if we buffer a chunk of data every second (or longer), we can save those chunks at a much lower frequency (maybe by using bulk insert).
This is how we can gain better utilization of our system.
This is exactly what the Buffer operator does.
It can create chunks of data from an observable either by time or by count (or even by a combination of both).
It presents those chunks as an observable of IList<T>, which means that a high-frequency IObservable<T> can be transformed into a lower-frequency IObservable<IList<T>>.
The Buffer operator is very simple to use and very flexible in terms of batch size.
Buffer API
The following example demonstrates reducing the pressure of an observable with a 1 millisecond frequency:
```csharp
var xs = Observable.Interval(TimeSpan.FromMilliseconds(1));
var bufferedStream = xs.Buffer(TimeSpan.FromSeconds(1));
bufferedStream.Subscribe(item => {/* do bulk insert */}); // item is an IList<long>
```
The same can be done by buffering every n items:
```csharp
var xs = Observable.Interval(TimeSpan.FromMilliseconds(1));
var bufferedStream = xs.Buffer(1000);
bufferedStream.Subscribe(item => {/* do bulk insert */});
```
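Rx's Buffer works on push-based streams, but the count-based idea is easy to see in a pull-based analogue. This is a sketch over `IEnumerable<T>`, not Rx's implementation, and `BufferSketch.BufferByCount` is a hypothetical name: it emits a list every time `count` items have arrived, plus a final shorter list if the source ends mid-chunk.

```csharp
using System;
using System.Collections.Generic;

public static class BufferSketch
{
    // Groups a sequence into lists of 'count' items; a trailing partial
    // chunk is emitted when the source ends.
    public static IEnumerable<IList<T>> BufferByCount<T>(IEnumerable<T> source, int count)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));
        var chunk = new List<T>(count);
        foreach (T item in source)
        {
            chunk.Add(item);
            if (chunk.Count == count)
            {
                yield return chunk;
                chunk = new List<T>(count); // start a fresh chunk
            }
        }
        if (chunk.Count > 0)
            yield return chunk;
    }
}
```

Each downstream consumer sees one call per chunk instead of one per item, which is the same trade the bulk-insert example makes.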
There is even an API for a combination of both, which means that the buffer is closed either after n items or after a duration has elapsed, whichever comes first:
```csharp
var xs = Observable.Interval(TimeSpan.FromMilliseconds(1));
var bufferedStream = xs.Buffer(TimeSpan.FromSeconds(1), 1000);
bufferedStream.Subscribe(item => {/* do bulk insert */});
```
Buffers can also overlap, which means that more than one buffer can be open at a time (an item is captured by multiple buffers):
```csharp
var xs = Observable.Interval(TimeSpan.FromMilliseconds(1));
// a new 1 second buffer is opened every 0.1 second
var bufferedStream = xs.Buffer(TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(0.1));
bufferedStream.Subscribe(item => {/* do bulk insert */});
```
Its marble diagram is:

You can see that each buffer holds 4 items and the same item can be included in multiple buffers (IList<T>).
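Overlap is easiest to see with counts instead of time. Here is a pull-based sketch (hypothetical `SlidingBuffer`, not Rx's code) that opens a new buffer every `skip` items and closes each one after `count` items, so with `skip < count` each item lands in several buffers. Rx offers a comparable `Buffer(count, skip)` overload for observables; whether it flushes partial buffers exactly like this sketch does at the end is not something the sketch tries to mirror.

```csharp
using System;
using System.Collections.Generic;

public static class SlidingBufferSketch
{
    public static IEnumerable<IList<T>> SlidingBuffer<T>(IEnumerable<T> source, int count, int skip)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));
        if (skip <= 0) throw new ArgumentOutOfRangeException(nameof(skip));

        var open = new Queue<List<T>>(); // buffers currently collecting, oldest first
        int index = 0;
        foreach (T item in source)
        {
            if (index % skip == 0)
                open.Enqueue(new List<T>(count)); // open a new (possibly overlapping) buffer
            foreach (List<T> buffer in open)
                buffer.Add(item);                 // the same item goes into every open buffer
            if (open.Count > 0 && open.Peek().Count == count)
                yield return open.Dequeue();      // the oldest buffer is full: close it
            index++;
        }
        while (open.Count > 0)
            yield return open.Dequeue();          // flush partial buffers when the source ends
    }
}
```

With `skip > count` the same code produces gaps between buffers instead of overlap.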
Advanced Buffer API
Actually, you can gain even finer control over the opening and closing of buffering windows.
This Buffer overload accepts an observable that triggers the opening of a buffer, and a corresponding factory that returns an observable signaling when that buffer closes.
Even though this API looks a bit odd, it is very powerful.
With this API you can start buffering in response to external events. Think of a stream that should buffer a car's engine performance data per geographical region: we can start buffering each time the car's GPS indicates a region border and close the buffer on exiting the region.
Real-life scenarios often need this kind of granularity.
The API for this feature looks like this:
```csharp
var regionBorderStream = Observable.Create<Unit>(obs =>
{
    // read and analyze GPS data
    return Disposable.Empty;
});
var carEngineStream = Observable.Interval(TimeSpan.FromMilliseconds(1));
var bufferedStream = carEngineStream.Buffer(regionBorderStream,
    region => regionBorderStream.Where(item => item == region));
// simplified: Unit values always compare equal; a real stream would carry a region id
bufferedStream.Subscribe(item => {/* do bulk insert */});
```
regionBorderStream (line 1) represents a stream of region-crossing notifications.
carEngineStream (line 6) represents a car engine information stream.
The buffered stream (line 7) buffers the engine stream, opening a buffer each time the car enters a region, that is, whenever regionBorderStream produces a value. The buffer closes when we exit the region.
You may have noticed that the above code supports multiple buffers at a time: you may enter a city region and then a sub-region (for example New York and Central Park, or Beijing and the Forbidden City area).
Summary
Buffering is a very powerful technique.
I will survey other operators in future posts, and we will discuss the advantages and disadvantages of the Buffer operator compared with some of the other operators.
Tpl Dataflow (IDataflowBlock) - Part 5
The previous post discussed the concepts of ITargetBlock and ISourceBlock,
which are the TDF consumer / producer contracts.
You can find all the posts in this series under the TDF tag.
This post focuses on the IDataflowBlock contract, which is the lifetime management contract for all dataflow blocks.

IDataflowBlock defines a single property and 2 methods:
```csharp
public interface IDataflowBlock
{
    Task Completion { get; }

    void Complete();
    void Fault(Exception exception);
}
```
Ending the processing of a dataflow block is done either by calling Complete() or, when the dataflow should exit into a faulted state, by calling Fault().
On Complete, the block finishes processing all messages already in its inner buffer and declines (DecliningPermanently) any incoming messages.
Fault puts the block into a faulted state, and it does not process any more messages from its inner buffer.
The Completion property uses the TAP (Task-based Asynchronous Pattern) concept as its API for monitoring the block's state.
The Completion property returns a Task, which you can wait on (Wait), continue from (ContinueWith) or await.
The block completes this task when it has finished processing all the messages in its inner buffer (or, in a faulted state, when it finishes the currently executing messages).
Because the block uses task semantics, it is easy to handle block exceptions.
The following code demonstrates managing an ActionBlock's lifetime.
```csharp
var ab = new ActionBlock<int>(i =>
{
    Thread.Sleep(1000);
    Console.WriteLine(i);
});

ab.Completion.ContinueWith(t =>
{
    if (t.Status == TaskStatus.Faulted)
        Console.WriteLine("Failure: {0}", t.Exception);
    else
        Console.WriteLine("Complete");
});

Console.WriteLine("Processing");

ab.Post(1);
ab.Post(2);
ab.Post(3);

// press 'f' to fault the block, any other key to complete it
if (Console.ReadKey().KeyChar == 'f')
    // Fault is implemented explicitly on IDataflowBlock, hence the cast
    (ab as IDataflowBlock).Fault(new Exception("wrong tick"));
else
    ab.Complete();
```
At line 7 we set a continuation callback that checks the completion status and writes the completion information.
Point of interest
As we discussed in the previous post, dataflow blocks have an internal task that is responsible for their execution (by default one task per block, but this can be configured to use multiple tasks). This task is released whenever the block becomes idle, and when the block becomes active again it creates a new task.
It is important to understand that the completion task is a semantic task and not one of the worker tasks.
If you want to create your own custom block, you can use TaskCompletionSource<T> to expose the TAP (Task-based Asynchronous Pattern) semantics.
You can learn more about the TAP concept here.
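Here is a minimal sketch of that TaskCompletionSource idea, written without the Dataflow library at all. The hypothetical `MiniActionBlock<T>` mirrors the behavior described in these posts: one internal buffer, a worker task created on demand and released when the buffer is empty, `Complete()` draining buffered messages, `Fault()` dropping them, and a `Completion` task driven by a `TaskCompletionSource<object>` rather than by any worker task. It is an illustration, not how the real ActionBlock<T> is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, simplified block for illustration only.
public sealed class MiniActionBlock<T>
{
    private readonly Action<T> _action;
    private readonly Queue<T> _buffer = new Queue<T>();   // internal input buffer
    private readonly object _gate = new object();
    private readonly TaskCompletionSource<object> _completion =
        new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
    private bool _workerActive;   // is a worker task currently draining the buffer?
    private bool _declining;      // set by Complete() or Fault()
    private Exception _fault;

    public MiniActionBlock(Action<T> action) =>
        _action = action ?? throw new ArgumentNullException(nameof(action));

    // A "semantic" task: it tracks the block's lifetime, not any worker task.
    public Task Completion => _completion.Task;

    public bool Post(T item)
    {
        lock (_gate)
        {
            if (_declining) return false;            // decline messages after Complete/Fault
            _buffer.Enqueue(item);
            if (!_workerActive)
            {
                _workerActive = true;
                Task.Run(() => Drain());             // create a worker only when needed
            }
            return true;
        }
    }

    public void Complete()
    {
        lock (_gate) { _declining = true; FinishIfIdle(); }
    }

    public void Fault(Exception exception)
    {
        lock (_gate)
        {
            _declining = true;
            _fault = exception;
            _buffer.Clear();                         // buffered messages are dropped
            FinishIfIdle();
        }
    }

    private void Drain()
    {
        while (true)
        {
            T item;
            lock (_gate)
            {
                if (_buffer.Count == 0)
                {
                    _workerActive = false;           // release the worker when idle
                    FinishIfIdle();
                    return;
                }
                item = _buffer.Dequeue();
            }
            try { _action(item); }
            catch (Exception ex) { Fault(ex); }      // a throwing delegate faults the block
        }
    }

    // Completes the semantic task once declining and no worker is running.
    // Must be called while holding _gate.
    private void FinishIfIdle()
    {
        if (!_declining || _workerActive) return;
        if (_fault != null) _completion.TrySetException(_fault);
        else _completion.TrySetResult(null);
    }
}
```

Note that the Completion task never executes any message; it only reports on the workers, which is the distinction the post draws between the semantic task and the worker tasks.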
Summary
Every dataflow block implements the IDataflowBlock contract, which enables controlling and monitoring the block's lifetime.
Task != Thread
Whenever I teach the TPL Task subject, I keep repeating the mantra: "a task is metadata / context of execution, and it is not really responsible for the actual execution".
Task is a data structure that holds information about code execution: the delegate to execute, status, state, result, exception, synchronization object, etc.
But the responsibility for execution actually belongs to the task scheduler.
As a matter of fact, a task can even be executed synchronously.
```csharp
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
Task t = new Task(() =>
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
t.RunSynchronously(); // runs inline, so typically the same id is printed twice
```
By default it executes on a thread-pool worker thread, but it can execute on a non-thread-pool thread by using the TaskCreationOptions.LongRunning hint, which the default scheduler honors by creating a dedicated thread.
```csharp
Task.Factory.StartNew(() =>
    Console.WriteLine(Thread.CurrentThread.IsThreadPoolThread)); // True
Task.Factory.StartNew(() =>
    Console.WriteLine(Thread.CurrentThread.IsThreadPoolThread),  // False
    TaskCreationOptions.LongRunning);
```
Tasks can also represent IO operations that complete through IOCP (IO Completion Ports), which avoids thread-pool starvation while the IO is in flight.
IO operations are not CPU-bound: they are performed by the hard disk controller, the network card, etc.
We can apply task semantics to an IO operation that uses the APM pattern by using Task.Factory.FromAsync:
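```csharp
// hypothetical input file; useAsync: true requests overlapped (IOCP-based) IO
var file = new FileStream("data.bin", FileMode.Open, FileAccess.Read,
                          FileShare.Read, 4096, useAsync: true);
var buffer = new byte[4096];

// Task<int> returns the number of bytes read
Task<int> taskAsyncRead = Task<int>.Factory.FromAsync(
    file.BeginRead, // begin invoke delegate
    file.EndRead,   // end invoke delegate
    buffer,         // read buffer
    0,              // start index
    buffer.Length,  // length
    null);          // optional state
```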
This leads us to the most general abstraction of the task semantics, TaskCompletionSource<T>.
TaskCompletionSource<T> is a class that helps us apply TAP (Task-based Asynchronous Pattern) semantics.
In general, we can use this class to apply TAP semantics to any operation.
For example, the following code applies TAP semantics to a WebClient download:
```csharp
private static Task<string> DownloadAsync(string address)
{
    var uri = new Uri(address);

    // present TAP semantics
    var tcs = new TaskCompletionSource<string>();

    var proxy = new WebClient();
    // handling the download async result
    proxy.DownloadStringCompleted += (s, e) =>
    {
        if (e.Cancelled)
            tcs.SetCanceled();
        else if (e.Error != null)
            tcs.SetException(e.Error);
        else
            tcs.SetResult(e.Result);

        proxy.Dispose();
    };
    // start downloading asynchronously
    proxy.DownloadStringAsync(uri);

    return tcs.Task; // does not wait for completion
}
```
TaskCompletionSource<T> has the following methods:
- SetCanceled: applies cancellation semantics.
- SetException: applies fault semantics.
- SetResult: applies completion semantics.
It also has a Task property, which encapsulates the TAP semantics and can be handed back to the caller.
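The same three methods can wrap any callback-based API. A minimal sketch, assuming a hypothetical `RunOnPoolAsync` helper that gives a plain `ThreadPool.QueueUserWorkItem` callback (the pre-TPL way of running work) TAP semantics: cancellation is checked before the work runs, exceptions become a faulted task, and a normal return becomes the task's result.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TcsSketch
{
    public static Task<TResult> RunOnPoolAsync<TResult>(Func<TResult> work, CancellationToken token)
    {
        var tcs = new TaskCompletionSource<TResult>();
        ThreadPool.QueueUserWorkItem(_ =>
        {
            if (token.IsCancellationRequested)
            {
                tcs.SetCanceled();                      // cancellation semantics
                return;
            }
            try { tcs.SetResult(work()); }              // completion semantics
            catch (Exception ex) { tcs.SetException(ex); } // fault semantics
        });
        return tcs.Task; // handed back before the work has run
    }
}
```

The caller only ever sees a Task, so it can Wait, ContinueWith or await it exactly like any other task.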
IOCP
Because WebClient's async operation uses IOCP, wrapping it with TAP semantics gives us a task that runs over IOCP (no thread is blocked while the download is in flight).
Can we take it further?
Suppose we want a Task that is triggered by a FileSystemWatcher. It is actually very feasible and easy with TaskCompletionSource<T>.
The following extension method returns a task that completes when the user deletes a text file.
```csharp
public static Task<string> ToTask(
    this FileSystemWatcher instance, Action action)
{
    var tcs = new TaskCompletionSource<string>();
    instance.Filter = "*.txt";
    instance.Deleted += (s, e) =>
    {
        if (tcs.Task.IsCompleted) return; // only the first deletion completes the task
        action();
        tcs.TrySetResult(e.FullPath);     // Try*: events may race on different threads
    };
    instance.EnableRaisingEvents = true;  // start watching once the handler is attached

    return tcs.Task;
}
```
Then it can be called from anywhere:
```csharp
static void Main(string[] args)
{
    var fsw = new FileSystemWatcher(".");
    Task<string> t = fsw.ToTask(() =>
        Console.WriteLine("Executing"));
    Task done = t.ContinueWith(tsk =>
    {
        Console.WriteLine();
        Console.WriteLine(tsk.Result);
    });

    // print progress dots until the continuation has finished
    while (!done.IsCompleted)
    {
        Console.Write(".");
        Thread.Sleep(100);
    }
}
```
Summary
Task != Thread: by default a Task is scheduled on a thread-pool thread, but that is the responsibility of the task scheduler.
The TAP abstraction lets us apply its semantics to almost any operation.
In a future post I will show how to apply the FileSystemWatcher to the async / await pattern.
Tpl Dataflow (block structure) - Part 4
This is the 4th post in the Tpl Dataflow series; you can find the other posts in this series under the TDF tag.
This post discusses the general implementation of the built-in Tpl Dataflow blocks.
In general, built-in blocks fall into 3 categories:
- Pure buffering blocks: present different strategies of buffering and distribution.
- Execution blocks: manipulate the incoming messages.
- Grouping blocks: deal with different strategies for combining messages from multiple sources.
As a test case I will discuss the most basic target block, ActionBlock<T>.
The action block is an execution block that executes a delegate to perform some action on the data of each input message.
The following code demonstrates a basic action block:
```csharp
var ab = new ActionBlock<int>(i =>
{
    /* Do some processing */
    EventLog.WriteEntry("App", i.ToString());
});

ab.Post(1);
ab.Post(2);
ab.Post(3);
```
All the built-in blocks are constructed from one or more internal buffers for the input messages and one or more tasks (by default a single task) that process one message at a time.
Internally, the action block is built from a single buffer and a task (by default a single task) that fetches one message at a time and invokes the delegate with the message's data.

It is important to understand that dataflow blocks release the internal task whenever the internal buffer becomes empty; whenever a new message reaches the internal buffer, the block creates a new task.
As I said in previous posts, Tpl Dataflow was designed to deal with immense throughput, and it tries to avoid oversubscription.
Both ideas, buffering instead of creating a task per message and releasing the task whenever it isn't needed anymore, lead to better usage of CPU and memory resources.
Summary
The action block is a very simple one; the focus of this post was to present some of the ideas behind the construction of the built-in blocks in general.
It is essential to understand the idea of a single task (by default) over an internal buffer when you start interacting with the blocks. The behavior of the entire block ecosystem (when we chain blocks into a more complex flow) depends on how each block's buffering and execution were configured.
I will survey different configuration options in later posts.
In the next post I will talk about block lifetime management.
What is the cost of async/await?
.NET 4.5 (C# 5) brings the new async/await syntax.
I will cover the async/await syntax in more detail in a future post. In the meantime, if you are not yet familiar with it, what you need to know is that the compiler transforms that syntax (together with the lines that follow the await keyword) into IL that follows the concept of the TPL ContinueWith.
The compiler actually generates a fair amount of IL, which represents a state machine for the asynchronous execution.
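The following is a hand-written approximation of what a single `await task; continuation();` boils down to. It is a simplification, not the exact code the compiler emits (the real state machine goes through the method builder), and the `AwaitSketch` name is hypothetical. The important detail is the fast path: when the awaited task has already completed, no continuation delegate is registered at all, which is one reason await can be cheaper than an unconditional ContinueWith.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical illustration of the awaiter pattern behind "await task; continuation();".
static class AwaitSketch
{
    // Returns true when the continuation ran synchronously (fast path).
    public static bool AwaitLike(Task task, Action continuation)
    {
        var awaiter = task.GetAwaiter();

        if (awaiter.IsCompleted)
        {
            awaiter.GetResult(); // rethrows if the task faulted
            continuation();      // run inline: nothing is scheduled or allocated for the callback
            return true;
        }

        // Slow path: register a callback that resumes once the task completes.
        awaiter.OnCompleted(() =>
        {
            awaiter.GetResult();
            continuation();
        });
        return false;
    }
}
```

A completed task such as `Task.FromResult(42)` takes the synchronous path, while a task from a not-yet-completed TaskCompletionSource takes the callback path.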

We do gain a lot of benefit from the new async/await syntax (in terms of readability and more), but as I said, I will discuss this aspect in later posts.
The questions this post is trying to answer are:
- How much do we pay for it?
- Is the compiler smart enough to minimize the overhead involved with this feature?
The answer is that not only is the compiler smart enough to mitigate the cost,
it actually does a better job (in most cases) than using ContinueWith directly.
Benchmark:
I have extended Joseph E. Hoag's benchmark, and I found a consistent improvement of async/await over ContinueWith, both in terms of performance and memory allocation.
The benchmarks target both
real task execution:
Code Snippet
- private static Task AsyncMethod()
- {
- return Task.Factory.StartNew(() => { });
- }
and conceptual task execution (a task that is already completed):
Code Snippet
- private static Task ConceptualAsyncMethod()
- {
- var t = new TaskCompletionSource<object>();
- t.SetResult(null);
- return t.Task;
- }
and I tested the following scenarios:
1. ContinueWith/Unwrap on both the real and the conceptual task.
Code Snippet
- private static Task DoContinuations(int ntasks)
- {
- Task curr = AsyncMethod();
- for (int i = 1; i < ntasks; i++)
- curr = curr.ContinueWith(_ => AsyncMethod()).Unwrap();
- return curr;
- }
Code Snippet
- private static Task DoContinuationsConceptual(int ntasks)
- {
- Task curr = ConceptualAsyncMethod();
- for (int i = 1; i < ntasks; i++)
- curr = curr.ContinueWith(_ => ConceptualAsyncMethod()).Unwrap();
- return curr;
- }
2. await on both the real and the conceptual task.
Code Snippet
- private static async Task DoAwaits(int ntasks)
- {
- for (int i = 0; i < ntasks; i++)
- await AsyncMethod();
- }
Code Snippet
- private static async Task DoAwaitsConceptual(int ntasks)
- {
- for (int i = 0; i < ntasks; i++)
- await ConceptualAsyncMethod();
- }
3. Task.FromResult with both ContinueWith and await.
Code Snippet
-
- private static Task DoContinuationsFromResult(int ntasks)
- {
- Task curr = Task.FromResult(42);
- for (int i = 1; i < ntasks; i++)
- curr = curr.ContinueWith(t => Task.FromResult(42));
- return curr;
- }
Code Snippet
- private static async Task DoAwaitsFromResult(int ntasks)
- {
- for (int i = 0; i < ntasks; i++)
- await Task.FromResult(42);
- }
4. To make sure that it isn't the Unwrap call that is causing the overhead, I added the following benchmark:
Code Snippet
- private static Task DoContinuationsConceptualWithoutUnwrap(int ntasks)
- {
- Task curr = ConceptualAsyncMethod();
- for (int i = 1; i < ntasks; i++)
- curr = curr.ContinueWith(t => {/* empty */ });
- return curr;
- }
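If you want to rerun scenarios like the ones above yourself, a minimal timing harness could look like the following. This is a sketch under my own assumptions, not the harness that produced the numbers in this post: it uses Stopwatch for wall-clock time and the gen-0 collection count as a rough proxy for allocation pressure. `BenchmarkHarness` and `Measure` are hypothetical names.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical harness for illustration only.
static class BenchmarkHarness
{
    // Runs one scenario (for example DoAwaits or DoContinuations) and reports
    // elapsed wall-clock time plus the number of gen-0 collections it triggered.
    public static (TimeSpan Elapsed, int Gen0Collections) Measure(Func<int, Task> scenario, int ntasks)
    {
        // Warm up so JIT compilation is not part of the measurement.
        scenario(10).Wait();

        // Start from a clean heap so earlier garbage does not skew the count.
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();

        int gen0Before = GC.CollectionCount(0);
        var stopwatch = Stopwatch.StartNew();
        scenario(ntasks).Wait();
        stopwatch.Stop();

        return (stopwatch.Elapsed, GC.CollectionCount(0) - gen0Before);
    }
}
```

Usage would be along the lines of `BenchmarkHarness.Measure(DoAwaitsConceptual, 100000)` compared with `BenchmarkHarness.Measure(DoContinuationsConceptual, 100000)`, ideally repeated several times per scenario.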
All results came out in favor of the async/await syntax
(everything was running on .NET 4.5).
The execution results for 100000 iterations, on a virtual machine with 2 cores and 2 GB of RAM, were:

Summary
Not only does async/await not add overhead, it actually performs much better in terms of speed and memory allocation.
It seems that we benefit from both simplicity and efficiency.
Is it faster?
Does .NET 4.5 really run faster than .NET 4?

This post summarizes the TPL Performance Improvements in .NET 4.5.
The TPL team has put a lot of effort into dramatically improving the overall TPL performance in .NET 4.5.
The improvements were achieved by optimizing both execution and memory allocation.
As a result, parallelism in .NET 4.5 is faster and more GC-friendly.
Allocation optimizations also improve overall execution speed, because garbage collections have a significant impact on overall performance (suspended threads and more).
The benchmarks in this post were executed against 100000 tasks with 25 iterations.
Continuation:
.NET 4.5 acknowledges the continuation style of programming as a mainline scenario (async/await support), and a lot of effort was put into making continuations faster.
Using Interlocked.CompareExchange, the continuation footprint was reduced for a task that has a single continuation (no continuation list is allocated in this case). Even with more than a single continuation, .NET 4.5 is still faster than .NET 4.
The following benchmark shows single-continuation registration:
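The single-continuation optimization described above can be illustrated with the following simplified sketch. It is not the actual Task implementation; `ContinuationSlot` and its members are hypothetical. The first continuation is installed with a single Interlocked.CompareExchange and stored directly in a field, and a list is allocated only when a second continuation arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified illustration; NOT the real System.Threading.Tasks.Task code.
sealed class ContinuationSlot
{
    private readonly object _gate = new object();

    // Holds null, a single Action, or a List<Action> once a second continuation arrives.
    private object _continuations;

    public void Add(Action continuation)
    {
        if (continuation == null) throw new ArgumentNullException(nameof(continuation));

        // Fast path: the first continuation is stored directly with one atomic
        // compare-and-swap; no list and no lock are needed.
        if (Interlocked.CompareExchange(ref _continuations, continuation, null) == null)
            return;

        // Slow path: a continuation is already registered, so promote to a list.
        lock (_gate)
        {
            if (_continuations is List<Action> list)
                list.Add(continuation);
            else
                _continuations = new List<Action> { (Action)_continuations, continuation };
        }
    }

    public bool UsesList
    {
        get { lock (_gate) return _continuations is List<Action>; }
    }

    // Runs a snapshot of the registered continuations and returns how many ran.
    public int RunAll()
    {
        Action[] toRun;
        lock (_gate)
        {
            if (_continuations is List<Action> list) toRun = list.ToArray();
            else if (_continuations is Action single) toRun = new[] { single };
            else toRun = Array.Empty<Action>();
        }

        foreach (Action action in toRun)
            action();
        return toRun.Length;
    }
}
```

The design choice is that the common case (one continuation) costs one field write and no extra allocation, while the rarer multi-continuation case pays for the list.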
performance (speedup):

memory allocation:

The following benchmark shows multiple-continuation registration:
performance (speedup):

memory allocation:

Restructuring Task:
Task's internals were redesigned to have a minimal memory footprint in common scenarios, while still supporting rarer scenarios.
The following benchmark shows the improvement in Task creation:
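One general way to get a small footprint for the common case while still supporting rare scenarios is to move rarely used fields into a side object that is allocated only on demand. The sketch below assumes that approach for illustration; it is not the actual field layout of Task, and `LeanWorkItem` and `RareFields` are hypothetical names.

```csharp
using System;
using System.Threading;

// Hypothetical illustration of the "rarely used fields in a side object" idea;
// not the actual layout of System.Threading.Tasks.Task.
sealed class LeanWorkItem
{
    // Fields every instance needs stay inline.
    private readonly Action _action;
    private int _state; // 0 = created, 1 = ran to completion, 2 = faulted

    // Rarely needed data lives in a separate object, allocated only on first use.
    private RareFields _rare;

    private sealed class RareFields
    {
        public Exception Error;
    }

    public LeanWorkItem(Action action)
    {
        _action = action ?? throw new ArgumentNullException(nameof(action));
    }

    public bool HasRareFields => Volatile.Read(ref _rare) != null;
    public bool IsFaulted => _state == 2;
    public Exception Error => Volatile.Read(ref _rare)?.Error;

    public void Run()
    {
        try
        {
            _action();
            _state = 1;
        }
        catch (Exception ex)
        {
            // Only a failing item pays for the extra allocation.
            LazyInitializer.EnsureInitialized(ref _rare, () => new RareFields()).Error = ex;
            _state = 2;
        }
    }
}
```

A successful item never allocates the side object; only an item that faults does.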
performance (speedup):

memory allocation:

The following benchmark shows the improvement in Task<T> creation:
performance (speedup):

memory allocation:

TaskCompletionSource
The following benchmark shows the improvement in TaskCompletionSource creation:
performance (speedup):

memory allocation:

Summary
TPL performance in .NET 4.5 was significantly improved, in terms of both speed and allocations.
You can find the full benchmark information and more details about the internals in TPL Performance Improvements in .NET 4.5 by Joseph E. Hoag.
Tpl Dataflow (ISourceBlock) - Part 3
The previous post discussed ITargetBlock,
which is the TDF consumer contract.
This post focuses on the source block, which is the producer contract.
As mentioned in the previous post, sources and targets engage in a protocol for transferring messages between them.

Source Block:
The source block's main responsibility is to produce (or manipulate) data that will be consumed by the target.
As we learned in the previous post, the target may consume the data either directly (push) or indirectly (pull).
A target can be attached to a source using the ISourceBlock<T> LinkTo method:
LinkTo:
LinkTo provides the ability to link one or more targets to a source; this is how we build a Dataflow network.
When a target is linked to a source, the source automatically propagates any data it contains to its targets (asynchronously, according to the source's distribution strategy).
Detaching linked target from a source:
The LinkTo method returns an IDisposable that can be used to detach the target from the source at any time.
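The following sketch shows linking and detaching in practice. It targets the released System.Threading.Tasks.Dataflow NuGet package, whose LinkTo overloads differ from the CTP signature shown later in this post (there, LinkTo takes an `unlinkAfterOne` flag). `LinkDemo` is a hypothetical name. Messages posted while the link exists flow to the target; after the link is disposed, new messages stay buffered in the source.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Hypothetical demo for illustration only.
static class LinkDemo
{
    public static async Task<(int First, int Second, bool SourceKeptThird, bool TargetGotThird)> RunAsync()
    {
        var source = new BufferBlock<int>();
        var target = new BufferBlock<int>();

        // LinkTo returns an IDisposable that represents the link itself.
        IDisposable link = source.LinkTo(target);

        source.Post(1);
        source.Post(2);
        int first = await target.ReceiveAsync();   // propagated asynchronously by the source
        int second = await target.ReceiveAsync();

        // Detach the target: later messages stay buffered in the source.
        link.Dispose();
        source.Post(3);

        bool sourceKeptThird = source.TryReceive(out int third) && third == 3;
        bool targetGotThird = target.TryReceive(out int _);
        return (first, second, sourceKeptThird, targetGotThird);
    }
}
```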
Dataflow network:
In fact, we can change the Dataflow network dynamically at runtime by attaching and detaching blocks.
Pull:
When a source pushes data to a target, the protocol employed may allow the target to simply accept and take ownership of the offered data, or it may require the target to communicate back to the source (pull).
see the previous post for more details.
Race condition:
When a target pulls data out of one or more sources, it may have to ensure that the data is not simultaneously consumed by other targets.
It gets even more complicated when a target tries to atomically consume messages from multiple sources (which means either succeeding to consume all the messages or none of them).
Two-phase commit:
By using the two-phase commit protocol, the target can atomically consume multiple messages (from one or more sources).
ISourceBlock<T> defines the following methods to enable the two-phase commit protocol:
- ReserveMessage (prepare)
- ConsumeMessage (commit)
- ReleaseReservation (roll back)
- ConsumeMeassage (commit)
- ReserveMessage (prepare)
- ReleaseMessage (roll-back)
Code Snippet
- interface ISourceBlock<out TOutput> : IDataflowBlock
- {
- IDisposable LinkTo(
- ITargetBlock<TOutput> target,
- bool unlinkAfterOne);
-
- TOutput ConsumeMessage(
- DataflowMessageHeader messageHeader,
- ITargetBlock<TOutput> target,
- out bool messageConsumed);
- bool ReserveMessage(
- DataflowMessageHeader messageHeader,
- ITargetBlock<TOutput> target);
- void ReleaseReservation(
- DataflowMessageHeader messageHeader,
- ITargetBlock<TOutput> target);
- }
Two-phase commit in action:
When a target wants to atomically consume messages from one or more sources:
- It tries to reserve the messages on each source (the message header holds the message identifier), using ReserveMessage (prepare).
- If it fails to reserve one of the messages, it should release the other reservations using ReleaseReservation (roll back).
- If all reservations succeed, it can safely consume the messages using ConsumeMessage (commit).
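You rarely have to drive this protocol by hand: a non-greedy JoinBlock is a built-in target that, according to the Dataflow documentation, postpones offers until every input has data available and then uses two-phase commit to take one message from each source atomically. The sketch below assumes the released System.Threading.Tasks.Dataflow NuGet package; `TwoPhaseJoinDemo` is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Hypothetical demo for illustration only.
static class TwoPhaseJoinDemo
{
    public static async Task<Tuple<int, string>> RunAsync()
    {
        var numbers = new BufferBlock<int>();
        var names = new BufferBlock<string>();

        // Greedy = false: the join postpones offers and, once each input has one,
        // reserves and then consumes the messages from both sources.
        var join = new JoinBlock<int, string>(new GroupingDataflowBlockOptions { Greedy = false });
        numbers.LinkTo(join.Target1);
        names.LinkTo(join.Target2);

        numbers.Post(1);
        names.Post("one");

        // The pair is produced only after both messages were consumed.
        return await join.ReceiveAsync();
    }
}
```

In greedy mode (the default), the join would instead accept each message as soon as it is offered, without reserving across sources.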
Consume Message:
Even without two-phase commit, ConsumeMessage is designed to handle race conditions.
Whenever we call ConsumeMessage, the source block sets its messageConsumed out parameter to indicate whether the consume operation succeeded. This way the source can ensure that the message is consumed by a single target and avoid race conditions.
Summary
Using the TDF source and target contracts, we can apply both push and pull strategies while avoiding race conditions.
In the following posts I will talk about the blocks' housekeeping contract and about propagation.
Tpl Dataflow (ITargetBlock) - Part 2
This post focuses on one of the main TPL Dataflow contracts, the target.
The previous post can be found here.
TPL Dataflow is built on the producer/consumer pattern, which is represented by two interfaces:
ISourceBlock<T>, which is the producer, and
ITargetBlock<T>, which is the consumer.
Sources and targets engage in a protocol for transferring messages between them.
This post focuses on the target (the source will be discussed in the following post).
Target Block:
The target block interface defines a single method called OfferMessage, which is responsible for deciding whether to consume the offered message immediately (push), fetch the message at a later time (pull), or decline the offer.
The following snippet shows the target contract:
Code Snippet
- interface ITargetBlock<in TInput> : IDataflowBlock
- {
- DataflowMessageStatus OfferMessage(
- DataflowMessageHeader messageHeader,
- TInput messageValue,
- ISourceBlock<TInput> source,
- bool consumeToAccept);
- }
In the push scenario, the source passes the message in the messageValue parameter.
In the pull scenario, the target gets a message header (messageHeader), which contains the message id,
and the source parameter, which identifies the message's sender, so the target can ask for the message at a later time.
The consumeToAccept parameter is a way for sources to propagate messages with reduced negotiation:
- false = the target can take the passed-in messageValue directly (push).
- true = the target must get the message by consuming it from the source (pull).
The OfferMessage implementation decides whether or not to accept the message, and it reports its decision through the return value.
The return value is a DataflowMessageStatus enum with the following values:
- Accepted: the target accepted the message and took ownership of it.
- Declined: the target declined the offer.
- Postponed: the target postponed the offer for potential consumption at a later time.
- NotAvailable: the target tried to consume the message, but it was no longer available.
- DecliningPermanently: the target declined this offer and all future offers from the source.
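To make the contract concrete, here is a small custom target written against the released System.Threading.Tasks.Dataflow NuGet package. `EvenOnlyTarget` is a hypothetical block for illustration only: it accepts even numbers, declines odd ones, pulls from the source when `consumeToAccept` is true, and declines permanently once completed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Hypothetical target block for illustration only.
sealed class EvenOnlyTarget : ITargetBlock<int>
{
    private readonly TaskCompletionSource<object> _completion = new TaskCompletionSource<object>();
    private readonly ConcurrentQueue<int> _accepted = new ConcurrentQueue<int>();
    private volatile bool _decliningPermanently;

    public IReadOnlyCollection<int> Accepted => _accepted;

    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader, int messageValue,
        ISourceBlock<int> source, bool consumeToAccept)
    {
        if (!messageHeader.IsValid) throw new ArgumentException("Invalid header.", nameof(messageHeader));
        if (consumeToAccept && source == null) throw new ArgumentException("A source is required to consume.", nameof(source));

        if (_decliningPermanently) return DataflowMessageStatus.DecliningPermanently;
        if (messageValue % 2 != 0) return DataflowMessageStatus.Declined;

        if (consumeToAccept)
        {
            // Pull: ask the source for the message; another target may have taken it already.
            messageValue = source.ConsumeMessage(messageHeader, this, out bool consumed);
            if (!consumed) return DataflowMessageStatus.NotAvailable;
        }

        // Push (or successful pull): take ownership of the message.
        _accepted.Enqueue(messageValue);
        return DataflowMessageStatus.Accepted;
    }

    public Task Completion => _completion.Task;

    public void Complete()
    {
        _decliningPermanently = true;
        _completion.TrySetResult(null);
    }

    public void Fault(Exception exception)
    {
        _decliningPermanently = true;
        _completion.TrySetException(exception);
    }
}
```

The Post extension method offers a message with `consumeToAccept` set to false and returns true only for Accepted, so `Post(2)` succeeds, `Post(3)` is declined, and after `Complete()` every offer is declined permanently.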
The 2011 SDP conference has just come to an end,
and we want to thank all the attendees.

We spoke about TPL and what's new in TPL 4.5,
async and await, Rx (Reactive Extensions) and TPL Dataflow.
The material for our sessions is available here.
The link contains the material for both the first and the second day (code samples).
links:
RxContrib project, Rxx Project, TPL Dataflow
New version of Rx Contrib
I have released a new version of Rx Contrib.
It supports both the Rx release (1.0.10621) and the Rx experimental version (1.1.10621.0).
The Profile contract DLL has been renamed to System.Reactive.Contrib.Profile.Contracts,
and the profiler visualizer has been improved.
