[SPARK-2805] Upgrade Akka to 2.3.4
This is a second rev of the Akka upgrade (earlier merged, but reverted). I made a slight modification which is that I also upgrade Hive to deal with a compatibility issue related to the protocol buffers library. Author: Anand Avati <avati@redhat.com> Author: Patrick Wendell <pwendell@gmail.com> Closes #2752 from pwendell/akka-upgrade and squashes the following commits: 4c7ca3f [Patrick Wendell] Upgrading to new hive->protobuf version 57a2315 [Anand Avati] SPARK-1812: streaming - remove tests which depend on akka.actor.IO 2a551d3 [Anand Avati] SPARK-1812: core - upgrade to akka 2.3.4
This commit is contained in:
parent
6f98902a3d
commit
411cf29fff
|
@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
|
|||
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
|
||||
System.exit(-1)
|
||||
|
||||
case AssociationErrorEvent(cause, _, remoteAddress, _) =>
|
||||
case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
|
||||
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
|
||||
println(s"Cause was: $cause")
|
||||
System.exit(-1)
|
||||
|
|
|
@ -154,7 +154,7 @@ private[spark] class AppClient(
|
|||
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
|
||||
markDisconnected()
|
||||
|
||||
case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
|
||||
case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
|
||||
logWarning(s"Could not connect to $address: $cause")
|
||||
|
||||
case StopAppClient =>
|
||||
|
|
|
@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
|
|||
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
|
||||
logInfo(s"Successfully connected to $workerUrl")
|
||||
|
||||
case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
|
||||
case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
|
||||
if isWorker(remoteAddress) =>
|
||||
// These logs may not be seen if the worker (and associated pipe) has died
|
||||
logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
|
||||
|
|
|
@ -146,7 +146,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
|
|||
val masterTracker = new MapOutputTrackerMaster(conf)
|
||||
val actorSystem = ActorSystem("test")
|
||||
val actorRef = TestActorRef[MapOutputTrackerMasterActor](
|
||||
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
|
||||
Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
|
||||
val masterActor = actorRef.underlyingActor
|
||||
|
||||
// Frame size should be ~123B, and no exception should be thrown
|
||||
|
@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
|
|||
val masterTracker = new MapOutputTrackerMaster(conf)
|
||||
val actorSystem = ActorSystem("test")
|
||||
val actorRef = TestActorRef[MapOutputTrackerMasterActor](
|
||||
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
|
||||
Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
|
||||
val masterActor = actorRef.underlyingActor
|
||||
|
||||
// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
|
||||
|
|
4
pom.xml
4
pom.xml
|
@ -118,7 +118,7 @@
|
|||
<mesos.version>0.18.1</mesos.version>
|
||||
<mesos.classifier>shaded-protobuf</mesos.classifier>
|
||||
<akka.group>org.spark-project.akka</akka.group>
|
||||
<akka.version>2.2.3-shaded-protobuf</akka.version>
|
||||
<akka.version>2.3.4-spark</akka.version>
|
||||
<slf4j.version>1.7.5</slf4j.version>
|
||||
<log4j.version>1.2.17</log4j.version>
|
||||
<hadoop.version>1.0.4</hadoop.version>
|
||||
|
@ -127,7 +127,7 @@
|
|||
<hbase.version>0.94.6</hbase.version>
|
||||
<flume.version>1.4.0</flume.version>
|
||||
<zookeeper.version>3.4.5</zookeeper.version>
|
||||
<hive.version>0.12.0</hive.version>
|
||||
<hive.version>0.12.0-protobuf</hive.version>
|
||||
<parquet.version>1.4.3</parquet.version>
|
||||
<jblas.version>1.2.3</jblas.version>
|
||||
<jetty.version>8.1.14.v20131031</jetty.version>
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
package org.apache.spark.streaming
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.IO
|
||||
import akka.actor.IOManager
|
||||
import akka.actor.Props
|
||||
import akka.util.ByteString
|
||||
|
||||
|
@ -143,59 +141,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
|
||||
}
|
||||
|
||||
// TODO: This test works in IntelliJ but not through SBT
|
||||
ignore("actor input stream") {
|
||||
// Start the server
|
||||
val testServer = new TestServer()
|
||||
val port = testServer.port
|
||||
testServer.start()
|
||||
|
||||
// Set up the streaming context and input streams
|
||||
val ssc = new StreamingContext(conf, batchDuration)
|
||||
val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
|
||||
// Had to pass the local value of port to prevent from closing over entire scope
|
||||
StorageLevel.MEMORY_AND_DISK)
|
||||
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
|
||||
val outputStream = new TestOutputStream(networkStream, outputBuffer)
|
||||
def output = outputBuffer.flatMap(x => x)
|
||||
outputStream.register()
|
||||
ssc.start()
|
||||
|
||||
// Feed data to the server to send to the network receiver
|
||||
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
|
||||
val input = 1 to 9
|
||||
val expectedOutput = input.map(x => x.toString)
|
||||
Thread.sleep(1000)
|
||||
for (i <- 0 until input.size) {
|
||||
testServer.send(input(i).toString)
|
||||
Thread.sleep(500)
|
||||
clock.addToTime(batchDuration.milliseconds)
|
||||
}
|
||||
Thread.sleep(1000)
|
||||
logInfo("Stopping server")
|
||||
testServer.stop()
|
||||
logInfo("Stopping context")
|
||||
ssc.stop()
|
||||
|
||||
// Verify whether data received was as expected
|
||||
logInfo("--------------------------------")
|
||||
logInfo("output.size = " + outputBuffer.size)
|
||||
logInfo("output")
|
||||
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
|
||||
logInfo("expected output.size = " + expectedOutput.size)
|
||||
logInfo("expected output")
|
||||
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
|
||||
logInfo("--------------------------------")
|
||||
|
||||
// Verify whether all the elements received are as expected
|
||||
// (whether the elements were received one in each interval is not verified)
|
||||
assert(output.size === expectedOutput.size)
|
||||
for (i <- 0 until output.size) {
|
||||
assert(output(i) === expectedOutput(i))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
test("multi-thread receiver") {
|
||||
// set up the test receiver
|
||||
val numThreads = 10
|
||||
|
@ -377,22 +322,6 @@ class TestServer(portToBind: Int = 0) extends Logging {
|
|||
def port = serverSocket.getLocalPort
|
||||
}
|
||||
|
||||
/** This is an actor for testing actor input stream */
|
||||
class TestActor(port: Int) extends Actor with ActorHelper {
|
||||
|
||||
def bytesToString(byteString: ByteString) = byteString.utf8String
|
||||
|
||||
override def preStart(): Unit = {
|
||||
@deprecated("suppress compile time deprecation warning", "1.0.0")
|
||||
val unit = IOManager(context.system).connect(new InetSocketAddress(port))
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case IO.Read(socket, bytes) =>
|
||||
store(bytesToString(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
/** This is a receiver to test multiple threads inserting data using block generator */
|
||||
class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
|
||||
extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {
|
||||
|
|
Loading…
Reference in a new issue