DCSIMG
Manu Cohen-Yashar's Blog

Manu Cohen-Yashar's Blog

Role Deployment Location on Azure

It is important to know where exactly will your role be deployed on the production machine in Azure data center. It is also important to know where windows is installed so you can reference its tools when writing startup tasks.

Well on windows Azure windows is deployed on drive D: !!!

Your role will be deployed to drive E: (E:\sitesroot\  for web role and a mirror in e:\AppsRoot for all roles)

you can learn about the location of your web role by simply open IIS Manager and explore to the web site of your role.

Manu

Blob Parallel Upload and Download

To gain the best performance from azure blob storage it is required to upload and download data in parallel. For very small files it is OK to use the simple blob API (UploadFile, UploadFromStream etc.) but for large chucks of data parallel upload is required.

To do parallel upload we'll use a block blob when working with streaming data (such as images or movies) and use the producer consumer design pattern. One thread read the stream, create blocks and put them into a queue. A collection of other threads read blocks from the queue and upload them to the cloud. Once all the threads finished the whole blob is committed.

Lets see the code:

public class PrallelBlobTransfer
    {
        // Async events and properties
        public event EventHandler TransferCompleted;
        private bool TaskIsRunning = false;
        private readonly object _sync = new object();

        // Used to calculate download speeds
        Queue<long> timeQueue = new Queue<long>(100);
        Queue<long> bytesQueue = new Queue<long>(100);

        public CloudBlobContainer Container { get; set; }
       
        public PrallelBlobTransfer(CloudBlobContainer container)
        {
            Container = container;
        }

       
        public void UploadFileToBlobAsync(string fileToUpload, string blobName)
        {
            
            if (!File.Exists(fileToUpload))
                throw new FileNotFoundException(fileToUpload);
            
            var worker = new Action<Stream,string>(ParallelUploadStream);

            lock (_sync)
            {
                if (TaskIsRunning)
                    throw new InvalidOperationException("The control is currently busy.");

                AsyncOperation async = AsyncOperationManager.CreateOperation(null);
                var fs = File.OpenRead(fileToUpload);
                worker.BeginInvoke(fs,blobName, TaskCompletedCallback, async);

                TaskIsRunning = true;
            }
        }
        
        public void UploadDataToBlobAsync(byte[] dataToUpload, string blobName)
        {
            var worker = new Action<Stream, string>(ParallelUploadStream);

            lock (_sync)
            {
                if (TaskIsRunning)
                    throw new InvalidOperationException("The control is currently busy.");

                AsyncOperation async = AsyncOperationManager.CreateOperation(null);
                var ms = new MemoryStream(dataToUpload);
                worker.BeginInvoke(ms, blobName, TaskCompletedCallback, async);

                TaskIsRunning = true;
            }
        }

        public void DownloadBlobToFileAsync(string filePath, string blobToDownload)
        {
            var worker = new Action<Stream,string>(ParallelDownloadFile);

            lock (_sync)
            {
                if (TaskIsRunning)
                    throw new InvalidOperationException("The control is currently busy.");

                AsyncOperation async = AsyncOperationManager.CreateOperation(null);
                var fs = File.OpenWrite(filePath);
                worker.BeginInvoke(fs, blobToDownload, TaskCompletedCallback, async);

                TaskIsRunning = true;
            }
        }
        
        public void DownloadBlobToBufferAsync(byte[] buffer, string blobToDownload)
        {
            var worker = new Action<Stream, string>(ParallelDownloadFile);

            lock (_sync)
            {
                if (TaskIsRunning)
                    throw new InvalidOperationException("The control is currently busy.");

                AsyncOperation async = AsyncOperationManager.CreateOperation(null);
                var ms = new MemoryStream(buffer);
                worker.BeginInvoke(ms, blobToDownload, TaskCompletedCallback, async);

                TaskIsRunning = true;
            }
        }

        public bool IsBusy
        {
            get { return TaskIsRunning; }
        }
        
        // Blob Upload Code
        // 200 GB max blob size
        // 50,000 max blocks
        // 4 MB max block size
        // Try to get close to 100k block size in order to offer good progress update response.
        private int GetBlockSize(long fileSize)
        {
            const long KB = 1024;
            const long MB = 1024 * KB;
            const long GB = 1024 * MB;
            const long MAXBLOCKS = 50000;
            const long MAXBLOBSIZE = 200 * GB;
            const long MAXBLOCKSIZE = 4 * MB;

            long blocksize = 100 * KB;
            //long blocksize = 4 * MB;
            long blockCount;
            blockCount = ((int)Math.Floor((double)(fileSize / blocksize))) + 1;
            while (blockCount > MAXBLOCKS - 1)
            {
                blocksize += 100 * KB;
                blockCount = ((int)Math.Floor((double)(fileSize / blocksize))) + 1;
            }

            if (blocksize > MAXBLOCKSIZE)
            {
                throw new ArgumentException("Blob too big to upload.");
            }

            return (int)blocksize;
        }

        /// <summary>
        /// Uploads content to a blob using multiple threads.
        /// </summary>
        /// <param name="inputStream"></param>
        /// <param name="blobName"></param>
        private void ParallelUploadStream(Stream inputStream,string blobName)
        {
          // the optimal number of transfer threads
            int numThreads = 10;

            long fileSize = inputStream.Length;

            int maxBlockSize = GetBlockSize(fileSize);
            long bytesUploaded = 0;

            // Prepare a queue of blocks to be uploaded. Each queue item is a key-value pair where
            // the 'key' is block id and 'value' is the block length.
            var queue = new Queue<KeyValuePair<int, int>>();
            var blockList = new List<string>();
            int blockId = 0;
            while (fileSize > 0)
            {
                int blockLength = (int)Math.Min(maxBlockSize, fileSize);
                string blockIdString = Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(
string.Format("BlockId{0}", blockId.ToString("0000000")))); KeyValuePair<int, int> kvp = new KeyValuePair<int, int>(blockId++, blockLength); queue.Enqueue(kvp); blockList.Add(blockIdString); fileSize -= blockLength; } var blob = Container.GetBlockBlobReference(blobName); blob.DeleteIfExists(); BlobRequestOptions options = new BlobRequestOptions() { RetryPolicy = RetryPolicies.RetryExponential(
RetryPolicies.DefaultClientRetryCount, RetryPolicies.DefaultMaxBackoff), Timeout = TimeSpan.FromSeconds(90) }; // Launch threads to upload blocks. var tasks = new List<Task>(); for (int idxThread = 0; idxThread < numThreads; idxThread++) { tasks.Add(Task.Factory.StartNew(() => { KeyValuePair<int, int> blockIdAndLength; using (inputStream) { while (true) { // Dequeue block details. lock (queue) { if (queue.Count == 0) break; blockIdAndLength = queue.Dequeue(); } byte[] buff = new byte[blockIdAndLength.Value]; BinaryReader br = new BinaryReader(inputStream); // move the file system reader to the proper position inputStream.Seek(blockIdAndLength.Key * (long)maxBlockSize, SeekOrigin.Begin); br.Read(buff, 0, blockIdAndLength.Value); // Upload block. string blockName = Convert.ToBase64String(BitConverter.GetBytes( blockIdAndLength.Key)); using (MemoryStream ms = new MemoryStream(buff, 0, blockIdAndLength.Value)) { string blockIdString = Convert.ToBase64String(
ASCIIEncoding.ASCII.GetBytes(string.Format("BlockId{0}", blockIdAndLength.Key.ToString("0000000")))); string blockHash = GetMD5HashFromStream(buff); blob.PutBlock(blockIdString, ms, blockHash, options); } } } })); } // Wait for all threads to complete uploading data. Task.WaitAll(tasks.ToArray()); // Commit the blocklist. blob.PutBlockList(blockList, options); } /// <summary> /// Downloads content from a blob using multiple threads. /// </summary> /// <param name="outputStream"></param> /// <param name="blobToDownload"></param> private void ParallelDownloadFile(Stream outputStream, string blobToDownload) { int numThreads = 10; var blob = Container.GetBlockBlobReference(blobToDownload); blob.FetchAttributes(); long blobLength = blob.Properties.Length; int bufferLength = GetBlockSize(blobLength); // 4 * 1024 * 1024; long bytesDownloaded = 0; // Prepare a queue of chunks to be downloaded. Each queue item is a key-value pair // where the 'key' is start offset in the blob and 'value' is the chunk length. Queue<KeyValuePair<long, int>> queue = new Queue<KeyValuePair<long, int>>(); long offset = 0; while (blobLength > 0) { int chunkLength = (int)Math.Min(bufferLength, blobLength); queue.Enqueue(new KeyValuePair<long, int>(offset, chunkLength)); offset += chunkLength; blobLength -= chunkLength; } int exceptionCount = 0; using (outputStream) { // Launch threads to download chunks. var tasks = new List<Task>(); for (int idxThread = 0; idxThread < numThreads; idxThread++) { tasks.Add(Task.Factory.StartNew(() => { KeyValuePair<long, int> blockIdAndLength; // A buffer to fill per read request. byte[] buffer = new byte[bufferLength]; while (true) { // Dequeue block details. lock (queue) { if (queue.Count == 0) break; blockIdAndLength = queue.Dequeue(); } try { // Prepare the HttpWebRequest to download data from the chunk. HttpWebRequest blobGetRequest = BlobRequest.Get(blob.Uri, 60, null, null); // Add header to specify the range blobGetRequest.Headers.Add("x-ms-range",
string.Format(System.Globalization.CultureInfo.InvariantCulture, "bytes={0}-{1}",
blockIdAndLength.Key, blockIdAndLength.Key + blockIdAndLength.Value - 1)); // Sign request. StorageCredentials credentials = blob.ServiceClient.Credentials; credentials.SignRequest(blobGetRequest); // Read chunk. using (HttpWebResponse response = blobGetRequest.GetResponse() as HttpWebResponse) { using (Stream stream = response.GetResponseStream()) { int offsetInChunk = 0; int remaining = blockIdAndLength.Value; while (remaining > 0) { int read = stream.Read(buffer, offsetInChunk, remaining); lock (outputStream) { outputStream.Position = blockIdAndLength.Key + offsetInChunk; outputStream.Write(buffer, offsetInChunk, read); } offsetInChunk += read; remaining -= read; Interlocked.Add(ref bytesDownloaded, read); } } } } catch (Exception ex) { // Add block back to queue queue.Enqueue(blockIdAndLength); exceptionCount++; // If we have had more than 100 exceptions then break if (exceptionCount == 100) { throw new Exception("Received 100 exceptions while downloading." + ex.ToString()); } if (exceptionCount >= 100) { break; } } } })); } // Wait for all threads to complete downloading data. Task.WaitAll(tasks.ToArray()); } } private void TaskCompletedCallback(IAsyncResult ar) { // get the original worker delegate and the AsyncOperation instance Action<Stream, string> worker = (Action<Stream, string>)((AsyncResult)ar).AsyncDelegate; AsyncOperation async = (AsyncOperation)ar.AsyncState; // finish the asynchronous operation worker.EndInvoke(ar); // clear the running task flag lock (_sync) { TaskIsRunning = false; } // raise the completed event async.PostOperationCompleted(state => OnTaskCompleted((EventArgs)state), new EventArgs()); } protected virtual void OnTaskCompleted(EventArgs e) { if (TransferCompleted != null) TransferCompleted(this, e); } private string GetMD5HashFromStream(byte[] data) { MD5 md5 = new MD5CryptoServiceProvider(); byte[] blockHash = md5.ComputeHash(data); return Convert.ToBase64String(blockHash, 0, 16); } }

