WCF Router and Publish/Subscribe Sample Implementation

March 15, 2008

A WCF intermediary router is available on MSDN as a sample.  The sample demonstrates how to implement routing logic between a client and a destination service.  It also lays the groundwork for implementing other SOAP intermediaries, such as those that cache message responses, validate incoming messages, or load-balance requests across multiple servers.  However, it is more complicated than necessary if all you need a router to do is forward requests from one place to another.

Additionally, Juval Lowy’s “WCF Essentials” article in the October 2006 issue of MSDN Magazine lays the foundations for building a WCF publish/subscribe architecture.  However, it falls short in one area: services need to explicitly implement subscription and publishing logic for the specific event contracts they are interested in.  I was after a generic solution in which a single service provides subscription and publishing services for any contract.

So let’s look into both of these subjects.  Router first.  All we need to do is provide a service which has an untyped contract, accepting the Message class.  As the SOAP “Action” header we will specify “*”, meaning that the router is completely contract-independent.

using System.ServiceModel;
using System.ServiceModel.Channels;

[ServiceContract]
public interface IRouter
{
    [OperationContract(Action = "*", ReplyAction = "*")]
    Message Action(Message msg);
}

The router implementing this contract will simply create a channel to the actual destination and forward the message there:

[ServiceBehavior(InstanceContextMode=InstanceContextMode.Single,
    ConcurrencyMode=ConcurrencyMode.Multiple)]
class Router : IRouter
{
    ChannelFactory<IRouter> _forwardCF;

    public Router(Binding binding, Uri forwardTo)
    {
        _forwardCF = new ChannelFactory<IRouter>
            (binding, new EndpointAddress(forwardTo));
        _forwardCF.Endpoint.Behaviors.Add(new MustUnderstandBehavior(false));
    }

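    public Message Action(Message msg)
    {
        IRouter target = _forwardCF.CreateChannel();
        try
        {
            Message reply = target.Action(msg);
            ((ICommunicationObject)target).Close();
            return reply;
        }
        catch
        {
            //Close() throws on a faulted channel, so abort it instead
            ((ICommunicationObject)target).Abort();
            throw;
        }
    }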
}

Note that there’s just a single instance of the router, and its concurrency mode allows multiple clients to enter it.  The router itself is completely stateless and therefore we don’t mind multiple threads – if the target requires synchronization then it’s the target’s responsibility, not the router’s.  Another detail is the MustUnderstandBehavior set to disable validation – the router doesn’t understand the request and reply messages, and if we omit the behavior we will get a ProtocolException.

This is just the basic skeleton: If we want dynamic routing, we can implement a routing table; if we want the client to provide the target URI, we can take it from the incoming message headers; if we want load balancing, . . .  You get the idea.
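As a minimal sketch of the routing-table idea, assuming routes are keyed by the SOAP action and unknown actions fall back to a default destination: the RoutingTable class and its member names are hypothetical and are not part of the downloadable sample.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the original sample): maps a SOAP action
// to the URI of the service that should receive messages with that action.
public class RoutingTable
{
    private readonly object _sync = new object();
    private readonly Dictionary<string, Uri> _routes =
        new Dictionary<string, Uri>(StringComparer.Ordinal);
    private readonly Uri _defaultTarget;

    public RoutingTable(Uri defaultTarget)
    {
        if (defaultTarget == null) throw new ArgumentNullException("defaultTarget");
        _defaultTarget = defaultTarget;
    }

    // Adds a route, or replaces the existing one for this action.
    public void SetRoute(string action, Uri target)
    {
        if (action == null) throw new ArgumentNullException("action");
        if (target == null) throw new ArgumentNullException("target");
        lock (_sync) { _routes[action] = target; }
    }

    // Returns the target registered for the action, or the default target
    // when the action is null or has no route.
    public Uri Resolve(string action)
    {
        lock (_sync)
        {
            Uri target;
            if (action != null && _routes.TryGetValue(action, out target))
                return target;
            return _defaultTarget;
        }
    }
}
```

Inside Router.Action, one could then resolve msg.Headers.Action through such a table and pass the result to _forwardCF.CreateChannel(new EndpointAddress(uri)) instead of relying on the single address fixed in the constructor.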

Publish/subscribe next.  What we need to do here is refine the router so that a publish/subscribe mechanism is built into the routing service.  Whenever a published message arrives, it will be inspected and forwarded to the registered subscribers.  So this is what the contract should look like:

[ServiceContract]
public interface ISubscribe
{
    [OperationContract]
    void Subscribe(string action, string ea);
    [OperationContract]
    void Unsubscribe(string action, string ea);
}

[ServiceContract]
public interface IPublish
{
    [OperationContract(Action = "*", IsOneWay = true)]
    void Publish(Message msg);
}

The service itself is fairly easy, because all we need to do is distribute requests to all registered endpoints in the generic way we’ve seen earlier.  So as far as we’re concerned, we’re treating the endpoints as IPublish and forwarding the same message we received.

[ServiceBehavior(InstanceContextMode=InstanceContextMode.Single,
    ConcurrencyMode=ConcurrencyMode.Multiple)]
class PubSubService : ISubscribe, IPublish
{
    Dictionary<string, List<EndpointAddress>> _subscribers =
        new Dictionary<string, List<EndpointAddress>>();

    public void Subscribe(string action, string ea)
    {
        lock (_subscribers)
        {
            List<EndpointAddress> ealist;
            if (!_subscribers.TryGetValue(action, out ealist))
            {
                ealist = new List<EndpointAddress>();
                _subscribers.Add(action, ealist);
            }
            ealist.Add(new EndpointAddress(ea));
        }
    }

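    public void Unsubscribe(string action, string ea)
    {
        lock (_subscribers)
        {
            //Unsubscribing an unknown action or endpoint is a no-op
            List<EndpointAddress> ealist;
            if (_subscribers.TryGetValue(action, out ealist))
            {
                ealist.Remove(new EndpointAddress(ea));
            }
        }
    }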

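    public void Publish(Message msg)
    {
        List<EndpointAddress> targets;
        lock (_subscribers)
        {
            //Make a copy of the collection; nothing to do without subscribers
            List<EndpointAddress> ealist;
            if (!_subscribers.TryGetValue(OperationContext.Current.
                IncomingMessageHeaders.Action, out ealist))
            {
                return;
            }
            targets = new List<EndpointAddress>(ealist);
        }

        //A Message can be sent only once, so send each subscriber its own copy
        MessageBuffer buffer = msg.CreateBufferedCopy(int.MaxValue);
        foreach (EndpointAddress ea in targets)
        {
            IPublish pub = ChannelFactory<IPublish>.CreateChannel(
                new NetTcpBinding(), ea);
            try
            {
                pub.Publish(buffer.CreateMessage());
                ((ICommunicationObject)pub).Close();
            }
            catch
            {
                ((ICommunicationObject)pub).Abort();
                throw;
            }
        }
    }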
}

Again, the service is a singleton, but access by multiple threads is allowed so that it can scale during the one-way publishing operation.  However, a lock must be held whenever the registration information is read or modified.  (Subscription changes should happen less frequently than publishing, and Publish holds the lock only long enough to copy the subscriber list, so it’s not a big overhead.)

Clearly, there’s lots to add here – we need to handle distribution failures and transparently remove subscriptions or provide a removal policy, there’s room for caching the distribution channels, the solution is bound to NetTcpBinding only – but again, we have the skeleton.
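One possible removal policy, sketched under the assumption that a subscriber should be dropped after a fixed number of consecutive failed deliveries: the FailureTracker class and its member names are hypothetical, and the threshold is a choice the original post does not make.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not in the original sample): counts consecutive
// delivery failures per subscriber and reports when a subscriber has
// failed often enough to be dropped.
public class FailureTracker
{
    private readonly object _sync = new object();
    private readonly Dictionary<string, int> _failures =
        new Dictionary<string, int>();
    private readonly int _maxFailures;

    public FailureTracker(int maxFailures)
    {
        if (maxFailures < 1) throw new ArgumentOutOfRangeException("maxFailures");
        _maxFailures = maxFailures;
    }

    // Call after a successful delivery: resets the subscriber's count.
    public void RecordSuccess(string subscriberUri)
    {
        lock (_sync) { _failures.Remove(subscriberUri); }
    }

    // Call after a failed delivery. Returns true when the subscriber has
    // reached the limit and should be unsubscribed; its count is then cleared.
    public bool RecordFailure(string subscriberUri)
    {
        lock (_sync)
        {
            int count;
            _failures.TryGetValue(subscriberUri, out count); // 0 when absent
            count++;
            if (count >= _maxFailures)
            {
                _failures.Remove(subscriberUri);
                return true;
            }
            _failures[subscriberUri] = count;
            return false;
        }
    }
}
```

The publishing loop could call RecordFailure when forwarding to a subscriber throws a CommunicationException, and remove that subscriber's registration once the method returns true.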

An important requirement is persisting the subscriptions.  In a long-running system, the pub/sub infrastructure is a lasting component which must survive restarts.  Therefore, storing the state in a Dictionary<> is not the best option.  We could either provide serialization to a durable store (like a DB) on each operation, or use .NET 3.5 Durable Services to accomplish the same goal.  I will explore making the pub/sub a durable service in a future post.
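Until then, the "serialize on each operation" option can be sketched with a plain text file standing in for the durable store.  This is only an illustration: the SubscriptionStore class, its file format, and the use of strings for endpoint addresses are all assumptions and do not come from the sample code.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper (not in the original sample): saves and loads
// subscriptions as text lines of the form "action<TAB>endpoint".
// Assumes neither value contains a tab or a line break.
public static class SubscriptionStore
{
    // Overwrites the file with one line per (action, endpoint) pair.
    public static void Save(string path,
        Dictionary<string, List<string>> subscriptions)
    {
        List<string> lines = new List<string>();
        foreach (KeyValuePair<string, List<string>> entry in subscriptions)
            foreach (string endpoint in entry.Value)
                lines.Add(entry.Key + "\t" + endpoint);
        File.WriteAllLines(path, lines.ToArray());
    }

    // Rebuilds the dictionary; a missing file yields an empty result.
    public static Dictionary<string, List<string>> Load(string path)
    {
        var result = new Dictionary<string, List<string>>();
        if (!File.Exists(path)) return result;
        foreach (string line in File.ReadAllLines(path))
        {
            string[] parts = line.Split('\t');
            if (parts.Length != 2) continue; // skip malformed lines
            List<string> endpoints;
            if (!result.TryGetValue(parts[0], out endpoints))
            {
                endpoints = new List<string>();
                result.Add(parts[0], endpoints);
            }
            endpoints.Add(parts[1]);
        }
        return result;
    }
}
```

Subscribe and Unsubscribe would call Save while still holding the lock, and the service would call Load once at startup.  A real database would additionally give transactional updates, which this sketch does not attempt.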

To wrap up: you can download the sample code from my SkyDrive.  It’s really simple to create a truly scalable enterprise-level system using out-of-the-box WCF services.  However, we can expect more from the future: ESB, ISB and of course Oslo are supposed to make the kind of infrastructure code we have just seen obsolete.

4 comments

  1. Udi Dahan, March 28, 2008 at 10:37 AM

    What you’re describing is a broker and not a bus architectural style. In the broader sense of autonomous services, each would have its own set of requirements of pub/sub – durable or not, protocol, etc. This would lead you to having one of your brokers per service.

    Thoughts?

  2. Sasha Goldshtein, March 30, 2008 at 11:16 PM

    Udi, I agree with you entirely. It has not been my intention to show a full-blown bus architecture (note that nowhere in my post did I mention the word “bus”).

    What I wanted to show is twofold: (a) You can easily leverage WCF to build out-of-the-box solutions which could be a great fit for a specific scenario (e.g. where the system does not involve hundreds of services with varying policies); (b) How a publish/subscribe approach can be realistically implemented in very few lines of code.

  3. Matthew, May 26, 2008 at 12:43 PM

    There is a problem forwarding a WS binding message due to unique security header, meaning that every time you need to recreate the message before sending it (=forwarding). And I don’t want this serialization/deserialization process to happen. Is there any solution for it? Removing unnecessary headers?

  4. needbrew, March 6, 2009 at 5:30 PM

    How do you get multiple subscribers to one message? If I have two subscribers to the same message, it errors out when you try to publish the message a second time.
