Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF-#4494: Get partition widths/lengths in parallel instead of serially #4683

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2457,6 +2457,9 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
base_frame._partitions,
make_reindexer(do_reindex_base, base_frame_idx),
)
base_frame._partition_mgr_cls._update_partition_dimension_caches(
reindexed_base[0] if axis else reindexed_base.T[0]
)
if axis:
base_lengths = [obj.width() for obj in reindexed_base[0]]
else:
Expand Down
67 changes: 67 additions & 0 deletions modin/core/dataframe/pandas/partitioning/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from abc import ABC
from copy import copy
from typing import Union, Any

import pandas
from pandas.api.types import is_scalar
Expand Down Expand Up @@ -272,6 +273,24 @@ def length(self):
-------
int
The length of the object.

Notes
-----
Subclasses where `build_length_cache` returns a future-like object instead of a concrete
value should override this method to force the future's materialization.
"""
return self.build_length_cache()

def build_length_cache(self) -> Union[Any, int]:
"""
Attempt to set this partition's length cache, and return it.

Returns
-------
Any | int
Either a future-like object representing the length of the object wrapped by this
partition, or the concrete value of the length if it was already cached or was
just computed.
"""
if self._length_cache is None:
cls = type(self)
Expand All @@ -280,6 +299,21 @@ def length(self):
self._length_cache = self.apply(preprocessed_func)
return self._length_cache

def set_length_cache(self, length: int):
    """
    Set this partition's length cache field to the given value.

    This should be used in situations where the futures returned by ``build_length_cache``
    for multiple partitions were computed in parallel, and the value now needs to be
    propagated back to this partition.

    Parameters
    ----------
    length : int
        The new value of the length cache.

    Notes
    -----
    Unlike ``build_length_cache``, this unconditionally overwrites any
    previously cached value.
    """
    self._length_cache = length

def width(self):
"""
Get the width of the object wrapped by the partition.
Expand All @@ -288,6 +322,24 @@ def width(self):
-------
int
The width of the object.

Notes
-----
Subclasses where `build_width_cache` returns a future-like object instead of a concrete
int should override this method to force the future's materialization.
"""
return self.build_width_cache()

def build_width_cache(self) -> Union[Any, int]:
"""
Attempt to set this partition's width cache, and return it.

Returns
-------
Any | int
Either a future-like object representing the width of the object wrapped by this
partition, or the concrete value of the width if it was already cached or was
just computed.
"""
if self._width_cache is None:
cls = type(self)
Expand All @@ -296,6 +348,21 @@ def width(self):
self._width_cache = self.apply(preprocessed_func)
return self._width_cache

def set_width_cache(self, width: int):
    """
    Set this partition's width cache field to the given value.

    This should be used in situations where the futures returned by ``build_width_cache``
    for multiple partitions were computed in parallel, and the value now needs to be
    propagated back to this partition.

    Parameters
    ----------
    width : int
        The new value of the width cache.

    Notes
    -----
    Unlike ``build_width_cache``, this unconditionally overwrites any
    previously cached value.
    """
    self._width_cache = width

@classmethod
def empty(cls):
"""
Expand Down
32 changes: 32 additions & 0 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,25 @@ def get_indices(cls, axis, partitions, index_func=None):
total_idx = new_idx[0].append(new_idx[1:]) if new_idx else new_idx
return total_idx, new_idx

@classmethod
def _update_partition_dimension_caches(cls, partitions: np.ndarray):
"""
Build and set the length and width caches of each _physical_ partition in the given list of partitions.

Parameters
----------
partitions : np.ndarray
The partitions for which to update length caches.

Notes
-----
For backends that support parallel computations, these caches are be computed asynchronously.
The naive implementation computes the length and width caches in serial.
"""
for part in partitions:
part.build_width_cache()
part.build_length_cache()

