Using AppFabric Azure Topics
Recently the AppFabric team released real pub-sub capabilities with the new queues and topics features.
Queues are simple. AppFabric provides a queue for durable messaging between senders and receivers. Topics are interesting because they allows to create different subscriptions so different clients can receive only the messages they need.
To demonstrate how to use topics I created two helper classes: Senders and Receiver that will send and receive messages using AppFabric Topics.
Sender
- /// <summary>
- /// Send messages to AppFabric Topic
- /// </summary>
- /// <typeparam name="T"></typeparam>
- public class ServiceBusSender<T> : IBrokeredMessageSender<T>
- {
- private string m_ServiceNamespace;
- private string m_IssuerName;
- private string m_IssuerKey;
- private Uri m_serviceUri;
- private TokenProvider m_tokenProvider;
- private TopicDescription m_topicDescriptor;
-
- /// <summary>
- /// Constructor
- /// </summary>
- /// <param name="topicName">The name of the topic to use</param>
- public ServiceBusSender(string topicName)
- {
- m_ServiceNamespace = ServiceBusConfigurationAccessor.AppFabricServiceNamespace;
- m_IssuerName = ServiceBusConfigurationAccessor.AppFabricIssuerName;
- m_IssuerKey = ServiceBusConfigurationAccessor.AppFabricIssuerKey;
-
- m_tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(m_IssuerName, m_IssuerKey);
- m_serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", m_ServiceNamespace, string.Empty);
- NamespaceManager namespaceManager = new NamespaceManager(m_serviceUri, m_tokenProvider);
-
- if (!namespaceManager.TopicExists(topicName))
- m_topicDescriptor = namespaceManager.CreateTopic(topicName);
- else
- m_topicDescriptor = namespaceManager.GetTopic(topicName);
-
- }
-
- /// <summary>
- /// Send a message to the topic
- /// </summary>
- /// <param name="messageContent"></param>
- public void SendMessage(T messageContent)
- {
- Contract.Requires(messageContent != null);
-
- MessagingFactory factory = MessagingFactory.Create(m_serviceUri, m_tokenProvider);
- TopicClient myTopicClient = factory.CreateTopicClient(m_topicDescriptor.Path);
- BrokeredMessage message = new BrokeredMessage(messageContent);
-
- //All the message's properties are propagated so they can be used in a Sql Filter
- foreach (var prop in typeof(T).GetProperties())
- {
- message.Properties.Add(prop.Name, prop.GetValue(messageContent, null));
- }
-
- myTopicClient.Send(message);
- myTopicClient.Close();
- }
-
- }
As you can see the sender creates the topic (if it does not exist already). A messaging factory to create a client that can send messages to the topic. For that we need to supply the credentials for our namespace when we create the factory. To send a message to the topic we have to wrap our data in a BrokeredMessage. One of the capabilities we gain when doing that is properties propagation. A propagated property can participate a sql-like rule for filtering messages (will be defined by the client's subscription). In this sender we propagate all the properties. In real life you might choose to propagate only a the properties that participate in filtering because propagation costs.
Receiver
- /// <summary>
- /// Listen on an AppFabric service bus topic. One receiver can listen on multiple subscriptions
- /// </summary>
- /// <typeparam name="T"></typeparam>
- public class ServiceBusReceiver<T> : IBrokeredMessageReceiver<T>
- {
- private string m_ServiceNamespace;
- private string m_IssuerName;
- private string m_IssuerKey;
- private Dictionary<string, SubscriptionClient> m_subscriptions;
- private MessagingFactory m_factory;
- private Uri m_serviceUri;
- private TokenProvider m_tokenProvider;
-
- /// <summary>
- /// The name of the topic to use
- /// </summary>
- public string TopicName { get; set; }
-
-
- /// <summary>
- /// Constructor
- /// </summary>
- /// <param name="topicName">The name of the topic to listen to</param>
- public ServiceBusReceiver(string topicName)
- {
- m_subscriptions = new Dictionary<string, SubscriptionClient>();
- m_ServiceNamespace = ServiceBusConfigurationAccessor.AppFabricServiceNamespace;
- m_IssuerName = ServiceBusConfigurationAccessor.AppFabricIssuerName;
- m_IssuerKey = ServiceBusConfigurationAccessor.AppFabricIssuerKey;
- m_tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(m_IssuerName, m_IssuerKey);
- m_serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", m_ServiceNamespace, string.Empty);
- m_factory = MessagingFactory.Create(m_serviceUri, m_tokenProvider);
- TopicName = topicName;
- }
-
- /// <summary>
- /// Create a new subscription
- /// </summary>
- /// <param name="subscriptionName">The name of the subscription to create</param>
- /// <param name="filter">filter to use for filtering messages</param>
- public void InitSubscription(string subscriptionName, Filter filter=null)
- {
- Contract.Requires(!string.IsNullOrEmpty(subscriptionName));
- Contract.Requires(!string.IsNullOrEmpty(TopicName), "Topic name must be provided. Use the receiver's TopicName property");
-
- var namespaceManager = new NamespaceManager(m_serviceUri, m_tokenProvider);
-
- if (!namespaceManager.TopicExists(TopicName))
- namespaceManager.CreateTopic(TopicName);
-
- if (!namespaceManager.SubscriptionExists(TopicName, subscriptionName))
- {
- filter = (filter == null) ? new TrueFilter() : filter;
- namespaceManager.CreateSubscription(TopicName, subscriptionName, filter);
- }
-
- if (!m_subscriptions.ContainsKey(subscriptionName))
- m_subscriptions.Add(subscriptionName, m_factory.CreateSubscriptionClient(TopicName, subscriptionName, ReceiveMode.ReceiveAndDelete));
- }
-
- /// <summary>
- /// Delete a new subscription from the topic
- /// </summary>
- /// <param name="subscriptionName">The name of the subscription to delete</param>
- public void DeleteSubscription(string subscriptionName)
- {
- Contract.Requires(!string.IsNullOrEmpty(subscriptionName));
- Contract.Requires(!string.IsNullOrEmpty(TopicName), "Topic name must be provided. Use the receiver's TopicName property");
-
-
- var namespaceManager = new NamespaceManager(m_serviceUri, m_tokenProvider);
- if (namespaceManager.TopicExists(TopicName) && namespaceManager.SubscriptionExists(TopicName, subscriptionName))
- namespaceManager.DeleteSubscription(TopicName, subscriptionName);
- }
-
-
- /// <summary>
- /// Start listening on a subscription
- /// </summary>
- /// <param name="subscriptionName">The name of the subscription to listen to</param>
- /// <param name="processMessage">A function to trigger for each comming message</param>
- public void ReceiveMessages(string subscriptionName, Action<T> processMessage)
- {
- Contract.Requires(!string.IsNullOrEmpty(subscriptionName));
- Contract.Requires(m_subscriptions.ContainsKey(subscriptionName));
-
- try
- {
- Task.Factory.StartNew(() =>
- {
- while (m_subscriptions.ContainsKey(subscriptionName))
- {
- BrokeredMessage message;
- while ((message = m_subscriptions[subscriptionName].Receive(TimeSpan.FromSeconds(5))) != null)
- {
- processMessage(message.GetBody<T>());
- }
- }
- });
- }
- catch (Exception ex)
- {
- // Log this
- throw;
- }
-
- }
-
-
- /// <summary>
- /// Stop Listening on a subscription and delete it from the current subscriptions list
- /// </summary>
- /// <param name="subscriptionName">The name of the subscription</param>
- public void StopReceiveNessages(string subscriptionName)
- {
- if (!m_subscriptions.ContainsKey(subscriptionName))
- return;
-
- var subscriptionClient = m_subscriptions[subscriptionName];
-
- if ((subscriptionClient != null) && (!subscriptionClient.IsClosed))
- subscriptionClient.Close();
-
- subscriptionClient = null;
- m_subscriptions.Remove(subscriptionName);
- }
-
-
- }
The receiver can handle a collection of subscriptions. (Use InitSubscription to create one).
After a subscription is created call ReceiveMessages and provide a method to run for each message received. The receiver will constantly pool the topic and wait for a message as long as the subscription exist. When a message arrives it will extract the data object out of the brokered message at trigger the method you provided on the data.
The following test demonstrate how to use the sender and receiver.
Code Snippet
- [TestMethod]
- public void ServiceBusSenderAndReceiverTest()
- {
- var sender = new ServiceBusSender<TestMessage>("testTopic");
- var receiver = new ServiceBusReceiver<TestMessage>("testTopic");
- var finished = new AutoResetEvent(false);
-
- receiver.InitSubscription("testSubscription");
- receiver.ReceiveMessages("testSubscription",
- (msg) => { Assert.AreEqual(39, msg.Age); Assert.AreEqual("manu", msg.Name); finished.Set(); });
- sender.SendMessage(new TestMessage() { Name = "manu", Age = 39 });
-
- finished.WaitOne();
- receiver.DeleteSubscription("testSubscription");
- }
Enjoy
Manu