Skip to content

Commit

Permalink
set the lazy attribute as an attribute of update_numprocs
Browse files Browse the repository at this point in the history
  • Loading branch information
julien6387 committed Apr 10, 2024
1 parent 7a9a7b3 commit f87b9c6
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 98 deletions.
6 changes: 3 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
The command is available through the new XML-RPCs `test_start_application` and `test_start_process` and have been
added to `supervisorctl`.

* Add a new XML-RPC `lazy_update_numprocs`, whose behaviour differ from `update_numprocs` in a way that the obsolete
processes will be deleted from the Supervisor configuration when they are stopped (exit, crash or later user request)
instead of being forced to stop immediately when requesting the numprocs to decrease.
* Add a `lazy` attribute to the `update_numprocs` XML-RPC, so that when set combined to a numprocs decrease,
**Supvisors** defers the obsolete processes deletion from the Supervisor configuration only when the processes stop
(exit, crash or later user request) instead of stopping them immediately.

* Add monotonic time in internal model and exchanges to cope with time updates while **Supvisors** is running.
Impact on the XML-RPC `get_instance_info`, `get_process_info` and on the event interface for instance status
Expand Down
4 changes: 1 addition & 3 deletions docs/xml_rpc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,14 @@ Process Control

.. automethod:: test_start_process(strategy, namespec)

.. automethod:: update_numprocs(program_name, numprocs, wait=True)
.. automethod:: update_numprocs(program_name, numprocs, wait=True, lazy=False)

.. hint::

This XML-RPC is the implementation of the following |Supervisor| request:

* `#177 - Dynamic numproc change <https://github.com/Supervisor/supervisor/issues/177>`_

.. automethod:: lazy_update_numprocs(program_name, numprocs)

.. automethod:: enable(program_name, wait=True)

.. hint::
Expand Down
34 changes: 5 additions & 29 deletions supvisors/rpcinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,16 +822,19 @@ def onwait() -> RPCInterface.OnWaitReturnType:
return onwait # deferred
return True

def update_numprocs(self, program_name: str, numprocs: int, wait: bool = True) -> WaitReturnType:
def update_numprocs(self, program_name: str, numprocs: int, wait: bool = True, lazy: bool = False) -> WaitReturnType:
""" Update dynamically the numprocs of the program.
Implementation of Supervisor issue #177 - Dynamic numproc change.
When the number of processes decreases, the processes in excess are stopped.
When the number of processes decreases:
- the processes in excess are immediately stopped if lazy is False ;
- the processes in excess are kept in Supervisor as long as they're still running if lazy is True.
:param str program_name: the program name, as found in the section of the Supervisor configuration files.
Programs, FastCGI programs and event listeners are supported.
:param int numprocs: the new numprocs value (must be strictly positive).
:param bool wait: if ``True`` and the numprocs value decreases, wait for the processes in excess to be stopped.
:param bool lazy: if ``True``, use the lazy mode when decreasing the program numprocs.
:return: always ``True`` unless error.
:rtype: bool
:raises RPCError: with code:
Expand All @@ -841,33 +844,6 @@ def update_numprocs(self, program_name: str, numprocs: int, wait: bool = True) -
``SupvisorsFaults.NOT_APPLICABLE`` if the program configuration does not support numprocs ;
``Faults.STILL_RUNNING`` if one process corresponding to ``program_name`` cannot be stopped.
"""
return self._update_numprocs(program_name, numprocs, wait, False)

def lazy_update_numprocs(self, program_name: str, numprocs: int) -> WaitReturnType:
""" Update dynamically the numprocs of the program.
Alternative implementation of Supervisor issue #177 - Dynamic numproc change.
When the number of processes decreases, the processes in excess are kept in Supervisor as long as they're still
running.
:param str program_name: the program name, as found in the section of the Supervisor configuration files.
Programs, FastCGI programs and event listeners are supported.
:param int numprocs: the new numprocs value (must be strictly positive).
:return: always ``True`` unless error.
:rtype: bool
:raises RPCError: with code:
``SupvisorsFaults.BAD_SUPVISORS_STATE`` if **Supvisors** is not in state ``OPERATION`` ;
``Faults.BAD_NAME`` if ``program_name`` is unknown to **Supvisors** ;
``Faults.INCORRECT_PARAMETERS`` if ``numprocs`` is not a strictly positive integer ;
``SupvisorsFaults.NOT_APPLICABLE`` if the program configuration does not support numprocs.
"""
return self._update_numprocs(program_name, numprocs, False, True)

