[SPARK-22676][FOLLOW-UP] fix code style for test.
## What changes were proposed in this pull request? This pr address comments in https://github.com/apache/spark/pull/19868 ; Fix the code style for `org.apache.spark.sql.hive.QueryPartitionSuite` by using: `withTempView`, `withTempDir`, `withTable`... Author: jinxing <jinxing6042@126.com> Closes #21091 from jinxing64/SPARK-22676-FOLLOW-UP.
This commit is contained in:
parent
e13416502f
commit
9e10f69df5
|
@ -33,80 +33,53 @@ import org.apache.spark.util.Utils
|
|||
class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
|
||||
import spark.implicits._
|
||||
|
||||
private def queryWhenPathNotExist(): Unit = {
|
||||
withTempView("testData") {
|
||||
withTable("table_with_partition", "createAndInsertTest") {
|
||||
withTempDir { tmpDir =>
|
||||
val testData = sparkContext.parallelize(
|
||||
(1 to 10).map(i => TestData(i, i.toString))).toDF()
|
||||
testData.createOrReplaceTempView("testData")
|
||||
|
||||
// create the table for test
|
||||
sql(s"CREATE TABLE table_with_partition(key int,value string) " +
|
||||
s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
|
||||
"SELECT key,value FROM testData")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
|
||||
"SELECT key,value FROM testData")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
|
||||
"SELECT key,value FROM testData")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
|
||||
"SELECT key,value FROM testData")
|
||||
|
||||
// test for the exist path
|
||||
checkAnswer(sql("select key,value from table_with_partition"),
|
||||
testData.union(testData).union(testData).union(testData))
|
||||
|
||||
// delete the path of one partition
|
||||
tmpDir.listFiles
|
||||
.find { f => f.isDirectory && f.getName().startsWith("ds=") }
|
||||
.foreach { f => Utils.deleteRecursively(f) }
|
||||
|
||||
// test for after delete the path
|
||||
checkAnswer(sql("select key,value from table_with_partition"),
|
||||
testData.union(testData).union(testData))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-5068: query data when path doesn't exist") {
|
||||
withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) {
|
||||
val testData = sparkContext.parallelize(
|
||||
(1 to 10).map(i => TestData(i, i.toString))).toDF()
|
||||
testData.createOrReplaceTempView("testData")
|
||||
|
||||
val tmpDir = Files.createTempDir()
|
||||
// create the table for test
|
||||
sql(s"CREATE TABLE table_with_partition(key int,value string) " +
|
||||
s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
|
||||
"SELECT key,value FROM testData")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
|
||||
"SELECT key,value FROM testData")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
|
||||
"SELECT key,value FROM testData")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
|
||||
"SELECT key,value FROM testData")
|
||||
|
||||
// test for the exist path
|
||||
checkAnswer(sql("select key,value from table_with_partition"),
|
||||
testData.toDF.collect ++ testData.toDF.collect
|
||||
++ testData.toDF.collect ++ testData.toDF.collect)
|
||||
|
||||
// delete the path of one partition
|
||||
tmpDir.listFiles
|
||||
.find { f => f.isDirectory && f.getName().startsWith("ds=") }
|
||||
.foreach { f => Utils.deleteRecursively(f) }
|
||||
|
||||
// test for after delete the path
|
||||
checkAnswer(sql("select key,value from table_with_partition"),
|
||||
testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)
|
||||
|
||||
sql("DROP TABLE IF EXISTS table_with_partition")
|
||||
sql("DROP TABLE IF EXISTS createAndInsertTest")
|
||||
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "true") {
|
||||
queryWhenPathNotExist()
|
||||
}
|
||||
}
|
||||
|
||||
test("Replace spark.sql.hive.verifyPartitionPath by spark.files.ignoreMissingFiles") {
|
||||
withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "false")) {
|
||||
withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "false") {
|
||||
sparkContext.conf.set(IGNORE_MISSING_FILES.key, "true")
|
||||
val testData = sparkContext.parallelize(
|
||||
(1 to 10).map(i => TestData(i, i.toString))).toDF()
|
||||
testData.createOrReplaceTempView("testData")
|
||||
|
||||
val tmpDir = Files.createTempDir()
|
||||
// create the table for test
|
||||
sql(s"CREATE TABLE table_with_partition(key int,value string) " +
|
||||
s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
|
||||
"SELECT key,value FROM testData")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
|
||||
"SELECT key,value FROM testData")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
|
||||
"SELECT key,value FROM testData")
|
||||
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
|
||||
"SELECT key,value FROM testData")
|
||||
|
||||
// test for the exist path
|
||||
checkAnswer(sql("select key,value from table_with_partition"),
|
||||
testData.toDF.collect ++ testData.toDF.collect
|
||||
++ testData.toDF.collect ++ testData.toDF.collect)
|
||||
|
||||
// delete the path of one partition
|
||||
tmpDir.listFiles
|
||||
.find { f => f.isDirectory && f.getName().startsWith("ds=") }
|
||||
.foreach { f => Utils.deleteRecursively(f) }
|
||||
|
||||
// test for after delete the path
|
||||
checkAnswer(sql("select key,value from table_with_partition"),
|
||||
testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)
|
||||
|
||||
sql("DROP TABLE IF EXISTS table_with_partition")
|
||||
sql("DROP TABLE IF EXISTS createAndInsertTest")
|
||||
queryWhenPathNotExist()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue