Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create own thread pool #13

Open
2 tasks
JulianKropp opened this issue Jul 19, 2024 · 1 comment
Open
2 tasks

Create own thread pool #13

JulianKropp opened this issue Jul 19, 2024 · 1 comment

Comments

@JulianKropp
Copy link
Collaborator

JulianKropp commented Jul 19, 2024

@JulianKropp JulianKropp mentioned this issue Jul 19, 2024
11 tasks
@JulianKropp JulianKropp changed the title from "Change max_workers in runtime without clearing the worker queue" to "Create own thread pool" Jul 19, 2024
@JulianKropp JulianKropp mentioned this issue Jul 19, 2024
13 tasks
@JulianKropp
Copy link
Collaborator Author

import threading
import queue
from concurrent.futures import Future

class DynamicThreadPoolExecutor:
    """Thread pool whose worker count can be changed while it is running.

    Tasks are kept in a FIFO queue and are never discarded when the pool is
    resized. Growing the pool starts additional threads immediately;
    shrinking it lets surplus workers finish their current task and then
    exit, without affecting the remaining workers or the queued tasks.

    Attributes:
        max_workers: Target number of worker threads.
        tasks: Queue of pending ``(fn, args, kwargs, future)`` work items.
        workers: Threads currently belonging to the pool.
        lock: Guards ``workers``, ``max_workers`` and ``shutdown_flag``.
        shutdown_flag: ``True`` once the pool stops accepting new tasks.
    """

    # Queue marker posted by shutdown(): the worker that receives it exits.
    _STOP = object()
    # Queue marker posted when shrinking: wakes an idle worker so that it
    # re-checks whether it has become surplus.
    _WAKE = object()
    # Seconds an idle worker blocks on the queue before re-checking state.
    _POLL_INTERVAL = 1

    def __init__(self, max_workers):
        """Start a pool with ``max_workers`` threads.

        Raises:
            ValueError: If ``max_workers`` is smaller than 1.
        """
        if max_workers < 1:
            raise ValueError("max_workers must be greater than 0")
        self.max_workers = max_workers
        self.tasks = queue.Queue()
        self.workers = []
        self.lock = threading.Lock()
        self.shutdown_flag = False
        self._stop_sent = False
        with self.lock:
            self._create_workers(max_workers)

    def _create_workers(self, num_workers):
        """Start ``num_workers`` new threads; the caller must hold ``lock``."""
        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker)
            # Register before starting so the thread always finds itself in
            # ``workers`` when it decides whether to retire.
            self.workers.append(worker)
            worker.start()

    def _worker(self):
        """Run queued tasks until this thread is retired or the pool stops."""
        me = threading.current_thread()
        while True:
            with self.lock:
                surplus = len(self.workers) > self.max_workers
                if surplus and not self.shutdown_flag and me in self.workers:
                    # The pool was shrunk: only this thread leaves.
                    self.workers.remove(me)
                    return
            try:
                item = self.tasks.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                if self.shutdown_flag:
                    # Nothing left to drain after shutdown was requested.
                    return
                continue
            try:
                if item is self._STOP:
                    return
                if item is self._WAKE:
                    continue
                fn, args, kwargs, future = item
                if not future.set_running_or_notify_cancel():
                    continue
                try:
                    result = fn(*args, **kwargs)
                except BaseException as exc:
                    # Always resolve the future, otherwise waiters hang.
                    future.set_exception(exc)
                else:
                    future.set_result(result)
            finally:
                # Keep Queue.join() accounting correct on every path.
                self.tasks.task_done()

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return its ``Future``.

        Raises:
            RuntimeError: If the pool has been shut down.
        """
        # Checking and enqueueing under the lock guarantees no task can be
        # queued behind the stop markers posted by shutdown().
        with self.lock:
            if self.shutdown_flag:
                raise RuntimeError("Cannot submit task after shutdown")
            future = Future()
            self.tasks.put((fn, args, kwargs, future))
        return future

    def set_max_workers(self, new_max_workers):
        """Resize the pool to ``new_max_workers`` threads.

        Queued tasks are kept. When shrinking, surplus workers exit after
        finishing the task they are currently running.

        Raises:
            ValueError: If ``new_max_workers`` is smaller than 1.
        """
        if new_max_workers < 1:
            raise ValueError("new_max_workers must be greater than 0")
        with self.lock:
            self.max_workers = new_max_workers
            if self.shutdown_flag:
                return
            delta = new_max_workers - len(self.workers)
            if delta > 0:
                self._create_workers(delta)
            else:
                # Wake idle workers so they notice the lower limit at once
                # instead of after the next poll timeout.
                for _ in range(-delta):
                    self.tasks.put(self._WAKE)

    def _terminate_worker(self):
        """Legacy hook: flag the whole pool as shut down.

        No longer used for shrinking, because setting the shared flag stops
        every worker rather than a single one.
        """
        self.shutdown_flag = True

    def shutdown(self, wait=True):
        """Stop accepting tasks; already queued tasks are still executed.

        Args:
            wait: If true, block until every worker thread has exited.
        """
        with self.lock:
            self.shutdown_flag = True
            workers = list(self.workers)
            if not self._stop_sent:
                self._stop_sent = True
                # One marker per worker, queued behind all pending tasks.
                for _ in workers:
                    self.tasks.put(self._STOP)
        if wait:
            me = threading.current_thread()
            for worker in workers:
                # A worker calling shutdown() must not join itself.
                if worker is not me:
                    worker.join()

# Usage example
import time

def example_task(n):
    """Block for ``n`` seconds, then return ``n`` unchanged."""
    delay = n
    time.sleep(delay)
    return delay

if __name__ == '__main__':
    # Demo: start with two workers, queue four tasks, then grow the pool.
    executor = DynamicThreadPoolExecutor(max_workers=2)
    try:
        futures = [executor.submit(example_task, i) for i in range(4)]

        time.sleep(5)
        executor.set_max_workers(4)

        for future in futures:
            print(future.result())
    finally:
        # Always stop the non-daemon workers, even if a task raised;
        # otherwise they would keep the interpreter alive.
        executor.shutdown()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant