Skip to content

Commit

Permalink
fix BrokenPipeError in multiprocessing
Browse files Browse the repository at this point in the history
  • Loading branch information
wanqqq31 committed Oct 9, 2024
1 parent 2e4ad04 commit 31d062c
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 34 deletions.
55 changes: 22 additions & 33 deletions src/pfs_target_uploader/utils/ppp.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def PPPrunStart(
d_pfi=1.38,
quiet=True,
clustering_algorithm="HDBSCAN",
max_exetime=900,
queue=None,
logger=None,
):
Expand All @@ -69,8 +68,6 @@ def PPPrunStart(
if weight_para is None:
weight_para = [2.02, 0.01, 0.01]

t_start_ppp = time.time()

def count_N(sample):
"""calculate local count of targets
Expand Down Expand Up @@ -293,7 +290,7 @@ def KDE_xy(sample, X, Y):

return Z

def KDE(sample, multiProcesing, sleep_time=0.01, max_sleep_time=2.5):
def KDE(sample, multiProcesing):
"""define binning and calculate KDE
Parameters
Expand All @@ -307,19 +304,6 @@ def KDE(sample, multiProcesing, sleep_time=0.01, max_sleep_time=2.5):
=======
ra_bin, dec_bin, significance of KDE over the field, ra of peak in KDE, dec of peak in KDE
"""

t_start_kde = time.time()

logger.debug(f"time elapsed in ppp: {t_start_kde-t_start_ppp:.2f}s")

if (max_exetime > 0) and (
t_start_kde - t_start_ppp > max_exetime - max_sleep_time * 1.5
):
logger.warning(
f"running out of time in KDE calculation soon. Change sleep time to {max_sleep_time:.1f}s"
)
sleep_time = max_sleep_time

if len(sample) == 1:
# if only one target, set it as the peak
return (
Expand Down Expand Up @@ -356,11 +340,16 @@ def KDE(sample, multiProcesing, sleep_time=0.01, max_sleep_time=2.5):
threads_count, round(len(sample) * 0.5)
) # threads_count=10 in this machine

with mp.Pool(thread_n) as p:
dMap_ = p.map(
partial(KDE_xy, X=X_, Y=Y_), np.array_split(sample, thread_n)
)
time.sleep(sleep_time)
kde_p = mp.Pool(thread_n)
dMap_ = kde_p.map_async(

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "dMap_" doesn't conform to snake_case naming style Warning

Variable name "dMap_" doesn't conform to snake_case naming style
partial(KDE_xy, X=X_, Y=Y_),
np.array_split(sample, thread_n),
)

kde_p.close()
kde_p.join()

dMap_ = dMap_.get()

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "dMap_" doesn't conform to snake_case naming style Warning

Variable name "dMap_" doesn't conform to snake_case naming style

Z = sum(dMap_)

Expand All @@ -378,7 +367,7 @@ def KDE(sample, multiProcesing, sleep_time=0.01, max_sleep_time=2.5):

return X_, Y_, obj_dis_sig_, peak_x, peak_y

def PPP_centers(sample_f, ppc_f, mutiPro, weight_para, starttime):
def PPP_centers(sample_f, ppc_f, mutiPro, weight_para):

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Argument name "mutiPro" doesn't conform to snake_case naming style Warning

Argument name "mutiPro" doesn't conform to snake_case naming style

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Function name "PPP_centers" doesn't conform to snake_case naming style Warning

Function name "PPP_centers" doesn't conform to snake_case naming style

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Too many local variables (20/15) Warning

Too many local variables (20/15)
"""determine pointing centers
Parameters
Expand Down Expand Up @@ -913,7 +902,7 @@ def complete_ppc(sample, point_l):
sub_l,
)

def netflow_iter(uS, obj_allo, weight_para, starttime, status):
def netflow_iter(uS, obj_allo, weight_para, status):

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Argument name "uS" doesn't conform to snake_case naming style Warning

