Connecting to SQL Databases using JDBC

You can use Azure Databricks to query Microsoft SQL Server and Azure SQL Database tables using the JDBC drivers that come with Databricks Runtime 3.4 and above.

Azure SQL Database is a relational database-as-a-service based on Microsoft SQL Server. SQL Database is a high-performance, reliable, and secure database that you can use to build data-driven applications and websites in the programming language of your choice, without needing to manage infrastructure.

This topic covers how to use the DataFrame API to connect to SQL databases using JDBC and how to control the parallelism of reads through the JDBC interface. This topic provides detailed examples using the Scala API, with abbreviated Python and Spark SQL examples at the end. For all of the supported arguments for connecting to SQL databases using JDBC, see the JDBC section of the Spark SQL programming guide.

Important

The examples in this topic do not include usernames and passwords in JDBC URLs. Instead, they expect you to follow the Secrets user guide to store your database credentials as secrets, and then use them in a notebook to populate your credentials in a java.util.Properties object. For example:

val jdbcUsername = dbutils.secrets.get(scope = "jdbc", key = "username")
val jdbcPassword = dbutils.secrets.get(scope = "jdbc", key = "password")

See Secret Workflow Example for a full example of leveraging secret management.

Establish connectivity to SQL Server

In this example, we query SQL Server using its JDBC driver.

Step 1: Check that the JDBC driver is available

Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")

Step 2: Create the JDBC URL

val jdbcHostname = "<hostname>"
val jdbcPort = 1433
val jdbcDatabase = "<database>"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")

Step 3: Check connectivity to the SQL Server database

val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)
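
The properties above only register the driver class; they do not open a connection. If you want to verify connectivity explicitly, a minimal sketch using java.sql.DriverManager (reusing the jdbcUrl, jdbcUsername, and jdbcPassword values defined above) looks like this:

import java.sql.DriverManager

// Open and immediately close a connection to confirm the host, port, database, and credentials are valid.
val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
println(s"Connection open: ${!connection.isClosed}")
connection.close()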

Read data from JDBC

In this section, we load data from a database table. This uses a single JDBC connection to pull the table into the Spark environment. We discuss parallel reads in Manage parallelism.

val employees_table = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)

Spark automatically reads the schema from the database table and maps its types back to Spark SQL types.

employees_table.printSchema

We can run queries against this JDBC table:

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
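
If you prefer SQL, you can also register the DataFrame as a temporary view and query it with spark.sql; a short sketch (the view name employees_view is arbitrary):

// Register the JDBC-backed DataFrame as a temporary view so it can be queried with Spark SQL.
employees_table.createOrReplaceTempView("employees_view")

display(spark.sql("SELECT age, AVG(salary) AS avg_salary FROM employees_view GROUP BY age"))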

Write data to JDBC

In this section, we show how to write data to a database from an existing Spark SQL table named diamonds.

%sql -- quick test that this test table exists
select * from diamonds limit 5

The following code saves the data into a database table named diamonds. Using column names that are reserved keywords can trigger an exception. Our example table has a column named table, so we rename it with withColumnRenamed() before pushing it to the JDBC API.

spark.table("diamonds").withColumnRenamed("table", "table_number")
     .write
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

Spark automatically creates a database table with the appropriate schema determined from the DataFrame schema.
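
As a quick check, you can read the newly created table back and inspect the schema Spark generated in the database (reusing the connection variables above):

// Read the table that was just written and print its schema.
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).printSchema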

The default behavior is to create a new table and to throw an error if a table with the same name already exists. You can use the Spark SQL SaveMode feature to change this behavior. For example, let’s append more rows to our table:

import org.apache.spark.sql.SaveMode

spark.sql("select * from diamonds limit 10").withColumnRenamed("table", "table_number")
     .write
     .mode(SaveMode.Append) // <--- Append to the existing table
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

You can also overwrite an existing table:

spark.table("diamonds").withColumnRenamed("table", "table_number")
     .write
     .mode(SaveMode.Overwrite) // <--- Overwrite the existing table
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

Push down a query to the database engine

You can push down an entire query to the database and return just the result. The table parameter identifies the JDBC table to read. You can use anything that is valid in a SQL query FROM clause.

// Note: The parentheses are required.
val pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

Push down optimization

In addition to ingesting an entire table, you can push down a query to the database to take advantage of its processing and return only the results.

// Explain plan with no column selection returns all columns
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).explain(true)

You can prune columns and push down query predicates to the database with DataFrame methods.

// Explain plan with column selection will prune columns and just return the ones specified
// Notice that only the 3 specified columns are in the explain plan
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).select("carat", "cut", "price").explain(true)
// You can push query predicates down too
// Notice the Filter at the top of the Physical Plan
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).select("carat", "cut", "price").where("cut = 'Good'").explain(true)

Manage parallelism

In the Spark UI, you can see that numPartitions dictates the number of tasks that are launched. The tasks are distributed across the executors, which can increase the parallelism of reads and writes through the JDBC interface. See the Spark SQL programming guide for other parameters, such as fetchsize, that can help with performance.
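
For example, fetchsize controls how many rows are fetched per database round trip. A minimal sketch of setting it through the connection properties (the value 1000 is illustrative only and should be tuned for your workload):

// Larger fetch sizes reduce round trips to the database at the cost of memory per fetch.
connectionProperties.put("fetchsize", "1000")

val employees_with_fetchsize = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)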

JDBC reads

You can provide split boundaries based on the dataset’s column values.

These options specify the parallelism on read; if any of them is specified, they must all be specified.

Note

These options specify the parallelism of the table read. lowerBound and upperBound decide the partition stride, but do not filter the rows in the table. Therefore, Spark partitions and returns all rows in the table; to restrict the rows, add a filter, as shown after the example below.

We split the table read across executors on the emp_no column using the columnName (the partition column), lowerBound, upperBound, and numPartitions parameters.

val df = (spark.read.jdbc(url=jdbcUrl,
    table="employees",
    columnName="emp_no",
    lowerBound=1L,
    upperBound=100000L,
    numPartitions=100,
    connectionProperties=connectionProperties))
display(df)
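
Because the bounds only define partition strides, add a predicate if you need a subset of rows; Spark pushes eligible predicates down to the database. A minimal sketch:

// Restrict the rows returned; the filter is pushed down to the database where possible.
display(df.where("emp_no < 10008"))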

JDBC writes

Spark’s partitions dictate the number of connections used to push data through the JDBC API. You can control the parallelism by calling coalesce(<N>) or repartition(<N>) depending on the existing number of partitions. Call coalesce when reducing the number of partitions, and repartition when increasing the number of partitions.

import org.apache.spark.sql.SaveMode

val df = spark.table("diamonds")
println(df.rdd.partitions.length)

// Given the number of partitions above, you can reduce the partition value by calling coalesce() or increase it by calling repartition() to manage the number of connections.
df.repartition(10).write.mode(SaveMode.Append).jdbc(jdbcUrl, "diamonds", connectionProperties)
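
Conversely, if the DataFrame already has more partitions than the number of connections you want, a coalesce-based variant looks like this (the target of 2 partitions is illustrative only):

// Reduce the number of partitions, and therefore concurrent JDBC connections, before writing.
df.coalesce(2).write.mode(SaveMode.Append).jdbc(jdbcUrl, "diamonds", connectionProperties)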

Python example

The following Python examples cover some of the same tasks as the Scala examples above.

Create the JDBC URL

jdbcHostname = "<hostname>"
jdbcDatabase = "employees"
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, username, password)

Alternatively, you can keep the credentials out of the URL and pass in a dictionary that contains the credentials and driver class, similar to the Scala example above.

jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

Push down a query to the database engine

pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

Read from JDBC connections across multiple workers

df = spark.read.jdbc(url=jdbcUrl, table='employees', column='emp_no', lowerBound=1, upperBound=100000, numPartitions=100, properties=connectionProperties)
display(df)

Spark SQL example

You can define a Spark SQL table or view that uses a JDBC connection. For details, see Create Table and Create View.

%sql
CREATE TABLE jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:<databaseServerType>://<jdbcHostname>:<jdbcPort>",
  table "<jdbcDatabase>.atable",
  user "<jdbcUsername>",
  password "<jdbcPassword>"
)

Append data into the database table using Spark SQL:

%sql
INSERT INTO diamonds
SELECT * FROM diamonds LIMIT 10 -- append 10 records to the table

%sql
SELECT count(*) record_count FROM diamonds -- count increased by 10

Overwrite data in the database table using Spark SQL. This causes the database to drop and create the diamonds table:

%sql
INSERT OVERWRITE TABLE diamonds
SELECT carat, cut, color, clarity, depth, `table` AS table_number, price, x, y, z FROM diamonds

%sql
SELECT count(*) record_count FROM diamonds -- count returned to original value (10 less)