diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index cef2fa9b47..d6f9f927d5 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -376,6 +376,7 @@ varargsToStrEnv <- function(...) {
 
 getStorageLevel <- function(newLevel = c("DISK_ONLY",
                                          "DISK_ONLY_2",
+                                         "DISK_ONLY_3",
                                          "MEMORY_AND_DISK",
                                          "MEMORY_AND_DISK_2",
                                          "MEMORY_AND_DISK_SER",
@@ -390,6 +391,7 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
   storageLevel <- switch(newLevel,
                          "DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
                          "DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
+                         "DISK_ONLY_3" = callJStatic(storageLevelClass, "DISK_ONLY_3"),
                          "MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
                          "MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
                          "MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
@@ -415,6 +417,8 @@ storageLevelToString <- function(levelObj) {
     "DISK_ONLY"
   } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) {
     "DISK_ONLY_2"
+  } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 3) {
+    "DISK_ONLY_3"
   } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
     "MEMORY_ONLY"
   } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
diff --git a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
index 3fcb52f615..b51cde48e6 100644
--- a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
+++ b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -26,6 +26,7 @@ public class StorageLevels {
   public static final StorageLevel NONE = create(false, false, false, false, 1);
   public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
   public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
+  public static final StorageLevel DISK_ONLY_3 = create(true, false, false, false, 3);
   public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
   public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
   public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 4c6998d7a8..f6db73ba80 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -153,6 +153,7 @@ object StorageLevel {
   val NONE = new StorageLevel(false, false, false, false)
   val DISK_ONLY = new StorageLevel(true, false, false, false)
   val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
+  val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
   val MEMORY_ONLY = new StorageLevel(false, true, false, true)
   val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
   val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
@@ -172,6 +173,7 @@ object StorageLevel {
     case "NONE" => NONE
     case "DISK_ONLY" => DISK_ONLY
     case "DISK_ONLY_2" => DISK_ONLY_2
+    case "DISK_ONLY_3" => DISK_ONLY_3
     case "MEMORY_ONLY" => MEMORY_ONLY
     case "MEMORY_ONLY_2" => MEMORY_ONLY_2
     case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 3478b16325..ce1df3adf6 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -38,7 +38,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
 
-  val clusterUrl = "local-cluster[2,1,1024]"
+  val clusterUrl = "local-cluster[3,1,1024]"
 
   test("task throws not serializable exception") {
     // Ensures that executors do not crash when an exn is not serializable. If executors crash,
@@ -174,7 +174,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
 
   private def testCaching(conf: SparkConf, storageLevel: StorageLevel): Unit = {
     sc = new SparkContext(conf.setMaster(clusterUrl).setAppName("test"))
-    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+    TestUtils.waitUntilExecutorsUp(sc, 3, 60000)
     val data = sc.parallelize(1 to 1000, 10)
     val cachedData = data.persist(storageLevel)
     assert(cachedData.count === 1000)
@@ -206,7 +206,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     "caching on disk" -> StorageLevel.DISK_ONLY,
     "caching in memory, replicated" -> StorageLevel.MEMORY_ONLY_2,
     "caching in memory, serialized, replicated" -> StorageLevel.MEMORY_ONLY_SER_2,
-    "caching on disk, replicated" -> StorageLevel.DISK_ONLY_2,
+    "caching on disk, replicated 2" -> StorageLevel.DISK_ONLY_2,
+    "caching on disk, replicated 3" -> StorageLevel.DISK_ONLY_3,
     "caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2,
     "caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2
   ).foreach { case (testName, storageLevel) =>
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index c942328acc..9e3f279110 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -44,6 +44,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
     assert(transform(StorageLevel.MEMORY_ONLY_SER_2) === StorageLevel.MEMORY_AND_DISK_SER_2)
     assert(transform(StorageLevel.DISK_ONLY) === StorageLevel.DISK_ONLY)
     assert(transform(StorageLevel.DISK_ONLY_2) === StorageLevel.DISK_ONLY_2)
+    assert(transform(StorageLevel.DISK_ONLY_3) === StorageLevel.DISK_ONLY_3)
     assert(transform(StorageLevel.MEMORY_AND_DISK) === StorageLevel.MEMORY_AND_DISK)
     assert(transform(StorageLevel.MEMORY_AND_DISK_SER) === StorageLevel.MEMORY_AND_DISK_SER)
     assert(transform(StorageLevel.MEMORY_AND_DISK_2) === StorageLevel.MEMORY_AND_DISK_2)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f3ed233da7..2ae51f425d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -170,6 +170,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testStorageLevel(StorageLevel.NONE)
     testStorageLevel(StorageLevel.DISK_ONLY)
     testStorageLevel(StorageLevel.DISK_ONLY_2)
+    testStorageLevel(StorageLevel.DISK_ONLY_3)
     testStorageLevel(StorageLevel.MEMORY_ONLY)
     testStorageLevel(StorageLevel.MEMORY_ONLY_2)
     testStorageLevel(StorageLevel.MEMORY_ONLY_SER)
diff --git a/docs/rdd-programming-guide.md b/docs/rdd-programming-guide.md
index 07207f62bb..b48540dc09 100644
--- a/docs/rdd-programming-guide.md
+++ b/docs/rdd-programming-guide.md
@@ -1256,7 +1256,7 @@ storage levels is:
 
 **Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
 so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
-`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, and `DISK_ONLY_2`.*
+`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2`, and `DISK_ONLY_3`.*
 
 Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
 
diff --git a/docs/sql-ref-syntax-aux-cache-cache-table.md b/docs/sql-ref-syntax-aux-cache-cache-table.md
index 8829016fc1..9a1e61abba 100644
--- a/docs/sql-ref-syntax-aux-cache-cache-table.md
+++ b/docs/sql-ref-syntax-aux-cache-cache-table.md
@@ -49,6 +49,7 @@ CACHE [ LAZY ] TABLE table_identifier
       * `NONE`
       * `DISK_ONLY`
       * `DISK_ONLY_2`
+      * `DISK_ONLY_3`
       * `MEMORY_ONLY`
       * `MEMORY_ONLY_2`
       * `MEMORY_ONLY_SER`
diff --git a/python/docs/source/reference/pyspark.rst b/python/docs/source/reference/pyspark.rst
index b50ae37b99..8d9dedd2df 100644
--- a/python/docs/source/reference/pyspark.rst
+++ b/python/docs/source/reference/pyspark.rst
@@ -249,6 +249,7 @@ Management
     SparkFiles.getRootDirectory
     StorageLevel.DISK_ONLY
     StorageLevel.DISK_ONLY_2
+    StorageLevel.DISK_ONLY_3
    StorageLevel.MEMORY_AND_DISK
     StorageLevel.MEMORY_AND_DISK_2
     StorageLevel.MEMORY_ONLY
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index 951af45bb3..9c0d1ca661 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -51,6 +51,7 @@ class StorageLevel(object):
 
 StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
 StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
+StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)
 StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
 StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
 StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
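
For reviewers who want to smoke-test the patch by hand, here is a minimal sketch (not part of the diff) of exercising the new level end to end on a build that includes this change. The object name is made up for illustration, and the `local-cluster[3,1,1024]` master mirrors the updated `DistributedSuite`: at least three executors are needed before each disk block can actually hold three replicas; on a smaller cluster Spark stores fewer replicas and logs a warning rather than failing.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Illustrative smoke test for DISK_ONLY_3; object name and master URL are examples only.
object DiskOnly3Sketch {
  def main(args: Array[String]): Unit = {
    // Three workers, one core and 1024 MB each, matching the suite's clusterUrl,
    // so three-way disk replication can actually be satisfied.
    val conf = new SparkConf()
      .setMaster("local-cluster[3,1,1024]")
      .setAppName("disk-only-3-sketch")
    val sc = new SparkContext(conf)

    // Persist an RDD on disk with three replicas of each block.
    val data = sc.parallelize(1 to 1000, 10)
    data.persist(StorageLevel.DISK_ONLY_3)
    assert(data.count() == 1000)

    // The new constant also round-trips through the string parser
    // extended in StorageLevel.scala.
    assert(StorageLevel.fromString("DISK_ONLY_3") == StorageLevel.DISK_ONLY_3)

    sc.stop()
  }
}
```

The equivalent PySpark call is `rdd.persist(StorageLevel.DISK_ONLY_3)`, using the constant added to `python/pyspark/storagelevel.py`.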
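
The SQL surface from the updated `docs/sql-ref-syntax-aux-cache-cache-table.md` page should pick the new name up as well, since `CACHE TABLE` accepts a `storageLevel` option whose values are the storage-level names listed there. A hedged sketch, assuming a temp view named `t` and that the option is parsed through the same string lookup extended in this patch:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: cache a SQL table at the new level; session setup and view name are examples only.
object CacheTableDiskOnly3Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("cache-table-disk-only-3")
      .getOrCreate()
    spark.range(100).createOrReplaceTempView("t")

    // DISK_ONLY_3 becomes a legal storageLevel value with this change; with
    // fewer than three executors the cache is simply under-replicated.
    spark.sql("CACHE TABLE t OPTIONS ('storageLevel' 'DISK_ONLY_3')")
    spark.sql("SELECT count(*) FROM t").show()

    spark.stop()
  }
}
```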