Switch to Akka futures in connection manager.

It's still not ideal because each Future ends up waiting on a lock, but it seems to
work better than Scala Actors and, more importantly, it lets us attach onComplete
and other listeners to futures.
Matei Zaharia committed 2012-08-12 19:40:37 +02:00
commit e17ed9a21d
parent ad8a7612a4
2 changed files with 17 additions and 18 deletions
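
For context, a minimal, self-contained sketch of the pattern this commit adopts, written against the Akka 2.0-era API (akka.dispatch.Future, ExecutionContext.fromExecutor, Await, akka.util.Duration). The AkkaFutureSketch object and the plain cached thread pool are illustrative only, not Spark code; as the ConnectionManager diff below shows, Spark builds its pool with its own DaemonThreadFactory.

import java.util.concurrent.Executors
import akka.dispatch.{Await, ExecutionContext, Future}
import akka.util.Duration

object AkkaFutureSketch {
  def main(args: Array[String]) {
    // A plain cached pool keeps the sketch self-contained; Spark passes DaemonThreadFactory here.
    val pool = Executors.newCachedThreadPool()
    implicit val futureExecContext = ExecutionContext.fromExecutor(pool)

    val f = Future {
      Thread.sleep(100)   // stand-in for the blocking wait inside ConnectionManager
      "done"
    }
    // onComplete is the listener hook the commit message refers to; Akka 2.0 passes an Either.
    f.onComplete {
      case Right(result) => println("completed with " + result)
      case Left(error)   => println("failed: " + error)
    }
    Await.ready(f, Duration.Inf)   // block until the future finishes
    pool.shutdown()
  }
}

The onComplete listener is what scala.actors futures did not offer, and the blocking Await.ready call mirrors how BlockManager now waits on its replication future in the second file.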

ConnectionManager.scala

@@ -2,20 +2,19 @@ package spark.network
 import spark._
-import scala.actors.Future
-import scala.actors.Futures.future
+import java.nio._
+import java.nio.channels._
+import java.nio.channels.spi._
+import java.net._
+import java.util.concurrent.Executors
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.SynchronizedMap
 import scala.collection.mutable.SynchronizedQueue
 import scala.collection.mutable.Queue
 import scala.collection.mutable.ArrayBuffer
-import java.io._
-import java.nio._
-import java.nio.channels._
-import java.nio.channels.spi._
-import java.net._
-import java.util.concurrent.Executors
+import akka.dispatch.{ExecutionContext, Future}
 case class ConnectionManagerId(host: String, port: Int) {
   def toSocketAddress() = new InetSocketAddress(host, port)
@@ -44,6 +43,9 @@ class ConnectionManager(port: Int) extends Logging {
   val connectionRequests = new SynchronizedQueue[SendingConnection]
   val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
   val sendMessageRequests = new Queue[(Message, SendingConnection)]
+  implicit val futureExecContext = ExecutionContext.fromExecutor(
+    Executors.newCachedThreadPool(DaemonThreadFactory))
   var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
@@ -312,9 +314,9 @@ class ConnectionManager(port: Int) extends Logging {
       messageStatuses += ((message.id, messageStatus))
     }
     sendMessage(connectionManagerId, message)
-    future {
+    Future {
      messageStatus.synchronized {
-        if (!messageStatus.attempted) {
+        while (!messageStatus.attempted) {
          logTrace("Waiting, " + messageStatuses.size + " statuses" )
          messageStatus.wait()
          logTrace("Done waiting")

BlockManager.scala

@@ -9,12 +9,7 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.Collections
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.Future
-import scala.actors.Futures.future
-import scala.actors.remote._
-import scala.actors.remote.RemoteActor._
+import akka.dispatch.{Await, Future}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
@@ -30,6 +25,7 @@ import spark.SparkException
 import spark.Utils
 import spark.util.ByteBufferInputStream
 import spark.network._
+import akka.util.Duration
 class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
   def this() = this(null, 0)
@@ -81,6 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
   val connectionManager = new ConnectionManager(0)
+  implicit val futureExecContext = connectionManager.futureExecContext
   val connectionManagerId = connectionManager.id
   val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port)
@@ -439,7 +436,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     // Initiate the replication before storing it locally. This is faster as
     // data is already serialized and ready for sending
     val replicationFuture = if (level.replication > 1) {
-      future {
+      Future {
        replicate(blockId, bytes, level)
      }
    } else {
@@ -475,7 +472,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
      if (replicationFuture == null) {
        throw new Exception("Unexpected")
      }
-      replicationFuture()
+      Await.ready(replicationFuture, Duration.Inf)
    }
    val finishTime = System.currentTimeMillis