Delta Lake supports several statements to facilitate deleting data from and updating data in Delta Lake tables.
You can remove data that matches a predicate from a Delta Lake table. For instance, to delete all events from before
2017, you can run the following:
DELETE FROM events WHERE date < '2017-01-01'

DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'
The Scala API is available in Databricks Runtime 6.0 and above.
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")   // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete($"date" < "2017-01-01")  // predicate using Spark SQL functions and implicits
See Delta Lake API Reference for more details.
delete removes the data from the latest version of the Delta Lake table but does not remove it from the physical storage until the old versions are explicitly vacuumed. See vacuum for more details.
When possible, provide predicates on the partition columns for a partitioned Delta Lake table as such predicates can significantly speed up the operation.
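For instance, here is a minimal sketch, assuming the events table is partitioned by date (an assumption for this illustration, not stated above):

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

// The predicate references the partition column `date`, so only the matching
// partitions are touched instead of scanning and rewriting the whole table.
deltaTable.delete("date = '2017-01-01'")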
You can update data that matches a predicate in a Delta Lake table. For example, to fix a spelling mistake in the
eventType, you can run the following:
UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'
The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted strings
  "eventType = 'clck'",
  Map("eventType" -> "'click'"))

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                // predicate using Spark SQL functions and implicits
  $"eventType" === "clck",
  Map("eventType" -> lit("click")))
See Delta Lake API Reference for more details.
Similar to delete, update operations can get a significant speedup with predicates on partition columns.
You can upsert data from an Apache Spark DataFrame into a Delta Lake table using the
merge operation. This operation is similar to the SQL
MERGE command but has additional support for deletes and extra conditions in updates, inserts, and deletes.
Suppose you have a Spark DataFrame that contains new data for events with eventId. Some of these events may already be present in the events table. To merge the new data into the events table, you want to update the matching rows (that is, eventId already present) and insert the new rows (that is, eventId not present). You can run the following:
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED THEN
  INSERT (date, eventId, data) VALUES (date, eventId, data)
See the Merge Into (Delta Lake) command for details.
The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()
Here is a detailed description of the merge programmatic operation.

- There can be 1, 2, or 3 when[Matched | NotMatched] clauses. Of these, at most 2 can be whenMatched clauses, and at most 1 can be a whenNotMatched clause.
- whenMatched clauses:
  - Can have at most one update action and one delete action.
  - Can have an optional condition. However, if there are two whenMatched clauses, then the first one must have a condition.
  - When there are two clauses and there are conditions (or the lack of them) such that a row matches both clauses, the first clause/action is executed. In other words, the order of the clauses matters.
  - If none of the whenMatched clauses matches a source-target row pair that satisfies the merge condition, the target rows are not updated.
  - To update all the columns of the target Delta Lake table with the corresponding columns of the source DataFrame, use whenMatched(...).updateAll(). This is equivalent to updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...)).
- whenNotMatched clauses:
  - Can have only the insert action, which can have an optional condition.
  - If the clause is not present, or if it is present but the non-matching source row does not satisfy the condition, the source row is not inserted.
  - To insert all the columns of the target Delta Lake table with the corresponding columns of the source DataFrame, use whenNotMatched(...).insertAll(). This is equivalent to insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...)).
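As a minimal sketch of these rules, reusing the events table and updatesDF from the earlier example and assuming a hypothetical updates.deleted column (not part of the original text), the following merge chains two whenMatched clauses and one whenNotMatched clause:

import io.delta.tables._
import org.apache.spark.sql.functions._

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched("updates.deleted = true")  // first whenMatched clause must carry a condition
  .delete()
  .whenMatched()                          // second whenMatched clause handles the remaining matches
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()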
See the Delta Lake API Reference for details.
See Merge Into (Delta Lake) for a description of the
MERGE SQL command.
You should add as much information as possible to the merge condition, both to reduce the amount of work and to lower the chance of transaction conflicts. For example, suppose you have a table that is partitioned by date and you want to use merge to update information for the last day, country by country. Adding the condition events.date = current_date() AND events.country = 'USA' will make the query faster, as it looks for matches only in the relevant partitions.
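As a rough sketch, reusing the hypothetical events table and updatesDF from above and assuming the table is partitioned by date, the extra predicates go directly into the merge condition:

import io.delta.tables._

// Narrowing the merge condition to one day and one country lets Delta Lake prune
// partitions and touch fewer files, which also reduces conflicts with concurrent writers.
DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId AND events.date = current_date() AND events.country = 'USA'")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()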
Here are a few examples of how to use merge in different scenarios.
A common ETL use case is to collect logs into a Delta Lake table by appending them to the table. However, the sources can often generate duplicate log records, and downstream deduplication steps are needed to take care of them. With merge, you can avoid inserting the duplicate records.
MERGE INTO logs
USING updates
ON logs.uniqueId = updates.uniqueId
WHEN NOT MATCHED THEN
  INSERT *
deltaTable .as("logs") .merge( updates.as("updates"), "logs.uniqueId = updates.uniqueId") .whenNotMatched() .insertAll() .execute()
Furthermore, if you know that you may get duplicate records only for a few days, you can optimize your query further by partitioning the table by date, and then specifying the date range of the target table to match on.
MERGE INTO logs
USING updates
ON logs.uniqueId = updates.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND updates.date > current_date() - INTERVAL 7 DAYS THEN
  INSERT *
deltaTable .as("logs") .merge( updates.as("updates"), "logs.uniqueId = updates.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") .whenNotMatched("updates.date > current_date() - INTERVAL 7 DAYS") .insertAll() .execute()
This is more efficient than the previous command, as it looks for duplicates only in the last 7 days of logs, not the entire table.
Another common operation is SCD Type 2, which maintains a history of all changes made to each key in a dimensional table. Such operations require updating existing rows to mark previous values of keys as old, and then inserting the new rows as the latest values. Given a source table with updates and the target table with the dimensional data, SCD Type 2 can be expressed with merge.
Here is a concrete example of maintaining the history of addresses for a customer along with the active date range of each address. When a customer’s address needs to be updated, you have to mark the previous address as not the current one, update its active date range, and add the new address as the current one.
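A minimal sketch of this pattern follows; the customers table schema (customerId, address, current, effectiveDate, endDate) and the shape of the updates DataFrame are assumptions made for illustration, not part of the original text:

import io.delta.tables._
import org.apache.spark.sql.functions._

val customersTable = DeltaTable.forPath(spark, "/data/customers/")
val updatesDF = ...  // DataFrame[customerId, address, effectiveDate] of new addresses

// Rows whose address actually changed; these need the existing current row closed
// out and a new current row inserted.
val newAddressesToInsert = updatesDF
  .as("updates")
  .join(customersTable.toDF.as("customers"), "customerId")
  .where("customers.current = true AND updates.address <> customers.address")

// Stage the merge source: one row per update keyed by customerId (may close out an
// existing row) plus one row with a null key (always inserted as the new current row).
val stagedUpdates = newAddressesToInsert
  .selectExpr("NULL as mergeKey", "updates.*")
  .union(updatesDF.selectExpr("customerId as mergeKey", "*"))

customersTable
  .as("customers")
  .merge(
    stagedUpdates.as("staged_updates"),
    "customers.customerId = mergeKey")
  .whenMatched("customers.current = true AND customers.address <> staged_updates.address")
  .updateExpr(Map(                       // mark the old address as no longer current
    "current" -> "false",
    "endDate" -> "staged_updates.effectiveDate"))
  .whenNotMatched()
  .insertExpr(Map(                       // insert the new or changed address as the current one
    "customerId" -> "staged_updates.customerId",
    "address" -> "staged_updates.address",
    "current" -> "true",
    "effectiveDate" -> "staged_updates.effectiveDate",
    "endDate" -> "null"))
  .execute()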
Similar to SCD, another common use case, often called change data capture (CDC), is to apply
all data changes generated from an external database into a Delta Lake table. In other words, a set
of updates, deletes, and inserts applied to an external table needs to be applied to a Delta Lake table.
You can do this using
merge as follows.
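Here is a rough sketch of that pattern; the target path, the column names (key, value), and the deleted flag on the change records are assumptions for illustration only:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/target/")
val changesDF = ...  // DataFrame[key, value, deleted] of change records from the external database

deltaTable
  .as("target")
  .merge(
    changesDF.as("changes"),
    "target.key = changes.key")
  .whenMatched("changes.deleted = true")      // a delete in the source database removes the row
  .delete()
  .whenMatched()                              // any other match is an update
  .updateExpr(Map("value" -> "changes.value"))
  .whenNotMatched("changes.deleted = false")  // new keys are inserted unless already deleted upstream
  .insertExpr(Map(
    "key" -> "changes.key",
    "value" -> "changes.value"))
  .execute()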
You can use a combination of merge and foreachBatch (see foreachBatch for more information) to write complex upserts from a streaming query into a Delta Lake table. For example:
- Write streaming aggregates in Update Mode: This is much more efficient than Complete Mode.
- Write a stream of database changes into a Delta Lake table: The earlier merge query for writing change data can be used in foreachBatch to continuously apply a stream of changes to a Delta Lake table, as sketched below.
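A minimal sketch of the foreachBatch pattern, assuming a hypothetical target table with columns key and value and a streaming DataFrame named streamingDF (none of these names come from the original text):

import io.delta.tables._
import org.apache.spark.sql.DataFrame

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates/")

// Upsert each micro-batch produced by the streaming query into the Delta Lake table.
def upsertToDelta(microBatchDF: DataFrame, batchId: Long): Unit = {
  deltaTable
    .as("target")
    .merge(
      microBatchDF.as("updates"),
      "target.key = updates.key")
    .whenMatched()
    .updateAll()
    .whenNotMatched()
    .insertAll()
    .execute()
}

streamingDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()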
- MERGE INTO in Python
- foreachBatch() is supported in Databricks Runtime 5.5 and above.
- Make sure that your MERGE INTO statement inside foreachBatch is idempotent, as restarts of the streaming query can apply the operation on the same batch of data multiple times.
- When MERGE INTO is used in foreachBatch, the input data rate of the streaming query (reported through StreamingQueryProgress and visible in the notebook rate graph) may be reported as a multiple of the actual rate at which data is generated at the source. This is because MERGE reads the input data multiple times, causing the input metrics to be multiplied. If this is a bottleneck, you can cache the batch DataFrame before MERGE INTO and then uncache it after MERGE INTO.
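As a rough illustration of that last point, reusing the hypothetical deltaTable and key/value schema from the foreachBatch sketch above, the micro-batch DataFrame can be cached around the merge:

import org.apache.spark.sql.DataFrame

def upsertToDeltaCached(microBatchDF: DataFrame, batchId: Long): Unit = {
  // Cache the batch so the multiple reads performed by the merge reuse the same data.
  microBatchDF.persist()

  deltaTable
    .as("target")
    .merge(
      microBatchDF.as("updates"),
      "target.key = updates.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()

  // Release the cached data once the merge for this batch is done.
  microBatchDF.unpersist()
}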