Skip to content

Commit a2f06d7

Browse files
committed
[SPARK-56176][SPARK-56232][SQL] V2-native ANALYZE TABLE/COLUMN with stats propagation to FileScan
1 parent 752ef85 commit a2f06d7

6 files changed

Lines changed: 202 additions & 30 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -430,34 +430,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
430430
case AnalyzeTables(ResolvedV1Database(db), noScan) =>
431431
AnalyzeTablesCommand(Some(db), noScan)
432432

433-
// TODO(SPARK-56176): V2-native ANALYZE TABLE/COLUMN for file tables.
434-
// FileTable from V2SessionCatalog.loadTable doesn't match V1 extractors,
435-
// so we intercept here and delegate to V1 commands using catalogTable.
436-
case AnalyzeTable(
437-
ResolvedTable(catalog, _, ft: FileTable, _),
438-
partitionSpec, noScan)
439-
if supportsV1Command(catalog)
440-
&& ft.catalogTable.isDefined =>
441-
val tableIdent = ft.catalogTable.get.identifier
442-
if (partitionSpec.isEmpty) {
443-
AnalyzeTableCommand(tableIdent, noScan)
444-
} else {
445-
AnalyzePartitionCommand(
446-
tableIdent, partitionSpec, noScan)
447-
}
448-
449433
case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
450434
AnalyzeColumnCommand(ident, columnNames, allColumns)
451435

452-
case AnalyzeColumn(
453-
ResolvedTable(catalog, _, ft: FileTable, _),
454-
columnNames, allColumns)
455-
if supportsV1Command(catalog)
456-
&& ft.catalogTable.isDefined =>
457-
AnalyzeColumnCommand(
458-
ft.catalogTable.get.identifier,
459-
columnNames, allColumns)
460-
461436
// V2 catalog doesn't support REPAIR TABLE yet, we must use v1 command here.
462437
case RepairTable(
463438
ResolvedV1TableIdentifierInSessionCatalog(ident),
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.execution.datasources.v2
18+
19+
import org.apache.spark.sql.catalyst.InternalRow
20+
import org.apache.spark.sql.catalyst.expressions.Attribute
21+
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange}
22+
import org.apache.spark.sql.execution.command.CommandUtils
23+
24+
/**
 * Physical plan for ANALYZE TABLE ... FOR COLUMNS on V2 file tables.
 * Computes column-level statistics and persists them as table properties
 * via [[TableCatalog.alterTable()]].
 *
 * Column stats property key format:
 * `spark.sql.statistics.colStats.<col>.<stat>`
 */
case class AnalyzeColumnExec(
    catalog: TableCatalog,
    ident: Identifier,
    table: FileTable,
    columnNames: Option[Seq[String]],
    allColumns: Boolean)
  extends LeafV2CommandExec {

  // ANALYZE produces no result rows.
  override def output: Seq[Attribute] = Seq.empty

  override protected def run(): Seq[InternalRow] = {
    // Refresh the file listing first so that the size we record and the data
    // we scan for column stats are consistent with each other.
    table.fileIndex.refresh()
    val totalSize = table.fileIndex.sizeInBytes

    val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))

    // Resolve the requested columns against the relation output using the
    // session resolver, so that spark.sql.caseSensitive is honored instead of
    // a hard-coded case-insensitive comparison.
    val resolver = session.sessionState.conf.resolver
    val columnsToAnalyze = if (allColumns) {
      relation.output
    } else {
      columnNames.getOrElse(Seq.empty).map { name =>
        relation.output.find(attr => resolver(attr.name, name)).getOrElse(
          throw new IllegalArgumentException(s"Column '$name' not found"))
      }
    }

    // Scans the table once, computing the row count and per-column stats.
    val (rowCount, colStats) =
      CommandUtils.computeColumnStats(session, relation, columnsToAnalyze)

    val changes = scala.collection.mutable.ArrayBuffer(
      TableChange.setProperty("spark.sql.statistics.totalSize", totalSize.toString),
      TableChange.setProperty("spark.sql.statistics.numRows", rowCount.toString))

    // Persist column stats as flat table properties, one property per stat,
    // keyed as spark.sql.statistics.colStats.<col>.<stat>.
    val prefix = "spark.sql.statistics.colStats."
    colStats.foreach { case (attr, stat) =>
      val catalogStat = stat.toCatalogColumnStat(attr.name, attr.dataType)
      catalogStat.toMap(attr.name).foreach { case (k, v) =>
        changes += TableChange.setProperty(prefix + k, v)
      }
    }

    catalog.alterTable(ident, changes.toSeq: _*)
    Seq.empty
  }
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.execution.datasources.v2
18+
19+
import org.apache.spark.sql.catalyst.InternalRow
20+
import org.apache.spark.sql.catalyst.expressions.Attribute
21+
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange}
22+
23+
/**
 * Physical plan for ANALYZE TABLE on V2 file tables.
 * Computes table statistics and persists them as table properties
 * via [[TableCatalog.alterTable()]].
 *
 * Statistics property keys:
 *  - `spark.sql.statistics.totalSize`
 *  - `spark.sql.statistics.numRows`
 */
