Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF-#6556: do window block-wise #6290

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
141 changes: 132 additions & 9 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2232,8 +2232,8 @@ def map(
def window(
self,
axis: Union[int, Axis],
reduce_fn: Callable,
window_size: int,
reduce_fn: Callable,
result_schema: Optional[Dict[Hashable, type]] = None,
) -> "PandasDataframe":
"""
Expand All @@ -2244,9 +2244,9 @@ def window(
axis : int or modin.core.dataframe.base.utils.Axis
The axis to slide over.
reduce_fn : callable(rowgroup|colgroup) -> row|col
The reduce function to apply over the data.
The reduce function to apply over the data. This must not change metadata.
window_size : int
The number of row/columns to pass to the function.
The number of rows/columns to pass to the reduce function.
(The size of the sliding window).
result_schema : dict, optional
Mapping from column labels to data types that represents the types of the output dataframe.
Expand All @@ -2256,13 +2256,136 @@ def window(
PandasDataframe
A new PandasDataframe with the reduce function applied over windows of the specified
axis.

Notes
-----
The user-defined reduce function must reduce each window’s column
(row if axis=1) down to a single value.
"""
pass

axis = Axis(axis)

# applies reduction function over entire virtual partition
def window_function_complete(virtual_partition):
# have to copy the pandas dataframe on ray because it's immutable
virtual_partition_copy = virtual_partition.copy()
window_result = reduce_fn(virtual_partition_copy)
return window_result

# applies reduction function over one window of virtual partition
def window_function_partition(virtual_partition):
virtual_partition_copy = virtual_partition.copy()
christinafan marked this conversation as resolved.
Show resolved Hide resolved
window_result = reduce_fn(virtual_partition_copy)
return (
window_result.iloc[:, window_size - 1 :]
if axis == Axis.COL_WISE
else window_result.iloc[window_size - 1 :, :]
)

num_parts = (
len(self._partitions[0]) if axis == Axis.COL_WISE else len(self._partitions)
)
results = []

for i in range(num_parts):
# get the ith partition
starting_part = (
self._partitions[:, [i]]
if axis == Axis.COL_WISE
else self._partitions[i]
)

# partitions to join in virtual partition
parts_to_join = (
[starting_part]
if (axis == Axis.ROW_WISE)
else [[partition[0]] for partition in starting_part]
)

# used to determine if window continues into next partition or if we can create virtual partition
last_window_span = window_size - 1
k = i + 1

while last_window_span > 0 and k < num_parts:
# new partition
new_parts = (
self._partitions[:, [k]]
if axis == Axis.COL_WISE
else self._partitions[k]
)
part_len = (
new_parts[0][0].width()
if axis == Axis.COL_WISE
else new_parts[0].length()
)

if last_window_span <= part_len:
if axis == Axis.COL_WISE:
masked_new_parts = [
[
part[0].mask(
row_labels=slice(None),
col_labels=slice(0, last_window_span),
)
]
for part in new_parts
]
for x, r in enumerate(parts_to_join):
r.append(masked_new_parts[x][0])
else:
masked_new_parts = np.array(
[
part.mask(
row_labels=slice(0, last_window_span),
col_labels=slice(None),
)
for part in new_parts
]
)
parts_to_join.append(masked_new_parts)
break
christinafan marked this conversation as resolved.
Show resolved Hide resolved
else:
# window continues into next part, so just add this part to parts_to_join
if axis == Axis.COL_WISE:
for x, r in enumerate(parts_to_join):
r.append(new_parts[x][0])
else:
parts_to_join.append(new_parts)
last_window_span -= part_len
k += 1

# create virtual partition and perform window operation
virtual_partitions = (
self._partition_mgr_cls.row_partitions(
np.array(parts_to_join), full_axis=False
)
if axis == Axis.COL_WISE
else self._partition_mgr_cls.column_partitions(
np.array(parts_to_join), full_axis=False
)
)

if i == 0:
reduce_result = [
virtual_partition.apply(window_function_complete)
for virtual_partition in virtual_partitions
]
else:
reduce_result = [
virtual_partition.apply(window_function_partition)
for virtual_partition in virtual_partitions
]

# append reduction result to results array
if axis == Axis.ROW_WISE:
results.append(reduce_result)
else:
if results == []:
results = [[x] for x in reduce_result]
else:
for x, r in enumerate(results):
r.append(reduce_result[x])

results = np.array(results)

return self.__constructor__(
np.array(results), self.index, self.columns, None, None, result_schema
)

@lazy_metadata_decorator(apply_axis="both")
def fold(self, axis, func, new_columns=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def column_partitions(cls, partitions, full_axis=True):
]

@classmethod
def row_partitions(cls, partitions):
def row_partitions(cls, partitions, full_axis=True):
"""
List of `BaseDataframeAxisPartition` objects representing row-wise partitions.

Expand All @@ -250,7 +250,11 @@ def row_partitions(cls, partitions):
"""
if not isinstance(partitions, list):
partitions = [partitions]
return [cls._row_partition_class(row) for frame in partitions for row in frame]
return [
cls._row_partition_class(row, full_axis=full_axis)
for frame in partitions
for row in frame
]

@classmethod
def axis_partition(cls, partitions, axis, full_axis: bool = True):
Expand Down