Rx – Join

2012/04/04


This is an advanced post, so if you haven't yet mastered the basics of Rx you might want to start with an introductory post first.

This post focuses on the Join and GroupJoin operators, and we will try to get a solid understanding of what it really means to join multiple data streams.

The Rx notion of join is quite different from the join we are used to in SQL queries.

While a SQL join refers to data equality (relationships and foreign keys), the Rx join is all about coexistence.

Actually, the Rx join is much closer to the real-life notion of joining.
When you meet someone (online or in the real world), it means that both of you are doing the activity at the same time.

So when speaking of the Rx Join, you must make a conceptual switch from data to coexistence.

Because the Rx join is all about coexistence, the actual data type no longer matters.
You can declare a join between unrelated data types, because the data type has nothing to do with the Rx concept of join.

For example:

Assume you have an algorithm that can infer a person's mood from their tweets and other social media text. You can join this mood stream with a weather stream and get a result stream of correlations between the weather and people's moods.


The time window (period) of a weather datum lasts from the moment it is pushed until the next weather is pushed. The mood period is a point in time (period = zero).

Whenever a mood coexists with a weather period, the result stream projects a combined weather/mood datum.

The code for this scenario looks something like the following snippet.

Code Snippet
  1. IObservable<Weather> ws = WeatherStream;
  2. IObservable<Mood> ms = MoodStream;
  3.  
  4. IObservable<Tuple<Weather,Mood>> result =
  5.     ws.Join(
  6.             ms, // the stream to join with
  7.             w => ws, // closes the weather period
  8.             m => Observable.Empty<Unit>(), // closes the mood period immediately
  9.             (w /* Weather */, m /* Mood */) => Tuple.Create(w, m));

Let's look at the Join syntax.

At line 5 we use the Join extension method on the weather stream.

The first parameter (line 6) is the mood stream that we join with.

Now we have to define when the time window of a weather datum ends,
and this is what the second parameter (at line 7) is all about.
It is a Func that gets a weather as its parameter and returns a stream which closes the weather period on its first notification (OnNext or OnCompleted).

Because the weather stream is a hot stream, the code above closes the weather period whenever a new weather is pushed.
Closing is a one-time event, so it works equally well if the parameter is w => ws.Take(1).

The third parameter dictates the period of a mood datum.
In our case the mood is a point event which has no period, therefore the parameter returns an Empty stream (a stream that completes immediately).

The last parameter is the result selector, which defines
the projected data type. In our case we return a Tuple of a weather and a mood.
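To see the four parameters working together, here is a minimal runnable sketch. It is only an illustration under a few assumptions: it uses the System.Reactive NuGet package and C# top-level statements, it swaps the Weather and Mood types for plain strings, and the two Subject instances are hypothetical stand-ins for WeatherStream and MoodStream.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: strings stand in for the Weather and Mood types,
// and Subjects stand in for the hot WeatherStream / MoodStream.
var ws = new Subject<string>();
var ms = new Subject<string>();
var pairs = new List<string>();

using var subscription = ws.Join(
        ms,                            // the stream to join with
        w => ws,                       // a weather lasts until the next weather
        m => Observable.Empty<Unit>(), // a mood is a point event
        (w, m) => $"{w}/{m}")          // projection of a coexisting pair
    .Subscribe(p => pairs.Add(p));

ms.OnNext("bored");  // no weather period is open yet, nothing is projected
ws.OnNext("sunny");  // opens the "sunny" period
ms.OnNext("happy");  // coexists with "sunny"
ws.OnNext("rainy");  // closes "sunny" and opens "rainy"
ms.OnNext("gloomy"); // coexists with "rainy"

Console.WriteLine(string.Join(", ", pairs)); // sunny/happy, rainy/gloomy
```

Note that "bored" never shows up in the result: it was a point event that ended before any weather period existed, so it coexisted with nothing.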

Now we can take a breath.


The API is a bit complex, mainly because of the closing-stream parameters, but it is very powerful.

Actually, the closing-stream concept is not unique to the Join operator. Other operators, such as Buffer, Window and GroupByUntil, have overloads that take a closing stream.

The closing-stream concept enables join windows that are more complex than a simple TimeSpan value.

For example, the closing stream can react to temperature, tweets, sound volume or any other event stream.
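As a small illustration of the same idea outside of Join, the sketch below uses the Buffer overload that takes a closing selector, so that moods are batched until the next weather arrives instead of by a fixed TimeSpan. The assumptions are the same as before: the System.Reactive package, top-level statements, and hypothetical string Subjects in place of the real streams.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: strings and Subjects stand in for the real weather/mood streams.
var ws = new Subject<string>();
var ms = new Subject<string>();
var batchSizes = new List<int>();

using var subscription = ms
    .Buffer(() => ws) // each buffer closes when the weather stream pushes a value
    .Subscribe(batch => batchSizes.Add(batch.Count));

ms.OnNext("happy");
ms.OnNext("calm");
ws.OnNext("rainy");  // closes the first buffer (2 moods)
ms.OnNext("gloomy");
ws.OnNext("sunny");  // closes the second buffer (1 mood)

Console.WriteLine(string.Join(", ", batchSizes)); // 2, 1
```

Any other event stream could be returned from the selector in place of ws, which is what makes the closing stream more flexible than a time span.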

Group Join

Join has another flavor, the GroupJoin operator. It is very similar to Join but it has a different projection.

GroupJoin groups the data of the secondary stream that intersect with a primary stream datum into an IObservable, and projects that IObservable together with the primary datum.

For example:

Code Snippet
  1. IObservable<Tuple<Weather, IObservable<Mood>>> result =
  2.     ws.GroupJoin(
  3.         ms, // the stream to join with
  4.         // a stream which closes the weather period
  5.         value => ws,
  6.         // a stream which closes the mood period (immediate closing)
  7.         value => Observable.Empty<Unit>(),  // the mood is a point event which is caught within the weather window
  8.         // projection of a result
  9.         (w, observableOfMoods) => Tuple.Create(w, observableOfMoods));

It is all similar to the Join operator until we get to the projection at line 9.

It is a projection of the weather value and an IObservable<Mood>, which means that every weather datum is projected immediately (as a new group), and mood data keep being pushed into its related IObservable for as long as the weather's lifetime has not been ended by the closing stream.

The marble diagram for this GroupJoin looks like the following:

[Figure: marble diagram of GroupJoin over the weather and mood streams]
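The grouping behavior can also be observed by running a small sketch. As with any illustration of this kind, it rests on assumptions: the System.Reactive package, top-level statements, and hypothetical string Subjects replacing the Weather and Mood streams. It logs when a weather group is projected and which moods are pushed into it.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: strings and Subjects stand in for the real weather/mood streams.
var ws = new Subject<string>();
var ms = new Subject<string>();
var log = new List<string>();

using var subscription = ws.GroupJoin(
        ms,                            // the stream to join with
        w => ws,                       // closes the weather period
        m => Observable.Empty<Unit>(), // a mood is a point event
        (w, moods) => Tuple.Create(w, moods))
    .Subscribe(group =>
    {
        // The group is projected as soon as its weather arrives.
        log.Add($"{group.Item1}: opened");
        // Moods that coexist with this weather flow into the inner stream.
        group.Item2.Subscribe(m => log.Add($"{group.Item1}: {m}"));
    });

ws.OnNext("sunny");
ms.OnNext("happy");
ms.OnNext("calm");
ws.OnNext("rainy");
ms.OnNext("gloomy");

foreach (var line in log) Console.WriteLine(line);
// sunny: opened
// sunny: happy
// sunny: calm
// rainy: opened
// rainy: gloomy
```

Compared with Join, the weather appears once per group rather than once per mood, and a weather with no moods at all would still be projected.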

The GroupJoin operator can also be written in pure LINQ query syntax, as in the following snippet:

Code Snippet
  1. IObservable<Tuple<Weather, IObservable<Mood>>> result =
  2.     from w in ws
  3.     join m in ms                  
  4.         on ws // a stream which closes the weather period
  5.         // a stream which closes the mood period (immediate closing)
  6.         equals Observable.Empty<Unit>()
  7.         into g // the IObservable<Mood> related to the current weather
  8.     select Tuple.Create(w, g);

You can find the weather's closing stream at line 4
and the mood's closing stream at line 6.
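The plain Join can be written in query syntax in the same way, simply by leaving out the into clause. The following is a sketch under the same assumptions as the other examples (System.Reactive package, top-level statements, hypothetical string Subjects instead of the Weather and Mood streams); it uses ws.Take(1) as the weather's closing stream.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: strings and Subjects stand in for the real weather/mood streams.
var ws = new Subject<string>();
var ms = new Subject<string>();
var pairs = new List<string>();

IObservable<string> result =
    from w in ws
    join m in ms
        on ws.Take(1)                   // closes the weather period
        equals Observable.Empty<Unit>() // closes the mood period immediately
    select $"{w}/{m}";

using var subscription = result.Subscribe(p => pairs.Add(p));

ws.OnNext("sunny");
ms.OnNext("happy");  // coexists with "sunny"
ws.OnNext("rainy");  // closes "sunny" and opens "rainy"
ms.OnNext("gloomy"); // coexists with "rainy"

Console.WriteLine(string.Join(", ", pairs)); // sunny/happy, rainy/gloomy
```

The on and equals keywords read a little strangely here, because they carry the two closing streams rather than two keys to compare.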

Summary

The Join concept is a very powerful one, but you have to make a mental switch from data correlation to coexistence correlation.

It is really worth the learning-curve effort, because it brings a whole new set of non-trivial capabilities to the table.
