Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support serializing packed tables directly for the normal shuffle path #10818

Draft
wants to merge 8 commits into
base: branch-24.08
Choose a base branch
from

Conversation

firestarman
Copy link
Collaborator

@firestarman firestarman commented May 15, 2024

Contribute to #10790
Fix #10841

This PR is trying to accelerate the normal shuffle path by partitioning and slicing tables on GPU.

The sliced table is already serializable so can be written to the Shuffle output stream directly, along with a lightweight metadata (a TableMeta) to rebuild the table on the Shuffle read side.

On the Shuffle read side, the new introduced PackedTableIterator will read the tables from the Shuffle input stream and rebuild them on GPU by leveraging the existing utils (MetaUtils, GpuCompressedColumnVector). Next, the existing GpuCoalesceBatches node is used to do the batch concatenation for the downstream operators, similar as what Rapids Shuffle does.

It led to some perf degression in NDS runs, so disable this feature by default. But we got about 2x speedup for a customer query (We got this only when setting the executor cores to 2, but it supposed to be 16).

Waiting for more tests ...

Numbers of 3k parquest data on our cluster.

// ==GPU Serde
app-20240517075217-0003,Power Test Time,607000

// ==CPU Serde
app-20240517070754-0000,Power Test Time,585000

@firestarman
Copy link
Collaborator Author

Make it draft because there are still 5 unit tests failing.

@firestarman
Copy link
Collaborator Author

firestarman commented May 16, 2024

WAR the failing tests by disabling the GPU serde, and filed an issue (#10823) to track the follow-up

@firestarman
Copy link
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
Signed-off-by: Firestarman <[email protected]>
@firestarman
Copy link
Collaborator Author

build

@abellina abellina self-requested a review May 17, 2024 14:00
Copy link
Collaborator

@abellina abellina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a quick first pass

@@ -1788,6 +1788,15 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.integerConf
.createWithDefault(20)

val SHUFFLE_GPU_SERDE_ENABLED =
conf("spark.rapids.shuffle.serde.enabled")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets change this to:

spark.rapids.shuffle.serde.type

And there are two types so far: "CPU", "GPU". The way the flag is used is fine we would still convert it to a boolean isGpuSerdeEnabled, but we would test whether spark.rapids.shuffle.serde.type=="GPU".

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's internal right now, but as we learn more about this method we need to add documentation that says when to use which. Or come up with smart heuristics that will pick CPU/GPU automatically (so we can add another type.. AUTO)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done for the config name part

@@ -504,6 +504,7 @@ class AdaptiveQueryExecSuite
// disable DemoteBroadcastHashJoin rule from removing BHJ due to empty partitions
.set(SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key, "0")
.set(RapidsConf.TEST_ALLOWED_NONGPU.key, "ShuffleExchangeExec,HashPartitioning")
.set(RapidsConf.SHUFFLE_GPU_SERDE_ENABLED.key, "false")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should remove these overrides setting since disabled is the default.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

import org.apache.spark.sql.vectorized.ColumnarBatch

private sealed trait TableSerde {
protected val P_MAGIC_NUM: Int = 0x43554447 // "CUDF".asInt + 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we have our own P_MAGIC_NUM could we not use this to detect whether the data is GpuTableSerde or JCudfSerialized? This should be follow on work but I imagine a case where we might want to use a specific serialization format for columns of a certain type, size, complexity vs the other.

Copy link
Collaborator

@abellina abellina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops, I didn't mean to approve before.

@abellina
Copy link
Collaborator

It led to some perf degression in NDS runs, so disable this feature by default.

Which queries were slower? It would be great to get some feedback from you on what is different between the customer query and the NDS queries.

Also which queries got faster from NDS? That would be interesting.

I did also write internally as I'd like to see more standard configurations used for this benchmark on the next run, so we can compare apples-to-apples with our baseline.

@firestarman
Copy link
Collaborator Author

build

@sameerz
Copy link
Collaborator

sameerz commented May 21, 2024

Please add more context about why the test cases in #10823 are failing before merging this PR. We'd like to understand if that issue needs to be addressed as part of this PR.

@firestarman firestarman changed the base branch from branch-24.06 to branch-24.08 May 27, 2024 01:34
@firestarman
Copy link
Collaborator Author

Please add more context about why the test cases in #10823 are failing before merging this PR. We'd like to understand if that issue needs to be addressed as part of this PR.

Done

@firestarman firestarman marked this pull request as draft May 30, 2024 08:23
@firestarman
Copy link
Collaborator Author

firestarman commented May 30, 2024

Move to draft since the perf is not as good as our expectation. The previous 2x speedup was got only when setting the executor cores to 2, but it supposed to be 16.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] Support GPU slicing and compression for the normal Shuffle path
3 participants