
FEAT-#6301: Simplify usage of algebra operators to define custom functions #6302

Open

wants to merge 10 commits into base: master
Conversation

@dchigarev (Collaborator) commented Jun 27, 2023

What do these changes do?

This PR introduces the Operator.apply(...) method, which takes and returns modin.pandas.DataFrame objects and applies the specified function to high-level dataframes directly, using the operator's scheme.
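For reviewers unfamiliar with the mechanics, the pattern can be sketched with toy stand-ins (all class names below are hypothetical, not modin's real implementation): apply() unwraps the frame's query compiler, runs the caller produced by the existing register() flow, and wraps the result back into a high-level frame.

```python
class ToyQueryCompiler:
    """Stand-in for modin's query compiler: just holds raw row data."""
    def __init__(self, rows):
        self.rows = rows

class ToyDataFrame:
    """Stand-in for modin.pandas.DataFrame."""
    def __init__(self, query_compiler):
        self._query_compiler = query_compiler

class ToyReduce:
    """Stand-in for an algebra operator such as Reduce."""

    @classmethod
    def register(cls, func):
        # The classic flow: produce a caller that works on query compilers.
        def caller(query_compiler, *args, **kwargs):
            reduced = [func(row, *args, **kwargs) for row in query_compiler.rows]
            return ToyQueryCompiler(reduced)
        return caller

    @classmethod
    def apply(cls, df, func, *args, **kwargs):
        # The new convenience layer: unwrap the QC, run the registered
        # caller, and wrap the result back into a high-level frame.
        res_qc = cls.register(func)(df._query_compiler, *args, **kwargs)
        return type(df)(res_qc)

df = ToyDataFrame(ToyQueryCompiler([[1, 2], [3, 4]]))
res = ToyReduce.apply(df, sum)
print(res._query_compiler.rows)  # [3, 7]
```

The point of the convenience layer is that the user never touches `_query_compiler` themselves.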

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Simplify the process of using UDFs algebra operators #6301
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

Signed-off-by: Dmitry Chigarev <[email protected]>
@@ -297,7 +297,7 @@ def register(
"""

def caller(
-        query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs
+        query_compiler, other, *args, broadcast=False, dtypes=None, **kwargs
Collaborator Author

Changed the order so the function's positional arguments won't conflict with the broadcast keyword argument.

@dchigarev dchigarev marked this pull request as ready for review June 27, 2023 23:25
@dchigarev dchigarev requested a review from a team as a code owner June 27, 2023 23:25
-------
The same type as `df`.
"""
from modin.pandas import Series
@YarShev (Collaborator) commented Jun 29, 2023

I don't really like that inner layers depend on upper layers. I don't see any benefit of introducing these changes other than simplifying the registration of a UDF for users, which doesn't happen very often from my point of view. I would like to bring more attention to these changes to decide whether we want them to be merged or not.

cc @modin-project/modin-core

Collaborator

I agree with @YarShev. I'd rather avoid the dependency on a higher layer.

Collaborator Author

Maybe we can make the .apply() method a separate function and place it somewhere in modin.pandas.utils; the function would look something like:

# modin/pandas/utils.py
def apply_operator(df: modin.pandas.DataFrame, operator_cls: type[Operator], func, *args, **kwargs):
    res_qc = operator_cls.register(func)(df._query_compiler, *args, **kwargs)
    return type(df)(query_compiler=res_qc)

# the usage then would be:

from modin.pandas.utils import apply_operator
from modin.core.dataframe.algebra import Reduce

res_df = apply_operator(df, Reduce, func=reduce_func, axis=1)

One of the problems I see here is that managing operator-dependent behavior can be quite a pain, as we can no longer use OOP mechanisms (inheritance and overriding) to align with operator-specific logic:

# modin/pandas/utils.py

def _apply_reduce_op(...):
   ...

def _apply_groupby_op(...):
   ...
...

_operators_dict = {
    Reduce: _apply_reduce_op,
    GroupbyReduce: _apply_groupby_op,
    ...
}

def apply_operator(df: modin.pandas.DataFrame, operator_cls: type[Operator], func, *args, **kwargs):
    return _operators_dict[operator_cls](df, func, *args, **kwargs)

Do you have any other suggestions on how to improve this approach (or maybe you have another approach in mind)?
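The brittleness of the dispatch-dict approach compared with ordinary inheritance can be shown with a runnable toy (all names below are hypothetical): a subclass inherits operator-specific behavior for free, while the dict misses it unless every subclass is registered explicitly.

```python
class Operator:
    @classmethod
    def apply(cls, data, func):
        # Generic behavior: map the function over the elements.
        return [func(x) for x in data]

class Reduce(Operator):
    @classmethod
    def apply(cls, data, func):
        # Operator-specific behavior via ordinary overriding.
        return func(data)

class TreeReduce(Reduce):
    pass  # inherits Reduce's behavior for free

# The dict-dispatch alternative must enumerate every concrete class.
_dispatch = {Operator: Operator.apply, Reduce: Reduce.apply}

def apply_operator(operator_cls, data, func):
    # Lookup fails for TreeReduce unless it is added explicitly.
    return _dispatch[operator_cls](data, func)

print(TreeReduce.apply([1, 2, 3], sum))  # 6 -- inheritance just works
try:
    apply_operator(TreeReduce, [1, 2, 3], sum)
except KeyError:
    print("dispatch dict misses subclasses")
```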

Collaborator

@dchigarev the current approach for using the operators doesn't look very verbose to me. Will this PR make it much easier to use the operators anywhere?

Collaborator Author

> will this PR make it much easier to use the operators anywhere

The PR should make the usage of operators much easier for end-users of modin who would like to define their own distributed functions using modin's operators.

While optimizing customer workloads for modin, we sometimes see places that would perform much better if rewritten from the pandas API to modin's operators; however, the present API the operators provide results in a lot of complex code that customers struggle to understand. That inspired us to create a simple method/function that makes using an operator as simple as calling a single function.

@dchigarev (Collaborator, Author) commented Jul 5, 2023

> I don't really like that inner layers depend on upper layers.

@YarShev @mvashishtha

So I made two versions of how we can eliminate this dependency.

  1. Avoid importing objects from modin.pandas... at the algebra level, but still allow passing such objects to Operator.apply(). This way we get rid of the 'import dependency' on the higher level, meaning that we can easily detach the algebra layer if needed without worrying that it would have to carry stuff from the higher levels for the algebra to work correctly.

    I've made the changes to align with this approach and pushed it to the branch from this PR.

  2. Also avoid passing dataframe objects to Operator.apply() and rework this method to accept query compilers only. Then add a helper function somewhere at the dataframe level that would take series/dataframes, extract their QCs, and pass them to Operator.apply().

    I've implemented this approach in a separate branch. There, users have a function at modin.pandas.utils.apply_operator with the following signature:

    def apply_operator(operator_cls, *args, **kwargs):
        pass
    ...
    # use case example
    from modin.pandas.utils import apply_operator
    from modin.core.dataframe.algebra import Reduce
    
    series_obj = apply_operator(Reduce, df, reduce_func, axis=1)

    I don't really like this approach, as apply_operator() doesn't provide a meaningful signature and requires referring to Operator.apply() for the list of allowed parameters.


Collaborator

@dchigarev I wonder if you could provide an example of a user-defined Modin operator (ideally a real case, even if simplified and anonymized)?

Collaborator Author

> I wonder if you could provide an example of user-defined Modin operator

@vnlitvinov

What we usually use user-defined operators for is to emulate lazy execution for the types of transformations that can't be written as one or two pandas API calls (usually such transformations are performed in a for-loop).

As an example, consider a dataframe with multiple string columns that we want to combine into a single column using a specified separator. Surprisingly, the fastest way to do this in vanilla pandas is simply writing a for-loop:

combining multiple string columns using different approaches in pandas:

for-loop: 17.533942513167858
df.apply(join): 21.597900850698352
df.str.cat(): 38.55254930164665
pandas code
import pandas as pd
import numpy as np
from timeit import default_timer as timer

NCOLS = 16
NROWS = 5_000_000

df = pd.DataFrame({f"col{i}": [f"col{i}-{j}" for j in range(NROWS)] for i in range(NCOLS)})

t1 = timer()
res = df.iloc[:, 0]
for col in df.columns[1:]:
    res += "_" + df[col]
print(f"for-loop: {timer() - t1}")

t1 = timer()
res = df.apply(lambda row: "_".join(row), axis=1)
print(f"df.apply(join): {timer() - t1}")

t1 = timer()
res = df.iloc[:, 0].str.cat(df.iloc[:, 1:], sep="_")
print(f"df.str.cat(): {timer() - t1}")

Then, when adapting this code to modin, the for-loop approach turns out to be very slow due to the large number of kernels being submitted to Ray, which overwhelms it (each iteration of the for-loop results in 3 separate kernels: 1. df[col]; 2. "_" + df[col]; 3. res +=). It then turns out that the most performant approach is to submit this for-loop as a single kernel using the Reduce operator:

combining multiple string columns using different approaches in modin:

reduction operator: 2.6848975336179137
batch pipeline API: 2.945119895040989
for-loop: 36.92861177679151
df.apply(join): 8.54124379903078
df.str.cat(): 43.84469765238464
modin code
import modin.pandas as pd
import modin.config as cfg
import numpy as np
from timeit import default_timer as timer

cfg.BenchmarkMode.put(True)
# start all the workers
pd.DataFrame([np.arange(cfg.MinPartitionSize.get()) for _ in range(cfg.NPartitions.get() ** 2)]).to_numpy()

NCOLS = 16
NROWS = 5_000_000

df = pd.DataFrame({f"col{i}": [f"col{i}-{j}" for j in range(NROWS)] for i in range(NCOLS)})

from modin.core.dataframe.algebra import Reduce

def reduction(df):
    res = df.iloc[:, 0]
    for col in df.columns[1:]:
        res += "_" + df[col]
    return res

t1 = timer()
res = Reduce.apply(df, reduction, axis=1)
print(f"reduction operator: {timer() - t1}")

from modin.experimental.batch import PandasQueryPipeline

t1 = timer()
pipeline = PandasQueryPipeline(df)
pipeline.add_query(reduction, is_output=True)
res = pipeline.compute_batch()
print(f"batch pipeline API: {timer() - t1}")

t1 = timer()
res = df.iloc[:, 0]
for col in df.columns[1:]:
    res += "_" + df[col]
print(f"for-loop: {timer() - t1}")

t1 = timer()
res = df.apply(lambda row: "_".join(row), axis=1)
print(f"df.apply(join): {timer() - t1}")

t1 = timer()
res = df.iloc[:, 0].str.cat(df.iloc[:, 1:], sep="_")
print(f"df.str.cat(): {timer() - t1}")

(As I was writing this comment, I found out about the batch API in modin, which is supposed to serve exactly the same purpose of "emulating" lazy execution. However, it doesn't seem to provide a way to specify the scheme by which the kernels should actually be submitted (map, row-wise, column-wise, ...), and it also has some slight overhead compared with the pure user-defined operator approach.)
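The kernel-count argument above can be put into rough numbers. This is a back-of-the-envelope sketch only: the partition count is an assumption for illustration, not a measured value from the benchmark.

```python
NCOLS = 16        # column count from the benchmark above
NPARTITIONS = 48  # assumed partition count, for illustration only

# for-loop: each of the NCOLS-1 iterations submits ~3 kernels
# (df[col], "_" + df[col], res +=), each fanning out across partitions
loop_kernels = (NCOLS - 1) * 3 * NPARTITIONS

# reduction operator: one full-axis kernel per row partition
reduce_kernels = NPARTITIONS

print(loop_kernels, reduce_kernels)  # 2160 48
```

Even with conservative assumptions, the scheduler sees orders of magnitude fewer tasks with the single-kernel approach, which is consistent with the timings above.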

Comment on lines +426 to +429
left : modin.pandas.DataFrame or modin.pandas.Series
Left operand.
right : modin.pandas.DataFrame or modin.pandas.Series
Right operand.
Collaborator

This layer depends on the upper layer in every apply.

Collaborator

The lower layer still takes object(s) from the API layer, namely modin.pandas.DataFrame/Series. Then there is func, which takes pandas.DataFrame/Series. Also, there is a kwargs argument that needs to be passed to cls.register(). What is cls.register for the user? Doesn't this look a little complicated for the user? So many things to understand. I am also thinking that we are trying to simplify things not for the user, but for ourselves, when we rewrite a customer workload to get better performance.

Left operand.
right : modin.pandas.DataFrame or modin.pandas.Series
Right operand.
func : callable(pandas.DataFrame, pandas.DataFrame, \*args, axis, \*\*kwargs) -> pandas.DataFrame
Collaborator

the signature is wrong here, as the implementation explicitly passes Query Compilers as arguments...

Collaborator Author

Nope, func here is a kernel that will be applied to deserialized partitions (pandas dataframes), so the signature is correct.

follow the track of the func:

  1. register(func)
  2. register(func) -> modin_dataframe.apply_full_axis(func, ...)
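That track can be modeled with a minimal toy (all names below are hypothetical stand-ins, not modin's real classes) to show that func ultimately receives materialized partition data, never a query compiler:

```python
class ToyQC:
    """Stand-in for a query compiler holding partitioned data."""
    def __init__(self, partitions):
        # lists stand in for the pandas DataFrames a real partition holds
        self.partitions = partitions

    def apply_full_axis(self, func):
        # func is applied to each materialized partition, not to the QC itself
        return ToyQC([func(part) for part in self.partitions])

def register(func):
    # register() produces a caller operating on query-compiler-like objects,
    # but the user's func only ever sees the deserialized partition payloads
    def caller(query_compiler):
        return query_compiler.apply_full_axis(func)
    return caller

qc = ToyQC([[1, 2], [3, 4]])
res = register(lambda part: [x * 10 for x in part])(qc)
print(res.partitions)  # [[10, 20], [30, 40]]
```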

-------
The same type as `df`.
"""
operator = cls.register(func, **kwargs)
Collaborator

I wonder what is the purpose of .register for a one-off thing

Collaborator Author

Unfortunately, that's the only way we can get the caller function.

modin/core/dataframe/algebra/binary.py (outdated, resolved)
modin/core/dataframe/algebra/fold.py (outdated, resolved)
modin/core/dataframe/algebra/binary.py (outdated, resolved)
@vnlitvinov (Collaborator) left a comment

@dchigarev so this is a way to implement some user-defined functions, right? I wonder if we can actually try to take a leaf out of the pandas/numpy book in the way they support UDFs...