def _update_numprocs(self, program_name: str, numprocs: int, wait: bool, lazy: bool) -> WaitReturnType:
""" Update dynamically the numprocs of the program.
In lazy mode, the obsolete processes are not terminated directly. They will be removed from Supervisor once
they will be stopped, unless they are re-activated again following an increase of numprocs.
"""
self.logger.trace(f'RPCInterface.update_numprocs: program={program_name} numprocs={numprocs}')
self._check_operating()
# test that program_name is known to the ServerOptions
Expand Down
2 changes: 1 addition & 1 deletion supvisors/supvisorsctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ def do_lazy_update_numprocs(self, arg):
self.help_lazy_update_numprocs()
return
try:
result = self.supvisors().lazy_update_numprocs(args[0], value)
result = self.supvisors().update_numprocs(args[0], value, True, True)
except xmlrpclib.Fault as e:
self.ctl.output(f'ERROR ({e.faultString})')
self.ctl.exitstatus = LSBInitExitStatuses.GENERIC
Expand Down
93 changes: 33 additions & 60 deletions supvisors/tests/test_rpcinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1146,33 +1146,6 @@ def test_restart_process_wait(mocker, rpc):
check_restart_deferred_function(mocker, rpc, process_1, result)


def test_update_numprocs(mocker, rpc):
""" Test the update_numprocs RPC. """
# get patches
mocked_check = mocker.patch.object(rpc, '_update_numprocs')
# test RPC calls
rpc.update_numprocs('dummy_program', 1)
assert mocked_check.call_args_list == [call('dummy_program', 1, True, False)]
mocker.resetall()
rpc.update_numprocs('dummy_program', 2, True)
assert mocked_check.call_args_list == [call('dummy_program', 2, True, False)]
mocker.resetall()
rpc.update_numprocs('dummy_program', 3, False)
assert mocked_check.call_args_list == [call('dummy_program', 3, False, False)]


def test_lazy_update_numprocs(mocker, rpc):
""" Test the lazy_update_numprocs RPC. """
# get patches
mocked_check = mocker.patch.object(rpc, '_update_numprocs')
# test RPC calls
rpc.lazy_update_numprocs('dummy_program', 1)
assert mocked_check.call_args_list == [call('dummy_program', 1, False, True)]
mocker.resetall()
rpc.lazy_update_numprocs('dummy_program', 2)
assert mocked_check.call_args_list == [call('dummy_program', 2, False, True)]


