January 2012 - Posts - Bnaya Eshet


Tpl Dataflow walkthrough - Part 5

This post is a complete walkthrough of a web crawler sample that was built purely with TPL Dataflow.

It was built on .NET 4.5 / C# 5 (on a virtual machine running VS 11).

I will analyze each part of the sample, discussing both the Dataflow blocks and the patterns in use.

The sample code is available here (it is a VS 11 project).

During the walkthrough you will see the following TPL Dataflow blocks:

  • TransformBlock
  • TransformManyBlock
  • ActionBlock
  • BroadcastBlock

You will also see why the async / await signature of the Dataflow blocks is better for executing IO-bound operations (without blocking a ThreadPool worker thread).

I should also mention that this post is part of the TPL Dataflow series, which you had better read before this one.

Disclaimer: the web crawler sample is for educational purposes only (running a web crawler application may be forbidden by the law of your country).

The sample topology:

A TPL Dataflow application is usually a collection of agents that are linked together to compose a complete solution. Each agent has its own responsibilities and concerns. The following diagram presents the agent topology for this sample:

[Diagram: agent topology of the crawler sample]

Agents: block types and responsibilities

Downloader: the downloader is responsible for downloading the HTML of a web page. It uses a TransformBlock<TIn, TOut>, which belongs to the executor block family. The transform block gets a URL as its input message and produces the page's HTML as its output.

The transform block is constructed from:

  • input buffer (for the URL)
  • task (performs the transformation)
  • output buffer (for the downloaded HTML)

By default, the task takes one message at a time from the input buffer. It transforms the message with the Func<TIn, TOut> delegate it receives as a constructor parameter. It then puts the result in the output buffer, where other blocks can consume it.

Later we will see that our crawler's transformation actually takes a Func<TIn, Task<TOut>>, which is a better signature for IO-bound operations (I will discuss it later).

The transform block is a propagator block, which means it is exposed both as a target block and as a source block: it implements IPropagatorBlock<TIn, TOut>.

The following snippet shows that IPropagatorBlock is simply a combination of ITargetBlock and ISourceBlock.

Code Snippet
  1. public interface IPropagatorBlock<in TInput, out TOutput>
  2.     : ITargetBlock<TInput>,
  3.       ISourceBlock<TOutput>,
  4.       IDataflowBlock
  5. {
  6. }
Start crawling
Code Snippet
  1. var downloader = new TransformBlock<string, string>(
  2.     async (url) =>
  3.     {
  4.         // using IOCP the thread pool worker thread does return to the pool
  5.         WebClient wc = new WebClient();
  6.         string result = await wc.DownloadStringTaskAsync(url);
  7.         return result;
  8.     }, downloaderOptions);

As I mentioned earlier, the downloader's constructor accepts a Func<TIn, Task<TOut>>, so we can pass an async lambda expression (line 2). The code awaits the download at line 6.

If you are not familiar with the async / await concept, you can read this post or more posts here.

While awaiting the download (DownloadStringTaskAsync), the block's task returns its worker thread to the ThreadPool and takes advantage of the IOCP (IO Completion Port). This is an IO-bound operation, which means that no CPU resources are needed while the network card fetches the data from the network.
It is important to understand that while the network card is handling the request, the agent's task does not fetch another message from the buffer (with the default MaxDegreeOfParallelism of 1). The task resumes when the data becomes available.
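The snippet above passes a downloaderOptions object that is not shown in this post. Below is a minimal sketch of what such options might look like, written against the released System.Threading.Tasks.Dataflow library. It makes two assumptions of its own: MaxDegreeOfParallelism = 4 is my choice rather than the sample's, and FakeDownloadAsync is a hypothetical helper that uses Task.Delay in place of the real network call. Later URLs get shorter delays and therefore finish first. Even so, the TransformBlock hands its results out in input order.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DownloaderSketch
{
    // Hypothetical stand-in for WebClient.DownloadStringTaskAsync:
    // Task.Delay simulates IO latency without holding a thread.
    private static async Task<string> FakeDownloadAsync(string url, int delayMs)
    {
        await Task.Delay(delayMs);
        return "<html>" + url + "</html>";
    }

    public static List<string> Run(IList<string> urls)
    {
        var downloaderOptions = new ExecutionDataflowBlockOptions
        {
            // Up to 4 downloads may be awaited at the same time.
            MaxDegreeOfParallelism = 4
        };

        // Later URLs get shorter delays, so they complete first.
        var downloader = new TransformBlock<string, string>(
            url => FakeDownloadAsync(url, 50 * (urls.Count - urls.IndexOf(url))),
            downloaderOptions);

        foreach (string url in urls)
            downloader.Post(url);
        downloader.Complete();

        // Results still come out in input order.
        var pages = new List<string>();
        for (int i = 0; i < urls.Count; i++)
            pages.Add(downloader.Receive());
        return pages;
    }
}
```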

Analyzing the HTML

The crawler uses 2 agents to analyze the downloaded HTML:

  • link parser (looks for link elements <a href="..."/>)
  • image parser (looks for image elements <img src="..."/>)

Both agents should be linked to the downloader agent.
The problem is that linking both agents directly to the downloader agent will starve one of them.
Unlike Rx, most blocks forward a message to the first linked target that accepts it and do not offer it to the other linked targets. This means each message is handled by a single agent.
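Here is a small sketch of the starvation problem, assuming the released TPL Dataflow API (BufferBlock, DataflowLinkOptions). One BufferBlock is linked to two ActionBlocks that only count what they receive. The first target is an unbounded ActionBlock that accepts every offer, so all messages end up there and the second target gets nothing.

```csharp
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class StarvationSketch
{
    // Returns how many messages each of two linked targets received.
    public static (int first, int second) Run(int messageCount)
    {
        int first = 0, second = 0;
        var source = new BufferBlock<int>();
        var targetA = new ActionBlock<int>(_ => Interlocked.Increment(ref first));
        var targetB = new ActionBlock<int>(_ => Interlocked.Increment(ref second));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(targetA, propagate); // linked first: accepts every offer
        source.LinkTo(targetB, propagate); // linked second: never gets a message

        for (int i = 0; i < messageCount; i++)
            source.Post(i);
        source.Complete();
        targetA.Completion.Wait();
        targetB.Completion.Wait();
        return (first, second);
    }
}
```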

Broadcast behavior can be achieved with a BroadcastBlock<T>, which is part of the pure buffer family.
The broadcast block is constructed from:

  • input buffer
  • task
  • output buffer of a single item.

The task fetches a message from the input buffer and places it in the output buffer. From the output buffer, the message is offered to the linked blocks.

The broadcast block takes a Func<T, T> delegate as a constructor parameter; the idea behind it is cloning (which keeps the messages separate).
If you pass a reference-type message to multiple agents without cloning, changes made by one agent will be visible to all the other agents.

The broadcast block uses the cloning delegate before sending the message to the linked agents.
The cloning pattern ensures that only a single block processes a given message instance at a time. This maintains message ownership and avoids the need for data synchronization for thread safety.
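Here is a minimal sketch of the cloning delegate in action. It assumes a hypothetical mutable PageMessage class that is not part of the crawler. The cloning function builds a new PageMessage, so each linked ActionBlock receives its own copy. Neither target shares an instance with the other target or with the original message.

```csharp
using System.Threading.Tasks.Dataflow;

// Hypothetical mutable message type, used only for this illustration.
public sealed class PageMessage
{
    public string Html = "";
}

public static class BroadcastSketch
{
    public static (PageMessage original, PageMessage seenByA, PageMessage seenByB) Run()
    {
        PageMessage seenByA = null, seenByB = null;

        // The cloning delegate is applied per target, so every target gets its own copy.
        var broadcaster = new BroadcastBlock<PageMessage>(p => new PageMessage { Html = p.Html });
        var targetA = new ActionBlock<PageMessage>(p => seenByA = p);
        var targetB = new ActionBlock<PageMessage>(p => seenByB = p);

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        broadcaster.LinkTo(targetA, propagate);
        broadcaster.LinkTo(targetB, propagate);

        var original = new PageMessage { Html = "<html/>" };
        broadcaster.Post(original);
        broadcaster.Complete();
        targetA.Completion.Wait();
        targetB.Completion.Wait();
        return (original, seenByA, seenByB);
    }
}
```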

The crawler uses the following block definition for broadcasting:

Code Snippet
  1. var contentBroadcaster = new BroadcastBlock<string>(s => s);

In our case the HTML content is a string, which is immutable, so no real cloning is needed.

The crawler links the agents (blocks) to each other after all the relevant blocks have been constructed; right now we are focusing on the agents themselves.

Link parser

The link parser uses the following regular expression to find all the links (<a href="..."/>) in the HTML and extract each link's URL.

Code Snippet
  1. private const string LINK_REGEX_HREF =
  2.     "\\shref=('|\\\")?(?<LINK>http\\://.*?(?=\\1)).*>";
  3. private static readonly Regex _linkRegexHRef =
  4.     new Regex(LINK_REGEX_HREF);
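One thing to be aware of with this pattern is that the trailing .*> is greedy. When several anchors share a single line of HTML, it runs to the last > on that line, so only the first link is captured. The sketch below compares the original pattern with a variant of my own that ends in [^>]*>, so each match stops at the end of its own tag. Like the original, the variant only recognizes quoted http:// links.

```csharp
using System.Collections.Generic;
using System.Text.RegularExpressions;

public static class LinkRegexSketch
{
    // The pattern from the post: the trailing .*> is greedy.
    public static readonly Regex Original =
        new Regex("\\shref=('|\\\")?(?<LINK>http\\://.*?(?=\\1)).*>");

    // Variant (not from the original sample): [^>]*> ends each match at its own tag.
    public static readonly Regex Variant =
        new Regex(@"\shref=('|"")?(?<LINK>http://.*?(?=\1))[^>]*>");

    public static List<string> Extract(Regex regex, string html)
    {
        var links = new List<string>();
        foreach (Match match in regex.Matches(html))
            links.Add(match.Groups["LINK"].Value);
        return links;
    }
}
```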

Unlike the downloader agent, which gets a single input (URL) and produces a single output (HTML),
the link parser produces multiple outputs (links) per input (HTML).
You could use the transform block and set the output type to an array of links, but TPL Dataflow has a better block for this scenario.
Each link is processed independently of the other links. It is therefore better if the output buffer contains individual, flattened links rather than arrays of links.

The crawler uses the TransformManyBlock<TIn, TOut>. This block is similar to the transform block, with only one difference: the constructor's delegate parameter is one of the following delegates:

  • Func<TIn, IEnumerable<TOut>>
  • Func<TIn, Task<IEnumerable<TOut>>>

The block's task extracts the results and puts each of them, separately, into the output buffer.
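Here is a minimal sketch of the flattening behavior. As a simplification for illustration, string.Split stands in for the crawler's regex-based parser. Each comma-separated input produces several outputs, and the downstream ActionBlock sees individual strings rather than arrays.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class FlattenSketch
{
    public static List<string> Run(params string[] pages)
    {
        var links = new List<string>();

        // Func<TIn, IEnumerable<TOut>>: one input yields many outputs.
        // string.Split is a stand-in for the regex-based link parser.
        var linkParser = new TransformManyBlock<string, string>(page => page.Split(','));

        // Default MaxDegreeOfParallelism is 1, so List.Add is never called concurrently.
        var collector = new ActionBlock<string>(link => links.Add(link));

        linkParser.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });
        foreach (string page in pages)
            linkParser.Post(page);
        linkParser.Complete();
        collector.Completion.Wait();
        return links; // one entry per link, not one array per page
    }
}
```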

This is the code for the link parser agent:

Code Snippet
  1. var linkParser = new TransformManyBlock<string, string>(
  2.        (html) =>
  3.        {
  4.            var output = new List<string>();
  5.            var links = _linkRegexHRef.Matches(html);
  6.            foreach (Match item in links)
  7.            {
  8.                var value = item.Groups["LINK"].Value;
  9.                output.Add(value);
  10.            }
  11.            return output;
  12.        });

It is very straightforward: parse each HTML page with the regex and return a list of results, which the block flattens into the output buffer.

Image parser

The image parser is quite similar to the link parser.
The only difference is that it uses a different regular expression, which extracts the image's URL.

The regex part is:

Code Snippet
  1. private const string IMG_REGEX =
  2.     "<\\s*img [^\\>]*src=('|\")?(?<IMG>http\\://.*?(?=\\1)).*>\\s*([^<]+|.*?)?\\s*</a>";
  3. private static readonly Regex _imgRegex =
  4.     new Regex(IMG_REGEX);

And the parser agent code is:

Code Snippet
  1. var imgParser = new TransformManyBlock<string, string>(
  2.         (html) =>
  3.         {
  4.             var output = new List<string>();
  5.             var images = _imgRegex.Matches(html);
  6.             foreach (Match item in images)
  7.             {
  8.                 var value = item.Groups["IMG"].Value;
  9.                 output.Add(value);
  10.             }
  11.             return output;
  12.         });
Writer agent

The last operational agent is the writer agent, which downloads an image from a URL and saves it to the local disk.

The writer uses an action block, which is a simple executor block that has an input buffer and a task.

The task fetches messages from the buffer and executes the delegate given as a constructor parameter.
The delegate signature can be either Action<T> or Func<T, Task>. The latter is great for IO-bound operations, for the same reasons we discussed when looking at the transform block signature.

Because the writer performs 2 IO-bound operations:

  • download the image from the web
  • write the image to the file system

the crawler uses the Func<T, Task> signature.
The writer code is:

Code Snippet
  1. var writer = new ActionBlock<string>(async url =>
  2. {
  3.     WebClient wc = new WebClient();
  4.     // using IOCP the thread pool worker thread does return to the pool
  5.     byte[] buffer = await wc.DownloadDataTaskAsync(url);
  6.     string fileName = Path.GetFileName(url);
  7.  
  8.     string name = @"Images\" + fileName;
  9.  
  10.     using (Stream srm = File.Create(name)) // File.Create truncates an existing file; File.OpenWrite does not
  11.     {
  12.         await srm.WriteAsync(buffer, 0, buffer.Length);
  13.     }
  14. });

The first await, at line 5, waits until the network card signals that the download has completed,
and the second await, at line 12, waits until the file system controller signals that the write has completed.

You may have noticed that the second await is inside a using block; you can read more about this topic in this post.
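Here is a self-contained sketch of the same writer shape. So that it runs offline, it uses a hypothetical FakeDownloadDataAsync helper instead of WebClient, and a caller-supplied folder instead of the hard-coded Images directory. It also takes the file name from the URL's path, so a query string such as ?size=large does not end up in the file name. Waiting on the block's Completion returns only after every async delegate, including its using block, has finished.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class WriterSketch
{
    // Hypothetical stand-in for WebClient.DownloadDataTaskAsync, so the sketch runs offline.
    private static async Task<byte[]> FakeDownloadDataAsync(string url)
    {
        await Task.Delay(10);
        return new byte[] { 1, 2, 3 };
    }

    public static void Run(string folder, params string[] urls)
    {
        Directory.CreateDirectory(folder);

        // Func<string, Task>: no thread is blocked while an await is pending.
        var writer = new ActionBlock<string>(async url =>
        {
            byte[] buffer = await FakeDownloadDataAsync(url);
            // Take the name from the URL path only, so "?size=large" is not part of it.
            string fileName = Path.GetFileName(new Uri(url).AbsolutePath);
            using (FileStream stream = File.Create(Path.Combine(folder, fileName)))
            {
                await stream.WriteAsync(buffer, 0, buffer.Length);
            }
        });

        foreach (string url in urls)
            writer.Post(url);
        writer.Complete();
        // Completes only after every async delegate, including its using block, has finished.
        writer.Completion.Wait();
    }
}
```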

Linking it together

Right now we have most of our building blocks, and it is time to define the data-flow by linking the blocks to each other.

The downloader should be linked to the content broadcaster, which in turn should be linked to both the image parser and the link parser. The image parser should be linked to the writer, and the link parser should be linked back to the downloader (so it can crawl further).

But there is one last issue.
Some web pages have links that target an image. This leads to more complex linking: the link parser should be linked to the downloader and should also have a conditional link to the writer for URLs that have an image suffix.
As we discussed earlier, linking the link parser directly to both the downloader and the writer would starve one of those agents.
We need a final broadcast block to handle this distribution.

Code Snippet
  1. var linkBroadcaster = new BroadcastBlock<string>(s => s);

The link parser will be linked to the broadcaster, and the broadcaster will be linked to both the downloader and the writer.

We have discussed the conditional link from the link parser to the writer. It is also more effective if the link to the downloader only passes pages that are likely to contain useful data, such as php, aspx, htm, etc.

Filtering linked messages

The following predicates are used to filter linked messages:

Code Snippet
  1. StringComparison comparison = StringComparison.InvariantCultureIgnoreCase;
  2. Predicate<string> linkFilter = link =>
  3.     link.IndexOf(".aspx", comparison) != -1 ||
  4.     link.IndexOf(".php", comparison) != -1 ||
  5.     link.IndexOf(".htm", comparison) != -1 ||
  6.     link.IndexOf(".html", comparison) != -1;
  7. Predicate<string> imgFilter = url =>
  8.     url.EndsWith(".jpg", comparison) ||
  9.     url.EndsWith(".png", comparison) ||
  10.     url.EndsWith(".gif", comparison);

The first predicate (line 2) filters the messages that go to the downloader agent, and the second (line 7) filters the link parser results that go to the writer agent.
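Here is a sketch of predicate-based linking with the released library, where LinkTo takes a DataflowLinkOptions and a Predicate<T>. As far as I know, the released API has no LinkTo(target, filter, bool) overload; the three-argument call in this post comes from the preview build used here. The sketch uses a BufferBlock rather than the crawler's BroadcastBlock to show one pitfall. A BufferBlock keeps a message that no link accepts at the head of its output, which blocks everything queued behind it. For that reason a catch-all link to DataflowBlock.NullTarget<T>() is added last. A BroadcastBlock does not have this problem, because it simply overwrites an unaccepted message with the next one. Note also that the ".htm" check already matches ".html", so the ".html" line in the original predicate is redundant.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class FilterSketch
{
    public static (List<string> pages, List<string> images) Run(params string[] urls)
    {
        var comparison = StringComparison.OrdinalIgnoreCase;
        Predicate<string> linkFilter = u => u.IndexOf(".htm", comparison) != -1; // also matches ".html"
        Predicate<string> imgFilter = u => u.EndsWith(".png", comparison);

        var pages = new List<string>();
        var images = new List<string>();
        var source = new BufferBlock<string>();
        var pageTarget = new ActionBlock<string>(u => pages.Add(u));
        var imageTarget = new ActionBlock<string>(u => images.Add(u));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(pageTarget, propagate, linkFilter);
        source.LinkTo(imageTarget, propagate, imgFilter);
        // Catch-all: without it, the first URL matching neither filter would stay at the
        // head of the BufferBlock and block every message queued behind it.
        source.LinkTo(DataflowBlock.NullTarget<string>());

        foreach (string url in urls)
            source.Post(url);
        source.Complete();
        pageTarget.Completion.Wait();
        imageTarget.Completion.Wait();
        return (pages, images);
    }
}
```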

Composing the data-flow

Finally we get to the agent composition.

Code Snippet
  1. IDisposable disposeAll = new CompositeDisposable(
  2.     // from [downloader] to [contentBroadcaster]
  3.     downloader.LinkTo(contentBroadcaster),
  4.     // from [contentBroadcaster] to [imgParser]
  5.     contentBroadcaster.LinkTo(imgParser),
  6.     // from [contentBroadcaster] to [linkParser]
  7.     contentBroadcaster.LinkTo(linkParser),
  8.     // from [linkParser] to [linkBroadcaster]
  9.     linkParser.LinkTo(linkBroadcaster),
  10.     // conditional link from [linkBroadcaster] to [downloader]
  11.     linkBroadcaster.LinkTo(downloader, linkFilter, true),
  12.     // conditional link from [linkBroadcaster] to [writer]
  13.     linkBroadcaster.LinkTo(writer, imgFilter, true),
  14.     // from [imgParser] to [writer]
  15.     imgParser.LinkTo(writer));

Each LinkTo operation returns a disposable instance that can be used to remove the link when it is no longer needed. The crawler composes all those disposables into a single disposable called disposeAll by using CompositeDisposable, which is part of the Rx library.

You can see the conditional LinkTo calls at lines 11 and 13.
It is very important to set the last parameter of LinkTo to true if you don't want to dispose the link when the filter doesn't match the criteria.

Summary

This post was a walkthrough of a web crawler sample.
The complete sample, which is available here (VS 11), also includes exception handling, agent termination after x seconds, prevention of processing the same URL twice, and more. For simplicity, the code in this post is a simplified version.

Rx - Aggregate vs. Scan

This post focuses on 2 Rx operators: Aggregate and Scan.

Both Aggregate and Scan deal with event stream accumulation. The only difference is that Aggregate produces a single result (upon the stream's completion),
while Scan produces an ongoing accumulation that reacts to each OnNext.

Both operators have 2 overloads with the same signatures:

Code Snippet
  1. IObservable<TSource> Aggregate<TSource>(
  2.     this IObservable<TSource> source,
  3.     Func<TSource, TSource, TSource> accumulator);
  4.  
  5. IObservable<TSource> Scan<TSource>(
  6.     this IObservable<TSource> source,
  7.     Func<TSource, TSource, TSource> accumulator);
  8.  
  9. IObservable<TAccumulate> Aggregate<TSource, TAccumulate>(
  10.     this IObservable<TSource> source,
  11.     TAccumulate seed,
  12.     Func<TAccumulate, TSource, TAccumulate> accumulator);
  13.  
  14. IObservable<TAccumulate> Scan<TSource, TAccumulate>(
  15.     this IObservable<TSource> source,
  16.     TAccumulate seed,
  17.     Func<TAccumulate, TSource, TAccumulate> accumulator);

The first overload (lines 1 and 5) takes a simple accumulation Func<T, T, T>. It gets the previous accumulated value and the current value as parameters and returns the new accumulated value. There is no seed: the first value becomes the initial accumulated value, so the accumulator is first called with the first and second values.

The second overload defines a seed value for the first accumulation and a Func<TAccumulate, TSource, TAccumulate>, which gets the previous accumulated value and the current value as parameters and returns the new accumulated value.
Notice that the accumulated value's type can be different from the current value's type.
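Rx needs a package reference, so here is a pull-based analogue over IEnumerable<T> to make the two overloads concrete. Enumerable.Aggregate behaves like Rx's Aggregate on a finished stream. LINQ to Objects has no Scan, so the Scan below is a hypothetical helper written for this illustration. Like the seeded Rx overload, it yields every intermediate accumulation instead of only the last one.

```csharp
using System;
using System.Collections.Generic;

public static class ScanSketch
{
    // Hypothetical pull-based Scan (LINQ to Objects has none): like the seeded Rx
    // overload, it yields every intermediate accumulation, not only the final one.
    public static IEnumerable<TAccumulate> Scan<TSource, TAccumulate>(
        this IEnumerable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, TAccumulate> accumulator)
    {
        TAccumulate acc = seed;
        foreach (TSource item in source)
        {
            acc = accumulator(acc, item);
            yield return acc;
        }
    }
}
```

For example, Enumerable.Range(1, 10).Scan(0, (acc, i) => acc + i) yields the same running totals as the Rx Scan output shown below. Because the accumulator type is a separate type parameter, a string seed such as "" lets you accumulate ints into a string, just like the second Rx overload.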

For example, the following stream:

Code Snippet
  1. var xs = Observable.Range(1, 10);
  2.  
  3. var result = xs.Aggregate((acc, i) => acc + i);
  4. result.ForEach(item => Console.WriteLine(item));

will produce a single result (55),

while the Scan version:

Code Snippet
  1. var xs = Observable.Range(1, 10);
  2.  
  3. var result = xs.Scan((acc, i) => acc + i);
  4. result.ForEach(item => Console.WriteLine(item));

will produce each intermediate accumulation:

1
3
6
10
15
21
28
36
45
55

Both operators can become very handy within a Window operator.

For more information about the Window operator, see this post.

For example, you may want to accumulate a stream of customers entering a store on a per-hour basis.

You can combine the Window operator with the Aggregate operator to get a per-hour report.
Alternatively, you can combine Window with Scan to get a continuous report per hour. This lets you react immediately to live data; for example, you can react when more than 100 customers have entered the store within an hour or less.
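To make the two report styles concrete without a live stream, here is a pull-based sketch over a recorded list of arrival times (minutes since opening, assumed sorted). Grouping by hour and counting mirrors Window + Aggregate. A running count that resets every hour and raises an alert past a threshold mirrors Window + Scan. This is an analogy written for illustration rather than Rx code, and the class and method names are hypothetical.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class StoreWindowSketch
{
    // Window + Aggregate idea: one total per hour (arrivals are minutes since opening).
    public static Dictionary<int, int> CountPerHour(IEnumerable<int> arrivalMinutes) =>
        arrivalMinutes.GroupBy(minute => minute / 60)
                      .ToDictionary(hour => hour.Key, hour => hour.Count());

    // Window + Scan idea: a running count that resets every hour. Returns the minute at
    // which the count within the current hour first exceeds the threshold, or null.
    public static int? FirstAlert(IEnumerable<int> arrivalMinutes, int threshold)
    {
        int currentHour = -1, runningCount = 0;
        foreach (int minute in arrivalMinutes) // assumed sorted by time
        {
            if (minute / 60 != currentHour)
            {
                currentHour = minute / 60;
                runningCount = 0;
            }
            runningCount++;
            if (runningCount > threshold)
                return minute;
        }
        return null;
    }
}
```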

The following code demonstrates the Aggregate scenario, but I should warn you: you are now stepping into some dark-art code. It is the result of some concurrency behavior that I personally hope the Rx team will address in a more intuitive way in the future.

I am considering adding a few operators to a future version of Rx Contrib that will handle this task more intuitively.

I will also post a walkthrough series on how to use the Rx Contrib libraries.

What you will see is not the most intuitive code snippet, but it is what you need to get the job done.

Code Snippet
  1. var storeStreamMock = Observable.Generate<Random, Unit>(
  2.     new Random(),   // random object
  3.     rnd => true,    // continue forever (exit term)
  4.     rnd => rnd,     // next iteration value (ignored)
  5.     rnd => Unit.Default, // projection (always project Unit.Default)
  6.     rnd => TimeSpan.FromMilliseconds(rnd.Next(10, 100))); // delay between iterations
  7.  
  8. IObservable<Task<int>> accStream =
  9.     from win in storeStreamMock.Window(TimeSpan.FromSeconds(1))
  10.     select win.Aggregate(0, (acc, cur) => acc + 1).ToTask();
  11.  
  12. accStream
  13.     .ObserveOn(Scheduler.TaskPool)
  14.     .ForEach(item =>
  15.         Console.WriteLine(item.Result));

 

Lines 1-6 generate a mock store observable by using the Generate factory; you can completely ignore this part.

At line 9 we define a window of 1 second.

Line 10 defines the aggregation and exports the aggregated value into a Task (TPL).

This is part of the dark art; otherwise we would end up with blocking and contention.

The last part of the dark art is that you should process the results in parallel within the subscription (line 13).

You can find different suggestions on how to accomplish such a task in this thread.

Summary

Both Scan and Aggregate are very useful operators, but you should be careful when using them within a Window.

Rx - Exception Handling

This post discusses exception handling in the Rx arena.

Handling event stream exceptions is not trivial.
For example, an observable should deliver an exception to its subscribers through OnError and end the subscription.
On the other hand, the subscriber may want to respond to the OnError state by renewing its subscription or by falling back to an alternative stream.

It is true that the Rx design guidelines suggest that a faulted stream should not continue to produce data,
but real-world implementations such as a stock exchange stream (or other hot streams) may ignore this recommendation.

If you design such a stream, consider wrapping fault information within the OnNext message (as a data property) instead of sending OnError. Reserve OnError for fatal faults that the stream cannot recover from.

So how can you handle a fault state?

Rx has a few operators that respond to OnError.

The first one is Retry, which re-subscribes (forever, or up to a specific number of attempts).

For the demonstration I will use the following observable (which produces OnError after the second OnNext):

Code Snippet
  1. var observable = Observable.Create<int>(
  2.     obs =>
  3.     {
  4.         obs.OnNext(1);
  5.         obs.OnNext(2);
  6.         obs.OnError(new SystemException());
  7.         return Disposable.Empty;
  8.     });

The following code subscribes up to 3 times in total (the initial subscription plus 2 retries) before it surrenders to the evil exception.

Code Snippet
  1. observable
  2.     .Retry(3)
  3.     .Subscribe(
  4.         item => Console.WriteLine(item),
  5.         (ex) => Console.WriteLine(ex.Message),
  6.         () => Console.WriteLine("Complete"));

This scenario may be suitable for an observable that downloads data from the network and responds with an error when the network is not available (consider an unreliable network).
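Here is a quick way to check how many subscriptions Retry makes, assuming a reference to the System.Reactive package. Materialize turns OnNext and OnError into Notification<int> values, so the whole run can be collected with ToList and inspected afterwards. With Retry(3), the source is subscribed three times, so 1, 2 appears three times before the final error.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class RetrySketch
{
    // Same failing source as above: OnNext(1), OnNext(2), then OnError.
    private static readonly IObservable<int> Failing = Observable.Create<int>(obs =>
    {
        obs.OnNext(1);
        obs.OnNext(2);
        obs.OnError(new SystemException());
        return Disposable.Empty;
    });

    public static (List<int> values, Exception error) Run()
    {
        IList<Notification<int>> notifications = Failing
            .Retry(3)       // at most 3 subscriptions in total
            .Materialize()  // OnNext/OnError become Notification<int> values, then OnCompleted
            .ToList()
            .Wait();        // blocking wait, acceptable for a demo

        List<int> values = notifications
            .Where(n => n.Kind == NotificationKind.OnNext)
            .Select(n => n.Value)
            .ToList();
        return (values, notifications.Last().Exception);
    }
}
```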

The output looks like the following snapshot:

[Screenshot: console output of the Retry sample]

Sometimes it is not enough to re-subscribe, and you have to define an alternative fallback stream.

Consider a stock exchange scenario: whenever a specific stock provider fails to supply the data, you may want to switch to a different provider and subscribe to it.

You can do so using the Catch operator.

Just like try / catch, you can specify exception-specific or generic fallback strategies.

Given the following fallback streams:

Code Snippet
  1. var fallback1 = Observable.Create<int>(
  2.     obs =>
  3.     {
  4.         for (int i = 0; i < 10; i++)
  5.             obs.OnNext(i);
  6.         return Disposable.Empty;
  7.     });
  8. var fallback2 = Observable.Create<int>(
  9.     obs =>
  10.     {
  11.         for (int i = 20; i < 23; i++)
  12.             obs.OnNext(i);
  13.         return Disposable.Empty;
  14.     });
  15. var fallback3 = Observable.Create<int>(
  16.     obs =>
  17.     {
  18.         for (int i = 30; i < 33; i++)
  19.             obs.OnNext(i);
  20.         return Disposable.Empty;
  21.     });

You can map the fallbacks using the following Rx code:

Code Snippet
  1. observable
  2.     .Catch((NullReferenceException ex) => fallback1)
  3.     .Catch((SystemException ex) => fallback2)
  4.     .Catch(fallback3)
  5.     .Subscribe(
  6.         item => Console.WriteLine(item),
  7.         (ex) => Console.WriteLine(ex.Message),
  8.         () => Console.WriteLine("Complete"));

Lines 2-4 map different fallbacks to different exceptions.

It generates the following output:

[Screenshot: console output of the Catch sample]

A SystemException was thrown. It is not a NullReferenceException, so the handler at line 3 takes over and the output continues with the fallback stream starting at 20.

Finally, we can discuss the 3rd option.

Sometimes you do not care whether the stream stopped because it completed or because it faulted; all you really care about is cleaning up some resources.
In this case you can use the Finally operator. It is triggered in both scenarios, whether the stream completed normally or faulted, and also when the subscription is disposed.

The following code demonstrates this API:

Code Snippet
  1. observable
  2.     .Finally(() => {/* clear some resources */})
  3.     .Subscribe(
  4.         item => Console.WriteLine(item),
  5.         (ex) => Console.WriteLine(ex.Message),
  6.         () => Console.WriteLine("Complete"));
Summary

Rx has some very useful operators that respond to the OnError state: you can re-subscribe, switch to a fallback stream, or just handle the finalization.

async / await, some reasoning

This post tries to provide some reasoning about the .NET 4.5 / C# 5 await keyword.

I will begin with a quiz.

How long will it take the following method to produce the value 42?

Code Snippet
  1. async Task<int> Execute()
  2. {
  3.     await Task.Delay(1000);
  4.     await Task.Delay(1000);
  5.     return 42;
  6. }

You should remember that, conceptually, the await keyword translates into a continuation.

The above code can be compared to the following TPL 4 code snippet:

Code Snippet
  1. Task<int> Execute()
  2. {
  3.     Task t1 = Task.Factory.StartNew(
  4.         () => Thread.Sleep(1000));
  5.     Task t2 = t1.ContinueWith(t1_ =>
  6.         {
  7.             Thread.Sleep(1000);
  8.         });
  9.     Task<int> t3 = t2.ContinueWith(t2_ => 42);
  10.     return t3;
  11. }

Whatever comes after the await is compiled into a continuation closure.

Therefore the value 42 will be produced after 2 seconds.

How can we await multiple tasks?

There are a couple of ways to await multiple tasks, but the most recommended one is Task.WhenAll (you can also use Task.WhenAny to continue after the first task completes).

Do not confuse Task.WhenAll with Task.WaitAll. WhenAll returns a task that completes when all the tasks have completed (a continuation point). WaitAll is a blocking API that blocks the calling thread until all the tasks have completed.
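Here is a small timing sketch of the difference; the 200 ms delay is an arbitrary choice. Two awaits in a row take roughly twice the delay, while starting both delays and then awaiting Task.WhenAll takes roughly one delay. When the tasks are Task<int>, awaiting WhenAll also returns their results as an array in argument order.

```csharp
using System.Diagnostics;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Two awaits in a row: the second delay starts only after the first finishes.
    public static async Task<long> SequentialMs(int delayMs)
    {
        var sw = Stopwatch.StartNew();
        await Task.Delay(delayMs);
        await Task.Delay(delayMs);
        return sw.ElapsedMilliseconds;
    }

    // Both delays start before the single await, so they overlap.
    public static async Task<long> ConcurrentMs(int delayMs)
    {
        var sw = Stopwatch.StartNew();
        Task t1 = Task.Delay(delayMs);
        Task t2 = Task.Delay(delayMs);
        await Task.WhenAll(t1, t2);
        return sw.ElapsedMilliseconds;
    }

    // WhenAll over Task<int> also collects the results, in argument order.
    public static async Task<int[]> BothResults()
    {
        Task<int> a = Task.Run(() => 40);
        Task<int> b = Task.Run(() => 2);
        return await Task.WhenAll(a, b);
    }
}
```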

The following snippet demonstrates Task.WhenAll usage.

Code Snippet
  1. async Task<int> Execute()
  2. {
  3.     Task t1 = Task.Delay(1000);
  4.     Task t2 = Task.Delay(1000);
  5.     await Task.WhenAll(t1, t2);
  6.     return 42;
  7. }

The value 42 will now be produced after 1 second.

It is roughly equivalent to the following TPL 4 snippet:

Code Snippet
  1. Task<int> Execute()
  2. {
  3.     Task t1 = Task.Factory.StartNew(
  4.         () => Thread.Sleep(1000));
  5.     Task t2 = Task.Factory.StartNew(
  6.         () => Thread.Sleep(1000));
  7.     Task<int> t3 = Task.Factory.ContinueWhenAll(
  8.         new[]{t1, t2}, tsks => 42);
  9.     return t3;
  10. }

To wrap it up, let's think about how long it will take the following snippet to produce a value.

Code Snippet
  1. async Task<int> Execute()
  2. {
  3.     for (int i = 0; i < 10; i++)
  4.     {
  5.         await Task.Delay(1000);
  6.     }
  7.     return 42;
  8. }

The right answer is 10 seconds: each iteration results in a continuation closure that wraps the following iterations.

This can be translated into something like the following snippet (TPL 4).

Code Snippet
  1. Task<int> Execute()
  2. {
  3.     var stateMachine = new StateMachine();
  4.     return stateMachine.OnNext();
  5. }
  6.  
  7. class StateMachine
  8. {
  9.     private int _i;
  10.     private TaskCompletionSource<int> _semanticTask =
  11.         new TaskCompletionSource<int>();
  12.  
  13.     public Task<int> OnNext()
  14.     {
  15.         Interlocked.Increment(ref _i);
  16.  
  17.         Task t = Task.Factory.StartNew(() =>
  18.             Thread.Sleep(1000));
  19.         if (_i <= 10)
  20.             t.ContinueWith(t_ => OnNext());
  21.         else
  22.             _semanticTask.SetResult(42);
  23.  
  24.         return _semanticTask.Task;
  25.     }
  26. }

The Execute method creates a state machine that chains task continuations 10 times and then sets the value 42.
TaskCompletionSource represents the semantics of a task (TAP, the Task-based Asynchronous Pattern).

TaskCompletionSource does not introduce any concurrency (it isn't attached to any thread); it just exposes a task that can carry a result, an exception, or a cancellation.

The OnNext method immediately returns the semantic task, which is signaled as complete at line 22 (the exit condition of the recursion).
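Here is a minimal sketch of the same TaskCompletionSource idea outside the state machine. It uses a System.Threading.Timer as the event source, which is not part of the original code. No thread waits while the timer is pending; the task completes when the timer callback calls TrySetResult.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class TcsSketch
{
    // A task whose completion is driven by a timer callback via TaskCompletionSource.
    // No thread waits while the timer is pending.
    public static Task<int> DelayedValue(int delayMs, int value)
    {
        var tcs = new TaskCompletionSource<int>();
        // Timer(TimerCallback) starts disabled and passes the timer itself as the state.
        var timer = new Timer(state =>
        {
            ((Timer)state).Dispose();   // one-shot: release the timer
            tcs.TrySetResult(value);    // complete the task from the callback
        });
        timer.Change(delayMs, Timeout.Infinite);
        return tcs.Task;
    }
}
```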

Summary

await represents a continuation: each time the code hits an await, it constructs a new continuation closure.

Using async / await

This post discusses disposing of resources used by parallel (asynchronous) execution.

Whenever we want to dispose resources when a parallel execution completes, we can't use the convenient using keyword.

For example, the following code may dispose the command before it completes:

Very bad Code Snippet
  1. using (var conn = new SqlConnection(CONN_STR))
  2. using (var cmd = new SqlCommand("Select * from Employee", conn))
  3. {
  4.     conn.Open();
  5.     cmd.BeginExecuteNonQuery(ar =>
  6.         {
  7.             int affected = cmd.EndExecuteNonQuery(ar);
  8.         }, null);
  9. }

The using statement is absolutely wrong for the above sample.

What should be done instead is:

Code Snippet
  1. var conn = new SqlConnection(CONN_STR);
  2. var cmd = new SqlCommand("Select * from Employee", conn);
  3. conn.Open();
  4. cmd.BeginExecuteNonQuery(ar =>
  5. {
  6.     int affected = cmd.EndExecuteNonQuery(ar);
  7.     cmd.Dispose();
  8.     conn.Dispose();
  9. }, null);

We can write it in a slightly friendlier way by using the TPL FromAsync wrapper:

Code Snippet
  1. var conn = new SqlConnection(CONN_STR);
  2. var cmd = new SqlCommand("Select * from Employee", conn);
  3. conn.Open();
  4. Task<int> t = Task.Factory.FromAsync<int>(
  5.     cmd.BeginExecuteNonQuery,
  6.     cmd.EndExecuteNonQuery,
  7.     null);
  8. t.ContinueWith(tsk =>
  9.     {
  10.         int affected = tsk.Result;
  11.         cmd.Dispose();
  12.         conn.Dispose();
  13.     });
But what if the compiler could rewrite our code?

In that case we could write code similar to the first code snippet and have it compiled into something like the code snippet above.

This is exactly what happens in .NET 4.5 / C# 5 (the async / await pattern).

When we write the following code:

Code Snippet
  1. async Task<int> ExecuteNonQueryAsync()
  2. {
  3.     int affected = 0;
  4.     using (var conn = new SqlConnection(CONN_STR))
  5.     using (var cmd = new SqlCommand("Select * from Employee", conn))
  6.     {
  7.         conn.Open();
  8.         affected = await Task.Factory.FromAsync<int>(
  9.             cmd.BeginExecuteNonQuery,
  10.             cmd.EndExecuteNonQuery,
  11.             null);
  12.     }
  13.     return affected;  
  14. }

The compiler rewrites this async method.

Everything that follows the await keyword (line 8) is put into a continuation state machine, including the closing curly braces of the using statements.
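Here is a small sketch that makes the ordering visible without a database. It assumes a hypothetical TrackedResource class that only logs when it is opened and disposed. Dispose runs after the awaited work has finished, because the end of the using block is part of the continuation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical resource that only records its lifetime events.
public sealed class TrackedResource : IDisposable
{
    private readonly List<string> _log;
    public TrackedResource(List<string> log) { _log = log; _log.Add("opened"); }
    public void Dispose() => _log.Add("disposed");
}

public static class UsingAwaitSketch
{
    // The closing brace of the using block runs in the continuation,
    // i.e. after the awaited operation has completed.
    public static async Task<int> RunAsync(List<string> log)
    {
        int result;
        using (new TrackedResource(log))
        {
            result = await Task.Run(() => { log.Add("work done"); return 42; });
        }
        log.Add("after using");
        return result;
    }
}
```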

Summary

Using the new async / await pattern disposes our resources on time.

The using keyword is a very clean syntax, and now we can apply it to parallel execution.

async \ await and Exception Handling

This post discusses how async / await handles exceptions.

As we mentioned in a previous post about the async / await concept, await is all about continuation.

Before .NET 4.5, exceptions from parallel execution had to be handled separately from the synchronous handling.

For example:

Handling ThreadPool execution:

Code Snippet
  1. void Foo()
  2. {
  3.     try
  4.     {
  5.         Console.WriteLine("Synchronic");
  6.         ThreadPool.QueueUserWorkItem(state =>
  7.             {
  8.                 try
  9.                 {
  10.                     Console.WriteLine("Parallel");
  11.                 }
  12.                 catch (Exception exAsync)
  13.                 {
  14.                     EventLog.WriteEntry("application", exAsync.ToString());
  15.                 }
  16.             }, null);
  17.     }
  18.     catch (Exception ex)
  19.     {
  20.         EventLog.WriteEntry("application", ex.ToString());
  21.     }
  22. }

As you can see, we have to handle the parallel exception (line 12) separately from the synchronous handling (line 18).

TPL brought a new option for handling parallel exceptions: you can use ContinueWith with TaskContinuationOptions.OnlyOnFaulted (line 8).
But you still have to handle the parallel exception separately from the synchronous one:

Code Snippet
  1. void Foo()
  2. {
  3.     try
  4.     {
  5.         Console.WriteLine("Synchronic");
  6.         Task t = Task.Factory.StartNew(() => Console.WriteLine("Parallel"));
  7.         t.ContinueWith(tsk => EventLog.WriteEntry("application", tsk.Exception.ToString()),
  8.             TaskContinuationOptions.OnlyOnFaulted);
  9.     }
  10.     catch (Exception ex)
  11.     {
  12.         EventLog.WriteEntry("application", ex.ToString());
  13.     }
  14. }
async / await pattern

Using the async / await pattern, we can handle both synchronous and parallel exceptions in the same place:

Code Snippet
  1. async void Foo()
  2. {
  3.     try
  4.     {
  5.         Console.WriteLine("Synchronic");
  6.         await Task.Factory.StartNew(() => Console.WriteLine("Parallel"));
  7.     }
  8.     catch (Exception ex)
  9.     {
  10.         // handles both synchronous and parallel exceptions
  11.         EventLog.WriteEntry("application", ex.ToString());
  12.     }
  13. }

When we use the async / await pattern, the compiler converts our code into a continuation state machine at compile time.
Therefore the compiler can take the code within the catch block and apply it to both the synchronous and the parallel execution.
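Here is a minimal sketch of that behavior. As an illustrative choice, int.Parse on bad input stands in for real parallel work. The same catch block handles an exception thrown before the await and one thrown inside the awaited task. await rethrows the task's original exception rather than the AggregateException that the task stores, so the caught type is FormatException. The sketch returns Task<string> instead of using async void, so the caller can observe the result.

```csharp
using System;
using System.Threading.Tasks;

public static class AwaitExceptionSketch
{
    // One try/catch covers both the synchronous part and the awaited task.
    public static async Task<string> FooAsync(bool failBeforeAwait)
    {
        try
        {
            if (failBeforeAwait)
                throw new ArgumentException("synchronous failure");

            // The parse fails on a thread-pool thread; await rethrows the original
            // FormatException, not the AggregateException stored in the task.
            await Task.Run(() => int.Parse("not a number"));
            return "no exception";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }
}
```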

Summary

The async / await pattern simplifies exception handling: we write our exception handling once, and it applies to both synchronous and parallel execution.

Rx - Sample

This post focuses on the Rx Sample operator.

The Sample operator samples the observable stream and forwards a less intensive stream of the sampled data.

It can prove very useful in scenarios such as handling an accelerometer stream, which can produce 60 values per second. In some cases we don't need such intensity, and our machine's resources may be happier handling only 10 values per second.
The same may apply to video stream analytics and many other scenarios.

This is how the marble diagram looks:

[Marble diagram: the Sample operator]

As you can see, there is nothing special: it is just time-based filtering.
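To pin down what time-based filtering means here, below is a pull-based model of Sample(period) over recorded (timeMs, value) pairs. It was written only for illustration and is not Rx code. At each tick it emits the latest value that arrived since the previous tick, and emits nothing when no new value arrived. It does not model what Rx does when the source completes.

```csharp
using System.Collections.Generic;

public static class SampleSketch
{
    // Models Sample(period) over (timeMs, value) pairs sorted by time: at each tick,
    // emit the latest value that arrived since the previous tick, if there is one.
    public static List<int> Sample(IList<(int timeMs, int value)> source, int periodMs, int endMs)
    {
        var sampled = new List<int>();
        int index = 0;
        for (int tick = periodMs; tick <= endMs; tick += periodMs)
        {
            bool hasValue = false;
            int latest = 0;
            while (index < source.Count && source[index].timeMs <= tick)
            {
                latest = source[index].value;
                hasValue = true;
                index++;
            }
            if (hasValue)
                sampled.Add(latest);
        }
        return sampled;
    }
}
```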

The Sample API is:

Code Snippet
  1. var xs = Observable.Interval(TimeSpan.FromMilliseconds(0.1));
  2. var ys = xs.Sample(TimeSpan.FromMilliseconds(30));

The TimeSpan parameter defines the sampling interval.

Besides the TimeSpan overload, the Sample operator can be triggered on a custom timing.

Custom triggering is done with an overload that accepts an IObservable as its sampling trigger.

The following example uses a random stream of data as the Sample trigger:

Code Snippet
  1. var rndStream = Observable.Create<Unit>(obs =>
  2.     {
  3.         var unsub = new BooleanDisposable();
  4.         Task.Factory.StartNew(() =>
  5.             {
  6.                 var rand = new Random();
  7.                 while (!unsub.IsDisposed)
  8.                 {
  9.                     Thread.Sleep(rand.Next(4000));
  10.                     obs.OnNext(Unit.Default);
  11.                 }
  12.             });
  13.         return unsub;
  14.     });
  15. var xs = Observable.Interval(TimeSpan.FromMilliseconds(0.1));
  16. var ys = xs.Sample(rndStream);
  17. ys.Subscribe(item => Console.Write("{0}, ", item));

As you can see at line 16, the xs stream will be sampled at a random rate.

Summary

Sample is a very straightforward operator that lets you reduce stream intensity in cases where a lower data rate is better.