Awaiting RX

2015/11/21

one comment

Awaiting RX

this post discuss the meaning of awaiting RX stream.

many languages including .NET embrace the concept of async / await.

async / await on .NET is usually refer to Task, but Task in not the only

await-able type in .net, in matter of fact there is a few type

which can be awaited, one of those type is IObservable.

Rx, IObservable, IObserver, Task, Async, Await, Actor

 

what happens when you’re awaiting IObservable?

When awaiting IObservable the await schedule on completion and

resume with the last value.

for example:

Code Snippet
  1. int i = await Observable.Range(0, 10);

the value of I will be 9.

in the following example:

Code Snippet
  1. var period = TimeSpan.FromSeconds(1);
  2. long i = await Observable.Interval(period)
  3.                         .Take(3);

will resume after 3 seconds with value = 2.

the previous sample may seem quite odd because after all why should you care about the last value?

it happens that RX APIs design to be fluent therefore all (not deprecated) APIs return IObservable.

for example Aggregate, Sum, Min, Max, etc. all returning IObservable which make it uneasy

when you want to fetch the result. awaiting the last (and only) value of aggregation may be very convenient.

see the following snippet

Code Snippet
  1. var period = TimeSpan.FromSeconds(1);
  2. long i = await Observable.Interval(period)
  3.                         .Take(3)
  4.                         .Sum();

 

what’s can go wrong?

when awaiting IObservable stream, the await is expecting value (the last stream’s element)

but what happens when you’re awaiting empty stream?

the following snippet will throw  InvalidOperationException:

Code Snippet
  1. await Observable.Empty<int>();

so what can you do in order to prevent IObservable from being empty?

it may be best practice to put the ‘DefaultIfEmpty’ operator when awaiting stream that may be null

(be aware that default value for value type is not null).

Code Snippet
  1. await Observable.Empty<int>()
  2.                     .DefaultIfEmpty();

another pitfall you may encounter is the opposite direction when the stream never ends.

on this case you will never pass the awaiting line:

Code Snippet
  1. await Observable.Never<int>();

 

Subscribe and await

subscribing RX’s stream (IObservable) return IDisposable which cannot be await.

this put many developer in confusion about awaiting stream.

you may try the following snippet (which is not recommended)

Code Snippet
  1. var period = TimeSpan.FromSeconds(1);
  2. var xs = Observable.Interval(period)
  3.                         .Take(3);
  4. xs.Subscribe(v => Trace.WriteLine(v));
  5. await xs;

the problem with the above snippet is that the stream may be a cold

(which is the case with Observable.Interval), cold stream produce data per subscriber and

on the code above there is 2 different subscribers at lines 4 and 5.

await cause second subscription which make the above pattern quite dangerous.

 

how can you modify the pattern?

one of the less spoken operator of RX is the ‘Do’ operator. ‘Do’ produce side effects (just like Subscribe)

but unlike Subscribe it’s part of the pipeline and don’t make any real subscription.

therefore you can replace the above pattern with the following one:

Code Snippet
  1. var period = TimeSpan.FromSeconds(1);
  2. var xs = Observable.Interval(period)
  3.                         .Take(3)
  4.                         .Do(v => Trace.WriteLine(v));
  5. await xs;

now you’re having single subscription (triggered by the await) and you can still

track the messages.

 

Summary

async / await is nicely adopt by Rx but you have to pay attention

in order to avoid pitfalls

Add comment
facebook linkedin twitter email

Leave a Reply

Your email address will not be published.

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

*

one comment

  1. Pingback: Szumma #018 – 2015 48. hét | d/fuel