Different types of data blocks in TPL Dataflow (TDF), part 3

July 18, 2012

Hey.

In the last 2 posts I talked about the different kinds of data blocks in the new TPL Dataflow library, which is part of the new async concept in Microsoft .NET Framework 4.5. In the first post I talked about the execution data blocks (Link), and in the second post I talked about the buffering data blocks (Link). If you haven't read them, I encourage you to go and read them now before you continue reading this post.

In this post I would like to explain the third data block group: the joining blocks. The purpose of the data blocks in this group is to combine messages from 2 or more data sources into 1 message. For now, the built-in data blocks can group together 2 or 3 data sources, but you can write your own data block that groups more than 3 data sources. Like the buffering block group, the data blocks in this group do not change the inner messages; they simply collect them and join them into a single message. Let’s jump into the first data block.

First block: BatchBlock<T>.

This is a simple block that collects a predefined number of messages into a single batch message. The predefined number of messages, the batch size, is set in the constructor of the batch data block and cannot be changed after creation. The data block stores the messages posted to it until the number of stored messages reaches the batch size, and then a new message containing all of them is propagated out of the batch block. If the number of messages never reaches the batch size, the messages stay in the batch block indefinitely. To prevent this, the batch block has a method called ‘TriggerBatch’ that creates a batch with fewer messages than the batch size declared in the constructor. Let’s create a batch block:

var batchBlock = new BatchBlock<BasketballPlayer>(batchSize: 5);

I have created a batch block that receives and stores items of class ‘BasketballPlayer’, with a batch size of 5. This means that for every 5 items of type ‘BasketballPlayer’, the batch block creates a new batch message containing an array of 5 ‘BasketballPlayer’ objects.

For example, let’s say I would like to create basketball teams with 5 players in each team. I will create an action block that creates a team out of 5 basketball players:

var basketballTeamCreator = new ActionBlock<BasketballPlayer[]>(players =>
{
    // Code for creating a basketball team goes here.
});

Now I will link those 2 data blocks together:

batchBlock.LinkTo(basketballTeamCreator);

From now on, every time the number of players posted into the batch block reaches 5, a new message is created with an array of 5 ‘BasketballPlayer’ items, and only then is a message propagated into the ‘basketballTeamCreator’ action data block.

Now I can post ‘BasketballPlayer’ items into the batch block I have created, and check the size of the array that the ‘basketballTeamCreator’ action block receives as its input parameter.

for (int i = 0; i < 23; i++)
{
    batchBlock.Post<BasketballPlayer>(new BasketballPlayer(
        "Player" + i.ToString()));
}

Note that our code will only create 4 basketball teams (5 players in each team). The last 3 ‘BasketballPlayer’ items will be kept in the batch block until more items are posted into it. In order to create a batch message with fewer items than our predefined batch size, we need to trigger it from our code:

batchBlock.TriggerBatch();

This creates a new message with a batch of 3, containing the last remaining ‘BasketballPlayer’ items in our batch block, and empties the block.
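
If you want to run the batch example end to end, here is a minimal sketch. It rests on a few assumptions of mine that are not part of the code above: plain strings stand in for the ‘BasketballPlayer’ class, the program uses C# top-level statements, the ‘teamSizes’ list is a hypothetical helper for observing the batches, and the link uses ‘PropagateCompletion’ only so the program can wait for the action block to finish.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Player names (strings) stand in for the BasketballPlayer class of this post.
var batchBlock = new BatchBlock<string>(batchSize: 5);

// Hypothetical helper: remembers the size of every batch that arrives.
var teamSizes = new List<int>();
var basketballTeamCreator = new ActionBlock<string[]>(players =>
{
    teamSizes.Add(players.Length);
});

// PropagateCompletion lets us wait on the action block after completing the batch block.
batchBlock.LinkTo(basketballTeamCreator,
    new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 23; i++)
{
    batchBlock.Post("Player" + i);
}

// Flush the 3 leftover players as a smaller batch.
batchBlock.TriggerBatch();

batchBlock.Complete();
basketballTeamCreator.Completion.Wait();

Console.WriteLine(string.Join(", ", teamSizes)); // prints "5, 5, 5, 5, 3"
```

The first four batches are full teams, and the fifth is the smaller batch produced by ‘TriggerBatch’.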

Second block: JoinBlock<T1,T2,..>.

This block is one of my personal favorites. It receives messages from multiple data sources and creates a Tuple that represents a join of one message from each data source. If you have no idea what a Tuple class is, read about it here. Because it has multiple data sources, you can’t simply post a message into it. For each input type, the join data block offers a target property that you can post messages to: for the first input type a property called ‘Target1’, for the second input type a property called ‘Target2’, and so on (I promise it will be much clearer in the examples). Let’s create one.

var joinRequests = new JoinBlock<BasketballPlayer, UniformNumber>();

I have created a new join block that gets 2 types of items: the first is of type ‘BasketballPlayer’ and the second is of type ‘UniformNumber’. I would like to give each basketball player a uniform number in the team.

var giveUniformNumberToPlayers = new TransformBlock<Tuple<
    BasketballPlayer, UniformNumber>, BasketballPlayer>(b =>
{
    b.Item1.UniformNumber = b.Item2.Number;
    return b.Item1;
});

Now I have created a transform block that gets a Tuple of ‘BasketballPlayer’ and ‘UniformNumber’ and sets the uniform number on the basketball player from the ‘Number’ property of the ‘UniformNumber’ class. Now let’s link those 2 data blocks.

joinRequests.LinkTo(giveUniformNumberToPlayers);

Now every time a message is created in the join data block ‘joinRequests’, it will be propagated into the transform data block ‘giveUniformNumberToPlayers’. To post messages into the join data block, I do it like this:

joinRequests.Target1.Post<BasketballPlayer>(new BasketballPlayer("John"));
joinRequests.Target1.Post<BasketballPlayer>(new BasketballPlayer("Adam"));
joinRequests.Target1.Post<BasketballPlayer>(new BasketballPlayer("Alex"));
joinRequests.Target1.Post<BasketballPlayer>(new BasketballPlayer("Danny"));
joinRequests.Target2.Post<UniformNumber>(new UniformNumber(4));
joinRequests.Target2.Post<UniformNumber>(new UniformNumber(5));

As you can see, I have posted 4 messages into the first target property ‘Target1’, and I can only post messages of type ‘BasketballPlayer’ there, as it was declared as the first type of the join data block. I also posted 2 messages into the second target property ‘Target2’, which can only get messages of type ‘UniformNumber’.

So, how many messages were propagated into the transform block ‘giveUniformNumberToPlayers’? The answer is only 2 messages: the pair BasketballPlayer 'John' and UniformNumber '4', and the pair BasketballPlayer 'Adam' and UniformNumber '5'.

So what happens to the messages posted into ‘Target1’, BasketballPlayer "Alex" and BasketballPlayer "Danny"? They stay in the join data block until another UniformNumber is posted into ‘Target2’, and then, and only then, they are pulled out, inserted into a Tuple and propagated into the transform block ‘giveUniformNumberToPlayers’.
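
The pairing behavior can be checked with a small sketch. The assumptions here are mine: a string and an int stand in for ‘BasketballPlayer’ and ‘UniformNumber’, an action block replaces the transform block so the pairs can be recorded in a hypothetical ‘pairs’ list, and the program uses C# top-level statements. The join block is completed at the end only so that the program can exit.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// A name and a number stand in for BasketballPlayer and UniformNumber.
var joinRequests = new JoinBlock<string, int>();

// Hypothetical helper: remembers every pair the join block produces.
var pairs = new List<string>();
var pairRecorder = new ActionBlock<Tuple<string, int>>(pair =>
{
    pairs.Add(pair.Item1 + " #" + pair.Item2);
});

joinRequests.LinkTo(pairRecorder,
    new DataflowLinkOptions { PropagateCompletion = true });

// Four players, but only two uniform numbers.
joinRequests.Target1.Post("John");
joinRequests.Target1.Post("Adam");
joinRequests.Target1.Post("Alex");
joinRequests.Target1.Post("Danny");
joinRequests.Target2.Post(4);
joinRequests.Target2.Post(5);

// No more uniform numbers will arrive, so Alex and Danny are never paired.
joinRequests.Complete();
pairRecorder.Completion.Wait();

Console.WriteLine(string.Join(", ", pairs)); // prints "John #4, Adam #5"
```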

Third block: BatchedJoinBlock<T1,T2,..>.

This data block is a combination of the two blocks I talked about earlier, the batch data block and the join data block. Like the batch data block, it batches messages into a single message, and the number of messages batched together is set in the batched join data block constructor. Unlike the batch data block, which has only one data source, the batched join data block has multiple data sources (like the join block). Like the join block, for each input type the batched join block offers a target property that you can post messages to. And again like the join block, the batched join data block creates a Tuple, this time holding one list of batched messages per data source.

The big difference from the join data block, and an important thing to know about the batched join data block, is that a new message is created as soon as the batch size is reached, regardless of how many messages came from each data source. For example, if I create a batched join data block with a batch size of 3 and 2 data sources, and I post messages only into the first data source, then after I post 3 messages the batched join data block will create a message and propagate it forward. Unlike the join data block, the batched join data block does not wait until messages arrive from all data sources. Let’s see the code:

var batchedJoinBlock = new BatchedJoinBlock<DraftedPlayers,
    UnDraftedPlayers, OverseasPlayers>(batchSize:3);

I have created a batched join data block that has three data sources, of types ‘DraftedPlayers’, ‘UnDraftedPlayers’ and ‘OverseasPlayers’, with the batch size set to 3. Now let’s create a transform many data block:

var joinAllBasketballPlayers = new TransformManyBlock<Tuple<IList<
    DraftedPlayers>, IList<UnDraftedPlayers>, 
    IList<OverseasPlayers>>, BasketballPlayer>(b =>
{
    var basketballPlayersList = new List<BasketballPlayer>();
    basketballPlayersList.AddRange(b.Item1);
    basketballPlayersList.AddRange(b.Item2);
    basketballPlayersList.AddRange(b.Item3);
    return basketballPlayersList;
});

I have created a transform many data block that gets a Tuple containing 3 lists (a DraftedPlayers list, an UnDraftedPlayers list and an OverseasPlayers list) and creates one list with all of the basketball players. The types 'DraftedPlayers', 'UnDraftedPlayers' and 'OverseasPlayers' inherit from the class 'BasketballPlayer'. Now I can link this transform many data block to the batched join data block I created earlier.

batchedJoinBlock.LinkTo(joinAllBasketballPlayers);

Now every time I post 3 messages, regardless of their types (DraftedPlayers, UnDraftedPlayers or OverseasPlayers), a Tuple will be created and propagated to the ‘joinAllBasketballPlayers’ transform many data block.

batchedJoinBlock.Target1.Post<DraftedPlayers>(new DraftedPlayers());
batchedJoinBlock.Target2.Post<UnDraftedPlayers>(new UnDraftedPlayers());
batchedJoinBlock.Target3.Post<OverseasPlayers>(new OverseasPlayers());
// A new message will be propagated into the
// joinAllBasketballPlayers transform many data block.
batchedJoinBlock.Target1.Post<DraftedPlayers>(new DraftedPlayers());
batchedJoinBlock.Target2.Post<UnDraftedPlayers>(new UnDraftedPlayers());
batchedJoinBlock.Target1.Post<DraftedPlayers>(new DraftedPlayers());
// A new message will be propagated into the joinAllBasketballPlayers
// transform many data block.
batchedJoinBlock.Target1.Post<DraftedPlayers>(new DraftedPlayers());
batchedJoinBlock.Target2.Post<UnDraftedPlayers>(new UnDraftedPlayers());
// The batched join data block will store those messages until
// another message is posted into any of the data sources.
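
To see how the messages are split across the lists of each batch, here is a minimal sketch under my own assumptions: strings stand in for the three player classes, an action block replaces the transform many block so each batch can be recorded in a hypothetical ‘batchShapes’ list, and the program uses C# top-level statements. It posts only the first six messages from the example above, so nothing is left waiting in the block when it is completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Strings stand in for DraftedPlayers, UnDraftedPlayers and OverseasPlayers.
var batchedJoinBlock = new BatchedJoinBlock<string, string, string>(batchSize: 3);

// Hypothetical helper: remembers how many items each list of a batch holds.
var batchShapes = new List<string>();
var batchRecorder = new ActionBlock<
    Tuple<IList<string>, IList<string>, IList<string>>>(batch =>
{
    batchShapes.Add(batch.Item1.Count + "/" + batch.Item2.Count + "/" + batch.Item3.Count);
});

batchedJoinBlock.LinkTo(batchRecorder,
    new DataflowLinkOptions { PropagateCompletion = true });

// First batch: one message per target.
batchedJoinBlock.Target1.Post("drafted-1");
batchedJoinBlock.Target2.Post("undrafted-1");
batchedJoinBlock.Target3.Post("overseas-1");

// Second batch: two drafted, one undrafted, no overseas players.
batchedJoinBlock.Target1.Post("drafted-2");
batchedJoinBlock.Target2.Post("undrafted-2");
batchedJoinBlock.Target1.Post("drafted-3");

batchedJoinBlock.Complete();
batchRecorder.Completion.Wait();

Console.WriteLine(string.Join(", ", batchShapes)); // prints "1/1/1, 2/1/0"
```

The second batch shows the point made above: the third list is empty, but the batch is still propagated because 3 messages arrived in total.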

This was the last post about the different types of data blocks in TDF. I haven't talked about everything and there is much more to learn, but I wanted to give you a taste of TDF and get you started. I think Stephen Toub and the rest of the async team did a great job, and async and TDF will be a major part of the development process in the near future. If you have any more questions about TDF, you can leave them here in the comments or send me an email and I will try to answer.

For your convenience, I have put all the code from this post together: code that collects basketball players, gives a uniform number to each player and creates basketball teams. Enjoy.

var batchRequests = new BatchBlock<BasketballPlayer>(batchSize: 5);
var basketballTeamCreator = new ActionBlock<BasketballPlayer[]>(players =>
{
    for (int i = 0; i < players.Length; i++)
    {
        // Code for creating a basketball team goes here.
    }
});

batchRequests.LinkTo(basketballTeamCreator);

var joinBlock = new JoinBlock<BasketballPlayer, UniformNumber>();
var giveUniformNumberToPlayers = new TransformBlock<Tuple<
    BasketballPlayer, UniformNumber>, BasketballPlayer>(b =>
{
    b.Item1.UniformNumber = b.Item2.Number;
    return b.Item1;
});

joinBlock.LinkTo(giveUniformNumberToPlayers);
giveUniformNumberToPlayers.LinkTo(batchRequests);

var batchedJoinBlock = new BatchedJoinBlock<DraftedPlayers, 
    UnDraftedPlayers, OverseasPlayers>(batchSize:3);
var joinAllBasketballPlayers = new TransformManyBlock<Tuple<IList<
    DraftedPlayers>, IList<UnDraftedPlayers>, 
    IList<OverseasPlayers>>, BasketballPlayer>(b =>
{
    var basketballPlayersList = new List<BasketballPlayer>();
    basketballPlayersList.AddRange(b.Item1);
    basketballPlayersList.AddRange(b.Item2);
    basketballPlayersList.AddRange(b.Item3);
    return basketballPlayersList;
});


batchedJoinBlock.LinkTo(joinAllBasketballPlayers);
joinAllBasketballPlayers.LinkTo(joinBlock.Target1);

batchedJoinBlock.Target1.Post<DraftedPlayers>(new DraftedPlayers());
batchedJoinBlock.Target2.Post<UnDraftedPlayers>(new UnDraftedPlayers());
batchedJoinBlock.Target3.Post<OverseasPlayers>(new OverseasPlayers());
batchedJoinBlock.Target1.Post<DraftedPlayers>(new DraftedPlayers());
batchedJoinBlock.Target2.Post<UnDraftedPlayers>(new UnDraftedPlayers());
batchedJoinBlock.Target1.Post<DraftedPlayers>(new DraftedPlayers());
batchedJoinBlock.Target1.Post<DraftedPlayers>(new DraftedPlayers());
batchedJoinBlock.Target2.Post<UnDraftedPlayers>(new UnDraftedPlayers());
batchedJoinBlock.Target3.Post<OverseasPlayers>(new OverseasPlayers());
batchedJoinBlock.Target1.Post<DraftedPlayers>(new DraftedPlayers());
batchedJoinBlock.Target2.Post<UnDraftedPlayers>(new UnDraftedPlayers());
batchedJoinBlock.Target3.Post<OverseasPlayers>(new OverseasPlayers());

joinBlock.Target2.Post<UniformNumber>(new UniformNumber(4));
joinBlock.Target2.Post<UniformNumber>(new UniformNumber(5));
joinBlock.Target2.Post<UniformNumber>(new UniformNumber(6));
joinBlock.Target2.Post<UniformNumber>(new UniformNumber(7));
joinBlock.Target2.Post<UniformNumber>(new UniformNumber(8));
joinBlock.Target2.Post<UniformNumber>(new UniformNumber(9));
joinBlock.Target2.Post<UniformNumber>(new UniformNumber(10));
joinBlock.Target2.Post<UniformNumber>(new UniformNumber(11));
joinBlock.Target2.Post<UniformNumber>(new UniformNumber(12));
joinBlock.Target2.Post<UniformNumber>(new UniformNumber(13));

2 comments

  1. icons archive, September 24, 2012 at 10:45

    I am sorry, that has interfered… This situation is familiar To me. Write here or in PM.

    hpixel

  2. Elliot, December 14, 2012 at 5:55

    Thx! This type of clear explanation is just what is needed for datablocks.