Skip to content

Commit

Permalink
Merge pull request #1212 from caracal-pipeline/issue1179
Browse files Browse the repository at this point in the history
closes #1179
  • Loading branch information
gigjozsa authored Jul 26, 2020
2 parents 6dcfa85 + 2b2e5d0 commit 2853f09
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
11 changes: 10 additions & 1 deletion caracal/schema/line_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ mapping:
type: str
required: false
example: '1.420405752GHz'

ncpu:
type: int
desc: Number of CPUs to use for distributed processing. If set to 0 all available CPUs are used. This parameter is currently only passed on to WSClean for line imaging.
required: false
example: '0'
rewind_flags:
desc: Rewind flags of the input .MS file(s) to specified version. Note that this is not applied to the .MS file(s) you might be running "transfer_apply_gains" on.
type: map
Expand Down Expand Up @@ -367,6 +371,11 @@ mapping:
type: float
required: false
example: '0.6'
wscl_nrdeconvsubimg:
desc: Speed-up deconvolution by splitting each channel into a number of subimages, which are deconvolved in parallel. This parameter sets the number of subimages as follows. If set to 1 no parallel deconvolution is performed. If set to 0 the number of subimages is the same as the number of CPUs used by the line worker (see "ncpu" parameter above). If set to a number > 1 , the number of subimages is greater than or equal to the one requested by the user.
type: int
required: false
example: '0'
casa_thr:
desc: Flux-density level to stop CASA cleaning. It must include units, e.g. '1.0mJy'.
type: str
Expand Down
14 changes: 14 additions & 0 deletions caracal/workers/line_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from caracal.dispatch_crew import utils
import itertools
from caracal.workers.utils import manage_flagsets as manflags
import psutil

NAME = 'Process and Image Line Data'
LABEL = 'line'
Expand Down Expand Up @@ -199,6 +200,18 @@ def worker(pipeline, recipe, config):
firstchanfreq_all, chanw_all, lastchanfreq_all = [], [], []
restfreq = config['restfreq']

# distributed deconvolution settings
ncpu = config['ncpu']
if ncpu == 0:
ncpu = psutil.cpu_count()
else:
ncpu = min(ncpu, psutil.cpu_count())
nrdeconvsubimg = ncpu if config['make_cube']['wscl_nrdeconvsubimg'] == 0 else config['make_cube']['wscl_nrdeconvsubimg']
if nrdeconvsubimg == 1:
wscl_parallel_deconv = None
else:
wscl_parallel_deconv = int(np.ceil(max(config['make_cube']['npix'])/np.sqrt(nrdeconvsubimg)))

for i, msfile in enumerate(all_msfiles):
# Update pipeline attributes (useful if, e.g., channel averaging was
# performed by the split_data worker)
Expand Down Expand Up @@ -661,6 +674,7 @@ def worker(pipeline, recipe, config):
"auto-threshold": config['make_cube']['wscl_auto_thr'],
"multiscale": config['make_cube']['wscl_multiscale'],
"multiscale-scale-bias": config['make_cube']['wscl_multiscale_bias'],
"parallel-deconvolution": sdm.dismissable(wscl_parallel_deconv),
"no-update-model-required": config['make_cube']['wscl_noupdatemod']
}
if config['make_cube']['wscl_multiscale_scales']:
Expand Down

0 comments on commit 2853f09

Please sign in to comment.