Rx – DistinctUntilChanged


Rx – DistinctUntilChanged

this post will focus on the simple yet very useful DistinctUntilChanged operator.

Rx, IObservable, IObserver, Buffer, Parallel, Concurrency, DistinctUntilChanged

sometimes a datum stream may produce the same value for a while, you can see it in stock exchange stream the value of specific stock may be steady for a while.

the observer can reduce its computation level by ignoring a repeatable value (sequential repeatable value, for none sequential you can use the Distinct operator).

the DistinctUntilChanged is having the following overloads:

Code Snippet
  1. IObservable<TSource> DistinctUntilChanged<TSource>();
  2. IObservable<TSource> DistinctUntilChanged<TSource, TKey>(
  3.     Func<TSource, TKey> keySelector);
  4. IObservable<TSource> DistinctUntilChanged<TSource>(
  5.     IEqualityComparer<TSource> comparer);
  6. IObservable<TSource> DistinctUntilChanged<TSource, TKey>(
  7.     Func<TSource, TKey> keySelector,
  8.     IEqualityComparer<TKey> comparer);

I was hiding the fist parameter of the extension method which is: this IObservable<TSource> source.

you can use the first overload when you dealing with a simple datum stream (of primitive or simple value type),
but real-life datum stream are often more complex.
for complex scenario you may prefer one of the other 3 overloads where you can define the comparison.

stock exchange scenario may be looking something like the following snippet:

Code Snippet
  1. public IObservable<Stock> AlertStream(IObservable<Stock> provider)
  2. {
  3.     return from oldAndCurrent in provider
  4.                .DistinctUntilChanged(stock => stock.Price)
  5.                .Buffer(2,1)
  6.            let old = oldAndCurrent[0]
  7.            let current = oldAndCurrent[1]
  8.            where old.Price < current.Price &&
  9.                  old.Volume + FACTOR < current.Volume
  10.            select current;
  11. }

the sample is using a sliding buffer of 2,
you can read more about the Buffer operator in here.

the AlertStream method is getting a stock stream.

it filter steady price out by using the DistinctUntilChanged operator with a lambda that indicate the Price property as the comparison property.

after that it buffer the current value with the previous value (without the distinct the buffer will not ignore steady couples) .

last thing the snippet is doing before it returns a filtered stream is to filter out old / current couples that does not match a criteria.


as simple as the DistinctUntilChanged is, it is one of useful operator which you should not ignore.

Shout it

Add comment
facebook linkedin twitter email

Leave a Reply

Your email address will not be published.


You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>