NERSC SFAPI Reconstruction Flow #44

Merged · 25 commits · Jan 29, 2025
Changes from 9 commits

Commits (25)
b4c26f5  WIP: Oct 3rd update for nersc recon with podman-hpc container (rajasriramoju, Nov 12, 2024)
37632c7  Adding Raja's nersc.py script (davramov, Nov 13, 2024)
b128b2c  able to get nersc reconstruction running in this configuration (davramov, Dec 4, 2024)
0463739  Added an sfapi slurm call to run tiff_to_zarr on nersc (davramov, Dec 4, 2024)
91356bd  Merge branch 'als-computing:main' into nersc_sfapi (davramov, Dec 4, 2024)
e1e7add  Migrating nersc tomography flow code to an abstract class structure (… (davramov, Dec 6, 2024)
8c5d20a  Addressing dylan's comments. Moved image names to config.yml and conf… (davramov, Dec 7, 2024)
235acda  Moved NERSC specific NERSCTomographyHPCController to flows/bl832/ners… (davramov, Dec 9, 2024)
9afefa3  Configured both reconstruction and multires steps to take in a file p… (davramov, Dec 11, 2024)
6304fe9  Resolved circular import issue, NERSC controller is not defined in fl… (davramov, Dec 12, 2024)
8e16fcc  Catch sfapi errors after submitting jobs, such that we are able to wa… (davramov, Dec 13, 2024)
c6601be  Adding test_sfapi_flow.py pytest script. (davramov, Dec 14, 2024)
6057aae  Addressing prefect secret patch issue in pytest script (davramov, Dec 14, 2024)
f9548a8  Removing the NerscClient dependency, and using sfapi_client module di… (davramov, Dec 17, 2024)
3fe95b5  Adding deployment configuration for the nersc_recon_flow (davramov, Dec 17, 2024)
6a4d2bf  Added a comments in indicating that NerscClient is deprecated and wi… (davramov, Dec 19, 2024)
223c3b4  Updated NERSC job script to pull from ghcr.io image instead of Harbor… (davramov, Dec 20, 2024)
79f128c  Updated config to include location of reconstruction scripts on NERSC… (davramov, Jan 14, 2025)
fae190f  Update pruning code in nersc.py, although it still needs testing. Add… (davramov, Jan 15, 2025)
1b2ae5c  Added transfer_controller.py, and use the GlobusTransferController to… (davramov, Jan 17, 2025)
e2963f0  Included new controller class for data transfers and refactored the N… (davramov, Jan 21, 2025)
f30dd28  Added placeholders for ALCFTomographyHPCController in alcf.py and OLC… (davramov, Jan 21, 2025)
fd62200  Added a pytest script for the new transfer_controller and globus/simp… (davramov, Jan 28, 2025)
f74a7a3  Updated unittests (davramov, Jan 28, 2025)
087db2d  removed @dataclass decorator because it is redundant with an __init__… (davramov, Jan 29, 2025)
4 changes: 4 additions & 0 deletions config.yml
@@ -95,6 +95,10 @@ globus:
client_id: ${GLOBUS_CLIENT_ID}
client_secret: ${GLOBUS_CLIENT_SECRET}

harbor_images832:
recon_image: tomorecon_nersc_mpi_hdf5@sha256:cc098a2cfb6b1632ea872a202c66cb7566908da066fd8f8c123b92fa95c2a43c
multires_image: tomorecon_nersc_mpi_hdf5@sha256:cc098a2cfb6b1632ea872a202c66cb7566908da066fd8f8c123b92fa95c2a43c

prefect:
deployments:
- type_spec: new_file_832
1 change: 1 addition & 0 deletions orchestration/flows/bl832/config.py
@@ -21,3 +21,4 @@ def __init__(self) -> None:
self.alcf832_raw = self.endpoints["alcf832_raw"]
self.alcf832_scratch = self.endpoints["alcf832_scratch"]
self.scicat = config["scicat"]
self.harbor_images832 = config["harbor_images832"]
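
A minimal usage sketch (not part of this PR's diff): with the additions above, the image digests pinned in config.yml become available through Config832, so calling code can look them up instead of hard-coding image names. The import path matches the file shown above; the rest is illustrative.

from orchestration.flows.bl832.config import Config832

config = Config832()
# Image digests pinned in config.yml under harbor_images832
recon_image = config.harbor_images832["recon_image"]
multires_image = config.harbor_images832["multires_image"]
print(recon_image, multires_image)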
332 changes: 332 additions & 0 deletions orchestration/flows/bl832/job_controller.py
@@ -0,0 +1,332 @@
from abc import ABC, abstractmethod
from dotenv import load_dotenv
from enum import Enum
import logging
import os
from pathlib import Path
import time
from typing import Callable, Optional

from orchestration.flows.bl832.config import Config832
# from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
from orchestration.nersc import NerscClient

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
load_dotenv()


class TomographyHPCController(ABC):
    """
    Abstract base class for tomography HPC controllers.
    Provides interface methods for reconstruction and building multi-resolution datasets.

    :param config: Optional Config832 instance with endpoint and image settings.
    """
    def __init__(
        self,
        config: Optional[Config832] = None
    ) -> None:
        pass

@abstractmethod
def reconstruct(
self,
file_path: str = "",
) -> bool:
"""Perform tomography reconstruction

:param file_path: Path to the file to reconstruct.
:return: True if successful, False otherwise.
"""
pass

@abstractmethod
def build_multi_resolution(
self,
file_path: str = "",
) -> bool:
"""Generate multi-resolution version of reconstructed tomography

:param file_path: Path to the file for which to build multi-resolution data.
:return: True if successful, False otherwise.
"""
pass


class ALCFTomographyHPCController(TomographyHPCController):
    """
    Placeholder implementation of TomographyHPCController for ALCF.
    Methods here will leverage Globus Compute for processing tasks.
    """

def __init__(self) -> None:
pass

def reconstruct(
self,
file_path: str = "",
) -> bool:

# uses Globus Compute to reconstruct the tomography
pass

def build_multi_resolution(
self,
file_path: str = "",
) -> bool:
# uses Globus Compute to build multi-resolution tomography

pass


class NERSCTomographyHPCController(TomographyHPCController):
"""
Implementation for a NERSC-based tomography HPC controller.

Submits reconstruction and multi-resolution jobs to NERSC via SFAPI.
"""

    def __init__(
        self,
        client: Optional[NerscClient] = None,
        config: Optional[Config832] = None
    ) -> None:
self.client = client

if not config:
self.config = Config832()
else:
self.config = config

    @staticmethod
    def create_nersc_client() -> NerscClient:
        """Create and return a NerscClient instance (called on the class in get_controller)."""

client_id_path = os.getenv("PATH_NERSC_CLIENT_ID")
sfapi_key_path = os.getenv("PATH_NERSC_PRI_KEY")

if not client_id_path or not sfapi_key_path:
logger.error("NERSC credentials paths are missing.")
raise ValueError("Missing NERSC credentials paths.")
if not os.path.isfile(client_id_path) or not os.path.isfile(sfapi_key_path):
logger.error("NERSC credential files are missing.")
raise FileNotFoundError("NERSC credential files are missing.")

try:
return NerscClient(client_id_path, sfapi_key_path)
except Exception as e:
logger.error(f"Failed to create NERSC client: {e}")
raise e

def reconstruct(
self,
file_path: str = "",
) -> bool:
"""
Use NERSC for tomography reconstruction
"""
logger.info("Starting NERSC reconstruction process.")

# Can't use this long term in production. Need to find a better way to handle credentials.
# Want to run this as the alsdev user
# username = os.getenv("NERSC_USERNAME")
# password = os.getenv("NERSC_PASSWORD")

user = self.client.user()

home_path = f"/global/homes/{user.name[0]}/{user.name}"
scratch_path = f"/pscratch/sd/{user.name[0]}/{user.name}"
logger.info(home_path)
logger.info(scratch_path)

image_name = self.config.harbor_images832["recon_image"]
logger.info(image_name)
path = Path(file_path)
folder_name = path.parent.name
if not folder_name:
folder_name = ""

file_name = f"{path.stem}.h5"

logger.info(f"File name: {file_name}")
logger.info(f"Folder name: {folder_name}")

# IMPORTANT: job script must be deindented to the leftmost column or it will fail immediately
# Note: If q=debug, there is no minimum time limit
# However, if q=preempt, there is a minimum time limit of 2 hours. Otherwise the job won't run.

# If the image has not been pulled before, then you must login to Harbor first (hopefully we can get a robot account)
# srun podman-hpc login registry.nersc.gov --username {username} --password {password}
# --volume {home_path}/tomo_recon_repo/microct/legacy/input.txt:/alsuser/input.txt \

job_script = f"""#!/bin/bash
#SBATCH -q debug
#SBATCH -A als
#SBATCH -C cpu
#SBATCH --job-name=tomo_recon_test-0
#SBATCH --output={scratch_path}/nerscClient-test/%x_%j.out
#SBATCH --error={scratch_path}/nerscClient-test/%x_%j.err
#SBATCH -N 1
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 64
#SBATCH --time=0:15:00
#SBATCH --exclusive

date
srun podman-hpc run \
--volume {home_path}/tomo_recon_repo/microct/legacy/sfapi_reconstruction.py:/alsuser/sfapi_reconstruction.py \
--volume {scratch_path}/microctdata:/alsdata \
--volume {scratch_path}/microctdata:/alsuser/ \
registry.nersc.gov/als/{image_name} \
bash -c "python -m pip install numpy==1.23.2 && \
python sfapi_reconstruction.py {file_name} {folder_name}"
date
"""

try:
logger.info("Submitting reconstruction job script to Perlmutter.")
job = self.client.perlmutter.submit_job(job_script)
            logger.info(job.jobid)
            job.update()
            time.sleep(60)  # Wait 60 seconds for the job to register before checking status
            logger.info(job.state)
job.complete() # waits for job completion
logger.info("Reconstruction job completed successfully.")
return True
except Exception as e:
logger.error(f"Failed to submit or complete reconstruction job: {e}")
return False

def build_multi_resolution(
self,
file_path: str = "",
) -> bool:
"""Use NERSC to make multiresolution version of tomography results."""

# username = os.getenv("NERSC_USERNAME")
# password = os.getenv("NERSC_PASSWORD")

user = self.client.user()

home_path = f"/global/homes/{user.name[0]}/{user.name}"
scratch_path = f"/pscratch/sd/{user.name[0]}/{user.name}"
logger.info(home_path)
logger.info(scratch_path)

image_name = self.config.harbor_images832["multires_image"]

# TODO: fix these paths

path = Path(file_path)
folder_name = path.parent.name
file_name = path.stem

recon_path = f"scratch/{folder_name}/rec{file_name}/"
raw_path = f"{folder_name}/{file_name}.h5"

# Need to update this script:
# rebuild image with dependencies

# IMPORTANT: job script must be deindented to the leftmost column or it will fail immediately
job_script = f"""#!/bin/bash
#SBATCH -q debug
#SBATCH -A als
#SBATCH -C cpu
#SBATCH --job-name=tomo_multires_test-0
#SBATCH --output={scratch_path}/nerscClient-test/%x_%j.out
#SBATCH --error={scratch_path}/nerscClient-test/%x_%j.err
#SBATCH -N 1
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 64
#SBATCH --time=0:15:00
#SBATCH --exclusive

date
srun podman-hpc run --volume {home_path}/tomo_recon_repo/microct/legacy/tiff_to_zarr.py:/alsuser/tiff_to_zarr.py \
--volume {home_path}/tomo_recon_repo/microct/legacy/input.txt:/alsuser/input.txt \
--volume {scratch_path}/microctdata:/alsdata \
--volume {scratch_path}/microctdata:/alsuser/ \
registry.nersc.gov/als/{image_name} \
bash -c "python -m pip show ngff_zarr || python -m pip install ngff_zarr && \
python -m pip show dask_image || python -m pip install dask_image && \
python tiff_to_zarr.py {recon_path} --raw_file {raw_path}"

date
"""
try:
logger.info("Submitting Tiff to Zarr job script to Perlmutter.")
job = self.client.perlmutter.submit_job(job_script)
time.sleep(30) # Wait 30 seconds before checking job completion
job.complete() # waits for job completion
logger.info("Tiff to Zarr job completed successfully.")
return True
except Exception as e:
logger.error(f"Failed to submit or complete Tiff to Zarr job: {e}")
return False


class HPC(Enum):
"""
Enum representing different HPC environments.
Use enum names as strings to identify HPC sites, ensuring a standard set of values.

Members:
ALCF: Argonne Leadership Computing Facility
NERSC: National Energy Research Scientific Computing Center
"""
ALCF = "ALCF"
NERSC = "NERSC"


def get_controller(hpc_type: str) -> TomographyHPCController:
"""
Factory function that returns an HPC controller instance for the given HPC environment.

:param hpc_type: A string identifying the HPC environment (e.g., 'ALCF', 'NERSC').
:return: An instance of a TomographyHPCController subclass corresponding to the given HPC environment.
:raises ValueError: If an invalid or unsupported HPC type is specified.
"""
if not hpc_type:
raise ValueError("No HPC type provided.")

# Normalize input
hpc_str = hpc_type.strip().upper()

# Attempt to map the given string to the HPC enum
try:
hpc_enum = HPC(hpc_str)
except ValueError:
raise ValueError(f"'{hpc_type}' is not a valid HPC") from None

# Map HPC enum members to corresponding controller constructors
controller_map: dict[HPC, Callable[[], TomographyHPCController]] = {
HPC.ALCF: lambda: ALCFTomographyHPCController(),
HPC.NERSC: lambda: NERSCTomographyHPCController(NERSCTomographyHPCController.create_nersc_client()),
}

# Return a new controller instance
return controller_map[hpc_enum]()


def do_it_all() -> None:
controller = get_controller("ALCF")
controller.reconstruct()
controller.build_multi_resolution()

file_path = ""
controller = get_controller("NERSC")
controller.reconstruct(
file_path=file_path,
)
controller.build_multi_resolution(
file_path=file_path,
)


if __name__ == "__main__":
do_it_all()
logger.info("Done.")
44 changes: 44 additions & 0 deletions orchestration/flows/bl832/nersc.py
@@ -0,0 +1,44 @@
from dotenv import load_dotenv
import logging
# import os
# from pathlib import Path
from prefect import flow
# from typing import Optional

# from orchestration.flows.bl832.config import Config832
from orchestration.flows.bl832.job_controller import get_controller
# from orchestration.nersc import NerscClient

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
load_dotenv()


@flow(name="nersc_recon_flow")
def nersc_recon_flow(
file_path: str,
) -> bool:
"""
Perform tomography reconstruction on NERSC.

:param file_path: Path to the file to reconstruct.
"""

# To do: Implement file transfers, pruning, and other necessary steps

controller = get_controller("NERSC")
nersc_reconstruction_success = controller.reconstruct(
file_path=file_path,
)
nersc_multi_res_success = controller.build_multi_resolution(
file_path=file_path,
)

    return nersc_reconstruction_success and nersc_multi_res_success


if __name__ == "__main__":
nersc_recon_flow(file_path="dabramov/20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast.h5")
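
A short invocation sketch (the path below is a placeholder): because the flow returns a boolean, a caller can turn a failed reconstruction into an explicit error instead of scanning logs.

from orchestration.flows.bl832.nersc import nersc_recon_flow

# Placeholder path; a real run would pass the raw .h5 location expected by the flow
success = nersc_recon_flow(file_path="example_folder/example_scan.h5")
if not success:
    raise RuntimeError("NERSC reconstruction flow reported failure")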