Skip to content

Commit

Permalink
Staging multiprocess automation into dev (#997) (#998)
Browse files Browse the repository at this point in the history
  • Loading branch information
clee1152 authored and taylorfturner committed Aug 4, 2023
1 parent 09e345f commit ede6df1
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 2 deletions.
9 changes: 7 additions & 2 deletions dataprofiler/profilers/profile_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2867,9 +2867,14 @@ def tqdm(level: set[int]) -> Generator[int, None, None]:
)
)

# If options.multiprocess is enabled, auto-toggle multiprocessing
auto_multiprocess_toggle = False
if self.options.multiprocess.is_enabled:
auto_multiprocess_toggle = profiler_utils.auto_multiprocess_toggle(data)

# Generate pool and estimate datasize
pool = None
if self.options.multiprocess.is_enabled:
if auto_multiprocess_toggle:
est_data_size = data[:50000].memory_usage(index=False, deep=True).sum()
est_data_size = (est_data_size / min(50000, len(data))) * len(data)
pool, pool_size = profiler_utils.generate_pool(
Expand Down Expand Up @@ -2990,7 +2995,7 @@ def tqdm(level: set[int]) -> Generator[int, None, None]:
# Process and label the data
notification_str = "Calculating the statistics... "
pool = None
if self.options.multiprocess.is_enabled:
if auto_multiprocess_toggle:
pool, pool_size = profiler_utils.generate_pool(4, est_data_size)
if pool:
notification_str += " (with " + str(pool_size) + " processes)"
Expand Down
28 changes: 28 additions & 0 deletions dataprofiler/profilers/profiler_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,34 @@ def partition(data: list, chunk_size: int) -> Generator[list, None, Any]:
yield data[idx : idx + chunk_size]


def auto_multiprocess_toggle(
data: DataFrame,
num_rows_threshold: int = 750000,
num_cols_threshold: int = 20,
) -> bool:
"""
Automate multiprocessing toggle depending on dataset sizes.
:param data: a dataset
:type data: pandas.DataFrame
:param num_rows_threshold: threshold for number of rows to
use multiprocess
:type num_rows_threshold: int
:param num_cols_threshold: threshold for number of columns
to use multiprocess
:type num_cols_threshold: int
:return: recommended option.multiprocess.is_enabled value
:rtype: bool
"""
# If the number of rows or columns exceed their respective threshold,
# we want to turn on multiprocessing
if data.shape[0] > num_rows_threshold or data.shape[1] > num_cols_threshold:
return True
# Otherwise, we do not turn on multiprocessing
else:
return False


def suggest_pool_size(data_size: int = None, cols: int = None) -> int | None:
"""
Suggest the pool size based on resources.
Expand Down
49 changes: 49 additions & 0 deletions dataprofiler/tests/profilers/test_profiler_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,52 @@ def test_odd_merge_profile_list(self, mock_data_labeler, *mocks):

self.assertEqual(1, single_report["data_stats"][0]["statistics"]["min"])
self.assertEqual(60.0, single_report["data_stats"][0]["statistics"]["max"])


class TestAutoMultiProcessToggle(unittest.TestCase):

"""
Validate profile_utils.auto_multiprocess_toggle is properly working.
"""

def test_auto_multiprocess_toggle(self):
rows_threshold = 5
cols_threshold = 10

# Test for no multiprocessing for sufficiently small datasets
data = pd.DataFrame(np.random.random((2, 5)))
self.assertFalse(
profiler_utils.auto_multiprocess_toggle(
data, rows_threshold, cols_threshold
)
)
data = pd.DataFrame(np.random.random((5, 10)))
self.assertFalse(
profiler_utils.auto_multiprocess_toggle(
data, rows_threshold, cols_threshold
)
)

# Test for multiprocessing with only rows passing threshold
data = pd.DataFrame(np.random.random((6, 10)))
self.assertTrue(
profiler_utils.auto_multiprocess_toggle(
data, rows_threshold, cols_threshold
)
)

# Test for multiprocessing with only columns passing threshold
data = pd.DataFrame(np.random.random((5, 11)))
self.assertTrue(
profiler_utils.auto_multiprocess_toggle(
data, rows_threshold, cols_threshold
)
)

# Test for multiprocessing with both rows and columns passing threshold
data = pd.DataFrame(np.random.random((6, 11)))
self.assertTrue(
profiler_utils.auto_multiprocess_toggle(
data, rows_threshold, cols_threshold
)
)

0 comments on commit ede6df1

Please sign in to comment.