How to Resolve Job Hangs and Collect Diagnostic Information

Problem

A streaming or batch job hangs, but the cluster itself remains responsive.

Cause

Various rare scenarios and corner cases can cause a streaming or batch job to hang. A job can also hang because the Azure Databricks internal metastore has become corrupted.

Solution

Restarting the cluster, or creating a new one, often resolves the problem. If it does not, run the following script to detect the hang, fail the stuck job, and collect diagnostic information that you can provide to Azure Databricks Support.

  1. Run this code on the cluster where the job is hanging. Use it with jobs that are stalled or making no progress while the cluster itself is still healthy; it is very unlikely that an Apache Spark job on a job cluster will hang permanently. The script checks whether the Hive metastore responds within a timeout and, if it does not, captures a driver heap dump and fails the job. A sketch for running the check on a schedule follows the script.

    %scala
    import scala.collection.JavaConverters._
    
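    // capture a heap dump of the live objects in the driver JVM, compress it, and upload it to DBFS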
    def dumpHeap(): Unit = {
      import scala.sys.process._
    
      val id = System.currentTimeMillis
      val corePathLocal = s"/tmp/core.$id"
      val corePathDBFS = s"/FileStore/test/core.$id.gz"
      val corePathCompressed = s"/tmp/core.$id.gz"
    
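      // find the driver and chauffeur JVM process IDs with jps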
      val pidSofer = Process("jps").lineStream.filter(_.split(" ")(1) == "ChauffeurDaemon").head.split(" ")(0).toInt
      val pidDriver = Process("jps").lineStream.filter(_.split(" ")(1) == "DriverDaemon").head.split(" ")(0).toInt
    
      println(s"driver pid is $pidDriver")
      println(s"chauffeur pid is $pidSofer")
    
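      // jmap -dump:live forces a full GC and dumps only reachable (live) objects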
      val resDump = Process(s"jmap -dump:live,file=$corePathLocal $pidDriver").lineStream.mkString
    
      println(s"compressing $corePathLocal to $corePathCompressed")
      println(Process(s"gzip --fast $corePathLocal").lineStream.mkString)
      println(s"uploading to $corePathDBFS")
      dbutils.fs.cp("file://" + corePathCompressed, corePathDBFS)
    
      displayHTML(s"""<a href="/files/test/core.$id.gz">Download heap dump</a>""")
    }
    
    def checkMetastore(): Unit = {
      // how many seconds to wait for the metastore call
      val waitForSecs = 60 * 2
      // run a metastore command in another thread; if the metastore is hanging this thread will also hang
      val thr = new Thread("TROUBLESHOOT_TEST") {
        override def run(): Unit = {
          spark.sessionState.catalog.databaseExists("default")
        }
      }
      // start the metastore call
      thr.start()
      // wait up to waitForSecs for the call to complete
      thr.join(waitForSecs * 1000)
      // if the thread is still alive after the timeout, the metastore call is hanging
      val metastoreHung = thr.isAlive
      // set the interrupt flag on the thread; this is a no-op if the call already finished
      thr.interrupt()
    
      if (metastoreHung) {
        dumpHeap()
        // throw an exception if the call hung; this also marks a scheduled job run as failed
        throw new Exception(s"Hive metastore was locked for $waitForSecs seconds")
      } else {
        // metastore call completed in time
        println(s"${new java.util.Date} Hive metastore was OK")
      }
    }
    
    def isThreadAlreadyRunning(): Boolean = {
      // look for a previous TROUBLESHOOT_TEST thread that is still stuck in a metastore call
      val threadOpt = Thread.getAllStackTraces().keySet().asScala.find(_.getName == "TROUBLESHOOT_TEST")
      threadOpt.isDefined
    }
    
    if (!isThreadAlreadyRunning()) {
      checkMetastore()
    } else {
      throw new Exception("Metastore checker thread is already running.")
    }
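
    If you want to repeat the check on a schedule instead of running it once, you can wrap it in a loop. This is a minimal sketch that reuses checkMetastore() and isThreadAlreadyRunning() from the script above; the 10-minute interval is an arbitrary choice for illustration.

    %scala
    // hypothetical periodic probe: recheck the metastore every 10 minutes;
    // the loop stops as soon as a hang is detected, because checkMetastore() throws
    while (true) {
      if (!isThreadAlreadyRunning()) {
        checkMetastore()
      }
      Thread.sleep(10 * 60 * 1000)
    }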
    
  2. Call the dumpHeap() function to generate a heap dump and upload it to the DBFS location.

    dumpHeap()
    

    The command runs and then generates a link to the dump file.

  3. Click the Download heap dump link.

  4. Contact Azure Databricks Support and submit the downloaded file.
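
If the Download heap dump link does not work, you can confirm that the compressed dump was uploaded and copy its path directly. This is a minimal sketch, assuming the default upload location used by the script (/FileStore/test/):

    %scala
    // list the uploaded heap dumps; the directory matches corePathDBFS in the script above
    dbutils.fs.ls("/FileStore/test/").foreach(f => println(f.path))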