def test_update_numprocs_unknown_program(mocker, rpc):
""" Test the update_numprocs RPC with unknown program. """
# get patches
Expand Down Expand Up @@ -1201,7 +1174,7 @@ def test_update_numprocs_invalid_numprocs(mocker, rpc):
# test RPC call with known program and invalid numprocs value (not integer)
rpc.supvisors.server_options.program_configs = {'dummy_program': {}}
with pytest.raises(RPCError) as exc:
rpc._update_numprocs('dummy_program', 'one', False, False)
rpc.update_numprocs('dummy_program', 'one', False, False)
assert exc.value.args == (Faults.INCORRECT_PARAMETERS,
'program=dummy_program incorrect numprocs=one - integer > 0 expected')
assert mocked_check.call_args_list == [call()]
Expand All @@ -1220,7 +1193,7 @@ def test_update_numprocs_incorrect_numprocs(mocker, rpc):
# test RPC call with known program and invalid numprocs value (not integer)
rpc.supvisors.server_options.program_configs = {'dummy_program': {}}
with pytest.raises(RPCError) as exc:
rpc._update_numprocs('dummy_program', 0, False, False)
rpc.update_numprocs('dummy_program', 0, False, False)
assert exc.value.args == (Faults.INCORRECT_PARAMETERS,
'program=dummy_program incorrect numprocs=0 - integer > 0 expected')
assert mocked_check.call_args_list == [call()]
Expand All @@ -1240,7 +1213,7 @@ def test_update_numprocs_wrong_config(mocker, rpc):
rpc.supvisors.server_options.program_configs = {'dummy_program': {}}
mocked_numprocs.side_effect = ValueError('program_num missing in process_name')
with pytest.raises(RPCError) as exc:
rpc._update_numprocs('dummy_program', 2, False, False)
rpc.update_numprocs('dummy_program', 2, False, False)
assert exc.value.args == (SupvisorsFaults.NOT_APPLICABLE.value,
'numprocs not applicable to program=dummy_program')
assert mocked_check.call_args_list == [call()]
Expand All @@ -1259,7 +1232,7 @@ def test_update_numprocs_unchanged(mocker, rpc):
# test RPC call with known program, correct numprocs value and numprocs increase (nothing to stop)
rpc.supvisors.server_options.program_configs = {'dummy_program': {}}
mocked_numprocs.return_value = [], []
assert rpc._update_numprocs('dummy_program', 2, False, False) is True
assert rpc.update_numprocs('dummy_program', 2, False, False) is True
assert mocked_check.call_args_list == [call()]
assert mocked_numprocs.call_args_list == [call('dummy_program', 2)]
assert not mocked_insert.called
Expand All @@ -1277,7 +1250,7 @@ def test_update_numprocs_increase(mocker, rpc):
# test RPC call with known program, correct numprocs value and numprocs increase (nothing to stop)
rpc.supvisors.server_options.program_configs = {'dummy_program': {}}
mocked_numprocs.return_value = ['dummy_program_01', 'dummy_program_02'], None
assert rpc._update_numprocs('dummy_program', 2, False, False)
assert rpc.update_numprocs('dummy_program', 2, False, False)
assert mocked_check.call_args_list == [call()]
assert mocked_numprocs.call_args_list == [call('dummy_program', 2)]
assert mocked_insert.call_args_list == [call(['dummy_program_01', 'dummy_program_02'])]
Expand All @@ -1295,7 +1268,7 @@ def test_update_numprocs_decrease(mocker, rpc):
# test RPC call with known program, correct numprocs value and numprocs decrease
rpc.supvisors.server_options.program_configs = {'dummy_program': {}}
mocked_numprocs.return_value = None, ['dummy_program_01', 'dummy_program_02']
assert rpc._update_numprocs('dummy_program', 2, True, False) is decrease_mock
assert rpc.update_numprocs('dummy_program', 2, True, False) is decrease_mock
assert mocked_check.call_args_list == [call()]
assert mocked_numprocs.call_args_list == [call('dummy_program', 2)]
assert not mocked_increase.called
Expand All @@ -1312,27 +1285,27 @@ def test_update_numprocs_decrease_lazy(mocker, rpc):
# test RPC call with known program, correct numprocs value and numprocs decrease
rpc.supvisors.server_options.program_configs = {'dummy_program': {}}
mocked_numprocs.return_value = None, ['dummy_program_01', 'dummy_program_02']
assert rpc._update_numprocs('dummy_program', 2, True, True)
assert rpc.update_numprocs('dummy_program', 2, True, True)
assert mocked_check.call_args_list == [call()]
assert mocked_numprocs.call_args_list == [call('dummy_program', 2)]
assert not mocked_increase.called
assert not mocked_decrease.called


