Rx – Aggregate vs. Scan

2012/01/24


This post focuses on two Rx operators: Aggregate and Scan.

Rx, Reactive Extensions, Aggregate, Scan, IObservable, IObserver

Both Aggregate and Scan accumulate values over an event stream. The only difference is that Aggregate produces a single result (when the stream completes), while Scan produces an ongoing, running accumulation that is emitted on every OnNext.

Both operators have two overloads with matching signatures:

Code Snippet
  1. IObservable<TSource> Aggregate<TSource>(
  2.     this IObservable<TSource> source,
  3.     Func<TSource, TSource, TSource> accumulator);
  4.  
  5. IObservable<TSource> Scan<TSource>(
  6.     this IObservable<TSource> source,
  7.     Func<TSource, TSource, TSource> accumulator);
  8.  
  9. IObservable<TAccumulate> Aggregate<TSource, TAccumulate>(
  10.     this IObservable<TSource> source,
  11.     TAccumulate seed,
  12.     Func<TAccumulate, TSource, TAccumulate> accumulator);
  13.  
  14. IObservable<TAccumulate> Scan<TSource, TAccumulate>(
  15.     this IObservable<TSource> source,
  16.     TAccumulate seed,
  17.     Func<TAccumulate, TSource, TAccumulate> accumulator);

The first overload (lines 1 and 5) takes a simple accumulator, Func<T,T,T>, which receives the previous accumulated value and the current value and returns the new accumulated value. There is no seed: the first element of the stream becomes the initial accumulated value (so Scan emits it unchanged), and the accumulator is first called with the first and second elements.
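To see exactly what the seedless accumulator receives, the following sketch records every call it gets. It assumes the current System.Reactive NuGet package (newer than the Rx release this post was written against); SeedlessScanDemo and Record are hypothetical names, and Wait() is used only to keep the demo synchronous.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo class; Scan, ToList and Wait are System.Reactive operators.
public static class SeedlessScanDemo
{
    // Runs a seedless Scan and records each (acc, cur) pair passed to the accumulator.
    public static (IList<int> Results, List<(int Acc, int Cur)> Calls) Record(IObservable<int> source)
    {
        var calls = new List<(int Acc, int Cur)>();
        IList<int> results = source
            .Scan((acc, cur) =>
            {
                calls.Add((acc, cur)); // log what the accumulator was given
                return acc + cur;
            })
            .ToList()  // collect every intermediate result into one list
            .Wait();   // block until the (finite) source completes
        return (results, calls);
    }
}
```

For Observable.Range(1, 4) the results are 1, 3, 6, 10, but the accumulator runs only three times, first with (1, 2): the first element is passed through as the initial accumulated value rather than being combined with default(int).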

The second overload (lines 9 and 14) takes a seed value for the first accumulation and a Func<TAccumulate, TSource, TAccumulate>, which again receives the previous accumulated value and the current value and returns the new accumulated value.
Notice that the type of the accumulated value can differ from the type of the stream elements.
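As an illustration of a seed whose type differs from the element type, the sketch below keeps a (Count, Sum) tuple over a stream of ints and turns it into an average, once with Scan and once with Aggregate. It assumes the System.Reactive NuGet package; SeededAccumulation, RunningAverages and FinalAverage are hypothetical names, and Wait() blocks only to keep the example synchronous.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class; Scan, Aggregate, Select, ToList and Wait are Rx operators.
public static class SeededAccumulation
{
    // Seeded Scan: the state is a (Count, Sum) tuple while the elements are ints,
    // so TAccumulate differs from TSource. Emits a running average per element.
    public static IList<double> RunningAverages(IObservable<int> source) =>
        source
            .Scan((Count: 0, Sum: 0), (acc, cur) => (acc.Count + 1, acc.Sum + cur))
            .Select(acc => (double)acc.Sum / acc.Count)
            .ToList()
            .Wait();

    // Same seed and accumulator with Aggregate: a single average,
    // produced only when the source completes.
    public static double FinalAverage(IObservable<int> source) =>
        source
            .Aggregate((Count: 0, Sum: 0), (acc, cur) => (acc.Count + 1, acc.Sum + cur))
            .Select(acc => (double)acc.Sum / acc.Count)
            .Wait();
}
```

For the values 2, 4, 6, RunningAverages yields 2, 3, 4 (one value per element), while FinalAverage yields only 4, after the source completes.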

For example, the following query:

Code Snippet
  var xs = Observable.Range(1, 10);

  var result = xs.Aggregate((acc, i) => acc + i);
  result.ForEach(item => Console.WriteLine(item));

produces a single result (55) once the source completes,

whereas the Scan version:

Code Snippet
  var xs = Observable.Range(1, 10);

  var result = xs.Scan((acc, i) => acc + i);
  result.ForEach(item => Console.WriteLine(item));

produces every intermediate accumulation:

1
3
6
10
15
21
28
36
45
55
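Put differently, for a stream that completes, Aggregate delivers the same value as the last element Scan emits. A small sketch of that relationship, assuming the System.Reactive package (LastAsync and Wait come from later Rx releases) and hypothetical helper names:

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper class for illustration.
public static class AggregateVsScan
{
    // Single result, emitted when the source completes.
    public static int Total(IObservable<int> source) =>
        source.Aggregate((acc, i) => acc + i).Wait();

    // LastAsync keeps only the final element of the running accumulation,
    // which is emitted once the source completes.
    public static int LastRunningTotal(IObservable<int> source) =>
        source.Scan((acc, i) => acc + i).LastAsync().Wait();
}
```

For Observable.Range(1, 10) both helpers return 55; the difference is that Scan also exposes 1, 3, 6, ... along the way.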

Both operators become very handy when combined with the Window operator.

For more information about the Window operator, see this post.

For example, you may want to count the customers who enter a store on a per-hour basis.

You can combine the Window operator with Aggregate to get a per-hour report,
or combine Window with Scan to get a continuous report within each hour. The latter lets you react immediately to live data; for example, you can react as soon as more than 100 customers have entered the store within an hour or less.
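A minimal sketch of the Window plus Scan idea, assuming the System.Reactive package and an arrivals stream that emits one Unit per customer; StoreTrafficAlerts, ThresholdCrossings and HourlyAlerts are hypothetical names. Each window gets its own running count, and the alert fires once per window, at the moment the count first exceeds the threshold.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical helper class; Window, SelectMany, Scan and Where are Rx operators.
public static class StoreTrafficAlerts
{
    // For each window, keep a running count with Scan and emit once,
    // when the count first exceeds the threshold.
    public static IObservable<int> ThresholdCrossings<T>(
        IObservable<IObservable<T>> windows, int threshold) =>
        windows.SelectMany(win =>
            win.Scan(0, (count, _) => count + 1)
               .Where(count => count == threshold + 1));

    // Hourly tumbling windows over customer arrivals: signals as soon as
    // more than 100 customers have entered within the current hour.
    public static IObservable<int> HourlyAlerts(IObservable<Unit> arrivals) =>
        ThresholdCrossings(arrivals.Window(TimeSpan.FromHours(1)), threshold: 100);
}
```

Hourly tumbling windows reset the count at each window boundary, so this approximates "within an hour"; a sliding interpretation would need the overlapping Window(timeSpan, timeShift) overload. Taking the windows as a parameter also makes the logic easy to check with count-based windows, e.g. Observable.Range(1, 10).Window(5) with a threshold of 3.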

The following code demonstrates the Aggregate scenario, but I should warn you: you are now stepping into some dark-art code (the result of some concurrency behavior that I hope the Rx team will address in a more intuitive way in the future).

I am considering adding a few operators to a future version of Rx Contrib to handle this task more intuitively.

I will also post a walk-through series on how to use the Rx Contrib libraries.

What you will see is not the most intuitive code snippet, but it is what you need in order to get the job done.

Code Snippet
  1. var storeStreamMock = Observable.Generate<Random, Unit>(
  2.     new Random(),   // random object
  3.     rnd => true,    // loop condition (continue forever)
  4.     rnd => rnd,     // iteration (reuse the same Random instance)
  5.     rnd => Unit.Default, // projection (always projects Unit.Default)
  6.     rnd => TimeSpan.FromMilliseconds(rnd.Next(10, 100))); // delay between iterations
  7.  
  8. IObservable<Task<int>> accStream =
  9.     from win in storeStreamMock.Window(TimeSpan.FromSeconds(1))
  10.     select win.Aggregate(0, (acc, cur) => acc + 1).ToTask();
  11.  
  12. accStream
  13.     .ObserveOn(Scheduler.TaskPool)
  14.     .ForEach(item =>
  15.         Console.WriteLine(item.Result));

 

Lines 1-6 generate a mock store observable using the Generate factory; you can safely ignore this part.

Line 9 defines a one-second window.

Line 10 defines the aggregation and exports the aggregated value into a TPL Task.

This is part of the dark art; without it we would end up with blocking and contention.

The last part of the dark art is processing the results in parallel inside the subscription, by observing on the task pool (line 13).

You can find other suggestions for accomplishing this task in this thread.
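For comparison, one formulation that later Rx releases make possible is to flatten each window's aggregate with SelectMany instead of converting it to a Task. This is a sketch assuming the System.Reactive package, not the code above, and it does not establish anything about the concurrency behavior of the 2012 release; StoreTrafficReport, CountsPerWindow and CountsPerSecond are hypothetical names.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical helper class; Window, SelectMany and Aggregate are Rx operators.
public static class StoreTrafficReport
{
    // Aggregate each window to a single count (emitted when the window
    // closes) and flatten the per-window results with SelectMany.
    public static IObservable<int> CountsPerWindow<T>(IObservable<IObservable<T>> windows) =>
        windows.SelectMany(win => win.Aggregate(0, (acc, _) => acc + 1));

    // Same one-second windows as the snippet above, without ToTask.
    public static IObservable<int> CountsPerSecond(IObservable<Unit> arrivals) =>
        CountsPerWindow(arrivals.Window(TimeSpan.FromSeconds(1)));
}
```

Each window produces exactly one count when it closes (an empty window yields the seed, 0). With count-based windows such as Observable.Range(1, 10).Window(4), CountsPerWindow yields 4, 4, 2.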

Summary

Both Scan and Aggregate are very useful operators, but you should be careful when using them within a Window.
