Rx – for beginners (part 13): Publish (broadcast to many subscribers)

2010/03/31

This post is the 13th in a series of posts about the new Reactive Framework (Rx).

The series TOC can be found here.

In this post we will focus on the Publish operator.

 

The code for this post is available here.

 

Let's guess how many times the select statement will be invoked for the following code,

given that only 2 values (0, 1) pass the where filter.

Code Snippet
  1. IObservable<long> observableRoot = Observable.Interval(TimeSpan.FromSeconds(1));
  2.  
  3. var producer = from item in observableRoot
  4.                where item < 2
  5.                select item;
  6.  
  7. producer.Subscribe(value => Console.WriteLine("A: " + value));
  8. producer.Subscribe(value => Console.WriteLine("B: " + value));

 

The answer is 4 times. The select statement is invoked separately for each subscription (in this sample we have 2, on lines 7-8),

each time the producer (observable) produces a value.

 

To emphasize the problem, I made some minor changes to the LINQ query:

Code Snippet
  1. IObservable<long> observableRoot = Observable.Interval(TimeSpan.FromSeconds(1));
  2.  
  3. var producer = observableRoot.Where(item => item < 2);
  4. producer = producer.Select(item =>
  5.                {
  6.                    Console.WriteLine("X");
  7.                    return item;
  8.                });
  9.  
  10. producer.Subscribe(value => Console.WriteLine("A: " + value));
  11. producer.Subscribe(value => Console.WriteLine("B: " + value));

As you can see, at line 3 we do the where part,

and at lines 4-8 we do the select part and write X to the console each time the select occurs.

 

The output for this code will look as follows:

[screenshot of the console output: X is written 4 times, once per subscriber for each value]

 

So we do have a problem!

In a lot of cases we would like to broadcast the same value to all registered subscribers

without recalculating the value for each subscription.

 

How can we do it using the Rx framework?

The solution is to separate the subscribers from the underlying producer stream.

 

The separation is done in 2 steps:

Step one: isolate the subscribers from the underlying stream by using the Publish operator.

The Publish operator returns an inactive IConnectableObservable, which means that

subscribers can subscribe to this isolation layer, but the isolation layer

is not yet listening to the underlying stream.

Step two: connect to the underlying stream by calling Connect.

 

The code will look as follows:

Code Snippet
  1. IObservable<long> observableRoot = Observable.Interval(TimeSpan.FromSeconds(1));
  2.  
  3. var producer = observableRoot.Where(item => item < 2);
  4. producer = producer.Select(item =>
  5.                {
  6.                    Console.WriteLine("X");
  7.                    return item;
  8.                });
  9.  
  10. IConnectableObservable<long> producerAbstraction = producer.Publish();
  11.  
  12. producerAbstraction.Subscribe(value => Console.WriteLine("A: " + value));
  13. producerAbstraction.Subscribe(value => Console.WriteLine("B: " + value));
  14.  
  15. producerAbstraction.Connect();

Nothing changes up to line 10.

Line 10 creates the IConnectableObservable abstraction (producerAbstraction), which is not active yet.

Lines 12-13 are similar to the previous subscriptions, but now the subscribers subscribe to the abstraction layer

(producerAbstraction) instead of subscribing directly to the underlying stream.

Line 15 connects producerAbstraction to the underlying producer stream (only at this point will values start

flowing into the subscribers' callbacks).

 

The output will be:

[screenshot of the console output: X is written only once per value, followed by the A and B lines]

As you can see, no duplicate select occurs.
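
To make the difference easy to check without waiting on a timer, here is a minimal sketch that counts the select invocations instead of writing X. It is not the code from this post: it swaps Observable.Interval plus the where filter for Observable.Range(0, 2), and the PublishDemo class and CountSelectCalls method are hypothetical names used only for illustration. It also assumes a current System.Reactive package, where Range runs synchronously on the subscribing thread; the 2010 preview builds may use different namespaces.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, not part of Rx: counts how often the Select lambda runs.
public static class PublishDemo
{
    public static int CountSelectCalls(bool usePublish)
    {
        int selectCalls = 0;

        // Assumption: Range(0, 2) stands in for Interval + Where(item < 2),
        // so the sequence is finite and produces the values 0 and 1.
        IObservable<int> producer = Observable.Range(0, 2)
            .Select(item =>
            {
                selectCalls++; // replaces Console.WriteLine("X")
                return item;
            });

        if (usePublish)
        {
            // Step one: isolate the subscribers from the underlying stream.
            IConnectableObservable<int> producerAbstraction = producer.Publish();
            producerAbstraction.Subscribe(value => Console.WriteLine("A: " + value));
            producerAbstraction.Subscribe(value => Console.WriteLine("B: " + value));

            // Step two: Connect subscribes once to the underlying stream.
            // It returns an IDisposable; disposing it disconnects.
            using (producerAbstraction.Connect()) { }
        }
        else
        {
            // Each Subscribe creates its own pass over the underlying stream.
            producer.Subscribe(value => Console.WriteLine("A: " + value));
            producer.Subscribe(value => Console.WriteLine("B: " + value));
        }

        return selectCalls;
    }
}
```

Under those assumptions, PublishDemo.CountSelectCalls(false) should return 4 (2 values for each of the 2 subscriptions), while PublishDemo.CountSelectCalls(true) should return 2 (each value is calculated once and broadcast to both subscribers).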

 

Summary

You should be aware of the subscription behavior and apply the pattern that

matches your needs.

 

 
