Rx Challenge #8 Solution


This post is part of a series; the whole series can be found here.

This challenge was all about buffering data during a suspension period and flushing it out on resume. The idea is to suspend the observable's notifications for a while without losing data. The following marble diagram shows the required functionality:

    /*************************************************
     * source  ---1---2---3---4---5---6---7---8---9---10---11-|
     * hold    -------------T-----------F-----------T---------|
     * result  ---1---2---3-------------4567--8---9-----------10,11-|
     *************************************************/

Solving this problem requires some kind of buffering, and if you follow my blog you know that when it comes to buffering and dispatching I'm usually integrating...
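The buffer-and-flush behavior in the marble diagram can be sketched with a plain queue and a lock. This is a minimal sketch under stated assumptions, not necessarily the approach the full post takes: the class name `HoldWhileSketch` is hypothetical, the stream starts in the "not holding" state, buffered items are flushed when the source completes (as the diagram's final `10,11` suggests), and errors from `hold` are not handled.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper class; only the HoldWhile signature comes from the challenge.
public static class HoldWhileSketch
{
    public static IObservable<T> HoldWhile<T>(
        this IObservable<T> source,
        IObservable<bool> hold)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();      // serializes access from both streams
            var buffer = new Queue<T>();  // items received while holding
            var holding = false;          // assumption: start in pass-through mode

            var holdSubscription = hold.Subscribe(shouldHold =>
            {
                lock (gate)
                {
                    holding = shouldHold;
                    if (!holding)
                    {
                        // resume: flush everything buffered during suspension
                        while (buffer.Count > 0)
                            observer.OnNext(buffer.Dequeue());
                    }
                }
            });

            var sourceSubscription = source.Subscribe(
                value =>
                {
                    lock (gate)
                    {
                        if (holding) buffer.Enqueue(value);
                        else observer.OnNext(value);
                    }
                },
                error => { lock (gate) observer.OnError(error); },
                () =>
                {
                    lock (gate)
                    {
                        // assumption: nothing is lost when the source completes
                        while (buffer.Count > 0)
                            observer.OnNext(buffer.Dequeue());
                        observer.OnCompleted();
                    }
                });

            return new CompositeDisposable(holdSubscription, sourceSubscription);
        });
    }
}
```

Calling the observer while holding the lock keeps notifications serialized, at the cost of blocking the other stream for the duration of a flush; a queue-based dispatcher avoids that trade-off.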

Rx Challenge #8


This post is part of a series; the whole series can be found here.

This challenge is all about buffering data while suspended and flushing it out on resume. We already had a challenge on Suspend and Resume, but that challenge ignored the data during the suspension period. In this challenge you are expected to buffer the data during the suspension period and flush it out on resume. To complete this challenge you have to provide an extension method in the following format:

    public static IObservable<T> HoldWhile<T>(
        this IObservable<T> source,
        IObservable<bool> hold)
    {
        /*************************************************...

Rx Challenge TOC


This is a quick-access page to the Rx Challenges.

Questions
#1 http://blogs.microsoft.co.il/bnaya/2015/01/27/rx-challenge/
#2 http://blogs.microsoft.co.il/bnaya/2015/02/05/rx-challenge-2/
#3 http://blogs.microsoft.co.il/bnaya/2015/03/06/rx-challenge-3/
#4 http://blogs.microsoft.co.il/bnaya/2015/03/25/rx-challenge-4/
#5 http://blogs.microsoft.co.il/bnaya/2015/04/03/rx-challenge-5/
#6 http://blogs.microsoft.co.il/bnaya/2015/04/10/rx-challenge-6/
#7 http://blogs.microsoft.co.il/bnaya/2015/07/11/rx-challenge-7/

Solutions
#1 http://blogs.microsoft.co.il/bnaya/2015/01/31/rx-challenge-solution/
#2 http://blogs.microsoft.co.il/bnaya/2015/02/09/rx-challenge-2-solution/
#3 http://blogs.microsoft.co.il/bnaya/2015/03/25/rx-challenge-3-solution/
#4 http://blogs.microsoft.co.il/bnaya/2015/03/30/rx-challenge-4-solution/
   http://blogs.microsoft.co.il/bnaya/2015/04/02/rx-challenge-4-more-solution/
#5 http://blogs.microsoft.co.il/bnaya/2015/04/10/rx-challenge-5-solution/
#6 http://blogs.microsoft.co.il/bnaya/2015/04/19/rx-challenge-6-solution/
#7 http://blogs.microsoft.co.il/bnaya/2015/07/17/rx-challenge-7-solution/

Rx Challenge #7 Solution

The last challenge was about extending the Rx API to support WhereAsync while following the Rx guidelines, which require keeping events in order and synchronized (“Assume observer instances are called in a serialized fashion”, page 8). Synchronizing async events may not be trivial; solving it may require message buffering and dispatching. With that in mind, you can either build a custom buffering and dispatching component or use an existing one like TPL Dataflow. It turns out that this is exactly what TPL Dataflow was designed for. More than that, TPL Dataflow integrates smoothly with Rx.

The following solution is...
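For comparison, the ordering requirement alone can also be met with built-in Rx operators. The following is a minimal sketch that swaps in a different technique from the TPL Dataflow approach described above: each value is turned into a deferred async check and the checks are run one at a time with `Concat`, so results come out in source order. The class name `WhereAsyncSketch` is hypothetical.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical helper class; an operator-only alternative, not the Dataflow solution.
public static class WhereAsyncSketch
{
    public static IObservable<T> WhereAsync<T>(
        this IObservable<T> source,
        Func<T, Task<bool>> predicate)
    {
        return source
            // wrap each value in a deferred async check; nothing runs until subscribed
            .Select(value => Observable
                .FromAsync(() => predicate(value))
                .Select(isMatch => new { Value = value, IsMatch = isMatch }))
            // Concat subscribes to one inner check at a time, which preserves order
            .Concat()
            .Where(x => x.IsMatch)
            .Select(x => x.Value);
    }
}
```

The trade-off is throughput: predicates never overlap, and pending values queue up without bound when the source is faster than the predicate.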

Rx Challenge #7


This challenge is all about extending the Rx API to support WhereAsync. Let's take the following scenario:

    var xs = Observable.Interval(TimeSpan.FromMinutes(1));

What if you have the following method, which gets a long and returns Task<bool>?

    public async Task<bool> IsValidAsync(long value)
    {
        // in real life the method may send
        // the value to a service and get whether it's
        // ok to use it
        await Task.Delay(1000);
        return value % 2 == 0;
    }

Can you filter the observable using this method? The problem...

Custom Task Scheduler and await


In this post I will discuss the behavior of async and await when it comes to scheduling. Let's take the following snippet:

    private async Task DoSomethingAsync()
    {
        // which thread is executing here?
        // is it the UI thread?
        // is it a thread pool thread?

        await Task.Delay(100);

    ...

Rx Challenge #6 Solution


The previous challenge was about weakening the reference to the subscriber. You can see the challenge here. My solution for this challenge involves a WeakObserver class plus an extension method which creates a weak-observer mediator for each subscription. The following code snippet describes my solution:

    public class WeakObserver<T> : IObserver<T>
    {
        private readonly WeakReference<IObserver<T>> _target;

        #region Ctor

        /// <summary>
        /// Initializes a new instance of the <see cref="WeakObserver{T}"/> class.
        /// </summary>
        /// <param name="target">The target.</param>
        /// <exception cref="System.ArgumentNullException">target</exception>...

Rx Challenge #6


This challenge comes from an Rx Forum question which asks about weak subscription to an observable. As is the case with events, the observable holds a reference to its subscription. When the observer's lifetime shouldn't be dictated by the lifetime of the observable, you need to weaken the reference to the observer and let the GC collect it when the only rooted reference to the observer comes from its subscription.

Your challenge is to build this functionality. The next code snippet will help you validate your solution:

    public void WeakSubscription_Test()
    {
        // re-process a value when the...

Rx Challenge #5 Solution

Challenge #5 was about fault tolerance; you can check the challenge details here. The solution is composed of the Replay(1) and Retry() operators, but keep in mind that Replay returns a connectable observable, so it should come with Connect or RefCount (you can read more about it here). You can also check out the Rx Forum thread where the original question was asked.

So without further ado, the code snippet below is my suggestion for the challenge's solution.

    public void FaultTolleranceAnswer1_Test()
    {
        // re-process a value when the stream ahead produces an error ...
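The way Replay(1), Connect and Retry() fit together can be illustrated in isolation. This is a minimal sketch under stated assumptions, not the code from the post: the class `SafePointSketch`, the `Run` method, the `Subject<int>` source and the simulated failure on the first attempt are all hypothetical. Replay(1) remembers the latest value, so when Retry() re-subscribes after the downstream error, that value is delivered again and processed a second time.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; names and the failure simulation are illustrative only.
public static class SafePointSketch
{
    public static List<int> Run()
    {
        var results = new List<int>();
        var source = new Subject<int>();
        var attempts = 0;

        // safe point: a connectable observable that replays the latest value
        var safePoint = source.Replay(1);

        using (safePoint.Connect()) // Replay does nothing until connected
        using (safePoint
            .Select(value =>
            {
                // simulate a processing stage that fails on its first attempt
                if (attempts++ == 0)
                    throw new InvalidOperationException("simulated failure");
                return value * 10;
            })
            .Retry() // re-subscribe to the safe point after an error
            .Subscribe(results.Add))
        {
            source.OnNext(1);
        }

        return results;
    }
}
```

Connect is used here to make the connection lifetime explicit; RefCount would tie it to the number of subscribers instead.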

Rx Challenge #5


This challenge comes right from a question on the Rx forum. By checking the forum thread you will find the answer to this challenge, but you may want to solve it yourself before getting a ready-made answer.

This challenge is about fault tolerance in Rx. Rx has a few fault-handling operators, but in general, when those operators are triggered they only handle re-subscription to the original or a fallback stream. This challenge is to have a safe point from which an Rx composition can re-process a value in case of failure. It resembles Spark's capability of...