[SPARK-22233][CORE][FOLLOW-UP] Allow user to filter out empty split in HadoopRDD
## What changes were proposed in this pull request? Update the config `spark.files.ignoreEmptySplits`, rename it and make it internal. This is followup of #19464 ## How was this patch tested? Exsiting tests. Author: Xingbo Jiang <xingbo.jiang@databricks.com> Closes #19504 from jiangxb1987/partitionsplit.
This commit is contained in:
parent
0ae96495de
commit
0fa10666cf
|
@ -270,11 +270,12 @@ package object config {
|
|||
.longConf
|
||||
.createWithDefault(4 * 1024 * 1024)
|
||||
|
||||
private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
|
||||
.doc("If true, methods that use HadoopRDD and NewHadoopRDD such as " +
|
||||
"SparkContext.textFiles will not create a partition for input splits that are empty.")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS =
|
||||
ConfigBuilder("spark.hadoopRDD.ignoreEmptySplits")
|
||||
.internal()
|
||||
.doc("When true, HadoopRDD/NewHadoopRDD will not create partitions for empty input splits.")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
private[spark] val SECRET_REDACTION_PATTERN =
|
||||
ConfigBuilder("spark.redaction.regex")
|
||||
|
|
|
@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
|
|||
import org.apache.spark.broadcast.Broadcast
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
|
||||
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
|
@ -134,7 +134,7 @@ class HadoopRDD[K, V](
|
|||
|
||||
private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
|
||||
|
||||
private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
|
||||
private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
|
||||
|
||||
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
|
||||
protected def getJobConf(): JobConf = {
|
||||
|
|
|
@ -35,7 +35,7 @@ import org.apache.spark._
|
|||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
|
||||
|
@ -90,7 +90,7 @@ class NewHadoopRDD[K, V](
|
|||
|
||||
private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
|
||||
|
||||
private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)
|
||||
private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
|
||||
|
||||
def getConf: Configuration = {
|
||||
val conf: Configuration = confBroadcast.value.value
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job
|
|||
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
|
||||
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
|
||||
|
||||
import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.util.Utils
|
||||
|
@ -510,9 +510,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
|
|||
}
|
||||
}
|
||||
|
||||
test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
|
||||
test("spark.hadoopRDD.ignoreEmptySplits work correctly (old Hadoop API)") {
|
||||
val conf = new SparkConf()
|
||||
conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
|
||||
.setAppName("test")
|
||||
.setMaster("local")
|
||||
.set(HADOOP_RDD_IGNORE_EMPTY_SPLITS, true)
|
||||
sc = new SparkContext(conf)
|
||||
|
||||
def testIgnoreEmptySplits(
|
||||
|
@ -549,9 +551,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
|
|||
expectedPartitionNum = 2)
|
||||
}
|
||||
|
||||
test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
|
||||
test("spark.hadoopRDD.ignoreEmptySplits work correctly (new Hadoop API)") {
|
||||
val conf = new SparkConf()
|
||||
conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
|
||||
.setAppName("test")
|
||||
.setMaster("local")
|
||||
.set(HADOOP_RDD_IGNORE_EMPTY_SPLITS, true)
|
||||
sc = new SparkContext(conf)
|
||||
|
||||
def testIgnoreEmptySplits(
|
||||
|
|
Loading…
Reference in a new issue