From afdeb5ba59e4a1a5fe004b57be1d6e785d7138fe Mon Sep 17 00:00:00 2001 From: Haoyan Geng Date: Fri, 29 May 2026 19:58:22 +0000 Subject: [PATCH 1/2] [SPARK-52812][CONNECT] Preserve spark.sql.sources.default for eager createTable(tableName, path) ### What changes were proposed in this pull request? SPARK-52812 (apache/spark#56064) made Spark Connect `Catalog.createTable` eager by re-routing the two-argument `createTable(tableName, path)` overload through `createTable(tableName, path, "parquet")`. That hardcodes the parquet provider and drops the `spark.sql.sources.default` fallback that the overload previously relied on. This PR restores the original behavior: the two-argument overload again leaves the source unset so the server resolves `spark.sql.sources.default`, while keeping the eager execution introduced by SPARK-52812. A regression test is added to `CatalogSuite`. ### Why are the changes needed? The two-argument `createTable(tableName, path)` overload is documented as "It will use the default data source configured by spark.sql.sources.default." After SPARK-52812 it always used parquet regardless of that configuration, contradicting its own contract and the classic Catalog behavior. ### Does this PR introduce _any_ user-facing change? Yes, within the unreleased master branch. `spark.catalog.createTable(tableName, path)` on Spark Connect once again honors `spark.sql.sources.default` instead of always creating a parquet table. The eager-execution behavior from SPARK-52812 is preserved. ### How was this patch tested? Added a regression test in `CatalogSuite` that sets `spark.sql.sources.default` to `json`, writes JSON data, creates the table via the two-argument overload, and asserts the resulting table uses the json provider and is readable. The test fails on the previous hardcoded-parquet behavior. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.8) --- .../spark/sql/connect/CatalogSuite.scala | 21 +++++++++++++++++++ .../apache/spark/sql/connect/Catalog.scala | 12 ++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala index e8ccc9f083c63..5237554b3625d 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala @@ -163,6 +163,27 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe } } + test("createTable(tableName, path) uses spark.sql.sources.default") { + val tableName = "default_source_table" + withSQLConf("spark.sql.sources.default" -> "json") { + withTable(tableName) { + withTempPath { dir => + val session = spark + import session.implicits._ + // Write the data as JSON. If createTable hardcoded the parquet provider, reading the + // table back would fail because the files are not parquet. + Seq((1, "a")).toDF("id", "value").write.json(dir.getPath) + spark.catalog.createTable(tableName, dir.getPath) + assert(spark.catalog.tableExists(tableName)) + val ddl = spark.catalog.getCreateTableString(tableName) + assert(ddl.toLowerCase(java.util.Locale.ROOT).contains("using json")) + // Reading the table back succeeds only if it was created with the json provider. + assert(spark.table(tableName).count() == 1) + } + } + } + } + test("Cache Table APIs") { val parquetTableName = "parquet_table" withTable(parquetTableName) { diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala index 2324ca05d7b7f..50b0b1aa9ae07 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala @@ -392,7 +392,17 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog { * @since 3.5.0 */ override def createTable(tableName: String, path: String): DataFrame = { - createTable(tableName, path, "parquet") + // Leave the source unset so the server resolves spark.sql.sources.default, as documented + // above. Routing through createTable(tableName, path, "parquet") would hardcode the provider + // and ignore that configuration. + sparkSession.execute { builder => + builder.getCatalogBuilder.getCreateTableBuilder + .setTableName(tableName) + .setSchema(DataTypeProtoConverter.toConnectProtoType(new StructType)) + .setDescription("") + .putOptions("path", path) + } + sparkSession.table(tableName) } /** From f0ccaecd07d218709eec2884a2ac8f033651bc57 Mon Sep 17 00:00:00 2001 From: Haoyan Geng Date: Sat, 30 May 2026 20:37:33 +0000 Subject: [PATCH 2/2] [SPARK-52812][CONNECT] Extract shared createTable impl taking source: Option[String] Address review feedback: route the two-argument createTable(tableName, path) overload (source unset) and the five-argument overload (source set) through a single private impl that calls setSource only when the source is defined, removing the duplicated proto-build block while preserving the unset-source semantics that lets the server resolve spark.sql.sources.default. Co-authored-by: Isaac --- .../apache/spark/sql/connect/Catalog.scala | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala index 50b0b1aa9ae07..ce7a10c4026c2 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala @@ -395,14 +395,12 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog { // Leave the source unset so the server resolves spark.sql.sources.default, as documented // above. Routing through createTable(tableName, path, "parquet") would hardcode the provider // and ignore that configuration. - sparkSession.execute { builder => - builder.getCatalogBuilder.getCreateTableBuilder - .setTableName(tableName) - .setSchema(DataTypeProtoConverter.toConnectProtoType(new StructType)) - .setDescription("") - .putOptions("path", path) - } - sparkSession.table(tableName) + createTable( + tableName = tableName, + source = None, + schema = new StructType, + description = "", + options = Map("path" -> path)) } /** @@ -488,12 +486,26 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog { schema: StructType, description: String, options: Map[String, String]): DataFrame = { + createTable(tableName, Some(source), schema, description, options) + } + + /** + * Shared implementation for the public `createTable` overloads. When `source` is `None`, the + * proto's `source` field is left unset so the server resolves `spark.sql.sources.default`; + * otherwise the provided source is pinned via `setSource`. + */ + private def createTable( + tableName: String, + source: Option[String], + schema: StructType, + description: String, + options: Map[String, String]): DataFrame = { sparkSession.execute { builder => val createTableBuilder = builder.getCatalogBuilder.getCreateTableBuilder .setTableName(tableName) - .setSource(source) .setSchema(DataTypeProtoConverter.toConnectProtoType(schema)) .setDescription(description) + source.foreach(createTableBuilder.setSource) options.foreach { case (k, v) => createTableBuilder.putOptions(k, v) }