SparkR Overview

SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. Starting with Spark 1.5.1, SparkR provides a distributed DataFrame implementation that supports operations such as selection, filtering, and aggregation (similar to R data frames and dplyr) but on large datasets. SparkR also supports distributed machine learning using MLlib.

Note

Azure Databricks also supports sparklyr.

SparkR in Azure Databricks notebooks

SparkR started as a research project at AMPLab. With the release of Spark 1.4.0, SparkR was merged into Apache Spark. At that time Databricks released R notebooks, becoming the first company to officially support SparkR. To make SparkR easier to use, Databricks R notebooks imported SparkR by default and provided a working sqlContext object.

SparkR and Databricks R notebooks have evolved significantly since 2015. For the best experience, Databricks recommends that you use the latest version of Spark on Databricks when you use either R or SparkR. Some of the most notable changes in R and SparkR since Spark 1.6 include:

  • Starting with Spark 2.0, you do not need to explicitly pass a sqlContext object to every function call. This change reduced boilerplate code and made SparkR user code more intuitive and readable. In this document we use the new syntax. For old syntax examples, see SparkR 1.6 Overview.
  • Starting with Spark 2.2, Databricks notebooks no longer import SparkR by default, because some SparkR functions conflicted with similarly named functions from other popular packages. To use SparkR, call library(SparkR) in your notebooks. The SparkR session is already configured, and all SparkR functions talk to your attached cluster using the existing session.
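As a rough sketch of the two changes listed above, the snippet below loads SparkR explicitly and contrasts the Spark 1.6 call style with the Spark 2.0 style. The call to sparkR.session() is included only to keep the snippet self-contained; in a Databricks notebook it is assumed to return the session that is already configured.

```r
library(SparkR)

# Returns the active session if one exists, otherwise starts one
sparkR.session()

# Spark 1.6 and earlier required an explicit sqlContext argument:
#   df <- createDataFrame(sqlContext, faithful)

# Spark 2.0 and later use the active session implicitly
df <- createDataFrame(faithful)
head(df)
```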

Creating SparkR DataFrames

You can create a DataFrame from a local R data.frame, from a data source, or using a Spark SQL query.

From a local R data.frame

The simplest way to create a DataFrame is to convert a local R data.frame into a SparkDataFrame. To do this, pass the local R data.frame to createDataFrame. Like most other SparkR functions, the createDataFrame syntax changed in Spark 2.0. The code snippet below shows an example. Refer to createDataFrame for more examples.

library(SparkR)
df <- createDataFrame(faithful)

# Display the first rows of the DataFrame
head(df)

Using the data source API

The general method for creating a DataFrame from a data source is read.df. This method takes the path for the file to load and the type of data source. SparkR supports reading CSV, JSON, text, and Parquet files natively. Through Spark Packages you can find data source connectors for popular file formats such as Avro.

library(SparkR)
diamondsDF <- read.df("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "csv", header="true", inferSchema = "true")
head(diamondsDF)

Because inferSchema is set to "true", SparkR infers the schema from the CSV file.

printSchema(diamondsDF)
display(diamondsDF)

Adding a data source connector with Spark Packages

As an example, we will use the spark-avro package to load an Avro file. The availability of the spark-avro package depends on your cluster’s image version. See Avro Files.

First we take an existing data.frame, convert it to a SparkDataFrame, and save it as an Avro file.

require(SparkR)
irisDF <- createDataFrame(iris)
write.df(irisDF, source = "com.databricks.spark.avro", path = "dbfs:/tmp/iris.avro", mode = "overwrite")

To verify that we saved an Avro file:

%fs ls /tmp/iris.avro

Now we use the spark-avro package again to read back the data.

irisDF2 <- read.df(path = "/tmp/iris.avro", source = "com.databricks.spark.avro")
head(irisDF2)

The data sources API can also be used to save DataFrames in multiple file formats. For example, we can save the DataFrame from the previous example to a Parquet file using write.df.

write.df(irisDF2, path="dbfs:/tmp/iris.parquet", source="parquet", mode="overwrite")

%fs ls dbfs:/tmp/iris.parquet
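To illustrate the round trip, the sketch below writes a DataFrame to Parquet and reads it back with read.df. It assumes a Databricks cluster where the dbfs:/tmp path is writable, and the variable name irisParquet is chosen only for this example.

```r
library(SparkR)
sparkR.session()

# Write the iris data set to Parquet, replacing any previous output
irisDF <- createDataFrame(iris)
write.df(irisDF, path = "dbfs:/tmp/iris.parquet", source = "parquet", mode = "overwrite")

# Read the Parquet files back; the schema is stored with the data
irisParquet <- read.df("dbfs:/tmp/iris.parquet", source = "parquet")
printSchema(irisParquet)
nrow(irisParquet)
```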

From a Spark SQL query

You can also create SparkR DataFrames using Spark SQL queries.

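# Create a small example SparkDataFrame with an 'age' column (illustrative data)
people <- createDataFrame(data.frame(name = c("Michael", "Andy", "Justin"), age = c(29, 30, 19)))
# Register the DataFrame as a temporary view
createOrReplaceTempView(people, "peopleTemp")
# Create a DataFrame consisting of only the 'age' column using a Spark SQL query
age <- sql("SELECT age FROM peopleTemp")
head(age)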

Note that age is a SparkDataFrame.

# Resulting df is a SparkDataFrame
str(age)
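A Spark SQL query can also do the grouping and sorting itself. The following is a minimal sketch that uses the built-in faithful data set; the view name faithfulTemp and the column alias n are hypothetical names chosen for this example.

```r
library(SparkR)
sparkR.session()

# Register the faithful data set as a temporary view
faithfulDF <- createDataFrame(faithful)
createOrReplaceTempView(faithfulDF, "faithfulTemp")

# Count how often each waiting time occurs, most frequent first
waitingCounts <- sql("
  SELECT waiting, COUNT(*) AS n
  FROM faithfulTemp
  GROUP BY waiting
  ORDER BY n DESC
")
head(waitingCounts)
```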

DataFrame operations

SparkDataFrames support a number of functions for structured data processing. This section includes some basic examples; a complete list can be found in the API docs.

Selecting rows and columns

# Import SparkR package if this is a new notebook
require(SparkR)

# Create DataFrame
df <- createDataFrame(faithful)
df
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column names as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
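Filters and selections can be combined, and a small result can be brought back to the driver as a local data.frame with collect. The sketch below assumes the result is small enough to fit in local memory; the thresholds are arbitrary values chosen for illustration.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(faithful)

# Keep rows with eruptions longer than 4 minutes and waiting times of at least 80 minutes
longEruptions <- filter(df, df$eruptions > 4 & df$waiting >= 80)

# Select both columns and collect the result as a local R data.frame
localResult <- collect(select(longEruptions, "eruptions", "waiting"))
class(localResult)
head(localResult)
```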

Grouping and aggregation

SparkDataFrames support a number of commonly used functions to aggregate data after grouping. For example, we can count the number of times each waiting time appears in the faithful dataset.

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
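Several aggregates can be computed in one pass with agg. The following sketch computes a row count and a mean per waiting time; the output column names n and mean_eruptions are chosen only for this example.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(faithful)

# For each waiting time, compute the number of rows and the mean eruption length
waitingStats <- agg(groupBy(df, df$waiting),
                    n = n(df$waiting),
                    mean_eruptions = avg(df$eruptions))

# Show the most common waiting times first
head(arrange(waitingStats, desc(waitingStats$n)))
```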

Column operations

SparkR provides a number of functions that can be directly applied to columns for data processing and aggregation. The example below shows the use of basic arithmetic functions.

# Convert waiting time from minutes to seconds.
# Note that we can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
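As an alternative to assigning with $, withColumn returns a new SparkDataFrame and leaves the original unchanged. The sketch below is illustrative only; the column names waiting_hours and eruptions_rounded are hypothetical.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(faithful)

# withColumn returns a new SparkDataFrame; df itself is not modified
df2 <- withColumn(df, "waiting_hours", df$waiting / 60)

# Column functions such as round can be combined with arithmetic
df2 <- withColumn(df2, "eruptions_rounded", round(df2$eruptions))
head(df2)
```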

Machine learning

SparkR exposes most MLlib algorithms. Under the hood, SparkR uses MLlib to train the model.

The example below shows how to build a Gaussian GLM using SparkR. To run linear regression, set family to “gaussian”. To run logistic regression, set family to “binomial”. When you use the SparkML GLM, SparkR automatically performs one-hot encoding of categorical features, so you do not need to do it manually. Beyond String and Double type features, it is also possible to fit over MLlib Vector features, for compatibility with other MLlib components.

# Create the DataFrame
df <- createDataFrame(iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)
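Once a model is fitted, it can be applied to a SparkDataFrame with predict. The sketch below scores the training data itself, purely for illustration; a real workflow would normally score held-out data.

```r
library(SparkR)
sparkR.session()

# createDataFrame replaces "." in column names with "_" (for example Sepal_Length)
df <- createDataFrame(iris)
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# predict returns a SparkDataFrame that includes a "prediction" column
predictions <- predict(model, df)
head(select(predictions, "Sepal_Length", "prediction"))
```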