Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Staging: into dev feature/multiprocess #998

Merged
merged 1 commit into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions dataprofiler/profilers/profile_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2867,9 +2867,14 @@ def tqdm(level: set[int]) -> Generator[int, None, None]:
)
)

# If options.multiprocess is enabled, auto-toggle multiprocessing
auto_multiprocess_toggle = False
if self.options.multiprocess.is_enabled:
auto_multiprocess_toggle = profiler_utils.auto_multiprocess_toggle(data)

# Generate pool and estimate datasize
pool = None
if self.options.multiprocess.is_enabled:
if auto_multiprocess_toggle:
est_data_size = data[:50000].memory_usage(index=False, deep=True).sum()
est_data_size = (est_data_size / min(50000, len(data))) * len(data)
pool, pool_size = profiler_utils.generate_pool(
Expand Down Expand Up @@ -2990,7 +2995,7 @@ def tqdm(level: set[int]) -> Generator[int, None, None]:
# Process and label the data
notification_str = "Calculating the statistics... "
pool = None
if self.options.multiprocess.is_enabled:
if auto_multiprocess_toggle:
pool, pool_size = profiler_utils.generate_pool(4, est_data_size)
if pool:
notification_str += " (with " + str(pool_size) + " processes)"
Expand Down
28 changes: 28 additions & 0 deletions dataprofiler/profilers/profiler_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,34 @@ def partition(data: list, chunk_size: int) -> Generator[list, None, Any]:
yield data[idx : idx + chunk_size]


def auto_multiprocess_toggle(
    data: DataFrame,
    num_rows_threshold: int = 750000,
    num_cols_threshold: int = 20,
) -> bool:
    """
    Decide whether a dataset is large enough to warrant multiprocessing.

    :param data: dataset whose dimensions are inspected
    :type data: pandas.DataFrame
    :param num_rows_threshold: number of rows that must be strictly
        exceeded for multiprocessing to be recommended
    :type num_rows_threshold: int
    :param num_cols_threshold: number of columns that must be strictly
        exceeded for multiprocessing to be recommended
    :type num_cols_threshold: int
    :return: True when either dimension is above its threshold, else False
    :rtype: bool
    """
    # A dimension sitting exactly on its threshold does not count as
    # exceeding it; the row count is checked before the column count.
    if data.shape[0] > num_rows_threshold:
        return True
    if data.shape[1] > num_cols_threshold:
        return True
    # Neither dimension is large enough, so multiprocessing stays off
    return False


def suggest_pool_size(data_size: int = None, cols: int = None) -> int | None:
"""
Suggest the pool size based on resources.
Expand Down
49 changes: 49 additions & 0 deletions dataprofiler/tests/profilers/test_profiler_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,52 @@ def test_odd_merge_profile_list(self, mock_data_labeler, *mocks):

self.assertEqual(1, single_report["data_stats"][0]["statistics"]["min"])
self.assertEqual(60.0, single_report["data_stats"][0]["statistics"]["max"])


class TestAutoMultiProcessToggle(unittest.TestCase):
    """
    Check that profiler_utils.auto_multiprocess_toggle recommends
    multiprocessing only for datasets that exceed a size threshold.
    """

    def test_auto_multiprocess_toggle(self):
        row_limit, col_limit = 5, 10

        def toggle_for(shape):
            # Build a random frame of the requested (rows, cols) shape and
            # ask the helper for its recommendation under the test limits.
            frame = pd.DataFrame(np.random.random(shape))
            return profiler_utils.auto_multiprocess_toggle(
                frame, row_limit, col_limit
            )

        # Shapes below, or exactly at, both limits keep multiprocessing off
        for small_shape in [(2, 5), (5, 10)]:
            self.assertFalse(toggle_for(small_shape))

        # Multiprocessing is recommended when only the rows exceed the
        # limit, when only the columns do, and when both do (in that order)
        for large_shape in [(6, 10), (5, 11), (6, 11)]:
            self.assertTrue(toggle_for(large_shape))