Enjoy

Manu

Hosting Classic ASP on Azure

Is it possible to run a classic Asp site on Windows Azure? Of Course it is, Anything that runs on IIs can be hosted in Azure.

So How do we do it?

  1. Create a simple asp page in notepad (e.g. using http://support.microsoft.com/kb/301097)
  2. Create startup.cmd to install ASP engine
    • start /w pkgmgr /iu:IIS-AS
  3. Create a ASP .Net Web role and modify csdef to include a startup task:
    • <Startup>
    • <Task commandLine="startup.cmd" executionContext="elevated" taskType="simple" />
    • </Startup>
  4. Add both files to web role - change properties to 'Content' and 'Copy if New'.
  5. Build and Publish solution to Azure

More info:

 Windows Azure (How-to enable classic ASP support).

Manu

Moles Fails with InteliiTrace On

Moles is a great infrastructure for injecting mock functions into existing code. This is of course very useful when writing unit tests. Unfortunately from version 0.93 (now we are in 0.94) moles fails when IntelliTrace is on.

When running tests that uses moles and IntelliTrace is on the test will abort with the exception: [agent] host process exited with unmanaged profiler failed (-667 - 0xfffffd65)

To fix this simply disable IntelliTrace (Tools|Options|IntelliTrace|Enable IntelliTrace)
and run your tests.

Manu

Scale Data Applications with Sql Azure

When designing a data application, traditionally the bottleneck lies within the database. A traditional on-premises database does not scale very well. It is difficult to split a database between a cluster of machines so usually the database runs on the most powerful machine available which tend to be expensive.

To solve the problem some data processing is moved form its natural environment (i.e. DB) to the business tier which scale much better. The business tier is easy to replicate so it can help reduce the performance bottleneck of the database.

In Sql Azure things are different. Sql Azure is NOT running on simple Sql servers 2008 machines which happen to be on the cloud. Sql Azure has a different architecture. It runs on a grid of servers split into service and platform layers. The platform layer runs the sql engine and the service layer is responsible for the connection and provisioning.

"SQL Azure subscribers access the actual databases, which are stored on multiple machines in the data center, through the logical server. The SQL Azure Gateway service acts as a proxy, forwarding the Tabular Data Stream (TDS) requests to the logical server. It also acts as a security boundary providing login validation, enforcing your firewall and protecting the instances of SQL Server behind the gateway against denial-of-service attacks. The Gateway is composed of multiple computers, each of which accepts connections from clients, validates the connection information and then passes on the TDS to the appropriate physical server, based on the database name specified in the connection"

An important aspect about Sql Azure is the fact that this is a shared infrastructure. You do not own a specific server.  This is why Sql Azure billing is calculated according to the amount of storage you use only and does not take the cpu consumption of the database into consideration.

Another thing to know about Sql Azure is the fact that your databases are replicated across 3 different servers. Large databases can be split horizontally using Sql Azure Sharding.

"Sharding is an application pattern for improving the scalability and throughput of large-scale data solutions. To “shard” an application is the process of breaking an application’s logical database into smaller chunks of data, and distributing the chunks of data across multiple physical databases to achieve application scalability"

Sql Azure in contrast to on-premises databases scales very well.
SQL Azure combined with database sharding techniques provides for virtually unlimited scalability of data for an application.

And a final word about billing: The only compute power customers are charged for in windows azure is on the running roles. Sql Azure CPU is free !!!

So the conclusion is that in cloud applications you might want to consider to move data processing back to the database, use Sql Azure Sharding and save money.

Enjoy

Manu

Application security auditing and logging

Auditing is one of the main pillars of security policies. The question is how to do it wisely

The infrastructure can log almost everything. For example access to files, registry keys databases etc. The problem is that the infrastructure has no knowledge about the application use cases. It means that the context for these logs is missing.

Let us ask what is the purpose of auditing? The trivial reason is to collect information that will be useful in case of a problem, yet how do you know that there is a problem after all?

Auditing can help you identify that you are in an abnormal state and something is wrong.
To do that you have to be able to distinguish between normality and abnormality. The application knows its use cases. It knows their behavior patterns and so can identify that something is wrong. This is why only the application layer can perform smart security auditing.

Application logging and auditing is used to collect enough information to analyze the behavior of the application use cases and identify any abnormal situation, then the information collected with traditional logging is used to identify the problem and find the best action.

Enjoy

Manu

Http error 405 when calling STS

I wrote a simple example for demonstrating delegation with Windows Identity Framework (WIF).
I created a simple web site that used a simple custom STS for authentication. The web site called another web service to calculate a simple calculation (calculator).

The web site used passive federation using a simple asp.net STS. SOAP Web service in general can only use passive federation for authentication, so I created another WCF custom STS.

The idea was that the web application will obtain an act-as token from the active STS using the token it received from the client (using the passive STS) and then call the calculator web service.

When I run the code I got the "http error 405 method not allowed".

Http error 405 means that there is a mismatch between the http verb (e.g. Post, Get) between the call and the service. If the service waits for a http put and we send a http get request this is the error that will be triggered.

My problem was simple. I called the base address of the STS instead of the STS itself. I did that by specifying the base address of the STS in the issuer configuration. When I fixed the address to the real address of the STS the problem was solved.

 <customBinding>
   <binding name="CustomBinding_ICalc">
     <security authenticationMode="IssuedTokenForCertificate" 
messageSecurityVersion="WSSecurity11WSTrust13WSSecureConversation13WSSecurityPolicy12BasicSecurityProfile10"> <issuedTokenParameters keyType="SymmetricKey" tokenType="http://docs.oasis-open.org/wss/oasis-wss-saml-token-profile-1.1#SAMLV1.1"> <issuer address="http://manu-newlap/WCFSTS/service.svc" binding="ws2007HttpBinding" bindingConfiguration="http://manu-newlap/WCFSTS"> </issuer> <issuerMetadata address="http://manu-newlap/WCFSTS/mex" /> </issuedTokenParameters> </security> <httpTransport/> </binding> </customBinding>

The call to the STS to obtain an Act-As token is implemented by sending a http post request that contains the bootstrap token received from the passive STS. The base address does not allow non Get requests so my Post request was rejected.

remember that STS is a simple WCF service. The same rules that applies in simple WCF applies here.

Hope this will help.

Manu

כלכלת ענן

Hi this post will be written in Hebrew (sorry for all English speakers)

כולנו שומעים על ענן מכל כיוון אפשרי. נשאלת השאלה למה?
מה עושה את הענן הזה לשם החם הבא? מה הסוד מאחורי המושג הזה?

הכול מתחיל ונגמר בכסף. הסיפור הוא כלכלי.

מחוק מור למדנו שכל שנתיים מערכות מכפילות את יכולתן הטכנולוגית השאלה שנשאלת מה קורה למחיר? בסביבה הטכנולוגית כיום הלקוח התרגל לא רק לקבל מערכות טובות יותר ככל שהזמן עובר, אלא גם לשלם פחות. זה מוזר מבחינה כלכלית – לקבל יותר אך לשלם פחות.

הכול טוב ויפה מצדו של הצרכן אבל נותן השירות או ספק התוכנה נמצא במצב מביך, הלקוח דורש דרישות הולכות וגדלות אך מצד שני הוא מוכן לשלם פחות. בימינו הצרכן דורש אמינות מושלמת זמינות מוחלטת, ביזור ועמידה בעומסים משתנים. ניתן כמובן לספק יכולות כאלו הבעיה רק שזה ממש יקר. והלקוח כאמור לא מוכן לשלם.

במצב כזה אין ברירה אלא לשנות משהו בסיסי כי במצב הנוכחי פשוט אי אפשר לספק את הדרישות במחירי השוק. פה מגיע הענן.

הענן הוא ההזדמנות של השוק לגדול מבחינה טכנולוגית ועדיין להישאר רלוונטי מבחינה כלכלית. זוהי הסיבה שכולם מדברים על הענן מפני שזו היא התשתית עליה תתבסס התעשייה בעתיד הקרוב.

הרעיון שעומד מאחורי הענן אינו חדש. תשתיות הן דבר יקר ובעל נצילות נמוכה. שיתוף תשתיות הוא פוטנציאל אדיר לחיסכון בכסף. המימוש המפורסם ביותר של הרעיון הזה מגיע מתשתיות החשמל. למי מאתנו יש תחנת כוח ביתית שמספקת את כל תצרוכת החשמל שלנו? לרובנו זו נראית שאלה מגוחכת. למה לי לבנות תשתית יקרה של גנרטורים שיספקו לי הרבה חשמל למזגנים בקיץ ולחימום בחורף ומעט חשמל באביב ובסתיו כשאני יכול בפשטות כה רבה לקנות חשמל בזול מחברת חשמל ולשלם בדיוק לפי הצריכה שלי?

האנלוגיה ברורה. למה לי להקים מרכז מחשוב יקר שיספק לי את הכוח הדרוש לשעות העומס ויעמוד בטל בשאר ימות השנה כשאני יכול בפשטות לקנות כוח מחשוב ואזורי אחסון בענן ולשלם בדיוק לפי צריכה?

כנראה עוד מספר שנים השאלה עם לעבוד בענן תראה טריוויאלית בדיוק כמו השאלה אם לרכוש חשמל מספק מרכזי. כיום אנו בתחילתו של עידן אבל הכיוון ברור והתעשייה כולה מתנהגת בהתאם. ספקי התקשורת (Telecoms) בונים לנו ערוצים ייעודיים לענן כדי להבטיח ביצועים בסביבה מבוזרת. ספקי הענן בונים מרכזי מחשוב (Data Centers) ענקיים ומשקיעים מיליארדים בפיתוח הטכנולוגיה אשר תאפשר לנו בקלות להשתמש בשירותי הענן שלהם. יצרני מערכות ההפעלה וה – Hypervisors בונים תשתיות תוכנה ייעודיות לבניית מרכזי מחשוב ענן (ראה Windows Server 8 כדוגמא). יצרני החומרה בונים רכיבים מיוחדים עבור שרתים בענן וספקי התוכנה מספקים את מרכולתם כ – Software as a Service.

בקיצור הענן הוא הדבר הבא כי הוא מספק את ההזדמנות הכלכלית שכל כך דרושה לתעשיית המידע.

Azure ServiceBus Topic using REST API – Part 4

In the three last post's we learned how to use Azure ServiceBus REST API to send and receive messages using Topics.

The difference between Queues and Topics is the fact that with topics different customers can receive different messages according message filtering which is deployed as filtering rules on the subscription. MSDN describe the following about topics and rules:

image

Topics extend the messaging features provided by Queues with the addition of Publish-Subscribe capabilities.
Each Subscription can define one or multiple Rule entities. Each Rule specifies a filter expression that is used to filter messages that pass through the subscription and a filter action that can modify message properties. In particular, the SqlFilterExpression class allows you to define a SQL92-like condition on message properties:

  • OrderTotal > 5000 OR ClientPriority > 2
  • ShipDestinationCountry = ‘USA’ AND ShipDestinationState = ‘WA’

Conversely, the SqlFilterAction class can be used to modify, add or remove properties, as the filter expression is true and the message selected, using a syntax similar to that used by the SET clause of an UPDATE SQL-command.

  • SET AuditRequired = 1
  • SET Priority = 'High', Severity = 1

Each matching rule generates a separate copy of the published message, so any subscription can potentially generate more copies of the same message, one for each matching rule.

Let us learn how to Add a rule and action using the REST API.

To add a rule to a subscription we need to provide a rule description. To do that I created a rule, action and filter description classes.

Rule Description
  1. /// <summary>
  2. /// Helper class to serialize a rule for a subscription
  3. /// </summary>
  4. [DataContract(Name = "RuleDescription", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
  5. public sealed class RuleDescription
  6. {
  7.     /// <summary>
  8.     /// The default name used in creating default rule when adding subscriptions
  9.     /// to a topic. The name is "$Default".
  10.     /// </summary>
  11.     public const string DefaultRuleName = "$Default";
  12.     public RuleDescription()
  13.         : this(new TrueFilter())
  14.     {
  15.     }
  16.     public RuleDescription(Filter filter)
  17.     {
  18.         if (filter == null)
  19.         {
  20.             throw new ArgumentNullException("filter");
  21.         }
  22.         this.Filter = filter;
  23.     }
  24.     [DataMember(Name = "Filter", IsRequired = false, Order = 1001, EmitDefaultValue = false)]
  25.     public Filter Filter { get; set; }
  26.     [DataMember(Name = "Action", IsRequired = false, Order = 1002, EmitDefaultValue = false)]
  27.     public RuleAction Action { get; set; }
  28.     public string TopicPath { getset; }
  29.     public string SubscriptionName { get; set; }
  30.     public string Name { getset; }       
  31.    
  32. }

The rule description contains rule actions.

Rule actions
  1. /// <summary>
  2. /// Helper class to serialize a rule for a subscription
  3. /// </summary>
  4. [DataContract(Name = "RuleAction", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
  5. [KnownType(typeof(EmptyRuleAction))]
  6. [KnownType(typeof(SqlRuleAction))]
  7. public abstract class RuleAction
  8. {
  9.     internal RuleAction()
  10.     {
  11.     }
  12. }
  13. /// <summary>
  14. /// Helper class to serialize a rule for a subscription
  15. /// </summary>
  16. [DataContract(Name = "EmptyRuleAction", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
  17. public sealed class EmptyRuleAction : RuleAction
  18. {
  19. }
  20. /// <summary>
  21. /// Helper class to serialize a rule for a subscription
  22. /// </summary>
  23. [DataContract(Name = "SqlRuleAction", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
  24. public sealed class SqlRuleAction : RuleAction
  25. {
  26.     public const int DefaultCompatibilityLevel = 20;
  27.     public SqlRuleAction(string sqlExpression)
  28.     {
  29.         if (string.IsNullOrEmpty(sqlBLOCKED EXPRESSION
  30.         {
  31.             throw new ArgumentNullException("sqlExpression");
  32.         }
  33.         this.SqlExpression = sqlExpression;
  34.         this.CompatibilityLevel = DefaultCompatibilityLevel;
  35.     }
  36.     [DataMember(Order = 0x10001)]
  37.     public string SqlExpression { get; private set; }
  38.     [DataMember(Order = 0x10002)]
  39.     public int CompatibilityLevel { get; private set; }
  40. }

The rule description contains rule filters

Rule Filters
  1. /// <summary>
  2. /// Helper class to serialize a rule for a subscription
  3. /// </summary>
  4. [DataContract(Name = "Filter", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
  5. [KnownType(typeof(SqlFilter))]
  6. [KnownType(typeof(TrueFilter))]
  7. [KnownType(typeof(FalseFilter))]
  8. [KnownType(typeof(CorrelationFilter))]
  9. public abstract class Filter
  10. {
  11.     internal Filter()
  12.     {
  13.     }
  14. }
  15. /// <summary>
  16. /// Helper class to serialize a rule for a subscription
  17. /// </summary>
  18. [DataContract(Name = "SqlFilter", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
  19. [KnownType(typeof(TrueFilter))]
  20. [KnownType(typeof(FalseFilter))]
  21. public class SqlFilter : Filter
  22. {
  23.     public const int DefaultCompatibilityLevel = 20;
  24.     public SqlFilter(string sqlExpression)
  25.     {
  26.         if (String.IsNullOrEmpty(sqlBLOCKED EXPRESSION
  27.         {
  28.             throw new ArgumentNullException("sqlExpression");
  29.         }
  30.         this.SqlExpression = sqlExpression;
  31.         this.CompatibilityLevel = DefaultCompatibilityLevel;
  32.     }
  33.     [DataMember(Order = 0x10001)]
  34.     public string SqlExpression { get; set; }
  35.     [DataMember(Order = 0x10002)]
  36.     public int CompatibilityLevel { get; set; }
  37. }
  38. /// <summary>
  39. /// Helper class to serialize a rule for a subscription
  40. /// </summary>
  41. [DataContract(Name = "TrueFilter", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
  42. public sealed class TrueFilter : SqlFilter
  43. {
  44.     public TrueFilter()
  45.         : base("1=1")
  46.     {
  47.     }
  48. }
  49. /// <summary>
  50. /// Helper class to serialize a rule for a subscription
  51. /// </summary>
  52. [DataContract(Name = "FalseFilter", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
  53. public sealed class FalseFilter : SqlFilter
  54. {
  55.     public FalseFilter()
  56.         : base("1=0")
  57.     {
  58.     }
  59. }
  60. /// <summary>
  61. /// Helper class to serialize a rule for a subscription
  62. /// </summary>
  63. [DataContract(Name = "CorrelationFilter", Namespace = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect")]
  64. public sealed class CorrelationFilter : Filter
  65. {
  66.     public CorrelationFilter(string correlationId)
  67.     {
  68.         this.CorrelationId = correlationId;
  69.     }
  70.     [DataMember(Order = 0x10001)]
  71.     public string CorrelationId { get; private set; }
  72. }

Now we are ready to add a new rule using REST API

Add a Rule
  1. public static void CreateRule(string serviceNamespace, string topicName, string subscriptionName, RuleDescription description , string token)
  2. {            
  3.     var ruleAddress = string.Format("https://{0}.servicebus.windows.net/{1}/subscriptions/{2}/rules/{3}", serviceNamespace, topicName, subscriptionName, description.Name);
  4.     try
  5.     {
  6.         var defautRuleAddress = string.Format("https://{0}.servicebus.windows.net/{1}/subscriptions/{2}/rules/$Default", serviceNamespace, topicName, subscriptionName);
  7.         DeleteResource(defautRuleAddress, token);
  8.     }
  9.     catch (WebException ex)
  10.     {
  11.         //The default rule was already deleted
  12.         if (ex.Message != "The remote server returned an error: (404) Not Found.")
  13.             throw;
  14.     }                       
  15.     WebClient webClient = new WebClient();
  16.     webClient.Headers[HttpRequestHeader.Authorization] = token;
  17.     webClient.Headers[HttpRequestHeader.ContentType] = @"application/atom+xml;type=entry;charset=utf-8";
  18.     var ruleDescription = Encoding.UTF8.GetString(Serialize<RuleDescription>(description));
  19.     var putData = @"<entry xmlns='http://www.w3.org/2005/Atom'><content type='application/xml'>" + ruleDescription  + @"</content></entry>";
  20.     webClient.UploadData(ruleAddress, "PUT", Encoding.UTF8.GetBytes(putData));
  21. }

As you can see the first thing we do is delete the default rule that accepts all messages. Then we attach the serialization of our rule description to a PUT Http request we send to the URI of the new rule at:
https://{serviceNamespace}.servicebus.windows.net/{topicName}/subscriptions/{subscriptionName}/rules/{ruleName}

That is it. We have a new rule on our subscription.

With that I end the Series of post about using REST API of AppFabric Topics.

Enjoy

Manu

Azure ServiceBus Topic using REST API – Part 3

In the last two posts we showed how to use the Azure ServiceBus service bus REST API to send a message to a topic.
In this post we will see how to listen on a topic and receive a message.

There are two options:

1. Receive a message and delete it from the topic.

Receive and Delete
  1. public static T ReceiveAndDeleteMessage<T>(string serviceNamespace, string topicName,
  2.     string subscriptionName, string token) where T : class
  3. {
  4.     var address = string.Format("https://{0}.{1}/{2}/subscriptions/{3}/messages/head?timeout=10",
  5.         serviceNamespace, sbHostName, topicName, subscriptionName);
  6.     WebClient webClient = new WebClient();
  7.     webClient.Headers[HttpRequestHeader.Authorization] = token;
  8.     byte[] response = webClient.UploadData(address, "DELETE", new byte[0]);
  9.     return Deserialize<T>(response);            
  10. }

2. Receive the message but keep it locked on the queue until we decide to delete it (for example after message processing succeeded).

Receive and Peek
  1. public static T ReceiveAndPeekMessage<T>(string serviceNamespace, string topicName,
  2.     string subscriptionName, string token) where T : class
  3. {
  4.     var address = string.Format("https://{0}.{1}/{2}/subscriptions/{3}/messages/head?timeout=10",
  5.         serviceNamespace, sbHostName, topicName, subscriptionName);
  6.     WebClient webClient = new WebClient();
  7.     webClient.Headers[HttpRequestHeader.Authorization] = token;
  8.     byte[] response = (webClient.UploadData(address, "POST", new byte[0]));
  9.     //use the information in the BrokerProperties to unlock the message
  10.     var serilaizedBrokeredProperties = webClient.ResponseHeaders["BrokerProperties"];
  11.     BrokerProperties brokerProperties = BrokerProperties.Deserialize(serilaizedBrokeredProperties);            
  12.    
  13.     return Deserialize<T>(response);
  14. }

The following table describe the information that will be placed in the brokerProperties object 

Response Header

Description

BrokerProperties{MessageLocation}

The URI of the locked message. This URI is needed to unlock or delete the message.

BrokerProperties{LockTocken}

The lock ID for the locked message. This ID is needed to delete the locked message.

BrokerProperties{LockLocation}

The lock URI for the locked message. This URI is needed to unlock the message for processing by other receivers. The lock URI will be the following format: https://{serviceNamespace}.appfabric.Windows.net/{topic-path}/subscriptions/{subscription-name}/messages/{message-id}/{lock-id}.

To delete a lock or delete message (after the lock is deleted) I will use a general method for deleting service bus resources

Delete Resources
  1. private static void DeleteResource(string address, string token)
  2. {
  3.     WebClient webClient = new WebClient();
  4.     webClient.Headers[HttpRequestHeader.Authorization] = token;
  5.     webClient.UploadData(address, "DELETE", new byte[0]);
  6. }

That's all we received a message. On the next post we will learn how to create a rule for a subscription.

Enjoy

manu

Azure ServiceBus Topic using REST API – Part 2

In the last post I described why to use REST API when working with Azure service bus and demonstrated how to create a topic and subscription.

In this post we will actually send a message to the topic.

When sending a http request with a message to a topic we have to provide two special headers
1. Authorization header with the token we received from ACS
2.  BrokerProperties with Json serialization of metadata of metadata about our message.

To do that let us create a special class that will help us represent and serialize the metadata.

Broker Properties
  1. /// <summary>
  2. /// Container for general properties of messages that are placed in the "BrokerProperties" http header and can be used to filter messages
  3. /// </summary>
  4. [DataContract]
  5. public sealed class BrokerProperties
  6. {
  7.     private static readonly DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(BrokerProperties));
  8.     [DataMember(EmitDefaultValue = false)]
  9.     public string CorrelationId { get; set; }
  10.     [DataMember(EmitDefaultValue = false)]
  11.     public string SessionId { get; set; }
  12.     [DataMember(EmitDefaultValue = false)]
  13.     public int? DeliveryCount { get; set; }
  14.     [DataMember(EmitDefaultValue = false)]
  15.     public Guid? LockToken { get; set; }
  16.     [DataMember(EmitDefaultValue = false)]
  17.     public string MessageId { get; set; }
  18.     [DataMember(EmitDefaultValue = false)]
  19.     public string Label { get; set; }
  20.     [DataMember(EmitDefaultValue = false)]
  21.     public string ReplyTo { get; set; }
  22.     [DataMember(EmitDefaultValue = false)]
  23.     public long? SequenceNumber { get; set; }
  24.     [DataMember(EmitDefaultValue = false)]
  25.     public string To { get; set; }
  26.     public DateTime? LockedUntilUtc { get; set; }
  27.     public DateTime? ScheduledEnqueueTimeUtc { get; set; }
  28.     public TimeSpan? TimeToLive { get; set; }
  29.     [DataMember(EmitDefaultValue = false)]
  30.     public string ReplyToSessionId { get; set; }
  31.     [DataMember(Name = "LockedUntilUtc", EmitDefaultValue = false)]
  32.     public string LockedUntilUtcString
  33.     {
  34.         get
  35.         {
  36.             if (this.LockedUntilUtc != null && this.LockedUntilUtc.HasValue)
  37.             {
  38.                 return this.LockedUntilUtc.Value.ToString("r");
  39.             }
  40.             return null;
  41.         }
  42.         set
  43.         {
  44.             try
  45.             {
  46.                 // When deserializing from JSON format, attempt to parse the value.
  47.                 // If the value cannot be parsed as a date time, ignore it.
  48.                 this.LockedUntilUtc = DateTime.Parse(value, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);
  49.             }
  50.             catch
  51.             {
  52.             }
  53.         }
  54.     }
  55.     [DataMember(Name = "ScheduledEnqueueTimeUtc", EmitDefaultValue = false)]
  56.     public string ScheduledEnqueueTimeUtcString
  57.     {
  58.         get
  59.         {
  60.             if (this.ScheduledEnqueueTimeUtc != null && this.ScheduledEnqueueTimeUtc.HasValue)
  61.             {
  62.                 return this.ScheduledEnqueueTimeUtc.Value.ToString("r");
  63.             }
  64.             return null;
  65.         }
  66.         set
  67.         {
  68.             try
  69.             {
  70.                 // When deserializing from JSON format, attempt to parse the value.
  71.                 // If the value cannot be parsed as a date time, ignore it.
  72.                 this.ScheduledEnqueueTimeUtc = DateTime.Parse(value, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);
  73.             }
  74.             catch
  75.             {
  76.             }
  77.         }
  78.     }
  79.     [DataMember(Name = "TimeToLive", EmitDefaultValue = false)]
  80.     public double TimeToLiveString
  81.     {
  82.         get
  83.         {
  84.             if (this.TimeToLive != null && this.TimeToLive.HasValue)
  85.             {
  86.                 return this.TimeToLive.Value.TotalSeconds;
  87.             }
  88.             return 0;
  89.         }
  90.         set
  91.         {
  92.             // This is needed as TimeSpan.FromSeconds(TimeSpan.MaxValue.TotalSeconds) throws Overflow exception.
  93.             if (TimeSpan.MaxValue.TotalSeconds == value)
  94.             {
  95.                 this.TimeToLive = TimeSpan.MaxValue;
  96.             }
  97.             else
  98.             {
  99.                 this.TimeToLive = TimeSpan.FromSeconds(value);
  100.             }
  101.         }
  102.     }
  103.     public static BrokerProperties Deserialize(string jsonString)
  104.     {
  105.         using (MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes(jsonString)))
  106.         {
  107.             return (BrokerProperties)serializer.ReadObject(ms);
  108.         }
  109.     }
  110.     public string Serialize()
  111.     {
  112.         using (MemoryStream memoryStream = new MemoryStream())
  113.         {
  114.             serializer.WriteObject(memoryStream, this);
  115.             memoryStream.Position = 0;
  116.             using (StreamReader reader = new StreamReader(memoryStream))
  117.             {
  118.                 return reader.ReadToEnd();
  119.             }
  120.         }
  121.     }
  122. }

Now we know how to create a token (previous post) and represent metadata. so let us send a message

Code Snippet
  1. public static void SendMessage<T>(string serviceNamespace, string topicName,
  2.     T content, string token, string receiverName) where T : class
  3. {
  4.     var fullAddress = string.Format("https://{0}.{1}/{2}//messages?timeout=60",
  5.         serviceNamespace, sbHostName, topicName);
  6.     BrokerProperties brokeredProparties = new BrokerProperties();
  7.     brokeredProparties.To = receiverName;
  8.     WebClient webClient = new WebClient();
  9.     webClient.Headers[HttpRequestHeader.Authorization] = token;            
  10.     webClient.Headers["BrokerProperties"] = brokeredProparties.Serialize();
  11.                 
  12.     //All the message's properties are propagated so they can be used in a Sql Filter
  13.     foreach (var prop in typeof(T).GetProperties())
  14.     {
  15.         webClient.Headers[prop.Name] = prop.GetValue(content, null).ToString();
  16.     }            
  17.     webClient.UploadData(fullAddress, "POST", Serialize(content));
  18. }

As you can see I put the receiver name in the "To" properties of the BrokerProperties object. This will allow us later to create a rule that filter the messages accordingly (e.g. To=receiver)

Also you can see that I propagated all of the message's properties. A propagated property is placed as a Http header and provide the ability to later filter messages according to its value by creating a relevant rule.
In production I assume that you will propagate only the properties you need !!!

The body message contains the DataContract Serialization of our data.
All we need to do is send the Http Request (using POST verb) to our topic at:
https://{namespace}.{appfabric.windows.net}/{topicname}//messages?timeout=60

Serialization
  1. private static byte[] Serialize<T>(T content) where T : class
  2. {
  3.     try
  4.     {
  5.         var serializer = new DataContractSerializer(typeof(T));
  6.         MemoryStream memstream = new MemoryStream();
  7.         var writer = XmlDictionaryWriter.CreateTextWriter(memstream, Encoding.UTF8);
  8.         serializer.WriteObject(writer, content);
  9.         writer.Flush();
  10.         return memstream.ToArray();
  11.     }
  12.     catch (Exception)
  13.     {
  14.         // log this
  15.         return null;
  16.     }           
  17. }


Enjoy

Manu

My Application Security Talk on Canada's TechDays

Starting the end of February, Canada's TechDays TV will air brand new TechDays sessions (exclusive to TechDays Online). The experts will be LIVE and INTERACTIVE which means that throughout the session, as well as after the session, you’ll be able to post your questions via chat or Twitter and have them answered in real-time.

To launch this new TV channel, MS Canada chose me to talk about "Securing.NET Applications", In this session I discuss a range of security topics including: what is application security, security design and the SDL, identity Management, role-based security and claim based identity, cryptography, secure communication, and input validation. By the end of this session, I emphasized that .NET provides the tools and technologies to implement mitigations for threats identified during the security design.

If you want to watch the recording here

Enjoy

Manu

Azure ServiceBus Topic using REST API – Part 1

To use Service Bus, we must ensure that our firewall allows outgoing TCP communication on TCP ports 9350 to 9354. This is not a very strong request yet some customers cannot touch their firewall policy and so their firewall blocks all interaction with the service bus. In such scenarios (any many others) AppFabric REST API is the only alternative to work with the service bus.

When working with the REST API we have to construct Http request to the service bus resource we want to work with. Unfortunately the REST API documentation is not accurate. In this post I will describe how to execute basic operation against service bus topics using REST API.

All operations against the service bus requires authentication. To do that we have to call Access Control Service (ACS), prove our identity and receive a token which will be presented to Service bus. In a previous post ("Securing AppFabric Service Bus with ACS") I described in details how to configure ACS.

Let us call ACS and receive a token:

Create a Token
  1. public static string GetToken(string serviceNamespace, string issuerName, string issuerSecret)
  2. {
  3.     var acsEndpoint = string.Format(@"https://{0}-sb.{1}/WRAPv0.9/", serviceNamespace, acsHostName);
  4.     // Note that the realm used when requesting a token uses the HTTP scheme, even though
  5.     // calls to the service are always issued over HTTPS
  6.     var realm = string.Format(@"http://{0}.{1}/", serviceNamespace, sbHostName);
  7.     var values = new NameValueCollection();
  8.     values.Add("wrap_name", issuerName);
  9.     values.Add("wrap_password", issuerSecret);
  10.     values.Add("wrap_scope", realm);
  11.     WebClient webClient = new WebClient();
  12.     byte[] response = webClient.UploadValues(acsEndpoint, values);
  13.     string responseString = Encoding.UTF8.GetString(response);
  14.     var responseProperties = responseString.Split('&');
  15.     var tokenProperty = responseProperties[0].Split('=');
  16.     var token = Uri.UnescapeDataString(tokenProperty[1]);
  17.     return "WRAP access_token=\"" + token + "\"";
  18. }

Now Let us create a topic

Create a Topic
  1. public static string CreateTopic(string serviceNamespace, string topicName, string token)
  2. {
  3.     var topicAddress = string.Format("https://{0}.{1}/{2}", serviceNamespace, sbHostName, topicName);
  4.     WebClient webClient = new WebClient();
  5.     webClient.Headers[HttpRequestHeader.Authorization] = token;
  6.     // Prepare the body of the create queue request
  7.     var putData = @"<entry xmlns=""http://www.w3.org/2005/Atom"">
  8.                                   <title type=""text"">" + topicName + @"</title>
  9.                                   <content type=""application/xml"">
  10.                                     <TopicDescription xmlns:i=""http://www.w3.org/2001/XMLSchema-instance"" xmlns=""http://schemas.microsoft.com/netservices/2010/10/servicebus/connect"" />
  11.                                   </content>
  12.                                 </entry>";
  13.     byte[] response = webClient.UploadData(topicAddress, "PUT", Encoding.UTF8.GetBytes(putData));
  14.     return Encoding.UTF8.GetString(response);
  15. }

Before sending a message to the topic let us create a subscription

Create a Subscription
  1. public static string CreateSubscription(string serviceNamespace, string topicName, string subscriptionName, string token)
  2. {           
  3.     var subscriptionAddress = string.Format("https://{0}.{1}/{2}/subscriptions/{3}", serviceNamespace, sbHostName, topicName, subscriptionName);
  4.      
  5.     WebClient webClient = new WebClient();
  6.     webClient.Headers[HttpRequestHeader.Authorization] = token;
  7.    
  8.     // Prepare the body of the create queue request
  9.     var putData = @"<entry xmlns=""http://www.w3.org/2005/Atom"">
  10.                                   <title type=""text"">" + subscriptionName + @"</title>
  11.                                   <content type=""application/xml"">
  12.                                     <SubscriptionDescription xmlns:i=""http://www.w3.org/2001/XMLSchema-instance"" xmlns=""http://schemas.microsoft.com/netservices/2010/10/servicebus/connect"" />
  13.                                   </content>
  14.                                 </entry>";
  15.     byte[] response = webClient.UploadData(subscriptionAddress, "PUT", Encoding.UTF8.GetBytes(putData));
  16.     return Encoding.UTF8.GetString(response);            
  17. }

Now we are ready to send a message. This will be described in the next post.

Enjoy

Manu

Windows Azure Benchmarks - Part 17: Compare Storage types performance

One of the most common questions customers ask is how does windows azure perform.
Well Microsoft published a series of benchmarks in an excellent web site called azurescope.
The thing is azurescope is going down on January 15th 2012 so I decided to publish these benchmarks here and make sure they will be available to the public.

Let us discuss storage types and compare their performance.

Charts on this page compare the maximum throughput achieved for the various storage types irrespective of the VM Size or values of other parameters on which the test was run. Please refer individual test cases for detailed information.

Key Analysis
  • Page Blobs provide better throughput than Block Blobs.
  • In best cases, Windows Azure Drives can deliver similar performance as that of a local disk.
  • The observed write performance of Sql Azure is better than Windows Azure Tables while the read performance of Windows Azure Tables is better than Sql Azure.

image

image

Enjoy

Manu

Windows Azure Benchmarks - Part 16: Sql Azure write latency

One of the most common questions customers ask is how does windows azure perform.
Well Microsoft published a series of benchmarks in an excellent web site called azurescope.
The thing is azurescope is going down on January 15th 2012 so I decided to publish these benchmarks here and make sure they will be available to the public.

Let us discuss Sql Azure write latency.

This test measures the write latency observed from a Windows Azure worker role instance while accessing SQL Azure.

Key Analysis
  • The number of pre-populated items have almost no affect on the observed latencies

image

Enjoy

Manu

More Posts Next page »