def test_decrease_numprocs_no_stop(mocker, rpc):
def test_decrease_numprocs_no_stop(mocker, supvisors, rpc):
""" Test the RPCInterface._decrease_numprocs method.
This test case deals with a context where the processes to remove are already stopped. """
# get patches
mocked_check = mocker.patch.object(rpc, '_check_process_deletion')
mocked_stop = rpc.supvisors.stopper.stop_process
mocked_next = rpc.supvisors.stopper.next
mocked_progress = rpc.supvisors.stopper.in_progress
local_identifier = rpc.supvisors.mapper.local_identifier
mocked_stop = supvisors.stopper.stop_process
mocked_next = supvisors.stopper.next
mocked_progress = supvisors.stopper.in_progress
local_identifier = supvisors.mapper.local_identifier
process_1 = Mock(namespec='process_1', info_map={local_identifier: {}}, **{'running_on.return_value': False})
process_2 = Mock(namespec='process_2', info_map={local_identifier: {}}, **{'running_on.return_value': False})
process_3 = Mock(namespec='process_3', info_map={local_identifier: {}}, **{'running_on.return_value': False})
get_map = {'process_1': process_1, 'process_2': process_2, 'process_3': process_3}
mocker.patch.object(rpc.supvisors.context, 'get_process', side_effect=lambda x: get_map[x])
mocker.patch.object(supvisors.context, 'get_process', side_effect=lambda x: get_map[x])
mocked_progress.return_value = False
# 1. test RPC call with known program, correct numprocs value and numprocs decrease (no process to stop) / no wait
params = ['process_1', 'process_2', 'process_3']
Expand All @@ -1355,20 +1328,20 @@ def test_decrease_numprocs_no_stop(mocker, rpc):
assert mocked_check.called


def test_decrease_numprocs_stop(mocker, rpc):
def test_decrease_numprocs_stop(mocker, supvisors, rpc):
""" Test the RPCInterface._decrease_numprocs method.
This test case deals with a context where the processes to remove have to be stopped. """
# get patches
mocked_check = mocker.patch.object(rpc, '_check_process_deletion')
mocked_stop = rpc.supvisors.stopper.stop_process
mocked_next = rpc.supvisors.stopper.next
mocked_progress = rpc.supvisors.stopper.in_progress
local_identifier = rpc.supvisors.mapper.local_identifier
mocked_stop = supvisors.stopper.stop_process
mocked_next = supvisors.stopper.next
mocked_progress = supvisors.stopper.in_progress
local_identifier = supvisors.mapper.local_identifier
process_1 = Mock(namespec='process_1', info_map={local_identifier: {}}, **{'running_on.return_value': True})
process_2 = Mock(namespec='process_2', info_map={local_identifier: {}}, **{'running_on.return_value': True})
process_3 = Mock(namespec='process_3', info_map={local_identifier: {}}, **{'running_on.return_value': False})
get_map = {'process_1': process_1, 'process_2': process_2, 'process_3': process_3}
mocker.patch.object(rpc.supvisors.context, 'get_process', side_effect=lambda x: get_map[x])
mocker.patch.object(supvisors.context, 'get_process', side_effect=lambda x: get_map[x])
mocked_progress.return_value = True
# test RPC call with known program, correct numprocs value and numprocs decrease (one process to stop) / wait
params = ['process_1', 'process_2', 'process_3']
Expand All @@ -1395,16 +1368,16 @@ def test_decrease_numprocs_stop(mocker, rpc):
assert not mocked_check.call_args_list == [call(['process_2', 'process_3'])]


def test_check_process_insertion(mocker, rpc):
def test_check_process_insertion(mocker, supvisors, rpc):
""" Test the RPCInterface._check_process_insertion method. """
# get patches
local_identifier = rpc.supvisors.mapper.local_identifier
local_identifier = supvisors.mapper.local_identifier
process_1 = Mock(namespec='process_1', info_map={})
process_2 = Mock(namespec='process_2', info_map={})
get_map = {'process_1': process_1, 'process_2': process_2}
mocker.patch.object(rpc.supvisors.context, 'get_process', side_effect=lambda x: get_map[x])
mocker.patch.object(supvisors.context, 'get_process', side_effect=lambda x: get_map[x])
# test with processes not added yet
params = 'process_1', 'process_2'
params = ['process_1', 'process_2']
with pytest.raises(RPCError) as exc:
rpc._check_process_insertion(params)
assert exc.value.args == (Faults.FAILED, "processes=['process_1', 'process_2']")
Expand All @@ -1418,16 +1391,16 @@ def test_check_process_insertion(mocker, rpc):
rpc._check_process_insertion(params)


