Delta Lake Quickstart

Create a table

To create a Delta Lake table, you can use existing Spark SQL code and change the format from parquet, csv, json, and so on, to delta.

For all file types, you read the files into a DataFrame and write out in delta format:

Python
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.format("delta").save("/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events/'")
SQL
CREATE TABLE events
USING delta
AS SELECT *
FROM json.`/data/events/`

These operations create a new Delta table using the schema that was inferred from the JSON data. Because the Python example specifies a LOCATION, it registers an unmanaged table over the files at that path; the SQL example, which omits a location, creates a managed table. For the full set of options available when you create a new Delta Lake table, see Create a table and Write to a table.
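
If you prefer a managed table in Python as well, one option is to write with saveAsTable and no explicit path, so the metastore manages the storage location. The sketch below is only illustrative; the table name events_managed is an example, not part of the quickstart:

Python
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
# saveAsTable with no path lets the metastore manage the table's location,
# which creates a managed Delta table.
events.write.format("delta").saveAsTable("events_managed")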

If your source files are in Parquet format, you can use the SQL CONVERT TO DELTA statement to convert the files in place and create an unmanaged table:

CONVERT TO DELTA parquet.`/delta/events`
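
The same conversion can also be run from Python through the DeltaTable API; this is a minimal sketch that assumes the Parquet files live at the path used above:

Python
from delta.tables import DeltaTable

# Convert the Parquet files at this path into a Delta table, in place.
DeltaTable.convertToDelta(spark, "parquet.`/delta/events`")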

Partition data

To speed up queries that have predicates involving the partition columns, you can partition data.

Python
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.partitionBy("date").format("delta").save("/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events/'")

To partition data when you create a Delta Lake table using SQL, specify the PARTITIONED BY columns.

SQL
CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING delta
PARTITIONED BY (date)

Modify a table

Delta Lake supports a rich set of operations to modify tables.

Stream writes to a table

You can write data into a Delta Lake table using Structured Streaming. The Delta Lake transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table.

Python
from pyspark.sql.types import StructType, StructField, TimestampType, StringType

inputPath = "/databricks-datasets/structured-streaming/events/"

# Define the schema of the JSON event data explicitly so the stream does not have to infer it
jsonSchema = StructType([
  StructField("time", TimestampType(), True),
  StructField("action", StringType(), True)
])

eventsDF = (
  spark
    .readStream
    .schema(jsonSchema) # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1) # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

(eventsDF.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("events")  # Write the stream to the events table in append mode
)

For more information about Delta Lake integration with Structured Streaming, see Table Streaming Reads and Writes.

Batch upserts

To merge a set of updates and insertions into an existing table, you use the MERGE INTO statement. For example, the following statement takes a stream of updates and merges it into the events table. When there is already an event present with the same eventId, Delta Lake updates the data column using the given expression. When there is no matching event, Delta Lake adds a new row.

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET
    events.data = updates.data
WHEN NOT MATCHED THEN
  INSERT (date, eventId, data) VALUES (updates.date, updates.eventId, updates.data)

When you perform an INSERT (for example, when there is no matching row in the existing dataset), you must specify a value for every column in your table. An UPDATE, however, does not need to set every column.
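
You can express the same upsert from Python with the DeltaTable API. The sketch below assumes the target table lives at /delta/events and that the updates are available in a DataFrame named updatesDF (a hypothetical name, not defined elsewhere in this quickstart):

Python
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/delta/events")

(deltaTable.alias("events")
  .merge(updatesDF.alias("updates"), "events.eventId = updates.eventId")
  # When an event with the same eventId exists, update the data column
  .whenMatchedUpdate(set = { "data": "updates.data" })
  # When there is no matching event, insert a new row
  .whenNotMatchedInsert(values = {
    "date": "updates.date",
    "eventId": "updates.eventId",
    "data": "updates.data"
  })
  .execute()
)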

Read a table

You access data in Delta Lake tables either by specifying the path on DBFS ("/delta/events") or the table name ("events"):

Scala
val events = spark.read.format("delta").load("/delta/events")

or

val events = spark.table("events")
SQL
SELECT * FROM delta.`/delta/events`

or

SELECT * FROM events
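
The equivalent reads in Python, shown here for reference, are:

Python
# Read by path on DBFS
events = spark.read.format("delta").load("/delta/events")

or

# Read by table name
events = spark.table("events")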

Display table history

To view the history of a table, use the DESCRIBE HISTORY statement, which provides provenance information, including the table version, operation, user, and so on, for each write to a table.
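
For example, to view the history of the events table from Python (using spark.sql so the example matches the Python snippets above):

Python
# Each row describes one write to the table: version, timestamp, user, operation, and so on
spark.sql("DESCRIBE HISTORY events").show(truncate=False)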

[Screenshot: example DESCRIBE HISTORY output showing the table's write history]

Query an earlier version of the table (time travel)

Delta Lake time travel allows you to query an older snapshot of a Delta Lake table.

To query an older version of a table, specify a version or timestamp in a SELECT statement. For the timestamp, only date or timestamp strings are accepted, for example "2019-01-01" and "2019-01-01T00:00:00.000Z".

For example, to query version 0 from the history above, use:

SELECT * FROM events VERSION AS OF 0

or

SELECT * FROM events TIMESTAMP AS OF '2019-01-29 00:37:58'

Note

Because version 1 is at timestamp '2019-01-29 00:38:10', to query version 0 you can use any timestamp in the range '2019-01-29 00:37:58' to '2019-01-29 00:38:09' inclusive.

DataFrameReader options allow you to create a DataFrame from a Delta Lake table that is fixed to a specific version of the table.

Python
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

For details, see Query an older snapshot of a table (time travel).

Optimize a table

Once you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use OPTIMIZE to compact small files into larger ones:

OPTIMIZE delta.`/delta/events`

or

OPTIMIZE events

Z-order by columns

To improve read performance further, you can co-locate related information in the same set of files by Z-Ordering. This co-locality is automatically used by Delta Lake data-skipping algorithms to dramatically reduce the amount of data that needs to be read. To Z-Order data, you specify the columns to order on in the ZORDER BY clause. For example, to co-locate by eventType, run:

OPTIMIZE events
  ZORDER BY (eventType)

For the full set of options available when running OPTIMIZE, see Optimizing Performance with File Management.

Clean up snapshots

Delta Lake provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, however, you should clean up old snapshots. You can do this by running the VACUUM command:

VACUUM events

You control the age of the oldest retained snapshot by using the RETAIN <N> HOURS option:

VACUUM events RETAIN 24 HOURS

For details on using VACUUM effectively, see Garbage collection.