Skip to content

Commit a768b6a

Browse files
committed
fix: use Spark Error framework for DataFrame input validation
1 parent 01052c7 commit a768b6a

4 files changed

Lines changed: 32 additions & 7 deletions

File tree

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5572,6 +5572,16 @@
55725572
],
55735573
"sqlState" : "42846"
55745574
},
  "PARSE_INPUT_NOT_SINGLE_COLUMN" : {
    "message" : [
      "Input DataFrame must have exactly one column, but got <numColumns>."
    ],
    "sqlState" : "42802"
  },
  "PARSE_INPUT_NOT_STRING_TYPE" : {
    "message" : [
      "Input DataFrame column must be StringType, but got <dataType>."
    ],
    "sqlState" : "42846"
  },
55755585
"PARSE_EMPTY_STATEMENT" : {
55765586
"message" : [
55775587
"Syntax error, unexpected empty statement."

python/pyspark/sql/tests/test_datasources.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,19 @@ def test_json_with_dataframe_input_and_schema(self):
113113

114114
def test_json_with_dataframe_input_non_string_column(self):
    """Reading JSON from a DataFrame whose single column is not a string
    must fail with the PARSE_INPUT_NOT_STRING_TYPE error condition."""
    df = self.spark.createDataFrame([(1,), (2,)], schema="value INT")
    # The error-condition name, not the free-form message, is the stable contract.
    with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"):
        self.spark.read.json(df).collect()
118118

119119
def test_json_with_dataframe_input_multiple_columns(self):
    """Reading JSON from a DataFrame with more than one column must fail
    with the PARSE_INPUT_NOT_SINGLE_COLUMN error condition."""
    df = self.spark.createDataFrame(
        [("a", "b"), ("c", "d")], schema="col1 STRING, col2 STRING"
    )
    with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_SINGLE_COLUMN"):
        self.spark.read.json(df).collect()
125125

126126
def test_json_with_dataframe_input_zero_columns(self):
    """Reading JSON from a zero-column DataFrame must fail with the
    PARSE_INPUT_NOT_SINGLE_COLUMN error condition (0 is not exactly 1)."""
    # select() with no columns yields a DataFrame with an empty schema.
    df = self.spark.range(1).select()
    with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_SINGLE_COLUMN"):
        self.spark.read.json(df).collect()
130130

131131
def test_multiline_csv(self):

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3495,6 +3495,18 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
34953495
)
34963496
}
34973497

3498+
/**
 * Builds the error thrown when a DataFrame handed to a parsing API does not
 * contain exactly one column.
 *
 * @param numColumns the actual number of columns in the input DataFrame
 * @return an [[AnalysisException]] carrying the PARSE_INPUT_NOT_SINGLE_COLUMN
 *         error condition
 */
def parseInputNotSingleColumnError(numColumns: Int): Throwable =
  new AnalysisException(
    errorClass = "PARSE_INPUT_NOT_SINGLE_COLUMN",
    messageParameters = Map("numColumns" -> numColumns.toString))
3503+
3504+
/**
 * Builds the error thrown when the single column of a DataFrame handed to a
 * parsing API is not of StringType.
 *
 * @param dataType the actual data type of the input DataFrame's column
 * @return an [[AnalysisException]] carrying the PARSE_INPUT_NOT_STRING_TYPE
 *         error condition
 */
def parseInputNotStringTypeError(dataType: DataType): Throwable =
  new AnalysisException(
    errorClass = "PARSE_INPUT_NOT_STRING_TYPE",
    messageParameters = Map("dataType" -> toSQLType(dataType)))
3509+
34983510
def textDataSourceWithMultiColumnsError(schema: StructType): Throwable = {
34993511
new AnalysisException(
35003512
errorClass = "_LEGACY_ERROR_TEMP_1290",

sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRe
3333
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.classic.ExpressionUtils.expression
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{ExplainMode, QueryExecution}
@@ -201,10 +202,12 @@ private[sql] object PythonSQLUtils extends Logging {
201202
/**
 * Reads each row of a single-string-column DataFrame as a JSON document.
 *
 * Validates the input shape up front so callers get a proper error condition
 * from the Spark Error framework instead of an IllegalArgumentException:
 *  - PARSE_INPUT_NOT_SINGLE_COLUMN when the DataFrame does not have exactly
 *    one column (covers both the zero-column and multi-column cases);
 *  - PARSE_INPUT_NOT_STRING_TYPE when the single column is not StringType.
 *
 * @param reader the DataFrameReader whose JSON options/schema are applied
 * @param df     the input DataFrame; must have exactly one StringType column
 * @return a DataFrame parsed from the JSON strings in `df`
 */
def jsonFromDataFrame(
    reader: DataFrameReader,
    df: DataFrame): DataFrame = {
  // Hoist the schema lookup: it is consulted for the column count, the
  // error parameter, and the type check below.
  val fields = df.schema.fields
  if (fields.length != 1) {
    throw QueryCompilationErrors.parseInputNotSingleColumnError(fields.length)
  }
  if (fields.head.dataType != org.apache.spark.sql.types.StringType) {
    throw QueryCompilationErrors.parseInputNotStringTypeError(fields.head.dataType)
  }
  reader.json(df.as(Encoders.STRING))
}
210213

0 commit comments

Comments
 (0)