LINQ to HPC (Formerly known as DryadLINQ) Tutorial: Part 2–Data Partitioning (DSC)

9 August 2011

A new beta has been released since I wrote part 1 of this tutorial. While very little has changed in the product, it has a new name. Another thing that personally held me back from publishing this part was the fact that LINQ to HPC is not a part of Windows HPC R2 SP2. So without further ado, I am proud to present the second part of my tutorial about LINQ to HPC.

In part 1 of this tutorial we discussed the fundamentals of DSC: how to manually write data to DSC files and how to use the FromEnumerable<T> extension method (from the HpcLinqExtras project) to implicitly save object data to a temporary file set (in order to use it inline in a subsequent query). We also saw a caveat of this method: because FromEnumerable<T> saves the data to a single file in the temporary file set, the subsequent query cannot be parallelized. This is because LINQ to HPC runs any query logic locally on the DSC node that contains the data the logic refers to.

The task at hand is quite straightforward: we would like to partition our data into logical pieces that can be distributed across the cluster. Before we discuss how to physically partition data in LINQ to HPC, I would like to consider the logic we will use for dividing the data into groups. In order to do so we will take a look at vertices, which are the basic tasks that execute the query on the cluster. I will describe vertices in detail in a later part of this tutorial, but for now there are a few facts I would like you to consider:

  • A vertex can only use data from a single DSC file, located on the node it is executing on. This is, of course, in order to preserve data locality. The main implication of this little fun fact is that we should make sure that pieces of data that depend on each other reside contiguously in the DSC file set. A good example of this is the use of GroupBy in a query. Let’s create a Student class defined as follows:
    public class Student
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public string Nationality { get; set; }
        public double AvgGrade { get; set; }
    }

    Now let’s say we are grouping our students by nationality, so our data should be ordered like this:

    [Figure: the student records ordered by nationality across the DSC file set]
    Dryad can then execute a local query in each vertex and simply union all the groups. If, however, the data were stored in a different order (let’s say ordered by Id), the first thing LINQ to HPC would need to do is reorganize the data into intermediate files, and only then execute the necessary logic.
    Note: grouping operators are a bit more complex when it comes to LINQ to HPC and will be discussed in a later part of this tutorial.

  • A vertex will process all the data in the DSC file it is accessing. This means that if we would like to break down the processing of local queries into smaller pieces, we need to break the data into smaller files. This is possible since a DSC file set supports creating more files than the number of nodes.
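To make the GroupBy locality argument concrete, here is a minimal local simulation in plain LINQ to Objects (no LINQ to HPC involved). Each "DSC file" is just an in-memory list, each "vertex" runs GroupBy over its own file only, and the per-vertex results are then unioned. The VertexSimulation class, the file layout and the sample students are hypothetical, purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Student
{
    public int Id { get; set; }
    public string Name { get; set; }
    public string Nationality { get; set; }
    public double AvgGrade { get; set; }
}

public static class VertexSimulation
{
    // One simulated vertex per file: each runs a local GroupBy over its own file only,
    // and the per-file (nationality, count) results are then unioned.
    public static List<KeyValuePair<string, int>> LocalGroupCounts(List<List<Student>> files)
    {
        return files
            .SelectMany(file => file
                .GroupBy(s => s.Nationality)
                .Select(g => new KeyValuePair<string, int>(g.Key, g.Count())))
            .ToList();
    }

    public static void Main()
    {
        // Hypothetical file set: each nationality is stored contiguously in exactly one file.
        var files = new List<List<Student>>
        {
            new List<Student>
            {
                new Student { Id = 1, Name = "Avi",  Nationality = "IL", AvgGrade = 8.5 },
                new Student { Id = 4, Name = "Dana", Nationality = "IL", AvgGrade = 5.0 },
            },
            new List<Student>
            {
                new Student { Id = 2, Name = "John",  Nationality = "US", AvgGrade = 7.0 },
                new Student { Id = 3, Name = "Marie", Nationality = "FR", AvgGrade = 9.1 },
            },
        };

        foreach (var pair in LocalGroupCounts(files))
            Console.WriteLine("{0}: {1}", pair.Key, pair.Value);
    }
}
```

Because every nationality lives in a single file, each key shows up exactly once in the union and the local groups are already complete. If the same students were instead laid out in Id order, "IL" could be split across two files and would appear twice in the union, which is why the data would first have to be redistributed.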

We can control the order in which our objects are written to file when using custom HPC serialization (as I have shown in part 1 of the tutorial). However, this can become tedious, especially if we need to use the same data in different queries that benefit from different partitioning and ordering.

Repartitioning Operators

Repartitioning operators are LINQ to HPC operators that produce intermediate DSC files partitioned in a way that does not depend on the partitioning of the input files. There are two repartitioning operators in LINQ to HPC: hash partitioning (HashPartition) and range partitioning (RangePartition).

Hash Partitioning

Hash partitioning provides a mechanism for partitioning data that is not sorted. Returning to our students sample, nationality is a prime candidate for hash partitioning. To use hash partitioning you need to call the HashPartition operator, which provides an overload that accepts the number of partitions to create. Once it is called, you can use the ToDsc operator to create a new DSC file set and call SubmitAndWait to commit the operation (I reviewed these steps in part 1 of this tutorial):

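// getting the list of students
List<Student> students = GetStudentsList();

// saving the students hash partitioned by nationality to a file set with 5 partitions
// ("context" and the file set name below are illustrative placeholders)
context.FromEnumerable(students)
       .HashPartition(std => std.Nationality, 5)
       .ToDsc("students-by-nationality")
       .SubmitAndWait();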

Hash partitioning selects the partition for a specific entity by taking the hash code of the value returned by the key selector modulo the number of partitions. The following code mimics how hash partitioning selects a partition:

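var students = GetStudentsList();

foreach (var student in students)
{
    // hash code of the key selector (Nationality) modulo the number of partitions;
    // masking the sign bit keeps the result in 0..4 (an assumption about negative hash codes)
    int partNum = (student.Nationality.GetHashCode() & 0x7FFFFFFF) % 5;

    var str = "the student {0} with nationality {1} will be written into partition no: {2}";
    Console.WriteLine(str, student.Name, student.Nationality, partNum);
}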

This method is disappointingly crude. If you run this code (supplied with my samples) you will see that although we have instructed the HashPartition operator to create 5 partitions, the mod operation yields only four different values. This is due to the hash codes of the key values in our data: none of them divides evenly by 5, so one of the partitions stays empty. The result is somewhat arbitrary; depending on the hash codes returned for the keys, the data could have been distributed in many ways, even or uneven. To overcome this pitfall, HashPartition has another overload that accepts an IEqualityComparer, which can be used to override the GetHashCode implementation used for the key.
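As a sketch of the IEqualityComparer workaround, assuming the set of nationalities is known in advance: the hypothetical NationalityComparer below returns each nationality's index in a list as its hash code, so five nationalities land in five distinct partitions. Like the loop above, it only mimics the partition choice locally; in a real query the comparer would be handed to the HashPartition overload mentioned above, and the sign-bit masking in the hypothetical PartitionFor helper is an assumption, not the operator's documented formula.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical comparer: assigns each known nationality a fixed hash code (its index in the list),
// so that "hash mod partitionCount" sends each nationality to its own partition.
public class NationalityComparer : IEqualityComparer<string>
{
    private readonly List<string> _known;

    public NationalityComparer(IEnumerable<string> knownNationalities)
    {
        _known = new List<string>(knownNationalities);
    }

    public bool Equals(string x, string y)
    {
        return string.Equals(x, y, StringComparison.Ordinal);
    }

    public int GetHashCode(string nationality)
    {
        int index = _known.IndexOf(nationality);
        // Unknown nationalities fall back to the default string hash code.
        return index >= 0 ? index : nationality.GetHashCode();
    }
}

public static class ComparerDemo
{
    // Mimics the partition choice: hash code of the key modulo the number of partitions,
    // with the sign bit masked off so the result is always in 0..partitionCount-1.
    public static int PartitionFor(string key, IEqualityComparer<string> comparer, int partitionCount)
    {
        return (comparer.GetHashCode(key) & 0x7FFFFFFF) % partitionCount;
    }

    public static void Main()
    {
        var nationalities = new[] { "IL", "US", "FR", "DE", "IT" };
        var comparer = new NationalityComparer(nationalities);

        foreach (var n in nationalities)
            Console.WriteLine("{0} -> partition {1}", n, PartitionFor(n, comparer, 5));
    }
}
```

The trade-off is that the comparer has to know the key space up front; any nationality outside the list goes back to the arbitrary default hash distribution.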

Range Partitioning
Range partitioning allows the ordered partitioning of sorted keys. Returning once more to our students sample, the average grade can be used as such a key. This is useful if our query relies on ordering by this key. Range partitioning works by assigning a range of keys to every file: any object whose key belongs in that range will be placed in that DSC file. With this method files may end up unevenly sized, but we can ensure that objects within a specific range reside in the same file. Range separators are used to define the ranges: these are values that mark the border points between one range and the next. Let’s say we would now like to partition our students into files by grade. We will use two range separators to split the data into three files:

[Figure: the students split into three files by average grade, using the range separators 3 and 6]

In this case our range separators are 3 and 6. One thing that is very easy to overlook is that if a student’s grade equals the value of a range separator, it could belong, range-wise, to the files on either side of the separator. Range separators can be assigned in two ways:

  • Statically assigned by the user:
    In some cases we would like to explicitly force the range structure. This is useful when we know the structure of our data and queries and believe we can benefit from it. Let’s say we know our queries mostly filter students with grades of 6 and above; we can reflect this knowledge in our file structure even though it results in an uneven distribution.
    We can pass an array of range separators like this: 
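    // getting the list of students
    List<Student> students = GetStudentsList();

    // saving the students range partitioned by average grade, using 3 and 6 as range separators
    // ("context" and the file set name below are illustrative placeholders)
    context.FromEnumerable(students)
           .RangePartition(std => std.AvgGrade, new[] { 3d, 6d })
           .ToDsc("students-by-grade")
           .SubmitAndWait();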

    All we need to provide here is a key selector delegate that selects the value on which we partition, and the rangeKeys parameter, which holds the array of range separators, of the same type as the return type of the key selector.

  • Dynamically sampled: 
    Another, perhaps simpler, approach is to use a different overload that lets LINQ to HPC generate the range separators for us. When we let RangePartition select the range separators, it will try to create DSC files of approximately equal size, but we lose much of the control we had when creating the range separators ourselves. There are a few overloads of RangePartition; the simplest looks like this:
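    // getting the list of students
    List<Student> students = GetStudentsList();

    // saving the students range partitioned to a file set with 5 partitions;
    // LINQ to HPC samples the data to choose the range separators
    // ("context" and the file set name below are illustrative placeholders)
    context.FromEnumerable(students)
           .RangePartition(std => std.AvgGrade, 5)
           .ToDsc("students-by-grade-sampled")
           .SubmitAndWait();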

    Other than losing control, there are a few key points to bear in mind with dynamic range partitioning:

    • Currently, dynamic sampling takes place once for every 1,000 records, so it is not really useful for small datasets.
    • Dynamic range partitioning still uses range separators, even though you did not set them yourself. If the key values are unevenly distributed (for example, many students share the same grade), the files will necessarily differ in size.
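Both ways of assigning separators boil down to the same two steps: pick sorted separators, then place each key in the range they delimit. The sketch below illustrates that under stated assumptions; it is a generic quantile-over-a-sample technique, not LINQ to HPC’s actual sampling code. RangePartitionSketch, ComputeSeparators, RangeFor and the step parameter are hypothetical names, and a key that sits exactly on a separator is arbitrarily placed in the range above it (the operator may resolve that boundary differently).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RangePartitionSketch
{
    // Returns the index of the range (file) a key falls into, given ascending separators.
    // Assumption: a key equal to a separator is placed in the range above it.
    public static int RangeFor(double key, double[] separators)
    {
        int index = Array.BinarySearch(separators, key);
        if (index >= 0)
            return index + 1;   // key sits exactly on a separator
        return ~index;          // ~index = position of the first separator greater than the key
    }

    // Picks (partitionCount - 1) separators at evenly spaced ranks of a sorted sample,
    // so each range receives roughly the same number of keys.
    // Assumption: every step-th key is sampled (a generic quantile technique).
    public static double[] ComputeSeparators(IEnumerable<double> keys, int partitionCount, int step)
    {
        double[] sample = keys.Where((key, i) => i % step == 0).OrderBy(k => k).ToArray();
        if (sample.Length == 0 || partitionCount < 1)
            throw new ArgumentException("Need at least one sampled key and one partition.");

        var separators = new double[partitionCount - 1];
        for (int p = 1; p < partitionCount; p++)
            separators[p - 1] = sample[p * sample.Length / partitionCount];
        return separators;
    }

    public static void Main()
    {
        // Hypothetical grades: 0.0, 0.1, ..., 9.9
        List<double> grades = Enumerable.Range(0, 100).Select(i => i / 10.0).ToList();

        // Static separators, as in the RangePartition(std => std.AvgGrade, new[] { 3d, 6d }) example
        double[] manual = { 3d, 6d };
        // Sampled separators for 5 ranges (sampling every key, since this data set is tiny)
        double[] sampled = ComputeSeparators(grades, 5, 1);

        Console.WriteLine("sampled separators: " + string.Join(", ", sampled));
        foreach (double grade in new[] { 2.5, 3.0, 6.0, 9.2 })
            Console.WriteLine("grade {0}: manual file {1}, sampled file {2}",
                grade, RangeFor(grade, manual), RangeFor(grade, sampled));
    }
}
```

Calling ComputeSeparators(grades, 5, 1000) on the same 100 grades samples only the first key (0.0), so all four separators collapse to the same value and almost every student lands in the last file, which shows how a coarse sampling rate leaves little to work with on a small dataset.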

Data partitioning allows us to implicitly distribute our data over the cluster, giving us more control over how (and where) our queries execute. Now that we have all our data just where we want it, we can start creating distributed kick-ass queries. But that calls for a completely different post.

Source code for all the samples can be found here.
