Implementing SqlBulkCopy in Linq to Sql

May 6, 2008

8 comments

As I mentioned in one of my previous posts, SqlBulkCopy is a powerful tool for inserting large amounts of data. The evolution of ADO.NET brought us Linq to Sql, a great O/R mapping framework from the Microsoft kitchen. One of the features missing from Linq to Sql is bulk insert. To improve our Linq to Sql infrastructure I decided to implement bulk insert as part of the Table class, and I will show you how I implemented it.

SqlBulkCopy accepts several input types; I decided to use an IDataReader as the data source for the WriteToServer method. The minimal IDataReader implementation that SqlBulkCopy needs requires overriding three members: the Read and GetValue methods and the FieldCount property.

public abstract class SqlBulkCopyReader : IDataReader 
{
    // derived must implement only these three
    public abstract bool Read();
    public abstract object GetValue(int i);
    public abstract int FieldCount { get; }
     …
}
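
The members hidden behind the "…" above are not shown in this post. As a rough sketch only (not the attached source), the rest of the IDataReader interface could be stubbed out as follows. The names MinimalBulkCopyReader and ArrayRowReader are hypothetical, and the assumption that SqlBulkCopy does not call the throwing members should be checked against your .NET version.

```csharp
using System;
using System.Data;

// Hypothetical stand-in for the elided parts of SqlBulkCopyReader.
public abstract class MinimalBulkCopyReader : IDataReader
{
    // The three members a derived reader must supply.
    public abstract bool Read();
    public abstract object GetValue(int i);
    public abstract int FieldCount { get; }

    // Virtual so a derived reader can support name-based column mappings.
    public virtual int GetOrdinal(string name) { throw new NotSupportedException(); }

    // Simple defaults for the life-cycle members.
    public virtual void Close() { }
    public virtual void Dispose() { }
    public virtual bool IsClosed { get { return false; } }
    public virtual int Depth { get { return 0; } }
    public virtual int RecordsAffected { get { return -1; } }
    public virtual bool NextResult() { return false; }

    // Built on GetValue and GetOrdinal, so they need no extra code.
    public virtual bool IsDBNull(int i) { object v = GetValue(i); return v == null || v is DBNull; }
    public virtual object this[int i] { get { return GetValue(i); } }
    public virtual object this[string name] { get { return GetValue(GetOrdinal(name)); } }

    // Assumed unused in this scenario; they fail loudly if that assumption is wrong.
    public virtual DataTable GetSchemaTable() { throw new NotSupportedException(); }
    public virtual string GetName(int i) { throw new NotSupportedException(); }
    public virtual string GetDataTypeName(int i) { throw new NotSupportedException(); }
    public virtual Type GetFieldType(int i) { throw new NotSupportedException(); }
    public virtual int GetValues(object[] values) { throw new NotSupportedException(); }
    public virtual IDataReader GetData(int i) { throw new NotSupportedException(); }
    public virtual bool GetBoolean(int i) { throw new NotSupportedException(); }
    public virtual byte GetByte(int i) { throw new NotSupportedException(); }
    public virtual char GetChar(int i) { throw new NotSupportedException(); }
    public virtual DateTime GetDateTime(int i) { throw new NotSupportedException(); }
    public virtual decimal GetDecimal(int i) { throw new NotSupportedException(); }
    public virtual double GetDouble(int i) { throw new NotSupportedException(); }
    public virtual float GetFloat(int i) { throw new NotSupportedException(); }
    public virtual Guid GetGuid(int i) { throw new NotSupportedException(); }
    public virtual short GetInt16(int i) { throw new NotSupportedException(); }
    public virtual int GetInt32(int i) { throw new NotSupportedException(); }
    public virtual long GetInt64(int i) { throw new NotSupportedException(); }
    public virtual string GetString(int i) { throw new NotSupportedException(); }
    public virtual long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
    { throw new NotSupportedException(); }
    public virtual long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
    { throw new NotSupportedException(); }
}

// Usage illustration: a tiny concrete reader over in-memory rows.
public sealed class ArrayRowReader : MinimalBulkCopyReader
{
    private readonly object[][] _rows;
    private int _position = -1;

    public ArrayRowReader(object[][] rows) { _rows = rows; }

    // Advances to the next row; returns false once the rows are exhausted.
    public override bool Read() { return ++_position < _rows.Length; }
    public override object GetValue(int i) { return _rows[_position][i]; }
    public override int FieldCount { get { return _rows.Length == 0 ? 0 : _rows[0].Length; } }
}
```

Throwing NotSupportedException from the unused members keeps the sketch short and makes any unexpected call visible immediately, instead of silently returning a wrong value.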

Next, I built LinqBulkCopyReader as a generic class that inherits from SqlBulkCopyReader. I added a constraint that the generic type must be a class (like all Linq to Sql entities). In the constructor I retrieve all of the entity's properties and, for each one, check whether it maps to an updatable column (a column that is not generated or updated automatically by the database). I also created a nested class called ColumnMapping that stores the association between the class properties and the table columns. For each column I store the column index, the column name and a function that gets the appropriate value from the enumerator's current row.

public class LinqBulkCopyReader<TEntity> : SqlBulkCopyReader
    where TEntity : class
{
    public LinqBulkCopyReader(IEnumerable<TEntity> entities)
    {
            _Enumerator = entities.GetEnumerator();
            _ColumnMappingList = new List<ColumnMapping>();
            // The entity type is already known from the generic parameter.
            var entityType = typeof(TEntity);
            _TableName = (entityType.GetCustomAttributes(
                 typeof(TableAttribute), false) as TableAttribute[])[0].Name;
            var properties = entityType.GetProperties();
            for (int index = 0; index < properties.Length; index++)
            {   
                var property = properties[index];
                var columns = property.GetCustomAttributes(typeof(ColumnAttribute), false)
                    as ColumnAttribute[];
                foreach (var column in columns)
                {                     

                    // Check whether this property represents an updatable column.
                    // DbType is optional on ColumnAttribute, so guard against null.
                    if ((column.DbType == null || !column.DbType.Contains("IDENTITY")) &&
                       (!column.IsDbGenerated) &&
                       (!column.IsVersion))
                    {
                         _ColumnMappingList.Add(new ColumnMapping()
                         {
                             ColumnIndex = index,
                             ColumnName = column.Name ?? property.Name,
                             ColumnGetter = row => property.GetValue(row, null)
                          }); 
                    }   
              }
         }
    }
    private readonly IEnumerator<TEntity> _Enumerator;
    private readonly IList<ColumnMapping> _ColumnMappingList; 
    private readonly string _TableName;
    public string TableName
    {
        get { return _TableName;}
    } 
    public IDictionary<string, int> ColumnMappingList
    {
        get
        {
            return _ColumnMappingList
                .Select(column => new {column.ColumnName, column.ColumnIndex})
                .ToDictionary(x => x.ColumnName, y => y.ColumnIndex);
        }
    }
    private class ColumnMapping
    {
        public int ColumnIndex { get; set; }
        public string ColumnName { get; set; }
        public Func<object, object> ColumnGetter { get; set; }
    }

}
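
The ColumnGetter above calls PropertyInfo.GetValue for every value of every row, which goes through reflection each time. One possible alternative, sketched here under the assumption that System.Linq.Expressions is available (it ships with .NET 3.5), is to compile a getter delegate once per property. PropertyGetterFactory is a hypothetical helper name, not part of the attached source, and any speed-up should be measured rather than assumed.

```csharp
using System;
using System.Linq.Expressions;
using System.Reflection;

// Hypothetical helper: builds a compiled getter for a property.
public static class PropertyGetterFactory
{
    // Produces the equivalent of: row => (object)((TDeclaring)row).Property
    public static Func<object, object> Create(PropertyInfo property)
    {
        ParameterExpression row = Expression.Parameter(typeof(object), "row");

        // Cast the untyped row to the type that declares the property.
        UnaryExpression typedRow = Expression.Convert(row, property.DeclaringType);

        // Read the property from the typed row.
        MemberExpression read = Expression.Property(typedRow, property);

        // Box value types so the result fits Func<object, object>.
        UnaryExpression boxed = Expression.Convert(read, typeof(object));

        return Expression.Lambda<Func<object, object>>(boxed, row).Compile();
    }
}
```

In the constructor, the mapping could then be created with `ColumnGetter = PropertyGetterFactory.Create(property)`; the delegate has the same Func<object, object> shape, so the rest of the reader would stay unchanged.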

I implemented the three required IDataReader members (Read, GetValue and FieldCount). I also decided to override the GetOrdinal method, in order to support another way of mapping between the entity properties and the table columns (by column name).

public class LinqBulkCopyReader<TEntity> : SqlBulkCopyReader
    where TEntity : class 
{
    …

    public override bool Read()
    {
        return _Enumerator.MoveNext();
    }
    public override object GetValue(int i)
    {
        return _ColumnMappingList.Where(column => column.ColumnIndex == i)
            .Single()
            .ColumnGetter(_Enumerator.Current);
    }
    public override int FieldCount
    {
        get
        {
            return _ColumnMappingList.Count;
        } 
    }
    public override int GetOrdinal(string name)
    {
        return _ColumnMappingList
            .Where(column => column.ColumnName == name)
            .Single()
            .ColumnIndex;
    }
}

The next step was to use this code from the data context. I used an extension method to extend the existing Table class and to provide a standard Linq to Sql style API for it (like the other data modification methods: insert, delete, etc.). For example, to insert a new customer you would probably write something like this:

Customer customer = new Customer()
{
    FirstName ="Avi",
    LastName ="Wortzel" 
};
LinqDB db = new LinqDB();
db.Customers.InsertOnSubmit(customer);
db.SubmitChanges();

My motivation is to create a similar API that inserts a collection of customers in almost the same way:

List<Customer> customers = new List<Customer>() 
{
    new Customer() 
    { 
        FirstName ="Avi",
        LastName ="Wortzel"
    }, 
    new Customer() 
    { 
        FirstName ="Yuval",
        LastName ="Wortzel" 
    } 
};
LinqDB db = new LinqDB();
db.Customers.BulkCopy(customers);

Our extension method is defined on the generic Table class and takes the collection of customers as a parameter. All we need to do is create the LinqBulkCopyReader, iterate over its column mapping list and copy each entry into the ColumnMappings collection of the SqlBulkCopy instance. The last part of the code sets the destination table and calls the WriteToServer method, which performs the bulk insert:

public static class LinqDBExtension
{
    public static void BulkCopy<TEntity>(this Table<TEntity> table,
        IEnumerable<TEntity> entities)
    where TEntity:class 
    {
        // Dispose the SqlBulkCopy (and the connection it opens) when done.
        using (SqlBulkCopy bulk = new SqlBulkCopy(ConfigurationManager.ConnectionStrings
            ["LinqDbConnectionString"].ConnectionString))
        {
            LinqBulkCopyReader<TEntity> reader = new LinqBulkCopyReader<TEntity>(entities);
            // Map each source column (by name) to its destination column index.
            foreach (var column in reader.ColumnMappingList)
            {
                bulk.ColumnMappings.Add(column.Key, column.Value);
            }
            bulk.DestinationTableName = reader.TableName;
            bulk.WriteToServer(reader);
        }
    }
}

Summary:
To improve our Linq to Sql framework, we added significant bulk insert functionality that works side by side with the DataContext. It lets us keep using Linq to Sql without giving up a great feature that helps improve system performance.

Update: I added the entire source code in the attachment.

8 comments

  1. Rob Fonseca-Ensor, May 8, 2008 at 22:04

    Hi Avi

    Have you tested the performance with a profiler? I suspect you'd find that the PropertyInfo.GetValue method is slowing you down quite a lot. I adapted the following code from http://jachman.wordpress.com/2006/08/22/2000-faster-using-dynamic-method-calls/

    public delegate void Setter<T>(object target, T value);

    private static Setter<T> createSetMethod<T>(PropertyInfo info)
    {
        Type targetType = typeof(T);

        Debug.Assert(info.PropertyType == typeof(T));

        DynamicMethod setter = new DynamicMethod(
            string.Format("_Set{0}_", info.Name),
            typeof(void),
            new Type[] { typeof(object), typeof(object) },
            info.DeclaringType);

        ILGenerator generator = setter.GetILGenerator();
        generator.Emit(OpCodes.Ldarg_0);
        generator.Emit(OpCodes.Castclass, info.DeclaringType);
        generator.Emit(OpCodes.Ldarg_1);

        if (targetType.IsClass)
        {
            generator.Emit(OpCodes.Castclass, targetType);
        }
        else
        {
            generator.Emit(OpCodes.Unbox_Any, targetType);
        }

        generator.EmitCall(OpCodes.Callvirt, info.GetSetMethod(), null);
        generator.Emit(OpCodes.Ret);

        /*
         * Create the delegate and return it
         */
        return (Setter<T>)setter.CreateDelegate(typeof(Setter<T>));
    }

    Using those setters instead of PropertyInfo objects can give you a huge boost in speed.

    Cheers – Rob

  2. Andre' Hazelwood, May 9, 2008 at 23:27

    Would you mind sharing the code for: SqlBulkCopyReader? I think it would be a big help to a lot of users looking to utilize the functionality of your code.

    Thanks.

  3. Avi Wortzel, May 18, 2008 at 9:08

    Rob, thanks for your advice; it's really a more professional way of using reflection.
    Andre', I added the entire source code in the attachment.

  4. Greg Leflar, October 28, 2008 at 10:41

    Great example. This is very helpful. The one limitation I see is that it's a shallow copy. How could you modify this code to bulk insert the child entities (orders in your example)?

  5. Eric, December 31, 2008 at 11:21

    I've created something similar, but instead of passing a table to the bulk method, I pass in the DataContext and build tables for each type of object in context.GetChangeSet().Inserts.

    I've worked around getting the identity fields by creating an identity key provider, which reads the identity seeds of the database before the insert and puts that id in the identity field if it's not set yet.
    The last problem, which I have not solved yet, is how to update the foreign key relations to the newly generated id field. Any ideas?

  6. Avi Wortzel, March 6, 2009 at 8:04

    Hi Eric,
    Your idea is a good one, but I'm not sure it is right to do your insert operations in bulk.
    One way to load your foreign key references is to use the Refresh method (from the DataContext) on your entities.
    Avi

  7. Dhawal, May 8, 2009 at 20:37

    I use a similar method, except that I call the BulkCopy method directly and pass the entities. In BulkCopy, I convert my entities to a DataTable and call sqlBulkCopy.WriteToServer(dataTable).

    My problem is that I created the entities in memory and tied them to a parent object. After doing the bulk insert, I use the DataContext to perform a few other updates, and when I call SubmitChanges(), it tries to insert all the entities again. How do I refresh my DataContext with the new rows inserted by SqlBulkCopy?

  8. Avi Wortzel, May 12, 2009 at 0:53

    Just try to use the Refresh method.
