[Feature Request] Make VACUUM more efficient via optimistic execution mode #3106

Open
2 of 8 tasks
mwc360 opened this issue May 16, 2024 · 0 comments
Labels
enhancement New feature or request

Comments

@mwc360
Contributor

mwc360 commented May 16, 2024

Feature request

Introduce a new boolean flag, optimisticExecution. Before running the rest of the Vacuum operation, it would perform an initial transaction log check for file-generating operations and for the most recent Vacuum run, to evaluate whether Vacuum has effectively already been run within the retention period. This would make the Vacuum operation fast when there haven't been any new file-generating operations outside the retention period.

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Overview

Run Vacuum multiple times in a row without any changes to a Delta table and every run takes the same (too long) amount of time, always performing the same operations: listing files, comparing the file list against the transaction log, and then performing any deletions.

Vacuum should introduce a new, default-enabled feature flag that first uses the transaction log to evaluate whether any file-generating operations ran within N days of when the last Vacuum operation took place; if not, it would not proceed with the rest of the Vacuum operation.

I implemented a wrapper around the Vacuum operation that does exactly this, so that we can run Vacuum much more frequently and have it self-evaluate whether there is anything to vacuum. Repeat runs of the Vacuum operation where files have only been added within the retention period then take under one second rather than the normal duration of a Vacuum operation.

This could be called optimisticExecution mode since, technically, there could still be files outside the retention window that would not be caught. The key idea is to be optimistic about how often a true vacuum operation needs to run, based on the retention hours: if we are retaining 7 days of history, this feature would prevent a full vacuum operation from executing more than once within 7 days.
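
For illustration, here is a rough sketch of one way such a wrapper could look, reading the commit history through DeltaTable.forPath(...).history(). The names shouldVacuum and fileGeneratingOps, the table path, and the exact set of operations treated as file-generating are assumptions for the sketch, not part of any Delta API:

  // Rough sketch only: skip the full VACUUM when the Delta history suggests nothing new
  // can be eligible for removal. shouldVacuum and fileGeneratingOps are made-up names.
  import java.sql.Timestamp
  import io.delta.tables.DeltaTable
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{col, max}

  def shouldVacuum(spark: SparkSession, path: String, retentionHours: Double): Boolean = {
    val fileGeneratingOps =
      Seq("WRITE", "STREAMING UPDATE", "MERGE", "OVERWRITE", "UPDATE", "CREATE OR REPLACE TABLE")
    val history = DeltaTable.forPath(spark, path).history()
    val threshold =
      new Timestamp(System.currentTimeMillis() - (retentionHours * 60 * 60 * 1000).toLong)

    // Most recent VACUUM END commit, if the table has ever been vacuumed.
    val lastVacuumRow = history
      .filter(col("operation") === "VACUUM END")
      .agg(max("timestamp"))
      .head()

    if (lastVacuumRow.isNullAt(0)) {
      // Never vacuumed: only worth running if some file-generating commit is already
      // older than the retention window and so could have produced removable files.
      !history
        .filter(col("operation").isin(fileGeneratingOps: _*) && col("timestamp") < threshold)
        .isEmpty
    } else {
      val lastVacuum = lastVacuumRow.getTimestamp(0)
      // Vacuum again only if the last vacuum is outside the retention window and
      // file-generating commits have landed since then.
      lastVacuum.before(threshold) &&
        !history
          .filter(col("operation").isin(fileGeneratingOps: _*) && col("timestamp") > lastVacuum)
          .isEmpty
    }
  }

  // Usage: only pay for the full listing + delete when the history says it can matter.
  // if (shouldVacuum(spark, "/mnt/delta/events", retentionHours = 168)) {
  //   spark.sql("VACUUM delta.`/mnt/delta/events` RETAIN 168 HOURS")
  // }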

Motivation

Decrease the cost of frequently running Vacuum.

Further details

Draft of the proposed modifications to the gc() method:

  def gc(
      spark: SparkSession,
      deltaLog: DeltaLog,
      dryRun: Boolean = true,
      optimisticExecution: Boolean = true,
      retentionHours: Option[Double] = None,
      inventory: Option[DataFrame] = None,
      clock: Clock = new SystemClock): DataFrame = {
    recordDeltaOperation(deltaLog, "delta.gc") {

      val vacuumStartTime = System.currentTimeMillis()
      val path = deltaLog.dataPath
      val deltaHadoopConf = deltaLog.newDeltaHadoopConf()
      val fs = path.getFileSystem(deltaHadoopConf)

      import org.apache.spark.sql.delta.implicits._

      val snapshot = deltaLog.update()
      deltaLog.protocolWrite(snapshot.protocol)

      val snapshotTombstoneRetentionMillis = DeltaLog.tombstoneRetentionMillis(snapshot.metadata)
      val retentionMillis = retentionHours.map(h => TimeUnit.HOURS.toMillis(math.round(h)))
      val deleteBeforeTimestamp = retentionMillis match {
        case Some(millis) => clock.getTimeMillis() - millis
        case _ => snapshot.minFileRetentionTimestamp
      }
      val basePath = fs.makeQualified(path).toString

      if (optimisticExecution) {
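        // Optimistic execution: consult the commit history first and bail out before the
        // expensive file listing and delete phases when nothing can be eligible for removal.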
        val vacuumNeeded = {
          val history = deltaLog.history.getHistory(None)
          val vacuumDateThreshold = new Date(deleteBeforeTimestamp)
          val fileGeneratingOperations = Seq(
            "WRITE", "STREAMING UPDATE", "MERGE", "OVERWRITE", "UPDATE", "CREATE OR REPLACE TABLE"
          )

          val latestVacuum = history.filter(_.operation == "VACUUM END")
            .sortBy(_.timestamp)
            .reverse
            .headOption

          val latestVacuumTimestamp = if (latestVacuum.nonEmpty) {
            latestVacuum.get.timestamp
          } else {
            val oldestFileGenOperation = history.filter(op => fileGeneratingOperations.contains(op.operation))
              .sortBy(_.timestamp)
              .headOption

            if (oldestFileGenOperation.nonEmpty) {
              oldestFileGenOperation.get.timestamp
            } else {
              new Date(System.currentTimeMillis())
            }
          }

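          // A full vacuum is only warranted when the reference timestamp (last vacuum, or
          // oldest file-generating commit if never vacuumed) predates the retention
          // threshold and file-generating commits have occurred since then.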
          if (latestVacuumTimestamp.before(vacuumDateThreshold)) {
            val changesAfterVacuum = history.filter(op =>
              op.timestamp.after(latestVacuumTimestamp) && fileGeneratingOperations.contains(op.operation)
            )
            changesAfterVacuum.nonEmpty
          } else {
            false
          }
        }
        if (!vacuumNeeded) {
          logInfo("Garbage collection has already been run within the retention period, " +
            "skipping garbage collection.")
          return spark.createDataset(Seq(basePath)).toDF("path")
        }
      }

      logInfo(s"Starting garbage collection (dryRun = $dryRun) of untracked files older than " +
        s"${new Date(deleteBeforeTimestamp).toGMTString} in $path")
      val hadoopConf = spark.sparkContext.broadcast(
        new SerializableConfiguration(deltaHadoopConf))
      val parallelDeleteEnabled =
        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_ENABLED)
      val parallelDeletePartitions =
        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_PARALLELISM)
        .getOrElse(spark.sessionState.conf.numShufflePartitions)
      val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()

      val validFiles =
        getValidFilesFromSnapshot(
          spark,
          basePath,
          snapshot,
          retentionMillis,
          hadoopConf,
          clock,
          checkAbsolutePathOnly = false)

      val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
      val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism
      val allFilesAndDirsWithDuplicates = inventory match {
        case Some(inventoryDF) => getFilesFromInventory(basePath, partitionColumns, inventoryDF)
        case None => DeltaFileOperations.recursiveListDirs(
          spark,
          Seq(basePath),
          hadoopConf,
          hiddenDirNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
          hiddenFileNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
          fileListingParallelism = Option(parallelism)
        )
      }
      val allFilesAndDirs = allFilesAndDirsWithDuplicates.groupByKey(_.path)
        .mapGroups { (k, v) =>
          val duplicates = v.toSeq
          // of all the duplicates we can return the newest file.
          duplicates.maxBy(_.modificationTime)
        }

      recordFrameProfile("Delta", "VacuumCommand.gc") {
        try {
          allFilesAndDirs.cache()

          implicit val fileNameAndSizeEncoder =
            org.apache.spark.sql.Encoders.product[FileNameAndSize]

          val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
          val filesAndDirsPresentBeforeDelete = allFilesAndDirs.count()

          // The logic below is as follows:
          //   1. We take all the files and directories listed in our reservoir
          //   2. We filter all files older than our tombstone retention period and directories
          //   3. We get the subdirectories of all files so that we can find non-empty directories
          //   4. We groupBy each path, and count to get how many files are in each sub-directory
          //   5. We subtract all the valid files and tombstones in our state
          //   6. We filter all paths with a count of 1, which will correspond to files not in the
          //      state, and empty directories. We can safely delete all of these
          val diff = allFilesAndDirs
            .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
            .mapPartitions { fileStatusIterator =>
              val reservoirBase = new Path(basePath)
              val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
              fileStatusIterator.flatMap { fileStatus =>
                if (fileStatus.isDir) {
                  Iterator.single(FileNameAndSize(
                    relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
                } else {
                  val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
                  val dirsWithSlash = dirs.map { p =>
                    val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
                    FileNameAndSize(relativizedPath, 0L)
                  }
                  dirsWithSlash ++ Iterator(
                    FileNameAndSize(relativize(
                      fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
                      fileStatus.length))
                }
              }
            }.groupBy(col("path")).agg(count(new Column("*")).as("count"),
              sum("length").as("length"))
            .join(validFiles, Seq("path"), "leftanti")
            .where(col("count") === 1)


          val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first()
          val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
            0L
          } else {
            sizeOfDataToDeleteRow.getLong(0)
          }

          val diffFiles = diff
            .select(col("path"))
            .as[String]
            .map { relativePath =>
              assert(!stringToPath(relativePath).isAbsolute,
                "Shouldn't have any absolute paths for deletion here.")
              pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
            }
          val timeTakenToIdentifyEligibleFiles =
            System.currentTimeMillis() - startTimeToIdentifyEligibleFiles


          val numFiles = diffFiles.count()
          if (dryRun) {
            val stats = DeltaVacuumStats(
              isDryRun = true,
              specifiedRetentionMillis = retentionMillis,
              defaultRetentionMillis = snapshotTombstoneRetentionMillis,
              minRetainedTimestamp = deleteBeforeTimestamp,
              dirsPresentBeforeDelete = dirCounts,
              filesAndDirsPresentBeforeDelete = filesAndDirsPresentBeforeDelete,
              objectsDeleted = numFiles,
              sizeOfDataToDelete = sizeOfDataToDelete,
              timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
              timeTakenForDelete = 0L,
              vacuumStartTime = vacuumStartTime,
              vacuumEndTime = System.currentTimeMillis,
              numPartitionColumns = partitionColumns.size
            )

            recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
            logInfo(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
              s"a total of $dirCounts directories that are safe to delete. Vacuum stats: $stats")

            return diffFiles.map(f => stringToPath(f).toString).toDF("path")
          }
          logVacuumStart(
            spark,
            deltaLog,
            path,
            diffFiles,
            sizeOfDataToDelete,
            retentionMillis,
            snapshotTombstoneRetentionMillis)

          val deleteStartTime = System.currentTimeMillis()
          val filesDeleted = try {
            delete(diffFiles, spark, basePath,
              hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
          } catch {
            case t: Throwable =>
              logVacuumEnd(deltaLog, spark, path)
              throw t
          }
          val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime
          val stats = DeltaVacuumStats(
            isDryRun = false,
            specifiedRetentionMillis = retentionMillis,
            defaultRetentionMillis = snapshotTombstoneRetentionMillis,
            minRetainedTimestamp = deleteBeforeTimestamp,
            dirsPresentBeforeDelete = dirCounts,
            filesAndDirsPresentBeforeDelete = filesAndDirsPresentBeforeDelete,
            objectsDeleted = filesDeleted,
            sizeOfDataToDelete = sizeOfDataToDelete,
            timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
            timeTakenForDelete = timeTakenForDelete,
            vacuumStartTime = vacuumStartTime,
            vacuumEndTime = System.currentTimeMillis,
            numPartitionColumns = partitionColumns.size)
          recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
          logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
          logInfo(s"Deleted $filesDeleted files ($sizeOfDataToDelete bytes) and directories in " +
            s"a total of $dirCounts directories. Vacuum stats: $stats")


          spark.createDataset(Seq(basePath)).toDF("path")
        } finally {
          allFilesAndDirs.unpersist()
        }
      }
    }
  }
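
With the signature sketched above, a caller could disable the optimistic check per invocation, e.g. (a hypothetical call using the draft's parameters):

  VacuumCommand.gc(spark, deltaLog, dryRun = false, optimisticExecution = false,
    retentionHours = Some(168.0))

When the check concludes there is nothing to vacuum, the draft returns the base-path DataFrame immediately, mirroring what a completed vacuum run returns.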

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

  • Yes. I can contribute this feature independently.
  • Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
  • No. I cannot contribute this feature at this time.

Let me know if this feature makes sense and I'll submit a PR.

@mwc360 mwc360 added the enhancement New feature or request label May 16, 2024
@mwc360 mwc360 changed the title [Feature Request] Make VACUUM more efficient w/ transaction log check [Feature Request] Make VACUUM more efficient via optimistic execution mode May 16, 2024