Skip to content

Commit

Permalink
Summary: tcf/ui_cli: introduce processes_guess
Browse files Browse the repository at this point in the history
Unify the handling of "how many subprocesses" to use in the servers
code with a utility to guess how many we shall use. This is fed into
run_fn_on_each_server() so it can take care of the guessing for the
caller (who can also override it if they know).

Signed-off-by: Inaky Perez-Gonzalez <[email protected]>
  • Loading branch information
inaky-intc committed Jun 1, 2023
1 parent 5f2fa16 commit 9e62f6c
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 25 deletions.
35 changes: 35 additions & 0 deletions commonl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,41 @@ def retval(self):
return None


def processes_guess(factor: int):
"""Very simple parallelization count adjuster
This is meant to be passed to something as
*concurrent.futures.ProcessPoolExecutor*:
>>> paralellization_factor = -4
>>> processes = commonl.processes_guess(paralellization_factor)
>>> concurrent.futures.ProcessPoolExecutor(processes)
:param int factor: parallelization factor:
- positive: absolute number of threads to use; use *1* to
serialize.
- 0: get the best value for a CPU intensive workload; this
returns the amount of CPUs in the system.
- < 0: get the best value for an IO intensive workload, where we
can do N IO operations / CPU.
"""
if factor == 0:
factor = int(os.environ.get("THREADS_GUESS", 0))
if factor > 0:
return factor
if factor == 0:
# factor for a CPU intensive workload
return multiprocessing.cpu_count()
# factor is negative; the absolute value is the intesiveness of
# the IO vs CPU, so how many IO we can run in parallel for each CPU
return -factor * multiprocessing.cpu_count()



class Process(fork_c): # COMPAT
pass

Expand Down
22 changes: 11 additions & 11 deletions tcfl/servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import logging
import os

import commonl
import tcfl.ttb_client # COMPAT: FIXME remove

