Revert "[SPARK-14014][SQL] Replace existing catalog with SessionCatalog"
This reverts commit 5dfc01976b
.
This commit is contained in:
parent
cf823bead1
commit
c44d140cae
|
@ -1817,8 +1817,7 @@ test_that("approxQuantile() on a DataFrame", {
|
|||
|
||||
test_that("SQL error message is returned from JVM", {
|
||||
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
|
||||
expect_equal(grepl("Table not found", retError), TRUE)
|
||||
expect_equal(grepl("blah", retError), TRUE)
|
||||
expect_equal(grepl("Table not found: blah", retError), TRUE)
|
||||
})
|
||||
|
||||
irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))
|
||||
|
|
|
@ -562,9 +562,6 @@ object MimaExcludes {
|
|||
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.Logging.initializeLogIfNecessary"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerEvent.logEvent"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.OutputWriterFactory.newInstance")
|
||||
) ++ Seq(
|
||||
// [SPARK-14014] Replace existing analysis.Catalog with SessionCatalog
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.this")
|
||||
) ++ Seq(
|
||||
// [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging
|
||||
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Logging"),
|
||||
|
|
|
@ -554,7 +554,7 @@ class SQLContext(object):
|
|||
>>> sqlContext.registerDataFrameAsTable(df, "table1")
|
||||
>>> "table1" in sqlContext.tableNames()
|
||||
True
|
||||
>>> "table1" in sqlContext.tableNames("default")
|
||||
>>> "table1" in sqlContext.tableNames("db")
|
||||
True
|
||||
"""
|
||||
if dbName is None:
|
||||
|
|
|
@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer
|
|||
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
|
||||
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
|
||||
import org.apache.spark.sql.catalyst.encoders.OuterScopes
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.expressions.aggregate._
|
||||
|
@ -37,22 +36,23 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
|
|||
import org.apache.spark.sql.types._
|
||||
|
||||
/**
|
||||
* A trivial [[Analyzer]] with an dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
|
||||
* Used for testing when all relations are already filled in and the analyzer needs only
|
||||
* to resolve attribute references.
|
||||
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
|
||||
* when all relations are already filled in and the analyzer needs only to resolve attribute
|
||||
* references.
|
||||
*/
|
||||
object SimpleAnalyzer
|
||||
extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true))
|
||||
class SimpleAnalyzer(conf: CatalystConf)
|
||||
extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf)
|
||||
extends Analyzer(
|
||||
EmptyCatalog,
|
||||
EmptyFunctionRegistry,
|
||||
new SimpleCatalystConf(caseSensitiveAnalysis = true))
|
||||
|
||||
/**
|
||||
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
|
||||
* [[UnresolvedRelation]]s into fully typed objects using information in a
|
||||
* [[SessionCatalog]] and a [[FunctionRegistry]].
|
||||
* [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
|
||||
* a [[FunctionRegistry]].
|
||||
*/
|
||||
class Analyzer(
|
||||
catalog: SessionCatalog,
|
||||
catalog: Catalog,
|
||||
registry: FunctionRegistry,
|
||||
conf: CatalystConf,
|
||||
maxIterations: Int = 100)
|
||||
|
|
|
@ -0,0 +1,218 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.catalyst.analysis
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
|
||||
|
||||
|
||||
/**
|
||||
* An interface for looking up relations by name. Used by an [[Analyzer]].
|
||||
*/
|
||||
trait Catalog {
|
||||
|
||||
val conf: CatalystConf
|
||||
|
||||
def tableExists(tableIdent: TableIdentifier): Boolean
|
||||
|
||||
def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan
|
||||
|
||||
def setCurrentDatabase(databaseName: String): Unit = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns tuples of (tableName, isTemporary) for all tables in the given database.
|
||||
* isTemporary is a Boolean value indicates if a table is a temporary or not.
|
||||
*/
|
||||
def getTables(databaseName: Option[String]): Seq[(String, Boolean)]
|
||||
|
||||
def refreshTable(tableIdent: TableIdentifier): Unit
|
||||
|
||||
def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit
|
||||
|
||||
def unregisterTable(tableIdent: TableIdentifier): Unit
|
||||
|
||||
def unregisterAllTables(): Unit
|
||||
|
||||
/**
|
||||
* Get the table name of TableIdentifier for temporary tables.
|
||||
*/
|
||||
protected def getTableName(tableIdent: TableIdentifier): String = {
|
||||
// It is not allowed to specify database name for temporary tables.
|
||||
// We check it here and throw exception if database is defined.
|
||||
if (tableIdent.database.isDefined) {
|
||||
throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
|
||||
"for temporary tables. If the table name has dots (.) in it, please quote the " +
|
||||
"table name with backticks (`).")
|
||||
}
|
||||
if (conf.caseSensitiveAnalysis) {
|
||||
tableIdent.table
|
||||
} else {
|
||||
tableIdent.table.toLowerCase
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class SimpleCatalog(val conf: CatalystConf) extends Catalog {
|
||||
private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]
|
||||
|
||||
override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
|
||||
tables.put(getTableName(tableIdent), plan)
|
||||
}
|
||||
|
||||
override def unregisterTable(tableIdent: TableIdentifier): Unit = {
|
||||
tables.remove(getTableName(tableIdent))
|
||||
}
|
||||
|
||||
override def unregisterAllTables(): Unit = {
|
||||
tables.clear()
|
||||
}
|
||||
|
||||
override def tableExists(tableIdent: TableIdentifier): Boolean = {
|
||||
tables.containsKey(getTableName(tableIdent))
|
||||
}
|
||||
|
||||
override def lookupRelation(
|
||||
tableIdent: TableIdentifier,
|
||||
alias: Option[String] = None): LogicalPlan = {
|
||||
val tableName = getTableName(tableIdent)
|
||||
val table = tables.get(tableName)
|
||||
if (table == null) {
|
||||
throw new AnalysisException("Table not found: " + tableName)
|
||||
}
|
||||
val qualifiedTable = SubqueryAlias(tableName, table)
|
||||
|
||||
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
|
||||
// properly qualified with this alias.
|
||||
alias
|
||||
.map(a => SubqueryAlias(a, qualifiedTable))
|
||||
.getOrElse(qualifiedTable)
|
||||
}
|
||||
|
||||
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
|
||||
tables.keySet().asScala.map(_ -> true).toSeq
|
||||
}
|
||||
|
||||
override def refreshTable(tableIdent: TableIdentifier): Unit = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A trait that can be mixed in with other Catalogs allowing specific tables to be overridden with
|
||||
* new logical plans. This can be used to bind query result to virtual tables, or replace tables
|
||||
* with in-memory cached versions. Note that the set of overrides is stored in memory and thus
|
||||
* lost when the JVM exits.
|
||||
*/
|
||||
trait OverrideCatalog extends Catalog {
|
||||
private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan]
|
||||
|
||||
private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = {
|
||||
if (tableIdent.database.isDefined) {
|
||||
None
|
||||
} else {
|
||||
Option(overrides.get(getTableName(tableIdent)))
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def tableExists(tableIdent: TableIdentifier): Boolean = {
|
||||
getOverriddenTable(tableIdent) match {
|
||||
case Some(_) => true
|
||||
case None => super.tableExists(tableIdent)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def lookupRelation(
|
||||
tableIdent: TableIdentifier,
|
||||
alias: Option[String] = None): LogicalPlan = {
|
||||
getOverriddenTable(tableIdent) match {
|
||||
case Some(table) =>
|
||||
val tableName = getTableName(tableIdent)
|
||||
val qualifiedTable = SubqueryAlias(tableName, table)
|
||||
|
||||
// If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes
|
||||
// are properly qualified with this alias.
|
||||
alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
|
||||
|
||||
case None => super.lookupRelation(tableIdent, alias)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
|
||||
overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName)
|
||||
}
|
||||
|
||||
override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
|
||||
overrides.put(getTableName(tableIdent), plan)
|
||||
}
|
||||
|
||||
override def unregisterTable(tableIdent: TableIdentifier): Unit = {
|
||||
if (tableIdent.database.isEmpty) {
|
||||
overrides.remove(getTableName(tableIdent))
|
||||
}
|
||||
}
|
||||
|
||||
override def unregisterAllTables(): Unit = {
|
||||
overrides.clear()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A trivial catalog that returns an error when a relation is requested. Used for testing when all
|
||||
* relations are already filled in and the analyzer needs only to resolve attribute references.
|
||||
*/
|
||||
object EmptyCatalog extends Catalog {
|
||||
|
||||
override val conf: CatalystConf = EmptyConf
|
||||
|
||||
override def tableExists(tableIdent: TableIdentifier): Boolean = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
override def lookupRelation(
|
||||
tableIdent: TableIdentifier,
|
||||
alias: Option[String] = None): LogicalPlan = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
override def unregisterTable(tableIdent: TableIdentifier): Unit = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
override def unregisterAllTables(): Unit = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
override def refreshTable(tableIdent: TableIdentifier): Unit = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
}
|
|
@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
|
|||
errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)
|
||||
|
||||
/**
|
||||
* Holds the name of a relation that has yet to be looked up in a catalog.
|
||||
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
|
||||
*/
|
||||
case class UnresolvedRelation(
|
||||
tableIdentifier: TableIdentifier,
|
||||
|
|
|
@ -52,34 +52,37 @@ class InMemoryCatalog extends ExternalCatalog {
|
|||
names.filter { funcName => regex.pattern.matcher(funcName).matches() }
|
||||
}
|
||||
|
||||
private def functionExists(db: String, funcName: String): Boolean = {
|
||||
private def existsFunction(db: String, funcName: String): Boolean = {
|
||||
requireDbExists(db)
|
||||
catalog(db).functions.contains(funcName)
|
||||
}
|
||||
|
||||
private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
|
||||
private def existsTable(db: String, table: String): Boolean = {
|
||||
requireDbExists(db)
|
||||
catalog(db).tables.contains(table)
|
||||
}
|
||||
|
||||
private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
|
||||
requireTableExists(db, table)
|
||||
catalog(db).tables(table).partitions.contains(spec)
|
||||
}
|
||||
|
||||
private def requireFunctionExists(db: String, funcName: String): Unit = {
|
||||
if (!functionExists(db, funcName)) {
|
||||
throw new AnalysisException(
|
||||
s"Function not found: '$funcName' does not exist in database '$db'")
|
||||
if (!existsFunction(db, funcName)) {
|
||||
throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
|
||||
}
|
||||
}
|
||||
|
||||
private def requireTableExists(db: String, table: String): Unit = {
|
||||
if (!tableExists(db, table)) {
|
||||
throw new AnalysisException(
|
||||
s"Table not found: '$table' does not exist in database '$db'")
|
||||
if (!existsTable(db, table)) {
|
||||
throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
|
||||
}
|
||||
}
|
||||
|
||||
private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
|
||||
if (!partitionExists(db, table, spec)) {
|
||||
if (!existsPartition(db, table, spec)) {
|
||||
throw new AnalysisException(
|
||||
s"Partition not found: database '$db' table '$table' does not contain: '$spec'")
|
||||
s"Partition does not exist in database '$db' table '$table': '$spec'")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -156,7 +159,7 @@ class InMemoryCatalog extends ExternalCatalog {
|
|||
ignoreIfExists: Boolean): Unit = synchronized {
|
||||
requireDbExists(db)
|
||||
val table = tableDefinition.name.table
|
||||
if (tableExists(db, table)) {
|
||||
if (existsTable(db, table)) {
|
||||
if (!ignoreIfExists) {
|
||||
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
|
||||
}
|
||||
|
@ -170,7 +173,7 @@ class InMemoryCatalog extends ExternalCatalog {
|
|||
table: String,
|
||||
ignoreIfNotExists: Boolean): Unit = synchronized {
|
||||
requireDbExists(db)
|
||||
if (tableExists(db, table)) {
|
||||
if (existsTable(db, table)) {
|
||||
catalog(db).tables.remove(table)
|
||||
} else {
|
||||
if (!ignoreIfNotExists) {
|
||||
|
@ -197,17 +200,13 @@ class InMemoryCatalog extends ExternalCatalog {
|
|||
catalog(db).tables(table).table
|
||||
}
|
||||
|
||||
override def tableExists(db: String, table: String): Boolean = synchronized {
|
||||
requireDbExists(db)
|
||||
catalog(db).tables.contains(table)
|
||||
}
|
||||
|
||||
override def listTables(db: String): Seq[String] = synchronized {
|
||||
requireDbExists(db)
|
||||
catalog(db).tables.keySet.toSeq
|
||||
}
|
||||
|
||||
override def listTables(db: String, pattern: String): Seq[String] = synchronized {
|
||||
requireDbExists(db)
|
||||
filterPattern(listTables(db), pattern)
|
||||
}
|
||||
|
||||
|
@ -296,7 +295,7 @@ class InMemoryCatalog extends ExternalCatalog {
|
|||
|
||||
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
|
||||
requireDbExists(db)
|
||||
if (functionExists(db, func.name.funcName)) {
|
||||
if (existsFunction(db, func.name.funcName)) {
|
||||
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
|
||||
} else {
|
||||
catalog(db).functions.put(func.name.funcName, func)
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap
|
|||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
|
||||
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
|
||||
|
||||
|
@ -32,34 +31,17 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
|
|||
* proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
|
||||
* tables and functions of the Spark Session that it belongs to.
|
||||
*/
|
||||
class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
||||
class SessionCatalog(externalCatalog: ExternalCatalog) {
|
||||
import ExternalCatalog._
|
||||
|
||||
def this(externalCatalog: ExternalCatalog) {
|
||||
this(externalCatalog, new SimpleCatalystConf(true))
|
||||
}
|
||||
|
||||
protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
|
||||
protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
|
||||
private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
|
||||
private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
|
||||
|
||||
// Note: we track current database here because certain operations do not explicitly
|
||||
// specify the database (e.g. DROP TABLE my_table). In these cases we must first
|
||||
// check whether the temporary table or function exists, then, if not, operate on
|
||||
// the corresponding item in the current database.
|
||||
protected[this] var currentDb = {
|
||||
val defaultName = "default"
|
||||
val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
|
||||
// Initialize default database if it doesn't already exist
|
||||
createDatabase(defaultDbDefinition, ignoreIfExists = true)
|
||||
defaultName
|
||||
}
|
||||
|
||||
/**
|
||||
* Format table name, taking into account case sensitivity.
|
||||
*/
|
||||
protected[this] def formatTableName(name: String): String = {
|
||||
if (conf.caseSensitiveAnalysis) name else name.toLowerCase
|
||||
}
|
||||
private[this] var currentDb = "default"
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Databases
|
||||
|
@ -123,8 +105,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
*/
|
||||
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
|
||||
val db = tableDefinition.name.database.getOrElse(currentDb)
|
||||
val table = formatTableName(tableDefinition.name.table)
|
||||
val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
|
||||
val newTableDefinition = tableDefinition.copy(
|
||||
name = TableIdentifier(tableDefinition.name.table, Some(db)))
|
||||
externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
|
||||
}
|
||||
|
||||
|
@ -139,8 +121,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
*/
|
||||
def alterTable(tableDefinition: CatalogTable): Unit = {
|
||||
val db = tableDefinition.name.database.getOrElse(currentDb)
|
||||
val table = formatTableName(tableDefinition.name.table)
|
||||
val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
|
||||
val newTableDefinition = tableDefinition.copy(
|
||||
name = TableIdentifier(tableDefinition.name.table, Some(db)))
|
||||
externalCatalog.alterTable(db, newTableDefinition)
|
||||
}
|
||||
|
||||
|
@ -150,8 +132,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
*/
|
||||
def getTable(name: TableIdentifier): CatalogTable = {
|
||||
val db = name.database.getOrElse(currentDb)
|
||||
val table = formatTableName(name.table)
|
||||
externalCatalog.getTable(db, table)
|
||||
externalCatalog.getTable(db, name.table)
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------
|
||||
|
@ -165,11 +146,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
name: String,
|
||||
tableDefinition: LogicalPlan,
|
||||
ignoreIfExists: Boolean): Unit = {
|
||||
val table = formatTableName(name)
|
||||
if (tempTables.containsKey(table) && !ignoreIfExists) {
|
||||
if (tempTables.containsKey(name) && !ignoreIfExists) {
|
||||
throw new AnalysisException(s"Temporary table '$name' already exists.")
|
||||
}
|
||||
tempTables.put(table, tableDefinition)
|
||||
tempTables.put(name, tableDefinition)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -186,13 +166,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
throw new AnalysisException("rename does not support moving tables across databases")
|
||||
}
|
||||
val db = oldName.database.getOrElse(currentDb)
|
||||
val oldTableName = formatTableName(oldName.table)
|
||||
val newTableName = formatTableName(newName.table)
|
||||
if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) {
|
||||
externalCatalog.renameTable(db, oldTableName, newTableName)
|
||||
if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
|
||||
externalCatalog.renameTable(db, oldName.table, newName.table)
|
||||
} else {
|
||||
val table = tempTables.remove(oldTableName)
|
||||
tempTables.put(newTableName, table)
|
||||
val table = tempTables.remove(oldName.table)
|
||||
tempTables.put(newName.table, table)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -205,11 +183,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
*/
|
||||
def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
|
||||
val db = name.database.getOrElse(currentDb)
|
||||
val table = formatTableName(name.table)
|
||||
if (name.database.isDefined || !tempTables.containsKey(table)) {
|
||||
externalCatalog.dropTable(db, table, ignoreIfNotExists)
|
||||
if (name.database.isDefined || !tempTables.containsKey(name.table)) {
|
||||
externalCatalog.dropTable(db, name.table, ignoreIfNotExists)
|
||||
} else {
|
||||
tempTables.remove(table)
|
||||
tempTables.remove(name.table)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -222,42 +199,27 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
*/
|
||||
def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
|
||||
val db = name.database.getOrElse(currentDb)
|
||||
val table = formatTableName(name.table)
|
||||
val relation =
|
||||
if (name.database.isDefined || !tempTables.containsKey(table)) {
|
||||
val metadata = externalCatalog.getTable(db, table)
|
||||
if (name.database.isDefined || !tempTables.containsKey(name.table)) {
|
||||
val metadata = externalCatalog.getTable(db, name.table)
|
||||
CatalogRelation(db, metadata, alias)
|
||||
} else {
|
||||
tempTables.get(table)
|
||||
tempTables.get(name.table)
|
||||
}
|
||||
val qualifiedTable = SubqueryAlias(table, relation)
|
||||
val qualifiedTable = SubqueryAlias(name.table, relation)
|
||||
// If an alias was specified by the lookup, wrap the plan in a subquery so that
|
||||
// attributes are properly qualified with this alias.
|
||||
alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
|
||||
}
|
||||
|
||||
/**
|
||||
* Return whether a table with the specified name exists.
|
||||
*
|
||||
* Note: If a database is explicitly specified, then this will return whether the table
|
||||
* exists in that particular database instead. In that case, even if there is a temporary
|
||||
* table with the same name, we will return false if the specified database does not
|
||||
* contain the table.
|
||||
*/
|
||||
def tableExists(name: TableIdentifier): Boolean = {
|
||||
val db = name.database.getOrElse(currentDb)
|
||||
val table = formatTableName(name.table)
|
||||
if (name.database.isDefined || !tempTables.containsKey(table)) {
|
||||
externalCatalog.tableExists(db, table)
|
||||
} else {
|
||||
true // it's a temporary table
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List all tables in the specified database, including temporary tables.
|
||||
*/
|
||||
def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
|
||||
def listTables(db: String): Seq[TableIdentifier] = {
|
||||
val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) }
|
||||
val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) }
|
||||
dbTables ++ _tempTables
|
||||
}
|
||||
|
||||
/**
|
||||
* List all matching tables in the specified database, including temporary tables.
|
||||
|
@ -272,19 +234,6 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
dbTables ++ _tempTables
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh the cache entry for a metastore table, if any.
|
||||
*/
|
||||
def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
|
||||
|
||||
/**
|
||||
* Drop all existing temporary tables.
|
||||
* For testing only.
|
||||
*/
|
||||
def clearTempTables(): Unit = {
|
||||
tempTables.clear()
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a temporary table exactly as it was stored.
|
||||
* For testing only.
|
||||
|
@ -314,8 +263,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
parts: Seq[CatalogTablePartition],
|
||||
ignoreIfExists: Boolean): Unit = {
|
||||
val db = tableName.database.getOrElse(currentDb)
|
||||
val table = formatTableName(tableName.table)
|
||||
externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
|
||||
externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -327,8 +275,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
parts: Seq[TablePartitionSpec],
|
||||
ignoreIfNotExists: Boolean): Unit = {
|
||||
val db = tableName.database.getOrElse(currentDb)
|
||||
val table = formatTableName(tableName.table)
|
||||
externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
|
||||
externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -342,8 +289,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
specs: Seq[TablePartitionSpec],
|
||||
newSpecs: Seq[TablePartitionSpec]): Unit = {
|
||||
val db = tableName.database.getOrElse(currentDb)
|
||||
val table = formatTableName(tableName.table)
|
||||
externalCatalog.renamePartitions(db, table, specs, newSpecs)
|
||||
externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -357,8 +303,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
*/
|
||||
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
|
||||
val db = tableName.database.getOrElse(currentDb)
|
||||
val table = formatTableName(tableName.table)
|
||||
externalCatalog.alterPartitions(db, table, parts)
|
||||
externalCatalog.alterPartitions(db, tableName.table, parts)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -367,8 +312,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
*/
|
||||
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
|
||||
val db = tableName.database.getOrElse(currentDb)
|
||||
val table = formatTableName(tableName.table)
|
||||
externalCatalog.getPartition(db, table, spec)
|
||||
externalCatalog.getPartition(db, tableName.table, spec)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -377,8 +321,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
|
|||
*/
|
||||
def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = {
|
||||
val db = tableName.database.getOrElse(currentDb)
|
||||
val table = formatTableName(tableName.table)
|
||||
externalCatalog.listPartitions(db, table)
|
||||
externalCatalog.listPartitions(db, tableName.table)
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
|
|
@ -91,8 +91,6 @@ abstract class ExternalCatalog {
|
|||
|
||||
def getTable(db: String, table: String): CatalogTable
|
||||
|
||||
def tableExists(db: String, table: String): Boolean
|
||||
|
||||
def listTables(db: String): Seq[String]
|
||||
|
||||
def listTables(db: String, pattern: String): Seq[String]
|
||||
|
|
|
@ -161,10 +161,14 @@ class AnalysisSuite extends AnalysisTest {
|
|||
}
|
||||
|
||||
test("resolve relations") {
|
||||
assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq())
|
||||
assertAnalysisError(
|
||||
UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))
|
||||
|
||||
checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
|
||||
|
||||
checkAnalysis(
|
||||
UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
|
||||
|
||||
checkAnalysis(
|
||||
UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
|
||||
}
|
||||
|
|
|
@ -18,21 +18,26 @@
|
|||
package org.apache.spark.sql.catalyst.analysis
|
||||
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.catalyst.SimpleCatalystConf
|
||||
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
|
||||
import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
|
||||
import org.apache.spark.sql.catalyst.plans.PlanTest
|
||||
import org.apache.spark.sql.catalyst.plans.logical._
|
||||
|
||||
trait AnalysisTest extends PlanTest {
|
||||
|
||||
protected val caseSensitiveAnalyzer = makeAnalyzer(caseSensitive = true)
|
||||
protected val caseInsensitiveAnalyzer = makeAnalyzer(caseSensitive = false)
|
||||
val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
|
||||
val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
|
||||
val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
|
||||
|
||||
private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
|
||||
val conf = new SimpleCatalystConf(caseSensitive)
|
||||
val catalog = new SessionCatalog(new InMemoryCatalog, conf)
|
||||
catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true)
|
||||
new Analyzer(catalog, EmptyFunctionRegistry, conf) {
|
||||
val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
|
||||
val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
|
||||
|
||||
caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
|
||||
caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
|
||||
|
||||
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
|
||||
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
|
||||
} ->
|
||||
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) {
|
||||
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,8 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
|
|||
|
||||
import org.scalatest.BeforeAndAfter
|
||||
|
||||
import org.apache.spark.sql.catalyst.SimpleCatalystConf
|
||||
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
|
||||
import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
|
||||
import org.apache.spark.sql.catalyst.dsl.expressions._
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.expressions.aggregate._
|
||||
|
@ -31,11 +30,11 @@ import org.apache.spark.sql.types._
|
|||
|
||||
|
||||
class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
|
||||
private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
|
||||
private val catalog = new SessionCatalog(new InMemoryCatalog, conf)
|
||||
private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
|
||||
val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
|
||||
val catalog = new SimpleCatalog(conf)
|
||||
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
|
||||
|
||||
private val relation = LocalRelation(
|
||||
val relation = LocalRelation(
|
||||
AttributeReference("i", IntegerType)(),
|
||||
AttributeReference("d1", DecimalType(2, 1))(),
|
||||
AttributeReference("d2", DecimalType(5, 2))(),
|
||||
|
@ -44,15 +43,15 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
|
|||
AttributeReference("b", DoubleType)()
|
||||
)
|
||||
|
||||
private val i: Expression = UnresolvedAttribute("i")
|
||||
private val d1: Expression = UnresolvedAttribute("d1")
|
||||
private val d2: Expression = UnresolvedAttribute("d2")
|
||||
private val u: Expression = UnresolvedAttribute("u")
|
||||
private val f: Expression = UnresolvedAttribute("f")
|
||||
private val b: Expression = UnresolvedAttribute("b")
|
||||
val i: Expression = UnresolvedAttribute("i")
|
||||
val d1: Expression = UnresolvedAttribute("d1")
|
||||
val d2: Expression = UnresolvedAttribute("d2")
|
||||
val u: Expression = UnresolvedAttribute("u")
|
||||
val f: Expression = UnresolvedAttribute("f")
|
||||
val b: Expression = UnresolvedAttribute("b")
|
||||
|
||||
before {
|
||||
catalog.createTempTable("table", relation, ignoreIfExists = true)
|
||||
catalog.registerTable(TableIdentifier("table"), relation)
|
||||
}
|
||||
|
||||
private def checkType(expression: Expression, expectedType: DataType): Unit = {
|
||||
|
|
|
@ -225,14 +225,13 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
|
|||
|
||||
test("list tables without pattern") {
|
||||
val catalog = newBasicCatalog()
|
||||
intercept[AnalysisException] { catalog.listTables("unknown_db") }
|
||||
assert(catalog.listTables("db1").toSet == Set.empty)
|
||||
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
|
||||
}
|
||||
|
||||
test("list tables with pattern") {
|
||||
val catalog = newBasicCatalog()
|
||||
intercept[AnalysisException] { catalog.listTables("unknown_db", "*") }
|
||||
intercept[AnalysisException] { catalog.listTables("unknown_db") }
|
||||
assert(catalog.listTables("db1", "*").toSet == Set.empty)
|
||||
assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
|
||||
assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
|
||||
|
|
|
@ -397,24 +397,6 @@ class SessionCatalogSuite extends SparkFunSuite {
|
|||
TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias)
|
||||
}
|
||||
|
||||
test("table exists") {
|
||||
val catalog = new SessionCatalog(newBasicCatalog())
|
||||
assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))
|
||||
assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2"))))
|
||||
assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
|
||||
assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
|
||||
assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
|
||||
// If database is explicitly specified, do not check temporary tables
|
||||
val tempTable = Range(1, 10, 1, 10, Seq())
|
||||
catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false)
|
||||
assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
|
||||
// If database is not explicitly specified, check the current database
|
||||
catalog.setCurrentDatabase("db2")
|
||||
assert(catalog.tableExists(TableIdentifier("tbl1")))
|
||||
assert(catalog.tableExists(TableIdentifier("tbl2")))
|
||||
assert(catalog.tableExists(TableIdentifier("tbl3")))
|
||||
}
|
||||
|
||||
test("list tables without pattern") {
|
||||
val catalog = new SessionCatalog(newBasicCatalog())
|
||||
val tempTable = Range(1, 10, 2, 10, Seq())
|
||||
|
@ -447,7 +429,7 @@ class SessionCatalogSuite extends SparkFunSuite {
|
|||
assert(catalog.listTables("db2", "*1").toSet ==
|
||||
Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
|
||||
intercept[AnalysisException] {
|
||||
catalog.listTables("unknown_db", "*")
|
||||
catalog.listTables("unknown_db")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.optimizer
|
|||
|
||||
import org.apache.spark.sql.catalyst.SimpleCatalystConf
|
||||
import org.apache.spark.sql.catalyst.analysis._
|
||||
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
|
||||
import org.apache.spark.sql.catalyst.dsl.expressions._
|
||||
import org.apache.spark.sql.catalyst.dsl.plans._
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
|
@ -138,11 +137,11 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
|
|||
checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d))
|
||||
}
|
||||
|
||||
private val caseInsensitiveConf = new SimpleCatalystConf(false)
|
||||
private val caseInsensitiveAnalyzer = new Analyzer(
|
||||
new SessionCatalog(new InMemoryCatalog, caseInsensitiveConf),
|
||||
EmptyFunctionRegistry,
|
||||
caseInsensitiveConf)
|
||||
private val caseInsensitiveAnalyzer =
|
||||
new Analyzer(
|
||||
EmptyCatalog,
|
||||
EmptyFunctionRegistry,
|
||||
new SimpleCatalystConf(caseSensitiveAnalysis = false))
|
||||
|
||||
test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
|
||||
val plan = caseInsensitiveAnalyzer.execute(
|
||||
|
|
|
@ -18,8 +18,7 @@
|
|||
package org.apache.spark.sql.catalyst.optimizer
|
||||
|
||||
import org.apache.spark.sql.catalyst.SimpleCatalystConf
|
||||
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
|
||||
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
|
||||
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}
|
||||
import org.apache.spark.sql.catalyst.dsl.expressions._
|
||||
import org.apache.spark.sql.catalyst.dsl.plans._
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
|
@ -29,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules._
|
|||
|
||||
class EliminateSortsSuite extends PlanTest {
|
||||
val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
|
||||
val catalog = new SessionCatalog(new InMemoryCatalog, conf)
|
||||
val catalog = new SimpleCatalog(conf)
|
||||
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
|
||||
|
||||
object Optimize extends RuleExecutor[LogicalPlan] {
|
||||
|
|
|
@ -25,14 +25,13 @@ import scala.collection.JavaConverters._
|
|||
import scala.collection.immutable
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkContext, SparkException}
|
||||
import org.apache.spark.{SparkContext, SparkException}
|
||||
import org.apache.spark.annotation.{DeveloperApi, Experimental}
|
||||
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
|
||||
import org.apache.spark.sql.catalyst._
|
||||
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
|
||||
import org.apache.spark.sql.catalyst.encoders.encoderFor
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
|
||||
|
@ -66,14 +65,13 @@ class SQLContext private[sql](
|
|||
@transient val sparkContext: SparkContext,
|
||||
@transient protected[sql] val cacheManager: CacheManager,
|
||||
@transient private[sql] val listener: SQLListener,
|
||||
val isRootContext: Boolean,
|
||||
@transient private[sql] val externalCatalog: ExternalCatalog)
|
||||
val isRootContext: Boolean)
|
||||
extends Logging with Serializable {
|
||||
|
||||
self =>
|
||||
|
||||
def this(sc: SparkContext) = {
|
||||
this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
|
||||
def this(sparkContext: SparkContext) = {
|
||||
this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
|
||||
}
|
||||
|
||||
def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
|
||||
|
@ -111,8 +109,7 @@ class SQLContext private[sql](
|
|||
sparkContext = sparkContext,
|
||||
cacheManager = cacheManager,
|
||||
listener = listener,
|
||||
isRootContext = false,
|
||||
externalCatalog = externalCatalog)
|
||||
isRootContext = false)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -189,12 +186,6 @@ class SQLContext private[sql](
|
|||
*/
|
||||
def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
|
||||
|
||||
// Extract `spark.sql.*` entries and put it in our SQLConf.
|
||||
// Subclasses may additionally set these entries in other confs.
|
||||
SQLContext.getSQLProperties(sparkContext.getConf).asScala.foreach { case (k, v) =>
|
||||
setConf(k, v)
|
||||
}
|
||||
|
||||
protected[sql] def parseSql(sql: String): LogicalPlan = sessionState.sqlParser.parsePlan(sql)
|
||||
|
||||
protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))
|
||||
|
@ -208,6 +199,30 @@ class SQLContext private[sql](
|
|||
sparkContext.addJar(path)
|
||||
}
|
||||
|
||||
{
|
||||
// We extract spark sql settings from SparkContext's conf and put them to
|
||||
// Spark SQL's conf.
|
||||
// First, we populate the SQLConf (conf). So, we can make sure that other values using
|
||||
// those settings in their construction can get the correct settings.
|
||||
// For example, metadataHive in HiveContext may need both spark.sql.hive.metastore.version
|
||||
// and spark.sql.hive.metastore.jars to get correctly constructed.
|
||||
val properties = new Properties
|
||||
sparkContext.getConf.getAll.foreach {
|
||||
case (key, value) if key.startsWith("spark.sql") => properties.setProperty(key, value)
|
||||
case _ =>
|
||||
}
|
||||
// We directly put those settings to conf to avoid of calling setConf, which may have
|
||||
// side-effects. For example, in HiveContext, setConf may cause executionHive and metadataHive
|
||||
// get constructed. If we call setConf directly, the constructed metadataHive may have
|
||||
// wrong settings, or the construction may fail.
|
||||
conf.setConf(properties)
|
||||
// After we have populated SQLConf, we call setConf to populate other confs in the subclass
|
||||
// (e.g. hiveconf in HiveContext).
|
||||
properties.asScala.foreach {
|
||||
case (key, value) => setConf(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* :: Experimental ::
|
||||
* A collection of methods that are considered experimental, but can be used to hook into
|
||||
|
@ -668,10 +683,8 @@ class SQLContext private[sql](
|
|||
* only during the lifetime of this instance of SQLContext.
|
||||
*/
|
||||
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
|
||||
sessionState.catalog.createTempTable(
|
||||
sessionState.sqlParser.parseTableIdentifier(tableName).table,
|
||||
df.logicalPlan,
|
||||
ignoreIfExists = true)
|
||||
sessionState.catalog.registerTable(
|
||||
sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -684,7 +697,7 @@ class SQLContext private[sql](
|
|||
*/
|
||||
def dropTempTable(tableName: String): Unit = {
|
||||
cacheManager.tryUncacheQuery(table(tableName))
|
||||
sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
|
||||
sessionState.catalog.unregisterTable(TableIdentifier(tableName))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -811,7 +824,9 @@ class SQLContext private[sql](
|
|||
* @since 1.3.0
|
||||
*/
|
||||
def tableNames(): Array[String] = {
|
||||
tableNames(sessionState.catalog.getCurrentDatabase)
|
||||
sessionState.catalog.getTables(None).map {
|
||||
case (tableName, _) => tableName
|
||||
}.toArray
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -821,7 +836,9 @@ class SQLContext private[sql](
|
|||
* @since 1.3.0
|
||||
*/
|
||||
def tableNames(databaseName: String): Array[String] = {
|
||||
sessionState.catalog.listTables(databaseName).map(_.table).toArray
|
||||
sessionState.catalog.getTables(Some(databaseName)).map {
|
||||
case (tableName, _) => tableName
|
||||
}.toArray
|
||||
}
|
||||
|
||||
@transient
|
||||
|
@ -1008,18 +1025,4 @@ object SQLContext {
|
|||
}
|
||||
sqlListener.get()
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract `spark.sql.*` properties from the conf and return them as a [[Properties]].
|
||||
*/
|
||||
private[sql] def getSQLProperties(sparkConf: SparkConf): Properties = {
|
||||
val properties = new Properties
|
||||
sparkConf.getAll.foreach { case (key, value) =>
|
||||
if (key.startsWith("spark.sql")) {
|
||||
properties.setProperty(key, value)
|
||||
}
|
||||
}
|
||||
properties
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -339,12 +339,10 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma
|
|||
override def run(sqlContext: SQLContext): Seq[Row] = {
|
||||
// Since we need to return a Seq of rows, we will call getTables directly
|
||||
// instead of calling tables in sqlContext.
|
||||
val catalog = sqlContext.sessionState.catalog
|
||||
val db = databaseName.getOrElse(catalog.getCurrentDatabase)
|
||||
val rows = catalog.listTables(db).map { t =>
|
||||
val isTemp = t.database.isEmpty
|
||||
Row(t.table, isTemp)
|
||||
val rows = sqlContext.sessionState.catalog.getTables(databaseName).map {
|
||||
case (tableName, isTemporary) => Row(tableName, isTemporary)
|
||||
}
|
||||
|
||||
rows
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,21 +93,15 @@ case class CreateTempTableUsing(
|
|||
provider: String,
|
||||
options: Map[String, String]) extends RunnableCommand {
|
||||
|
||||
if (tableIdent.database.isDefined) {
|
||||
throw new AnalysisException(
|
||||
s"Temporary table '$tableIdent' should not have specified a database")
|
||||
}
|
||||
|
||||
def run(sqlContext: SQLContext): Seq[Row] = {
|
||||
val dataSource = DataSource(
|
||||
sqlContext,
|
||||
userSpecifiedSchema = userSpecifiedSchema,
|
||||
className = provider,
|
||||
options = options)
|
||||
sqlContext.sessionState.catalog.createTempTable(
|
||||
tableIdent.table,
|
||||
Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
|
||||
ignoreIfExists = true)
|
||||
sqlContext.sessionState.catalog.registerTable(
|
||||
tableIdent,
|
||||
Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
|
||||
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
@ -121,11 +115,6 @@ case class CreateTempTableUsingAsSelect(
|
|||
options: Map[String, String],
|
||||
query: LogicalPlan) extends RunnableCommand {
|
||||
|
||||
if (tableIdent.database.isDefined) {
|
||||
throw new AnalysisException(
|
||||
s"Temporary table '$tableIdent' should not have specified a database")
|
||||
}
|
||||
|
||||
override def run(sqlContext: SQLContext): Seq[Row] = {
|
||||
val df = Dataset.ofRows(sqlContext, query)
|
||||
val dataSource = DataSource(
|
||||
|
@ -135,10 +124,9 @@ case class CreateTempTableUsingAsSelect(
|
|||
bucketSpec = None,
|
||||
options = options)
|
||||
val result = dataSource.write(mode, df)
|
||||
sqlContext.sessionState.catalog.createTempTable(
|
||||
tableIdent.table,
|
||||
Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan,
|
||||
ignoreIfExists = true)
|
||||
sqlContext.sessionState.catalog.registerTable(
|
||||
tableIdent,
|
||||
Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan)
|
||||
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
|
|
@ -19,12 +19,10 @@ package org.apache.spark.sql.execution.datasources
|
|||
|
||||
import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
|
||||
import org.apache.spark.sql.catalyst.analysis._
|
||||
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
|
||||
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
|
||||
import org.apache.spark.sql.catalyst.plans.logical
|
||||
import org.apache.spark.sql.catalyst.plans.logical._
|
||||
import org.apache.spark.sql.catalyst.rules.Rule
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
|
||||
|
||||
/**
|
||||
|
@ -101,9 +99,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
|
|||
/**
|
||||
* A rule to do various checks before inserting into or writing to a data source table.
|
||||
*/
|
||||
private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
|
||||
extends (LogicalPlan => Unit) {
|
||||
|
||||
private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) {
|
||||
def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
|
||||
|
||||
def apply(plan: LogicalPlan): Unit = {
|
||||
|
@ -143,7 +139,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
|
|||
}
|
||||
|
||||
PartitioningUtils.validatePartitionColumnDataTypes(
|
||||
r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
|
||||
r.schema, part.keySet.toSeq, catalog.conf.caseSensitiveAnalysis)
|
||||
|
||||
// Get all input data source relations of the query.
|
||||
val srcRelations = query.collect {
|
||||
|
@ -194,7 +190,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
|
|||
}
|
||||
|
||||
PartitioningUtils.validatePartitionColumnDataTypes(
|
||||
c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
|
||||
c.child.schema, c.partitionColumns, catalog.conf.caseSensitiveAnalysis)
|
||||
|
||||
for {
|
||||
spec <- c.bucketSpec
|
||||
|
|
|
@ -18,8 +18,7 @@
|
|||
package org.apache.spark.sql.internal
|
||||
|
||||
import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration}
|
||||
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
|
||||
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
|
||||
import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog}
|
||||
import org.apache.spark.sql.catalyst.optimizer.Optimizer
|
||||
import org.apache.spark.sql.catalyst.parser.ParserInterface
|
||||
import org.apache.spark.sql.catalyst.rules.RuleExecutor
|
||||
|
@ -46,7 +45,7 @@ private[sql] class SessionState(ctx: SQLContext) {
|
|||
/**
|
||||
* Internal catalog for managing table and database states.
|
||||
*/
|
||||
lazy val catalog = new SessionCatalog(ctx.externalCatalog, conf)
|
||||
lazy val catalog: Catalog = new SimpleCatalog(conf)
|
||||
|
||||
/**
|
||||
* Internal catalog for managing functions registered by the user.
|
||||
|
@ -69,7 +68,7 @@ private[sql] class SessionState(ctx: SQLContext) {
|
|||
DataSourceAnalysis ::
|
||||
(if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
|
||||
|
||||
override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
|
||||
override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -33,8 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
|
|||
}
|
||||
|
||||
after {
|
||||
sqlContext.sessionState.catalog.dropTable(
|
||||
TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
|
||||
sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
|
||||
}
|
||||
|
||||
test("get all tables") {
|
||||
|
@ -46,22 +45,20 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
|
|||
sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
|
||||
Row("ListTablesSuiteTable", true))
|
||||
|
||||
sqlContext.sessionState.catalog.dropTable(
|
||||
TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
|
||||
sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
|
||||
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
|
||||
}
|
||||
|
||||
test("getting all tables with a database name has no impact on returned table names") {
|
||||
test("getting all Tables with a database name has no impact on returned table names") {
|
||||
checkAnswer(
|
||||
sqlContext.tables("default").filter("tableName = 'ListTablesSuiteTable'"),
|
||||
sqlContext.tables("DB").filter("tableName = 'ListTablesSuiteTable'"),
|
||||
Row("ListTablesSuiteTable", true))
|
||||
|
||||
checkAnswer(
|
||||
sql("show TABLES in default").filter("tableName = 'ListTablesSuiteTable'"),
|
||||
sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
|
||||
Row("ListTablesSuiteTable", true))
|
||||
|
||||
sqlContext.sessionState.catalog.dropTable(
|
||||
TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
|
||||
sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
|
||||
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
|||
import org.apache.spark.sql.catalyst.rules.Rule
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
||||
class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
|
||||
class SQLContextSuite extends SparkFunSuite with SharedSparkContext{
|
||||
|
||||
object DummyRule extends Rule[LogicalPlan] {
|
||||
def apply(p: LogicalPlan): LogicalPlan = p
|
||||
|
@ -78,11 +78,4 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
|
|||
sqlContext.experimental.extraOptimizations = Seq(DummyRule)
|
||||
assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
|
||||
}
|
||||
|
||||
test("SQLContext can access `spark.sql.*` configs") {
|
||||
sc.conf.set("spark.sql.with.or.without.you", "my love")
|
||||
val sqlContext = new SQLContext(sc)
|
||||
assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1397,16 +1397,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|
|||
}
|
||||
|
||||
test("SPARK-4699 case sensitivity SQL query") {
|
||||
val orig = sqlContext.getConf(SQLConf.CASE_SENSITIVE)
|
||||
try {
|
||||
sqlContext.setConf(SQLConf.CASE_SENSITIVE, false)
|
||||
val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
|
||||
val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
|
||||
rdd.toDF().registerTempTable("testTable1")
|
||||
checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
|
||||
} finally {
|
||||
sqlContext.setConf(SQLConf.CASE_SENSITIVE, orig)
|
||||
}
|
||||
sqlContext.setConf(SQLConf.CASE_SENSITIVE, false)
|
||||
val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
|
||||
val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
|
||||
rdd.toDF().registerTempTable("testTable1")
|
||||
checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
|
||||
sqlContext.setConf(SQLConf.CASE_SENSITIVE, true)
|
||||
}
|
||||
|
||||
test("SPARK-6145: ORDER BY test for nested fields") {
|
||||
|
@ -1680,8 +1676,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|
|||
.format("parquet")
|
||||
.save(path)
|
||||
|
||||
// We don't support creating a temporary table while specifying a database
|
||||
intercept[AnalysisException] {
|
||||
val message = intercept[AnalysisException] {
|
||||
sqlContext.sql(
|
||||
s"""
|
||||
|CREATE TEMPORARY TABLE db.t
|
||||
|
@ -1691,8 +1686,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|
|||
|)
|
||||
""".stripMargin)
|
||||
}.getMessage
|
||||
assert(message.contains("Specifying database name or other qualifiers are not allowed"))
|
||||
|
||||
// If you use backticks to quote the name then it's OK.
|
||||
// If you use backticks to quote the name of a temporary table having dot in it.
|
||||
sqlContext.sql(
|
||||
s"""
|
||||
|CREATE TEMPORARY TABLE `db.t`
|
||||
|
|
|
@ -51,8 +51,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
|
|||
sql("INSERT INTO TABLE t SELECT * FROM tmp")
|
||||
checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
|
||||
}
|
||||
sqlContext.sessionState.catalog.dropTable(
|
||||
TableIdentifier("tmp"), ignoreIfNotExists = true)
|
||||
sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
|
||||
}
|
||||
|
||||
test("overwriting") {
|
||||
|
@ -62,8 +61,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
|
|||
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
|
||||
checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
|
||||
}
|
||||
sqlContext.sessionState.catalog.dropTable(
|
||||
TableIdentifier("tmp"), ignoreIfNotExists = true)
|
||||
sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
|
||||
}
|
||||
|
||||
test("self-join") {
|
||||
|
|
|
@ -189,8 +189,8 @@ private[sql] trait SQLTestUtils
|
|||
* `f` returns.
|
||||
*/
|
||||
protected def activateDatabase(db: String)(f: => Unit): Unit = {
|
||||
sqlContext.sessionState.catalog.setCurrentDatabase(db)
|
||||
try f finally sqlContext.sessionState.catalog.setCurrentDatabase("default")
|
||||
sqlContext.sql(s"USE $db")
|
||||
try f finally sqlContext.sql(s"USE default")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -150,8 +150,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
|
|||
}
|
||||
|
||||
if (sessionState.database != null) {
|
||||
SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase(
|
||||
s"${sessionState.database}")
|
||||
SparkSQLEnv.hiveContext.runSqlHive(s"USE ${sessionState.database}")
|
||||
}
|
||||
|
||||
// Execute -i init files (always in silent mode)
|
||||
|
|
|
@ -193,7 +193,10 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
|
|||
)
|
||||
|
||||
runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))(
|
||||
"" -> "hive_test"
|
||||
""
|
||||
-> "OK",
|
||||
""
|
||||
-> "hive_test"
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -85,6 +85,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
|
|||
withClient { getTable(db, table) }
|
||||
}
|
||||
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// Databases
|
||||
// --------------------------------------------------------------------------
|
||||
|
@ -181,10 +182,6 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
|
|||
client.getTable(db, table)
|
||||
}
|
||||
|
||||
override def tableExists(db: String, table: String): Boolean = withClient {
|
||||
client.getTableOption(db, table).isDefined
|
||||
}
|
||||
|
||||
override def listTables(db: String): Seq[String] = withClient {
|
||||
requireDbExists(db)
|
||||
client.listTables(db)
|
||||
|
|
|
@ -28,7 +28,6 @@ import scala.collection.JavaConverters._
|
|||
import scala.collection.mutable.HashMap
|
||||
import scala.language.implicitConversions
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.hadoop.hive.common.StatsSetupConst
|
||||
import org.apache.hadoop.hive.common.`type`.HiveDecimal
|
||||
|
@ -39,7 +38,7 @@ import org.apache.hadoop.hive.ql.parse.VariableSubstitution
|
|||
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
|
||||
import org.apache.hadoop.util.VersionInfo
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.api.java.JavaSparkContext
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql._
|
||||
|
@ -53,7 +52,6 @@ import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
|
|||
import org.apache.spark.sql.execution.ui.SQLListener
|
||||
import org.apache.spark.sql.hive.client._
|
||||
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
|
||||
import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
|
||||
import org.apache.spark.sql.types._
|
||||
|
@ -69,7 +67,7 @@ private[hive] case class CurrentDatabase(ctx: HiveContext)
|
|||
override def foldable: Boolean = true
|
||||
override def nullable: Boolean = false
|
||||
override def eval(input: InternalRow): Any = {
|
||||
UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase)
|
||||
UTF8String.fromString(ctx.metadataHive.currentDatabase)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -83,31 +81,15 @@ class HiveContext private[hive](
|
|||
sc: SparkContext,
|
||||
cacheManager: CacheManager,
|
||||
listener: SQLListener,
|
||||
@transient private[hive] val executionHive: HiveClientImpl,
|
||||
@transient private[hive] val metadataHive: HiveClient,
|
||||
isRootContext: Boolean,
|
||||
@transient private[sql] val hiveCatalog: HiveCatalog)
|
||||
extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
|
||||
@transient private val execHive: HiveClientImpl,
|
||||
@transient private val metaHive: HiveClient,
|
||||
isRootContext: Boolean)
|
||||
extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
|
||||
self =>
|
||||
|
||||
private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
|
||||
this(
|
||||
sc,
|
||||
new CacheManager,
|
||||
SQLContext.createListenerAndUI(sc),
|
||||
execHive,
|
||||
metaHive,
|
||||
true,
|
||||
new HiveCatalog(metaHive))
|
||||
}
|
||||
|
||||
def this(sc: SparkContext) = {
|
||||
this(
|
||||
sc,
|
||||
HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
|
||||
HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
|
||||
this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true)
|
||||
}
|
||||
|
||||
def this(sc: JavaSparkContext) = this(sc.sc)
|
||||
|
||||
import org.apache.spark.sql.hive.HiveContext._
|
||||
|
@ -124,10 +106,9 @@ class HiveContext private[hive](
|
|||
sc = sc,
|
||||
cacheManager = cacheManager,
|
||||
listener = listener,
|
||||
executionHive = executionHive.newSession(),
|
||||
metadataHive = metadataHive.newSession(),
|
||||
isRootContext = false,
|
||||
hiveCatalog = hiveCatalog)
|
||||
execHive = executionHive.newSession(),
|
||||
metaHive = metadataHive.newSession(),
|
||||
isRootContext = false)
|
||||
}
|
||||
|
||||
@transient
|
||||
|
@ -168,6 +149,41 @@ class HiveContext private[hive](
|
|||
*/
|
||||
protected[sql] def convertCTAS: Boolean = getConf(CONVERT_CTAS)
|
||||
|
||||
/**
|
||||
* The version of the hive client that will be used to communicate with the metastore. Note that
|
||||
* this does not necessarily need to be the same version of Hive that is used internally by
|
||||
* Spark SQL for execution.
|
||||
*/
|
||||
protected[hive] def hiveMetastoreVersion: String = getConf(HIVE_METASTORE_VERSION)
|
||||
|
||||
/**
|
||||
* The location of the jars that should be used to instantiate the HiveMetastoreClient. This
|
||||
* property can be one of three options:
|
||||
* - a classpath in the standard format for both hive and hadoop.
|
||||
* - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
|
||||
* option is only valid when using the execution version of Hive.
|
||||
* - maven - download the correct version of hive on demand from maven.
|
||||
*/
|
||||
protected[hive] def hiveMetastoreJars: String = getConf(HIVE_METASTORE_JARS)
|
||||
|
||||
/**
|
||||
* A comma separated list of class prefixes that should be loaded using the classloader that
|
||||
* is shared between Spark SQL and a specific version of Hive. An example of classes that should
|
||||
* be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
|
||||
* to be shared are those that interact with classes that are already shared. For example,
|
||||
* custom appenders that are used by log4j.
|
||||
*/
|
||||
protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
|
||||
getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "")
|
||||
|
||||
/**
|
||||
* A comma separated list of class prefixes that should explicitly be reloaded for each version
|
||||
* of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
|
||||
* prefix that typically would be shared (i.e. org.apache.spark.*)
|
||||
*/
|
||||
protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
|
||||
getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")
|
||||
|
||||
/*
|
||||
* hive thrift server use background spark sql thread pool to execute sql queries
|
||||
*/
|
||||
|
@ -179,6 +195,29 @@ class HiveContext private[hive](
|
|||
@transient
|
||||
protected[sql] lazy val substitutor = new VariableSubstitution()
|
||||
|
||||
/**
|
||||
* The copy of the hive client that is used for execution. Currently this must always be
|
||||
* Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the
|
||||
* client is used for execution related tasks like registering temporary functions or ensuring
|
||||
* that the ThreadLocal SessionState is correctly populated. This copy of Hive is *not* used
|
||||
* for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
|
||||
*/
|
||||
@transient
|
||||
protected[hive] lazy val executionHive: HiveClientImpl = if (execHive != null) {
|
||||
execHive
|
||||
} else {
|
||||
logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
|
||||
val loader = new IsolatedClientLoader(
|
||||
version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
|
||||
sparkConf = sc.conf,
|
||||
execJars = Seq(),
|
||||
hadoopConf = sc.hadoopConfiguration,
|
||||
config = newTemporaryConfiguration(useInMemoryDerby = true),
|
||||
isolationOn = false,
|
||||
baseClassLoader = Utils.getContextOrSparkClassLoader)
|
||||
loader.createClient().asInstanceOf[HiveClientImpl]
|
||||
}
|
||||
|
||||
/**
|
||||
* Overrides default Hive configurations to avoid breaking changes to Spark SQL users.
|
||||
* - allow SQL11 keywords to be used as identifiers
|
||||
|
@ -189,6 +228,111 @@ class HiveContext private[hive](
|
|||
|
||||
defaultOverrides()
|
||||
|
||||
/**
|
||||
* The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore.
|
||||
* The version of the Hive client that is used here must match the metastore that is configured
|
||||
* in the hive-site.xml file.
|
||||
*/
|
||||
@transient
|
||||
protected[hive] lazy val metadataHive: HiveClient = if (metaHive != null) {
|
||||
metaHive
|
||||
} else {
|
||||
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
|
||||
|
||||
// We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
|
||||
// into the isolated client loader
|
||||
val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf])
|
||||
|
||||
val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
|
||||
logInfo("default warehouse location is " + defaultWarehouseLocation)
|
||||
|
||||
// `configure` goes second to override other settings.
|
||||
val allConfig = metadataConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configure
|
||||
|
||||
val isolatedLoader = if (hiveMetastoreJars == "builtin") {
|
||||
if (hiveExecutionVersion != hiveMetastoreVersion) {
|
||||
throw new IllegalArgumentException(
|
||||
"Builtin jars can only be used when hive execution version == hive metastore version. " +
|
||||
s"Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " +
|
||||
"Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
|
||||
s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.")
|
||||
}
|
||||
|
||||
// We recursively find all jars in the class loader chain,
|
||||
// starting from the given classLoader.
|
||||
def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
|
||||
case null => Array.empty[URL]
|
||||
case urlClassLoader: URLClassLoader =>
|
||||
urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
|
||||
case other => allJars(other.getParent)
|
||||
}
|
||||
|
||||
val classLoader = Utils.getContextOrSparkClassLoader
|
||||
val jars = allJars(classLoader)
|
||||
if (jars.length == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"Unable to locate hive jars to connect to metastore. " +
|
||||
"Please set spark.sql.hive.metastore.jars.")
|
||||
}
|
||||
|
||||
logInfo(
|
||||
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
|
||||
new IsolatedClientLoader(
|
||||
version = metaVersion,
|
||||
sparkConf = sc.conf,
|
||||
execJars = jars.toSeq,
|
||||
hadoopConf = sc.hadoopConfiguration,
|
||||
config = allConfig,
|
||||
isolationOn = true,
|
||||
barrierPrefixes = hiveMetastoreBarrierPrefixes,
|
||||
sharedPrefixes = hiveMetastoreSharedPrefixes)
|
||||
} else if (hiveMetastoreJars == "maven") {
|
||||
// TODO: Support for loading the jars from an already downloaded location.
|
||||
logInfo(
|
||||
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
|
||||
IsolatedClientLoader.forVersion(
|
||||
hiveMetastoreVersion = hiveMetastoreVersion,
|
||||
hadoopVersion = VersionInfo.getVersion,
|
||||
sparkConf = sc.conf,
|
||||
hadoopConf = sc.hadoopConfiguration,
|
||||
config = allConfig,
|
||||
barrierPrefixes = hiveMetastoreBarrierPrefixes,
|
||||
sharedPrefixes = hiveMetastoreSharedPrefixes)
|
||||
} else {
|
||||
// Convert to files and expand any directories.
|
||||
val jars =
|
||||
hiveMetastoreJars
|
||||
.split(File.pathSeparator)
|
||||
.flatMap {
|
||||
case path if new File(path).getName() == "*" =>
|
||||
val files = new File(path).getParentFile().listFiles()
|
||||
if (files == null) {
|
||||
logWarning(s"Hive jar path '$path' does not exist.")
|
||||
Nil
|
||||
} else {
|
||||
files.filter(_.getName().toLowerCase().endsWith(".jar"))
|
||||
}
|
||||
case path =>
|
||||
new File(path) :: Nil
|
||||
}
|
||||
.map(_.toURI.toURL)
|
||||
|
||||
logInfo(
|
||||
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
|
||||
s"using ${jars.mkString(":")}")
|
||||
new IsolatedClientLoader(
|
||||
version = metaVersion,
|
||||
sparkConf = sc.conf,
|
||||
execJars = jars.toSeq,
|
||||
hadoopConf = sc.hadoopConfiguration,
|
||||
config = allConfig,
|
||||
isolationOn = true,
|
||||
barrierPrefixes = hiveMetastoreBarrierPrefixes,
|
||||
sharedPrefixes = hiveMetastoreSharedPrefixes)
|
||||
}
|
||||
isolatedLoader.createClient()
|
||||
}
|
||||
|
||||
protected[sql] override def parseSql(sql: String): LogicalPlan = {
|
||||
executionHive.withHiveState {
|
||||
super.parseSql(substitutor.substitute(hiveconf, sql))
|
||||
|
@ -288,7 +432,7 @@ class HiveContext private[hive](
|
|||
// recorded in the Hive metastore.
|
||||
// This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
|
||||
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
|
||||
sessionState.catalog.alterTable(
|
||||
sessionState.catalog.client.alterTable(
|
||||
relation.table.copy(
|
||||
properties = relation.table.properties +
|
||||
(StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
|
||||
|
@ -315,10 +459,64 @@ class HiveContext private[hive](
|
|||
setConf(entry.key, entry.stringConverter(value))
|
||||
}
|
||||
|
||||
/** Overridden by child classes that need to set configuration before the client init. */
|
||||
protected def configure(): Map[String, String] = {
|
||||
// Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
|
||||
// of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
|
||||
// compatibility when users are trying to connecting to a Hive metastore of lower version,
|
||||
// because these options are expected to be integral values in lower versions of Hive.
|
||||
//
|
||||
// Here we enumerate all time `ConfVar`s and convert their values to numeric strings according
|
||||
// to their output time units.
|
||||
Seq(
|
||||
ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS,
|
||||
ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
|
||||
).map { case (confVar, unit) =>
|
||||
confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
|
||||
}.toMap
|
||||
}
|
||||
|
||||
/**
|
||||
* SQLConf and HiveConf contracts:
|
||||
*
|
||||
* 1. create a new o.a.h.hive.ql.session.SessionState for each HiveContext
|
||||
* 1. create a new SessionState for each HiveContext
|
||||
* 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
|
||||
* SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
|
||||
* set in the SQLConf *as well as* in the HiveConf.
|
||||
|
@ -402,7 +600,7 @@ class HiveContext private[hive](
|
|||
}
|
||||
|
||||
|
||||
private[hive] object HiveContext extends Logging {
|
||||
private[hive] object HiveContext {
|
||||
/** The version of hive used internally by Spark SQL. */
|
||||
val hiveExecutionVersion: String = "1.2.1"
|
||||
|
||||
|
@ -468,242 +666,6 @@ private[hive] object HiveContext extends Logging {
|
|||
defaultValue = Some(true),
|
||||
doc = "When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
|
||||
|
||||
/**
|
||||
* The version of the hive client that will be used to communicate with the metastore. Note that
|
||||
* this does not necessarily need to be the same version of Hive that is used internally by
|
||||
* Spark SQL for execution.
|
||||
*/
|
||||
private def hiveMetastoreVersion(conf: SQLConf): String = {
|
||||
conf.getConf(HIVE_METASTORE_VERSION)
|
||||
}
|
||||
|
||||
/**
|
||||
* The location of the jars that should be used to instantiate the HiveMetastoreClient. This
|
||||
* property can be one of three options:
|
||||
* - a classpath in the standard format for both hive and hadoop.
|
||||
* - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
|
||||
* option is only valid when using the execution version of Hive.
|
||||
* - maven - download the correct version of hive on demand from maven.
|
||||
*/
|
||||
private def hiveMetastoreJars(conf: SQLConf): String = {
|
||||
conf.getConf(HIVE_METASTORE_JARS)
|
||||
}
|
||||
|
||||
/**
|
||||
* A comma separated list of class prefixes that should be loaded using the classloader that
|
||||
* is shared between Spark SQL and a specific version of Hive. An example of classes that should
|
||||
* be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
|
||||
* to be shared are those that interact with classes that are already shared. For example,
|
||||
* custom appenders that are used by log4j.
|
||||
*/
|
||||
private def hiveMetastoreSharedPrefixes(conf: SQLConf): Seq[String] = {
|
||||
conf.getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "")
|
||||
}
|
||||
|
||||
/**
|
||||
* A comma separated list of class prefixes that should explicitly be reloaded for each version
|
||||
* of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
|
||||
* prefix that typically would be shared (i.e. org.apache.spark.*)
|
||||
*/
|
||||
private def hiveMetastoreBarrierPrefixes(conf: SQLConf): Seq[String] = {
|
||||
conf.getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")
|
||||
}
|
||||
|
||||
/**
|
||||
* Configurations needed to create a [[HiveClient]].
|
||||
*/
|
||||
private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, String] = {
|
||||
// Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
|
||||
// of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
|
||||
// compatibility when users are trying to connecting to a Hive metastore of lower version,
|
||||
// because these options are expected to be integral values in lower versions of Hive.
|
||||
//
|
||||
// Here we enumerate all time `ConfVar`s and convert their values to numeric strings according
|
||||
// to their output time units.
|
||||
Seq(
|
||||
ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS,
|
||||
ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS,
|
||||
ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS,
|
||||
ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
|
||||
ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
|
||||
).map { case (confVar, unit) =>
|
||||
confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
|
||||
}.toMap
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a [[HiveClient]] used for execution.
|
||||
*
|
||||
* Currently this must always be Hive 13 as this is the version of Hive that is packaged
|
||||
* with Spark SQL. This copy of the client is used for execution related tasks like
|
||||
* registering temporary functions or ensuring that the ThreadLocal SessionState is
|
||||
* correctly populated. This copy of Hive is *not* used for storing persistent metadata,
|
||||
* and only point to a dummy metastore in a temporary directory.
|
||||
*/
|
||||
protected[hive] def newClientForExecution(
|
||||
conf: SparkConf,
|
||||
hadoopConf: Configuration): HiveClientImpl = {
|
||||
logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
|
||||
val loader = new IsolatedClientLoader(
|
||||
version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
|
||||
sparkConf = conf,
|
||||
execJars = Seq(),
|
||||
hadoopConf = hadoopConf,
|
||||
config = newTemporaryConfiguration(useInMemoryDerby = true),
|
||||
isolationOn = false,
|
||||
baseClassLoader = Utils.getContextOrSparkClassLoader)
|
||||
loader.createClient().asInstanceOf[HiveClientImpl]
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
|
||||
*
|
||||
* The version of the Hive client that is used here must match the metastore that is configured
|
||||
* in the hive-site.xml file.
|
||||
*/
|
||||
private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
|
||||
val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
|
||||
val configurations = hiveClientConfigurations(hiveConf)
|
||||
newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
|
||||
}
|
||||
|
||||
protected[hive] def newClientForMetadata(
|
||||
conf: SparkConf,
|
||||
hiveConf: HiveConf,
|
||||
hadoopConf: Configuration,
|
||||
configurations: Map[String, String]): HiveClient = {
|
||||
val sqlConf = new SQLConf
|
||||
sqlConf.setConf(SQLContext.getSQLProperties(conf))
|
||||
val hiveMetastoreVersion = HiveContext.hiveMetastoreVersion(sqlConf)
|
||||
val hiveMetastoreJars = HiveContext.hiveMetastoreJars(sqlConf)
|
||||
val hiveMetastoreSharedPrefixes = HiveContext.hiveMetastoreSharedPrefixes(sqlConf)
|
||||
val hiveMetastoreBarrierPrefixes = HiveContext.hiveMetastoreBarrierPrefixes(sqlConf)
|
||||
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
|
||||
|
||||
val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir")
|
||||
logInfo("default warehouse location is " + defaultWarehouseLocation)
|
||||
|
||||
// `configure` goes second to override other settings.
|
||||
val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configurations
|
||||
|
||||
val isolatedLoader = if (hiveMetastoreJars == "builtin") {
|
||||
if (hiveExecutionVersion != hiveMetastoreVersion) {
|
||||
throw new IllegalArgumentException(
|
||||
"Builtin jars can only be used when hive execution version == hive metastore version. " +
|
||||
s"Execution: $hiveExecutionVersion != Metastore: $hiveMetastoreVersion. " +
|
||||
"Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
|
||||
s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.")
|
||||
}
|
||||
|
||||
// We recursively find all jars in the class loader chain,
|
||||
// starting from the given classLoader.
|
||||
def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
|
||||
case null => Array.empty[URL]
|
||||
case urlClassLoader: URLClassLoader =>
|
||||
urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
|
||||
case other => allJars(other.getParent)
|
||||
}
|
||||
|
||||
val classLoader = Utils.getContextOrSparkClassLoader
|
||||
val jars = allJars(classLoader)
|
||||
if (jars.length == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"Unable to locate hive jars to connect to metastore. " +
|
||||
"Please set spark.sql.hive.metastore.jars.")
|
||||
}
|
||||
|
||||
logInfo(
|
||||
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
|
||||
new IsolatedClientLoader(
|
||||
version = metaVersion,
|
||||
sparkConf = conf,
|
||||
hadoopConf = hadoopConf,
|
||||
execJars = jars.toSeq,
|
||||
config = allConfig,
|
||||
isolationOn = true,
|
||||
barrierPrefixes = hiveMetastoreBarrierPrefixes,
|
||||
sharedPrefixes = hiveMetastoreSharedPrefixes)
|
||||
} else if (hiveMetastoreJars == "maven") {
|
||||
// TODO: Support for loading the jars from an already downloaded location.
|
||||
logInfo(
|
||||
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
|
||||
IsolatedClientLoader.forVersion(
|
||||
hiveMetastoreVersion = hiveMetastoreVersion,
|
||||
hadoopVersion = VersionInfo.getVersion,
|
||||
sparkConf = conf,
|
||||
hadoopConf = hadoopConf,
|
||||
config = allConfig,
|
||||
barrierPrefixes = hiveMetastoreBarrierPrefixes,
|
||||
sharedPrefixes = hiveMetastoreSharedPrefixes)
|
||||
} else {
|
||||
// Convert to files and expand any directories.
|
||||
val jars =
|
||||
hiveMetastoreJars
|
||||
.split(File.pathSeparator)
|
||||
.flatMap {
|
||||
case path if new File(path).getName == "*" =>
|
||||
val files = new File(path).getParentFile.listFiles()
|
||||
if (files == null) {
|
||||
logWarning(s"Hive jar path '$path' does not exist.")
|
||||
Nil
|
||||
} else {
|
||||
files.filter(_.getName.toLowerCase.endsWith(".jar"))
|
||||
}
|
||||
case path =>
|
||||
new File(path) :: Nil
|
||||
}
|
||||
.map(_.toURI.toURL)
|
||||
|
||||
logInfo(
|
||||
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
|
||||
s"using ${jars.mkString(":")}")
|
||||
new IsolatedClientLoader(
|
||||
version = metaVersion,
|
||||
sparkConf = conf,
|
||||
hadoopConf = hadoopConf,
|
||||
execJars = jars.toSeq,
|
||||
config = allConfig,
|
||||
isolationOn = true,
|
||||
barrierPrefixes = hiveMetastoreBarrierPrefixes,
|
||||
sharedPrefixes = hiveMetastoreSharedPrefixes)
|
||||
}
|
||||
isolatedLoader.createClient()
|
||||
}
|
||||
|
||||
/** Constructs a configuration for hive, where the metastore is located in a temp directory. */
|
||||
def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String] = {
|
||||
val withInMemoryMode = if (useInMemoryDerby) "memory:" else ""
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
|
|||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
|
||||
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
|
||||
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
|
||||
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
|
||||
import org.apache.spark.sql.catalyst.catalog._
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.parser.DataTypeParser
|
||||
|
@ -98,33 +98,27 @@ private[hive] object HiveSerDe {
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* Legacy catalog for interacting with the Hive metastore.
|
||||
*
|
||||
* This is still used for things like creating data source tables, but in the future will be
|
||||
* cleaned up to integrate more nicely with [[HiveCatalog]].
|
||||
*/
|
||||
// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext
|
||||
private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
|
||||
extends Logging {
|
||||
extends Catalog with Logging {
|
||||
|
||||
val conf = hive.conf
|
||||
|
||||
/** Usages should lock on `this`. */
|
||||
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
|
||||
|
||||
/** A fully qualified identifier for a table (i.e., database.tableName) */
|
||||
case class QualifiedTableName(database: String, name: String)
|
||||
|
||||
private def getCurrentDatabase: String = {
|
||||
hive.sessionState.catalog.getCurrentDatabase
|
||||
}
|
||||
|
||||
def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
|
||||
private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
|
||||
QualifiedTableName(
|
||||
tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
|
||||
tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
|
||||
tableIdent.table.toLowerCase)
|
||||
}
|
||||
|
||||
private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
|
||||
QualifiedTableName(
|
||||
t.name.database.getOrElse(getCurrentDatabase).toLowerCase,
|
||||
t.name.database.getOrElse(client.currentDatabase).toLowerCase,
|
||||
t.name.table.toLowerCase)
|
||||
}
|
||||
|
||||
|
@ -200,7 +194,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
|
|||
CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
|
||||
}
|
||||
|
||||
def refreshTable(tableIdent: TableIdentifier): Unit = {
|
||||
override def refreshTable(tableIdent: TableIdentifier): Unit = {
|
||||
// refreshTable does not eagerly reload the cache. It just invalidate the cache.
|
||||
// Next time when we use the table, it will be populated in the cache.
|
||||
// Since we also cache ParquetRelations converted from Hive Parquet tables and
|
||||
|
@ -414,7 +408,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
|
|||
new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
|
||||
}
|
||||
|
||||
def lookupRelation(
|
||||
override def tableExists(tableIdent: TableIdentifier): Boolean = {
|
||||
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
|
||||
client.getTableOption(dbName, tblName).isDefined
|
||||
}
|
||||
|
||||
override def lookupRelation(
|
||||
tableIdent: TableIdentifier,
|
||||
alias: Option[String]): LogicalPlan = {
|
||||
val qualifiedTableName = getQualifiedTableName(tableIdent)
|
||||
|
@ -556,6 +555,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
|
|||
result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
|
||||
}
|
||||
|
||||
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
|
||||
val db = databaseName.getOrElse(client.currentDatabase)
|
||||
|
||||
client.listTables(db).map(tableName => (tableName, false))
|
||||
}
|
||||
|
||||
/**
|
||||
* When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
|
||||
* data source relations for better performance.
|
||||
|
@ -711,6 +716,27 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
|
||||
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
|
||||
*/
|
||||
override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
/**
|
||||
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
|
||||
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
|
||||
*/
|
||||
override def unregisterTable(tableIdent: TableIdentifier): Unit = {
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
override def unregisterAllTables(): Unit = {}
|
||||
|
||||
override def setCurrentDatabase(databaseName: String): Unit = {
|
||||
client.setCurrentDatabase(databaseName)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,104 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.hive
|
||||
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
|
||||
import org.apache.spark.sql.catalyst.rules.Rule
|
||||
import org.apache.spark.sql.execution.datasources.BucketSpec
|
||||
import org.apache.spark.sql.hive.client.HiveClient
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
|
||||
class HiveSessionCatalog(
|
||||
externalCatalog: HiveCatalog,
|
||||
client: HiveClient,
|
||||
context: HiveContext,
|
||||
conf: SQLConf)
|
||||
extends SessionCatalog(externalCatalog, conf) {
|
||||
|
||||
override def setCurrentDatabase(db: String): Unit = {
|
||||
super.setCurrentDatabase(db)
|
||||
client.setCurrentDatabase(db)
|
||||
}
|
||||
|
||||
override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
|
||||
val table = formatTableName(name.table)
|
||||
if (name.database.isDefined || !tempTables.containsKey(table)) {
|
||||
val newName = name.copy(table = table)
|
||||
metastoreCatalog.lookupRelation(newName, alias)
|
||||
} else {
|
||||
val relation = tempTables.get(table)
|
||||
val tableWithQualifiers = SubqueryAlias(table, relation)
|
||||
// If an alias was specified by the lookup, wrap the plan in a subquery so that
|
||||
// attributes are properly qualified with this alias.
|
||||
alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------
|
||||
// | Methods and fields for interacting with HiveMetastoreCatalog |
|
||||
// ----------------------------------------------------------------
|
||||
|
||||
// Catalog for handling data source tables. TODO: This really doesn't belong here since it is
|
||||
// essentially a cache for metastore tables. However, it relies on a lot of session-specific
|
||||
// things so it would be a lot of work to split its functionality between HiveSessionCatalog
|
||||
// and HiveCatalog. We should still do it at some point...
|
||||
private val metastoreCatalog = new HiveMetastoreCatalog(client, context)
|
||||
|
||||
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
|
||||
val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
|
||||
val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts
|
||||
|
||||
override def refreshTable(name: TableIdentifier): Unit = {
|
||||
metastoreCatalog.refreshTable(name)
|
||||
}
|
||||
|
||||
def invalidateTable(name: TableIdentifier): Unit = {
|
||||
metastoreCatalog.invalidateTable(name)
|
||||
}
|
||||
|
||||
def invalidateCache(): Unit = {
|
||||
metastoreCatalog.cachedDataSourceTables.invalidateAll()
|
||||
}
|
||||
|
||||
def createDataSourceTable(
|
||||
name: TableIdentifier,
|
||||
userSpecifiedSchema: Option[StructType],
|
||||
partitionColumns: Array[String],
|
||||
bucketSpec: Option[BucketSpec],
|
||||
provider: String,
|
||||
options: Map[String, String],
|
||||
isExternal: Boolean): Unit = {
|
||||
metastoreCatalog.createDataSourceTable(
|
||||
name, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options, isExternal)
|
||||
}
|
||||
|
||||
def hiveDefaultTableFilePath(name: TableIdentifier): String = {
|
||||
metastoreCatalog.hiveDefaultTableFilePath(name)
|
||||
}
|
||||
|
||||
// For testing only
|
||||
private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
|
||||
val key = metastoreCatalog.getQualifiedTableName(table)
|
||||
metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
|
||||
}
|
||||
|
||||
}
|
|
@ -18,7 +18,7 @@
|
|||
package org.apache.spark.sql.hive
|
||||
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
|
||||
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog}
|
||||
import org.apache.spark.sql.catalyst.parser.ParserInterface
|
||||
import org.apache.spark.sql.execution.{python, SparkPlanner}
|
||||
import org.apache.spark.sql.execution.datasources._
|
||||
|
@ -35,11 +35,9 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
|
|||
}
|
||||
|
||||
/**
|
||||
* Internal catalog for managing table and database states.
|
||||
* A metadata catalog that points to the Hive metastore.
|
||||
*/
|
||||
override lazy val catalog = {
|
||||
new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, conf)
|
||||
}
|
||||
override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog
|
||||
|
||||
/**
|
||||
* Internal catalog for managing functions registered by the user.
|
||||
|
@ -63,7 +61,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
|
|||
DataSourceAnalysis ::
|
||||
(if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
|
||||
|
||||
override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
|
||||
override val extendedCheckRules = Seq(PreWriteCheck(catalog))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -53,6 +53,9 @@ private[hive] trait HiveClient {
|
|||
/** Returns the names of tables in the given database that matches the given pattern. */
|
||||
def listTables(dbName: String, pattern: String): Seq[String]
|
||||
|
||||
/** Returns the name of the active database. */
|
||||
def currentDatabase: String
|
||||
|
||||
/** Sets the name of current database. */
|
||||
def setCurrentDatabase(databaseName: String): Unit
|
||||
|
||||
|
|
|
@ -241,6 +241,10 @@ private[hive] class HiveClientImpl(
|
|||
state.err = stream
|
||||
}
|
||||
|
||||
override def currentDatabase: String = withHiveState {
|
||||
state.getCurrentDatabase
|
||||
}
|
||||
|
||||
override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
|
||||
if (getDatabaseOption(databaseName).isDefined) {
|
||||
state.setCurrentDatabase(databaseName)
|
||||
|
|
|
@ -69,10 +69,10 @@ case class CreateTableAsSelect(
|
|||
withFormat
|
||||
}
|
||||
|
||||
hiveContext.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
|
||||
hiveContext.sessionState.catalog.client.createTable(withSchema, ignoreIfExists = false)
|
||||
|
||||
// Get the Metastore Relation
|
||||
hiveContext.sessionState.catalog.lookupRelation(tableIdentifier) match {
|
||||
hiveContext.sessionState.catalog.lookupRelation(tableIdentifier, None) match {
|
||||
case r: MetastoreRelation => r
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ private[hive] case class CreateViewAsSelect(
|
|||
|
||||
case true if orReplace =>
|
||||
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
|
||||
hiveContext.metadataHive.alertView(prepareTable(sqlContext))
|
||||
hiveContext.sessionState.catalog.client.alertView(prepareTable(sqlContext))
|
||||
|
||||
case true =>
|
||||
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
|
||||
|
@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect(
|
|||
"CREATE OR REPLACE VIEW AS")
|
||||
|
||||
case false =>
|
||||
hiveContext.metadataHive.createView(prepareTable(sqlContext))
|
||||
hiveContext.sessionState.catalog.client.createView(prepareTable(sqlContext))
|
||||
}
|
||||
|
||||
Seq.empty[Row]
|
||||
|
|
|
@ -45,7 +45,7 @@ case class InsertIntoHiveTable(
|
|||
|
||||
@transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
|
||||
@transient private lazy val hiveContext = new Context(sc.hiveconf)
|
||||
@transient private lazy val client = sc.metadataHive
|
||||
@transient private lazy val catalog = sc.sessionState.catalog
|
||||
|
||||
def output: Seq[Attribute] = Seq.empty
|
||||
|
||||
|
@ -186,8 +186,8 @@ case class InsertIntoHiveTable(
|
|||
// TODO: Correctly set isSkewedStoreAsSubdir.
|
||||
val isSkewedStoreAsSubdir = false
|
||||
if (numDynamicPartitions > 0) {
|
||||
client.synchronized {
|
||||
client.loadDynamicPartitions(
|
||||
catalog.synchronized {
|
||||
catalog.client.loadDynamicPartitions(
|
||||
outputPath.toString,
|
||||
qualifiedTableName,
|
||||
orderedPartitionSpec,
|
||||
|
@ -202,12 +202,12 @@ case class InsertIntoHiveTable(
|
|||
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
|
||||
// scalastyle:on
|
||||
val oldPart =
|
||||
client.getPartitionOption(
|
||||
client.getTable(table.databaseName, table.tableName),
|
||||
catalog.client.getPartitionOption(
|
||||
catalog.client.getTable(table.databaseName, table.tableName),
|
||||
partitionSpec)
|
||||
|
||||
if (oldPart.isEmpty || !ifNotExists) {
|
||||
client.loadPartition(
|
||||
catalog.client.loadPartition(
|
||||
outputPath.toString,
|
||||
qualifiedTableName,
|
||||
orderedPartitionSpec,
|
||||
|
@ -218,7 +218,7 @@ case class InsertIntoHiveTable(
|
|||
}
|
||||
}
|
||||
} else {
|
||||
client.loadTable(
|
||||
catalog.client.loadTable(
|
||||
outputPath.toString, // TODO: URI
|
||||
qualifiedTableName,
|
||||
overwrite,
|
||||
|
|
|
@ -71,8 +71,7 @@ case class DropTable(
|
|||
}
|
||||
hiveContext.invalidateTable(tableName)
|
||||
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
|
||||
hiveContext.sessionState.catalog.dropTable(
|
||||
TableIdentifier(tableName), ignoreIfNotExists = true)
|
||||
hiveContext.sessionState.catalog.unregisterTable(TableIdentifier(tableName))
|
||||
Seq.empty[Row]
|
||||
}
|
||||
}
|
||||
|
@ -143,8 +142,7 @@ case class CreateMetastoreDataSource(
|
|||
val optionsWithPath =
|
||||
if (!options.contains("path") && managedIfNoPath) {
|
||||
isExternal = false
|
||||
options + ("path" ->
|
||||
hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
|
||||
options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
|
||||
} else {
|
||||
options
|
||||
}
|
||||
|
@ -202,8 +200,7 @@ case class CreateMetastoreDataSourceAsSelect(
|
|||
val optionsWithPath =
|
||||
if (!options.contains("path")) {
|
||||
isExternal = false
|
||||
options + ("path" ->
|
||||
hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
|
||||
options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
|
||||
} else {
|
||||
options
|
||||
}
|
||||
|
|
|
@ -24,8 +24,6 @@ import scala.collection.JavaConverters._
|
|||
import scala.collection.mutable
|
||||
import scala.language.implicitConversions
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
|
||||
import org.apache.hadoop.hive.ql.exec.FunctionRegistry
|
||||
import org.apache.hadoop.hive.ql.processors._
|
||||
|
@ -37,11 +35,9 @@ import org.apache.spark.sql.catalyst.analysis._
|
|||
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
|
||||
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.apache.spark.sql.execution.CacheManager
|
||||
import org.apache.spark.sql.execution.command.CacheTableCommand
|
||||
import org.apache.spark.sql.execution.ui.SQLListener
|
||||
import org.apache.spark.sql.hive._
|
||||
import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
|
||||
import org.apache.spark.sql.hive.client.HiveClientImpl
|
||||
import org.apache.spark.sql.hive.execution.HiveNativeCommand
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.util.{ShutdownHookManager, Utils}
|
||||
|
@ -75,77 +71,10 @@ trait TestHiveSingleton {
|
|||
* hive metastore seems to lead to weird non-deterministic failures. Therefore, the execution of
|
||||
* test cases that rely on TestHive must be serialized.
|
||||
*/
|
||||
class TestHiveContext private[hive](
|
||||
sc: SparkContext,
|
||||
cacheManager: CacheManager,
|
||||
listener: SQLListener,
|
||||
executionHive: HiveClientImpl,
|
||||
metadataHive: HiveClient,
|
||||
isRootContext: Boolean,
|
||||
hiveCatalog: HiveCatalog,
|
||||
val warehousePath: File,
|
||||
val scratchDirPath: File)
|
||||
extends HiveContext(
|
||||
sc,
|
||||
cacheManager,
|
||||
listener,
|
||||
executionHive,
|
||||
metadataHive,
|
||||
isRootContext,
|
||||
hiveCatalog) { self =>
|
||||
class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
|
||||
self =>
|
||||
|
||||
// Unfortunately, due to the complex interactions between the construction parameters
|
||||
// and the limitations in scala constructors, we need many of these constructors to
|
||||
// provide a shorthand to create a new TestHiveContext with only a SparkContext.
|
||||
// This is not a great design pattern but it's necessary here.
|
||||
|
||||
private def this(
|
||||
sc: SparkContext,
|
||||
executionHive: HiveClientImpl,
|
||||
metadataHive: HiveClient,
|
||||
warehousePath: File,
|
||||
scratchDirPath: File) {
|
||||
this(
|
||||
sc,
|
||||
new CacheManager,
|
||||
SQLContext.createListenerAndUI(sc),
|
||||
executionHive,
|
||||
metadataHive,
|
||||
true,
|
||||
new HiveCatalog(metadataHive),
|
||||
warehousePath,
|
||||
scratchDirPath)
|
||||
}
|
||||
|
||||
private def this(sc: SparkContext, warehousePath: File, scratchDirPath: File) {
|
||||
this(
|
||||
sc,
|
||||
HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
|
||||
TestHiveContext.newClientForMetadata(
|
||||
sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath),
|
||||
warehousePath,
|
||||
scratchDirPath)
|
||||
}
|
||||
|
||||
def this(sc: SparkContext) {
|
||||
this(
|
||||
sc,
|
||||
Utils.createTempDir(namePrefix = "warehouse"),
|
||||
TestHiveContext.makeScratchDir())
|
||||
}
|
||||
|
||||
override def newSession(): HiveContext = {
|
||||
new TestHiveContext(
|
||||
sc = sc,
|
||||
cacheManager = cacheManager,
|
||||
listener = listener,
|
||||
executionHive = executionHive.newSession(),
|
||||
metadataHive = metadataHive.newSession(),
|
||||
isRootContext = false,
|
||||
hiveCatalog = hiveCatalog,
|
||||
warehousePath = warehousePath,
|
||||
scratchDirPath = scratchDirPath)
|
||||
}
|
||||
import HiveContext._
|
||||
|
||||
// By clearing the port we force Spark to pick a new one. This allows us to rerun tests
|
||||
// without restarting the JVM.
|
||||
|
@ -154,6 +83,26 @@ class TestHiveContext private[hive](
|
|||
|
||||
hiveconf.set("hive.plan.serialization.format", "javaXML")
|
||||
|
||||
lazy val warehousePath = Utils.createTempDir(namePrefix = "warehouse-")
|
||||
|
||||
lazy val scratchDirPath = {
|
||||
val dir = Utils.createTempDir(namePrefix = "scratch-")
|
||||
dir.delete()
|
||||
dir
|
||||
}
|
||||
|
||||
private lazy val temporaryConfig = newTemporaryConfiguration(useInMemoryDerby = false)
|
||||
|
||||
/** Sets up the system initially or after a RESET command */
|
||||
protected override def configure(): Map[String, String] = {
|
||||
super.configure() ++ temporaryConfig ++ Map(
|
||||
ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
|
||||
ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
|
||||
ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
|
||||
ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1"
|
||||
)
|
||||
}
|
||||
|
||||
val testTempDir = Utils.createTempDir()
|
||||
|
||||
// For some hive test case which contain ${system:test.tmp.dir}
|
||||
|
@ -478,9 +427,9 @@ class TestHiveContext private[hive](
|
|||
|
||||
cacheManager.clearCache()
|
||||
loadedTables.clear()
|
||||
sessionState.catalog.clearTempTables()
|
||||
sessionState.catalog.invalidateCache()
|
||||
metadataHive.reset()
|
||||
sessionState.catalog.cachedDataSourceTables.invalidateAll()
|
||||
sessionState.catalog.client.reset()
|
||||
sessionState.catalog.unregisterAllTables()
|
||||
|
||||
FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
|
||||
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
|
||||
|
@ -499,8 +448,13 @@ class TestHiveContext private[hive](
|
|||
// Lots of tests fail if we do not change the partition whitelist from the default.
|
||||
runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
|
||||
|
||||
configure().foreach {
|
||||
case (k, v) =>
|
||||
metadataHive.runSqlHive(s"SET $k=$v")
|
||||
}
|
||||
defaultOverrides()
|
||||
sessionState.catalog.setCurrentDatabase("default")
|
||||
|
||||
runSqlHive("USE default")
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
logError("FATAL ERROR: Failed to reset TestDB state.", e)
|
||||
|
@ -536,43 +490,4 @@ private[hive] object TestHiveContext {
|
|||
// Fewer shuffle partitions to speed up testing.
|
||||
SQLConf.SHUFFLE_PARTITIONS.key -> "5"
|
||||
)
|
||||
|
||||
/**
|
||||
* Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
|
||||
*/
|
||||
private def newClientForMetadata(
|
||||
conf: SparkConf,
|
||||
hadoopConf: Configuration,
|
||||
warehousePath: File,
|
||||
scratchDirPath: File): HiveClient = {
|
||||
val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
|
||||
HiveContext.newClientForMetadata(
|
||||
conf,
|
||||
hiveConf,
|
||||
hadoopConf,
|
||||
hiveClientConfigurations(hiveConf, warehousePath, scratchDirPath))
|
||||
}
|
||||
|
||||
/**
|
||||
* Configurations needed to create a [[HiveClient]].
|
||||
*/
|
||||
private def hiveClientConfigurations(
|
||||
hiveconf: HiveConf,
|
||||
warehousePath: File,
|
||||
scratchDirPath: File): Map[String, String] = {
|
||||
HiveContext.hiveClientConfigurations(hiveconf) ++
|
||||
HiveContext.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map(
|
||||
ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
|
||||
ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
|
||||
ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
|
||||
ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1"
|
||||
)
|
||||
}
|
||||
|
||||
private def makeScratchDir(): File = {
|
||||
val scratchDir = Utils.createTempDir(namePrefix = "scratch")
|
||||
scratchDir.delete()
|
||||
scratchDir
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -70,9 +70,8 @@ public class JavaMetastoreDataSourcesSuite {
|
|||
if (path.exists()) {
|
||||
path.delete();
|
||||
}
|
||||
hiveManagedPath = new Path(
|
||||
sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
|
||||
new TableIdentifier("javaSavedTable")));
|
||||
hiveManagedPath = new Path(sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
|
||||
new TableIdentifier("javaSavedTable")));
|
||||
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
|
||||
if (fs.exists(hiveManagedPath)){
|
||||
fs.delete(hiveManagedPath, true);
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.hive
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.sql.hive.test.TestHive
|
||||
|
||||
|
||||
class HiveContextSuite extends SparkFunSuite {
|
||||
|
||||
// TODO: investigate; this passes locally but fails on Jenkins for some reason.
|
||||
ignore("HiveContext can access `spark.sql.*` configs") {
|
||||
// Avoid creating another SparkContext in the same JVM
|
||||
val sc = TestHive.sparkContext
|
||||
require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") ==
|
||||
"org.apache.spark.sql.hive.execution.PairSerDe")
|
||||
assert(TestHive.getConf("spark.sql.hive.metastore.barrierPrefixes") ==
|
||||
"org.apache.spark.sql.hive.execution.PairSerDe")
|
||||
assert(TestHive.metadataHive.getConf("spark.sql.hive.metastore.barrierPrefixes", "") ==
|
||||
"org.apache.spark.sql.hive.execution.PairSerDe")
|
||||
}
|
||||
|
||||
}
|
|
@ -21,7 +21,6 @@ import java.io.File
|
|||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.sql.{QueryTest, Row, SaveMode}
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
|
||||
import org.apache.spark.sql.hive.test.TestHiveSingleton
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
@ -84,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
|
|||
.saveAsTable("t")
|
||||
}
|
||||
|
||||
val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
|
||||
val hiveTable = sessionState.catalog.client.getTable("default", "t")
|
||||
assert(hiveTable.storage.inputFormat === Some(inputFormat))
|
||||
assert(hiveTable.storage.outputFormat === Some(outputFormat))
|
||||
assert(hiveTable.storage.serde === Some(serde))
|
||||
|
@ -115,8 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
|
|||
.saveAsTable("t")
|
||||
}
|
||||
|
||||
val hiveTable =
|
||||
sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
|
||||
val hiveTable = sessionState.catalog.client.getTable("default", "t")
|
||||
assert(hiveTable.storage.inputFormat === Some(inputFormat))
|
||||
assert(hiveTable.storage.outputFormat === Some(outputFormat))
|
||||
assert(hiveTable.storage.serde === Some(serde))
|
||||
|
@ -146,8 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
|
|||
|AS SELECT 1 AS d1, "val_1" AS d2
|
||||
""".stripMargin)
|
||||
|
||||
val hiveTable =
|
||||
sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
|
||||
val hiveTable = sessionState.catalog.client.getTable("default", "t")
|
||||
assert(hiveTable.storage.inputFormat === Some(inputFormat))
|
||||
assert(hiveTable.storage.outputFormat === Some(outputFormat))
|
||||
assert(hiveTable.storage.serde === Some(serde))
|
||||
|
|
|
@ -32,16 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
|
|||
|
||||
override def beforeAll(): Unit = {
|
||||
// The catalog in HiveContext is a case insensitive one.
|
||||
sessionState.catalog.createTempTable(
|
||||
"ListTablesSuiteTable", df.logicalPlan, ignoreIfExists = true)
|
||||
sessionState.catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
|
||||
sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
|
||||
sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
|
||||
sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
|
||||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
sessionState.catalog.dropTable(
|
||||
TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
|
||||
sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
|
||||
sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
|
||||
sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
|
||||
sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
|
||||
|
|
|
@ -693,13 +693,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|
|||
test("SPARK-6024 wide schema support") {
|
||||
withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") {
|
||||
withTable("wide_schema") {
|
||||
withTempDir { tempDir =>
|
||||
withTempDir( tempDir => {
|
||||
// We will need 80 splits for this schema if the threshold is 4000.
|
||||
val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
|
||||
|
||||
// Manually create a metastore data source table.
|
||||
sessionState.catalog.createDataSourceTable(
|
||||
name = TableIdentifier("wide_schema"),
|
||||
tableIdent = TableIdentifier("wide_schema"),
|
||||
userSpecifiedSchema = Some(schema),
|
||||
partitionColumns = Array.empty[String],
|
||||
bucketSpec = None,
|
||||
|
@ -711,7 +711,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|
|||
|
||||
val actualSchema = table("wide_schema").schema
|
||||
assert(schema === actualSchema)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -737,7 +737,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|
|||
"spark.sql.sources.schema" -> schema.json,
|
||||
"EXTERNAL" -> "FALSE"))
|
||||
|
||||
hiveCatalog.createTable("default", hiveTable, ignoreIfExists = false)
|
||||
sessionState.catalog.client.createTable(hiveTable, ignoreIfExists = false)
|
||||
|
||||
invalidateTable(tableName)
|
||||
val actualSchema = table(tableName).schema
|
||||
|
@ -752,7 +752,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|
|||
withTable(tableName) {
|
||||
df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
|
||||
invalidateTable(tableName)
|
||||
val metastoreTable = hiveCatalog.getTable("default", tableName)
|
||||
val metastoreTable = sessionState.catalog.client.getTable("default", tableName)
|
||||
val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
|
||||
|
||||
val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
|
||||
|
@ -787,7 +787,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|
|||
.sortBy("c")
|
||||
.saveAsTable(tableName)
|
||||
invalidateTable(tableName)
|
||||
val metastoreTable = hiveCatalog.getTable("default", tableName)
|
||||
val metastoreTable = sessionState.catalog.client.getTable("default", tableName)
|
||||
val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
|
||||
val expectedSortByColumns = StructType(df.schema("c") :: Nil)
|
||||
|
||||
|
@ -903,11 +903,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|
|||
|
||||
|
||||
test("skip hive metadata on table creation") {
|
||||
withTempDir { tempPath =>
|
||||
withTempDir(tempPath => {
|
||||
val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
|
||||
|
||||
sessionState.catalog.createDataSourceTable(
|
||||
name = TableIdentifier("not_skip_hive_metadata"),
|
||||
tableIdent = TableIdentifier("not_skip_hive_metadata"),
|
||||
userSpecifiedSchema = Some(schema),
|
||||
partitionColumns = Array.empty[String],
|
||||
bucketSpec = None,
|
||||
|
@ -917,11 +917,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|
|||
|
||||
// As a proxy for verifying that the table was stored in Hive compatible format,
|
||||
// we verify that each column of the table is of native type StringType.
|
||||
assert(hiveCatalog.getTable("default", "not_skip_hive_metadata").schema
|
||||
assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
|
||||
.forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
|
||||
|
||||
sessionState.catalog.createDataSourceTable(
|
||||
name = TableIdentifier("skip_hive_metadata"),
|
||||
tableIdent = TableIdentifier("skip_hive_metadata"),
|
||||
userSpecifiedSchema = Some(schema),
|
||||
partitionColumns = Array.empty[String],
|
||||
bucketSpec = None,
|
||||
|
@ -929,11 +929,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|
|||
options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true"),
|
||||
isExternal = false)
|
||||
|
||||
// As a proxy for verifying that the table was stored in SparkSQL format,
|
||||
// we verify that the table has a column type as array of StringType.
|
||||
assert(hiveCatalog.getTable("default", "skip_hive_metadata").schema.forall { c =>
|
||||
HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType)
|
||||
})
|
||||
}
|
||||
// As a proxy for verifying that the table was stored in SparkSQL format, we verify that
|
||||
// the table has a column type as array of StringType.
|
||||
assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema
|
||||
.forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,8 +25,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
|
|||
private lazy val df = sqlContext.range(10).coalesce(1).toDF()
|
||||
|
||||
private def checkTablePath(dbName: String, tableName: String): Unit = {
|
||||
val metastoreTable = hiveContext.hiveCatalog.getTable(dbName, tableName)
|
||||
val expectedPath = hiveContext.hiveCatalog.getDatabase(dbName).locationUri + "/" + tableName
|
||||
val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName)
|
||||
val expectedPath =
|
||||
hiveContext.sessionState.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
|
||||
|
||||
assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
|
||||
}
|
||||
|
|
|
@ -121,8 +121,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
|
|||
intercept[UnsupportedOperationException] {
|
||||
hiveContext.analyze("tempTable")
|
||||
}
|
||||
hiveContext.sessionState.catalog.dropTable(
|
||||
TableIdentifier("tempTable"), ignoreIfNotExists = true)
|
||||
hiveContext.sessionState.catalog.unregisterTable(TableIdentifier("tempTable"))
|
||||
}
|
||||
|
||||
test("estimates the size of a test MetastoreRelation") {
|
||||
|
|
|
@ -171,6 +171,10 @@ class VersionsSuite extends SparkFunSuite with Logging {
|
|||
assert(client.listTables("default") === Seq("src"))
|
||||
}
|
||||
|
||||
test(s"$version: currentDatabase") {
|
||||
assert(client.currentDatabase === "default")
|
||||
}
|
||||
|
||||
test(s"$version: getDatabase") {
|
||||
client.getDatabase("default")
|
||||
}
|
||||
|
|
|
@ -49,7 +49,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
|
|||
import org.apache.spark.sql.hive.test.TestHive.implicits._
|
||||
|
||||
override def beforeAll() {
|
||||
super.beforeAll()
|
||||
TestHive.cacheTables = true
|
||||
// Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
|
||||
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
|
||||
|
@ -58,14 +57,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
|
|||
}
|
||||
|
||||
override def afterAll() {
|
||||
try {
|
||||
TestHive.cacheTables = false
|
||||
TimeZone.setDefault(originalTimeZone)
|
||||
Locale.setDefault(originalLocale)
|
||||
sql("DROP TEMPORARY FUNCTION udtf_count2")
|
||||
} finally {
|
||||
super.afterAll()
|
||||
}
|
||||
TestHive.cacheTables = false
|
||||
TimeZone.setDefault(originalTimeZone)
|
||||
Locale.setDefault(originalLocale)
|
||||
sql("DROP TEMPORARY FUNCTION udtf_count2")
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
test("SPARK-4908: concurrent hive native commands") {
|
||||
|
@ -1213,7 +1209,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
|
|||
sql("USE hive_test_db")
|
||||
assert("hive_test_db" == sql("select current_database()").first().getString(0))
|
||||
|
||||
intercept[AnalysisException] {
|
||||
intercept[NoSuchDatabaseException] {
|
||||
sql("USE not_existing_db")
|
||||
}
|
||||
|
||||
|
|
|
@ -1325,7 +1325,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
|
|||
.format("parquet")
|
||||
.save(path)
|
||||
|
||||
// We don't support creating a temporary table while specifying a database
|
||||
val message = intercept[AnalysisException] {
|
||||
sqlContext.sql(
|
||||
s"""
|
||||
|
@ -1336,8 +1335,9 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
|
|||
|)
|
||||
""".stripMargin)
|
||||
}.getMessage
|
||||
assert(message.contains("Specifying database name or other qualifiers are not allowed"))
|
||||
|
||||
// If you use backticks to quote the name then it's OK.
|
||||
// If you use backticks to quote the name of a temporary table having dot in it.
|
||||
sqlContext.sql(
|
||||
s"""
|
||||
|CREATE TEMPORARY TABLE `db.t`
|
||||
|
|
|
@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
|
|||
sql("INSERT INTO TABLE t SELECT * FROM tmp")
|
||||
checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
|
||||
}
|
||||
sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
|
||||
sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
|
||||
}
|
||||
|
||||
test("overwriting") {
|
||||
|
@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
|
|||
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
|
||||
checkAnswer(table("t"), data.map(Row.fromTuple))
|
||||
}
|
||||
sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
|
||||
sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
|
||||
}
|
||||
|
||||
test("self-join") {
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.spark.sql.hive
|
|||
import java.io.File
|
||||
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.execution.DataSourceScan
|
||||
import org.apache.spark.sql.execution.command.ExecutedCommand
|
||||
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
|
||||
|
@ -426,9 +425,10 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
|
|||
}
|
||||
|
||||
test("Caching converted data source Parquet Relations") {
|
||||
def checkCached(tableIdentifier: TableIdentifier): Unit = {
|
||||
val _catalog = sessionState.catalog
|
||||
def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = {
|
||||
// Converted test_parquet should be cached.
|
||||
sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match {
|
||||
sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
|
||||
case null => fail("Converted test_parquet should be cached in the cache.")
|
||||
case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => // OK
|
||||
case other =>
|
||||
|
@ -453,17 +453,17 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
|
|||
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
|
||||
""".stripMargin)
|
||||
|
||||
var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default"))
|
||||
var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet")
|
||||
|
||||
// First, make sure the converted test_parquet is not cached.
|
||||
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
|
||||
assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
|
||||
// Table lookup will make the table cached.
|
||||
table("test_insert_parquet")
|
||||
checkCached(tableIdentifier)
|
||||
// For insert into non-partitioned table, we will do the conversion,
|
||||
// so the converted test_insert_parquet should be cached.
|
||||
invalidateTable("test_insert_parquet")
|
||||
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
|
||||
assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
|
||||
sql(
|
||||
"""
|
||||
|INSERT INTO TABLE test_insert_parquet
|
||||
|
@ -476,7 +476,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
|
|||
sql("select a, b from jt").collect())
|
||||
// Invalidate the cache.
|
||||
invalidateTable("test_insert_parquet")
|
||||
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
|
||||
assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
|
||||
|
||||
// Create a partitioned table.
|
||||
sql(
|
||||
|
@ -493,8 +493,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
|
|||
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
|
||||
""".stripMargin)
|
||||
|
||||
tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default"))
|
||||
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
|
||||
tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
|
||||
assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
|
||||
sql(
|
||||
"""
|
||||
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|
||||
|
@ -503,14 +503,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
|
|||
""".stripMargin)
|
||||
// Right now, insert into a partitioned Parquet is not supported in data source Parquet.
|
||||
// So, we expect it is not cached.
|
||||
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
|
||||
assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
|
||||
sql(
|
||||
"""
|
||||
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|
||||
|PARTITION (`date`='2015-04-02')
|
||||
|select a, b from jt
|
||||
""".stripMargin)
|
||||
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
|
||||
assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
|
||||
|
||||
// Make sure we can cache the partitioned table.
|
||||
table("test_parquet_partitioned_cache_test")
|
||||
|
@ -526,7 +526,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
|
|||
""".stripMargin).collect())
|
||||
|
||||
invalidateTable("test_parquet_partitioned_cache_test")
|
||||
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
|
||||
assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
|
||||
|
||||
dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue