From a763ee840731817545bed9b6b5128bc3d7a6248b Mon Sep 17 00:00:00 2001
From: Michael Gleason
Date: Thu, 23 Jan 2025 07:21:53 -0700
Subject: [PATCH] implementing batching for parallelization

Submit the resource mapping jobs to the process pool in batches of up to
100 gid chunks instead of all at once. The results of each batch are
collected and written to the tech mapping dataset before the next batch
is submitted.

Review fixes folded into this patch:
- the per-batch "jobs submitted" log message is now an f-string, so the
  running count is interpolated instead of logged as literal text
- n_submitted advances by the number of jobs actually submitted in the
  batch, so the logged count is not overstated on the final partial batch
---
 reV/supply_curve/tech_mapping.py | 102 +++++++++++++++++--------------
 1 file changed, 55 insertions(+), 47 deletions(-)

diff --git a/reV/supply_curve/tech_mapping.py b/reV/supply_curve/tech_mapping.py
index ba2f36611..0ba73bb11 100644
--- a/reV/supply_curve/tech_mapping.py
+++ b/reV/supply_curve/tech_mapping.py
@@ -369,60 +369,68 @@ def map_resource(self, tm_dset, max_workers=None, points_per_worker=10):
         gid_chunks = ceil(len(self._gids) / points_per_worker)
         gid_chunks = np.array_split(self._gids, gid_chunks)
 
-        futures = {}
         loggers = [__name__, "reV"]
-        logger.info(f"Kicking off {len(gid_chunks)} resource mapping jobs.")
+        n_jobs = len(gid_chunks)
+        batch_size = 100
+        n_submitted = 0
+        logger.info(f"Kicking off {n_jobs} resource mapping jobs.")
         with SpawnProcessPool(max_workers=max_workers, loggers=loggers) as exe:
-            # iterate through split executions, submitting each to worker
-            for i, gid_set in enumerate(gid_chunks):
-                # submit executions and append to futures list
-                excl_coords = self._get_excl_coords(
-                    self._excl_fpath,
-                    gid_set,
-                    self._sc_row_indices,
-                    self._sc_col_indices,
-                    self._excl_row_slices,
-                    self._excl_col_slices,
-                )
-                futures[
-                    exe.submit(
-                        self.map_resource_gids,
+            while n_submitted < n_jobs:
+                futures = {}
+                batch_gid_chunks = gid_chunks[
+                    n_submitted:min(n_submitted + batch_size, n_jobs)
+                ]
+                # iterate through split executions, submitting each to worker
+                for i, gid_set in enumerate(batch_gid_chunks):
+                    # submit executions and append to futures list
+                    excl_coords = self._get_excl_coords(
+                        self._excl_fpath,
                         gid_set,
-                        excl_coords,
-                        self._tree,
-                        self.distance_threshold,
-                    )
-                ] = i
-            logger.info("All jobs submitted.")
-
-            with h5py.File(self._excl_fpath, "a") as f:
-                indices = f[tm_dset]
-                n_finished = 0
-                for future in as_completed(futures):
-                    n_finished += 1
-                    logger.info(
-                        "Parallel TechMapping futures collected: "
-                        "{} out of {}".format(n_finished, len(futures))
+                        self._sc_row_indices,
+                        self._sc_col_indices,
+                        self._excl_row_slices,
+                        self._excl_col_slices,
                     )
-
-                    i = futures[future]
-                    result = future.result()
-
-                    for j, gid in enumerate(gid_chunks[i]):
-                        row_slice, col_slice = self._get_excl_slices(
-                            gid,
-                            self._sc_row_indices,
-                            self._sc_col_indices,
-                            self._excl_row_slices,
-                            self._excl_col_slices,
+                    futures[
+                        exe.submit(
+                            self.map_resource_gids,
+                            gid_set,
+                            excl_coords,
+                            self._tree,
+                            self.distance_threshold,
                         )
-                        n_rows = row_slice.stop - row_slice.start
-                        n_cols = col_slice.stop - col_slice.start
-                        result_shape = (n_rows, n_cols)
-                        indices[row_slice, col_slice] = result[j].reshape(
-                            result_shape
+                    ] = i
+                n_submitted += len(batch_gid_chunks)
+                logger.info(f"{n_submitted} jobs submitted.")
+
+                with h5py.File(self._excl_fpath, "a") as f:
+                    indices = f[tm_dset]
+                    n_finished = 0
+                    for future in as_completed(futures):
+                        n_finished += 1
+                        logger.info(
+                            "Parallel TechMapping futures collected: "
+                            "{} out of {}".format(n_finished, len(futures))
                         )
 
+                        i = futures[future]
+                        result = future.result()
+
+                        for j, gid in enumerate(batch_gid_chunks[i]):
+                            row_slice, col_slice = self._get_excl_slices(
+                                gid,
+                                self._sc_row_indices,
+                                self._sc_col_indices,
+                                self._excl_row_slices,
+                                self._excl_col_slices,
+                            )
+                            n_rows = row_slice.stop - row_slice.start
+                            n_cols = col_slice.stop - col_slice.start
+                            result_shape = (n_rows, n_cols)
+                            indices[row_slice, col_slice] = result[j].reshape(
+                                result_shape
+                            )
+
     @classmethod
     def run(
         cls,