[SPARK-56125][SQL] Simplify schema calculation for Merge Into Schema Evolution#55124
Open
szehon-ho wants to merge 2 commits intoapache:masterfrom
Open
[SPARK-56125][SQL] Simplify schema calculation for Merge Into Schema Evolution#55124szehon-ho wants to merge 2 commits intoapache:masterfrom
szehon-ho wants to merge 2 commits intoapache:masterfrom
Conversation
389ffac to
5637683
Compare
aokolnychyi
reviewed
Apr 1, 2026
| // * INSERT (a, nested.b) VALUES (source.a, source.nested.b) | ||
| // New columns/nested fields in this schema that are not existing in target schema | ||
| // will be added for schema evolution. | ||
| def sourceSchemaForSchemaEvolution(merge: MergeIntoTable): StructType = { |
Contributor
There was a problem hiding this comment.
This was the most complicated method that did many recursive calls, so I agree we should avoid it.
The idea that I had was compatible with changes that Johan did and could look like this:
private def computeSchemaChanges(merge: MergeIntoTable): Seq[TableChange] = {
val actions = merge.matchedActions ++ merge.notMatchedActions
val assignments = actions.flatMap {
case a: UpdateAction => a.assignments
case a: InsertAction => a.assignments
case _ => Seq.empty
}
val changes = new mutable.HashSet[TableChange]()
assignments.foreach {
case a if isFieldAdditionCandidate(a, merge) =>
val fieldPath = extractFieldPath(a.key)
changes += TableChange.addColumn(fieldPath.toArray, a.value.dataType)
case a if a.resolved && a.key.dataType != a.value.dataType =>
changes ++= ResolveSchemaEvolution.computeSchemaChanges(
a.key.dataType,
a.value.dataType,
merge.targetTable.schema,
merge.sourceTable.schema,
fieldPath = extractFieldPath(a.key),
isByName = true)
case _ =>
// OK
}
changes.toSeq
}
private def extractFieldPath(expr: Expression): Seq[String] = {
expr match {
case UnresolvedAttribute(nameParts) => nameParts
case a: AttributeReference => Seq(a.name)
case Alias(child, _) => extractFieldPath(child)
case GetStructField(child, ordinal, nameOpt) =>
extractFieldPath(child) :+ nameOpt.getOrElse(s"col$ordinal")
case _ => Seq.empty
}
}
private def areSchemaEvolutionReady(
assignments: Seq[Assignment],
merge: MergeIntoTable): Boolean = {
assignments.forall(assign => assign.resolved || isFieldAdditionCandidate(assign, merge))
}
// TODO: clean up and add doc
private def isFieldAdditionCandidate(
assignment: Assignment,
merge: MergeIntoTable): Boolean = {
val key = assignment.key
val keyPath = extractFieldPath(key)
val value = assignment.value
val valuePath = extractFieldPath(value)
!key.resolved &&
value.resolved &&
keyPath == valuePath &&
assignment.value.references.subsetOf(merge.sourceTable.outputSet) &&
merge.targetTable.resolve(keyPath, SQLConf.get.resolver).isEmpty
}
Contributor
There was a problem hiding this comment.
With that said, yours may be better. Let me explore it in more detail.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Simplifies the schema evolution logic for MERGE INTO in two ways:
Replace pre-filtering source schema with post-filtering schema changes: Instead of building a pruned source schema (
sourceSchemaForSchemaEvolution) and then diffing it against the target, we now compute all schema diffs against the full source schema and filter the resultingTableChangeobjects to only those whose field paths fall under identity assignment paths. This eliminates the recursivefilterSchemafunction, theisEqualhelper (which was a duplicate ofisSameColumnAssignment), and theisPrefixhelper.Replace
originalTarget/originalSourcethreading withonIncompatiblecallback inResolveSchemaEvolution.computeSchemaChanges: The original target and source schemas were passed through every recursive call (7+ call sites across 3 methods) but only used once in the error case. A by-nameerrorparameter replaces both, reducing parameter count and improving readability.Also removes dead code: the
containsStarActioncheck insourceSchemaForSchemaEvolutionwas unreachable becauseactionsSchemaEvolutionReadyreturnsfalsefor star actions (viacase _ => false), preventingpendingSchemaChangesfrom callingsourceSchemaForSchemaEvolutionuntilResolveReferenceshas expanded all star actions.Why are the changes needed?
Code simplification and dead code removal. The existing logic was harder to follow due to the recursive schema pruning approach and redundant parameter threading.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests (the logic is semantically equivalent).