diff --git a/docs/flow/modin/core/dataframe/algebra.rst b/docs/flow/modin/core/dataframe/algebra.rst index ded04dc386b..d07c3b43d7a 100644 --- a/docs/flow/modin/core/dataframe/algebra.rst +++ b/docs/flow/modin/core/dataframe/algebra.rst @@ -33,6 +33,9 @@ Uniformly apply a function argument to each partition in parallel. .. figure:: /img/map_evaluation.svg :align: center +.. autoclass:: modin.core.dataframe.algebra.map.Map + :members: register, apply + Reduce operator --------------- Applies an argument function that reduces each column or row on the specified axis into a scalar, but requires knowledge about the whole axis. @@ -43,6 +46,9 @@ that the reduce function returns a one dimensional frame. .. figure:: /img/reduce_evaluation.svg :align: center +.. autoclass:: modin.core.dataframe.algebra.reduce.Reduce + :members: register, apply + TreeReduce operator ------------------- Applies an argument function that reduces specified axis into a scalar. First applies map function to each partition @@ -50,6 +56,9 @@ in parallel, then concatenates resulted partitions along the specified axis and function. In contrast with `Map function` template, here you're allowed to change partition shape in the map phase. Note that the execution engine expects that the reduce function returns a one dimensional frame. +.. autoclass:: modin.core.dataframe.algebra.tree_reduce.TreeReduce + :members: register, apply + Binary operator --------------- Applies an argument function, that takes exactly two operands (first is always `QueryCompiler`). @@ -65,23 +74,32 @@ the right operand to the left. it automatically but note that this requires repartitioning, which is a much more expensive operation than the binary function itself. +.. autoclass:: modin.core.dataframe.algebra.binary.Binary + :members: register, apply + Fold operator ------------- Applies an argument function that requires knowledge of the whole axis. Be aware that providing this knowledge may be expensive because the execution engine has to concatenate partitions along the specified axis. +.. autoclass:: modin.core.dataframe.algebra.fold.Fold + :members: register, apply + GroupBy operator ---------------- Evaluates GroupBy aggregation for that type of functions that can be executed via TreeReduce approach. To be able to form groups engine broadcasts ``by`` partitions to each partition of the source frame. +.. autoclass:: modin.core.dataframe.algebra.groupby.GroupByReduce + :members: register, apply + Default-to-pandas operator -------------------------- Do :doc:`fallback to pandas ` for passed function. -How to register your own function -''''''''''''''''''''''''''''''''' +How to use UDFs with these operators +'''''''''''''''''''''''''''''''''''' Let's examine an example of how to use the algebra module to create your own new functions. @@ -95,34 +113,39 @@ Let's implement a function that counts non-NA values for each column or row TreeReduce approach would be great: in a map phase, we'll count non-NA cells in each partition in parallel and then just sum its results in the reduce phase. -To define the TreeReduce function that does `count` + `sum` we just need to register the -appropriate functions and then assign the result to the picked `QueryCompiler` -(`PandasQueryCompiler` in our case): +To execute a TreeReduce function that does `count` + `sum` you can simply use the operator's ``.apply(...)`` +method that takes and outputs a :py:class:`~modin.pandas.dataframe.DataFrame`: .. code-block:: python - from modin.core.storage_formats import PandasQueryCompiler from modin.core.dataframe.algebra import TreeReduce - PandasQueryCompiler.custom_count = TreeReduce.register(pandas.DataFrame.count, pandas.DataFrame.sum) + res_df = TreeReduce.apply( + df, + map_func=lambda df: df.count(), + reduce_function=lambda df: df.sum() + ) -Then, we want to handle it from the :py:class:`~modin.pandas.dataframe.DataFrame`, so we need to create a way to do that: +If you're going to use your custom-defined function quite often you may want +to wrap it into a separate function or assign it as a DataFrame's method: .. code-block:: python import modin.pandas as pd - def count_func(self, **kwargs): - # The constructor allows you to pass in a query compiler as a keyword argument - return self.__constructor__(query_compiler=self._query_compiler.custom_count(**kwargs)) + def count_func(self): + return TreeReduce.apply( + self, + map_func=lambda df: df.count(), + reduce_function=lambda df: df.sum() + ) - pd.DataFrame.count_custom = count_func - -And then you can use it like you usually would: + # you can then use the function as is + res = count_func(df) -.. code-block:: python - - df.count_custom(axis=1) + # or assign it to the DataFrame's class and use it as a method + pd.DataFrame.count_custom = count_func + res = df.count_custom() Many of the `pandas` API functions can be easily implemented this way, so if you find out that one of your favorite function is still defaulted to pandas and decide to diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 46585f6f69c..cdffd3c9bb0 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -297,7 +297,7 @@ def register( """ def caller( - query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs + query_compiler, other, *args, broadcast=False, dtypes=None, **kwargs ): """ Apply binary `func` to passed operands. @@ -414,3 +414,43 @@ def caller( ) return caller + + @classmethod + def apply( + cls, left, right, func, axis=0, func_args=None, func_kwargs=None, **kwargs + ): + r""" + Apply a binary function row-wise or column-wise. + + Parameters + ---------- + left : modin.pandas.DataFrame or modin.pandas.Series + Left operand. + right : modin.pandas.DataFrame or modin.pandas.Series + Right operand. + func : callable(pandas.DataFrame, pandas.DataFrame, \*args, axis, \*\*kwargs) -> pandas.DataFrame + A binary function to apply `left` and `right`. + axis : int, default: 0 + Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``). + func_args : tuple, optional + Positional arguments to pass to the funcs. + func_kwargs : dict, optional + Keyword arguments to pass to the funcs. + **kwargs : dict + Additional arguments to pass to the ``cls.register()``. + + Returns + ------- + The same type as `df`. + """ + operator = cls.register(func, **kwargs) + + qc_result = operator( + left._query_compiler, + right._query_compiler, + broadcast=right.ndim == 1, + *(func_args or ()), + axis=axis, + **(func_kwargs or {}), + ) + return left.__constructor__(query_compiler=qc_result) diff --git a/modin/core/dataframe/algebra/fold.py b/modin/core/dataframe/algebra/fold.py index 419a0b56903..3aeb1b60586 100644 --- a/modin/core/dataframe/algebra/fold.py +++ b/modin/core/dataframe/algebra/fold.py @@ -65,3 +65,29 @@ def caller(query_compiler, fold_axis=None, *args, **kwargs): ) return caller + + @classmethod + def apply(cls, df, func, fold_axis=0, func_args=None, func_kwargs=None): + r""" + Apply a Fold (full-axis) function to the dataframe. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + DataFrame object to apply the operator against. + func : callable(pandas.DataFrame[NxM], \*args, \*\*kwargs) -> pandas.DataFrame[NxM] + A function to apply to every partition. Note that the function shouldn't change + the shape of the dataframe. + fold_axis : int, default: 0 + Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``). + func_args : tuple, optional + Positional arguments to pass to the funcs. + func_kwargs : dict, optional + Keyword arguments to pass to the funcs. + + Returns + ------- + the same type as `df` + """ + func_args = (fold_axis,) + (func_args or ()) + return super().apply(df, func, func_args, func_kwargs or {}) diff --git a/modin/core/dataframe/algebra/groupby.py b/modin/core/dataframe/algebra/groupby.py index 78cb1ab8365..f04b79e2af0 100644 --- a/modin/core/dataframe/algebra/groupby.py +++ b/modin/core/dataframe/algebra/groupby.py @@ -736,3 +736,51 @@ def wrapper(df): return result return _map, _reduce + + @classmethod + def apply( + cls, + df, + map_func, + reduce_func, + by, + groupby_kwargs=None, + agg_args=None, + agg_kwargs=None, + ): + """ + Apply groupby aggregation function using map-reduce pattern. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + A source DataFrame to group. + map_func : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame + A map function to apply to a groupby object in every partition. + reduce_func : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame + A reduction function to apply to the results of the map functions. + by : label or list of labels + Columns of the `df` to group on. + groupby_kwargs : dict, optional + Keyword arguments matching the signature of ``pandas.DataFrame.groupby``. + agg_args : tuple, optional + Positional arguments to pass to the funcs. + agg_kwargs : dict, optional + Keyword arguments to pass to the funcs. + + Returns + ------- + The same type as `df`. + """ + operator = cls.register(map_func, reduce_func) + qc_result = operator( + df._query_compiler, + df[by]._query_compiler, + axis=0, + groupby_kwargs=groupby_kwargs or {}, + agg_args=agg_args or (), + agg_kwargs=agg_kwargs or {}, + drop=True, + ) + + return df.__constructor__(query_compiler=qc_result) diff --git a/modin/core/dataframe/algebra/operator.py b/modin/core/dataframe/algebra/operator.py index cc093e6720b..80458b0a3ce 100644 --- a/modin/core/dataframe/algebra/operator.py +++ b/modin/core/dataframe/algebra/operator.py @@ -60,3 +60,31 @@ def validate_axis(cls, axis: Optional[int]) -> int: Integer representation of given axis. """ return 0 if axis is None else axis + + @classmethod + def apply(cls, df, func, func_args=None, func_kwargs=None, **kwargs): + r""" + Apply a function to a Modin DataFrame using the operators scheme. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + DataFrame object to apply the operator against. + func : callable(pandas.DataFrame, \*args, \*\*kwargs) -> pandas.DataFrame + A function to apply. + func_args : tuple, optional + Positional arguments to pass to the `func`. + func_kwargs : dict, optional + Keyword arguments to pass to the `func`. + **kwargs : dict + Aditional arguments to pass to the ``cls.register()``. + + Returns + ------- + return_type + """ + operator = cls.register(func, **kwargs) + qc_result = operator( + df._query_compiler, *(func_args or ()), **(func_kwargs or {}) + ) + return df.__constructor__(query_compiler=qc_result) diff --git a/modin/core/dataframe/algebra/reduce.py b/modin/core/dataframe/algebra/reduce.py index 0f4fbe3667f..583533ce8dd 100644 --- a/modin/core/dataframe/algebra/reduce.py +++ b/modin/core/dataframe/algebra/reduce.py @@ -50,3 +50,28 @@ def caller(query_compiler, *args, **kwargs): ) return caller + + @classmethod + def apply(cls, df, func, axis=0, func_args=None, func_kwargs=None): + r""" + Apply a reduction function to each row/column partition of the dataframe. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + DataFrame object to apply the operator against. + func : callable(pandas.DataFrame, \*args, \*\*kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]] + A function to apply. + axis : int, default: 0 + Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``). + func_args : tuple, optional + Positional arguments to pass to the `func`. + func_kwargs : dict, optional + Keyword arguments to pass to the `func`. + + Returns + ------- + modin.pandas.Series + """ + result = super().apply(df, func, func_args, func_kwargs, axis=axis) + return result if result.ndim == 1 else result.squeeze(axis) diff --git a/modin/core/dataframe/algebra/tree_reduce.py b/modin/core/dataframe/algebra/tree_reduce.py index 671faa1ea0a..45ae29c4f6d 100644 --- a/modin/core/dataframe/algebra/tree_reduce.py +++ b/modin/core/dataframe/algebra/tree_reduce.py @@ -54,3 +54,39 @@ def caller(query_compiler, *args, **kwargs): ) return caller + + @classmethod + def apply( + cls, df, map_function, reduce_function, axis=0, func_args=None, func_kwargs=None + ): + r""" + Apply a map-reduce function to the dataframe. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + DataFrame object to apply the operator against. + map_function : callable(pandas.DataFrame, \*args, \*\*kwargs) -> pandas.DataFrame + A map function to apply to every partition. + reduce_function : callable(pandas.DataFrame, \*args, \*\*kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]] + A reduction function to apply to the results of the map functions. + axis : int, default: 0 + Whether to apply the reduce function across rows (``axis=0``) or across columns (``axis=1``). + func_args : tuple, optional + Positional arguments to pass to the funcs. + func_kwargs : dict, optional + Keyword arguments to pass to the funcs. + + Returns + ------- + modin.pandas.Series + """ + result = super().apply( + df, + map_function, + func_args, + func_kwargs, + reduce_function=reduce_function, + axis=axis, + ) + return result if result.ndim == 1 else result.squeeze(axis) diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index 7d0c1d17d1b..210ea0dc1b1 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1062,3 +1062,101 @@ def test_setitem_bool_preserve_dtypes(): # scalar as a col_loc df.loc[indexer, "a"] = 2.0 assert df._query_compiler._modin_frame.has_materialized_dtypes + + +class TestOperatorsApply: + def test_binary(self): + from modin.core.dataframe.algebra import Binary + + md_df1, pd_df1 = create_test_dfs(test_data_values[0]) + md_df2, pd_df2 = md_df1 // 2, pd_df1 // 2 + + def func(df, other, value, axis=None): + return df * other + value + + # DataFrame case + md_res = Binary.apply(md_df1, md_df2, func, func_args=(10,)) + pd_res = func(pd_df1, pd_df2, 10) + df_equals(md_res, pd_res) + + # Series case + md_res = Binary.apply(md_df1, md_df2.iloc[0, :], func, axis=1, func_args=(10,)) + pd_res = func(pd_df1, pd_df2.iloc[0, :], 10) + df_equals(md_res, pd_res) + + def test_fold(self): + from modin.core.dataframe.algebra import Fold + + md_df, pd_df = create_test_dfs(test_data_values[0]) + + def func(df, value, axis): + return (df + value).cumsum(axis) + + md_res = Fold.apply(md_df, func, fold_axis=0, func_args=(10, 0)) + pd_res = func(pd_df, 10, 0) + df_equals(md_res, pd_res) + + md_res = Fold.apply(md_df, func, fold_axis=1, func_args=(10, 1)) + pd_res = func(pd_df, 10, 1) + df_equals(md_res, pd_res) + + def test_groupby(self): + from modin.core.dataframe.algebra import GroupByReduce + + md_df, pd_df = create_test_dfs(test_data_values[0]) + by_col = md_df.columns[0] + + def map_func(grp): + return grp.count() + + def reduce_func(grp): + return grp.sum() + + md_res = GroupByReduce.apply( + md_df, map_func, reduce_func, by=by_col, groupby_kwargs={"as_index": False} + ) + pd_res = pd_df.groupby(by_col, as_index=False).count() + df_equals(md_res, pd_res) + + def test_map(self): + from modin.core.dataframe.algebra import Map + + md_df, pd_df = create_test_dfs(test_data_values[0]) + + def func(df, value): + return df**value + + md_res = Map.apply(md_df, func, func_args=(3,)) + pd_res = func(pd_df, 3) + df_equals(md_res, pd_res) + + def test_reduce(self): + from modin.core.dataframe.algebra import Reduce + + md_df, pd_df = create_test_dfs(test_data_values[0]) + + def func(df, value, axis): + return (df**value).sum(axis) + + md_res = Reduce.apply(md_df, func, func_args=(3, 0)) + pd_res = func(pd_df, 3, 0) + df_equals(md_res, pd_res) + + md_res = Reduce.apply(md_df, func, axis=1, func_args=(3, 1)) + pd_res = func(pd_df, 3, 1) + df_equals(md_res, pd_res) + + def test_tree_reduce(self): + from modin.core.dataframe.algebra import TreeReduce + + md_df, pd_df = create_test_dfs(test_data_values[0]) + + def map_func(df): + return df.count() + + def reduce_func(df): + return df.sum() + + md_res = TreeReduce.apply(md_df, map_func, reduce_func) + pd_res = pd_df.count() + df_equals(md_res, pd_res)