Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#5394: Reduce amount of remote calls for TreeReduce and GroupByReduce operators #7245

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 7 additions & 41 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import CpuCount, Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -2205,46 +2205,12 @@ def map(
PandasDataframe
A new dataframe.
"""
if self.num_parts <= 1.5 * CpuCount.get():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the implementation moved to a lower level?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was moved here to avoid duplicating logic, and map_partitions in the partition manager is only used in these cases.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, your implementation was also used instead of the self._partition_mgr_cls.lazy_map_partitions function, under some condition, but now it is not. Is that how it was intended?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tree_reduce and groupby_reduce call map_partitions at the partition mgr. That's why @Retribution98 moved the logic there I guess.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I see it. I think lazy map will be better in a lazy pipeline because some partitions may not be calculated further, so this issue is not relevant for this case.

# block-wise map
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
else:
# axis-wise map
# we choose an axis for a combination of partitions
# whose size is closer to the number of CPUs
if abs(self._partitions.shape[0] - CpuCount.get()) < abs(
self._partitions.shape[1] - CpuCount.get()
):
axis = 1
else:
axis = 0

column_splits = CpuCount.get() // self._partitions.shape[1]

if axis == 0 and column_splits > 1:
# splitting by parts of columnar partitions
new_partitions = (
self._partition_mgr_cls.map_partitions_joined_by_column(
self._partitions, column_splits, func, func_args, func_kwargs
)
)
else:
# splitting by full axis partitions
new_partitions = self._partition_mgr_cls.map_axis_partitions(
axis,
self._partitions,
lambda df: func(
df,
*(func_args if func_args is not None else ()),
**(func_kwargs if func_kwargs is not None else {}),
),
keep_partitioning=True,
)
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)

if new_columns is not None and self.has_materialized_columns:
assert len(new_columns) == len(
Expand Down
58 changes: 46 additions & 12 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from modin.config import (
BenchmarkMode,
CpuCount,
Engine,
MinPartitionSize,
NPartitions,
Expand Down Expand Up @@ -621,20 +622,53 @@ def map_partitions(
NumPy array
An array of partitions
"""
preprocessed_map_func = cls.preprocess_func(map_func)
return np.array(
[
if np.prod(partitions.shape) <= 1.5 * CpuCount.get():
# block-wise map
preprocessed_map_func = cls.preprocess_func(map_func)
new_partitions = np.array(
[
part.apply(
preprocessed_map_func,
*func_args if func_args is not None else (),
**func_kwargs if func_kwargs is not None else {},
)
for part in row_of_parts
[
part.apply(
preprocessed_map_func,
*func_args if func_args is not None else (),
**func_kwargs if func_kwargs is not None else {},
)
for part in row_of_parts
]
for row_of_parts in partitions
]
for row_of_parts in partitions
]
)
)
else:
# axis-wise map
# we choose an axis for a combination of partitions
# whose size is closer to the number of CPUs
if abs(partitions.shape[0] - CpuCount.get()) < abs(
partitions.shape[1] - CpuCount.get()
):
axis = 1
else:
axis = 0

column_splits = CpuCount.get() // partitions.shape[1]

if axis == 0 and column_splits > 1:
# splitting by parts of columnar partitions
new_partitions = cls.map_partitions_joined_by_column(
partitions, column_splits, map_func, func_args, func_kwargs
)
else:
# splitting by full axis partitions
new_partitions = cls.map_axis_partitions(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the map_axis_partitions function inside the map_partitions function does not seem obvious and defeats the purpose of the map_partitions function, which declares that it applies to every partition.

The dataframe level for choosing a suitable strategy seems more appropriate.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since partition mgr rather than core df is designed to play around with partitions, maybe we should just update the docstring?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I added base_map_partitions to keep the simplest implementation, but by default we will use the new approaches. Do you agree?

axis,
partitions,
lambda df: map_func(
df,
*(func_args if func_args is not None else ()),
**(func_kwargs if func_kwargs is not None else {}),
),
keep_partitioning=True,
)
return new_partitions

@classmethod
@wait_computations_if_benchmark_mode
Expand Down
3 changes: 2 additions & 1 deletion modin/tests/core/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2657,14 +2657,15 @@ def test_map_approaches(partitioning_scheme, expected_map_approach):
df = pandas.DataFrame(data)

modin_df = construct_modin_df_by_scheme(df, partitioning_scheme(df))
partitions = modin_df._query_compiler._modin_frame._partitions
partition_mgr_cls = modin_df._query_compiler._modin_frame._partition_mgr_cls

with mock.patch.object(
partition_mgr_cls,
expected_map_approach,
wraps=getattr(partition_mgr_cls, expected_map_approach),
) as expected_method:
try_cast_to_pandas(modin_df.map(lambda x: x * 2))
partition_mgr_cls.map_partitions(partitions, lambda x: x * 2)
expected_method.assert_called()


Expand Down