Rx – for beginners (part 10): Concat expression

2010/03/16

Rx – for beginners (part 10): Concat expression

Rx, IObservable,IObserver

this post is the 10th in a series of posts about the new Reactive Framework (Rx).

the series TOC can found here.

in this post we will focus on the Concat expression.

 

the code for this post can be download from here.

 
What does it do?

the Concat expression is used to concatenate one Observable stream

into the end of another Observable stream.

once the first Observable stream is completed the the concat stream will be immediate subscribed.

 

the marble diagrams of Concat expression will look as follow:

Rx, IObservable,IObserver

the result observable stream will subscribe to the yStream after the xStream is completed,

therefore any values on the yStream before the subscription will be ignored.

 

in case of exception on any of the Concat Observable streams the marble diagrams will look as follow:

Rx, IObservable,IObserver

Rx, IObservable,IObserver

 

Code sample

the code for this post can be download from here.

 

Helper method for creating interval stream (based on System.Timers.Timer event):

Code Snippet
  1. static System.Timers.Timer s_timer = new System.Timers.Timer(1000);
  2.  
  3. private static IObservable<string> CreateObservable(string prefix,TimeSpan stopAt)
  4. {
  5.     var eventStream = Observable.FromEvent<ElapsedEventArgs>(s_timer,"Elapsed");
  6.  
  7.     var startTime = DateTime.Now;
  8.     var stopTime = startTime + stopAt;
  9.  
  10.     Func<IEvent<ElapsedEventArgs>,bool> stopPredicate =
  11.         value => value.EventArgs.SignalTime < stopTime;
  12.     Func<IEvent<ElapsedEventArgs>,string> selectFormatter =
  13.         value => prefix + (value.EventArgs.SignalTime – startTime).TotalSeconds.ToString("N0");
  14.     
  15.     var obs = eventStream.TakeWhile(stopPredicate).Select(selectFormatter);
  16.  
  17.     return obs;
  18. }

Line 1, create instance of the timer.

Line 3, the method get 2 parameters:

  • prefix: which is used to format the stream output (line 13).
  • stop at: which will define the stream lifetime duration.

line 5, create observable stream out of the timer elapsed event (we can use this technique on any .NET event).

lines 10-11, is the stop predicate which is used by the TakeWhile at line 15.

lines 12-13, is the output formatting which is used by the Select at line 15.

 

the main method is look as follow:

Code Snippet
  1. static void Main(string[] args)
  2. {
  3.     var xs = CreateObservable("X", TimeSpan.FromSeconds(3.5));
  4.     var ys = CreateObservable("  Y", TimeSpan.FromSeconds(10));
  5.  
  6.     xs.Subscribe (value => {/* do nothing */},
  7.         () =>
  8.         {
  9.             Console.ForegroundColor = ConsoleColor.Gray;
  10.             Console.WriteLine("xs Completed");
  11.             Console.ForegroundColor = ConsoleColor.White;
  12.         });
  13.  
  14.     var combineLatestStream = xs.Concat(ys);
  15.  
  16.     Console.ForegroundColor = ConsoleColor.White;
  17.     combineLatestStream.Subscribe(value => Console.WriteLine(value),() => Console.WriteLine("Completed"));
  18.     s_timer.Start();
  19.     Console.ReadKey();
  20. }

line 3, create observable stream with lifetime period of 3.5 seconds.

lime 4, create observable stream with lifetime period of 10 seconds.

lines 6-12, subscribing the the complete event of the xs stream.

line 14, Concat the streams.

line 17, subscribe to to concat output stream.

line 18, start the timer so the streams will start to produce values.

 

the output will look as follow:

Rx, IObservable,IObserver

you may notice that Y4 value is missing.

the reason for it is that the complete event occur at the 4th timer elapse event,

and only then the subscription to the ys stream was taking place so nobody was yet listening

to the ys stream when the 4th timer elapsed.

 

Summary

Concat can join 2 observable stream, but unlike Merge

it doesn’t listen on both streams simultaneously.

 

תגים של Technorati:‏ ,,

 

kick it on DotNetKicks.com


Add comment
facebook linkedin twitter email

Leave a Reply

Your email address will not be published.

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

*