def processes_guess(factor: int):
    """Translate a parallelization factor into a worker count

    The result is meant to be handed to something like
    *concurrent.futures.ProcessPoolExecutor*:

    >>> parallelization_factor = -4
    >>> processes = commonl.processes_guess(parallelization_factor)
    >>> concurrent.futures.ProcessPoolExecutor(processes)

    :param int factor: parallelization factor:

      - positive: absolute number of workers to use, returned
        unmodified; use *1* to serialize.

      - 0: CPU intensive workload; if the environment variable
        *THREADS_GUESS* is set to a non-zero integer, that value is
        taken as the factor instead (and interpreted with these same
        rules); otherwise the count of CPUs in the system is
        returned.

      - < 0: IO intensive workload, where *-factor* IO operations
        can be run per CPU; returns *-factor* times the count of
        CPUs in the system.

    :returns: number of workers to use
    """
    if factor == 0:
        # the caller expressed no preference, so let the environment
        # override the default; int() raises ValueError if the
        # variable is set to something that is not an integer
        factor = int(os.environ.get("THREADS_GUESS", 0))

    if factor > 0:
        # absolute count requested, nothing to compute
        return factor

    cpus = multiprocessing.cpu_count()
    # zero: one worker per CPU (CPU intensive workload)
    # negative: the absolute value is how many IO operations we can
    # run in parallel for each CPU
    return cpus if factor == 0 else -factor * cpus
