Skip to content

Commit

Permalink
Update multicore.py
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasz-migas committed Jan 24, 2025
1 parent 3fb2f36 commit a6158f7
Showing 1 changed file with 44 additions and 19 deletions.
63 changes: 44 additions & 19 deletions src/koyo/multicore.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ def estimate_cpu_count_from_size(max_obj_size_in_bytes, keep_free_in_bytes=4_000
if n_cores > get_cpu_count():
n_cores = get_cpu_count()
print(
f"Total RAM: {virtual_memory().total / 1024 ** 3:0f} Gb",
f"\nReserved memory: {keep_free_in_bytes / 1024 ** 3} Gb",
f"\nMax. size of object: {max_obj_size_in_bytes / 1024 ** 3:0f} Gb",
f"Total RAM: {virtual_memory().total / 1024**3:0f} Gb",
f"\nReserved memory: {keep_free_in_bytes / 1024**3} Gb",
f"\nMax. size of object: {max_obj_size_in_bytes / 1024**3:0f} Gb",
f"\nMax. recommended cores: {n_cores}",
sep="",
)
Expand Down Expand Up @@ -91,7 +91,9 @@ def get_cpu_count(keep_free: int = 2, max_cpu: int = 24, n_tasks: int = 0) -> in
n_cores = n_tasks
if n_cores < 1:
warnings.warn(
f"Tried to reserve `{keep_free}` cores free ut failed be cores. Action will use all cores.", RuntimeWarning
f"Tried to reserve `{keep_free}` cores free ut failed be cores. Action will use all cores.",
RuntimeWarning,
stacklevel=2,
)
n_cores = cpu_count()
return n_cores
Expand All @@ -116,6 +118,12 @@ def run(self, func: ty.Callable, args: ty.Iterable, auto_expand: bool = True):
"""Execute."""
import mpire

kws = {
"worker_lifespan": 1,
"progress_bar": not self.silent,
"progress_bar_options": {"desc": self.desc, "mininterval": 5},
}

res = []
if auto_expand:
if args:
Expand All @@ -125,23 +133,40 @@ def run(self, func: ty.Callable, args: ty.Iterable, auto_expand: bool = True):
res.append(func(*arg))
else:
with mpire.WorkerPool(n_jobs=self.n_cores, keep_alive=False) as pool:
res = pool.map(
func,
args,
iterable_len=len(args),
worker_lifespan=1,
progress_bar=not self.silent,
progress_bar_options={"desc": self.desc, "mininterval": 5},
)
res = pool.map(func, args, iterable_len=len(args), **kws)
else:
with mpire.WorkerPool(n_jobs=self.n_cores, keep_alive=False) as pool:
res = pool.map(
func,
args,
worker_lifespan=1,
progress_bar=not self.silent,
progress_bar_options={"desc": self.desc, "mininterval": 5},
)
res = pool.map(func, args, **kws)
return res

def lazy_run(self, func: ty.Callable, args: ty.Iterable, auto_expand: bool = True):
"""Execute."""
import mpire

kws = {
"worker_lifespan": 1,
"progress_bar": not self.silent,
"progress_bar_options": {"desc": self.desc, "mininterval": 5},
}

res = []
if auto_expand:
if args:
args = list(args)
if len(args) == 1 or self.n_cores == 1:
for arg in tqdm(args, disable=False, desc=self.desc):
res.append(func(*arg))
yield res[-1]
else:
with mpire.WorkerPool(n_jobs=self.n_cores, keep_alive=True) as pool:
for res_ in pool.imap(func, args, iterable_len=len(args), **kws):
res.append(res_)
yield res_
else:
with mpire.WorkerPool(n_jobs=self.n_cores, keep_alive=True) as pool:
for res_ in pool.imap(func, args, **kws):
res.append(res_)
yield res_
return res


Expand Down

0 comments on commit a6158f7

Please sign in to comment.