
FEAT-#6301: Simplify usage of algebra operators to define custom functions #6302

Open
wants to merge 10 commits into base: master
57 changes: 40 additions & 17 deletions docs/flow/modin/core/dataframe/algebra.rst
@@ -33,6 +33,9 @@
Uniformly apply a function argument to each partition in parallel.
.. figure:: /img/map_evaluation.svg
:align: center

.. autoclass:: modin.core.dataframe.algebra.map.Map
:members: register, apply
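The Map pattern above can be sketched in plain pandas (an illustrative emulation, not modin's engine code; `emulate_map` and the partition count are invented for the example):

```python
import pandas as pd

def emulate_map(df, func, n_parts=4):
    # split the frame into contiguous row partitions and apply the kernel
    # to each one independently -- the work the engine does in parallel
    chunk = -(-len(df) // n_parts)  # ceil division
    parts = [df.iloc[i : i + chunk] for i in range(0, len(df), chunk)]
    return pd.concat([func(part) for part in parts])

df = pd.DataFrame({"a": range(8), "b": range(8, 16)})
res = emulate_map(df, lambda part: part * 2)
# an element-wise kernel gives the same result as one whole-frame call
assert res.equals(df * 2)
```

The key property this illustrates: the kernel never needs to see more than its own partition.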

Reduce operator
---------------
Applies an argument function that reduces each column or row on the specified axis into a scalar, but requires knowledge about the whole axis.
@@ -43,13 +43,19 @@
that the reduce function returns a one dimensional frame.
.. figure:: /img/reduce_evaluation.svg
:align: center

.. autoclass:: modin.core.dataframe.algebra.reduce.Reduce
:members: register, apply
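A rough plain-pandas sketch of the Reduce contract (illustrative only; the helper name and partitioning are invented): the engine must assemble the whole axis before the kernel runs, and the kernel must produce a one-dimensional result:

```python
import pandas as pd

def emulate_reduce(df, func, n_parts=4):
    # the engine must assemble the whole axis from the row partitions
    # before the reduction kernel can run over it
    chunk = -(-len(df) // n_parts)
    parts = [df.iloc[i : i + chunk] for i in range(0, len(df), chunk)]
    result = func(pd.concat(parts))
    assert result.ndim == 1  # the kernel must return a one-dimensional frame
    return result

df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [5, 6, 7, 8]})
assert emulate_reduce(df, lambda d: d.mean()).equals(df.mean())
```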

TreeReduce operator
-------------------
Applies an argument function that reduces the specified axis into a scalar. First applies the map function to each partition
in parallel, then concatenates the resulting partitions along the specified axis and applies the reduce
function. In contrast with the `Map` operator, here you're allowed to change the partition shape
in the map phase. Note that the execution engine expects the reduce function to return a one dimensional frame.

.. autoclass:: modin.core.dataframe.algebra.tree_reduce.TreeReduce
:members: register, apply
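The two phases can be sketched in plain pandas (an illustrative emulation under invented names, not the engine's actual implementation):

```python
import pandas as pd

def emulate_tree_reduce(df, map_function, reduce_function, n_parts=4):
    chunk = -(-len(df) // n_parts)
    parts = [df.iloc[i : i + chunk] for i in range(0, len(df), chunk)]
    # map phase: run the map kernel on every partition in parallel;
    # unlike Map, the kernel may change the partition shape
    mapped = [map_function(part) for part in parts]
    # reduce phase: concatenate intermediate results along the axis and
    # reduce them into a one-dimensional frame
    return reduce_function(pd.concat(mapped, axis=1).T)

df = pd.DataFrame({"a": [1, None, 3, 4], "b": [None, None, 7, 8]})
res = emulate_tree_reduce(df, lambda d: d.count(), lambda d: d.sum())
assert res.equals(df.count())  # non-NA counts: a == 3, b == 2
```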

Binary operator
---------------
Applies an argument function that takes exactly two operands (the first is always a `QueryCompiler`).
@@ -65,23 +74,32 @@
the right operand to the left.
it automatically but note that this requires repartitioning, which is a much
more expensive operation than the binary function itself.

.. autoclass:: modin.core.dataframe.algebra.binary.Binary
:members: register, apply
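A minimal plain-pandas sketch of the binary pattern (illustrative; the helper and partition count are invented, and the real operator additionally handles Series broadcasting and repartitioning, which are skipped here):

```python
import pandas as pd

def emulate_binary(left, right, func, n_parts=2):
    # co-partition both operands row-wise and apply the kernel pairwise
    chunk = -(-len(left) // n_parts)
    pairs = [
        (left.iloc[i : i + chunk], right.iloc[i : i + chunk])
        for i in range(0, len(left), chunk)
    ]
    return pd.concat([func(l, r) for l, r in pairs])

a = pd.DataFrame({"x": [1, 2, 3, 4]})
b = pd.DataFrame({"x": [10, 20, 30, 40]})
assert emulate_binary(a, b, lambda l, r: l + r).equals(a + b)
```

This only works cheaply when the two operands are already partitioned identically, which is why mismatched partitioning triggers the expensive repartitioning mentioned above.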

Fold operator
-------------
Applies an argument function that requires knowledge of the whole axis. Be aware that providing this knowledge may be
expensive because the execution engine has to concatenate partitions along the specified axis.

.. autoclass:: modin.core.dataframe.algebra.fold.Fold
:members: register, apply
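As a plain-pandas sketch (illustrative, invented names), Fold is "concatenate, then apply a shape-preserving kernel"; the concatenation is the expensive part:

```python
import pandas as pd

def emulate_fold(df, func, n_parts=4):
    # the expensive step: concatenate partitions along the folded axis
    chunk = -(-len(df) // n_parts)
    full = pd.concat([df.iloc[i : i + chunk] for i in range(0, len(df), chunk)])
    result = func(full)
    assert result.shape == df.shape  # a Fold kernel must keep the shape
    return result

df = pd.DataFrame({"a": [3.0, 1.0, 4.0, 1.5]})
assert emulate_fold(df, lambda d: d.cumsum()).equals(df.cumsum())
```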

GroupBy operator
----------------
Evaluates a GroupBy aggregation for the kinds of functions that can be executed via the TreeReduce approach.
To be able to form groups, the engine broadcasts the ``by`` partitions to each partition of the source frame.

.. autoclass:: modin.core.dataframe.algebra.groupby.GroupByReduce
:members: register, apply
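The map-reduce groupby can be sketched in plain pandas (an illustrative emulation with invented names; the real operator broadcasts the ``by`` partitions rather than slicing them locally):

```python
import pandas as pd

def emulate_groupby_reduce(df, by, map_func, reduce_func, n_parts=2):
    chunk = -(-len(df) // n_parts)
    parts = [df.iloc[i : i + chunk] for i in range(0, len(df), chunk)]
    # map phase: every partition forms its groups locally because the
    # `by` data is available to it
    mapped = [map_func(part.groupby(by)) for part in parts]
    # reduce phase: concatenate partial results and group again to merge
    # the partial aggregations that share a key
    return reduce_func(pd.concat(mapped).groupby(level=0))

df = pd.DataFrame({"key": ["a", "b", "a", "b"], "val": [1, 2, 3, 4]})
res = emulate_groupby_reduce(
    df, "key", lambda grp: grp.count(), lambda grp: grp.sum()
)
assert res.equals(df.groupby("key").count())
```

Note why only some aggregations fit this operator: the reduce function must be able to merge partial results (count merges via sum; a median, for example, cannot be merged this way).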

Default-to-pandas operator
--------------------------
Performs a :doc:`fallback to pandas </supported_apis/defaulting_to_pandas>` for the passed function.


How to use UDFs with these operators
''''''''''''''''''''''''''''''''''''
Let's examine an example of how to use the algebra module to create your own
new functions.

@@ -95,34 +113,39 @@
Let's implement a function that counts non-NA values for each column or row
The TreeReduce approach fits well here: in the map phase, we'll count non-NA cells in each
partition in parallel and then just sum the results in the reduce phase.

To execute a TreeReduce function that does `count` + `sum` you can simply use the operator's ``.apply(...)``
method that takes and outputs a :py:class:`~modin.pandas.dataframe.DataFrame`:

.. code-block:: python

    from modin.core.dataframe.algebra import TreeReduce

    res_df = TreeReduce.apply(
        df,
        map_function=lambda df: df.count(),
        reduce_function=lambda df: df.sum(),
    )

If you're going to use your custom-defined function quite often, you may want
to wrap it in a separate function or assign it as a DataFrame method:

.. code-block:: python

    import modin.pandas as pd
    from modin.core.dataframe.algebra import TreeReduce

    def count_func(self):
        return TreeReduce.apply(
            self,
            map_function=lambda df: df.count(),
            reduce_function=lambda df: df.sum(),
        )

    # you can then use the function as is
    res = count_func(df)

    # or assign it to the DataFrame's class and use it as a method
    pd.DataFrame.count_custom = count_func
    res = df.count_custom()
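For reference, the same `count` + `sum` decomposition can be checked against plain pandas on small data (a hand-rolled emulation of row partitioning; the variable names here are illustrative, not part of modin's API):

```python
import pandas as pd

# emulate the count + sum TreeReduce decomposition on row partitions
df = pd.DataFrame({"a": [1, None, 3], "b": [None, 5, 6]})
parts = [df.iloc[i : i + 1] for i in range(len(df))]  # one-row partitions
partials = [p.count() for p in parts]                 # map: per-partition counts
result = pd.concat(partials, axis=1).sum(axis=1)      # reduce: sum the counts
assert result.equals(df.count())
```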

Many of the `pandas` API functions can be easily implemented this way, so if you find
out that one of your favorite functions still defaults to pandas and decide to
46 changes: 45 additions & 1 deletion modin/core/dataframe/algebra/binary.py
@@ -297,7 +297,7 @@ def register(
"""

def caller(
query_compiler, other, *args, broadcast=False, dtypes=None, **kwargs
Collaborator Author: changed the order so the function's positional arguments won't conflict with the `broadcast` keyword argument.

):
"""
Apply binary `func` to passed operands.
@@ -413,3 +413,47 @@
)

return caller

@classmethod
def apply(
cls, left, right, func, axis=0, func_args=None, func_kwargs=None, **kwargs
):
r"""
Apply a binary function row-wise or column-wise.

Parameters
----------
left : modin.pandas.DataFrame or modin.pandas.Series
Left operand.
right : modin.pandas.DataFrame or modin.pandas.Series
Right operand.
Comment on lines +427 to +430
Collaborator:
This layer depends on the upper layer in every apply.

Collaborator:
The lower layer still takes the object(s) from the API layer, namely, modin.pandas.DataFrame/Series. Then, there is a func, which takes pandas.DataFrame/Series. Also, there is a kwargs argument that needs to be passed to cls.register(). What is cls.register for the user? Doesn't this look a little complicated for the user? So many things to understand. I also think that we are trying to simplify things not for the user, but for ourselves, for when we rewrite a customer workload to get better performance.

func : callable(pandas.DataFrame, pandas.DataFrame, \*args, axis, \*\*kwargs) -> pandas.DataFrame
Collaborator:
the signature is wrong here, as the implementation explicitly passes Query Compilers as arguments...

Collaborator Author:
nope, `func` here is a kernel that will be applied to deserialized partitions (pandas dataframes), so the signature is correct

follow the track of the func:

  1. register(func)
  2. register(func) -> modin_dataframe.apply_full_axis(func, ...)

A binary function to apply to `left` and `right`.
axis : int, default: 0
Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``).
func_args : tuple, optional
Positional arguments to pass to `func`.
func_kwargs : dict, optional
Keyword arguments to pass to `func`.
**kwargs : dict
Additional arguments to pass to the ``cls.register()``.

Returns
-------
The same type as `left`.
"""
from modin.pandas import Series
Collaborator @YarShev, Jun 29, 2023:
I don't really like that inner layers depend on upper layers. I don't see any benefit of introducing these changes other than simplifying registration of a UDF for users, which doesn't happen very often from my point of view. I would like to bring more attention to these changes to decide whether we want them to be merged or not.

cc @modin-project/modin-core

Collaborator:
I agree with @YarShev. I'd rather avoid the dependency on a higher layer.

Collaborator Author:
maybe we can make the .apply() method a separate function and place it somewhere at modin.pandas.utils, the function would look something like:

# modin/pandas/utils.py
def apply_operator(df : modin.pandas.DataFrame, operator_cls : type[Operator], func, *args, **kwargs):
    res_qc = operator_cls.register(func)(df._query_compiler, *args, **kwargs)
    return type(df)(query_compiler=res_qc)

# the usage then would be:

from modin.pandas.utils import apply_operator
from modin.core.dataframe.algebra import Reduce

res_df = apply_operator(df, Reduce, func=reduce_func, axis=1)

One of the problems I see here is that managing operator-dependent behavior can be quite a pain, as we can no longer use OOP mechanisms to align with operator-specific logic using inheritance and overriding:

# modin/pandas/utils.py

def _apply_reduce_op(...):
   ...

def _apply_groupby_op(...):
   ...
...

_operators_dict = {
    Reduce: _apply_reduce_op,
    GroupbyReduce: _apply_groupby_op,
    ...
}

def apply_operator(df : modin.pandas.DataFrame, operator_cls : type[Operator], func, *args, **kwargs):
    return _operators_dict[operator_cls](df, func, *args, **kwargs)

Do you have any other suggestions on how to improve this approach (or maybe you have another approach in mind)?

Collaborator:
@dchigarev the current approach for using the operators doesn't look very verbose to me. Will this PR make it much easier to use the operators anywhere?

Collaborator Author:
will this PR make it much easier to use the operators anywhere

The PR should make the usage of operators much easier for end-users of modin who would like to define their own distributed functions using modin's operators.

While optimizing customer workloads for modin, we sometimes see places that would perform much better if rewritten from the pandas API using modin's operators; however, the present API the operators provide leads to a lot of complex code that customers struggle to understand. That inspired us to create a simple method/function that makes using an operator as simple as calling one single function.

Collaborator Author @dchigarev, Jul 5, 2023:
I don't really like that inner layers depend on upper layers.

@YarShev @mvashishtha

So I made two versions of how we can eliminate this dependency.

  1. Avoid importing objects from modin.pandas... to the algebra level, but still allow passing such objects to the Operator.apply(). This way we're getting rid of the 'import dependency' on the higher level, meaning that we can easily detach the algebra layer if needed without worrying that it would require bringing stuff from the higher levels for the algebra to work correctly.

    I've made the changes to align with this approach and pushed it to the branch from this PR.

  2. Also avoid passing dataframe objects to the Operator.apply() and rework this method to accept query compilers only. Then add a helper function somewhere to the dataframe level that would take series/dataframes, extract their QCs, and pass to them the Operator.apply().

    I've implemented this approach in a separate branch. There, users have a function at modin.pandas.utils.apply_operator with the following signature:

    def apply_operator(operator_cls, *args, **kwargs):
        pass
    ...
    # use case example
    from modin.pandas.utils import apply_operator
    from modin.core.dataframe.algebra import Reduce
    
    series_obj = apply_operator(Reduce, df, reduce_func, axis=1)

    I don't really like this approach as apply_operator() doesn't provide a meaningful signature and requires referring to Operator.apply() for the list of allowed parameters.

Collaborator:

@dchigarev I wonder if you could provide an example of user-defined Modin operator (ideally a real case, even if simplified and anonymized)

Collaborator Author:

I wonder if you could provide an example of user-defined Modin operator

@vnlitvinov

What we usually use user-defined operators for is to emulate lazy execution for those transformations that can't be expressed in one or two pandas API calls (usually such transformations are performed in a for-loop).

As an example, consider that we have a dataframe with multiple string columns, and we want to combine those columns into a single one using a specified separator. Surprisingly, the fastest way to do so in vanilla pandas is simply writing a for-loop:

combining multiple string columns using different approaches in pandas:

for-loop: 17.533942513167858
df.apply(join): 21.597900850698352
df.str.cat(): 38.55254930164665
pandas code
import pandas as pd
import numpy as np
from timeit import default_timer as timer

NCOLS = 16
NROWS = 5_000_000

df = pd.DataFrame({f"col{i}": [f"col{i}-{j}" for j in range(NROWS)] for i in range(NCOLS)})

t1 = timer()
res = df.iloc[:, 0]
for col in df.columns[1:]:
    res += "_" + df[col]
print(f"for-loop: {timer() - t1}")

t1 = timer()
res = df.apply(lambda row: "_".join(row), axis=1)
print(f"df.apply(join): {timer() - t1}")

t1 = timer()
res = df.iloc[:, 0].str.cat(df.iloc[:, 1:], sep="_")
print(f"df.str.cat(): {timer() - t1}")

Then, when adapting this code to modin, it appears that the for-loop approach works very slowly due to the sheer number of kernels being submitted to Ray, which overwhelms it (each iteration of the for-loop results in 3 separate kernels: 1. df[col]; 2. "_" + df[col]; 3. res +=). And then it appears that the most performant approach is to submit this for-loop as a single kernel using the Reduce operator:

combining multiple string columns using different approaches in modin:

reduction operator: 2.6848975336179137
batch pipeline API: 2.945119895040989
for-loop: 36.92861177679151
df.apply(join): 8.54124379903078
df.str.cat(): 43.84469765238464
modin code
import modin.pandas as pd
import modin.config as cfg
import numpy as np
from timeit import default_timer as timer

cfg.BenchmarkMode.put(True)
# start all the workers
pd.DataFrame([np.arange(cfg.MinPartitionSize.get()) for _ in range(cfg.NPartitions.get() ** 2)]).to_numpy()

NCOLS = 16
NROWS = 5_000_000

df = pd.DataFrame({f"col{i}": [f"col{i}-{j}" for j in range(NROWS)] for i in range(NCOLS)})

from modin.core.dataframe.algebra import Reduce

def reduction(df):
    res = df.iloc[:, 0]
    for col in df.columns[1:]:
        res += "_" + df[col]
    return res

t1 = timer()
res = Reduce.apply(df, reduction, axis=1)
print(f"reduction operator: {timer() - t1}")

from modin.experimental.batch import PandasQueryPipeline

t1 = timer()
pipeline = PandasQueryPipeline(df)
pipeline.add_query(reduction, is_output=True)
res = pipeline.compute_batch()
print(f"batch pipeline API: {timer() - t1}")

t1 = timer()
res = df.iloc[:, 0]
for col in df.columns[1:]:
    res += "_" + df[col]
print(f"for-loop: {timer() - t1}")

t1 = timer()
res = df.apply(lambda row: "_".join(row), axis=1)
print(f"df.apply(join): {timer() - t1}")

t1 = timer()
res = df.iloc[:, 0].str.cat(df.iloc[:, 1:], sep="_")
print(f"df.str.cat(): {timer() - t1}")

(as I was writing this comment, I found out about the batch API in modin, which is supposed to serve exactly the same purpose of "emulating" lazy execution. However, it seems that it doesn't provide a way to specify how the kernels should actually be submitted (map, row-wise, column-wise, ...) and also has some slight overhead compared with the pure user-defined operator approach)


operator = cls.register(func, **kwargs)
Collaborator:
I wonder what is the purpose of .register for a one-off thing

Collaborator Author:
Unfortunately, that's the only way we can get the caller function.


func_args = tuple() if func_args is None else func_args
func_kwargs = dict() if func_kwargs is None else func_kwargs
qc_result = operator(
left._query_compiler,
right._query_compiler,
broadcast=isinstance(right, Series),
*func_args,
axis=axis,
**func_kwargs,
)
return type(left)(query_compiler=qc_result)
29 changes: 29 additions & 0 deletions modin/core/dataframe/algebra/fold.py
@@ -65,3 +65,32 @@ def caller(query_compiler, fold_axis=None, *args, **kwargs):
)

return caller

@classmethod
def apply(cls, df, func, fold_axis=0, func_args=None, func_kwargs=None):
r"""
Apply a Fold (full-axis) function to the dataframe.

Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
func : callable(pandas.DataFrame[NxM], \*args, \*\*kwargs) -> pandas.DataFrame[NxM]
A function to apply to every partition. Note that the function shouldn't change
the shape of the dataframe.
fold_axis : int, default: 0
Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``).
func_args : tuple, optional
Positional arguments to pass to `func`.
func_kwargs : dict, optional
Keyword arguments to pass to `func`.

Returns
-------
The same type as `df`.
"""
func_args = tuple() if func_args is None else func_args
func_kwargs = dict() if func_kwargs is None else func_kwargs
func_args = (fold_axis,) + func_args

return super().apply(df, func, func_args, func_kwargs)
52 changes: 52 additions & 0 deletions modin/core/dataframe/algebra/groupby.py
@@ -736,3 +736,55 @@ def wrapper(df):
return result

return _map, _reduce

@classmethod
def apply(
cls,
df,
map_func,
reduce_func,
by,
groupby_kwargs=None,
agg_args=None,
agg_kwargs=None,
):
"""
Apply groupby aggregation function using map-reduce pattern.

Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
A source DataFrame to group.
map_func : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame
A map function to apply to a groupby object in every partition.
reduce_func : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame
A reduction function to apply to the results of the map functions.
by : label or list of labels
Columns of the `df` to group on.
groupby_kwargs : dict, optional
Keyword arguments matching the signature of ``pandas.DataFrame.groupby``.
agg_args : tuple, optional
Positional arguments to pass to the funcs.
agg_kwargs : dict, optional
Keyword arguments to pass to the funcs.

Returns
-------
The same type as `df`.
"""
agg_args = tuple() if agg_args is None else agg_args
agg_kwargs = dict() if agg_kwargs is None else agg_kwargs
groupby_kwargs = dict() if groupby_kwargs is None else groupby_kwargs

operator = cls.register(map_func, reduce_func)
qc_result = operator(
df._query_compiler,
df[by]._query_compiler,
axis=0,
groupby_kwargs=groupby_kwargs,
agg_args=agg_args,
agg_kwargs=agg_kwargs,
drop=True,
)

return type(df)(query_compiler=qc_result)
39 changes: 39 additions & 0 deletions modin/core/dataframe/algebra/operator.py
@@ -60,3 +60,42 @@ def validate_axis(cls, axis: Optional[int]) -> int:
Integer representation of given axis.
"""
return 0 if axis is None else axis

@classmethod
def apply(
cls, df, func, func_args=None, func_kwargs=None, _return_type=None, **kwargs
):
r"""
Apply a function to a Modin DataFrame using the operator's scheme.

Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
func : callable(pandas.DataFrame, \*args, \*\*kwargs) -> pandas.DataFrame
A function to apply.
func_args : tuple, optional
Positional arguments to pass to the `func`.
func_kwargs : dict, optional
Keyword arguments to pass to the `func`.
_return_type : type, optional
A class that takes the ``query_compiler`` keyword argument. If not specified
will be identical to the type of the passed `df`.
**kwargs : dict
Additional arguments to pass to the ``cls.register()``.

Returns
-------
_return_type
The same type as `df` unless `_return_type` is specified.
"""
operator = cls.register(func, **kwargs)

func_args = tuple() if func_args is None else func_args
func_kwargs = dict() if func_kwargs is None else func_kwargs

qc_result = operator(df._query_compiler, *func_args, **func_kwargs)

if _return_type is None:
_return_type = type(df)

return _return_type(query_compiler=qc_result)
28 changes: 28 additions & 0 deletions modin/core/dataframe/algebra/reduce.py
@@ -50,3 +50,31 @@ def caller(query_compiler, *args, **kwargs):
)

return caller

@classmethod
def apply(cls, df, func, axis=0, func_args=None, func_kwargs=None):
r"""
Apply a reduction function to each row/column partition of the dataframe.

Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
func : callable(pandas.DataFrame, \*args, \*\*kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]]
A function to apply.
axis : int, default: 0
Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``).
func_args : tuple, optional
Positional arguments to pass to the `func`.
func_kwargs : dict, optional
Keyword arguments to pass to the `func`.

Returns
-------
modin.pandas.Series
"""
from modin.pandas import Series

return super().apply(
df, func, func_args, func_kwargs, axis=axis, _return_type=Series
)
38 changes: 38 additions & 0 deletions modin/core/dataframe/algebra/tree_reduce.py
@@ -54,3 +54,41 @@ def caller(query_compiler, *args, **kwargs):
)

return caller

@classmethod
def apply(
cls, df, map_function, reduce_function, axis=0, func_args=None, func_kwargs=None
):
r"""
Apply a map-reduce function to the dataframe.

Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
map_function : callable(pandas.DataFrame, \*args, \*\*kwargs) -> pandas.DataFrame
A map function to apply to every partition.
reduce_function : callable(pandas.DataFrame, \*args, \*\*kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]]
A reduction function to apply to the results of the map functions.
axis : int, default: 0
Whether to apply the reduce function across rows (``axis=0``) or across columns (``axis=1``).
func_args : tuple, optional
Positional arguments to pass to the funcs.
func_kwargs : dict, optional
Keyword arguments to pass to the funcs.

Returns
-------
modin.pandas.Series
"""
from modin.pandas import Series

return super().apply(
df,
map_function,
func_args,
func_kwargs,
reduce_function=reduce_function,
axis=axis,
_return_type=Series,
)