Rx – Exception Handling

2012/01/22


This post discusses exception handling in Rx.

Rx, observable, observer, linq, exception handling, try, catch, finally, retry

Handling exceptions in an event stream is not trivial.
For example, an observable should deliver the exception to its subscribers through the OnError operation and cancel the subscription.
On the other hand, the subscriber may want to respond to the OnError state by renewing its subscription or falling back to an alternative stream.

It is true that the Rx design guidelines say a faulted stream should not continue to produce data,
but real-world implementations such as a stock exchange stream (or another hot stream) may ignore this recommendation.

If you design such a stream, consider wrapping the fault info within the OnNext message (as a data property) instead of sending an OnError, and reserve OnError for fatal faults that the stream cannot recover from.
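A minimal sketch of that idea follows. The `Tick` type, its `Fault` property, and the messages are hypothetical names made up for illustration; they are not part of Rx. Recoverable problems travel as data through OnNext, and OnError is used only for the fatal case.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical message type: carries either a price or a recoverable fault.
class Tick
{
    public decimal Price { get; set; }
    public Exception Fault { get; set; }   // null when the tick is healthy
    public bool IsFaulted { get { return Fault != null; } }
}

class Program
{
    static void Main()
    {
        var ticks = Observable.Create<Tick>(obs =>
        {
            obs.OnNext(new Tick { Price = 10.5m });
            // Recoverable problem: reported as data, the stream stays alive.
            obs.OnNext(new Tick { Fault = new TimeoutException("feed stalled") });
            obs.OnNext(new Tick { Price = 10.7m });
            // Fatal problem: only now use OnError, which ends the stream.
            obs.OnError(new InvalidOperationException("feed closed"));
            return Disposable.Empty;
        });

        ticks.Subscribe(
            t => Console.WriteLine(
                t.IsFaulted ? "fault: " + t.Fault.Message : "price: " + t.Price),
            ex => Console.WriteLine("fatal: " + ex.Message));
    }
}
```

The subscriber keeps receiving ticks after the wrapped fault, and only the final OnError terminates the subscription.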

So how can you handle a faulted state?

Rx has a few operators that respond to OnError.

The first one is Retry, which re-subscribes (forever, or for a specified number of attempts).

For the demonstration I will use the following observable (which produces OnError after the second OnNext):

Code Snippet
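    // requires: using System; using System.Reactive.Disposables; using System.Reactive.Linq;
    var observable = Observable.Create<int>(
        obs =>
        {
            obs.OnNext(1);
            obs.OnNext(2);
            // fault the stream after the second item
            obs.OnError(new SystemException());
            return Disposable.Empty;
        });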

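The following code subscribes up to 3 times in total (Retry(3) counts the initial subscription as well as the retries) before it surrenders to the evil exception and forwards it to OnError.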

Code Snippet
    observable
        .Retry(3)
        .Subscribe(
            item => Console.WriteLine(item),
            (ex) => Console.WriteLine(ex.Message),
            () => Console.WriteLine("Complete"));

This scenario may suit an observable that downloads data from the network and responds with an error when the network is unavailable (consider an unreliable network).

The output shows 1 and 2 printed on each subscription attempt, followed by the exception message once the attempts are exhausted.

Sometimes it is not enough to re-subscribe, and you have to define an alternative fallback stream.

Consider a stock exchange scenario: whenever a specific stock provider fails to supply the data, you may want to switch and subscribe to a different provider.

You can do so using the Catch operator. Just like try/catch, you can specify a fallback for a specific exception type or a generic fallback for any exception.

Given the following fallback streams:

Code Snippet
    var fallback1 = Observable.Create<int>(
        obs =>
        {
            for (int i = 0; i < 10; i++)
                obs.OnNext(i);
            return Disposable.Empty;
        });
    var fallback2 = Observable.Create<int>(
        obs =>
        {
            for (int i = 20; i < 23; i++)
                obs.OnNext(i);
            return Disposable.Empty;
        });
    var fallback3 = Observable.Create<int>(
        obs =>
        {
            for (int i = 30; i < 33; i++)
                obs.OnNext(i);
            return Disposable.Empty;
        });

You can map the fallbacks using the following Rx code:

Code Snippet
  1. observable
  2.     .Catch((NullReferenceException ex) => fallback1)
  3.     .Catch((SystemException ex) => fallback2)
  4.     .Catch(fallback3)
  5.     .Subscribe(
  6.         item => Console.WriteLine(item),
  7.         (ex) => Console.WriteLine(ex.Message),
  8.         () => Console.WriteLine("Complete"));

Lines 2-4 map different fallbacks to different exceptions.

It generates the output 1, 2, 20, 21, 22: a SystemException was thrown, so the stream continues with the second fallback, which starts at 20.

Finally, we can discuss the third option.

Sometimes you do not care whether the stream stopped because it completed or because it faulted; all you really care about is cleaning up some resources.
In this case you can use the Finally operator, which is triggered in both scenarios: normal completion and a faulted state.

The following code demonstrates this API:

Code Snippet
    observable
        .Finally(() => {/* clear some resources */})
        .Subscribe(
            item => Console.WriteLine(item),
            (ex) => Console.WriteLine(ex.Message),
            () => Console.WriteLine("Complete"));
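To see both paths side by side, here is a small sketch that attaches Finally to one stream that completes and one that faults. The two source streams and the printed messages are illustrative assumptions, not taken from the demo above.

```csharp
using System;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        // One stream that completes normally and one that faults immediately.
        var completes = Observable.Return(1);
        var faults = Observable.Throw<int>(new InvalidOperationException("boom"));

        completes
            .Finally(() => Console.WriteLine("cleanup (completed stream)"))
            .Subscribe(
                item => Console.WriteLine(item),
                ex => Console.WriteLine(ex.Message),
                () => Console.WriteLine("Complete"));

        faults
            .Finally(() => Console.WriteLine("cleanup (faulted stream)"))
            .Subscribe(
                item => Console.WriteLine(item),
                ex => Console.WriteLine(ex.Message),
                () => Console.WriteLine("Complete"));
    }
}
```

In both cases the cleanup action runs once the stream has terminated, so the same release logic covers completion and failure.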

Summary

Rx has some very useful operators that respond to the OnError state: you can re-subscribe, switch to a fallback stream, or just handle the finalization state.
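The three operators also compose. The sketch below is one possible arrangement, assuming a hypothetical primary source that always fails and a single-value fallback: retry the primary a few times, switch to the fallback when the attempts run out, and release resources at the end either way.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        int attempts = 0;

        // Hypothetical primary source: emits its attempt number, then fails.
        var primary = Observable.Create<int>(obs =>
        {
            attempts++;
            obs.OnNext(attempts);
            obs.OnError(new SystemException("primary failed"));
            return Disposable.Empty;
        });

        // Hypothetical fallback source: a single marker value, then completion.
        var fallback = Observable.Return(-1);

        primary
            .Retry(3)          // subscribe to the primary up to 3 times
            .Catch(fallback)   // then switch to the fallback on any error
            .Finally(() => Console.WriteLine("released after " + attempts + " attempts"))
            .Subscribe(
                item => Console.WriteLine(item),
                ex => Console.WriteLine(ex.Message),
                () => Console.WriteLine("Complete"));
    }
}
```

The order of the operators matters: placing Catch before Retry would swallow the error before Retry ever sees it.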
