From 88d9de26dda1c91132fd909b0995492388dc5fac Mon Sep 17 00:00:00 2001 From: shivusondur Date: Tue, 16 Apr 2019 09:30:46 -0500 Subject: [PATCH] [SPARK-27464][CORE] Added Constant instead of referring string literal used from many places ## What changes were proposed in this pull request? Added Constant instead of referring the same String literal "spark.buffer.pageSize" from many places ## How was this patch tested? Run the corresponding Unit Test Cases manually. Closes #24368 from shivusondur/Constant. Authored-by: shivusondur Signed-off-by: Sean Owen --- .../scala/org/apache/spark/internal/config/package.scala | 6 ++++++ .../main/scala/org/apache/spark/memory/MemoryManager.scala | 2 +- .../spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +- .../collection/unsafe/sort/UnsafeExternalSorterSuite.java | 3 ++- .../apache/spark/sql/execution/joins/HashedRelation.scala | 7 +++---- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0bd46bef35..8e59ce7101 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1303,4 +1303,10 @@ package object config { .doc("Staging directory used while submitting applications.") .stringConf .createOptional + + private[spark] val BUFFER_PAGESIZE = ConfigBuilder("spark.buffer.pageSize") + .doc("The amount of memory used per page in bytes") + .bytesConf(ByteUnit.BYTE) + .createOptional + } diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index ff6d84b57e..c08b47f99d 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -255,7 +255,7 @@ private[spark] abstract class MemoryManager( } val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor) val default = math.min(maxPageSize, math.max(minPageSize, size)) - conf.getSizeAsBytes("spark.buffer.pageSize", default) + conf.get(BUFFER_PAGESIZE).getOrElse(default) } /** diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 9bf707f783..88125a6b93 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -101,7 +101,7 @@ public class UnsafeShuffleWriterSuite { partitionSizesInMergedFile = null; spillFilesCreated.clear(); conf = new SparkConf() - .set("spark.buffer.pageSize", "1m") + .set(package$.MODULE$.BUFFER_PAGESIZE().key(), "1m") .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false); taskMetrics = new TaskMetrics(); memoryManager = new TestMemoryManager(conf); diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index dd71d3201b..c6aa623560 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -88,7 +88,8 @@ public class UnsafeExternalSorterSuite { protected boolean shouldUseRadixSort() { return false; } - private final long pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "4m"); + private final long pageSizeBytes = conf.getSizeAsBytes( + package$.MODULE$.BUFFER_PAGESIZE().key(), "4m"); private final int spillThreshold = (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index b03e8f551c..9d8063d53b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -23,7 +23,7 @@ import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark.{SparkConf, SparkEnv, SparkException} -import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED +import org.apache.spark.internal.config.{BUFFER_PAGESIZE, MEMORY_OFFHEAP_ENABLED} import org.apache.spark.memory._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -235,7 +235,7 @@ private[joins] class UnsafeHashedRelation( 0) val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes) - .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m")) + .getOrElse(new SparkConf().get(BUFFER_PAGESIZE).getOrElse(16L * 1024 * 1024)) // TODO(josh): We won't need this dummy memory manager after future refactorings; revisit // during code review @@ -285,8 +285,7 @@ private[joins] object UnsafeHashedRelation { taskMemoryManager: TaskMemoryManager): HashedRelation = { val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes) - .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m")) - + .getOrElse(new SparkConf().get(BUFFER_PAGESIZE).getOrElse(16L * 1024 * 1024)) val binaryMap = new BytesToBytesMap( taskMemoryManager, // Only 70% of the slots can be used before growing, more capacity help to reduce collision