[Spark] Optimize batching / incremental progress #3089
base: master
Conversation
val filesToProcess = bins.flatMap(_._2)

txn.trackFilesRead(filesToProcess)
This should be the only difference with the existing default behavior. Only the filtered candidates are registered with the transaction, not all matching files.
val filesToProcess = bins.flatMap(_._2)

txn.trackFilesRead(filesToProcess)
txn.trackReadPredicates(partitionPredicate)
I assume the partition predicate should still be registered even if it's just a subset of the partition that's being processed. This is already what's happening anyway with the candidates being filtered.
Yes, this looks correct to me compared with filtering code through OptimisticTransaction
Thanks for the change, I left a few comments and we need more tests for this change.
@@ -61,17 +62,17 @@ abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaComman
   */
  def validateZorderByColumns(
      spark: SparkSession,
      txn: OptimisticTransaction,
Can you delete the obsolete comment above for txn and replace it with snapshot?
* @param txn the [[OptimisticTransaction]] being used to optimize
Updated
@@ -273,34 +280,17 @@ class OptimizeExecutor(

val maxThreads =
This variable can be removed now?
Yep good catch
bins: Seq[(Map[String, String], Seq[AddFile])],
batchSize: Long)
  : Seq[Seq[(Map[String, String], Seq[AddFile])]] = {
val batches = new ArrayBuffer[Seq[(Map[String, String], Seq[AddFile])]]()
Now that it has multiple nested containers, can we add named types for Seq[(Map[String, String], Seq[AddFile])] and (Map[String, String], Seq[AddFile]) to make it more readable? Something like

case class Bin(partitionValue: Map[String, String], files: Seq[AddFile])
case class Batch(bins: Seq[Bin])

or similar.
Yep thought these were getting verbose. Added case classes.
@@ -309,10 +299,10 @@ class OptimizeExecutor(
    optimizeStats.totalConsideredFiles = candidateFiles.size
    optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size
    optimizeStats.totalClusterParallelism = sparkSession.sparkContext.defaultParallelism
-   val numTableColumns = txn.snapshot.metadata.schema.size
+   val numTableColumns = snapshot.metadata.schema.size
Shall we rename optimizeStats.numBatches at line 298 to optimizeStats.numBins, since it is not a batch any more with this change? Also, we probably want to add optimizeStats.numBins = jobs.size.
Yeah, that makes sense. I also kept a numBatches to actually represent the number of batches? Or is that bad because it changes the meaning of the existing stat?
- val rows = new OptimizeExecutor(spark, txn, partitionPredicates, Seq(), true, optimizeContext)
-   .optimize()
+ val rows = new OptimizeExecutor(spark, deltaLog.update(), catalogTable, partitionPredicates,
+   Seq(), true, optimizeContext).optimize()
nit: I know this is pre-existing, but can you spell out the arguments for Seq() and true for better readability?
.internal()
.doc(
  """
    |The size of a batch within an OPTIMIZE JOB. After a batch is complete, it's
Suggested change:
- |The size of a batch within an OPTIMIZE JOB. After a batch is complete, it's
+ |The size of a batch within an OPTIMIZE JOB. After a batch is complete, its
}
val batchResults = batchSize match {
  case Some(size) =>
    groupBinsIntoBatches(jobs, size).map(runOptimizeBatch(_, maxFileSize))
Correct me if I am wrong. The files are bin-packed in the following steps:
1. groupFilesIntoBins: bin-packs files according to OptimizeTableStrategy.maxBinSize, respecting partition boundaries.
2. groupBinsIntoBatches: groups multiple bins into one batch; bins from multiple partitions can end up in the same batch.

So each transaction can have data from multiple partitions. The ConflictChecker rejects a transaction when two txns write to the same partition. That won't be a problem for the txns within a single OPTIMIZE, since batches are executed in serial order. For concurrent OPTIMIZE commands, we could consider having each batch include only single-partition data, to minimize the chance of conflicts between concurrent OPTIMIZEs.
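The grouping step described above can be sketched in plain Scala. Note that the AddFile/Bin stand-ins and the greedy threshold logic below are illustrative guesses at the behavior, not the actual Delta implementation:

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal stand-ins for illustration; the real types live in Delta's codebase.
case class AddFile(path: String, size: Long)
case class Bin(partitionValues: Map[String, String], files: Seq[AddFile])

// Greedy packing: whole bins are appended to the current batch until the
// accumulated file size reaches batchSize; a bin is never split across batches.
def groupBinsIntoBatches(bins: Seq[Bin], batchSize: Long): Seq[Seq[Bin]] = {
  val batches = ArrayBuffer[Seq[Bin]]()
  val current = ArrayBuffer[Bin]()
  var currentSize = 0L
  for (bin <- bins) {
    current += bin
    currentSize += bin.files.map(_.size).sum
    if (currentSize >= batchSize) {
      batches += current.toSeq
      current.clear()
      currentSize = 0L
    }
  }
  if (current.nonEmpty) batches += current.toSeq
  batches.toSeq
}
```

Because whole bins are kept intact, a batch can overshoot batchSize by up to one bin's worth of files, which is why partitioned bins from different partitions can land in the same batch (and hence the same transaction).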
Hmm, I didn't look at the ConflictChecker when working on this. I did have in mind an improvement to run multiple batches simultaneously/overlapping, to avoid the tail of execution for each batch. There's no conceptual reason the commits should conflict, since they read specific files and don't change data. If they would conflict, that might just be an improvement to make in the ConflictChecker.
Though since I'm creating each transaction off the same original snapshot, even running them serially they should invoke the same conflict checking against each other, right? And the current simple test doesn't seem to have an issue. I'll see if I encounter anything in further tests.
Actually, I guess the Optimize command does its own conflict checking to resolve that.
assert(files.values.forall(_.length == 1))
// The last 5 commits in the history should be optimize batches, one for each partition
val commits = deltaLog.history.getHistory(None)
assert(commits.filter(_.operation == "OPTIMIZE").length == optimizeCommits)
Can you add a check that the data before and after OPTIMIZE is the same?
Will do
@@ -536,6 +536,39 @@ trait OptimizeCompactionSuiteBase extends QueryTest
  }
}

test("optimize command with batching") {
Can you add more tests to cover:
- OPTIMIZE WHERE for a partitioned table. This makes sure the batching works correctly with the filter.
- Since this change also impacts zorder by and cluster by, we need to add tests for both of them to validate that batching works as expected.
- OPTIMIZE on an empty table. Make sure it doesn't trigger any divide-by-zero errors.
Thanks I'll work on those
Which Delta project/connector is this regarding?
Description
Resolves #3081
Adds support for splitting an optimize run into batches with a new config, spark.databricks.delta.optimize.batchSize. Batches are created by grouping existing bins together until batchSize is reached. The default behavior remains the same; batching is only enabled if batchSize is configured.

This will apply to all optimization paths. I don't see any reason it shouldn't apply to compaction, z-ordering, clustering, auto-compaction, or reorg/DV rewriting if a user configures it.
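Enabling batching might then look something like the following; treat this as a sketch, since the exact value format accepted by the config parser (bytes shown here) is an assumption:

```scala
// Hypothetical usage on an existing SparkSession with Delta configured.
// 100 GB per batch, chosen purely for illustration.
spark.conf.set("spark.databricks.delta.optimize.batchSize",
  (100L * 1024 * 1024 * 1024).toString)
spark.sql("OPTIMIZE delta.`/path/to/table`")
```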
The way transactions are handled within the optimize executor had to be updated. Instead of creating a transaction upfront, we list all the files in the most recent snapshot, and then create transactions for each batch.
This is very important for clustering, as there is no way to manually optimize a partial subset of the table using partition filtering. Without batching, a lot of execution time and storage space could be wasted if something fails before the entire table finishes optimizing.
How was this patch tested?
A simple new UT is added. I can add others as well; I'm just looking for feedback on the approach and suggestions for what other tests to add.
Does this PR introduce any user-facing changes?
Yes, adds new capability to optimization that is disabled by default.