DCSIMG
Rx - for beginners (part 6): Merge expression - Bnaya Eshet

Bnaya Eshet

Disclaimer

Rx - for beginners (part 6): Merge expression

Rx - for beginners (part 6): Merge expression

this post is the 6th in a series of posts about the new Reactive Framework (Rx).

the series TOC can found here.

Rx, IObservable,IObserver

in this post we will focus on the Merge expression.

 

What does it do?

the merge expression is used to merge multiple IObservable streams into single IObservable streams.

the marble diagrams of merge expression will look as follow:

Rx, IObservable,IObserver

each value on the source streams is project into the result stream until all

the source streams complete.

 

if one of the stream raise exception the error will be project into the result stream

and dispose the subscriptions (the result stream will stop listening to any of the streams).

the marble diagram for this case will look as follow:

Rx, IObservable,IObserver

 

Code Sample

the code sample can be download from here.

 

the sample for this post is merging 3 streams that is running on different time interval

into one stream.

Code Snippet
  1. private static IObservable<string> CreateObservable(
  2.             string prefix,int stopAt, double secondsInterval)
  3.         {
  4.             var inteval = TimeSpan.FromSeconds(secondsInterval);
  5.             var eventStream = Observable.Interval(inteval).
  6.                 TakeWhile(value => value < stopAt).
  7.                 Select(value => prefix + value.ToString());
  8.  
  9.             return eventStream;
  10.         }

the code is using the CreateObservable method in order to create each of the source stream.

line 5, Observable.Interval is creating stream with increasing value for each interval.

line 6, is stopping the after stopAt iterations.

line 7, formatting the value of the stream to string with prefix and the interval value.

 

the main method is looking as follow:

Code Snippet
  1. static void Main(string[] args)
  2.         {
  3.             var xs = CreateObservable("X",5, 0.3);
  4.             var ys = CreateObservable("   Y",2, 1);
  5.             var zs = CreateObservable("      Z",7, 0.6);
  6.  
  7.             //var mergedStream = xs.Merge(ys).Merge(zs);
  8.             var mergedStream = Observable.Merge(xs,ys,zs);
  9.             mergedStream.Subscribe(value => Console.WriteLine(value));
  10.             Console.ReadKey();
  11.         }

Lines 3-5, creating source streams.

line 8, creating the merged stream.

line9, subscribe to the merged stream.

 

the following is the output:

Rx, IObservable,IObserver

Summary

Merge is exactly what we expected, any value of the source streams will be project

into the  result stream unless exception occurs.

 

תגים של Technorati:‏ ,,

 

kick it on DotNetKicks.com


Comments

No Comments