Format the code as coding style agreed by Matei/TD/Haoyuan
This commit is contained in:
parent
b72d93a0da
commit
651932e703
|
@ -5,10 +5,13 @@ import java.io._
|
|||
import scala.collection.mutable.Map
|
||||
|
||||
class Accumulator[T] (
|
||||
@transient initialValue: T, param: AccumulatorParam[T]) extends Serializable
|
||||
{
|
||||
@transient initialValue: T,
|
||||
param: AccumulatorParam[T]
|
||||
) extends Serializable {
|
||||
|
||||
val id = Accumulators.newId
|
||||
@transient var value_ = initialValue // Current value on master
|
||||
@transient
|
||||
var value_ = initialValue // Current value on master
|
||||
val zero = param.zero(initialValue) // Zero value to be passed to workers
|
||||
var deserialized = false
|
||||
|
||||
|
@ -39,14 +42,16 @@ trait AccumulatorParam[T] extends Serializable {
|
|||
|
||||
// TODO: The multi-thread support in accumulators is kind of lame; check
|
||||
// if there's a more intuitive way of doing it right
|
||||
private object Accumulators
|
||||
{
|
||||
private object Accumulators {
|
||||
// TODO: Use soft references? => need to make readObject work properly then
|
||||
val originals = Map[Long, Accumulator[_]]()
|
||||
val localAccums = Map[Thread, Map[Long, Accumulator[_]]]()
|
||||
var lastId: Long = 0
|
||||
|
||||
def newId: Long = synchronized { lastId += 1; return lastId }
|
||||
def newId: Long = synchronized {
|
||||
lastId += 1
|
||||
return lastId
|
||||
}
|
||||
|
||||
def register(a: Accumulator[_], original: Boolean): Unit = synchronized {
|
||||
if (original) {
|
||||
|
@ -65,8 +70,9 @@ private object Accumulators
|
|||
// Get the values of the local accumulators for the current thread (by ID)
|
||||
def values: Map[Long, Any] = synchronized {
|
||||
val ret = Map[Long, Any]()
|
||||
for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map()))
|
||||
for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
|
||||
ret(id) = accum.value
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@ package spark
|
|||
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
|
||||
/**
|
||||
* An interface for caches in Spark, to allow for multiple implementations.
|
||||
* Caches are used to store both partitions of cached RDDs and broadcast
|
||||
|
@ -29,7 +28,6 @@ abstract class Cache {
|
|||
def put(key: Any, value: Any): Unit
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A key namespace in a Cache.
|
||||
*/
|
||||
|
|
|
@ -9,7 +9,6 @@ import org.objectweb.asm.{ClassReader, MethodVisitor, Type}
|
|||
import org.objectweb.asm.commons.EmptyVisitor
|
||||
import org.objectweb.asm.Opcodes._
|
||||
|
||||
|
||||
object ClosureCleaner extends Logging {
|
||||
// Get an ASM class reader for a given class from the JAR that loaded it
|
||||
private def getClassReader(cls: Class[_]): ClassReader = {
|
||||
|
@ -154,7 +153,6 @@ object ClosureCleaner extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor {
|
||||
override def visitMethod(access: Int, name: String, desc: String,
|
||||
sig: String, exceptions: Array[String]): MethodVisitor = {
|
||||
|
|
|
@ -4,18 +4,28 @@ import java.util.concurrent.LinkedBlockingQueue
|
|||
import java.util.concurrent.TimeUnit
|
||||
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
|
||||
|
||||
// A task created by the DAG scheduler. Knows its stage ID and map ouput tracker generation.
|
||||
/**
|
||||
* A task created by the DAG scheduler. Knows its stage ID and map ouput tracker generation.
|
||||
*/
|
||||
abstract class DAGTask[T](val stageId: Int) extends Task[T] {
|
||||
val gen = SparkEnv.get.mapOutputTracker.getGeneration
|
||||
override def generation: Option[Long] = Some(gen)
|
||||
}
|
||||
|
||||
// A completion event passed by the underlying task scheduler to the DAG scheduler
|
||||
case class CompletionEvent(task: DAGTask[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any])
|
||||
/**
|
||||
* A completion event passed by the underlying task scheduler to the DAG scheduler
|
||||
*/
|
||||
case class CompletionEvent(
|
||||
task: DAGTask[_],
|
||||
reason: TaskEndReason,
|
||||
result: Any,
|
||||
accumUpdates: Map[Long, Any])
|
||||
|
||||
// Various possible reasons why a DAG task ended. The underlying scheduler is supposed
|
||||
// to retry tasks several times for "ephemeral" failures, and only report back failures
|
||||
// that require some old stages to be resubmitted, such as shuffle map fetch failures.
|
||||
/**
|
||||
* Various possible reasons why a DAG task ended. The underlying scheduler is supposed to retry
|
||||
* tasks several times for "ephemeral" failures, and only report back failures that require some
|
||||
* old stages to be resubmitted, such as shuffle map fetch failures.
|
||||
*/
|
||||
sealed trait TaskEndReason
|
||||
case object Success extends TaskEndReason
|
||||
case class FetchFailed(serverUri: String, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
|
||||
|
@ -23,11 +33,10 @@ case class ExceptionFailure(exception: Throwable) extends TaskEndReason
|
|||
case class OtherFailure(message: String) extends TaskEndReason
|
||||
|
||||
/**
|
||||
* A Scheduler subclass that implements stage-oriented scheduling. It computes
|
||||
* a DAG of stages for each job, keeps track of which RDDs and stage outputs
|
||||
* are materialized, and computes a minimal schedule to run the job. Subclasses
|
||||
* only need to implement the code to send a task to the cluster and to report
|
||||
* fetch failures (the submitTasks method, and code to add CompletionEvents).
|
||||
* A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
|
||||
* each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal
|
||||
* schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
|
||||
* and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
|
||||
*/
|
||||
private trait DAGScheduler extends Scheduler with Logging {
|
||||
// Must be implemented by subclasses to start running a set of tasks
|
||||
|
@ -39,16 +48,15 @@ private trait DAGScheduler extends Scheduler with Logging {
|
|||
completionEvents.put(CompletionEvent(dagTask, reason, result, accumUpdates))
|
||||
}
|
||||
|
||||
// The time, in millis, to wait for fetch failure events to stop coming in after
|
||||
// one is detected; this is a simplistic way to avoid resubmitting tasks in the
|
||||
// non-fetchable map stage one by one as more failure events come in
|
||||
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
|
||||
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
|
||||
// as more failure events come in
|
||||
val RESUBMIT_TIMEOUT = 2000L
|
||||
|
||||
// The time, in millis, to wake up between polls of the completion queue
|
||||
// in order to potentially resubmit failed stages
|
||||
// The time, in millis, to wake up between polls of the completion queue in order to potentially
|
||||
// resubmit failed stages
|
||||
val POLL_TIMEOUT = 500L
|
||||
|
||||
|
||||
private val completionEvents = new LinkedBlockingQueue[CompletionEvent]
|
||||
|
||||
var nextStageId = 0
|
||||
|
@ -110,10 +118,8 @@ private trait DAGScheduler extends Scheduler with Logging {
|
|||
cacheTracker.registerRDD(r.id, r.splits.size)
|
||||
for (dep <- r.dependencies) {
|
||||
dep match {
|
||||
case shufDep: ShuffleDependency[_,_,_] =>
|
||||
parents += getShuffleMapStage(shufDep)
|
||||
case _ =>
|
||||
visit(dep.rdd)
|
||||
case shufDep: ShuffleDependency[_,_,_] => parents += getShuffleMapStage(shufDep)
|
||||
case _ => visit(dep.rdd)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -135,10 +141,10 @@ private trait DAGScheduler extends Scheduler with Logging {
|
|||
dep match {
|
||||
case shufDep: ShuffleDependency[_,_,_] =>
|
||||
val stage = getShuffleMapStage(shufDep)
|
||||
if (!stage.isAvailable)
|
||||
if (!stage.isAvailable) {
|
||||
missing += stage
|
||||
case narrowDep: NarrowDependency[_] =>
|
||||
visit(narrowDep.rdd)
|
||||
}
|
||||
case narrowDep: NarrowDependency[_] => visit(narrowDep.rdd)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -149,10 +155,11 @@ private trait DAGScheduler extends Scheduler with Logging {
|
|||
missing.toList
|
||||
}
|
||||
|
||||
override def runJob[T, U](finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
|
||||
partitions: Seq[Int], allowLocal: Boolean)
|
||||
(implicit m: ClassManifest[U])
|
||||
: Array[U] = {
|
||||
override def runJob[T, U](finalRdd: RDD[T],
|
||||
func: (TaskContext, Iterator[T]) => U,
|
||||
partitions: Seq[Int],
|
||||
allowLocal: Boolean
|
||||
)(implicit m: ClassManifest[U]) : Array[U] = {
|
||||
val outputParts = partitions.toArray
|
||||
val numOutputParts: Int = partitions.size
|
||||
val finalStage = newStage(finalRdd, None)
|
||||
|
@ -189,8 +196,9 @@ private trait DAGScheduler extends Scheduler with Logging {
|
|||
submitMissingTasks(stage)
|
||||
running += stage
|
||||
} else {
|
||||
for (parent <- missing)
|
||||
for (parent <- missing) {
|
||||
submitStage(parent)
|
||||
}
|
||||
waiting += stage
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,8 +27,9 @@ class Executor extends org.apache.mesos.Executor with Logging {
|
|||
override def init(d: ExecutorDriver, args: ExecutorArgs) {
|
||||
// Read spark.* system properties from executor arg
|
||||
val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray)
|
||||
for ((key, value) <- props)
|
||||
for ((key, value) <- props) {
|
||||
System.setProperty(key, value)
|
||||
}
|
||||
|
||||
// Make sure an appropriate class loader is set for remote actors
|
||||
RemoteActor.classLoader = getClass.getClassLoader
|
||||
|
@ -45,7 +46,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
|
|||
|
||||
// Start worker thread pool
|
||||
threadPool = new ThreadPoolExecutor(
|
||||
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
|
||||
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
|
||||
}
|
||||
|
||||
override def launchTask(d: ExecutorDriver, task: TaskDescription) {
|
||||
|
@ -58,9 +59,9 @@ class Executor extends org.apache.mesos.Executor with Logging {
|
|||
val tid = desc.getTaskId.getValue
|
||||
logInfo("Running task ID " + tid)
|
||||
d.sendStatusUpdate(TaskStatus.newBuilder()
|
||||
.setTaskId(desc.getTaskId)
|
||||
.setState(TaskState.TASK_RUNNING)
|
||||
.build())
|
||||
.setTaskId(desc.getTaskId)
|
||||
.setState(TaskState.TASK_RUNNING)
|
||||
.build())
|
||||
try {
|
||||
SparkEnv.set(env)
|
||||
Thread.currentThread.setContextClassLoader(classLoader)
|
||||
|
@ -72,27 +73,27 @@ class Executor extends org.apache.mesos.Executor with Logging {
|
|||
val accumUpdates = Accumulators.values
|
||||
val result = new TaskResult(value, accumUpdates)
|
||||
d.sendStatusUpdate(TaskStatus.newBuilder()
|
||||
.setTaskId(desc.getTaskId)
|
||||
.setState(TaskState.TASK_FINISHED)
|
||||
.setData(ByteString.copyFrom(Utils.serialize(result)))
|
||||
.build())
|
||||
.setTaskId(desc.getTaskId)
|
||||
.setState(TaskState.TASK_FINISHED)
|
||||
.setData(ByteString.copyFrom(Utils.serialize(result)))
|
||||
.build())
|
||||
logInfo("Finished task ID " + tid)
|
||||
} catch {
|
||||
case ffe: FetchFailedException => {
|
||||
val reason = ffe.toTaskEndReason
|
||||
d.sendStatusUpdate(TaskStatus.newBuilder()
|
||||
.setTaskId(desc.getTaskId)
|
||||
.setState(TaskState.TASK_FAILED)
|
||||
.setData(ByteString.copyFrom(Utils.serialize(reason)))
|
||||
.build())
|
||||
.setTaskId(desc.getTaskId)
|
||||
.setState(TaskState.TASK_FAILED)
|
||||
.setData(ByteString.copyFrom(Utils.serialize(reason)))
|
||||
.build())
|
||||
}
|
||||
case t: Throwable => {
|
||||
val reason = ExceptionFailure(t)
|
||||
d.sendStatusUpdate(TaskStatus.newBuilder()
|
||||
.setTaskId(desc.getTaskId)
|
||||
.setState(TaskState.TASK_FAILED)
|
||||
.setData(ByteString.copyFrom(Utils.serialize(reason)))
|
||||
.build())
|
||||
.setTaskId(desc.getTaskId)
|
||||
.setState(TaskState.TASK_FAILED)
|
||||
.setData(ByteString.copyFrom(Utils.serialize(reason)))
|
||||
.build())
|
||||
|
||||
// TODO: Handle errors in tasks less dramatically
|
||||
logError("Exception in task ID " + tid, t)
|
||||
|
@ -102,8 +103,10 @@ class Executor extends org.apache.mesos.Executor with Logging {
|
|||
}
|
||||
}
|
||||
|
||||
// Create a ClassLoader for use in tasks, adding any JARs specified by the
|
||||
// user or any classes created by the interpreter to the search path
|
||||
/**
|
||||
* Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
|
||||
* created by the interpreter to the search path
|
||||
*/
|
||||
private def createClassLoader(): ClassLoader = {
|
||||
var loader = this.getClass.getClassLoader
|
||||
|
||||
|
|
|
@ -12,9 +12,15 @@ import org.apache.hadoop.mapred.RecordReader
|
|||
import org.apache.hadoop.mapred.Reporter
|
||||
import org.apache.hadoop.util.ReflectionUtils
|
||||
|
||||
/** A Spark split class that wraps around a Hadoop InputSplit */
|
||||
class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
|
||||
extends Split with Serializable {
|
||||
/**
|
||||
* A Spark split class that wraps around a Hadoop InputSplit.
|
||||
*/
|
||||
class HadoopSplit(
|
||||
rddId: Int,
|
||||
idx: Int,
|
||||
@transient s: InputSplit
|
||||
) extends Split with Serializable {
|
||||
|
||||
val inputSplit = new SerializableWritable[InputSplit](s)
|
||||
|
||||
override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
|
||||
|
@ -22,10 +28,9 @@ extends Split with Serializable {
|
|||
override val index = idx
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in
|
||||
* HDFS, the local file system, or S3, tables in HBase, etc).
|
||||
* An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
|
||||
* system, or S3, tables in HBase, etc).
|
||||
*/
|
||||
class HadoopRDD[K, V](
|
||||
sc: SparkContext,
|
||||
|
@ -33,31 +38,37 @@ class HadoopRDD[K, V](
|
|||
inputFormatClass: Class[_ <: InputFormat[K, V]],
|
||||
keyClass: Class[K],
|
||||
valueClass: Class[V],
|
||||
minSplits: Int)
|
||||
extends RDD[(K, V)](sc) {
|
||||
minSplits: Int
|
||||
) extends RDD[(K, V)](sc) {
|
||||
|
||||
val serializableConf = new SerializableWritable(conf)
|
||||
|
||||
@transient val splits_ : Array[Split] = {
|
||||
@transient
|
||||
val splits_ : Array[Split] = {
|
||||
val inputFormat = createInputFormat(conf)
|
||||
val inputSplits = inputFormat.getSplits(conf, minSplits)
|
||||
val array = new Array[Split] (inputSplits.size)
|
||||
for (i <- 0 until inputSplits.size)
|
||||
val array = new Array[Split](inputSplits.size)
|
||||
for (i <- 0 until inputSplits.size) {
|
||||
array(i) = new HadoopSplit(id, i, inputSplits(i))
|
||||
}
|
||||
array
|
||||
}
|
||||
|
||||
def createInputFormat(conf: JobConf): InputFormat[K, V] = {
|
||||
ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
|
||||
.asInstanceOf[InputFormat[K, V]]
|
||||
.asInstanceOf[InputFormat[K, V]]
|
||||
}
|
||||
|
||||
// Helper method for creating a Hadoop Writable, because the commonly used
|
||||
// NullWritable class has no constructor
|
||||
/**
|
||||
* Helper method for creating a Hadoop Writable, because the commonly used NullWritable class has
|
||||
* no constructor.
|
||||
*/
|
||||
def createWritable[T](clazz: Class[T]): T = {
|
||||
if (clazz == classOf[NullWritable])
|
||||
if (clazz == classOf[NullWritable]) {
|
||||
NullWritable.get().asInstanceOf[T]
|
||||
else
|
||||
} else {
|
||||
clazz.newInstance()
|
||||
}
|
||||
}
|
||||
|
||||
override def splits = splits_
|
||||
|
@ -80,8 +91,7 @@ extends RDD[(K, V)](sc) {
|
|||
try {
|
||||
finished = !reader.next(key, value)
|
||||
} catch {
|
||||
case eofe: java.io.EOFException =>
|
||||
finished = true
|
||||
case eofe: java.io.EOFException => finished = true
|
||||
}
|
||||
gotNext = true
|
||||
}
|
||||
|
|
|
@ -16,11 +16,14 @@ import spark.SerializableWritable
|
|||
import spark.Logging
|
||||
|
||||
/**
|
||||
* Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should
|
||||
* also contain an output key class, an output value class, a filename to write to, etc
|
||||
* exactly like in a Hadoop job.
|
||||
* Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The
|
||||
* JobConf should also contain an output key class, an output value class, a
|
||||
* filename to write to, etc exactly like in a Hadoop job.
|
||||
*/
|
||||
class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {
|
||||
class HadoopWriter(
|
||||
@transient jobConf: JobConf
|
||||
) extends Logging with Serializable {
|
||||
|
||||
private val now = new Date()
|
||||
private val conf = new SerializableWritable(jobConf)
|
||||
|
||||
|
@ -58,22 +61,25 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl
|
|||
val outputName = "part-" + numfmt.format(splitID)
|
||||
val path = FileOutputFormat.getOutputPath(conf.value)
|
||||
val fs: FileSystem = {
|
||||
if (path != null)
|
||||
if (path != null) {
|
||||
path.getFileSystem(conf.value)
|
||||
else
|
||||
} else {
|
||||
FileSystem.get(conf.value)
|
||||
}
|
||||
}
|
||||
|
||||
getOutputCommitter().setupTask(getTaskContext())
|
||||
writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
|
||||
writer = getOutputFormat().getRecordWriter(
|
||||
fs, conf.value, outputName, Reporter.NULL)
|
||||
}
|
||||
|
||||
def write(key: AnyRef, value: AnyRef) {
|
||||
if (writer!=null) {
|
||||
//println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
|
||||
writer.write(key, value)
|
||||
} else
|
||||
} else {
|
||||
throw new IOException("Writer is null, open() has not been called")
|
||||
}
|
||||
}
|
||||
|
||||
def close() {
|
||||
|
@ -109,26 +115,31 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl
|
|||
// ********* Private Functions *********
|
||||
|
||||
private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
|
||||
if (format == null)
|
||||
format = conf.value.getOutputFormat().asInstanceOf[OutputFormat[AnyRef,AnyRef]]
|
||||
if (format == null) {
|
||||
format = conf.value.getOutputFormat()
|
||||
.asInstanceOf[OutputFormat[AnyRef,AnyRef]]
|
||||
}
|
||||
return format
|
||||
}
|
||||
|
||||
private def getOutputCommitter(): OutputCommitter = {
|
||||
if (committer == null)
|
||||
if (committer == null) {
|
||||
committer = conf.value.getOutputCommitter().asInstanceOf[OutputCommitter]
|
||||
}
|
||||
return committer
|
||||
}
|
||||
|
||||
private def getJobContext(): JobContext = {
|
||||
if (jobContext == null)
|
||||
jobContext = new JobContext(conf.value, jID.value)
|
||||
if (jobContext == null) {
|
||||
jobContext = new JobContext(conf.value, jID.value)
|
||||
}
|
||||
return jobContext
|
||||
}
|
||||
|
||||
private def getTaskContext(): TaskAttemptContext = {
|
||||
if (taskContext == null)
|
||||
if (taskContext == null) {
|
||||
taskContext = new TaskAttemptContext(conf.value, taID.value)
|
||||
}
|
||||
return taskContext
|
||||
}
|
||||
|
||||
|
@ -158,12 +169,14 @@ object HadoopWriter {
|
|||
}
|
||||
|
||||
def createPathFromString(path: String, conf: JobConf): Path = {
|
||||
if (path == null)
|
||||
if (path == null) {
|
||||
throw new IllegalArgumentException("Output path is null")
|
||||
}
|
||||
var outputPath = new Path(path)
|
||||
val fs = outputPath.getFileSystem(conf)
|
||||
if (outputPath == null || fs == null)
|
||||
if (outputPath == null || fs == null) {
|
||||
throw new IllegalArgumentException("Incorrectly formatted output path")
|
||||
}
|
||||
outputPath = outputPath.makeQualified(fs)
|
||||
return outputPath
|
||||
}
|
||||
|
|
|
@ -9,18 +9,15 @@ import org.eclipse.jetty.server.handler.HandlerList
|
|||
import org.eclipse.jetty.server.handler.ResourceHandler
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool
|
||||
|
||||
|
||||
/**
|
||||
* Exception type thrown by HttpServer when it is in the wrong state
|
||||
* for an operation.
|
||||
* Exception type thrown by HttpServer when it is in the wrong state for an operation.
|
||||
*/
|
||||
class ServerStateException(message: String) extends Exception(message)
|
||||
|
||||
|
||||
/**
|
||||
* An HTTP server for static content used to allow worker nodes to access JARs
|
||||
* added to SparkContext as well as classes created by the interpreter when
|
||||
* the user types in code. This is just a wrapper around a Jetty server.
|
||||
* An HTTP server for static content used to allow worker nodes to access JARs added to SparkContext
|
||||
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
|
||||
* around a Jetty server.
|
||||
*/
|
||||
class HttpServer(resourceBase: File) extends Logging {
|
||||
private var server: Server = null
|
||||
|
|
|
@ -4,8 +4,8 @@ import org.apache.mesos._
|
|||
import org.apache.mesos.Protos._
|
||||
|
||||
/**
|
||||
* Class representing a parallel job in MesosScheduler. Schedules the
|
||||
* job by implementing various callbacks.
|
||||
* Class representing a parallel job in MesosScheduler. Schedules the job by implementing various
|
||||
* callbacks.
|
||||
*/
|
||||
abstract class Job(jobId: Int) {
|
||||
def slaveOffer(s: Offer, availableCpus: Double): Option[TaskDescription]
|
||||
|
|
|
@ -4,9 +4,9 @@ import java.util.concurrent.Executors
|
|||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
/**
|
||||
* A simple Scheduler implementation that runs tasks locally in a thread pool.
|
||||
* Optionally the scheduler also allows each task to fail up to maxFailures times,
|
||||
* which is useful for testing fault recovery.
|
||||
* A simple Scheduler implementation that runs tasks locally in a thread pool. Optionally the
|
||||
* scheduler also allows each task to fail up to maxFailures times, which is useful for testing
|
||||
* fault recovery.
|
||||
*/
|
||||
private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGScheduler with Logging {
|
||||
var attemptId = new AtomicInteger(0)
|
||||
|
@ -35,9 +35,8 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
|
|||
// Set the Spark execution environment for the worker thread
|
||||
SparkEnv.set(env)
|
||||
try {
|
||||
// Serialize and deserialize the task so that accumulators are
|
||||
// changed to thread-local ones; this adds a bit of unnecessary
|
||||
// overhead but matches how the Mesos Executor works
|
||||
// Serialize and deserialize the task so that accumulators are changed to thread-local ones;
|
||||
// this adds a bit of unnecessary overhead but matches how the Mesos Executor works
|
||||
Accumulators.clear
|
||||
val bytes = Utils.serialize(task)
|
||||
logInfo("Size of task " + idInJob + " is " + bytes.size + " bytes")
|
||||
|
|
|
@ -19,13 +19,15 @@ import org.apache.mesos._
|
|||
import org.apache.mesos.Protos._
|
||||
|
||||
/**
|
||||
* The main Scheduler implementation, which runs jobs on Mesos. Clients should
|
||||
* first call start(), then submit tasks through the runTasks method.
|
||||
* The main Scheduler implementation, which runs jobs on Mesos. Clients should first call start(),
|
||||
* then submit tasks through the runTasks method.
|
||||
*/
|
||||
private class MesosScheduler(
|
||||
sc: SparkContext, master: String, frameworkName: String)
|
||||
extends MScheduler with DAGScheduler with Logging
|
||||
{
|
||||
sc: SparkContext,
|
||||
master: String,
|
||||
frameworkName: String
|
||||
)extends MScheduler with DAGScheduler with Logging {
|
||||
|
||||
// Environment variables to pass to our executors
|
||||
val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
|
||||
"SPARK_MEM",
|
||||
|
@ -36,14 +38,15 @@ extends MScheduler with DAGScheduler with Logging
|
|||
|
||||
// Memory used by each executor (in megabytes)
|
||||
val EXECUTOR_MEMORY = {
|
||||
if (System.getenv("SPARK_MEM") != null)
|
||||
if (System.getenv("SPARK_MEM") != null) {
|
||||
memoryStringToMb(System.getenv("SPARK_MEM"))
|
||||
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
|
||||
else
|
||||
} else {
|
||||
512
|
||||
}
|
||||
}
|
||||
|
||||
// Lock used to wait for scheduler to be registered
|
||||
// Lock used to wait for scheduler to be registered
|
||||
private var isRegistered = false
|
||||
private val registeredLock = new Object()
|
||||
|
||||
|
@ -92,13 +95,13 @@ extends MScheduler with DAGScheduler with Logging
|
|||
setDaemon(true)
|
||||
override def run {
|
||||
val sched = MesosScheduler.this
|
||||
driver = new MesosSchedulerDriver(sched, frameworkName, getExecutorInfo, master)
|
||||
driver = new MesosSchedulerDriver(
|
||||
sched, frameworkName, getExecutorInfo, master)
|
||||
try {
|
||||
val ret = driver.run()
|
||||
logInfo("driver.run() returned with code " + ret)
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
logError("driver.run() failed", e)
|
||||
case e: Exception => logError("driver.run() failed", e)
|
||||
}
|
||||
}
|
||||
}.start
|
||||
|
@ -117,17 +120,16 @@ extends MScheduler with DAGScheduler with Logging
|
|||
for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
|
||||
if (System.getenv(key) != null) {
|
||||
params.addParam(Param.newBuilder()
|
||||
.setKey("env." + key)
|
||||
.setValue(System.getenv(key))
|
||||
.build())
|
||||
.setKey("env." + key)
|
||||
.setValue(System.getenv(key))
|
||||
.build())
|
||||
}
|
||||
}
|
||||
val memory = Resource.newBuilder()
|
||||
.setName("mem")
|
||||
.setType(Resource.Type.SCALAR)
|
||||
.setScalar(Resource.Scalar.newBuilder()
|
||||
.setValue(EXECUTOR_MEMORY).build())
|
||||
.build()
|
||||
.setName("mem")
|
||||
.setType(Resource.Type.SCALAR)
|
||||
.setScalar(Resource.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
|
||||
.build()
|
||||
ExecutorInfo.newBuilder()
|
||||
.setExecutorId(ExecutorID.newBuilder().setValue("default").build())
|
||||
.setUri(execScript)
|
||||
|
@ -178,9 +180,9 @@ extends MScheduler with DAGScheduler with Logging
|
|||
}
|
||||
|
||||
/**
|
||||
* Method called by Mesos to offer resources on slaves. We resond by asking
|
||||
* our active jobs for tasks in FIFO order. We fill each node with tasks in
|
||||
* a round-robin manner so that tasks are balanced across the cluster.
|
||||
* Method called by Mesos to offer resources on slaves. We resond by asking our active jobs for
|
||||
* tasks in FIFO order. We fill each node with tasks in a round-robin manner so that tasks are
|
||||
* balanced across the cluster.
|
||||
*/
|
||||
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
|
||||
synchronized {
|
||||
|
@ -238,7 +240,8 @@ extends MScheduler with DAGScheduler with Logging
|
|||
synchronized {
|
||||
try {
|
||||
val tid = status.getTaskId.getValue
|
||||
if (status.getState == TaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
|
||||
if (status.getState == TaskState.TASK_LOST
|
||||
&& taskIdToSlaveId.contains(tid)) {
|
||||
// We lost the executor on this slave, so remember that it's gone
|
||||
slavesWithExecutors -= taskIdToSlaveId(tid)
|
||||
}
|
||||
|
@ -249,8 +252,9 @@ extends MScheduler with DAGScheduler with Logging
|
|||
}
|
||||
if (isFinished(status.getState)) {
|
||||
taskIdToJobId.remove(tid)
|
||||
if (jobTasks.contains(jobId))
|
||||
if (jobTasks.contains(jobId)) {
|
||||
jobTasks(jobId) -= tid
|
||||
}
|
||||
taskIdToSlaveId.remove(tid)
|
||||
}
|
||||
case None =>
|
||||
|
@ -346,7 +350,10 @@ extends MScheduler with DAGScheduler with Logging
|
|||
return Utils.serialize(props.toArray)
|
||||
}
|
||||
|
||||
override def frameworkMessage(d: SchedulerDriver, s: SlaveID, e: ExecutorID, b: Array[Byte]) {}
|
||||
override def frameworkMessage(d: SchedulerDriver,
|
||||
s: SlaveID,
|
||||
e: ExecutorID,
|
||||
b: Array[Byte]) {}
|
||||
|
||||
override def slaveLost(d: SchedulerDriver, s: SlaveID) {
|
||||
slavesWithExecutors.remove(s.getValue)
|
||||
|
@ -361,15 +368,16 @@ extends MScheduler with DAGScheduler with Logging
|
|||
*/
|
||||
def memoryStringToMb(str: String): Int = {
|
||||
val lower = str.toLowerCase
|
||||
if (lower.endsWith("k"))
|
||||
if (lower.endsWith("k")) {
|
||||
(lower.substring(0, lower.length-1).toLong / 1024).toInt
|
||||
else if (lower.endsWith("m"))
|
||||
} else if (lower.endsWith("m")) {
|
||||
lower.substring(0, lower.length-1).toInt
|
||||
else if (lower.endsWith("g"))
|
||||
} else if (lower.endsWith("g")) {
|
||||
lower.substring(0, lower.length-1).toInt * 1024
|
||||
else if (lower.endsWith("t"))
|
||||
} else if (lower.endsWith("t")) {
|
||||
lower.substring(0, lower.length-1).toInt * 1024 * 1024
|
||||
else // no suffix, so it's just a number in bytes
|
||||
(lower.toLong / 1024 / 1024).toInt
|
||||
} else {// no suffix, so it's just a number in bytes
|
||||
(lower.toLong / 1024 / 1024).toInt
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,15 +4,17 @@ import scala.collection.immutable.NumericRange
|
|||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
class ParallelCollectionSplit[T: ClassManifest](
|
||||
val rddId: Long, val slice: Int, values: Seq[T])
|
||||
extends Split with Serializable {
|
||||
val rddId: Long,
|
||||
val slice: Int,
|
||||
values: Seq[T]
|
||||
) extends Split with Serializable {
|
||||
|
||||
def iterator(): Iterator[T] = values.iterator
|
||||
|
||||
override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt
|
||||
|
||||
override def equals(other: Any): Boolean = other match {
|
||||
case that: ParallelCollectionSplit[_] =>
|
||||
(this.rddId == that.rddId && this.slice == that.slice)
|
||||
case that: ParallelCollectionSplit[_] => (this.rddId == that.rddId && this.slice == that.slice)
|
||||
case _ => false
|
||||
}
|
||||
|
||||
|
@ -20,13 +22,16 @@ extends Split with Serializable {
|
|||
}
|
||||
|
||||
class ParallelCollection[T: ClassManifest](
|
||||
sc: SparkContext, @transient data: Seq[T], numSlices: Int)
|
||||
extends RDD[T](sc) {
|
||||
// TODO: Right now, each split sends along its full data, even if later down
|
||||
// the RDD chain it gets cached. It might be worthwhile to write the data to
|
||||
// a file in the DFS and read it in the split instead.
|
||||
sc: SparkContext,
|
||||
@transient data: Seq[T],
|
||||
numSlices: Int
|
||||
) extends RDD[T](sc) {
|
||||
// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
|
||||
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
|
||||
// instead.
|
||||
|
||||
@transient val splits_ = {
|
||||
@transient
|
||||
val splits_ = {
|
||||
val slices = ParallelCollection.slice(data, numSlices).toArray
|
||||
slices.indices.map(i => new ParallelCollectionSplit(id, i, slices(i))).toArray
|
||||
}
|
||||
|
@ -41,17 +46,24 @@ extends RDD[T](sc) {
|
|||
}
|
||||
|
||||
private object ParallelCollection {
|
||||
// Slice a collection into numSlices sub-collections. One extra thing we do here is
|
||||
// to treat Range collections specially, encoding the slices as other Ranges to
|
||||
// minimize memory cost. This makes it efficient to run Spark over RDDs representing
|
||||
// large sets of numbers.
|
||||
/**
|
||||
* Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
|
||||
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
|
||||
* it efficient to run Spark over RDDs representing large sets of numbers.
|
||||
*/
|
||||
def slice[T: ClassManifest](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
|
||||
if (numSlices < 1)
|
||||
if (numSlices < 1) {
|
||||
throw new IllegalArgumentException("Positive number of slices required")
|
||||
}
|
||||
seq match {
|
||||
case r: Range.Inclusive => {
|
||||
val sign = if (r.step < 0) -1 else 1
|
||||
slice(new Range(r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
|
||||
val sign = if (r.step < 0) {
|
||||
-1
|
||||
} else {
|
||||
1
|
||||
}
|
||||
slice(new Range(
|
||||
r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
|
||||
}
|
||||
case r: Range => {
|
||||
(0 until numSlices).map(i => {
|
||||
|
|
|
@ -27,24 +27,26 @@ import org.apache.hadoop.mapred.TextOutputFormat
|
|||
import SparkContext._
|
||||
|
||||
/**
|
||||
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents
|
||||
* an immutable, partitioned collection of elements that can be operated on in parallel.
|
||||
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
|
||||
* partitioned collection of elements that can be operated on in parallel.
|
||||
*
|
||||
* Each RDD is characterized by five main properties:
|
||||
* - A list of splits (partitions)
|
||||
* - A function for computing each split
|
||||
* - A list of dependencies on other RDDs
|
||||
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
|
||||
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for HDFS)
|
||||
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
|
||||
* HDFS)
|
||||
*
|
||||
* All the scheduling and execution in Spark is done based on these methods, allowing each
|
||||
* RDD to implement its own way of computing itself.
|
||||
* All the scheduling and execution in Spark is done based on these methods, allowing each RDD to
|
||||
* implement its own way of computing itself.
|
||||
*
|
||||
* This class also contains transformation methods available on all RDDs (e.g. map and filter).
|
||||
* In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs,
|
||||
* and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles.
|
||||
* This class also contains transformation methods available on all RDDs (e.g. map and filter). In
|
||||
* addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs, and
|
||||
* SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles.
|
||||
*/
|
||||
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serializable {
|
||||
|
||||
// Methods that must be implemented by subclasses
|
||||
def splits: Array[Split]
|
||||
def compute(split: Split): Iterator[T]
|
||||
|
@ -100,19 +102,16 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
|
|||
|
||||
if (initialCount > Integer.MAX_VALUE) {
|
||||
maxSelected = Integer.MAX_VALUE
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
maxSelected = initialCount.toInt
|
||||
}
|
||||
|
||||
if (num > initialCount) {
|
||||
total = maxSelected
|
||||
fraction = Math.min(multiplier*(maxSelected+1)/initialCount, 1.0)
|
||||
}
|
||||
else if (num < 0) {
|
||||
} else if (num < 0) {
|
||||
throw(new IllegalArgumentException("Negative number of elements requested"))
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
fraction = Math.min(multiplier*(num+1)/initialCount, 1.0)
|
||||
total = num.toInt
|
||||
}
|
||||
|
@ -134,22 +133,18 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
|
|||
|
||||
def glom(): RDD[Array[T]] = new GlommedRDD(this)
|
||||
|
||||
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
|
||||
new CartesianRDD(sc, this, other)
|
||||
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
|
||||
|
||||
def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
|
||||
val cleanF = sc.clean(f)
|
||||
this.map(t => (cleanF(t), t)).groupByKey(numSplits)
|
||||
}
|
||||
|
||||
def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
|
||||
groupBy[K](f, sc.defaultParallelism)
|
||||
def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
|
||||
|
||||
def pipe(command: String): RDD[String] =
|
||||
new PipedRDD(this, command)
|
||||
def pipe(command: String): RDD[String] = new PipedRDD(this, command)
|
||||
|
||||
def pipe(command: Seq[String]): RDD[String] =
|
||||
new PipedRDD(this, command)
|
||||
def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
|
||||
|
||||
def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] =
|
||||
new MapPartitionsRDD(this, sc.clean(f))
|
||||
|
@ -169,26 +164,29 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
|
|||
def reduce(f: (T, T) => T): T = {
|
||||
val cleanF = sc.clean(f)
|
||||
val reducePartition: Iterator[T] => Option[T] = iter => {
|
||||
if (iter.hasNext)
|
||||
if (iter.hasNext) {
|
||||
Some(iter.reduceLeft(cleanF))
|
||||
else
|
||||
}else {
|
||||
None
|
||||
}
|
||||
}
|
||||
val options = sc.runJob(this, reducePartition)
|
||||
val results = new ArrayBuffer[T]
|
||||
for (opt <- options; elem <- opt)
|
||||
for (opt <- options; elem <- opt) {
|
||||
results += elem
|
||||
if (results.size == 0)
|
||||
}
|
||||
if (results.size == 0) {
|
||||
throw new UnsupportedOperationException("empty collection")
|
||||
else
|
||||
} else {
|
||||
return results.reduceLeft(cleanF)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate the elements of each partition, and then the results for all the
|
||||
* partitions, using a given associative function and a neutral "zero value".
|
||||
* The function op(t1, t2) is allowed to modify t1 and return it as its result
|
||||
* value to avoid object allocation; however, it should not modify t2.
|
||||
* Aggregate the elements of each partition, and then the results for all the partitions, using a
|
||||
* given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
|
||||
* modify t1 and return it as its result value to avoid object allocation; however, it should not
|
||||
* modify t2.
|
||||
*/
|
||||
def fold(zeroValue: T)(op: (T, T) => T): T = {
|
||||
val cleanOp = sc.clean(op)
|
||||
|
@ -197,19 +195,20 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
|
|||
}
|
||||
|
||||
/**
|
||||
* Aggregate the elements of each partition, and then the results for all the
|
||||
* partitions, using given combine functions and a neutral "zero value". This
|
||||
* function can return a different result type, U, than the type of this RDD, T.
|
||||
* Thus, we need one operation for merging a T into an U and one operation for
|
||||
* merging two U's, as in scala.TraversableOnce. Both of these functions are
|
||||
* allowed to modify and return their first argument instead of creating a new U
|
||||
* to avoid memory allocation.
|
||||
* Aggregate the elements of each partition, and then the results for all the partitions, using
|
||||
* given combine functions and a neutral "zero value". This function can return a different result
|
||||
* type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
|
||||
* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
|
||||
* allowed to modify and return their first argument instead of creating a new U to avoid memory
|
||||
* allocation.
|
||||
*/
|
||||
def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
|
||||
def aggregate[U: ClassManifest](zeroValue: U)(
|
||||
seqOp: (U, T) => U,
|
||||
combOp: (U, U) => U): U = {
|
||||
val cleanSeqOp = sc.clean(seqOp)
|
||||
val cleanCombOp = sc.clean(combOp)
|
||||
val results = sc.runJob(this,
|
||||
(iter: Iterator[T]) => iter.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
|
||||
(iter: Iterator[T]) => iter.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
|
||||
return results.fold(zeroValue)(cleanCombOp)
|
||||
}
|
||||
|
||||
|
@ -226,12 +225,15 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
|
|||
|
||||
def toArray(): Array[T] = collect()
|
||||
|
||||
// Take the first num elements of the RDD. This currently scans the partitions
|
||||
// *one by one*, so it will be slow if a lot of partitions are required. In that
|
||||
// case, use collect() to get the whole RDD instead.
|
||||
/**
|
||||
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
|
||||
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
|
||||
* whole RDD instead.
|
||||
*/
|
||||
def take(num: Int): Array[T] = {
|
||||
if (num == 0)
|
||||
if (num == 0) {
|
||||
return new Array[T](0)
|
||||
}
|
||||
val buf = new ArrayBuffer[T]
|
||||
var p = 0
|
||||
while (buf.size < num && p < splits.size) {
|
||||
|
@ -251,48 +253,57 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
|
|||
}
|
||||
|
||||
def saveAsTextFile(path: String) {
|
||||
this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
|
||||
this.map(x => (NullWritable.get(), new Text(x.toString)))
|
||||
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
|
||||
}
|
||||
|
||||
def saveAsObjectFile(path: String) {
|
||||
this.glom.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)
|
||||
this.glom
|
||||
.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
|
||||
.saveAsSequenceFile(path)
|
||||
}
|
||||
}
|
||||
|
||||
class MappedRDD[U: ClassManifest, T: ClassManifest](
|
||||
prev: RDD[T], f: T => U)
|
||||
extends RDD[U](prev.context) {
|
||||
prev: RDD[T],
|
||||
f: T => U
|
||||
) extends RDD[U](prev.context) {
|
||||
override def splits = prev.splits
|
||||
override val dependencies = List(new OneToOneDependency(prev))
|
||||
override def compute(split: Split) = prev.iterator(split).map(f)
|
||||
}
|
||||
|
||||
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
|
||||
prev: RDD[T], f: T => Traversable[U])
|
||||
extends RDD[U](prev.context) {
|
||||
prev: RDD[T],
|
||||
f: T => Traversable[U]
|
||||
) extends RDD[U](prev.context) {
|
||||
override def splits = prev.splits
|
||||
override val dependencies = List(new OneToOneDependency(prev))
|
||||
override def compute(split: Split) = prev.iterator(split).flatMap(f)
|
||||
}
|
||||
|
||||
class FilteredRDD[T: ClassManifest](
|
||||
prev: RDD[T], f: T => Boolean)
|
||||
extends RDD[T](prev.context) {
|
||||
prev: RDD[T],
|
||||
f: T => Boolean
|
||||
) extends RDD[T](prev.context) {
|
||||
override def splits = prev.splits
|
||||
override val dependencies = List(new OneToOneDependency(prev))
|
||||
override def compute(split: Split) = prev.iterator(split).filter(f)
|
||||
override def compute(split: Split) =
|
||||
prev.iterator(split).filter(f)
|
||||
}
|
||||
|
||||
class GlommedRDD[T: ClassManifest](prev: RDD[T])
|
||||
extends RDD[Array[T]](prev.context) {
|
||||
class GlommedRDD[T: ClassManifest](
|
||||
prev: RDD[T]
|
||||
) extends RDD[Array[T]](prev.context) {
|
||||
override def splits = prev.splits
|
||||
override val dependencies = List(new OneToOneDependency(prev))
|
||||
override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
|
||||
}
|
||||
|
||||
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
|
||||
prev: RDD[T], f: Iterator[T] => Iterator[U])
|
||||
extends RDD[U](prev.context) {
|
||||
prev: RDD[T],
|
||||
f: Iterator[T] => Iterator[U]
|
||||
) extends RDD[U](prev.context) {
|
||||
override def splits = prev.splits
|
||||
override val dependencies = List(new OneToOneDependency(prev))
|
||||
override def compute(split: Split) = f(prev.iterator(split))
|
||||
|
|
|
@ -7,16 +7,24 @@ class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Seriali
|
|||
}
|
||||
|
||||
class SampledRDD[T: ClassManifest](
|
||||
prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int)
|
||||
extends RDD[T](prev.context) {
|
||||
prev: RDD[T],
|
||||
withReplacement: Boolean,
|
||||
frac: Double,
|
||||
seed: Int
|
||||
) extends RDD[T](prev.context) {
|
||||
|
||||
@transient val splits_ = { val rg = new Random(seed); prev.splits.map(x => new SampledRDDSplit(x, rg.nextInt)) }
|
||||
@transient
|
||||
val splits_ = {
|
||||
val rg = new Random(seed);
|
||||
prev.splits.map(x => new SampledRDDSplit(x, rg.nextInt))
|
||||
}
|
||||
|
||||
override def splits = splits_.asInstanceOf[Array[Split]]
|
||||
|
||||
override val dependencies = List(new OneToOneDependency(prev))
|
||||
|
||||
override def preferredLocations(split: Split) = prev.preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
|
||||
override def preferredLocations(split: Split) =
|
||||
prev.preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
|
||||
|
||||
override def compute(splitIn: Split) = {
|
||||
val split = splitIn.asInstanceOf[SampledRDDSplit]
|
||||
|
@ -25,11 +33,13 @@ extends RDD[T](prev.context) {
|
|||
if (withReplacement) {
|
||||
val oldData = prev.iterator(split.prev).toArray
|
||||
val sampleSize = (oldData.size * frac).ceil.toInt
|
||||
val sampledData = for (i <- 1 to sampleSize) yield oldData(rg.nextInt(oldData.size)) // all of oldData's indices are candidates, even if sampleSize < oldData.size
|
||||
val sampledData = {
|
||||
// all of oldData's indices are candidates, even if sampleSize < oldData.size
|
||||
for (i <- 1 to sampleSize)
|
||||
yield oldData(rg.nextInt(oldData.size))
|
||||
}
|
||||
sampledData.iterator
|
||||
}
|
||||
// Sampling without replacement
|
||||
else {
|
||||
} else { // Sampling without replacement
|
||||
prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,17 +1,24 @@
|
|||
package spark
|
||||
|
||||
// Scheduler trait, implemented by both MesosScheduler and LocalScheduler.
|
||||
/**
|
||||
* Scheduler trait, implemented by both MesosScheduler and LocalScheduler.
|
||||
*/
|
||||
private trait Scheduler {
|
||||
def start()
|
||||
|
||||
// Wait for registration with Mesos.
|
||||
def waitForRegister()
|
||||
|
||||
// Run a function on some partitions of an RDD, returning an array of results. The allowLocal flag specifies
|
||||
// whether the scheduler is allowed to run the job on the master machine rather than shipping it to the cluster,
|
||||
// for actions that create short jobs such as first() and take().
|
||||
def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
|
||||
partitions: Seq[Int], allowLocal: Boolean): Array[U]
|
||||
/**
|
||||
* Run a function on some partitions of an RDD, returning an array of results. The allowLocal
|
||||
* flag specifies whether the scheduler is allowed to run the job on the master machine rather
|
||||
* than shipping it to the cluster, for actions that create short jobs such as first() and take().
|
||||
*/
|
||||
def runJob[T, U: ClassManifest](
|
||||
rdd: RDD[T],
|
||||
func: (TaskContext, Iterator[T]) => U,
|
||||
partitions: Seq[Int],
|
||||
allowLocal: Boolean): Array[U]
|
||||
|
||||
def stop()
|
||||
|
||||
|
|
|
@ -61,7 +61,8 @@ extends Job(jobId) with Logging
|
|||
var causeOfFailure = ""
|
||||
|
||||
// How frequently to reprint duplicate exceptions in full, in milliseconds
|
||||
val EXCEPTION_PRINT_INTERVAL = System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
|
||||
val EXCEPTION_PRINT_INTERVAL =
|
||||
System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
|
||||
// Map of recent exceptions (identified by string representation and
|
||||
// top stack frame) to duplicate count (how many times the same
|
||||
// exception has appeared) and time the full exception was
|
||||
|
@ -171,12 +172,12 @@ extends Job(jobId) with Logging
|
|||
logDebug("Serialized size: " + serializedTask.size)
|
||||
val taskName = "task %d:%d".format(jobId, index)
|
||||
return Some(TaskDescription.newBuilder()
|
||||
.setTaskId(taskId)
|
||||
.setSlaveId(offer.getSlaveId)
|
||||
.setName(taskName)
|
||||
.addResources(cpuRes)
|
||||
.setData(ByteString.copyFrom(serializedTask))
|
||||
.build())
|
||||
.setTaskId(taskId)
|
||||
.setSlaveId(offer.getSlaveId)
|
||||
.setName(taskName)
|
||||
.addResources(cpuRes)
|
||||
.setData(ByteString.copyFrom(serializedTask))
|
||||
.build())
|
||||
}
|
||||
case _ =>
|
||||
}
|
||||
|
|
|
@ -31,29 +31,32 @@ import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
|
|||
import spark.broadcast._
|
||||
|
||||
class SparkContext(
|
||||
master: String,
|
||||
frameworkName: String,
|
||||
val sparkHome: String = null,
|
||||
val jars: Seq[String] = Nil)
|
||||
extends Logging {
|
||||
master: String,
|
||||
frameworkName: String,
|
||||
val sparkHome: String = null,
|
||||
val jars: Seq[String] = Nil
|
||||
) extends Logging {
|
||||
// Ensure logging is initialized before we spawn any threads
|
||||
initLogging()
|
||||
|
||||
// Set Spark master host and port system properties
|
||||
if (System.getProperty("spark.master.host") == null)
|
||||
if (System.getProperty("spark.master.host") == null) {
|
||||
System.setProperty("spark.master.host", Utils.localHostName)
|
||||
if (System.getProperty("spark.master.port") == null)
|
||||
}
|
||||
if (System.getProperty("spark.master.port") == null) {
|
||||
System.setProperty("spark.master.port", "7077")
|
||||
}
|
||||
|
||||
// Make sure a proper class loader is set for remote actors (unless user set one)
|
||||
if (RemoteActor.classLoader == null)
|
||||
if (RemoteActor.classLoader == null) {
|
||||
RemoteActor.classLoader = getClass.getClassLoader
|
||||
}
|
||||
|
||||
// Create the Spark execution environment (cache, map output tracker, etc)
|
||||
val env = SparkEnv.createFromSystemProperties(true)
|
||||
SparkEnv.set(env)
|
||||
Broadcast.initialize(true)
|
||||
|
||||
|
||||
// Create and start the scheduler
|
||||
private var scheduler: Scheduler = {
|
||||
// Regular expression used for local[N] master format
|
||||
|
@ -61,10 +64,8 @@ extends Logging {
|
|||
// Regular expression for local[N, maxRetries], used in tests with failing tasks
|
||||
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
|
||||
master match {
|
||||
case "local" =>
|
||||
new LocalScheduler(1, 0)
|
||||
case LOCAL_N_REGEX(threads) =>
|
||||
new LocalScheduler(threads.toInt, 0)
|
||||
case "local" => new LocalScheduler(1, 0)
|
||||
case LOCAL_N_REGEX(threads) => new LocalScheduler(threads.toInt, 0)
|
||||
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
|
||||
new LocalScheduler(threads.toInt, maxFailures.toInt)
|
||||
case _ =>
|
||||
|
@ -78,11 +79,19 @@ extends Logging {
|
|||
|
||||
// Methods for creating RDDs
|
||||
|
||||
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] =
|
||||
def parallelize[T: ClassManifest](
|
||||
seq: Seq[T],
|
||||
numSlices: Int = defaultParallelism
|
||||
): RDD[T] = {
|
||||
new ParallelCollection[T](this, seq, numSlices)
|
||||
}
|
||||
|
||||
def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] =
|
||||
def makeRDD[T: ClassManifest](
|
||||
seq: Seq[T],
|
||||
numSlices: Int = defaultParallelism
|
||||
): RDD[T] = {
|
||||
parallelize(seq, numSlices)
|
||||
}
|
||||
|
||||
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
|
||||
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
|
||||
|
@ -90,26 +99,28 @@ extends Logging {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving
|
||||
* its InputFormat and any other necessary info (e.g. file name for a
|
||||
* filesystem-based dataset, table name for HyperTable, etc).
|
||||
* Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
|
||||
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
|
||||
* etc).
|
||||
*/
|
||||
def hadoopRDD[K, V](conf: JobConf,
|
||||
inputFormatClass: Class[_ <: InputFormat[K, V]],
|
||||
keyClass: Class[K],
|
||||
valueClass: Class[V],
|
||||
minSplits: Int = defaultMinSplits)
|
||||
: RDD[(K, V)] = {
|
||||
def hadoopRDD[K, V](
|
||||
conf: JobConf,
|
||||
inputFormatClass: Class[_ <: InputFormat[K, V]],
|
||||
keyClass: Class[K],
|
||||
valueClass: Class[V],
|
||||
minSplits: Int = defaultMinSplits
|
||||
): RDD[(K, V)] = {
|
||||
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
|
||||
}
|
||||
|
||||
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
|
||||
def hadoopFile[K, V](path: String,
|
||||
inputFormatClass: Class[_ <: InputFormat[K, V]],
|
||||
keyClass: Class[K],
|
||||
valueClass: Class[V],
|
||||
minSplits: Int = defaultMinSplits)
|
||||
: RDD[(K, V)] = {
|
||||
def hadoopFile[K, V](
|
||||
path: String,
|
||||
inputFormatClass: Class[_ <: InputFormat[K, V]],
|
||||
keyClass: Class[K],
|
||||
valueClass: Class[V],
|
||||
minSplits: Int = defaultMinSplits
|
||||
) : RDD[(K, V)] = {
|
||||
val conf = new JobConf()
|
||||
FileInputFormat.setInputPaths(conf, path)
|
||||
val bufferSize = System.getProperty("spark.buffer.size", "65536")
|
||||
|
@ -118,15 +129,17 @@ extends Logging {
|
|||
}
|
||||
|
||||
/**
|
||||
* Smarter version of hadoopFile() that uses class manifests to figure out
|
||||
* the classes of keys, values and the InputFormat so that users don't need
|
||||
* to pass them directly.
|
||||
* Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
|
||||
* values and the InputFormat so that users don't need to pass them directly.
|
||||
*/
|
||||
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
|
||||
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
|
||||
: RDD[(K, V)] = {
|
||||
hadoopFile(path, fm.erasure.asInstanceOf[Class[F]], km.erasure.asInstanceOf[Class[K]],
|
||||
vm.erasure.asInstanceOf[Class[V]], minSplits)
|
||||
hadoopFile(path,
|
||||
fm.erasure.asInstanceOf[Class[F]],
|
||||
km.erasure.asInstanceOf[Class[K]],
|
||||
vm.erasure.asInstanceOf[Class[V]],
|
||||
minSplits)
|
||||
}
|
||||
|
||||
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
|
||||
|
@ -136,31 +149,34 @@ extends Logging {
|
|||
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
|
||||
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
|
||||
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = {
|
||||
val job = new NewHadoopJob
|
||||
NewFileInputFormat.addInputPath(job, new Path(path))
|
||||
val conf = job.getConfiguration
|
||||
newAPIHadoopFile(path,
|
||||
fm.erasure.asInstanceOf[Class[F]],
|
||||
km.erasure.asInstanceOf[Class[K]],
|
||||
vm.erasure.asInstanceOf[Class[V]],
|
||||
conf)
|
||||
val job = new NewHadoopJob
|
||||
NewFileInputFormat.addInputPath(job, new Path(path))
|
||||
val conf = job.getConfiguration
|
||||
newAPIHadoopFile(path,
|
||||
fm.erasure.asInstanceOf[Class[F]],
|
||||
km.erasure.asInstanceOf[Class[K]],
|
||||
vm.erasure.asInstanceOf[Class[V]],
|
||||
conf)
|
||||
}
|
||||
|
||||
/** Get an RDD for a given Hadoop file with an arbitrary new API InputFormat and extra
|
||||
* configuration options to pass to the input format.
|
||||
/**
|
||||
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
|
||||
* and extra configuration options to pass to the input format.
|
||||
*/
|
||||
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String,
|
||||
fClass: Class[F],
|
||||
kClass: Class[K],
|
||||
vClass: Class[V],
|
||||
conf: Configuration): RDD[(K, V)] =
|
||||
new NewHadoopRDD(this, fClass, kClass, vClass, conf)
|
||||
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
|
||||
path: String,
|
||||
fClass: Class[F],
|
||||
kClass: Class[K],
|
||||
vClass: Class[V],
|
||||
conf: Configuration
|
||||
): RDD[(K, V)] = new NewHadoopRDD(this, fClass, kClass, vClass, conf)
|
||||
|
||||
/** Get an RDD for a Hadoop SequenceFile with given key and value types */
|
||||
def sequenceFile[K, V](path: String,
|
||||
keyClass: Class[K],
|
||||
valueClass: Class[V],
|
||||
minSplits: Int): RDD[(K, V)] = {
|
||||
keyClass: Class[K],
|
||||
valueClass: Class[V],
|
||||
minSplits: Int
|
||||
): RDD[(K, V)] = {
|
||||
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
|
||||
hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
|
||||
}
|
||||
|
@ -169,41 +185,49 @@ extends Logging {
|
|||
sequenceFile(path, keyClass, valueClass, defaultMinSplits)
|
||||
|
||||
/**
|
||||
* Version of sequenceFile() for types implicitly convertible to Writables through a WritableConverter.
|
||||
* Version of sequenceFile() for types implicitly convertible to Writables
|
||||
* through a WritableConverter.
|
||||
*
|
||||
* WritableConverters are provided in a somewhat strange way (by an implicit function) to support both
|
||||
* subclasses of Writable and types for which we define a converter (e.g. Int to IntWritable). The most
|
||||
* natural thing would've been to have implicit objects for the converters, but then we couldn't have
|
||||
* an object for every subclass of Writable (you can't have a parameterized singleton object). We use
|
||||
* functions instead to create a new converter for the appropriate type. In addition, we pass the converter
|
||||
* a ClassManifest of its type to allow it to figure out the Writable class to use in the subclass case.
|
||||
* WritableConverters are provided in a somewhat strange way (by an implicit
|
||||
* function) to support both subclasses of Writable and types for which we
|
||||
* define a converter (e.g. Int to IntWritable). The most natural thing
|
||||
* would've been to have implicit objects for the converters, but then we
|
||||
* couldn't have an object for every subclass of Writable (you can't have a
|
||||
* parameterized singleton object). We use functions instead to create a new
|
||||
* converter for the appropriate type. In addition, we pass the converter a
|
||||
* ClassManifest of its type to allow it to figure out the Writable class to
|
||||
* use in the subclass case.
|
||||
*/
|
||||
def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
|
||||
(implicit km: ClassManifest[K], vm: ClassManifest[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
|
||||
(implicit km: ClassManifest[K], vm: ClassManifest[V],
|
||||
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
|
||||
: RDD[(K, V)] = {
|
||||
val kc = kcf()
|
||||
val vc = vcf()
|
||||
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
|
||||
val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]],
|
||||
vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
|
||||
val writables = hadoopFile(path, format,
|
||||
kc.writableClass(km).asInstanceOf[Class[Writable]],
|
||||
vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
|
||||
writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys
|
||||
* and BytesWritable values that contain a serialized partition. This is still an experimental
|
||||
* storage format and may not be supported exactly as is in future Spark releases. It will also
|
||||
* be pretty slow if you use the default serializer (Java serialization), though the nice thing
|
||||
* about it is that there's very little effort required to save arbitrary objects.
|
||||
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
|
||||
* BytesWritable values that contain a serialized partition. This is still an experimental storage
|
||||
* format and may not be supported exactly as is in future Spark releases. It will also be pretty
|
||||
* slow if you use the default serializer (Java serialization), though the nice thing about it is
|
||||
* that there's very little effort required to save arbitrary objects.
|
||||
*/
|
||||
def objectFile[T: ClassManifest](path: String, minSplits: Int = defaultMinSplits): RDD[T] = {
|
||||
def objectFile[T: ClassManifest](
|
||||
path: String,
|
||||
minSplits: Int = defaultMinSplits
|
||||
): RDD[T] = {
|
||||
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
|
||||
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
|
||||
}
|
||||
|
||||
/** Build the union of a list of RDDs. */
|
||||
def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
|
||||
new UnionRDD(this, rdds)
|
||||
def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = new UnionRDD(this, rdds)
|
||||
|
||||
// Methods for creating shared variables
|
||||
|
||||
|
@ -211,18 +235,17 @@ extends Logging {
|
|||
new Accumulator(initialValue, param)
|
||||
|
||||
// Keep around a weak hash map of values to Cached versions?
|
||||
def broadcast[T](value: T) =
|
||||
Broadcast.getBroadcastFactory.newBroadcast[T] (value, isLocal)
|
||||
def broadcast[T](value: T) = Broadcast.getBroadcastFactory.newBroadcast[T] (value, isLocal)
|
||||
|
||||
// Stop the SparkContext
|
||||
def stop() {
|
||||
scheduler.stop()
|
||||
scheduler = null
|
||||
// TODO: Broadcast.stop(), Cache.stop()?
|
||||
env.mapOutputTracker.stop()
|
||||
env.cacheTracker.stop()
|
||||
env.shuffleFetcher.stop()
|
||||
SparkEnv.set(null)
|
||||
scheduler.stop()
|
||||
scheduler = null
|
||||
// TODO: Broadcast.stop(), Cache.stop()?
|
||||
env.mapOutputTracker.stop()
|
||||
env.cacheTracker.stop()
|
||||
env.shuffleFetcher.stop()
|
||||
SparkEnv.set(null)
|
||||
}
|
||||
|
||||
// Wait for the scheduler to be registered
|
||||
|
@ -234,25 +257,29 @@ extends Logging {
|
|||
// or the spark.home Java property, or the SPARK_HOME environment variable
|
||||
// (in that order of preference). If neither of these is set, return None.
|
||||
def getSparkHome(): Option[String] = {
|
||||
if (sparkHome != null)
|
||||
if (sparkHome != null) {
|
||||
Some(sparkHome)
|
||||
else if (System.getProperty("spark.home") != null)
|
||||
} else if (System.getProperty("spark.home") != null) {
|
||||
Some(System.getProperty("spark.home"))
|
||||
else if (System.getenv("SPARK_HOME") != null)
|
||||
} else if (System.getenv("SPARK_HOME") != null) {
|
||||
Some(System.getenv("SPARK_HOME"))
|
||||
else
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a function on a given set of partitions in an RDD and return the results.
|
||||
* This is the main entry point to the scheduler, by which all actions get launched.
|
||||
* The allowLocal flag specifies whether the scheduler can run the computation on the
|
||||
* master rather than shipping it out to the cluster, for short actions like first().
|
||||
* Run a function on a given set of partitions in an RDD and return the results. This is the main
|
||||
* entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies
|
||||
* whether the scheduler can run the computation on the master rather than shipping it out to the
|
||||
* cluster, for short actions like first().
|
||||
*/
|
||||
def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
|
||||
partitions: Seq[Int], allowLocal: Boolean)
|
||||
: Array[U] = {
|
||||
def runJob[T, U: ClassManifest](
|
||||
rdd: RDD[T],
|
||||
func: (TaskContext, Iterator[T]) => U,
|
||||
partitions: Seq[Int],
|
||||
allowLocal: Boolean
|
||||
): Array[U] = {
|
||||
logInfo("Starting job...")
|
||||
val start = System.nanoTime
|
||||
val result = scheduler.runJob(rdd, func, partitions, allowLocal)
|
||||
|
@ -260,22 +287,23 @@ extends Logging {
|
|||
result
|
||||
}
|
||||
|
||||
def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int],
|
||||
allowLocal: Boolean)
|
||||
: Array[U] = {
|
||||
def runJob[T, U: ClassManifest](
|
||||
rdd: RDD[T],
|
||||
func: Iterator[T] => U,
|
||||
partitions: Seq[Int],
|
||||
allowLocal: Boolean
|
||||
): Array[U] = {
|
||||
runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a job on all partitions in an RDD and return the results in an array.
|
||||
*/
|
||||
def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U)
|
||||
: Array[U] = {
|
||||
def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
|
||||
runJob(rdd, func, 0 until rdd.splits.size, false)
|
||||
}
|
||||
|
||||
def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U)
|
||||
: Array[U] = {
|
||||
def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
|
||||
runJob(rdd, func, 0 until rdd.splits.size, false)
|
||||
}
|
||||
|
||||
|
@ -306,10 +334,9 @@ extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The SparkContext object contains a number of implicit conversions and
|
||||
* parameters for use with various Spark features.
|
||||
* The SparkContext object contains a number of implicit conversions and parameters for use with
|
||||
* various Spark features.
|
||||
*/
|
||||
object SparkContext {
|
||||
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
|
||||
|
@ -323,7 +350,6 @@ object SparkContext {
|
|||
}
|
||||
|
||||
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
|
||||
|
||||
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
|
||||
new PairRDDFunctions(rdd)
|
||||
|
||||
|
@ -346,13 +372,14 @@ object SparkContext {
|
|||
|
||||
implicit def stringToText(s: String) = new Text(s)
|
||||
|
||||
private implicit def arrayToArrayWritable[T <% Writable: ClassManifest] (arr: Traversable[T]): ArrayWritable = {
|
||||
private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
|
||||
def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
|
||||
val c = {
|
||||
if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
|
||||
if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
|
||||
classManifest[T].erasure
|
||||
else
|
||||
} else {
|
||||
implicitly[T => Writable].getClass.getMethods()(0).getReturnType
|
||||
}
|
||||
// TODO: use something like WritableConverter to avoid reflection
|
||||
}
|
||||
c.asInstanceOf[Class[ _ <: Writable]]
|
||||
|
@ -360,11 +387,11 @@ object SparkContext {
|
|||
|
||||
def anyToWritable[U <% Writable](u: U): Writable = u
|
||||
|
||||
new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], arr.map(x => anyToWritable(x)).toArray)
|
||||
new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
|
||||
arr.map(x => anyToWritable(x)).toArray)
|
||||
}
|
||||
|
||||
// Helper objects for converting common types to Writable
|
||||
|
||||
private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = {
|
||||
val wClass = classManifest[W].erasure.asInstanceOf[Class[W]]
|
||||
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
|
||||
|
|
|
@ -30,8 +30,10 @@ object SparkEnv {
|
|||
|
||||
val mapOutputTracker = new MapOutputTracker(isMaster)
|
||||
|
||||
val shuffleFetcherClass = System.getProperty("spark.shuffle.fetcher", "spark.SimpleShuffleFetcher")
|
||||
val shuffleFetcher = Class.forName(shuffleFetcherClass).newInstance().asInstanceOf[ShuffleFetcher]
|
||||
val shuffleFetcherClass =
|
||||
System.getProperty("spark.shuffle.fetcher", "spark.SimpleShuffleFetcher")
|
||||
val shuffleFetcher =
|
||||
Class.forName(shuffleFetcherClass).newInstance().asInstanceOf[ShuffleFetcher]
|
||||
|
||||
new SparkEnv(cache, serializer, cacheTracker, mapOutputTracker, shuffleFetcher)
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ package spark
|
|||
class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) extends Serializable
|
||||
|
||||
abstract class Task[T] extends Serializable {
|
||||
def run (id: Int): T
|
||||
def run(id: Int): T
|
||||
def preferredLocations: Seq[String] = Nil
|
||||
def generation: Option[Long] = None
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ object SparkKMeans {
|
|||
while(tempDist > convergeDist) {
|
||||
var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
|
||||
|
||||
var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1+y2)}
|
||||
var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
|
||||
|
||||
var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collect()
|
||||
|
||||
|
|
Loading…
Reference in a new issue