
[SPARK-56255][PYTHON][CONNECT] Make spark.read.csv accept DataFrame input #55274

Closed
Yicong-Huang wants to merge 10 commits into apache:master from Yicong-Huang:SPARK-56255

Conversation

Contributor

@Yicong-Huang commented Apr 9, 2026

What changes were proposed in this pull request?

This PR adds support for passing a DataFrame containing CSV strings directly to spark.read.csv(), following the same pattern established by #55097 (SPARK-56253) for spark.read.json().

Why are the changes needed?

Adding DataFrame support to csv() makes the API consistent with json() and enables Connect-compatible CSV parsing without sc.parallelize().

Does this PR introduce any user-facing change?

Yes. spark.read.csv() now accepts a DataFrame with a single string column as input, in addition to the existing str, list, and RDD inputs.

csv_df = spark.createDataFrame([("Alice,25",), ("Bob,30",)], schema="value STRING")
spark.read.csv(csv_df, schema="name STRING, age INT").show()
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# +-----+---+

How was this patch tested?

Added 10 new test cases.

Was this patch authored or co-authored using generative AI tooling?

No.


from pyspark.sql.connect.dataframe import DataFrame

if isinstance(path, DataFrame):
Member
I think it's safer to add this DataFrame check first, with a return, to avoid any behaviour changes; e.g., we might support something other than list.

Contributor Author

Makes sense. Moved the check up.
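The suggestion above amounts to dispatching on DataFrame before the existing type checks, so the established str/list handling is never reached for DataFrame input and cannot change behaviour. A minimal sketch of the pattern, using a stand-in class instead of the real pyspark.sql.connect.dataframe.DataFrame (the return values are placeholders, not the actual Spark code):

```python
class DataFrame:
    """Stand-in for pyspark.sql.connect.dataframe.DataFrame."""
    pass

def csv(path, schema=None):
    # Check for DataFrame first and return early, so the existing
    # str/list branches below are untouched for DataFrame input.
    if isinstance(path, DataFrame):
        return "parsed-from-dataframe"  # would invoke the Connect CSV parse path
    if isinstance(path, str):
        path = [path]
    if isinstance(path, list):
        return "parsed-from-paths"  # existing file-path behaviour
    raise TypeError("path can be only string, list or DataFrame")
```

Placing the new branch first, with an early return, is what keeps any future extension (e.g. accepting something other than list) from colliding with the DataFrame case.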

def csvFromDataFrame(
    reader: DataFrameReader,
    df: DataFrame): DataFrame = {
  val classicReader = reader.asInstanceOf[ClassicDataFrameReader]
  val fields = df.schema.fields
  if (fields.isEmpty) {
Member

Shall we apply the same fix as 638ca41?

Contributor Author

Done. Applied the same pattern to reject multi-column input.
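The validation agreed on here, accepting only a DataFrame with exactly one string column, can be sketched in plain Python; the schema representation (a list of name/type pairs) and the error messages are illustrative, not the actual Spark implementation:

```python
def validate_single_string_column(fields):
    # fields: list of (name, dataType) pairs standing in for df.schema.fields
    if len(fields) == 0:
        raise ValueError(
            "Input DataFrame has no columns; expected exactly one string column")
    if len(fields) > 1:
        raise ValueError(
            f"Input DataFrame has {len(fields)} columns; "
            "expected exactly one string column")
    name, dtype = fields[0]
    if dtype != "string":
        raise ValueError(f"Column '{name}' has type {dtype}; expected string")
    return name
```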

def csvFromDataFrame(
    reader: DataFrameReader,
    df: DataFrame): DataFrame = {
  val classicReader = reader.asInstanceOf[ClassicDataFrameReader]
Contributor

This is the same code as above. Create a util for this?

Contributor Author

Makes sense. Extracted a helper method.
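The extraction discussed here pulls the duplicated prefix (validating the input and dispatching to the format-specific parser) into one helper that both the JSON and CSV paths call. A hypothetical sketch of that shape, using a plain dict as a stand-in DataFrame and names that are not the actual Spark helpers:

```python
import json

def _datasource_from_dataframe(df, parse_fn):
    # Shared plumbing for json()/csv() DataFrame input: validate the
    # single-column shape once, then hand each value to the
    # format-specific parser.
    fields = df["schema"]
    if len(fields) != 1:
        raise ValueError("expected exactly one column")
    return [parse_fn(row) for row in df["rows"]]

def csv_from_dataframe(df):
    # Illustrative only: real CSV parsing handles quoting, options, etc.
    return _datasource_from_dataframe(df, lambda s: s.split(","))

def json_from_dataframe(df):
    return _datasource_from_dataframe(df, json.loads)
```

Keeping the validation in one place means a future fix (like the multi-column rejection above) lands in both readers at once.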

@HyukjinKwon
Member

Merged to master.
