Rx and Time related operators
Rx and Time related operators
this post will focus on Rx's time related operators.

Rx has lot to do with the time notion.
actually Rx is kind of a time machine where datum does schedule for processing at specific time
(I will talk about scheduling in future post).
within the System.Reactive and System.Reactive.Linq namespaces we can find the following extension method and services.
Window, Buffer and Sample which was spoken here, here and here. all operators has a time based overloads.
Replay and Scheduler which I will discuss on future posts.
Delay, Timeout, Timestamp, TimeInterval, Generate, Interval and Timer which I will present soon.
except of the above API you can find the time notion as overload of many other Rx API (either explicitly or implicitly though the scheduler).
Interval and Timer
Rx come with lot of factories which can construct different IObservable<T> some of them produce a time-based stream.
the Interval factory construct a stream that produces a sequential value (long) after each period.
the following snippet will produce a sequential value each second:
Code Snippet
- IObservable<long> source =
- Observable.Interval(TimeSpan.FromSeconds(1))
the Timer factory is more fine tuning version of the Interval factory, which can produce single value a due time or sequence of value which will start to produce sequential values per period that will start after a due time.
the following snippet produce a single value after 1 minutes due time.
Code Snippet
- IObservable<long> singleDueTime =
- Observable.Timer(TimeSpan.FromMinutes(1));
- singleDueTime.Subscribe(item => Console.WriteLine("Elapsed once"));
the following snippet will produce a sequential value stream per second after due time of 1 minutes.
Code Snippet
- IObservable<long> singleDueTime =
- Observable.Timer(
- TimeSpan.FromMinutes(1),
- TimeSpan.FromSeconds(1));
- singleDueTime.Subscribe(item => Console.WriteLine("Elapsed"));
Generate
more structured factory can be define by the Generate factory.
the following snippet will produce a formatted string of a sequential numeric values in interval that get more slower and slower.
Code Snippet
- var xs = Observable.Generate(
- 0, // initial value
- i => i < 10, // condition
- i => i + 1, // iterate
- i => string.Format("** {0} **", i), // selector
- i => TimeSpan.FromSeconds(i)); // time selector
Timestamped
Records the timestamp for each value in an observable sequence.
this is a very common task which we no longer have to reinvent the wheel for.
the operator wrap the original value with a Timestamped<T>.
the following snippet stamp each of the value which produce by the Interval operator.
Code Snippet
- IObservable<Timestamped<long>> source =
- Observable.Interval(TimeSpan.FromSeconds(1))
- .Timestamp();
- source.Subscribe(item =>
- Console.WriteLine("Time = {0}, Value = {1}",
- item.Timestamp, item.Value));
TimeInterval
the TimeInterval operator records the time interval between consecutive values in an observable sequence.
the following snippet is measuring the duration between the produced values.
Code Snippet
- var xs = Observable.Generate(
- 0, // initial value
- i => i < 10, // condition
- i => i + 1, // iterate
- i => string.Format("** {0} **", i), // selector
- i => TimeSpan.FromSeconds(i)); // time selector
-
- xs.TimeInterval()
- .Subscribe(item => Console.WriteLine("Time = {0}, Value = {1}",
- item.Interval, item.Value));
Timeout
the Timeout operation either propagate the original value or produce an OnError with TimeoutException in case that the duration since the last value was overdue.
the following snippet will produce values as long as the duration between value won't pass the 4 second threshold.
Code Snippet
- var xs = Observable.Generate(
- 0, // initial value
- i => i < 10, // condition
- i => i + 1, // iterate
- i => string.Format("** {0} **", i), // selector
- i => TimeSpan.FromSeconds(i)); // time selector
-
- xs.Timeout(TimeSpan.FromSeconds(4))
- .Subscribe(item => Console.WriteLine(item),
- ex => Console.WriteLine(ex));
Delay
the Delay operator does create a delayed datum stream which will delay the projection of the datum for specific duration.

the following snippet will delay the original stream before the value will reach the subscriber.
the value will be stamp twice, before and after the delay.
Code Snippet
- var source =
- Observable.Interval(TimeSpan.FromSeconds(1))
- .Timestamp();
- var delayed =
- source.Delay(TimeSpan.FromSeconds(1))
- .Timestamp();
-
- delayed.Subscribe(item => Console.WriteLine(
- "Datum {0}, Create on {1:ss} and propogate on {2:ss}",
- item.Value.Value, item.Value.Timestamp, item.Timestamp));
Summary
as I said Rx is kind of a time machine, there is other time related operator which I didn't cover in this post.
one of the core concept of Rx is the scheduler which is capable for a very interesting time manipulation, but this is a topic for future post.
what I do want to mention on the scheduler issue is that many of the Rx operation is having an overloads which accept scheduler and it is highly recommended to use those overload when you do use a scheduler.
Technorati Tags: operators,Reactive,Linq,Buffer,Sample,Replay,Scheduler,Delay,Timeout,Timestamp,TimeInterval,Generate,Interval,Timer,factories,IObservable,factory,snippet,Code,Observable,TimeSpan,FromSeconds,version,sequence,FromMinutes,Subscribe,item,Console,WriteLine,Format,selector,Records,task,operator,Value,duration,OnError,TimeoutException,threshold,projection,Datum,Create,Summary,concept,topic,singleDueTime
Windows Live Tags:
operators,
Reactive,
Linq,
Buffer,
Sample,
Replay,
Scheduler,
Delay,
Timeout,
Timestamp,
TimeInterval,
Generate,
Interval,
Timer,
factories,
IObservable,
factory,
snippet,
Code,
Observable,
TimeSpan,
FromSeconds,
version,
sequence,
FromMinutes,
Subscribe,
item,
Console,
WriteLine,
Format,
selector,
Records,
task,
operator,
Value,
duration,
OnError,
TimeoutException,
threshold,
projection,
Datum,
Create,
Summary,
concept,
topic,
singleDueTime WordPress Tags:
operators,
Reactive,
Linq,
Buffer,
Sample,
Replay,
Scheduler,
Delay,
Timeout,
Timestamp,
TimeInterval,
Generate,
Interval,
Timer,
factories,
IObservable,
factory,
snippet,
Code,
Observable,
TimeSpan,
FromSeconds,
version,
sequence,
FromMinutes,
Subscribe,
item,
Console,
WriteLine,
Format,
selector,
Records,
task,
operator,
Value,
duration,
OnError,
TimeoutException,
threshold,
projection,
Datum,
Create,
Summary,
concept,
topic,
singleDueTime Blogger Labels:
operators,
Reactive,
Linq,
Buffer,
Sample,
Replay,
Scheduler,
Delay,
Timeout,
Timestamp,
TimeInterval,
Generate,
Interval,
Timer,
factories,
IObservable,
factory,
snippet,
Code,
Observable,
TimeSpan,
FromSeconds,
version,
sequence,
FromMinutes,
Subscribe,
item,
Console,
WriteLine,
Format,
selector,
Records,
task,
operator,
Value,
duration,
OnError,
TimeoutException,
threshold,
projection,
Datum,
Create,
Summary,
concept,
topic,
singleDueTime