logger = logging.getLogger("tcfl.servers")
Expand Down Expand Up @@ -102,9 +103,9 @@ def _run_on_server(server_name, fn, *args,
return None, e



def run_fn_on_each_server(servers: dict, fn: callable, *args,
serialize: bool = False, traces: bool = False,
parallelization_factor: int = -4,
**kwargs):
"""
Run a function on each server in parallel
Expand All @@ -122,24 +123,22 @@ def run_fn_on_each_server(servers: dict, fn: callable, *args,
:data:`tcfl.server_c.servers` for all servers or any other dict
with whatever server names are chosen.
:param bool serialize: (optional, default *False*) if calls to
each server need to be run in a single thread or can be run in
parallel (default).
:param int parallelization_factor: (optional, default -4, run
four operations per processor) number of threads to use to
parallelize the operation; use *1* to serialize.
:param bool traces: (optional, default *True*) if log messages for
exceptions shall include stack traces.
"""

if serialize:
threads = 1
else:
threads = len(servers)

processes = min(
len(servers),
commonl.processes_guess(parallelization_factor))
results = {}
if threads == 0:
if processes == 0:
return results

with concurrent.futures.ProcessPoolExecutor(threads) as executor:
with concurrent.futures.ProcessPoolExecutor(processes) as executor:
futures = {
# for each server, queue a thread that will call
# _fn, who will call fn taking care of exceptions
Expand All @@ -164,6 +163,7 @@ def run_fn_on_each_server(servers: dict, fn: callable, *args,
return results



def subsystem_setup(*args, **kwargs):
"""
Initialize the server management system in a synchronous way
Expand Down
7 changes: 6 additions & 1 deletion tcfl/ui_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,14 @@ def args_targetspec_add(
help = "Consider also disabled targets")
if targetspec_n != 1:
ap.add_argument(
"--serialize", action = "store_true", default = False,
"--serialize",
action = "store_const", dest = "parellization_factor", default = 1,
help = "Serialize (don't parallelize) the operation on"
" multiple targets")
ap.add_argument(
"--parallelization-factor",
action = "store", type = int, default = -4,
help = "(advanced) parallelization factor")
if isinstance(targetspec_n, bool):
if targetspec_n:
nargs = "+"
Expand Down
3 changes: 2 additions & 1 deletion tcfl/ui_cli_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def _cmdline_servers(cli_args: argparse.Namespace):

r = tcfl.servers.run_fn_on_each_server(
servers, _logged_in_username,
serialize = cli_args.serialize, traces = cli_args.traces)
parallelization_factor = cli_args.parallelization_factor,
traces = cli_args.traces)
# r now is a dict keyed by server_name of tuples usernames,
# exception
for server_name, ( username, _e ) in r.items():
Expand Down
49 changes: 37 additions & 12 deletions tcfl/ui_cli_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ def _cmdline_login(cli_args: argparse.Namespace):

r = tcfl.servers.run_fn_on_each_server(
servers, _login, credentials,
serialize = cli_args.serialize, traces = cli_args.traces)
parallelization_factor = cli_args.parallelization_factor,
traces = cli_args.traces)
# r now is a dict keyed by server_name of tuples usernames,
# exception
logged_count = 0
Expand Down Expand Up @@ -216,7 +217,8 @@ def _cmdline_logout(cli_args: argparse.Namespace):

r = tcfl.servers.run_fn_on_each_server(
servers, _logout, cli_args,
serialize = cli_args.serialize, traces = cli_args.traces)
parallelization_factor = cli_args.parallelization_factor,
traces = cli_args.traces)
# r now is a dict keyed by server_name of tuples usernames,
# exception
for server_name, ( _, e ) in r.items():
Expand Down Expand Up @@ -258,7 +260,8 @@ def _cmdline_role_gain(cli_args: argparse.Namespace):

tcfl.servers.run_fn_on_each_server(
servers, _user_role, cli_args, "gain",
serialize = cli_args.serialize, traces = cli_args.traces)
parallelization_factor = cli_args.parallelization_factor,
traces = cli_args.traces)


def _cmdline_role_drop(cli_args: argparse.Namespace):
Expand All @@ -269,7 +272,8 @@ def _cmdline_role_drop(cli_args: argparse.Namespace):

tcfl.servers.run_fn_on_each_server(
servers, _user_role, cli_args, "drop",
serialize = cli_args.serialize, traces = cli_args.traces)
parallelization_factor = cli_args.parallelization_factor,
traces = cli_args.traces)



Expand All @@ -290,7 +294,8 @@ def _cmdline_user_list(cli_args: argparse.Namespace):

result = tcfl.servers.run_fn_on_each_server(
tcfl.server_c.servers, _user_list, cli_args,
serialize = cli_args.serialize, traces = cli_args.traces)
parallelization_factor = cli_args.parallelization_factor,
traces = cli_args.traces)

# so now result is a dictionary of SERVER: ( DATA, EXCEPTION ),
# where DATA is dictionaries of USERNAME: USERDATA
Expand Down Expand Up @@ -399,9 +404,14 @@ def cmdline_setup(arg_subparser):
"AKA is the short name of the server (defaults to the sole "
"host name, without the domain). Find it with 'tcf servers'")
ap.add_argument(
"--serialize", action = "store_true", default = False,
"--serialize",
action = "store_const", dest = "parellization_factor", default = 1,
help = "Serialize (don't parallelize) the operation on"
" multiple servers")
" multiple targets")
ap.add_argument(
"--parallelization-factor",
action = "store", type = int, default = -4,
help = "(advanced) parallelization factor")
ap.add_argument(
"username", nargs = '?', metavar = "USERNAME",
action = "store", default = None,
Expand Down Expand Up @@ -434,9 +444,14 @@ def cmdline_setup_advanced(arg_subparser):
help = "ID of user whose role is to be dropped"
" (optional, defaults to yourself)")
ap.add_argument(
"--serialize", action = "store_true", default = False,
"--serialize",
action = "store_const", dest = "parellization_factor", default = 1,
help = "Serialize (don't parallelize) the operation on"
" multiple servers")
" multiple targets")
ap.add_argument(
"--parallelization-factor",
action = "store", type = int, default = -4,
help = "(advanced) parallelization factor")
ap.add_argument(
"role", action = "store",
help = "Role to gain")
Expand All @@ -452,9 +467,14 @@ def cmdline_setup_advanced(arg_subparser):
help = "ID of user whose role is to be dropped"
" (optional, defaults to yourself)")
ap.add_argument(
"--serialize", action = "store_true", default = False,
"--serialize",
action = "store_const", dest = "parellization_factor", default = 1,
help = "Serialize (don't parallelize) the operation on"
" multiple servers")
" multiple targets")
ap.add_argument(
"--parallelization-factor",
action = "store", type = int, default = -4,
help = "(advanced) parallelization factor")
ap.add_argument(
"role", action = "store",
help = "Role to drop")
Expand All @@ -467,9 +487,14 @@ def cmdline_setup_advanced(arg_subparser):
"admin role privilege to list users others than your own)")
tcfl.ui_cli.args_verbosity_add(ap)
ap.add_argument(
"--serialize", action = "store_true", default = False,
"--serialize",
action = "store_const", dest = "parellization_factor", default = 1,
help = "Serialize (don't parallelize) the operation on"
" multiple targets")
ap.add_argument(
"--parallelization-factor",
action = "store", type = int, default = -4,
help = "(advanced) parallelization factor")
ap.add_argument("userid", action = "store",
default = None, nargs = "*",
help = "Users to list (default all)")
Expand Down

0 comments on commit 9e62f6c

Please sign in to comment.