case class AnalyzeTableExec(
    catalog: TableCatalog,
    ident: Identifier,
    table: FileTable,
    partitionSpec: Map[String, Option[String]],
    noScan: Boolean) extends LeafV2CommandExec {

  // ANALYZE produces no result rows.
  override def output: Seq[Attribute] = Seq.empty

  override protected def run(): Seq[InternalRow] = {
    // Partition-level stats are not implemented here. Fail loudly rather than
    // silently recording whole-table stats for ANALYZE ... PARTITION(...).
    if (partitionSpec.nonEmpty) {
      throw new UnsupportedOperationException(
        "ANALYZE TABLE ... PARTITION is not supported for v2 file tables yet")
    }

    // Refresh the file listing so sizeInBytes reflects the current data.
    table.fileIndex.refresh()
    val totalSize = table.fileIndex.sizeInBytes

    val changes = scala.collection.mutable.ArrayBuffer(
      TableChange.setProperty("spark.sql.statistics.totalSize", totalSize.toString))

    if (!noScan) {
      // Full scan of the table for an exact row count (skipped under NOSCAN).
      val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
      val df = session.internalCreateDataFrame(
        session.sessionState.executePlan(relation).toRdd, relation.schema)
      changes += TableChange.setProperty(
        "spark.sql.statistics.numRows", df.count().toString)
    }

    catalog.alterTable(ident, changes.toSeq: _*)
    Seq.empty
  }
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,8 +530,26 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
530530
case ShowTableProperties(rt: ResolvedTable, propertyKey, output) =>
531531
ShowTablePropertiesExec(output, rt.table, rt.name, propertyKey) :: Nil
532532

533-
case AnalyzeTable(_: ResolvedTable, _, _) | AnalyzeColumn(_: ResolvedTable, _, _) =>
534-
throw QueryCompilationErrors.analyzeTableNotSupportedForV2TablesError()
533+
case AnalyzeTable(
534+
ResolvedTable(catalog, ident,
535+
ft: FileTable, _),
536+
partitionSpec, noScan) =>
537+
AnalyzeTableExec(
538+
catalog, ident, ft,
539+
partitionSpec, noScan) :: Nil
540+
541+
case AnalyzeColumn(
542+
ResolvedTable(catalog, ident,
543+
ft: FileTable, _),
544+
columnNames, allColumns) =>
545+
AnalyzeColumnExec(
546+
catalog, ident, ft,
547+
columnNames, allColumns) :: Nil
548+
549+
case AnalyzeTable(_: ResolvedTable, _, _) |
550+
AnalyzeColumn(_: ResolvedTable, _, _) =>
551+
throw QueryCompilationErrors
552+
.analyzeTableNotSupportedForV2TablesError()
535553

536554
case AddPartitions(
537555
r @ ResolvedTable(_, _, table: SupportsPartitionManagement, _), parts, ignoreIfExists) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf}
3737
import org.apache.spark.sql.internal.connector.SupportsMetadata
3838
import org.apache.spark.sql.sources.Filter
3939
import org.apache.spark.sql.types.StructType
40+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
4041
import org.apache.spark.util.Utils
4142

4243
trait FileScan extends Scan
@@ -68,6 +69,8 @@ trait FileScan extends Scan
6869
*/
6970
def readPartitionSchema: StructType
7071

72+
def options: CaseInsensitiveStringMap
73+
7174
/**
7275
* Returns the filters that can be use for partition pruning
7376
*/
@@ -197,10 +200,22 @@ trait FileScan extends Scan
197200
OptionalLong.of(size)
198201
}
199202

override def numRows(): OptionalLong = {
  // Prefer the row count persisted by ANALYZE TABLE, when one is available.
  storedNumRows.fold(OptionalLong.empty())(OptionalLong.of)
}
201209
}
202210
}
203211

/**
 * Row count persisted by ANALYZE TABLE, if available.
 * Injected via FileTable.mergedOptions as the `__numRows` option.
 *
 * Uses `toLongOption` so that a malformed (possibly user-supplied) value is
 * ignored instead of throwing NumberFormatException during planning.
 */
protected def storedNumRows: Option[Long] =
  Option(options.get("__numRows")).flatMap(_.toLongOption)
218+
204219
override def toBatch: Batch = this
205220

206221
override def readSchema(): StructType =

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,16 @@ abstract class FileTable(
273273
* @return
274274
*/
275275
protected def mergedOptions(options: CaseInsensitiveStringMap): CaseInsensitiveStringMap = {
  // Scan-time options take precedence over the table-level options.
  val combined = this.options.asCaseSensitiveMap().asScala ++
    options.asCaseSensitiveMap().asScala
  // Propagate the catalog-stored row count (written by ANALYZE TABLE) so that
  // FileScan.estimateStatistics() can pick it up via the `__numRows` option.
  val rowCountEntry = catalogTable.flatMap(_.stats).flatMap(_.rowCount)
    .map(rows => "__numRows" -> rows.toString)
  new CaseInsensitiveStringMap((combined ++ rowCountEntry).asJava)
}
280287

281288
/**

0 commit comments

Comments
 (0)