Structured Streaming in Production

It is convenient to attach a notebook to a cluster and run your streaming queries interactively. However, when you run them in production, you are likely to want more robustness and uptime guarantees. This topic discusses how to make your streaming application more fault tolerant using Azure Databricks jobs.

Recover from query failures

A production-grade streaming application must have robust failure handling. In Structured Streaming, if you enable checkpointing for a streaming query, then you can restart the query after a failure and the restarted query will continue where the failed one left off, while ensuring fault tolerance and data consistency guarantees. Hence, to make your queries fault tolerant, you must enable query checkpointing and configure Databricks jobs to restart your queries automatically after a failure.

Enable checkpointing

To enable checkpointing, set the option checkpointLocation to a DBFS or cloud storage path before you start the query. For example:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "dbfs://outputPath/")
  .option("checkpointLocation", "dbfs://checkpointPath")
  .start()

This checkpoint location preserves all of the essential information that uniquely identifies a query. Hence, each query must have a different checkpoint location, and multiple queries should never have the same location. See the Structured Streaming Programming Guide for more details.
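One way to make sure no two queries share a checkpoint location is to derive the location from the query name. The following is a minimal sketch under that assumption, not a prescribed Databricks pattern. The `CheckpointPerQuery` object, its `checkpointRoot` path, and the helper names are hypothetical and are introduced only for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object CheckpointPerQuery {
  // Hypothetical base directory; replace it with your own DBFS or cloud storage path.
  val checkpointRoot = "dbfs:/checkpoints/myApp"

  // Derive a distinct checkpoint directory from the query name.
  // Restricting the characters keeps the name usable as a single path segment.
  def checkpointFor(queryName: String): String = {
    require(queryName.matches("[A-Za-z0-9_-]+"), s"Unsafe query name: $queryName")
    s"$checkpointRoot/$queryName"
  }

  // Start a Parquet-writing query whose checkpoint location is tied to its name.
  def startQuery(df: DataFrame, queryName: String, outputPath: String): StreamingQuery =
    df.writeStream
      .queryName(queryName)
      .format("parquet")
      .option("path", outputPath)
      .option("checkpointLocation", checkpointFor(queryName))
      .start()
}
```

With this helper, two queries that are given different names always receive different checkpoint directories, and restarting a query under the same name picks up its earlier checkpoint.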

Configure jobs to restart streaming queries on failure

You can create a Databricks job with the notebook or JAR that has your streaming queries and configure it to:

  • Always use a new cluster.
  • Always retry on failure.

Jobs have tight integration with Structured Streaming APIs and can monitor all streaming queries active in a run. This configuration ensures that if any part of the query fails, jobs automatically terminate the run (along with all the other queries) and start a new run in a new cluster. The new run re-executes the notebook or JAR code and restarts all of the queries again. This is the safest way to ensure that you get back into a good state.

Warning

Notebook workflows are not supported with long-running jobs. Therefore we don’t recommend using notebook workflows in your streaming jobs.

Note

  • Failure in any of the active streaming queries causes the active run to fail and terminate all the other streaming queries.
  • You do not need to use streamingQuery.awaitTermination() or spark.streams.awaitAnyTermination() at the end of your notebook. Jobs automatically prevent a run from completing when a streaming query is active.

Here are the details of the recommended job configuration.

  • Cluster: Set this always to use a new cluster and use the latest Spark version (or at least version 2.1). Queries started in Spark 2.1 and above are recoverable after query and Spark version upgrades.
  • Alerts: Set this if you want email notification on failures.
  • Schedule: Do not set a schedule.
  • Timeout: Do not set a timeout. Streaming queries run for an indefinitely long time.
  • Maximum concurrent runs: Set to 1. There must be only one instance of each query concurrently active.
  • Retries: Set to Unlimited.

See Jobs to understand these configurations. Here is a screenshot of a good job configuration.

[Screenshot: recommended job configuration (job-conf-azure.png)]

Recover after changes in a streaming query

There are limitations on what changes in a streaming query are allowed between restarts from the same checkpoint location. The following are a few kinds of changes that are either not allowed or whose effect is not well-defined. For all of them:

  • The term allowed means you can do the specified change but whether the semantics of its effect is well-defined depends on the query and the change.
  • The term not allowed means you should not do the specified change as the restarted query is likely to fail with unpredictable errors.
  • sdf represents a streaming DataFrame/Dataset generated with sparkSession.readStream.

Types of changes

  • Changes in the number or type (i.e. different source) of input sources: This is not allowed.
  • Changes in the parameters of input sources: Whether this is allowed and whether the semantics of the change are well-defined depends on the source and the query. Here are a few examples.
    • Addition/deletion/modification of rate limits is allowed: spark.readStream.format("kafka").option("subscribe", "topic") to spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
    • Changes to subscribed topics/files are generally not allowed as the results are unpredictable: spark.readStream.format("kafka").option("subscribe", "topic") to spark.readStream.format("kafka").option("subscribe", "newTopic")
  • Changes in the type of output sink: Changes between a few specific combinations of sinks are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
    • File sink to Kafka sink is allowed. Kafka will see only the new data.
    • Kafka sink to file sink is not allowed.
    • Kafka sink changed to foreach, or vice versa, is allowed.
  • Changes in the parameters of output sink: Whether this is allowed and whether the semantics of the change are well-defined depends on the sink and the query. Here are a few examples.
    • Changes to the output directory of a file sink are not allowed: sdf.writeStream.format("parquet").option("path", "/somePath") to sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Changes to the output topic are allowed: sdf.writeStream.format("kafka").option("topic", "someTopic") to sdf.writeStream.format("kafka").option("topic", "anotherTopic")
    • Changes to the user-defined foreach sink (that is, the ForeachWriter code) are allowed, but the semantics of the change depend on the code.
  • Changes in projection / filter / map-like operations: Some cases are allowed. For example:
    • Addition / deletion of filters is allowed: sdf.selectExpr("a") to sdf.where(...).selectExpr("a").filter(...).
    • Changes in projections with the same output schema are allowed: sdf.selectExpr("stringColumn AS json").writeStream to sdf.select(to_json(...).as("json")).writeStream.
    • Changes in projections with different output schema are conditionally allowed: sdf.selectExpr("a").writeStream to sdf.selectExpr("b").writeStream is allowed only if the output sink allows the schema change from "a" to "b".
  • Changes in stateful operations: Some operations in streaming queries need to maintain state data in order to continuously update the result. Structured Streaming automatically checkpoints the state data to fault-tolerant storage (for example, DBFS, AWS S3, Azure Blob storage) and restores it after restart. However, this assumes that the schema of the state data remains the same across restarts. This means that any changes (that is, additions, deletions, or schema modifications) to the stateful operations of a streaming query are not allowed between restarts. Here is the list of stateful operations whose schema should not be changed between restarts in order to ensure state recovery:
    • Streaming aggregation: For example, sdf.groupBy("a").agg(...). Any change in number or type of grouping keys or aggregates is not allowed.
    • Streaming deduplication: For example, sdf.dropDuplicates("a"). Any change in the number or type of deduplication columns is not allowed.
    • Stream-stream join: For example, sdf1.join(sdf2, ...) (i.e. both inputs are generated with sparkSession.readStream). Changes in the schema or equi-joining columns are not allowed. Changes in join type (outer or inner) are not allowed. Other changes in the join condition are ill-defined.
    • Arbitrary stateful operation: For example, sdf.groupByKey(...).mapGroupsWithState(...) or sdf.groupByKey(...).flatMapGroupsWithState(...). Any change to the schema of the user-defined state or the type of timeout is not allowed. Any change within the user-defined state-mapping function is allowed, but the semantic effect of the change depends on the user-defined logic. If you really want to support state schema changes, then you can explicitly encode/decode your complex state data structures into bytes using an encoding/decoding scheme that supports schema migration. For example, if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query restarts as the binary state will always be restored successfully.
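The idea of storing user-defined state as bytes with a migratable encoding can be sketched as follows. This example does not use Avro; it substitutes a hand-rolled, version-prefixed binary layout purely to show the principle with no extra dependencies. The `SessionState` class, its fields, and the `SessionStateCodec` object are hypothetical names introduced for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical user-defined state. Assume version 1 stored only `count`
// and version 2 added `lastSeenMs`.
case class SessionState(count: Long, lastSeenMs: Long)

object SessionStateCodec {
  private val CurrentVersion: Int = 2

  // Always write the newest layout, prefixed with a one-byte version tag.
  def encode(state: SessionState): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeByte(CurrentVersion)
    out.writeLong(state.count)
    out.writeLong(state.lastSeenMs)
    out.flush()
    buffer.toByteArray
  }

  // Read any known layout, supplying defaults for fields that older versions lack.
  def decode(bytes: Array[Byte]): SessionState = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    in.readByte().toInt match {
      case 1 => SessionState(count = in.readLong(), lastSeenMs = 0L)
      case 2 => SessionState(count = in.readLong(), lastSeenMs = in.readLong())
      case v => throw new IllegalArgumentException(s"Unknown state version: $v")
    }
  }
}
```

Under this approach, the state type that Structured Streaming sees would stay `Array[Byte]` across restarts, and the state-mapping function would call `decode` on read and `encode` on update. A schema-aware format such as Avro plays the same role with less manual bookkeeping.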

Configure Spark scheduler pools for efficiency

By default, all queries started in a notebook run in the same fair scheduling pool. Therefore, jobs generated by triggers from all of the streaming queries in a notebook run one after another in first in, first out (FIFO) order. This can cause unnecessary delays in the queries, because they are not efficiently sharing the cluster resources.

To enable all streaming queries to execute jobs concurrently and to share the cluster efficiently, you can set the queries to execute in separate scheduler pools. For example:

// Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").format("parquet").start(path1)

// Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").format("orc").start(path2)

Note

The local property configuration must be in the same notebook cell where you start your streaming query.

See Apache fair scheduler documentation for more details.

Optimize performance of stateful streaming queries

If you have stateful operations in your streaming query (for example, streaming aggregation, streaming dropDuplicates, stream-stream joins, mapGroupsWithState, or flatMapGroupsWithState) and you want to maintain millions of keys in the state, then you may face issues related to large JVM garbage collection (GC) pauses causing high variations in the micro-batch processing times. This occurs because, by default, the state data is maintained in the JVM memory of the executors, and a large number of state objects puts memory pressure on the JVM, causing long GC pauses.

In such cases, you can choose to use a more optimized state management solution based on RocksDB. This solution is available in Databricks Runtime. Rather than keeping the state in the JVM memory, this solution uses RocksDB to efficiently manage the state in the native memory and the local SSD. Furthermore, any changes to this state are automatically saved by Structured Streaming to the checkpoint location you have provided, thus providing full fault-tolerance guarantees (same as the default state management).

You can enable RocksDB-based state management by setting the following configuration in the SparkSession before starting the streaming query.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Other recommended configurations for best performance:

  • Use Databricks Runtime 4.1 or above.
  • Use compute-optimized instances as workers. For example, Azure Standard_F16s instances.
  • Set the number of shuffle partitions to 1-2 times the number of cores in the cluster.
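The shuffle partition recommendation above can be applied through the standard `spark.sql.shuffle.partitions` setting. The sketch below assumes you know the total number of worker cores and pass it in yourself. The `ShufflePartitions` object, its method names, and the default factor of 1.5 are hypothetical choices made for illustration.

```scala
import org.apache.spark.sql.SparkSession

object ShufflePartitions {
  // Pick a partition count between 1x and 2x the total core count.
  def forCores(totalCores: Int, factor: Double = 1.5): Int = {
    require(totalCores > 0, "totalCores must be positive")
    require(factor >= 1.0 && factor <= 2.0, "factor should be between 1 and 2")
    math.ceil(totalCores * factor).toInt
  }

  // Apply the setting to the session before starting the streaming query.
  def configure(spark: SparkSession, totalCores: Int): Unit =
    spark.conf.set("spark.sql.shuffle.partitions", forCores(totalCores).toString)
}
```

For example, a cluster with four 16-core workers has 64 cores, so `ShufflePartitions.forCores(64)` yields 96 partitions with the default factor.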

Regarding performance benefits, RocksDB-based state management can maintain 100 times more state keys than the default one. For example, in a Spark cluster with Azure Standard_F16s instances as workers, the default state management can maintain up to 1-2 million state keys per executor after which the JVM GC starts affecting performance significantly. In contrast, the RocksDB-based state management can easily maintain 100 million state keys per executor without any GC issues.

Note

The state management scheme cannot be changed between query restarts. That is, if a query has been started with the default management, then it cannot be changed without starting the query from scratch with a new checkpoint location.