Rx Challenge #11–Solution

2016/09/15

You can find the solution for Challenge 11 below.

Previous challenges are available here.

Task, Semaphore, Parallel, WaitAsync

The following code snippet implements the base functionality, which is similar to that of SemaphoreSlim.

Code Snippet
  1. public class AsyncLock
  2. {
  3.     private int _counter = 0;
  4.     private readonly int _limit;
  5.     // queue of pending waiters; each TaskCompletionSource backs one awaited Task
  6.     private ConcurrentQueue<TaskCompletionSource<object>> _tcsQueue =
  7.         new ConcurrentQueue<TaskCompletionSource<object>>();
  8.  
  9.     #region Ctor
  10.  
  11.     /// <summary>
  12.     /// Initializes a new instance of the <see cref="AsyncLock"/> class.
  13.     /// </summary>
  14.     /// <param name="limit">The limit.</param>
  15.     public AsyncLock(int limit)
  16.     {
  17.         _limit = limit;
  18.     }
  19.  
  20.     #endregion // Ctor
  21.  
  22.     #region WaitAsync
  23.  
  24.     /// <summary>
  25.     /// Increments the counter and returns a
  26.     /// synchronization handle which can be awaited.
  27.     /// </summary>
  28.     /// <returns></returns>
  29.     public Task WaitAsync()
  30.     {
  31.         int counter = Interlocked.Increment(ref _counter);
  32.         if (counter <= _limit)
  33.             return Task.CompletedTask; // within the limit: return an already-completed task
  34.         var tcs = new TaskCompletionSource<object>();
  35.         _tcsQueue.Enqueue(tcs);
  36.         return tcs.Task; // something to await on
  37.     }
  38.  
  39.     #endregion // WaitAsync
  40.  
  41.     #region Release
  42.  
  43.     /// <summary>
  44.     /// Release a lock.
  45.     /// </summary>
  46.     public void Release()
  47.     {
  48.         Interlocked.Decrement(ref _counter);
  49.         TaskCompletionSource<object> tcs;
  50.         if (_tcsQueue.TryDequeue(out tcs))
  51.             tcs.TrySetResult(null); // a holder has finished: release one waiter
  52.     }
  53.  
  54.     #endregion // Release
  55. }

The solution uses a ConcurrentQueue of TaskCompletionSource (line 6) to handle the async lock.

On Release (lines 50-51), the queue is used to release a single awaiting task.

Using TaskCompletionSource this way is a general pattern that can replace classic synchronization.

The problem with most classic synchronization primitives is that they block the executing thread rather than wait asynchronously.
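To make the pattern concrete, here is a minimal sketch of a TaskCompletionSource used as an awaitable signal. The names TcsSignalDemo and RunAsync are made up for illustration, and the RunContinuationsAsynchronously option is my addition rather than something the solution above uses.

```csharp
using System;
using System.Threading.Tasks;

public static class TcsSignalDemo
{
    // Hypothetical helper: returns the value delivered through the signal.
    public static async Task<string> RunAsync()
    {
        // RunContinuationsAsynchronously (an assumption, not in the post) keeps
        // the awaiter's continuation off the thread that completes the signal.
        var signal = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Producer: completes the signal a little later, from another thread.
        Task producer = Task.Run(async () =>
        {
            await Task.Delay(50);
            signal.TrySetResult("released");
        });

        // Consumer: awaiting frees the thread while the signal is pending.
        // A ManualResetEvent.WaitOne() here would hold the thread instead.
        string value = await signal.Task;
        await producer;
        return value;
    }

    public static void Main()
    {
        Console.WriteLine(RunAsync().GetAwaiter().GetResult()); // prints "released"
    }
}
```

The consumer side is an ordinary await, so no thread sits idle while the signal is pending.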

I use Interlocked to track the number of executors that have asked for the lock.

The counter is incremented in WaitAsync (line 31) and decremented when the lock is released (line 48).
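One caveat worth checking: the counter and the queue are updated in two separate steps, so a Release that lands between the increment (line 31) and the enqueue (line 35) may find the queue empty and leave that waiter pending. The sketch below is a hypothetical variant, not the solution above: it guards both the count and the queue with a single lock. The names LockedAsyncGate and LockedAsyncGateDemo are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical variant: one lock covers both the count and the waiter queue,
// so a Release cannot slip in between "count" and "enqueue".
public sealed class LockedAsyncGate
{
    private readonly object _sync = new object();
    private readonly Queue<TaskCompletionSource<object>> _waiters =
        new Queue<TaskCompletionSource<object>>();
    private readonly int _limit;
    private int _held; // number of callers currently holding the gate

    public LockedAsyncGate(int limit) { _limit = limit; }

    public Task WaitAsync()
    {
        lock (_sync)
        {
            if (_held < _limit)
            {
                _held++;
                return Task.CompletedTask; // a slot is free: no waiting
            }
            var tcs = new TaskCompletionSource<object>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(tcs);
            return tcs.Task; // completed later by Release
        }
    }

    public void Release()
    {
        TaskCompletionSource<object> next;
        lock (_sync)
        {
            if (_waiters.Count == 0)
            {
                _held--; // nobody is waiting: free the slot
                return;
            }
            next = _waiters.Dequeue(); // hand the slot straight to the next waiter
        }
        next.TrySetResult(null); // complete outside the lock
    }
}

public static class LockedAsyncGateDemo
{
    public static void Main()
    {
        var gate = new LockedAsyncGate(1);
        Task first = gate.WaitAsync();
        Task second = gate.WaitAsync();
        Console.WriteLine(first.IsCompleted);  // True: within the limit
        Console.WriteLine(second.IsCompleted); // False: queued behind the first
        gate.Release();
        second.Wait();
        Console.WriteLine(second.IsCompleted); // True: released by the first holder
    }
}
```

The trade-off is a short lock on every call instead of lock-free Interlocked operations. The lock is never held across an await, so it does not block for long.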

Finally, WaitAsync returns Task.CompletedTask (a .NET 4.6 API) while the counter is still within the execution limit,

which means that an await within the limit continues synchronously, without any delay.
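The synchronous continuation can be observed directly. In this small sketch (CompletedTaskDemo and StaysOnCallerThreadAsync are illustrative names, not part of the solution), an async method that only awaits an already-completed task has finished by the time the call returns, and it never leaves the caller's thread.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletedTaskDemo
{
    // Hypothetical helper: reports whether the code after the await
    // ran on the same thread as the code before it.
    public static async Task<bool> StaysOnCallerThreadAsync()
    {
        int before = Environment.CurrentManagedThreadId;
        await Task.CompletedTask; // already completed: the method is not suspended
        return before == Environment.CurrentManagedThreadId;
    }

    public static void Main()
    {
        Task<bool> task = StaysOnCallerThreadAsync();
        Console.WriteLine(task.IsCompleted); // True: finished before the call returned
        Console.WriteLine(task.Result);      // True: same thread before and after
    }
}
```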

 

The next code snippet represents the disposable wrapper:

Code Snippet
  1. public class AsyncLocker
  2. {
  3.     private readonly AsyncLock _gate;
  4.  
  5.     #region Ctor
  6.  
  7.     /// <summary>
  8.     /// Initializes a new instance of the <see cref="AsyncLocker"/> class.
  9.     /// </summary>
  10.     /// <param name="limit">The limit.</param>
  11.     public AsyncLocker(int limit)
  12.     {
  13.         _gate = new AsyncLock(limit);
  14.     }
  15.  
  16.     #endregion // Ctor
  17.  
  18.     #region Acquire
  19.  
  20.     /// <summary>
  21.     /// Asynchronously gets a disposable that releases the lock.
  22.     /// </summary>
  23.     /// <returns></returns>
  24.     public async Task<IDisposable> Acquire()
  25.     {
  26.         await _gate.WaitAsync();
  27.         return Disposable.Create(() => _gate.Release());
  28.     }
  29.  
  30.     #endregion // Acquire        
  31. }

The idea behind the disposable wrapper is to provide a better locking API.

It encapsulates the AsyncLock (in real life it should encapsulate SemaphoreSlim) and provides, asynchronously, a disposable

for releasing the lock (line 27); the disposable comes from the Rx library.

Acquire awaits the lock and returns the disposable.
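As a rough sketch of the "real life" version, the same wrapper can sit on top of SemaphoreSlim. The names SemaphoreLocker, AcquireAsync, Releaser and SemaphoreLockerDemo are assumptions made for illustration. The small Releaser class stands in for Rx's Disposable.Create so that the sketch has no Rx dependency.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name; same shape as AsyncLocker, but backed by SemaphoreSlim.
public sealed class SemaphoreLocker
{
    private readonly SemaphoreSlim _gate;

    public SemaphoreLocker(int limit)
    {
        _gate = new SemaphoreSlim(limit, limit);
    }

    public async Task<IDisposable> AcquireAsync()
    {
        await _gate.WaitAsync().ConfigureAwait(false);
        return new Releaser(_gate);
    }

    // Stand-in for Rx's Disposable.Create.
    private sealed class Releaser : IDisposable
    {
        private SemaphoreSlim _gate;
        public Releaser(SemaphoreSlim gate) { _gate = gate; }

        public void Dispose()
        {
            // Interlocked.Exchange makes a second Dispose a no-op.
            Interlocked.Exchange(ref _gate, null)?.Release();
        }
    }
}

public static class SemaphoreLockerDemo
{
    private sealed class Stats { public int Current; public int Peak; }

    // Runs `workers` tasks through the locker and returns the highest
    // number of tasks that were inside the guarded block at the same time.
    public static async Task<int> PeakConcurrencyAsync(int limit, int workers)
    {
        var locker = new SemaphoreLocker(limit);
        var stats = new Stats();
        var tasks = new Task[workers];
        for (int i = 0; i < workers; i++)
        {
            tasks[i] = Task.Run(async () =>
            {
                using (await locker.AcquireAsync())
                {
                    lock (stats)
                    {
                        stats.Current++;
                        if (stats.Current > stats.Peak) stats.Peak = stats.Current;
                    }
                    await Task.Delay(20); // simulated work
                    lock (stats) { stats.Current--; }
                }
            });
        }
        await Task.WhenAll(tasks);
        return stats.Peak;
    }

    public static void Main()
    {
        int peak = PeakConcurrencyAsync(3, 10).GetAwaiter().GetResult();
        Console.WriteLine(peak <= 3); // True: never more than `limit` holders
    }
}
```

Making Dispose idempotent is a design choice here: a double Dispose would otherwise release the semaphore twice and let an extra caller through.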

 

Using this API looks like the following snippet:

Code Snippet
  1. const int LIMIT = 3;
  2. AsyncLocker _asyncLocker = new AsyncLocker(LIMIT);
  3. public async Task ExecAsync()
  4. {
  5.     using (await _asyncLocker.Acquire())
  6.     {
  7.         // …
  8.     }
  9. }
