From b3a82b7df38e4c3e9cd12ca60aa18fa10c8d84c8 Mon Sep 17 00:00:00 2001
From: Mridul Muralidharan
Date: Wed, 4 Sep 2013 06:57:38 +0530
Subject: [PATCH 1/2] Fix hash bug - caused failure after 35k stages, sigh

---
 .../apache/spark/network/netty/ShuffleSender.scala |  3 ++-
 .../scala/org/apache/spark/storage/DiskStore.scala |  2 +-
 .../main/scala/org/apache/spark/util/Utils.scala   | 14 ++++++++++++++
 3 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 537f225469..17d59caf9a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -20,6 +20,7 @@ package org.apache.spark.network.netty
 import java.io.File
 
 import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 
 private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
@@ -57,7 +58,7 @@ private[spark] object ShuffleSender {
         throw new Exception("Block " + blockId + " is not a shuffle block")
       }
       // Figure out which local directory it hashes to, and which subdirectory in that
-      val hash = math.abs(blockId.hashCode)
+      val hash = Utils.toHash(blockId)
       val dirId = hash % localDirs.length
       val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
       val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index fc25ef0fae..e24f627dd8 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -238,7 +238,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     logDebug("Getting file for block " + blockId)
 
     // Figure out which local directory it hashes to, and which subdirectory in that
-    val hash = math.abs(blockId.hashCode)
+    val hash = Utils.toHash(blockId)
     val dirId = hash % localDirs.length
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 468800b2bd..54768b798f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -778,4 +778,18 @@ private[spark] object Utils extends Logging {
     val rawMod = x % mod
     rawMod + (if (rawMod < 0) mod else 0)
   }
+
+  // Handles idiosyncrasies with hash codes (add more guards as required)
+  def toHash(obj: AnyRef): Int = {
+
+    // Required? (defensive null guard)
+    if (obj eq null) return 0
+
+    val hash = obj.hashCode
+    // math.abs fails for Int.MinValue: abs(Int.MinValue) is still Int.MinValue
+    val hashAbs = if (Int.MinValue != hash) math.abs(hash) else 0
+
+    // Nothing else to guard against?
+    hashAbs
+  }
 }

From 1e2474b814719673ce0faa2a8551d1acfb135ead Mon Sep 17 00:00:00 2001
From: Mridul Muralidharan
Date: Wed, 4 Sep 2013 07:46:46 +0530
Subject: [PATCH 2/2] Address review comments - rename toHash to nonNegativeHash

---
 .../scala/org/apache/spark/network/netty/ShuffleSender.scala | 2 +-
 core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala        | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 17d59caf9a..8afcbe190a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -58,7 +58,7 @@ private[spark] object ShuffleSender {
         throw new Exception("Block " + blockId + " is not a shuffle block")
       }
       // Figure out which local directory it hashes to, and which subdirectory in that
-      val hash = Utils.toHash(blockId)
+      val hash = Utils.nonNegativeHash(blockId)
       val dirId = hash % localDirs.length
       val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
       val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index e24f627dd8..63447baf8c 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -238,7 +238,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     logDebug("Getting file for block " + blockId)
 
     // Figure out which local directory it hashes to, and which subdirectory in that
-    val hash = Utils.toHash(blockId)
+    val hash = Utils.nonNegativeHash(blockId)
     val dirId = hash % localDirs.length
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 54768b798f..b890be2f6f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -780,7 +780,7 @@ private[spark] object Utils extends Logging {
   }
 
   // Handles idiosyncrasies with hash codes (add more guards as required)
-  def toHash(obj: AnyRef): Int = {
+  def nonNegativeHash(obj: AnyRef): Int = {
 
     // Required? (defensive null guard)
     if (obj eq null) return 0
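
Why the guard is needed: on the JVM, math.abs(Int.MinValue) overflows in
two's-complement arithmetic and returns Int.MinValue, which is still negative.
A block ID whose hashCode is exactly Int.MinValue therefore made
"val dirId = hash % localDirs.length" negative, and the directory lookup
localDirs(dirId) then failed. Assuming roughly uniform hash codes, that is
about a one-in-2^32 event per block, rare enough to stay hidden until a long
job has hashed enough block IDs, hence the "failure after 35k stages" in the
subject of PATCH 1/2.

Below is a minimal standalone sketch of the failure mode and the fix. It is
illustrative only, not part of either patch: NonNegativeHashDemo and
numLocalDirs are made-up names, while nonNegativeHash mirrors the patched
Utils method from PATCH 2/2.

    object NonNegativeHashDemo {

      // Mirrors Utils.nonNegativeHash from PATCH 2/2 above.
      def nonNegativeHash(obj: AnyRef): Int = {
        if (obj eq null) return 0
        val hash = obj.hashCode
        // Int.MinValue is the one value math.abs cannot make non-negative,
        // so map it to 0 explicitly.
        if (Int.MinValue != hash) math.abs(hash) else 0
      }

      def main(args: Array[String]): Unit = {
        // java.lang.Integer.hashCode returns the boxed value itself, so
        // boxing Int.MinValue reliably reproduces the pathological hash code.
        val evil: java.lang.Integer = Int.MinValue
        val numLocalDirs = 3 // hypothetical number of spark.local.dir entries

        println(math.abs(evil.hashCode))                // -2147483648: abs overflowed
        println(math.abs(evil.hashCode) % numLocalDirs) // -2: a negative dirId
        println(nonNegativeHash(evil) % numLocalDirs)   // 0: always a valid index
      }
    }

An alternative to special-casing Int.MinValue would have been to reuse the
modulo helper whose body is visible in the PATCH 1/2 hunk context
(rawMod + (if (rawMod < 0) mod else 0) is Utils.nonNegativeMod), which folds
negative remainders back into range instead of collapsing Int.MinValue to 0;
the call sites here compute the hash and the two modulos separately, so the
guard lives in nonNegativeHash instead.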