Rx - for beginners (part 8): Combine Latest expression

This post is the 8th in a series of posts about the new Reactive Framework (Rx).
The series TOC can be found here.
In this post we will focus on the Combine Latest expression.
The code for this post can be downloaded from here.
What does it do?
Like the Zip expression, the Combine Latest expression is used to synchronize
2 IObservable streams into a single IObservable stream.
Unlike the Zip expression, Combine Latest doesn't use a queue;
as its name suggests, it only remembers the latest value of each stream.
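To make the difference concrete, here is a minimal sketch that pushes the same values through both expressions. It assumes the System.Reactive package is referenced; the class name ZipVersusCombineLatest and the sample values are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipVersusCombineLatest
{
    // Pushes the same values through Zip and CombineLatest
    // and returns what each of them emitted.
    public static (List<string> Zipped, List<string> Combined) Run()
    {
        var xs = new Subject<int>();
        var ys = new Subject<string>();
        var zipped = new List<string>();
        var combined = new List<string>();

        using (xs.Zip(ys, (x, y) => x + y).Subscribe(zipped.Add))
        using (xs.CombineLatest(ys, (x, y) => x + y).Subscribe(combined.Add))
        {
            xs.OnNext(1);   // Zip queues 1; CombineLatest remembers 1
            xs.OnNext(2);   // Zip queues 2; CombineLatest overwrites 1 with 2
            ys.OnNext("a"); // Zip pairs the oldest x: "1a"; CombineLatest uses the latest x: "2a"
            xs.OnNext(3);   // Zip queues 3; CombineLatest emits "3a"
            ys.OnNext("b"); // Zip emits "2b"; CombineLatest emits "3b"
        }

        return (zipped, combined);
    }

    public static void Main()
    {
        var (zipped, combined) = Run();
        Console.WriteLine("Zip:           " + string.Join(", ", zipped));   // 1a, 2b
        Console.WriteLine("CombineLatest: " + string.Join(", ", combined)); // 2a, 3a, 3b
    }
}
```

Zip pairs values by position, so it has to keep unmatched values in a queue, while Combine Latest always pairs a new value with whatever the other stream produced last.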
What is the combination strategy?
The strategy used by Combine Latest is to observe both streams, and each time
a new value is observed on either stream, it combines it with the latest value observed on the other stream.
Nothing is produced until both streams have observed at least one value.
The marble diagram of the Combine Latest expression looks as follows:
We can see that when the y stream observes the b value, the result stream
combines it with the latest value observed on the x stream (3), and later the same b value
is combined into the result stream again when the x stream observes the 4 value.
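The strategy can also be sketched without Rx at all. The LatestCombiner class below is a hypothetical helper written for this illustration only: it is not how Rx implements Combine Latest, and it ignores threading, completion and errors. The values replayed in Run are an assumed sequence that ends the way the description above does (b combined with 3, then with 4).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Rx): remembers only the latest value of each side.
public sealed class LatestCombiner<TX, TY, TResult>
{
    private readonly Func<TX, TY, TResult> _selector;
    private readonly Action<TResult> _onResult;
    private TX _latestX;
    private TY _latestY;
    private bool _hasX, _hasY;

    public LatestCombiner(Func<TX, TY, TResult> selector, Action<TResult> onResult)
    {
        _selector = selector;
        _onResult = onResult;
    }

    public void PushX(TX value) { _latestX = value; _hasX = true; Emit(); }
    public void PushY(TY value) { _latestY = value; _hasY = true; Emit(); }

    // Emits only once both sides have produced at least one value.
    private void Emit()
    {
        if (_hasX && _hasY) _onResult(_selector(_latestX, _latestY));
    }
}

public static class MarbleReplay
{
    public static List<string> Run()
    {
        var results = new List<string>();
        var combiner = new LatestCombiner<int, string, string>((x, y) => x + y, results.Add);

        combiner.PushX(1);   // nothing yet, y has no value
        combiner.PushY("a"); // 1a
        combiner.PushX(2);   // 2a
        combiner.PushX(3);   // 3a
        combiner.PushY("b"); // 3b (b combined with the latest x, which is 3)
        combiner.PushX(4);   // 4b (the same b combined again)
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // 1a, 2a, 3a, 3b, 4b
    }
}
```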
When does it halt?
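The Combine Latest processing comes to an end as soon as either stream throws an exception.
Completion is a bit different: in current Rx releases the result stream completes only after both streams
have completed, because the latest value of a completed stream can still be combined with new values
from the other one (it also completes right away if a stream completes without producing any value).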
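The halting rules are easy to check against the Rx version you use. The sketch below assumes the System.Reactive package; the HaltingDemo class and its values are made up for illustration, and the expected results in the comments reflect the behavior of current System.Reactive releases as I understand it (older builds may differ).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HaltingDemo
{
    // One stream completes first, then the other.
    public static List<string> RunCompletion()
    {
        var xs = new Subject<string>();
        var ys = new Subject<string>();
        var log = new List<string>();
        xs.CombineLatest(ys, (x, y) => x + " : " + y)
          .Subscribe(log.Add, ex => log.Add("Error: " + ex.Message), () => log.Add("Completed"));

        xs.OnNext("X0");
        ys.OnNext("Y0");  // X0 : Y0
        ys.OnCompleted(); // only one stream is done, the result stays alive
        xs.OnNext("X1");  // X1 : Y0
        xs.OnCompleted(); // both streams are done: Completed
        return log;
    }

    // One stream throws while the other is still running.
    public static List<string> RunError()
    {
        var xs = new Subject<string>();
        var ys = new Subject<string>();
        var log = new List<string>();
        xs.CombineLatest(ys, (x, y) => x + " : " + y)
          .Subscribe(log.Add, ex => log.Add("Error: " + ex.Message), () => log.Add("Completed"));

        xs.OnNext("X0");
        ys.OnNext("Y0");                                   // X0 : Y0
        ys.OnError(new InvalidOperationException("boom")); // Error: boom
        xs.OnNext("X1");                                   // ignored, the result already ended
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(" | ", RunCompletion()));
        Console.WriteLine(string.Join(" | ", RunError()));
    }
}
```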
Code sample
The code for this post can be downloaded from here.
Helper method for creating an interval stream:
Code Snippet
    private static IObservable<string> CreateObservable(
        string prefix, int stopAt, double secondsInterval)
    {
        var interval = TimeSpan.FromSeconds(secondsInterval);
        var eventStream = Observable.Interval(interval)
            .TakeWhile(value => value < stopAt)          // complete after stopAt values
            .Select(value => prefix + value.ToString()); // tag each value with the prefix

        return eventStream;
    }
The method parameters (line 2) are:
- prefix: lets us distinguish values created by one stream from values created by the other (see line 7).
- stop at: defines how many values the stream produces before it completes (see line 6).
- seconds interval: defines how often the stream produces values (see lines 4,5).
The main method looks as follows:
Code Snippet
    static void Main(string[] args)
    {
        var xs = CreateObservable("X", 15, 0.5);
        var ys = CreateObservable("Y", 2, 2);

        var combineLatestStream = xs.CombineLatest(ys, (x, y) => x + " : " + y);
        combineLatestStream.Subscribe(value => Console.WriteLine(value), () => Console.WriteLine("Completed"));
        Console.ReadKey();
    }
Line 3 creates a stream which produces 15 values before completion, one value every 0.5 seconds.
Line 4 creates a stream which produces 2 values before completion, one value every 2 seconds.
Line 6 combines both streams; the lambda is used to concatenate the values of both streams.
The result will look as follows:
Summary
The Combine Latest expression can be used to synchronize 2 streams,
where the output stream gets notified whenever either of the streams produces a new value.
Technorati tags:
Rx,
IObservable,
IObserver