Rx - for beginners (part 8): Combine Latest expression - Bnaya Eshet

Bnaya Eshet


This post is the 8th in a series of posts about the new Reactive Framework (Rx).

The series TOC can be found here.

In this post we will focus on the Combine Latest expression.

The code for this post can be downloaded from here.

 

What does it do?

Like the Zip expression, the Combine Latest expression is used to synchronize

2 IObservable streams into a single IObservable stream.

Unlike the Zip expression, Combine Latest doesn't use a queue;

as its name suggests, it only remembers the latest value of each stream.
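
To make the difference from Zip concrete, here is a minimal sketch that pushes the same values through both expressions. It assumes the System.Reactive package; the class name ZipVersusCombineLatest and the Subject-based inputs are only for illustration and are not part of the downloadable sample.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipVersusCombineLatest
{
    // Feeds identical input into Zip and CombineLatest and records both outputs.
    public static (List<string> Zipped, List<string> Combined) Run()
    {
        var xs = new Subject<int>();     // hypothetical x stream
        var ys = new Subject<string>();  // hypothetical y stream
        var zipped = new List<string>();
        var combined = new List<string>();

        using (xs.Zip(ys, (x, y) => $"{x}{y}").Subscribe(zipped.Add))
        using (xs.CombineLatest(ys, (x, y) => $"{x}{y}").Subscribe(combined.Add))
        {
            xs.OnNext(1);
            xs.OnNext(2);
            xs.OnNext(3);
            ys.OnNext("a"); // Zip pairs it with the oldest queued x (1), CombineLatest with the latest x (3)
            ys.OnNext("b"); // Zip pairs it with the next queued x (2), CombineLatest still sees x = 3
        }

        return (zipped, combined);
    }

    public static void Main()
    {
        var (zipped, combined) = Run();
        Console.WriteLine("Zip:           " + string.Join(", ", zipped));   // 1a, 2b
        Console.WriteLine("CombineLatest: " + string.Join(", ", combined)); // 3a, 3b
    }
}
```

Zip pairs values by position, so it has to queue the x values until matching y values arrive. Combine Latest keeps only one value per stream, so the earlier x values (1 and 2) never reach the output.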

 

What is the combination strategy?

The strategy used by Combine Latest is to observe both streams, and each time

a new value is observed on either stream, it combines it with the latest value observed on the other stream. Nothing is produced until each stream has produced at least one value.

 

The marble diagram of the Combine Latest expression looks as follows:

[image: marble diagram of Combine Latest]

We can see that when the y stream observes the b value, the result stream

combines it with the latest value observed on the x stream (3), and later the same b value

is combined into the result stream again when the x stream observes the 4 value.
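
The scenario described above can be replayed by hand with two subjects. This is only a sketch, assuming the System.Reactive package: the 3, b, 4 part follows the description, while the earlier values (1, a, 2) and the class name MarbleWalkthrough are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MarbleWalkthrough
{
    // Pushes values by hand in a fixed order and records what CombineLatest produces.
    public static List<string> Run()
    {
        var xs = new Subject<int>();   // hypothetical x stream
        var ys = new Subject<char>();  // hypothetical y stream
        var results = new List<string>();

        using (xs.CombineLatest(ys, (x, y) => $"{x}{y}").Subscribe(results.Add))
        {
            xs.OnNext(1);   // no output yet: y has not produced a value
            ys.OnNext('a'); // 1a
            xs.OnNext(2);   // 2a
            xs.OnNext(3);   // 3a
            ys.OnNext('b'); // 3b (b meets the latest x, which is 3)
            xs.OnNext(4);   // 4b (the same b is combined again with the new x)
        }

        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // 1a, 2a, 3a, 3b, 4b
    }
}
```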

 

When does it halt?

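Combine Latest processing comes to an end either when both streams

have completed or as soon as one of the streams throws an exception. After one stream completes, its latest value is still combined with new values from the other stream.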
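
The completion behaviour can be checked with a small sketch like the one below. It assumes a current System.Reactive release, where, as far as I know, the combined stream keeps running after one source completes and only completes once the other source completes too. The class name CombineLatestCompletion is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestCompletion
{
    // Records values, errors and completion of the combined stream, in order.
    public static List<string> Run()
    {
        var xs = new Subject<string>(); // hypothetical x stream
        var ys = new Subject<string>(); // hypothetical y stream
        var log = new List<string>();

        using (xs.CombineLatest(ys, (x, y) => x + " : " + y)
                 .Subscribe(
                     log.Add,
                     ex => log.Add("Error: " + ex.Message),
                     () => log.Add("Completed")))
        {
            xs.OnNext("X0");   // no output yet: y has no value
            ys.OnNext("Y0");   // X0 : Y0
            ys.OnCompleted();  // y is done, the combined stream stays alive
            xs.OnNext("X1");   // X1 : Y0 (the last y value is still used)
            xs.OnCompleted();  // both sources are done, so: Completed
        }

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Calling xs.OnError(...) in place of the final xs.OnCompleted() should end the combined stream with that error instead.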

 

Code sample

The code for this post can be downloaded from here.

 

Helper method for creating an interval stream:

Code Snippet
  1. private static IObservable<string> CreateObservable(
  2.     string prefix, int stopAt, double secondsInterval)
  3. {
  4.     var interval = TimeSpan.FromSeconds(secondsInterval);
  5.     var eventStream = Observable.Interval(interval).
  6.         TakeWhile(value => value < stopAt).
  7.         Select(value => prefix + value.ToString());
  8.  
  9.     return eventStream;
  10. }

The method parameters (line 2) are:

  • prefix: lets us distinguish values created by one stream from
    values created by the other (see line 7).
  • stopAt: defines how many values the stream produces before it completes (see line 6).
  • secondsInterval: defines how often the stream produces a value (see lines 4, 5).

The Main method looks as follows:
Code Snippet
  1. static void Main(string[] args)
  2. {
  3.     var xs = CreateObservable("X", 15, 0.5);
  4.     var ys = CreateObservable("Y", 2, 2);
  5.  
  6.     var combineLatestStream = xs.CombineLatest(ys, (x, y) => x + " : " + y);
  7.     combineLatestStream.Subscribe(value => Console.WriteLine(value), () => Console.WriteLine("Completed"));
  8.     Console.ReadKey();
  9. }

Line 3 creates a stream which produces 15 values before completion, one every 0.5 seconds.

Line 4 creates a stream which produces 2 values before completion, one every 2 seconds.

Line 6 combines both streams; the lambda is used to concatenate the values of the two streams.

The result will look as follows:

[image: console output of the sample]

 
Summary

The Combine Latest expression can be used to synchronize 2 streams,

where the output stream is notified whenever either of the streams produces a new value.

 


 
