Rx Challenge #8 Solution

2015/07/29


This post is part of a series; the whole series can be found here.

This challenge was all about buffering the data during a suspension period and flushing it out on resume.

The idea is to suspend the observable's notifications for a while without losing data.

The following diagram shows the required functionality.

Marble Diagram
/*************************************************
 * source  --1---2---3---4---5---6---7------8---9---10--11--|
 * hold    ------------T---------------F----------T---------|
 * result  --1---2---3-----------------4567-8---9-----------10,11-|
 *************************************************/

Solving this problem requires some kind of buffering, and if you follow my blog you probably know that when it comes to buffer-and-dispatch I usually integrate TPL Dataflow blocks.

In our case I’m using a Buffer Block right in the middle, between the source and the result.

The messaging pipeline looks like this:

source --> Buffer Block --> result
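
The pipeline can be sketched in isolation, before any hold logic is added. This is only a minimal illustration, assuming the System.Reactive and System.Threading.Tasks.Dataflow packages are referenced. The class name PipelineSketch, the Observable.Range source and the TaskCompletionSource used to wait for completion are all made up for the example.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: source --> Buffer Block --> result, no hold logic yet.
public static class PipelineSketch
{
    public static async Task Main()
    {
        var block = new BufferBlock<int>();
        var done = new TaskCompletionSource<bool>();

        // Buffer Block --> result: the block is exposed as an observable.
        IDisposable toResult = block.AsObservable().Subscribe(
            v => Console.WriteLine(v),
            ex => done.TrySetException(ex),
            () => done.TrySetResult(true));

        // source --> Buffer Block: the block is exposed as an observer.
        // OnCompleted on that observer completes the block.
        IDisposable fromSource =
            Observable.Range(1, 5).Subscribe(block.AsObserver());

        // Completion travels source --> block --> result.
        await done.Task;

        fromSource.Dispose();
        toResult.Dispose();
    }
}
```

The block delivers its messages asynchronously, which is why the sketch waits for the result's completion instead of assuming everything has been printed once Subscribe returns.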

What I’m playing with is the subscription between the Buffer Block and the result. The subscription from the source to the Buffer Block is kept untouched, while I’m connecting and disconnecting the subscription from the Buffer Block to the result.

Whenever the Buffer Block is detached from the result it keeps buffering the messages, and when it is re-attached all buffered messages are pushed to the result, so no data is lost.

The code for this extension is in the following snippet.

Code Snippet
  1. public static IObservable<T> HoldWhile<T>(
  2.             this IObservable<T> source,
  3.             IObservable<bool> hold)
  4. {
  5.     var result = Observable.Create<T>(target =>
  6.     {
  7.         var compositeDisposable = new CompositeDisposable();
  8.  
  9.         var block = new BufferBlock<T>();
  10.         IDisposable unlinkFromSource = null;
  11.  
  12.         IDisposable unlinkTarget = block.AsObservable().Subscribe(target); // link the target to the block
  13.  
  14.         IDisposable unlinkHoldEvents = hold.Subscribe(pause =>
  15.         {
  16.             if (pause)
  17.             {
  18.                 if (unlinkTarget != Disposable.Empty)
  19.                 {
  20.                     // unlink the target
  21.                     unlinkTarget.Dispose();
  22.                     unlinkTarget = Disposable.Empty;
  23.                 }
  24.             }
  25.             else
  26.             {
  27.                 // re-link the target
  28.                 if (unlinkTarget == Disposable.Empty)
  29.                     unlinkTarget = block.AsObservable().Subscribe(target);
  30.             }
  31.         });
  32.  
  33.         unlinkFromSource = source.Do(
  34.                                     v => { },
  35.                                     () =>
  36.                                     {   // it is important to make sure that the
  37.                                         // target attached to the block
  38.                                         // before completion (otherwise you may lose data)
  39.                                         if (unlinkTarget == Disposable.Empty)
  40.                                             unlinkTarget = block.AsObservable().Subscribe(target);
  41.                                     })
  42.                                     .Subscribe(block.AsObserver());
  43.  
  44.         compositeDisposable.Add(unlinkFromSource);
  45.         compositeDisposable.Add(Disposable.Create(() => unlinkTarget.Dispose())); // dispose whichever link is current
  46.         compositeDisposable.Add(unlinkHoldEvents);
  47.  
  48.         return compositeDisposable;
  49.  
  50.     });
  51.     return result;
  52. }

On line 5 I’m using Observable.Create, which encapsulates the logic per subscription.

Line 12 attaches the target to the Buffer Block (the initial state).

On line 14 I’m listening to the hold / resume notifications, and on the following lines, up to line 31, I’m detaching the target from the Buffer Block and re-attaching it.

On lines 33-42 I’m attaching the Buffer Block to the source.

Lines 35-41 handle stream completion (flushing the buffered data out on completion).

Lines 44-46 compose the disposables together.

And that’s all.
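
For comparison, the hold / flush behaviour from the marble diagram can also be sketched without TPL Dataflow, using a plain queue guarded by a lock. This is not the approach taken in this post, only a minimal sketch that assumes the System.Reactive package. The names HoldWhileQueued, HoldExtensions and Demo are made up for the example. Because Subject pushes synchronously, the demo is easy to follow step by step.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HoldExtensions
{
    // Hypothetical name; a queue-based stand-in for the BufferBlock version.
    public static IObservable<T> HoldWhileQueued<T>(
        this IObservable<T> source,
        IObservable<bool> hold)
    {
        return Observable.Create<T>(target =>
        {
            var gate = new object();
            var pending = new Queue<T>();
            bool paused = false;

            // Push everything buffered so far to the target (call under the lock).
            Action flush = () =>
            {
                while (pending.Count > 0)
                    target.OnNext(pending.Dequeue());
            };

            IDisposable holdSubscription = hold.Subscribe(pause =>
            {
                lock (gate)
                {
                    paused = pause;
                    if (!paused) flush(); // resume: release the buffered items
                }
            });

            IDisposable sourceSubscription = source.Subscribe(
                value =>
                {
                    lock (gate)
                    {
                        if (paused) pending.Enqueue(value);
                        else target.OnNext(value);
                    }
                },
                error => { lock (gate) { target.OnError(error); } },
                () =>
                {
                    lock (gate)
                    {
                        flush(); // flush before completing so nothing is lost
                        target.OnCompleted();
                    }
                });

            return new CompositeDisposable(holdSubscription, sourceSubscription);
        });
    }
}

public static class Demo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();
        var hold = new Subject<bool>();

        using (source.HoldWhileQueued(hold).Subscribe(
            v => log.Add(v.ToString()),
            () => log.Add("done")))
        {
            source.OnNext(1); source.OnNext(2); source.OnNext(3);
            hold.OnNext(true);                       // suspend
            for (int i = 4; i <= 7; i++) source.OnNext(i);
            log.Add("resume");
            hold.OnNext(false);                      // resume: 4..7 are flushed
            source.OnNext(8); source.OnNext(9);
            hold.OnNext(true);                       // suspend again
            source.OnNext(10); source.OnNext(11);
            source.OnCompleted();                    // 10 and 11 flushed, then done
        }
        return log;
    }

    public static void Main()
    {
        // prints "1 2 3 resume 4 5 6 7 8 9 10 11 done"
        Console.WriteLine(string.Join(" ", Run()));
    }
}
```

The trade-off is that the target is called while the lock is held, which keeps the ordering simple but makes a slow subscriber block both the source and the hold stream. The Buffer Block version hands delivery over to the block instead.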

 

the full series can be found here
