Rx – for beginners (part 7): Zip expression

2010/03/05


This post is the 7th in a series of posts about the new Reactive Framework (Rx).

The series TOC can be found here.


In this post we will focus on the Zip expression.

The code for this post can be downloaded from here.

 
What does it do?

The Zip expression is used to synchronize two IObservable streams into a single IObservable stream.

 

How does it do it?

It takes the first value observed on either stream and waits for a value from the other stream.

After both streams have produced a value, Zip projects the combination of the two values,

then it waits for the next pair.
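The pairing described above can be tried out by pushing values by hand. The following is only a minimal sketch: it assumes the current System.Reactive NuGet package (namespaces System.Reactive.Linq and System.Reactive.Subjects), which may differ from the early Rx build this series was written against, and the class name ZipPairingSketch is made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, not part of the post's downloadable code.
public static class ZipPairingSketch
{
    // Pushes values by hand and records what the zipped stream emits.
    public static List<string> Run()
    {
        var log = new List<string>();
        var xs = new Subject<string>();
        var ys = new Subject<string>();

        using (xs.Zip(ys, (x, y) => x + " : " + y)
                 .Subscribe(value => log.Add(value)))
        {
            xs.OnNext("X0"); // no partner yet, nothing is emitted
            xs.OnNext("X1"); // still waiting for a value from ys
            ys.OnNext("Y0"); // pairs with the oldest waiting value: "X0 : Y0"
            ys.OnNext("Y1"); // pairs with the next one: "X1 : Y1"
            ys.OnNext("Y2"); // xs has nothing waiting, so Y2 waits for a partner
        }

        return log;
    }
}
```

Calling ZipPairingSketch.Run() should return the two entries "X0 : Y0" and "X1 : Y1"; Y2 never shows up because no third value arrives on xs.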

 

What happens when the stream frequencies are not equal?

If one of the streams produces values more frequently than the other, those values are queued in memory,

and each time the slower stream produces a value, a single value is taken from the head of the fast stream’s queue

and combined with the slower stream’s value.

This means that you should be aware of possible side effects in terms of memory usage: the queue keeps growing for as long as the fast stream stays ahead of the slow one.
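To make the memory concern concrete, here is a small model of the queueing described above. It is an illustration only, written in plain C# without Rx, and it is not the library's actual implementation; the class ZipBufferModel and its members are hypothetical names chosen for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Illustrative model only: one FIFO queue per source, not Rx's real code.
public sealed class ZipBufferModel
{
    private readonly Queue<string> _left = new Queue<string>();
    private readonly Queue<string> _right = new Queue<string>();

    // Pairs produced so far, in emission order.
    public List<string> Results { get; } = new List<string>();

    // Number of values still held in memory, waiting for a partner.
    public int Backlog => _left.Count + _right.Count;

    public void PushLeft(string value)
    {
        _left.Enqueue(value);
        Drain();
    }

    public void PushRight(string value)
    {
        _right.Enqueue(value);
        Drain();
    }

    // Emits a pair whenever both queues hold at least one value.
    private void Drain()
    {
        while (_left.Count > 0 && _right.Count > 0)
        {
            Results.Add(_left.Dequeue() + " : " + _right.Dequeue());
        }
    }
}
```

In this model, pushing X0 to X9 on the left side and then a single Y0 on the right side leaves Results holding only "X0 : Y0", while Backlog is 9. Those nine values stay in memory until the slow side catches up.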

 

The marble diagram of the Zip expression looks as follows:

[Figure: marble diagram of the Zip expression]

 

When does it halt?

Zip processing comes to an end either when one of the streams

completes or when either of the streams throws an exception.

…..

Actually it is a bit more complex.

When one of the streams completes while values are still waiting in the queue,

the completion notification is put into the queue as well,

which means that the actual completion takes place only when the queue runs dry

or when the other stream notifies completion.

 

The marble diagram for this scenario looks as follows:

[Figure: marble diagram of Zip completion]

 

If either of the streams throws an exception, the exception is immediately

projected into the result stream and halts the Zip operation.

[Figure: marble diagram of Zip with an exception]
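The exception case can be sketched with hand-driven streams as well. As before, this assumes the current System.Reactive NuGet package rather than the early build used for this series, and ZipErrorSketch is a hypothetical name used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, not part of the post's downloadable code.
public static class ZipErrorSketch
{
    // Records values, the error, and completion seen by the subscriber.
    public static List<string> Run()
    {
        var log = new List<string>();
        var xs = new Subject<string>();
        var ys = new Subject<string>();

        xs.Zip(ys, (x, y) => x + " : " + y).Subscribe(
            value => log.Add(value),
            error => log.Add("Error: " + error.Message),
            () => log.Add("Complete"));

        xs.OnNext("X0");
        ys.OnNext("Y0"); // emits "X0 : Y0"
        xs.OnNext("X1"); // waits for a partner
        ys.OnError(new InvalidOperationException("boom")); // forwarded right away
        xs.OnNext("X2"); // ignored, the zipped stream has already ended

        return log;
    }
}
```

Calling ZipErrorSketch.Run() should return "X0 : Y0" followed by "Error: boom". The waiting X1 is never paired and no "Complete" entry is logged, since the error ends the zipped stream.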

 

Code sample


 

Helper method for creating an interval stream:
Code Snippet
  1. private static IObservable<string> CreateObservable(
  2.     string prefix,int stopAt, double secondsInterval)
  3. {
  4.     var interval = TimeSpan.FromSeconds(secondsInterval);
  5.     var eventStream = Observable.Interval(interval).
  6.         TakeWhile(value => value < stopAt).
  7.         Select(value => prefix + value.ToString());
  8.  
  9.     return eventStream;
  10. }

The method parameters are (line 2):

  • prefix: lets us distinguish values created by one stream from
    values created by the other (see line 7).
  • stopAt: defines how many values the stream produces before it completes (see line 6).
  • secondsInterval: defines how often the stream produces a value (see lines 4, 5).

 

The main method looks as follows:
Code Snippet
  1. static void Main(string[] args)
  2. {
  3.     var xs = CreateObservable("X",5, 0.1);
  4.     var ys = CreateObservable("   Y",3, 1);
  5.  
  6.     var zippedStream = xs.Zip (ys, (x, y) => x + " : " + y);
  7.     zippedStream.Subscribe(value => Console.WriteLine(value),() => Console.WriteLine("Complete"));
  8.     Console.ReadKey();
  9. }

Line 3 creates a stream which produces 5 values before completion, one every 0.1 seconds.

Line 4 creates a stream which produces 3 values before completion, one every 1 second.

Line 6 zips both streams; the lambda concatenates the two streams’ values.

 

The result looks as follows:

[Figure: output of the sample]

 

Summary

The Zip expression can be used to synchronize two streams,

but we should be careful of possible memory side effects.

 
