Introduction to Databricks Delta

Databricks Delta is a next-generation engine built on top of Apache Spark. It provides ACID transactions, optimized layouts and indexes, and execution engine improvements for building data pipelines to support big data use cases: batch and streaming ingest, fast interactive queries, and machine learning. Specifically, Databricks Delta offers:

  • ACID transactions: Serializable isolation levels ensure that readers never see inconsistent data.
  • Efficient upserts: Fine-grained updates easily handle late-arriving data and changing records.
  • High throughput streaming ingestion: Ingest high volume data directly into query tables.
  • Optimized data layout: Choose a data layout that suits your query patterns; Databricks Delta automatically manages the layout to reduce query latency.
  • Schema enforcement and evolution: Automatically handles schema variations to prevent bad records from being inserted during ingestion.
  • Data versioning and time travel: Automatically versions your data for easy rollback and lets you time travel to query earlier versions.
  • Execution engine optimizations: Optimizes operations on nested data types, higher-order functions, and range and skew joins.

Requirements

Databricks Delta requires Databricks Runtime 4.1 or above. If you created a Databricks Delta table using a Databricks Runtime lower than 4.1, you must upgrade the table version. For details, see Table Versioning.

Quickstart

This quickstart provides an overview of the basics of working with Databricks Delta. It shows how to build a pipeline that reads JSON data into a Databricks Delta table, and how to modify the table, read the table, display the table history, and optimize the table.

To try out Databricks Delta, see Try Azure Databricks.

For runnable notebooks that demonstrate these features, see Introductory Notebooks.

Create a table

To create a Databricks Delta table, you can use existing Spark SQL code and change the format from parquet, csv, json, and so on, to delta.

For all file types, you read the files into a DataFrame and write out in delta format:

Python
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.format("delta").save("/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events/'")
SQL
CREATE TABLE events
USING delta
AS SELECT *
FROM json.`/data/events/`

These operations create a new managed table using the schema that was inferred from the JSON data. For the full set of options available when you create a new Databricks Delta table, see Create a table and Write to a table.
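To confirm the inferred schema, you can inspect the new table from a notebook. A minimal sketch in Python, assuming the events table created above:

# Inspect the schema that was inferred from the JSON data
spark.table("events").printSchema()

# Preview a few rows
spark.table("events").show(5)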

If your source files are in Parquet format, you can use the SQL CONVERT TO DELTA statement to convert the files in place and create an unmanaged table:

CONVERT TO DELTA parquet.`/delta/events`

Partition data

You can partition data to speed up queries that have predicates involving the partition columns. To partition data when you create a Databricks Delta table, specify the PARTITIONED BY columns:

CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING delta
PARTITIONED BY (date)
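
The same layout is available through the DataFrame writer by calling partitionBy. A minimal sketch in Python, assuming a DataFrame named eventsByDate that already contains a date column (the DataFrame name and the output path are illustrative):

# Write a Databricks Delta table partitioned by the date column.
# eventsByDate is an illustrative DataFrame that contains a date column.
(eventsByDate.write
  .format("delta")
  .partitionBy("date")
  .save("/delta/events_by_date"))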

Modify a table

Databricks Delta supports a rich set of operations to modify tables.

Stream writes to a table

You can write data into a Databricks Delta table using Structured Streaming. The Databricks Delta transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table.

from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

jsonSchema = StructType([
  StructField("time", TimestampType(), True),
  StructField("action", StringType(), True)
])

eventsDF = (
  spark
    .readStream
    .schema(jsonSchema) # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1) # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

(eventsDF.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("events"))

For more information about Databricks Delta integration with Structured Streaming, see Table Streaming Reads and Writes.
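
You can also read a Databricks Delta table as a streaming source and process new records as they arrive. A minimal sketch in Python, assuming the /delta/events table written above; the console sink and checkpoint path are illustrative:

# Read the Databricks Delta table as a streaming source
eventsStream = (spark.readStream
  .format("delta")
  .load("/delta/events"))

# Echo new rows to the console (illustrative sink and checkpoint path)
(eventsStream.writeStream
  .format("console")
  .option("checkpointLocation", "/delta/events/_checkpoints/console-read")
  .start())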

Batch upserts

To merge a set of updates and insertions into an existing table, you use the MERGE INTO statement. For example, the following statement takes a stream of updates and merges it into the events table. When there is already an event present with the same eventId, Databricks Delta updates the data column using the given expression. When there is no matching event, Databricks Delta adds a new row.

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET
    events.data = updates.data
WHEN NOT MATCHED THEN
  INSERT (date, eventId, data) VALUES (date, eventId, data)

When you perform an INSERT (that is, when there is no matching row in the existing table), you must specify a value for every column in the table. However, an UPDATE does not need to set a value for every column.
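
If you prefer the DataFrame API, more recent runtimes also expose a Python DeltaTable merge builder that mirrors MERGE INTO. A hedged sketch, assuming the delta.tables module is available in your runtime, that the events table lives at /delta/events, and that updatesDF holds the incoming records:

from delta.tables import DeltaTable

# updatesDF is an illustrative DataFrame of incoming records (date, eventId, data)
eventsTable = DeltaTable.forPath(spark, "/delta/events")

(eventsTable.alias("events")
  .merge(updatesDF.alias("updates"), "events.eventId = updates.eventId")
  .whenMatchedUpdate(set = {"data": "updates.data"})
  .whenNotMatchedInsert(values = {
    "date": "updates.date",
    "eventId": "updates.eventId",
    "data": "updates.data"})
  .execute())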

Read a table

You access data in Databricks Delta tables either by specifying the path on DBFS ("/delta/events") or the table name ("events"):

Scala
val events = spark.read.format("delta").load("/delta/events")

or

val events = spark.table("events")
SQL
SELECT * FROM delta.`/delta/events`

or

SELECT * FROM events
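
The equivalent reads in Python (a minimal sketch):

# By path on DBFS
events = spark.read.format("delta").load("/delta/events")

# By table name
events = spark.table("events")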

Display table history

To view the history of a table, use the DESCRIBE HISTORY statement, which provides provenance information, including the table version, operation, user, and so on, for each write to a table.
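
For example, from a Python notebook you can run the statement through spark.sql (a minimal sketch):

# Show the version, timestamp, user, and operation for each write to the table
spark.sql("DESCRIBE HISTORY events").show(truncate=False)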

[Image: example DESCRIBE HISTORY output showing the version, timestamp, user, and operation for each write to the events table]

Query an earlier version of the table (time travel)

To query an older version of a table, specify a version or timestamp in a SELECT statement. For example, to query version 0 from the history above, use:

SELECT * FROM events VERSION AS OF 0

or

SELECT * FROM events TIMESTAMP AS OF '2019-01-29 00:37:58'

Note

Because version 1 is at timestamp '2019-01-29 00:38:10', to query version 0 you can use any timestamp in the range '2019-01-29 00:37:58' to '2019-01-29 00:38:09' inclusive.
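
Time travel is also available through the DataFrame reader, using the versionAsOf or timestampAsOf option. A minimal sketch in Python:

# Query version 0 of the table by path
df = (spark.read
  .format("delta")
  .option("versionAsOf", 0)
  .load("/delta/events"))

# Or query the table as of a timestamp
df = (spark.read
  .format("delta")
  .option("timestampAsOf", "2019-01-29 00:37:58")
  .load("/delta/events"))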

For details, see Query an older snapshot of a table (time travel).

Optimize a table

Once you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use OPTIMIZE to collapse small files into larger ones:

OPTIMIZE delta.`/delta/events`

or

OPTIMIZE events

Z-order by columns

To improve read performance further, you can co-locate related information in the same set of files by Z-Ordering. This co-locality is automatically used by Databricks Delta data-skipping algorithms to dramatically reduce the amount of data that needs to be read. To Z-Order data, you specify the columns to order on in the ZORDER BY clause. For example, to co-locate by eventType, run:

OPTIMIZE events
  ZORDER BY (eventType)
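
OPTIMIZE and ZORDER BY are SQL statements; from a Python notebook you can run them through spark.sql (a minimal sketch):

# Compact small files and co-locate rows that share the same eventType
spark.sql("OPTIMIZE events ZORDER BY (eventType)")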

For the full set of options available when running OPTIMIZE, see Optimizing Performance with File Management.

Clean up snapshots

Databricks Delta provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, however, you should clean up old snapshots. You can do this by running the VACUUM command:

VACUUM events

You control how long snapshots are retained by using the RETAIN <N> HOURS option, which keeps snapshots newer than the specified number of hours:

VACUUM events RETAIN 24 HOURS

For details on using VACUUM effectively, see Garbage collection.