How to List and Delete Files Faster in Azure Databricks

Scenario

Suppose you need to delete a table that is partitioned by year, month, date, region, and service. The table is huge, with around 1000 part files per partition. One way to do this is to list all the files in each partition and delete them using an Apache Spark job. For example, suppose you have a table that is partitioned by a, b, and c:

Seq((1,2,3,4,5),
  (2,3,4,5,6),
  (3,4,5,6,7),
  (4,5,6,7,8))
  .toDF("a", "b", "c", "d", "e")
  .write.mode("overwrite")
  .partitionBy("a", "b", "c")
  .parquet("/mnt/path/table")

List files

You can list all the part files using this function:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import java.net.URI

def listFiles(basep: String, globp: String): Seq[String] = {
  val conf = new Configuration(sc.hadoopConfiguration)
  val fs = FileSystem.get(new URI(basep), conf)

  def validated(path: String): Path = {
    if(path startsWith "/") new Path(path)
    else new Path("/" + path)
  }

  val fileCatalog = InMemoryFileIndex.bulkListLeafFiles(
    paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
    hadoopConf = conf,
    filter = null,
    sparkSession = spark)

  fileCatalog.flatMap(_._2.map(_.path))
}

val root = "/mnt/path/table"
val globp = "[^_]*" // glob pattern, e.g. "service=webapp/date=2019-03-31/*log4j*"

val files = listFiles(root, globp)
files.toDF("path").show()
+------------------------------------------------------------------------------------------------------------------------------+
|path                                                                                                                          |
+------------------------------------------------------------------------------------------------------------------------------+
|dbfs:/mnt/path/table/a=1/b=2/c=3/part-00000-tid-5046671251912249212-afa32967-b7db-444e-b895-d12d68c05500-5.c000.snappy.parquet|
|dbfs:/mnt/path/table/a=2/b=3/c=4/part-00001-tid-5046671251912249212-afa32967-b7db-444e-b895-d12d68c05500-6.c000.snappy.parquet|
|dbfs:/mnt/path/table/a=3/b=4/c=5/part-00002-tid-5046671251912249212-afa32967-b7db-444e-b895-d12d68c05500-7.c000.snappy.parquet|
|dbfs:/mnt/path/table/a=4/b=5/c=6/part-00003-tid-5046671251912249212-afa32967-b7db-444e-b895-d12d68c05500-8.c000.snappy.parquet|
+------------------------------------------------------------------------------------------------------------------------------+

The listFiles function takes a base path and a glob path as arguments, scans the files, matches them against the glob pattern, and returns all the matching leaf files as a sequence of strings.

The function also uses the utility function globPath from SparkHadoopUtil. This function lists all the paths in a directory that match the specified glob pattern, and does not go on to list their leaf children (files). The list of paths is passed to the InMemoryFileIndex.bulkListLeafFiles method, which is a Spark internal API for distributed file listing. Because it is internal, its signature may differ between Spark versions.

Neither of these listing utility functions works well alone, but in combination they provide benefits: you can use the globPath function, which runs on the driver, to get the top-level directories that you want to list, and then use bulkListLeafFiles to distribute the listing of all their child leaves across the Spark workers.
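The two-phase idea can be sketched outside Spark as well. The following is a minimal local-filesystem analog, not the Spark implementation: it assumes java.nio in place of the Hadoop FileSystem API and Scala Futures in place of Spark tasks, and the names drain, globTopLevel, listLeaves, and listFilesLocal are hypothetical helpers introduced only for illustration.

```scala
import java.nio.file.{Files, Path}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Copies a Java iterator into a List without version-specific converters.
def drain[A](it: java.util.Iterator[A]): List[A] = {
  val out = List.newBuilder[A]
  while (it.hasNext) out += it.next()
  out.result()
}

// Phase 1, sequential (the role globPath plays on the driver):
// match only the top-level entries under root against the glob.
def globTopLevel(root: Path, glob: String): List[Path] = {
  val stream = Files.newDirectoryStream(root, glob)
  try drain(stream.iterator()) finally stream.close()
}

// Lists every regular file below dir.
def listLeaves(dir: Path): List[String] = {
  val walk = Files.walk(dir)
  try drain(walk.iterator()).filter(p => Files.isRegularFile(p)).map(_.toString)
  finally walk.close()
}

// Phase 2, parallel (the role bulkListLeafFiles plays on the workers):
// each matched top-level entry is listed in its own Future.
def listFilesLocal(root: Path, glob: String): List[String] = {
  val listing = Future.traverse(globTopLevel(root, glob))(d => Future(listLeaves(d)))
  Await.result(listing, 10.minutes).flatten
}
```

Note that java.nio globs negate a character class with `[!_]`, whereas the Hadoop glob used in the article writes `[^_]`, so the equivalent call here would be listFilesLocal(root, "[!_]*").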

The speed-up can be around 20-50x, in line with Amdahl's law. The reason is that you can easily match the glob path to the physical file layout and control the parallelism of InMemoryFileIndex through spark.sql.sources.parallelPartitionDiscovery.parallelism.
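To make the Amdahl's law reference concrete, here is a small sketch of the formula S = 1 / ((1 - p) + p / n), where p is the fraction of the listing work that runs in parallel and n is the number of parallel workers. The fractions and worker counts below are illustrative assumptions, not measurements. One way to read the 20-50x figure is that the ceiling 1 / (1 - p) equals 20 when 95% of the work is parallel and 50 when 98% is.

```scala
// Amdahl's law: speed-up when a fraction of the work is spread over n workers.
def amdahlSpeedup(parallelFraction: Double, workers: Int): Double = {
  require(parallelFraction >= 0.0 && parallelFraction <= 1.0, "fraction must be in [0, 1]")
  require(workers >= 1, "need at least one worker")
  1.0 / ((1.0 - parallelFraction) + parallelFraction / workers)
}

// Assumed, illustrative inputs (not taken from a real cluster).
val modest = amdahlSpeedup(0.95, 64)   // stays below the 20x ceiling
val high   = amdahlSpeedup(0.98, 256)  // stays below the 50x ceiling

// In a notebook, the listing parallelism could then be tuned with, for example:
//   spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "200")
// where "200" is an arbitrary illustrative value.
```

The serial part here corresponds to the globPath call on the driver, so a glob that resolves quickly to a handful of top-level directories keeps that part small.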

Delete files

When you delete files or partitions from an unmanaged table, you can use the Azure Databricks utility function dbutils.fs.rm. This function leverages the native cloud storage file system API, which is optimized for all file operations. However, you can't delete a gigantic table directly using dbutils.fs.rm("path/to/the/table"), for reasons such as storage request rate limits and network limits.

You can list files efficiently using the script above. For smaller tables, the collected paths of the files to delete fit into the driver memory, so you can use a Spark job to distribute the file deletion task.

For gigantic tables, the string representations of the file paths may not fit into the driver memory, even for a single top-level partition. The easiest way to solve this problem is to walk the inner partitions recursively, list the paths at a level where each listing is small, and delete them in parallel.

import scala.util.{Try, Success, Failure}

// Deletes every child of p, using a Spark job to distribute the work.
def delete(p: String): Unit = {
  dbutils.fs.ls(p).map(_.path).toDF.foreach { file =>
    println(s"deleting file: ${file.getString(0)}")
    dbutils.fs.rm(file.getString(0), true)
  }
}

// Walks `level` levels down from root, then deletes each partition found there.
// Recursion is bounded by `level`, so its depth won't be a problem.
final def walkDelete(root: String)(level: Int): Unit = {
  dbutils.fs.ls(root).map(_.path).foreach { p =>
    println(s"Deleting: $p, on level: $level")
    val deleting = Try {
      if (level > 0 && p.endsWith("/")) walkDelete(p)(level - 1)
      else delete(p)
      // The children are gone; remove the path itself.
      dbutils.fs.rm(p, true)
    }
    deleting match {
      case Success(_) => println(s"Successfully deleted $p")
      case Failure(e) => println(e.getMessage)
    }
  }
}

The code deletes inner partitions while ensuring that the partition being deleted is small enough. It does this by walking the partitions recursively, level by level, and only starts deleting when it reaches the level you set. For instance, if you want to start by deleting the top-level partitions, use walkDelete(root)(0). Spark will delete all the files under dbfs:/mnt/path/table/a=1/, then delete .../a=2/, and so on.

The Spark job distributes the deletion task using the delete function shown above, listing the files with dbutils.fs.ls on the assumption that the number of child partitions at this level is small. You can also be more efficient by replacing the dbutils.fs.ls function with the listFiles function shown above, with only slight modifications.

Summary

The two approaches above show how to divide and conquer issues with listing and deleting gigantic tables. They use some Spark utility functions and functions specific to the Azure Databricks environment. Even if you cannot use them directly, you can create your own utility functions to solve the problem in an analogous way.
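As one illustration of an analogous utility outside Azure Databricks, the following sketch applies the same level-bounded walk to a local filesystem. It is written under stated assumptions rather than as a definitive implementation: java.nio stands in for dbutils.fs, Scala Futures stand in for the Spark job, and children, deleteRecursively, deleteChildrenInParallel, and walkDeleteLocal are hypothetical names introduced here.

```scala
import java.nio.file.{Files, Path}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Direct children of dir (the role dbutils.fs.ls plays in the article).
def children(dir: Path): List[Path] = {
  val stream = Files.newDirectoryStream(dir)
  try {
    val out = List.newBuilder[Path]
    val it = stream.iterator()
    while (it.hasNext) out += it.next()
    out.result()
  } finally stream.close()
}

// Recursive delete of one path (the role dbutils.fs.rm(path, true) plays).
def deleteRecursively(p: Path): Unit = {
  if (Files.isDirectory(p)) children(p).foreach(deleteRecursively)
  Files.deleteIfExists(p)
  ()
}

// Deletes the children of dir concurrently, one Future per child.
def deleteChildrenInParallel(dir: Path): Unit = {
  val jobs = Future.traverse(children(dir))(c => Future(deleteRecursively(c)))
  Await.result(jobs, 10.minutes)
  ()
}

// Descends `level` levels below root, then deletes each directory found there.
def walkDeleteLocal(root: Path, level: Int): Unit =
  children(root).foreach { p =>
    val attempt = Try {
      if (Files.isDirectory(p)) {
        if (level > 0) walkDeleteLocal(p, level - 1) else deleteChildrenInParallel(p)
      }
      // The children are gone; remove the path itself.
      Files.deleteIfExists(p)
    }
    attempt match {
      case Success(_) => println(s"deleted $p")
      case Failure(e) => println(s"failed on $p: ${e.getMessage}")
    }
  }
```

Calling walkDeleteLocal(root, 0) deletes the contents of each top-level directory in parallel, while a higher level pushes the parallel step deeper so that each batch of paths stays small. As in the article's version, root itself is left in place.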