Converting Event-based Asynchronous Pattern into Task-based Asynchronous Pattern in C++ using PPL

April 23, 2013

no comments

In C# we have many ways for writing asynchronous code, we can create our own thread (a bad practice most of the times), we can use the thread-pool, we can use BackgroundWorker or we can use Tasks (and Async\Await which is based on tasks)

In order to notify the completion of an asynchronous operation we also have a few options (which depends on the type of dispatch mechanism we used).
We can raise event when the work is done, we can call a callback function, or we can use a continuation style like Task.ContinueWith() or the Async\Await pattern that does the continuation for us.

C++ can also run asynchronous operations but the way to do it is usually more complex, fortunately libraries like Boost.Asio or the STL async that was added to C++11 helps with that.

Never the less, notifying that the operation completed is still done in the old fashion way, and even std::future class doesn’t quite help because there is no continuation and trying to call the get() function will freeze the calling thread until the result is provided.

Last year Microsoft published the Parallel Patterns Library (PPL) which makes our life easier and provide continuation capabilities.

The down side? This lib is still not a standard and is not cross-platform yet, so you can only use it under a windows app. But the good news are that PPL papers were sent to the standard committee and this gives hope it will enter to the STL at some point.

Here is an example of a task with continuation in PPL

auto s = make_shared<wstring>(L"Value 1");
create_task([s] 
{
	// Print the current value.
	wcout << L"Current value: " << *s << endl;

	// Assign to a new value.
	*s = L"Value 2";
}).then([s] 
{
	// Print the current value.
	wcout << L"Current value: " << *s << endl;

	// Assign to a new value and return the string.
	*s = L"Value 3";

	return *s;
});

Did you notice the function create_task ?

This function is in “ppltasks.h” under the concurrency namespace. It will accept a lambda or function object and returns task<T>

The task<T> in c++ offers very similar methods to the Task<T> of .NET and enables continuations(then function), and join(when_all function) and choice(when_any function) patterns.

The task_completion_event

The  task_completion_event is similar to .NET TaskCompletionSource and gives us the possibility to control the task’s state (setting it to finish state) outside the task itself.

For example, lets create a task that will complete after some delay.

#include <future>
#include <ppltasks.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Creates a task that completes after the specified delay.
task<void> complete_after(unsigned int timeout)
{
	// A task completion event that is set when a timer fires.
	task_completion_event<void> tce;

	// Create a non-repeating timer.
	auto fire_once = std::make_shared<timer<int>>(timeout, 0, nullptr, false);
	// Create a call object that sets the completion event after the timer fires.
	auto callback = new call<int>([tce](int)
	{
		tce.set();
	});

	// Connect the timer to the callback and start the timer.
	fire_once->link_target(callback);
	fire_once->start();

	// Create a task that completes after the completion event is set.
	task<void> event_set(tce);

	// Create a continuation task that cleans up resources and 
	// and return that continuation task. 
	return event_set.then([callback, fire_once]()
	{
		delete callback;		
	});
}

The snippet above creates a task_completion_event and pass it to the task, what this does is tying the state of the task to the operations task_completion_event provides.

The timer that was created will launch the callback when the time is elapsed and as a consequence will set the task_completion_event which will make the task we created (event_set) to finish, so any continuation that was linked to the task will be launched only after the provided time is elapsed.

Now we can make this simple alarm clock

//wake me up in one hour
int hourInMiliseconds=60*60*1000;
complete_after(60*60*1000).then([]()
{
	cout<<"wake up!!!!!!";
});

This is a very useful way to convert the Event-based Asynchronous Pattern into a Task-based Asynchronous Pattern

C++ doesnt have a notion of events so the way to achieve event-like code is by using the Observer pattern.

With events you usually have something like this:

1) You define an interface that the observers needs to implement in order to be notified that a work is done 

struct IWorkerObserver
{
	virtual void OnWorkIsDone() = 0;
};
 

2)  The class that does the hard work needs to provide a way for the observers to register for notifications

class MyWorker
{

public:
	void Register(std::shared_ptr<IWorkerObserver> observer);
	virtual void Unregister(std::shared_ptr<IWorkerObserver> observer);

	//making an heavy work asynchronously
	void DoWorkAsync();

private:
	std::vector<std::weak_ptr<IWorkerObserver>> _observers;
};

3)  The method that does the hard work notifies to all the observers upon a successful (or unsuccessful) result

void MyWorker::DoWorkAsync()	
{
	//dispatching the work to a new thread
	std::async(std::launch::async,[=]()
	{
		//a very complex and long operation

		//notify all the observers that the work is done
		int count = _observers.size();	
		for(auto i = 0; i < count; i++) 
		{ 
			auto observer = _observers[i].lock(); 
			if(observer) 
			{
				observer->OnWorkIsDone(); 
			}
		} 
	});
}
 
 
 
 

Even with only one method and event this looks tedious.

Not only that, a specific observer only wants to get notified when the method that it invoked finished but this kind of implementation will notifiy to all observers that the work has finished and will do so every time the DoWorkAsync() method is invoke.

Now let’s look at the same thing with tasks

class MyWorkerWithTask
{

public:

	//making an heavy work asynchronously
	task<void> DoWorkAsync();
};

task<void> MyWorkerWithTask::DoWorkAsync()	
{
	task_completion_event<void> tce;
	task<void> doWorkTask(tce);
	//dispatching the work to a new thread
	std::async(std::launch::async,[=]()
	{
		//a very complex and long operation

		//notify the caller that the work is done
		tce.set();
	});
	return doWorkTask;
}

You can easily see how much simpler this version is.

The PPL include many more useful options and possibilities so you should defintly check it out and embrace it to make your code cleaner and shorter which can do a charm to your maintainability.

 
Add comment
facebook linkedin twitter email

Leave a Reply

Your email address will not be published.

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

*