diff --git a/python/ray/data/_internal/arrow_block.py b/python/ray/data/_internal/arrow_block.py
index 2078aea57575d..dbdc3738d9ed9 100644
--- a/python/ray/data/_internal/arrow_block.py
+++ b/python/ray/data/_internal/arrow_block.py
@@ -1,3 +1,4 @@
+import bisect
 import collections
 import heapq
 import random
@@ -24,7 +25,7 @@
     is_valid_udf_return,
 )
 from ray.data._internal.table_block import TableBlockAccessor, TableBlockBuilder
-from ray.data._internal.util import _truncated_repr, find_partitions
+from ray.data._internal.util import _truncated_repr
 from ray.data.block import (
     Block,
     BlockAccessor,
@@ -426,7 +427,41 @@ def sort_and_partition(
         if len(boundaries) == 0:
             return [table]

-        return find_partitions(table, boundaries, sort_key)
+        partitions = []
+        # For each boundary value, count the number of items that are less
+        # than it. Since the block is sorted, these counts partition the items
+        # such that boundaries[i] <= x < boundaries[i + 1] for each x in
+        # partition[i]. If `descending` is true, `boundaries` is also in
+        # descending order and we instead count the number of items that are
+        # *greater than or equal to* the boundary value.
+
+        def find_partition_index(records, boundary, sort_key):
+            if sort_key.get_descending():
+                return len(records) - bisect.bisect_left(records[::-1], boundary)
+            else:
+                return bisect.bisect_left(records, boundary)
+
+        def searchsorted(table, boundaries, sort_key):
+            records = [
+                tuple(d.values())
+                for d in transform_pyarrow.to_pylist(
+                    table.select(sort_key.get_columns())
+                )
+            ]
+            return [
+                find_partition_index(records, boundary, sort_key)
+                for boundary in boundaries
+            ]
+
+        bounds = searchsorted(table, boundaries, sort_key)
+
+        partitions = []
+        last_idx = 0
+        for idx in bounds:
+            partitions.append(table.slice(last_idx, idx - last_idx))
+            last_idx = idx
+        partitions.append(table.slice(last_idx))
+        return partitions

     def combine(self, key: str, aggs: Tuple["AggregateFn"]) -> Block:
         """Combine rows with the same key into an accumulator.
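The descending branch of `find_partition_index` above relies on a `bisect` symmetry: reversing a descending row list yields an ascending one, and subtracting the `bisect_left` insertion point from the total leaves the count of rows greater than or equal to the boundary, which is where the split belongs. A minimal standalone sketch of that invariant (plain Python, no Ray dependencies; the sample rows are illustrative):

```python
import bisect


def find_partition_index(records, boundary, descending):
    """Return the split index for `records`, a pre-sorted list of row tuples."""
    if descending:
        # records[::-1] is ascending, so bisect_left counts the rows strictly
        # less than the boundary; len(records) minus that count is the number
        # of rows >= boundary, all of which sort before the split.
        return len(records) - bisect.bisect_left(records[::-1], boundary)
    return bisect.bisect_left(records, boundary)


ascending = [(1,), (3,), (3,), (7,)]
print(find_partition_index(ascending, (3,), descending=False))  # 1
print(find_partition_index(ascending[::-1], (3,), descending=True))  # 3
```

Note that `records[::-1]` copies the whole row list once per boundary; presumably that cost is acceptable here because the number of boundaries is small relative to the block size.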
+ """ + pydict = table.to_pydict() + names = table.schema.names + pylist = [ + {column: pydict[column][row] for column in names} + for row in range(table.num_rows) + ] + return pylist diff --git a/python/ray/data/_internal/pandas_block.py b/python/ray/data/_internal/pandas_block.py index fb10f9c225778..4c3b204cc3079 100644 --- a/python/ray/data/_internal/pandas_block.py +++ b/python/ray/data/_internal/pandas_block.py @@ -1,3 +1,4 @@ +import bisect import collections import heapq from typing import ( @@ -17,7 +18,6 @@ from ray.air.constants import TENSOR_COLUMN_NAME from ray.data._internal.table_block import TableBlockAccessor, TableBlockBuilder -from ray.data._internal.util import find_partitions from ray.data.block import ( Block, BlockAccessor, @@ -357,7 +357,38 @@ def sort_and_partition( if len(boundaries) == 0: return [table] - return find_partitions(table, boundaries, sort_key) + partitions = [] + # For each boundary value, count the number of items that are less + # than it. Since the block is sorted, these counts partition the items + # such that boundaries[i] <= x < boundaries[i + 1] for each x in + # partition[i]. If `descending` is true, `boundaries` would also be + # in descending order and we only need to count the number of items + # *greater than* the boundary value instead. + + def find_partition_index(records, boundary, sort_key): + if sort_key.get_descending(): + return len(records) - bisect.bisect_left(records[::-1], boundary) + else: + return bisect.bisect_left(records, boundary) + + def searchsorted(table, boundaries, sort_key): + records = list( + table[sort_key.get_columns()].itertuples(index=False, name=None) + ) + + return [ + find_partition_index(records, boundary, sort_key) + for boundary in boundaries + ] + + bounds = searchsorted(table, boundaries, sort_key) + + last_idx = 0 + for idx in bounds: + partitions.append(table[last_idx:idx]) + last_idx = idx + partitions.append(table[last_idx:]) + return partitions def combine(self, key: str, aggs: Tuple["AggregateFn"]) -> "pandas.DataFrame": """Combine rows with the same key into an accumulator. 
diff --git a/python/ray/data/_internal/util.py b/python/ray/data/_internal/util.py
index 5e06192d362bf..4aaddd437b4ba 100644
--- a/python/ray/data/_internal/util.py
+++ b/python/ray/data/_internal/util.py
@@ -19,7 +19,6 @@
     import pyarrow

     from ray.data._internal.compute import ComputeStrategy
-    from ray.data._internal.sort import SortKey
     from ray.data.block import Block, BlockMetadata, UserDefinedFunction
     from ray.data.datasource import Reader
     from ray.util.placement_group import PlacementGroup
@@ -516,69 +515,6 @@ def unify_block_metadata_schema(
     return None


-def find_partition_index(
-    table: Union["pyarrow.Table", "pandas.DataFrame"],
-    desired: List[Any],
-    sort_key: "SortKey",
-) -> int:
-    columns = sort_key.get_columns()
-    descending = sort_key.get_descending()
-
-    left, right = 0, len(table)
-    for i in range(len(desired)):
-        if left == right:
-            return right
-        col_name = columns[i]
-        col_vals = table[col_name].to_numpy()[left:right]
-        desired_val = desired[i]
-
-        prevleft = left
-        if descending is True:
-            left = prevleft + (
-                len(col_vals)
-                - np.searchsorted(
-                    col_vals,
-                    desired_val,
-                    side="right",
-                    sorter=np.arange(len(col_vals) - 1, -1, -1),
-                )
-            )
-            right = prevleft + (
-                len(col_vals)
-                - np.searchsorted(
-                    col_vals,
-                    desired_val,
-                    side="left",
-                    sorter=np.arange(len(col_vals) - 1, -1, -1),
-                )
-            )
-        else:
-            left = prevleft + np.searchsorted(col_vals, desired_val, side="left")
-            right = prevleft + np.searchsorted(col_vals, desired_val, side="right")
-    return right
-
-
-def find_partitions(table, boundaries, sort_key):
-    partitions = []
-
-    # For each boundary value, count the number of items that are less
-    # than it. Since the block is sorted, these counts partition the items
-    # such that boundaries[i] <= x < boundaries[i + 1] for each x in
-    # partition[i]. If `descending` is true, `boundaries` would also be
-    # in descending order and we only need to count the number of items
-    # *greater than* the boundary value instead.
-    bounds = [
-        find_partition_index(table, boundary, sort_key) for boundary in boundaries
-    ]
-
-    last_idx = 0
-    for idx in bounds:
-        partitions.append(table[last_idx:idx])
-        last_idx = idx
-    partitions.append(table[last_idx:])
-    return partitions
-
-
 def get_attribute_from_class_name(class_name: str) -> Any:
     """Get Python attribute from the provided class name.
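For reference, the deleted `find_partition_index` handled descending columns with the same counting idea, expressed through `np.searchsorted` and a reversed `sorter` that presents the descending column as ascending. A standalone sketch of that trick on a single column (NumPy only; the sample values are illustrative):

```python
import numpy as np

col_vals = np.array([7, 3, 3, 1])  # descending, as a sorted block would be
desired = 3

# The reversed sorter lets searchsorted treat the descending array as
# ascending; len(col_vals) minus the insertion point converts the result
# back into an index in the original descending order.
sorter = np.arange(len(col_vals) - 1, -1, -1)
right = len(col_vals) - np.searchsorted(col_vals, desired, side="left", sorter=sorter)
print(right)  # 3 -- the rows 7, 3, 3 land before the split
```

This is the same count the new `bisect`-based code produces via `len(records) - bisect.bisect_left(records[::-1], boundary)`, so the rewrite trades the per-column NumPy narrowing for a single lexicographic comparison over whole row tuples.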