Rx – Window

2011/12/31

Continuing with the Rx series, this post discusses the Window operator.

In the previous post I discussed the Buffer operator, which enables buffering an Rx data stream into chunks.

As good and useful as the Buffer operator is, it doesn't fit every scenario.

Let's consider a scenario of tracking the highest and lowest values within a time period, for example hourly tracking of a monitored service which produces a value every second.

Technically we can use the Buffer operator to handle this scenario. The problem is that buffering an hour of data for each of our services will end up with 3,600 long-lived items per service.

This will lead us to:

  • items which survive the GC Gen 0 and Gen 1 collections.
  • large object heap allocations in the case of large chunks.

The point is that in this scenario buffering the data isn't needed at all, because the highest and lowest values can be calculated on the fly by comparing each new value against the current minimum and maximum.

The right operator for this scenario

So we need some kind of window (either a time period, a fixed item count or a combination of the two) which projects the values within its scope without buffering.

Rx does have such an operator: the Window operator.

In contrast to the Buffer operator, which returns IObservable<IList<T>>, the Window operator returns IObservable<IObservable<T>>.

At first glance it seems a rather odd signature, but I am about to show how powerful this concept is and how you can take advantage of it.

But before we get into the implementation details, let's take a look at the differences between the Buffer and Window operators.

The Buffer marble diagram:

[Marble diagram: Buffer operator]

The buffer accumulates the items internally until the end of its buffering period and then projects the accumulated values as an IList<T>.

The Window marble diagram:

[Marble diagram: Window operator]

Unlike Buffer, which caches the items internally, Window does not cache the items at all; each item is immediately projected through the inner IObservable<T> to its observers' OnNext.
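To make the difference in timing concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The names WindowVsBuffer and Run are hypothetical and only serve the illustration. It pushes two values through a Subject and records when the Buffer subscriber and the Window subscriber get to see them.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, not part of Rx: records the order of notifications.
static class WindowVsBuffer
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        // Buffer hands over a finished chunk as IList<int>.
        IObservable<IList<int>> buffered = source.Buffer(2);
        buffered.Subscribe(chunk =>
            log.Add("buffer [" + string.Join(",", chunk) + "]"));

        // Window hands over an inner IObservable<int>;
        // the items flow through it one by one.
        IObservable<IObservable<int>> windowed = source.Window(2);
        windowed.Subscribe(win =>
            win.Subscribe(x => log.Add("window item " + x)));

        source.OnNext(1); // the window subscriber sees this right away
        source.OnNext(2); // only now is the buffered chunk full
        source.OnCompleted();
        return log;
    }
}
```

Printing the returned log (for example with `foreach (var line in WindowVsBuffer.Run()) Console.WriteLine(line);`) should show the first window item before any buffer line, since the buffer has to wait for its chunk to fill.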

Window supports sliding windows and the custom period API, just like the Buffer operator does (read more on that in the Buffer operator post).

A sliding Window shares the same item across multiple windows:

[Marble diagram: sliding Window]
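The sharing of items between sliding windows can be sketched as follows, assuming the System.Reactive package and its Window(count, skip) overload. SlidingWindowDemo and Run are hypothetical names used only for this illustration. A new window of 2 items is opened on every item, and each window reports the values it sees.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, not part of Rx.
static class SlidingWindowDemo
{
    public static IList<string> Run()
    {
        return Observable.Range(1, 3)
            // count = 2, skip = 1: a new window opens on every item
            .Window(2, 1)
            // tag every value with the index of the window that saw it
            .Select((win, index) =>
                win.Select(x => "window " + index + " saw " + x))
            // subscribe to all windows, including the overlapping ones
            .Merge()
            .ToList()
            .Wait();
    }
}
```

In the returned list the value 2 should be reported by both window 0 and window 1, which is the overlap the marble diagram describes.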

Better memory utilization

Back to our initial goal: the following code uses the Aggregate operator over a Window and calculates the min / max values for each period.

I will use the following class for the aggregation:

    // requires: using System;
    class MinMaxItem
    {
        public long? Min { get; private set; }
        public long? Max { get; private set; }

        // accumulator for Aggregate: updates the running min / max in place
        public static MinMaxItem Calc(MinMaxItem instance, long value)
        {
            if (!instance.Min.HasValue)
            {
                // first value of the window
                instance.Min = value;
                instance.Max = value;
            }
            else
            {
                instance.Min = Math.Min(instance.Min.Value, value);
                instance.Max = Math.Max(instance.Max.Value, value);
            }
            return instance;
        }
    }

And the following code demonstrates the plumbing:

    var xs = Observable.Interval(TimeSpan.FromMilliseconds(0.1));

    // each window of 10 items is folded into a single MinMaxItem
    IObservable<MinMaxItem> minMax =
        from win in xs.Window(10)
        from item in win.Aggregate(
            new MinMaxItem(),
            MinMaxItem.Calc)
        select item;

    minMax.Subscribe(item => Console.WriteLine(
        "Min = {0} \tMax = {1}",
        item.Min.Value, item.Max.Value));

The above code snippet uses the from keyword twice (once over the windows and once over the aggregated window), which in fact translates to the SelectMany operator. It is used to extract the inner IObservable<T> out of the IObservable<IObservable<T>>.
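The same query can also be written with an explicit SelectMany call. The following is a minimal sketch, assuming the System.Reactive package and C# 7 value tuples. SelectManyDemo and Run are hypothetical names, a value tuple stands in for MinMaxItem, and a finite Observable.Range stands in for the Interval source so that the results can be collected and returned.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, not part of Rx.
static class SelectManyDemo
{
    public static IList<(int Min, int Max)> Run()
    {
        return Observable.Range(0, 25)
            .Window(10)
            // SelectMany subscribes to each window's aggregate
            // and flattens the results into one stream
            .SelectMany(win => win.Aggregate(
                (Min: int.MaxValue, Max: int.MinValue),
                (acc, v) => (Math.Min(acc.Min, v), Math.Max(acc.Max, v))))
            .ToList()
            .Wait();
    }
}
```

Each element of the returned list holds the min / max pair of one window; no window is ever materialized as a list along the way.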

Another alternative is to use the Switch operator, which also extracts the inner IObservable<T> out of the IObservable<IObservable<T>>.

In this case the code will look like:

    // each window is projected into its own single-item aggregate stream
    IObservable<IObservable<MinMaxItem>> minMax =
        from win in xs.Window(10)
        select win.Aggregate(
            new MinMaxItem(),
            MinMaxItem.Calc);

    minMax.Switch().Subscribe(item => Console.WriteLine(
        "Min = {0} \tMax = {1}",
        item.Min.Value, item.Max.Value));

Notice the Switch operator right before the Subscribe call.

Summary

I have shown the Window operator and its capabilities.

Because IObservable<IObservable<T>> is quite a complex API, Window usually comes together with other operators like Aggregate, SelectMany or others which I will discuss in later posts.

The Switch operator is very handy when you want to flatten the Window output: it forwards the items of the most recent inner observable into a single result stream, which is all that is needed when the windows do not overlap.

An alternative to Switch is the SelectMany operator, which subscribes to every one of the Window streams and merges their results (it can be used directly or through the nested from syntax).

You can read more about the memory benefits of Window in James Miles's post about using Rx Window in a stock exchange scenario.

Also be aware of some parallelism issues involved with Window, Aggregate and SelectMany; you can read more about them in this thread of the Rx forum, which discusses some of the pitfalls, API requests and an alternative API for the window aggregation scenario.

Finally, you should use the operator that is most appropriate for your scenario: for example, the Buffer operator is great for balancing I/O operations, and the Window operator is great for reducing memory pressure during aggregation.
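One caveat worth checking when choosing between the two: Switch only follows the most recent inner stream, so with overlapping (sliding) windows it can drop results that Merge or SelectMany would deliver. The following is a minimal sketch, assuming the System.Reactive package. SwitchVersusMerge, WindowSums, WithMerge and WithSwitch are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, not part of Rx.
static class SwitchVersusMerge
{
    // Sums every sliding window (count = 2, skip = 1) over 1..4.
    private static IObservable<IObservable<int>> WindowSums()
    {
        return Observable.Range(1, 4)
            .Window(2, 1)
            .Select(win => win.Sum());
    }

    // Merge stays subscribed to every inner stream.
    public static IList<int> WithMerge()
    {
        return WindowSums().Merge().ToList().Wait();
    }

    // Switch unsubscribes from the previous inner stream
    // as soon as a newer one arrives.
    public static IList<int> WithSwitch()
    {
        return WindowSums().Switch().ToList().Wait();
    }
}
```

With overlapping windows WithSwitch should return fewer sums than WithMerge, because a new window opens before the previous one has completed and reported its sum. With the non-overlapping Window(10) used in this post, each window completes before the next one opens, so both approaches deliver every result.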
