Rx - for beginners (part 6): Merge expression
Rx - for beginners (part 6): Merge expression
this post is the 6th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.

in this post we will focus on the Merge expression.
What does it do?
the merge expression is used to merge multiple IObservable streams into single IObservable streams.
the marble diagrams of merge expression will look as follow:
each value on the source streams is project into the result stream until all
the source streams complete.
if one of the stream raise exception the error will be project into the result stream
and dispose the subscriptions (the result stream will stop listening to any of the streams).
the marble diagram for this case will look as follow:
Code Sample
the code sample can be download from here.
the sample for this post is merging 3 streams that is running on different time interval
into one stream.
Code Snippet
- private static IObservable<string> CreateObservable(
- string prefix,int stopAt, double secondsInterval)
- {
- var inteval = TimeSpan.FromSeconds(secondsInterval);
- var eventStream = Observable.Interval(inteval).
- TakeWhile(value => value < stopAt).
- Select(value => prefix + value.ToString());
-
- return eventStream;
- }
the code is using the CreateObservable method in order to create each of the source stream.
line 5, Observable.Interval is creating stream with increasing value for each interval.
line 6, is stopping the after stopAt iterations.
line 7, formatting the value of the stream to string with prefix and the interval value.
the main method is looking as follow:
Code Snippet
- static void Main(string[] args)
- {
- var xs = CreateObservable("X",5, 0.3);
- var ys = CreateObservable(" Y",2, 1);
- var zs = CreateObservable(" Z",7, 0.6);
-
- //var mergedStream = xs.Merge(ys).Merge(zs);
- var mergedStream = Observable.Merge(xs,ys,zs);
- mergedStream.Subscribe(value => Console.WriteLine(value));
- Console.ReadKey();
- }
Lines 3-5, creating source streams.
line 8, creating the merged stream.
line9, subscribe to the merged stream.
the following is the output:
Summary
Merge is exactly what we expected, any value of the source streams will be project
into the result stream unless exception occurs.
תגים של Technorati:
Rx,
IObservable,
IObserver