[SPARK-5126][Core] Verify Spark urls before creating Actors so that invalid urls can crash the process.

Because `actorSelection` will return `deadLetters` for an invalid path,  Worker keeps quiet for an invalid master url. It's better to log an error so that people can find such problem quickly.

This PR will check the url before sending to `actorSelection`, throw and log a SparkException for an invalid url.

Author: zsxwing <zsxwing@gmail.com>

Closes #3927 from zsxwing/SPARK-5126 and squashes the following commits:

9d429ee [zsxwing] Create a utility method in Utils to parse Spark url; verify urls before creating Actors so that invalid urls can crash the process.
8286e51 [zsxwing] Check the url before sending to Akka and log the error if the url is invalid
This commit is contained in:
zsxwing 2015-01-07 23:01:30 -08:00 committed by Reynold Xin
parent d345ebebd5
commit 2b729d2250
6 changed files with 116 additions and 33 deletions

View file

@ -160,6 +160,8 @@ object Client {
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
// Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
Master.toAkkaUrl(driverArgs.master)
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
actorSystem.awaitTermination()

View file

@ -26,7 +26,7 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@ -47,6 +47,8 @@ private[spark] class AppClient(
conf: SparkConf)
extends Logging {
val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
@ -75,9 +77,9 @@ private[spark] class AppClient(
}
def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
for (masterAkkaUrl <- masterAkkaUrls) {
logInfo("Connecting to master " + masterAkkaUrl + "...")
val actor = context.actorSelection(masterAkkaUrl)
actor ! RegisterApplication(appDescription)
}
}
@ -103,20 +105,14 @@ private[spark] class AppClient(
}
def changeMaster(url: String) {
// activeMasterUrl is a valid Spark url since we receive it from master.
activeMasterUrl = url
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(host, port) =>
Address("akka.tcp", Master.systemName, host, port.toInt)
case x =>
throw new SparkException("Invalid spark URL: " + x)
}
masterAddress = Master.toAkkaAddress(activeMasterUrl)
}
private def isPossibleMaster(remoteUrl: Address) = {
masterUrls.map(s => Master.toAkkaUrl(s))
.map(u => AddressFromURIString(u).hostPort)
.contains(remoteUrl.hostPort)
masterAkkaUrls.map(AddressFromURIString(_).hostPort).contains(remoteUrl.hostPort)
}
override def receiveWithLogging = {

View file

@ -845,7 +845,6 @@ private[spark] class Master(
private[spark] object Master extends Logging {
val systemName = "sparkMaster"
private val actorName = "Master"
val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
SignalLogger.register(log)
@ -855,14 +854,24 @@ private[spark] object Master extends Logging {
actorSystem.awaitTermination()
}
/** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
/**
* Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:port`.
*
* @throws SparkException if the url is invalid
*/
def toAkkaUrl(sparkUrl: String): String = {
sparkUrl match {
case sparkUrlRegex(host, port) =>
"akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
case _ =>
throw new SparkException("Invalid master URL: " + sparkUrl)
}
val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
"akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
}
/**
* Returns an akka `Address` for the Master actor given a sparkUrl `spark://host:port`.
*
* @throws SparkException if the url is invalid
*/
def toAkkaAddress(sparkUrl: String): Address = {
val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
Address("akka.tcp", systemName, host, port)
}
def startSystemAndActor(

View file

@ -40,7 +40,7 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
/**
* @param masterUrls Each url should look like spark://host:port.
* @param masterAkkaUrls Each url should be a valid akka url.
*/
private[spark] class Worker(
host: String,
@ -48,7 +48,7 @@ private[spark] class Worker(
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
masterAkkaUrls: Array[String],
actorSystemName: String,
actorName: String,
workDirPath: String = null,
@ -171,15 +171,11 @@ private[spark] class Worker(
}
def changeMaster(url: String, uiUrl: String) {
// activeMasterUrl it's a valid Spark url since we receive it from master.
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(_host, _port) =>
Address("akka.tcp", Master.systemName, _host, _port.toInt)
case x =>
throw new SparkException("Invalid spark URL: " + x)
}
masterAddress = Master.toAkkaAddress(activeMasterUrl)
connected = true
// Cancel any outstanding re-registration attempts because we found a new master
registrationRetryTimer.foreach(_.cancel())
@ -187,9 +183,9 @@ private[spark] class Worker(
}
private def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
for (masterAkkaUrl <- masterAkkaUrls) {
logInfo("Connecting to master " + masterAkkaUrl + "...")
val actor = context.actorSelection(masterAkkaUrl)
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
}
}
@ -527,8 +523,9 @@ private[spark] object Worker extends Logging {
val securityMgr = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
conf = conf, securityManager = securityMgr)
val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
(actorSystem, boundPort)
}

View file

@ -1842,6 +1842,35 @@ private[spark] object Utils extends Logging {
sparkValue
}
}
/**
* Return a pair of host and port extracted from the `sparkUrl`.
*
* A spark url (`spark://host:port`) is a special URI that its scheme is `spark` and only contains
* host and port.
*
* @throws SparkException if `sparkUrl` is invalid.
*/
def extractHostPortFromSparkUrl(sparkUrl: String): (String, Int) = {
try {
val uri = new java.net.URI(sparkUrl)
val host = uri.getHost
val port = uri.getPort
if (uri.getScheme != "spark" ||
host == null ||
port < 0 ||
(uri.getPath != null && !uri.getPath.isEmpty) || // uri.getPath returns "" instead of null
uri.getFragment != null ||
uri.getQuery != null ||
uri.getUserInfo != null) {
throw new SparkException("Invalid master URL: " + sparkUrl)
}
(host, port)
} catch {
case e: java.net.URISyntaxException =>
throw new SparkException("Invalid master URL: " + sparkUrl, e)
}
}
}
/**

View file

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.master
import akka.actor.Address
import org.scalatest.FunSuite
import org.apache.spark.SparkException
class MasterSuite extends FunSuite {
test("toAkkaUrl") {
val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234")
assert("akka.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl)
}
test("toAkkaUrl: a typo url") {
val e = intercept[SparkException] {
Master.toAkkaUrl("spark://1.2. 3.4:1234")
}
assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
}
test("toAkkaAddress") {
val address = Master.toAkkaAddress("spark://1.2.3.4:1234")
assert(Address("akka.tcp", "sparkMaster", "1.2.3.4", 1234) === address)
}
test("toAkkaAddress: a typo url") {
val e = intercept[SparkException] {
Master.toAkkaAddress("spark://1.2. 3.4:1234")
}
assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
}
}