[spark] Fail fast on incompatible query schema in v2 write #8456
kerwin-zk wants to merge 1 commit into
val expectedFields = SparkTypeUtils.fromPaimonRowType(table.rowType()).fields
val actualFields = SparkSystemColumns.filterSparkSystemColumns(dataSchema).fields
This does not catch the PR's own type-mismatch case. On this branch, running
mvn -pl paimon-spark/paimon-spark-ut -am -Pfast-build -DfailIfNoTests=false -DwildcardSuites=org.apache.paimon.spark.sql.PaimonV2WriteSchemaValidationTest -Dtest=none test
still fails at PaimonOptionTest.scala:331: the insert reaches PaimonV2DataWriter and aborts the Spark job with a SparkException caused by NegativeArraySizeException, instead of failing here with the new PaimonSparkSessionExtensions validation message. That means dataSchema is not a reliable proxy for the physical query rows in this write path (or the validation is happening at the wrong layer), so the original fail-fast/corruption path remains unfixed.
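As a side note on the failure signature above: the PR description attributes it to the writer reading a DOUBLE slot as a string's (offset, size). A minimal Scala sketch of that reinterpretation follows. It assumes the 8-byte slot packs the offset in the upper 32 bits and the size in the lower 32 bits, which is my understanding of Spark's UnsafeRow variable-length encoding and is not confirmed in this thread. The object and method names are hypothetical.

```scala
// Sketch only: shows what a string reader would "see" in a slot that holds a DOUBLE.
// Assumption: offset = upper 32 bits of the slot, size = lower 32 bits.
object DoubleAsOffsetAndSize {

  /** Splits the raw bits of a double into the (offset, size) pair a string reader would derive. */
  def misread(value: Double): (Int, Int) = {
    val bits = java.lang.Double.doubleToRawLongBits(value)
    val offset = (bits >> 32).toInt // upper 32 bits
    val size = bits.toInt           // lower 32 bits, may come out negative
    (offset, size)
  }

  def main(args: Array[String]): Unit = {
    Seq(0.1, 1.5, 42.0).foreach { v =>
      val (offset, size) = misread(v)
      println(s"$v -> offset=$offset size=$size")
    }
  }
}
```

Under that assumption, 0.1 yields a negative size, which would be consistent with a NegativeArraySizeException if a reader allocates a buffer of that length, while 1.5 yields size 0 with a very large offset, which would not necessarily throw at all.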
Thanks for checking! I believe that run picked up a stale paimon-spark-common_2.12-1.5-SNAPSHOT from the local repository instead of the one from this branch, so the PR's change was never on the classpath.
-Pfast-build explicitly activates a profile, which disables the activeByDefault activation of the spark3 profile (standard Maven behavior). Without spark3, the paimon-spark-common modules drop out of the reactor — the reactor summary of that exact command only contains:
[INFO] Paimon : Spark [pom]
[INFO] Paimon : Spark : UT : 2.12 [jar]
so -am never compiles this PR's change to PaimonV2WriteBuilder, and the test runs against whatever 1.5-SNAPSHOT is installed in ~/.m2. If that snapshot predates this PR, the test fails with exactly the signature you saw — I actually produced the same message on purpose while developing (ran the new test against a build with the fix stashed to verify it was red first):
Expected exception java.lang.RuntimeException to be thrown,
but org.apache.spark.SparkException was thrown (PaimonOptionTest.scala:331)
Adding the spark3 profile brings the common modules back into the reactor:
mvn -pl paimon-spark/paimon-spark-ut -am -Pfast-build,spark3 -DfailIfNoTests=false \
-DwildcardSuites=org.apache.paimon.spark.sql.PaimonV2WriteSchemaValidationTest -Dtest=none clean test
and the suite passes (your exact command also passes once the branch's paimon-spark-common is installed to the local repo; the PR CI, which builds from scratch, is green on both 2.12 and 2.13 as well):
- Paimon V2 write: fail fast on incompatible query schema without Paimon extensions
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
On the layering question: for this insert path LogicalWriteInfo.schema() is the analyzed query's output schema, which is exactly the row layout the writer receives, so validating it against the table row type in build() catches the mismatch on the driver before any task runs. Row-level operations (copyOnWriteScan) and write.merge-schema writes are exempted since their schemas intentionally diverge and are validated elsewhere.
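The positional check described above can be sketched in a dependency-free way. This is a simplified illustration, not the PR's code: the type model, `Field`, `normalize`, and `validate` are hypothetical stand-ins for Spark's StructType/DataType and the actual comparison, and nullability is ignored entirely here.

```scala
// Sketch only: positional type compatibility between a table schema and a query schema.
object SchemaCheckSketch {
  // Hypothetical, simplified type model; the real check works on Spark data types.
  sealed trait FieldType
  case object StringType extends FieldType
  case object DoubleType extends FieldType
  case object IntType extends FieldType
  final case class CharType(length: Int) extends FieldType
  final case class VarcharType(length: Int) extends FieldType

  final case class Field(name: String, dataType: FieldType)

  // CHAR/VARCHAR share the string binary layout, so they are treated as STRING.
  def normalize(t: FieldType): FieldType = t match {
    case CharType(_) | VarcharType(_) => StringType
    case other                        => other
  }

  /** Returns a message for the first mismatch, or None when the schemas are compatible. */
  def validate(expected: Seq[Field], actual: Seq[Field]): Option[String] = {
    if (expected.length != actual.length) {
      Some(s"Column count mismatch: table has ${expected.length}, query has ${actual.length}")
    } else {
      // Compare by position only; column names in the query are not considered.
      expected.zip(actual).zipWithIndex.collectFirst {
        case ((e, a), i) if normalize(e.dataType) != normalize(a.dataType) =>
          s"Column $i (${e.name}): table type ${e.dataType}, query type ${a.dataType}"
      }
    }
  }
}
```

For example, `validate(Seq(Field("s", StringType)), Seq(Field("sum", DoubleType)))` returns a mismatch message, while a `VarcharType(10)` table column against a `StringType` query column returns `None`. Running such a check on the driver is what lets the failure surface before any task starts.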
Purpose
Since #6281, Paimon tables declare ACCEPT_ANY_SCHEMA for Spark v2 write, so Spark skips its own output resolution (DataSourceV2Relation#skipSchemaResolution) and the query is aligned to the table schema by PaimonAnalysis instead, which is only injected when PaimonSparkSessionExtensions is configured.
If a write is planned without the extensions (e.g. spark.paimon.requiredSparkConfsCheck.enabled=false is set, or the plan is constructed by a third-party engine), a type-mismatched INSERT reaches the writer as-is: the writer interprets the incoming UnsafeRow with the table's row type, so a slot holding a DOUBLE gets read as a string's (offset, size). Depending on the bits this throws NegativeArraySizeException deep inside the format writer, crashes the executor with an out-of-bounds Unsafe read, or silently writes garbage strings that can be committed (we hit all three in a production case where a query wrote SUM(...) doubles into STRING columns).
This PR adds a defense-in-depth fail-fast in PaimonV2WriteBuilder#build: the query schema must be positionally type-compatible with the table schema (reusing DataType.equalsIgnoreCompatibleNullability; CHAR/VARCHAR are normalized to STRING since they share the string binary layout). Mismatches now fail at planning time with a clear message pointing to PaimonSparkSessionExtensions.
The check is skipped for row-level operations (copyOnWriteScan is set) and for write.merge-schema writes, which intentionally diverge from the current table schema and are validated elsewhere.
Tests
PaimonV2WriteSchemaValidationTest: without the extensions, a schema-matching INSERT still succeeds, while type-mismatched and column-count-mismatched INSERTs fail fast with the new error (previously the type mismatch failed at execution time with NegativeArraySizeException).
PaimonOptionTest, PaimonConfigCheckTest, V2WriteMergeSchemaTest, V2WriteRequireDistributionTest, PaimonDynamicPartitionOverwriteCommandTest, SparkWriteITCase pass on Spark 3.5 (Scala 2.12); PaimonV2WriteSchemaValidationTest + V2WriteMergeSchemaTest pass on Spark 4.x (Scala 2.13).
API and Format
No.
Documentation
No.