[SPARK-27464][CORE] Added Constant instead of referring string literal used from many places
## What changes were proposed in this pull request? Added Constant instead of referring the same String literal "spark.buffer.pageSize" from many places ## How was this patch tested? Run the corresponding Unit Test Cases manually. Closes #24368 from shivusondur/Constant. Authored-by: shivusondur <shivusondur@gmail.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
parent
257d01a6b8
commit
88d9de26dd
|
@ -1303,4 +1303,10 @@ package object config {
|
||||||
.doc("Staging directory used while submitting applications.")
|
.doc("Staging directory used while submitting applications.")
|
||||||
.stringConf
|
.stringConf
|
||||||
.createOptional
|
.createOptional
|
||||||
|
|
||||||
|
private[spark] val BUFFER_PAGESIZE = ConfigBuilder("spark.buffer.pageSize")
|
||||||
|
.doc("The amount of memory used per page in bytes")
|
||||||
|
.bytesConf(ByteUnit.BYTE)
|
||||||
|
.createOptional
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -255,7 +255,7 @@ private[spark] abstract class MemoryManager(
|
||||||
}
|
}
|
||||||
val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
|
val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
|
||||||
val default = math.min(maxPageSize, math.max(minPageSize, size))
|
val default = math.min(maxPageSize, math.max(minPageSize, size))
|
||||||
conf.getSizeAsBytes("spark.buffer.pageSize", default)
|
conf.get(BUFFER_PAGESIZE).getOrElse(default)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -101,7 +101,7 @@ public class UnsafeShuffleWriterSuite {
|
||||||
partitionSizesInMergedFile = null;
|
partitionSizesInMergedFile = null;
|
||||||
spillFilesCreated.clear();
|
spillFilesCreated.clear();
|
||||||
conf = new SparkConf()
|
conf = new SparkConf()
|
||||||
.set("spark.buffer.pageSize", "1m")
|
.set(package$.MODULE$.BUFFER_PAGESIZE().key(), "1m")
|
||||||
.set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false);
|
.set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false);
|
||||||
taskMetrics = new TaskMetrics();
|
taskMetrics = new TaskMetrics();
|
||||||
memoryManager = new TestMemoryManager(conf);
|
memoryManager = new TestMemoryManager(conf);
|
||||||
|
|
|
@ -88,7 +88,8 @@ public class UnsafeExternalSorterSuite {
|
||||||
|
|
||||||
protected boolean shouldUseRadixSort() { return false; }
|
protected boolean shouldUseRadixSort() { return false; }
|
||||||
|
|
||||||
private final long pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "4m");
|
private final long pageSizeBytes = conf.getSizeAsBytes(
|
||||||
|
package$.MODULE$.BUFFER_PAGESIZE().key(), "4m");
|
||||||
|
|
||||||
private final int spillThreshold =
|
private final int spillThreshold =
|
||||||
(int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
|
(int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
|
||||||
|
|
|
@ -23,7 +23,7 @@ import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
|
||||||
import com.esotericsoftware.kryo.io.{Input, Output}
|
import com.esotericsoftware.kryo.io.{Input, Output}
|
||||||
|
|
||||||
import org.apache.spark.{SparkConf, SparkEnv, SparkException}
|
import org.apache.spark.{SparkConf, SparkEnv, SparkException}
|
||||||
import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
|
import org.apache.spark.internal.config.{BUFFER_PAGESIZE, MEMORY_OFFHEAP_ENABLED}
|
||||||
import org.apache.spark.memory._
|
import org.apache.spark.memory._
|
||||||
import org.apache.spark.sql.catalyst.InternalRow
|
import org.apache.spark.sql.catalyst.InternalRow
|
||||||
import org.apache.spark.sql.catalyst.expressions._
|
import org.apache.spark.sql.catalyst.expressions._
|
||||||
|
@ -235,7 +235,7 @@ private[joins] class UnsafeHashedRelation(
|
||||||
0)
|
0)
|
||||||
|
|
||||||
val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
|
val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
|
||||||
.getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
|
.getOrElse(new SparkConf().get(BUFFER_PAGESIZE).getOrElse(16L * 1024 * 1024))
|
||||||
|
|
||||||
// TODO(josh): We won't need this dummy memory manager after future refactorings; revisit
|
// TODO(josh): We won't need this dummy memory manager after future refactorings; revisit
|
||||||
// during code review
|
// during code review
|
||||||
|
@ -285,8 +285,7 @@ private[joins] object UnsafeHashedRelation {
|
||||||
taskMemoryManager: TaskMemoryManager): HashedRelation = {
|
taskMemoryManager: TaskMemoryManager): HashedRelation = {
|
||||||
|
|
||||||
val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
|
val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
|
||||||
.getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
|
.getOrElse(new SparkConf().get(BUFFER_PAGESIZE).getOrElse(16L * 1024 * 1024))
|
||||||
|
|
||||||
val binaryMap = new BytesToBytesMap(
|
val binaryMap = new BytesToBytesMap(
|
||||||
taskMemoryManager,
|
taskMemoryManager,
|
||||||
// Only 70% of the slots can be used before growing, more capacity help to reduce collision
|
// Only 70% of the slots can be used before growing, more capacity help to reduce collision
|
||||||
|
|
Loading…
Reference in a new issue