diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala
index e8ccc9f083c63..5237554b3625d 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala
@@ -163,6 +163,27 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
     }
   }
 
+  test("createTable(tableName, path) uses spark.sql.sources.default") {
+    val tableName = "default_source_table"
+    withSQLConf("spark.sql.sources.default" -> "json") {
+      withTable(tableName) {
+        withTempPath { dir =>
+          val session = spark
+          import session.implicits._
+          // Write the data as JSON. If createTable hardcoded the parquet provider, reading the
+          // table back would fail because the files are not parquet.
+          Seq((1, "a")).toDF("id", "value").write.json(dir.getPath)
+          spark.catalog.createTable(tableName, dir.getPath)
+          assert(spark.catalog.tableExists(tableName))
+          val ddl = spark.catalog.getCreateTableString(tableName)
+          assert(ddl.toLowerCase(java.util.Locale.ROOT).contains("using json"))
+          // The single row written above is readable only if the table uses the json provider.
+          assert(spark.table(tableName).count() == 1)
+        }
+      }
+    }
+  }
+
   test("Cache Table APIs") {
     val parquetTableName = "parquet_table"
     withTable(parquetTableName) {
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala
index 2324ca05d7b7f..ce7a10c4026c2 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala
@@ -392,7 +392,15 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
    * @since 3.5.0
    */
   override def createTable(tableName: String, path: String): DataFrame = {
-    createTable(tableName, path, "parquet")
+    // Leave the source unset (None) so the server resolves spark.sql.sources.default, as
+    // documented above; createTable(tableName, path, "parquet") would hardcode the provider and
+    // ignore that configuration. The location is forwarded through the "path" option.
+    createTable(
+      tableName = tableName,
+      source = None,
+      schema = new StructType,
+      description = "",
+      options = Map("path" -> path))
   }
 
   /**
@@ -478,12 +486,26 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
       schema: StructType,
       description: String,
       options: Map[String, String]): DataFrame = {
+    createTable(tableName, Some(source), schema, description, options)
+  }
+
+  /**
+   * Shared implementation for the public `createTable` overloads. When `source` is `None`, the
+   * proto's `source` field is left unset so the server resolves `spark.sql.sources.default`;
+   * otherwise the provided source is pinned via `setSource`.
+   */
+  private def createTable(
+      tableName: String,
+      source: Option[String],
+      schema: StructType,
+      description: String,
+      options: Map[String, String]): DataFrame = {
     sparkSession.execute { builder =>
       val createTableBuilder = builder.getCatalogBuilder.getCreateTableBuilder
         .setTableName(tableName)
-        .setSource(source)
         .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
         .setDescription(description)
+      source.foreach(createTableBuilder.setSource)
       options.foreach { case (k, v) =>
         createTableBuilder.putOptions(k, v)
       }