diff --git a/assembly/README b/assembly/README
index 6ee2a536d7..14a5ff8dfc 100644
--- a/assembly/README
+++ b/assembly/README
@@ -4,10 +4,9 @@ It creates a single tar.gz file that includes all needed dependency of the proje
 except for org.apache.hadoop.* jars that are supposed to be available from the
 deployed Hadoop cluster.
 
-This module is off by default to avoid spending extra time on top of repl-bin
-module. To activate it specify the profile in the command line
-    -Passembly
+This module is off by default. To activate it, specify the profile on the command line:
+    -Pbigtop-dist
 
-In case you want to avoid building time-expensive repl-bin module, that shaders
-all the dependency into a big flat jar supplement maven command with
-    -DnoExpensive
+If you need to build an assembly for a different version of Hadoop, the
+hadoop.version system property needs to be set, as in this example:
+    -Dhadoop.version=2.0.6-alpha
diff --git a/assembly/pom.xml b/assembly/pom.xml
index d19f44d292..808a829e19 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -103,6 +103,12 @@
             <goals>
               <goal>shade</goal>
             </goals>
             <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                  <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
+                </transformer>
+              </transformers>
             </configuration>
           </execution>
diff --git a/conf/fairscheduler.xml.template b/conf/fairscheduler.xml.template
index 04a6b418dc..acf59e2a35 100644
--- a/conf/fairscheduler.xml.template
+++ b/conf/fairscheduler.xml.template
@@ -1,15 +1,13 @@
 <?xml version="1.0"?>
 <allocations>
-<pool name="1">
-    <minShare>2</minShare>
-    <weight>1</weight>
+  <pool name="production">
     <schedulingMode>FAIR</schedulingMode>
-</pool>
-<pool name="2">
-    <minShare>3</minShare>
-    <weight>2</weight>
+    <weight>1</weight>
+    <minShare>2</minShare>
+  </pool>
+  <pool name="test">
     <schedulingMode>FIFO</schedulingMode>
-</pool>
-<pool name="3">
-</pool>
+    <weight>2</weight>
+    <minShare>3</minShare>
+  </pool>
 </allocations>
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 6c36f3cca4..ae10f615d1 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -31,7 +31,7 @@
 #   1. To add a new sink, set the "class" option to a fully qualified class
 #      name (see examples below).
 #   2. Some sinks involve a polling period. The minimum allowed polling period
-#      is 1 second.   
+#      is 1 second.
 #   3. Wild card properties can be overridden by more specific properties.
 #      For example, master.sink.console.period takes precedence over
 #      *.sink.console.period.
@@ -47,11 +47,45 @@
 #    instance master and applications. MetricsServlet may not be configured by self.
 #
 
+## List of available sinks and their properties.
+
+# org.apache.spark.metrics.sink.ConsoleSink
+#   Name:     Default:    Description:
+#   period    10          Poll period
+#   unit      seconds     Units of poll period
+
+# org.apache.spark.metrics.sink.CsvSink
+#   Name:       Default:    Description:
+#   period      10          Poll period
+#   unit        seconds     Units of poll period
+#   directory   /tmp        Where to store CSV files
+
+# org.apache.spark.metrics.sink.GangliaSink
+#   Name:     Default:    Description:
+#   host      NONE        Hostname or multicast group of Ganglia server
+#   port      NONE        Port of Ganglia server(s)
+#   period    10          Poll period
+#   unit      seconds     Units of poll period
+#   ttl       1           TTL of messages sent by Ganglia
+#   mode      multicast   Ganglia network mode ('unicast' or 'multicast')
+
+# org.apache.spark.metrics.sink.JmxSink
+
+# org.apache.spark.metrics.sink.MetricsServlet
+#   Name:     Default:    Description:
+#   path      VARIES*     Path prefix from the web server root
+#   sample    false       Whether to show entire set of samples for histograms ('false' or 'true')
+#
+# * Default path is /metrics/json for all instances except the master. The master has two paths:
+#     /metrics/applications/json # App information
+#     /metrics/master/json       # Master information
+
+## Examples
 # Enable JmxSink for all instances by class name
-#*.sink.jmx.class=spark.metrics.sink.JmxSink
+#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
 
 # Enable ConsoleSink for all instances by class name
-#*.sink.console.class=spark.metrics.sink.ConsoleSink
+#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
 
 # Polling period for ConsoleSink
 #*.sink.console.period=10
 
@@ -64,7 +98,7 @@
 #master.sink.console.unit=seconds
 
 # Enable CsvSink for all instances
-#*.sink.csv.class=spark.metrics.sink.CsvSink
+#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
 
 # Polling period for CsvSink
 #*.sink.csv.period=1
 
@@ -80,11 +114,11 @@
 #worker.sink.csv.unit=minutes
 
 # Enable jvm source for instance master, worker, driver and executor
-#master.source.jvm.class=spark.metrics.source.JvmSource
+#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
 
-#worker.source.jvm.class=spark.metrics.source.JvmSource
+#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
 
-#driver.source.jvm.class=spark.metrics.source.JvmSource
+#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
 
-#executor.source.jvm.class=spark.metrics.source.JvmSource
+#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
 
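To tie the GangliaSink options documented above together, a metrics.properties entry enabling it could look like the following sketch. The sink name "ganglia", the multicast group 239.2.11.71 and the port 8649 are placeholder values; host and port are the only options the sink requires, and the remaining lines simply restate the defaults listed above:

    # Illustrative example only, not part of the patch:
    # report metrics from all instances to a Ganglia multicast group
    *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
    *.sink.ganglia.host=239.2.11.71
    *.sink.ganglia.port=8649
    *.sink.ganglia.period=10
    *.sink.ganglia.unit=seconds
    *.sink.ganglia.mode=multicast
    *.sink.ganglia.ttl=1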
diff --git a/core/pom.xml b/core/pom.xml
index 5738b7406f..14cd520aaf 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -36,6 +36,11 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.java.dev.jets3t</groupId>
+      <artifactId>jets3t</artifactId>
+      <version>0.7.1</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
@@ -146,6 +151,10 @@
       <groupId>com.codahale.metrics</groupId>
       <artifactId>metrics-json</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-ganglia</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index e299a106ee..68b99ca125 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -66,10 +66,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       }
       try {
         // If we got here, we have to load the split
-        val elements = new ArrayBuffer[Any]
         logInfo("Computing partition " + split)
-        elements ++= rdd.computeOrReadCheckpoint(split, context)
-        // Try to put this block in the blockManager
+        val computedValues = rdd.computeOrReadCheckpoint(split, context)
+        // Persist the result, so long as the task is not running locally
+        if (context.runningLocally) { return computedValues }
+        val elements = new ArrayBuffer[Any]
+        elements ++= computedValues
         blockManager.put(key, elements, storageLevel, true)
         return elements.iterator.asInstanceOf[Iterator[T]]
       } finally {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f2641851cb..72540c712a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -27,7 +27,6 @@ import scala.collection.generic.Growable
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
-import scala.util.DynamicVariable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -196,7 +195,7 @@ class SparkContext(
       case "yarn-standalone" =>
         val scheduler = try {
-          val clazz = Class.forName("spark.scheduler.cluster.YarnClusterScheduler")
+          val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) cons.newInstance(this).asInstanceOf[ClusterScheduler] } catch { @@ -257,20 +256,20 @@ class SparkContext( private[spark] var checkpointDir: Option[String] = None // Thread Local variable that can be used by users to pass information down the stack - private val localProperties = new DynamicVariable[Properties](null) + private val localProperties = new ThreadLocal[Properties] def initLocalProperties() { - localProperties.value = new Properties() + localProperties.set(new Properties()) } def setLocalProperty(key: String, value: String) { - if (localProperties.value == null) { - localProperties.value = new Properties() + if (localProperties.get() == null) { + localProperties.set(new Properties()) } if (value == null) { - localProperties.value.remove(key) + localProperties.get.remove(key) } else { - localProperties.value.setProperty(key, value) + localProperties.get.setProperty(key, value) } } @@ -282,8 +281,8 @@ class SparkContext( // Post init taskScheduler.postStartHook() - val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler) - val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager) + val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this) + val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this) def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) @@ -723,7 +722,8 @@ class SparkContext( val callSite = Utils.formatSparkCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value) + val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, + localProperties.get) logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") rdd.doCheckpoint() result @@ -806,7 +806,8 @@ class SparkContext( val callSite = Utils.formatSparkCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value) + val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, + localProperties.get) logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") result } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 478e5a0aaf..29968c273c 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -62,7 +62,7 @@ class SparkEnv ( val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) if(yarnMode) { try { - Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] + Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] } catch { case th: Throwable => throw new SparkException("Unable to load YARN support", th) } diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index b2dd668330..c2c358c7ad 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -24,6 +24,7 @@ class TaskContext( val stageId: Int, val 
splitId: Int, val attemptId: Long, + val runningLocally: Boolean = false, val taskMetrics: TaskMetrics = TaskMetrics.empty() ) extends Serializable { diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c31619db27..1cfff5e565 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -127,4 +127,8 @@ private[deploy] object DeployMessages { case object CheckForWorkerTimeOut + case object RequestWebUIPort + + case class WebUIPortResponse(webUIBoundPort: Int) + } diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index a6be8efef1..87a703427c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -44,7 +44,9 @@ private[spark] object JsonProtocol { ("cores" -> obj.desc.maxCores) ~ ("user" -> obj.desc.user) ~ ("memoryperslave" -> obj.desc.memoryPerSlave) ~ - ("submitdate" -> obj.submitDate.toString) + ("submitdate" -> obj.submitDate.toString) ~ + ("state" -> obj.state.toString) ~ + ("duration" -> obj.duration) } def writeApplicationDescription(obj: ApplicationDescription) = { diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 78e3747ad8..10161c8204 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -43,7 +43,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ - val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0) + val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0) masterActorSystems += masterSystem val masterUrl = "spark://" + localHostname + ":" + masterPort diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7cf0a7754f..bde59905bc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -17,15 +17,18 @@ package org.apache.spark.deploy.master -import java.text.SimpleDateFormat import java.util.Date +import java.text.SimpleDateFormat import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import akka.actor._ import akka.actor.Terminated +import akka.dispatch.Await +import akka.pattern.ask import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} import akka.util.duration._ +import akka.util.Timeout import org.apache.spark.{Logging, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} @@ -33,6 +36,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{Utils, AkkaUtils} +import akka.util.{Duration, Timeout} private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { @@ -180,6 +184,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case CheckForWorkerTimeOut => { 
timeOutDeadWorkers() } + + case RequestWebUIPort => { + sender ! WebUIPortResponse(webUi.boundPort.getOrElse(-1)) + } } /** @@ -364,7 +372,7 @@ private[spark] object Master { def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) - val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort) + val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort) actorSystem.awaitTermination() } @@ -378,9 +386,14 @@ private[spark] object Master { } } - def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = { + def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = { val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName) - (actorSystem, boundPort) + val timeoutDuration = Duration.create( + System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + implicit val timeout = Timeout(timeoutDuration) + val respFuture = actor ? RequestWebUIPort // ask pattern + val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse] + (actorSystem, boundPort, resp.webUIBoundPort) } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d365804994..ceae3b8289 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -98,7 +98,7 @@ private[spark] class Executor( } ) - val executorSource = new ExecutorSource(this) + val executorSource = new ExecutorSource(this, executorId) // Initialize Spark environment (using system properties read above) val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index bf8fb4fd21..18c9dc1c0a 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.metrics.source.Source -class ExecutorSource(val executor: Executor) extends Source { +class ExecutorSource(val executor: Executor, executorId: String) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption @@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source { } val metricRegistry = new MetricRegistry() - val sourceName = "executor" + // TODO: It would be nice to pass the application name here + val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 0f9c4e00b1..caab748d60 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -37,10 +37,9 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi private def setDefaultProperties(prop: Properties) { 
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet") - prop.setProperty("*.sink.servlet.uri", "/metrics/json") - prop.setProperty("*.sink.servlet.sample", "false") - prop.setProperty("master.sink.servlet.uri", "/metrics/master/json") - prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json") + prop.setProperty("*.sink.servlet.path", "/metrics/json") + prop.setProperty("master.sink.servlet.path", "/metrics/master/json") + prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json") } def initialize() { diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala new file mode 100644 index 0000000000..b924907070 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import com.codahale.metrics.ganglia.GangliaReporter +import com.codahale.metrics.MetricRegistry +import info.ganglia.gmetric4j.gmetric.GMetric + +import org.apache.spark.metrics.MetricsSystem + +class GangliaSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val GANGLIA_KEY_PERIOD = "period" + val GANGLIA_DEFAULT_PERIOD = 10 + + val GANGLIA_KEY_UNIT = "unit" + val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS + + val GANGLIA_KEY_MODE = "mode" + val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST + + // TTL for multicast messages. If listeners are X hops away in network, must be at least X. 
+ val GANGLIA_KEY_TTL = "ttl" + val GANGLIA_DEFAULT_TTL = 1 + + val GANGLIA_KEY_HOST = "host" + val GANGLIA_KEY_PORT = "port" + + def propertyToOption(prop: String) = Option(property.getProperty(prop)) + + if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { + throw new Exception("Ganglia sink requires 'host' property.") + } + + if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) { + throw new Exception("Ganglia sink requires 'port' property.") + } + + val host = propertyToOption(GANGLIA_KEY_HOST).get + val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt + val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) + val mode = propertyToOption(GANGLIA_KEY_MODE) + .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) + val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) + .getOrElse(GANGLIA_DEFAULT_PERIOD) + val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase)) + .getOrElse(GANGLIA_DEFAULT_UNIT) + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val ganglia = new GMetric(host, port, mode, ttl) + val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build(ganglia) + + override def start() { + reporter.start(pollPeriod, pollUnit) + } + + override def stop() { + reporter.stop() + } +} + diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index 4e90dd4323..99357fede6 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -31,18 +31,21 @@ import org.eclipse.jetty.server.Handler import org.apache.spark.ui.JettyUtils class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink { - val SERVLET_KEY_URI = "uri" + val SERVLET_KEY_PATH = "path" val SERVLET_KEY_SAMPLE = "sample" - val servletURI = property.getProperty(SERVLET_KEY_URI) + val SERVLET_DEFAULT_SAMPLE = false - val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean + val servletPath = property.getProperty(SERVLET_KEY_PATH) + + val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean) + .getOrElse(SERVLET_DEFAULT_SAMPLE) val mapper = new ObjectMapper().registerModule( new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample)) def getHandlers = Array[(String, Handler)]( - (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json")) + (servletPath, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json")) ) def getMetricsSnapshot(request: HttpServletRequest): String = { diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala index 537f225469..8afcbe190a 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala @@ -20,6 +20,7 @@ package org.apache.spark.network.netty import java.io.File import org.apache.spark.Logging +import org.apache.spark.util.Utils private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging { @@ -57,7 +58,7 @@ private[spark] object ShuffleSender { throw new Exception("Block " + blockId + " is 
not a shuffle block") } // Figure out which local directory it hashes to, and which subdirectory in that - val hash = math.abs(blockId.hashCode) + val hash = Utils.nonNegativeHash(blockId) val dirId = hash % localDirs.length val subDirId = (hash / localDirs.length) % subDirsPerLocalDir val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e143ecd096..1082cbae3e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -267,6 +267,23 @@ abstract class RDD[T: ClassManifest]( /** * Return a new RDD that is reduced into `numPartitions` partitions. + * + * This results in a narrow dependency, e.g. if you go from 1000 partitions + * to 100 partitions, there will not be a shuffle, instead each of the 100 + * new partitions will claim 10 of the current partitions. + * + * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, + * this may result in your computation taking place on fewer nodes than + * you like (e.g. one node in the case of numPartitions = 1). To avoid this, + * you can pass shuffle = true. This will add a shuffle step, but means the + * current upstream partitions will be executed in parallel (per whatever + * the current partitioning is). + * + * Note: With shuffle = true, you can actually coalesce to a larger number + * of partitions. This is useful if you have a small number of partitions, + * say 100, potentially with a few partitions being abnormally large. Calling + * coalesce(1000, shuffle = true) will result in 1000 partitions with the + * data distributed using a hash partitioner. */ def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = { if (shuffle) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 92add5b073..3e3f04f087 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -478,7 +478,8 @@ class DAGScheduler( SparkEnv.set(env) val rdd = job.finalStage.rdd val split = rdd.partitions(job.partitions(0)) - val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0) + val taskContext = + new TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true) try { val result = job.func(taskContext, rdd.iterator(split, taskContext)) job.listener.taskSucceeded(0, result) @@ -531,9 +532,16 @@ class DAGScheduler( tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id) } } + + val properties = if (idToActiveJob.contains(stage.jobId)) { + idToActiveJob(stage.jobId).properties + } else { + //this stage will be assigned to "default" pool + null + } + // must be run listener before possible NotSerializableException // should be "StageSubmitted" first and then "JobEnded" - val properties = idToActiveJob(stage.jobId).properties listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties)) if (tasks.size > 0) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 22e3723ac8..446d490cc9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -20,10 +20,12 @@ package 
org.apache.spark.scheduler import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.SparkContext -private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source { +private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) + extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "DAGScheduler" + val sourceName = "%s.DAGScheduler".format(sc.appName) metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 2b007cbe82..07e8317e3a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -77,7 +77,7 @@ private[spark] class ResultTask[T, U]( var func: (TaskContext, Iterator[T]) => U, var partition: Int, @transient locs: Seq[TaskLocation], - val outputId: Int) + var outputId: Int) extends Task[U](stageId) with Externalizable { def this() = this(0, null, null, 0, null, 0) @@ -93,7 +93,7 @@ private[spark] class ResultTask[T, U]( } override def run(attemptId: Long): U = { - val context = new TaskContext(stageId, partition, attemptId) + val context = new TaskContext(stageId, partition, attemptId, runningLocally = false) metrics = Some(context.taskMetrics) try { func(context, rdd.iterator(split, context)) @@ -130,7 +130,7 @@ private[spark] class ResultTask[T, U]( rdd = rdd_.asInstanceOf[RDD[T]] func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U] partition = in.readInt() - val outputId = in.readInt() + outputId = in.readInt() epoch = in.readLong() split = in.readObject().asInstanceOf[Partition] } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 764775fede..d23df0dd2b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -132,7 +132,7 @@ private[spark] class ShuffleMapTask( override def run(attemptId: Long): MapStatus = { val numOutputSplits = dep.partitioner.numPartitions - val taskContext = new TaskContext(stageId, partition, attemptId) + val taskContext = new TaskContext(stageId, partition, attemptId, runningLocally = false) metrics = Some(taskContext.taskMetrics) val blockManager = SparkEnv.get.blockManager diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 3196ab5022..919acce828 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -94,7 +94,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) var rootPool: Pool = null // default scheduler is FIFO val schedulingMode: SchedulingMode = SchedulingMode.withName( - System.getProperty("spark.cluster.schedulingmode", "FIFO")) + System.getProperty("spark.scheduler.mode", "FIFO")) override def setListener(listener: TaskSchedulerListener) { this.listener = listener diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index 1b31c8c57e..0ac3d7bcfd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -335,7 +335,7 @@ private[spark] class ClusterTaskSetManager( } /** - * Respond to an offer of a single slave from the scheduler by finding a task + * Respond to an offer of a single executor from the scheduler by finding a task */ override def resourceOffer( execId: String, @@ -358,7 +358,7 @@ private[spark] class ClusterTaskSetManager( val task = tasks(index) val taskId = sched.newTaskId() // Figure out whether this should count as a preferred launch - logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format( + logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format( taskSet.id, index, taskId, execId, host, taskLocality)) // Do various bookkeeping copiesRunning(index) += 1 diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala index d04eeb6b98..f80823317b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala @@ -51,8 +51,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { - val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file") - val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool" + val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file") + val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" val DEFAULT_POOL_NAME = "default" val MINIMUM_SHARES_PROPERTY = "minShare" val SCHEDULING_MODE_PROPERTY = "schedulingMode" @@ -60,7 +60,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) val POOL_NAME_PROPERTY = "@name" val POOLS_PROPERTY = "pool" val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO - val DEFAULT_MINIMUM_SHARE = 2 + val DEFAULT_MINIMUM_SHARE = 0 val DEFAULT_WEIGHT = 1 override def buildPools() { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index d003bf1bba..9c49768c0c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend( "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse(null) val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome, - sc.ui.appUIAddress) + "http://" + sc.ui.appUIAddress) client = new Client(sc.env.actorSystem, master, appDesc, this) client.start() @@ -76,17 +76,17 @@ private[spark] class SparkDeploySchedulerBackend( } } - override def executorAdded(executorId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { + override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format( - executorId, hostPort, cores, 
Utils.megabytesToString(memory))) + fullId, hostPort, cores, Utils.megabytesToString(memory))) } - override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) { + override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) { val reason: ExecutorLossReason = exitStatus match { case Some(code) => ExecutorExited(code) case None => SlaveLost(message) } - logInfo("Executor %s removed: %s".format(executorId, message)) - removeExecutor(executorId, reason.toString) + logInfo("Executor %s removed: %s".format(fullId, message)) + removeExecutor(fullId.split("/")(1), reason.toString) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index e8fa5e2f17..8cb4d1396f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -91,7 +91,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: var schedulableBuilder: SchedulableBuilder = null var rootPool: Pool = null val schedulingMode: SchedulingMode = SchedulingMode.withName( - System.getProperty("spark.cluster.schedulingmode", "FIFO")) + System.getProperty("spark.scheduler.mode", "FIFO")) val activeTaskSets = new HashMap[String, TaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String] val taskSetTaskIds = new HashMap[String, HashSet[Long]] diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 3d709cfde4..acc3951088 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -20,11 +20,13 @@ package org.apache.spark.storage import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.SparkContext -private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source { +private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) + extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "BlockManager" + val sourceName = "%s.BlockManager".format(sc.appName) metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] { override def getValue: Long = { diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index fc25ef0fae..63447baf8c 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -238,7 +238,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) logDebug("Getting file for block " + blockId) // Figure out which local directory it hashes to, and which subdirectory in that - val hash = math.abs(blockId.hashCode) + val hash = Utils.nonNegativeHash(blockId) val dirId = hash % localDirs.length val subDirId = (hash / localDirs.length) % subDirsPerLocalDir diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 755f1a760e..632ff047d1 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -23,9 +23,9 @@ import 
java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
 * whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
 * in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
- * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants for
- * commonly useful storage levels. To create your own storage level object, use the factor method
- * of the singleton object (`StorageLevel(...)`).
+ * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
+ * for commonly useful storage levels. To create your own storage level object, use the
+ * factory method of the singleton object (`StorageLevel(...)`).
 */
 class StorageLevel private(
     private var useDisk_ : Boolean,
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ad456ea565..f1d86c0221 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -79,10 +79,11 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
     server.foreach(_.stop())
   }
 
-  private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
+  private[spark] def appUIAddress = host + ":" + boundPort.getOrElse("-1")
+
 }
 
 private[spark] object SparkUI {
-  val DEFAULT_PORT = "3030"
+  val DEFAULT_PORT = "4040"
   val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index ce1acf564c..5573b3847b 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -25,38 +25,45 @@ import org.apache.spark.SparkContext
 private[spark] object UIUtils {
   import Page._
 
+  // Yarn has to go through a proxy so the base uri is provided and has to be on all links
+  private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
+    getOrElse("")
+
+  def prependBaseUri(resource: String = "") = uiRoot + resource
+
   /** Returns a spark page with correctly formatted headers */
   def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
   : Seq[Node] = {
     val jobs = page match {
-      case Stages => <li class="active"><a href="/stages">Stages</a></li>
-      case _ => <li><a href="/stages">Stages</a></li>
+      case Stages => <li class="active"><a href={prependBaseUri("/stages")}>Stages</a></li>
+      case _ => <li><a href={prependBaseUri("/stages")}>Stages</a></li>
     }
     val storage = page match {
-      case Storage => <li class="active"><a href="/storage">Storage</a></li>
-      case _ => <li><a href="/storage">Storage</a></li>
+      case Storage => <li class="active"><a href={prependBaseUri("/storage")}>Storage</a></li>
+      case _ => <li><a href={prependBaseUri("/storage")}>Storage</a></li>
    }
     val environment = page match {
-      case Environment => <li class="active"><a href="/environment">Environment</a></li>
-      case _ => <li><a href="/environment">Environment</a></li>
+      case Environment =>
+        <li class="active"><a href={prependBaseUri("/environment")}>Environment</a></li>
+      case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
     }
     val executors = page match {
-      case Executors => <li class="active"><a href="/executors">Executors</a></li>
-      case _ => <li><a href="/executors">Executors</a></li>
+      case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
+      case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
     }
 
     <html>
       <head>
-        <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
-        <link rel="stylesheet" href="/static/webui.css" type="text/css" />
-        <script src="/static/sorttable.js"></script>
+        <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+        <link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" />
+        <script src={prependBaseUri("/static/sorttable.js")}></script>
         <title>{sc.appName} - {title}</title>
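The scheduler configuration keys renamed elsewhere in this patch (spark.scheduler.mode, spark.scheduler.allocation.file and spark.scheduler.pool) pair with the pools declared in conf/fairscheduler.xml.template above. A minimal sketch of how an application might use them under 0.8-style system-property configuration; the object name, the local[2] master URL and the conf/fairscheduler.xml path (a copy of the template) are illustrative assumptions, not part of the patch:

    import org.apache.spark.SparkContext

    object FairPoolSketch {
      def main(args: Array[String]) {
        // Renamed keys from this patch (formerly spark.cluster.schedulingmode and
        // spark.fairscheduler.allocation.file).
        System.setProperty("spark.scheduler.mode", "FAIR")
        System.setProperty("spark.scheduler.allocation.file", "conf/fairscheduler.xml")

        val sc = new SparkContext("local[2]", "FairPoolSketch")

        // spark.scheduler.pool (formerly spark.scheduler.cluster.fair.pool) is read from a
        // thread-local property, so jobs submitted from this thread land in the "production"
        // pool declared in the template.
        sc.setLocalProperty("spark.scheduler.pool", "production")
        println(sc.parallelize(1 to 1000).count())

        sc.stop()
      }
    }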