From 4a45019fb0458b5f943253c0c16c9e257ef2c129 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Mon, 14 Oct 2013 00:24:17 -0700 Subject: [PATCH] Address Matei's comments --- .../network/netty/FileServerHandler.java | 2 +- .../spark/broadcast/HttpBroadcast.scala | 4 +-- .../spark/network/netty/ShuffleCopier.scala | 2 +- .../spark/network/netty/ShuffleSender.scala | 2 +- .../org/apache/spark/storage/BlockId.scala | 35 ++++++++++--------- .../apache/spark/storage/BlockManager.scala | 2 +- .../org/apache/spark/storage/DiskStore.scala | 2 +- .../apache/spark/storage/BlockIdSuite.scala | 13 ++----- 8 files changed, 28 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index 097b2ad6d5..cfd8132891 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -37,7 +37,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter { @Override public void messageReceived(ChannelHandlerContext ctx, String blockIdString) { BlockId blockId = BlockId.apply(blockIdString); - String path = pResolver.getAbsolutePath(blockId.asFilename()); + String path = pResolver.getAbsolutePath(blockId.name()); // if getFilePath returns null, close the channel if (path == null) { //ctx.close(); diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 919f9765de..609464e38d 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -120,7 +120,7 @@ private object HttpBroadcast extends Logging { } def write(id: Long, value: Any) { - val file = new File(broadcastDir, BroadcastBlockId(id).asFilename) + val file = new File(broadcastDir, BroadcastBlockId(id).name) val out: OutputStream = { if (compress) { compressionCodec.compressedOutputStream(new FileOutputStream(file)) @@ -136,7 +136,7 @@ private object HttpBroadcast extends Logging { } def read[T](id: Long): T = { - val url = serverUri + "/" + BroadcastBlockId(id).asFilename + val url = serverUri + "/" + BroadcastBlockId(id).name val in = { if (compress) { compressionCodec.compressedInputStream(new URL(url).openStream()) diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala index eac2177b54..481ff8c3e0 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala @@ -42,7 +42,7 @@ private[spark] class ShuffleCopier extends Logging { try { fc.init() fc.connect(host, port) - fc.sendRequest(blockId.asFilename) + fc.sendRequest(blockId.name) fc.waitForClose() fc.close() } catch { diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala index b88fbaa4d1..1586dff254 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala @@ -64,7 +64,7 @@ private[spark] object ShuffleSender { val dirId = hash % localDirs.length val subDirId = (hash / localDirs.length) % subDirsPerLocalDir val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) - val file = new File(subDir, blockId.asFilename) + val file = new File(subDir, blockId.name) return file.getAbsolutePath } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index b477a82e33..c7efc67a4a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -24,13 +24,10 @@ package org.apache.spark.storage * * If your BlockId should be serializable, be sure to add it to the BlockId.fromString() method. */ -private[spark] abstract class BlockId { +private[spark] sealed abstract class BlockId { /** A globally unique identifier for this Block. Can be used for ser/de. */ def name: String - /** Physical filename for this block. May not be valid for Blocks are not file-backed. */ - def asFilename = name - // convenience methods def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None def isRDD = isInstanceOf[RDDBlockId] @@ -73,21 +70,27 @@ private[spark] case class TestBlockId(id: String) extends BlockId { private[spark] object BlockId { val RDD = "rdd_([0-9]+)_([0-9]+)".r - val Shuffle = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r - val Broadcast = "broadcast_([0-9]+)".r - val TaskResult = "taskresult_([0-9]+)".r - val StreamInput = "input-([0-9]+)-([0-9]+)".r - val Test = "test_(.*)".r + val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r + val BROADCAST = "broadcast_([0-9]+)".r + val TASKRESULT = "taskresult_([0-9]+)".r + val STREAM = "input-([0-9]+)-([0-9]+)".r + val TEST = "test_(.*)".r /** Converts a BlockId "name" String back into a BlockId. */ def apply(id: String) = id match { - case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt) - case Shuffle(shuffleId, mapId, reduceId) => + case RDD(rddId, splitIndex) => + RDDBlockId(rddId.toInt, splitIndex.toInt) + case SHUFFLE(shuffleId, mapId, reduceId) => ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) - case Broadcast(broadcastId) => BroadcastBlockId(broadcastId.toLong) - case TaskResult(taskId) => TaskResultBlockId(taskId.toLong) - case StreamInput(streamId, uniqueId) => StreamBlockId(streamId.toInt, uniqueId.toLong) - case Test(value) => TestBlockId(value) - case _ => throw new IllegalStateException("Unrecognized BlockId: " + id) + case BROADCAST(broadcastId) => + BroadcastBlockId(broadcastId.toLong) + case TASKRESULT(taskId) => + TaskResultBlockId(taskId.toLong) + case STREAM(streamId, uniqueId) => + StreamBlockId(streamId.toInt, uniqueId.toLong) + case TEST(value) => + TestBlockId(value) + case _ => + throw new IllegalStateException("Unrecognized BlockId: " + id) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 4ca86b7015..801f88a3db 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -970,7 +970,7 @@ private[spark] class BlockManager( case ShuffleBlockId(_, _, _) => compressShuffle case BroadcastBlockId(_) => compressBroadcast case RDDBlockId(_, _) => compressRdds - case _ => false // Won't happen in a real cluster, but it can in tests + case _ => false } /** diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index c0b0076a67..b7ca61e938 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -258,7 +258,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } } - new File(subDir, blockId.asFilename) + new File(subDir, blockId.name) } private def createLocalDirs(): Array[File] = { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala index 27f0dce9c9..cb76275e39 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala @@ -22,29 +22,20 @@ import org.scalatest.FunSuite class BlockIdSuite extends FunSuite { def assertSame(id1: BlockId, id2: BlockId) { assert(id1.name === id2.name) - assert(id1.asFilename === id2.asFilename) assert(id1.hashCode === id2.hashCode) assert(id1 === id2) } def assertDifferent(id1: BlockId, id2: BlockId) { assert(id1.name != id2.name) - assert(id1.asFilename != id2.asFilename) assert(id1.hashCode != id2.hashCode) assert(id1 != id2) } - test("basic-functions") { - case class MyBlockId(name: String) extends BlockId - - val id = MyBlockId("a") - assertSame(id, MyBlockId("a")) - assertDifferent(id, MyBlockId("b")) - assert(id.asRDDId === None) - + test("test-bad-deserialization") { try { // Try to deserialize an invalid block id. - BlockId("a") + BlockId("myblock") fail() } catch { case e: IllegalStateException => // OK