Connecting to Microsoft SQL Server and Azure SQL Database with the Spark Connector

The Spark connector for Microsoft SQL Server and Azure SQL Database enables these databases to act as input data sources and output data sinks for Spark jobs. It lets you use real-time transactional data in big data analytics and persist results for ad-hoc queries or reporting.

Compared to the built-in Spark JDBC connector, this connector can bulk insert data into SQL databases, outperforming row-by-row insertion by 10x to 20x. It also supports Azure Active Directory (AAD) authentication, so you can connect securely to your Azure SQL databases from Azure Databricks with your AAD account. Because its interfaces are similar to those of the built-in JDBC connector, it is easy to migrate existing Spark jobs to this connector.
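
Because the connector passes connection properties straight through to the JDBC driver (see Connect to Spark using this library below), AAD password authentication can be requested with the driver's authentication property. A minimal sketch with placeholder server, database, and account names; ActiveDirectoryPassword authentication also assumes the adal4j library is available on the cluster:

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// All names and credentials below are placeholders.
val aadConfig = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "user@mytenant.onmicrosoft.com",
  "password"       -> "xxxxxxxx",
  "authentication" -> "ActiveDirectoryPassword", // JDBC driver AAD property
  "encrypt"        -> "true"
))

val clients = spark.read.sqlDB(aadConfig)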

Requirements

Component                              Versions Supported
-------------------------------------  --------------------------
Apache Spark                           2.0.2 and above
Scala                                  2.10 and above
Microsoft JDBC Driver for SQL Server   6.2 and above
Microsoft SQL Server                   SQL Server 2008 and above
Azure SQL Database                     Supported

Create library

  1. Create an Azure Databricks library for the Spark connector as a Maven library. Use the coordinate: com.microsoft.azure:azure-sqldb-spark:1.0.2 (an sbt equivalent is sketched after this list).
  2. Attach the library to the cluster that will access the database.
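
If you build your application outside Azure Databricks, the same Maven coordinate can be declared as an sbt dependency; a minimal sketch:

// build.sbt
libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2"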

Connect to Spark using this library

This connector uses the Microsoft JDBC Driver for SQL Server to fetch data from, and write data to, Azure SQL Database or SQL Server. Results are returned as Spark DataFrames.

All connection properties in Microsoft JDBC Driver for SQL Server are supported in this connector. Add connection properties as fields in the com.microsoft.azure.sqldb.spark.config.Config object.
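
For example, the driver's encrypt and hostNameInCertificate properties can be set directly as Config fields; a sketch with placeholder values:

import com.microsoft.azure.sqldb.spark.config.Config

// Placeholder server, database, and credentials; encrypt and
// hostNameInCertificate are standard JDBC driver connection properties.
val secureConfig = Config(Map(
  "url"                   -> "mysqlserver.database.windows.net",
  "databaseName"          -> "MyDatabase",
  "dbTable"               -> "dbo.Clients",
  "user"                  -> "username",
  "password"              -> "xxxxxxxx",
  "encrypt"               -> "true",
  "hostNameInCertificate" -> "*.database.windows.net"
))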

Read from Azure SQL Database or SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients"
  "user"           -> "username",
  "password"       -> "xxxxxxxx",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

val collection = spark.read.sqlDB(config)
collection.show()
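
You can also push a SELECT down to the database at read time by supplying the connector's queryCustom field in place of dbTable, so that only the matching rows are returned to Spark; a sketch with a placeholder query:

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val queryConfig = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "xxxxxxxx",
  // Placeholder query; executed by the database, not by Spark.
  "queryCustom"  -> "SELECT * FROM dbo.Clients WHERE PostalCode = '98074'"
))

val filtered = spark.read.sqlDB(queryConfig)
filtered.show()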

Write to Azure SQL Database or SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Acquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients"
  "user"         -> "username",
  "password"     -> "xxxxxxxx"
))

import org.apache.spark.sql.SaveMode

collection.write.mode(SaveMode.Append).sqlDB(config)
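
SaveMode.Append adds rows to the existing table. As with the built-in Spark JDBC writer, SaveMode.Overwrite replaces the destination table's contents, so prefer Append unless you intend to discard existing data.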

Pushdown query to Azure SQL Database or SQL Server

You can push entire statements down to the database with the queryCustom field. The interface below is intended for statements, such as UPDATE or DELETE, that modify data and return no rows; for SELECT queries with expected return results, use Read from Azure SQL Database or SQL Server above.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
              |UPDATE Customers
              |SET ContactName = 'Alfred Schmidt', City= 'Frankfurt'
              |WHERE CustomerID = 1;
            """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "xxxxxxxx",
  "queryCustom"  -> query
))

sqlContext.sqlDBQuery(config)
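
The statement runs inside the database over the JDBC connection; this interface is meant for data-modification statements rather than for retrieving rows into Spark.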

Bulk copy to Azure SQL Database or SQL Server

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._


// Add column metadata.
// If not specified, metadata is read automatically
// from the destination table, which can hurt performance.

var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "xxxxxxxx",
  "databaseName"      -> "MyDatabase",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

// df is the DataFrame to bulk copy (for example, a DataFrame read as shown above).
df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
// df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.
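
bulkCopyTableLock takes a table-level lock for the duration of the load, which speeds up the copy but blocks concurrent access to the destination table; bulkCopyBatchSize and bulkCopyTimeout (in seconds) should be tuned to the volume of data being copied.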