Skip to content

[spark] Refactor PaimonSparkWriter write flow#8417

Open
huangxiaopingRD wants to merge 5 commits into
apache:masterfrom
huangxiaopingRD:refactor-paimon-spark-writer
Open

[spark] Refactor PaimonSparkWriter write flow#8417
huangxiaopingRD wants to merge 5 commits into
apache:masterfrom
huangxiaopingRD:refactor-paimon-spark-writer

Conversation

@huangxiaopingRD

Copy link
Copy Markdown
Contributor

Purpose

Refactor PaimonSparkWriter to make the Spark write flow easier to follow without changing write semantics.
This change mainly:

  • splits bucket-mode handling in write into dedicated private methods
  • extracts write context initialization from the main write flow
  • extracts the shared per-partition write pattern for bucketed and non-bucketed writes
  • keeps the existing write topology and commit merge behavior unchanged

Tests

Not adding new tests.
Reason:

  • this change is a readability refactor and does not intend to change behavior
  • the affected paths are already covered indirectly by existing Spark write, dynamic bucket, update, delete, and merge test suites

@huangxiaopingRD huangxiaopingRD marked this pull request as draft July 1, 2026 15:12
@huangxiaopingRD huangxiaopingRD marked this pull request as ready for review July 1, 2026 16:19
@JingsongLi

Copy link
Copy Markdown
Contributor

cc @Zouxxyy

batchId: Option[Long] = None)
extends WriteHelper {

private case class WriteContext(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task closures below capture this whole WriteContext via ctx.newWrite() / ctx.bucketColIdx. Because the context also stores driver-side objects (SparkSession, the original DataFrame, and preparedData), those objects become part of every executor closure even though they are only needed while building the plan on the driver. This can make Spark closure serialization fail or drag SparkContext/Dataset state into tasks. Please keep WriteContext limited to serializable task inputs, or extract only the needed scalars/functions before mapPartitions and keep SparkSession/DataFrame out of the captured object.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants