SO 5.6 Tutorials TrickyThreadPool DeviceManager
In this section, we'll discuss the key points of the implementation of the a_device_manager_t agent. Other details can be found in the source code of that agent.
The agent should simulate work with many devices, but it should also be a stateless object: it must not change its SObjectizer-related state or the values of its fields while it works. Because a_device_manager_t is stateless, its event handlers can be thread-safe.
But if a_device_manager_t doesn't change its state, how does it determine what should happen with a device? Should a device be initialized, reinitialized, or read?
It's simple: all the necessary information is carried inside the messages that a_device_manager_t sends to itself.
At the start, the device manager sends N messages of type init_device_t to itself. When an init_device_t instance arrives, the device manager creates a "device" (an object of type device_t) and initializes it. A pointer to that object is then sent in a perform_io_t message. It looks like this:
    void on_init_device(mhood_t<init_device_t> cmd) const {
        // Update the stats for that op.
        handle_msg_delay(a_dashboard_t::op_type_t::init, *cmd);
        // A new device should be created.
        auto dev = std::make_unique<device_t>(cmd->id_,
                calculate_io_period(),
                calculate_io_ops_before_reinit(),
                calculate_reinits_before_recreate());
        // Imitate a pause related to the device initialization.
        std::this_thread::sleep_for(args_.device_init_time_);
        // Send a message for the first IO-op on that device.
        send_perform_io_msg(std::move(dev));
    }
When the device manager receives a perform_io_t message, it simulates an IO-operation for the device carried in the message and decrements the device's counter of remaining IO-operations. If that counter reaches zero, the device manager sends either reinit_device_t (if the counter of reinitializations is not zero yet) or init_device_t (if there are no reinitialization attempts left). This simple logic reflects the behavior of real devices, which can hang and require reinitialization, or break after some time of use and have to be replaced with new ones.
If the counter of IO-operations is not zero yet, the device manager sends a new perform_io_t message to itself.
In code, it looks like this:
    void on_perform_io(mutable_mhood_t<perform_io_t> cmd) const {
        // Update the stats for that op.
        handle_msg_delay(a_dashboard_t::op_type_t::io_op, *cmd);
        // Simulate a pause for IO-op.
        std::this_thread::sleep_for(args_.io_op_time_);
        // The remaining count of IO-ops should be decremented.
        cmd->device_->remaining_io_ops_ -= 1;
        // Maybe it is time to reinit or recreate the device?
        if(0 == cmd->device_->remaining_io_ops_) {
            if(0 == cmd->device_->remaining_reinits_)
                // The device should be recreated. Using the same ID.
                so_5::send<init_device_t>(*this, cmd->device_->id_);
            else
                // There are remaining reinit attempts.
                so_5::send<so_5::mutable_msg<reinit_device_t>>(*this, std::move(cmd->device_));
        }
        else
            // It isn't time for reinit yet. Continue IO-operations.
            send_perform_io_msg(std::move(cmd->device_));
    }
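The branching in on_perform_io can also be read as a pure function of the two counters carried by the device. The following is a minimal standalone sketch without SObjectizer. The names device_counters_t, next_step_t, next_step_after_io, apply_reinit and io_ops_until_recreate are hypothetical and introduced only for illustration. The bookkeeping done on reinitialization (restoring the IO counter and decrementing the reinit counter) is an assumption, because that handler is not shown here.

```cpp
#include <cassert>

// Hypothetical stand-in for the counters kept inside a device.
struct device_counters_t {
    unsigned remaining_io_ops_;
    unsigned remaining_reinits_;
};

// Which message the manager would send to itself next.
enum class next_step_t { perform_io, reinit_device, init_device };

// Mirrors the branching in on_perform_io: one IO-op is consumed,
// then the counters decide which message follows.
inline next_step_t next_step_after_io(device_counters_t & dev) {
    assert(dev.remaining_io_ops_ > 0); // an IO-op needs a non-zero budget
    dev.remaining_io_ops_ -= 1;
    if(0 == dev.remaining_io_ops_) {
        return 0 == dev.remaining_reinits_
            ? next_step_t::init_device     // recreate the device
            : next_step_t::reinit_device;  // reinitialize it
    }
    return next_step_t::perform_io;        // continue IO-operations
}

// Assumption: a reinit restores the IO budget and uses up one attempt.
inline void apply_reinit(device_counters_t & dev, unsigned io_ops_before_reinit) {
    assert(dev.remaining_reinits_ > 0);
    dev.remaining_io_ops_ = io_ops_before_reinit;
    dev.remaining_reinits_ -= 1;
}

// Counts how many IO-ops one device performs before it has to be recreated.
inline unsigned io_ops_until_recreate(unsigned io_ops_before_reinit, unsigned reinits) {
    device_counters_t dev{io_ops_before_reinit, reinits};
    unsigned performed = 0;
    for(;;) {
        const next_step_t step = next_step_after_io(dev);
        ++performed;
        if(next_step_t::init_device == step)
            return performed;
        if(next_step_t::reinit_device == step)
            apply_reinit(dev, io_ops_before_reinit);
    }
}
```

Under these assumptions, a device with 3 IO-ops per initialization and 2 reinit attempts performs 9 IO-ops before it is recreated, so io_ops_until_recreate(3, 2) returns 9.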
That is all of the main logic, but a few details are worth discussing further.
The first action in the handlers of init_device_t, reinit_device_t and perform_io_t is a line like this:
    handle_msg_delay(a_dashboard_t::op_type_t::init, *cmd);
This line delivers information about how long the message waited to the a_dashboard_t agent. That information is used to build the resulting statistics.
In principle, precise information about the waiting time of a message can't be gathered without deep integration into SObjectizer's internals. With such integration, we could record the time at which a message is stored in the message queue and the time at which it is extracted. But that would be overkill for this simple simulation. So we use a much simpler approach: we store the expected arrival time inside the message. For example, if we send a message delayed by 250ms, we expect it to arrive at the moment (Tc+250ms), where Tc is the current time. If the message arrives at (Tc+350ms), then it spent 100ms in the queue.
Obviously, this isn't precise, but it is good enough for our simulation.
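The expected-arrival-time approach can be sketched with std::chrono alone. This is a minimal illustration, not the code of handle_msg_delay. The names expected_time_msg_t, make_delayed and queue_delay are hypothetical, and clamping an early arrival to zero is an assumption of this sketch.

```cpp
#include <cassert>
#include <chrono>

using clock_type = std::chrono::steady_clock;

// Hypothetical message part: the moment at which the message
// is expected to reach its handler.
struct expected_time_msg_t {
    clock_type::time_point expected_time_;
};

// Builds a message that is about to be sent with the given delay.
inline expected_time_msg_t make_delayed(
        clock_type::time_point now,
        std::chrono::milliseconds delay) {
    assert(delay.count() >= 0);
    return expected_time_msg_t{
        now + std::chrono::duration_cast<clock_type::duration>(delay) };
}

// Time spent waiting beyond the expected arrival moment.
// Assumption: an early arrival counts as zero delay.
inline std::chrono::milliseconds queue_delay(
        const expected_time_msg_t & msg,
        clock_type::time_point arrival) {
    if(arrival <= msg.expected_time_)
        return std::chrono::milliseconds{0};
    return std::chrono::duration_cast<std::chrono::milliseconds>(
            arrival - msg.expected_time_);
}
```

For a message created at Tc with a 250ms delay, queue_delay returns 100ms when the arrival moment is Tc+350ms, which matches the example in the text.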
The handlers of init_device_t, reinit_device_t and perform_io_t messages contain calls to std::this_thread::sleep_for. Those calls simulate synchronous interaction with a device.
The durations of those blocking calls can be specified on the command line. By default, the following values are used: 1250ms for init_device_t and 50ms for perform_io_t. The value for reinit_device_t is calculated as 2/3 of the init_device_t duration (833ms by default).
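As a quick check of the default quoted above: two thirds of 1250ms, truncated to whole milliseconds, is 833ms. The helper name reinit_time is hypothetical, and the order of operations (multiply first, then divide) is an assumption of this sketch; the actual program may compute the value differently.

```cpp
#include <cassert>
#include <chrono>

// Two thirds of the initialization time, truncated to whole milliseconds.
inline std::chrono::milliseconds reinit_time(std::chrono::milliseconds init_time) {
    assert(init_time.count() >= 0);
    return init_time * 2 / 3;
}
```

With the default initialization time, reinit_time(std::chrono::milliseconds{1250}) gives 833ms.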
Perhaps the most interesting part of a_device_manager_t is how it limits a device's lifetime. A device instance is created dynamically while an init_device_t message is handled. After that, the device object should stay alive as long as there are reinitialization attempts left. When no reinitialization attempts are left, the device object should be destroyed.
At the same time, the device manager should be a stateless object. That means we can't keep live objects in a container (like std::map or std::set) inside a_device_manager_t.
A trick solves that problem: a unique_ptr is transferred in reinit_device_t and perform_io_t messages. When we handle a reinit_device_t or perform_io_t message and want to send the next message for that device, we simply move the unique_ptr from the old message to the new one. But if the device is no longer used and we don't send new messages for it, the device instance is destroyed automatically, because the handled message is destroyed and the unique_ptr inside it is destroyed too.
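The lifetime trick can be reproduced without SObjectizer. The sketch below assumes a simplified model in which a "message" is a plain struct that is passed by value. The names tracked_device_t, io_msg_t, handle and destroyed_after_chain are hypothetical and exist only in this illustration.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical device that reports its destruction through a counter.
struct tracked_device_t {
    tracked_device_t(int & destroyed_counter, unsigned io_ops)
        : destroyed_counter_{destroyed_counter}, remaining_io_ops_{io_ops} {}
    ~tracked_device_t() { ++destroyed_counter_; }
    tracked_device_t(const tracked_device_t &) = delete;
    tracked_device_t & operator=(const tracked_device_t &) = delete;

    int & destroyed_counter_;
    unsigned remaining_io_ops_;
};

// Hypothetical message that owns the device it refers to.
struct io_msg_t {
    std::unique_ptr<tracked_device_t> device_;
};

// Handles one message. If the device still has work to do, ownership moves
// into the follow-up message. Otherwise an empty message is returned and the
// device is destroyed together with the handled message.
inline io_msg_t handle(io_msg_t msg) {
    assert(msg.device_ && msg.device_->remaining_io_ops_ > 0);
    msg.device_->remaining_io_ops_ -= 1;
    if(msg.device_->remaining_io_ops_ > 0)
        return io_msg_t{ std::move(msg.device_) };
    return io_msg_t{};
}

// Runs one device through its chain of messages and returns how many
// devices had been destroyed by the time the chain ended.
inline int destroyed_after_chain(unsigned io_ops) {
    int destroyed = 0;
    io_msg_t msg{ std::make_unique<tracked_device_t>(destroyed, io_ops) };
    while(msg.device_) {
        // The device is still travelling in a message, so it must be alive.
        assert(0 == destroyed);
        msg = handle(std::move(msg));
    }
    return destroyed;
}
```

No container of live devices is needed: the device is destroyed exactly once, at the point where no follow-up message takes ownership of it.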
But messages in SObjectizer are usually sent as immutable objects and should not be modified. This is because SObjectizer implements the Pub/Sub model, and it is impossible to say how many receivers are subscribed to a message. Maybe a dozen, maybe a hundred, maybe a thousand. Some subscribers may handle the message at the same time on different worker threads. Therefore modification of a message should be prohibited, and that is why messages are passed to event handlers via const references.
However, there are cases when a message is sent to exactly one receiver and that receiver wants to modify the message. That is the case in our example, where we want to take the unique_ptr away from the old perform_io_t message and pass it to a new reinit_device_t message.
Support for mutable messages was added to SObjectizer for those cases. Mutable messages are marked in a special way. SObjectizer checks the destination of a mutable message: a mutable message can't be sent to a multi-consumer mbox, so it can be delivered to only one receiver. A mutable message is passed to an event handler by non-const reference, which allows the handler to modify the received message.
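Why a const reference is not enough for this use case can be shown in plain C++, independently of SObjectizer. In the sketch below, sketch_device_t, sketch_perform_io_t and take_device are hypothetical simplified stand-ins, not the types from the example's source code.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical, simplified stand-ins for the device and the message.
struct sketch_device_t { int id_; };
struct sketch_perform_io_t { std::unique_ptr<sketch_device_t> device_; };

using device_ptr_t = std::unique_ptr<sketch_device_t>;

// Through a const reference, std::move yields `const device_ptr_t &&`,
// and a unique_ptr can't be constructed from that.
static_assert(
    !std::is_constructible<device_ptr_t, const device_ptr_t &&>::value,
    "ownership can't be taken from an immutable message");

// Through a non-const reference, the move is allowed.
static_assert(
    std::is_constructible<device_ptr_t, device_ptr_t &&>::value,
    "ownership can be taken from a mutable message");

// Handler-like function for a mutable message: takes the device away,
// leaving the message without a device.
inline device_ptr_t take_device(sketch_perform_io_t & msg) {
    assert(msg.device_);
    return std::move(msg.device_);
}
```

A version of take_device that accepted `const sketch_perform_io_t &` would not compile, which is the reason the handler has to receive the message as mutable.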
We can see the traces of mutable messages in the a_device_manager_t code. For example, the following signature says that it is an event handler for a mutable message:
    void on_perform_io(mutable_mhood_t<perform_io_t> cmd) const
And this code says that an instance of a mutable message is sent:
    so_5::send<so_5::mutable_msg<reinit_device_t>>(*this, std::move(cmd->device_));