Examples

Write to Cassandra using foreachBatch() in Scala

streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Cassandra. The following example uses the Spark Cassandra connector from Scala to write the key-value output of an aggregation query to Cassandra. See the foreachBatch documentation for details.

To run this example, you need to install the appropriate Cassandra Spark connector for your Spark version as a Maven library.
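
For example, with a Scala 2.12 build of Spark, the Maven coordinates look like the following; the exact version is an assumption, so check the connector's compatibility matrix for the release that matches your Spark version.

com.datastax.spark:spark-cassandra-connector_2.12:3.4.1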

In this example, we create a table and then start a Structured Streaming query that writes to it, using foreachBatch() to write the streaming output with the batch DataFrame connector. (A sketch of the table-creation step follows the query below.)

import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._

import com.datastax.spark.connector.cql.CassandraConnectorConf
import com.datastax.spark.connector.rdd.ReadConf
import com.datastax.spark.connector._


val host = "<ip address>"
val clusterName = "<cluster name>"
val keyspace = "<keyspace>"
val tableName = "<tableName>"

// Point the named Cassandra cluster at its contact host
spark.setCassandraConf(clusterName, CassandraConnectorConf.ConnectionHostParam.option(host))
spark.readStream.format("rate").load()
  .selectExpr("value % 10 as key")
  .groupBy("key")
  .count()
  .toDF("key", "value")
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>

    batchDF.write       // Use the Cassandra batch data source to write the streaming output
      .cassandraFormat(tableName, keyspace)
      .option("cluster", clusterName)
      .mode("append")
      .save()
  }
  .outputMode("update")
  .start()
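
The query above assumes the target table already exists. If you need to create it first, a minimal sketch using the connector's CassandraConnector is shown below; the schema is an assumption matching the query's key-value output, and it assumes spark.cassandra.connection.host is set in the cluster's Spark config.

import com.datastax.spark.connector.cql.CassandraConnector

// Assumes spark.cassandra.connection.host is set in the Spark config.
// The bigint columns are an assumption matching the query's (key, value) output.
CassandraConnector(spark.sparkContext.getConf).withSessionDo { session =>
  session.execute(
    s"CREATE TABLE IF NOT EXISTS $keyspace.$tableName (key bigint PRIMARY KEY, value bigint)")
}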

Write to Azure SQL Data Warehouse using foreachBatch() in Python

streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Azure SQL Data Warehouse. See the foreachBatch documentation for details.

To run this example, you need the Azure SQL Data Warehouse connector; see Azure SQL Data Warehouse for details.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  # Write each micro-batch with the batch Azure SQL Data Warehouse connector
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode("overwrite") \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

# Use a single shuffle partition to keep this small example's aggregation state compact
spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
)

Stream-Stream Joins

These two notebooks show how to use stream-stream joins in Python and Scala; a minimal sketch of a watermarked join follows the links below.

Stream-Stream Joins in Python

Stream-Stream Joins in Scala
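
If you want a feel for the API without opening the notebooks, here is a minimal Scala sketch of a watermarked stream-stream join, loosely modeled on the Structured Streaming programming guide. The rate sources, column names, and time bounds are assumptions, not taken from the notebooks.

import org.apache.spark.sql.functions._

// Two sample streams standing in for ad impressions and clicks.
val impressions = spark.readStream.format("rate").load()
  .select(col("value").as("impressionAdId"), col("timestamp").as("impressionTime"))

val clicks = spark.readStream.format("rate").load()
  .select(col("value").as("clickAdId"), col("timestamp").as("clickTime"))

// Watermarks bound how much state each side of the join must keep.
val joined = impressions.withWatermark("impressionTime", "10 seconds")
  .join(
    clicks.withWatermark("clickTime", "20 seconds"),
    expr("""
      clickAdId = impressionAdId AND
      clickTime >= impressionTime AND
      clickTime <= impressionTime + interval 1 minute
    """))

joined.writeStream.format("console").start()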