Rx Challenge #9: Execute Async Until [Solution]

2016/08/30

Challenge #9 was about implementing a pattern in which you keep sending an async message until you get a notification confirming that the message was handled by the other side.

See the full details of the challenge in this post.

Simple scenario

In the simple scenario, you call a service and retry sending according to its response.

You can use the following pattern.

Code Snippet
  1. private static async Task VerySimpleScenario()
  2. {
  3.     var delay = Observable.Timer(TimeSpan.FromSeconds(1));
  4.     var noDelay = Observable.Return(1L);
  5.     var xs = Observable.FromAsync(() => ExecSimpleAsync(1))
  6.                                         .Delay(v => v ? noDelay : delay)
  7.                                         .Repeat()
  8.                                         .FirstOrDefaultAsync(v => v);
  9.  
  10.     bool result = await xs;
  11.  
  12.     Console.WriteLine(":)");
  13. }
  14. private static async Task<bool> ExecSimpleAsync(int i)
  15. {
  16.     string indent = new string('\t', i);
  17.     Console.WriteLine($"{indent}~~> [{i}]");
  18.     await Task.Delay(1000);
  19.     return Environment.TickCount % 3 == 0;
  20. }

By combining FromAsync –> conditional Delay –> Repeat –> conditional FirstOrDefaultAsync

you can ensure repeated calls until you get the right response.

The idea is to repeat the call (line 7) until the result meets a specific criterion (line 8).

The conditional delay (line 6) is there so that a response that meets the criterion is returned immediately, while a failed attempt waits one second before the next call.

Keep in mind that this won’t work for the more complex scenario of fire and forget (such as queue-based messaging).
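Because ExecSimpleAsync depends on Environment.TickCount, its outcome differs from run to run. The following is a minimal sketch of the same pattern with a deterministic stand-in; ExecFakeAsync, the class name and the shortened delays are assumptions made for illustration and are not part of the original solution.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class RetryDemo
{
    private static int _attempts;

    // Hypothetical stand-in for ExecSimpleAsync: reports success on the third call.
    private static async Task<bool> ExecFakeAsync()
    {
        await Task.Delay(10);
        return ++_attempts >= 3;
    }

    public static async Task<bool> RunAsync()
    {
        var delay = Observable.Timer(TimeSpan.FromMilliseconds(100));
        var noDelay = Observable.Return(1L);

        // Call, wait only after a failed attempt, repeat, stop at the first success.
        return await Observable.FromAsync(() => ExecFakeAsync())
                               .Delay(ok => ok ? noDelay : delay)
                               .Repeat()
                               .FirstOrDefaultAsync(ok => ok);
    }

    static async Task Main() => Console.WriteLine(await RunAsync());
}
```

Since Repeat() has no limit here, the awaited result can only be true; with a bounded Repeat(n) the FirstOrDefaultAsync would fall back to false once the attempts are exhausted.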

 

The same solution is valid for multiple calls, as in the following code snippet:

Code Snippet
  1. private static void SimpleScenario()
  2. {
  3.     var delay = Observable.Timer(TimeSpan.FromSeconds(1));
  4.     var noDelay = Observable.Return(1L);
  5.     var xs = from i in Observable.Interval(TimeSpan.FromSeconds(2))
  6.                                     .Take(5)
  7.                                     .Select(v => (int)v)
  8.                 from result in Observable.FromAsync(() => ExecSimpleAsync(i))
  9.                                         .Delay(v => v ? noDelay : delay)
  10.                                         .Repeat()
  11.                                         .FirstOrDefaultAsync(v => v)
  12.                 select (int)i;
  13.  
  14.     xs.Subscribe(index =>
  15.     {
  16.         string indent = new string('\t', index);
  17.         Console.WriteLine($"{indent}[{index}] = :)");
  18.     }, ex => Console.WriteLine(ex));
  19. }

This snippet initiates 5 calls (line 6), one every 2 seconds (line 5).

The same pattern we discussed earlier is put in place at lines 8–11.

Run it on your computer and see the results.

 

Complex scenario

The complex scenario applies to fire-and-forget communication (such as queue-based channels).

I encapsulated the logic in an extension method for ease of use.

Check the following code snippet:

Code Snippet
  1. public static Task<T> SendUntil<T>(
  2.     this IObservable<T> acknowledgeStream,
  3.     Func<CancellationToken, Task> action,
  4.     CancellationToken cancellationToken = default(CancellationToken),
  5.     TimeSpan? delayAfterExecution = null,
  6.     IScheduler scheduler = null)
  7. {
  8.     scheduler = scheduler ?? Scheduler.Default;
  9.     TimeSpan delay = delayAfterExecution ?? TimeSpan.FromSeconds(2);
  10.     var executer = Observable.FromAsync(action, scheduler)
  11.                     .Delay(delay, scheduler)
  12.                     .Repeat(); // can be limited to a specific number of re-sends
  13.     var sendingUntil = // keep calling until the acknowledgeStream produce value or complete
  14.         Observable.CombineLatest(executer, acknowledgeStream, (exec, ack) => ack)
  15.                     .FirstOrDefaultAsync();
  16.  
  17.     return sendingUntil.ToTask(cancellationToken);
  18. }

I added some best practices, such as optional scheduler injection and a cancellation token,

but they don’t really matter for the solution.

The essence of the solution is the CombineLatest operator along with FirstOrDefaultAsync (lines 14, 15).

It combines the repeating pattern you saw earlier with the first notification of the acknowledge stream.

This way you keep calling until the first acknowledgment.

ToTask (line 17) helps with the cancellation; otherwise you could simply await the combined stream.
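To illustrate the difference between the two options, here is a small sketch that awaits an observable directly and then through ToTask with a cancellation token. The source stream, the class name and the 5 second timeout are placeholders chosen for the example, not part of SendUntil.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

static class AwaitDemo
{
    public static async Task<(int Direct, int ViaTask)> RunAsync()
    {
        // Placeholder for the combined stream built inside SendUntil.
        IObservable<int> source = Observable.Return(42).FirstOrDefaultAsync();

        // Awaiting an observable yields its last element; no token can be passed.
        int direct = await source;

        // ToTask accepts a CancellationToken, so the wait itself can be cancelled.
        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
        {
            int viaTask = await source.ToTask(cts.Token);
            return (direct, viaTask);
        }
    }

    static async Task Main()
    {
        var result = await RunAsync();
        Console.WriteLine($"{result.Direct} {result.ViaTask}");
    }
}
```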

 

You can try the extension method using the following code snippet (acknowledgeStream is not declared in the snippet; presumably it is a static Subject<int> field):

Code Snippet
  1. private static void ComplexScenario()
  2. {
  3.     var xs = from i in Observable.Interval(TimeSpan.FromSeconds(2)).Take(5)
  4.                 from ack in acknowledgeStream.Where(ack => ack == i)
  5.                                             .SendUntil(ct => ExecAAsync((int)i))
  6.                 select ack;
  7.     xs.Subscribe(ack =>
  8.     {
  9.         string indent = new string('\t', ack);
  10.         Console.WriteLine($"{indent}[{ack}] = :)");
  11.     }, ex => Console.WriteLine(ex));
  12. }
  13.  
  14. private static async Task ExecAAsync(int i)
  15. {
  16.     string indent = new string('\t', i);
  17.     Console.WriteLine($"{indent}~~> [{i}]");
  18.     await Task.Delay(1000);
  19.     var fireAndForget = ExecBAsync(i).ConfigureAwait(false);
  20. }
  21.  
  22. private static async Task ExecBAsync(int i)
  23. {
  24.     await Task.Delay(1000);
  25.     if (i % 2 == 0)
  26.         await ExecCAsync(i).ConfigureAwait(false);
  27.     else
  28.         await ExecDAsync(i).ConfigureAwait(false);
  29. }
  30.  
  31. private static async Task ExecCAsync(int i)
  32. {
  33.     await Task.Delay(1000);
  34.     acknowledgeStream.OnNext(i);
  35. }
  36.  
  37. private static async Task ExecDAsync(int i)
  38. {
  39.     await Task.Delay(5000);
  40.     acknowledgeStream.OnNext(i);
  41. }

 

Update:

The above solution always waits for the full delay, even if the acknowledge stream notifies immediately.

This issue can be solved with a custom trigger for the delay, composed of the duration and the acknowledge stream.

The solution looks like this:

Code Snippet
  1. public static async Task<T> SendUntil<T>(
  2.     this IObservable<T> completeTrigger,
  3.     Func<CancellationToken, Task> action,
  4.     IScheduler scheduler = null,
  5.     TimeSpan? delayBetweenRetries = null,
  6.     int retries = 10,
  7.     CancellationToken cancellationToken = default(CancellationToken))
  8. {
  9.     Validations.NotNull(action, () => nameof(action));
  10.     Validations.NotNull(completeTrigger, () => nameof(completeTrigger));
  11.  
  12.     scheduler = scheduler ?? Scheduler.Default;
  13.  
  14.     var dataStream = completeTrigger.Publish(hot =>
  15.     {
  16.         // delay by duration until ack
  17.         TimeSpan delay = delayBetweenRetries ?? SEND_DELAY_INTERVAL;
  18.         var delayTrigger = Observable.Amb(
  19.                                     Observable.Timer(delay, scheduler),
  20.                                     hot.Select(_ => -1L));
  21.  
  22.         var executer = Observable.FromAsync(action, scheduler)
  23.                                 .Delay(_ => delayTrigger)
  24.                                 .Repeat(retries);
  25.         var sendingUntil = // keep calling until the completeTrigger produce value or complete
  26.             Observable.CombineLatest(executer, hot, (e, t) => t)
  27.                       .FirstOrDefaultAsync();
  28.         return sendingUntil;
  29.     });
  30.  
  31.     var result = await dataStream.ToTask(cancellationToken);
  32.     return result;
  33. }

Lines 18-20 construct the combined trigger, which is used at line 23.
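The behavior of such a combined trigger can be seen in isolation. The sketch below is only an illustration: the Subject standing in for the acknowledge stream, the class name and the 5 second timer are assumptions. Amb forwards whichever source emits first, so an early acknowledgment ends the delay without waiting for the timer.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

static class AmbDemo
{
    public static async Task<long> FirstTriggerAsync()
    {
        var ack = new Subject<int>(); // hypothetical acknowledge stream

        // Timer emits 0L when it fires; the ack branch is mapped to -1L.
        IObservable<long> trigger = Observable.Amb(
            Observable.Timer(TimeSpan.FromSeconds(5)),
            ack.Select(_ => -1L));

        // ToTask subscribes right away, so the ack below is observed.
        Task<long> first = trigger.FirstAsync().ToTask();
        ack.OnNext(1);

        return await first;
    }

    static async Task Main() => Console.WriteLine(await FirstTriggerAsync()); // prints -1
}
```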

I use Publish because the completeTrigger stream is used twice (without Publish, a cold stream would be subscribed twice, which would be wrong).
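The effect on a cold stream can be checked with a small experiment. This is a sketch under assumptions: the Defer-based source and the subscription counter exist only for the demonstration and do not appear in SendUntil.

```csharp
using System;
using System.Reactive.Linq;

static class PublishDemo
{
    public static int CountSubscriptions(bool usePublish)
    {
        int subscriptions = 0;

        // Cold source: the factory runs again for every subscription.
        IObservable<int> cold = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Return(7);
        });

        // The source is referenced twice in both variants;
        // Publish(selector) shares one underlying subscription between them.
        IObservable<int> combined = usePublish
            ? cold.Publish(hot => Observable.CombineLatest(hot, hot, (a, b) => a + b))
            : Observable.CombineLatest(cold, cold, (a, b) => a + b);

        using (combined.Subscribe(_ => { })) { }
        return subscriptions;
    }

    static void Main()
    {
        Console.WriteLine($"without Publish: {CountSubscriptions(false)}"); // 2
        Console.WriteLine($"with Publish: {CountSubscriptions(true)}");     // 1
    }
}
```

With a cold acknowledge stream, two independent subscriptions could see different sequences, so the delay trigger and the completion check might disagree about whether an acknowledgment arrived.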

Summary

Rx is all about composition of operators.

As a matter of fact, I see Rx as the pipeline between producer and consumer; this is its real strength.

In this challenge you saw a composed pattern that solves repetitive sending until acknowledgment.

 

All challenges are available here.
