Rx – for beginners (part 7): Zip expression
This post is the 7th in a series of posts about the new Reactive Framework (Rx).
The series TOC can be found here.
In this post we will focus on the Zip expression.
The code for this post can be downloaded from here.
What does it do?
The Zip expression is used to synchronize 2 IObservable streams into a single IObservable stream.
How does it do it?
It takes the first observed value on either stream and waits for a value from the other stream.
After both streams have produced a value, Zip projects the combination of the two values,
then it waits for the next pair.
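The pairing rule can be sketched without Rx as two FIFO queues. This is only a minimal model for illustration: the ZipModel class and its OnLeft / OnRight methods are hypothetical names, and this is not necessarily how the Rx library implements Zip internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the Zip pairing rule (not the Rx implementation).
public class ZipModel
{
    private readonly Queue<string> _left = new Queue<string>();
    private readonly Queue<string> _right = new Queue<string>();

    // Combined values, in the order they were produced.
    public List<string> Results { get; } = new List<string>();

    // A value arrived on the first stream.
    public void OnLeft(string value)
    {
        _left.Enqueue(value);
        TryPair();
    }

    // A value arrived on the second stream.
    public void OnRight(string value)
    {
        _right.Enqueue(value);
        TryPair();
    }

    // A result is produced only when both sides have a waiting value.
    private void TryPair()
    {
        if (_left.Count > 0 && _right.Count > 0)
        {
            Results.Add(_left.Dequeue() + " : " + _right.Dequeue());
        }
    }
}
```

Calling OnLeft("X0"), OnLeft("X1") and then OnRight("Y0") leaves a single entry, "X0 : Y0", in Results, while "X1" keeps waiting for its partner.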
What happens when the stream frequencies are not equal?
If one of the streams produces values more frequently than the other, those values are queued in memory,
and each time the slower stream produces a value, a single value is fetched from the fast stream’s queue
and combined with the slower stream’s value.
This means that you should be aware of possible side effects in terms of memory usage.
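To get a feeling for how fast the queue can grow, here is a rough simulation. It assumes two endless sources ticking at fixed intervals given in milliseconds; BacklogDemo and PendingAfter are hypothetical names made up for this sketch and are not part of Rx.

```csharp
using System;
using System.Collections.Generic;

public static class BacklogDemo
{
    // Simulates two sources that tick every fastMs and slowMs milliseconds
    // and returns how many fast values are still waiting after elapsedMs.
    public static int PendingAfter(int fastMs, int slowMs, int elapsedMs)
    {
        var pending = new Queue<long>();
        long nextFast = 0;

        for (int t = 1; t <= elapsedMs; t++)
        {
            // The fast source produces a value, which is buffered.
            if (t % fastMs == 0)
            {
                pending.Enqueue(nextFast++);
            }

            // The slow source produces a value, which consumes one buffered value.
            if (t % slowMs == 0 && pending.Count > 0)
            {
                pending.Dequeue();
            }
        }

        return pending.Count;
    }
}
```

With a fast interval of 100 ms and a slow interval of 1000 ms, PendingAfter(100, 1000, 1000) returns 9 and PendingAfter(100, 1000, 10000) returns 90, so under these assumptions the backlog keeps growing for as long as both sources run.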
The marble diagram of the Zip expression looks as follows:
When does it halt?
Zip processing comes to an end either when one of the streams
completes or when either stream throws an exception.
…..
Actually, it is a bit more complex.
When one of the streams completes while it still has values in its queue,
the completion notification is put into the queue.
This means that the actual completion takes place only when the queue runs dry
or when the other stream notifies completion.
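The completion rule can be modelled in the same spirit. This is a sketch based on the description in this post, not the Rx source code; ZipCompletionModel and its members are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of Zip completion as described in the text (not the Rx implementation).
public class ZipCompletionModel
{
    private readonly Queue<string> _left = new Queue<string>();
    private readonly Queue<string> _right = new Queue<string>();
    private bool _leftDone;
    private bool _rightDone;

    public List<string> Results { get; } = new List<string>();
    public bool IsCompleted { get; private set; }

    public void OnLeft(string value) { if (!IsCompleted) { _left.Enqueue(value); Drain(); } }
    public void OnRight(string value) { if (!IsCompleted) { _right.Enqueue(value); Drain(); } }
    public void OnLeftCompleted() { _leftDone = true; Drain(); }
    public void OnRightCompleted() { _rightDone = true; Drain(); }

    private void Drain()
    {
        if (IsCompleted) return;

        // Pair up everything that can be paired.
        while (_left.Count > 0 && _right.Count > 0)
        {
            Results.Add(_left.Dequeue() + " : " + _right.Dequeue());
        }

        // A finished side with an empty queue can never contribute another pair,
        // so this is the point where the zipped stream completes.
        if ((_leftDone && _left.Count == 0) || (_rightDone && _right.Count == 0))
        {
            IsCompleted = true;
        }
    }
}
```

If the left side produces X0 to X4 and completes, the model stays open because its queue is not dry. It completes only after the right side has either consumed all five values or signalled its own completion.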
The marble diagram for this scenario looks as follows:
If either stream throws an exception, the exception is immediately
projected into the result stream and halts the Zip operation.
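The difference from completion is that an error does not wait for queued values. A minimal sketch of that behaviour, assuming a single OnError entry point for both sides (ZipErrorModel and its members are hypothetical names, not Rx types):

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of how an error short-circuits Zip (not the Rx implementation).
public class ZipErrorModel
{
    private readonly Queue<string> _left = new Queue<string>();
    private readonly Queue<string> _right = new Queue<string>();

    public List<string> Results { get; } = new List<string>();
    public bool IsStopped { get; private set; }
    public string ErrorMessage { get; private set; } = "";

    public void OnLeft(string value) { if (!IsStopped) { _left.Enqueue(value); TryPair(); } }
    public void OnRight(string value) { if (!IsStopped) { _right.Enqueue(value); TryPair(); } }

    // An error from either side is forwarded at once; queued values are dropped.
    public void OnError(Exception error)
    {
        if (IsStopped) return;
        IsStopped = true;
        ErrorMessage = error.Message;
        _left.Clear();
        _right.Clear();
    }

    private void TryPair()
    {
        if (_left.Count > 0 && _right.Count > 0)
        {
            Results.Add(_left.Dequeue() + " : " + _right.Dequeue());
        }
    }
}
```

In this model, values that arrive after OnError are ignored, and values that were still queued when the error arrived never reach Results.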
Code sample
The code for this post can be downloaded from here.
Helper method for creating an interval stream:
    private static IObservable<string> CreateObservable(
        string prefix, int stopAt, double secondsInterval)
    {
        var interval = TimeSpan.FromSeconds(secondsInterval);
        var eventStream = Observable.Interval(interval)    // emits 0, 1, 2, ... once per interval
            .TakeWhile(value => value < stopAt)            // passes 0 .. stopAt - 1, then completes
            .Select(value => prefix + value.ToString());   // tags each value with the stream prefix
        return eventStream;
    }
The method parameters are (line 2):
- prefix: lets us distinguish values created by one stream from values created by the other (see line 7).
- stop at: defines how many values the stream produces before it completes (see line 6).
- seconds interval: defines the frequency at which the stream produces values (see lines 4, 5).
The main method looks as follows:
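    static void Main(string[] args)
    {
        var xs = CreateObservable("X", 5, 0.1);   // fast stream: 5 values, one every 0.1 seconds
        var ys = CreateObservable(" Y", 3, 1);    // slow stream: 3 values, one every second
        var zippedStream = xs.Zip(ys, (x, y) => x + " : " + y);   // pairs values by arrival order
        zippedStream.Subscribe(value => Console.WriteLine(value), () => Console.WriteLine("Complete"));
        Console.ReadKey();   // keeps the process alive while the timers run
    }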
Line 3 creates a stream which produces 5 values before completion, one value every 0.1 seconds.
Line 4 creates a stream which produces 3 values before completion, one value every second.
Line 5 zips both streams; the lambda concatenates the values of the two streams.
The result looks as follows:
Summary
The Zip expression can be used to synchronize 2 streams,
but we should be careful of possible memory side effects.