[SPARK-14672][SQL] Move HiveContext analyze logic to AnalyzeTable
## What changes were proposed in this pull request?

Move the implementation of `hiveContext.analyze` into the `AnalyzeTable` command.

## How was this patch tested?

Existing tests.

Closes #12429

Author: Yin Huai <yhuai@databricks.com>
Author: Andrew Or <andrew@databricks.com>

Closes #12448 from yhuai/analyzeTable.
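A minimal usage sketch of what the move means for callers, assuming a `HiveContext` named `hiveContext` and an existing Hive table `src` (neither appears in this patch): both entry points below now end up in the same `AnalyzeTable` runnable command.

```scala
// Hedged sketch, not part of this patch. Assumes a HiveContext `hiveContext`
// and an existing Hive table `src`.

// Programmatic entry point: after this change the method body is just
// AnalyzeTable(tableName).run(self), so the size computation lives in the command.
hiveContext.analyze("src")

// SQL entry point: the noscan variant of ANALYZE TABLE is planned as the same
// AnalyzeTable command, so both paths share one implementation.
hiveContext.sql("ANALYZE TABLE src COMPUTE STATISTICS noscan")
```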
parent 5cefecc95a
commit 3394b12c37
@@ -29,12 +29,9 @@ import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
@@ -45,13 +42,12 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.hive.execution.{AnalyzeTable, DescribeHiveTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.internal.{SharedState, SQLConf}
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.types._
@@ -192,77 +188,7 @@ class HiveContext private[hive](
    * @since 1.2.0
    */
   def analyze(tableName: String) {
-    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
-
-    relation match {
-      case relation: MetastoreRelation =>
-        // This method is mainly based on
-        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
-        // in Hive 0.13 (except that we do not use fs.getContentSummary).
-        // TODO: Generalize statistics collection.
-        // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
-        // Can we use fs.getContentSummary in future?
-        // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
-        // countFileSize to count the table size.
-        val stagingDir = metadataHive.getConf(HiveConf.ConfVars.STAGINGDIR.varname,
-          HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
-
-        def calculateTableSize(fs: FileSystem, path: Path): Long = {
-          val fileStatus = fs.getFileStatus(path)
-          val size = if (fileStatus.isDirectory) {
-            fs.listStatus(path)
-              .map { status =>
-                if (!status.getPath().getName().startsWith(stagingDir)) {
-                  calculateTableSize(fs, status.getPath)
-                } else {
-                  0L
-                }
-              }
-              .sum
-          } else {
-            fileStatus.getLen
-          }
-
-          size
-        }
-
-        def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
-          val path = table.getPath
-          var size: Long = 0L
-          try {
-            val fs = path.getFileSystem(conf)
-            size = calculateTableSize(fs, path)
-          } catch {
-            case e: Exception =>
-              logWarning(
-                s"Failed to get the size of table ${table.getTableName} in the " +
-                s"database ${table.getDbName} because of ${e.toString}", e)
-              size = 0L
-          }
-
-          size
-        }
-
-        val tableParameters = relation.hiveQlTable.getParameters
-        val oldTotalSize =
-          Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
-            .map(_.toLong)
-            .getOrElse(0L)
-        val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable)
-        // Update the Hive metastore if the total size of the table is different than the size
-        // recorded in the Hive metastore.
-        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
-        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-          sessionState.catalog.alterTable(
-            relation.table.copy(
-              properties = relation.table.properties +
-                (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
-        }
-      case otherRelation =>
-        throw new UnsupportedOperationException(
-          s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
-    }
+    AnalyzeTable(tableName).run(self)
   }
 
   override def setConf(key: String, value: String): Unit = {
@@ -17,7 +17,11 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
+import org.apache.hadoop.hive.ql.metadata.Table
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -26,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, LogicalRelation}
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -41,7 +45,80 @@ private[hive]
 case class AnalyzeTable(tableName: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.asInstanceOf[HiveContext].analyze(tableName)
+    val sessionState = sqlContext.sessionState
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+    relation match {
+      case relation: MetastoreRelation =>
+        // This method is mainly based on
+        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+        // in Hive 0.13 (except that we do not use fs.getContentSummary).
+        // TODO: Generalize statistics collection.
+        // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
+        // Can we use fs.getContentSummary in future?
+        // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
+        // countFileSize to count the table size.
+        val stagingDir = hiveContext.metadataHive.getConf(
+          HiveConf.ConfVars.STAGINGDIR.varname,
+          HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
+
+        def calculateTableSize(fs: FileSystem, path: Path): Long = {
+          val fileStatus = fs.getFileStatus(path)
+          val size = if (fileStatus.isDirectory) {
+            fs.listStatus(path)
+              .map { status =>
+                if (!status.getPath().getName().startsWith(stagingDir)) {
+                  calculateTableSize(fs, status.getPath)
+                } else {
+                  0L
+                }
+              }
+              .sum
+          } else {
+            fileStatus.getLen
+          }
+
+          size
+        }
+
+        def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
+          val path = table.getPath
+          var size: Long = 0L
+          try {
+            val fs = path.getFileSystem(conf)
+            size = calculateTableSize(fs, path)
+          } catch {
+            case e: Exception =>
+              logWarning(
+                s"Failed to get the size of table ${table.getTableName} in the " +
+                s"database ${table.getDbName} because of ${e.toString}", e)
+              size = 0L
+          }
+
+          size
+        }
+
+        val tableParameters = relation.hiveQlTable.getParameters
+        val oldTotalSize =
+          Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
+            .map(_.toLong)
+            .getOrElse(0L)
+        val newTotalSize = getFileSizeForTable(hiveContext.hiveconf, relation.hiveQlTable)
+        // Update the Hive metastore if the total size of the table is different than the size
+        // recorded in the Hive metastore.
+        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+          sessionState.catalog.alterTable(
+            relation.table.copy(
+              properties = relation.table.properties +
+                (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
+        }
+      case otherRelation =>
+        throw new UnsupportedOperationException(
+          s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
+    }
     Seq.empty[Row]
   }
 }
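As a rough way to see the command's effect (again assuming a `HiveContext` named `hiveContext` and a Hive table `src`, neither of which is part of this patch), the `totalSize` property that `AnalyzeTable` writes to the metastore is what the planner reads back as the table's size estimate:

```scala
// Hedged sketch, not part of this patch: assumes a running HiveContext `hiveContext`
// and an existing Hive table `src`.
hiveContext.sql("ANALYZE TABLE src COMPUTE STATISTICS noscan")

// MetastoreRelation derives its size estimate from the totalSize table property,
// so the refreshed statistic shows up in the analyzed plan (and can enable
// automatic broadcast joins for small tables).
val sizeInBytes = hiveContext.table("src").queryExecution.analyzed.statistics.sizeInBytes
println(s"Estimated size of src: $sizeInBytes bytes")
```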