Rx - for beginners (part 14): time based buffering - Bnaya Eshet

Bnaya Eshet


This post is the 14th in a series of posts about the new Reactive Framework (Rx).

The series TOC can be found here.

In this post we will focus on the BufferWithTime operator.

 

The code for this post is available here.

 

What does the BufferWithTime operator do?

The BufferWithTime operator collects the values that occur within a specific time window and publishes them together, as a single list, whenever the window ends.

 

Marble diagram

The marble diagram for the BufferWithTime operator:

[Image: marble diagram of the BufferWithTime operator]

As we can see, the BufferWithTime operator slices the original stream into time period sections and buffers the values produced during each period.

When a period elapses, it publishes the buffer.
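
The slicing described above can be illustrated without any timers. The following is only a sketch of the semantics, not how Rx implements the operator: it groups timestamped values into fixed windows, using the same numbers as the sample in this post (a value every 300 milliseconds, 1 second windows). The names TimeWindowSketch, SliceIntoWindows and SimulateInterval are hypothetical helpers, and the half-open window boundaries are an assumption made to keep the simulation deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TimeWindowSketch
{
    // Groups (timestamp, value) pairs into consecutive half-open windows
    // [k * windowMs, (k + 1) * windowMs). Pure simulation: no timers, no Rx.
    public static List<List<long>> SliceIntoWindows(
        IEnumerable<(long TimeMs, long Value)> events, long windowMs, long endMs)
    {
        var source = events.ToList();
        var windows = new List<List<long>>();
        for (long start = 0; start < endMs; start += windowMs)
        {
            long end = start + windowMs;
            windows.Add(source
                .Where(e => e.TimeMs >= start && e.TimeMs < end)
                .Select(e => e.Value)
                .ToList());
        }
        return windows;
    }

    // Mimics Observable.Interval: value n is produced at (n + 1) * periodMs.
    public static IEnumerable<(long TimeMs, long Value)> SimulateInterval(long periodMs, long count)
    {
        for (long n = 0; n < count; n++)
        {
            yield return ((n + 1) * periodMs, n);
        }
    }

    public static void Main()
    {
        // Values 0..8 produced at 300, 600, ..., 2700 ms.
        var events = SimulateInterval(300, 9).ToList();
        foreach (var window in SliceIntoWindows(events, 1000, 3000))
        {
            Console.WriteLine("[{0}] SUM: {1}", string.Join(", ", window), window.Sum());
        }
    }
}
```

In this idealized run each 1 second window holds three values: [0, 1, 2], [3, 4, 5] and [6, 7, 8]. With real timers the boundaries jitter, so a value produced very close to the end of a window may land on either side.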

 

Code Sample

The following code demonstrates 1-second buffering of values that are produced every 300 milliseconds.

Code Snippet
  1.  IObservable<long> producer = Observable.Interval(TimeSpan.FromMilliseconds(300));
  2.  IObservable<IList<long>> observableBuffer = producer.BufferWithTime(TimeSpan.FromSeconds(1));
  3.  
  4.  producer.Subscribe(value => Console.WriteLine(value));
  5.  observableBuffer.Subscribe(values => Console.WriteLine("SUM: {0}",values.Sum()));

Line 1 creates an observable producer that produces a value every 300 milliseconds.

Line 2 buffers the producer in 1-second time windows.

Line 4 subscribes to the original producer and prints its values.

Line 5 subscribes to the buffered observable and prints the sum of the values that were buffered during each 1-second period.

 

The output of this code will look as follows:

[Image: console output of the code sample]

 

Summary

The BufferWithTime operator can be very useful in scenarios where we want to aggregate or manipulate values over a specific time period, or perform batch operations at a lower frequency.

 

Just think of a scenario where you have a high-frequency producer (let's say 1 event every millisecond) and you need to persist the events' data into a database.

It may be wiser to use a batch update or bulk insert once every slightly longer period of time.
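
The batching scenario above might be sketched as follows. This is an illustration under several assumptions, not a definitive implementation: it assumes the System.Reactive NuGet package, where (as far as I know) later Rx releases expose time based buffering as Buffer(TimeSpan) rather than BufferWithTime. SaveBatch and ToBatches are hypothetical names, and SaveBatch only prints where a real version would perform the bulk insert.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // assumption: System.Reactive NuGet package

public static class BatchPersistSketch
{
    // Hypothetical stand-in for a bulk insert; a real version would write
    // the whole batch to the database in a single round trip.
    public static void SaveBatch(IList<long> batch)
    {
        Console.WriteLine("persisting {0} events in one batch", batch.Count);
    }

    // Turns a stream of single events into a stream of per-window batches.
    public static IObservable<IList<long>> ToBatches(IObservable<long> events, TimeSpan window)
    {
        return events
            .Buffer(window)                    // time based buffering
            .Where(batch => batch.Count > 0);  // skip windows with no events
    }

    public static void Main()
    {
        // High frequency producer: one event every millisecond (requested rate;
        // the actual rate depends on timer resolution).
        IObservable<long> events = Observable.Interval(TimeSpan.FromMilliseconds(1));

        using (ToBatches(events, TimeSpan.FromSeconds(1)).Subscribe(batch => SaveBatch(batch)))
        {
            Console.ReadLine(); // keep the process alive while batches arrive
        }
    }
}
```

The Where filter is a design choice: a time based buffer also publishes an empty list for a window in which nothing happened, and there is no point in sending an empty batch to the database.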

 

Point of interest (BufferWithCount)

The BufferWithTime operator has a close relative named BufferWithCount, and as you may have already guessed, it buffers the results by count instead of by time period.
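
For comparison, here is a minimal sketch of count based buffering. It assumes the System.Reactive NuGet package, where (as far as I know) later Rx releases expose this operator as Buffer(int) rather than BufferWithCount. CountBufferSketch and BufferByCount are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // assumption: System.Reactive NuGet package

public static class CountBufferSketch
{
    // Produces the values 0..total-1 and groups them into lists of `size` items.
    // Wait() blocks until the source completes and returns all the buffers.
    public static IList<IList<int>> BufferByCount(int total, int size)
    {
        return Observable.Range(0, total)
            .Buffer(size)
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        foreach (var buffer in BufferByCount(7, 3))
        {
            Console.WriteLine("[{0}]", string.Join(", ", buffer));
        }
    }
}
```

With 7 values and a buffer size of 3, the buffers are [0, 1, 2], [3, 4, 5] and a final, shorter [6] that is published when the source completes.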

 
