[SPARK-22833][IMPROVEMENT] in SparkHive Scala Examples
## What changes were proposed in this pull request? Improvements made to the SparkHive Scala examples: * Writing a DataFrame / Dataset to a Hive managed table and a Hive external table using different storage formats. * Implementation of partitioning, repartition, and coalesce with appropriate examples. ## How was this patch tested? * The patch has been tested manually and by running ./dev/run-tests. Author: chetkhatri <ckhatrimanjal@gmail.com> Closes #20018 from chetkhatri/scala-sparkhive-examples.
This commit is contained in:
parent
8941a4abca
commit
86db9b2d7d
|
@ -19,8 +19,7 @@ package org.apache.spark.examples.sql.hive
|
|||
// $example on:spark_hive$
|
||||
import java.io.File
|
||||
|
||||
import org.apache.spark.sql.Row
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
|
||||
// $example off:spark_hive$
|
||||
|
||||
object SparkHiveExample {
|
||||
|
@ -102,8 +101,41 @@ object SparkHiveExample {
|
|||
// | 4| val_4| 4| val_4|
|
||||
// | 5| val_5| 5| val_5|
|
||||
// ...
|
||||
// $example off:spark_hive$
|
||||
|
||||
// Create Hive managed table with Parquet
|
||||
sql("CREATE TABLE records(key int, value string) STORED AS PARQUET")
|
||||
// Save DataFrame to Hive managed table as Parquet format
|
||||
val hiveTableDF = sql("SELECT * FROM records")
|
||||
hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")
|
||||
// Create External Hive table with Parquet
|
||||
sql("CREATE EXTERNAL TABLE records(key int, value string) " +
|
||||
"STORED AS PARQUET LOCATION '/user/hive/warehouse/'")
|
||||
// to make Hive Parquet format compatible with Spark Parquet format
|
||||
spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
|
||||
|
||||
// Multiple Parquet files may be created under the given directory, depending on the volume of data.
|
||||
val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"
|
||||
|
||||
// Save DataFrame to Hive External table as compatible Parquet format
|
||||
hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)
|
||||
|
||||
// Turn on flag for Dynamic Partitioning
|
||||
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
|
||||
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
|
||||
|
||||
// Partition the data written to the Hive table so that downstream queries run much faster.
|
||||
hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
|
||||
.parquet(hiveExternalTableLocation)
|
||||
|
||||
// Reduce number of files for each partition by repartition
|
||||
hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
|
||||
.partitionBy("key").parquet(hiveExternalTableLocation)
|
||||
|
||||
// Control the number of files in each partition by coalesce
|
||||
hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
|
||||
.partitionBy("key").parquet(hiveExternalTableLocation)
|
||||
// $example off:spark_hive$
|
||||
|
||||
spark.stop()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue