Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#6990: Implement lazy execution for the Ray virtual partitions. #6991

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

AndreyPavlenko
Copy link
Collaborator

What do these changes do?

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Implement lazy execution for the Ray virtual partitions. #6990
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

@AndreyPavlenko AndreyPavlenko force-pushed the issue-6990 branch 2 times, most recently from bf2943d to ea540cc Compare March 1, 2024 20:38
@AndreyPavlenko AndreyPavlenko force-pushed the issue-6990 branch 13 times, most recently from 128509d to 8ce0b34 Compare March 20, 2024 20:17
return False

@remote_function
def _remote_fn(obj, index): # pragma: no cover

Check notice

Code scanning / CodeQL

First parameter of a method is not named 'self' Note

Normal methods should have 'self', rather than 'obj', as their first parameter.
axis = 0

@remote_function
def _remote_concat(dfs): # pragma: no cover # noqa: GL08

Check notice

Code scanning / CodeQL

First parameter of a method is not named 'self' Note

Normal methods should have 'self', rather than 'dfs', as their first parameter.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that the concat works as intended, given the message about the naming of the function arguments?

axis = 1

@remote_function
def _remote_concat(dfs): # pragma: no cover # noqa: GL08

Check notice

Code scanning / CodeQL

First parameter of a method is not named 'self' Note

Normal methods should have 'self', rather than 'dfs', as their first parameter.
Copy link
Collaborator

@anmyachev anmyachev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Judging by the annotations, we need to write a lot more tests to cover most of the changes.

from .utils import initialize_ray

__all__ = [
"initialize_ray",
"RayWrapper",
"MaterializationHook",
"SignalActor",
"RayObjectRefTypes",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This item has been deleted

@@ -214,7 +214,7 @@ def wait(cls, obj_ids, num_returns=None):
num_returns : int, optional
"""
if not isinstance(obj_ids, Sequence):
obj_ids = list(obj_ids)
obj_ids = list(obj_ids) if isinstance(obj_ids, Iterable) else [obj_ids]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be deleted.

@@ -868,7 +868,7 @@ class LazyExecution(EnvironmentVariable, type=str):
"""

varname = "MODIN_LAZY_EXECUTION"
choices = ("Auto", "On", "Off")
choices = ("Auto", "On", "Off", "Axis")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why introduce a new mode?

Comment on lines +456 to +459
try:
ref = ray.get(ref, timeout=0)
except ray.exceptions.GetTimeoutError:
return False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an object has been calculated and placed in distributed storage, will materialization occur here?

If this approach can be effective, then it is worth considering the possibility of using it in other places.

@@ -419,7 +424,7 @@ def eager_exec(self, func, *args, length=None, width=None, **kwargs):
LazyExecution.subscribe(_configure_lazy_exec)


class SlicerHook(MaterializationHook):
class SlicerHook(MaterializationHook, DeferredExecution):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the idea behind this change?

from .partition import PandasOnRayDataframePartition


class PandasOnRayDataframeVirtualPartition(BaseDataframeAxisPartition):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not such inheritance?

Suggested change
class PandasOnRayDataframeVirtualPartition(BaseDataframeAxisPartition):
class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition):

@@ -42,6 +54,82 @@ class PandasOnRayDataframePartitionManager(GenericRayDataframePartitionManager):
_execution_wrapper = RayWrapper
materialize_futures = RayWrapper.materialize

if LazyExecution.get() in ("On", "Axis"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether to use this function or not is determined during the first import without the possibility of further replacement. As far as I remember, in all other places, functions are defined on each call.


@classmethod
@_inherit_docstrings(GenericRayDataframePartitionManager.get_indices)
def get_indices(cls, axis, partitions, index_func=None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried making lazy changes to the already existing get_indices? (without overriding)

Comment on lines +36 to +38
PandasOnRayDataframeColumnPartition,
PandasOnRayDataframeRowPartition,
PandasOnRayDataframeVirtualPartition,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried making changes to existing classes?

axis = 0

@remote_function
def _remote_concat(dfs): # pragma: no cover # noqa: GL08
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that the concat works as intended, given the message about the naming of the function arguments?

@@ -29,13 +29,20 @@

import pandas
import ray
import ray.exceptions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that a lot of the changes in this file are not directly affected by this pull request and therefore it would be great to move them into a separate pull request.

@@ -12,24 +12,36 @@
# governing permissions and limitations under the License.

"""Module houses class that implements ``GenericRayDataframePartitionManager`` using Ray."""
import math
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import math
import math

PandasOnRayDataframeRowPartition,
)

if LazyExecution.get() in ("On", "Axis"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic should probably be placed in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/init.py.

# governing permissions and limitations under the License.

"""Module houses classes responsible for storing a virtual partition and applying a function to it."""
import math
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import math
import math

"""

partition_type = PandasOnRayDataframePartition
instance_type = ray.ObjectRef
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
instance_type = ray.ObjectRef

@anmyachev, can this be removed?

list of lengths or None
Estimated chunk lengths, that could be different form the real ones.
bool
Whether the specified partitions represent the full block or just the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate a little on this?

manual_partition=False,
**kwargs,
) -> Union[List[PandasOnRayDataframePartition], PandasOnRayDataframePartition]:
if not manual_partition:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this parameter have effect only in case of False? Should we copy the related logic from the base class?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement lazy execution for the Ray virtual partitions.
3 participants