To help you use single-node tools on massive data sets, Databricks Runtime HLS includes a utility called the Pipe Transformer to process Spark DataFrames with command-line programs.
The Pipe Transformer is currently in Beta. In particular, the name of the transformer entry point
(db_genomics in Python and DBGenomics in Java/Scala) will change.
The Pipe Transformer requires Python 3.
Consider a minimal case with a DataFrame containing a single column of strings. You can use the Pipe
Transformer to reverse each of the strings in the input DataFrame using the
rev Linux command:
import db_genomics as dbg

# Create a text-only DataFrame
df = spark.createDataFrame([['foo'], ['bar'], ['baz']], ['value'])

# Pipe each row through rev and display the reversed strings
display(dbg.transform('pipe', df, cmd='["rev"]', inputformatter='text', outputformatter='text'))
The options in this example demonstrate how to control the basic behavior of the transformer:
cmd is a JSON-encoded array that contains the command to invoke the program
inputformatter defines how each input row should be passed to the program
outputformatter defines how the program output should be converted into a new DataFrame
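Because cmd must be a JSON-encoded array, writing the string by hand invites quoting mistakes. As a minimal sketch that uses only Python's standard json module, you can build the value from an ordinary list. The commented-out call mirrors the example above and assumes the same db_genomics entry point; it is not executed here.

```python
import json

# The command and its arguments as an ordinary Python list.
command = ["grep", "-v", "#INFO"]

# json.dumps produces the JSON-encoded array that the cmd option expects.
cmd = json.dumps(command)
print(cmd)  # ["grep", "-v", "#INFO"]

# Sanity check: decoding the string gives back the original list.
assert json.loads(cmd) == command

# On a cluster, the string would be passed as shown earlier (not run here):
# dbg.transform('pipe', df, cmd=cmd, inputformatter='text', outputformatter='text')
```

Building the string this way keeps arguments that contain quotes or backslashes correctly escaped.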
The input DataFrame can come from any Spark data source — Delta Lake, Parquet, VCF, BGEN, and so on.
To integrate with tools for genomic data, you can configure the Pipe Transformer to write each
partition of the input DataFrame as VCF by choosing
vcf as the input and output formatter.
import db_genomics as dbg

df = spark.read.format("com.databricks.vcf")\
  .load("/databricks-datasets/genomics/1kg-vcfs")\
  .limit(1000)

dbg.transform('pipe', df, cmd='["grep", "-v", "#INFO"]', inputformatter='vcf', in_vcfheader='infer', outputformatter='vcf')
When you use the VCF input formatter, you must specify a method to determine the VCF header. The
simplest option is infer, which instructs the Pipe Transformer to derive a VCF header from the
input DataFrame.
You can also invoke the Pipe Transformer from Scala. You specify options as a Map[String, String]:
import com.databricks.hls.DBGenomics

DBGenomics.transform("pipe", df, Map(
  "cmd" -> "[\"grep\", \"-v\", \"#INFO\"]",
  "inputFormatter" -> "vcf",
  "outputFormatter" -> "vcf",
  "in_vcfHeader" -> "infer"))
Option keys and values are always strings. From Python, you provide options through the
argument or as keyword args. From Scala, you provide options as a Map[String, String].
Option names are case-insensitive; for example, inputFormatter, inputformatter, and
INPUTFORMATTER are all equivalent.
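Since option names that differ only in case refer to the same option, it can help to catch accidental duplicates before calling the transformer. The helper below is a hypothetical sketch, not part of db_genomics: normalize_options is a name introduced here, and plain lowercasing is an assumed simplification of how the transformer treats names.

```python
def normalize_options(options):
    """Lowercase option names, rejecting names that collide once case is ignored.

    Hypothetical helper for illustration only; it is not part of db_genomics.
    """
    normalized = {}
    for name, value in options.items():
        key = name.lower()
        if key in normalized:
            raise ValueError(
                f"option {name!r} is given more than once (names are case insensitive)"
            )
        # Option keys and values are always strings.
        normalized[key] = str(value)
    return normalized


opts = normalize_options({"inputFormatter": "vcf", "OUTPUTFORMATTER": "vcf"})
print(opts)  # {'inputformatter': 'vcf', 'outputformatter': 'vcf'}
```

Passing both inputFormatter and INPUTFORMATTER to this helper raises a ValueError instead of silently keeping one of the two values.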
| Option | Description |
|---|---|
| cmd | The command, specified as a JSON-encoded array of strings, to invoke the piped program. The program's stdin receives the formatted contents of the input DataFrame, and the output DataFrame is constructed from its stdout. The stderr stream appears in the executor logs. |
| inputFormatter | Converts the input DataFrame to a format that the piped program understands. Built-in input formatters are text, csv, and vcf. |
| outputFormatter | Converts the output of the piped program back into a DataFrame. Built-in output formatters are text, csv, and vcf. |
| env_* | Options beginning with env_ set environment variables for the piped program. |
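To make the stdin/stdout contract concrete, the following sketch imitates it locally with Python's subprocess module. It is a single-process illustration, not how the Pipe Transformer is implemented. The function pipe_text is a hypothetical name, and the one-row-per-line layout for the text formatter is an assumption. A small Python child process stands in for rev so that the sketch runs wherever Python is installed.

```python
import subprocess
import sys


def pipe_text(rows, cmd):
    """Send rows to cmd on stdin, one per line, and return its stdout lines.

    Hypothetical local stand-in for the text input and output formatters.
    """
    stdin_text = "".join(row + "\n" for row in rows)
    result = subprocess.run(
        cmd, input=stdin_text, capture_output=True, text=True, check=True
    )
    # Only stdout becomes output rows; stderr is captured separately.
    return result.stdout.splitlines()


# Portable stand-in for the rev command: reverse each line read from stdin.
child_script = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    print(line.rstrip('\\n')[::-1])\n"
)
reverse_lines = [sys.executable, "-c", child_script]

print(pipe_text(["foo", "bar", "baz"], reverse_lines))  # ['oof', 'rab', 'zab']
```

On a Linux machine you could pass ["rev"] as cmd instead of reverse_lines to match the first example more closely.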
Some of the input and output formatters take additional options.
VCF input formatter:
in_vcfHeader: How to determine a VCF header from the input DataFrame. Possible values include infer, which derives the header from the DataFrame itself.
The CSV input and output formatters accept most of the same options as the CSV data source.
You must prefix options to the input formatter with
in_, and options to the output formatter with
out_. For example,
in_quote sets the quote character when writing the input DataFrame to the piped program.
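The prefixing rule can be applied mechanically. The sketch below uses a hypothetical helper, with_prefix, to build one options dictionary from separate input and output CSV settings. The quote and delimiter names come from the Spark CSV data source; whether the formatters accept each of them is an assumption to check. The final call is commented out because it needs a cluster with db_genomics.

```python
def with_prefix(prefix, options):
    """Return a copy of options with prefix added to every key.

    Hypothetical helper for illustration; values are converted to strings
    because option keys and values are always strings.
    """
    return {prefix + key: str(value) for key, value in options.items()}


# Settings for the CSV written to the piped program (input formatter).
input_csv = {"quote": "'"}
# Settings for parsing the program's CSV output (output formatter; assumed option).
output_csv = {"delimiter": "\t"}

options = {
    "cmd": '["cat"]',
    "inputformatter": "csv",
    "outputformatter": "csv",
    **with_prefix("in_", input_csv),
    **with_prefix("out_", output_csv),
}
print(sorted(options))
# ['cmd', 'in_quote', 'inputformatter', 'out_delimiter', 'outputformatter']

# On a cluster, the options would be passed as keyword args (not run here):
# dbg.transform('pipe', df, **options)
```

Keeping the input and output settings in separate dictionaries makes it harder to apply an option to the wrong side of the pipe.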
The following options are not supported:
path options are ignored.
The parserLib option is ignored. univocity is always used as the CSV parsing library.
The Pipe Transformer can be used with any command-line tool. Try any of the following examples.