ReactiveQueue Example: Increasing Throughput for Stateless WCF services
Code for this post is available here
As promised, here is an example of how to use the ReactiveQueue<T> from RxContrib. Imagine that you have a stateless WCF service that needs to handle a large number of client requests, such as a distributed logging service. Clients need to send their messages as quickly as possible and then be on their way; it is up to the service to do something with those messages afterwards. Assuming we have such a logging service, we can settle two things at this point: it's a singleton (i.e., InstanceContextMode = InstanceContextMode.Single in the ServiceBehavior attribute), and since requests are independent of each other, we do not need to maintain any shared mutable state. This last point suggests that the service can be single-threaded (add ConcurrencyMode = ConcurrencyMode.Single to the ServiceBehavior attribute).
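To make the two ServiceBehavior settings concrete, here is a minimal sketch of a plain service that uses them. The ILoggingService contract, the LoggingService class, and the Log(string) operation are hypothetical stand-ins for the logging example (the sample's real interface lives in the Common project); only the attribute properties themselves come from the text above.

```csharp
using System;
using System.ServiceModel;

// Hypothetical contract for the logging example; the sample's actual
// interface may differ.
[ServiceContract]
public interface ILoggingService
{
    [OperationContract]
    void Log(string message);
}

// The two ServiceBehavior settings discussed above.
[ServiceBehavior(
    InstanceContextMode = InstanceContextMode.Single, // one instance serves every client
    ConcurrencyMode = ConcurrencyMode.Single)]        // at most one call runs at a time
public class LoggingService : ILoggingService
{
    public void Log(string message)
    {
        // Work is done inline: for a request/reply operation the caller
        // gets its reply only after this method returns.
        Console.WriteLine(message);
    }
}
```

With this configuration every call is serialized through a single instance, which is exactly what makes slow requests hold up everything queued behind them.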
The Sample Code
The sample code included with this post contains 4 projects:
- Client – A simple WCF client for accessing our service. Nothing special to say.
- Common – This project contains our service interface and the request objects used by the server. I chose to encapsulate each processing task carried out by the server in a separate object, since the service itself does not need to know the specifics of each operation. For this example, we have a ShortRequest, which returns immediately (after printing some diagnostic output), and a LongRequest, which takes ~3 seconds to execute (you may think of these as a write request and some kind of analytic processing request, respectively). The client sends the server a randomly mixed batch of 100 requests.
- ReactiveQueueServer and WCFServer – discussed below.
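The request-object idea from the Common project can be sketched roughly as follows. RequestBase is the type name used later in the post; the Execute method, the Id property, the optional LongRequest duration, and the RequestFactory helper are assumptions made for illustration, not the sample's actual code.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of the request types. Each request carries its own
// processing logic, so the service can run it without knowing what it does.
public abstract class RequestBase
{
    public int Id { get; }

    protected RequestBase(int id) { Id = id; }

    public abstract void Execute();
}

public sealed class ShortRequest : RequestBase
{
    public ShortRequest(int id) : base(id) { }

    // Returns immediately after printing some diagnostic output.
    public override void Execute() =>
        Console.WriteLine($"Short request {Id} on thread {Environment.CurrentManagedThreadId}");
}

public sealed class LongRequest : RequestBase
{
    private readonly TimeSpan _duration;

    // Defaults to roughly the 3 seconds mentioned in the post.
    public LongRequest(int id, TimeSpan? duration = null) : base(id)
    {
        _duration = duration ?? TimeSpan.FromSeconds(3);
    }

    public override void Execute()
    {
        Thread.Sleep(_duration); // stand-in for slow analytic work
        Console.WriteLine($"Long request {Id} on thread {Environment.CurrentManagedThreadId}");
    }
}

// Hypothetical helper: builds a randomly mixed batch like the sample client's.
public static class RequestFactory
{
    public static RequestBase[] CreateBatch(int count, Random rng)
    {
        var batch = new RequestBase[count];
        for (int i = 0; i < count; i++)
            batch[i] = rng.Next(2) == 0 ? (RequestBase)new ShortRequest(i) : new LongRequest(i);
        return batch;
    }
}
```

Because the service only ever sees RequestBase, adding a new kind of request does not require touching the service itself.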
Using the Plain WCF Server
If we apply the two attribute settings above, we do indeed get a singleton, single-threaded service. Of course, this means each client message is handled in sequence, so the client does not finish until all requests have been fully processed. On my machine this looks like this:
In fact, if I leave the binding parameters at their defaults, the client eventually times out, because the service stays busy for so long.
Obviously, what we would like is to receive client requests as quickly as possible and free the service for the next request. We can do this by having the service merely add each request to an in-memory queue instead of running it immediately. We are then free to dequeue the requests at our leisure while the service stays responsive. This is a variation on a pattern known as Half-Sync/Half-Reactive.
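Stripped of Rx and WCF, the "enqueue now, process later" idea can be sketched with nothing but the BCL. The QueueingService class and its Submit method are hypothetical names; this illustrates the pattern and is not the RxContrib ReactiveQueue API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Minimal sketch: the receiving side only enqueues, and a single
// background consumer drains the queue in arrival order.
public sealed class QueueingService : IDisposable
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly Task _worker;

    public QueueingService()
    {
        _worker = Task.Factory.StartNew(() =>
        {
            // Blocks waiting for items until CompleteAdding is called.
            foreach (var work in _queue.GetConsumingEnumerable())
                work();
        }, TaskCreationOptions.LongRunning);
    }

    // What a WCF operation would call: returns as soon as the item is queued.
    public void Submit(Action work) => _queue.Add(work);

    // Stop accepting work and wait for the backlog to drain.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Wait();
        _queue.Dispose();
    }
}
```

Submit returns immediately no matter how slow the queued work is, which is what keeps the service responsive.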
Using the ReactiveQueue Server
The ReactiveQueue server has exactly the same WCF configuration as above: it's a singleton and single-threaded. However, in this implementation we use a ReactiveQueue instance to queue requests and subscribe an IObserver instance to it (for simplicity, the ServiceImpl class itself implements IObserver). The ReactiveQueue is then responsible for dequeuing the requests on a separate thread and calling the observer's OnNext method; our implementation of OnNext performs the actual execution of the request.
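The observer wiring might look roughly like the sketch below. ObservableWorkQueue<T> is a hypothetical stand-in for a queue that implements IObservable<T> and pushes items to its observer from a background task; its Enqueue, Complete, and Completion members are assumptions, not RxContrib's actual ReactiveQueue members. Messages are plain strings to keep the sketch self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical queue that delivers dequeued items to a single IObserver<T>.
public sealed class ObservableWorkQueue<T> : IObservable<T>
{
    private readonly BlockingCollection<T> _items = new BlockingCollection<T>();
    private Task _pump = Task.CompletedTask;
    private bool _subscribed;

    public Task Completion => _pump;

    // Called on the receiving thread; returns immediately.
    public void Enqueue(T item) => _items.Add(item);

    // Stop accepting items; the pump drains the rest, then calls OnCompleted.
    public void Complete() => _items.CompleteAdding();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));
        if (_subscribed) throw new InvalidOperationException("This sketch supports a single observer.");
        _subscribed = true;
        _pump = Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (var item in _items.GetConsumingEnumerable())
                    observer.OnNext(item); // delivered off the caller's thread
                observer.OnCompleted();
            }
            catch (Exception ex)
            {
                observer.OnError(ex); // an exception in OnNext ends delivery
            }
        }, TaskCreationOptions.LongRunning);
        return new Completer(_items);
    }

    // Disposing the subscription simply completes the queue in this sketch.
    private sealed class Completer : IDisposable
    {
        private readonly BlockingCollection<T> _items;
        public Completer(BlockingCollection<T> items) { _items = items; }
        public void Dispose() => _items.CompleteAdding();
    }
}

// As in the post, the service class itself is the observer.
public sealed class ServiceImpl : IObserver<string>
{
    private readonly ObservableWorkQueue<string> _queue = new ObservableWorkQueue<string>();

    public ConcurrentQueue<string> Processed { get; } = new ConcurrentQueue<string>();

    public ServiceImpl() { _queue.Subscribe(this); }

    // Body of the (hypothetical) WCF operation: enqueue and return.
    public void Log(string message) => _queue.Enqueue(message);

    // Runs on the pump's background task; the real work would go here.
    public void OnNext(string message) => Processed.Enqueue(message);

    public void OnError(Exception error) => Console.Error.WriteLine(error);

    public void OnCompleted() { }

    // Drain the backlog, e.g. when the host shuts down.
    public void Shutdown()
    {
        _queue.Complete();
        _queue.Completion.Wait();
    }
}
```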
One of the options that the ReactiveQueue gives us is the ability to decide how we’d like the OnNext method to be called. Note line 16 in the ServiceImpl class of the ReactiveQueueServer project:
```csharp
_requestQueue =
    ReactiveQueue<RequestBase>.CreateConcurrentQueue(
        ConcurrentPublicationBehavior.Async);
```
The ConcurrentPublicationBehavior.Async value tells the ReactiveQueue to run each invocation of the OnNext method in a separate TPL task. Running the client with the ReactiveQueue server rather than the plain server gives us this:
Here each request is handled in its own task (that is, on a thread from the CLR thread pool), and the tail of the batch consists only of long requests. This makes sense: the tasks running short requests have all finished by then.
Coming back to our hypothetical logging server, though, this way of doing things does not fit our needs. We'd still like the service to be available for further requests as soon as possible (i.e., the queue must stay), but the requests must be processed in the order in which they arrived. This is easy to do: simply change the ConcurrentPublicationBehavior value from Async to Sync. This makes the ReactiveQueue use a single task for all OnNext invocations, so we get the required ordering back. Note, however, that this task runs on a thread distinct from the one on which ServiceImpl receives messages, so we still get high throughput.
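The behavioral difference between the two settings can be illustrated with a small simulation. PublicationMode and PublicationDemo are hypothetical names that only mirror the idea behind ConcurrentPublicationBehavior; this is not the RxContrib type, and Task.Delay stands in for the work a request's OnNext handler would do.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical enum mirroring the two behaviors discussed in the post.
public enum PublicationMode { Async, Sync }

public static class PublicationDemo
{
    // Processes (Id, DelayMs) pairs and returns the IDs in completion order.
    public static int[] Run(PublicationMode mode, (int Id, int DelayMs)[] requests)
    {
        var finished = new ConcurrentQueue<int>();

        async Task Handle((int Id, int DelayMs) r)
        {
            await Task.Delay(r.DelayMs); // stand-in for executing the request
            finished.Enqueue(r.Id);
        }

        if (mode == PublicationMode.Async)
        {
            // One task per item: short items can overtake long ones.
            Task.WaitAll(requests.Select(r => Task.Run(() => Handle(r))).ToArray());
        }
        else
        {
            // One task for all items: each finishes before the next starts.
            Task.Run(async () =>
            {
                foreach (var r in requests)
                    await Handle(r);
            }).Wait();
        }

        return finished.ToArray();
    }
}
```

The trade-off is head-of-line blocking: in Sync mode a slow request delays every request behind it, which is acceptable for an ordered log but would not be for independent analytic jobs.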
Summary
In this post we saw how to use the ReactiveQueue class to implement a simple, high-throughput WCF service using the Half-Sync/Half-Reactive pattern. The service has a simple and intuitive programming model in which no thread synchronization is necessary, and the actual processing is encapsulated in stand-alone request objects. Applying this pattern to your own services should be fairly easy.