From aa6e5c23bc98ecddaa67af62e6471b155149ab43 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Tue, 14 May 2024 21:44:23 +0200 Subject: [PATCH] FEAT-#7203: Make sure Modin works correctly with pandas, which uses pyarrow as a backend (#7204) Signed-off-by: Anatoly Myachev Co-authored-by: Iaroslav Igoshev --- modin/core/dataframe/algebra/binary.py | 2 - modin/core/dataframe/base/dataframe/utils.py | 4 +- .../dataframe/pandas/dataframe/dataframe.py | 220 +++++++++++++----- .../core/dataframe/pandas/metadata/dtypes.py | 33 ++- modin/core/dataframe/pandas/metadata/index.py | 5 +- .../pandas/partitioning/partition_manager.py | 10 +- .../pandas_on_dask/dataframe/dataframe.py | 2 + .../pandas_on_python/dataframe/dataframe.py | 2 + .../cudf_on_ray/dataframe/dataframe.py | 2 + .../pandas_on_ray/dataframe/dataframe.py | 2 + .../pandas_on_unidist/dataframe/dataframe.py | 2 + .../storage_formats/base/query_compiler.py | 11 + .../storage_formats/pandas/aggregations.py | 13 +- modin/core/storage_formats/pandas/groupby.py | 6 +- .../storage_formats/pandas/query_compiler.py | 52 ++++- modin/pandas/base.py | 27 +-- modin/pandas/dataframe.py | 11 +- modin/pandas/utils.py | 22 ++ modin/tests/pandas/dataframe/test_binary.py | 8 +- modin/tests/pandas/dataframe/test_default.py | 18 +- modin/tests/pandas/dataframe/test_reduce.py | 8 +- modin/tests/pandas/test_series.py | 16 +- modin/tests/pandas/utils.py | 25 +- 23 files changed, 359 insertions(+), 142 deletions(-) diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 7f098e746ac..f18887e0ab1 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -269,8 +269,6 @@ def try_compute_new_dtypes( try: if infer_dtypes == "bool" or is_bool_dtype(result_dtype): - # FIXME: https://github.com/modin-project/modin/issues/7203 - # can be `pandas.api.types.pandas_dtype("bool[pyarrow]")` depending on the data dtypes = maybe_build_dtypes_series( first, second, dtype=pandas.api.types.pandas_dtype(bool) ) diff --git a/modin/core/dataframe/base/dataframe/utils.py b/modin/core/dataframe/base/dataframe/utils.py index adc159a1a0f..7a1478ca5da 100644 --- a/modin/core/dataframe/base/dataframe/utils.py +++ b/modin/core/dataframe/base/dataframe/utils.py @@ -21,10 +21,10 @@ from enum import Enum from typing import Dict, List, Sequence, Tuple, cast -import numpy as np import pandas from pandas._typing import IndexLabel from pandas.api.types import is_scalar +from pandas.core.dtypes.common import is_integer_dtype class Axis(Enum): # noqa: PR01 @@ -170,7 +170,7 @@ def is_trivial_index(index: pandas.Index) -> bool: return True if isinstance(index, pandas.RangeIndex): return index.start == 0 and index.step == 1 - if not (isinstance(index, pandas.Index) and index.dtype == np.int64): + if not (isinstance(index, pandas.Index) and is_integer_dtype(index)): return False return ( index.is_monotonic_increasing diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index b9e1c18a34a..04952c00f45 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -52,22 +52,26 @@ from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler from modin.core.storage_formats.pandas.utils import get_length_list from modin.error_message import ErrorMessage +from modin.logging import ClassLogger +from modin.logging.config import LogLevel +from modin.pandas.indexing import 
is_range_like +from modin.pandas.utils import ( + check_both_not_none, + get_pandas_backend, + is_full_grab_slice, +) +from modin.utils import MODIN_UNNAMED_SERIES_LABEL if TYPE_CHECKING: + from pandas._typing import npt + from modin.core.dataframe.base.interchange.dataframe_protocol.dataframe import ( ProtocolDataframe, ) - from pandas._typing import npt from modin.core.dataframe.pandas.partitioning.partition_manager import ( PandasDataframePartitionManager, ) -from modin.logging import ClassLogger -from modin.logging.config import LogLevel -from modin.pandas.indexing import is_range_like -from modin.pandas.utils import check_both_not_none, is_full_grab_slice -from modin.utils import MODIN_UNNAMED_SERIES_LABEL - class PandasDataframe( ClassLogger, modin_layer="CORE-DATAFRAME", log_level=LogLevel.DEBUG @@ -97,13 +101,20 @@ class PandasDataframe( each of the block partitions. Is computed if not provided. dtypes : pandas.Series or callable, optional The data types for the dataframe columns. + pandas_backend : {"pyarrow", None}, optional + Backend used by pandas. """ _partition_mgr_cls: PandasDataframePartitionManager _query_compiler_cls = PandasQueryCompiler # These properties flag whether or not we are deferring the metadata synchronization - _deferred_index = False - _deferred_column = False + _deferred_index: bool = False + _deferred_column: bool = False + + _index_cache: ModinIndex = None + _columns_cache: ModinIndex = None + _dtypes: Optional[ModinDtypes] = None + _pandas_backend: Optional[str] = None @cached_property def __constructor__(self) -> type[PandasDataframe]: @@ -112,7 +123,7 @@ def __constructor__(self) -> type[PandasDataframe]: Returns ------- - PandasDataframe + callable """ return type(self) @@ -123,14 +134,23 @@ def __init__( columns=None, row_lengths=None, column_widths=None, - dtypes=None, + dtypes: Optional[Union[pandas.Series, ModinDtypes, Callable]] = None, + pandas_backend: Optional[str] = None, ): self._partitions = partitions self.set_index_cache(index) self.set_columns_cache(columns) self._row_lengths_cache = row_lengths self._column_widths_cache = column_widths - self.set_dtypes_cache(dtypes) + self._pandas_backend = pandas_backend + if pandas_backend != "pyarrow": + self.set_dtypes_cache(dtypes) + else: + # In this case, the type precomputation may be incorrect; we need + # to know the type algebra precisely. Considering the number of operations + # and different combinations of backends, the best solution would be to + # introduce optimizations gradually, with a large number of tests. + self.set_dtypes_cache(None) self._validate_axes_lengths() self._filter_empties(compute_metadata=False) @@ -399,6 +419,9 @@ def dtypes(self): else: dtypes = self._compute_dtypes() self.set_dtypes_cache(dtypes) + # During materialization, we can find out the backend and, if it + # is suitable, use the ability to pre-calculate types. + self._pandas_backend = get_pandas_backend(dtypes) return dtypes def get_dtypes_set(self): @@ -413,13 +436,13 @@ def get_dtypes_set(self): return self._dtypes.get_dtypes_set() return set(self.dtypes.values) - def _compute_dtypes(self, columns=None): + def _compute_dtypes(self, columns=None) -> pandas.Series: """ Compute the data types via TreeReduce pattern for the specified columns. Parameters ---------- - columns : list-like, default: None + columns : list-like, optional Columns to compute dtypes for. If not specified compute dtypes for all the columns in the dataframe. 
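Why `__init__` above drops the dtypes cache when `pandas_backend == "pyarrow"`: the two pandas backends follow different type-promotion rules, so dtype caches computed with NumPy-based rules can be wrong for pyarrow-backed data. A minimal illustration of the difference (assumes pandas >= 2.0 with pyarrow installed; not part of the patch):

```python
import pandas

# NumPy-backed pandas upcasts an integer column containing a missing value
# to float64; the pyarrow backend keeps it as a nullable int64 instead.
df_numpy = pandas.DataFrame({"a": [1, 2, None]})
df_arrow = df_numpy.convert_dtypes(dtype_backend="pyarrow")

print(df_numpy.dtypes["a"])  # float64
print(df_arrow.dtypes["a"])  # int64[pyarrow]
print(isinstance(df_arrow.dtypes["a"], pandas.ArrowDtype))  # True
```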
@@ -458,9 +481,6 @@ def dtype_builder(df): dtypes.name = None return dtypes - _index_cache = None - _columns_cache = None - def set_index_cache(self, index): """ Set index cache. @@ -866,7 +886,7 @@ def synchronize_labels(self, axis=None): Parameters ---------- - axis : int, default: None + axis : int, optional The deferred axis. 0 for the index, 1 for the columns. """ @@ -887,7 +907,7 @@ def _propagate_index_objs(self, axis=None) -> None: Parameters ---------- - axis : int, default: None + axis : int, optional The axis to apply to. If it's None applies to both axes. """ self._filter_empties(compute_metadata=False) @@ -1276,8 +1296,15 @@ def _take_2d_positional( new_dtypes = self.dtypes.iloc[monotonic_col_idx] elif isinstance(self._dtypes, ModinDtypes): try: + supported_monotonic_col_idx = monotonic_col_idx + if isinstance(monotonic_col_idx, slice): + supported_monotonic_col_idx = pandas.RangeIndex( + monotonic_col_idx.start, + monotonic_col_idx.stop, + monotonic_col_idx.step, + ).to_list() new_dtypes = self._dtypes.lazy_get( - monotonic_col_idx, numeric_index=True + supported_monotonic_col_idx, numeric_index=True ) # can raise either on missing cache or on duplicated labels except (ValueError, NotImplementedError): @@ -1310,6 +1337,7 @@ def _take_2d_positional( new_row_lengths, new_col_widths, new_dtypes, + pandas_backend=self._pandas_backend, ) return self._maybe_reorder_labels( @@ -1448,7 +1476,9 @@ def from_labels(self) -> PandasDataframe: new_column_names = pandas.Index(level_names, tupleize_cols=False) new_columns = new_column_names.append(self.columns) - def from_labels_executor(df, **kwargs): + def from_labels_executor( + df: pandas.DataFrame, **kwargs + ) -> pandas.DataFrame: # pragma: no cover # Setting the names here ensures that external and internal metadata always match. df.index.names = new_column_names @@ -1484,6 +1514,7 @@ def from_labels_executor(df, **kwargs): row_lengths=self._row_lengths_cache, column_widths=new_column_widths, dtypes=new_dtypes, + pandas_backend=self._pandas_backend, ) # Set flag for propagating deferred row labels across dataframe partitions result.synchronize_labels(axis=0) @@ -1610,7 +1641,13 @@ def _reorder_labels(self, row_positions=None, col_positions=None): col_idx = self.copy_columns_cache(copy_lengths=True) new_widths = self._column_widths_cache return self.__constructor__( - ordered_cols, row_idx, col_idx, new_lengths, new_widths, new_dtypes + ordered_cols, + row_idx, + col_idx, + new_lengths, + new_widths, + new_dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis=None) @@ -1630,6 +1667,7 @@ def copy(self): self._row_lengths_cache, self._column_widths_cache, self.copy_dtypes_cache(), + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -1731,6 +1769,7 @@ def astype_builder(df): self._row_lengths_cache, self._column_widths_cache, new_dtypes, + pandas_backend=get_pandas_backend(new_dtypes), ) def numeric_columns(self, include_bool=True): @@ -1932,7 +1971,7 @@ def _join_index_objects(axis, indexes, how, sort, fill_value=None): considered to be the first index in the `indexes` list. sort : boolean Whether or not to sort the joined index. - fill_value : any, default: None + fill_value : any, optional Value to use for missing values. 
Returns @@ -2091,6 +2130,7 @@ def _compute_tree_reduce_metadata(self, axis, new_parts, dtypes=None): *new_axes, *new_axes_lengths, dtypes, + pandas_backend=self._pandas_backend, ) return result @@ -2242,6 +2282,7 @@ def map( self._row_lengths_cache, self._column_widths_cache, dtypes=dtypes, + pandas_backend=self._pandas_backend, ) def window( @@ -2321,6 +2362,7 @@ def fold(self, axis, func, new_columns=None): self.copy_columns_cache(copy_lengths=True), self._row_lengths_cache, self._column_widths_cache, + pandas_backend=self._pandas_backend, ) def infer_objects(self) -> PandasDataframe: @@ -2367,6 +2409,7 @@ def infer_types(self, col_labels: List[str]) -> PandasDataframe: self._row_lengths_cache, self._column_widths_cache, new_dtypes, + pandas_backend=self._pandas_backend, ) def join( @@ -2472,6 +2515,7 @@ def combine_and_apply( self._row_lengths_cache, [len(self.columns)] if self.has_materialized_columns else None, self.copy_dtypes_cache(), + pandas_backend=self._pandas_backend, ) else: modin_frame = self @@ -2775,6 +2819,7 @@ def filter(self, axis: Union[Axis, int], condition: Callable) -> PandasDataframe *new_axes, *new_lengths, self.copy_dtypes_cache() if axis == Axis.COL_WISE else None, + pandas_backend=self._pandas_backend, ) def filter_by_types(self, types: List[Hashable]) -> PandasDataframe: @@ -2828,7 +2873,12 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> PandasDataframe: 1, partitions ) return self.__constructor__( - partitions, new_index, new_columns, row_lengths, column_widths + partitions, + new_index, + new_columns, + row_lengths, + column_widths, + pandas_backend=self._pandas_backend, ) def combine(self) -> PandasDataframe: @@ -2856,6 +2906,7 @@ def combine(self) -> PandasDataframe: else None ), dtypes=self.copy_dtypes_cache(), + pandas_backend=self._pandas_backend, ) result.synchronize_labels() return result @@ -2890,12 +2941,12 @@ def apply_full_axis( new_columns : list-like, optional The columns of the result. We may know this in advance, and if not provided it must be computed. - apply_indices : list-like, default: None + apply_indices : list-like, optional Indices of `axis ^ 1` to apply function over. enumerate_partitions : bool, default: False Whether pass partition index into applied `func` or not. Note that `func` must be able to obtain `partition_idx` kwarg. - dtypes : list-like, optional + dtypes : list-like or scalar, optional The data types of the result. This is an optimization because there are functions that always result in a particular data type, and allows us to avoid (re)computing it. @@ -2949,7 +3000,7 @@ def apply_full_axis_select_indices( new_index=None, new_columns=None, keep_remaining=False, - new_dtypes=None, + new_dtypes: Optional[Union[pandas.Series, ModinDtypes]] = None, ): """ Apply a function across an entire axis for a subset of the data. @@ -2960,9 +3011,9 @@ def apply_full_axis_select_indices( The axis to apply over. func : callable The function to apply. - apply_indices : list-like, default: None + apply_indices : list-like, optional The labels to apply over. - numeric_indices : list-like, default: None + numeric_indices : list-like, optional The indices to apply over. new_index : list-like, optional The index of the result. 
We may know this in advance, @@ -3005,7 +3056,13 @@ def apply_full_axis_select_indices( if new_columns is None: new_columns = self.columns if axis == 0 else None return self.__constructor__( - new_partitions, new_index, new_columns, None, None, dtypes=new_dtypes + new_partitions, + new_index, + new_columns, + None, + None, + dtypes=new_dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -3018,10 +3075,10 @@ def apply_select_indices( col_labels=None, new_index=None, new_columns=None, - new_dtypes=None, + new_dtypes: Optional[pandas.Series] = None, keep_remaining=False, item_to_distribute=no_default, - ): + ) -> PandasDataframe: """ Apply a function for a subset of the data. @@ -3031,12 +3088,12 @@ def apply_select_indices( The axis to apply over. func : callable The function to apply. - apply_indices : list-like, default: None + apply_indices : list-like, optional The labels to apply over. Must be given if axis is provided. - row_labels : list-like, default: None + row_labels : list-like, optional The row labels to apply over. Must be provided with `col_labels` to apply over both axes. - col_labels : list-like, default: None + col_labels : list-like, optional The column labels to apply over. Must be provided with `row_labels` to apply over both axes. new_index : list-like, optional @@ -3100,6 +3157,7 @@ def apply_select_indices( lengths_objs[0], lengths_objs[1], new_dtypes, + pandas_backend=self._pandas_backend, ) else: # We are applying over both axes here, so make sure we have all the right @@ -3127,6 +3185,7 @@ def apply_select_indices( self._row_lengths_cache, self._column_widths_cache, new_dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -3163,7 +3222,7 @@ def broadcast_apply( labels : {"keep", "replace", "drop"}, default: "keep" Whether keep labels from `self` Modin DataFrame, replace them with labels from joined DataFrame or drop altogether to make them be computed lazily later. - dtypes : "copy", pandas.Series or None, default: None + dtypes : "copy", pandas.Series or None, optional Dtypes of the result. "copy" to keep old dtypes and None to compute them on demand. Returns @@ -3232,6 +3291,7 @@ def _pick_axis(get_axis, sizes_cache): new_row_lengths, new_column_widths, dtypes=dtypes, + pandas_backend=self._pandas_backend, ) def _prepare_frame_to_broadcast(self, axis, indices, broadcast_all): @@ -3299,14 +3359,14 @@ def broadcast_apply_select_indices( self, axis, func, - other, + other: PandasDataframe, apply_indices=None, numeric_indices=None, keep_remaining=False, broadcast_all=True, new_index=None, new_columns=None, - ): + ) -> PandasDataframe: """ Apply a function to select indices at specified axis and broadcast partitions of `other` Modin DataFrame. @@ -3318,9 +3378,9 @@ def broadcast_apply_select_indices( Function to apply. other : PandasDataframe Partitions of which should be broadcasted. - apply_indices : list, default: None + apply_indices : list, optional List of labels to apply (if `numeric_indices` are not specified). - numeric_indices : list, default: None + numeric_indices : list, optional Numeric indices to apply (if `apply_indices` are not specified). keep_remaining : bool, default: False Whether drop the data that is not computed over or not. 
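The recurring `pandas_backend=self._pandas_backend` argument threaded through the constructor calls in the hunks above and below follows a single pattern: every operation that builds a new frame forwards the parent's backend tag, so the "pyarrow-ness" of the data survives without re-materializing dtypes. A toy sketch of that pattern (the `FrameSketch` class is hypothetical, not Modin code):

```python
from typing import Optional


class FrameSketch:
    """Toy stand-in for PandasDataframe illustrating backend propagation."""

    def __init__(self, data, pandas_backend: Optional[str] = None):
        self.data = data
        self._pandas_backend = pandas_backend

    def map(self, func) -> "FrameSketch":
        # A backend-preserving operation forwards the tag, so the result
        # knows its backend without inspecting any partition's dtypes.
        return type(self)(func(self.data), pandas_backend=self._pandas_backend)


frame = FrameSketch([1, 2, 3], pandas_backend="pyarrow")
assert frame.map(lambda rows: [x * 2 for x in rows])._pandas_backend == "pyarrow"
```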
@@ -3373,7 +3433,10 @@ def broadcast_apply_select_indices( keep_remaining, ) return self.__constructor__( - new_partitions, index=new_index, columns=new_columns + new_partitions, + index=new_index, + columns=new_columns, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -3409,12 +3472,12 @@ def broadcast_apply_full_axis( new_columns : list-like, optional Columns of the result. We may know this in advance, and if not provided it must be computed. - apply_indices : list-like, default: None + apply_indices : list-like, optional Indices of `axis ^ 1` to apply function over. enumerate_partitions : bool, default: False Whether pass partition index into applied `func` or not. Note that `func` must be able to obtain `partition_idx` kwarg. - dtypes : list-like, default: None + dtypes : list-like or scalar, optional Data types of the result. This is an optimization because there are functions that always result in a particular data type, and allows us to avoid (re)computing it. @@ -3488,11 +3551,9 @@ def broadcast_apply_full_axis( kw["dtypes"] = dtypes.copy() else: if new_columns is None: - kw["dtypes"] = ModinDtypes( - DtypesDescriptor( - remaining_dtype=pandas.api.types.pandas_dtype(dtypes) - ) - ) + assert not is_list_like(dtypes) + dtype = pandas.api.types.pandas_dtype(dtypes) + kw["dtypes"] = ModinDtypes(DtypesDescriptor(remaining_dtype=dtype)) else: kw["dtypes"] = ( pandas.Series(dtypes, index=new_columns) @@ -3562,7 +3623,11 @@ def broadcast_apply_full_axis( kw["column_widths"] = self._column_widths_cache result = self.__constructor__( - new_partitions, index=new_index, columns=new_columns, **kw + new_partitions, + index=new_index, + columns=new_columns, + **kw, + pandas_backend=self._pandas_backend, ) if sync_labels and new_index is not None: result.synchronize_labels(axis=0) @@ -3624,7 +3689,7 @@ def _copartition( this method will skip repartitioning if it is possible. This is because reindexing is extremely inefficient. Because this method is used to `join` or `append`, it is vital that the internal indices match. - fill_value : any, default: None + fill_value : any, optional Value to use for missing values. Returns @@ -3742,12 +3807,12 @@ def _copartition( def n_ary_op( self, op, - right_frames: list, + right_frames: list[PandasDataframe], join_type="outer", copartition_along_columns=True, labels="replace", - dtypes=None, - ): + dtypes: Optional[pandas.Series] = None, + ) -> PandasDataframe: """ Perform an n-opary operation by joining with other Modin DataFrame(s). @@ -3765,7 +3830,7 @@ def n_ary_op( labels : {"replace", "drop"}, default: "replace" Whether use labels from joined DataFrame or drop altogether to make them be computed lazily later. - dtypes : series, default: None + dtypes : pandas.Series, optional Dtypes of the resultant dataframe, this argument will be received if the resultant dtypes of n-opary operation is precomputed. 
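The `broadcast_apply_full_axis` change above normalizes a scalar `dtypes` argument through `pandas.api.types.pandas_dtype`, which accepts Python types, dtype strings, and (with pyarrow installed) Arrow-backed aliases. A small demonstration of what it resolves (not part of the patch):

```python
import pandas

# pandas_dtype() turns the many ways a caller can spell a dtype into a
# proper dtype object, which is what DtypesDescriptor stores.
print(pandas.api.types.pandas_dtype(bool))              # bool
print(pandas.api.types.pandas_dtype("float64"))         # float64
print(pandas.api.types.pandas_dtype("int64[pyarrow]"))  # int64[pyarrow]
```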
@@ -3784,6 +3849,7 @@ def n_ary_op( self.copy_columns_cache(copy_lengths=True), row_lengths, self._column_widths_cache, + pandas_backend=self._pandas_backend, ) new_right_frames = [ self.__constructor__( @@ -3792,6 +3858,7 @@ def n_ary_op( right_frame.copy_columns_cache(copy_lengths=True), row_lengths, right_frame._column_widths_cache, + pandas_backend=self._pandas_backend, ) for right_parts, right_frame in zip(list_of_right_parts, right_frames) ] @@ -3829,6 +3896,7 @@ def n_ary_op( row_lengths, column_widths, dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -3916,6 +3984,8 @@ def _compute_new_widths(): new_index = self.index.append([other.index for other in others]) new_columns = joined_index frames = [self] + others + # TODO: should we wrap all `concat` call into "try except" block? + # `ModinDtypes.concat` can throw exception in case of duplicate values new_dtypes = ModinDtypes.concat([frame._dtypes for frame in frames], axis=1) # If we have already cached the length of each row in at least one # of the row's partitions, we can build new_lengths for the new @@ -3955,11 +4025,23 @@ def _compute_new_widths(): new_widths = None return self.__constructor__( - new_partitions, new_index, new_columns, new_lengths, new_widths, new_dtypes + new_partitions, + new_index, + new_columns, + new_lengths, + new_widths, + new_dtypes, + pandas_backend=self._pandas_backend, ) def _apply_func_to_range_partitioning_broadcast( - self, right, func, key, new_index=None, new_columns=None, new_dtypes=None + self, + right, + func, + key, + new_index=None, + new_columns=None, + new_dtypes: Optional[Union[ModinDtypes, pandas.Series]] = None, ): """ Apply `func` against two dataframes using range-partitioning implementation. @@ -4024,6 +4106,7 @@ def _apply_func_to_range_partitioning_broadcast( index=new_index, columns=new_columns, dtypes=new_dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -4372,6 +4455,7 @@ def join_cols(df, *cols): new_partitions, index=result.copy_index_cache(), row_lengths=result._row_lengths_cache, + pandas_backend=self._pandas_backend, ) if ( @@ -4422,7 +4506,7 @@ def groupby_reduce( new_columns : pandas.Index, optional Columns of the result. We may know this in advance, and if not provided it must be computed. - apply_indices : list-like, default: None + apply_indices : list-like, optional Indices of `axis ^ 1` to apply groupby over. Returns @@ -4448,7 +4532,10 @@ def groupby_reduce( axis, self._partitions, by_parts, map_func, reduce_func, apply_indices ) return self.__constructor__( - new_partitions, index=new_index, columns=new_columns + new_partitions, + index=new_index, + columns=new_columns, + pandas_backend=self._pandas_backend, ) @classmethod @@ -4469,8 +4556,8 @@ def from_pandas(cls, df): new_index = df.index new_columns = df.columns new_dtypes = df.dtypes - new_frame, new_lengths, new_widths = cls._partition_mgr_cls.from_pandas( - df, True + new_frame, pandas_backend, new_lengths, new_widths = ( + cls._partition_mgr_cls.from_pandas(df, True) ) return cls( new_frame, @@ -4479,6 +4566,7 @@ def from_pandas(cls, df): new_lengths, new_widths, dtypes=new_dtypes, + pandas_backend=pandas_backend, ) @classmethod @@ -4496,8 +4584,8 @@ def from_arrow(cls, at): PandasDataframe New Modin DataFrame. 
""" - new_frame, new_lengths, new_widths = cls._partition_mgr_cls.from_arrow( - at, return_dims=True + new_frame, pandas_backend, new_lengths, new_widths = ( + cls._partition_mgr_cls.from_arrow(at, return_dims=True) ) new_columns = Index.__new__(Index, data=at.column_names, dtype="O") new_index = Index.__new__(RangeIndex, data=range(at.num_rows)) @@ -4512,6 +4600,7 @@ def from_arrow(cls, at): row_lengths=new_lengths, column_widths=new_widths, dtypes=new_dtypes, + pandas_backend=pandas_backend, ) @classmethod @@ -4532,6 +4621,10 @@ def _arrow_type_to_dtype(cls, arrow_type): import pyarrow try: + # TODO: should we map arrow types to pyarrow-backed pandas types? + # It seems like this might help avoid the expense of transferring + # data between backends (numpy and pyarrow), but we need to be sure + # how this fits into the type inference system in pandas. res = arrow_type.to_pandas_dtype() # Conversion to pandas is not implemented for some arrow types, # perform manual conversion for them: @@ -4630,6 +4723,7 @@ def transpose(self): self._column_widths_cache, self._row_lengths_cache, dtypes=new_dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -4817,6 +4911,7 @@ def remote_fn(df, name, caselist): # pragma: no cover columns, row_lengths, column_widths, + pandas_backend=self._pandas_backend, ) for part in list_of_right_parts ) @@ -4888,4 +4983,5 @@ def map_data( index=self.index, row_lengths=lengths, column_widths=[1], + pandas_backend=self._pandas_backend, ) diff --git a/modin/core/dataframe/pandas/metadata/dtypes.py b/modin/core/dataframe/pandas/metadata/dtypes.py index c403e9ef7f9..1918cce16fa 100644 --- a/modin/core/dataframe/pandas/metadata/dtypes.py +++ b/modin/core/dataframe/pandas/metadata/dtypes.py @@ -278,9 +278,7 @@ def copy(self) -> DtypesDescriptor: _schema_is_known=self._schema_is_known, ) - def set_index( - self, new_index: Union[pandas.Index, "ModinIndex"] - ) -> DtypesDescriptor: + def set_index(self, new_index: Union[pandas.Index, ModinIndex]) -> DtypesDescriptor: """ Set new column names for this descriptor. 
@@ -497,21 +495,18 @@ def _merge_dtypes( # in the 'dtypes_matrix' series = pandas.Series(dtypes, name=i) dtypes_matrix = pandas.concat([dtypes_matrix, series], axis=1) - dtypes_matrix.fillna( - value={ - # If we encountered a 'NaN' while 'val' describes all the columns, then - # it means, that the missing columns for this instance will be filled with NaNs (floats), - # otherwise, it may indicate missing columns that this 'val' has no info about, - # meaning that we shouldn't try computing a new dtype for this column, - # so marking it as 'unknown' - i: ( - pandas.api.types.pandas_dtype(float) - if val._know_all_names and val._remaining_dtype is None - else "unknown" - ) - }, - inplace=True, - ) + if not (val._know_all_names and val._remaining_dtype is None): + dtypes_matrix.fillna( + value={ + # If we encountered a 'NaN' while 'val' describes all the columns, then + # it means, that the missing columns for this instance will be filled with NaNs (floats), + # otherwise, it may indicate missing columns that this 'val' has no info about, + # meaning that we shouldn't try computing a new dtype for this column, + # so marking it as 'unknown' + i: "unknown", + }, + inplace=True, + ) elif isinstance(val, pandas.Series): dtypes_matrix = pandas.concat([dtypes_matrix, val], axis=1) elif val is None: @@ -889,7 +884,7 @@ def concat(cls, values: list, axis: int = 0) -> ModinDtypes: desc = pandas.concat(values) return ModinDtypes(desc) - def set_index(self, new_index: Union[pandas.Index, "ModinIndex"]) -> ModinDtypes: + def set_index(self, new_index: Union[pandas.Index, ModinIndex]) -> ModinDtypes: """ Set new column names for stored dtypes. diff --git a/modin/core/dataframe/pandas/metadata/index.py b/modin/core/dataframe/pandas/metadata/index.py index d5aa37e52a0..b731a99bc73 100644 --- a/modin/core/dataframe/pandas/metadata/index.py +++ b/modin/core/dataframe/pandas/metadata/index.py @@ -15,6 +15,7 @@ import functools import uuid +from typing import Optional import pandas from pandas.core.dtypes.common import is_list_like @@ -44,7 +45,7 @@ class ModinIndex: Materialized dtypes of index levels. """ - def __init__(self, value=None, axis=None, dtypes=None): + def __init__(self, value=None, axis=None, dtypes: Optional[pandas.Series] = None): from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe self._is_default_callable = False @@ -69,7 +70,7 @@ def __init__(self, value=None, axis=None, dtypes=None): self._index_id = uuid.uuid4() self._lengths_id = uuid.uuid4() - def maybe_get_dtypes(self): + def maybe_get_dtypes(self) -> Optional[pandas.Series]: """ Get index dtypes if available. diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index e813560e244..79b3d9ccf75 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -41,6 +41,7 @@ from modin.error_message import ErrorMessage from modin.logging import ClassLogger from modin.logging.config import LogLevel +from modin.pandas.utils import get_pandas_backend if TYPE_CHECKING: from modin.core.dataframe.pandas.dataframe.utils import ShuffleFunctions @@ -1019,7 +1020,7 @@ def from_pandas(cls, df, return_dims=False): Returns ------- - np.ndarray or (np.ndarray, row_lengths, col_widths) + (np.ndarray, backend) or (np.ndarray, backend, row_lengths, col_widths) A NumPy array with partitions (with dimensions or not). 
""" num_splits = NPartitions.get() @@ -1059,10 +1060,11 @@ def update_bar(f): parts = cls.split_pandas_df_into_partitions( df, row_chunksize, col_chunksize, update_bar ) + backend = get_pandas_backend(df.dtypes) if ProgressBar.get(): pbar.close() if not return_dims: - return parts + return parts, backend else: row_lengths = [ ( @@ -1080,7 +1082,7 @@ def update_bar(f): ) for i in range(0, len(df.columns), col_chunksize) ] - return parts, row_lengths, col_widths + return parts, backend, row_lengths, col_widths @classmethod def from_arrow(cls, at, return_dims=False): @@ -1097,7 +1099,7 @@ def from_arrow(cls, at, return_dims=False): Returns ------- - np.ndarray or (np.ndarray, row_lengths, col_widths) + (np.ndarray, backend) or (np.ndarray, backend, row_lengths, col_widths) A NumPy array with partitions (with dimensions or not). """ return cls.from_pandas(at.to_pandas(), return_dims=return_dims) diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py index 0920d963840..5e4598d0ddf 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py @@ -38,6 +38,8 @@ class PandasOnDaskDataframe(PandasDataframe): each of the block partitions. Is computed if not provided. dtypes : pandas.Series, optional The data types for the dataframe columns. + pandas_backend : {"pyarrow", None}, optional + Backend used by pandas. None - means default NumPy backend. """ _partition_mgr_cls = PandasOnDaskDataframePartitionManager diff --git a/modin/core/execution/python/implementations/pandas_on_python/dataframe/dataframe.py b/modin/core/execution/python/implementations/pandas_on_python/dataframe/dataframe.py index 6e314beaa9c..0e2bc70d995 100644 --- a/modin/core/execution/python/implementations/pandas_on_python/dataframe/dataframe.py +++ b/modin/core/execution/python/implementations/pandas_on_python/dataframe/dataframe.py @@ -45,6 +45,8 @@ class PandasOnPythonDataframe(PandasDataframe): each of the block partitions. Is computed if not provided. dtypes : pandas.Series, optional The data types for the dataframe columns. + pandas_backend : {"pyarrow", None}, optional + Backend used by pandas. None - means default NumPy backend. """ _partition_mgr_cls = PandasOnPythonDataframePartitionManager diff --git a/modin/core/execution/ray/implementations/cudf_on_ray/dataframe/dataframe.py b/modin/core/execution/ray/implementations/cudf_on_ray/dataframe/dataframe.py index a40d10cd4c1..b4fef2ed18a 100644 --- a/modin/core/execution/ray/implementations/cudf_on_ray/dataframe/dataframe.py +++ b/modin/core/execution/ray/implementations/cudf_on_ray/dataframe/dataframe.py @@ -50,6 +50,8 @@ class cuDFOnRayDataframe(PandasOnRayDataframe): each of the block partitions. Is computed if not provided. dtypes : pandas.Series, optional The data types for the dataframe columns. + pandas_backend : {"pyarrow", None}, optional + Backend used by pandas. None - means default NumPy backend. 
""" _partition_mgr_cls = cuDFOnRayDataframePartitionManager diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/dataframe/dataframe.py b/modin/core/execution/ray/implementations/pandas_on_ray/dataframe/dataframe.py index 6838fd9edca..373a84ecdb4 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/dataframe/dataframe.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/dataframe/dataframe.py @@ -39,6 +39,8 @@ class PandasOnRayDataframe(PandasDataframe): each of the block partitions. Is computed if not provided. dtypes : pandas.Series, optional The data types for the dataframe columns. + pandas_backend : {"pyarrow", None}, optional + Backend used by pandas. None - means default NumPy backend. """ _partition_mgr_cls = PandasOnRayDataframePartitionManager diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py index 3241e9299e8..9adba6bc6dc 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/dataframe/dataframe.py @@ -38,6 +38,8 @@ class PandasOnUnidistDataframe(PandasDataframe): each of the block partitions. Is computed if not provided. dtypes : pandas.Series, optional The data types for the dataframe columns. + pandas_backend : {"pyarrow", None}, optional + Backend used by pandas. None - means default NumPy backend. """ _partition_mgr_cls = PandasOnUnidistDataframePartitionManager diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index b25bc9b1ea2..50008f261a4 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -6757,6 +6757,17 @@ def case_when(self, caselist): # noqa: PR01, RT01, D200 ] return SeriesDefault.register(pandas.Series.case_when)(self, caselist=caselist) + def get_pandas_backend(self) -> Optional[str]: + """ + Get backend stored in `_modin_frame`. + + Returns + ------- + str | None + Backend name. + """ + return self._modin_frame._pandas_backend + def repartition(self, axis=None): """ Repartitioning QueryCompiler objects to get ideal partitions inside. diff --git a/modin/core/storage_formats/pandas/aggregations.py b/modin/core/storage_formats/pandas/aggregations.py index 454b75c442b..3959b86f3ab 100644 --- a/modin/core/storage_formats/pandas/aggregations.py +++ b/modin/core/storage_formats/pandas/aggregations.py @@ -13,6 +13,8 @@ """Contains implementations for aggregation functions.""" +from __future__ import annotations + from enum import Enum from typing import TYPE_CHECKING, Callable, Tuple @@ -38,7 +40,7 @@ class Method(Enum): @classmethod def build_corr_method( cls, - ) -> Callable[["PandasQueryCompiler", str, int, bool], "PandasQueryCompiler"]: + ) -> Callable[[PandasQueryCompiler, str, int, bool], PandasQueryCompiler]: """ Build a query compiler method computing the correlation matrix. 
@@ -49,12 +51,13 @@ def build_corr_method(
         """
 
         def corr_method(
-            qc: "PandasQueryCompiler",
+            qc: PandasQueryCompiler,
             method: str,
             min_periods: int = 1,
             numeric_only: bool = True,
-        ) -> "PandasQueryCompiler":
-            if method != "pearson":
+        ) -> PandasQueryCompiler:
+            # The implementation below is designed for the default pandas backend (NumPy)
+            if method != "pearson" or qc.get_pandas_backend() == "pyarrow":
                 return super(type(qc), qc).corr(
                     method=method, min_periods=min_periods, numeric_only=numeric_only
                 )
@@ -101,7 +104,7 @@ def corr_method(
     @classmethod
     def build_cov_method(
         cls,
-    ) -> Callable[["PandasQueryCompiler", int, int], "PandasQueryCompiler"]:
+    ) -> Callable[[PandasQueryCompiler, int, int], PandasQueryCompiler]:
         """
         Build a query compiler method computing the covariance matrix.
 
diff --git a/modin/core/storage_formats/pandas/groupby.py b/modin/core/storage_formats/pandas/groupby.py
index e327efbda4a..55de645a898 100644
--- a/modin/core/storage_formats/pandas/groupby.py
+++ b/modin/core/storage_formats/pandas/groupby.py
@@ -15,6 +15,7 @@
 
 import numpy as np
 import pandas
+from pandas.core.dtypes.cast import find_common_type
 
 from modin.config import use_range_partitioning_groupby
 from modin.core.dataframe.algebra import GroupByReduce
@@ -358,7 +359,10 @@ def applyier(df, other):  # pragma: no cover
         # transposing it back to be consistent with column axis values along
         # different partitions
         if len(index) == 0 and len(columns) > 0:
-            result = result.transpose()
+            common_type = find_common_type(result.dtypes.tolist())
+            # TODO: remove find_common_type + astype once pandas fixes the following issue:
+            # transpose loses dtypes: https://github.com/pandas-dev/pandas/issues/43337
+            result = result.transpose().astype(common_type, copy=False)
         return result
 
 
diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py
index b710add0130..0014f4992ef 100644
--- a/modin/core/storage_formats/pandas/query_compiler.py
+++ b/modin/core/storage_formats/pandas/query_compiler.py
@@ -25,7 +25,7 @@
 import re
 import warnings
 from collections.abc import Iterable
-from typing import Hashable, List
+from typing import TYPE_CHECKING, Hashable, List, Optional
 
 import numpy as np
 import pandas
@@ -81,6 +81,9 @@
 from .merge import MergeImpl
 from .utils import get_group_names, merge_partitioning
 
+if TYPE_CHECKING:
+    from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
+
 
 def _get_axis(axis):
     """
@@ -265,7 +268,10 @@ class PandasQueryCompiler(BaseQueryCompiler):
         Shape hint for frames known to be a column or a row, otherwise None.
""" - def __init__(self, modin_frame, shape_hint=None): + _modin_frame: PandasDataframe + _shape_hint: Optional[str] + + def __init__(self, modin_frame: PandasDataframe, shape_hint: Optional[str] = None): self._modin_frame = modin_frame self._shape_hint = shape_hint @@ -1862,9 +1868,33 @@ def isin_func(df, values): abs = Map.register(pandas.DataFrame.abs, dtypes="copy") map = Map.register(pandas.DataFrame.map) conj = Map.register(lambda df, *args, **kwargs: pandas.DataFrame(np.conj(df))) - convert_dtypes = Fold.register(pandas.DataFrame.convert_dtypes) + + def convert_dtypes( + self, + infer_objects: bool = True, + convert_string: bool = True, + convert_integer: bool = True, + convert_boolean: bool = True, + convert_floating: bool = True, + dtype_backend: str = "numpy_nullable", + ): + result = Fold.register(pandas.DataFrame.convert_dtypes)( + self, + infer_objects=infer_objects, + convert_string=convert_string, + convert_integer=convert_integer, + convert_boolean=convert_boolean, + convert_floating=convert_floating, + dtype_backend=dtype_backend, + ) + # TODO: `numpy_nullable` should be handled similar + if dtype_backend == "pyarrow": + result._modin_frame._pandas_backend = "pyarrow" + return result + invert = Map.register(pandas.DataFrame.__invert__, dtypes="copy") isna = Map.register(pandas.DataFrame.isna, dtypes=np.bool_) + # TODO: better way to distinguish methods for NumPy API? _isfinite = Map.register( lambda df, *args, **kwargs: pandas.DataFrame(np.isfinite(df, *args, **kwargs)), dtypes=np.bool_, @@ -2254,6 +2284,9 @@ def clip(self, lower, upper, **kwargs): corr = CorrCovBuilder.build_corr_method() def cov(self, min_periods=None, ddof=1): + if self.get_pandas_backend() == "pyarrow": + return super().cov(min_periods=min_periods, ddof=ddof) + # _nancorr use numpy which incompatible with pandas dataframes on pyarrow return self._nancorr(min_periods=min_periods, cov=True, ddof=ddof) def _nancorr(self, min_periods=1, cov=False, ddof=1): @@ -2822,7 +2855,7 @@ def applyier(df, internal_indices, other=[], internal_other_indices=[]): ) # __setitem__ methods - def setitem_bool(self, row_loc, col_loc, item): + def setitem_bool(self, row_loc: PandasQueryCompiler, col_loc, item): def _set_item(df, row_loc): # pragma: no cover df = df.copy() df.loc[row_loc.squeeze(axis=1), col_loc] = item @@ -2899,7 +2932,9 @@ def getitem_array(self, key): ) return self.getitem_column_array(key) - def getitem_column_array(self, key, numeric=False, ignore_order=False): + def getitem_column_array( + self, key, numeric=False, ignore_order=False + ) -> PandasQueryCompiler: shape_hint = "column" if len(key) == 1 else None if numeric: if ignore_order and is_list_like(key): @@ -3111,7 +3146,9 @@ def reduce(df: pandas.DataFrame, mask: pandas.DataFrame): shape_hint=self._shape_hint, ) - def drop(self, index=None, columns=None, errors: str = "raise"): + def drop( + self, index=None, columns=None, errors: str = "raise" + ) -> PandasQueryCompiler: # `errors` parameter needs to be part of the function signature because # other query compilers may not take care of error handling at the API # layer. 
This query compiler assumes there won't be any errors due to @@ -3912,7 +3949,7 @@ def agg_func(grp, *args, **kwargs): add_missing_cats=add_missing_cats, **groupby_kwargs, ) - result_qc = self.__constructor__(result) + result_qc: PandasQueryCompiler = self.__constructor__(result) if not is_transform and not groupby_kwargs.get("as_index", True): return result_qc.reset_index(drop=True) @@ -4578,7 +4615,6 @@ def sort_columns_by_row_values(self, rows, ascending=True, **kwargs): def cat_codes(self): def func(df: pandas.DataFrame) -> pandas.DataFrame: ser = df.iloc[:, 0] - assert isinstance(ser.dtype, pandas.CategoricalDtype) return ser.cat.codes.to_frame(name=MODIN_UNNAMED_SERIES_LABEL) res = self._modin_frame.map(func=func, new_columns=[MODIN_UNNAMED_SERIES_LABEL]) diff --git a/modin/pandas/base.py b/modin/pandas/base.py index 7861ad165a6..746ed2ec9fa 100644 --- a/modin/pandas/base.py +++ b/modin/pandas/base.py @@ -364,14 +364,14 @@ def _validate_other( other_dtypes = [other.dtype] * len(other) elif is_dict_like(other): other_dtypes = [ - type(other[label]) + other[label] if pandas.isna(other[label]) else type(other[label]) for label in self._get_axis(axis) # The binary operation is applied for intersection of axis labels # and dictionary keys. So filtering out extra keys. if label in other ] else: - other_dtypes = [type(x) for x in other] + other_dtypes = [x if pandas.isna(x) else type(x) for x in other] if compare_index: if not self.index.equals(other.index): raise TypeError("Cannot perform operation with non-equal index") @@ -391,17 +391,18 @@ def _validate_other( # TODO(https://github.com/modin-project/modin/issues/5239): # this spuriously rejects other that is a list including some # custom type that can be added to self's elements. - if not all( - (is_numeric_dtype(self_dtype) and is_numeric_dtype(other_dtype)) - or (is_object_dtype(self_dtype) and is_object_dtype(other_dtype)) - or ( - lib.is_np_dtype(self_dtype, "mM") - and lib.is_np_dtype(self_dtype, "mM") - ) - or is_dtype_equal(self_dtype, other_dtype) - for self_dtype, other_dtype in zip(self_dtypes, other_dtypes) - ): - raise TypeError("Cannot do operation with improper dtypes") + for self_dtype, other_dtype in zip(self_dtypes, other_dtypes): + if not ( + (is_numeric_dtype(self_dtype) and is_numeric_dtype(other_dtype)) + or (is_numeric_dtype(self_dtype) and pandas.isna(other_dtype)) + or (is_object_dtype(self_dtype) and is_object_dtype(other_dtype)) + or ( + lib.is_np_dtype(self_dtype, "mM") + and lib.is_np_dtype(self_dtype, "mM") + ) + or is_dtype_equal(self_dtype, other_dtype) + ): + raise TypeError("Cannot do operation with improper dtypes") return result def _validate_function(self, func, on_invalid=None) -> None: diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 1025937b107..7cbfc2634d9 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1622,12 +1622,17 @@ def prod( skipna is not False and numeric_only is False and min_count > len(axis_to_apply) + # This fast path is only suitable for the default backend + and self._query_compiler.get_pandas_backend() is None ): new_index = self.columns if not axis else self.index + # >>> pd.DataFrame([1,2,3,4], dtype="int64[pyarrow]").prod(min_count=10) + # 0 + # dtype: int64[pyarrow] return Series( [np.nan] * len(new_index), index=new_index, - dtype=pandas.api.types.pandas_dtype("object"), + dtype=pandas.api.types.pandas_dtype("float64"), ) data = self._validate_dtypes_prod_mean(axis, numeric_only, ignore_axis=True) @@ -2147,12 +2152,14 @@ def 
sum( skipna is not False and numeric_only is False and min_count > len(axis_to_apply) + # This fast path is only suitable for the default backend + and self._query_compiler.get_pandas_backend() is None ): new_index = self.columns if not axis else self.index return Series( [np.nan] * len(new_index), index=new_index, - dtype=pandas.api.types.pandas_dtype("object"), + dtype=pandas.api.types.pandas_dtype("float64"), ) # We cannot add datetime types, so if we are summing a column with diff --git a/modin/pandas/utils.py b/modin/pandas/utils.py index 03a9078b0ee..19e7f4df1c3 100644 --- a/modin/pandas/utils.py +++ b/modin/pandas/utils.py @@ -13,6 +13,8 @@ """Implement utils for pandas component.""" +from __future__ import annotations + from typing import Iterator, Optional, Tuple import numpy as np @@ -116,6 +118,26 @@ def is_scalar(obj): return not isinstance(obj, BasePandasDataset) and pandas_is_scalar(obj) +def get_pandas_backend(dtypes: pandas.Series) -> str | None: + """ + Determine the backend based on the `dtypes`. + + Parameters + ---------- + dtypes : pandas.Series + DataFrame dtypes. + + Returns + ------- + str | None + Backend name. + """ + backend = None + if any(isinstance(x, pandas.ArrowDtype) for x in dtypes): + backend = "pyarrow" + return backend + + def is_full_grab_slice(slc, sequence_len=None): """ Check that the passed slice grabs the whole sequence. diff --git a/modin/tests/pandas/dataframe/test_binary.py b/modin/tests/pandas/dataframe/test_binary.py index 062250bbd8a..10dabbe32bc 100644 --- a/modin/tests/pandas/dataframe/test_binary.py +++ b/modin/tests/pandas/dataframe/test_binary.py @@ -75,7 +75,8 @@ *("truediv", "rtruediv", "mul", "rmul", "floordiv", "rfloordiv"), ], ) -def test_math_functions(other, axis, op): +@pytest.mark.parametrize("backend", [None, "pyarrow"]) +def test_math_functions(other, axis, op, backend): data = test_data["float_nan_data"] if (op == "floordiv" or op == "rfloordiv") and axis == "rows": # lambda == "series_or_list" @@ -85,8 +86,11 @@ def test_math_functions(other, axis, op): # lambda == "series_or_list" pytest.xfail(reason="different behavior") + if op in ("mod", "rmod") and backend == "pyarrow": + pytest.skip(reason="These functions are not implemented in pandas itself") eval_general( - *create_test_dfs(data), lambda df: getattr(df, op)(other(df, axis), axis=axis) + *create_test_dfs(data, backend=backend), + lambda df: getattr(df, op)(other(df, axis), axis=axis), ) diff --git a/modin/tests/pandas/dataframe/test_default.py b/modin/tests/pandas/dataframe/test_default.py index 28c2c20a53e..45ab3e2ec95 100644 --- a/modin/tests/pandas/dataframe/test_default.py +++ b/modin/tests/pandas/dataframe/test_default.py @@ -250,15 +250,16 @@ def test_combine_first(): class TestCorr: @pytest.mark.parametrize("method", ["pearson", "kendall", "spearman"]) - def test_corr(self, method): + @pytest.mark.parametrize("backend", [None, "pyarrow"]) + def test_corr(self, method, backend): eval_general( - *create_test_dfs(test_data["int_data"]), + *create_test_dfs(test_data["int_data"], backend=backend), lambda df: df.corr(method=method), ) # Modin result may slightly differ from pandas result # due to floating pointing arithmetic. 
eval_general( - *create_test_dfs(test_data["float_nan_data"]), + *create_test_dfs(test_data["float_nan_data"], backend=backend), lambda df: df.corr(method=method), comparator=modin_df_almost_equals_pandas, ) @@ -352,7 +353,8 @@ def test_corr_nans_in_different_partitions(self): @pytest.mark.parametrize("min_periods", [1, 3, 5], ids=lambda x: f"min_periods={x}") @pytest.mark.parametrize("ddof", [1, 2, 4], ids=lambda x: f"ddof={x}") -def test_cov(min_periods, ddof): +@pytest.mark.parametrize("backend", [None, "pyarrow"]) +def test_cov(min_periods, ddof, backend): # Modin result may slightly differ from pandas result # due to floating pointing arithmetic. if StorageFormat.get() == "Hdk": @@ -366,13 +368,13 @@ def comparator1(df1, df2): comparator2 = modin_df_almost_equals_pandas eval_general( - *create_test_dfs(test_data["int_data"]), + *create_test_dfs(test_data["int_data"], backend=backend), lambda df: df.cov(min_periods=min_periods, ddof=ddof), comparator=comparator1, ) eval_general( - *create_test_dfs(test_data["float_nan_data"]), + *create_test_dfs(test_data["float_nan_data"], backend=backend), lambda df: df.cov(min_periods=min_periods), comparator=comparator2, ) @@ -777,6 +779,7 @@ def test_pivot_table_data(data, index, columns, values, aggfunc, request): [pytest.param("Custom name", id="str_name")], ) @pytest.mark.parametrize("fill_value", [None, 0]) +@pytest.mark.parametrize("backend", [None, "pyarrow"]) def test_pivot_table_margins( data, index, @@ -785,13 +788,14 @@ def test_pivot_table_margins( aggfunc, margins_name, fill_value, + backend, request, ): expected_exception = None if "dict_func" in request.node.callspec.id: expected_exception = KeyError("Column(s) ['col28', 'col38'] do not exist") eval_general( - *create_test_dfs(data), + *create_test_dfs(data, backend=backend), operation=lambda df, *args, **kwargs: df.pivot_table(*args, **kwargs), index=index, columns=columns, diff --git a/modin/tests/pandas/dataframe/test_reduce.py b/modin/tests/pandas/dataframe/test_reduce.py index 7c2ca37124a..d6f76d68507 100644 --- a/modin/tests/pandas/dataframe/test_reduce.py +++ b/modin/tests/pandas/dataframe/test_reduce.py @@ -324,7 +324,7 @@ def test_sum(data, axis, skipna, is_transposed, request): df_equals(modin_result, pandas_result) -@pytest.mark.parametrize("dtype", ["int64", "Int64"]) +@pytest.mark.parametrize("dtype", ["int64", "Int64", "int64[pyarrow]"]) def test_dtype_consistency(dtype): # test for issue #6781 res_dtype = pd.DataFrame([1, 2, 3, 4], dtype=dtype).sum().dtype @@ -355,6 +355,12 @@ def test_sum_prod_specific(fn, min_count, numeric_only): ) +@pytest.mark.parametrize("backend", [None, "pyarrow"]) +def test_sum_prod_min_count(backend): + md_df, pd_df = create_test_dfs(test_data["float_nan_data"], backend=backend) + eval_general(md_df, pd_df, lambda df: df.prod(min_count=len(pd_df) + 1)) + + @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) def test_sum_single_column(data): modin_df = pd.DataFrame(data).iloc[:, [0]] diff --git a/modin/tests/pandas/test_series.py b/modin/tests/pandas/test_series.py index ded75a4af65..be737e4c70d 100644 --- a/modin/tests/pandas/test_series.py +++ b/modin/tests/pandas/test_series.py @@ -1419,13 +1419,15 @@ def comparator(df1, df2): comparator=comparator, ) - # FIXME: https://github.com/modin-project/modin/issues/7203 - # eval_general( - # modin_series, - # pandas_series, - # lambda ser: ser > (ser + 1), - # comparator=comparator, - # ) + if StorageFormat.get() != "Hdk": + # FIXME: HDK should also work in this case but + # 
since we deprecated it, we will just remove this branch
+        eval_general(
+            modin_series,
+            pandas_series,
+            lambda ser: ser > (ser + 1),
+            comparator=comparator,
+        )
 
     eval_general(
         modin_series,
diff --git a/modin/tests/pandas/utils.py b/modin/tests/pandas/utils.py
index 7a1403b3842..2dd4346c814 100644
--- a/modin/tests/pandas/utils.py
+++ b/modin/tests/pandas/utils.py
@@ -662,9 +662,9 @@ def assert_dtypes_equal(df1, df2):
         lambda obj: isinstance(obj, pandas.PeriodDtype),
     )
 
-    for col in dtypes1.keys():
+    for idx in range(len(dtypes1)):
         for comparator in dtype_comparators:
-            if assert_all_act_same(comparator, dtypes1[col], dtypes2[col]):
+            if assert_all_act_same(comparator, dtypes1.iloc[idx], dtypes2.iloc[idx]):
                 # We met a dtype that both types satisfy, so we can stop iterating
                 # over comparators and compare next dtypes
                 break
@@ -1086,14 +1086,27 @@ def eval_io_from_str(csv_str: str, unique_filename: str, **kwargs):
     )
 
 
-def create_test_dfs(*args, **kwargs) -> tuple[pd.DataFrame, pandas.DataFrame]:
-    post_fn = kwargs.pop("post_fn", lambda df: df)
+def create_test_dfs(
+    *args, post_fn=None, backend=None, **kwargs
+) -> tuple[pd.DataFrame, pandas.DataFrame]:
+    if post_fn is None:
+        post_fn = lambda df: (  # noqa: E731
+            df.convert_dtypes(dtype_backend=backend) if backend is not None else df
+        )
+    elif backend is not None:
+        # Bind the original post_fn via a default argument: a plain closure
+        # here would capture the rebound name and recurse infinitely.
+        post_fn = lambda df, _post_fn=post_fn: _post_fn(df).convert_dtypes(  # noqa: E731
+            dtype_backend=backend
+        )
     return tuple(
         map(post_fn, [pd.DataFrame(*args, **kwargs), pandas.DataFrame(*args, **kwargs)])
     )
 
 
-def create_test_series(vals, sort=False, **kwargs) -> tuple[pd.Series, pandas.Series]:
+def create_test_series(
+    vals, sort=False, backend=None, **kwargs
+) -> tuple[pd.Series, pandas.Series]:
     if isinstance(vals, dict):
         modin_series = pd.Series(vals[next(iter(vals.keys()))], **kwargs)
         pandas_series = pandas.Series(vals[next(iter(vals.keys()))], **kwargs)
@@ -1103,6 +1116,10 @@ def create_test_series(vals, sort=False, **kwargs) -> tuple[pd.Series, pandas.Se
     if sort:
         modin_series = modin_series.sort_values().reset_index(drop=True)
         pandas_series = pandas_series.sort_values().reset_index(drop=True)
+
+    if backend is not None:
+        modin_series = modin_series.convert_dtypes(dtype_backend=backend)
+        pandas_series = pandas_series.convert_dtypes(dtype_backend=backend)
     return modin_series, pandas_series
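The new `backend` parameter lets an existing test matrix cover pyarrow-backed frames by adding a single `parametrize` line, as the tests above do. A hypothetical minimal test in the same style (`test_abs` is illustrative and not from the patch):

```python
import pytest

from modin.tests.pandas.utils import create_test_dfs, eval_general


@pytest.mark.parametrize("backend", [None, "pyarrow"])
def test_abs(backend):
    # With backend="pyarrow", both frames are converted via
    # convert_dtypes(dtype_backend="pyarrow") before being compared.
    md_df, pd_df = create_test_dfs({"a": [-1, 2, -3]}, backend=backend)
    eval_general(md_df, pd_df, lambda df: df.abs())
```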