Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NERSC SFAPI Reconstruction Flow #44

Merged
merged 25 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b4c26f5
WIP: Oct 3rd update for nersc recon with podman-hpc container
rajasriramoju Nov 12, 2024
37632c7
Adding Raja's nersc.py script
davramov Nov 13, 2024
b128b2c
able to get nersc reconstruction running in this configuration
davramov Dec 4, 2024
0463739
Added an sfapi slurm call to run tiff_to_zarr on nersc
davramov Dec 4, 2024
91356bd
Merge branch 'als-computing:main' into nersc_sfapi
davramov Dec 4, 2024
e1e7add
Migrating nersc tomography flow code to an abstract class structure (…
davramov Dec 6, 2024
8c5d20a
Addressing dylan's comments. Moved image names to config.yml and conf…
davramov Dec 7, 2024
235acda
Moved NERSC specific NERSCTomographyHPCController to flows/bl832/ners…
davramov Dec 9, 2024
9afefa3
Configured both reconstruction and multires steps to take in a file p…
davramov Dec 11, 2024
6304fe9
Resolved circular import issue, NERSC controller is not defined in fl…
davramov Dec 12, 2024
8e16fcc
Catch sfapi errors after submitting jobs, such that we are able to wa…
davramov Dec 13, 2024
c6601be
Adding test_sfapi_flow.py pytest script.
davramov Dec 14, 2024
6057aae
Addressing prefect secret patch issue in pytest script
davramov Dec 14, 2024
f9548a8
Removing the NerscClient dependency, and using sfapi_client module di…
davramov Dec 17, 2024
3fe95b5
Adding deployment configuration for the nersc_recon_flow
davramov Dec 17, 2024
6a4d2bf
Added a comments in indicating that NerscClient is deprecated and wi…
davramov Dec 19, 2024
223c3b4
Updated NERSC job script to pull from ghcr.io image instead of Harbor…
davramov Dec 20, 2024
79f128c
Updated config to include location of reconstruction scripts on NERSC…
davramov Jan 14, 2025
fae190f
Update pruning code in nersc.py, although it still needs testing. Add…
davramov Jan 15, 2025
1b2ae5c
Added transfer_controller.py, and use the GlobusTransferController to…
davramov Jan 17, 2025
e2963f0
Included new controller class for data transfers and refactored the N…
davramov Jan 21, 2025
f30dd28
Added placeholders for ALCFTomographyHPCController in alcf.py and OLC…
davramov Jan 21, 2025
fd62200
Added a pytest script for the new transfer_controller and globus/simp…
davramov Jan 28, 2025
f74a7a3
Updated unittests
davramov Jan 28, 2025
087db2d
removed @dataclass decorator because it is redundant with an __init__…
davramov Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ globus:
client_id: ${GLOBUS_CLIENT_ID}
client_secret: ${GLOBUS_CLIENT_SECRET}

harbor_images832:
recon_image: tomorecon_nersc_mpi_hdf5@sha256:cc098a2cfb6b1632ea872a202c66cb7566908da066fd8f8c123b92fa95c2a43c
multires_image: tomorecon_nersc_mpi_hdf5@sha256:cc098a2cfb6b1632ea872a202c66cb7566908da066fd8f8c123b92fa95c2a43c

prefect:
deployments:
- type_spec: new_file_832
Expand Down
1 change: 1 addition & 0 deletions orchestration/flows/bl832/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ def __init__(self) -> None:
self.alcf832_raw = self.endpoints["alcf832_raw"]
self.alcf832_scratch = self.endpoints["alcf832_scratch"]
self.scicat = config["scicat"]
self.harbor_images832 = config["harbor_images832"]
132 changes: 132 additions & 0 deletions orchestration/flows/bl832/job_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from abc import ABC, abstractmethod
from dotenv import load_dotenv
from enum import Enum
import logging
from typing import Optional

from orchestration.flows.bl832.config import Config832
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
load_dotenv()


class TomographyHPCController(ABC):
"""
Abstract class for tomography HPC controllers.
Provides interface methods for reconstruction and building multi-resolution datasets.

Args:
ABC: Abstract Base Class
"""
def __init__(
self,
Config832: Optional[Config832] = None
) -> None:
pass

@abstractmethod
def reconstruct(
self,
file_path: str = "",
) -> bool:
"""Perform tomography reconstruction

:param file_path: Path to the file to reconstruct.
:return: True if successful, False otherwise.
"""
pass

@abstractmethod
def build_multi_resolution(
self,
file_path: str = "",
) -> bool:
"""Generate multi-resolution version of reconstructed tomography

:param file_path: Path to the file for which to build multi-resolution data.
:return: True if successful, False otherwise.
"""
pass


class ALCFTomographyHPCController(TomographyHPCController):
"""
Implementation of TomographyHPCController for ALCF.
Methods here leverage Globus Compute for processing tasks.

Args:
TomographyHPCController (ABC): Abstract class for tomography HPC controllers.
"""

def __init__(self) -> None:
pass

def reconstruct(
self,
file_path: str = "",
) -> bool:

# uses Globus Compute to reconstruct the tomography
pass

def build_multi_resolution(
self,
file_path: str = "",
) -> bool:
# uses Globus Compute to build multi-resolution tomography

pass


class HPC(Enum):
"""
Each HPC enum member directly stores a callable that returns a TomographyHPCController.
"""
ALCF = ("ALCF", lambda: ALCFTomographyHPCController())
NERSC = ("NERSC", lambda: NERSCTomographyHPCController(
client=NERSCTomographyHPCController.create_nersc_client()
))
# Ex: add more HPCs here
# OLCF = ("OLCF", lambda: OLCFTomographyHPCController())


def get_controller(
hpc_type: str = None
) -> TomographyHPCController:
"""
Factory function to retrieve the appropriate HPC controller instance based on the given HPC type.

:param hpc_type: The type of HPC environment as a string, (e.g. 'ALCF' or 'NERSC').
:return: An instance of TomographyHPCController for the given HPC environment.
:raises ValueError: If an invalid HPC type is provided.
"""
if not hpc_type:
raise ValueError("No HPC type provided.")

# Convert the string to uppercase and remove whitespace to avoid errors and validate hpc_type against HPC enum.
hpc_enum = HPC(hpc_type.strip().upper())

# Access hpc_enum.value directly. As defined, it should be directly callable.
return hpc_enum.value() # Call the stored class to get a new instance of the selected Controller.


def do_it_all() -> None:
controller = get_controller("ALCF")
controller.reconstruct()
controller.build_multi_resolution()

file_path = ""
controller = get_controller("NERSC")
controller.reconstruct(
file_path=file_path,
)
controller.build_multi_resolution(
file_path=file_path,
)


if __name__ == "__main__":
do_it_all()
logger.info("Done.")
207 changes: 207 additions & 0 deletions orchestration/flows/bl832/nersc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
from dotenv import load_dotenv
import logging
import os
from pathlib import Path
from prefect import flow
from typing import Optional

from orchestration.flows.bl832.config import Config832
from orchestration.flows.bl832.job_controller import get_controller, TomographyHPCController
from orchestration.nersc import NerscClient

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
load_dotenv()


class NERSCTomographyHPCController(TomographyHPCController):
"""
Implementation for a NERSC-based tomography HPC controller.

Submits reconstruction and multi-resolution jobs to NERSC via SFAPI.
"""

def __init__(
self,
client: NerscClient = None,
Config832: Optional[Config832] = None
) -> None:
self.client = client
if not Config832:
self.config = Config832()
else:
self.config = Config832

def create_nersc_client() -> NerscClient:
"""Create and return an NERSC client instance"""

client_id_path = os.getenv("PATH_NERSC_CLIENT_ID")
sfapi_key_path = os.getenv("PATH_NERSC_PRI_KEY")

if not client_id_path or not sfapi_key_path:
logger.error("NERSC credentials paths are missing.")
raise ValueError("Missing NERSC credentials paths.")
if not os.path.isfile(client_id_path) or not os.path.isfile(sfapi_key_path):
logger.error("NERSC credential files are missing.")
raise FileNotFoundError("NERSC credential files are missing.")

try:
return NerscClient(client_id_path, sfapi_key_path)
except Exception as e:
logger.error(f"Failed to create NERSC client: {e}")
raise e

def reconstruct(
self,
file_path: str = "",
) -> bool:
"""
Use NERSC for tomography reconstruction
"""
logger.info("Starting NERSC reconstruction process.")

# Can't use this long term in production. Need to find a better way to handle credentials.
# Want to run this as the alsdev user
username = os.getenv("NERSC_USERNAME")
password = os.getenv("NERSC_PASSWORD")

user = self.client.user()

home_path = f"/global/homes/{user.name[0]}/{user.name}"
scratch_path = f"/pscratch/sd/{user.name[0]}/{user.name}"
logger.info(home_path)
logger.info(scratch_path)

image_name = self.config.harbor_images832["recon_image"]
path = Path(file_path)
folder_name = path.parent.name
file_name = path.stem

# Can't use this long term in production. Need to find a better way to handle credentials.
# Want to run this as the alsdev user
username = os.getenv("NERSC_USERNAME")
password = os.getenv("NERSC_PASSWORD")

job_script = f"""#!/bin/bash
#SBATCH -q preempt
dylanmcreynolds marked this conversation as resolved.
Show resolved Hide resolved
#SBATCH -A als
#SBATCH -C cpu
#SBATCH --job-name=tomorecon_nersc_mpi_hdf5_1-0
#SBATCH --output={scratch_path}/nerscClient-test/%x_%j.out
#SBATCH --error={scratch_path}/nerscClient-test/%x_%j.err
#SBATCH -N 1
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 64
#SBATCH --time=00:15:00
#SBATCH --exclusive

date

srun podman-hpc login registry.nersc.gov --username {username} --password {password}
srun podman-hpc run
--volume {home_path}/tomo_recon_repo/microct/legacy/sfapi_reconstruction.py:/alsuser/sfapi_reconstruction.py
--volume {home_path}/tomo_recon_repo/microct/legacy/input.txt:/alsuser/input.txt
--volume {scratch_path}/microctdata:/alsdata
--volume {scratch_path}/microctdata:/alsuser/ \
registry.nersc.gov/als/{image_name} \
python sfapi_reconstruction.py {file_name} {folder_name}

date
"""

try:
logger.info("Submitting reconstruction job script to Perlmutter.")
job = self.client.perlmutter.submit_job(job_script)
job.complete() # waits for job completion
logger.info("Reconstruction job completed successfully.")
return True
except Exception as e:
davramov marked this conversation as resolved.
Show resolved Hide resolved
logger.error(f"Failed to submit or complete reconstruction job: {e}")
return False

def build_multi_resolution(
self,
file_path: str = "",
) -> bool:
"""Use NERSC to make multiresolution version of tomography results."""

username = os.getenv("NERSC_USERNAME")
password = os.getenv("NERSC_PASSWORD")

user = self.client.user()

home_path = f"/global/homes/{user.name[0]}/{user.name}"
scratch_path = f"/pscratch/sd/{user.name[0]}/{user.name}"
logger.info(home_path)
logger.info(scratch_path)

image_name = self.config.harbor_images832["multires_image"]
recon_path = file_path
raw_path = file_path

# Need to update this script:
# rebuild image with dependencies

job_script = f"""#!/bin/bash
#SBATCH -q preempt
#SBATCH -A als
#SBATCH -C cpu
#SBATCH --job-name=tomorecon_nersc_mpi_hdf5_1-0
#SBATCH --output={scratch_path}/nerscClient-test/%x_%j.out
#SBATCH --error={scratch_path}/nerscClient-test/%x_%j.err
#SBATCH -N 1
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 64
#SBATCH --time=00:15:00
#SBATCH --exclusive

date

srun podman-hpc login registry.nersc.gov --username {username} --password {password}
srun podman-hpc run --volume {home_path}/tomo_recon_repo/microct/legacy/tiff_to_zarr.py:/alsuser/tiff_to_zarr.py \
--volume {home_path}/tomo_recon_repo/microct/legacy/input.txt:/alsuser/input.txt \
--volume {scratch_path}/microctdata:/alsdata \
--volume {scratch_path}/microctdata:/alsuser/ \
registry.nersc.gov/als/{image_name} \
bash -c "python -m pip show ngff_zarr || python -m pip install ngff_zarr && \
python -m pip show dask_image || python -m pip install dask_image && \
python tiff_to_zarr.py {recon_path} --raw_file {raw_path}"

date
"""
try:
logger.info("Submitting Tiff to Zarr job script to Perlmutter.")
job = self.client.perlmutter.submit_job(job_script)
logger.info(f"jobid={job.job_id}")
job.complete() # waits for job completion
logger.info("Tiff to Zarr job completed successfully.")
return True
except Exception as e:
logger.error(f"Failed to submit or complete Tiff to Zarr job: {e}")
return False


@flow(name="nersc_recon_flow")
def nersc_recon_flow(
file_path: str,
) -> bool:
"""
Perform tomography reconstruction on NERSC.

:param file_path: Path to the file to reconstruct.
"""

# To do: Implement file transfers, pruning, and other necessary steps

controller = get_controller("NERSC")
nersc_reconstruction_success = controller.reconstruct(
file_path=file_path,
)
nersc_multi_res_success = controller.build_multi_resolution(
file_path=file_path,
)

if nersc_reconstruction_success and nersc_multi_res_success:
return True
else:
return False
Loading
Loading