One Use Case for TPL

TPL is a set of libraries from Microsoft that can be used for parallel processing, basically it is a set of high level tools that can make your code run in parallel. TPL is not a panacea for parallel processing, mainly because you don’t always need parallel processing, but when you need it, it can be a life-saver.

The system

The system was built with a simple purpose, to consume and process events delivered by Kafka. The events were published, in Avro format, by a separate system, let’s call it producer. Essentially the events represent a course of actions, which are used to produce our model. Eventually that model is publicly exposed through an API (not HTTP). Everything was built using C# on dotnet core and ran on a docker container on linux hosts.

Kafka is a distributed messaging system, but the software industry is using it in many different ways. In this case Kafka was used as a messaging platform, where every message (i.e. event) was stored indefinitely. Avro is a data serialisation system, which was used to serialise the events that were transmitted through Kafka.

The issue

At the time the system was built there was only one avro library in C#, so the only choice was to use Microsoft’s implementation. The implementation is single threaded, so you can process one message at a time. On average one message took 150ms to deserialise, thus the system’s deserialisation throughput is ~7 events per second. If you have to deserialise one billion events you need a lot of time. So, it’s safe to assume that the deserialiser is slow speed-wise. It’s important to state that speed is not the only aspect to judge a deserialiser, but in this case it was the bottleneck.

So, the throughput of the system was low and had to be increased, which is not that hard considering the power of modern machines.

The solution

Multi-core machines to the rescue, they are cheap, they are everywhere and high level libraries make it easy to utilise them. Instead of running a single threaded deserialiser the system fires up 6 deserialisers, which process events in parallel.

Why 6 deserializers?

The system ran on an 8 core machine, so one core was used for serving our API, one core for the thread that read from Kafka and the rest for the deserialisers. A few tests were performed where we used more than 6 deserialisers which made the system run slower and be unresponsive at times of high load. It is safe to assume that a machine with more cores, can run more serialisers, but test it to be sure.

namespace Project
{
    public class ParallelDeserializer
    {
        private TransformBlock<KeyValuePair<int, byte[]>, object> processor;
        private boolean run = true;

        public ParallelDeserializer()
        {
            processor = new TransformBlock<KeyValuePair<int, byte[]>, object>(x => DoSomeWork(x), new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 10000,
                MaxDegreeOfParallelism = 6
            });
        }

        // Give data to the deserializer, this is called by the consumer
        public void PushMessage(int index, byte[] avro)
        {
            processor.SendAsync(new KeyValuePair<int, byte[]>(index, avro)).Wait();
        }

        public async Task StartAsync()
        {
            while (run)
            {
                object msg = await processor.ReceiveAsync();
                // Notify someone here that a message has been deserialized
            }
        }

        public void Stop()
        {
            run = false;
        }

        private object DoSomeWork(KeyValuePair<int, byte[]> input)
        {
            AvroDeserializer<object> deserializer = new AvroDeserializer<object>();
            object deserialized = null;

            try
            {
                deserialized = deserializer.Deserialize(input.Value);
            }
            catch
            {
                Console.WriteLine("An error occurred during deserialization");
            }
            return deserialized;
        }
    }
}

Why TransformBlock

It’s obvious. When receiving binary data, the system needed to deserialise, i.e. transform into something more meaningful. Together with that, TransformBlock provides guarantees on the order, so re-ordering is not needed.

When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore messages might not be processed in the order in which they are received. The order in which the messages are output from the block is, however, the same one in which they are received. [last link]

Why would you need re-ordering? Simply because every event takes different amount of time to deserialise. Consider the following case where the order is not kept:

                         → [ deserialise func ] → | e2 |
        | d1 | d2 | d3 | → [ deserialise func ] → | e1 | → | e2 | e1 | e3 |
                         → [ deserialise func ] → | e3 |

This is incorrect because the order is not kept. On our case the order of the events is very important so it must be kept at all times. Hence we used TransformBlock to guarantee that, without going into the hustle of re-ordering at the end.

Conclusion

TPL is an amazing library, which can help you solve issues that require parallel execution, easily. Instead of using TransformBlock someone could have written its own multi-core deserialiser, which might have been the obvious idea, but that would have added an extra cost on the maintenance of the project. Try to keep it simple, do not over-engineer and don’t reinvent the wheel.

References

NOTE: originally posted on medium: https://medium.com/@ipinak/one-use-case-for-tpl-8e7d0fd7dc5f