implementing batching for parallelization
mjgleason committed Jan 23, 2025
1 parent 44e2225 · commit a763ee8
Showing 1 changed file with 55 additions and 47 deletions.
reV/supply_curve/tech_mapping.py: 102 changes (55 additions, 47 deletions)
@@ -369,60 +369,68 @@ def map_resource(self, tm_dset, max_workers=None, points_per_worker=10):
         gid_chunks = ceil(len(self._gids) / points_per_worker)
         gid_chunks = np.array_split(self._gids, gid_chunks)
 
-        futures = {}
         loggers = [__name__, "reV"]
-        logger.info(f"Kicking off {len(gid_chunks)} resource mapping jobs.")
+        n_jobs = len(gid_chunks)
+        batch_size = 100
+        n_submitted = 0
+        logger.info(f"Kicking off {n_jobs} resource mapping jobs.")
         with SpawnProcessPool(max_workers=max_workers, loggers=loggers) as exe:
-            # iterate through split executions, submitting each to worker
-            for i, gid_set in enumerate(gid_chunks):
-                # submit executions and append to futures list
-                excl_coords = self._get_excl_coords(
-                    self._excl_fpath,
-                    gid_set,
-                    self._sc_row_indices,
-                    self._sc_col_indices,
-                    self._excl_row_slices,
-                    self._excl_col_slices,
-                )
-                futures[
-                    exe.submit(
-                        self.map_resource_gids,
-                        gid_set,
-                        excl_coords,
-                        self._tree,
-                        self.distance_threshold,
-                    )
-                ] = i
-            logger.info("All jobs submitted.")
-
-            with h5py.File(self._excl_fpath, "a") as f:
-                indices = f[tm_dset]
-                n_finished = 0
-                for future in as_completed(futures):
-                    n_finished += 1
-                    logger.info(
-                        "Parallel TechMapping futures collected: "
-                        "{} out of {}".format(n_finished, len(futures))
-                    )
-
-                    i = futures[future]
-                    result = future.result()
-
-                    for j, gid in enumerate(gid_chunks[i]):
-                        row_slice, col_slice = self._get_excl_slices(
-                            gid,
-                            self._sc_row_indices,
-                            self._sc_col_indices,
-                            self._excl_row_slices,
-                            self._excl_col_slices,
-                        )
-                        n_rows = row_slice.stop - row_slice.start
-                        n_cols = col_slice.stop - col_slice.start
-                        result_shape = (n_rows, n_cols)
-                        indices[row_slice, col_slice] = result[j].reshape(
-                            result_shape
-                        )
+            while n_submitted < n_jobs:
+                futures = {}
+                batch_gid_chunks = gid_chunks[
+                    n_submitted:min(n_submitted + batch_size, n_jobs)
+                ]
+                # iterate through split executions, submitting each to worker
+                for i, gid_set in enumerate(batch_gid_chunks):
+                    # submit executions and append to futures list
+                    excl_coords = self._get_excl_coords(
+                        self._excl_fpath,
+                        gid_set,
+                        self._sc_row_indices,
+                        self._sc_col_indices,
+                        self._excl_row_slices,
+                        self._excl_col_slices,
+                    )
+                    futures[
+                        exe.submit(
+                            self.map_resource_gids,
+                            gid_set,
+                            excl_coords,
+                            self._tree,
+                            self.distance_threshold,
+                        )
+                    ] = i
+                n_submitted += len(batch_gid_chunks)
+                logger.info(f"{n_submitted} jobs submitted.")
+
+                with h5py.File(self._excl_fpath, "a") as f:
+                    indices = f[tm_dset]
+                    n_finished = 0
+                    for future in as_completed(futures):
+                        n_finished += 1
+                        logger.info(
+                            "Parallel TechMapping futures collected: "
+                            "{} out of {}".format(n_finished, len(futures))
+                        )
+
+                        i = futures[future]
+                        result = future.result()
+
+                        for j, gid in enumerate(batch_gid_chunks[i]):
+                            row_slice, col_slice = self._get_excl_slices(
+                                gid,
+                                self._sc_row_indices,
+                                self._sc_col_indices,
+                                self._excl_row_slices,
+                                self._excl_col_slices,
+                            )
+                            n_rows = row_slice.stop - row_slice.start
+                            n_cols = col_slice.stop - col_slice.start
+                            result_shape = (n_rows, n_cols)
+                            indices[row_slice, col_slice] = result[j].reshape(
+                                result_shape
+                            )
 
     @classmethod
     def run(
         cls,
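The change replaces a single submit-all-then-collect pass with fixed-size batches: at most batch_size (100) chunk jobs are in flight at a time, and each batch's results are collected and written to the tm_dset HDF5 dataset before the next batch is submitted, which bounds the memory held by pending futures and their results. Below is a minimal, self-contained sketch of the same pattern using the standard-library ProcessPoolExecutor (reV itself uses SpawnProcessPool from rex); do_work, run_batched, jobs, and BATCH_SIZE are illustrative names, not part of the reV API:

    from concurrent.futures import ProcessPoolExecutor, as_completed

    BATCH_SIZE = 100  # cap on in-flight futures, mirroring batch_size in the diff


    def do_work(chunk):
        """Stand-in for the real per-chunk task (map_resource_gids in reV)."""
        return sum(chunk)


    def run_batched(jobs, max_workers=None):
        """Submit jobs in fixed-size batches, draining each before the next."""
        results = {}
        n_submitted = 0
        with ProcessPoolExecutor(max_workers=max_workers) as exe:
            while n_submitted < len(jobs):
                batch = jobs[n_submitted:n_submitted + BATCH_SIZE]
                # Map each future back to its global job index so results can
                # be written to the right place (the diff does this with `] = i`)
                futures = {exe.submit(do_work, chunk): n_submitted + i
                           for i, chunk in enumerate(batch)}
                n_submitted += len(batch)
                # Drain this batch completely before submitting the next, so
                # at most BATCH_SIZE futures (and their results) are ever live
                for future in as_completed(futures):
                    results[futures[future]] = future.result()
        return results


    if __name__ == "__main__":
        jobs = [list(range(i, i + 10)) for i in range(0, 2500, 10)]  # 250 jobs
        out = run_batched(jobs, max_workers=4)
        print(f"collected {len(out)} results in batches of {BATCH_SIZE}")

Because as_completed is given only the current batch's futures, the drain step acts as a synchronization barrier between batches; a variant with the same memory bound submits one new job each time a future completes, at the cost of more bookkeeping.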
