Rx and Time related operators

2012/02/14

This post focuses on Rx's time-related operators.

Rx has a lot to do with the notion of time.

In fact, Rx is a kind of time machine in which data is scheduled for processing at a specific time
(I will talk about scheduling in a future post).

Within the System.Reactive and System.Reactive.Linq namespaces we can find the following extension methods and services:

Window, Buffer and Sample, which were covered in earlier posts. All of these operators have time-based overloads.

Replay and Scheduler, which I will discuss in future posts.

Delay, Timeout, Timestamp, TimeInterval, Generate, Interval and Timer, which are presented in this post.

Apart from the APIs above, the notion of time appears in overloads of many other Rx APIs (either explicitly or implicitly through the scheduler).

Interval and Timer

Rx comes with many factories that construct different kinds of IObservable<T>; some of them produce a time-based stream.

The Interval factory constructs a stream that produces a sequential value (long) after each period.

The following snippet produces a sequential value every second:

Code Snippet
    IObservable<long> source =
        Observable.Interval(TimeSpan.FromSeconds(1));
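To see what Interval actually emits, the stream can be bounded with Take and collected into a list. This is only a minimal sketch, assuming the System.Reactive NuGet package is referenced; the names IntervalDemo and FirstValues are hypothetical, and the period is passed in so that a short one can be used for a quick run.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class IntervalDemo
{
    // Hypothetical helper: blocks until the first `count` ticks have arrived.
    public static IList<long> FirstValues(int count, TimeSpan period)
    {
        return Observable.Interval(period) // emits 0, 1, 2, ... one value per period
            .Take(count)                   // complete after `count` values
            .ToList()                      // gather them into a single IList<long>
            .Wait();                       // block the caller until completion
    }
}
```

Calling IntervalDemo.FirstValues(3, TimeSpan.FromMilliseconds(20)) returns the values 0, 1 and 2. Without Take, the Interval stream never completes and Wait would block forever.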

The Timer factory is a more finely tuned version of the Interval factory. It can produce either a single value at a due time, or a sequence of sequential values that starts after a due time and then continues once per period.

The following snippet produces a single value after a due time of 1 minute.

Code Snippet
    IObservable<long> singleDueTime =
        Observable.Timer(TimeSpan.FromMinutes(1));
    singleDueTime.Subscribe(item => Console.WriteLine("Elapsed once"));

The following snippet produces a stream of sequential values, one per second, after a due time of 1 minute.

Code Snippet
    // first value after the due time, then one value per period
    IObservable<long> periodicAfterDueTime =
        Observable.Timer(
            TimeSpan.FromMinutes(1),
            TimeSpan.FromSeconds(1));
    periodicAfterDueTime.Subscribe(item => Console.WriteLine("Elapsed"));
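The difference between the two Timer overloads is easier to see when their output is collected. The following is a rough sketch, assuming the System.Reactive package; TimerDemo, OneShot and Periodic are hypothetical names, and the durations are parameters so that millisecond values can be used instead of minutes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class TimerDemo
{
    // One-shot overload: a single value (0) once the due time elapses, then completion.
    public static IList<long> OneShot(TimeSpan dueTime)
    {
        return Observable.Timer(dueTime)
            .ToList()
            .Wait();
    }

    // Periodic overload: the first value after the due time, then one per period.
    // Take bounds the otherwise endless stream so that Wait can return.
    public static IList<long> Periodic(TimeSpan dueTime, TimeSpan period, int count)
    {
        return Observable.Timer(dueTime, period)
            .Take(count)
            .ToList()
            .Wait();
    }
}
```

OneShot completes by itself with the single value 0, while Periodic keeps counting (0, 1, 2, ...) until Take cuts it off.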

Generate

A more structured factory can be defined with the Generate factory.

The following snippet produces formatted strings of sequential numeric values, at intervals that grow longer and longer.

Code Snippet
    var xs = Observable.Generate(
        0, // initial value
        i => i < 10, // condition
        i => i + 1, // iterate
        i => string.Format("** {0} **", i), // selector
        i => TimeSpan.FromSeconds(i)); // time selector
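Because Generate stops once the condition returns false, the resulting stream completes on its own and can be collected without Take. Here is a small sketch of that, assuming the System.Reactive package; GenerateDemo, Run and the stepMilliseconds parameter are hypothetical and only exist to shorten the waits.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class GenerateDemo
{
    // Hypothetical helper: same shape as the snippet above, with a configurable step.
    public static IList<string> Run(int count, int stepMilliseconds)
    {
        return Observable.Generate(
                0,                                   // initial state
                i => i < count,                      // keep going while this holds
                i => i + 1,                          // next state
                i => string.Format("** {0} **", i),  // value produced for each state
                i => TimeSpan.FromMilliseconds(i * stepMilliseconds)) // wait before each value
            .ToList()
            .Wait();                                 // returns once the condition fails
    }
}
```

GenerateDemo.Run(3, 10) returns "** 0 **", "** 1 **" and "** 2 **", with the wait before each value growing by 10 milliseconds.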

Timestamp

The Timestamp operator records the timestamp of each value in an observable sequence.

This is a very common task, and we no longer have to reinvent the wheel for it.

The operator wraps the original value in a Timestamped<T>.

The following snippet stamps each value produced by the Interval factory.

Code Snippet
    IObservable<Timestamped<long>> source =
        Observable.Interval(TimeSpan.FromSeconds(1))
        .Timestamp();
    source.Subscribe(item =>
        Console.WriteLine("Time = {0}, Value = {1}",
            item.Timestamp, item.Value));

TimeInterval

The TimeInterval operator records the time interval between consecutive values in an observable sequence.

The following snippet measures the duration between the produced values.

Code Snippet
    var xs = Observable.Generate(
        0, // initial value
        i => i < 10, // condition
        i => i + 1, // iterate
        i => string.Format("** {0} **", i), // selector
        i => TimeSpan.FromSeconds(i)); // time selector

    xs.TimeInterval()
        .Subscribe(item => Console.WriteLine("Time = {0}, Value = {1}",
            item.Interval, item.Value));
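The wrapped items can also be collected and inspected afterwards instead of printed. The sketch below assumes the System.Reactive package; TimeIntervalDemo and Measure are hypothetical names, and the gaps are scaled down to milliseconds.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;      // TimeInterval<T>
using System.Reactive.Linq;

public static class TimeIntervalDemo
{
    // Hypothetical helper: pairs each generated value with the gap that preceded it.
    public static IList<TimeInterval<int>> Measure(int count, int stepMilliseconds)
    {
        return Observable.Generate(
                0,
                i => i < count,
                i => i + 1,
                i => i,                              // emit the state itself
                i => TimeSpan.FromMilliseconds(i * stepMilliseconds))
            .TimeInterval()                          // wrap each value in TimeInterval<int>
            .ToList()
            .Wait();
    }
}
```

Each returned item exposes Value and Interval. The Interval of the first item is measured from the moment of subscription, and the measured gaps are approximate because they depend on timer resolution.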

Timeout

The Timeout operator either propagates the original value, or produces an OnError with a TimeoutException when the time since the last value exceeds the allowed duration.

The following snippet produces values as long as the gap between values stays under the 4-second threshold; once a value is overdue, the error handler receives the TimeoutException.

Code Snippet
    var xs = Observable.Generate(
        0, // initial value
        i => i < 10, // condition
        i => i + 1, // iterate
        i => string.Format("** {0} **", i), // selector
        i => TimeSpan.FromSeconds(i)); // time selector

    xs.Timeout(TimeSpan.FromSeconds(4))
        .Subscribe(item => Console.WriteLine(item),
        ex => Console.WriteLine(ex));
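An OnError ends the stream, so it is often useful to decide what should happen after the timeout. Two possible approaches are sketched here, assuming the System.Reactive package: catching the TimeoutException with Catch, or using the Timeout overload that switches to another sequence. TimeoutDemo, WithCatch and WithFallback are hypothetical names, and the marker strings are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class TimeoutDemo
{
    // Option 1: let Timeout raise OnError, then translate the
    // TimeoutException into a final marker value.
    public static IList<string> WithCatch(IObservable<string> source, TimeSpan limit)
    {
        return source
            .Timeout(limit)
            .Catch<string, TimeoutException>(ex => Observable.Return("timed out"))
            .ToList()
            .Wait();
    }

    // Option 2: the overload that continues with another sequence
    // instead of raising OnError.
    public static IList<string> WithFallback(IObservable<string> source, TimeSpan limit)
    {
        return source
            .Timeout(limit, Observable.Return("fallback"))
            .ToList()
            .Wait();
    }
}
```

For a source that emits "a" and then goes silent, for example Observable.Return("a").Concat(Observable.Never<string>()), WithCatch yields "a" followed by "timed out", and WithFallback yields "a" followed by "fallback".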

Delay

The Delay operator creates a delayed stream: every value is propagated to the subscriber only after a specified duration has passed.

The following snippet delays the original stream before its values reach the subscriber.
Each value is stamped twice, once before and once after the delay.

Code Snippet
    var source =
        Observable.Interval(TimeSpan.FromSeconds(1))
        .Timestamp(); // stamp when the value is created
    var delayed =
        source.Delay(TimeSpan.FromSeconds(1))
            .Timestamp(); // stamp again after the delay

    delayed.Subscribe(item => Console.WriteLine(
        "Datum {0}, created on {1:ss} and propagated on {2:ss}",
        item.Value.Value, item.Value.Timestamp, item.Timestamp));
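The two timestamps can be subtracted to see how long each value was held back. This is a sketch under the assumption that the System.Reactive package is available; DelayDemo and MeasureLag are hypothetical names, and a finite Range source replaces Interval so that the run ends.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DelayDemo
{
    // Hypothetical helper: returns, per value, the time between the two stamps.
    public static IList<TimeSpan> MeasureLag(TimeSpan delay)
    {
        return Observable.Range(1, 3)
            .Timestamp()      // stamp before the delay
            .Delay(delay)     // hold every notification back by `delay`
            .Timestamp()      // stamp again after the delay
            .Select(outer => outer.Timestamp - outer.Value.Timestamp)
            .ToList()
            .Wait();
    }
}
```

With a delay of 100 milliseconds, each of the three measured lags should come out at roughly 100 milliseconds; the exact figures vary with timer resolution and machine load.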

Summary

As I said, Rx is a kind of time machine, and there are other time-related operators that I didn't cover in this post.

One of the core concepts of Rx is the scheduler, which is capable of some very interesting time manipulation, but that is a topic for a future post.

What I do want to mention about the scheduler is that many Rx operators have overloads that accept a scheduler, and it is highly recommended to use those overloads whenever you work with a scheduler.
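As a small taste of why those overloads matter, here is a sketch that passes a virtual-time scheduler to Interval, so that three "seconds" pass without any real waiting. It assumes a System.Reactive version that ships HistoricalScheduler in System.Reactive.Concurrency; VirtualTimeDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class VirtualTimeDemo
{
    public static IList<long> Run()
    {
        // A scheduler whose clock only moves when we advance it.
        var scheduler = new HistoricalScheduler();
        var received = new List<long>();

        // The scheduler overload makes Interval tick on the virtual clock.
        using (Observable.Interval(TimeSpan.FromSeconds(1), scheduler)
            .Subscribe(value => received.Add(value)))
        {
            // Advance virtual time by 3.5 seconds; the ticks due at
            // 1s, 2s and 3s run synchronously inside this call.
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3.5));
        }

        return received;
    }
}
```

VirtualTimeDemo.Run() returns 0, 1 and 2 almost instantly. Had Interval been called without the scheduler argument, it would have run on its default scheduler and the virtual clock would have had no effect on it.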
