
Separate the reprojection step into individual jobs #40

DinoBektesevic opened this issue Sep 3, 2024

When we reproject a work unit, we either need many parallel processes or a lot of time. Whether we request nodes with 32+ CPUs or let jobs run longer, the chance of getting pre-empted on the checkpoint queue is high.

This means the reprojection, an embarrassingly parallel step, is what keeps us from running on a large number of GPUs (the costliest step). There are > 4k CPUs available on klone, so we should take advantage of them by reprojecting each shard as its own job.

The following scheme is proposed.

Create manifest --> Ic2ShardWu --> fake main_shard
                               --> reproject shard 1
                               --> ...
                               --> reproject shard N 
                    Ic2ShardWu --> fake main_shard 
                               --> ...

The following is a sketch of what should happen:

# Imports shared by the three apps below. ImageCollection, SearchConfig and
# find_optimal_wcs are kbmod-side helpers; their import paths are omitted in this sketch.
from pathlib import Path

import numpy as np
from astropy.io import ascii, fits
from astropy.table import Table
from astropy.wcs import WCS
from lsst.daf.butler import Butler
from parsl import python_app
from reproject import reproject_interp


@python_app(executors=["1thread_0gpu"])
def ic2shardWu(collection, staging_dir):
    ic = ImageCollection.read(collection)

    # filtering scheme example
    mask_zp = np.logical_and(ic["zp"] > 29, ic["zp"] < 32)
    mask_wcs_err = ic["wcs_err"] > 1e-04
    mask_detectors = np.isin(ic["detector"], [2, 31, 61])  # example mask, not applied below
    ic = ic[np.logical_and(mask_zp, mask_wcs_err)]
    ic.reset_lazy_loading_indices()

    if len(ic) < 40:
        # the main workflow.py loop checks the first element of the result to skip this collection
        return (None, None, None)

    butler = Butler("gscratch/dirac/DEEP")
    wu = ic.toWorkUnit(SearchConfig(), butler=butler)
    # one staging directory per collection (the naming is an assumption of this sketch)
    dirname = Path(staging_dir) / Path(collection).stem
    wu.to_sharded_fits("main.fits", dirname)
    wcs, shape = find_optimal_wcs(list(ic.wcs))

    # persist the unique obstimes and their indices for later steps
    obstimes = np.unique(ic["mjd_mid"])  # np.unique also sorts
    obstimes_idxs = Table(rows=list(enumerate(obstimes)), names=("idx", "mjd_mid"))
    ascii.write(obstimes_idxs, str(dirname / "obstimes.csv"), format="csv", overwrite=True)
    return (dirname, wcs, shape)
    
@python_app(executors=["1thread_0gpu"])
def reproject_shard(shard, wcs, shape):
    # reproject every image HDU of a single shard onto the common WCS, in place
    with fits.open(shard, mode="update") as hdul:
        for hdu in hdul:
            if hdu.data is None:
                continue
            hdu.data = reproject_interp(
                (hdu.data, WCS(hdu.header)),
                wcs,
                shape_out=shape,
                bad_value_mode="ignore",
                return_footprint=False,
                roundtrip_coords=False,
            )
            hdu.header.update(wcs.to_header())
        # changes are flushed back to the shard file on close
    return shard
    
@python_app(executors=["1thread"])
def fake_main_shard(sharddir):
    hdul = fits.open(sharddir / "main.fits")
    # ok, I still don't understand what I am supposed to overwrite here.
    # "get_unique_obstimes_and_indices" needs to read in all of the shards;
    # it casts them to an ImageStack, so just leave the copy as is.
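
To tie these together, a minimal sketch of what the main workflow.py loop could look like when chaining the Parsl futures; the names collections, staging_dir, and the get_shard_paths helper are placeholders for this sketch, not existing workflow code:

# Hypothetical driver loop; collections, staging_dir and get_shard_paths are
# placeholders, not existing workflow code.
ic2shard_futures = [ic2shardWu(coll, staging_dir) for coll in collections]

reproject_futures = []
for future in ic2shard_futures:
    dirname, wcs, shape = future.result()
    if not dirname:
        # the collection was filtered out above (fewer than 40 images)
        continue
    # get_shard_paths would list the per-image shard files written by
    # wu.to_sharded_fits(), excluding main.fits
    for shard in get_shard_paths(dirname):
        reproject_futures.append(reproject_shard(shard, wcs, shape))
    fake_main_shard(dirname)

# block until every shard has been reprojected
for f in reproject_futures:
    f.result()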
drewoldag self-assigned this Sep 3, 2024