Argument name "uS" doesn't conform to snake_case naming style
"""iterate the total procedure to re-assign fibers to targets which have not been assigned
in the previous/first iteration
Expand Down Expand Up @@ -952,7 +941,7 @@ def netflow_iter(uS, obj_allo, weight_para, starttime, status):
uS_t1["exptime_PPP"] - uS_t1["exptime_assign"]
) # remained exposure time

uS_t2, status = PPP_centers(uS_t1, [], True, weight_para, starttime)
uS_t2, status = PPP_centers(uS_t1, [], True, weight_para)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "uS_t2" doesn't conform to snake_case naming style Warning

Variable name "uS_t2" doesn't conform to snake_case naming style

obj_allo_t = netflowRun(uS_t2)

Expand Down Expand Up @@ -991,13 +980,13 @@ def netflow_iter(uS, obj_allo, weight_para, starttime, status):
uPPC_M = []

if len(uS_L) > 0 and len(uS_M) == 0:
uS_L_s2, status_ = PPP_centers(uS_L, uPPC_L, True, weight_para, t_ppp_start)
uS_L_s2, status_ = PPP_centers(uS_L, uPPC_L, True, weight_para)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "uS_L_s2" doesn't conform to snake_case naming style Warning

Variable name "uS_L_s2" doesn't conform to snake_case naming style
obj_allo_L = netflowRun(uS_L_s2)

if len(uPPC_L) == 0:
uS_L2 = complete_ppc(uS_L_s2, obj_allo_L)[0]
obj_allo_L_fin, status_ = netflow_iter(
uS_L2, obj_allo_L, weight_para, t_ppp_start, status_
uS_L2, obj_allo_L, weight_para, status_
)
uS_L2, cR_L_fh, cR_L_fh_, cR_L_n, cR_L_n_, sub_l = complete_ppc(
uS_L_s2, obj_allo_L_fin
Expand All @@ -1016,13 +1005,13 @@ def netflow_iter(uS, obj_allo, weight_para, starttime, status):
out_sub_l = sub_l

if len(uS_M) > 0 and len(uS_L) == 0:
uS_M_s2, status_ = PPP_centers(uS_M, uPPC_M, True, weight_para, t_ppp_start)
uS_M_s2, status_ = PPP_centers(uS_M, uPPC_M, True, weight_para)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "uS_M_s2" doesn't conform to snake_case naming style Warning

Variable name "uS_M_s2" doesn't conform to snake_case naming style
obj_allo_M = netflowRun(uS_M_s2)

if len(uPPC_M) == 0:
uS_M2 = complete_ppc(uS_M_s2, obj_allo_M)[0]
obj_allo_M_fin, status_ = netflow_iter(
uS_M2, obj_allo_M, weight_para, t_ppp_start, status_
uS_M2, obj_allo_M, weight_para, status_
)
uS_M2, cR_M_fh, cR_M_fh_, cR_M_n, cR_M_n_, sub_m = complete_ppc(
uS_M_s2, obj_allo_M_fin
Expand All @@ -1040,12 +1029,12 @@ def netflow_iter(uS, obj_allo, weight_para, starttime, status):
out_sub_m = sub_m

if len(uS_L) > 0 and len(uS_M) > 0:
uS_L_s2, status_ = PPP_centers(uS_L, uPPC_L, True, weight_para, t_ppp_start)
uS_L_s2, status_ = PPP_centers(uS_L, uPPC_L, True, weight_para)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "uS_L_s2" doesn't conform to snake_case naming style Warning

Variable name "uS_L_s2" doesn't conform to snake_case naming style
obj_allo_L = netflowRun(uS_L_s2)
if len(uPPC_L) == 0:
uS_L2 = complete_ppc(uS_L_s2, obj_allo_L)[0]
obj_allo_L_fin, status_ = netflow_iter(
uS_L2, obj_allo_L, weight_para, t_ppp_start, status_
uS_L2, obj_allo_L, weight_para, status_
)
uS_L2, cR_L_fh, cR_L_fh_, cR_L_n, cR_L_n_, sub_l = complete_ppc(
uS_L_s2, obj_allo_L_fin
Expand All @@ -1057,12 +1046,12 @@ def netflow_iter(uS, obj_allo, weight_para, starttime, status):
)
out_obj_allo_L_fin = obj_allo_L

uS_M_s2, status_ = PPP_centers(uS_M, uPPC_M, True, weight_para, t_ppp_start)
uS_M_s2, status_ = PPP_centers(uS_M, uPPC_M, True, weight_para)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "uS_M_s2" doesn't conform to snake_case naming style Warning

Variable name "uS_M_s2" doesn't conform to snake_case naming style
obj_allo_M = netflowRun(uS_M_s2)
if len(uPPC_M) == 0:
uS_M2 = complete_ppc(uS_M_s2, obj_allo_M)[0]
obj_allo_M_fin, status_ = netflow_iter(
uS_M2, obj_allo_M, weight_para, t_ppp_start, status_
uS_M2, obj_allo_M, weight_para, status_
)
uS_M2, cR_M_fh, cR_M_fh_, cR_M_n, cR_M_n_, sub_m = complete_ppc(
uS_M_s2, obj_allo_M_fin
Expand Down
12 changes: 11 additions & 1 deletion src/pfs_target_uploader/widgets/PppResultWidgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import multiprocessing as mp
import sys
import psutil

Check warning

Code scanning / Prospector (reported by Codacy)

Unable to import 'psutil' (import-error) Warning

Unable to import 'psutil' (import-error)

import numpy as np
import panel as pn
Expand Down Expand Up @@ -309,7 +310,6 @@ def run_ppp(
1.38,
quiet,
clustering_algorithm,
max_exetime,
ppp_run_results,
logger,
),
Expand All @@ -329,6 +329,16 @@ def run_ppp(
duration=0, # ever
)

# Terminate child processes related to ppp_run (otherwise ppp_run.terminate will report BrokenPipeError)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (116/100) Warning

Line too long (116/100)

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (116/100) Warning

Line too long (116/100)
for ps in mp.active_children():

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Variable name "ps" doesn't conform to snake_case naming style Warning

Variable name "ps" doesn't conform to snake_case naming style

Check warning

Code scanning / Pylint (reported by Codacy)

mp.active_children is not callable Warning

mp.active_children is not callable

Check warning

Code scanning / Pylint (reported by Codacy)

Variable name "ps" doesn't conform to snake_case naming style Warning

Variable name "ps" doesn't conform to snake_case naming style
current_process = psutil.Process(ps.pid)
children_process = current_process.children(recursive=True)

if len(children_process) > 0:

Check warning

Code scanning / Pylint (reported by Codacy)

Do not use len(SEQUENCE) to determine if a sequence is empty Warning

Do not use len(SEQUENCE) to determine if a sequence is empty
# only kill ppp_run, not ppp_run_results (or kill it as well?? need FIX)
for child_ in children_process:
psutil.Process(child_.pid).terminate()

# Terminate PPP
ppp_run.terminate()

Expand Down

0 comments on commit 31d062c

Please sign in to comment.