Task.Delay Challenge Solution

2017/04/21

This post suggests a solution to the Task.Delay challenge.

If you haven’t read the Task.Delay challenge, you can find it here.

In a nutshell, the challenge was to build Task.Delay-like functionality that is testable (using IScheduler to get virtual time).

To support virtual time I will use an Rx stream (which is also awaitable).

The original Task.Delay actually has two different completion paths:

– Completion by time.

– Completion as the result of a cancellation token being canceled.

The general idea of the solution is to have two Rx streams, one for each completion path:

– A timer, which represents completion by time.

– A custom stream, which reacts to the cancellation token.

I will combine both streams with the Observable.Amb operator, which propagates the first stream to react (with a value, an error or a completion) and unsubscribes from the other stream.
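To make the Amb behavior concrete, here is a minimal sketch that races two timers in virtual time. It assumes the Microsoft.Reactive.Testing NuGet package for TestScheduler; the class name AmbSketch, the method Race and the "slow"/"fast" labels are illustrative and not part of the original solution.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing; // assumption: NuGet package Microsoft.Reactive.Testing

public static class AmbSketch
{
    // Hypothetical helper: races two timers in virtual time and
    // returns every value that Amb lets through.
    public static List<string> Race()
    {
        var scheduler = new TestScheduler();
        var slow = Observable.Timer(TimeSpan.FromSeconds(3), scheduler).Select(_ => "slow");
        var fast = Observable.Timer(TimeSpan.FromSeconds(1), scheduler).Select(_ => "fast");

        var results = new List<string>();
        using (Observable.Amb(slow, fast).Subscribe(value => results.Add(value)))
        {
            // Move virtual time past both due times; no real waiting happens.
            scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
        }
        return results;
    }
}
```

Only the value of the faster timer should show up in the result, because Amb drops the other stream as soon as one of them reacts.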

For convenient API access I wrapped the solution in extension methods.

The following code snippet represents the cancellation stream:

Code Snippet
  1. // a single shared instance avoids allocating an exception per cancellation
  2. private static readonly OperationCanceledException CANCEL_ERROR =
  3.                             new OperationCanceledException();
  4. public static IObservable<T> ToObservable<T>(this CancellationToken token)
  5. {
  6.     return Observable.Create<T>(observer =>
  7.     {
  8.         var unsubscribe = new BooleanDisposable();
  9.         var registration = token.Register(() =>
  10.         {   // runs immediately if the token is already canceled
  11.             if (!unsubscribe.IsDisposed)
  12.                 observer.OnError(CANCEL_ERROR);
  13.         });
  14.         return new CompositeDisposable(unsubscribe, registration); // unsubscribing also removes the token callback
  15.     });
  16. }

The stream is created with Observable.Create (line 6), combined with a registration on the cancellation token (lines 9-13).

From here it’s all a matter of composition (see the following code snippet):

Code Snippet
    public static IObservable<T> Delay<T>(
        this TimeSpan duration,
        CancellationToken cancellationToken,
        IScheduler scheduler = null)
    {
        // the first stream to react wins: the timer or the cancellation
        var delayStream = Observable.Amb(
                            Observable.Timer(duration, scheduler ?? Scheduler.Default)
                                    .Select(_ => default(T)),
                            cancellationToken.ToObservable<T>());
        return delayStream;
    }
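Since testability was the point of the challenge, the following sketch shows how the Delay extension might be exercised in virtual time. It assumes the Microsoft.Reactive.Testing NuGet package for TestScheduler. The two extension methods are condensed from the snippets above so the sketch compiles on its own; the class names and the Run helper are illustrative only.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using Microsoft.Reactive.Testing; // assumption: NuGet package Microsoft.Reactive.Testing

public static class DelayExtensions
{
    private static readonly OperationCanceledException CANCEL_ERROR =
        new OperationCanceledException();

    // Condensed from the post: errors when the token is canceled.
    public static IObservable<T> ToObservable<T>(this CancellationToken token)
    {
        return Observable.Create<T>(observer =>
        {
            var unsubscribe = new BooleanDisposable();
            token.Register(() =>
            {
                if (!unsubscribe.IsDisposed)
                    observer.OnError(CANCEL_ERROR);
            });
            return unsubscribe;
        });
    }

    // Condensed from the post: timer and cancellation raced with Amb.
    public static IObservable<T> Delay<T>(
        this TimeSpan duration,
        CancellationToken cancellationToken,
        IScheduler scheduler = null)
    {
        return Observable.Amb(
            Observable.Timer(duration, scheduler ?? Scheduler.Default)
                      .Select(_ => default(T)),
            cancellationToken.ToObservable<T>());
    }
}

public static class DelayVirtualTimeSketch
{
    // Hypothetical helper: subscribes to a 5 second delay on a TestScheduler,
    // optionally cancels, advances virtual time and reports what happened.
    public static string Run(TimeSpan advanceBy, bool cancel)
    {
        var scheduler = new TestScheduler();
        var cts = new CancellationTokenSource();
        var outcome = "pending";

        using (TimeSpan.FromSeconds(5)
            .Delay<Unit>(cts.Token, scheduler)
            .Subscribe(
                _ => outcome = "completed by time",
                ex => outcome = "canceled"))
        {
            if (cancel) cts.Cancel();
            // Virtual time only: the call returns immediately.
            scheduler.AdvanceBy(advanceBy.Ticks);
        }
        return outcome;
    }
}
```

Advancing by 4 seconds without cancellation should leave the delay pending, advancing by 6 seconds should complete it, and canceling the token should surface the error regardless of time. In production code the scheduler argument can be omitted, and because an Rx stream is awaitable the result can be awaited much like Task.Delay.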
