Table Batch Reads and Writes

Databricks Delta supports most of the options provided by Spark SQL DataFrame read and write APIs for performing batch reads and writes on tables.

For information on Databricks Delta SQL commands, see SQL Guide.

Create a table

Databricks Delta supports creating tables in the metastore using standard DDL:

CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA

When you create a table in the metastore using Databricks Delta, it creates a symlink-like pointer in the metastore to the transaction log and data that are stored on DBFS. This pointer makes it easier for other users to discover and refer to the data without having to worry about exactly where it is stored. However, the metastore is not the source of truth about what is valid in the table. That responsibility stays with Databricks Delta.

Use DataFrameWriter (Scala/Java or Python) to write data into Databricks Delta as an atomic operation. At a minimum, you must specify the format delta:

df.write.format("delta").save("/delta/events")

Partition data

You can partition data to speed up queries or DML statements that have predicates involving the partition columns. To partition data when you create a Databricks Delta table, specify a PARTITIONED BY clause. A common pattern is to partition by date, for example:

CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION '/delta/events'

You can also specify partition columns when writing a DataFrame to a new Databricks Delta table:

df.write.format("delta").partitionBy("date").save("/delta/events")

Control data location

To control the location of the table files, you can optionally specify the LOCATION as a path on DBFS.

Tables created with a specified LOCATION are considered unmanaged by the metastore. Unlike a managed table, where no path is specified, an unmanaged table’s files are not deleted when you DROP the table.
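
For illustration, the following sketch contrasts the two kinds of tables. It assumes the unmanaged events table created above with LOCATION '/delta/events'; events_managed is a hypothetical managed table name used only for contrast.

# A managed table has no LOCATION, so the metastore manages its files.
spark.sql("""
  CREATE TABLE events_managed (
    date DATE,
    eventId STRING,
    eventType STRING,
    data STRING)
  USING DELTA
""")

# Dropping the unmanaged table removes only the metastore entry;
# the files under /delta/events are left in place.
spark.sql("DROP TABLE events")

# Dropping the managed table deletes both the metastore entry and the underlying files.
spark.sql("DROP TABLE events_managed")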

When you run CREATE TABLE with a LOCATION that already contains data stored using Databricks Delta, Databricks Delta does the following:

  • If you specify only the table name and location, for example:

    CREATE TABLE events
    USING DELTA
    LOCATION '/delta/events'
    

    the table in the Hive metastore automatically inherits the schema, partitioning, and table properties of the existing data. This functionality can be used to “import” data into the metastore.

  • If you specify any configuration (schema, partitioning, or table properties), Databricks Delta verifies that the specification exactly matches the configuration of the existing data.

    Important

    If the specified configuration does not exactly match the configuration of the data, Databricks Delta throws an exception that describes the discrepancy.

Read a table

You can load a Databricks Delta table as a DataFrame by specifying either its path:

Scala
spark.read.format("delta").load("/delta/events")
SQL
SELECT * FROM delta.`/delta/events`

or table name:

Scala
spark.table("events")
SQL
SELECT * FROM events

The DataFrame returned automatically reads the most recent snapshot of the table for any query; you never need to run REFRESH TABLE. Databricks Delta automatically uses partitioning and statistics to read the minimum amount of data when there are applicable predicates in the query.
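
For example, with the events table partitioned by date as above, a query with a partition predicate reads only the matching partition; the query below is an illustrative sketch:

# Only files in the date=2017-01-01 partition are scanned for this query;
# column statistics further prune files where applicable.
spark.sql("SELECT count(*) FROM events WHERE date = '2017-01-01'").show()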

Query an older snapshot of a table (time travel)

Note

This feature is available with Databricks Runtime 5.3 and above.

Databricks Delta time travel allows you to query an older snapshot of a Databricks Delta table. Time travel has many use cases, including:

  • Re-creating analyses, reports, or outputs (for example, the output of a machine learning model). This could be useful for debugging or auditing, especially in regulated industries.
  • Writing complex temporal queries.
  • Fixing mistakes in your data.
  • Providing snapshot isolation for a set of queries for fast-changing tables.

This section describes the supported methods for querying older versions of tables and data retention concerns, and provides examples.

Syntax

There are several ways to query an older version of a Databricks Delta table.

SQL AS OF syntax
SELECT * FROM events TIMESTAMP AS OF timestamp_expression
SELECT * FROM events VERSION AS OF version
  • timestamp_expression can be any one of:
    • '2018-10-18T22:15:12.013Z', that is, a string that can be cast to a timestamp
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', that is, a date string
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Any other expression that is or can be cast to a timestamp
  • version is a long value that can be obtained from the output of DESCRIBE HISTORY events.

Neither timestamp_expression nor version can be subqueries.

DataFrameReader options

DataFrameReader options allow you to create a DataFrame from a Databricks Delta table that is fixed to a specific version of the table.

df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

For timestamp_string, only date or timestamp strings are accepted. For example, "2019-01-01" and "2019-01-01'T'00:00:00.000Z".

A common pattern is to use the latest state of the Databricks Delta table throughout the execution of a Databricks job to update downstream applications.

Because Databricks Delta tables auto update, a DataFrame loaded from a Databricks Delta table may return different results across invocations if the underlying data is updated. By using time travel, you can fix the data returned by the DataFrame across invocations:

latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/delta/events`)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/delta/events")
# Every query that stems off df will use the same snapshot

@ syntax

You may have a parametrized pipeline, where the input path of your pipeline is a parameter of your job. After the execution of your job, you may want to reproduce the output some time in the future. In this case, you can use the @ syntax to specify the timestamp or version:

Python
df1 = spark.read.format("delta").load("/delta/events@20190101000000000") # table on 2019-01-01 00:00:00.000
df2 = spark.read.format("delta").load("/delta/events@v123") # table on version 123
SQL
SELECT * FROM events@20190101000000000
SELECT * FROM events@v123

The timestamp must be in yyyyMMddHHmmssSSS format. You can obtain table versions by running DESCRIBE HISTORY <table>. To specify a version after @, prepend a v to the version number. For example, to query version 123 of the table events, specify events@v123.

Data retention

By default, Databricks Delta tables keep a commit history of 30 days. This means that you can potentially specify a version from 30 days ago. However, there are some caveats:

  • All writers to the Databricks Delta table must be using Databricks Runtime 5.1 or above.
  • You must not have run VACUUM on your Databricks Delta table. If you have run VACUUM, you may lose the ability to go back to a version older than the default 7-day data retention period.

You configure retention periods using the following table properties:

  • delta.logRetentionDuration = "interval <interval>": Configure how long you can go back in time. Default is interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": Configure how long stale data files are kept around before being deleted with VACUUM. Default is interval 1 week.

For full access to 30 days of historical data, set delta.deletedFileRetentionDuration = "interval 30 days" on your table. This setting may cause your storage costs to go up.
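
One way to set these properties on an existing table is with ALTER TABLE ... SET TBLPROPERTIES; in the sketch below, the table name and intervals are illustrative:

# Extend both retention periods to 30 days on an existing table.
spark.sql("""
  ALTER TABLE events SET TBLPROPERTIES (
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.deletedFileRetentionDuration' = 'interval 30 days'
  )
""")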

Examples

  • Fix accidental deletes to a table for the user 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Fix accidental incorrect updates to a table:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Query the number of new customers added over the last week.

    SELECT count(distinct userId) - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    FROM my_table
    

Write to a table

Append using DataFrames

Using append mode you can atomically add new data to an existing Databricks Delta table:

df.write.format("delta").mode("append").save("/delta/events")

Overwrite using DataFrames

To atomically replace all of the data in a table, you can use overwrite mode:

df.write.format("delta").mode("overwrite").save("/delta/events")

You can selectively overwrite only the data that matches predicates over partition columns. The following command atomically replaces the month of January with the data in df:

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/delta/events")

This sample code writes out the data in df, validates that it all falls within the specified partitions, and performs an atomic replacement.

Note

Unlike the file APIs in Apache Spark, Databricks Delta remembers and enforces the schema of a table. This means that by default overwrites do not replace the schema of an existing table.

UPDATE

The UPDATE statement allows you to apply expressions to change the value of columns when a row matches a predicate. For example, you can use UPDATE to fix a spelling mistake in the eventType:

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

Like delete operations (see DELETE FROM below), update operations automatically make use of the partitioning of the table when possible.

Upserts (MERGE INTO)

The MERGE INTO statement allows you to merge a set of updates and insertions into an existing dataset. For example, the following statement takes a stream of updates and merges it into the events table. When there is already an event present with the same eventId, Databricks Delta updates the data column using the given expression. When there is no matching event, Databricks Delta adds a new row.

Here’s a worked example:

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET
    events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

You must specify a value for every column in your table when you perform an INSERT (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.

Important

MERGE INTO requires that the update (source) table is small. There are no requirements on the destination table size. If your workload does not satisfy this requirement, try using separate INSERT and UPDATE statements.

Tip

You should add as much information as possible to the ON condition in MERGE INTO, both to reduce the amount of work and to reduce the chances of transaction conflicts. For example, suppose you have a table that is partitioned by country and date and you use MERGE to update information for the last day, country by country. If you're updating country='USA', you can write a MERGE statement such as:

MERGE INTO target_table
USING source
ON target_table.user_id = source.user_id AND target_table.date = current_date() AND country = 'USA'
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

This lets you break a very large MERGE operation into smaller chunks and run them all in parallel to get better performance or meet SLAs.

DELETE FROM

Databricks Delta tables allow you to remove data that matches a predicate. For instance, to delete all events from before 2017, you can run the following DML:

DELETE FROM events WHERE date < '2017-01-01'

Delete operations automatically make use of the partitioning of the table when possible. This optimization means that it will be significantly faster to delete data based on partition predicates.

Schema validation

Databricks Delta automatically validates that the schema of the DataFrame being written is compatible with the schema of the table. Columns that are present in the table but not in the DataFrame are set to null. If there are extra columns in the DataFrame that are not present in the table, this operation throws an exception. Databricks Delta has DDL to explicitly add new columns and the ability to update the schema automatically.
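
The following sketch illustrates the exception described above; the /delta/schema-demo path and column names are hypothetical:

from pyspark.sql.functions import lit

# Create a small table, then attempt to append a DataFrame with an extra column.
base = spark.range(5).withColumn("eventType", lit("click"))
base.write.format("delta").mode("overwrite").save("/delta/schema-demo")

extra = base.withColumn("newCol", lit("x"))
try:
    extra.write.format("delta").mode("append").save("/delta/schema-demo")
except Exception as e:
    # The append is rejected because "newCol" is not present in the table schema.
    print(e)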

If you specify other options, such as partitionBy, in combination with append mode, Databricks Delta validates that they match and throws an error for any mismatch. When partitionBy is not present, appends automatically follow the partitioning of the existing data.

Update table schema

Note

This feature requires Databricks Runtime 4.1 or above.

Databricks Delta lets you update the schema of a table. The following types of changes are supported:

  • Adding new columns (at arbitrary positions)
  • Reordering existing columns

You can make these changes explicitly using DDL or implicitly using DML.

Important

When you update a Databricks Delta table schema, streams that read from that table terminate. If you want the stream to continue you must restart it. For recommended methods, see Structured Streaming in Production.

Explicitly update schema

You can use the following DDL to explicitly change the schema of a table.

  • Add columns

    ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
    

    By default, nullability is true.

    To add a column to a nested field, use:

    ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
    

    Example

    If the schema before running ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) is:

    - root
    | - colA
    | - colB
    |   +-field1
    |   +-field2
    

    the schema after is:

    - root
    | - colA
    | - colB
    |   +-field1
    |   +-nested
    |   +-field2
    

    Note

    Adding nested columns is supported only for structs. Arrays and maps are not supported.

  • Change column comment or ordering

    ALTER TABLE table_name CHANGE [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
    

    To change a column in a nested field, use:

    ALTER TABLE table_name CHANGE [COLUMN] col_name.nested_col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
    

    Example

    If the schema before running ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST is:

    - root
    | - colA
    | - colB
    |   +-field1
    |   +-field2
    

    the schema after is:

    - root
    | - colA
    | - colB
    |   +-field2
    |   +-field1
    
  • Replace columns

    ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
    

    Example

    When running the following DDL:

    ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
    

    if the schema before is:

    - root
    | - colA
    | - colB
    |   +-field1
    |   +-field2
    

    the schema after is:

    - root
    | - colC
    | - colB
    |   +-field2
    |   +-nested
    |   +-field1
    | - colA
    
  • Change column type

    Changing a column’s type or dropping a column requires rewriting the table. Suppose you have a table with a column of type INT that you want to change to DOUBLE. The following sequence of steps illustrates how to do that using the diamonds dataset included in the Databricks datasets:

  1. Create a diamonds table from a CSV file with price as INT.

    CREATE TABLE diamonds(_c0 INT, carat DOUBLE, cut STRING, color STRING, clarity STRING, depth DOUBLE, table DOUBLE, price INT, x DOUBLE, y DOUBLE, z DOUBLE)
    USING CSV
    LOCATION '/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv'
    
  2. Create a DataFrame.

    %scala
    val df = spark.table("diamonds")
    
  3. Write out the DataFrame as Databricks Delta files.

    %scala
    df.write.format("delta").save("/delta/diamonds")
    
  4. Drop the diamonds table backed by CSV files.

    DROP TABLE diamonds
    
  5. Create a diamonds table backed by Databricks Delta files.

    CREATE TABLE diamonds
    USING delta
    LOCATION '/delta/diamonds/'
    
  6. Create a DataFrame where the price is a Double.

    %scala
    import org.apache.spark.sql.functions._
    val toDouble = udf[Double, Int](_.toDouble)
    val df2 = df
      .withColumn("priceD", toDouble(df("price")))
      .drop("price")
      .withColumnRenamed("priceD", "price")
      .select("carat", "cut", "color", "clarity", "depth", "table", "price", "x", "y", "z")
    
  7. Drop the Databricks Delta table.

    DROP TABLE diamonds
    
  8. Remove the old Databricks Delta files.

    %fs rm -r dbfs:/delta/diamonds
    
  9. Write out the updated DataFrame as Databricks Delta files.

    %scala
    df2.write.format("delta").save("/delta/diamonds")
    
  10. Create a Databricks Delta table using the new files.

    CREATE TABLE diamonds
    USING delta
    LOCATION '/delta/diamonds/'
    

Automatic schema update

Databricks Delta can automatically update the schema of a table as part of a DML transaction (either appending or overwriting), and make the schema compatible with the data being written.

Add columns

Columns that are present in the DataFrame but missing from the table are automatically added as part of a write transaction when either of the following is true:

  • write or writeStream have .option("mergeSchema", "true")
  • spark.databricks.delta.schema.autoMerge is true

When both options are specified, the option from the DataFrameWriter takes precedence. The added columns are appended to the end of the struct they are present in. Case is preserved when appending a new column.
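
For example (reusing the df DataFrame and the /delta/events path from earlier examples; the country column is illustrative), a write with mergeSchema adds the missing column:

from pyspark.sql.functions import lit

# new_data carries a "country" column that the table does not have yet;
# mergeSchema appends it to the end of the table schema as part of this write.
new_data = df.withColumn("country", lit("USA"))

new_data.write \
  .format("delta") \
  .mode("append") \
  .option("mergeSchema", "true") \
  .save("/delta/events")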

Note

  • mergeSchema is not supported when table access control is enabled (as it elevates a request that requires MODIFY to one that requires ALL PRIVILEGES).
  • mergeSchema cannot be used with INSERT INTO or .write.insertInto().

NullType columns

Columns that are NullType are dropped from the DataFrame when writing into Databricks Delta (because Parquet doesn't support NullType), but are still stored in the schema. When a different data type is received for that column, Databricks Delta merges the schema to the new data type. If Databricks Delta receives a NullType for an existing column, it keeps the old schema and drops the new column during the write.

NullType in streaming is not supported. Because you must set schemas when using streaming, this should be very rare. NullType is also not accepted for complex types such as ArrayType and MapType.

Replace table schema

By default, overwriting the data in a table does not overwrite the schema. When you overwrite a table using mode("overwrite") without replaceWhere, you may also want to overwrite the schema of the table. You can replace the schema and partitioning of the table by setting the overwriteSchema option:

df.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .save("/delta/events")

Views on tables

Databricks Delta supports the creation of views on top of Databricks Delta tables just like you might with a data source table. These views integrate with table access control to allow for column and row level security. The core challenge when you operate with views is resolving the schemas. If you alter a Databricks Delta table schema, you must recreate derivative views to account for any additions to the schema. For instance, if you add a new column to a Databricks Delta table, you must make sure that this column is available in the appropriate views built on top of that base table.
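
For example, you might define a view that exposes only some columns and rows of the events table; in this sketch the view name and predicate are illustrative:

spark.sql("""
  CREATE OR REPLACE VIEW events_clicks AS
  SELECT date, eventId, data
  FROM events
  WHERE eventType = 'click'
""")
# If you later add a column to events that the view should expose, recreate the view.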

Table properties

You can store your own metadata as a table property using TBLPROPERTIES in CREATE and ALTER.

TBLPROPERTIES are stored as part of Databricks Delta's metadata. You cannot define new TBLPROPERTIES in a CREATE statement if a Databricks Delta table already exists in a given location. See table creation for more details.
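
As a sketch, you could attach arbitrary metadata when creating a new table; the table name and property values below are illustrative:

spark.sql("""
  CREATE TABLE events_tagged (
    date DATE,
    eventId STRING,
    data STRING)
  USING DELTA
  TBLPROPERTIES ('owner' = 'data-eng', 'pii' = 'false')
""")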

In addition, to tailor behavior and performance, Databricks Delta supports certain table properties:

  • Block deletes and modifications of a table: delta.appendOnly=true.
  • Configure the number of columns for which statistics are collected: delta.dataSkippingNumIndexedCols=<number-of-columns>. This property takes effect only for new data that is written out.
  • Configure the time travel retention properties: delta.logRetentionDuration=<interval-string> and delta.deletedFileRetentionDuration=<interval-string>

Note

These are the only delta.-prefixed table properties that Databricks Delta lets you set.

You can also set delta.-prefixed properties during the first commit to a Databricks Delta table using Spark configurations. For example, to initialize a Databricks Delta table with the property delta.appendOnly=true, set the Spark configuration spark.databricks.delta.properties.defaults.appendOnly = true.
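
The following sketch shows that pattern; the events_log table name is hypothetical:

# Make delta.appendOnly=true the default for Databricks Delta tables created in this session.
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")

# Tables created after this point inherit the property on their first commit.
spark.sql("""
  CREATE TABLE events_log (
    date DATE,
    eventId STRING,
    data STRING)
  USING DELTA
""")
# Deletes and modifications of events_log are now blocked.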

Table metadata

Databricks Delta has rich features for exploring table metadata. It supports SHOW PARTITIONS, SHOW COLUMNS, DESCRIBE TABLE, and so on. It also provides the following unique commands:

DESCRIBE DETAIL

Provides information about schema, partitioning, table size, and so on. For example, you can see the current reader and writer versions of a table.

DESCRIBE HISTORY

Provides provenance information, including the operation, user, and so on, for each write to a table. This information is not recorded by versions of Databricks Runtime below 4.1, and tables created using those versions show this information as null. Table history is retained for 30 days.


The Data sidebar provides a visual view of this detailed table information and history for Databricks Delta tables. In addition to the table schema and sample data, you can click the History tab to see the table history displayed by DESCRIBE HISTORY.

For an example of the various Databricks Delta table metadata commands, see the end of the following notebook: