From 2b699dd71ef4fab9b7383cc546b02da748f67e32 Mon Sep 17 00:00:00 2001
From: Bob Caddy
Date: Mon, 30 Oct 2023 15:19:23 -0400
Subject: [PATCH] Refactor slice & dset_3d scripts with common structure

The two scripts now have a nearly identical CLI and structure.
---
 python_scripts/cat_dset_3D.py | 244 ++++++++++++++++++++--------------
 python_scripts/cat_slice.py   | 140 +++++++++----------
 2 files changed, 205 insertions(+), 179 deletions(-)
 mode change 100644 => 100755 python_scripts/cat_slice.py

diff --git a/python_scripts/cat_dset_3D.py b/python_scripts/cat_dset_3D.py
index 7c403933e..959d692ae 100755
--- a/python_scripts/cat_dset_3D.py
+++ b/python_scripts/cat_dset_3D.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 """
 Python script for concatenating 3D hdf5 datasets. Includes a CLI for concatenating Cholla HDF5 datasets and can be
-imported into other scripts where the `concat_3d` function can be used to concatenate the datasets.
+imported into other scripts where the `concat_3d_output` function can be used to concatenate the datasets.
 
 Generally the easiest way to import this script is to add the `python_scripts` directory to your python path in your
 script like this:
@@ -18,85 +18,10 @@ import pathlib
 
 # ======================================================================================================================
-def main():
-    """This function handles the CLI argument parsing and is only intended to be used when this script is invoked from the
-    command line. If you're importing this file then use the `concat_3d` or `concat_3d_single` functions directly.
-    """
-    # Argument handling
-    cli = argparse.ArgumentParser()
-    # Required Arguments
-    cli.add_argument('-s', '--start_num', type=int, required=True, help='The first output step to concatenate')
-    cli.add_argument('-e', '--end_num', type=int, required=True, help='The last output step to concatenate')
-    cli.add_argument('-n', '--num_processes', type=int, required=True, help='The number of processes that were used')
-    # Optional Arguments
-    cli.add_argument('-i', '--input_dir', type=pathlib.Path, default=pathlib.Path.cwd(), help='The input directory.')
-    cli.add_argument('-o', '--output_dir', type=pathlib.Path, default=pathlib.Path.cwd(), help='The output directory.')
-    cli.add_argument('--skip-fields', type=list, default=[], help='List of fields to skip concatenating. Defaults to empty.')
-    cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets.')
-    cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. Defaults to None.')
-    cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. 
Defaults to None.') - args = cli.parse_args() - - # Perform the concatenation - concat_3d(start_num=args.start_num, - end_num=args.end_num, - num_processes=args.num_processes, - input_dir=args.input_dir, - output_dir=args.output_dir, - skip_fields=args.skip_fields, - destination_dtype=args.dtype, - compression_type=args.compression_type, - compression_options=args.compression_opts) -# ====================================================================================================================== - -# ====================================================================================================================== -def concat_3d(start_num: int, - end_num: int, - num_processes: int, - input_dir: pathlib.Path = pathlib.Path.cwd(), - output_dir: pathlib.Path = pathlib.Path.cwd(), - skip_fields: list = [], - destination_dtype: np.dtype = None, - compression_type: str = None, - compression_options: str = None): - """Concatenate 3D HDF5 Cholla datasets. i.e. take the single files generated per process and concatenate them into a - single, large file. All outputs from start_num to end_num will be concatenated. - - Args: - start_num (int): The first output step to concatenate - end_num (int): The last output step to concatenate - num_processes (int): The number of processes that were used - input_dir (pathlib.Path, optional): The input directory. Defaults to pathlib.Path.cwd(). - output_dir (pathlib.Path, optional): The output directory. Defaults to pathlib.Path.cwd(). - skip_fields (list, optional): List of fields to skip concatenating. Defaults to []. - destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. - compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. - compression_options (str, optional): What compression settings to use if compressing. Defaults to None. - """ - - # Error checking - assert start_num >= 0, 'start_num must be greater than or equal to 0' - assert end_num >= 0, 'end_num must be greater than or equal to 0' - assert start_num <= end_num, 'end_num should be greater than or equal to start_num' - assert num_processes > 1, 'num_processes must be greater than 1' - - # loop over outputs - for n in range(start_num, end_num+1): - concat_3d_single(output_number=n, - num_processes=num_processes, - input_dir=input_dir, - output_dir=output_dir, - skip_fields=skip_fields, - destination_dtype=destination_dtype, - compression_type=compression_type, - compression_options=compression_options) -# ====================================================================================================================== - -# ====================================================================================================================== -def concat_3d_single(output_number: int, +def concat_3d_output(source_directory: pathlib.Path, + output_directory: pathlib.Path, num_processes: int, - input_dir: pathlib.Path = pathlib.Path.cwd(), - output_dir: pathlib.Path = pathlib.Path.cwd(), + output_number: int, skip_fields: list = [], destination_dtype: np.dtype = None, compression_type: str = None, @@ -105,11 +30,10 @@ def concat_3d_single(output_number: int, single, large file. Args: - output_number (int): The output - end_num (int): The last output step to concatenate - num_processes (int): The number of processes that were used - input_dir (pathlib.Path, optional): The input directory. Defaults to pathlib.Path.cwd(). 
- output_dir (pathlib.Path, optional): The output directory. Defaults to pathlib.Path.cwd(). + source_directory (pathlib.Path): The directory containing the unconcatenated files + output_directory (pathlib.Path): The directory containing the new concatenated files + num_processes (int): The number of ranks that Cholla was run with + output_number (int): The output number to concatenate skip_fields (list, optional): List of fields to skip concatenating. Defaults to []. destination_dtype (np.dtype, optional): The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. @@ -120,13 +44,13 @@ def concat_3d_single(output_number: int, assert num_processes > 1, 'num_processes must be greater than 1' assert output_number >= 0, 'output_number must be greater than or equal to 0' - # open the output file for writing (don't overwrite if exists) - fileout = h5py.File(output_dir / f'{output_number}.h5', 'a') + # open the output file for writing (fail if it exists) + destination_file = h5py.File(output_directory / f'{output_number}.h5', 'w-') # Setup the output file - with h5py.File(input_dir / f'{output_number}.h5.0', 'r') as source_file: + with h5py.File(source_directory / f'{output_number}.h5.0', 'r') as source_file: # Copy header data - fileout = copy_header(source_file, fileout) + destination_file = copy_header(source_file, destination_file) # Create the datasets in the output file datasets_to_copy = list(source_file.keys()) @@ -137,29 +61,42 @@ def concat_3d_single(output_number: int, data_shape = source_file.attrs['dims'] - fileout.create_dataset(name=dataset, - shape=data_shape, - dtype=dtype, - compression=compression_type, - compression_opts=compression_options) + if dataset == 'magnetic_x': data_shape[0] += 1 + if dataset == 'magnetic_y': data_shape[1] += 1 + if dataset == 'magnetic_z': data_shape[2] += 1 + + destination_file.create_dataset(name=dataset, + shape=data_shape, + dtype=dtype, + compression=compression_type, + compression_opts=compression_options) # loop over files for a given output for i in range(0, num_processes): # open the input file for reading - filein = h5py.File(input_dir / f'{output_number}.h5.{i}', 'r') - # read in the header data from the input file - head = filein.attrs + source_file = h5py.File(source_directory / f'{output_number}.h5.{i}', 'r') - # write data from individual processor file to correct location in concatenated file - nx_local, ny_local, nz_local = filein.attrs['dims_local'] - x_start, y_start, z_start = filein.attrs['offset'] + # Compute the offset slicing + nx_local, ny_local, nz_local = source_file.attrs['dims_local'] + x_start, y_start, z_start = source_file.attrs['offset'] + x_end, y_end, z_end = x_start+nx_local, y_start+ny_local, z_start+nz_local + # write data from individual processor file to correct location in concatenated file for dataset in datasets_to_copy: - fileout[dataset][x_start:x_start+nx_local, y_start:y_start+ny_local,z_start:z_start+nz_local] = filein[dataset] + magnetic_offset = [0,0,0] + if dataset == 'magnetic_x': magnetic_offset[0] = 1 + if dataset == 'magnetic_y': magnetic_offset[1] = 1 + if dataset == 'magnetic_z': magnetic_offset[2] = 1 + + destination_file[dataset][x_start:x_end+magnetic_offset[0], + y_start:y_end+magnetic_offset[1], + z_start:z_end+magnetic_offset[2]] = source_file[dataset] - filein.close() + # Now that the copy is done we close the source file + 
source_file.close()
 
-    fileout.close()
+    # Close destination file now that it is fully constructed
+    destination_file.close()
 # ======================================================================================================================
 
 # ==============================================================================
@@ -182,5 +119,106 @@ def copy_header(source_file: h5py.File, destination_file: h5py.File):
     return destination_file
 # ==============================================================================
 
+# ==============================================================================
+def common_cli() -> argparse.ArgumentParser:
+    """This function provides the basis for the common CLI amongst the various concatenation scripts. It returns an
+    `argparse.ArgumentParser` object to which additional arguments can be added before the final `.parse_args()` call.
+    """
+
+    # ============================================================================
+    # Function used to parse the `--concat-outputs` argument
+    def concat_output(raw_argument: str) -> set:
+        # Check if the string is empty
+        if len(raw_argument) < 1:
+            raise ValueError('The --concat-outputs argument must not be of length zero.')
+
+        # Strip unneeded characters
+        cleaned_argument = raw_argument.replace(' ', '')
+        cleaned_argument = cleaned_argument.replace('[', '')
+        cleaned_argument = cleaned_argument.replace(']', '')
+
+        # Check that it only has the allowed characters
+        allowed_characters = set('0123456789,-')
+        if not set(cleaned_argument).issubset(allowed_characters):
+            raise ValueError("Argument contains incorrect characters. Should only contain '0-9', ',', and '-'.")
+
+        # Split on commas
+        cleaned_argument = cleaned_argument.split(',')
+
+        # Generate the final set of output numbers
+        iterable_argument = set()
+        for arg in cleaned_argument:
+            if '-' not in arg:
+                if int(arg) < 0:
+                    raise ValueError('Output numbers must be greater than or equal to 0.')
+                iterable_argument.add(int(arg))
+            else:
+                start, end = arg.split('-')
+                start, end = int(start), int(end)
+                if end < start:
+                    raise ValueError('The end of a range must be larger than the start of the range.')
+                if start < 0:
+                    raise ValueError('Output numbers must be greater than or equal to 0.')
+                iterable_argument = iterable_argument.union(set(range(start, end+1)))
+
+        return iterable_argument
+    # ============================================================================
+
+    # ============================================================================
+    def positive_int(raw_argument: str) -> int:
+        arg = int(raw_argument)
+        if arg < 0:
+            raise ValueError('Argument must be 0 or greater.')
+
+        return arg
+    # ============================================================================
+
+    # ============================================================================
+    def skip_fields(raw_argument: str) -> list:
+        # Strip unneeded characters
+        cleaned_argument = raw_argument.replace(' ', '')
+        cleaned_argument = cleaned_argument.replace('[', '')
+        cleaned_argument = cleaned_argument.replace(']', '')
+        cleaned_argument = cleaned_argument.split(',')
+
+        return cleaned_argument
+    # ============================================================================
+
+    # Initialize the CLI
+    cli = argparse.ArgumentParser()
+
+    # Required Arguments
+    cli.add_argument('-s', '--source-directory', type=pathlib.Path, required=True, help='The path to the directory for the source HDF5 files.')
+    cli.add_argument('-o', '--output-directory', type=pathlib.Path, required=True, help='The path to the directory to write out the concatenated HDF5 files.')
+    cli.add_argument('-n', '--num-processes', type=positive_int, required=True, help='The number of processes that Cholla was run with.')
+    cli.add_argument('-c', '--concat-outputs', type=concat_output, required=True, help='Which outputs to concatenate. Can be a single number (e.g. 8), a range (e.g. 2-9), or a list (e.g. [1,2,3]). Ranges are inclusive.')
+
+    # Optional Arguments
+    cli.add_argument('--skip-fields', type=skip_fields, default=[], help='List of fields to skip concatenating. Defaults to empty.')
+    cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets.')
+    cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. Defaults to None.')
+    cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. Defaults to None.')
+
+    return cli
+# ==============================================================================
+
 if __name__ == '__main__':
-    main()
+    from timeit import default_timer
+    start = default_timer()
+
+    cli = common_cli()
+    args = cli.parse_args()
+
+    # Perform the concatenation
+    for output in args.concat_outputs:
+        concat_3d_output(source_directory=args.source_directory,
+                         output_directory=args.output_directory,
+                         num_processes=args.num_processes,
+                         output_number=output,
+                         skip_fields=args.skip_fields,
+                         destination_dtype=args.dtype,
+                         compression_type=args.compression_type,
+                         compression_options=args.compression_opts)
+
+    print(f'\nTime to execute: {round(default_timer()-start,2)} seconds')
diff --git a/python_scripts/cat_slice.py b/python_scripts/cat_slice.py
old mode 100644
new mode 100755
index 51aae2d6d..88f66ea2f
--- a/python_scripts/cat_slice.py
+++ b/python_scripts/cat_slice.py
@@ -19,48 +19,12 @@
 import pathlib
 
 import numpy as np
-from cat_dset_3D import copy_header
-
-# ==============================================================================
-def main():
-    """This function handles the CLI argument parsing and is only intended to be used when this script is invoked from the
-    command line. If you're importing this file then use the `concat_slice` function directly.
-    """
-    # Argument handling
-    cli = argparse.ArgumentParser()
-    # Required Arguments
-    cli.add_argument('-s', '--source-directory', type=pathlib.Path, required=True, help='The path to the source HDF5 files.')
-    cli.add_argument('-o', '--output-file', type=pathlib.Path, required=True, help='The path and filename of the concatenated file.')
-    cli.add_argument('-n', '--num-processes', type=int, required=True, help='The number of processes that were used to generate the slices.')
-    cli.add_argument('-t', '--output-num', type=int, required=True, help='The output number to be concatenated')
-    # Optional Arguments
-    cli.add_argument('--xy', type=bool, default=True, help='If True then concatenate the XY slice. Defaults to True.')
-    cli.add_argument('--yz', type=bool, default=True, help='If True then concatenate the YZ slice. Defaults to True.')
-    cli.add_argument('--xz', type=bool, default=True, help='If True then concatenate the XZ slice. Defaults to True.')
-    cli.add_argument('--skip-fields', type=list, default=[], help='List of fields to skip concatenating. Defaults to empty.')
-    cli.add_argument('--dtype', type=str, default=None, help='The data type of the output datasets. Accepts most numpy types. 
Defaults to the same as the input datasets.') - cli.add_argument('--compression-type', type=str, default=None, help='What kind of compression to use on the output data. Defaults to None.') - cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. Defaults to None.') - args = cli.parse_args() - - # Perform the concatenation - concat_slice(source_directory=args.source_directory, - destination_file_path=args.output_file, - num_ranks=args.num_processses, - output_number=args.output_num, - concat_xy=args.xy, - concat_yz=args.yz, - concat_xz=args.xz, - skip_fields=args.skip_fields, - destination_dtype=args.dtype, - compression_type=args.compression_type, - compression_options=args.compression_opts) -# ============================================================================== +from cat_dset_3D import copy_header, common_cli # ============================================================================== def concat_slice(source_directory: pathlib.Path, - destination_file_path: pathlib.Path, - num_ranks: int, + output_directory: pathlib.Path, + num_processes: int, output_number: int, concat_xy: bool = True, concat_yz: bool = True, @@ -76,8 +40,8 @@ def concat_slice(source_directory: pathlib.Path, Args: source_directory (pathlib.Path): The directory containing the unconcatenated files - destination_file_path (pathlib.Path): The path and name of the new concatenated file - num_ranks (int): The number of ranks that Cholla was run with + output_directory (pathlib.Path): The directory containing the new concatenated files + num_processes (int): The number of ranks that Cholla was run with output_number (int): The output number to concatenate concat_xy (bool, optional): If True then concatenate the XY slice. Defaults to True. concat_yz (bool, optional): If True then concatenate the YZ slice. Defaults to True. @@ -87,53 +51,57 @@ def concat_slice(source_directory: pathlib.Path, compression_type (str, optional): What kind of compression to use on the output data. Defaults to None. compression_options (str, optional): What compression settings to use if compressing. Defaults to None. 
""" - # Open destination file and first file for getting metadata - source_file = h5py.File(source_directory / f'{output_number}_slice.h5.0', 'r') - destination_file = h5py.File(destination_file_path, 'w') - - # Copy over header - destination_file = copy_header(source_file, destination_file) - # Get a list of all datasets in the source file - datasets_to_copy = list(source_file.keys()) + # Error checking + assert num_processes > 1, 'num_processes must be greater than 1' + assert output_number >= 0, 'output_number must be greater than or equal to 0' - # Filter the datasets to only include those I wish to copy - if not concat_xy: - datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'xy' in dataset] - if not concat_yz: - datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'yz' in dataset] - if not concat_xz: - datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'xz' in dataset] - datasets_to_copy = [dataset for dataset in datasets_to_copy if not dataset in skip_fields] - - # Create the datasets in the destination file - for dataset in datasets_to_copy: - dtype = source_file[dataset].dtype if (destination_dtype == None) else destination_dtype - - slice_shape = get_slice_shape(source_file, dataset) + # Open destination file and first file for getting metadata + destination_file = h5py.File(output_directory / f'{output_number}_slice.h5', 'w-') + + # Setup the output file + with h5py.File(source_directory / f'{output_number}_slice.h5.0', 'r') as source_file: + # Copy over header + destination_file = copy_header(source_file, destination_file) + + # Get a list of all datasets in the source file + datasets_to_copy = list(source_file.keys()) + + # Filter the datasets to only include those that need to be copied + if not concat_xy: + datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'xy' in dataset] + if not concat_yz: + datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'yz' in dataset] + if not concat_xz: + datasets_to_copy = [dataset for dataset in datasets_to_copy if not 'xz' in dataset] + datasets_to_copy = [dataset for dataset in datasets_to_copy if not dataset in skip_fields] + + # Create the datasets in the destination file + for dataset in datasets_to_copy: + dtype = source_file[dataset].dtype if (destination_dtype == None) else destination_dtype - destination_file.create_dataset(name=dataset, - shape=slice_shape, - dtype=dtype, - compression=compression_type, - compression_opts=compression_options) + slice_shape = __get_slice_shape(source_file, dataset) - # Close source file in prep for looping through source files - source_file.close() + destination_file.create_dataset(name=dataset, + shape=slice_shape, + dtype=dtype, + compression=compression_type, + compression_opts=compression_options) # Copy data - for rank in range(num_ranks): + for rank in range(num_processes): # Open source file source_file = h5py.File(source_directory / f'{output_number}_slice.h5.{rank}', 'r') # Loop through and copy datasets for dataset in datasets_to_copy: # Determine locations and shifts for writing - (i0_start, i0_end, i1_start, i1_end), file_in_slice = write_bounds(source_file, dataset) + (i0_start, i0_end, i1_start, i1_end), file_in_slice = __write_bounds_slice(source_file, dataset) if file_in_slice: # Copy the data - destination_file[dataset][i0_start:i0_end, i1_start:i1_end] = source_file[dataset] + destination_file[dataset][i0_start:i0_end, + i1_start:i1_end] = source_file[dataset] # Now that the copy is done we close the source 
        source_file.close()
 
@@ -143,7 +111,7 @@
 # ==============================================================================
 
 # ==============================================================================
-def get_slice_shape(source_file: h5py.File, dataset: str):
+def __get_slice_shape(source_file: h5py.File, dataset: str):
     """Determine the shape of the full slice in a dataset
 
     Args:
@@ -171,7 +139,7 @@ def get_slice_shape(source_file: h5py.File, dataset: str):
 # ==============================================================================
 
 # ==============================================================================
-def write_bounds(source_file: h5py.File, dataset: str):
+def __write_bounds_slice(source_file: h5py.File, dataset: str):
     """Determine the bounds of the concatenated file to write to
 
     Args:
@@ -206,5 +174,25 @@
 if __name__ == '__main__':
     from timeit import default_timer
     start = default_timer()
-    main()
+
+    cli = common_cli()
+    cli.add_argument('--disable-xy', default=True, action='store_false', help='Disables concatenating the XY slice.')
+    cli.add_argument('--disable-yz', default=True, action='store_false', help='Disables concatenating the YZ slice.')
+    cli.add_argument('--disable-xz', default=True, action='store_false', help='Disables concatenating the XZ slice.')
+    args = cli.parse_args()
+
+    # Perform the concatenation
+    for output in args.concat_outputs:
+        concat_slice(source_directory=args.source_directory,
+                     output_directory=args.output_directory,
+                     num_processes=args.num_processes,
+                     output_number=output,
+                     concat_xy=args.disable_xy,
+                     concat_yz=args.disable_yz,
+                     concat_xz=args.disable_xz,
+                     skip_fields=args.skip_fields,
+                     destination_dtype=args.dtype,
+                     compression_type=args.compression_type,
+                     compression_options=args.compression_opts)
+
+    print(f'\nTime to execute: {round(default_timer()-start,2)} seconds')
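
For reference, the refactored entry points can be driven either through the shared CLI or programmatically, as the module docstrings suggest. Below is a minimal sketch; the checkout path, the rank count (16), and the output range (0-9) are illustrative assumptions, not values taken from this patch:

    import sys
    import pathlib

    # Make Cholla's python_scripts directory importable (assumed checkout path).
    sys.path.append(str(pathlib.Path.home() / 'cholla' / 'python_scripts'))

    from cat_dset_3D import concat_3d_output
    from cat_slice import concat_slice

    source      = pathlib.Path('raw')           # assumed to hold the per-rank {n}.h5.{rank} and {n}_slice.h5.{rank} files
    destination = pathlib.Path('concatenated')  # target files must not already exist; both scripts open with 'w-'

    for output in range(10):  # outputs 0-9, inclusive
        concat_3d_output(source_directory=source,
                         output_directory=destination,
                         num_processes=16,      # number of ranks Cholla was run with; must be > 1
                         output_number=output)
        concat_slice(source_directory=source,
                     output_directory=destination,
                     num_processes=16,
                     output_number=output,
                     concat_xz=False)           # e.g. skip the XZ slice

    # Equivalent CLI invocations (both scripts now share the common_cli interface):
    #   ./cat_dset_3D.py -s ./raw -o ./concatenated -n 16 -c 0-9
    #   ./cat_slice.py -s ./raw -o ./concatenated -n 16 -c 0-9 --disable-xz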