Wednesday, February 4, 2015

Smart Message Dispatcher

1. Introduction

Message dispatchers are central part of event-driven systems. There are many approaches to message dispatching. Starting from simple where handlers are part of code to complex beasts with possibility of adding handlers in run-time and crazy C++14 implementation[1]. Recently I discovered really smart and elegant implementation which I would like to demonstrate here.   

2. Motivation

Suppose we have some message-driven system and want to transfer some message over transport layer. Sending is piece of cake. Receiving is not:)
We cannot write receiving code just as:
template<typename Message> Message receive()
{
 ...
}

SomeMsg msg;
msg.receive();
...

The reasons are simple. We have never known real type of received message. With a different type a different handler is associated so we must know the real message type.
Dispatching always ends with downcasting through message class hierarchy in shared memory system or adding some kind of getTypeId inside every message and therefore testing that with id from deserialized incoming message header.
As a result there is a need to create a class dispatching received message to right handler. We want to focus on case where number of handlers is relatively small so adding handlers in run-time will be unnecessary. We would like to have the ability to write following use case:
dispatcher
    .handle<messages::enter_pin>(
        [&](messages::enter_pin const& msg)
        {
            ...
        })
    .handle<messages::pin_correct>(
        [&](messages::pin_correct const& msg)
        {
            ...
        })
    .handle<messages::pin_incorrect>(
        [&](messages::pin_incorrect const& msg)
        {
            ...
        }); 

3. Solution

At the beginning I decided to show code which will be equivalent with code above and which should be actually generated after inlining dispatcher internals:
receiver.wait_on_msg();
    if (receiver.received_msg_with_type(enter_pin::id))
        dispatch<enter_pin>();
    else
    if (receiver.received_msg_with_type(pin_correct::id))
        dispatch<pin_correct>();
    else
    if (receiver.received_msg_with_type(pin_incorrect::id))
        dispatch<pin_incorrect>();
where dispatch is something like
Msg msg;
receiver.deserialize_from_buffer(buffer);
handler(msg);
OK. So the problem is well defined. How to transform multiple calls of handle with handlers in form of lambdas to nested conditionals choosing right handler looking on message id? Solution is interesting and clever and comes from excellent book "C++ Concurrency in Action" written by Anthony Williams. Implementation first, comments after.
template<typename previous_dispatcher, typename Msg, typename Func>
class template_dispatcher
{
    std::shared_ptr<remote_transport::receiver> receiver;
    previous_dispatcher* prev;
    Func handler;
    bool chained;

    template_dispatcher(template_dispatcher const&) = delete;
    template_dispatcher& operator=(template_dispatcher const&) = delete;

    template<typename Dispatcher, typename OtherMsg, typename OtherFunc> friend class template_dispatcher;

    void wait_and_dispatch()
    {
        receiver->wait_on_msg();
        dispatch();
    }

    bool dispatch()
    {
        char id = Msg::message_id();
        if (receiver->received_msg_with_type(id))
        {
            Msg msg;
            receiver->deserialize_from_buffer(msg);
            handler(msg);
            return true;
        }
        else
            return prev->dispatch();
    }

public:

    template_dispatcher(template_dispatcher&& other) :
        receiver(other.receiver),
        prev(other.prev),
        handler(std::move(other.handler)),
        chained(other.chained)
    {
        other.chained = true;
    }

    template_dispatcher(std::shared_ptr<remote_transport::receiver> preceiver,
                        previous_dispatcher* prev_, Func&& handler_) :
        receiver(preceiver),
        prev(prev_),
        handler(std::forward<Func>(handler_)),
        chained(false)
    {
        prev_->chained = true;
    }

    template<typename OtherMsg, typename OtherFunc>
    template_dispatcher<template_dispatcher, OtherMsg, OtherFunc> handle(OtherFunc&& handler_)
    {
        return template_dispatcher<template_dispatcher, OtherMsg, OtherFunc>(
            receiver, this, std::forward<OtherFunc>(handler_));
    }

    ~template_dispatcher()
    {
        if (!chained)
            wait_and_dispatch();
    }
};



