Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5652,6 +5652,12 @@
],
"sqlState" : "42617"
},
"PARSE_INPUT_NOT_STRING_TYPE" : {
"message" : [
"Input DataFrame column must be StringType, but got <dataType>."
],
"sqlState" : "42K09"
},
"PARSE_MODE_UNSUPPORTED" : {
"message" : [
"The function <funcName> doesn't support the <mode> mode. Acceptable modes are PERMISSIVE and FAILFAST."
Expand Down
36 changes: 35 additions & 1 deletion python/pyspark/sql/connect/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

from pyspark.serializers import CloudPickleSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import DataType
from pyspark.sql.types import DataType, StructType

import pyspark.sql.connect.proto as proto
from pyspark.sql.column import Column
Expand Down Expand Up @@ -383,6 +383,40 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
return plan


class Parse(LogicalPlan):
    """Parse a DataFrame with a single string column into a structured DataFrame."""

    def __init__(
        self,
        child: "LogicalPlan",
        format: "proto.Parse.ParseFormat.ValueType",
        schema: Optional[str] = None,
        options: Optional[Mapping[str, str]] = None,
    ) -> None:
        super().__init__(child)
        self._format = format
        self._schema = schema
        self._options = options

    def plan(self, session: "SparkConnectClient") -> proto.Relation:
        assert self._child is not None
        rel = self._create_proto_relation()
        rel.parse.input.CopyFrom(self._child.plan(session))
        rel.parse.format = self._format
        if self._schema:
            # A schema string beginning with "{" is a JSON-serialized schema;
            # anything else is treated as a DDL string.
            if self._schema.startswith("{"):
                parsed_schema = StructType.fromJson(json.loads(self._schema))
            else:
                parsed_schema = StructType.fromDDL(self._schema)
            rel.parse.schema.CopyFrom(pyspark_types_to_proto_types(parsed_schema))
        for key, value in (self._options or {}).items():
            rel.parse.options[key] = value
        return rel


class Read(LogicalPlan):
def __init__(
self,
Expand Down
31 changes: 29 additions & 2 deletions python/pyspark/sql/connect/readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
LogicalPlan,
WriteOperation,
WriteOperationV2,
Parse,
)
import pyspark.sql.connect.proto as proto
from pyspark.sql.types import StructType
from pyspark.sql.utils import to_str
from pyspark.sql.readwriter import (
Expand Down Expand Up @@ -165,7 +167,7 @@ def changes(self, tableName: str) -> "DataFrame":

def json(
self,
path: PathOrPaths,
path: Union[PathOrPaths, "DataFrame"],
schema: Optional[Union[StructType, str]] = None,
primitivesAsString: Optional[Union[bool, str]] = None,
prefersDecimal: Optional[Union[bool, str]] = None,
Expand Down Expand Up @@ -220,7 +222,32 @@ def json(
)
if isinstance(path, str):
path = [path]
return self.load(path=path, format="json", schema=schema)
if isinstance(path, list):
return self.load(path=path, format="json", schema=schema)

from pyspark.sql.connect.dataframe import DataFrame

if isinstance(path, DataFrame):
# Schema must be set explicitly here because the DataFrame path
# bypasses load(), which normally calls self.schema(schema).
if schema is not None:
self.schema(schema)
return self._df(
Parse(
child=path._plan,
format=proto.Parse.ParseFormat.PARSE_FORMAT_JSON,
schema=self._schema,
options=self._options,
)
)
raise PySparkTypeError(
errorClass="NOT_EXPECTED_TYPE",
messageParameters={
"arg_name": "path",
"expected_type": "str, list, or DataFrame",
"arg_type": type(path).__name__,
},
)

json.__doc__ = PySparkDataFrameReader.json.__doc__

Expand Down
34 changes: 30 additions & 4 deletions python/pyspark/sql/readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def load(

def json(
self,
path: Union[str, List[str], "RDD[str]"],
path: Union[str, List[str], "RDD[str]", "DataFrame"],
schema: Optional[Union[StructType, str]] = None,
primitivesAsString: Optional[Union[bool, str]] = None,
prefersDecimal: Optional[Union[bool, str]] = None,
Expand Down Expand Up @@ -361,11 +361,15 @@ def json(
.. versionchanged:: 3.4.0
Supports Spark Connect.

.. versionchanged:: 4.2.0
Supports DataFrame input.

Parameters
----------
path : str, list or :class:`RDD`
path : str, list, :class:`RDD`, or :class:`DataFrame`
string represents path to the JSON dataset, or a list of paths,
or RDD of Strings storing JSON objects.
or RDD of Strings storing JSON objects,
or a DataFrame with a single string column containing JSON strings.
schema : :class:`pyspark.sql.types.StructType` or str, optional
an optional :class:`pyspark.sql.types.StructType` for the input schema or
a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
Expand Down Expand Up @@ -434,6 +438,20 @@ def json(
+----+---+
| Bob| 30|
+----+---+

Example 4: Parse JSON from a DataFrame with a single string column.

>>> json_df = spark.createDataFrame(
... [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)],
... schema="value STRING",
... )
>>> spark.read.json(json_df).sort("name").show()
+---+-----+
|age| name|
+---+-----+
| 25|Alice|
| 30| Bob|
+---+-----+
"""
self._set_opts(
schema=schema,
Expand Down Expand Up @@ -486,12 +504,20 @@ def func(iterator: Iterable) -> Iterable:
assert self._spark._jvm is not None
jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
return self._df(self._jreader.json(jrdd))

from pyspark.sql.dataframe import DataFrame

if isinstance(path, DataFrame):
assert self._spark._jvm is not None
return self._df(
self._spark._jvm.PythonSQLUtils.jsonFromDataFrame(self._jreader, path._jdf)
)
else:
raise PySparkTypeError(
errorClass="NOT_EXPECTED_TYPE",
messageParameters={
"arg_name": "path",
"expected_type": "str or list[RDD]",
"expected_type": "str, list, RDD, or DataFrame",
"arg_type": type(path).__name__,
},
)
Expand Down
37 changes: 37 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,43 @@ def test_csv(self):
# Read the text file as a DataFrame.
self.assert_eq(self.connect.read.csv(d).toPandas(), self.spark.read.csv(d).toPandas())

def test_json_with_dataframe_input(self):
    # spark.read.json should accept a single-string-column DataFrame as input.
    source = self.connect.createDataFrame(
        [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)],
        schema="value STRING",
    )
    parsed = self.connect.read.json(source)
    rows = sorted(parsed.collect(), key=lambda r: r.name)
    self.assertEqual(rows, [Row(age=25, name="Alice"), Row(age=30, name="Bob")])

def test_json_with_dataframe_input_and_schema(self):
    # An explicit schema must be honored when parsing from a DataFrame input.
    source = self.connect.createDataFrame(
        [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)],
        schema="value STRING",
    )
    parsed = self.connect.read.json(source, schema="name STRING, age INT")
    rows = sorted(parsed.collect(), key=lambda r: r.name)
    self.assertEqual(rows, [Row(name="Alice", age=25), Row(name="Bob", age=30)])

def test_json_with_dataframe_input_non_string_column(self):
    # A non-string input column must be rejected with PARSE_INPUT_NOT_STRING_TYPE.
    source = self.connect.createDataFrame([(1,), (2,)], schema="value INT")
    with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"):
        self.connect.read.json(source).collect()

def test_json_with_dataframe_input_multiple_columns(self):
    # Per review consensus on this change, a multi-column input DataFrame must
    # be rejected explicitly: silently dropping all columns after the first is
    # an anti-pattern. The previous expectation (parse the first column and
    # discard the rest) is therefore replaced with an error assertion.
    multi_df = self.connect.createDataFrame(
        [('{"name": "Alice"}', "extra"), ('{"name": "Bob"}', "extra")],
        schema="value STRING, other STRING",
    )
    with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"):
        self.connect.read.json(multi_df).collect()

def test_json_with_dataframe_input_zero_columns(self):
    # A zero-column DataFrame has no string column to parse, so it must fail.
    source = self.connect.range(1).select()
    with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"):
        self.connect.read.json(source).collect()

def test_multi_paths(self):
# SPARK-42041: DataFrameReader should support list of paths

Expand Down
37 changes: 37 additions & 0 deletions python/pyspark/sql/tests/test_datasources.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,43 @@ def test_linesep_json(self):
finally:
shutil.rmtree(tpath)

def test_json_with_dataframe_input(self):
    # spark.read.json should accept a single-string-column DataFrame as input.
    source = self.spark.createDataFrame(
        [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)],
        schema="value STRING",
    )
    parsed = self.spark.read.json(source)
    rows = sorted(parsed.collect(), key=lambda r: r.name)
    self.assertEqual(rows, [Row(age=25, name="Alice"), Row(age=30, name="Bob")])

def test_json_with_dataframe_input_and_schema(self):
    # An explicit schema must be honored when parsing from a DataFrame input.
    source = self.spark.createDataFrame(
        [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)],
        schema="value STRING",
    )
    parsed = self.spark.read.json(source, schema="name STRING, age INT")
    rows = sorted(parsed.collect(), key=lambda r: r.name)
    self.assertEqual(rows, [Row(name="Alice", age=25), Row(name="Bob", age=30)])

def test_json_with_dataframe_input_non_string_column(self):
    # A non-string input column must be rejected with PARSE_INPUT_NOT_STRING_TYPE.
    source = self.spark.createDataFrame([(1,), (2,)], schema="value INT")
    with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"):
        self.spark.read.json(source).collect()

def test_json_with_dataframe_input_multiple_columns(self):
    # Per review consensus on this change, a multi-column input DataFrame must
    # be rejected explicitly: silently dropping all columns after the first is
    # an anti-pattern. The previous expectation (parse the first column and
    # discard the rest) is therefore replaced with an error assertion.
    multi_df = self.spark.createDataFrame(
        [('{"name": "Alice"}', "extra"), ('{"name": "Bob"}', "extra")],
        schema="value STRING, other STRING",
    )
    with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"):
        self.spark.read.json(multi_df).collect()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in this case it should fail?


def test_json_with_dataframe_input_zero_columns(self):
    # A zero-column DataFrame has no string column to parse, so it must fail.
    source = self.spark.range(1).select()
    with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"):
        self.spark.read.json(source).collect()

def test_multiline_csv(self):
ages_newlines = self.spark.read.csv(
"python/test_support/sql/ages_newlines.csv", multiLine=True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3495,6 +3495,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
)
}

/**
 * Error raised when parsing a DataFrame input (e.g. `DataFrameReader.json(df)`)
 * whose input column is not of `StringType`.
 *
 * @param dataType the actual data type of the offending input column
 * @return an [[AnalysisException]] with error class `PARSE_INPUT_NOT_STRING_TYPE`
 */
def parseInputNotStringTypeError(dataType: DataType): Throwable = {
  new AnalysisException(
    errorClass = "PARSE_INPUT_NOT_STRING_TYPE",
    messageParameters = Map("dataType" -> toSQLType(dataType)))
}

def textDataSourceWithMultiColumnsError(schema: StructType): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1290",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1760,7 +1760,19 @@ class SparkConnectPlanner(
localMap.foreach { case (key, value) => reader.option(key, value) }
reader
}
def ds: Dataset[String] = Dataset(session, transformRelation(rel.getInput))(Encoders.STRING)
def ds: Dataset[String] = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try to avoid creating the dataset twice. Analysis can be somewhat expensive. Do this instead:

val input = transformRelation(rel.getInput)
val df = Dataset.ofRows(session, input)
val inputSchema = df.schema
if (inputSchema.fields.length != 1) {
  throw InvalidInputErrors.parseInputNotSingleColumn(inputSchema.fields.length)
}
if (inputSchema.fields.head.dataType != org.apache.spark.sql.types.StringType) {
  throw InvalidInputErrors.parseInputNotStringType(inputSchema.fields.head.dataType)
}
df.as(Encoders.STRING)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

val input = transformRelation(rel.getInput)
val df = Dataset.ofRows(session, input)
val fields = df.schema.fields
if (fields.isEmpty) {
throw QueryCompilationErrors.parseInputNotStringTypeError(
org.apache.spark.sql.types.NullType)
}
if (fields.head.dataType != org.apache.spark.sql.types.StringType) {
throw QueryCompilationErrors.parseInputNotStringTypeError(fields.head.dataType)
}
df.select(df.columns.head).as(Encoders.STRING)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't really have to add a projection here. df.as(Encoders.STRING) should work as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried removing the projection, but df.as(Encoders.STRING) on a multi-column DataFrame throws UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH because the STRING encoder expects exactly one column. So the projection is needed to support multi-column DataFrames (using the first column). I'll keep it as-is.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the projection is needed to support multi-column DataFrames (using the first column)

I think it should fail in this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @hvanhovell wants to support multiple column case, #55097 (comment) but I am a bit not sure about how we should support multi column input.
Currently it silently drops columns after the first one when receiving more than one columns. I could also change it to raise an exception. Or, do we want to somehow join the remaining columns back after we parse the json from the first column?

@hvanhovell @zhengruifeng what do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally feel it should just fail, but if we want to support multiple columns by accept the first column, I think we need to document such behavior.
also cc @cloud-fan and @HyukjinKwon WDYT?

Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

silently dropping things is an anti pattern, let's fail explicitly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok let me change it to fail the case. Then we will not be able to support multi column.

}
Comment on lines +1764 to +1775
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the checks here because otherwise INT can be implicitly cast to STRING.


rel.getFormat match {
case ParseFormat.PARSE_FORMAT_CSV =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ import org.apache.spark.api.python.DechunkedInputStream
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.CLASS_LOADER
import org.apache.spark.security.SocketAuthServer
import org.apache.spark.sql.{internal, Column, DataFrame, Row, SparkSession, TableArg}
import org.apache.spark.sql.{internal, Column, DataFrame, DataFrameReader, Encoders, Row, SparkSession, TableArg}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.classic.{DataFrameReader => ClassicDataFrameReader}
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.classic.ExpressionUtils.expression
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{ExplainMode, QueryExecution}
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.execution.python.EvaluatePython
Expand Down Expand Up @@ -193,6 +195,26 @@ private[sql] object PythonSQLUtils extends Logging {
@scala.annotation.varargs
def internalFn(name: String, inputs: Column*): Column = Column.internalFn(name, inputs: _*)

/**
 * Parses a [[DataFrame]] containing JSON strings into a structured [[DataFrame]].
 *
 * The input DataFrame must have exactly one column, and that column must be of
 * `StringType`; any other shape fails with `PARSE_INPUT_NOT_STRING_TYPE`.
 * Extra columns are rejected explicitly instead of being silently dropped
 * (silently discarding input is an anti-pattern; see the review discussion on
 * this change).
 *
 * This is used by PySpark to avoid manual Dataset[String] conversion on the
 * Python side.
 */
def jsonFromDataFrame(
    reader: DataFrameReader,
    df: DataFrame): DataFrame = {
  val classicReader = reader.asInstanceOf[ClassicDataFrameReader]
  val fields = df.schema.fields
  if (fields.isEmpty) {
    // No column at all: report NullType as the (missing) input column type.
    throw QueryCompilationErrors.parseInputNotStringTypeError(
      org.apache.spark.sql.types.NullType)
  }
  if (fields.length > 1) {
    // Fail explicitly on multi-column input rather than silently dropping the
    // trailing columns; report the whole input schema in the error.
    throw QueryCompilationErrors.parseInputNotStringTypeError(df.schema)
  }
  if (fields.head.dataType != org.apache.spark.sql.types.StringType) {
    throw QueryCompilationErrors.parseInputNotStringTypeError(fields.head.dataType)
  }
  // Exactly one StringType column: safe to narrow directly to Dataset[String]
  // without an extra projection.
  classicReader.json(df.as(Encoders.STRING))
}

def cleanupPythonWorkerLogs(sessionUUID: String, sparkContext: SparkContext): Unit = {
if (!sparkContext.isStopped) {
try {
Expand Down