How To Parallelize R Code with gapply

Parallelizing R code is difficult because plain R code runs only on the driver, and R data.frames are not distributed. Often, existing R code that runs locally needs to be converted to run on Apache Spark. In other cases, some SparkR functions for advanced statistical analysis and machine learning may not support distributed computing. In such cases, the SparkR UDF API can be used to distribute the desired workload across a cluster.

Example use case: You want to train a machine learning model on subsets of a data set, grouped by a key. If each subset fits in memory on a single worker, it may be more efficient to use the SparkR UDF API to train multiple models in parallel.
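Before relying on the UDF API, it can help to check locally how many rows each group has and roughly how much memory it takes. The sketch below uses only base R on the built-in airquality data. The 5 MB limit is an arbitrary, hypothetical threshold for illustration, not a Spark setting.

```r
# Drop rows with missing values, as the airquality example in this article does
aq <- na.omit(airquality)

# One local data.frame per Month, i.e. one per grouping key
groups <- split(aq, aq$Month)

# Row count and approximate in-memory size (bytes) of each group
group_sizes <- data.frame(
  Month = as.integer(names(groups)),
  rows  = vapply(groups, nrow, integer(1)),
  bytes = vapply(groups, function(g) as.numeric(object.size(g)), numeric(1)),
  row.names = NULL
)

# Hypothetical threshold: flag groups larger than 5 MB
limit_bytes <- 5 * 1024^2
group_sizes$fits <- group_sizes$bytes < limit_bytes
print(group_sizes)
```

On a real data set, the largest group is the one to watch, because each group is loaded in full into a single R process on a worker.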

The gapply and gapplyCollect functions apply a function to each group in a Spark DataFrame. For each group:

  1. The group's rows are collected into an R data.frame on a worker.
  2. The function is sent to that worker and executed on the data.frame.
  3. The results are returned: gapply combines them into a new Spark DataFrame with the specified schema, while gapplyCollect collects them to the driver as an R data.frame.
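The three steps above can be emulated locally in base R, which may help when reasoning about what the function receives. local_gapply is a hypothetical helper written only for this illustration, not SparkR's implementation: it processes groups sequentially on one machine, whereas gapply runs them in parallel on the workers, and it models key as a list of the grouping values.

```r
# Hypothetical local stand-in for gapply(); NOT SparkR's implementation
local_gapply <- function(df, cols, func) {
  # Step 1: split the data.frame into one R data.frame per distinct key
  groups <- split(df, df[cols], drop = TRUE)
  results <- lapply(groups, function(x) {
    # Model the key as a list of this group's grouping-column values
    key <- as.list(x[1, cols, drop = FALSE])
    # Step 2: run the user function on this group's data.frame
    func(key, x)
  })
  # Step 3: combine the per-group outputs into a single data.frame
  out <- do.call(rbind, results)
  rownames(out) <- NULL
  out
}

aq <- na.omit(airquality)
monthly <- local_gapply(aq, "Month", function(key, x) {
  data.frame(Month = key[[1]], meanOzone = mean(x$Ozone))
})
print(monthly)
```

Because the user function only sees one group at a time, anything it needs (packages, constants) must be available where it runs.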

Note

When calling gapply, you must specify the output schema. gapplyCollect does not take a schema; it collects the result to the driver as an R data.frame.
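Because gapply relies on the declared schema, a mismatch between it and the data.frame your function returns is a common source of errors. The sketch below is a hypothetical local check, not part of SparkR: it maps R column classes to the SparkR type names used in structField ("integer", "double", "string", "boolean") and compares them with the intended schema by position and type only, since the column names in the gapply result come from the schema.

```r
# Hypothetical helper: map an R column's class to a SparkR schema type name
r_to_spark_type <- function(x) {
  switch(class(x)[1],
         integer   = "integer",
         numeric   = "double",
         character = "string",
         logical   = "boolean",
         NA_character_)  # unmapped class
}

# Hypothetical helper: does df's column order and types match `expected`,
# a character vector of SparkR type names in schema order?
matches_schema <- function(df, expected) {
  actual <- vapply(df, r_to_spark_type, character(1))
  length(actual) == length(expected) &&
    all(!is.na(actual)) &&
    all(actual == expected)
}

expected <- c(Month = "integer", MSE = "double")
ok  <- data.frame(Month = 5L, MSE = 12.3)
bad <- data.frame(Month = 5,  MSE = 12.3)  # Month is double, not integer

matches_schema(ok, expected)   # TRUE
matches_schema(bad, expected)  # FALSE
```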

In the following example, a separate support vector machine (SVM) regression model is fit on the airquality data for each month, and the cross-validated mean squared error (MSE) is returned for each month. The example is shown twice: with gapply and an explicit schema, and with gapplyCollect, which needs no schema.

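# Attach SparkR and get (or create) a Spark session
library(SparkR)
sparkR.session()

# With a schema: gapply returns a Spark DataFrame
df <- createDataFrame(na.omit(airquality))

# Output schema: one column name and Spark data type per output column
schema <- structType(
  structField("Month", "integer"),
  structField("MSE", "double"))

result <- gapply(df, c("Month"), function(key, x) {
  # Runs on a worker; e1071 must be installed there
  library(e1071)
  # key holds the grouping value; x is this month's rows as an R data.frame
  data.frame(Month = key[[1]], MSE = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
}, schema)

head(collect(result))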
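
# Without a schema: gapplyCollect returns a local R data.frame on the driver
df <- createDataFrame(na.omit(airquality))

gapplyCollect(df, c("Month"), function(key, x) {
  library(e1071)
  y <- data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
  # The collected result takes its column names from the returned data.frame
  names(y) <- c("Month", "MSE")
  y
})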

Note

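The input must be a Spark DataFrame (for example, one created with createDataFrame). Because the function runs on the workers, any package it loads, such as e1071 here, must be installed on all worker nodes.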
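If a package is missing on a worker, the failure can surface as a long Spark stack trace. One option is to check for the package at the top of the function and stop with a short message. require_pkgs below is a hypothetical helper written for this sketch; it uses only base R's requireNamespace.

```r
# Hypothetical helper: stop with a clear message if any package is missing
# on the machine where this code runs (a worker, when called inside gapply)
require_pkgs <- function(pkgs) {
  found  <- vapply(pkgs, requireNamespace, logical(1), quietly = TRUE)
  absent <- pkgs[!found]
  if (length(absent) > 0) {
    stop("Missing package(s) on this node: ",
         paste(absent, collapse = ", "), call. = FALSE)
  }
  invisible(TRUE)
}

# Intended use inside a gapply/gapplyCollect function (not run here):
# function(key, x) {
#   require_pkgs("e1071")
#   library(e1071)
#   ...
# }

require_pkgs("stats")  # base R package, so this passes
```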