[SPARK-32517][CORE] Add StorageLevel.DISK_ONLY_3

### What changes were proposed in this pull request?

This PR aims to add `StorageLevel.DISK_ONLY_3` as a built-in `StorageLevel`.

### Why are the changes needed?

In a YARN cluster, HDFS usually provides storage with a replication factor of 3, so technically users can already get `StorageLevel.DISK_ONLY_3` behavior by saving results to HDFS. However, disaggregated clusters and clusters without storage services are on the rise. Previously, in that situation, users had to settle for the similar `MEMORY_AND_DISK_2` or a user-created `StorageLevel`. This PR supports those use cases officially, for a better UX.
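As a minimal sketch of the difference (assuming a Scala application with Spark on the classpath): before this change, an equivalent level had to be assembled by hand via `StorageLevel.apply`; with this change, the constant ships with Spark.

```scala
import org.apache.spark.storage.StorageLevel

// Workaround before this PR: assemble the level manually
// (useDisk, useMemory, useOffHeap, deserialized, replication).
val handRolled = StorageLevel(true, false, false, false, 3)

// With this PR: the equivalent built-in constant.
val builtIn = StorageLevel.DISK_ONLY_3
assert(handRolled == builtIn)
```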

### Does this PR introduce _any_ user-facing change?

Yes. This provides a new built-in option.

### How was this patch tested?

Pass GitHub Actions or Jenkins with the revised test cases.

Closes #29331 from dongjoon-hyun/SPARK-32517.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Date: 2020-08-10 07:33:06 -07:00
parent f80a480ee5
commit b421bf0196
10 changed files with 17 additions and 4 deletions


@@ -376,6 +376,7 @@ varargsToStrEnv <- function(...) {
 getStorageLevel <- function(newLevel = c("DISK_ONLY",
                                          "DISK_ONLY_2",
+                                         "DISK_ONLY_3",
                                          "MEMORY_AND_DISK",
                                          "MEMORY_AND_DISK_2",
                                          "MEMORY_AND_DISK_SER",
@@ -390,6 +391,7 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
   storageLevel <- switch(newLevel,
                          "DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
                          "DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
+                         "DISK_ONLY_3" = callJStatic(storageLevelClass, "DISK_ONLY_3"),
                          "MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
                          "MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
                          "MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
@@ -415,6 +417,8 @@ storageLevelToString <- function(levelObj) {
     "DISK_ONLY"
   } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) {
     "DISK_ONLY_2"
+  } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 3) {
+    "DISK_ONLY_3"
   } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
     "MEMORY_ONLY"
   } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {


@@ -26,6 +26,7 @@ public class StorageLevels {
   public static final StorageLevel NONE = create(false, false, false, false, 1);
   public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
   public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
+  public static final StorageLevel DISK_ONLY_3 = create(true, false, false, false, 3);
   public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
   public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
   public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);


@@ -153,6 +153,7 @@ object StorageLevel {
   val NONE = new StorageLevel(false, false, false, false)
   val DISK_ONLY = new StorageLevel(true, false, false, false)
   val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
+  val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
   val MEMORY_ONLY = new StorageLevel(false, true, false, true)
   val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
   val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
@@ -172,6 +173,7 @@ object StorageLevel {
     case "NONE" => NONE
     case "DISK_ONLY" => DISK_ONLY
     case "DISK_ONLY_2" => DISK_ONLY_2
+    case "DISK_ONLY_3" => DISK_ONLY_3
     case "MEMORY_ONLY" => MEMORY_ONLY
     case "MEMORY_ONLY_2" => MEMORY_ONLY_2
     case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER


@@ -38,7 +38,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
-  val clusterUrl = "local-cluster[2,1,1024]"
+  val clusterUrl = "local-cluster[3,1,1024]"

   test("task throws not serializable exception") {
     // Ensures that executors do not crash when an exn is not serializable. If executors crash,
@@ -174,7 +174,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
   private def testCaching(conf: SparkConf, storageLevel: StorageLevel): Unit = {
     sc = new SparkContext(conf.setMaster(clusterUrl).setAppName("test"))
-    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+    TestUtils.waitUntilExecutorsUp(sc, 3, 60000)
     val data = sc.parallelize(1 to 1000, 10)
     val cachedData = data.persist(storageLevel)
     assert(cachedData.count === 1000)
@@ -206,7 +206,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     "caching on disk" -> StorageLevel.DISK_ONLY,
     "caching in memory, replicated" -> StorageLevel.MEMORY_ONLY_2,
     "caching in memory, serialized, replicated" -> StorageLevel.MEMORY_ONLY_SER_2,
-    "caching on disk, replicated" -> StorageLevel.DISK_ONLY_2,
+    "caching on disk, replicated 2" -> StorageLevel.DISK_ONLY_2,
+    "caching on disk, replicated 3" -> StorageLevel.DISK_ONLY_3,
     "caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2,
     "caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2
   ).foreach { case (testName, storageLevel) =>


@@ -44,6 +44,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
     assert(transform(StorageLevel.MEMORY_ONLY_SER_2) === StorageLevel.MEMORY_AND_DISK_SER_2)
     assert(transform(StorageLevel.DISK_ONLY) === StorageLevel.DISK_ONLY)
     assert(transform(StorageLevel.DISK_ONLY_2) === StorageLevel.DISK_ONLY_2)
+    assert(transform(StorageLevel.DISK_ONLY_3) === StorageLevel.DISK_ONLY_3)
     assert(transform(StorageLevel.MEMORY_AND_DISK) === StorageLevel.MEMORY_AND_DISK)
     assert(transform(StorageLevel.MEMORY_AND_DISK_SER) === StorageLevel.MEMORY_AND_DISK_SER)
     assert(transform(StorageLevel.MEMORY_AND_DISK_2) === StorageLevel.MEMORY_AND_DISK_2)


@@ -170,6 +170,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testStorageLevel(StorageLevel.NONE)
     testStorageLevel(StorageLevel.DISK_ONLY)
     testStorageLevel(StorageLevel.DISK_ONLY_2)
+    testStorageLevel(StorageLevel.DISK_ONLY_3)
     testStorageLevel(StorageLevel.MEMORY_ONLY)
     testStorageLevel(StorageLevel.MEMORY_ONLY_2)
     testStorageLevel(StorageLevel.MEMORY_ONLY_SER)


@@ -1256,7 +1256,7 @@ storage levels is:
 **Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
 so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
-`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, and `DISK_ONLY_2`.*
+`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2`, and `DISK_ONLY_3`.*

 Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
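To illustrate the guide's recommendation with the new level (a sketch, again assuming a live `SparkContext` named `sc`): persist a shuffled RDD that will be reused, now with triple on-disk replication.

```scala
import org.apache.spark.storage.StorageLevel

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val counts = pairs.reduceByKey(_ + _)  // shuffle operation

// Reused twice below, so persist explicitly; the blocks are written
// to disk on three executors, without occupying executor memory.
counts.persist(StorageLevel.DISK_ONLY_3)
println(counts.count())
println(counts.collect().mkString(", "))
```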


@@ -49,6 +49,7 @@ CACHE [ LAZY ] TABLE table_identifier
     * `NONE`
     * `DISK_ONLY`
     * `DISK_ONLY_2`
+    * `DISK_ONLY_3`
     * `MEMORY_ONLY`
     * `MEMORY_ONLY_2`
     * `MEMORY_ONLY_SER`
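For the SQL path, a sketch of passing the new name through the `storageLevel` option of `CACHE TABLE` (the table name `t` is a placeholder, and a `SparkSession` named `spark` is assumed):

```scala
// 'storageLevel' accepts the same level names listed above;
// t stands in for an existing table.
spark.sql("CACHE TABLE t OPTIONS ('storageLevel' 'DISK_ONLY_3')")
```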


@@ -249,6 +249,7 @@ Management
     SparkFiles.getRootDirectory
     StorageLevel.DISK_ONLY
     StorageLevel.DISK_ONLY_2
+    StorageLevel.DISK_ONLY_3
     StorageLevel.MEMORY_AND_DISK
     StorageLevel.MEMORY_AND_DISK_2
     StorageLevel.MEMORY_ONLY


@@ -51,6 +51,7 @@ class StorageLevel(object):
 StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
 StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
+StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)
 StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
 StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
 StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)