this post discuss the meaning of awaiting RX stream.
many languages including .NET embrace the concept of async / await.
async / await on .NET is usually refer to Task, but Task in not the only
await-able type in .net, in matter of fact there is a few type
which can be awaited, one of those type is IObservable.
what happens when you’re awaiting IObservable?
When awaiting IObservable the await schedule on completion and
resume with the last value.
the value of I will be 9.
in the following example:
will resume after 3 seconds with value = 2.
the previous sample may seem quite odd because after all why should you care about the last value?
it happens that RX APIs design to be fluent therefore all (not deprecated) APIs return IObservable.
for example Aggregate, Sum, Min, Max, etc. all returning IObservable which make it uneasy
when you want to fetch the result. awaiting the last (and only) value of aggregation may be very convenient.
see the following snippet
what’s can go wrong?
when awaiting IObservable stream, the await is expecting value (the last stream’s element)
but what happens when you’re awaiting empty stream?
the following snippet will throw InvalidOperationException:
so what can you do in order to prevent IObservable from being empty?
it may be best practice to put the ‘DefaultIfEmpty’ operator when awaiting stream that may be null
(be aware that default value for value type is not null).
another pitfall you may encounter is the opposite direction when the stream never ends.
on this case you will never pass the awaiting line:
Subscribe and await
subscribing RX’s stream (IObservable) return IDisposable which cannot be await.
this put many developer in confusion about awaiting stream.
you may try the following snippet (which is not recommended)
the problem with the above snippet is that the stream may be a cold
(which is the case with Observable.Interval), cold stream produce data per subscriber and
on the code above there is 2 different subscribers at lines 4 and 5.
await cause second subscription which make the above pattern quite dangerous.
how can you modify the pattern?
one of the less spoken operator of RX is the ‘Do’ operator. ‘Do’ produce side effects (just like Subscribe)
but unlike Subscribe it’s part of the pipeline and don’t make any real subscription.
therefore you can replace the above pattern with the following one:
now you’re having single subscription (triggered by the await) and you can still
track the messages.
async / await is nicely adopt by Rx but you have to pay attention
in order to avoid pitfalls