diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index c8b61d8df3..19a8fcdd8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -83,14 +83,7 @@ class SessionCatalog(
   // check whether the temporary table or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb = {
-    val defaultName = DEFAULT_DATABASE
-    val defaultDbDefinition =
-      CatalogDatabase(defaultName, "default database", conf.warehousePath, Map())
-    // Initialize default database if it doesn't already exist
-    createDatabase(defaultDbDefinition, ignoreIfExists = true)
-    formatDatabaseName(defaultName)
-  }
+  protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
 
   /**
    * Format table name, taking into account case sensitivity.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6372936bd7..b2a50c646b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -56,11 +56,6 @@ object SQLConf {
   }
 
-  val WAREHOUSE_PATH = SQLConfigBuilder("spark.sql.warehouse.dir")
-    .doc("The default location for managed databases and tables.")
-    .stringConf
-    .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
-
   val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
     .internal()
     .doc("The max number of iterations the optimizer and analyzer runs.")
@@ -806,7 +801,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)
 
-  def warehousePath: String = new Path(getConf(WAREHOUSE_PATH)).toString
+  def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString
 
   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
 
@@ -951,6 +946,11 @@ object StaticSQLConf {
     }
   }
 
+  val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir")
+    .doc("The default location for managed databases and tables.")
+    .stringConf
+    .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
+
   val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation")
     .internal()
     .stringConf
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index c6083b372a..6232c18b1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -23,10 +23,9 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
-import org.apache.spark.internal.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -40,34 +39,35 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
 
   // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
   // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
-  {
+  val warehousePath = {
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
       sparkContext.hadoopConfiguration.addResource(configFile)
     }
 
     // Set the Hive metastore warehouse path to the one we use
-    val tempConf = new SQLConf
-    sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
     val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
-    if (hiveWarehouseDir != null && !tempConf.contains(SQLConf.WAREHOUSE_PATH.key)) {
+    if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.
-      tempConf.setConfString(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
-      sparkContext.conf.set(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
-      logInfo(s"${SQLConf.WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
-        s"is set. Setting ${SQLConf.WAREHOUSE_PATH.key} to the value of " +
+      sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+      logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
+        s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
         s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
+      hiveWarehouseDir
     } else {
       // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
      // the value of spark.sql.warehouse.dir.
       // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
       // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
+      val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
+      sparkContext.conf.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
+      sparkWarehouseDir
     }
-    logInfo(s"Warehouse path is '${tempConf.warehousePath}'.")
   }
+  logInfo(s"Warehouse path is '$warehousePath'.")
+
 
   /**
    * Class for caching query results reused in future executions.
@@ -88,6 +88,14 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
       sparkContext.conf,
       sparkContext.hadoopConfiguration)
 
+  // Create the default database if it doesn't exist.
+  {
+    val defaultDbDefinition = CatalogDatabase(
+      SessionCatalog.DEFAULT_DATABASE, "default database", warehousePath, Map())
+    // Initialize default database if it doesn't already exist
+    externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
+  }
+
   /**
    * A manager for global temporary views.
    */
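Note on the SharedState change above: the warehouse path is now resolved exactly once, as a val captured when SharedState is constructed, and the default database is created eagerly through the external catalog using that path instead of lazily inside every SessionCatalog. Below is a minimal standalone sketch of the precedence rule the hunk implements; the object and method names are hypothetical, and the bare "spark-warehouse" fallback stands in for the patch's Utils.resolveURI("spark-warehouse").toString default.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Sketch (not the patch itself): spark.sql.warehouse.dir wins when it is set;
// otherwise an explicit hive.metastore.warehouse.dir is respected; otherwise the
// default applies. Both settings end up aligned, as in the SharedState hunk above.
object WarehousePathSketch {
  def resolve(sparkConf: SparkConf, hadoopConf: Configuration): String = {
    val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
    if (hiveWarehouseDir != null && !sparkConf.contains("spark.sql.warehouse.dir")) {
      // Only the Hive setting is present: adopt it as the Spark SQL warehouse path too.
      sparkConf.set("spark.sql.warehouse.dir", hiveWarehouseDir)
      hiveWarehouseDir
    } else {
      // spark.sql.warehouse.dir is set (or neither is set): it overrides the Hive setting.
      val sparkWarehouseDir = sparkConf.get("spark.sql.warehouse.dir", "spark-warehouse")
      sparkConf.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
      sparkWarehouseDir
    }
  }
}
```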
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 363715c6d2..a010739874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -125,17 +125,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert("file" === pathInCatalog.getScheme)
       val expectedPath = new Path(path).toUri
       assert(expectedPath.getPath === pathInCatalog.getPath)
-
-      withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
-        sql(s"CREATE DATABASE db2")
-        val pathInCatalog2 = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
-        assert("file" === pathInCatalog2.getScheme)
-        val expectedPath2 = new Path(spark.sessionState.conf.warehousePath + "/" + "db2.db").toUri
-        assert(expectedPath2.getPath === pathInCatalog2.getPath)
-      }
-
       sql("DROP DATABASE db1")
-      sql("DROP DATABASE db2")
     }
   }
 
@@ -146,55 +136,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     fs.makeQualified(hadoopPath).toString
   }
 
-  test("Create/Drop Database") {
-    withTempDir { tmpDir =>
-      val path = tmpDir.getCanonicalPath
-      withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
-        val catalog = spark.sessionState.catalog
-        val databaseNames = Seq("db1", "`database`")
-
-        databaseNames.foreach { dbName =>
-          try {
-            val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-
-            sql(s"CREATE DATABASE $dbName")
-            val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-            val expectedLocation = makeQualifiedPath(s"$path/$dbNameWithoutBackTicks.db")
-            assert(db1 == CatalogDatabase(
-              dbNameWithoutBackTicks,
-              "",
-              expectedLocation,
-              Map.empty))
-            sql(s"DROP DATABASE $dbName CASCADE")
-            assert(!catalog.databaseExists(dbNameWithoutBackTicks))
-          } finally {
-            catalog.reset()
-          }
-        }
-      }
-    }
-  }
-
   test("Create Database using Default Warehouse Path") {
-    withSQLConf(SQLConf.WAREHOUSE_PATH.key -> "") {
-      // Will use the default location if and only if we unset the conf
-      spark.conf.unset(SQLConf.WAREHOUSE_PATH.key)
-      val catalog = spark.sessionState.catalog
-      val dbName = "db1"
-      try {
-        sql(s"CREATE DATABASE $dbName")
-        val db1 = catalog.getDatabaseMetadata(dbName)
-        val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbName.db")
-        assert(db1 == CatalogDatabase(
-          dbName,
-          "",
-          expectedLocation,
-          Map.empty))
-        sql(s"DROP DATABASE $dbName CASCADE")
-        assert(!catalog.databaseExists(dbName))
-      } finally {
-        catalog.reset()
-      }
+    val catalog = spark.sessionState.catalog
+    val dbName = "db1"
+    try {
+      sql(s"CREATE DATABASE $dbName")
+      val db1 = catalog.getDatabaseMetadata(dbName)
+      val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbName.db")
+      assert(db1 == CatalogDatabase(
+        dbName,
+        "",
+        expectedLocation,
+        Map.empty))
+      sql(s"DROP DATABASE $dbName CASCADE")
+      assert(!catalog.databaseExists(dbName))
+    } finally {
+      catalog.reset()
     }
   }
 
@@ -224,31 +181,26 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("Create Database - database already exists") {
-    withTempDir { tmpDir =>
-      val path = tmpDir.getCanonicalPath
-      withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
-        val catalog = spark.sessionState.catalog
-        val databaseNames = Seq("db1", "`database`")
+    val catalog = spark.sessionState.catalog
+    val databaseNames = Seq("db1", "`database`")
 
-        databaseNames.foreach { dbName =>
-          try {
-            val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-            sql(s"CREATE DATABASE $dbName")
-            val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-            val expectedLocation = makeQualifiedPath(s"$path/$dbNameWithoutBackTicks.db")
-            assert(db1 == CatalogDatabase(
-              dbNameWithoutBackTicks,
-              "",
-              expectedLocation,
-              Map.empty))
+    databaseNames.foreach { dbName =>
+      try {
+        val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+        sql(s"CREATE DATABASE $dbName")
+        val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+        val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbNameWithoutBackTicks.db")
+        assert(db1 == CatalogDatabase(
+          dbNameWithoutBackTicks,
+          "",
+          expectedLocation,
+          Map.empty))
 
-            intercept[DatabaseAlreadyExistsException] {
-              sql(s"CREATE DATABASE $dbName")
-            }
-          } finally {
-            catalog.reset()
-          }
+        intercept[DatabaseAlreadyExistsException] {
+          sql(s"CREATE DATABASE $dbName")
         }
+      } finally {
+        catalog.reset()
       }
     }
   }
@@ -473,47 +425,42 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("Alter/Describe Database") {
-    withTempDir { tmpDir =>
-      val path = tmpDir.getCanonicalPath
-      withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
-        val catalog = spark.sessionState.catalog
-        val databaseNames = Seq("db1", "`database`")
+    val catalog = spark.sessionState.catalog
+    val databaseNames = Seq("db1", "`database`")
 
-        databaseNames.foreach { dbName =>
-          try {
-            val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-            val location = makeQualifiedPath(s"$path/$dbNameWithoutBackTicks.db")
+    databaseNames.foreach { dbName =>
+      try {
+        val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+        val location = makeQualifiedPath(s"spark-warehouse/$dbNameWithoutBackTicks.db")
 
-            sql(s"CREATE DATABASE $dbName")
+        sql(s"CREATE DATABASE $dbName")
 
-            checkAnswer(
-              sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
-              Row("Database Name", dbNameWithoutBackTicks) ::
-                Row("Description", "") ::
-                Row("Location", location) ::
-                Row("Properties", "") :: Nil)
+        checkAnswer(
+          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+          Row("Database Name", dbNameWithoutBackTicks) ::
+            Row("Description", "") ::
+            Row("Location", location) ::
+            Row("Properties", "") :: Nil)
 
-            sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
+        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
 
-            checkAnswer(
-              sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
-              Row("Database Name", dbNameWithoutBackTicks) ::
-                Row("Description", "") ::
-                Row("Location", location) ::
-                Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
+        checkAnswer(
+          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+          Row("Database Name", dbNameWithoutBackTicks) ::
+            Row("Description", "") ::
+            Row("Location", location) ::
+            Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
 
-            sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
+        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
 
-            checkAnswer(
-              sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
-              Row("Database Name", dbNameWithoutBackTicks) ::
-                Row("Description", "") ::
-                Row("Location", location) ::
-                Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
-          } finally {
-            catalog.reset()
-          }
-        }
+        checkAnswer(
+          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+          Row("Database Name", dbNameWithoutBackTicks) ::
+            Row("Description", "") ::
+            Row("Location", location) ::
+            Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
+      } finally {
+        catalog.reset()
      }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 11d4693f1c..a283ff971a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -215,18 +215,10 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
   }
 
   test("default value of WAREHOUSE_PATH") {
-
-    val original = spark.conf.get(SQLConf.WAREHOUSE_PATH)
-    try {
-      // to get the default value, always unset it
-      spark.conf.unset(SQLConf.WAREHOUSE_PATH.key)
-      // JVM adds a trailing slash if the directory exists and leaves it as-is, if it doesn't
-      // In our comparison, strip trailing slash off of both sides, to account for such cases
-      assert(new Path(Utils.resolveURI("spark-warehouse")).toString.stripSuffix("/") === spark
-        .sessionState.conf.warehousePath.stripSuffix("/"))
-    } finally {
-      sql(s"set ${SQLConf.WAREHOUSE_PATH}=$original")
-    }
+    // JVM adds a trailing slash if the directory exists and leaves it as-is, if it doesn't
+    // In our comparison, strip trailing slash off of both sides, to account for such cases
+    assert(new Path(Utils.resolveURI("spark-warehouse")).toString.stripSuffix("/") === spark
+      .sessionState.conf.warehousePath.stripSuffix("/"))
   }
 
   test("MAX_CASES_BRANCHES") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index a5ef8723c8..81cd65c3cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, WAREHOUSE_PATH}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -373,7 +373,7 @@ private[spark] object HiveUtils extends Logging {
         propMap.put(confvar.varname, confvar.getDefaultExpr())
       }
     }
-    propMap.put(SQLConf.WAREHOUSE_PATH.key, localMetastore.toURI.toString)
+    propMap.put(WAREHOUSE_PATH.key, localMetastore.toURI.toString)
     propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
       s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true")
     propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a2b04863d3..15e3927b75 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -722,53 +722,46 @@ class HiveDDLSuite
   }
 
   private def dropDatabase(cascade: Boolean, tableExists: Boolean): Unit = {
-    withTempPath { tmpDir =>
-      val path = tmpDir.toString
-      withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
-        val dbName = "db1"
-        val fs = new Path(path).getFileSystem(spark.sessionState.newHadoopConf())
-        val dbPath = new Path(path)
-        // the database directory does not exist
-        assert(!fs.exists(dbPath))
+    val dbName = "db1"
+    val dbPath = new Path(spark.sessionState.conf.warehousePath)
+    val fs = dbPath.getFileSystem(spark.sessionState.newHadoopConf())
 
-        sql(s"CREATE DATABASE $dbName")
-        val catalog = spark.sessionState.catalog
-        val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db"
-        val db1 = catalog.getDatabaseMetadata(dbName)
-        assert(db1 == CatalogDatabase(
-          dbName,
-          "",
-          expectedDBLocation,
-          Map.empty))
-        // the database directory was created
-        assert(fs.exists(dbPath) && fs.isDirectory(dbPath))
-        sql(s"USE $dbName")
+    sql(s"CREATE DATABASE $dbName")
+    val catalog = spark.sessionState.catalog
+    val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db"
+    val db1 = catalog.getDatabaseMetadata(dbName)
+    assert(db1 == CatalogDatabase(
+      dbName,
+      "",
+      expectedDBLocation,
+      Map.empty))
+    // the database directory was created
+    assert(fs.exists(dbPath) && fs.isDirectory(dbPath))
+    sql(s"USE $dbName")
 
-        val tabName = "tab1"
-        assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
-        sql(s"CREATE TABLE $tabName as SELECT 1")
-        assert(tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
+    val tabName = "tab1"
+    assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
+    sql(s"CREATE TABLE $tabName as SELECT 1")
+    assert(tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
 
-        if (!tableExists) {
-          sql(s"DROP TABLE $tabName")
-          assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
-        }
+    if (!tableExists) {
+      sql(s"DROP TABLE $tabName")
+      assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
+    }
 
-        sql(s"USE default")
-        val sqlDropDatabase = s"DROP DATABASE $dbName ${if (cascade) "CASCADE" else "RESTRICT"}"
-        if (tableExists && !cascade) {
-          val message = intercept[AnalysisException] {
-            sql(sqlDropDatabase)
-          }.getMessage
-          assert(message.contains(s"Database $dbName is not empty. One or more tables exist."))
-          // the database directory was not removed
-          assert(fs.exists(new Path(expectedDBLocation)))
-        } else {
-          sql(sqlDropDatabase)
-          // the database directory was removed and the inclusive table directories are also removed
-          assert(!fs.exists(new Path(expectedDBLocation)))
-        }
-      }
+    sql(s"USE default")
+    val sqlDropDatabase = s"DROP DATABASE $dbName ${if (cascade) "CASCADE" else "RESTRICT"}"
+    if (tableExists && !cascade) {
+      val message = intercept[AnalysisException] {
+        sql(sqlDropDatabase)
+      }.getMessage
+      assert(message.contains(s"Database $dbName is not empty. One or more tables exist."))
+      // the database directory was not removed
+      assert(fs.exists(new Path(expectedDBLocation)))
+    } else {
+      sql(sqlDropDatabase)
+      // the database directory was removed and the inclusive table directories are also removed
+      assert(!fs.exists(new Path(expectedDBLocation)))
    }
   }
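Note on the test changes above: with WAREHOUSE_PATH moved into StaticSQLConf and captured by SharedState at construction time, the suites stop overriding it per test case (withSQLConf / spark.conf.unset) and instead assert against the default spark-warehouse location. A hedged usage sketch follows, using the standard SparkSession builder API; the local path is made up for illustration. The point is that the warehouse location must be supplied before the session, and hence its SharedState, is created; changing spark.sql.warehouse.dir afterwards would not be picked up by the already-built shared state.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: configure the warehouse location up front, when building the session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("warehouse-path-example")
  .config("spark.sql.warehouse.dir", "/tmp/my-spark-warehouse") // illustrative path
  .getOrCreate()

// With this patch, the `default` database points at the warehouse root itself, and other
// databases resolve beneath it, e.g. CREATE DATABASE db1 -> /tmp/my-spark-warehouse/db1.db
spark.sql("CREATE DATABASE IF NOT EXISTS db1")
```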