Rx - for beginners (part 14): time based buffering
This post is the 14th in a series of posts about the new Reactive Framework (Rx).
The series TOC can be found here.
In this post we will focus on the BufferWithTime operator.
The code for this post is available here.
What does the BufferWithTime operator do?
The BufferWithTime operator buffers the values that occur within a specific
time window, and publishes the buffered values whenever that window ends.
Marble diagram
The marble diagram for the BufferWithTime operator:

As we can see, the BufferWithTime operator slices the source stream into time periods
and buffers the values produced during each period.
When a period elapses, it publishes the buffer.
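To make the slicing concrete without depending on real timers, here is a minimal sketch that imitates the idea in plain C#. It is not the Rx implementation: `TimeBufferSketch`, `BufferByTime` and `SimulateInterval` are hypothetical names invented for this illustration, the timestamps are simulated, and emitting an empty buffer for a window with no values is an assumption of the sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Rx): imitates time-window buffering
// over simulated timestamps instead of a live observable.
public static class TimeBufferSketch
{
    // Groups timestamped values into consecutive fixed-length windows.
    // Window k covers the half-open range [k * window, (k + 1) * window).
    public static List<List<long>> BufferByTime(
        IEnumerable<(long Value, TimeSpan At)> source, TimeSpan window)
    {
        if (window <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(window));

        var buffers = new List<List<long>>();
        foreach (var (value, at) in source)
        {
            int index = (int)(at.Ticks / window.Ticks);

            // Assumption of this sketch: windows without values still
            // produce an (empty) buffer, so indexes stay aligned with time.
            while (buffers.Count <= index)
                buffers.Add(new List<long>());

            buffers[index].Add(value);
        }
        return buffers;
    }

    // Simulates a producer that emits 0, 1, 2, ... once per period,
    // with the first value arriving after one full period.
    public static IEnumerable<(long Value, TimeSpan At)> SimulateInterval(
        TimeSpan period, int count)
    {
        for (long i = 0; i < count; i++)
            yield return (i, TimeSpan.FromTicks(period.Ticks * (i + 1)));
    }
}
```

Feeding it ten simulated values at 300 millisecond spacing with a 1 second window yields four buffers: the first three hold three values each and the last holds one. A live Rx pipeline depends on scheduler timing, so its grouping near window boundaries may differ from this idealized version.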
Code Sample
The following code demonstrates 1-second buffering of values that are produced every 300 milliseconds.
IObservable<long> producer = Observable.Interval(TimeSpan.FromMilliseconds(300));
IObservable<IList<long>> observableBuffer = producer.BufferWithTime(TimeSpan.FromSeconds(1));

producer.Subscribe(value => Console.WriteLine(value));
observableBuffer.Subscribe(values => Console.WriteLine("SUM: {0}", values.Sum()));
Line 1 creates an observable producer that produces a value every 300 milliseconds.
Line 2 buffers the producer's values in 1-second time windows.
Line 4 subscribes to the original producer and prints its values.
Line 5 subscribes to the buffered observable and prints the sum of the values that were buffered during each 1-second period.
The output of this code will look as follows:
Summary
The BufferWithTime operator can be very useful in scenarios
where we want to aggregate or manipulate values over a specific time period,
or perform batch operations at a lower frequency.
Just think of a scenario where you have a high-frequency producer (let's say 1 event every millisecond)
and you need to persist the event data into a database.
It may be wiser to use a batch update or bulk insert once every
slightly longer period of time.
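The batching idea can be sketched roughly as follows. This is only an illustration: `FakeEventStore` and `BatchPersistenceSketch` are hypothetical stand-ins invented here, no real database is involved, and the store merely counts round trips so the saving is visible.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a database; it only counts round trips.
public sealed class FakeEventStore
{
    public int RoundTrips { get; private set; }
    public int RowsWritten { get; private set; }

    // One call represents one bulk insert, regardless of batch size.
    public void BulkInsert(ICollection<long> events)
    {
        if (events.Count == 0) return; // nothing to persist for an empty window

        RoundTrips++;
        RowsWritten += events.Count;
    }
}

public static class BatchPersistenceSketch
{
    // Persists each already-buffered batch with a single bulk insert,
    // instead of issuing one insert per event.
    public static void PersistBatches(
        IEnumerable<IList<long>> batches, FakeEventStore store)
    {
        foreach (var batch in batches)
            store.BulkInsert(batch);
    }
}
```

With the buffered observable from the code sample, the same effect would presumably come from passing each buffered list to a bulk insert inside the `Subscribe` callback, so a producer firing every millisecond would cost roughly one round trip per window rather than one per event.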
The code for this post is available here.
Point of interest (BufferWithCount)
The BufferWithTime operator has a close relative named BufferWithCount and,
as you may have already guessed, it buffers the results by count instead of by time period.
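For comparison, here is a minimal sketch of count-based buffering over an ordinary sequence. It is not the Rx operator itself: `CountBufferSketch` and `BufferByCount` are hypothetical names, and flushing a trailing partial buffer when the source ends is an assumption of this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Rx): imitates count-based buffering
// over an in-memory sequence.
public static class CountBufferSketch
{
    // Splits the source into consecutive buffers of `count` items each.
    public static List<List<T>> BufferByCount<T>(IEnumerable<T> source, int count)
    {
        if (count <= 0)
            throw new ArgumentOutOfRangeException(nameof(count));

        var buffers = new List<List<T>>();
        var current = new List<T>(count);

        foreach (var item in source)
        {
            current.Add(item);
            if (current.Count == count)
            {
                // The buffer is full: publish it and start a new one.
                buffers.Add(current);
                current = new List<T>(count);
            }
        }

        // Assumption of this sketch: a trailing partial buffer is
        // published when the source ends.
        if (current.Count > 0)
            buffers.Add(current);

        return buffers;
    }
}
```

Here the boundaries depend only on how many values have arrived, so a slow producer simply takes longer to fill each buffer, whereas time-based buffering publishes on schedule no matter how many values came in.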