Renamed this version of BlockedLocalFileShuffle to CustomBlockedLocalFileShuffle.
This commit is contained in:
parent
e30fdeb025
commit
89172fcd69
|
@ -1 +1 @@
|
|||
-Dspark.shuffle.class=spark.BlockedLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50
|
||||
-Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50
|
||||
|
|
|
@ -22,7 +22,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
|
|||
* TODO: Add support for compression when spark.compress is set to true.
|
||||
*/
|
||||
@serializable
|
||||
class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
||||
class CustomBlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
||||
@transient var totalSplits = 0
|
||||
@transient var hasSplits = 0
|
||||
|
||||
|
@ -42,7 +42,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
|||
: RDD[(K, C)] =
|
||||
{
|
||||
val sc = input.sparkContext
|
||||
val shuffleId = BlockedLocalFileShuffle.newShuffleId()
|
||||
val shuffleId = CustomBlockedLocalFileShuffle.newShuffleId()
|
||||
logInfo("Shuffle ID: " + shuffleId)
|
||||
|
||||
val splitRdd = new NumberedSplitRDD(input)
|
||||
|
@ -77,7 +77,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
|||
buckets(i).foreach(pair => {
|
||||
// Open a new file if necessary
|
||||
if (!isDirty) {
|
||||
file = BlockedLocalFileShuffle.getOutputFile(shuffleId, myIndex, i,
|
||||
file = CustomBlockedLocalFileShuffle.getOutputFile(shuffleId, myIndex, i,
|
||||
blockNum)
|
||||
writeStartTime = System.currentTimeMillis
|
||||
logInfo("BEGIN WRITE: " + file)
|
||||
|
@ -90,7 +90,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
|||
isDirty = true
|
||||
|
||||
// Close the old file if it has crossed the blockSize limit
|
||||
if (file.length > BlockedLocalFileShuffle.BlockSize) {
|
||||
if (file.length > CustomBlockedLocalFileShuffle.BlockSize) {
|
||||
out.close()
|
||||
logInfo("END WRITE: " + file)
|
||||
val writeTime = System.currentTimeMillis - writeStartTime
|
||||
|
@ -112,13 +112,13 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
|||
|
||||
// Write the BLOCKNUM file
|
||||
file =
|
||||
BlockedLocalFileShuffle.getBlockNumOutputFile(shuffleId, myIndex, i)
|
||||
CustomBlockedLocalFileShuffle.getBlockNumOutputFile(shuffleId, myIndex, i)
|
||||
out = new ObjectOutputStream(new FileOutputStream(file))
|
||||
out.writeObject(blockNum)
|
||||
out.close()
|
||||
}
|
||||
|
||||
(myIndex, BlockedLocalFileShuffle.serverUri)
|
||||
(myIndex, CustomBlockedLocalFileShuffle.serverUri)
|
||||
}).collect()
|
||||
|
||||
// TODO: Could broadcast outputLocs
|
||||
|
@ -137,12 +137,12 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
|||
|
||||
combiners = new HashMap[K, C]
|
||||
|
||||
var threadPool = BlockedLocalFileShuffle.newDaemonFixedThreadPool(
|
||||
BlockedLocalFileShuffle.MaxConnections)
|
||||
var threadPool = CustomBlockedLocalFileShuffle.newDaemonFixedThreadPool(
|
||||
CustomBlockedLocalFileShuffle.MaxConnections)
|
||||
|
||||
while (hasSplits < totalSplits) {
|
||||
var numThreadsToCreate =
|
||||
Math.min(totalSplits, BlockedLocalFileShuffle.MaxConnections) -
|
||||
Math.min(totalSplits, CustomBlockedLocalFileShuffle.MaxConnections) -
|
||||
threadPool.getActiveCount
|
||||
|
||||
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
|
||||
|
@ -165,7 +165,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
|||
}
|
||||
|
||||
// Sleep for a while before creating new threads
|
||||
Thread.sleep(BlockedLocalFileShuffle.MinKnockInterval)
|
||||
Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
|
||||
}
|
||||
combiners
|
||||
})
|
||||
|
@ -183,7 +183,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
|||
}
|
||||
|
||||
if (requiredSplits.size > 0) {
|
||||
requiredSplits(BlockedLocalFileShuffle.ranGen.nextInt(
|
||||
requiredSplits(CustomBlockedLocalFileShuffle.ranGen.nextInt(
|
||||
requiredSplits.size))
|
||||
} else {
|
||||
-1
|
||||
|
@ -273,7 +273,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
|||
}
|
||||
|
||||
|
||||
object BlockedLocalFileShuffle extends Logging {
|
||||
object CustomBlockedLocalFileShuffle extends Logging {
|
||||
// Used throughout the code for small and large waits/timeouts
|
||||
private var BlockSize_ = 1024 * 1024
|
||||
|
Loading…
Reference in a new issue