[SPARK-17051][SQL] we should use hadoopConf in InsertIntoHiveTable

## What changes were proposed in this pull request?

Hive confs in hive-site.xml will be loaded into `hadoopConf`, so we should use `hadoopConf` in `InsertIntoHiveTable` instead of `SessionState.conf`.
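
To make the distinction concrete, here is a minimal, hypothetical sketch (the object name, local master, and `println` checks are illustrative assumptions, not part of this patch). A value placed on the shared Hadoop `Configuration` (which is where `hive-site.xml` entries end up) is visible to a Hadoop-side lookup, but not to a `SQLConf`-backed lookup such as `SessionState.conf.getConfString`:

```scala
import org.apache.spark.sql.SparkSession

object HiveConfResolutionSketch {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("HiveConfResolutionSketch")
      .getOrCreate()

    val key = "hive.exec.dynamic.partition.mode"

    // Stand-in for an entry that hive-site.xml would contribute: it lands on the
    // shared Hadoop configuration, not on Spark's SQLConf.
    spark.sparkContext.hadoopConfiguration.set(key, "nonstrict")

    // The Hadoop configuration sees the value ...
    println(spark.sparkContext.hadoopConfiguration.get(key, "strict")) // nonstrict

    // ... but a SQLConf-backed lookup does not, so it falls back to the default.
    println(spark.conf.get(key, "strict")) // strict

    spark.stop()
  }
}
```

Reading Hive keys such as `hive.exec.compress.output` and `hive.exec.dynamic.partition.mode` from `hadoopConf` therefore honors `hive-site.xml`, while the old `SessionState.conf` lookup silently fell back to the hard-coded defaults.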

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14634 from cloud-fan/bug.
Wenchen Fan authored on 2016-09-20 09:53:28 -07:00, committed by Yin Huai
parent d5ec5dbb0d
commit eb004c6620
2 changed files with 31 additions and 10 deletions


@@ -147,8 +147,7 @@ case class InsertIntoHiveTable(
     val hadoopConf = sessionState.newHadoopConf()
     val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    val isCompressed =
-      sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
+    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
@@ -182,15 +181,13 @@ case class InsertIntoHiveTable(
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
-      if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
+      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
       }

       // Report error if dynamic partition strict mode is on but no static partition is found
       if (numStaticPartitions == 0 &&
-        sessionState.conf.getConfString(
-          "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
-      {
+        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
       }


@@ -26,16 +26,17 @@ import scala.util.Try
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.BeforeAndAfter

-import org.apache.spark.{SparkException, SparkFiles}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils

 case class TestData(a: Int, b: String)
@@ -43,7 +44,7 @@ case class TestData(a: Int, b: String)
  * A set of test cases expressed in Hive QL that are not covered by the tests
  * included in the hive distribution.
  */
-class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
+class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAndAfter {

   private val originalTimeZone = TimeZone.getDefault
   private val originalLocale = Locale.getDefault
@@ -51,6 +52,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled

+  def spark: SparkSession = sparkSession
+
   override def beforeAll() {
     super.beforeAll()
     TestHive.setCacheTables(true)
@@ -1199,6 +1202,27 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     }
     assertUnsupportedFeature { sql("DROP TEMPORARY MACRO SIGMOID") }
   }
+
+  test("dynamic partitioning is allowed when hive.exec.dynamic.partition.mode is nonstrict") {
+    val modeConfKey = "hive.exec.dynamic.partition.mode"
+    withTable("with_parts") {
+      sql("CREATE TABLE with_parts(key INT) PARTITIONED BY (p INT)")
+
+      withSQLConf(modeConfKey -> "nonstrict") {
+        sql("INSERT OVERWRITE TABLE with_parts partition(p) select 1, 2")
+        assert(spark.table("with_parts").filter($"p" === 2).collect().head == Row(1, 2))
+      }
+
+      val originalValue = spark.sparkContext.hadoopConfiguration.get(modeConfKey, "nonstrict")
+      try {
+        spark.sparkContext.hadoopConfiguration.set(modeConfKey, "nonstrict")
+        sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4")
+        assert(spark.table("with_parts").filter($"p" === 4).collect().head == Row(3, 4))
+      } finally {
+        spark.sparkContext.hadoopConfiguration.set(modeConfKey, originalValue)
+      }
+    }
+  }
 }

 // for SPARK-2180 test