PERF-#4494: Get partition widths/lengths in parallel instead of serially #4683

Draft · noloerino wants to merge 14 commits into master from parallel-dims

Conversation

@noloerino noloerino (Collaborator) commented Jul 18, 2022

What do these changes do?

Computes the widths and lengths of block partitions in parallel, as batched calls to ray.get/DaskWrapper.materialize, rather than serially.

This adds the try_build_[length|width]_cache and try_set_[length|width]_cache methods to block partitions: the former returns a promise/future that computes the partition's length or width, and the latter is called by the partition manager to inform the block partition of the computed value. It also adds the _update_partition_dimension_caches method to the PartitionManager class, which materializes the length/width futures returned by its constituent partitions in a single batch.
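
Conceptually, the batched path looks something like the sketch below (illustrative only, written against the Ray backend; apart from the try_build_length_cache/try_set_length_cache names described above, every name here is a stand-in rather than the code in this diff):

```python
# Rough sketch of the batched pattern, not the actual code in this PR.
# Assumes each partition's try_build_length_cache() returns either a cached
# int or a Ray object reference, and try_set_length_cache(value) stores the
# materialized result back on the partition.
import ray


def update_length_caches(partitions):
    futures, owners = [], []
    for part in partitions:
        cache = part.try_build_length_cache()
        if isinstance(cache, ray.ObjectRef):  # length not known yet
            futures.append(cache)
            owners.append(part)
    # One batched ray.get instead of one blocking call per partition.
    for part, length in zip(owners, ray.get(futures)):
        part.try_set_length_cache(length)
```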

  • commit message follows format outlined here
  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves PERF: get all partition widths/lengths in parallel instead of serially. #4494
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date
  • added (Issue Number: PR title (PR Number)) and github username to release notes for next major release

@noloerino noloerino requested a review from a team as a code owner July 18, 2022 21:36
@noloerino noloerino marked this pull request as draft July 18, 2022 21:36
codecov bot commented Jul 19, 2022

Codecov Report

Merging #4683 (490778c) into master (8e1190c) will decrease coverage by 13.12%.
The diff coverage is 67.93%.

@@             Coverage Diff             @@
##           master    #4683       +/-   ##
===========================================
- Coverage   85.28%   72.15%   -13.13%     
===========================================
  Files         259      259               
  Lines       19378    19496      +118     
===========================================
- Hits        16527    14068     -2459     
- Misses       2851     5428     +2577     
| Impacted Files | Coverage Δ |
|---|---|
| ...s/pandas_on_dask/partitioning/virtual_partition.py | 62.99% <0.00%> (-23.74%) ⬇️ |
| ...ns/pandas_on_ray/partitioning/virtual_partition.py | 71.66% <6.66%> (-16.07%) ⬇️ |
| ...lementations/pandas_on_dask/dataframe/dataframe.py | 80.76% <25.00%> (-15.07%) ⬇️ |
| ...dataframe/pandas/partitioning/partition_manager.py | 75.67% <75.00%> (-10.79%) ⬇️ |
| ...entations/pandas_on_dask/partitioning/partition.py | 79.77% <81.81%> (-9.25%) ⬇️ |
| ...plementations/pandas_on_ray/dataframe/dataframe.py | 84.44% <82.50%> (-15.56%) ⬇️ |
| modin/core/dataframe/pandas/dataframe/dataframe.py | 71.44% <100.00%> (-22.89%) ⬇️ |
| ...in/core/dataframe/pandas/partitioning/partition.py | 100.00% <100.00%> (ø) |
| ...mentations/pandas_on_ray/partitioning/partition.py | 91.66% <100.00%> (+0.51%) ⬆️ |
| ...ns/pandas_on_ray/partitioning/partition_manager.py | 83.50% <100.00%> (+2.68%) ⬆️ |

... and 84 more

@@ -377,6 +377,20 @@ def length(self):
"""
if self._length_cache is None:
if self.axis == 0:
caches = [

noloerino (Collaborator Author) commented:

This logic is duplicated from the PartitionManager classes above, but I'm not sure how to access the correct partition manager from here.

@noloerino noloerino marked this pull request as ready for review July 20, 2022 21:24
pyrito (Collaborator) commented Jul 21, 2022

Haven't taken a closer look at the implementation details, but do you have any benchmarks or performance measurements to compare with master?

noloerino (Collaborator Author) commented Jul 21, 2022 via email

mvashishtha (Collaborator) commented Jul 21, 2022

@noloerino @pyrito

> Sadly no, and I’d appreciate some suggestions on what code to run.

I spent a while today trying to get a script that showcases the performance here without breaking anything in Modin, but I failed. Getting a reproducer is hard for a few reasons.

For one thing, this optimization is only useful for unusual cases like in #4493 where the partitions' call queues include costly operations. When there is no call queue, the partitions will execute all dataframe functions eagerly, simultaneously calculating shapes. The call queues are generally meant to carry cheap operations like transpose and reindexing, but the reproducer in that issue has a frame that is very expensive to serialize, so that even the transpose was expensive. There the slow code was in _copartition, which unnecessarily calculated the widths of the base frame. #4495 fixed that unnecessary recalculation, so that script no longer works. Also, every PandasDataFrame computes all the lengths when it filters empty subframes as soon as it's constructed here, so any Modin dataframe at rest already knows its partition shapes.

Looking at all the serial shape computations I listed here, most are in internal length computations. One is _copartition, and I spent a while trying to get around the cache fix in #4495 with a pair of frames that really needed copartitioning, but in that case the map_axis_partitions in _copartition triggers parallel computation. The last type of length computation is in apply_func_to_indices_both_axis, which as far as I can tell is only used in melt. We could try engineering an example that bypasses the cache for melt, but I don't think it's worth the time...

I think it's good practice to get multiple ray objects in parallel (see also this note about a similar improvement in _to_pandas). Also, if our caches fail for any reason later on, we can have faster length computation as a backup.
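
For reference, the serial-vs-batched difference being discussed comes down to the call shape; a toy Ray example, unrelated to Modin's partition classes:

```python
import ray

ray.init(ignore_reinit_error=True)


@ray.remote
def part_length(i):
    # Stand-in for whatever computes one partition's length.
    return 100 + i


refs = [part_length.remote(i) for i in range(8)]

# Serial: each ray.get blocks before the next result is even requested.
serial_lengths = [ray.get(r) for r in refs]

# Batched: a single call lets Ray resolve all of the objects together.
batched_lengths = ray.get(refs)

assert serial_lengths == batched_lengths
```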

vnlitvinov (Collaborator) commented:

This adds a certain amount of complexity (judging by the number of lines changed; I haven't looked at the diff yet), and I haven't yet seen any performance proof for it. I would like to see some measurements before increasing our (already huge) codebase...

@RehanSD RehanSD (Collaborator) left a comment

Left some comments, but great work!

@@ -1214,6 +1233,19 @@ def apply_func_to_indices_both_axis(
        if col_widths is None:
            col_widths = [None] * len(col_partitions_list)

        if row_lengths is None and col_widths is None:

RehanSD (Collaborator) commented:

Why do we need to compute dimensions here?

noloerino (Collaborator Author) replied:

The length and width values of each partition are accessed in the local compute_part_size, defined immediately below. The double for loop structure where compute_part_size is called makes it hard to parallelize the computation of these dimensions, so I thought it would be simplest to precompute the relevant dimensions before the loop.
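
A minimal sketch of that precompute step, with hypothetical names (build_cache/set_cache/materialize stand in for the try_build_*_cache/try_set_*_cache methods and ray.get/DaskWrapper.materialize from this PR): resolve every missing dimension in one batched call before the nested loop, so the loop body only reads plain integers.

```python
# Hypothetical helper illustrating the idea; not part of the PR.
def precompute_dims(parts, dims, build_cache, set_cache, materialize):
    futures, owners = [], []
    for part, dim in zip(parts, dims):
        if dim is None:  # dimension not cached yet
            futures.append(build_cache(part))
            owners.append(part)
    # Single batched materialization (e.g. ray.get or DaskWrapper.materialize).
    for part, value in zip(owners, materialize(futures)):
        set_cache(part, value)  # subsequent length()/width() calls are free
```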

@@ -273,13 +274,42 @@ def length(self):
        int
            The length of the object.
        """
        self.try_build_length_cache()
        return self._length_cache

RehanSD (Collaborator) commented:

We need to unwrap _length_cache here, since its type will be PandasDataframePartition

noloerino (Collaborator Author) replied:

What do you mean by unwrap? Also, as far as I can tell, the logic for this method should be the same as it originally was (the code was just moved into try_build_length_cache), so does this mean the original code returned a PandasDataframePartition as well?

        for i, cache in enumerate(caches):
            if isinstance(cache, Future):
                self.list_of_partitions_to_combine[i].try_set_length_cache(
                    new_lengths[dask_idx]

RehanSD (Collaborator) commented:

Shouldn't this just be i as well?

noloerino (Collaborator Author) replied:

No, since new_lengths may have fewer elements than caches in the case where some length values were already computed (and are filtered out by the isinstance(cache, Future) check). The value computed at new_lengths[dask_idx] should correspond to the promise at caches[i].
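
A toy illustration of that indexing, independent of Dask: the freshly computed values are consumed with their own counter while i walks every cache slot.

```python
# Toy example: ints stand for already-cached lengths, "future" marks entries
# whose length had to be computed and therefore shows up in new_lengths.
caches = [10, "future", 30, "future"]
new_lengths = [20, 40]  # results for the two futures only

dask_idx = 0
for i, cache in enumerate(caches):
    if cache == "future":
        caches[i] = new_lengths[dask_idx]  # caches[i] pairs with new_lengths[dask_idx]
        dask_idx += 1

assert caches == [10, 20, 30, 40]
```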

noloerino (Collaborator Author) commented:

@vnlitvinov that makes sense, I'll look into coming up with concrete benchmarks.

vnlitvinov (Collaborator) commented Jul 27, 2022

@pyrito please have a look at https://github.com/vnlitvinov/modin/tree/speedup-masking and #4726, it might be doing somewhat the same thing in terms of getting the sizes in parallel

YarShev (Collaborator) commented Jul 27, 2022

Related discussion on handling metadata (index and columns) in #3673.

@noloerino noloerino force-pushed the parallel-dims branch 2 times, most recently from 6a17fc3 to e0bb5fa on August 8, 2022 at 23:52
Signed-off-by: Jonathan Shi <[email protected]>

Successfully merging this pull request may close this issue: PERF: get all partition widths/lengths in parallel instead of serially. (#4494)

7 participants