diff --git a/broker/broker.py b/broker/broker.py index e94e280..f499c9d 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -24,6 +24,7 @@ from broker import exceptions, helpers from broker.hosts import Host from broker.providers import PROVIDER_ACTIONS, PROVIDERS, _provider_imports +from broker.settings import settings # load all the provider class so they are registered for _import in _provider_imports: @@ -86,7 +87,8 @@ def _act(self, provider, method, checkout=False): method_obj = getattr(provider_inst, method) logger.debug(f"On {provider_inst=} executing {method_obj=} with params {self._kwargs=}.") # Overkill for a single action, cleaner than splitting the logic - with ThreadPoolExecutor() as workers: + max_workers = min(count, int(settings.thread_limit)) if settings.thread_limit else None + with ThreadPoolExecutor(max_workers=max_workers) as workers: tasks = [workers.submit(method_obj, **self._kwargs) for _ in range(count)] result = [] for task in as_completed(tasks): @@ -202,8 +204,8 @@ def checkin(self, sequential=False, host=None, in_context=False): if not hosts: logger.debug("Checkin called with no hosts, taking no action") return - - with ThreadPoolExecutor(max_workers=1 if sequential else None) as workers: + max_workers = min(len(hosts), int(settings.thread_limit)) if settings.thread_limit else None + with ThreadPoolExecutor(max_workers=1 if sequential else max_workers) as workers: completed_checkins = as_completed( # reversing over a copy of the list to avoid skipping workers.submit(self._checkin, _host)