[SPARK-56253][PYTHON][CONNECT] Make spark.read.json accept DataFrame input #55097
Changes from all commits
@@ -93,6 +93,43 @@ def test_linesep_json(self):

```python
        finally:
            shutil.rmtree(tpath)

    def test_json_with_dataframe_input(self):
        json_df = self.spark.createDataFrame(
            [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)],
            schema="value STRING",
        )
        result = self.spark.read.json(json_df)
        expected = [Row(age=25, name="Alice"), Row(age=30, name="Bob")]
        self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected)

    def test_json_with_dataframe_input_and_schema(self):
        json_df = self.spark.createDataFrame(
            [('{"name": "Alice", "age": 25}',), ('{"name": "Bob", "age": 30}',)],
            schema="value STRING",
        )
        result = self.spark.read.json(json_df, schema="name STRING, age INT")
        expected = [Row(name="Alice", age=25), Row(name="Bob", age=30)]
        self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected)
```
```python
    def test_json_with_dataframe_input_non_string_column(self):
        int_df = self.spark.createDataFrame([(1,), (2,)], schema="value INT")
        with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"):
            self.spark.read.json(int_df).collect()

    def test_json_with_dataframe_input_multiple_columns(self):
        multi_df = self.spark.createDataFrame(
            [('{"name": "Alice"}', "extra"), ('{"name": "Bob"}', "extra")],
            schema="value STRING, other STRING",
        )
        result = self.spark.read.json(multi_df)
        expected = [Row(name="Alice"), Row(name="Bob")]
        self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected)
```
**Contributor** (on `test_json_with_dataframe_input_multiple_columns`): I think in this case it should fail?
```python
    def test_json_with_dataframe_input_zero_columns(self):
        empty_schema_df = self.spark.range(1).select()
        with self.assertRaisesRegex(Exception, "PARSE_INPUT_NOT_STRING_TYPE"):
            self.spark.read.json(empty_schema_df).collect()

    def test_multiline_csv(self):
        ages_newlines = self.spark.read.csv(
            "python/test_support/sql/ages_newlines.csv", multiLine=True
```
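As a rough mental model of what these tests exercise: `spark.read.json(df)` treats each value in a single STRING column as one JSON record and parses it into a row. The following sketch is plain Python (no Spark) mimicking that record-per-line behavior; the data values mirror the test fixtures above, but the function name is hypothetical.

```python
import json

# Each string is one JSON record, as in a one-column STRING DataFrame.
json_lines = ['{"name": "Alice", "age": 25}', '{"name": "Bob", "age": 30}']

def parse_json_records(lines):
    """Parse each JSON line into a dict, one 'row' per record (sketch only)."""
    return [json.loads(line) for line in lines]

rows = sorted(parse_json_records(json_lines), key=lambda r: r["name"])
```

This is only an analogy for the parsing step; the real reader also performs schema inference or applies a user-supplied schema.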
@@ -1760,7 +1760,19 @@ class SparkConnectPlanner(

```diff
       localMap.foreach { case (key, value) => reader.option(key, value) }
       reader
     }
-    def ds: Dataset[String] = Dataset(session, transformRelation(rel.getInput))(Encoders.STRING)
+    def ds: Dataset[String] = {
```
**Contributor:** Let's try to avoid creating the dataset twice; analysis can be somewhat expensive. Do this instead:

```scala
val input = transformRelation(rel.getInput)
val df = Dataset.ofRows(session, input)
val inputSchema = df.schema
if (inputSchema.fields.length != 1) {
  throw InvalidInputErrors.parseInputNotSingleColumn(inputSchema.fields.length)
}
if (inputSchema.fields.head.dataType != org.apache.spark.sql.types.StringType) {
  throw InvalidInputErrors.parseInputNotStringType(inputSchema.fields.head.dataType)
}
df.as(Encoders.STRING)
```

**Author:** done!
```diff
+      val input = transformRelation(rel.getInput)
+      val df = Dataset.ofRows(session, input)
+      val fields = df.schema.fields
+      if (fields.isEmpty) {
+        throw QueryCompilationErrors.parseInputNotStringTypeError(
+          org.apache.spark.sql.types.NullType)
+      }
+      if (fields.head.dataType != org.apache.spark.sql.types.StringType) {
+        throw QueryCompilationErrors.parseInputNotStringTypeError(fields.head.dataType)
+      }
+      df.select(df.columns.head).as(Encoders.STRING)
```
**Contributor:** You don't really have to add a projection here. `df.as(Encoders.STRING)` should work as well.

**Author:** I tried removing the projection, but …

**Contributor:** I think it should fail in this case?

**Author:** I think @hvanhovell wants to support the multiple-column case (#55097 (comment)), but I am not sure how we should support multi-column input. @hvanhovell @zhengruifeng what do you think?

**Contributor:** I personally feel it should just fail, but if we want to support multiple columns by accepting the first column, we need to document that behavior.

**Contributor:** Silently dropping things is an anti-pattern; let's fail explicitly.

**Author:** OK, let me change it to fail in that case. Then we will not be able to support multi-column input.
```diff
+    }
```

**Author** (on lines +1764 to +1775): I added the checks here because otherwise INT can be implicitly cast to STRING.
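The author's point is that the check must compare types strictly rather than ask "is this castable to string?". A minimal sketch of that validation logic in plain Python (no Spark; the function name and the string-based type representation are hypothetical stand-ins for the Scala schema checks above):

```python
def validate_parse_input(fields):
    """Validate a parse-input schema, given as (name, data_type) pairs.

    Mirrors the planner checks: zero columns and non-string first
    columns are rejected outright, with no implicit cast to string.
    """
    if not fields:
        # Zero-column input: rejected, analogous to raising
        # parseInputNotStringTypeError(NullType) in the Scala code.
        raise TypeError("PARSE_INPUT_NOT_STRING_TYPE: input has no columns")
    name, data_type = fields[0]
    if data_type != "string":
        # Strict equality: an "int" column fails here even though it
        # could be implicitly cast to a string representation.
        raise TypeError(f"PARSE_INPUT_NOT_STRING_TYPE: {data_type}")
    return name

column = validate_parse_input([("value", "string")])
```

The strict `!=` comparison is the crux: a permissive "can cast?" check would silently accept an INT column and parse its stringified digits as JSON, which is almost never what the caller intended.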
```diff
       rel.getFormat match {
         case ParseFormat.PARSE_FORMAT_CSV =>
```