SPARK-1497. Fix scalastyle warnings in YARN, Hive code
(I wasn't sure how to automatically set `SPARK_YARN=true` and `SPARK_HIVE=true` when running scalastyle, but these are the errors that turn up.)

Author: Sean Owen <sowen@cloudera.com>

Closes #413 from srowen/SPARK-1497 and squashes the following commits:

f0c9318 [Sean Owen] Fix more scalastyle warnings in yarn
80bf4c3 [Sean Owen] Add YARN alpha / YARN profile to scalastyle check
026319c [Sean Owen] Fix scalastyle warnings in YARN, Hive code
parent c3527a333a
commit 77f8367996
@@ -18,6 +18,10 @@
 #
 
 echo -e "q\n" | sbt/sbt clean scalastyle > scalastyle.txt
+# Check style with YARN alpha built too
+SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt
+# Check style with YARN built too
+SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt
 ERRORS=$(cat scalastyle.txt | grep -e "\<error\>")
 if test ! -z "$ERRORS"; then
     echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS"
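The script's pass/fail gate above is just a grep for "error" over the collected scalastyle output. As a rough illustration of the kind of rule being enforced, here is a minimal standalone Scala sketch of a line-length check, the most common violation this patch fixes; the 100-character limit, file name, and output format are assumptions for illustration, not scalastyle's actual implementation.

import scala.io.Source

// Illustrative sketch only: mimics the effect of scalastyle's line-length rule.
// The 100-character limit is an assumption inferred from the wrapping in this patch.
object LineLengthCheck {
  def main(args: Array[String]): Unit = {
    val maxLen = 100
    val path = args.headOption.getOrElse("SomeFile.scala") // hypothetical default file
    val errors = Source.fromFile(path).getLines().zipWithIndex.collect {
      case (line, i) if line.length > maxLen =>
        s"error $path:${i + 1}: line exceeds $maxLen characters"
    }.toList
    errors.foreach(println)
    if (errors.nonEmpty) sys.exit(1) // non-empty "error" output fails the build, as in the script
  }
}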
@@ -37,7 +37,8 @@ import org.apache.spark.scheduler.SplitInfo
 class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
   extends Logging {
 
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
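Most hunks in this commit are the same mechanical fix: breaking a line that exceeds the style limit at a natural point. A minimal sketch of the recurring idioms, using placeholder names rather than the real Spark classes:

// Sketch of the wrapping idioms used throughout this patch; all names here
// (Args, Launcher) are placeholders, not Spark classes.
object WrapStyleDemo {
  class Args(val numExecutors: Int)

  class Launcher(args: Args, label: String) {
    // Break an auxiliary constructor after `=` rather than exceed the limit:
    def this(args: Args) =
      this(args, "default")
  }

  def main(argv: Array[String]): Unit = {
    val missing = 3
    // Break a long string concatenation after `+`:
    println("Allocating " + missing +
      " containers to make up for (potentially ?) lost containers")
    // Break a long call after the opening parenthesis:
    println(
      "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params")
  }
}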
@@ -63,7 +64,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   override def preStart() {
     logInfo("Listen to driver: " + driverUrl)
     driver = context.actorSelection(driverUrl)
-    // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+    // Send a hello message thus the connection is actually established, thus we can
+    // monitor Lifecycle Events.
     driver ! "Hello"
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
   }
@@ -104,8 +106,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Allocate all containers
     allocateExecutors()
 
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+    // Launch a progress reporter thread, else app will get killed after expiration
+    // (def: 10mins) timeout ensure that progress is sent before
+    // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
 
     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
     // we want to be reasonably responsive without causing too many requests to RM.
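The comment rewrapped above describes the heartbeat requirement: the AM must report progress to the ResourceManager before RM_AM_EXPIRY_INTERVAL_MS elapses or the application is killed. A stripped-down sketch of that pattern follows; this is not Spark's actual reporter thread, and the interval choice just follows the "responsive without too many requests" note in the code.

// Minimal sketch of a progress-reporter loop; println stands in for the real
// heartbeat call to the ResourceManager.
object ReporterSketch {
  def main(args: Array[String]): Unit = {
    val timeoutInterval = 120000L // stand-in for YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS
    // Report well inside the expiry window, but cap the rate to avoid spamming the RM.
    val interval = math.min(timeoutInterval / 2, 5000L)
    val reporter = new Thread {
      override def run(): Unit = while (true) {
        println("sending progress to RM")
        Thread.sleep(interval)
      }
    }
    reporter.setDaemon(true) // dies with the AM, like the real reporter thread
    reporter.start()
    Thread.sleep(3 * interval) // let it tick a few times for the demo, then exit
  }
}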
@@ -163,8 +166,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
       .asInstanceOf[RegisterApplicationMasterRequest]
     appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
-    // Users can then monitor stderr/stdout on that node if required.
+    // Setting this to master host,port - so that the ApplicationReport at client has
+    // some sensible info. Users can then monitor stderr/stdout on that node if required.
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
     // What do we provide here ? Might make sense to expose something sensible later ?
@@ -213,7 +216,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
-      yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+      yarnAllocator.allocateContainers(
+        math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
       Thread.sleep(100)
     }
 
@@ -230,7 +234,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       while (!driverClosed) {
        val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
         if (missingExecutorCount > 0) {
-          logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
+          logInfo("Allocating " + missingExecutorCount +
+            " containers to make up for (potentially ?) lost containers")
           yarnAllocator.allocateContainers(missingExecutorCount)
         }
         else sendProgress()
@@ -225,8 +225,8 @@ private[yarn] class YarnAllocationHandler(
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
 
-      assert(
-        container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+      assert( container.getResource.getMemory >=
+        (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
 
       if (numExecutorsRunningNow > maxExecutors) {
         logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -393,9 +393,10 @@ private[yarn] class YarnAllocationHandler(
 
     // default.
     if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
-      logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
-      resourceRequests = List(
-        createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
+      logDebug("numExecutors: " + numExecutors + ", host preferences: " +
+        preferredHostToCount.isEmpty)
+      resourceRequests = List(createResourceRequest(
+        AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
     }
     else {
       // request for all hosts in preferred nodes and for numExecutors -
@@ -137,7 +137,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
 
     val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-    System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+    System.setProperty(
+      "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
   }
 
   /** Get the Yarn approved local directories. */
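For context, the property set in the hunk above carries the AM proxy filter's configuration as comma-separated KEY=VALUE pairs. A tiny sketch of assembling such a params string and splitting it back apart; the host and URI values are hypothetical, and the parsing side is an illustration only, not code from this diff.

// Sketch: building and splitting a "KEY=VALUE,KEY=VALUE" params string like the
// one stored in the AmIpFilter property above. All values are placeholders.
object ParamsDemo {
  def main(args: Array[String]): Unit = {
    val parts = Array("rm-host.example.com") // hypothetical proxy host
    val uriBase = "http://rm-host.example.com:8088/proxy/app_1" // hypothetical URI base
    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
    val parsed = params.split(",").map { kv =>
      val Array(k, v) = kv.split("=", 2)
      k -> v
    }.toMap
    println(parsed("PROXY_HOST"))     // rm-host.example.com
    println(parsed("PROXY_URI_BASE")) // http://rm-host.example.com:8088/proxy/app_1
  }
}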
@@ -65,7 +65,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   override def preStart() {
     logInfo("Listen to driver: " + driverUrl)
     driver = context.actorSelection(driverUrl)
-    // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+    // Send a hello message thus the connection is actually established,
+    // thus we can monitor Lifecycle Events.
     driver ! "Hello"
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
   }
@@ -95,8 +96,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Allocate all containers
     allocateExecutors()
 
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+    // Launch a progress reporter thread, else app will get killed after expiration
+    // (def: 10mins) timeout ensure that progress is sent before
+    // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
 
     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
     // we want to be reasonably responsive without causing too many requests to RM.
@@ -276,7 +276,8 @@ private[yarn] class YarnAllocationHandler(
           allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
         }
       }
-      logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname))
+      logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
+        driverUrl, executorHostname))
       val executorRunnable = new ExecutorRunnable(
         container,
         conf,
@@ -314,8 +315,8 @@ private[yarn] class YarnAllocationHandler(
         // `pendingReleaseContainers`.
         pendingReleaseContainers.remove(containerId)
       } else {
-        // Decrement the number of executors running. The next iteration of the ApplicationMaster's
-        // reporting thread will take care of allocating.
+        // Decrement the number of executors running. The next iteration of
+        // the ApplicationMaster's reporting thread will take care of allocating.
         numExecutorsRunning.decrementAndGet()
         logInfo("Completed container %s (state: %s, exit status: %s)".format(
           containerId,