Rx Challenge #8

2015/07/24

no comments

this post is part of series, the hole series can be found in here.

 

this challenge is all about buffering data while suspended and flush it out on resume.

we already had challenge on Suspend and Resume but that challenge ignored the

data during the suspension period. on this challenge you expected to buffer the

data during the suspension period and flush it out on resume.

to achieve this challenge you have to provide extension method on the following format:

Code Snippet
  1. public static IObservable<T> HoldWhile<T>(
  2.             this IObservable<T> source,
  3.             IObservable<bool> hold)
  4. {
  5.     /*************************************************
  6.     * source  —1—2—3—4—5—6—7—8—9—10—11-|
  7.     * hold    ————-T———–F———–T———|
  8.     * result  —1—2—3————-4567–8—9———–10,11-|
  9.     *************************************************/
  10.     // TODO: complete the mission logic
  11.     throw new NotImplementedException();
  12. }

the input includes the source stream and the hold stream which provide

triggers for the buffering periods. buffer period should start when trigger value equals true

until it change to false.

you can test it against the following test (if you check the test carefully you may spot clue for my choice to implement it):

 

Code Snippet
  1. [TestClass]
  2. public class HoldWhileQuesionTests
  3. {
  4.     [TestMethod]
  5.     public async Task HoldWhile_Test()
  6.     {
  7.         long ticksPerSecond = TimeSpan.FromSeconds(1).Ticks;
  8.         var scheduler = new TestScheduler();
  9.         var source = Observable.Interval(TimeSpan.FromSeconds(2), scheduler)
  10.                            .Take(11)
  11.                            .Publish()
  12.                            .RefCount()
  13.                            .Select(m => m + 1); // 1..14
  14.         var suspend = source.Where(m => m % 3 == 0)
  15.                             .Select(m => m % 2 != 0)
  16.                             .Delay(TimeSpan.FromSeconds(1), scheduler); // 3 = true 6 = false, 9 = true, 12 = false
  17.  
  18.         var observer = scheduler.CreateObserver<long>();
  19.         // act
  20.         source.HoldWhile(suspend)
  21.               .Subscribe(observer);
  22.  
  23.         /*************************************************
  24.         * input: (T = True, F = False)
  25.         * source  —1—2—3—4—5—6—7—8—9—10—11-|
  26.         * suspend ————-T———–F———–T———|
  27.         * output:
  28.         * result  —1—2—3————-4567–8—9———–10,11-|
  29.         *************************************************/
  30.         scheduler.AdvanceBy(ticksPerSecond * 3 * 2); // until 3
  31.  
  32.         // verify
  33.         long[] results = GetResoults(observer);
  34.         Assert.AreEqual(3, results.Length);
  35.         Assert.AreEqual(3, results[2]);
  36.  
  37.         // act
  38.         scheduler.AdvanceBy(ticksPerSecond * 3 * 2); // Until 6
  39.  
  40.         // verify
  41.         results = GetResoults(observer);
  42.         Assert.AreEqual(3, results.Length);
  43.  
  44.         // act
  45.         scheduler.AdvanceBy(ticksPerSecond); // Until F
  46.         await Task.Delay(100); // wait for the TDF task
  47.  
  48.         // verify
  49.         results = GetResoults(observer);
  50.         Assert.AreEqual(6, results.Length);
  51.         Assert.AreEqual(6, results[5]);
  52.  
  53.        /*************************************************
  54.        * source  —1—2—3—4—5—6—7—8—9—10—11-|
  55.        * suspend ————-T———–F———–T———|
  56.        * result  —1—2—3————-4567–8—9———–10,11-|
  57.        *************************************************/
  58.  
  59.         // act
  60.         scheduler.AdvanceBy(ticksPerSecond * 5); // Until 9
  61.         await Task.Delay(100); // wait for the TDF task
  62.  
  63.         // verify
  64.         results = GetResoults(observer);
  65.         Assert.AreEqual(9, results.Length);
  66.         Assert.AreEqual(9, results[8]);
  67.  
  68.         // act
  69.         scheduler.AdvanceBy(ticksPerSecond * 3); // Before 11
  70.         await Task.Delay(100); // wait for the TDF task
  71.  
  72.         // verify
  73.         results = GetResoults(observer);
  74.         Assert.AreEqual(9, results.Length);
  75.  
  76.         // act
  77.         scheduler.AdvanceBy(ticksPerSecond * 2); // to the end
  78.         await Task.Delay(100); // wait for the TDF task
  79.  
  80.         // verify
  81.         results = GetResoults(observer);
  82.         Assert.AreEqual(11, results.Length);
  83.         Assert.AreEqual(11, results.Length);
  84.         Assert.AreEqual(NotificationKind.OnCompleted,
  85.                         observer.Messages.Last().Value.Kind);
  86.     }
  87.  
  88.     private static long[] GetResoults(ITestableObserver<long> observer)
  89.     {
  90.         var results = observer.
  91.             Messages
  92.             .Where(m => m.Value.Kind == NotificationKind.OnNext)
  93.             .Select(m => m.Value.Value).ToArray();
  94.         return results;
  95.     }
  96. }
  97. public static class HoldWhileQuesionExtensions
  98. {
  99.     /// <summary>
  100.     /// Holds while, use to buffer notification of the source
  101.     /// while suspending and flush it out when resuming
  102.     /// </summary>
  103.     /// <typeparam name="T"></typeparam>
  104.     /// <param name="source">The source.</param>
  105.     /// <param name="hold">The hold.</param>
  106.     /// <returns></returns>
  107.     public static IObservable<T> HoldWhile<T>(
  108.                 this IObservable<T> source,
  109.                 IObservable<bool> hold)
  110.     {
  111.         /*************************************************
  112.         * source  —1—2—3—4—5—6—7—8—9—10—11-|
  113.         * hold    ————-T———–F———–T———|
  114.         * result  —1—2—3————-4567–8—9———–10,11-|
  115.         *************************************************/
  116.         // TODO: complete the mission logic
  117.         throw new NotImplementedException();
  118.     }
  119. }

Good luck

Add comment
facebook linkedin twitter email

Leave a Reply

Your email address will not be published.

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

*