Merge remote-tracking branch 'upstream/master'

Ameet Talwalkar 2013-09-08 18:41:38 -07:00
commit bf280c8b0f
38 changed files with 786 additions and 261 deletions


@ -31,7 +31,7 @@
# 1. To add a new sink, set the "class" option to a fully qualified class
# name (see examples below).
# 2. Some sinks involve a polling period. The minimum allowed polling period
# is 1 second.
# 3. Wild card properties can be overridden by more specific properties.
# For example, master.sink.console.period takes precedence over
# *.sink.console.period.
@ -47,11 +47,45 @@
# instance master and applications. MetricsServlet does not need to be configured explicitly.
#
## List of available sinks and their properties.
# org.apache.spark.metrics.sink.ConsoleSink
#   Name:       Default:    Description:
#   period      10          Poll period
#   unit        seconds     Units of poll period
# org.apache.spark.metrics.sink.CsvSink
#   Name:       Default:    Description:
#   period      10          Poll period
#   unit        seconds     Units of poll period
#   directory   /tmp        Where to store CSV files
# org.apache.spark.metrics.sink.GangliaSink
#   Name:       Default:    Description:
#   host        NONE        Hostname or multicast group of Ganglia server
#   port        NONE        Port of Ganglia server(s)
#   period      10          Poll period
#   unit        seconds     Units of poll period
#   ttl         1           TTL of messages sent by Ganglia
#   mode        multicast   Ganglia network mode ('unicast' or 'multicast')
# org.apache.spark.metrics.sink.JmxSink
# org.apache.spark.metrics.sink.MetricsServlet
#   Name:       Default:    Description:
#   path        VARIES*     Path prefix from the web server root
#   sample      false       Whether to show entire set of samples for histograms ('false' or 'true')
#
# * Default path is /metrics/json for all instances except the master. The master has two paths:
#     /metrics/applications/json   # App information
#     /metrics/master/json         # Master information
## Examples
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=spark.metrics.sink.JmxSink
#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
# Enable ConsoleSink for all instances by class name
#*.sink.console.class=spark.metrics.sink.ConsoleSink
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
# Polling period for ConsoleSink
#*.sink.console.period=10
@ -64,7 +98,7 @@
#master.sink.console.unit=seconds
# Enable CsvSink for all instances
#*.sink.csv.class=spark.metrics.sink.CsvSink
#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
# Polling period for CsvSink
#*.sink.csv.period=1
@ -80,11 +114,11 @@
#worker.sink.csv.unit=minutes
# Enable jvm source for instance master, worker, driver and executor
#master.source.jvm.class=spark.metrics.source.JvmSource
#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
#worker.source.jvm.class=spark.metrics.source.JvmSource
#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
#driver.source.jvm.class=spark.metrics.source.JvmSource
#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
#executor.source.jvm.class=spark.metrics.source.JvmSource
#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
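The GangliaSink added in this commit can be enabled the same way as the sinks above. A minimal sketch, not part of the template itself (the sink name `ganglia` is arbitrary and the host/port values are placeholders for your Ganglia endpoint):
# Enable GangliaSink for all instances
#*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
#*.sink.ganglia.host=239.2.11.71
#*.sink.ganglia.port=8649
#*.sink.ganglia.period=10
#*.sink.ganglia.unit=seconds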


@ -66,10 +66,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
try {
// If we got here, we have to load the split
val elements = new ArrayBuffer[Any]
logInfo("Computing partition " + split)
elements ++= rdd.computeOrReadCheckpoint(split, context)
// Try to put this block in the blockManager
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, true)
return elements.iterator.asInstanceOf[Iterator[T]]
} finally {


@ -282,8 +282,8 @@ class SparkContext(
// Post init
taskScheduler.postStartHook()
val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)


@ -24,6 +24,7 @@ class TaskContext(
val stageId: Int,
val splitId: Int,
val attemptId: Long,
val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty()
) extends Serializable {


@ -98,7 +98,7 @@ private[spark] class Executor(
}
)
val executorSource = new ExecutorSource(this)
val executorSource = new ExecutorSource(this, executorId)
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)


@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.metrics.source.Source
class ExecutorSource(val executor: Executor) extends Source {
class ExecutorSource(val executor: Executor, executorId: String) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source {
}
val metricRegistry = new MetricRegistry()
val sourceName = "executor"
// TODO: It would be nice to pass the application name here
val sourceName = "executor.%s".format(executorId)
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {


@ -37,10 +37,9 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
prop.setProperty("*.sink.servlet.uri", "/metrics/json")
prop.setProperty("*.sink.servlet.sample", "false")
prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
prop.setProperty("*.sink.servlet.path", "/metrics/json")
prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}
def initialize() {


@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.metrics.sink
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.codahale.metrics.ganglia.GangliaReporter
import com.codahale.metrics.MetricRegistry
import info.ganglia.gmetric4j.gmetric.GMetric
import org.apache.spark.metrics.MetricsSystem
class GangliaSink(val property: Properties, val registry: MetricRegistry) extends Sink {
val GANGLIA_KEY_PERIOD = "period"
val GANGLIA_DEFAULT_PERIOD = 10
val GANGLIA_KEY_UNIT = "unit"
val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS
val GANGLIA_KEY_MODE = "mode"
val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST
// TTL for multicast messages. If listeners are X hops away in the network, the TTL must be at least X.
val GANGLIA_KEY_TTL = "ttl"
val GANGLIA_DEFAULT_TTL = 1
val GANGLIA_KEY_HOST = "host"
val GANGLIA_KEY_PORT = "port"
def propertyToOption(prop: String) = Option(property.getProperty(prop))
if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
throw new Exception("Ganglia sink requires 'host' property.")
}
if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) {
throw new Exception("Ganglia sink requires 'port' property.")
}
val host = propertyToOption(GANGLIA_KEY_HOST).get
val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
val mode = propertyToOption(GANGLIA_KEY_MODE)
.map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
.getOrElse(GANGLIA_DEFAULT_PERIOD)
val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
.getOrElse(GANGLIA_DEFAULT_UNIT)
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val ganglia = new GMetric(host, port, mode, ttl)
val reporter: GangliaReporter = GangliaReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build(ganglia)
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
}


@ -31,18 +31,21 @@ import org.eclipse.jetty.server.Handler
import org.apache.spark.ui.JettyUtils
class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
val SERVLET_KEY_URI = "uri"
val SERVLET_KEY_PATH = "path"
val SERVLET_KEY_SAMPLE = "sample"
val servletURI = property.getProperty(SERVLET_KEY_URI)
val SERVLET_DEFAULT_SAMPLE = false
val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
val servletPath = property.getProperty(SERVLET_KEY_PATH)
val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean)
.getOrElse(SERVLET_DEFAULT_SAMPLE)
val mapper = new ObjectMapper().registerModule(
new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
def getHandlers = Array[(String, Handler)](
(servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
(servletPath, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
)
def getMetricsSnapshot(request: HttpServletRequest): String = {


@ -478,7 +478,8 @@ class DAGScheduler(
SparkEnv.set(env)
val rdd = job.finalStage.rdd
val split = rdd.partitions(job.partitions(0))
val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0)
val taskContext =
new TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true)
try {
val result = job.func(taskContext, rdd.iterator(split, taskContext))
job.listener.taskSucceeded(0, result)
@ -531,9 +532,16 @@ class DAGScheduler(
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
val properties = if (idToActiveJob.contains(stage.jobId)) {
idToActiveJob(stage.jobId).properties
} else {
// this stage will be assigned to the "default" pool
null
}
// The listener must be notified before any possible NotSerializableException;
// "StageSubmitted" should be posted first, and then "JobEnded"
val properties = idToActiveJob(stage.jobId).properties
listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
if (tasks.size > 0) {


@ -20,10 +20,12 @@ package org.apache.spark.scheduler
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
import org.apache.spark.SparkContext
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "DAGScheduler"
val sourceName = "%s.DAGScheduler".format(sc.appName)
metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size


@ -77,7 +77,7 @@ private[spark] class ResultTask[T, U](
var func: (TaskContext, Iterator[T]) => U,
var partition: Int,
@transient locs: Seq[TaskLocation],
val outputId: Int)
var outputId: Int)
extends Task[U](stageId) with Externalizable {
def this() = this(0, null, null, 0, null, 0)
@ -93,7 +93,7 @@ private[spark] class ResultTask[T, U](
}
override def run(attemptId: Long): U = {
val context = new TaskContext(stageId, partition, attemptId)
val context = new TaskContext(stageId, partition, attemptId, runningLocally = false)
metrics = Some(context.taskMetrics)
try {
func(context, rdd.iterator(split, context))
@ -130,7 +130,7 @@ private[spark] class ResultTask[T, U](
rdd = rdd_.asInstanceOf[RDD[T]]
func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
partition = in.readInt()
val outputId = in.readInt()
outputId = in.readInt()
epoch = in.readLong()
split = in.readObject().asInstanceOf[Partition]
}


@ -132,7 +132,7 @@ private[spark] class ShuffleMapTask(
override def run(attemptId: Long): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
val taskContext = new TaskContext(stageId, partition, attemptId)
val taskContext = new TaskContext(stageId, partition, attemptId, runningLocally = false)
metrics = Some(taskContext.taskMetrics)
val blockManager = SparkEnv.get.blockManager


@ -20,11 +20,13 @@ package org.apache.spark.storage
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
import org.apache.spark.SparkContext
private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "BlockManager"
val sourceName = "%s.BlockManager".format(sc.appName)
metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
override def getValue: Long = {


@ -23,9 +23,9 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
* whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
* in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
* The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants for
* commonly useful storage levels. To create your own storage level object, use the factor method
* of the singleton object (`StorageLevel(...)`).
* The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
* factory method of the singleton object (`StorageLevel(...)`).
*/
class StorageLevel private(
private var useDisk_ : Boolean,


@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.EasyMockSugar
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockManager, StorageLevel}
// TODO: Test the CacheManager's thread-safety aspects
class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
var sc : SparkContext = _
var blockManager: BlockManager = _
var cacheManager: CacheManager = _
var split: Partition = _
/** An RDD which returns the values [1, 2, 3, 4]. */
var rdd: RDD[Int] = _
before {
sc = new SparkContext("local", "test")
blockManager = mock[BlockManager]
cacheManager = new CacheManager(blockManager)
split = new Partition { override def index: Int = 0 }
rdd = new RDD[Int](sc, Nil) {
override def getPartitions: Array[Partition] = Array(split)
override val getDependencies = List[Dependency[_]]()
override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
}
}
after {
sc.stop()
}
test("get uncached rdd") {
expecting {
blockManager.get("rdd_0_0").andReturn(None)
blockManager.put("rdd_0_0", ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, true).
andReturn(0)
}
whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, runningLocally = false, null)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
}
test("get cached rdd") {
expecting {
blockManager.get("rdd_0_0").andReturn(Some(ArrayBuffer(5, 6, 7).iterator))
}
whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, runningLocally = false, null)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(5, 6, 7))
}
}
test("get uncached local rdd") {
expecting {
// Local computation should not persist the resulting value, so don't expect a put().
blockManager.get("rdd_0_0").andReturn(None)
}
whenExecuting(blockManager) {
val context = new TaskContext(0, 0, 0, runningLocally = true, null)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
}
}


@ -495,7 +495,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
TaskContext context = new TaskContext(0, 0, 0, null);
TaskContext context = new TaskContext(0, 0, 0, false, null);
Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
}


@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import java.io.File
import java.util.Date
import net.liftweb.json.{JsonAST, JsonParser}
import net.liftweb.json.JsonAST.JValue
import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
import org.apache.spark.deploy.worker.ExecutorRunner
class JsonProtocolSuite extends FunSuite {
test("writeApplicationInfo") {
val output = JsonProtocol.writeApplicationInfo(createAppInfo())
assertValidJson(output)
}
test("writeWorkerInfo") {
val output = JsonProtocol.writeWorkerInfo(createWorkerInfo())
assertValidJson(output)
}
test("writeApplicationDescription") {
val output = JsonProtocol.writeApplicationDescription(createAppDesc())
assertValidJson(output)
}
test("writeExecutorRunner") {
val output = JsonProtocol.writeExecutorRunner(createExecutorRunner())
assertValidJson(output)
}
test("writeMasterState") {
val workers = Array[WorkerInfo](createWorkerInfo(), createWorkerInfo())
val activeApps = Array[ApplicationInfo](createAppInfo())
val completedApps = Array[ApplicationInfo]()
val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps)
val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output)
}
test("writeWorkerState") {
val executors = List[ExecutorRunner]()
val finishedExecutors = List[ExecutorRunner](createExecutorRunner(), createExecutorRunner())
val stateResponse = new WorkerStateResponse("host", 8080, "workerId", executors,
finishedExecutors, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
val output = JsonProtocol.writeWorkerState(stateResponse)
assertValidJson(output)
}
def createAppDesc() : ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
new ApplicationDescription("name", 4, 1234, cmd, "sparkHome", "appUiUrl")
}
def createAppInfo() : ApplicationInfo = {
new ApplicationInfo(3, "id", createAppDesc(), new Date(123456789), null, "appUriStr")
}
def createWorkerInfo() : WorkerInfo = {
new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
}
def createExecutorRunner() : ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
new File("sparkHome"), new File("workDir"))
}
def assertValidJson(json: JValue) {
try {
JsonParser.parse(JsonAST.compactRender(json))
} catch {
case e: JsonParser.ParseException => fail("Invalid Json detected", e)
}
}
}


@ -30,14 +30,13 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
val conf = new MetricsConfig(Option("dummy-file"))
conf.initialize()
assert(conf.properties.size() === 5)
assert(conf.properties.size() === 4)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
assert(property.size() === 3)
assert(property.size() === 2)
assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
assert(property.getProperty("sink.servlet.sample") === "false")
assert(property.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with properties set") {
@ -45,22 +44,20 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val masterProp = conf.getInstance("master")
assert(masterProp.size() === 6)
assert(masterProp.size() === 5)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
assert(masterProp.getProperty("sink.servlet.sample") === "false")
assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
val workerProp = conf.getInstance("worker")
assert(workerProp.size() === 6)
assert(workerProp.size() === 5)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
assert(workerProp.getProperty("sink.servlet.sample") === "false")
assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with subProperties") {
@ -84,6 +81,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(consoleProps.size() === 2)
val servletProps = sinkProps("servlet")
assert(servletProps.size() === 3)
assert(servletProps.size() === 2)
}
}


@ -97,7 +97,9 @@
<a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="configuration.html">Configuration</a></li>
<li><a href="monitoring.html">Monitoring</a></li>
<li><a href="tuning.html">Tuning Guide</a></li>
<li><a href="hadoop-third-party-distributions.html">Running with CDH/HDP</a></li>
<li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
<li><a href="building-with-maven.html">Building Spark with Maven</a></li>
<li><a href="contributing-to-spark.html">Contributing to Spark</a></li>


@ -0,0 +1,76 @@
---
layout: global
title: Running with Cloudera and Hortonworks Distributions
---
Spark can run against all versions of Cloudera's Distribution Including Hadoop (CDH) and
the Hortonworks Data Platform (HDP). There are a few things to keep in mind when using Spark with
these distributions:
# Compile-time Hadoop Version
When compiling Spark, you'll need to
[set the HADOOP_VERSION flag](index.html#a-note-about-hadoop-versions):
HADOOP_VERSION=1.0.4 sbt/sbt assembly
The table below lists the corresponding HADOOP_VERSION for each CDH/HDP release. Note that
some Hadoop releases are binary compatible across client versions. This means the pre-built Spark
distribution may "just work" without you needing to compile. That said, we recommend compiling with
the _exact_ Hadoop version you are running to avoid any compatibility errors.
<table>
<tr valign="top">
<td>
<h3>CDH Releases</h3>
<table class="table" style="width:350px;">
<tr><th>Version</th><th>HADOOP_VERSION</th></tr>
<tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr>
<tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr>
<tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
<tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
<tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>
</table>
</td>
<td>
<h3>HDP Releases</h3>
<table class="table" style="width:350px;">
<tr><th>Version</th><th>HADOOP_VERSION</th></tr>
<tr><td>HDP 1.3</td><td>1.2.0</td></tr>
<tr><td>HDP 1.2</td><td>1.1.2</td></tr>
<tr><td>HDP 1.1</td><td>1.0.3</td></tr>
<tr><td>HDP 1.0</td><td>1.0.3</td></tr>
</table>
</td>
</tr>
</table>
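For instance, to build against a CDH 4 release in MRv1 mode, substitute the concrete version into the pattern from the table above (4.2.0 here is a hypothetical example):

HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly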
# Where to Run Spark
As described in the [Hardware Provisioning](hardware-provisioning.html#storage-systems) guide,
Spark can run in a variety of deployment modes:
* Using a dedicated set of Spark nodes in your cluster. These nodes should be co-located with your
Hadoop installation.
* Running on the same nodes as an existing Hadoop installation, with a fixed amount of memory and
cores dedicated to Spark on each node.
* Running Spark alongside Hadoop using a cluster resource manager, such as YARN or Mesos.
These options are identical for those using CDH and HDP.
# Inheriting Cluster Configuration
If you plan to read and write from HDFS using Spark, there are two Hadoop configuration files that
should be included on Spark's classpath:
* `hdfs-site.xml`, which provides default behaviors for the HDFS client.
* `core-site.xml`, which sets the default filesystem name.
The location of these configuration files varies across CDH and HDP versions, but
a common location is inside of `/etc/hadoop/conf`. Some tools, such as Cloudera Manager, create
configurations on-the-fly, but offer mechanisms to download copies of them.
There are a few ways to make these files visible to Spark:
* You can copy these files into `$SPARK_HOME/conf` and they will be included in Spark's
classpath automatically.
* If you are running Spark on the same nodes as Hadoop _and_ your distribution includes both
`hdfs-site.xml` and `core-site.xml` in the same directory, you can set `HADOOP_CONF_DIR`
in `$SPARK_HOME/conf/spark-env.sh` to that directory, as sketched below.
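For example, a minimal sketch assuming the common `/etc/hadoop/conf` location mentioned above (adjust the path for your distribution):

# In $SPARK_HOME/conf/spark-env.sh
export HADOOP_CONF_DIR=/etc/hadoop/conf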


@ -46,6 +46,11 @@ Spark supports several options for deployment:
* [Apache Mesos](running-on-mesos.html)
* [Hadoop YARN](running-on-yarn.html)
There is a script, `./make-distribution.sh`, which will create a binary distribution of Spark for deployment
to any machine with only the Java runtime as a necessary dependency.
Running the script creates a distribution directory in `dist/`; pass the `-tgz` option to create a .tgz file instead.
Check the script for additional options.
# A Note About Hadoop Versions
Spark uses the Hadoop-client library to talk to HDFS and other Hadoop-supported

docs/monitoring.md (new file, 58 lines)

@ -0,0 +1,58 @@
---
layout: global
title: Monitoring and Instrumentation
---
There are several ways to monitor the progress of Spark jobs.
# Web Interfaces
When a SparkContext is initialized, it launches a web server (by default at port 3030) which
displays useful information. This includes a list of active and completed scheduler stages,
a summary of RDD blocks and partitions, and environmental information. If multiple SparkContexts
are running on the same host, they will bind to successive ports beginning with 3030 (3031, 3032,
etc).
Spark's Standalone Mode scheduler also has its own
[web interface](spark-standalone.html#monitoring-and-logging).
# Spark Metrics
Spark has a configurable metrics system based on the
[Coda Hale Metrics Library](http://metrics.codahale.com/).
This allows users to report Spark metrics to a variety of sinks including HTTP, JMX, and CSV
files. The metrics system is configured via a configuration file that Spark expects to be present
at `$SPARK_HOME/conf/metrics.conf`. A custom file location can be specified via the
`spark.metrics.conf` Java system property. Spark's metrics are decoupled into different
_instances_ corresponding to Spark components. Within each instance, you can configure a
set of sinks to which metrics are reported. The following instances are currently supported:
* `master`: The Spark standalone master process.
* `applications`: A component within the master which reports on various applications.
* `worker`: A Spark standalone worker process.
* `executor`: A Spark executor.
* `driver`: The Spark driver process (the process in which your SparkContext is created).
Each instance can report to zero or more _sinks_. Sinks are contained in the
`org.apache.spark.metrics.sink` package:
* `ConsoleSink`: Logs metrics information to the console.
* `CsvSink`: Exports metrics data to CSV files at regular intervals.
* `GangliaSink`: Sends metrics to a Ganglia node or multicast group.
* `JmxSink`: Registers metrics for viewing in a JMX console.
* `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
The syntax of the metrics configuration file is defined in an example configuration file,
`$SPARK_HOME/conf/metrics.conf.template`.
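As a rough illustration of that syntax, here is a sketch of a `metrics.conf` that follows the property names from the template (the values are examples only, not additional defaults):

# Report metrics from all instances to the console every 10 seconds
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds
# Additionally expose JVM metrics for the master instance
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource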
# Advanced Instrumentation
Several external tools can be used to help profile the performance of Spark jobs:
* Cluster-wide monitoring tools, such as [Ganglia](http://ganglia.sourceforge.net/), can provide
insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia
dashboard can quickly reveal whether a particular workload is disk bound, network bound, or
CPU bound.
* OS profiling tools such as [dstat](http://dag.wieers.com/home-made/dstat/),
[iostat](http://linux.die.net/man/1/iostat), and [iotop](http://linux.die.net/man/1/iotop)
can provide fine-grained profiling on individual nodes.
* JVM utilities such as `jstack` for providing stack traces, `jmap` for creating heap-dumps,
`jstat` for reporting time-series statistics and `jconsole` for visually exploring various JVM
properties are useful for those comfortable with JVM internals.


@ -42,7 +42,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t
The command to launch the YARN Client is as follows:
SPARK_JAR=<SPARK_YARN_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \
SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \
--jar <YOUR_APP_JAR_FILE> \
--class <APP_MAIN_CLASS> \
--args <APP_MAIN_ARGUMENTS> \
@ -54,14 +54,27 @@ The command to launch the YARN Client is as follows:
For example:
SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class org.apache.spark.deploy.yarn.Client \
--jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \
--class org.apache.spark.examples.SparkPi \
--args yarn-standalone \
--num-workers 3 \
--master-memory 4g \
--worker-memory 2g \
--worker-cores 1
# Build the Spark assembly JAR and the Spark examples JAR
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true ./sbt/sbt assembly
# Configure logging
$ cp conf/log4j.properties.template conf/log4j.properties
# Submit Spark's ApplicationMaster to YARN's ResourceManager, and instruct Spark to run the SparkPi example
$ SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
./spark-class org.apache.spark.deploy.yarn.Client \
--jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
--class org.apache.spark.examples.SparkPi \
--args yarn-standalone \
--num-workers 3 \
--master-memory 4g \
--worker-memory 2g \
--worker-cores 1
# Examine the output (replace $YARN_APP_ID in the following with the "application identifier" output by the previous command)
# (Note: YARN_APP_LOGS_DIR is usually /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version.)
$ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout
Pi is roughly 3.13794
The above starts a YARN client program which periodically polls the Application Master for status updates and displays them in the console. The client will exit once your application has finished running.


@ -3,13 +3,21 @@ layout: global
title: Spark Standalone Mode
---
In addition to running on the Mesos or YARN cluster managers, Spark also provides a simple standalone deploy mode. You can launch a standalone cluster either manually, by starting a master and workers by hand, or use our provided [deploy scripts](#cluster-launch-scripts). It is also possible to run these daemons on a single machine for testing.
In addition to running on the Mesos or YARN cluster managers, Spark also provides a simple standalone deploy mode. You can launch a standalone cluster either manually, by starting a master and workers by hand, or use our provided [launch scripts](#cluster-launch-scripts). It is also possible to run these daemons on a single machine for testing.
# Installing Spark Standalone to a Cluster
The easiest way to deploy Spark is by running the `./make-distribution.sh` script to create a binary distribution.
This distribution can be deployed to any machine with the Java runtime installed; there is no need to install Scala.
The recommended procedure is to deploy and start the master on one node first, get the master spark URL,
then modify `conf/spark-env.sh` in the `dist/` directory before deploying to all the other nodes.
# Starting a Cluster Manually
You can start a standalone master server by executing:
./spark-class org.apache.spark.deploy.master.Master
./bin/start-master.sh
Once started, the master will print out a `spark://HOST:PORT` URL for itself, which you can use to connect workers to it,
or pass as the "master" argument to `SparkContext`. You can also find this URL on
@ -22,7 +30,7 @@ Similarly, you can start one or more workers and connect them to the master via:
Once you have started a worker, look at the master's web UI ([http://localhost:8080](http://localhost:8080) by default).
You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS).
Finally, the following configuration options can be passed to the master and worker:
<table class="table">
<tr><th style="width:21%">Argument</th><th>Meaning</th></tr>
@ -55,7 +63,7 @@ Finally, the following configuration options can be passed to the master and wor
# Cluster Launch Scripts
To launch a Spark standalone cluster with the deploy scripts, you need to create a file called `conf/slaves` in your Spark directory, which should contain the hostnames of all the machines where you would like to start Spark workers, one per line. The master machine must be able to access each of the slave machines via password-less `ssh` (using a private key). For testing, you can just put `localhost` in this file.
To launch a Spark standalone cluster with the launch scripts, you need to create a file called `conf/slaves` in your Spark directory, which should contain the hostnames of all the machines where you would like to start Spark workers, one per line. The master machine must be able to access each of the slave machines via password-less `ssh` (using a private key). For testing, you can just put `localhost` in this file.
Once you've set up this file, you can launch or stop your cluster with the following shell scripts, based on Hadoop's deploy scripts, and available in `SPARK_HOME/bin`:
@ -134,6 +142,10 @@ To run an interactive Spark shell against the cluster, run the following command
MASTER=spark://IP:PORT ./spark-shell
Note that if you are running spark-shell from one of the Spark cluster machines, the `spark-shell` script will
automatically set MASTER from the `SPARK_MASTER_IP` and `SPARK_MASTER_PORT` variables in `conf/spark-env.sh`.
You can also pass an option `-c <numCores>` to control the number of cores that spark-shell uses on the cluster.
# Job Scheduling


@ -1,5 +1,22 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# These variables are automatically filled in by the spark-ec2 script.
export MASTERS="{{master_list}}"
export SLAVES="{{slave_list}}"


@ -127,20 +127,6 @@
</dependency>
</dependencies>
<profiles>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>

pom.xml (218 lines changed)

@ -368,6 +368,99 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Specify Avro version because Kafka also has it as a dependency -->
<dependency>
<groupId>org.apache.avro</groupId>
@ -620,131 +713,6 @@
<dependencyManagement>
<dependencies>
<!-- TODO: check versions, bringover from yarn branch ! -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
</profile>


@ -33,15 +33,19 @@ object SparkBuild extends Build {
// HBase version; set as appropriate.
val HBASE_VERSION = "0.94.6"
// Target JVM version
val SCALAC_JVM_VERSION = "jvm-1.5"
val JAVAC_JVM_VERSION = "1.5"
lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects: _*)
lazy val core = Project("core", file("core"), settings = coreSettings)
lazy val repl = Project("repl", file("repl"), settings = replSettings)
.dependsOn(core, bagel, mllib) dependsOn(maybeYarn: _*)
.dependsOn(core, bagel, mllib)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
.dependsOn(core, mllib, bagel, streaming) dependsOn(maybeYarn: _*)
.dependsOn(core, mllib, bagel, streaming)
lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
@ -77,7 +81,9 @@ object SparkBuild extends Build {
organization := "org.apache.spark",
version := "0.8.0-SNAPSHOT",
scalaVersion := "2.9.3",
scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"),
scalacOptions := Seq("-unchecked", "-optimize", "-deprecation",
"-target:" + SCALAC_JVM_VERSION),
javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
@ -207,6 +213,7 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
)


@ -1,7 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import sbt._
object SparkPluginDef extends Build {
lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener)
/* This is not published in a Maven repository, so we get it from GitHub directly */
lazy val junitXmlListener = uri("git://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce")
}


@ -30,6 +30,8 @@ Public classes:
An "add-only" shared variable that tasks can only add values to.
- L{SparkFiles<pyspark.files.SparkFiles>}
Access files shipped with jobs.
- L{StorageLevel<pyspark.storagelevel.StorageLevel>}
Finer-grained cache persistence levels.
"""
import sys
import os
@ -39,6 +41,7 @@ sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.eg
from pyspark.context import SparkContext
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.storagelevel import StorageLevel
__all__ = ["SparkContext", "RDD", "SparkFiles"]
__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"]


@ -27,6 +27,7 @@ from pyspark.broadcast import Broadcast
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import dump_pickle, write_with_length, batched
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from py4j.java_collections import ListConverter
@ -279,6 +280,16 @@ class SparkContext(object):
"""
self._jsc.sc().setCheckpointDir(dirName, useExisting)
def _getJavaStorageLevel(self, storageLevel):
"""
Returns a Java StorageLevel based on a pyspark.StorageLevel.
"""
if not isinstance(storageLevel, StorageLevel):
raise Exception("storageLevel must be of type pyspark.StorageLevel")
newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory,
storageLevel.deserialized, storageLevel.replication)
def _test():
import atexit


@ -70,6 +70,25 @@ class RDD(object):
self._jrdd.cache()
return self
def persist(self, storageLevel):
"""
Set this RDD's storage level to persist its values across operations after the first time
it is computed. This can only be used to assign a new storage level if the RDD does not
have a storage level set yet.
"""
self.is_cached = True
javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
self._jrdd.persist(javaStorageLevel)
return self
def unpersist(self):
"""
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
"""
self.is_cached = False
self._jrdd.unpersist()
return self
def checkpoint(self):
"""
Mark this RDD for checkpointing. It will be saved to a file inside the


@ -24,6 +24,7 @@ import os
import platform
import pyspark
from pyspark.context import SparkContext
from pyspark.storagelevel import StorageLevel
# this is the equivalent of ADD_JARS
add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None


@ -0,0 +1,43 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = ["StorageLevel"]
class StorageLevel:
"""
Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
"""
def __init__(self, useDisk, useMemory, deserialized, replication = 1):
self.useDisk = useDisk
self.useMemory = useMemory
self.deserialized = deserialized
self.replication = replication
StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)
StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, True, 2)
StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False)
StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, 2)
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, True)
StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, True, 2)
StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False)
StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, 2)
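A short usage sketch of the `persist`/`unpersist` API added to `pyspark.rdd.RDD` above, combined with these constants (the app name and data are placeholders):

from pyspark import SparkContext, StorageLevel

sc = SparkContext("local", "storage-level-example")
rdd = sc.parallelize(range(1000))
rdd.persist(StorageLevel.MEMORY_AND_DISK)   # deserialized in memory, spilling to disk if it does not fit
print(rdd.count())   # the first action computes and caches the partitions
print(rdd.count())   # later actions reuse the persisted blocks
rdd.unpersist()      # remove the blocks from memory and disk
sc.stop()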


@ -105,16 +105,6 @@
</build>
<profiles>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>deb</id>
<build>


@ -131,16 +131,4 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>


@ -30,6 +30,34 @@
<name>Spark Project YARN Support</name>
<url>http://spark.incubator.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
@ -75,37 +103,4 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>