From 323bfca1dae1cf8c54a0ef9d2f88586f04b01fb5 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 30 Mar 2026 16:48:04 +0000 Subject: [PATCH 01/16] feat: support DataFrame input for spark.read.json --- python/pyspark/sql/connect/plan.py | 36 ++++++++++++++++++- python/pyspark/sql/connect/readwriter.py | 25 ++++++++++++- python/pyspark/sql/readwriter.py | 33 ++++++++++++++--- .../tests/connect/test_connect_readwriter.py | 9 +++++ python/pyspark/sql/tests/test_datasources.py | 18 ++++++++++ 5 files changed, 115 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 3dac4fc47ee70..4b5be6087f7e3 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -43,7 +43,7 @@ from pyspark.serializers import CloudPickleSerializer from pyspark.storagelevel import StorageLevel -from pyspark.sql.types import DataType +from pyspark.sql.types import DataType, StructType import pyspark.sql.connect.proto as proto from pyspark.sql.column import Column @@ -383,6 +383,40 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation: return plan +class Parse(LogicalPlan): + """Parse a DataFrame with a single string column into a structured DataFrame.""" + + def __init__( + self, + child: "LogicalPlan", + format: int, + schema: Optional[str] = None, + options: Optional[Mapping[str, str]] = None, + ) -> None: + super().__init__(child) + self._format = format + self._schema = schema + self._options = options + + def plan(self, session: "SparkConnectClient") -> proto.Relation: + assert self._child is not None + plan = self._create_proto_relation() + plan.parse.input.CopyFrom(self._child.plan(session)) + plan.parse.format = self._format + if self._schema is not None and len(self._schema) > 0: + plan.parse.schema.CopyFrom( + pyspark_types_to_proto_types( + StructType.fromDDL(self._schema) + if not self._schema.startswith("{") + else StructType.fromJson(self._schema) + ) + ) + if self._options is not None: + for k, v in self._options.items(): + plan.parse.options[k] = v + return plan + + class Read(LogicalPlan): def __init__( self, diff --git a/python/pyspark/sql/connect/readwriter.py b/python/pyspark/sql/connect/readwriter.py index c951a9caf6a56..880c2d97f8ca9 100644 --- a/python/pyspark/sql/connect/readwriter.py +++ b/python/pyspark/sql/connect/readwriter.py @@ -25,7 +25,9 @@ LogicalPlan, WriteOperation, WriteOperationV2, + Parse, ) +import pyspark.sql.connect.proto as proto from pyspark.sql.types import StructType from pyspark.sql.utils import to_str from pyspark.sql.readwriter import ( @@ -220,7 +222,28 @@ def json( ) if isinstance(path, str): path = [path] - return self.load(path=path, format="json", schema=schema) + if isinstance(path, list): + return self.load(path=path, format="json", schema=schema) + + from pyspark.sql.connect.dataframe import DataFrame + + if isinstance(path, DataFrame): + return self._df( + Parse( + child=path._plan, + format=proto.Parse.ParseFormat.PARSE_FORMAT_JSON, + schema=self._schema, + options=self._options, + ) + ) + raise PySparkTypeError( + errorClass="NOT_EXPECTED_TYPE", + messageParameters={ + "arg_name": "path", + "expected_type": "str, list, or DataFrame", + "arg_type": type(path).__name__, + }, + ) json.__doc__ = PySparkDataFrameReader.json.__doc__ diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index bed87788d2c11..efb8f5f9c77f0 100644 --- a/python/pyspark/sql/readwriter.py +++ 
b/python/pyspark/sql/readwriter.py @@ -320,7 +320,7 @@ def load( def json( self, - path: Union[str, List[str], "RDD[str]"], + path: Union[str, List[str], "RDD[str]", "DataFrame"], schema: Optional[Union[StructType, str]] = None, primitivesAsString: Optional[Union[bool, str]] = None, prefersDecimal: Optional[Union[bool, str]] = None, @@ -361,11 +361,15 @@ def json( .. versionchanged:: 3.4.0 Supports Spark Connect. + .. versionchanged:: 4.1.0 + Supports DataFrame input. + Parameters ---------- - path : str, list or :class:`RDD` + path : str, list, :class:`RDD`, or :class:`DataFrame` string represents path to the JSON dataset, or a list of paths, - or RDD of Strings storing JSON objects. + or RDD of Strings storing JSON objects, + or a DataFrame with a single string column containing JSON strings. schema : :class:`pyspark.sql.types.StructType` or str, optional an optional :class:`pyspark.sql.types.StructType` for the input schema or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``). @@ -434,6 +438,20 @@ def json( +----+---+ | Bob| 30| +----+---+ + + Example 4: Parse JSON from a DataFrame with a single string column. + + >>> json_df = spark.createDataFrame( + ... [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)], + ... schema="value STRING", + ... ) + >>> spark.read.json(json_df).sort("name").show() + +---+-----+ + |age| name| + +---+-----+ + | 25|Alice| + | 30| Bob| + +---+-----+ """ self._set_opts( schema=schema, @@ -486,12 +504,19 @@ def func(iterator: Iterable) -> Iterable: assert self._spark._jvm is not None jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString()) return self._df(self._jreader.json(jrdd)) + + from pyspark.sql.dataframe import DataFrame + + if isinstance(path, DataFrame): + string_encoder = self._spark._jvm.Encoders.STRING() + jdataset = getattr(path._jdf, "as")(string_encoder) + return self._df(self._jreader.json(jdataset)) else: raise PySparkTypeError( errorClass="NOT_EXPECTED_TYPE", messageParameters={ "arg_name": "path", - "expected_type": "str or list[RDD]", + "expected_type": "str, list, RDD, or DataFrame", "arg_type": type(path).__name__, }, ) diff --git a/python/pyspark/sql/tests/connect/test_connect_readwriter.py b/python/pyspark/sql/tests/connect/test_connect_readwriter.py index fc27771fff74d..4bda45a442fa0 100644 --- a/python/pyspark/sql/tests/connect/test_connect_readwriter.py +++ b/python/pyspark/sql/tests/connect/test_connect_readwriter.py @@ -177,6 +177,15 @@ def test_csv(self): # Read the text file as a DataFrame. 
self.assert_eq(self.connect.read.csv(d).toPandas(), self.spark.read.csv(d).toPandas()) + def test_json_with_dataframe_input(self): + json_df = self.connect.createDataFrame( + [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)], + schema="value STRING", + ) + result = self.connect.read.json(json_df) + expected = [Row(age=25, name="Alice"), Row(age=30, name="Bob")] + self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) + def test_multi_paths(self): # SPARK-42041: DataFrameReader should support list of paths diff --git a/python/pyspark/sql/tests/test_datasources.py b/python/pyspark/sql/tests/test_datasources.py index 1ceb74c1d907c..e32685a621c7d 100644 --- a/python/pyspark/sql/tests/test_datasources.py +++ b/python/pyspark/sql/tests/test_datasources.py @@ -93,6 +93,24 @@ def test_linesep_json(self): finally: shutil.rmtree(tpath) + def test_json_with_dataframe_input(self): + json_df = self.spark.createDataFrame( + [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)], + schema="value STRING", + ) + result = self.spark.read.json(json_df) + expected = [Row(age=25, name="Alice"), Row(age=30, name="Bob")] + self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) + + def test_json_with_dataframe_input_and_schema(self): + json_df = self.spark.createDataFrame( + [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)], + schema="value STRING", + ) + result = self.spark.read.json(json_df, schema="name STRING, age INT") + expected = [Row(name="Alice", age=25), Row(name="Bob", age=30)] + self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) + def test_multiline_csv(self): ages_newlines = self.spark.read.csv( "python/test_support/sql/ages_newlines.csv", multiLine=True From e34911dc630a6513e293a757cfdef6ae0f015c13 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 30 Mar 2026 20:55:30 +0000 Subject: [PATCH 02/16] fix: mypy errors and schema passthrough in connect json --- python/pyspark/sql/connect/plan.py | 4 ++-- python/pyspark/sql/connect/readwriter.py | 2 ++ python/pyspark/sql/readwriter.py | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 4b5be6087f7e3..2189b521b87d6 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -402,13 +402,13 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation: assert self._child is not None plan = self._create_proto_relation() plan.parse.input.CopyFrom(self._child.plan(session)) - plan.parse.format = self._format + plan.parse.format = self._format # type: ignore[assignment] if self._schema is not None and len(self._schema) > 0: plan.parse.schema.CopyFrom( pyspark_types_to_proto_types( StructType.fromDDL(self._schema) if not self._schema.startswith("{") - else StructType.fromJson(self._schema) + else StructType.fromJson(json.loads(self._schema)) ) ) if self._options is not None: diff --git a/python/pyspark/sql/connect/readwriter.py b/python/pyspark/sql/connect/readwriter.py index 880c2d97f8ca9..af59ffd76e073 100644 --- a/python/pyspark/sql/connect/readwriter.py +++ b/python/pyspark/sql/connect/readwriter.py @@ -228,6 +228,8 @@ def json( from pyspark.sql.connect.dataframe import DataFrame if isinstance(path, DataFrame): + if schema is not None: + self.schema(schema) return self._df( Parse( child=path._plan, diff --git a/python/pyspark/sql/readwriter.py 
b/python/pyspark/sql/readwriter.py index efb8f5f9c77f0..704be30b860f9 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -508,6 +508,7 @@ def func(iterator: Iterable) -> Iterable: from pyspark.sql.dataframe import DataFrame if isinstance(path, DataFrame): + assert self._spark._jvm is not None string_encoder = self._spark._jvm.Encoders.STRING() jdataset = getattr(path._jdf, "as")(string_encoder) return self._df(self._jreader.json(jdataset)) From f395737680af389e70857999553629c7ad40ceb8 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 30 Mar 2026 21:06:44 +0000 Subject: [PATCH 03/16] fix: use proper proto type for format, add connect schema test --- python/pyspark/sql/connect/plan.py | 4 ++-- python/pyspark/sql/connect/readwriter.py | 2 ++ .../pyspark/sql/tests/connect/test_connect_readwriter.py | 9 +++++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 2189b521b87d6..6aff0ef636820 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -389,7 +389,7 @@ class Parse(LogicalPlan): def __init__( self, child: "LogicalPlan", - format: int, + format: "proto.Parse.ParseFormat.ValueType", schema: Optional[str] = None, options: Optional[Mapping[str, str]] = None, ) -> None: @@ -402,7 +402,7 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation: assert self._child is not None plan = self._create_proto_relation() plan.parse.input.CopyFrom(self._child.plan(session)) - plan.parse.format = self._format # type: ignore[assignment] + plan.parse.format = self._format if self._schema is not None and len(self._schema) > 0: plan.parse.schema.CopyFrom( pyspark_types_to_proto_types( diff --git a/python/pyspark/sql/connect/readwriter.py b/python/pyspark/sql/connect/readwriter.py index af59ffd76e073..1096e771e1d01 100644 --- a/python/pyspark/sql/connect/readwriter.py +++ b/python/pyspark/sql/connect/readwriter.py @@ -228,6 +228,8 @@ def json( from pyspark.sql.connect.dataframe import DataFrame if isinstance(path, DataFrame): + # Schema must be set explicitly here because the DataFrame path + # bypasses load(), which normally calls self.schema(schema). 
if schema is not None: self.schema(schema) return self._df( diff --git a/python/pyspark/sql/tests/connect/test_connect_readwriter.py b/python/pyspark/sql/tests/connect/test_connect_readwriter.py index 4bda45a442fa0..1987450f4c425 100644 --- a/python/pyspark/sql/tests/connect/test_connect_readwriter.py +++ b/python/pyspark/sql/tests/connect/test_connect_readwriter.py @@ -186,6 +186,15 @@ def test_json_with_dataframe_input(self): expected = [Row(age=25, name="Alice"), Row(age=30, name="Bob")] self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) + def test_json_with_dataframe_input_and_schema(self): + json_df = self.connect.createDataFrame( + [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)], + schema="value STRING", + ) + result = self.connect.read.json(json_df, schema="name STRING, age INT") + expected = [Row(name="Alice", age=25), Row(name="Bob", age=30)] + self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) + def test_multi_paths(self): # SPARK-42041: DataFrameReader should support list of paths From a6195370d271b454e7f228daffc60a855cf06164 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 2 Apr 2026 05:45:01 +0000 Subject: [PATCH 04/16] refactor: add JVM-side jsonFromDataFrame, add negative tests --- python/pyspark/sql/readwriter.py | 5 +---- .../sql/tests/connect/test_connect_readwriter.py | 12 ++++++++++++ python/pyspark/sql/tests/test_datasources.py | 12 ++++++++++++ .../apache/spark/sql/classic/DataFrameReader.scala | 9 +++++++++ 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 704be30b860f9..ed81ad262c639 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -508,10 +508,7 @@ def func(iterator: Iterable) -> Iterable: from pyspark.sql.dataframe import DataFrame if isinstance(path, DataFrame): - assert self._spark._jvm is not None - string_encoder = self._spark._jvm.Encoders.STRING() - jdataset = getattr(path._jdf, "as")(string_encoder) - return self._df(self._jreader.json(jdataset)) + return self._df(self._jreader.jsonFromDataFrame(path._jdf)) else: raise PySparkTypeError( errorClass="NOT_EXPECTED_TYPE", diff --git a/python/pyspark/sql/tests/connect/test_connect_readwriter.py b/python/pyspark/sql/tests/connect/test_connect_readwriter.py index 1987450f4c425..c1025e9983997 100644 --- a/python/pyspark/sql/tests/connect/test_connect_readwriter.py +++ b/python/pyspark/sql/tests/connect/test_connect_readwriter.py @@ -195,6 +195,18 @@ def test_json_with_dataframe_input_and_schema(self): expected = [Row(name="Alice", age=25), Row(name="Bob", age=30)] self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) + def test_json_with_dataframe_input_multiple_columns(self): + multi_df = self.connect.createDataFrame( + [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" + ) + with self.assertRaises(Exception): + self.connect.read.json(multi_df).collect() + + def test_json_with_dataframe_input_zero_columns(self): + empty_schema_df = self.connect.range(1).select() + with self.assertRaises(Exception): + self.connect.read.json(empty_schema_df).collect() + def test_multi_paths(self): # SPARK-42041: DataFrameReader should support list of paths diff --git a/python/pyspark/sql/tests/test_datasources.py b/python/pyspark/sql/tests/test_datasources.py index e32685a621c7d..166283b263876 100644 --- a/python/pyspark/sql/tests/test_datasources.py +++ 
b/python/pyspark/sql/tests/test_datasources.py @@ -111,6 +111,18 @@ def test_json_with_dataframe_input_and_schema(self): expected = [Row(name="Alice", age=25), Row(name="Bob", age=30)] self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) + def test_json_with_dataframe_input_multiple_columns(self): + multi_df = self.spark.createDataFrame( + [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" + ) + with self.assertRaises(Exception): + self.spark.read.json(multi_df).collect() + + def test_json_with_dataframe_input_zero_columns(self): + empty_schema_df = self.spark.range(1).select() + with self.assertRaises(Exception): + self.spark.read.json(empty_schema_df).collect() + def test_multiline_csv(self): ages_newlines = self.spark.read.csv( "python/test_support/sql/ages_newlines.csv", multiLine=True diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala index d0d6bf1e8ec0d..e064753423983 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala @@ -192,6 +192,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming) } + /** + * Parses a [[DataFrame]] containing JSON strings into a structured [[DataFrame]]. + * The input DataFrame must contain exactly one column of string type. + * This is used by PySpark to avoid manual Dataset[String] conversion on the Python side. + */ + private[sql] def jsonFromDataFrame(df: DataFrame): DataFrame = { + json(df.as(Encoders.STRING)) + } + /** @inheritdoc */ override def csv(path: String): DataFrame = super.csv(path) From 609e7bfc04380904808d4712b69b3aa2038b48f4 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 2 Apr 2026 21:24:47 +0000 Subject: [PATCH 05/16] refactor: move jsonFromDataFrame to PythonSQLUtils --- python/pyspark/sql/readwriter.py | 5 ++++- .../sql/tests/connect/test_connect_readwriter.py | 5 +++++ python/pyspark/sql/tests/test_datasources.py | 5 +++++ .../apache/spark/sql/api/python/PythonSQLUtils.scala | 12 +++++++++++- .../apache/spark/sql/classic/DataFrameReader.scala | 9 --------- 5 files changed, 25 insertions(+), 11 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index ed81ad262c639..3dd9dc023dd77 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -508,7 +508,10 @@ def func(iterator: Iterable) -> Iterable: from pyspark.sql.dataframe import DataFrame if isinstance(path, DataFrame): - return self._df(self._jreader.jsonFromDataFrame(path._jdf)) + assert self._spark._jvm is not None + return self._df( + self._spark._jvm.PythonSQLUtils.jsonFromDataFrame(self._jreader, path._jdf) + ) else: raise PySparkTypeError( errorClass="NOT_EXPECTED_TYPE", diff --git a/python/pyspark/sql/tests/connect/test_connect_readwriter.py b/python/pyspark/sql/tests/connect/test_connect_readwriter.py index c1025e9983997..d5ef75d963527 100644 --- a/python/pyspark/sql/tests/connect/test_connect_readwriter.py +++ b/python/pyspark/sql/tests/connect/test_connect_readwriter.py @@ -195,6 +195,11 @@ def test_json_with_dataframe_input_and_schema(self): expected = [Row(name="Alice", age=25), Row(name="Bob", age=30)] self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) + def 
test_json_with_dataframe_input_non_string_column(self): + int_df = self.connect.createDataFrame([(1,), (2,)], schema="value INT") + with self.assertRaises(Exception): + self.connect.read.json(int_df).collect() + def test_json_with_dataframe_input_multiple_columns(self): multi_df = self.connect.createDataFrame( [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" diff --git a/python/pyspark/sql/tests/test_datasources.py b/python/pyspark/sql/tests/test_datasources.py index 166283b263876..9084f23207b19 100644 --- a/python/pyspark/sql/tests/test_datasources.py +++ b/python/pyspark/sql/tests/test_datasources.py @@ -111,6 +111,11 @@ def test_json_with_dataframe_input_and_schema(self): expected = [Row(name="Alice", age=25), Row(name="Bob", age=30)] self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) + def test_json_with_dataframe_input_non_string_column(self): + int_df = self.spark.createDataFrame([(1,), (2,)], schema="value INT") + with self.assertRaises(Exception): + self.spark.read.json(int_df).collect() + def test_json_with_dataframe_input_multiple_columns(self): multi_df = self.spark.createDataFrame( [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 5607c98bf29e5..03a9a62aee802 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -27,7 +27,7 @@ import org.apache.spark.api.python.DechunkedInputStream import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.CLASS_LOADER import org.apache.spark.security.SocketAuthServer -import org.apache.spark.sql.{internal, Column, DataFrame, Row, SparkSession, TableArg} +import org.apache.spark.sql.{internal, Column, DataFrame, Encoders, Row, SparkSession, TableArg} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -193,6 +193,16 @@ private[sql] object PythonSQLUtils extends Logging { @scala.annotation.varargs def internalFn(name: String, inputs: Column*): Column = Column.internalFn(name, inputs: _*) + /** + * Parses a [[DataFrame]] containing JSON strings into a structured [[DataFrame]]. + * This is used by PySpark to avoid manual Dataset[String] conversion on the Python side. + */ + def jsonFromDataFrame( + reader: sql.DataFrameReader, + df: DataFrame): DataFrame = { + reader.json(df.as(Encoders.STRING)) + } + def cleanupPythonWorkerLogs(sessionUUID: String, sparkContext: SparkContext): Unit = { if (!sparkContext.isStopped) { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala index e064753423983..d0d6bf1e8ec0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala @@ -192,15 +192,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming) } - /** - * Parses a [[DataFrame]] containing JSON strings into a structured [[DataFrame]]. 
- * The input DataFrame must contain exactly one column of string type. - * This is used by PySpark to avoid manual Dataset[String] conversion on the Python side. - */ - private[sql] def jsonFromDataFrame(df: DataFrame): DataFrame = { - json(df.as(Encoders.STRING)) - } - /** @inheritdoc */ override def csv(path: String): DataFrame = super.csv(path) From fc96d9cfc1cd5fb4f9baa4f0dd971772f7dc89dd Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 2 Apr 2026 22:50:41 +0000 Subject: [PATCH 06/16] fix: use fully qualified DataFrameReader import --- .../org/apache/spark/sql/api/python/PythonSQLUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 03a9a62aee802..727f3cff900c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -27,7 +27,7 @@ import org.apache.spark.api.python.DechunkedInputStream import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.CLASS_LOADER import org.apache.spark.security.SocketAuthServer -import org.apache.spark.sql.{internal, Column, DataFrame, Encoders, Row, SparkSession, TableArg} +import org.apache.spark.sql.{internal, Column, DataFrame, DataFrameReader, Encoders, Row, SparkSession, TableArg} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -198,7 +198,7 @@ private[sql] object PythonSQLUtils extends Logging { * This is used by PySpark to avoid manual Dataset[String] conversion on the Python side. 
*/ def jsonFromDataFrame( - reader: sql.DataFrameReader, + reader: DataFrameReader, df: DataFrame): DataFrame = { reader.json(df.as(Encoders.STRING)) } From 030f413284c1a453659e5741562c8d62608e3851 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 3 Apr 2026 05:27:17 +0000 Subject: [PATCH 07/16] fix: validate DataFrame schema in jsonFromDataFrame --- .../sql/connect/planner/SparkConnectPlanner.scala | 10 +++++++++- .../apache/spark/sql/api/python/PythonSQLUtils.scala | 5 +++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 37bcf995ee16d..ed55234844ac4 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1760,7 +1760,15 @@ class SparkConnectPlanner( localMap.foreach { case (key, value) => reader.option(key, value) } reader } - def ds: Dataset[String] = Dataset(session, transformRelation(rel.getInput))(Encoders.STRING) + def ds: Dataset[String] = { + val input = transformRelation(rel.getInput) + val inputSchema = Dataset.ofRows(session, input).schema + require(inputSchema.fields.length == 1, + s"Input DataFrame must have exactly one column, but got ${inputSchema.fields.length}") + require(inputSchema.fields.head.dataType == org.apache.spark.sql.types.StringType, + s"Input DataFrame column must be StringType, but got ${inputSchema.fields.head.dataType}") + Dataset(session, input)(Encoders.STRING) + } rel.getFormat match { case ParseFormat.PARSE_FORMAT_CSV => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 727f3cff900c2..308d422368bd3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -195,11 +195,16 @@ private[sql] object PythonSQLUtils extends Logging { /** * Parses a [[DataFrame]] containing JSON strings into a structured [[DataFrame]]. + * The input DataFrame must have exactly one column of StringType. * This is used by PySpark to avoid manual Dataset[String] conversion on the Python side. 
*/ def jsonFromDataFrame( reader: DataFrameReader, df: DataFrame): DataFrame = { + require(df.schema.fields.length == 1, + s"Input DataFrame must have exactly one column, but got ${df.schema.fields.length}") + require(df.schema.fields.head.dataType == org.apache.spark.sql.types.StringType, + s"Input DataFrame column must be StringType, but got ${df.schema.fields.head.dataType}") reader.json(df.as(Encoders.STRING)) } From dfbd9b3143c22be45b4545f5edc58e59c7be5a64 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 3 Apr 2026 22:56:36 +0000 Subject: [PATCH 08/16] fix: address review comments - skip validation, use assertRaisesRegex --- python/pyspark/sql/connect/readwriter.py | 2 +- python/pyspark/sql/readwriter.py | 2 +- .../sql/tests/connect/test_connect_readwriter.py | 6 +++--- python/pyspark/sql/tests/test_datasources.py | 6 +++--- .../sql/connect/planner/SparkConnectPlanner.scala | 10 +--------- .../apache/spark/sql/api/python/PythonSQLUtils.scala | 4 ---- 6 files changed, 9 insertions(+), 21 deletions(-) diff --git a/python/pyspark/sql/connect/readwriter.py b/python/pyspark/sql/connect/readwriter.py index 1096e771e1d01..027da28a31cb8 100644 --- a/python/pyspark/sql/connect/readwriter.py +++ b/python/pyspark/sql/connect/readwriter.py @@ -167,7 +167,7 @@ def changes(self, tableName: str) -> "DataFrame": def json( self, - path: PathOrPaths, + path: Union[PathOrPaths, "DataFrame"], schema: Optional[Union[StructType, str]] = None, primitivesAsString: Optional[Union[bool, str]] = None, prefersDecimal: Optional[Union[bool, str]] = None, diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 3dd9dc023dd77..7ada41a71655d 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -361,7 +361,7 @@ def json( .. versionchanged:: 3.4.0 Supports Spark Connect. - .. versionchanged:: 4.1.0 + .. versionchanged:: 4.2.0 Supports DataFrame input. 
Parameters diff --git a/python/pyspark/sql/tests/connect/test_connect_readwriter.py b/python/pyspark/sql/tests/connect/test_connect_readwriter.py index d5ef75d963527..d250d5adc5bf1 100644 --- a/python/pyspark/sql/tests/connect/test_connect_readwriter.py +++ b/python/pyspark/sql/tests/connect/test_connect_readwriter.py @@ -197,19 +197,19 @@ def test_json_with_dataframe_input_and_schema(self): def test_json_with_dataframe_input_non_string_column(self): int_df = self.connect.createDataFrame([(1,), (2,)], schema="value INT") - with self.assertRaises(Exception): + with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH"): self.connect.read.json(int_df).collect() def test_json_with_dataframe_input_multiple_columns(self): multi_df = self.connect.createDataFrame( [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" ) - with self.assertRaises(Exception): + with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH"): self.connect.read.json(multi_df).collect() def test_json_with_dataframe_input_zero_columns(self): empty_schema_df = self.connect.range(1).select() - with self.assertRaises(Exception): + with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH"): self.connect.read.json(empty_schema_df).collect() def test_multi_paths(self): diff --git a/python/pyspark/sql/tests/test_datasources.py b/python/pyspark/sql/tests/test_datasources.py index 9084f23207b19..21f83477a50ab 100644 --- a/python/pyspark/sql/tests/test_datasources.py +++ b/python/pyspark/sql/tests/test_datasources.py @@ -113,19 +113,19 @@ def test_json_with_dataframe_input_and_schema(self): def test_json_with_dataframe_input_non_string_column(self): int_df = self.spark.createDataFrame([(1,), (2,)], schema="value INT") - with self.assertRaises(Exception): + with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH"): self.spark.read.json(int_df).collect() def test_json_with_dataframe_input_multiple_columns(self): multi_df = self.spark.createDataFrame( [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" ) - with self.assertRaises(Exception): + with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH"): self.spark.read.json(multi_df).collect() def test_json_with_dataframe_input_zero_columns(self): empty_schema_df = self.spark.range(1).select() - with self.assertRaises(Exception): + with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH"): self.spark.read.json(empty_schema_df).collect() def test_multiline_csv(self): diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index ed55234844ac4..37bcf995ee16d 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1760,15 +1760,7 @@ class SparkConnectPlanner( localMap.foreach { case (key, value) => reader.option(key, value) } reader } - def ds: Dataset[String] = { - val input = transformRelation(rel.getInput) - val inputSchema = Dataset.ofRows(session, input).schema - require(inputSchema.fields.length == 1, - s"Input DataFrame must have exactly one column, but got ${inputSchema.fields.length}") - require(inputSchema.fields.head.dataType == org.apache.spark.sql.types.StringType, - s"Input DataFrame 
column must be StringType, but got ${inputSchema.fields.head.dataType}") - Dataset(session, input)(Encoders.STRING) - } + def ds: Dataset[String] = Dataset(session, transformRelation(rel.getInput))(Encoders.STRING) rel.getFormat match { case ParseFormat.PARSE_FORMAT_CSV => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 308d422368bd3..298aff3b156b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -201,10 +201,6 @@ def jsonFromDataFrame( reader: DataFrameReader, df: DataFrame): DataFrame = { - require(df.schema.fields.length == 1, - s"Input DataFrame must have exactly one column, but got ${df.schema.fields.length}") - require(df.schema.fields.head.dataType == org.apache.spark.sql.types.StringType, - s"Input DataFrame column must be StringType, but got ${df.schema.fields.head.dataType}") reader.json(df.as(Encoders.STRING)) } From 9e0fc8002ec1252498cf8290bbb68599d82fea02 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sat, 4 Apr 2026 01:12:58 +0000 Subject: [PATCH 09/16] fix: add JVM-side validation for non-string and multi-column DataFrame input --- .../src/main/resources/error/error-conditions.json | 10 ++++++++++ .../sql/tests/connect/test_connect_readwriter.py | 6 +++--- python/pyspark/sql/tests/test_datasources.py | 6 +++--- .../sql/connect/planner/InvalidInputErrors.scala | 10 ++++++++++ .../sql/connect/planner/SparkConnectPlanner.scala | 12 +++++++++++- .../apache/spark/sql/api/python/PythonSQLUtils.scala | 4 ++++ 6 files changed, 41 insertions(+), 7 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 66f85059096a5..e3d325521c696 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -1164,6 +1164,16 @@ "No handler found for extension type: " ] }, + "PARSE_INPUT_NOT_SINGLE_COLUMN" : { + "message" : [ + "Input DataFrame must have exactly one column, but got <numColumns>." + ] + }, + "PARSE_INPUT_NOT_STRING_TYPE" : { + "message" : [ + "Input DataFrame column must be StringType, but got <dataType>." + ] + }, "PLAN_SIZE_LARGER_THAN_MAX" : { "message" : [ "The plan size is larger than max ( vs. 
)", diff --git a/python/pyspark/sql/tests/connect/test_connect_readwriter.py b/python/pyspark/sql/tests/connect/test_connect_readwriter.py index d250d5adc5bf1..782e69ff8d4b5 100644 --- a/python/pyspark/sql/tests/connect/test_connect_readwriter.py +++ b/python/pyspark/sql/tests/connect/test_connect_readwriter.py @@ -197,19 +197,19 @@ def test_json_with_dataframe_input_and_schema(self): def test_json_with_dataframe_input_non_string_column(self): int_df = self.connect.createDataFrame([(1,), (2,)], schema="value INT") - with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH"): + with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"): self.connect.read.json(int_df).collect() def test_json_with_dataframe_input_multiple_columns(self): multi_df = self.connect.createDataFrame( [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" ) - with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH"): + with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_SINGLE_COLUMN"): self.connect.read.json(multi_df).collect() def test_json_with_dataframe_input_zero_columns(self): empty_schema_df = self.connect.range(1).select() - with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH"): + with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_SINGLE_COLUMN"): self.connect.read.json(empty_schema_df).collect() def test_multi_paths(self): diff --git a/python/pyspark/sql/tests/test_datasources.py b/python/pyspark/sql/tests/test_datasources.py index 21f83477a50ab..0abcdc22e133a 100644 --- a/python/pyspark/sql/tests/test_datasources.py +++ b/python/pyspark/sql/tests/test_datasources.py @@ -113,19 +113,19 @@ def test_json_with_dataframe_input_and_schema(self): def test_json_with_dataframe_input_non_string_column(self): int_df = self.spark.createDataFrame([(1,), (2,)], schema="value INT") - with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH"): + with self.assertRaisesRegex(Exception, "StringType"): self.spark.read.json(int_df).collect() def test_json_with_dataframe_input_multiple_columns(self): multi_df = self.spark.createDataFrame( [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" ) - with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH"): + with self.assertRaisesRegex(Exception, "exactly one column"): self.spark.read.json(multi_df).collect() def test_json_with_dataframe_input_zero_columns(self): empty_schema_df = self.spark.range(1).select() - with self.assertRaisesRegex(Exception, "UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH"): + with self.assertRaisesRegex(Exception, "exactly one column"): self.spark.read.json(empty_schema_df).collect() def test_multiline_csv(self): diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala index cdba6c825332e..1052ee0391a81 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala @@ -166,6 +166,16 @@ object InvalidInputErrors { def invalidSchemaTypeNonStruct(dataType: DataType): InvalidPlanInput = InvalidPlanInput("INVALID_SCHEMA_TYPE_NON_STRUCT", Map("dataType" -> toSQLType(dataType))) + def parseInputNotSingleColumn(numColumns: Int): InvalidPlanInput = + InvalidPlanInput( + 
"CONNECT_INVALID_PLAN.PARSE_INPUT_NOT_SINGLE_COLUMN", + Map("numColumns" -> numColumns.toString)) + + def parseInputNotStringType(dataType: DataType): InvalidPlanInput = + InvalidPlanInput( + "CONNECT_INVALID_PLAN.PARSE_INPUT_NOT_STRING_TYPE", + Map("dataType" -> toSQLType(dataType))) + def lambdaFunctionArgumentCountInvalid(got: Int): InvalidPlanInput = InvalidPlanInput( "CONNECT_INVALID_PLAN.LAMBDA_FUNCTION_ARGUMENT_COUNT_INVALID", diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 37bcf995ee16d..c2f2263890f65 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1760,7 +1760,17 @@ class SparkConnectPlanner( localMap.foreach { case (key, value) => reader.option(key, value) } reader } - def ds: Dataset[String] = Dataset(session, transformRelation(rel.getInput))(Encoders.STRING) + def ds: Dataset[String] = { + val input = transformRelation(rel.getInput) + val inputSchema = Dataset.ofRows(session, input).schema + if (inputSchema.fields.length != 1) { + throw InvalidInputErrors.parseInputNotSingleColumn(inputSchema.fields.length) + } + if (inputSchema.fields.head.dataType != org.apache.spark.sql.types.StringType) { + throw InvalidInputErrors.parseInputNotStringType(inputSchema.fields.head.dataType) + } + Dataset(session, input)(Encoders.STRING) + } rel.getFormat match { case ParseFormat.PARSE_FORMAT_CSV => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 298aff3b156b2..308d422368bd3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -201,6 +201,10 @@ private[sql] object PythonSQLUtils extends Logging { def jsonFromDataFrame( reader: DataFrameReader, df: DataFrame): DataFrame = { + require(df.schema.fields.length == 1, + s"Input DataFrame must have exactly one column, but got ${df.schema.fields.length}") + require(df.schema.fields.head.dataType == org.apache.spark.sql.types.StringType, + s"Input DataFrame column must be StringType, but got ${df.schema.fields.head.dataType}") reader.json(df.as(Encoders.STRING)) } From 2a0f502b193457d523f139d41f23898b88b0ac45 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sat, 4 Apr 2026 01:52:49 +0000 Subject: [PATCH 10/16] fix: use Spark Error framework for DataFrame input validation --- .../src/main/resources/error/error-conditions.json | 10 ++++++++++ python/pyspark/sql/tests/test_datasources.py | 6 +++--- .../spark/sql/errors/QueryCompilationErrors.scala | 12 ++++++++++++ .../apache/spark/sql/api/python/PythonSQLUtils.scala | 11 +++++++---- 4 files changed, 32 insertions(+), 7 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index e3d325521c696..75e78f7ac0e37 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -5656,6 +5656,16 @@ ], "sqlState" : "42846" }, + "PARSE_INPUT_NOT_SINGLE_COLUMN" : { + "message" : [ + "Input DataFrame must have exactly one 
column, but got <numColumns>." + ] + }, + "PARSE_INPUT_NOT_STRING_TYPE" : { + "message" : [ + "Input DataFrame column must be StringType, but got <dataType>." + ] + }, "PARSE_EMPTY_STATEMENT" : { "message" : [ "Syntax error, unexpected empty statement." diff --git a/python/pyspark/sql/tests/test_datasources.py b/python/pyspark/sql/tests/test_datasources.py index 0abcdc22e133a..1cf1574b85d17 100644 --- a/python/pyspark/sql/tests/test_datasources.py +++ b/python/pyspark/sql/tests/test_datasources.py @@ -113,19 +113,19 @@ def test_json_with_dataframe_input_and_schema(self): def test_json_with_dataframe_input_non_string_column(self): int_df = self.spark.createDataFrame([(1,), (2,)], schema="value INT") - with self.assertRaisesRegex(Exception, "StringType"): + with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"): self.spark.read.json(int_df).collect() def test_json_with_dataframe_input_multiple_columns(self): multi_df = self.spark.createDataFrame( [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" ) - with self.assertRaisesRegex(Exception, "exactly one column"): + with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_SINGLE_COLUMN"): self.spark.read.json(multi_df).collect() def test_json_with_dataframe_input_zero_columns(self): empty_schema_df = self.spark.range(1).select() - with self.assertRaisesRegex(Exception, "exactly one column"): + with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_SINGLE_COLUMN"): self.spark.read.json(empty_schema_df).collect() def test_multiline_csv(self): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index b6fb05f3f1b1a..032326fc039f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3495,6 +3495,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat ) } + def parseInputNotSingleColumnError(numColumns: Int): Throwable = { + new AnalysisException( + errorClass = "PARSE_INPUT_NOT_SINGLE_COLUMN", + messageParameters = Map("numColumns" -> numColumns.toString)) + } + + def parseInputNotStringTypeError(dataType: DataType): Throwable = { + new AnalysisException( + errorClass = "PARSE_INPUT_NOT_STRING_TYPE", + messageParameters = Map("dataType" -> toSQLType(dataType))) + } + def textDataSourceWithMultiColumnsError(schema: StructType): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1290", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 308d422368bd3..6138dce1b5823 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRe import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.classic.ExpressionUtils.expression import org.apache.spark.sql.execution.{ExplainMode, QueryExecution} import org.apache.spark.sql.execution.arrow.ArrowConverters import org.apache.spark.sql.execution.python.EvaluatePython @@ -202,13 +202,16 @@ private[sql] object PythonSQLUtils extends 
Logging { def jsonFromDataFrame( reader: DataFrameReader, df: DataFrame): DataFrame = { - require(df.schema.fields.length == 1, - s"Input DataFrame must have exactly one column, but got ${df.schema.fields.length}") - require(df.schema.fields.head.dataType == org.apache.spark.sql.types.StringType, - s"Input DataFrame column must be StringType, but got ${df.schema.fields.head.dataType}") + if (df.schema.fields.length != 1) { + throw QueryCompilationErrors.parseInputNotSingleColumnError(df.schema.fields.length) + } + if (df.schema.fields.head.dataType != org.apache.spark.sql.types.StringType) { + throw QueryCompilationErrors.parseInputNotStringTypeError(df.schema.fields.head.dataType) + } reader.json(df.as(Encoders.STRING)) } From 1fd89e65112b7f302a9b10578f238d04154e1527 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sat, 4 Apr 2026 04:57:05 +0000 Subject: [PATCH 11/16] fix: correct import ordering for scalastyle --- .../scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 6138dce1b5823..0f214cc51ba80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRe import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.classic.ExpressionUtils.expression +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{ExplainMode, QueryExecution} import org.apache.spark.sql.execution.arrow.ArrowConverters import org.apache.spark.sql.execution.python.EvaluatePython From 8b39383b74d246ec302f10b5c8f94496cf35d5e1 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sat, 4 Apr 2026 05:00:58 +0000 Subject: [PATCH 12/16] fix: add sqlState for new error conditions --- common/utils/src/main/resources/error/error-conditions.json | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 75e78f7ac0e37..17dc1f0638669 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -5659,12 +5659,14 @@ "PARSE_INPUT_NOT_SINGLE_COLUMN" : { "message" : [ "Input DataFrame must have exactly one column, but got <numColumns>." - ] + ], + "sqlState" : "42K09" }, "PARSE_INPUT_NOT_STRING_TYPE" : { "message" : [ "Input DataFrame column must be StringType, but got <dataType>." 
- ] + ], + "sqlState" : "42K09" }, "PARSE_EMPTY_STATEMENT" : { "message" : [ "Syntax error, unexpected empty statement." From 7c462e4ddc3a76d3e5fc996910bee555a3cada62 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 6 Apr 2026 03:06:54 +0000 Subject: [PATCH 13/16] fix: sort error conditions in alphabetical order --- .../src/main/resources/error/error-conditions.json | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 17dc1f0638669..d9c1801180b46 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -5656,6 +5656,12 @@ ], "sqlState" : "42846" }, + "PARSE_EMPTY_STATEMENT" : { + "message" : [ + "Syntax error, unexpected empty statement." + ], + "sqlState" : "42617" + }, "PARSE_INPUT_NOT_SINGLE_COLUMN" : { "message" : [ "Input DataFrame must have exactly one column, but got <numColumns>." @@ -5668,12 +5674,6 @@ ], "sqlState" : "42K09" }, - "PARSE_EMPTY_STATEMENT" : { - "message" : [ - "Syntax error, unexpected empty statement." - ], - "sqlState" : "42617" - }, "PARSE_MODE_UNSUPPORTED" : { "message" : [ "The function <funcName> doesn't support the <mode> mode. Acceptable modes are PERMISSIVE and FAILFAST." From 0642d646082bf5af715f83f6ff6815a52b8cbbe3 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Tue, 7 Apr 2026 19:30:47 +0000 Subject: [PATCH 14/16] refactor: address review feedback - unify errors, relax single-column constraint --- .../main/resources/error/error-conditions.json | 16 ---------------- .../tests/connect/test_connect_readwriter.py | 10 ++++++---- python/pyspark/sql/tests/test_datasources.py | 10 ++++++---- .../sql/errors/QueryCompilationErrors.scala | 6 ------ .../connect/planner/InvalidInputErrors.scala | 10 ---------- .../connect/planner/SparkConnectPlanner.scala | 14 ++++++++------ .../spark/sql/api/python/PythonSQLUtils.scala | 12 +++--------- .../spark/sql/classic/DataFrameReader.scala | 17 +++++++++++++++++ 8 files changed, 40 insertions(+), 55 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index d9c1801180b46..ee96d6d83f90e 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -1164,16 +1164,6 @@ "No handler found for extension type: " ] }, - "PARSE_INPUT_NOT_SINGLE_COLUMN" : { - "message" : [ - "Input DataFrame must have exactly one column, but got <numColumns>." - ] - }, - "PARSE_INPUT_NOT_STRING_TYPE" : { - "message" : [ - "Input DataFrame column must be StringType, but got <dataType>." - ] - }, "PLAN_SIZE_LARGER_THAN_MAX" : { "message" : [ "The plan size is larger than max ( vs. )", @@ -5662,12 +5652,6 @@ ], "sqlState" : "42617" }, - "PARSE_INPUT_NOT_SINGLE_COLUMN" : { "message" : [ "Input DataFrame must have exactly one column, but got <numColumns>." ], "sqlState" : "42K09" }, "PARSE_INPUT_NOT_STRING_TYPE" : { "message" : [ "Input DataFrame column must be StringType, but got <dataType>." 
diff --git a/python/pyspark/sql/tests/connect/test_connect_readwriter.py b/python/pyspark/sql/tests/connect/test_connect_readwriter.py index 782e69ff8d4b5..9e8986b3c8623 100644 --- a/python/pyspark/sql/tests/connect/test_connect_readwriter.py +++ b/python/pyspark/sql/tests/connect/test_connect_readwriter.py @@ -202,14 +202,16 @@ def test_json_with_dataframe_input_non_string_column(self): def test_json_with_dataframe_input_multiple_columns(self): multi_df = self.connect.createDataFrame( - [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" + [('{"name": "Alice"}', "extra"), ('{"name": "Bob"}', "extra")], + schema="value STRING, other STRING", ) - with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_SINGLE_COLUMN"): - self.connect.read.json(multi_df).collect() + result = self.connect.read.json(multi_df) + expected = [Row(name="Alice"), Row(name="Bob")] + self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) def test_json_with_dataframe_input_zero_columns(self): empty_schema_df = self.connect.range(1).select() - with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_SINGLE_COLUMN"): + with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"): self.connect.read.json(empty_schema_df).collect() def test_multi_paths(self): diff --git a/python/pyspark/sql/tests/test_datasources.py b/python/pyspark/sql/tests/test_datasources.py index 1cf1574b85d17..d742a96ed5f2e 100644 --- a/python/pyspark/sql/tests/test_datasources.py +++ b/python/pyspark/sql/tests/test_datasources.py @@ -118,14 +118,16 @@ def test_json_with_dataframe_input_non_string_column(self): def test_json_with_dataframe_input_multiple_columns(self): multi_df = self.spark.createDataFrame( - [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING" + [('{"name": "Alice"}', "extra"), ('{"name": "Bob"}', "extra")], + schema="value STRING, other STRING", ) - with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_SINGLE_COLUMN"): - self.spark.read.json(multi_df).collect() + result = self.spark.read.json(multi_df) + expected = [Row(name="Alice"), Row(name="Bob")] + self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected) def test_json_with_dataframe_input_zero_columns(self): empty_schema_df = self.spark.range(1).select() - with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_SINGLE_COLUMN"): + with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"): self.spark.read.json(empty_schema_df).collect() def test_multiline_csv(self): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 032326fc039f3..60ed6b74cf288 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3495,12 +3495,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat ) } - def parseInputNotSingleColumnError(numColumns: Int): Throwable = { - new AnalysisException( - errorClass = "PARSE_INPUT_NOT_SINGLE_COLUMN", - messageParameters = Map("numColumns" -> numColumns.toString)) - } - def parseInputNotStringTypeError(dataType: DataType): Throwable = { new AnalysisException( errorClass = "PARSE_INPUT_NOT_STRING_TYPE", diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala index 1052ee0391a81..cdba6c825332e 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala @@ -166,16 +166,6 @@ object InvalidInputErrors { def invalidSchemaTypeNonStruct(dataType: DataType): InvalidPlanInput = InvalidPlanInput("INVALID_SCHEMA_TYPE_NON_STRUCT", Map("dataType" -> toSQLType(dataType))) - def parseInputNotSingleColumn(numColumns: Int): InvalidPlanInput = - InvalidPlanInput( - "CONNECT_INVALID_PLAN.PARSE_INPUT_NOT_SINGLE_COLUMN", - Map("numColumns" -> numColumns.toString)) - - def parseInputNotStringType(dataType: DataType): InvalidPlanInput = - InvalidPlanInput( - "CONNECT_INVALID_PLAN.PARSE_INPUT_NOT_STRING_TYPE", - Map("dataType" -> toSQLType(dataType))) - def lambdaFunctionArgumentCountInvalid(got: Int): InvalidPlanInput = InvalidPlanInput( "CONNECT_INVALID_PLAN.LAMBDA_FUNCTION_ARGUMENT_COUNT_INVALID", diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index c2f2263890f65..3a8a8f5a766c5 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1762,14 +1762,16 @@ class SparkConnectPlanner( } def ds: Dataset[String] = { val input = transformRelation(rel.getInput) - val inputSchema = Dataset.ofRows(session, input).schema - if (inputSchema.fields.length != 1) { - throw InvalidInputErrors.parseInputNotSingleColumn(inputSchema.fields.length) + val df = Dataset.ofRows(session, input) + val fields = df.schema.fields + if (fields.isEmpty) { + throw QueryCompilationErrors.parseInputNotStringTypeError( + org.apache.spark.sql.types.NullType) } - if (inputSchema.fields.head.dataType != org.apache.spark.sql.types.StringType) { - throw InvalidInputErrors.parseInputNotStringType(inputSchema.fields.head.dataType) + if (fields.head.dataType != org.apache.spark.sql.types.StringType) { + throw QueryCompilationErrors.parseInputNotStringTypeError(fields.head.dataType) } - Dataset(session, input)(Encoders.STRING) + df.select(df.columns.head).as(Encoders.STRING) } rel.getFormat match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 0f214cc51ba80..4ed02e43882d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -27,15 +27,15 @@ import org.apache.spark.api.python.DechunkedInputStream import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.CLASS_LOADER import org.apache.spark.security.SocketAuthServer -import org.apache.spark.sql.{internal, Column, DataFrame, DataFrameReader, Encoders, Row, SparkSession, TableArg} +import org.apache.spark.sql.{internal, Column, DataFrame, DataFrameReader, Row, SparkSession, TableArg} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 0f214cc51ba80..4ed02e43882d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -27,15 +27,15 @@ import org.apache.spark.api.python.DechunkedInputStream
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.CLASS_LOADER
 import org.apache.spark.security.SocketAuthServer
-import org.apache.spark.sql.{internal, Column, DataFrame, DataFrameReader, Encoders, Row, SparkSession, TableArg}
+import org.apache.spark.sql.{internal, Column, DataFrame, DataFrameReader, Row, SparkSession, TableArg}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.classic.{DataFrameReader => ClassicDataFrameReader}
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.classic.ExpressionUtils.expression
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{ExplainMode, QueryExecution}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.execution.python.EvaluatePython
@@ -202,13 +202,7 @@ private[sql] object PythonSQLUtils extends Logging {
   def jsonFromDataFrame(
       reader: DataFrameReader,
       df: DataFrame): DataFrame = {
-    if (df.schema.fields.length != 1) {
-      throw QueryCompilationErrors.parseInputNotSingleColumnError(df.schema.fields.length)
-    }
-    if (df.schema.fields.head.dataType != org.apache.spark.sql.types.StringType) {
-      throw QueryCompilationErrors.parseInputNotStringTypeError(df.schema.fields.head.dataType)
-    }
-    reader.json(df.as(Encoders.STRING))
+    reader.asInstanceOf[ClassicDataFrameReader].json(df)
   }
 
   def cleanupPythonWorkerLogs(sessionUUID: String, sparkContext: SparkContext): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
index d0d6bf1e8ec0d..89835de3a532b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.UnresolvedDataSource
 import org.apache.spark.sql.catalyst.util.FailureSafeParser
 import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, XmlOptions}
 import org.apache.spark.sql.classic.ClassicConversions._
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation}
 import org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
@@ -159,6 +160,22 @@ class DataFrameReader private[sql](sparkSession: SparkSession)
     json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
   }
 
+  /**
+   * Parses a [[sql.DataFrame]] containing JSON strings into a structured [[sql.DataFrame]].
+   * The first column of the input DataFrame must be of StringType.
+   */
+  def json(df: sql.DataFrame): sql.DataFrame = {
+    val fields = df.schema.fields
+    if (fields.isEmpty) {
+      throw QueryCompilationErrors.parseInputNotStringTypeError(
+        org.apache.spark.sql.types.NullType)
+    }
+    if (fields.head.dataType != org.apache.spark.sql.types.StringType) {
+      throw QueryCompilationErrors.parseInputNotStringTypeError(fields.head.dataType)
+    }
+    json(df.select(df.columns.head).as(Encoders.STRING))
+  }
+
   /** @inheritdoc */
   def json(jsonDataset: sql.Dataset[String]): DataFrame = {
     val parsedOptions = new JSONOptions(
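One wrinkle with the overload added above: DataFrame is only a type alias for Dataset[Row], so json(df: sql.DataFrame) and the existing json(jsonDataset: sql.Dataset[String]) erase to the same JVM signature, which is exactly why the last commit in this series removes it again. A generic stand-in, with Vector playing the role of Dataset (deliberately not Spark code):

    object ErasureClashSketch {
      class Reader {
        def json(ds: Vector[String]): String = s"parsed ${ds.size} JSON strings"
        // Un-commenting this second overload reproduces the conflict:
        //   error: double definition ... have the same type after erasure:
        //   (ds: Vector): String
        // def json(df: Vector[Int]): String = "rows"
      }
      def main(args: Array[String]): Unit =
        println(new Reader().json(Vector("""{"a": 1}""")))
    }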
From cd205ccd27a67dd7209542017c3e6e886f7fcd43 Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Tue, 7 Apr 2026 23:36:48 +0000
Subject: [PATCH 15/16] chore: retrigger CI

From 30375b75fb4b5beb98f757a2ef603d181b746d1d Mon Sep 17 00:00:00 2001
From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Date: Wed, 8 Apr 2026 03:56:45 +0000
Subject: [PATCH 16/16] fix: move DataFrame validation from
 DataFrameReader.json to PythonSQLUtils to avoid type erasure conflict

---
 .../spark/sql/api/python/PythonSQLUtils.scala | 14 ++++++++++++--
 .../spark/sql/classic/DataFrameReader.scala   | 17 -----------------
 2 files changed, 12 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 4ed02e43882d6..aa941c81e9806 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -27,7 +27,7 @@ import org.apache.spark.api.python.DechunkedInputStream
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.CLASS_LOADER
 import org.apache.spark.security.SocketAuthServer
-import org.apache.spark.sql.{internal, Column, DataFrame, DataFrameReader, Row, SparkSession, TableArg}
+import org.apache.spark.sql.{internal, Column, DataFrame, DataFrameReader, Encoders, Row, SparkSession, TableArg}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.classic.{DataFrameReader => ClassicDataFrameReader}
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.classic.ExpressionUtils.expression
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{ExplainMode, QueryExecution}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.execution.python.EvaluatePython
@@ -202,7 +203,16 @@ private[sql] object PythonSQLUtils extends Logging {
   def jsonFromDataFrame(
       reader: DataFrameReader,
       df: DataFrame): DataFrame = {
-    reader.asInstanceOf[ClassicDataFrameReader].json(df)
+    val classicReader = reader.asInstanceOf[ClassicDataFrameReader]
+    val fields = df.schema.fields
+    if (fields.isEmpty) {
+      throw QueryCompilationErrors.parseInputNotStringTypeError(
+        org.apache.spark.sql.types.NullType)
+    }
+    if (fields.head.dataType != org.apache.spark.sql.types.StringType) {
+      throw QueryCompilationErrors.parseInputNotStringTypeError(fields.head.dataType)
+    }
+    classicReader.json(df.select(df.columns.head).as(Encoders.STRING))
   }
 
   def cleanupPythonWorkerLogs(sessionUUID: String, sparkContext: SparkContext): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
index 89835de3a532b..d0d6bf1e8ec0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.plans.logical.UnresolvedDataSource
 import org.apache.spark.sql.catalyst.util.FailureSafeParser
 import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, XmlOptions}
 import org.apache.spark.sql.classic.ClassicConversions._
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation}
 import org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
@@ -160,22 +159,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession)
     json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
   }
 
-  /**
-   * Parses a [[sql.DataFrame]] containing JSON strings into a structured [[sql.DataFrame]].
-   * The first column of the input DataFrame must be of StringType.
-   */
-  def json(df: sql.DataFrame): sql.DataFrame = {
-    val fields = df.schema.fields
-    if (fields.isEmpty) {
-      throw QueryCompilationErrors.parseInputNotStringTypeError(
-        org.apache.spark.sql.types.NullType)
-    }
-    if (fields.head.dataType != org.apache.spark.sql.types.StringType) {
-      throw QueryCompilationErrors.parseInputNotStringTypeError(fields.head.dataType)
-    }
-    json(df.select(df.columns.head).as(Encoders.STRING))
-  }
-
   /** @inheritdoc */
   def json(jsonDataset: sql.Dataset[String]): DataFrame = {
     val parsedOptions = new JSONOptions(
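End state of the series: the classic reader keeps its single Dataset[String] entry point, and the Python-only bridge in PythonSQLUtils performs the validation and first-column reduction before delegating. A condensed sketch of that final shape (the helper name and plain exception are illustrative; the real code lives in PythonSQLUtils.jsonFromDataFrame and uses Spark's error classes):

    import org.apache.spark.sql.{DataFrame, Encoders}
    import org.apache.spark.sql.types.{NullType, StringType}

    object JsonFromDataFrameSketch {
      def parseJsonColumn(df: DataFrame): DataFrame = {
        val firstType = df.schema.fields.headOption.map(_.dataType).getOrElse(NullType)
        if (firstType != StringType) {
          throw new IllegalArgumentException(s"PARSE_INPUT_NOT_STRING_TYPE: ${firstType.sql}")
        }
        // Keep only the first column, then reuse the existing Dataset[String] JSON path.
        df.sparkSession.read.json(df.select(df.columns.head).as(Encoders.STRING))
      }
    }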