Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@ package org.apache.paimon.spark.write
import org.apache.paimon.CoreOptions
import org.apache.paimon.Snapshot
import org.apache.paimon.options.Options
import org.apache.paimon.spark.SparkTypeUtils
import org.apache.paimon.spark.commands.SchemaEvolutionHelper
import org.apache.paimon.spark.schema.SparkSystemColumns
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.types.RowType

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.PaimonUtils
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types.{DataType, StructType}

import scala.collection.JavaConverters._

Expand All @@ -40,6 +45,7 @@ class PaimonV2WriteBuilder(table: FileStoreTable, dataSchema: StructType, option
}

override def build: PaimonV2Write = {
validateDataSchema()
val finalTable = overwriteDynamic match {
case Some(o) =>
table.copy(Map(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key -> o.toString).asJava)
Expand All @@ -55,4 +61,63 @@ class PaimonV2WriteBuilder(table: FileStoreTable, dataSchema: StructType, option
}

override def partitionRowType(): RowType = table.schema().logicalPartitionType()

/**
* Fail fast when the query schema is binary-incompatible with the table schema.
*
* Paimon tables declare `ACCEPT_ANY_SCHEMA`, so Spark skips its own output resolution and the
* query is aligned to the table schema by `PaimonAnalysis` instead, which is only injected when
* `PaimonSparkSessionExtensions` is configured. If a write is planned without it, a
* type-mismatched query would reach the writer as-is and be interpreted with the table's row
* type, silently corrupting data or failing with errors like `NegativeArraySizeException` deep
* inside the format writer.
*/
private def validateDataSchema(): Unit = {
// Row-level operations write the table's own rows back; schema evolution intentionally
// diverges from the current table schema and is validated when merging.
if (copyOnWriteScan.nonEmpty || SchemaEvolutionHelper.mergeSchemaEnabled(options)) {
return
}

val expectedFields = SparkTypeUtils.fromPaimonRowType(table.rowType()).fields
val actualFields = SparkSystemColumns.filterSparkSystemColumns(dataSchema).fields

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not catch the PR's own type-mismatch case. On this branch, running

mvn -pl paimon-spark/paimon-spark-ut -am -Pfast-build -DfailIfNoTests=false -DwildcardSuites=org.apache.paimon.spark.sql.PaimonV2WriteSchemaValidationTest -Dtest=none test

still fails at PaimonOptionTest.scala:331: the insert reaches PaimonV2DataWriter and aborts the Spark job with a SparkException caused by NegativeArraySizeException, instead of failing here with the new PaimonSparkSessionExtensions validation message. That means dataSchema is not a reliable proxy for the physical query rows in this write path (or the validation is happening at the wrong layer), so the original fail-fast/corruption path remains unfixed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for checking! I believe that run picked up a stale paimon-spark-common_2.12-1.5-SNAPSHOT from the local repository instead of the one from this branch, so the PR's change was never on the classpath.

-Pfast-build explicitly activates a profile, which disables the activeByDefault activation of the spark3 profile (standard Maven behavior). Without spark3, the paimon-spark-common modules drop out of the reactor — the reactor summary of that exact command only contains:

[INFO] Paimon : Spark                                                     [pom]
[INFO] Paimon : Spark : UT : 2.12                                         [jar]

so -am never compiles this PR's change to PaimonV2WriteBuilder, and the test runs against whatever 1.5-SNAPSHOT is installed in ~/.m2. If that snapshot predates this PR, the test fails with exactly the signature you saw — I actually produced the same message on purpose while developing (ran the new test against a build with the fix stashed to verify it was red first):

Expected exception java.lang.RuntimeException to be thrown,
but org.apache.spark.SparkException was thrown (PaimonOptionTest.scala:331)

Adding the spark3 profile brings the common modules back into the reactor:

mvn -pl paimon-spark/paimon-spark-ut -am -Pfast-build,spark3 -DfailIfNoTests=false \
  -DwildcardSuites=org.apache.paimon.spark.sql.PaimonV2WriteSchemaValidationTest -Dtest=none clean test

and the suite passes (your exact command also passes once the branch's paimon-spark-common is installed to the local repo; the PR CI, which builds from scratch, is green on both 2.12 and 2.13 as well):

- Paimon V2 write: fail fast on incompatible query schema without Paimon extensions
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0

On the layering question: for this insert path LogicalWriteInfo.schema() is the analyzed query's output schema, which is exactly the row layout the writer receives, so validating it against the table row type in build() catches the mismatch on the driver before any task runs. Row-level operations (copyOnWriteScan) and write.merge-schema writes are exempted since their schemas intentionally diverge and are validated elsewhere.


def fail(reason: String): Unit = {
throw new RuntimeException(
s"Cannot write incompatible data to table '${table.name()}': $reason. " +
"The write was planned without Paimon's output resolution, which usually means " +
"'org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions' is not configured " +
"in 'spark.sql.extensions'. Please configure it, or align the query columns with the " +
"table schema explicitly.")
}

if (actualFields.length != expectedFields.length) {
fail(
s"the number of query columns (${actualFields.length}) does not match " +
s"the table schema's (${expectedFields.length})")
}

val incompatible = actualFields.zip(expectedFields).filterNot {
case (actual, expected) =>
val actualType = binaryCompatibleType(
CharVarcharUtils.getRawType(actual.metadata).getOrElse(actual.dataType))
PaimonUtils.equalsIgnoreCompatibleNullability(
actualType,
binaryCompatibleType(expected.dataType))
}
if (incompatible.nonEmpty) {
val details = incompatible
.map {
case (actual, expected) =>
s"${actual.name} ${actual.dataType.simpleString} -> " +
s"${expected.name} ${expected.dataType.simpleString}"
}
.mkString("[", ", ", "]")
fail(s"incompatible query column type(s): $details")
}
}

// CHAR/VARCHAR share the string binary layout, treat them as STRING for compatibility check.
private def binaryCompatibleType(dataType: DataType): DataType =
CharVarcharUtils.replaceCharVarcharWithString(dataType)
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,41 @@ class PaimonConfigCheckTest extends SparkFunSuite {
}
}
}

class PaimonV2WriteSchemaValidationTest extends SparkFunSuite with SparkVersionSupport {

test("Paimon V2 write: fail fast on incompatible query schema without Paimon extensions") {
assume(gteqSpark3_4)
val spark = SparkSession
.builder()
.master("local[2]")
.config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
.config("spark.sql.catalog.paimon.warehouse", Utils.createTempDir.getCanonicalPath)
.config("spark.paimon.requiredSparkConfsCheck.enabled", "false")
.config("spark.paimon.write.use-v2-write", "true")
.getOrCreate()
try {
spark.sql("USE paimon")
spark.sql("CREATE TABLE T (a STRING, b STRING, pt STRING) PARTITIONED BY (pt)")

// A query schema that matches the table schema still works without the extensions.
spark.sql("INSERT INTO T VALUES ('x', 'y', '20260601')")
assert(spark.sql("SELECT * FROM T").collect() === Array(Row("x", "y", "20260601")))

// Type mismatch: without PaimonAnalysis no cast is added, the write must fail fast
// instead of corrupting data or throwing NegativeArraySizeException in the writer.
val typeMismatch = intercept[RuntimeException] {
spark.sql("INSERT OVERWRITE TABLE T PARTITION (pt) SELECT 'x', 0.98d, '20260601'")
}
assert(typeMismatch.getMessage.contains("PaimonSparkSessionExtensions"))

// Column count mismatch must fail fast as well.
val countMismatch = intercept[RuntimeException] {
spark.sql("INSERT INTO T VALUES ('x', 'y')")
}
assert(countMismatch.getMessage.contains("PaimonSparkSessionExtensions"))
} finally {
spark.close()
}
}
}
Loading