Blob Parallel Upload and Download

May 14, 2012


To get the best performance out of Azure Blob Storage, you need to upload and download data in parallel. For very small files the simple blob API (UploadFile, UploadFromStream, etc.) is fine, but for large chunks of data parallel upload is required.

For parallel upload we'll use a block blob (suitable for streaming data such as images or movies) together with the producer/consumer design pattern: one thread reads the stream, creates blocks, and puts them into a queue; a pool of other threads reads blocks from the queue and uploads them to the cloud. Once all the threads have finished, the whole blob is committed.
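Stripped of the Azure calls, the scheme looks like this (a minimal self-contained sketch; the names and the integer "blocks" are illustrative only, not part of the transfer class shown later):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

class ProducerConsumerSketch
{
    static void Main()
    {
        var queue = new Queue<int>();

        // Producer: split the "stream" into block ids and enqueue them.
        for (int blockId = 0; blockId < 20; blockId++)
            queue.Enqueue(blockId);

        // Consumers: a pool of tasks drains the queue. Each dequeue happens
        // under a lock, exactly as in the upload code.
        var uploaded = new List<int>();
        var tasks = new List<Task>();
        for (int i = 0; i < 4; i++)
        {
            tasks.Add(Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    int blockId;
                    lock (queue)
                    {
                        if (queue.Count == 0) break;
                        blockId = queue.Dequeue();
                    }
                    // In the real code this is blob.PutBlock(...).
                    lock (uploaded) uploaded.Add(blockId);
                }
            }));
        }

        // Once every worker has finished, the real code commits the blob
        // with PutBlockList.
        Task.WaitAll(tasks.ToArray());
        Console.WriteLine(uploaded.Count); // prints 20
    }
}
```

Every block is dequeued exactly once, so the workers never upload the same block twice, and the commit only happens after WaitAll.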

Let's see the code:

public class ParallelBlobTransfer
    {
        // Async events and properties
        public event EventHandler TransferCompleted;
        private bool TaskIsRunning = false;
        private readonly object _sync = new object();

        // Used to calculate download speeds
        Queue<long> timeQueue = new Queue<long>(100);
        Queue<long> bytesQueue = new Queue<long>(100);

        public CloudBlobContainer Container { get; set; }

        public ParallelBlobTransfer(CloudBlobContainer container)
        {
            Container = container;
        }

       
        public void UploadFileToBlobAsync(string fileToUpload, string blobName)
        {
            
            if (!File.Exists(fileToUpload))
                throw new FileNotFoundException(fileToUpload);
            
            var worker = new Action<Stream,string>(ParallelUploadStream);

            lock (_sync)
            {
                if (TaskIsRunning)
                    throw new InvalidOperationException("The control is currently busy.");

                AsyncOperation async = AsyncOperationManager.CreateOperation(null);
                var fs = File.OpenRead(fileToUpload);
                worker.BeginInvoke(fs,blobName, TaskCompletedCallback, async);

                TaskIsRunning = true;
            }
        }
        
        public void UploadDataToBlobAsync(byte[] dataToUpload, string blobName)
        {
            var worker = new Action<Stream, string>(ParallelUploadStream);

            lock (_sync)
            {
                if (TaskIsRunning)
                    throw new InvalidOperationException("The control is currently busy.");

                AsyncOperation async = AsyncOperationManager.CreateOperation(null);
                var ms = new MemoryStream(dataToUpload);
                worker.BeginInvoke(ms, blobName, TaskCompletedCallback, async);

                TaskIsRunning = true;
            }
        }

        public void DownloadBlobToFileAsync(string filePath, string blobToDownload)
        {
            var worker = new Action<Stream,string>(ParallelDownloadFile);

            lock (_sync)
            {
                if (TaskIsRunning)
                    throw new InvalidOperationException("The control is currently busy.");

                AsyncOperation async = AsyncOperationManager.CreateOperation(null);
                var fs = File.OpenWrite(filePath);
                worker.BeginInvoke(fs, blobToDownload, TaskCompletedCallback, async);

                TaskIsRunning = true;
            }
        }
        
        public void DownloadBlobToBufferAsync(byte[] buffer, string blobToDownload)
        {
            var worker = new Action<Stream, string>(ParallelDownloadFile);

            lock (_sync)
            {
                if (TaskIsRunning)
                    throw new InvalidOperationException("The control is currently busy.");

                AsyncOperation async = AsyncOperationManager.CreateOperation(null);
                var ms = new MemoryStream(buffer);
                worker.BeginInvoke(ms, blobToDownload, TaskCompletedCallback, async);

                TaskIsRunning = true;
            }
        }

        public bool IsBusy
        {
            get { return TaskIsRunning; }
        }
        
        // Blob Upload Code
        // 200 GB max blob size
        // 50,000 max blocks
        // 4 MB max block size
        // Try to get close to 100k block size in order to offer good progress update response.
        private int GetBlockSize(long fileSize)
        {
            const long KB = 1024;
            const long MB = 1024 * KB;
            const long GB = 1024 * MB;
            const long MAXBLOCKS = 50000;
            const long MAXBLOBSIZE = 200 * GB;
            const long MAXBLOCKSIZE = 4 * MB;

            long blocksize = 100 * KB;
            long blockCount;
            blockCount = ((int)Math.Floor((double)(fileSize / blocksize))) + 1;
            while (blockCount > MAXBLOCKS - 1)
            {
                blocksize += 100 * KB;
                blockCount = ((int)Math.Floor((double)(fileSize / blocksize))) + 1;
            }

            if (blocksize > MAXBLOCKSIZE)
            {
                throw new ArgumentException("Blob too big to upload.");
            }

            return (int)blocksize;
        }

        /// <summary>
        /// Uploads content to a blob using multiple threads.
        /// </summary>
        /// <param name="inputStream"></param>
        /// <param name="blobName"></param>
        private void ParallelUploadStream(Stream inputStream,string blobName)
        {
          // the optimal number of transfer threads
            int numThreads = 10;

            long fileSize = inputStream.Length;

            int maxBlockSize = GetBlockSize(fileSize);
            long bytesUploaded = 0;

            // Prepare a queue of blocks to be uploaded. Each queue item is a key-value pair where
            // the 'key' is block id and 'value' is the block length.
            var queue = new Queue<KeyValuePair<int, int>>();
            var blockList = new List<string>();
            int blockId = 0;
            while (fileSize > 0)
            {
                int blockLength = (int)Math.Min(maxBlockSize, fileSize);
                string blockIdString = Convert.ToBase64String(
                    ASCIIEncoding.ASCII.GetBytes(string.Format("BlockId{0}", blockId.ToString("0000000"))));
                queue.Enqueue(new KeyValuePair<int, int>(blockId++, blockLength));
                blockList.Add(blockIdString);
                fileSize -= blockLength;
            }

            var blob = Container.GetBlockBlobReference(blobName);
            blob.DeleteIfExists();

            BlobRequestOptions options = new BlobRequestOptions()
            {
                RetryPolicy = RetryPolicies.RetryExponential(
                    RetryPolicies.DefaultClientRetryCount, RetryPolicies.DefaultMaxBackoff),
                Timeout = TimeSpan.FromSeconds(90)
            };

            // Launch threads to upload blocks. The input stream is disposed
            // only after all of them have finished.
            var tasks = new List<Task>();
            using (inputStream)
            {
                for (int idxThread = 0; idxThread < numThreads; idxThread++)
                {
                    tasks.Add(Task.Factory.StartNew(() =>
                    {
                        KeyValuePair<int, int> blockIdAndLength;
                        while (true)
                        {
                            // Dequeue block details.
                            lock (queue)
                            {
                                if (queue.Count == 0)
                                    break;
                                blockIdAndLength = queue.Dequeue();
                            }

                            byte[] buff = new byte[blockIdAndLength.Value];

                            // The input stream is shared by all tasks, so the
                            // seek-and-read must happen under a lock or the
                            // tasks would corrupt each other's blocks.
                            lock (inputStream)
                            {
                                inputStream.Seek(blockIdAndLength.Key * (long)maxBlockSize, SeekOrigin.Begin);
                                int readSoFar = 0;
                                while (readSoFar < blockIdAndLength.Value)
                                {
                                    int read = inputStream.Read(buff, readSoFar, blockIdAndLength.Value - readSoFar);
                                    if (read == 0)
                                        break;
                                    readSoFar += read;
                                }
                            }

                            // Upload the block.
                            string blockIdString = Convert.ToBase64String(
                                ASCIIEncoding.ASCII.GetBytes(string.Format("BlockId{0}", blockIdAndLength.Key.ToString("0000000"))));
                            string blockHash = GetMD5HashFromStream(buff);
                            using (MemoryStream ms = new MemoryStream(buff, 0, blockIdAndLength.Value))
                            {
                                blob.PutBlock(blockIdString, ms, blockHash, options);
                            }
                        }
                    }));
                }

                // Wait for all threads to complete uploading data.
                Task.WaitAll(tasks.ToArray());
            }

            // Commit the block list.
            blob.PutBlockList(blockList, options);
        }

        /// <summary>
        /// Downloads content from a blob using multiple threads.
        /// </summary>
        /// <param name="outputStream"></param>
        /// <param name="blobToDownload"></param>
        private void ParallelDownloadFile(Stream outputStream, string blobToDownload)
        {
            int numThreads = 10;

            var blob = Container.GetBlockBlobReference(blobToDownload);
            blob.FetchAttributes();
            long blobLength = blob.Properties.Length;

            int bufferLength = GetBlockSize(blobLength);
            long bytesDownloaded = 0;

            // Prepare a queue of chunks to be downloaded. Each queue item is a key-value pair
            // where the 'key' is the start offset in the blob and the 'value' is the chunk length.
            var queue = new Queue<KeyValuePair<long, int>>();
            long offset = 0;
            while (blobLength > 0)
            {
                int chunkLength = (int)Math.Min(bufferLength, blobLength);
                queue.Enqueue(new KeyValuePair<long, int>(offset, chunkLength));
                offset += chunkLength;
                blobLength -= chunkLength;
            }

            int exceptionCount = 0;
            using (outputStream)
            {
                // Launch threads to download chunks.
                var tasks = new List<Task>();
                for (int idxThread = 0; idxThread < numThreads; idxThread++)
                {
                    tasks.Add(Task.Factory.StartNew(() =>
                    {
                        KeyValuePair<long, int> blockIdAndLength;

                        // A buffer to fill per read request.
                        byte[] buffer = new byte[bufferLength];

                        while (true)
                        {
                            // Dequeue chunk details.
                            lock (queue)
                            {
                                if (queue.Count == 0)
                                    break;
                                blockIdAndLength = queue.Dequeue();
                            }

                            try
                            {
                                // Prepare the HttpWebRequest to download the chunk.
                                HttpWebRequest blobGetRequest = BlobRequest.Get(blob.Uri, 60, null, null);

                                // Add a header to specify the range.
                                blobGetRequest.Headers.Add("x-ms-range",
                                    string.Format(System.Globalization.CultureInfo.InvariantCulture,
                                        "bytes={0}-{1}", blockIdAndLength.Key,
                                        blockIdAndLength.Key + blockIdAndLength.Value - 1));

                                // Sign the request.
                                StorageCredentials credentials = blob.ServiceClient.Credentials;
                                credentials.SignRequest(blobGetRequest);

                                // Read the chunk and write it to the output stream
                                // at the chunk's offset.
                                using (HttpWebResponse response = blobGetRequest.GetResponse() as HttpWebResponse)
                                using (Stream stream = response.GetResponseStream())
                                {
                                    int offsetInChunk = 0;
                                    int remaining = blockIdAndLength.Value;
                                    while (remaining > 0)
                                    {
                                        int read = stream.Read(buffer, offsetInChunk, remaining);
                                        lock (outputStream)
                                        {
                                            outputStream.Position = blockIdAndLength.Key + offsetInChunk;
                                            outputStream.Write(buffer, offsetInChunk, read);
                                        }
                                        offsetInChunk += read;
                                        remaining -= read;
                                        Interlocked.Add(ref bytesDownloaded, read);
                                    }
                                }
                            }
                            catch (Exception ex)
                            {
                                // Put the chunk back in the queue so it is retried.
                                lock (queue)
                                {
                                    queue.Enqueue(blockIdAndLength);
                                }

                                // If we have had 100 exceptions, give up.
                                if (Interlocked.Increment(ref exceptionCount) >= 100)
                                {
                                    throw new Exception("Received 100 exceptions while downloading. " + ex);
                                }
                            }
                        }
                    }));
                }

                // Wait for all threads to complete downloading data.
                Task.WaitAll(tasks.ToArray());
            }
        }

        private void TaskCompletedCallback(IAsyncResult ar)
        {
            // Get the original worker delegate and the AsyncOperation instance.
            Action<Stream, string> worker = (Action<Stream, string>)((AsyncResult)ar).AsyncDelegate;
            AsyncOperation async = (AsyncOperation)ar.AsyncState;

            // Finish the asynchronous operation.
            worker.EndInvoke(ar);

            // Clear the running task flag.
            lock (_sync)
            {
                TaskIsRunning = false;
            }

            // Raise the completed event.
            async.PostOperationCompleted(state => OnTaskCompleted((EventArgs)state), new EventArgs());
        }

        protected virtual void OnTaskCompleted(EventArgs e)
        {
            if (TransferCompleted != null)
                TransferCompleted(this, e);
        }

        private string GetMD5HashFromStream(byte[] data)
        {
            MD5 md5 = new MD5CryptoServiceProvider();
            byte[] blockHash = md5.ComputeHash(data);
            return Convert.ToBase64String(blockHash, 0, 16);
        }
    }

Enjoy

Manu


5 comments

  1. bbharbison, February 7, 2013 at 12:31

    What about the LOH from possibly creating 4MB byte arrays and multi-threaded, multi-file uploads, which will destroy memory? You're relying on all the default Task related functionality, which will inhibit performance. What would you recommend? Thanks.

  2. Vitor Ciaramella, March 11, 2013 at 11:07

    You can also use the Blob Transfer Utility to download and upload all your blob files.
    It's a tool to handle thousands of blob transfers in an effective way.

    Binaries and source code, here: http://bit.ly/blobtransfer

  3. Fola, March 18, 2013 at 9:54

    I don't know if the UploadDataToBlobAsync method was tested but there are problems with this code namely: The inputStream is being disposed way too early. The parallel tasks seem to be overwriting (or at least corrupting) each other's blocks. Still, I guess it's a good starting point if you know what modifications to make.

  4. Sarath, April 9, 2013 at 8:52

    Hi please provide namespace …..PrallelBlobTransfer shold have return type error is coming.. give me a sample how to use….

  5. michauwojt@gmail.com, April 21, 2013 at 15:09

    That you cause it to be appear that easy together with your display however i obtain this trouble to generally be really some thing which I'm sure I'd hardly ever have an understanding of. This reveals as well challenging and extremely broad to me. I will be looking forward for the next article, I am going to attempt to learn them!
