Asynchronously Preloaded LINQ Queries

July 29, 2008


[Image: real-buffering by 'nicedexter', CC BY-NC-SA]

As a rule of thumb, when you have two or more independent blocking I/O operations that target different devices, it's best to run them in parallel on separate threads instead of performing them one after another synchronously. That way, executing operations O1, ..., On, which take T1, ..., Tn respectively, results in a total time T < T1 + ... + Tn (ideally close to max(T1, ..., Tn)), instead of T = T1 + ... + Tn.
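To make the rule of thumb concrete, here is a minimal sketch (not from the original post) that simulates blocking operations with Thread.Sleep, the same trick used below, and times running them one after another versus each on its own thread. The names ParallelTiming, RunSequentially and RunInParallel are illustrative only.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;

public static class ParallelTiming
{
    // Runs each simulated blocking operation one after another: T = T1 + ... + Tn.
    public static TimeSpan RunSequentially(int[] durationsMs)
    {
        var sw = Stopwatch.StartNew();
        foreach (var d in durationsMs)
        {
            Thread.Sleep(d); // stands in for a blocking I/O call
        }
        return sw.Elapsed;
    }

    // Runs each simulated blocking operation on its own thread and waits for all of them,
    // so the total is bounded by the slowest operation rather than the sum.
    public static TimeSpan RunInParallel(int[] durationsMs)
    {
        var sw = Stopwatch.StartNew();
        Thread[] threads = durationsMs
            .Select(d => new Thread(() => Thread.Sleep(d)))
            .ToArray();
        foreach (var t in threads) t.Start();
        foreach (var t in threads) t.Join();
        return sw.Elapsed;
    }
}
```

With durations of 300, 200 and 100 ms, RunSequentially should report roughly 600 ms and RunInParallel roughly 300 ms; exact figures depend on the machine and timer resolution.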

Let's take a simple example consisting of two operations: the first tries to load every file in the My Pictures folder as an assembly (silly), while the second simulates some obscure database operation by calling Thread.Sleep (very silly).

var assemblies = (from file in new DirectoryInfo(@"C:\...\My Pictures").GetFiles("*.*")
                  let assembly = TryLoadAssembly(file.FullName)
                  where assembly != null
                  select assembly).ToArray();

// Do some long database work.
Thread.Sleep(1000);

The code runs sequentially and takes about two seconds: roughly one second to load the assemblies and another second blocked in Thread.Sleep.
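The post does not show TryLoadAssembly. A plausible sketch, assuming it simply wraps Assembly.LoadFrom and treats files that are not .NET assemblies as a miss, might look like this; both the helper and its AssemblyProbe container are hypothetical.

```csharp
using System;
using System.IO;
using System.Reflection;

public static class AssemblyProbe
{
    // Hypothetical helper: returns the loaded assembly, or null if the file
    // cannot be loaded as a .NET assembly (e.g. a .jpg in My Pictures).
    public static Assembly TryLoadAssembly(string path)
    {
        try
        {
            return Assembly.LoadFrom(path);
        }
        catch (BadImageFormatException)
        {
            return null; // not a managed assembly
        }
        catch (FileLoadException)
        {
            return null; // file exists but could not be loaded
        }
        // Other exceptions (e.g. FileNotFoundException) are left to propagate.
    }
}
```

In the query above, TryLoadAssembly(file.FullName) would then correspond to AssemblyProbe.TryLoadAssembly(file.FullName).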

Instead, we could queue the ToArray call as a work item before starting the database operation, and then wait for it to complete once the database work is done. I've written a shorthand for that:

var assemblies = from file in new DirectoryInfo(@"C:\...\My Pictures").GetFiles("*.*")
                 let assembly = TryLoadAssembly(file.FullName)
                 where assembly != null
                 select assembly;

assemblies = assemblies.Buffered();

// Do some long database work.
Thread.Sleep(1000);

// Force loading: returns the items buffered so far, then any remaining ones.
assemblies = assemblies.ToArray();

Once Buffered is called, the deferred query starts running on a thread-pool thread, which buffers its results in memory until the query is actually enumerated. At that point buffering stops, the items buffered so far are returned immediately, and any items that remain are produced synchronously on the calling thread. The above code takes approximately one second to run, as both operations run concurrently.
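For comparison, the plain "enqueue a work item for ToArray" approach can be written without any helper class. This sketch assumes .NET 4 or later for Task.Run (the post predates it), and PreloadWhileWorking is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WorkItemExample
{
    // Hypothetical helper: materializes the query on a thread-pool thread while the
    // calling thread does other blocking work, then waits for the resulting array.
    public static T[] PreloadWhileWorking<T>(IEnumerable<T> query, Action otherWork)
    {
        // Queue the ToArray call as a work item before starting the other operation.
        Task<T[]> preload = Task.Run(() => query.ToArray());

        // E.g. the long database call; runs concurrently with the query.
        otherWork();

        // Blocks only if the query is still running; rethrows its exception unwrapped.
        return preload.GetAwaiter().GetResult();
    }
}
```

The difference from Buffered is that this version always materializes the whole query in the background, whereas Buffered stops buffering as soon as the sequence is enumerated and finishes the rest on the calling thread.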

I've included the code behind this below for your consideration, and will add it to my LINQ Extensions project on CodePlex once I get around to it… :)

using System;
using System.Collections.Generic;

public static class ExtensionMethods
{
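    /// <summary>
    /// Starts buffering the items of an enumeration on a background thread, before the enumeration is first iterated.
    /// </summary>
    /// <remarks>The returned sequence should be enumerated only once.</remarks>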
    public static IEnumerable<T> Buffered<T>(this IEnumerable<T> enumeration)
    {
        // Check to see that enumeration is not null
        if (enumeration == null)
            throw new ArgumentNullException("enumeration");

        return new AsyncEnumerable<T>(enumeration);
    }

    private class AsyncEnumerable<T> : IEnumerable<T>
    {
        private volatile bool shouldContinueBuffering;
        private IEnumerator<T> enumerator;
        private IAsyncResult asyncResult;
        private List<T> buffer;
        private Action bufferAction;

        public AsyncEnumerable(IEnumerable<T> enumeration)
        {
            this.enumerator = enumeration.GetEnumerator();
            this.shouldContinueBuffering = true;
            this.buffer = new List<T>();
            this.bufferAction = this.Buffer;

            // Run Buffer on a thread-pool thread. Delegate BeginInvoke is
            // supported on the .NET Framework only, not on .NET Core/.NET 5+.
            this.asyncResult = this.bufferAction.BeginInvoke(null, null);
        }

        private void Buffer()
        {
            // Continue buffering for as long as we're allowed to and while there are still items left.
            while (this.shouldContinueBuffering && this.enumerator.MoveNext())
            {
                this.buffer.Add(this.enumerator.Current);
            }
        }

        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            // Tell the background thread to stop after the item it is currently fetching.
            this.shouldContinueBuffering = false;

            // Wait for the background thread to finish; EndInvoke also rethrows any
            // exception thrown while buffering. No lock is held while waiting: if Buffer
            // had not started yet, it would wait for the lock while we wait for it.
            this.bufferAction.EndInvoke(this.asyncResult);

            try
            {
                // Iterate over buffered items.
                foreach (var item in this.buffer)
                {
                    yield return item;
                }

                // Continue iterating from the point where we stopped buffering.
                while (this.enumerator.MoveNext())
                {
                    yield return this.enumerator.Current;
                }
            }
            finally
            {
                // Release the source enumerator (and anything it holds open).
                this.enumerator.Dispose();
            }
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }
    }
}
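Delegate BeginInvoke/EndInvoke, which the class above relies on, is only supported on the .NET Framework; on .NET Core and .NET 5+ it throws PlatformNotSupportedException. A minimal sketch of the same buffering idea built on Task.Run might look like the following. BufferedTask and BufferState are hypothetical names, and, like the original, the returned sequence is meant to be enumerated once.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BufferedTaskExtensions
{
    // Hypothetical Task-based variant of Buffered(): buffering starts eagerly here,
    // before the caller begins iterating.
    public static IEnumerable<T> BufferedTask<T>(this IEnumerable<T> source)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        return new BufferState<T>(source).Drain();
    }

    private sealed class BufferState<T>
    {
        private readonly IEnumerator<T> enumerator;
        private readonly List<T> buffer = new List<T>();
        private readonly Task bufferTask;
        private volatile bool keepBuffering = true;

        public BufferState(IEnumerable<T> source)
        {
            enumerator = source.GetEnumerator();
            // Pull items on a thread-pool thread until told to stop or the source ends.
            bufferTask = Task.Run(() =>
            {
                while (keepBuffering && enumerator.MoveNext())
                {
                    buffer.Add(enumerator.Current);
                }
            });
        }

        // Iterator body runs on the first MoveNext, i.e. when the caller starts iterating.
        public IEnumerable<T> Drain()
        {
            keepBuffering = false;               // stop after the item currently being fetched
            bufferTask.GetAwaiter().GetResult(); // wait; rethrows any buffering exception
            try
            {
                foreach (var item in buffer) yield return item;              // buffered items first
                while (enumerator.MoveNext()) yield return enumerator.Current; // then the rest
            }
            finally
            {
                enumerator.Dispose();
            }
        }
    }
}
```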

[Cross-Posted from Omer van Kloeten's .NET Zen, my English language weblog]