server names are chosen. - :param bool serialize: (optional, default *False*) if calls to - each server need to be run in a single thread or can be run in - parallel (default). + :param int parallelization_factor: (optional, default -4, run + four operations per processor) number of threads to use to + parallelize the operation; use *1* to serialize. :param bool traces: (optional, default *True*) if log messages for exceptions shall include stack traces. """ - if serialize: - threads = 1 - else: - threads = len(servers) - + processes = min( + len(servers), + commonl.processes_guess(parallelization_factor)) results = {} - if threads == 0: + if processes == 0: return results - with concurrent.futures.ProcessPoolExecutor(threads) as executor: + with concurrent.futures.ProcessPoolExecutor(processes) as executor: futures = { # for each server, queue a thread that will call # _fn, who will call fn taking care of exceptions @@ -164,6 +163,7 @@ def run_fn_on_each_server(servers: dict, fn: callable, *args, return results + def subsystem_setup(*args, **kwargs): """ Initialize the server management system in a synchronous way diff --git a/tcfl/ui_cli.py b/tcfl/ui_cli.py index e2e917ed..71b35c0e 100644 --- a/tcfl/ui_cli.py +++ b/tcfl/ui_cli.py @@ -67,9 +67,14 @@ def args_targetspec_add( help = "Consider also disabled targets") if targetspec_n != 1: ap.add_argument( - "--serialize", action = "store_true", default = False, + "--serialize", + action = "store_const", dest = "parellization_factor", default = 1, help = "Serialize (don't parallelize) the operation on" " multiple targets") + ap.add_argument( + "--parallelization-factor", + action = "store", type = int, default = -4, + help = "(advanced) parallelization factor") if isinstance(targetspec_n, bool): if targetspec_n: nargs = "+" diff --git a/tcfl/ui_cli_servers.py b/tcfl/ui_cli_servers.py index df97ee7f..44ff9b7a 100644 --- a/tcfl/ui_cli_servers.py +++ b/tcfl/ui_cli_servers.py @@ -60,7 +60,8 @@ def 
_cmdline_servers(cli_args: argparse.Namespace): r = tcfl.servers.run_fn_on_each_server( servers, _logged_in_username, - serialize = cli_args.serialize, traces = cli_args.traces) + parallelization_factor = cli_args.parallelization_factor, + traces = cli_args.traces) # r now is a dict keyed by server_name of tuples usernames, # exception for server_name, ( username, _e ) in r.items(): diff --git a/tcfl/ui_cli_users.py b/tcfl/ui_cli_users.py index ff1f20c8..76fa6ed1 100644 --- a/tcfl/ui_cli_users.py +++ b/tcfl/ui_cli_users.py @@ -177,7 +177,8 @@ def _cmdline_login(cli_args: argparse.Namespace): r = tcfl.servers.run_fn_on_each_server( servers, _login, credentials, - serialize = cli_args.serialize, traces = cli_args.traces) + parallelization_factor = cli_args.parallelization_factor, + traces = cli_args.traces) # r now is a dict keyed by server_name of tuples usernames, # exception logged_count = 0 @@ -216,7 +217,8 @@ def _cmdline_logout(cli_args: argparse.Namespace): r = tcfl.servers.run_fn_on_each_server( servers, _logout, cli_args, - serialize = cli_args.serialize, traces = cli_args.traces) + parallelization_factor = cli_args.parallelization_factor, + traces = cli_args.traces) # r now is a dict keyed by server_name of tuples usernames, # exception for server_name, ( _, e ) in r.items(): @@ -258,7 +260,8 @@ def _cmdline_role_gain(cli_args: argparse.Namespace): tcfl.servers.run_fn_on_each_server( servers, _user_role, cli_args, "gain", - serialize = cli_args.serialize, traces = cli_args.traces) + parallelization_factor = cli_args.parallelization_factor, + traces = cli_args.traces) def _cmdline_role_drop(cli_args: argparse.Namespace): @@ -269,7 +272,8 @@ def _cmdline_role_drop(cli_args: argparse.Namespace): tcfl.servers.run_fn_on_each_server( servers, _user_role, cli_args, "drop", - serialize = cli_args.serialize, traces = cli_args.traces) + parallelization_factor = cli_args.parallelization_factor, + traces = cli_args.traces) @@ -290,7 +294,8 @@ def 
_cmdline_user_list(cli_args: argparse.Namespace): result = tcfl.servers.run_fn_on_each_server( tcfl.server_c.servers, _user_list, cli_args, - serialize = cli_args.serialize, traces = cli_args.traces) + parallelization_factor = cli_args.parallelization_factor, + traces = cli_args.traces) # so now result is a dictionary of SERVER: ( DATA, EXCEPTION ), # where DATA is dictionaries of USERNAME: USERDATA @@ -399,9 +404,14 @@ def cmdline_setup(arg_subparser): "AKA is the short name of the server (defaults to the sole " "host name, without the domain). Find it with 'tcf servers'") ap.add_argument( - "--serialize", action = "store_true", default = False, + "--serialize", + action = "store_const", dest = "parellization_factor", default = 1, help = "Serialize (don't parallelize) the operation on" - " multiple servers") + " multiple targets") + ap.add_argument( + "--parallelization-factor", + action = "store", type = int, default = -4, + help = "(advanced) parallelization factor") ap.add_argument( "username", nargs = '?', metavar = "USERNAME", action = "store", default = None, @@ -434,9 +444,14 @@ def cmdline_setup_advanced(arg_subparser): help = "ID of user whose role is to be dropped" " (optional, defaults to yourself)") ap.add_argument( - "--serialize", action = "store_true", default = False, + "--serialize", + action = "store_const", dest = "parellization_factor", default = 1, help = "Serialize (don't parallelize) the operation on" - " multiple servers") + " multiple targets") + ap.add_argument( + "--parallelization-factor", + action = "store", type = int, default = -4, + help = "(advanced) parallelization factor") ap.add_argument( "role", action = "store", help = "Role to gain") @@ -452,9 +467,14 @@ def cmdline_setup_advanced(arg_subparser): help = "ID of user whose role is to be dropped" " (optional, defaults to yourself)") ap.add_argument( - "--serialize", action = "store_true", default = False, + "--serialize", + action = "store_const", dest = "parellization_factor", 
default = 1, help = "Serialize (don't parallelize) the operation on" - " multiple servers") + " multiple targets") + ap.add_argument( + "--parallelization-factor", + action = "store", type = int, default = -4, + help = "(advanced) parallelization factor") ap.add_argument( "role", action = "store", help = "Role to drop") @@ -467,9 +487,14 @@ def cmdline_setup_advanced(arg_subparser): "admin role privilege to list users others than your own)") tcfl.ui_cli.args_verbosity_add(ap) ap.add_argument( - "--serialize", action = "store_true", default = False, + "--serialize", + action = "store_const", dest = "parellization_factor", default = 1, help = "Serialize (don't parallelize) the operation on" " multiple targets") + ap.add_argument( + "--parallelization-factor", + action = "store", type = int, default = -4, + help = "(advanced) parallelization factor") ap.add_argument("userid", action = "store", default = None, nargs = "*", help = "Users to list (default all)")