From e7dbaea53a6d87aa9ab3280b7a060f4701b4b9a9 Mon Sep 17 00:00:00 2001 From: Christopher Lee Date: Thu, 3 Aug 2023 09:22:41 -0400 Subject: [PATCH] Staging multiprocess automation into dev --- dataprofiler/profilers/profile_builder.py | 9 +++- dataprofiler/profilers/profiler_utils.py | 28 +++++++++++ .../tests/profilers/test_profiler_utils.py | 49 +++++++++++++++++++ 3 files changed, 84 insertions(+), 2 deletions(-) diff --git a/dataprofiler/profilers/profile_builder.py b/dataprofiler/profilers/profile_builder.py index 4cc3e5c6b..113d19ef2 100644 --- a/dataprofiler/profilers/profile_builder.py +++ b/dataprofiler/profilers/profile_builder.py @@ -2867,9 +2867,14 @@ def tqdm(level: set[int]) -> Generator[int, None, None]: ) ) + # If options.multiprocess is enabled, auto-toggle multiprocessing + auto_multiprocess_toggle = False + if self.options.multiprocess.is_enabled: + auto_multiprocess_toggle = profiler_utils.auto_multiprocess_toggle(data) + # Generate pool and estimate datasize pool = None - if self.options.multiprocess.is_enabled: + if auto_multiprocess_toggle: est_data_size = data[:50000].memory_usage(index=False, deep=True).sum() est_data_size = (est_data_size / min(50000, len(data))) * len(data) pool, pool_size = profiler_utils.generate_pool( @@ -2990,7 +2995,7 @@ def tqdm(level: set[int]) -> Generator[int, None, None]: # Process and label the data notification_str = "Calculating the statistics... " pool = None - if self.options.multiprocess.is_enabled: + if auto_multiprocess_toggle: pool, pool_size = profiler_utils.generate_pool(4, est_data_size) if pool: notification_str += " (with " + str(pool_size) + " processes)" diff --git a/dataprofiler/profilers/profiler_utils.py b/dataprofiler/profilers/profiler_utils.py index ff11dbb13..a3ed375b4 100644 --- a/dataprofiler/profilers/profiler_utils.py +++ b/dataprofiler/profilers/profiler_utils.py @@ -177,6 +177,34 @@ def partition(data: list, chunk_size: int) -> Generator[list, None, Any]: yield data[idx : idx + chunk_size] +def auto_multiprocess_toggle( + data: DataFrame, + num_rows_threshold: int = 750000, + num_cols_threshold: int = 20, +) -> bool: + """ + Automate multiprocessing toggle depending on dataset sizes. + + :param data: a dataset + :type data: pandas.DataFrame + :param num_rows_threshold: threshold for number of rows to + use multiprocess + :type num_rows_threshold: int + :param num_cols_threshold: threshold for number of columns + to use multiprocess + :type num_cols_threshold: int + :return: recommended option.multiprocess.is_enabled value + :rtype: bool + """ + # If the number of rows or columns exceed their respective threshold, + # we want to turn on multiprocessing + if data.shape[0] > num_rows_threshold or data.shape[1] > num_cols_threshold: + return True + # Otherwise, we do not turn on multiprocessing + else: + return False + + def suggest_pool_size(data_size: int = None, cols: int = None) -> int | None: """ Suggest the pool size based on resources. diff --git a/dataprofiler/tests/profilers/test_profiler_utils.py b/dataprofiler/tests/profilers/test_profiler_utils.py index fcef4fb57..4eee1963a 100644 --- a/dataprofiler/tests/profilers/test_profiler_utils.py +++ b/dataprofiler/tests/profilers/test_profiler_utils.py @@ -469,3 +469,52 @@ def test_odd_merge_profile_list(self, mock_data_labeler, *mocks): self.assertEqual(1, single_report["data_stats"][0]["statistics"]["min"]) self.assertEqual(60.0, single_report["data_stats"][0]["statistics"]["max"]) + + +class TestAutoMultiProcessToggle(unittest.TestCase): + + """ + Validate profile_utils.auto_multiprocess_toggle is properly working. + """ + + def test_auto_multiprocess_toggle(self): + rows_threshold = 5 + cols_threshold = 10 + + # Test for no multiprocessing for sufficiently small datasets + data = pd.DataFrame(np.random.random((2, 5))) + self.assertFalse( + profiler_utils.auto_multiprocess_toggle( + data, rows_threshold, cols_threshold + ) + ) + data = pd.DataFrame(np.random.random((5, 10))) + self.assertFalse( + profiler_utils.auto_multiprocess_toggle( + data, rows_threshold, cols_threshold + ) + ) + + # Test for multiprocessing with only rows passing threshold + data = pd.DataFrame(np.random.random((6, 10))) + self.assertTrue( + profiler_utils.auto_multiprocess_toggle( + data, rows_threshold, cols_threshold + ) + ) + + # Test for multiprocessing with only columns passing threshold + data = pd.DataFrame(np.random.random((5, 11))) + self.assertTrue( + profiler_utils.auto_multiprocess_toggle( + data, rows_threshold, cols_threshold + ) + ) + + # Test for multiprocessing with both rows and columns passing threshold + data = pd.DataFrame(np.random.random((6, 11))) + self.assertTrue( + profiler_utils.auto_multiprocess_toggle( + data, rows_threshold, cols_threshold + ) + )