Waltzing Through the Parallel Extensions June CTP: Collection Classes

June 11, 2008

In the previous posts in this series, we have looked at the new synchronization mechanisms and the new task-related features in the PFX June CTP.  This post features a brief overview of the new collection classes introduced in the CTP.

In the new System.Threading.Collections namespace we find three new classes which facilitate concurrent programming.  These collections do not yet represent the wealth of concurrent and non-blocking collections that might be implemented in the future, but they are certainly a good sign.

A concurrent collection in PFX nomenclature is what is more commonly called a non-blocking, wait-free, or lock-free collection, i.e. a collection that does not incur a kernel-mode transition (a wait) when items are added or removed.  (In my DevAcademy2 session last year I demonstrated the performance and scalability benefits of using lock-free collections.)  Concurrent collections in PFX implement the IConcurrentCollection<T> interface, which extends IEnumerable<T> and ICollection:

public interface IConcurrentCollection<T> :
    IEnumerable<T>, ICollection, IEnumerable
{
    bool Add(T item);
    bool Remove(out T item);
    T[] ToArray();
}
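To show how code can be written against such an interface rather than against a concrete collection, here is a minimal sketch.  It does not use the CTP types: it assumes the names that shipped in .NET 4 (System.Collections.Concurrent, IProducerConsumerCollection<T>, TryAdd, TryTake), which appear to be the later form of the interface above.  The ProducerConsumerDemo class and its Drain helper are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;

public static class ProducerConsumerDemo
{
    // Removes every item through the shared interface and returns how many
    // were removed. Works for a queue and a stack alike.
    public static int Drain<T>(IProducerConsumerCollection<T> collection)
    {
        int removed = 0;
        T item;
        while (collection.TryTake(out item))
        {
            removed++;
        }
        return removed;
    }

    public static void Main()
    {
        // Assumption: .NET 4 types, not the CTP's System.Threading.Collections.
        IProducerConsumerCollection<int> queue = new ConcurrentQueue<int>();
        IProducerConsumerCollection<int> stack = new ConcurrentStack<int>();
        for (int i = 0; i < 3; ++i)
        {
            queue.TryAdd(i);
            stack.TryAdd(i);
        }

        int first, top;
        queue.TryTake(out first);   // FIFO: the oldest item
        stack.TryTake(out top);     // LIFO: the newest item
        Console.WriteLine("queue: {0}, stack: {1}", first, top);  // queue: 0, stack: 2

        Console.WriteLine(Drain(queue) + Drain(stack));  // two items were left in each
    }
}
```

The same Drain method serves both collections, which is the point of having a common interface: the caller decides between FIFO and LIFO ordering by choosing the concrete class.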

The concurrent (i.e. lock-free) collections featured in this release are the ConcurrentQueue<T> and ConcurrentStack<T> collections.  These are classic implementations of a lock-free queue and a lock-free stack, which rely on spinning internally when there’s significant contention for adding or removing elements.  They also support enumeration, with the caveat that if the collection is modified during enumeration, no exception is thrown and the enumeration proceeds, possibly returning stale data.  Since the collection is lock-free, there is no way to synchronize enumeration with add or remove operations, because enumeration over N elements cannot be made an atomic, coordinated operation without true locking.
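The enumeration caveat is easy to see in code.  The sketch below uses the ConcurrentQueue<T> that shipped in .NET 4 (System.Collections.Concurrent), on the assumption that it behaves like the CTP class in this respect, and contrasts it with the ordinary Queue<T>.  The EnumerationDemo class and its method names are illustrative only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class EnumerationDemo
{
    // Enqueues up to three extra items while enumerating a concurrent queue.
    // Returns how many items the enumerator visited; no exception is expected.
    public static int EnumerateWhileAdding(ConcurrentQueue<int> queue)
    {
        int visited = 0;
        int added = 0;
        foreach (int item in queue)
        {
            visited++;
            if (added < 3)
            {
                queue.Enqueue(item + 100);  // modification during enumeration
                added++;
            }
        }
        return visited;
    }

    // The same pattern on a plain Queue<T> is rejected by its enumerator.
    public static bool PlainQueueThrows()
    {
        var queue = new Queue<int>(new[] { 1, 2, 3 });
        try
        {
            foreach (int item in queue)
            {
                queue.Enqueue(item + 100);
            }
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        var queue = new ConcurrentQueue<int>(new[] { 1, 2, 3 });
        int visited = EnumerateWhileAdding(queue);
        Console.WriteLine("visited {0}, count is now {1}", visited, queue.Count);
        Console.WriteLine("plain Queue<T> threw: {0}", PlainQueueThrows());
    }
}
```

Whether the enumerator sees the items added mid-loop is exactly the "stale information" question; the sketch caps the number of additions so that it terminates either way.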

Note that both ConcurrentQueue<T> and ConcurrentStack<T> are based on a singly-linked list.  This is significantly less efficient than an array-based implementation for two reasons: First, the add operations create significantly more garbage, because a node must be allocated for every element (and the node is a reference type, which adds per-object overhead).  Second, traversing a linked list is slower than traversing an array because array elements are contiguous in memory and therefore have much better locality.  There’s work in progress in the direction of alleviating these concerns, and we’ll see where it leads.
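To make the per-node allocation concrete, here is a minimal sketch of a linked-list-based lock-free stack (the textbook design usually attributed to Treiber).  It is not the CTP's ConcurrentStack<T> source, only an illustration of the technique; LockFreeStack<T> and LockFreeStackDemo are hypothetical names.

```csharp
using System;
using System.Threading;

public sealed class LockFreeStack<T>
{
    // Every element lives in its own heap-allocated node.
    private sealed class Node
    {
        public readonly T Value;
        public Node Next;
        public Node(T value) { Value = value; }
    }

    private Node _head;

    public void Push(T item)
    {
        Node node = new Node(item);          // one allocation per push
        SpinWait spin = new SpinWait();
        while (true)
        {
            Node oldHead = Volatile.Read(ref _head);
            node.Next = oldHead;
            // Publish the node only if nobody changed the head in the meantime.
            if (Interlocked.CompareExchange(ref _head, node, oldHead) == oldHead)
            {
                return;
            }
            spin.SpinOnce();                 // contention: back off and retry
        }
    }

    public bool TryPop(out T item)
    {
        SpinWait spin = new SpinWait();
        while (true)
        {
            Node oldHead = Volatile.Read(ref _head);
            if (oldHead == null)
            {
                item = default(T);
                return false;
            }
            if (Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead) == oldHead)
            {
                item = oldHead.Value;
                return true;
            }
            spin.SpinOnce();
        }
    }
}

public static class LockFreeStackDemo
{
    // Pushes from several threads at once, then counts what can be popped.
    public static int PushConcurrently(LockFreeStack<int> stack, int threads, int perThread)
    {
        var workers = new Thread[threads];
        for (int t = 0; t < threads; ++t)
        {
            workers[t] = new Thread(() =>
            {
                for (int i = 0; i < perThread; ++i) stack.Push(i);
            });
            workers[t].Start();
        }
        foreach (Thread worker in workers) worker.Join();

        int popped = 0;
        int ignored;
        while (stack.TryPop(out ignored)) popped++;
        return popped;
    }

    public static void Main()
    {
        Console.WriteLine(PushConcurrently(new LockFreeStack<int>(), 4, 1000));
    }
}
```

Because popped nodes are simply left to the garbage collector and never recycled, this sketch avoids the ABA problem that the same algorithm faces in unmanaged code, at the price of the allocation traffic described above.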

The most interesting class of the bunch is the BlockingCollection<T> adapter class, which wraps a concurrent collection (any implementation of IConcurrentCollection<T>, defaulting to ConcurrentQueue<T>).  It adds blocking semantics on top of the underlying collection: adding blocks when the collection has reached its bound, and removing blocks when the collection is empty.  For example, if the collection is bounded to 1,000 elements, then once it holds 1,000 elements the add operation will block until an element is removed.  Conversely, if the collection is currently empty, the remove operation will block until an element is added.  Internally, the blocking collection adapter is implemented with two slim semaphores.
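The two-semaphore idea can be sketched in a few lines.  The class below is not the CTP's implementation; it is a hypothetical BoundedBuffer<T> built from the .NET 4 SemaphoreSlim and ConcurrentQueue<T> types, with one semaphore counting free slots and the other counting filled slots.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class BoundedBuffer<T>
{
    private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _freeSlots;                       // blocks adders when full
    private readonly SemaphoreSlim _filledSlots = new SemaphoreSlim(0);  // blocks removers when empty

    public BoundedBuffer(int capacity)
    {
        _freeSlots = new SemaphoreSlim(capacity, capacity);
    }

    // Blocks while the buffer is at its bound.
    public void Add(T item)
    {
        _freeSlots.Wait();
        _items.Enqueue(item);
        _filledSlots.Release();
    }

    // Like Add, but gives up after the timeout instead of blocking forever.
    public bool TryAdd(T item, int millisecondsTimeout)
    {
        if (!_freeSlots.Wait(millisecondsTimeout))
        {
            return false;
        }
        _items.Enqueue(item);
        _filledSlots.Release();
        return true;
    }

    // Blocks while the buffer is empty.
    public T Remove()
    {
        _filledSlots.Wait();
        T item;
        // A filled slot was reserved above, so an item is present.
        _items.TryDequeue(out item);
        _freeSlots.Release();
        return item;
    }
}

public static class BoundedBufferDemo
{
    public static void Main()
    {
        var buffer = new BoundedBuffer<int>(2);
        buffer.Add(1);
        buffer.Add(2);
        Console.WriteLine(buffer.TryAdd(3, 0));   // False: the bound of 2 is reached
        Console.WriteLine(buffer.Remove());       // 1
        Console.WriteLine(buffer.TryAdd(3, 0));   // True: a slot was freed
    }
}
```

Each semaphore guards one direction only, so producers and consumers never contend on the same wait primitive, and the underlying concurrent queue handles the actual element transfer.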

To streamline scenarios such as those where the blocking collection is used as a work-item coordination queue, there are also the static AddAny and RemoveAny methods, which add an element to, or remove an element from, any one blocking collection out of a set of blocking collections.  For example, if we implement a producer-consumer scenario with multiple producers and multiple consumers, then the producers can use the AddAny method to add a work item to any of the queues that has room, and the consumers can use the RemoveAny method to remove a work item from any of the queues that has one.

The following code demonstrates the above scenario in a very rudimentary fashion – work items are enqueued to any available blocking collection, and retrieved from any available blocking collection:

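// Requires: using System; using System.Threading; using System.Threading.Collections;

// Four blocking queues, each constructed with a bound of 5.
BlockingCollection<Action>[] queues =
    new BlockingCollection<Action>[4];
for (int i = 0; i < queues.Length; ++i)
{
    queues[i] = new BlockingCollection<Action>(5);
}

// One consumer thread per queue; each takes work from any of the queues.
for (int i = 0; i < queues.Length; ++i)
{
    new Thread(() =>
    {
        while (true)
        {
            Action action;
            if (BlockingCollection<Action>.RemoveAny(
                queues, out action) != -1)
            {
                action();
            }
        }
    })
    { IsBackground = true }.Start();  // background, so the process can exit after ReadLine
}

// The producer adds 100 work items to any of the queues.
for (int i = 0; i < 100; ++i)
{
    BlockingCollection<Action>.AddAny(queues, () =>
        {
            Thread.Sleep(100);
            Console.WriteLine(
                Thread.CurrentThread.ManagedThreadId);
        });
}

Console.ReadLine();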
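
For readers who want to try this outside the CTP, here is a hedged sketch of the same scenario using the names that shipped in .NET 4, where the static methods are called AddToAny and TakeFromAny (assumed here to be the successors of AddAny and RemoveAny).  The AnyQueueDemo class, the CountdownEvent and the cancellation-based shutdown are additions for illustration, so that the consumers can exit cleanly once all work is done.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class AnyQueueDemo
{
    // Runs itemCount work items through queueCount bounded queues and
    // returns how many items were actually executed.
    public static int Run(int queueCount, int itemCount)
    {
        var queues = new BlockingCollection<Action>[queueCount];
        for (int i = 0; i < queues.Length; ++i)
        {
            queues[i] = new BlockingCollection<Action>(5);  // bounded capacity of 5
        }

        int executed = 0;
        using (var allDone = new CountdownEvent(itemCount))
        using (var cancel = new CancellationTokenSource())
        {
            var consumers = new Thread[queueCount];
            for (int i = 0; i < consumers.Length; ++i)
            {
                consumers[i] = new Thread(() =>
                {
                    try
                    {
                        while (true)
                        {
                            Action action;
                            // Blocks until any queue has an item or the token is canceled.
                            BlockingCollection<Action>.TakeFromAny(
                                queues, out action, cancel.Token);
                            action();
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // Shutdown was requested; let the thread end.
                    }
                });
                consumers[i].Start();
            }

            for (int i = 0; i < itemCount; ++i)
            {
                // Blocks only if every queue is at its bound.
                BlockingCollection<Action>.AddToAny(queues, () =>
                {
                    Interlocked.Increment(ref executed);
                    allDone.Signal();
                });
            }

            allDone.Wait();     // every work item has run
            cancel.Cancel();    // release the consumers blocked in TakeFromAny
            foreach (Thread consumer in consumers) consumer.Join();
        }
        return executed;
    }

    public static void Main()
    {
        Console.WriteLine(Run(4, 100));
    }
}
```

Cancellation is used for shutdown here instead of marking the collections as complete, because it gives the blocked consumers one well-defined exit path.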
