
Merge Schema in Delta Lake (even for nested columns)

As of Delta 0.7.0, the schema evolution supported by Delta worked for many cases, but it had some nuances and didn't cover everything. One case that was a bummer for us at Iterable was that it didn't support schema evolution on nested columns (columns of StructType).

Before Delta 0.6.0 shipped schema evolution, we had an internal way of doing it, which we later extended to also handle nested schemas. The code pasted below can be used by:

  1. First calling mergeSchemas to update your target table's schema: mergeSchemas("db.table_name", updateDf, partitionColumnsOpt = Some(Seq("partitionCol1")))
  2. mergeSchemas will return an updatedDf whose schema can be used in the write operation.
  3. Then continuing with your write operation: updatedDf.write.mode("append").saveAsTable("db.table_name"). A full usage sketch follows this list.
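
Putting those steps together, here is a minimal usage sketch. The names db.table_name, updateDf and partitionCol1 are placeholders, and an active spark session (e.g. spark-shell or a Databricks notebook) is assumed:

// 1. Evolve the target table's schema and get back an update DataFrame
//    whose schema lines up with the (now widened) target table
val updatedDf = mergeSchemas(
  "db.table_name",
  updateDf,
  partitionColumnsOpt = Some(Seq("partitionCol1"))
)

// 2. Append as usual; both sides now agree on the schema
updatedDf.write.mode("append").saveAsTable("db.table_name")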

Do note, this is more of a temporary solution until Delta expands its mergeSchema support: the catch is that it relies on appending an empty DataFrame with Delta's mergeSchema option to evolve the target table, and on using from_json and to_json to add fields to existing struct types (which can be computationally expensive).
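
The core trick for nested columns is the from_json(to_json(...)) round trip: serializing a struct column to JSON and parsing it back with the target table's wider struct type fills any missing nested fields with nulls. A minimal, self-contained sketch of just that step (the column and field names here are invented for illustration):

import org.apache.spark.sql.functions.{col, from_json, to_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A one-row DataFrame whose `address` struct only has a `city` field
val df = spark.range(1).selectExpr("named_struct('city', 'Denver') as address")

// The target table's wider struct also has a `zip` field
val targetAddressType = StructType(Seq(
  StructField("city", StringType),
  StructField("zip", StringType)
))

// Round-tripping the struct through JSON re-parses it with the wider type;
// the missing `zip` field comes back as null
val widened = df.withColumn(
  "address",
  from_json(to_json(col("address")), targetAddressType)
)
widened.printSchema() // address is now struct<city: string, zip: string>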

// Assumes an active SparkSession available as `spark` (e.g. spark-shell or a notebook)
import org.apache.spark.sql.types.{ArrayType, DataType, NullType, StructType}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, lit, from_json, to_json}

def mergeSchemas(
    targetTableName: String,
    updateDf: DataFrame,
    partitionColumnsOpt: Option[Seq[String]] = None
  ): DataFrame = {
    val separatorForSchema = ";"
    var targetDf = spark.table(targetTableName)
    val partitionColumns: Seq[String] = partitionColumnsOpt match {
      case Some(partitionColumns) => partitionColumns
      case None => getPartitionColumns(targetTableName)
    }

    val updateDfSchema =
      getFlattenedSchemaToDataType(updateDf.schema, separator = separatorForSchema).toMap
    var targetDfSchema =
      getFlattenedSchemaToDataType(targetDf.schema, separator = separatorForSchema).toMap

    // Get a list of columns not in target
    val columnsMissingInTarget: Set[String] = updateDfSchema.keySet -- targetDfSchema.keySet

    // Get a list of columns that are NullType in target but non-null type in update
    val nullColumnsInTarget: Set[String] = targetDfSchema.filter {
      case (colName, dtype) =>
        dtype.isInstanceOf[NullType] &&
          updateDfSchema.contains(colName) &&
          !updateDfSchema(colName).isInstanceOf[NullType]
    }.keySet

    var parentColumnsToBeUpsertedInTarget: Set[String] =
      (columnsMissingInTarget ++ nullColumnsInTarget).map(_.split(separatorForSchema)(0))

    if (parentColumnsToBeUpsertedInTarget.nonEmpty) {
      // add partition columns to the list of cols to be upserted
      // it might be possible that partition columns exist in both parent and update
      // but when they don't we will have to add it explicitly
      // since df.write.mode("append").saveAsTable(tbl) cannot happen if tbl and df both don't
      // have partition columns
      parentColumnsToBeUpsertedInTarget = parentColumnsToBeUpsertedInTarget ++ partitionColumns
      // the select below would fail if parentColumnsToBeUpsertedInTarget contained only
      // NullType columns and partition columns, because Spark requires at least one non-null-typed column
      // select the top-level missing columns from update and get their schema

      // create an empty dataframe with the new subset of columns that need to be updated in target
      val schemaToAddInTarget: StructType =
        updateDf.select(parentColumnsToBeUpsertedInTarget.toSeq.map(name => col(name)): _*).schema

      spark
        .createDataFrame(spark.sparkContext.emptyRDD[Row], schemaToAddInTarget)
        .write
        .format("delta")
        .option("mergeSchema", "true")
        .mode("append")
        .saveAsTable(targetTableName)

      spark.sql(s"REFRESH TABLE $targetTableName")
      targetDf = spark.table(targetTableName)
      targetDfSchema =
        getFlattenedSchemaToDataType(targetDf.schema, separator = separatorForSchema).toMap
    }

    // get the cols not existing in updateDf
    val columnsMissingInUpdate: Set[String] = targetDfSchema.keySet -- updateDfSchema.keySet
    val parentColumnsMissingInUpdate = columnsMissingInUpdate.map(_.split(separatorForSchema)(0))
    val updateDfParentColumnsSet: Set[String] = updateDf.columns.toSet

    var updatedDf = updateDf
    parentColumnsMissingInUpdate foreach { colName =>
      val dataTypeInTargetForMissingColumn: DataType =
        targetDf.select(colName).schema.fields(0).dataType

      if (updateDfParentColumnsSet.contains(colName)) {
        // the column already exists, which means it is of a complex datatype, i.e. StructType/ArrayType;
        // serialize the update column to JSON, then parse it back with the target's schema using from_json
        updatedDf = updatedDf.withColumn(
          colName,
          from_json(to_json(col(colName)), dataTypeInTargetForMissingColumn)
        )
      } else {
        // the column doesn't exist in update at all, so add it as null cast to the target's type
        updatedDf = updatedDf.withColumn(colName, lit(null).cast(dataTypeInTargetForMissingColumn))
      }
    }
    updatedDf
  }


// Look up the Delta table's partition columns via DESCRIBE DETAIL
def getPartitionColumns(tableDestination: String): Seq[String] = {
    spark
      .sql(s"DESCRIBE DETAIL $tableDestination")
      .select("partitionColumns")
      .collect
      .flatMap(_.getSeq[String](0))
  }

// Flatten a (possibly nested) schema into separator-joined column paths (e.g. "parent;child")
// mapped to their leaf data types; array columns are unwrapped to their element type
def getFlattenedSchemaToDataType(
    schema: StructType,
    prefix: Option[String] = None,
    separator: String = ";"
  ): Array[(String, DataType)] = {
    schema.fields.flatMap(field => {
      val colName: String = if (prefix.isEmpty) field.name else (prefix.get + separator + field.name)
      val colType: DataType = field.dataType match {
        case ar: ArrayType => ar.elementType
        case _ => field.dataType
      }
      colType match {
        case st: StructType =>
          getFlattenedSchemaToDataType(st, Some(colName), separator = separator)
        case _ => Array(colName -> colType)
      }
    })
  }
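
For reference, here is a quick sketch of what the flattening produces, using an invented nested schema:

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical schema: a top-level id plus a nested address struct
val exampleSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("address", StructType(Seq(
    StructField("city", StringType),
    StructField("zip", StringType)
  )))
))

getFlattenedSchemaToDataType(exampleSchema).foreach(println)
// (id,LongType)
// (address;city,StringType)
// (address;zip,StringType)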
