Handling Large Queries in Interactive Workflows

A challenge with interactive data workflows is handling large queries. This includes queries that generate too many output rows, fetch many external partitions, or compute on extremely large data sets. These queries can be extremely slow, saturate cluster resources, and make it difficult for others to share the same cluster.

Query Watchdog is a process that prevents queries from monopolizing cluster resources by examining the most common causes of large queries and terminating queries that pass a threshold. This topic describes how to enable and configure Query Watchdog.

Important

Query Watchdog is enabled for all interactive clusters created using the UI.

Example of a disruptive query

An analyst is performing some ad hoc queries in a just-in-time data warehouse. The analyst uses a shared autoscaling cluster that makes it easy for multiple users to use a single cluster at the same time. Suppose there are two tables that each have a million rows.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

These table sizes are manageable in Apache Spark. However, they each include a join_key column with an empty string in every row. This can happen if the data is not perfectly clean or if there is significant data skew where some keys are more prevalent than others. These empty join keys are far more prevalent than any other value.

In the code below, the analyst is joining these two tables on their keys, which produces output of one trillion results, and all of these are produced on a single executor (the executor that gets the " " key):

%sql

SELECT
  id, count()
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

This query appears to be running. But without knowing about the data, the analyst sees that there’s “only” a single task left over the course of executing the job. The query never finishes, leaving the analyst frustrated and confused about why it did not work.

In this case there is only one problematic join key. Other times there may be many more.

Enable and configure Query Watchdog

To a prevent a query from creating too many output rows for the number of input rows, you can enable Query Watchdog and configure the maximum number of output rows as a multiple of the number of input rows. In this example we use a ratio of 1000 (the default).

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

The latter configuration declares that any given task should never produce more than 1000 times the number of input rows.

Tip

The output ratio is completely customizable. We recommend starting lower and seeing what threshold works well for you and your team. A range of 1,000 to 10,000 is a good starting point.

Not only does Query Watchdog prevent users from monopolizing cluster resources for jobs that will never complete, it also saves time by fast-failing a query that would have never completed. For example, the following query will fail after several minutes because it exceeds the ratio.

%sql

SELECT
  join_key,
  sum(x.id),
  count()
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY join_key

Here’s what you would see:

../../../_images/query-watchdog-example.png

It’s usually enough to enable Query Watchdog and set the output/input threshold ratio, but you also have the option to set two additional properties: spark.databricks.queryWatchdog.minTimeSecs and spark.databricks.queryWatchdog.minOutputRows. These properties specify the minimum time a given task in a query must run before cancelling it and the minimum number of output rows for a task in that query.

For example, you can set minTimeSecs to a higher value if you want to give it a chance to produce a large number of rows per task. Likewise, you can set spark.databricks.queryWatchdog.minOutputRows to ten million if you want to stop a query only after a task in that query has produced ten million rows. Anything less and the query succeeds, even if the output/input ratio was exceeded.

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

Tip

If you configure Query Watchdog in a notebook, the configuration does not persist across cluster restarts. If you want to configure Query Watchdog for all users of a cluster, we recommend that you use a cluster configuration.

Detect query on extremely large dataset

Another typical large query may scan a large amount of data from big tables/datasets. The scan operation may last for a long time and saturate cluster resources (even reading metadata of a big Hive table can take a significant amount of time). You can set maxHivePartitions to prevent fetching too many partitions from a big Hive table. Similarly, you can also set maxQueryTasks to limit queries on an extremely large dataset.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

Note

maxHivePartitions is available in Databricks Runtime 3.4 and above. maxQueryTasks is available in Databricks Runtime 3.5 and above.

When should you enable Query Watchdog?

Query Watchdog should be enabled for ad hoc analytics clusters where SQL analysts and data scientists are sharing a given cluster and an administrator needs to make sure that queries “play nicely” with one another.

When should you disable Query Watchdog?

In general we do not advise eagerly cancelling queries used in an ETL scenario because there typically isn’t a human in the loop to correct the error. We recommend that you disable Query Watchdog for all but ad hoc analytics clusters.