How to Restart a Structured Streaming Query From Last Written Offset

Scenario

You have a streaming job running a windowed aggregation query that reads from Apache Kafka and writes files in Append mode. You want to upgrade the application and restart the query from the last written offset. Specifically, you want to discard all state information that has not yet been written to the sink, start processing from the earliest offsets that contributed to the discarded state, and update the checkpoint directory accordingly.

However, if you reuse the existing checkpoints after upgrading the application code, old state and serialized objects from the previous application version are restored, which produces unexpected results such as reading from old sources or processing with the old application code.

Solution

Apache Spark persists query state and serialized binary objects in the checkpoint directory, so you cannot modify the checkpoint directory directly. Instead, track the offsets of the input records yourself: save them to a file or a database, read them during initialization at the next restart, and pass the same value to readStream. Make sure to delete the old checkpoint directory.

You can capture the current offsets with the asynchronous StreamingQueryListener API:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress")
        println("Starting offset: " + queryProgress.progress.sources(0).startOffset)
        println("Ending offset: " + queryProgress.progress.sources(0).endOffset)
        // Logic to save these offsets, for example to a file or a database
    }
})
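The saving logic inside onQueryProgress can be as simple as writing the offset JSON to a file. The following is a minimal sketch; the helper names saveOffsets and loadOffsets and the path offsets.json are assumptions, and production code would typically use a database or durable cloud storage instead of a local file.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

// Hypothetical helper: persist the latest end-offset JSON reported by
// onQueryProgress so it survives an application restart.
def saveOffsets(offsetJson: String, path: String = "offsets.json"): Unit = {
  Files.write(
    Paths.get(path),
    offsetJson.getBytes(StandardCharsets.UTF_8),
    StandardOpenOption.CREATE,
    StandardOpenOption.TRUNCATE_EXISTING
  )
}

// Hypothetical helper: read the stored offsets back during initialization
// at the next restart; returns None if nothing has been saved yet.
def loadOffsets(path: String = "offsets.json"): Option[String] = {
  val p = Paths.get(path)
  if (Files.exists(p)) Some(new String(Files.readAllBytes(p), StandardCharsets.UTF_8))
  else None
}
```

Overwriting the file on every progress event keeps only the most recent offsets, which is all the restart needs.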

You can then pass the latest offset written by the process shown above to readStream:

option("startingOffsets", """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}""")
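Putting this together, a restart could look like the following sketch. The bootstrap servers, topic names, and the stored offset value are illustrative assumptions. In the offset JSON, -2 refers to the earliest offset of a partition and -1 to the latest.

```scala
// Sketch of a restart, assuming the offsets were saved as JSON by the
// listener shown above. Server addresses and topic names are placeholders.
val savedOffsets = """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}"""

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topicA,topicB")
  .option("startingOffsets", savedOffsets) // resume from the saved offsets
  .load()
```

Note that startingOffsets only applies when a query starts without a checkpoint, which is why deleting the old checkpoint directory is a required step here.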

The input schema for streaming records is:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
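Because key and value arrive as binary, they are usually cast before any downstream processing. A minimal sketch, assuming df is the streaming DataFrame returned by spark.readStream ... .load():

```scala
// Sketch: cast the binary key and value columns to strings and keep the
// metadata columns from the schema above.
val parsed = df.selectExpr(
  "CAST(key AS STRING)",
  "CAST(value AS STRING)",
  "topic", "partition", "offset", "timestamp"
)
```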

Instead of a file, you can also implement the save-and-update logic against a database and read the stored offset from it at the next restart.