Rx - for beginners (part 13): Publish (broadcast to many subscribers)
This post is the 13th in a series of posts about the new Reactive Framework (Rx).
The series TOC can be found here.
In this post we will focus on the Publish operator.
The code for this post is available here.
Let's guess how many times the select clause will be invoked for the following code,
given that the underlying stream produces 2 values (0, 1).
Code Snippet
    IObservable<long> observableRoot = Observable.Interval(TimeSpan.FromSeconds(1));

    var producer = from item in observableRoot
                   where item < 2
                   select item;

    producer.Subscribe(value => Console.WriteLine("A: " + value)); // first subscription
    producer.Subscribe(value => Console.WriteLine("B: " + value)); // second subscription
The answer is 4 times. The select clause is invoked once for each subscription (this sample has 2, the Subscribe calls on lines 7-8)
each time the producer (observable) produces a value, so 2 subscriptions x 2 values = 4 invocations.
To emphasize the problem, I made some minor changes to the LINQ query:
Code Snippet
    IObservable<long> observableRoot = Observable.Interval(TimeSpan.FromSeconds(1));

    var producer = observableRoot.Where(item => item < 2);
    producer = producer.Select(item =>
    {
        Console.WriteLine("X"); // trace every run of the select
        return item;
    });

    producer.Subscribe(value => Console.WriteLine("A: " + value));
    producer.Subscribe(value => Console.WriteLine("B: " + value));
As you can see, line 3 does the where part,
and lines 4-8 do the select part and also write X to the console each time the select runs.
In the output for this code, X is printed 4 times: once before every A: line and once before every B: line.
So we do have a problem!
In a lot of cases we would like to broadcast the same value to all of the registered subscribers without
recalculating the value separately for each subscription.
How can we do it using the Rx framework?
The solution is to separate the subscribers from the underlying producer stream.
The separation is done in 2 steps:
Step one: isolate the subscribers from the underlying stream by using the Publish operator.
The Publish operator returns a non-active IConnectableObservable, which means that
subscribers can subscribe to this producer isolation, but the producer isolation
is not yet listening to the underlying stream.
Step two: connect to the underlying stream by using the Connect operator.
The code will look as follows:
Code Snippet
    IObservable<long> observableRoot = Observable.Interval(TimeSpan.FromSeconds(1));

    var producer = observableRoot.Where(item => item < 2);
    producer = producer.Select(item =>
    {
        Console.WriteLine("X"); // trace every run of the select
        return item;
    });

    IConnectableObservable<long> producerAbstraction = producer.Publish(); // not active yet

    producerAbstraction.Subscribe(value => Console.WriteLine("A: " + value));
    producerAbstraction.Subscribe(value => Console.WriteLine("B: " + value));

    producerAbstraction.Connect(); // values start flowing only from here
Nothing changes until line 10.
Line 10 creates the IConnectableObservable abstraction (producerAbstraction), which is not active yet.
Lines 12-13 are similar to the previous subscriptions, but now the subscribers subscribe to the abstraction layer
(producerAbstraction) instead of subscribing to the underlying stream.
Line 15 connects producerAbstraction to the underlying producer stream (only at this point do values start
flowing into the subscribers' callbacks).
In the output, X is now printed only once per value, followed by the A: and B: lines for that value.
As you can see, no duplicate select occurs.
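To make the difference easier to check without watching a timer, here is a minimal sketch that counts how many times the select body runs with and without Publish. It is not part of the original sample: the class name PublishCountDemo, the selectCalls counter and the use of Observable.Range(0, 2) as a stand-in for the filtered Interval stream are my own assumptions for illustration, and the using directives assume a current System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class PublishCountDemo
{
    static void Main()
    {
        int selectCalls = 0;

        // Hypothetical stand-in for the filtered Interval stream: emits 0 and 1
        // right away, so the demo does not have to wait on a timer.
        IObservable<int> producer = Observable.Range(0, 2).Select(item =>
        {
            selectCalls++; // count every run of the select body
            return item;
        });

        // Without Publish: each Subscribe call runs the whole pipeline again.
        producer.Subscribe(value => Console.WriteLine("A: " + value));
        producer.Subscribe(value => Console.WriteLine("B: " + value));
        Console.WriteLine("select calls without Publish: " + selectCalls);

        selectCalls = 0;

        // With Publish: both subscribers attach to the shared wrapper, and
        // nothing flows until Connect subscribes once to the producer.
        IConnectableObservable<int> shared = producer.Publish();
        shared.Subscribe(value => Console.WriteLine("A: " + value));
        shared.Subscribe(value => Console.WriteLine("B: " + value));
        Console.WriteLine("select calls before Connect: " + selectCalls);

        // Connect returns an IDisposable that disconnects from the producer.
        using (shared.Connect())
        {
            Console.WriteLine("select calls with Publish: " + selectCalls);
        }
    }
}
```

Assuming Range emits synchronously on subscription, as it does with its default scheduler, the three counts should come out as 4, 0 and 2: two subscribers times two values without Publish, nothing before Connect, and one run per value once the stream is shared.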
Summary
You should be aware of the subscription behavior, and apply the pattern that
matches your needs.