[SPARK-56253][PYTHON][CONNECT] Make spark.read.json accept DataFrame input #55097

Open: Yicong-Huang wants to merge 3 commits into apache:master from Yicong-Huang:SPARK-56253

Conversation

@Yicong-Huang (Contributor):

What changes were proposed in this pull request?

Allow spark.read.json() to accept a DataFrame with a single string column as input, in addition to file paths and RDDs.

Why are the changes needed?

Parsing in-memory JSON text into a structured DataFrame currently requires sc.parallelize(), which is unavailable on Spark Connect. Accepting a DataFrame as input provides a Connect-compatible alternative. This is the inverse of DataFrame.toJSON().

Part of SPARK-55227.

Does this PR introduce any user-facing change?

Yes. spark.read.json() now accepts a single-string-column DataFrame as input.
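
As an illustration of the proposed API (not runnable against released PySpark; `json_df` and the literal rows here are made-up example data, not from the PR):

```python
# Assumes an active SparkSession `spark`.
# A DataFrame with a single string column, each row holding one JSON document.
json_df = spark.createDataFrame(
    [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)],
    schema="value STRING",
)

# Previously this required sc.parallelize(...), which Spark Connect lacks;
# with this PR the DataFrame can be passed to spark.read.json directly.
parsed = spark.read.json(json_df, schema="name STRING, age INT")
```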

How was this patch tested?

New tests in test_datasources.py (classic) and test_connect_readwriter.py (Connect).

Was this patch authored or co-authored using generative AI tooling?

No

if isinstance(path, DataFrame):
    # py4j bridge: view the JVM DataFrame as a Dataset[String] so it can be
    # passed to the JVM DataFrameReader.json(Dataset[String]) overload.
    assert self._spark._jvm is not None
    string_encoder = self._spark._jvm.Encoders.STRING()
    # "as" is a Python keyword, so the JVM method is called via getattr.
    jdataset = getattr(path._jdf, "as")(string_encoder)
Contributor:

Can we add a private overload on the JVM side, def json(jsonDataset: DataFrame): DataFrame, and call it directly here?

Contributor:

What would be the benefit?

Contributor:

I think it can simplify the Python-side call site a bit.
Another trivial benefit is reducing the number of py4j calls.

result = self.spark.read.json(json_df, schema="name STRING, age INT")
expected = [Row(name="Alice", age=25), Row(name="Bob", age=30)]
self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected)

@zhengruifeng (Contributor), Apr 1, 2026:

The JVM-side implementations accept RDD[String] and Dataset[String], but now we are passing a DataFrame on the Python side.

So I think we need to add negative test cases like:
1. a single column of non-string type;
2. multiple columns;
3. zero columns.


from pyspark.sql.connect.dataframe import DataFrame

if isinstance(path, DataFrame):
Contributor:

Do we need to check the schema on the Python side, so that it only contains a single string-type column? Or do we depend on the check on the JVM side (or the Connect server)?
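
If the check were done on the Python side, it could be a small helper over the schema fields. A minimal sketch, written against plain (name, type) pairs so the logic is easy to see in isolation; the helper name and error messages are assumptions, not the PR's actual code:

```python
def check_single_string_column(fields):
    """Validate that a schema, given as (name, dataType) pairs, has
    exactly one column and that that column is of string type.

    Illustrative only: mirrors the Python-side check discussed above,
    covering the three negative cases from the review (zero columns,
    multiple columns, single non-string column).
    """
    if len(fields) != 1:
        raise ValueError(
            f"Expected a DataFrame with exactly one string column, "
            f"got {len(fields)} columns"
        )
    name, data_type = fields[0]
    if data_type != "string":
        raise ValueError(
            f"Expected column '{name}' to be of string type, got {data_type}"
        )


# The three negative cases, plus the accepted shape:
for bad in ([], [("a", "string"), ("b", "string")], [("value", "bigint")]):
    try:
        check_single_string_column(bad)
        raise AssertionError("should have raised ValueError")
    except ValueError:
        pass

check_single_string_column([("value", "string")])  # single string column: OK
```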

