Connecting to SQL Databases using JDBC

You can use Azure Databricks to query Microsoft SQL Server and Azure SQL Database tables using the JDBC drivers that come with Databricks Runtime 3.4 and above.

Azure SQL Database is a relational database-as-a-service based on Microsoft SQL Server. SQL Database is a high-performance, reliable, and secure database you can use to build data-driven applications and websites in the programming language of your choice, without needing to manage infrastructure.

This topic covers how to use the DataFrame API to connect to SQL databases using JDBC and how to control the parallelism of reads through the JDBC interface. It provides detailed examples using the Scala API, with abbreviated Python and Spark SQL examples at the end. For all of the supported arguments for connecting to SQL databases using JDBC, see the JDBC section of the Spark SQL programming guide.


For improved performance, you can instead use the Spark connector to connect to Microsoft SQL Server and Azure SQL Database.


The examples in this topic do not include usernames and passwords in JDBC URLs. Instead, they expect you to follow the Secrets user guide to store your database credentials as secrets and then use them in a notebook to populate your credentials in a java.util.Properties object. For example:

val jdbcUsername = dbutils.secrets.get(scope = "jdbc", key = "username")
val jdbcPassword = dbutils.secrets.get(scope = "jdbc", key = "password")

For a full example of secret management, see Secret Workflow Example.

Establish connectivity to SQL Server

This example queries SQL Server using its JDBC driver.

Step 1: Check that the JDBC driver is available
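One way to verify that the driver is available is to load its class by name; a minimal sketch, assuming the Microsoft SQL Server JDBC driver that ships with Databricks Runtime:

```scala
// Throws ClassNotFoundException if the driver is not on the classpath.
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
```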


Step 2: Create the JDBC URL

val jdbcHostname = "<hostname>"
val jdbcPort = 1433
val jdbcDatabase = "<database>"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")

Step 3: Check connectivity to the SQL Server database

val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)

Read data from JDBC

This section loads data from a database table. This uses a single JDBC connection to pull the table into the Spark environment. For parallel reads, see Manage parallelism.

val employees_table = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)

Spark automatically reads the schema from the database table and maps its types back to Spark SQL types.
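To inspect the inferred schema, you can print it; a quick check, assuming the employees_table DataFrame from the read above:

```scala
// Prints the column names and the Spark SQL types mapped from the database types.
employees_table.printSchema()
```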


You can run queries against this JDBC table:

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Write data to JDBC

This section shows how to write data to a database from an existing Spark SQL table named diamonds.

%sql -- quick test that this test table exists
select * from diamonds limit 5

The following code saves the data into a database table named diamonds. Using column names that are reserved keywords can trigger an exception; the example table has a column named table, so rename it with withColumnRenamed() before pushing it to the JDBC API.

spark.table("diamonds").withColumnRenamed("table", "table_number")
     .write
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

Spark automatically creates a database table with the appropriate schema determined from the DataFrame schema.

The default behavior is to create a new table and to throw an error message if a table with the same name already exists. You can use the Spark SQL SaveMode feature to change this behavior. For example, here’s how to append more rows to the table:

import org.apache.spark.sql.SaveMode

spark.sql("select * from diamonds limit 10").withColumnRenamed("table", "table_number")
     .write
     .mode(SaveMode.Append) // <--- Append to the existing table
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

You can also overwrite an existing table:

spark.table("diamonds").withColumnRenamed("table", "table_number")
     .write
     .mode(SaveMode.Overwrite) // <--- Overwrite the existing table
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

Push down a query to the database engine

You can push down an entire query to the database and return just the result. The table parameter identifies the JDBC table to read. You can use anything that is valid in a SQL query FROM clause.

// Note: The parentheses are required.
val pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

Push down optimization

In addition to ingesting an entire table, you can push down a query to the database so that processing happens there, and return only the results.

// Explain plan with no column selection returns all columns
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).explain(true)

You can prune columns and pushdown query predicates to the database with DataFrame methods.

// Explain plan with column selection will prune columns and just return the ones specified
// Notice that only the 3 specified columns are in the explain plan
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).select("carat", "cut", "price").explain(true)

// You can push query predicates down too
// Notice the filter at the top of the physical plan
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).select("carat", "cut", "price").where("cut = 'Good'").explain(true)

Manage parallelism

In the Spark UI, you can see that numPartitions dictates the number of tasks that are launched. The tasks are spread across the executors, which can increase the parallelism of reads and writes through the JDBC interface. See the Spark SQL programming guide for other parameters, such as fetchsize, that can help with performance.
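For example, fetchsize (the number of rows the driver retrieves per round trip) can be passed through the same connection properties used elsewhere in this topic; a sketch, assuming the connectionProperties object created earlier, with an illustrative value you should tune per workload:

```scala
// fetchsize controls how many rows the JDBC driver fetches per round trip.
// Larger values reduce round trips on big reads at the cost of memory.
connectionProperties.put("fetchsize", "1000")

val employees_table = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)
```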

JDBC reads

You can provide split boundaries based on the dataset’s column values.

These options specify the parallelism of the table read; if any of them is specified, all of them must be. lowerBound and upperBound determine the partition stride, but they do not filter the rows in the table, so Spark partitions and returns all rows.

You can split the table read across executors on the emp_no column using the partitionColumn, lowerBound, upperBound, and numPartitions parameters.

val df = spark.read.jdbc(
  url = jdbcUrl,
  table = "employees",
  columnName = "emp_no",
  lowerBound = 1L,
  upperBound = 100000L,
  numPartitions = 100,
  connectionProperties = connectionProperties)

JDBC writes

Spark’s partitions dictate the number of connections used to push data through the JDBC API. You can control the parallelism by calling coalesce(<N>) or repartition(<N>) depending on the existing number of partitions. Call coalesce when reducing the number of partitions, and repartition when increasing the number of partitions.

import org.apache.spark.sql.SaveMode

val df = spark.table("diamonds")

// Given the number of partitions above, you can reduce the partition value by calling coalesce() or increase it by calling repartition() to manage the number of connections.
df.repartition(10).write.mode(SaveMode.Append).jdbc(jdbcUrl, "diamonds", connectionProperties)
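Conversely, if the DataFrame has more partitions than the database can comfortably serve as concurrent connections, you can reduce them before writing; a sketch, assuming the same df and connection, with an illustrative partition count:

```scala
// coalesce() avoids a full shuffle when reducing the number of partitions,
// which caps the number of concurrent JDBC connections used for the write.
df.coalesce(4).write.mode(SaveMode.Append).jdbc(jdbcUrl, "diamonds", connectionProperties)
```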

Python example

The following Python examples cover some of the same tasks as those provided for Scala.

Create the JDBC URL

jdbcHostname = "<hostname>"
jdbcDatabase = "employees"
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, username, password)

You can pass in a dictionary that contains the credentials and driver class similar to the Scala example above.

jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

Push down a query to the database engine

pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

Read from JDBC connections across multiple workers

df = spark.read.jdbc(url=jdbcUrl, table="employees", column="emp_no", lowerBound=1, upperBound=100000, numPartitions=100, properties=connectionProperties)

Spark SQL example

You can define a Spark SQL table or view that uses a JDBC connection. For details, see Create Table and Create View.

CREATE TABLE <jdbcTable>
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:<databaseServerType>://<jdbcHostname>:<jdbcPort>",
  dbtable "<jdbcDatabase>.atable",
  user "<jdbcUsername>",
  password "<jdbcPassword>"
)

Append data into the database table using Spark SQL:

INSERT INTO diamonds
SELECT * FROM diamonds LIMIT 10 -- append 10 records to the table

SELECT count(*) record_count FROM diamonds -- count increased by 10

Overwrite data in the database table using Spark SQL. This causes the database to drop and create the diamonds table:

INSERT OVERWRITE TABLE diamonds
SELECT carat, cut, color, clarity, depth, TABLE AS table_number, price, x, y, z FROM diamonds

SELECT count(*) record_count FROM diamonds -- count returned to original value (10 less)