Skip to content

Commit

Permalink
try to implement right merge
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed May 10, 2024
1 parent e089cd6 commit b380dfd
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 31 deletions.
5 changes: 3 additions & 2 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import pandas.core.resample
from pandas._typing import DtypeBackend, IndexLabel, Suffixes
from pandas.core.dtypes.common import is_number, is_scalar
from typing_extensions import Self

from modin.config import StorageFormat
from modin.core.dataframe.algebra.default2pandas import (
Expand Down Expand Up @@ -150,7 +151,7 @@ def __wrap_in_qc(self, obj):
else:
return obj

def default_to_pandas(self, pandas_op, *args, **kwargs):
def default_to_pandas(self, pandas_op, *args, **kwargs) -> Self:
"""
Do fallback to pandas for the passed function.
Expand Down Expand Up @@ -4459,7 +4460,7 @@ def write_items(df, broadcasted_items):
# END Abstract methods for QueryCompiler

@cached_property
def __constructor__(self) -> type[BaseQueryCompiler]:
def __constructor__(self) -> type[Self]:
"""
Get query compiler constructor.
Expand Down
60 changes: 39 additions & 21 deletions modin/core/storage_formats/pandas/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional, Union

import pandas
from pandas.core.dtypes.common import is_list_like
Expand Down Expand Up @@ -103,7 +103,7 @@ def func(left, right):
@classmethod
def row_axis_merge(
cls, left: PandasQueryCompiler, right: PandasQueryCompiler, kwargs: dict
):
) -> PandasQueryCompiler:
"""
Execute merge using row-axis implementation.
Expand All @@ -126,17 +126,25 @@ def row_axis_merge(
right_index = kwargs.get("right_index", False)
sort = kwargs.get("sort", False)

if how in ["left", "inner"] and left_index is False and right_index is False:
if (

Check warning on line 129 in modin/core/storage_formats/pandas/merge.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/merge.py#L129

Added line #L129 was not covered by tests
(
how in ["left", "inner"]
or (how == "right" and right._modin_frame._partitions.size != 0)
)
and left_index is False
and right_index is False
):
kwargs["sort"] = False

reverted = False
if (
how == "inner" and right._modin_frame._partitions.size != 0
): # and left._modin_frame._partitions.shape[0] == 1:
if how == "right":
left, right = right, left
reverted = True

Check warning on line 142 in modin/core/storage_formats/pandas/merge.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/merge.py#L139-L142

Added lines #L139 - L142 were not covered by tests

def should_keep_index(left, right):
def should_keep_index(

Check warning on line 144 in modin/core/storage_formats/pandas/merge.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/merge.py#L144

Added line #L144 was not covered by tests
left: Union[PandasQueryCompiler, pandas.DataFrame],
right: Union[PandasQueryCompiler, pandas.DataFrame],
) -> bool:
keep_index = False
if left_on is not None and right_on is not None:
keep_index = any(
Expand All @@ -159,7 +167,7 @@ def map_func(
else:
df = pandas.merge(left, right, **kwargs)

if kwargs["how"] == "left":
if kwargs["how"] in ("left", "right"):
partition_idx = service_kwargs["partition_idx"]
if len(axis_lengths):
if not should_keep_index(left, right):
Expand All @@ -180,15 +188,16 @@ def map_func(
on = list(on) if is_list_like(on) else [on]

right_to_broadcast = right._modin_frame.combine()
if not reverted:
new_columns, new_dtypes = cls._compute_result_metadata(
left,
right,
on,
left_on,
right_on,
kwargs.get("suffixes", ("_x", "_y")),
)
# if not reverted:
new_columns, new_dtypes = cls._compute_result_metadata(
left,
right,
on,
left_on,
right_on,
kwargs.get("suffixes", ("_x", "_y") if not reverted else ("_y", "_x")),
)
"""
else:
new_columns, new_dtypes = cls._compute_result_metadata(
right,
Expand All @@ -198,6 +207,7 @@ def map_func(
left_on,
kwargs.get("suffixes", ("_x", "_y")),
)
"""

# We rebalance when the ratio of the number of existing partitions to
# the ideal number of partitions is smaller than this threshold. The
Expand All @@ -213,7 +223,7 @@ def map_func(
left._modin_frame.broadcast_apply_full_axis(
axis=1,
func=map_func,
enumerate_partitions=how == "left",
enumerate_partitions=how in ("left", "right"),
other=right_to_broadcast,
# We're going to explicitly change the shape across the 1-axis,
# so we want for partitioning to adapt as well
Expand All @@ -224,7 +234,7 @@ def map_func(
new_columns=new_columns,
sync_labels=False,
dtypes=new_dtypes,
pass_axis_lengths_to_partitions=how == "left",
pass_axis_lengths_to_partitions=how in ("left", "right"),
)
)

Expand Down Expand Up @@ -265,14 +275,22 @@ def map_func(

return (
new_left.reset_index(drop=True)
if not keep_index and (kwargs["how"] != "left" or sort)
if not keep_index and (kwargs["how"] not in ("left", "right") or sort)
else new_left
)
else:
return left.default_to_pandas(pandas.DataFrame.merge, right, **kwargs)

@classmethod
def _compute_result_metadata(cls, left, right, on, left_on, right_on, suffixes):
def _compute_result_metadata(
cls,
left: PandasQueryCompiler,
right: PandasQueryCompiler,
on,
left_on,
right_on,
suffixes,
) -> tuple[Optional[pandas.Index], Optional[ModinDtypes]]:
"""
Compute columns and dtypes metadata for the result of merge if possible.
Expand Down
8 changes: 5 additions & 3 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
queries for the ``PandasDataframe``.
"""

from __future__ import annotations

import ast
import hashlib
import re
Expand Down Expand Up @@ -313,8 +315,8 @@ def from_dataframe(cls, df, data_cls):

# END Dataframe exchange protocol

index = property(_get_axis(0), _set_axis(0))
columns = property(_get_axis(1), _set_axis(1))
index: pandas.Index = property(_get_axis(0), _set_axis(0))
columns: pandas.Index = property(_get_axis(1), _set_axis(1))

@property
def dtypes(self):
Expand Down Expand Up @@ -586,7 +588,7 @@ def reindex(self, axis, labels, **kwargs):
)
return self.__constructor__(new_modin_frame)

def reset_index(self, **kwargs):
def reset_index(self, **kwargs) -> PandasQueryCompiler:
if self.lazy_execution:

def _reset(df, *axis_lengths, partition_idx): # pragma: no cover
Expand Down
9 changes: 4 additions & 5 deletions modin/tests/pandas/dataframe/test_join_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def test_merge(test_data, test_data2):
index=pandas.Index([i for i in range(1, test_data2.shape[0] + 1)], name="key"),
)

hows = ["left", "inner"]
hows = ["left", "inner", "right"]
ons = ["col33", ["col33", "col34"]]
sorts = [False, True]
for i in range(2):
Expand Down Expand Up @@ -398,11 +398,10 @@ def test_merge(test_data, test_data2):
modin_df.merge("Non-valid type")


def test_merge_empty():
@pytest.mark.parametrize("how", ["inner", "right"])
def test_merge_empty(how):
data = np.random.uniform(0, 100, size=(2**6, 2**6))
pandas_df = pandas.DataFrame(data)
modin_df = pd.DataFrame(data)
eval_general(modin_df, pandas_df, lambda df: df.merge(df.iloc[:0]))
eval_general(*create_test_dfs(data), lambda df: df.merge(df.iloc[:0], how=how))


def test_merge_with_mi_columns():
Expand Down

0 comments on commit b380dfd

Please sign in to comment.