Merge Into (Databricks Delta)

Merge a set of updates, insertions, and deletions based on a source table into a target Databricks Delta table.

Note

This syntax is available in Databricks Runtime 5.1 and above.

MERGE INTO [db_name.]target_table [AS target_alias]
USING [db_name.]source_table [AS source_alias]
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]

where

<matched_action>  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = value1 [, column2 = value2 ...]

<not_matched_action>  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

  • There can be 1, 2, or 3 WHEN clauses. Of these, at most two can be WHEN MATCHED clauses, and at most one can be a WHEN NOT MATCHED clause. The WHEN NOT MATCHED clause must be the last clause.
  • WHEN MATCHED clauses:
    • There can be at most one UPDATE action and one DELETE action in WHEN MATCHED clauses.
    • Each MATCHED clause can have an optional condition. However, if there are two MATCHED clauses, then the first one must have a condition.
    • When there are two MATCHED clauses and there are conditions (or the lack thereof) such that a row matches both clauses, then the first clause/action is executed. In other words, the order of the MATCHED clauses matters (see the example after this list).
    • If none of the WHEN MATCHED clauses matches a source-target row pair that satisfies the merge_condition, then the target row is not updated.
    • If UPDATE SET * is present, then it automatically expands to UPDATE SET column1 = source_table.column1 [, column2 = source_table.column2 ...] where column1, column2, etc. are the names of columns present in the target table being updated. Therefore, this syntax assumes that the source table has the same columns as those in the target table, otherwise the query will throw an analysis error.
  • WHEN NOT MATCHED clause:
    • This clause can have only the INSERT action, which can have an optional condition.
    • If the NOT MATCHED clause is not present or if it is present but the non-matching source row does not satisfy the condition, then the source row is not inserted.
    • If INSERT * is present, then it automatically expands to INSERT (column1 [, column2 ...]) VALUES (source_table.column1 [, source_table.column2 ...]) where column1, column2, etc. are the names of columns present in the target table being updated. Therefore, this syntax assumes that the source table has the same columns as those in the target table, otherwise the query will throw an analysis error.
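
For example, the following statement uses all three clauses; it is only a sketch, and the target, source, key, and quantity names are illustrative. Per the rules above, the first WHEN MATCHED clause carries a condition, the clauses are evaluated in order, and the WHEN NOT MATCHED clause comes last: matched rows whose incoming quantity is zero are deleted, all other matched rows are updated, and unmatched source rows with a positive quantity are inserted.

MERGE INTO target
USING source
ON target.key = source.key
WHEN MATCHED AND source.quantity = 0 THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET quantity = source.quantity
WHEN NOT MATCHED AND source.quantity > 0 THEN
  INSERT (key, quantity) VALUES (source.key, source.quantity)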

Note

Prior to Databricks Runtime 5.1, only the basic syntax of MERGE was supported:

MERGE INTO [db_name.]target_table [AS target_alias]
USING [db_name.]source_table [AS source_alias]
ON <merge_condition>
WHEN MATCHED THEN
  UPDATE SET column1 = value1 [, column2 = value2 ...]
WHEN NOT MATCHED THEN
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

Note

Prior to Databricks Runtime 5.0, MERGE required that the update table be small. If your workload does not satisfy this requirement, either try MERGE in Databricks Runtime 5.0 and above, or use separate INSERT and UPDATE statements.

Examples

Here are a few examples of how to use MERGE in different scenarios.

Simple merging of new data based on a key

The following statement takes a set of updates containing updated customer addresses and merges it into the customers table of customer data. If a customer is already present (based on customerId), its address is updated using the new address from updates. Otherwise, the new customerId and address are inserted.

MERGE INTO customers
USING updates
ON customers.customerId = updates.customerId
WHEN MATCHED THEN
  UPDATE SET address = updates.address
WHEN NOT MATCHED
  THEN INSERT (customerId, address) VALUES (updates.customerId, updates.address)

Deduplicating data during ETL

Note

Supported in clusters running Databricks Runtime 5.1 and above.

A common ETL use case is to collect logs into a Databricks Delta table by appending them to it. However, the sources can often generate duplicate log records, and downstream deduplication steps are then needed to take care of them. With MERGE, you can deduplicate the records while inserting them.

MERGE INTO logs
USING updates
ON logs.uniqueId = updates.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Furthermore, if you know that you may get duplicate records only for a few days, then you can optimize your query further by partitioning the table by date, and then specifying the date range of the target table to match on.

MERGE INTO logs
USING updates
ON logs.uniqueId = updates.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND updates.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

This is more efficient than the previous command, as it looks for duplicates only in the last 7 days of logs, not in the entire table.

Slowly Changing Data (SCD) Type 2

Note

Supported in clusters running Databricks Runtime 5.1 and above.

The first simple example is a case of slowly changing dimension (SCD) data, where values in the table change slowly over time (for example, customer addresses do not change frequently). Specifically, the earlier example was an SCD Type 1 query, where only the latest value of a key is maintained. Another common pattern is SCD Type 2, which maintains the history of all changes made to each key in a dimensional table. Such operations require updating existing rows to mark previous values of keys as old, and then inserting the new rows as the latest values. Given a source table with updates and the target table with the dimensional data, SCD Type 2 can be expressed with MERGE as follows.
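
One way to write such a query is sketched below. It assumes, for illustration, that the customers table carries the SCD Type 2 bookkeeping columns current, effectiveDate, and endDate alongside customerId and address, and that updates supplies customerId, address, and effectiveDate; the staged_updates subquery and the mergeKey alias are likewise illustrative. Each incoming change is staged twice: once under its real key, so the existing current row can be closed out, and once under a NULL key, which never matches and therefore inserts the new address as the latest row.

MERGE INTO customers
USING (
  -- Rows keyed by customerId either close out the current address of an existing
  -- customer (via the MATCHED clause) or insert a brand-new customer.
  SELECT updates.customerId AS mergeKey, updates.*
  FROM updates
  UNION ALL
  -- Rows staged with a NULL mergeKey never match, so the new address of an
  -- existing customer is inserted as a fresh, current row.
  SELECT NULL AS mergeKey, updates.*
  FROM updates JOIN customers
  ON updates.customerId = customers.customerId
  WHERE customers.current = true AND updates.address <> customers.address
) staged_updates
ON customers.customerId = staged_updates.mergeKey
WHEN MATCHED AND customers.current = true AND customers.address <> staged_updates.address THEN
  UPDATE SET current = false, endDate = staged_updates.effectiveDate
WHEN NOT MATCHED THEN
  INSERT (customerId, address, current, effectiveDate, endDate)
  VALUES (staged_updates.customerId, staged_updates.address, true, staged_updates.effectiveDate, null)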

Writing Change Data into Databricks Delta

Note

Supported in clusters running Databricks Runtime 5.1 and above.

Similar to SCD, another common use case, often called Change Data Capture (CDC), is to apply all data changes generated from an external database into a Databricks Delta table. In other words, a set of updates, deletes, and inserts applied to an external table needs to be applied to a Databricks Delta table. You can do this using the extended MERGE syntax as follows.
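
One possible form of such a query is sketched below, reusing the customers table from the first example; the changes source table and its deleted flag are illustrative stand-ins for whatever the external CDC feed produces.

MERGE INTO customers
USING changes
ON customers.customerId = changes.customerId
WHEN MATCHED AND changes.deleted = true THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET address = changes.address
WHEN NOT MATCHED AND changes.deleted = false THEN
  INSERT (customerId, address) VALUES (changes.customerId, changes.address)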

Writing change data using MERGE

Using MERGE in streaming queries using foreachBatch

(Suggested Databricks Runtime 5.0 and above)

You can use a combination of foreachBatch and MERGE to write complex updates from a streaming query into a Databricks Delta table. For example:

  • Writing streaming aggregates in Update Mode: This is much more efficient than Complete Mode. See notebook below.
  • Writing a stream of database changes into a Databricks Delta table: The earlier MERGE query for writing change data can be used in foreachBatch to continuously apply a stream of changes to a Databricks Delta table, as sketched below.
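
The following is a minimal Scala sketch of the second pattern. The changesStream name is illustrative, and the stream's rows are assumed to have the same schema as the logs table from the deduplication example above.

import org.apache.spark.sql.DataFrame

def upsertToDelta(batchDF: DataFrame, batchId: Long): Unit = {
  // Expose the micro-batch as a temporary view so the SQL MERGE can reference it.
  batchDF.createOrReplaceTempView("updates")
  // Reuse the deduplicating MERGE from the batch example; merging on a unique key
  // is idempotent, so reprocessing the same micro-batch does not insert duplicates.
  batchDF.sparkSession.sql("""
    MERGE INTO logs
    USING updates
    ON logs.uniqueId = updates.uniqueId
    WHEN NOT MATCHED THEN INSERT *
  """)
}

changesStream.writeStream
  .foreachBatch(upsertToDelta _)
  .start()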

Note

  • Make sure that your MERGE statement inside foreachBatch is idempotent as reprocessing of the streaming query can apply the operation on the same batch of data multiple times.
  • When MERGE is used in foreachBatch, the input data rate of the streaming query (reported through StreamingQueryProgress and visible in the notebook rate graph) may be reported as a multiple of the actual rate at which data is generated at the source. This is because MERGE reads the input data multiple times causing the input metrics to be multiplied. If this is a bottleneck, you can cache the batch DataFrame before MERGE and then uncache it after MERGE.

Writing streaming aggregates in update mode using MERGE and foreachBatch