How to Update Nested Columns

Spark doesn’t support adding new columns or dropping existing columns in nested structures. In particular, the withColumn and drop methods of the Dataset class operate only on top-level columns: you can’t pass them the name of a nested field such as items.books.fees (a quick demonstration follows the DataFrame setup below). For example, suppose you have a dataset with the following schema:

import org.apache.spark.sql.types._

val schema = (new StructType)
  .add("metadata", (new StructType)
    .add("eventid", "string", true)
    .add("hostname", "string", true)
    .add("timestamp", "string", true),
    true)
  .add("items", (new StructType)
    .add("books", (new StructType).add("fees", "double", true), true)
    .add("paper", (new StructType).add("pages", "int", true), true),
    true)

println(schema.treeString)

The schema looks like:

root
 |-- metadata: struct (nullable = true)
 |    |-- eventid: string (nullable = true)
 |    |-- hostname: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- items: struct (nullable = true)
 |    |-- books: struct (nullable = true)
 |    |    |-- fees: double (nullable = true)
 |    |-- paper: struct (nullable = true)
 |    |    |-- pages: integer (nullable = true)

Suppose you have the DataFrame:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val rdd: RDD[Row] = sc.parallelize(Seq(Row(
  Row("eventid1", "hostname1", "timestamp1"),
  Row(Row(100.0), Row(10)))))
val df = spark.createDataFrame(rdd, schema)
display(df)
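
To see the limitation in action, here is a quick sketch (the val name attempt is illustrative): passing a dotted name to withColumn adds a new top-level column whose name literally contains the dots, and drop with the same name is a no-op, because neither method resolves nested fields.

import org.apache.spark.sql.functions.col

// withColumn treats the name as a literal string, so this does NOT update
// the nested field; it appends a new top-level column named "items.books.fees".
val attempt = df.withColumn("items.books.fees", col("items.books.fees") * 1.01)
attempt.printSchema()

// drop matches only top-level columns, so this returns the DataFrame unchanged.
df.drop("items.books.fees").printSchema()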

Suppose you want to increase the fees field, which is nested under books, by 1%. Since you can’t update the nested field in place, you can reconstruct the row from the existing columns and the updated value as follows:

import spark.implicits._

val updated = df.selectExpr("""
    named_struct(
        'metadata', metadata,
        'items', named_struct(
          'books', named_struct('fees', items.books.fees * 1.01),
          'paper', items.paper
        )
    ) as result
""").select($"result.metadata", $"result.items")
updated.show(false)

This produces the result:

+---------------------------------+---------------+
|metadata                         |items          |
+---------------------------------+---------------+
|[eventid1, hostname1, timestamp1]|[[101.0], [10]]|
+---------------------------------+---------------+
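
The same reconstruction can also be written with the Column-based API instead of a SQL expression. The following is an equivalent sketch (updatedViaApi is an illustrative name; the field names come from the schema above), using struct from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.struct
import spark.implicits._

// Rebuild the items struct, replacing only books.fees and
// carrying paper through unchanged.
val updatedViaApi = df.select(
  $"metadata",
  struct(
    struct(($"items.books.fees" * 1.01).alias("fees")).alias("books"),
    $"items.paper".alias("paper")
  ).alias("items")
)
updatedViaApi.show(false)

Note that newer Spark releases (3.1 and later) also provide Column.withField for updating a nested field directly, but the reconstruction approach shown here works on any version.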