LOcal REplay framework: dump and replay GpuProjectExec runtime [databricks] #10825

Open

wants to merge 1 commit into base: branch-24.08

Conversation

res-life
Collaborator

@res-life res-life commented May 16, 2024

Closes #10862
Collecting nsys/ncu files is time-consuming when running on customer data, because customer data is usually huge.
Actually we only need the small data segment that runs in a JNI/cuDF kernel.
This PR aims to dump/replay the runtime (data and meta) when a Project Exec's execution time on a batch exceeds the threshold time.

This is a feature to dump and replay Project Exec runtime (by columnar batch) for performance debugging.

  • Project exec
    store/restore the GpuTieredProject case class
    store/restore ColumnarBatch data
    replay GpuProjectExec
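For reference, the dump side of the feature is driven by a handful of configs discussed later in this thread. The sketch below uses the post-review `.debug.` names; the `dumpDir` and `threshold.timeMS` names under `.debug.` are assumed from the rename discussion, so treat this as a sketch, not the final API:

```
spark.conf.set("spark.rapids.sql.debug.replay.exec.types", "project")
spark.conf.set("spark.rapids.sql.debug.replay.exec.dumpDir", "file:/tmp/replay-exec")
spark.conf.set("spark.rapids.sql.debug.replay.exec.threshold.timeMS", 1000)
spark.conf.set("spark.rapids.sql.debug.replay.exec.batch.limit", 1)
```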

@res-life res-life changed the title [Do not review] [WIP] Add store and replay exec env feature [Do not review] [WIP] Add store and replay exec runtime feature May 16, 2024
@res-life res-life force-pushed the replay-exec branch 3 times, most recently from d704928 to 2702c0a Compare May 17, 2024 12:36
@res-life res-life changed the title [Do not review] [WIP] Add store and replay exec runtime feature [WIP] LOcal REplay framework: dump and replay GpuProjectExec runtime May 20, 2024
store files in. Remote paths are supported, e.g. `hdfs://url:9000/path/to/save`

```
spark.conf.set("spark.rapids.sql.test.replay.exec.threshold.timeMS", 100)
```
Collaborator

Nit: Hmm, let's start with 1000 ms as a default value.

Collaborator Author

Done.

Only dump the column batches when the execution time exceeds the threshold time.
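A minimal sketch of that gating logic, with hypothetical names (the real code lives in the GpuProjectExec changes in this PR): time the batch's processing and dump only when the elapsed time reaches the threshold.

```scala
// Sketch: process a batch, and hand it to `dump` only when processing
// took at least `thresholdMs` milliseconds. All names are hypothetical.
def processAndMaybeDump[T](thresholdMs: Long, dump: T => Unit)(batch: T)(process: T => T): T = {
  val start = System.nanoTime()
  val result = process(batch)               // the actual Project work
  val elapsedMs = (System.nanoTime() - start) / 1000000L
  if (elapsedMs >= thresholdMs) dump(batch) // only slow batches are persisted
  result
}
```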

```
spark.conf.set("spark.rapids.sql.test.replay.exec.maxBatchNum", 1)
```
Collaborator

nit: it would be more suitable to call it spark.rapids.sql.test.replay.batch.limit, and to mention whether it is per executor or per task.

Collaborator Author

Done.

@@ -2188,6 +2188,40 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.integerConf
.createWithDefault(1024)

/**
* refer to dev doc: `replay-exec.md`
* only supports "project", will support "agg" later
*/
Collaborator

Nit: TODO. any ticket to link here?

override def otherCopyArgs: Seq[AnyRef] =
Seq[AnyRef](useTieredProject.asInstanceOf[java.lang.Boolean])

Collaborator

nit: are the extra blank lines necessary?

Collaborator Author

Done.

override def output: Seq[Attribute] = projectList.map(_.toAttribute)

override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))

override def internalDoExecuteColumnar() : RDD[ColumnarBatch] = {
override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
Collaborator

ditto

Collaborator Author

Keeping it, because it follows the code formatting convention.

val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val opTime = gpuLongMetric(OP_TIME)
val boundProjectList = GpuBindReferences.bindGpuReferencesTiered(projectList, child.output,
useTieredProject)

val rdd = child.executeColumnar()

// This is for test purpose; dump project list
replayDumper.foreach(d => d.dumpMeta[GpuTieredProject]("GpuTieredProject", boundProjectList))
Collaborator

hide this behind dumpForReplay = true?

Collaborator Author

Renamed to replayDumperOpt, so it shows it's an Option.
Now using replayDumperOpt to distinguish dumpForReplay = true from dumpForReplay = false.
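The Option-based pattern can be sketched like this (ReplayDumper here is a stand-in, not the PR's actual class): when dumping is disabled the value is None and foreach is a no-op, so no separate boolean flag is needed at the call sites.

```scala
// Stand-in for the PR's dumper; only the Option pattern matters here.
final case class ReplayDumper(dir: String) {
  def dumpMeta(name: String): String = s"$dir/$name.meta" // pretend-write, returns the path
}

def makeDumperOpt(dumpEnabled: Boolean, dir: String): Option[ReplayDumper] =
  if (dumpEnabled) Some(ReplayDumper(dir)) else None

// Call sites then read naturally, mirroring `replayDumperOpt.foreach(...)`:
val replayDumperOpt = makeDumperOpt(dumpEnabled = true, dir = "/tmp/replay-exec")
replayDumperOpt.foreach(d => d.dumpMeta("GpuTieredProject"))
```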

f => f.getName.startsWith(s"${projectHash}_cb_data_") &&
f.getName.endsWith(".parquet"))
if (parquets == null || parquets.isEmpty) {
logError(s"Project Exec replayer: there is no cb_data_xxx.parquet file in $replayDir")
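The file filter in the snippet above can be isolated as a plain predicate (a sketch; the file-name scheme is taken from the snippet itself):

```scala
// A replay directory holds per-Exec files named `<hash>_cb_data_<n>.parquet`;
// this predicate selects the batch files belonging to one Project Exec.
def isCbDataFile(projectHash: Int, fileName: String): Boolean =
  fileName.startsWith(s"${projectHash}_cb_data_") && fileName.endsWith(".parquet")
```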
Collaborator

nit: add a usage method and replace all those logError calls with it?

Collaborator Author

Done.

@res-life res-life changed the title [WIP] LOcal REplay framework: dump and replay GpuProjectExec runtime [WIP] LOcal REplay framework: dump and replay GpuProjectExec runtime [databricks] May 21, 2024
@res-life
Collaborator Author

build

@res-life res-life force-pushed the replay-exec branch 2 times, most recently from dee4dfc to fee9e38 Compare May 22, 2024 03:25
@res-life res-life marked this pull request as ready for review May 22, 2024 04:25
@res-life res-life changed the title [WIP] LOcal REplay framework: dump and replay GpuProjectExec runtime [databricks] LOcal REplay framework: dump and replay GpuProjectExec runtime [databricks] May 22, 2024
@res-life
Collaborator Author

Tested dump/replay with a remote directory (e.g. hdfs://path/to/dir) successfully.

@res-life
Collaborator Author

build

Collaborator

@tgravescs tgravescs left a comment

I just mostly looked at docs, not full code, but here are some comments

docs/dev/replay-exec.md
Set the following configurations to enable this feature:

```
spark.conf.set("spark.rapids.sql.test.replay.exec.type", "project")
```
Collaborator

why is there a ".test." in the name of this config? Isn't this for debugging? "test" makes it sound like it is not to be used in real workloads and is only for testing purposes.

Collaborator Author

Now updated to .debug.

```
spark.conf.set("spark.rapids.sql.test.replay.exec.type", "project")
```
Default `type` value is empty, which means do not dump.
Set this `type` to `project` if you want to dump Project Exec runtime data. Currently only supports
Collaborator

can you set it to multiple execs?

Collaborator Author

Updated to exec.types.
Doc: Define the Exec types for dumping, separated by commas, e.g.: project,aggregate,sort.

`project` and empty.

```
spark.conf.set("spark.rapids.sql.test.replay.exec.dumpDir", "file:/tmp")
```
Collaborator

same comment here, shouldn't have .test in the name of config

Collaborator Author

Done.

```
spark.conf.set("spark.rapids.sql.test.replay.exec.dumpDir", "file:/tmp")
```
Default value is `file:/tmp`.
Specifies the dump directory, e.g.: `file:/tmp/my-debug-path`.
Collaborator

can this be a distributed filesystem? You should specify very specifically what is supported. I think we do this in other places in the docs, e.g. for dumping parquet files.

Collaborator Author

Yes, it can be a distributed/remote path. We use the Hadoop FileSystem API to open/write a file stream. If the user specifies the corresponding config to get access to the remote file system, then our code can handle it.
Refer to https://github.com/NVIDIA/spark-rapids/blob/branch-24.06/docs/dev/get-json-object-dump-tool.md

'spark.rapids.sql.expression.GetJsonObject.debugPath': '/tmp/DEBUG_JSON_DUMP/'
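The local-versus-remote dispatch described in that answer boils down to the path's URI scheme. A small self-contained sketch (the real code goes through org.apache.hadoop.fs.FileSystem, which does this resolution internally):

```scala
import java.net.URI

// A path with no scheme is treated as local; otherwise the scheme
// (hdfs, s3a, file, ...) selects the Hadoop FileSystem implementation.
def schemeOf(path: String): String =
  Option(new URI(path).getScheme).getOrElse("file")
```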

<path_to_saved_replay_dir> is the replay directory.
<hash_code_of_project_exec> is the hash code for a specific Project Exec. Dumping may generate
Collaborator

this just replays one exec then? What does the replay do specifically?

What about other things you may want to collect, like number of workers, executor sizes, etc., to try to get the same environment? That seems like good data to have to try to reproduce the same issue.

Collaborator Author

Typically, this tool is used to run a single batch (one that fits in memory) to debug the underlying cuDF/JNI kernel, so it's not related to the number of workers, executor sizes, etc.

Collaborator

That's fine, but a user of this tool likely wants that other information too, and when they replay they likely want a setup as close to the customer's as possible, so making a comment about that would help, I think.

* refer to dev doc: `replay-exec.md`
* only supports "project" now
*/
val TEST_REPLAY_EXEC_TYPE =
Collaborator

same comment as above; I would rather see these called DEBUG instead of test.

Collaborator Author

Done.

Collaborator

please change the define from TEST to DEBUG_ as well, or just remove it.
TEST_REPLAY_EXEC_TYPE -> REPLAY_EXEC_TYPE or DEBUG_REPLAY_EXEC_TYPE

@res-life
Collaborator Author

build

Collaborator

@tgravescs tgravescs left a comment

A couple of high-level questions. How does this scale to other execs? I assume you have to modify every single exec separately?

Also, does this actually work for other execs - hash aggregate, join? What about when multiple batches come in and the exec generates multiple batches? What about if the exec carries some state across batches? Wondering if you have thought about all the different cases.

```
mvn clean install -DskipTests -pl dist -am -DallowConventionalDistJar=true -Dbuildver=330
```
Note: You should specify `-DallowConventionalDistJar`; this option will make sure to generate a
Collaborator

I would rather see this first say that you must build with this option, and then below that have an example and point to our build docs.

Collaborator Author

Done.

```
spark.conf.set("spark.rapids.sql.debug.replay.exec.types", "project")
```
Default `types` value is empty which means do not dump.
Define the Exec types for dumping, separated by comma, e.g.: `project,aggregate,sort`.
Collaborator

can you specify different types of joins and other execs - like broadcast join vs hash join?

Collaborator Author

Agg and sort are future features. This PR only handles project.
Will change the doc to:

Default `types` value is empty, which means do not dump.
Define the Exec types for dumping, separated by commas, e.g.: `project`.
Note: currently only `project` is supported, so there is no need to use a comma.
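Parsing such a comma-separated `types` value is straightforward; a hedged sketch, not the PR's actual parser:

```scala
// Split the config value on commas, trim, lowercase, and drop empties,
// so "" means "dump nothing" and "project" means Set("project").
def parseExecTypes(conf: String): Set[String] =
  conf.split(",").map(_.trim.toLowerCase).filter(_.nonEmpty).toSet
```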

```
spark.conf.set("spark.rapids.sql.debug.replay.exec.batch.limit", 1)
```
This config defines the maximum number of column batches to dump.
Collaborator

what does this mean exactly? If it exceeds the threshold while processing multiple batches, does it dump all of them? If it exceeds the threshold on one batch, do we dump that batch and the ones after it?

I assume this is column batches per exec?

Collaborator Author

Yes, it's a limit per Exec instance.
Once the number of dumped batches exceeds the limit, further batches are skipped and not dumped.
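That per-Exec-instance limit can be sketched with a simple counter (hypothetical class; the PR's implementation may differ):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Each Exec instance keeps its own counter; once `limit` batches have
// been dumped, tryDump() refuses further batches.
final class BatchDumpLimiter(limit: Int) {
  private val dumpedSoFar = new AtomicInteger(0)
  def tryDump(): Boolean = dumpedSoFar.incrementAndGet() <= limit
}
```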

After the job is done, the dump path will contain files like:
```
/tmp/replay-exec:
- xxx_GpuTieredProject.meta // this is the serialized GpuTieredProject case class
```
Collaborator

if I have a huge job that has, say, 10000 projects, is this directory going to become too big? I assume the job keeps running after dumping this out, correct? Is there a way to limit the total number you get?

Collaborator Author

if I have a huge job that has say 10000 projects is this directory going to become too big?

Yes, it will be big. We have the config threshold.timeMS to reduce the number of batches to dump.
Typical usage is: run the query in the customer env, then check the eventlog to find a threshold time; then run in the customer env again with dump enabled and with this threshold time. Usually we only need one batch to reproduce the slowness, and to reproduce it with NSYS.

I assume the job keeps running after dumping this out, correct?

Yes, it keeps running; if needed, we can terminate the process when the first dump is done.

Is there a way to limit the total number you get?

No. In the future we will collect batches for multiple Execs. We will not know in advance how many slow batches there are.


@sameerz sameerz added the tools label May 28, 2024
@res-life res-life changed the base branch from branch-24.06 to branch-24.08 May 29, 2024 00:59
@res-life
Collaborator Author

A couple of high level questions. How does this scale to other execs? I assume you have to modify every single exec separately?

Yes. This PR only dumps Project, which is a simple Exec.

Also does this actually work for other execs - hash aggregate, join.

It does not work for hash aggregate or join.

What about when multiple batches come in and the exec generates multiple batches?

It is covered in this PR. For the agg Exec, it does handle multiple batches.

What about if the exec carries some state across batches? Wondering if you have thought about all the different cases?

Allen and I looked into Hash Agg and found what you mentioned.
Currently, it's not easy to cover all the Execs.
This PR is a start, dumping from the project exec.

Maybe a follow-up can provide a convenient method to handle all the Execs.

@res-life
Collaborator Author

build

@res-life res-life marked this pull request as draft June 3, 2024 08:08
@res-life
Collaborator Author

res-life commented Jun 3, 2024

Changed to draft, because there were errors when testing on Dataproc.

@res-life res-life force-pushed the replay-exec branch 2 times, most recently from 06aeb96 to 5742c4b Compare June 4, 2024 10:14
@res-life
Collaborator Author

res-life commented Jun 4, 2024

build

@res-life res-life marked this pull request as ready for review June 5, 2024 02:34

Successfully merging this pull request may close these issues.

[FEA] Support ProjectExec in LoRe framework