Publish/Subscriber Using Routing Service (WCF4)
Hi all.
Download Code
Few weeks ago I went to an interesting lecture of Ido Flatow called: “What’s new in WCF4”. The lecture was very good mostly because it was the first time somebody gathered and organized the new abilities of WCF 4, and the material level in my opinion was pretty high. One of the features Ido talked about was the Routing Service. That feature just amazed me, I found it very powerful and useful in many scenarios: load balancing, on service failure, content base routing, bridge and more (MSDN Link).
Pub/Sub Solutions
I was at the time examining some pub/sub solutions. Discussing them can easily fill a series of posts. I will only mention few points on the subject: First there are many options available:
Second, the Pub/Sub solution I looked for have several significant requirements:
- The usage must be very simple and clear – strong typed without complicated configuration – something like the Mass Transit interface(this ruled out the last option).
- I didn’t want the MSMQ based solution (the first 3) – one of the reasons is that MSMQ has size limitation (4 Mega) – although there are ways to reduce the size limitation.
- The pub/sub host must be completely loose coupled from publishers and subscribers.
I wanted WCF based solution but I had some bad experience using the Duplex capabilities of WCF – it was causing many problems, especially when the publisher server goes down and back up.
My Pub/Sub POC
During Ido’s lecture I realized that Routing service is perfect for Pub/Sub solution. Routing Service Distribute massage to all subscribers – and we get out of the box loose coupled of Binding, Security and contract. Each subscriber is hosting one way WCF service per massage type to get notification, so my first step was to define the client interface (requirement number 1).
The interface:
public interface IPubSubClient
{ void Publish<T>(T message);
void Subscribe<T>(ref Action<T> handler);
void Unsubscribe<T>(ref Action<T> handler);
}
This is how to Configure and run the routing service:
ServiceHost pubServiceHost = new ServiceHost(typeof(RoutingService));
ConfigureRouter(pubServiceHost);
pubServiceHost.Open();
Configure Router:
var routerContract = typeof(ISimplexDatagramRouter);
string routerAddress = "net.tcp://localhost:8888/publisher";
Binding routerBinding = new NetTcpBinding();
//add the endpoint the router will use to recieve messages
serviceHost.AddServiceEndpoint(routerContract, routerBinding, routerAddress);
//create a new routing configuration object
RoutingConfiguration rc = new RoutingConfiguration();
rc.RouteOnHeadersOnly = false;
serviceHost.Description.Behaviors.Add(new RoutingBehavior(rc));
Writing the POC was quite fluent, which made me more sure that the Routing Service is really suitable in this case.
Interesting Points
The code lies in front of you, still I wanted to point out some points I think are important:
- Configure the routing service is very simple using the class RoutingConfiguration we can add/remove keys from the filter table – which holds the routing table.
- We can change the Filter table during run time by calling ApplyConfiguration:
internal void Subscribe(string messageType, string endpointAddress)
{ lock (_routingConfiguration)
{ ContractDescription cd = ContractDescription.GetContract(typeof(IRequestReplyRouter));
ServiceEndpoint servEndpoint = new ServiceEndpoint(cd, new NetTcpBinding(), new EndpointAddress(endpointAddress));
_routingConfiguration.FilterTable.Add(new MessageTypeMessageFilter(messageType),
new List<ServiceEndpoint>() { servEndpoint });
_routingHost.Extensions.Find<RoutingExtension>().ApplyConfiguration(_routingConfiguration);
}
}
- The subscribers open the the notification service channel during runtime , using the same port (although they can use other random port numbers) – in order for it to work you must enable net.tcp port sharing by going the windows services (running services.msc) and start the service “NetTcpPortSharing”.
private ServiceHost CreateNewService<T>(List<Action<T>> handlers)
{ ServiceHost pubServiceHost = new ServiceHost(new PublisherService<T>() { Handlers = handlers }); string pubAddress = string.Format( "net.tcp://{0}:7777/pub/{1}/{2}",Environment.MachineName,typeof(T).Name,Guid.NewGuid()); var portSharingBinding = new NetTcpBinding();
portSharingBinding.PortSharingEnabled = true;
pubServiceHost.AddServiceEndpoint(new ServiceEndpoint(ContractDescription.GetContract(typeof(IPublisherService<T>)), portSharingBinding, new EndpointAddress(pubAddress)));
return pubServiceHost;
}
- I’m using XPathMessageFilter for matching massage type to his subscribers.
I am printing the latency of the message (avg. of ~60 MilliSec in my computer) and I notice that even when running ~50 subscribers the latency remain the same – this lays in the Routing Service implementation that distribute the massages efficiently.
Summery
Download and run the POC (don’t forget to start the port sharing service) – run the Pub/Sub Server and as much clients as you like. Please share your comments or/and thoughts.
Cheers Offir.