Databricks Delta is deeply integrated with Spark Structured Streaming through readStream and writeStream. Delta overcomes many of the limitations typically associated with streaming systems and files, including:
- Coalescing small files produced by low-latency ingest.
- Maintaining “exactly-once” processing with more than one stream (or concurrent batch jobs).
- Efficiently discovering which files are new when using files as the source for a stream.
When you load a Delta table as a stream source and use it in a streaming query, the query processes all of the data present in the table as well as any new data that arrives after the stream is started. You can load both paths and tables as a stream.
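As a minimal sketch of the two loading styles, assume a Delta table stored at the path /delta/events and registered in the metastore as events (both names are illustrative), and a runtime where DataStreamReader.table is available (Apache Spark 3.1 or later, or a Databricks notebook):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// In a Databricks notebook `spark` already exists; getOrCreate() reuses it.
val spark: SparkSession = SparkSession.builder().getOrCreate()

// Load by path. "/delta/events" is an assumed location for illustration.
val eventsByPath: DataFrame = spark.readStream
  .format("delta")
  .load("/delta/events")

// Load by table name. Assumes a table named "events" is registered in the metastore.
val eventsByTable: DataFrame = spark.readStream
  .table("events")
```

Both values are streaming DataFrames; nothing is read until a streaming query is started on them with writeStream.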
You can also control the maximum size of any micro-batch that Delta gives to streaming by setting the maxFilesPerTrigger option. This specifies the maximum number of new files to be considered in every trigger. The default is 1000.
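A short sketch of setting this option on a stream source follows. The value 100 and the path /delta/events are assumptions chosen for illustration, not recommendations:

```scala
import org.apache.spark.sql.SparkSession

// In a Databricks notebook `spark` already exists; getOrCreate() reuses it.
val spark = SparkSession.builder().getOrCreate()

// Cap each micro-batch at 100 new files instead of the default.
// Both the limit and the path are illustrative assumptions.
val throttledEvents = spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", "100")
  .load("/delta/events")
```

A lower limit gives smaller, more predictable micro-batches, at the cost of needing more triggers to work through a backlog.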
Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source. There are two main strategies for dealing with changes that cannot be propagated downstream automatically:
- Since Delta tables retain all history by default, in many cases you can delete the output and checkpoint and restart the stream from the beginning.
- You can set either of these two options:
  - ignoreDeletes skips any transaction that deletes entire partitions from the source table. For example, you can set this option if you delete data from the source table that was previously kept for data retention purposes and you do not want these deletions to propagate downstream.
  - ignoreChanges skips any transaction that updates data in the source table. For example, you can set this option if you use OVERWRITE and these changes do not need to be propagated downstream.
For example, suppose you have a table user_events with date, user_email, and action columns that is partitioned by date. You stream out of the user_events table, but you need to delete data from it due to GDPR.
If you’re only deleting data older than 30 days, you can use:
spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")
However, if you have to delete data based on user_email, then you will need to use:
spark.readStream
  .option("ignoreChanges", "true")
  .table("user_events")
The ignoreChanges option subsumes ignoreDeletes. Therefore, if you use ignoreChanges, your stream will not be disrupted by either deletions or updates to the source table.
You can also write data into a Delta table using Structured Streaming. The transaction log enables Delta to guarantee exactly-once processing, even when there are other streams or batch queries running concurrently against the table.
By default, streams run in append mode, which adds new records to the table. You can use the path or table method:
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events") // as a path

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("events") // as a table
You can also use Structured Streaming to replace the entire table with every batch. One example use case is to compute a summary using aggregation:
spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/delta/eventsByCustomer")
The preceding example continuously updates a table that contains the aggregate number of events by customer.
For applications with more lenient latency requirements, you can save computing resources with one-time triggers. Use these to update summary aggregation tables on a given schedule, processing only new data that has arrived since the last update.
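One way to express a one-time trigger is Structured Streaming's Trigger.Once(). The sketch below reuses the events-by-customer aggregation and its paths from the earlier example. It assumes the job is launched on a schedule by an external scheduler, which is not shown:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// In a Databricks notebook `spark` already exists; getOrCreate() reuses it.
val spark = SparkSession.builder().getOrCreate()

// Process whatever has arrived since the last run, then stop.
val query = spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .trigger(Trigger.Once()) // run a single micro-batch instead of running continuously
  .start("/delta/eventsByCustomer")

// Block until the single batch has finished so the scheduled job can exit.
query.awaitTermination()
```

Because progress is recorded in the checkpoint location, each scheduled run should pick up where the previous one stopped, provided the same checkpoint path is reused.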