Rx – Buffer

2011/12/22

This post is one of a series of posts about Rx (Reactive Extensions). In this one I will discuss the Buffer operator.

There is no doubt that Buffer is one of the most useful Rx operators.

The Buffer operator lets us reduce throughput pressure and make better use of our resources.

Let's take a scenario of monitoring a data stream and persisting each item into a database (or sending it across a network boundary).

Assuming the data rate is 1 item per millisecond, databases are typically not designed to work well with round-trips at such a frequency,
but if we buffer a chunk of items each second (or more) we can save those chunks at a much lower frequency (maybe by using bulk insert).

This is how we can gain better utilization of our system.

This is exactly what the Buffer operator does.
It creates chunks of data from an observable either by time or by count (or even by a combination of both).

It presents those chunks as an observable of IList<T>, which means that if we are dealing with a high-frequency IObservable<T> we can transform it into a lower-frequency IObservable<IList<T>>.
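To make the change of shape concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. Observable.Range is only a stand-in for the fast source (it is not part of the scenario above), and the count-based form of Buffer groups ten values into lists of up to three:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Each entry is one emitted IList<int>, rendered as text for display.
var batches = new List<string>();

// Range(1, 10) is a stand-in source; Buffer(3) turns
// IObservable<int> into IObservable<IList<int>>.
Observable.Range(1, 10)
    .Buffer(3)
    .Subscribe(batch => batches.Add(string.Join(",", batch)));

foreach (var line in batches)
    Console.WriteLine(line);
// prints 1,2,3 then 4,5,6 then 7,8,9 then 10
```

The last list is shorter because the source completed before the buffer was full; whatever was collected so far is still delivered.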

The Buffer operator is very simple to use and very flexible in terms of batch size.

Buffer API

The following example demonstrates reducing the pressure of an observable that produces a value every millisecond:

Code Snippet
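  // a source that produces a value every millisecond
  var xs = Observable.Interval(TimeSpan.FromMilliseconds(1));
  // collect the values into one IList<long> per second
  var bufferedStream = xs.Buffer(TimeSpan.FromSeconds(1));
  bufferedStream.Subscribe(items => {/* do bulk insert */});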
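One detail worth knowing before wiring this to a database: the time-based Buffer emits a list for every period, even when nothing arrived, so a quiet second yields an empty IList<T>. The sketch below filters those out before the bulk insert. It assumes System.Reactive is referenced; the Subject source and the HistoricalScheduler (a virtual clock, used here only to keep the timing deterministic) are illustrative choices, not part of the example above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Virtual clock, so the sketch does not depend on real timing.
var scheduler = new HistoricalScheduler();
// Hand-driven stand-in for the monitored data stream.
var source = new Subject<int>();
var batchSizes = new List<int>();

source
    .Buffer(TimeSpan.FromSeconds(1), scheduler)
    .Where(batch => batch.Count > 0)   // skip periods with no data
    .Subscribe(batch => batchSizes.Add(batch.Count) /* do bulk insert */);

source.OnNext(1);
source.OnNext(2);
scheduler.AdvanceBy(TimeSpan.FromSeconds(1)); // second 1: two items
scheduler.AdvanceBy(TimeSpan.FromSeconds(1)); // second 2: nothing arrived
source.OnNext(3);
scheduler.AdvanceBy(TimeSpan.FromSeconds(1)); // second 3: one item

Console.WriteLine(string.Join(",", batchSizes));
// prints 2,1
```

Without the Where filter the subscriber would also be called for the empty second, which would mean a pointless round-trip.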

The same can be done by buffering every n items:

Code Snippet
  var xs = Observable.Interval(TimeSpan.FromMilliseconds(1));
  // collect the values into lists of 1000 items each
  var bufferedStream = xs.Buffer(1000);
  bufferedStream.Subscribe(items => {/* do bulk insert */});

There is even an API for the combination of both, which means that the buffer is closed either after n items or after a duration has elapsed, whichever comes first:

Code Snippet
  var xs = Observable.Interval(TimeSpan.FromMilliseconds(1));
  // close the buffer after 1 second or after 1000 items
  var bufferedStream = xs.Buffer(TimeSpan.FromSeconds(1), 1000);
  bufferedStream.Subscribe(items => {/* do bulk insert */});

Buffers can also overlap, which means that more than one buffer can coexist at a time (an item will be captured by multiple buffers):

Code Snippet
  var xs = Observable.Interval(TimeSpan.FromMilliseconds(1));
  // a buffer window of 1 second is opened every 0.1 second
  var bufferedStream = xs.Buffer(TimeSpan.FromSeconds(1),
                                 TimeSpan.FromSeconds(0.1));
  bufferedStream.Subscribe(items => {/* do bulk insert */});

The marble diagram for it is:

[marble diagram: overlapping buffers]

You can see that each buffer holds 4 items and that the same item can be included in multiple buffers (IList<T>).
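Overlap is easier to observe with counts than with time. Rx also offers a count-based form, Buffer(count, skip), which is not used in the snippets above; the sketch below uses it with a stand-in Observable.Range source, assuming System.Reactive is referenced. Each buffer holds up to 4 items and a new buffer opens every 2 items:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var windows = new List<string>();

// Buffer(4, 2): buffers of up to 4 items, a new one opened every 2 items,
// so neighbouring buffers share 2 items.
Observable.Range(1, 6)
    .Buffer(4, 2)
    .Subscribe(window => windows.Add(string.Join(",", window)));

foreach (var line in windows)
    Console.WriteLine(line);
// prints 1,2,3,4 then 3,4,5,6 then 5,6
```

Items 3 and 4 land in the first two buffers, and 5 and 6 in the last two. The final buffer is cut short because the source completes before it fills up.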

Advanced Buffer API

Actually, you can gain even finer control over the opening and closing of buffering windows.

Buffer is designed to accept an observable as the opening trigger of a buffering window, and a corresponding observable factory that signals the window's closing trigger.

Even though this API looks a bit odd, it is very powerful.

With this API you can activate a buffer in response to an external situation. Let's think of a buffering stream which should buffer a car's engine performance within separate geographical regions. We can start buffering each time the car's GPS indicates a region border and close the buffer window on exiting the region.

Real-life scenarios often need this kind of granularity.

The API for this feature goes like this:

Code Snippet
  1. var regionBorderStream = Observable.Create<Unit>(obs =>
  2.     {
  3.         // read and analyze GPS data
  4.         return Disposable.Empty;
  5.     });
  6. var carEngineStream = Observable.Interval(TimeSpan.FromMilliseconds(1));
  7. var bufferedStream = carEngineStream.Buffer(regionBorderStream,
  8.     region => regionBorderStream.Where(item => item == region));
  9. bufferedStream.Subscribe(items => {/* do bulk insert */});

regionBorderStream (at line 1) represents a stream of region-crossing notifications.

carEngineStream (at line 6) represents a stream of car engine information.

The buffered stream (at line 7) buffers the engine stream. It opens a buffer each time the car enters a region, which means whenever regionBorderStream produces a value, and it closes that buffer when the observable returned by the closing factory (at line 8) produces a value, which is when we exit the region.

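You may have noticed that this API supports multiple open buffers at a time. You may enter a city region and then enter a sub-region (for example New York and Central Park, or Beijing and the Forbidden City area). In the snippet above the notifications are Unit values, which all compare equal, so telling regions apart requires a border stream that carries a region identifier.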
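The nested-region case can be sketched as follows, assuming System.Reactive is referenced. Everything specific here is hypothetical and chosen for illustration: the (Region, Entered) tuple as the border event, the entered and exited helper streams, and the hand-driven Subject sources that replace the GPS and engine streams.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical border event: which region, and whether the car entered or left it.
var borders = new Subject<(string Region, bool Entered)>();
// Stand-in for the engine stream; values are pushed by hand below.
var engine = new Subject<int>();

var entered = borders.Where(b => b.Entered).Select(b => b.Region);
var exited = borders.Where(b => !b.Entered).Select(b => b.Region);

var closedBuffers = new List<string>();
engine
    // open a buffer per entered region; close it when that same region is exited
    .Buffer(entered, region => exited.Where(r => r == region))
    .Subscribe(batch => closedBuffers.Add(string.Join(",", batch)));

borders.OnNext(("New York", true));
engine.OnNext(1);
borders.OnNext(("Central Park", true));
engine.OnNext(2);
borders.OnNext(("Central Park", false)); // inner buffer closes
engine.OnNext(3);
borders.OnNext(("New York", false));     // outer buffer closes

foreach (var line in closedBuffers)
    Console.WriteLine(line);
// prints 2 (Central Park) then 1,2,3 (New York)
```

Buffers are delivered in the order they close, so the inner region arrives first even though it was opened later.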

Summary

Buffering is a very powerful technique.

I will survey other operators in future posts, and we will discuss the advantages and disadvantages of the Buffer operator in comparison with some of the other operators.