@classmethod
def _apply_func_to_list_of_partitions_broadcast(
cls, func, partitions, other, **kwargs
Expand Down Expand Up @@ -1210,6 +1229,19 @@ def apply_func_to_indices_both_axis(
if col_widths is None:
col_widths = [None] * len(col_partitions_list)

if row_lengths is None and col_widths is None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to compute dimensions here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The length and width values of each partition are accessed in the local compute_part_size, defined immediately below. The double for loop structure where compute_part_size is called makes it hard to parallelize the computation of these dimensions, so I thought it would be simplest to precompute the relevant dimensions before the loop.

# Before anything else, compute length/widths of each partition (possibly in parallel)
all_parts = np.array(
[
[
partition_copy[row_blk_idx, col_blk_idx]
for row_blk_idx, _ in row_partitions_list
]
for col_blk_idx, _ in col_partitions_list
]
).flatten()
cls._update_partition_dimension_caches(all_parts)

def compute_part_size(indexer, remote_part, part_idx, axis):
"""Compute indexer length along the specified axis for the passed partition."""
if isinstance(indexer, slice):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,35 +56,30 @@ def _get_partition_size_along_axis(self, partition, axis=0):
Returns
-------
list
A list of lengths along the specified axis that sum to the overall length of the partition
along the specified axis.
A list of Dask futures representing lengths along the specified axis that sum to
the overall length of the partition along the specified axis.

Notes
-----
This utility function is used to ensure that computation occurs asynchronously across all partitions
whether the partitions are virtual or physical partitions.
"""

def len_fn(df):
return len(df) if not axis else len(df.columns)

if isinstance(partition, self._partition_mgr_cls._partition_class):
return [
partition.apply(
lambda df: len(df) if not axis else len(df.columns)
)._data
]
return [partition.apply(len_fn)._data]
elif partition.axis == axis:
return [
ptn.apply(lambda df: len(df) if not axis else len(df.columns))._data
for ptn in partition.list_of_block_partitions
ptn.apply(len_fn)._data for ptn in partition.list_of_block_partitions
]
return [
partition.list_of_block_partitions[0]
.apply(lambda df: len(df) if not axis else (len(df.columns)))
._data
]
return [partition.list_of_block_partitions[0].apply(len_fn)._data]

@property
def _row_lengths(self):
"""
Compute ther row partitions lengths if they are not cached.
Compute the row partitions lengths if they are not cached.

Returns
-------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

"""Module houses class that wraps data (block partition) and its metadata."""

from typing import Union

from distributed import Future
from distributed.utils import get_ip
from dask.distributed import wait
Expand Down Expand Up @@ -256,12 +258,25 @@ def length(self):
int
The length of the object.
"""
if self._length_cache is None:
self._length_cache = self.apply(lambda df: len(df))._data
self.build_length_cache()
if isinstance(self._length_cache, Future):
self._length_cache = DaskWrapper.materialize(self._length_cache)
return self._length_cache

def build_length_cache(self) -> Union[Future, int]:
    """
    Attempt to set this partition's length cache, and return it.

    Returns
    -------
    distributed.Future | int
        Either a Dask future representing the length of the object wrapped by this partition,
        or the concrete value of the length if it was already cached.
    """
    cache = self._length_cache
    if cache is None:
        # Submit the length computation lazily; callers materialize it later.
        cache = self.apply(lambda df: len(df))._data
        self._length_cache = cache
    return cache

def width(self):
"""
Get the width of the object wrapped by the partition.
Expand All @@ -271,12 +286,25 @@ def width(self):
int
The width of the object.
"""
if self._width_cache is None:
self._width_cache = self.apply(lambda df: len(df.columns))._data
self.build_width_cache()
if isinstance(self._width_cache, Future):
self._width_cache = DaskWrapper.materialize(self._width_cache)
return self._width_cache

def build_width_cache(self) -> Union[Future, int]:
    """
    Attempt to set this partition's width cache, and return it.

    Returns
    -------
    distributed.Future | int
        Either a Dask future representing the width of the object wrapped by this partition,
        or the concrete value of the width if it was already cached.
    """
    if self._width_cache is None:
        self._width_cache = self.apply(lambda df: len(df.columns))._data
    return self._width_cache

def ip(self):
"""
Get the node IP address of the object wrapped by this partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,20 @@ def length(self):
"""
if self._length_cache is None:
if self.axis == 0:
caches = [
obj.build_length_cache()
for obj in self.list_of_partitions_to_combine
]
new_lengths = DaskWrapper.materialize(
[cache for cache in caches if isinstance(cache, Future)]
)
dask_idx = 0
for i, cache in enumerate(caches):
if isinstance(cache, Future):
self.list_of_partitions_to_combine[i].set_length_cache(
new_lengths[dask_idx]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this just be i as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, since new_lengths may have fewer elements than caches in the case where some length values were already computed (and are filtered out by the isinstance(cache, Future) check). The value computed at new_lengths[dask_idx] should correspond to the promise at caches[i].

)
dask_idx += 1
self._length_cache = sum(
obj.length() for obj in self.list_of_block_partitions
)
Expand All @@ -402,6 +416,20 @@ def width(self):
"""
if self._width_cache is None:
if self.axis == 1:
caches = [
obj.build_width_cache()
for obj in self.list_of_partitions_to_combine
]
new_widths = DaskWrapper.materialize(
[cache for cache in caches if isinstance(cache, Future)]
)
dask_idx = 0
for i, cache in enumerate(caches):
if isinstance(cache, Future):
self.list_of_partitions_to_combine[i].set_width_cache(
new_widths[dask_idx]
)
dask_idx += 1
self._width_cache = sum(
obj.width() for obj in self.list_of_block_partitions
)
Expand Down