def test_check_process_deletion(mocker, rpc):
def test_check_process_deletion(mocker, supvisors, rpc):
""" Test the RPCInterface._check_process_deletion method. """
# get patches
local_identifier = rpc.supvisors.mapper.local_identifier
local_identifier = supvisors.mapper.local_identifier
process_1 = Mock(namespec='process_1', info_map={local_identifier: {}})
process_2 = Mock(namespec='process_2', info_map={local_identifier: {}})
get_map = {'process_1': process_1, 'process_2': process_2}
mocket_get = mocker.patch.object(rpc.supvisors.context, 'get_process', side_effect=lambda x: get_map[x])
mocket_get = mocker.patch.object(supvisors.context, 'get_process', side_effect=lambda x: get_map[x])
# test with processes not removed yet
params = 'process_1', 'process_2'
params = ['process_1', 'process_2']
with pytest.raises(RPCError) as exc:
rpc._check_process_deletion(params)
assert exc.value.args == (Faults.FAILED, "processes=['process_1', 'process_2']")
Expand All @@ -1444,27 +1417,27 @@ def test_check_process_deletion(mocker, rpc):
rpc._check_process_deletion(params)


def test_enable_unknown_program(mocker, rpc):
def test_enable_unknown_program(mocker, supvisors, rpc):
""" Test the enable RPC with unknown program. """
# get patches
mocked_check = mocker.patch.object(rpc, '_check_operating')
mocked_enable = mocker.patch.object(rpc.supvisors.supervisor_updater, 'enable_program')
mocked_enable = mocker.patch.object(supvisors.supervisor_updater, 'enable_program')
# test RPC call with unknown program
rpc.supvisors.server_options.program_configs = {}
supvisors.server_options.program_configs = {}
with pytest.raises(RPCError) as exc:
rpc.enable('dummy_program')
assert exc.value.args == (Faults.BAD_NAME, 'program=dummy_program unknown to Supvisors')
assert mocked_check.call_args_list == [call()]
assert not mocked_enable.called


def test_enable_no_wait(mocker, rpc):
def test_enable_no_wait(mocker, supvisors, rpc):
""" Test the enable RPC / no wait. """
# get patches
mocked_check = mocker.patch.object(rpc, '_check_operating')
mocked_enable = mocker.patch.object(rpc.supvisors.supervisor_updater, 'enable_program')
mocked_enable = mocker.patch.object(supvisors.supervisor_updater, 'enable_program')
# test RPC call with unknown program
rpc.supvisors.server_options.program_configs = {'dummy_program': {}}
supvisors.server_options.program_configs = {'dummy_program': {}}
assert rpc.enable('dummy_program', False) is True
assert mocked_check.call_args_list == [call()]
assert mocked_enable.call_args_list == [call('dummy_program')]
Expand Down
4 changes: 2 additions & 2 deletions supvisors/tests/test_supvisorsctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,9 +814,9 @@ def test_lazy_update_numprocs(controller, plugin, mocked_check):
assert mocked_check.call_args_list == [call()]
mocked_check.reset_mock()
# test help and request
mocked_rpc = plugin.supvisors().lazy_update_numprocs
mocked_rpc = plugin.supvisors().update_numprocs
_check_call(controller, mocked_check, mocked_rpc, plugin.help_lazy_update_numprocs, plugin.do_lazy_update_numprocs,
'dummy_process 2', [call('dummy_process', 2)])
'dummy_process 2', [call('dummy_process', 2, True, True)])


def test_enable(controller, plugin, mocked_check):
Expand Down

0 comments on commit f87b9c6

Please sign in to comment.