Custom Task Scheduler and await


In this post I will discuss the behavior of async and await when it comes to scheduling. Take the following snippet:
private async Task DoSomethingAsync()
{
    // which thread is executing here?
    // is it the UI thread?
    // is it a thread pool thread?
    await Task.Delay(100);
    ...

Rx Challenge #6 Solution


The previous challenge was about weakening the reference to the subscriber. You can see the challenge here. My solution for this challenge involves a WeakObserver class plus an extension method that creates a weak-observer mediator for each subscription. The following code snippet describes my solution:
public class WeakObserver<T> : IObserver<T>
{
    private readonly WeakReference<IObserver<T>> _target;
    #region Ctor
    /// <summary>
    /// Initializes a new instance of the <see cref="WeakObserver{T}"/> class.
    /// </summary>
    /// <param name="target">The target.</param>
    /// <exception cref="System.ArgumentNullException">target</exception>
    ...

Rx Challenge #6


This challenge comes from an Rx Forum question that asks about weak subscription to an observable. As is the case with events, the observable holds a reference to its subscription. When the observer's lifetime shouldn't be dictated by the lifetime of the observable, you need to weaken the reference to the observer and let the GC collect it when the only rooted reference to the observer comes from its subscription. Your challenge is to build this functionality. The next code snippet will help you validate your solution:
public void WeakSubscription_Test()
{
    // re-process a value when the...

Rx Challenge #5 Solution

Challenge #5 was about fault tolerance; you can check the challenge details here. The solution is composed of the Replay(1) and Retry() operators, but keep in mind that Replay returns a connectable observable, so it should come with Connect or RefCount (you can read more about it here). You can also check out the Rx Forum thread where the original question was asked. So without further ado, the code snippet below is my suggested solution for the challenge:
public void FaultTolleranceAnswer1_Test()
{
    // re-process a value when the stream ahead produces an error
    ...

Rx Challenge #5


This challenge comes straight from a question on the Rx forum. By checking the forum thread you will find the answer to this challenge, but you may want to solve it yourself before looking at a ready-made answer. This challenge is about fault tolerance in Rx. Rx has a few fault-handling operators, but in general, when those operators are triggered they only handle re-subscription to the original stream or to a fallback stream. The challenge is to have a safe point from which an Rx composition can re-process a value in case of failure. It resembles the Spark capability of...

Rx Challenge #4 (More Solutions)


My colleague Eric Rabinovitch offered a different yet elegant solution for Rx Challenge #4, which I want to share with you. His approach combines window + aggregation. You can check the following code, which was provided by Eric:
// device provides a stream of bytes that represents message header + body
// for each byte the first byte represents the header
// the first byte represents Message Type Enum of (None, MessageA, MessageB, C)
// while the real message may be constructed from multiple sequences of bytes
// when the first byte != None the 2nd byte represents the number of bytes used
//...

Rx Challenge #4 (solution)


The last challenge was about composing multiple messages into a single message by reading the message header. You can read more about it here. You can check the solutions for this challenge on the Rx forum or in the following code snippet. Enjoy!
public void SwitchPlusTests_Test()
{
    // device provides a stream of bytes that represents message header + body
    // for each byte the first byte represents the header
    // the first byte represents Message Type Enum of (None, MessageA, MessageB, C)
    // while the real message...

Rx Challenge #4


This challenge was taken from a real-life problem. One of my colleagues asked about reacting to a Bluetooth device's output. The device provides a stream of bytes that represents message header + body for each byte.
The first chunk of bytes represents the header (kind, chunk count):
- the first byte represents the message kind (enum of None = 0, MessageA = 1, MessageB = 2)
- the second byte represents the number of chunks (the message is split into)
On each following chunk of the message:
- the first byte equals zero (this is how you know that the message is not a starting point)
when...

Rx Challenge #3 (solution)

The last challenge was to apply suspend / resume behavior. For example, we might want to suspend the input stream when it becomes too intense. You can read more in the challenge post. The solution for this challenge is not so easy, and it uses the Scan operator. At the heart of this solution I created a threshold class (later it will be wrapped by an accumulation, which in turn will be used by Scan):
/// <summary>
/// Encapsulates a time-stamping threshold
/// where stamps are checked against a staleness duration (limit duration)
/// and the non-stale stamp count is evaluated against the count limit
///...

Rx Challenge #3


This Rx challenge is actually a real-life problem I had to solve. It's actually a piece of infrastructure that may fit many scenarios. The scenario I encountered is fault tolerance for a remote logging service. We used to send our logs to a central log viewer in order to inspect the heartbeat of our clients' apps and keep them healthy. The problem starts when, for some reason, the client apps cannot send the information (offline, wrong URL, etc.). In those cases we want to suspend sending for a while and resume later. So we react to send failures and if...