As I already mentioned in one of my previous posts, SqlBulkCopy is a powerful tool for inserting a large amount of data. The evolution of ADO.NET gave us Linq to Sql, a great O/R mapping framework from the Microsoft kitchen. One of the features missing from Linq to Sql is bulk insert. To improve our Linq to Sql infrastructure, I decided to implement bulk insert as an extension of the Table class, and I will show you how I implemented it.
SqlBulkCopy accepts several input types; I decided to use an IDataReader as the data source for the WriteToServer method. The minimum IDataReader implementation that SqlBulkCopy needs consists of three members: the Read and GetValue methods and the FieldCount property.
public abstract class SqlBulkCopyReader : IDataReader
{
    // Derived classes must implement only these three members
    public abstract bool Read();
    public abstract object GetValue(int i);
    public abstract int FieldCount { get; }
    ...
}
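The post elides the rest of the base class. As a rough sketch only, here is one way such a base class could be laid out. The names MinimalBulkReader and IntArrayReader are hypothetical, and the choice of which members throw is an assumption: the set of IDataReader members that SqlBulkCopy actually calls may differ between framework versions.

```csharp
using System;
using System.Data;

// Hypothetical base class (not the post's SqlBulkCopyReader): derived readers
// supply Read, GetValue and FieldCount; every other member has a default.
public abstract class MinimalBulkReader : IDataReader
{
    public abstract bool Read();
    public abstract object GetValue(int i);
    public abstract int FieldCount { get; }

    // Virtual so a derived reader can add name-based lookups later.
    public virtual int GetOrdinal(string name) { throw new NotSupportedException(); }
    public virtual string GetName(int i) { throw new NotSupportedException(); }

    // Lifetime and result-set members: one forward-only result, nothing to release.
    public virtual void Close() { }
    public virtual void Dispose() { }
    public virtual bool IsClosed { get { return false; } }
    public virtual int Depth { get { return 0; } }
    public virtual int RecordsAffected { get { return -1; } }
    public virtual bool NextResult() { return false; }
    public virtual DataTable GetSchemaTable() { throw new NotSupportedException(); }

    // Indexers and the null check are expressed through GetValue and GetOrdinal.
    public virtual object this[int i] { get { return GetValue(i); } }
    public virtual object this[string name] { get { return GetValue(GetOrdinal(name)); } }
    public virtual bool IsDBNull(int i) { return GetValue(i) is DBNull; }

    // Typed accessors are assumed to be unused in this sketch, so they all throw.
    public virtual bool GetBoolean(int i) { throw new NotSupportedException(); }
    public virtual byte GetByte(int i) { throw new NotSupportedException(); }
    public virtual long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) { throw new NotSupportedException(); }
    public virtual char GetChar(int i) { throw new NotSupportedException(); }
    public virtual long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) { throw new NotSupportedException(); }
    public virtual IDataReader GetData(int i) { throw new NotSupportedException(); }
    public virtual string GetDataTypeName(int i) { throw new NotSupportedException(); }
    public virtual DateTime GetDateTime(int i) { throw new NotSupportedException(); }
    public virtual decimal GetDecimal(int i) { throw new NotSupportedException(); }
    public virtual double GetDouble(int i) { throw new NotSupportedException(); }
    public virtual Type GetFieldType(int i) { throw new NotSupportedException(); }
    public virtual float GetFloat(int i) { throw new NotSupportedException(); }
    public virtual Guid GetGuid(int i) { throw new NotSupportedException(); }
    public virtual short GetInt16(int i) { throw new NotSupportedException(); }
    public virtual int GetInt32(int i) { throw new NotSupportedException(); }
    public virtual long GetInt64(int i) { throw new NotSupportedException(); }
    public virtual string GetString(int i) { throw new NotSupportedException(); }
    public virtual int GetValues(object[] values) { throw new NotSupportedException(); }
}

// Tiny concrete reader over an int array, used only to exercise the base class.
public sealed class IntArrayReader : MinimalBulkReader
{
    private readonly int[] _values;
    private int _position = -1;

    public IntArrayReader(int[] values) { _values = values; }

    public override bool Read() { return ++_position < _values.Length; }
    public override object GetValue(int i) { return _values[_position]; }
    public override int FieldCount { get { return 1; } }
}
```

Keeping the defaults virtual means a derived reader can replace any of them if a particular SqlBulkCopy code path turns out to need it.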
Next, I built LinqBulkCopyReader as a generic class that inherits from SqlBulkCopyReader. I also added a constraint that the generic type must be a class (like all Linq to Sql entities). In the constructor I retrieve all of the class's properties and, for each one, check whether it maps to a changeable column (a column that isn't automatically generated or updated by the database). I also created a nested class called ColumnMapping that stores the association between the class properties and the table columns. For each column I store the column index, the column name, and a function that gets the appropriate value from the enumerator's current entity.
public class LinqBulkCopyReader<TEntity> : SqlBulkCopyReader
    where TEntity : class
{
    public LinqBulkCopyReader(IEnumerable<TEntity> entities)
    {
        _Enumerator = entities.GetEnumerator();
        _ColumnMappingList = new List<ColumnMapping>();
        // The entity type is already known from the generic argument
        var entityType = typeof(TEntity);
        _TableName = (entityType.GetCustomAttributes(
            typeof(TableAttribute), false) as TableAttribute[])[0].Name;
        var properties = entityType.GetProperties();
        for (int index = 0; index < properties.Length; index++)
        {
            var property = properties[index];
            var columns = property.GetCustomAttributes(typeof(ColumnAttribute), false)
                as ColumnAttribute[];
            foreach (var column in columns)
            {
                // Check if this property represents an updatable column.
                // DbType is null when the mapping does not specify it.
                if ((column.DbType == null || !column.DbType.Contains("IDENTITY")) &&
                    (!column.IsDbGenerated) &&
                    (!column.IsVersion))
                {
                    _ColumnMappingList.Add(new ColumnMapping()
                    {
                        ColumnIndex = index,
                        ColumnName = column.Name ?? property.Name,
                        ColumnGetter = row => property.GetValue(row, null)
                    });
                }
            }
        }
    }

    private readonly IEnumerator<TEntity> _Enumerator;
    private readonly IList<ColumnMapping> _ColumnMappingList;
    private readonly string _TableName;

    public string TableName
    {
        get { return _TableName; }
    }

    public IDictionary<string, int> ColumnMappingList
    {
        get
        {
            return _ColumnMappingList.Select(column =>
                new { column.ColumnName, column.ColumnIndex })
                .ToDictionary(x => x.ColumnName, y => y.ColumnIndex);
        }
    }

    private class ColumnMapping
    {
        public int ColumnIndex { get; set; }
        public string ColumnName { get; set; }
        public Func<object, object> ColumnGetter { get; set; }
    }
    …
}
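To see the column filter in isolation, here is a small sketch that runs without Linq to Sql. It assumes a stand-in ColumnAttribute that only mirrors the four properties the constructor reads; the real attribute lives in System.Data.Linq.Mapping. SampleCustomer and ColumnFilter are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Stand-in for System.Data.Linq.Mapping.ColumnAttribute (sketch only).
[AttributeUsage(AttributeTargets.Property)]
public sealed class ColumnAttribute : Attribute
{
    public string Name { get; set; }
    public string DbType { get; set; }
    public bool IsDbGenerated { get; set; }
    public bool IsVersion { get; set; }
}

// Hypothetical entity used only for this illustration.
public class SampleCustomer
{
    [Column(DbType = "Int NOT NULL IDENTITY", IsDbGenerated = true)]
    public int Id { get; set; }

    [Column(Name = "first_name")]
    public string FirstName { get; set; }

    [Column]
    public string LastName { get; set; }

    [Column(IsVersion = true)]
    public byte[] RowVersion { get; set; }

    // No [Column] attribute, so this property is not mapped at all.
    public string DisplayName { get; set; }
}

public static class ColumnFilter
{
    // Returns the column names that a bulk insert would write for the given type.
    public static List<string> InsertableColumns(Type entityType)
    {
        var result = new List<string>();
        foreach (PropertyInfo property in entityType.GetProperties())
        {
            var columns = (ColumnAttribute[])property.GetCustomAttributes(
                typeof(ColumnAttribute), false);
            foreach (ColumnAttribute column in columns)
            {
                // DbType may be null when the mapping does not specify it.
                bool isIdentity = column.DbType != null &&
                                  column.DbType.Contains("IDENTITY");
                if (!isIdentity && !column.IsDbGenerated && !column.IsVersion)
                {
                    // Fall back to the property name when no column name is given.
                    result.Add(column.Name ?? property.Name);
                }
            }
        }
        return result;
    }
}
```

Calling ColumnFilter.InsertableColumns(typeof(SampleCustomer)) keeps first_name and LastName. It skips Id (identity), RowVersion (version column) and DisplayName (unmapped).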
I implemented the three required IDataReader members (Read, GetValue and FieldCount). Furthermore, I decided to override the GetOrdinal method too, so that a column can also be looked up by name when mapping the entity properties to the table columns.
public class LinqBulkCopyReader<TEntity> : SqlBulkCopyReader
    where TEntity : class
{
    …
    public override bool Read()
    {
        return _Enumerator.MoveNext();
    }

    public override object GetValue(int i)
    {
        return _ColumnMappingList
            .Where(column => column.ColumnIndex == i)
            .Single()
            .ColumnGetter(_Enumerator.Current);
    }

    public override int FieldCount
    {
        get { return _ColumnMappingList.Count; }
    }

    public override int GetOrdinal(string name)
    {
        return _ColumnMappingList
            .Where(column => column.ColumnName == name)
            .Single()
            .ColumnIndex;
    }
}
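GetValue runs once per column for every row, and the Where(...).Single() lookup scans the mapping list each time. As an optional variation, not what the post does, the lookups could be precomputed once. The sketch below shows only the lookup part; ColumnLookup is a hypothetical helper, and it assigns dense ordinals (0 to n-1) instead of the property indexes used above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: resolves ordinals and values through precomputed lookups.
public sealed class ColumnLookup<TEntity>
{
    private readonly Func<TEntity, object>[] _getters;
    private readonly Dictionary<string, int> _ordinals;

    // Each pair holds a column name and the function that reads its value.
    public ColumnLookup(IList<KeyValuePair<string, Func<TEntity, object>>> columns)
    {
        _getters = new Func<TEntity, object>[columns.Count];
        _ordinals = new Dictionary<string, int>(columns.Count);
        for (int i = 0; i < columns.Count; i++)
        {
            _getters[i] = columns[i].Value;   // array index doubles as the ordinal
            _ordinals.Add(columns[i].Key, i); // name -> ordinal
        }
    }

    public int FieldCount
    {
        get { return _getters.Length; }
    }

    // Dictionary lookup instead of scanning the list for a matching name.
    public int GetOrdinal(string name)
    {
        return _ordinals[name];
    }

    // Array access instead of scanning the list for a matching index.
    public object GetValue(TEntity row, int i)
    {
        return _getters[i](row);
    }
}
```

With dense ordinals every source ordinal stays below FieldCount. The trade-off is that an ordinal no longer matches the property's position, so the destination side would have to be mapped by column name rather than by index.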
The next step was to use this code with the data context. I used an extension method to extend the existing Table class and to expose the feature through a standard Linq to Sql style API (like the other data modification methods: insert, delete, etc.). For example, to insert a new customer you would probably write something like this:
Customer customer = new Customer()
{
    FirstName = "Avi",
    LastName = "Wortzel"
};
LinqDB db = new LinqDB();
db.Customers.InsertOnSubmit(customer);
db.SubmitChanges();
My motivation is to create a similar API that inserts a collection of customers in almost the same way:
List<Customer> customers = new List<Customer>()
{
    new Customer()
    {
        FirstName = "Avi",
        LastName = "Wortzel"
    },
    new Customer()
    {
        FirstName = "Yuval",
        LastName = "Wortzel"
    }
};
LinqDB db = new LinqDB();
db.Customers.BulkCopy(customers);
Our extension method is defined on the generic Table class and takes the collection of customers as a parameter. All we need to do is create the LinqBulkCopyReader, iterate over its column mapping list, and add each entry to the ColumnMappings collection of the SqlBulkCopy instance. The last part of the code sets the destination table and calls the WriteToServer method, which performs the bulk insert:
public static class LinqDBExtension
{
    public static void BulkCopy<TEntity>(this Table<TEntity> table,
        IEnumerable<TEntity> entities)
        where TEntity : class
    {
        // Dispose the SqlBulkCopy (and the connection it opens) when done
        using (SqlBulkCopy bulk = new SqlBulkCopy(ConfigurationManager.ConnectionStrings
            ["LinqDbConnectionString"].ConnectionString))
        {
            LinqBulkCopyReader<TEntity> reader = new LinqBulkCopyReader<TEntity>(entities);
            // Map each source column name to its destination column index
            foreach (var column in reader.ColumnMappingList)
            {
                bulk.ColumnMappings.Add(column.Key, column.Value);
            }
            bulk.DestinationTableName = reader.TableName;
            bulk.WriteToServer(reader);
        }
    }
}
Summary:
To improve our Linq to Sql framework, we added bulk insert functionality that works side by side with the DataContext. It lets us keep using Linq to Sql without giving up a feature that can greatly improve our system's performance.
Update: I added the entire source code in the attachment.
Published Tuesday, May 06, 2008 12:30 AM by Avi Wortzel