class dispatcher
{
    std::shared_ptr<remote_transport::receiver> receiver;
    bool chained;

    dispatcher(dispatcher const&) = delete;
    dispatcher& operator=(dispatcher const&) = delete;

    template<typename Dispatcher, typename Msg, typename Func> friend class template_dispatcher;

    void wait_and_dispatch()
    {
        receiver->wait_on_msg();
        dispatch();
    }

    bool dispatch()
    {
        char id = last_msg::message_id();
        if (receiver->received_msg_with_type(id))
        {
            assert(false);
        }
        return false;
    }

public:
    dispatcher(dispatcher&& other) :
        receiver(other.receiver),
        chained(other.chained)
    {
        other.chained = true;
    }

    explicit dispatcher(std::shared_ptr<remote_transport::receiver> preceiver) :
        receiver(preceiver),
        chained(false)
    {
    }

    template<typename Message, typename Func>
    template_dispatcher<dispatcher, Message, Func> handle(Func&& handler)
    {
        return template_dispatcher<dispatcher, Message, Func>(
            receiver, this, std::forward<Func>(handler));
    }

    ~dispatcher()
    {
        if (!chained)
            wait_and_dispatch();
    }
};

Above message dispatcher implementation is interesting for many reasons. It's short and compact. Moreover many modern C++11 features are being involved. Let's discuss how does this code work and why it works:
  • We introduced main dispatcher class and helper class called template_dispatcher. The idea behind those two classes is very simple. We want to compare the type of received message from buffer with the type registered on last handler. If it matches then we execute deserialization and run handler. If not then we continue process comparing the type of received message with the type registered on one before last handler and so on. When we cannot match message with any handler then program is terminated by assert(false).
  • Method dispatcher::handle is the only one public method except constructors and destructor. This method responsibility is limited to creating temporary template_dispatcher object with forwarding handler. This trick let us call handle again (but now template_dispatcher::handle) in client code with different handler. Inside template_dispatcher::handle creating temporary template_dispatcher object takes place so we have come full circle.
  • Sooner or later flow in client code achieve last handler which causes a call to last template_dispatcher::handle on some temporary template_dispatcher. After that destructor must be called. This is the moment when actual dispatching is running. Finally. Only last temporary template_dispatcher is not chained so we have call to wait_and_dispatch.
  • Wait_and_dispatch of course waits for message and dispatches it to the right handler:) First step is to receive message inside receiver->wait_on_msg call. Second step is to iterate over dispatchers by prev pointers (they build one directly linked list) and testing the type of received message with the message type associated with current template_dispatcher/dispatcher. The first dispatcher is always object from dispatcher class (not template_dispatcher) so when message types doesn't match then assert(false) is called.
  • Friendship is used for access to chained field. Classes are not not copy able (proper constructors are disabled) but moveable. I'm not sure if this is important because both classes are very lightweight. Explicit keyword is used for preventing passing e.g char value as pointer to receiver.
  • perfect forwarding in dispatcher and template_dispatcher constructors by std::forward and by std::move. Using perfect forwarding here may be unnecessary because for dispatching hot spot will be I/O probably. Anyway I didn't make any measurement to prove the last statement.
Notice how versatile presented solution is. Receiver from remote_transport namespace may pull messages from remote machine by network or may work on the same machine as sender using IPC or even ITC (inter-thread communication) mechanism. We may choose the kind of transport layer we want and dispatcher will handle this in proper way. Another interesting thing about dispatchers is that they usually cooperate with objects like handlers or event queues. Our dispatcher may be part of greater pattern like e.g. Proactor. That's quite common pattern which lets e.g handle many network data streams without explicit concurrency in simple way. 
If you are interested you can take a look on Boost Asio internals which was implemented with Proactor pattern in mind[2].

[1] see slides (in polish, but for code doesn't matter): http://cppwroclaw.pl/dokuwiki/_media/spotkania/007/03_dyspozytor_w_cpp11.pdf
[2] http://www.boost.org/doc/libs/1_47_0/doc/html/boost_asio/overview/core/async.html

No comments:

Post a Comment