Merge remote-tracking branch 'apache/master' into error-handling

Tathagata Das 2014-01-11 23:40:57 -08:00
commit 18f4889d96
30 changed files with 1350 additions and 177 deletions

View file

@ -17,7 +17,7 @@
package org.apache.spark
import org.apache.spark.util.AppendOnlyMap
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
/**
* A set of functions used to aggregate data.
@ -31,30 +31,51 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
private val sparkConf = SparkEnv.get.conf
private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
val combiners = new AppendOnlyMap[K, C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (iter.hasNext) {
kv = iter.next()
combiners.changeValue(kv._1, update)
}
combiners.iterator
} else {
val combiners =
new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
combiners.iterator
}
while (iter.hasNext) {
kv = iter.next()
combiners.changeValue(kv._1, update)
}
combiners.iterator
}
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
val combiners = new AppendOnlyMap[K, C]
var kc: (K, C) = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
}
while (iter.hasNext) {
kc = iter.next()
combiners.changeValue(kc._1, update)
}
combiners.iterator
} else {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
while (iter.hasNext) {
val (k, c) = iter.next()
combiners.insert(k, c)
}
combiners.iterator
}
while (iter.hasNext) {
kc = iter.next()
combiners.changeValue(kc._1, update)
}
combiners.iterator
}
}
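
For orientation, a hedged usage sketch of the Aggregator above (not part of this commit). The word-count style combiner functions are illustrative, and a running SparkContext is assumed so that SparkEnv.get.conf resolves:

// Sketch only: sums values per key, analogous to a word count.
val agg = new Aggregator[String, Int, Int](
  (v: Int) => v,                   // createCombiner: first value becomes the combiner
  (c: Int, v: Int) => c + v,       // mergeValue: fold further values into the combiner
  (c1: Int, c2: Int) => c1 + c2)   // mergeCombiners: merge combiners across partitions

val input: Iterator[(String, Int)] = Iterator(("a", 1), ("b", 1), ("a", 1))
val combined = agg.combineValuesByKey(input)   // yields ("a", 2) and ("b", 1)
// With spark.shuffle.externalSorting=true (the default), large inputs spill to disk.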

View file

@ -345,29 +345,42 @@ class SparkContext(
}
/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
* using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param conf JobConf for setting up the dataset
* @param inputFormatClass Class of the [[InputFormat]]
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minSplits Minimum number of Hadoop Splits to generate.
* @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
* Most RecordReader implementations reuse wrapper objects across multiple
* records, which can cause problems in RDD collect or aggregation operations.
* By default the records are cloned in Spark. However, application
* programmers can explicitly disable the cloning for better performance.
*/
def hadoopRDD[K, V](
def hadoopRDD[K: ClassTag, V: ClassTag](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
minSplits: Int = defaultMinSplits,
cloneRecords: Boolean = true
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
def hadoopFile[K, V](
def hadoopFile[K: ClassTag, V: ClassTag](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
minSplits: Int = defaultMinSplits,
cloneRecords: Boolean = true
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@ -379,7 +392,8 @@ class SparkContext(
inputFormatClass,
keyClass,
valueClass,
minSplits)
minSplits,
cloneRecords)
}
/**
@ -390,14 +404,15 @@ class SparkContext(
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
* }}}
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
: RDD[(K, V)] = {
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minSplits: Int, cloneRecords: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
minSplits)
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
minSplits,
cloneRecords)
}
/**
@ -408,61 +423,67 @@ class SparkContext(
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
* }}}
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
hadoopFile[K, V, F](path, defaultMinSplits)
hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String, cloneRecords: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]])
path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
cloneRecords = cloneRecords)
}
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
conf: Configuration = hadoopConfiguration,
cloneRecords: Boolean = true): RDD[(K, V)] = {
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
val updatedConf = job.getConfiguration
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
}
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]): RDD[(K, V)] = {
new NewHadoopRDD(this, fClass, kClass, vClass, conf)
vClass: Class[V],
cloneRecords: Boolean = true): RDD[(K, V)] = {
new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
def sequenceFile[K, V](path: String,
def sequenceFile[K: ClassTag, V: ClassTag](path: String,
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int
minSplits: Int,
cloneRecords: Boolean = true
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
sequenceFile(path, keyClass, valueClass, defaultMinSplits)
def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
cloneRecords: Boolean = true): RDD[(K, V)] =
sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@ -480,17 +501,18 @@ class SparkContext(
* for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
def sequenceFile[K, V]
(path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
val kc = kcf()
val vc = vcf()
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
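
A hedged sketch of the new cloneRecords flag from a user's perspective (not part of this commit; the input path and a live SparkContext `sc` are assumptions):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Default: records handed out by the Hadoop RecordReader are cloned before use.
val cloned = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input.txt")

// Opt out of cloning when each record is consumed immediately, e.g. mapped to an
// immutable value before any collect or aggregation, to avoid the copy overhead.
val reused = sc.hadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs:///data/input.txt", cloneRecords = false)
val lineLengths = reused.map { case (_, text) => text.getLength }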
/**

View file

@ -54,7 +54,11 @@ class SparkEnv private[spark] (
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val conf: SparkConf) {
val conf: SparkConf) extends Logging {
// A mapping of thread ID to amount of memory used for shuffle in bytes
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
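
A minimal sketch of the manual-synchronization convention the comment above requires (illustrative; it mirrors the call sites added elsewhere in this commit):

// Every read or write of the shared map happens under the map's own monitor.
val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
shuffleMemoryMap.synchronized {
  val threadId = Thread.currentThread().getId
  val used = shuffleMemoryMap.getOrElse(threadId, 0L)
  shuffleMemoryMap(threadId) = used + 1024L   // record 1 KB of additional shuffle memory
}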

View file

@ -279,6 +279,11 @@ private[spark] class Executor(
//System.exit(1)
}
} finally {
// TODO: Unregister shuffle memory only for ShuffleMapTask
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
runningTasks.remove(taskId)
}
}

View file

@ -23,8 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
import org.apache.spark.util.AppendOnlyMap
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
private[spark] sealed trait CoGroupSplitDep extends Serializable
@ -44,14 +43,12 @@ private[spark] case class NarrowCoGroupSplitDep(
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
private[spark]
class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
extends Partition with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
}
/**
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
* tuple with the list of values for that key.
@ -62,6 +59,14 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
// For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
// Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
// CoGroupValue is the intermediate state of each value before being merged in compute.
private type CoGroup = ArrayBuffer[Any]
private type CoGroupValue = (Any, Int) // Int is dependency number
private type CoGroupCombiner = Seq[CoGroup]
private val sparkConf = SparkEnv.get.conf
private var serializerClass: String = null
def setSerializer(cls: String): CoGroupedRDD[K] = {
@ -100,37 +105,74 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
// e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
}
val getSeq = (k: K) => {
map.changeValue(k, update)
}
val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
getSeq(kv._1)(depNum) += kv._2
}
val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]]
rddIterators += ((it, depNum))
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
kv => getSeq(kv._1)(depNum) += kv._2
}
val ser = SparkEnv.get.serializerManager.get(serializerClass, sparkConf)
val it = fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser)
rddIterators += ((it, depNum))
}
}
new InterruptibleIterator(context, map.iterator)
if (!externalSorting) {
val map = new AppendOnlyMap[K, CoGroupCombiner]
val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
}
val getCombiner: K => CoGroupCombiner = key => {
map.changeValue(key, update)
}
rddIterators.foreach { case (it, depNum) =>
while (it.hasNext) {
val kv = it.next()
getCombiner(kv._1)(depNum) += kv._2
}
}
new InterruptibleIterator(context, map.iterator)
} else {
val map = createExternalMap(numRdds)
rddIterators.foreach { case (it, depNum) =>
while (it.hasNext) {
val kv = it.next()
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
new InterruptibleIterator(context, map.iterator)
}
}
private def createExternalMap(numRdds: Int)
: ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
val newCombiner = Array.fill(numRdds)(new CoGroup)
value match { case (v, depNum) => newCombiner(depNum) += v }
newCombiner
}
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
(combiner, value) => {
value match { case (v, depNum) => combiner(depNum) += v }
combiner
}
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
(combiner1, combiner2) => {
combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 }
}
new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
createCombiner, mergeValue, mergeCombiners)
}
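
For context, a hedged end-to-end sketch of the code path this file serves (not part of the commit; data and master URL are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions such as cogroup

val conf = new SparkConf().setMaster("local").setAppName("cogroup-sketch")
  .set("spark.shuffle.externalSorting", "true")   // route compute() through the external map
val sc = new SparkContext(conf)

val left  = sc.parallelize(Seq(("k1", 1), ("k2", 2)))
val right = sc.parallelize(Seq(("k1", "a"), ("k3", "c")))

// For every key, the values observed in each parent RDD; backed by CoGroupedRDD.
val grouped = left.cogroup(right)   // RDD[(String, (Seq[Int], Seq[String]))]
grouped.collect().foreach(println)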
override def clearDependencies() {

View file

@ -19,7 +19,10 @@ package org.apache.spark.rdd
import java.io.EOFException
import org.apache.hadoop.mapred.FileInputFormat
import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
@ -31,7 +34,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.spark.util.Utils.cloneWritables
/**
@ -42,14 +45,14 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
val inputSplit = new SerializableWritable[InputSplit](s)
override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
override def hashCode(): Int = 41 * (41 + rddId) + idx
override val index: Int = idx
}
/**
* An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
* sources in HBase, or S3).
* sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param sc The SparkContext to associate the RDD with.
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
@ -61,15 +64,21 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
* @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
* Most RecordReader implementations reuse wrapper objects across multiple
* records, which can cause problems in RDD collect or aggregation operations.
* By default the records are cloned in Spark. However, application
* programmers can explicitly disable the cloning for better performance.
*/
class HadoopRDD[K, V](
class HadoopRDD[K: ClassTag, V: ClassTag](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
minSplits: Int,
cloneRecords: Boolean)
extends RDD[(K, V)](sc, Nil) with Logging {
def this(
@ -78,7 +87,8 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int) = {
minSplits: Int,
cloneRecords: Boolean) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
@ -87,7 +97,8 @@ class HadoopRDD[K, V](
inputFormatClass,
keyClass,
valueClass,
minSplits)
minSplits,
cloneRecords)
}
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@ -158,10 +169,10 @@ class HadoopRDD[K, V](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
val keyCloneFunc = cloneWritables[K](jobConf)
val value: V = reader.createValue()
val valueCloneFunc = cloneWritables[V](jobConf)
override def getNext() = {
try {
finished = !reader.next(key, value)
@ -169,7 +180,11 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}
(key, value)
if (cloneRecords) {
(keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
} else {
(key, value)
}
}
override def close() {

View file

@ -20,11 +20,14 @@ package org.apache.spark.rdd
import java.text.SimpleDateFormat
import java.util.Date
import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.util.Utils.cloneWritables
private[spark]
@ -33,15 +36,31 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
val serializableHadoopSplit = new SerializableWritable(rawSplit)
override def hashCode(): Int = (41 * (41 + rddId) + index)
override def hashCode(): Int = 41 * (41 + rddId) + index
}
class NewHadoopRDD[K, V](
/**
* An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
* sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
*
* @param sc The SparkContext to associate the RDD with.
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param conf The Hadoop configuration.
* @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
* Most RecordReader implementations reuse wrapper objects across multiple
* records, which can cause problems in RDD collect or aggregation operations.
* By default the records are cloned in Spark. However, application
* programmers can explicitly disable the cloning for better performance.
*/
class NewHadoopRDD[K: ClassTag, V: ClassTag](
sc : SparkContext,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
@transient conf: Configuration)
@transient conf: Configuration,
cloneRecords: Boolean)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
@ -88,7 +107,8 @@ class NewHadoopRDD[K, V](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
val keyCloneFunc = cloneWritables[K](conf)
val valueCloneFunc = cloneWritables[V](conf)
var havePair = false
var finished = false
@ -105,7 +125,13 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
(reader.getCurrentKey, reader.getCurrentValue)
val key = reader.getCurrentKey
val value = reader.getCurrentValue
if (cloneRecords) {
(keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
} else {
(key, value)
}
}
private def close() {

View file

@ -99,8 +99,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
@ -267,8 +265,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
// into a hash table, leading to more objects in the old gen.
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
@ -339,7 +338,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* existing partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
: RDD[(K, C)] = {
: RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
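
A hedged usage sketch of this combineByKey overload (not part of the commit; assumes an existing SparkContext `sc` and illustrative data):

import org.apache.spark.SparkContext._   // pair-RDD functions

// Per-key average: the combiner is a running (sum, count) pair.
val scores = sc.parallelize(Seq(("math", 90.0), ("math", 70.0), ("cs", 100.0)))
val sumCounts = scores.combineByKey(
  (v: Double) => (v, 1),                                                      // createCombiner
  (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),                      // mergeValue
  (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2))   // mergeCombiners
val averages = sumCounts.mapValues { case (sum, count) => sum / count }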

View file

@ -17,12 +17,14 @@
package org.apache.spark.storage
import java.util.UUID
/**
* Identifies a particular Block of data, usually associated with a single file.
* A Block can be uniquely identified by its filename, but each type of Block has a different
* set of keys which produce its unique name.
*
* If your BlockId should be serializable, be sure to add it to the BlockId.fromString() method.
* If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
*/
private[spark] sealed abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */
@ -55,7 +57,8 @@ private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId {
def name = "broadcast_" + broadcastId
}
private[spark] case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId {
private[spark]
case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId {
def name = broadcastId.name + "_" + hType
}
@ -67,6 +70,11 @@ private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends B
def name = "input-" + streamId + "-" + uniqueId
}
/** Id associated with temporary data managed as blocks. Not serializable. */
private[spark] case class TempBlockId(id: UUID) extends BlockId {
def name = "temp_" + id
}
// Intended only for testing purposes
private[spark] case class TestBlockId(id: String) extends BlockId {
def name = "test_" + id

View file

@ -159,7 +159,7 @@ private[spark] class BlockManager(
/**
* Reregister with the master and report all blocks to it. This will be called by the heart beat
* thread if our heartbeat to the block amnager indicates that we were not registered.
* thread if our heartbeat to the block manager indicates that we were not registered.
*
* Note that this method must be called without any BlockInfo locks held.
*/
@ -864,7 +864,7 @@ private[spark] object BlockManager extends Logging {
val ID_GENERATOR = new IdGenerator
def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}

View file

@ -181,4 +181,8 @@ class DiskBlockObjectWriter(
// Only valid if called after close()
override def timeWriting() = _timeWriting
def bytesWritten: Long = {
lastValidPosition - initialPosition
}
}

View file

@ -19,7 +19,7 @@ package org.apache.spark.storage
import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random}
import java.util.{Date, Random, UUID}
import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
@ -90,6 +90,15 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
def getFile(blockId: BlockId): File = getFile(blockId.name)
/** Produces a unique block id and File suitable for intermediate results. */
def createTempBlock(): (TempBlockId, File) = {
var blockId = new TempBlockId(UUID.randomUUID())
while (getFile(blockId).exists()) {
blockId = new TempBlockId(UUID.randomUUID())
}
(blockId, getFile(blockId))
}
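
A hedged sketch of how a caller uses the new createTempBlock (within this commit the consumer is ExternalAppendOnlyMap.spill; the byte payload and the `diskBlockManager` reference here are illustrative):

// Allocate a collision-free temporary block and write intermediate data to its file.
val (tempBlockId, tempFile) = diskBlockManager.createTempBlock()
val out = new java.io.FileOutputStream(tempFile)
try {
  out.write(Array[Byte](1, 2, 3))
} finally {
  out.close()
}
// tempBlockId.name is "temp_<uuid>", matching the TempBlockId case class added earlier.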
private def createLocalDirs(): Array[File] = {
logDebug("Creating local directories at root dirs '" + rootDirs + "'")
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")

View file

@ -171,7 +171,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq() ++
<h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)

View file

@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
<th>Task Time</th>
<th>Duration</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>

View file

@ -26,23 +26,47 @@ import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
import scala.reflect.{classTag, ClassTag}
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import org.apache.hadoop.io._
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
import org.apache.spark.{SparkConf, SparkException, Logging}
/**
* Various utility methods used by Spark.
*/
private[spark] object Utils extends Logging {
/**
* We try to clone the most common types of Writables and fall back to WritableUtils.clone otherwise.
* The intention is to optimize: for example, NullWritable needs no cloning, and for Long, Int and
* String wrappers, creating a new object with the value set is faster.
*/
def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
val cloneFunc = classTag[T] match {
case ClassTag(_: Text) =>
(w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
case ClassTag(_: LongWritable) =>
(w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
case ClassTag(_: IntWritable) =>
(w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
case ClassTag(_: NullWritable) =>
(w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
case _ =>
(w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
}
cloneFunc
}
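
A hedged sketch of the helper's intended call pattern (it mirrors the HadoopRDD call sites in this commit; the Text payload is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, Writable}
import org.apache.spark.util.Utils.cloneWritables

// Build the cloning function once (e.g. per partition), then apply it per record.
val hadoopConf = new Configuration()
val cloneText: Writable => Text = cloneWritables[Text](hadoopConf)

val reused = new Text("buffer reused by the RecordReader")
val safeCopy: Text = cloneText(reused)   // independent copy, safe to retain across records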
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()

View file

@ -17,6 +17,8 @@
package org.apache.spark.util
import scala.util.Random
class Vector(val elements: Array[Double]) extends Serializable {
def length = elements.length
@ -124,6 +126,12 @@ object Vector {
def ones(length: Int) = Vector(length, _ => 1)
/**
* Creates a [[org.apache.spark.util.Vector]] of the given length containing random numbers
* between 0.0 and 1.0. An optional [[scala.util.Random]] number generator can be provided.
*/
def random(length: Int, random: Random = new XORShiftRandom()) = Vector(length, _ => random.nextDouble())
class Multiplier(num: Double) {
def * (vec: Vector) = vec * num
}

View file

@ -15,7 +15,9 @@
* limitations under the License.
*/
package org.apache.spark.util
package org.apache.spark.util.collection
import java.util.{Arrays, Comparator}
/**
* A simple open hash table optimized for the append-only use case, where keys
@ -28,14 +30,15 @@ package org.apache.spark.util
* TODO: Cache the hash values of each key? java.util.HashMap does that.
*/
private[spark]
class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
require(initialCapacity >= 1, "Invalid initial capacity")
private var capacity = nextPowerOf2(initialCapacity)
private var mask = capacity - 1
private var curSize = 0
private var growThreshold = LOAD_FACTOR * capacity
private var growThreshold = (LOAD_FACTOR * capacity).toInt
// Holds keys and values in the same array for memory locality; specifically, the order of
// elements is key0, value0, key1, value1, key2, value2, etc.
@ -45,10 +48,15 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
private var haveNullValue = false
private var nullValue: V = null.asInstanceOf[V]
// Triggered by destructiveSortedIterator; the underlying data array may no longer be used
private var destroyed = false
private val destructionMessage = "Map state is invalid from destructive sorting!"
private val LOAD_FACTOR = 0.7
/** Get the value for a given key */
def apply(key: K): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
return nullValue
@ -72,6 +80,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
/** Set the value for a key */
def update(key: K, value: V): Unit = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
@ -106,6 +115,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
* for key, if any, or null otherwise. Returns the newly updated value.
*/
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
@ -139,35 +149,38 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
}
/** Iterator method from Iterable */
override def iterator: Iterator[(K, V)] = new Iterator[(K, V)] {
var pos = -1
override def iterator: Iterator[(K, V)] = {
assert(!destroyed, destructionMessage)
new Iterator[(K, V)] {
var pos = -1
/** Get the next value we should return from next(), or null if we're finished iterating */
def nextValue(): (K, V) = {
if (pos == -1) { // Treat position -1 as looking at the null value
if (haveNullValue) {
return (null.asInstanceOf[K], nullValue)
/** Get the next value we should return from next(), or null if we're finished iterating */
def nextValue(): (K, V) = {
if (pos == -1) { // Treat position -1 as looking at the null value
if (haveNullValue) {
return (null.asInstanceOf[K], nullValue)
}
pos += 1
}
while (pos < capacity) {
if (!data(2 * pos).eq(null)) {
return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
}
pos += 1
}
null
}
override def hasNext: Boolean = nextValue() != null
override def next(): (K, V) = {
val value = nextValue()
if (value == null) {
throw new NoSuchElementException("End of iterator")
}
pos += 1
value
}
while (pos < capacity) {
if (!data(2 * pos).eq(null)) {
return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
}
pos += 1
}
null
}
override def hasNext: Boolean = nextValue() != null
override def next(): (K, V) = {
val value = nextValue()
if (value == null) {
throw new NoSuchElementException("End of iterator")
}
pos += 1
value
}
}
@ -190,7 +203,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
}
/** Double the table's size and re-hash everything */
private def growTable() {
protected def growTable() {
val newCapacity = capacity * 2
if (newCapacity >= (1 << 30)) {
// We can't make the table this big because we want an array of 2x
@ -227,11 +240,58 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
data = newData
capacity = newCapacity
mask = newMask
growThreshold = LOAD_FACTOR * newCapacity
growThreshold = (LOAD_FACTOR * newCapacity).toInt
}
private def nextPowerOf2(n: Int): Int = {
val highBit = Integer.highestOneBit(n)
if (highBit == n) n else highBit << 1
}
/**
* Return an iterator of the map in sorted order. This provides a way to sort the map without
* using additional memory, at the expense of destroying the validity of the map.
*/
def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
destroyed = true
// Pack KV pairs into the front of the underlying array
var keyIndex, newIndex = 0
while (keyIndex < capacity) {
if (data(2 * keyIndex) != null) {
data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
newIndex += 1
}
keyIndex += 1
}
assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
// Sort by the given ordering
val rawOrdering = new Comparator[AnyRef] {
def compare(x: AnyRef, y: AnyRef): Int = {
cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
}
}
Arrays.sort(data, 0, newIndex, rawOrdering)
new Iterator[(K, V)] {
var i = 0
var nullValueReady = haveNullValue
def hasNext: Boolean = (i < newIndex || nullValueReady)
def next(): (K, V) = {
if (nullValueReady) {
nullValueReady = false
(null.asInstanceOf[K], nullValue)
} else {
val item = data(i).asInstanceOf[(K, V)]
i += 1
item
}
}
}
}
/**
* Return whether the next insert will cause the map to grow
*/
def atGrowThreshold: Boolean = curSize == growThreshold
}
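
A hedged sketch of the destructive-sort contract described above (comparator and keys are illustrative; AppendOnlyMap is private[spark], so this is conceptual rather than user-facing code):

import java.util.Comparator
import org.apache.spark.util.collection.AppendOnlyMap

val map = new AppendOnlyMap[String, Int]()
map("b") = 2
map("a") = 1

// Sorts the underlying array in place; afterwards apply/update/changeValue/iterator
// fail the !destroyed assertion, as exercised by AppendOnlyMapSuite further below.
val byKey = new Comparator[(String, Int)] {
  def compare(x: (String, Int), y: (String, Int)): Int = x._1.compareTo(y._1)
}
val sorted = map.destructiveSortedIterator(byKey)
sorted.foreach(println)   // ("a", 1) then ("b", 2)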

View file

@ -0,0 +1,350 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util.collection
import java.io._
import java.util.Comparator
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
/**
* An append-only map that spills sorted content to disk when there is insufficient space for it
* to grow.
*
* This map takes two passes over the data:
*
* (1) Values are merged into combiners, which are sorted and spilled to disk as necessary
* (2) Combiners are read from disk and merged together
*
* The setting of the spill threshold faces the following trade-off: If the spill threshold is
* too high, the in-memory map may occupy more memory than is available, resulting in OOM.
* However, if the spill threshold is too low, we spill frequently and incur unnecessary disk
* writes. This may lead to a performance regression compared to the normal case of using the
* non-spilling AppendOnlyMap.
*
* Two parameters control the memory threshold:
*
* `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing
* these maps as a fraction of the executor's total memory. Since each concurrently running
* task maintains one map, the actual threshold for each map is this quantity divided by the
* number of running tasks.
*
* `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
* this threshold, in case map size estimation is not sufficiently accurate.
*/
private[spark] class ExternalAppendOnlyMap[K, V, C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
serializer: Serializer = SparkEnv.get.serializerManager.default,
diskBlockManager: DiskBlockManager = SparkEnv.get.blockManager.diskBlockManager)
extends Iterable[(K, C)] with Serializable with Logging {
import ExternalAppendOnlyMap._
private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
private val sparkConf = SparkEnv.get.conf
// Collective memory threshold shared across all running tasks
private val maxMemoryThreshold = {
val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
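
A worked example of the threshold arithmetic the class comment describes, using the defaults read just above (the heap size and task count are illustrative):

// 1 GiB executor heap, defaults memoryFraction = 0.3 and safetyFraction = 0.8,
// and 4 tasks running shuffles concurrently on this executor.
val maxHeap          = 1L << 30                       // 1 GiB executor heap
val collectiveLimit  = (maxHeap * 0.3 * 0.8).toLong   // 24% of the heap, shared by all maps
val perTaskThreshold = collectiveLimit / 4            // per-map budget with 4 running tasks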
// Number of pairs in the in-memory map
private var numPairsInMemory = 0
// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
private val trackMemoryThreshold = 1000
// How many times we have spilled so far
private var spillCount = 0
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
private val comparator = new KCComparator[K, C]
private val ser = serializer.newInstance()
/**
* Insert the given key and value into the map.
*
* If the underlying map is about to grow, check if the global pool of shuffle memory has
* enough room for this to happen. If so, allocate the memory required to grow the map;
* otherwise, spill the in-memory map to disk.
*
* The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
*/
def insert(key: K, value: V) {
val update: (Boolean, C) => C = (hadVal, oldVal) => {
if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
}
if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
val mapSize = currentMap.estimateSize()
var shouldSpill = false
val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
// Atomically check whether there is sufficient memory in the global pool for
// this map to grow and, if possible, allocate the required amount
shuffleMemoryMap.synchronized {
val threadId = Thread.currentThread().getId
val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
val availableMemory = maxMemoryThreshold -
(shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
// Assume map growth factor is 2x
shouldSpill = availableMemory < mapSize * 2
if (!shouldSpill) {
shuffleMemoryMap(threadId) = mapSize * 2
}
}
// Do not synchronize spills
if (shouldSpill) {
spill(mapSize)
}
}
currentMap.changeValue(key, update)
numPairsInMemory += 1
}
/**
* Sort the existing contents of the in-memory map and spill them to a temporary file on disk
*/
private def spill(mapSize: Long) {
spillCount += 1
logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
val writer =
new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)
try {
val it = currentMap.destructiveSortedIterator(comparator)
while (it.hasNext) {
val kv = it.next()
writer.write(kv)
}
writer.commit()
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
writer.close()
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
spilledMaps.append(new DiskMapIterator(file))
// Reset the amount of shuffle memory used by this map in the global pool
val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap(Thread.currentThread().getId) = 0
}
numPairsInMemory = 0
}
/**
* Return an iterator that merges the in-memory map with the spilled maps.
* If no spill has occurred, simply return the in-memory map's iterator.
*/
override def iterator: Iterator[(K, C)] = {
if (spilledMaps.isEmpty) {
currentMap.iterator
} else {
new ExternalIterator()
}
}
/**
* An iterator that sort-merges (K, C) pairs from the in-memory map and the spilled maps
*/
private class ExternalIterator extends Iterator[(K, C)] {
// A fixed-size queue that maintains a buffer for each stream we are currently merging
val mergeHeap = new mutable.PriorityQueue[StreamBuffer]
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
val sortedMap = currentMap.destructiveSortedIterator(comparator)
val inputStreams = Seq(sortedMap) ++ spilledMaps
inputStreams.foreach { it =>
val kcPairs = getMorePairs(it)
mergeHeap.enqueue(StreamBuffer(it, kcPairs))
}
/**
* Fetch from the given iterator until a key with a different hash is retrieved. In the
* event of key hash collisions, this ensures no pairs are hidden from being merged.
* Assumes the given iterator is in sorted order.
*/
def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
val kcPairs = new ArrayBuffer[(K, C)]
if (it.hasNext) {
var kc = it.next()
kcPairs += kc
val minHash = kc._1.hashCode()
while (it.hasNext && kc._1.hashCode() == minHash) {
kc = it.next()
kcPairs += kc
}
}
kcPairs
}
/**
* If the given buffer contains a value for the given key, merge that value into
* baseCombiner and remove the corresponding (K, C) pair from the buffer
*/
def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
var i = 0
while (i < buffer.pairs.size) {
val (k, c) = buffer.pairs(i)
if (k == key) {
buffer.pairs.remove(i)
return mergeCombiners(baseCombiner, c)
}
i += 1
}
baseCombiner
}
/**
* Return true if there exists an input stream that still has unvisited pairs
*/
override def hasNext: Boolean = mergeHeap.exists(!_.pairs.isEmpty)
/**
* Select a key with the minimum hash, then combine all values with the same key from all input streams.
*/
override def next(): (K, C) = {
// Select a key from the StreamBuffer that holds the lowest key hash
val minBuffer = mergeHeap.dequeue()
val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
if (minPairs.length == 0) {
// Should only happen when no other stream buffers have any pairs left
throw new NoSuchElementException
}
var (minKey, minCombiner) = minPairs.remove(0)
assert(minKey.hashCode() == minHash)
// For all other streams that may have this key (i.e. have the same minimum key hash),
// merge in the corresponding value (if any) from that stream
val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
while (!mergeHeap.isEmpty && mergeHeap.head.minKeyHash == minHash) {
val newBuffer = mergeHeap.dequeue()
minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)
mergedBuffers += newBuffer
}
// Repopulate each visited stream buffer and add it back to the merge heap
mergedBuffers.foreach { buffer =>
if (buffer.pairs.length == 0) {
buffer.pairs ++= getMorePairs(buffer.iterator)
}
mergeHeap.enqueue(buffer)
}
(minKey, minCombiner)
}
/**
* A buffer for streaming from a map iterator (in-memory or on-disk) sorted by key hash.
* Each buffer maintains the lowest-ordered keys in the corresponding iterator. Due to
* hash collisions, it is possible for multiple keys to be "tied" for being the lowest.
*
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
*/
case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
extends Comparable[StreamBuffer] {
def minKeyHash: Int = {
if (pairs.length > 0) {
// pairs are already sorted by key hash
pairs(0)._1.hashCode()
} else {
Int.MaxValue
}
}
override def compareTo(other: StreamBuffer): Int = {
// minus sign because mutable.PriorityQueue dequeues the max, not the min
-minKeyHash.compareTo(other.minKeyHash)
}
}
}
/**
* An iterator that returns (K, C) pairs in sorted order from an on-disk map
*/
private class DiskMapIterator(file: File) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream)
val deserializeStream = ser.deserializeStream(bufferedStream)
var nextItem: (K, C) = null
var eof = false
def readNextItem(): (K, C) = {
if (!eof) {
try {
return deserializeStream.readObject().asInstanceOf[(K, C)]
} catch {
case e: EOFException =>
eof = true
cleanup()
}
}
null
}
override def hasNext: Boolean = {
if (nextItem == null) {
nextItem = readNextItem()
}
nextItem != null
}
override def next(): (K, C) = {
val item = if (nextItem == null) readNextItem() else nextItem
if (item == null) {
throw new NoSuchElementException
}
nextItem = null
item
}
// TODO: Ensure this gets called even if the iterator isn't drained.
def cleanup() {
deserializeStream.close()
file.delete()
}
}
}
private[spark] object ExternalAppendOnlyMap {
private class KCComparator[K, C] extends Comparator[(K, C)] {
def compare(kc1: (K, C), kc2: (K, C)): Int = {
kc1._1.hashCode().compareTo(kc2._1.hashCode())
}
}
}

View file

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util.collection
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.SizeEstimator
import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
/**
* Append-only map that keeps track of its estimated size in bytes.
* We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
* as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
*/
private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
/**
* Controls the base of the exponential which governs the rate of sampling.
* E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
*/
private val SAMPLE_GROWTH_RATE = 1.1
/** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
private val samples = new ArrayBuffer[Sample]()
/** Total number of insertions and updates into the map since the last resetSamples(). */
private var numUpdates: Long = _
/** The value of 'numUpdates' at which we will take our next sample. */
private var nextSampleNum: Long = _
/** The average number of bytes per update between our last two samples. */
private var bytesPerUpdate: Double = _
resetSamples()
/** Called after the map grows in size, as this can be a dramatic change for small objects. */
def resetSamples() {
numUpdates = 1
nextSampleNum = 1
samples.clear()
takeSample()
}
override def update(key: K, value: V): Unit = {
super.update(key, value)
numUpdates += 1
if (nextSampleNum == numUpdates) { takeSample() }
}
override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
val newValue = super.changeValue(key, updateFunc)
numUpdates += 1
if (nextSampleNum == numUpdates) { takeSample() }
newValue
}
/** Takes a new sample of the current map's size. */
def takeSample() {
samples += Sample(SizeEstimator.estimate(this), numUpdates)
// Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
bytesPerUpdate = math.max(0, samples.toSeq.reverse match {
case latest :: previous :: tail =>
(latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
case _ =>
0
})
nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
}
override protected def growTable() {
super.growTable()
resetSamples()
}
/** Estimates the current size of the map in bytes. O(1) time. */
def estimateSize(): Long = {
assert(samples.nonEmpty)
val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
(samples.last.size + extrapolatedDelta).toLong
}
}
private object SizeTrackingAppendOnlyMap {
case class Sample(size: Long, numUpdates: Long)
}
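
A hedged usage sketch of the size tracking described above (key and value types are illustrative):

import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap

val map = new SizeTrackingAppendOnlyMap[Int, String]()
for (i <- 0 until 10000) {
  map(i) = "value-" + i
}
// O(1): extrapolates from the last two samples instead of re-running SizeEstimator.
println("estimated bytes: " + map.estimateSize())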

View file

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util
import scala.util.Random
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
val NORMAL_ERROR = 0.20
val HIGH_ERROR = 0.30
test("fixed size insertions") {
testWith[Int, Long](10000, i => (i, i.toLong))
testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass()))
}
test("variable size insertions") {
val rand = new Random(123456789)
def randString(minLen: Int, maxLen: Int): String = {
"a" * (rand.nextInt(maxLen - minLen) + minLen)
}
testWith[Int, String](10000, i => (i, randString(0, 10)))
testWith[Int, String](10000, i => (i, randString(0, 100)))
testWith[Int, String](10000, i => (i, randString(90, 100)))
}
test("updates") {
val rand = new Random(123456789)
def randString(minLen: Int, maxLen: Int): String = {
"a" * (rand.nextInt(maxLen - minLen) + minLen)
}
testWith[String, Int](10000, i => (randString(0, 10000), i))
}
def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
val map = new SizeTrackingAppendOnlyMap[K, V]()
for (i <- 0 until numElements) {
val (k, v) = makeElement(i)
map(k) = v
expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
}
}
def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
val betterEstimatedSize = SizeEstimator.estimate(obj)
assert(betterEstimatedSize * (1 - error) < estimatedSize,
s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
}
}
object SizeTrackingAppendOnlyMapSuite {
// Speed test, for reproducibility of results.
// These could be highly non-deterministic in general, however.
// Results:
// AppendOnlyMap: 31 ms
// SizeTracker: 54 ms
// SizeEstimator: 1500 ms
def main(args: Array[String]) {
val numElements = 100000
val baseTimes = for (i <- 0 until 10) yield time {
val map = new AppendOnlyMap[Int, LargeDummyClass]()
for (i <- 0 until numElements) {
map(i) = new LargeDummyClass()
}
}
val sampledTimes = for (i <- 0 until 10) yield time {
val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
for (i <- 0 until numElements) {
map(i) = new LargeDummyClass()
map.estimateSize()
}
}
val unsampledTimes = for (i <- 0 until 3) yield time {
val map = new AppendOnlyMap[Int, LargeDummyClass]()
for (i <- 0 until numElements) {
map(i) = new LargeDummyClass()
SizeEstimator.estimate(map)
}
}
println("Base: " + baseTimes)
println("SizeTracker (sampled): " + sampledTimes)
println("SizeEstimator (unsampled): " + unsampledTimes)
}
def time(f: => Unit): Long = {
val start = System.currentTimeMillis()
f
System.currentTimeMillis() - start
}
private class LargeDummyClass {
val arr = new Array[Int](100)
}
}

View file

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util
import scala.util.Random
import org.scalatest.FunSuite
/**
* Tests org.apache.spark.util.Vector functionality
*/
class VectorSuite extends FunSuite {
def verifyVector(vector: Vector, expectedLength: Int) = {
assert(vector.length == expectedLength)
assert(vector.elements.min > 0.0)
assert(vector.elements.max < 1.0)
}
test("random with default random number generator") {
val vector100 = Vector.random(100)
verifyVector(vector100, 100)
}
test("random with given random number generator") {
val vector100 = Vector.random(100, new Random(100))
verifyVector(vector100, 100)
}
}

View file

@ -15,11 +15,12 @@
* limitations under the License.
*/
package org.apache.spark.util
package org.apache.spark.util.collection
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
import java.util.Comparator
class AppendOnlyMapSuite extends FunSuite {
test("initialization") {
@ -151,4 +152,47 @@ class AppendOnlyMapSuite extends FunSuite {
assert(map("" + i) === "" + i)
}
}
test("destructive sort") {
val map = new AppendOnlyMap[String, String]()
for (i <- 1 to 100) {
map("" + i) = "" + i
}
map.update(null, "happy new year!")
try {
map.apply("1")
map.update("1", "2013")
map.changeValue("1", (hadValue, oldValue) => "2014")
map.iterator
} catch {
case e: IllegalStateException => fail()
}
val it = map.destructiveSortedIterator(new Comparator[(String, String)] {
def compare(kv1: (String, String), kv2: (String, String)): Int = {
val x = if (kv1 != null && kv1._1 != null) kv1._1.toInt else Int.MinValue
val y = if (kv2 != null && kv2._1 != null) kv2._1.toInt else Int.MinValue
x.compareTo(y)
}
})
// Should be sorted by key
assert(it.hasNext)
var previous = it.next()
assert(previous == (null, "happy new year!"))
previous = it.next()
assert(previous == ("1", "2014"))
while (it.hasNext) {
val kv = it.next()
assert(kv._1.toInt > previous._1.toInt)
previous = kv
}
// All subsequent calls to apply, update, changeValue and iterator should throw exception
intercept[AssertionError] { map.apply("1") }
intercept[AssertionError] { map.update("1", "2013") }
intercept[AssertionError] { map.changeValue("1", (hadValue, oldValue) => "2014") }
intercept[AssertionError] { map.iterator }
}
}

View file

@@ -0,0 +1,230 @@
package org.apache.spark.util.collection
import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark._
import org.apache.spark.SparkContext._
class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
override def beforeEach() {
val conf = new SparkConf(false)
conf.set("spark.shuffle.externalSorting", "true")
sc = new SparkContext("local", "test", conf)
}
val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => {
buffer += i
}
val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] =
(buf1, buf2) => {
buf1 ++= buf2
}
test("simple insert") {
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
// Single insert
map.insert(1, 10)
var it = map.iterator
assert(it.hasNext)
val kv = it.next()
assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10))
assert(!it.hasNext)
// Multiple insert
map.insert(2, 20)
map.insert(3, 30)
it = map.iterator
assert(it.hasNext)
assert(it.toSet == Set[(Int, ArrayBuffer[Int])](
(1, ArrayBuffer[Int](10)),
(2, ArrayBuffer[Int](20)),
(3, ArrayBuffer[Int](30))))
}
test("insert with collision") {
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
map.insert(1, 10)
map.insert(2, 20)
map.insert(3, 30)
map.insert(1, 100)
map.insert(2, 200)
map.insert(1, 1000)
val it = map.iterator
assert(it.hasNext)
val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
assert(result == Set[(Int, Set[Int])](
(1, Set[Int](10, 100, 1000)),
(2, Set[Int](20, 200)),
(3, Set[Int](30))))
}
test("ordering") {
val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
map1.insert(1, 10)
map1.insert(2, 20)
map1.insert(3, 30)
val map2 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
map2.insert(2, 20)
map2.insert(3, 30)
map2.insert(1, 10)
val map3 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
map3.insert(3, 30)
map3.insert(1, 10)
map3.insert(2, 20)
val it1 = map1.iterator
val it2 = map2.iterator
val it3 = map3.iterator
var kv1 = it1.next()
var kv2 = it2.next()
var kv3 = it3.next()
assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
kv1 = it1.next()
kv2 = it2.next()
kv3 = it3.next()
assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
kv1 = it1.next()
kv2 = it2.next()
kv3 = it3.next()
assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
}
test("null keys and values") {
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
map.insert(1, 5)
map.insert(2, 6)
map.insert(3, 7)
assert(map.size === 3)
assert(map.iterator.toSet == Set[(Int, Seq[Int])](
(1, Seq[Int](5)),
(2, Seq[Int](6)),
(3, Seq[Int](7))
))
// Null keys
val nullInt = null.asInstanceOf[Int]
map.insert(nullInt, 8)
assert(map.size === 4)
assert(map.iterator.toSet == Set[(Int, Seq[Int])](
(1, Seq[Int](5)),
(2, Seq[Int](6)),
(3, Seq[Int](7)),
(nullInt, Seq[Int](8))
))
// Null values
map.insert(4, nullInt)
map.insert(nullInt, nullInt)
assert(map.size === 5)
val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
assert(result == Set[(Int, Set[Int])](
(1, Set[Int](5)),
(2, Set[Int](6)),
(3, Set[Int](7)),
(4, Set[Int](nullInt)),
(nullInt, Set[Int](nullInt, 8))
))
}
test("simple aggregator") {
// reduceByKey
val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))
val result1 = rdd.reduceByKey(_+_).collect()
assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5)))
// groupByKey
val result2 = rdd.groupByKey().collect()
assert(result2.toSet == Set[(Int, Seq[Int])]
((0, ArrayBuffer[Int](1, 1, 1, 1, 1)), (1, ArrayBuffer[Int](1, 1, 1, 1, 1))))
}
test("simple cogroup") {
val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
val result = rdd1.cogroup(rdd2).collect()
result.foreach { case (i, (seq1, seq2)) =>
i match {
case 0 => assert(seq1.toSet == Set[Int]() && seq2.toSet == Set[Int](2, 4))
case 1 => assert(seq1.toSet == Set[Int](1) && seq2.toSet == Set[Int](1, 3))
case 2 => assert(seq1.toSet == Set[Int](2) && seq2.toSet == Set[Int]())
case 3 => assert(seq1.toSet == Set[Int](3) && seq2.toSet == Set[Int]())
case 4 => assert(seq1.toSet == Set[Int](4) && seq2.toSet == Set[Int]())
}
}
}
test("spilling") {
// TODO: Figure out correct memory parameters to actually induce spilling
// System.setProperty("spark.shuffle.buffer.mb", "1")
// System.setProperty("spark.shuffle.buffer.fraction", "0.05")
// reduceByKey - should spill exactly 6 times
val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i))
val resultA = rddA.reduceByKey(math.max(_, _)).collect()
assert(resultA.length == 5000)
resultA.foreach { case(k, v) =>
k match {
case 0 => assert(v == 1)
case 2500 => assert(v == 5001)
case 4999 => assert(v == 9999)
case _ =>
}
}
// groupByKey - should spill exactly 11 times
val rddB = sc.parallelize(0 until 10000).map(i => (i/4, i))
val resultB = rddB.groupByKey().collect()
assert(resultB.length == 2500)
resultB.foreach { case(i, seq) =>
i match {
case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3))
case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003))
case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999))
case _ =>
}
}
// cogroup - should spill exactly 7 times
val rddC1 = sc.parallelize(0 until 1000).map(i => (i, i))
val rddC2 = sc.parallelize(0 until 1000).map(i => (i%100, i))
val resultC = rddC1.cogroup(rddC2).collect()
assert(resultC.length == 1000)
resultC.foreach { case(i, (seq1, seq2)) =>
i match {
case 0 =>
assert(seq1.toSet == Set[Int](0))
assert(seq2.toSet == Set[Int](0, 100, 200, 300, 400, 500, 600, 700, 800, 900))
case 500 =>
assert(seq1.toSet == Set[Int](500))
assert(seq2.toSet == Set[Int]())
case 999 =>
assert(seq1.toSet == Set[Int](999))
assert(seq2.toSet == Set[Int]())
case _ =>
}
}
}
// TODO: Test memory allocation for multiple concurrently running tasks
}

View file

@@ -104,13 +104,24 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td>spark.storage.memoryFraction</td>
<td>0.66</td>
<td>0.6</td>
<td>
Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase
generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase
it if you configure your own old generation size.
</td>
</tr>
<tr>
<td>spark.shuffle.memoryFraction</td>
<td>0.3</td>
<td>
Fraction of Java heap to use for aggregation and cogroups during shuffles, if
<code>spark.shuffle.externalSorting</code> is enabled. At any given time, the collective size of
all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
begin to spill to disk. If spills happen often, consider increasing this value at the expense of
<code>spark.storage.memoryFraction</code>.
</td>
</tr>
<tr>
<td>spark.mesos.coarse</td>
<td>false</td>
@@ -376,6 +387,14 @@ Apart from these, the following properties are also available, and may be useful
If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations.
</td>
</tr>
<tr>
<td>spark.shuffle.externalSorting</td>
<td>true</td>
<td>
If set to "true", limits the amount of memory used during reduces by spilling data out to disk. This spilling
threshold is specified by <code>spark.shuffle.memoryFraction</code>.
</td>
</tr>
<tr>
<td>spark.speculation</td>
<td>false</td>

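The two shuffle properties above work together with <code>spark.storage.memoryFraction</code>. As a minimal sketch (not part of this patch) of how they might be applied through <code>SparkConf</code>; the master URL, application name, and fraction values are illustrative assumptions:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only -- tune them against the workload and available heap.
val conf = new SparkConf()
  .setMaster("local[2]")                          // assumed local mode, for the sketch only
  .setAppName("shuffle-tuning-sketch")            // hypothetical application name
  .set("spark.shuffle.externalSorting", "true")   // allow in-memory shuffle maps to spill to disk
  .set("spark.shuffle.memoryFraction", "0.3")     // collective cap on in-memory shuffle maps
  .set("spark.storage.memoryFraction", "0.6")     // heap fraction left for Spark's block cache
val sc = new SparkContext(conf)
// Shuffle-heavy operations (reduceByKey, groupByKey, cogroup) spill once the cap is reached.
sc.stop()

Raising <code>spark.shuffle.memoryFraction</code> generally means lowering <code>spark.storage.memoryFraction</code> so that the two fractions, plus room for user code, still fit within the executor heap.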
View file

@@ -185,7 +185,11 @@ def get_spark_ami(opts):
"hi1.4xlarge": "hvm",
"m3.xlarge": "hvm",
"m3.2xlarge": "hvm",
"cr1.8xlarge": "hvm"
"cr1.8xlarge": "hvm",
"i2.xlarge": "hvm",
"i2.2xlarge": "hvm",
"i2.4xlarge": "hvm",
"i2.8xlarge": "hvm"
}
if opts.instance_type in instance_types:
instance_type = instance_types[opts.instance_type]
@@ -478,7 +482,11 @@ def get_num_disks(instance_type):
"cr1.8xlarge": 2,
"hi1.4xlarge": 2,
"m3.xlarge": 0,
"m3.2xlarge": 0
"m3.2xlarge": 0,
"i2.xlarge": 1,
"i2.2xlarge": 2,
"i2.4xlarge": 4,
"i2.8xlarge": 8
}
if instance_type in disks_by_instance:
return disks_by_instance[instance_type]

View file

@@ -45,9 +45,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sksamuel.kafka</groupId>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.8.0-beta1</version>
<version>0.8.0</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>

View file

@@ -80,7 +80,7 @@ class MQTTReceiver(brokerUrl: String,
var persistence: MqttClientPersistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistence
var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", persistence)
var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
// Connect to MqttBroker
client.connect()

View file

@@ -22,7 +22,7 @@ import scala.util.Random
import scala.util.Sorting
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext}
import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoRegistrator
@@ -578,12 +578,13 @@ object ALS {
val implicitPrefs = if (args.length >= 7) args(6).toBoolean else false
val alpha = if (args.length >= 8) args(7).toDouble else 1
val blocks = if (args.length == 9) args(8).toInt else -1
val sc = new SparkContext(master, "ALS")
sc.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc.conf.set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
sc.conf.set("spark.kryo.referenceTracking", "false")
sc.conf.set("spark.kryoserializer.buffer.mb", "8")
sc.conf.set("spark.locality.wait", "10000")
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
.set("spark.kryo.referenceTracking", "false")
.set("spark.kryoserializer.buffer.mb", "8")
.set("spark.locality.wait", "10000")
val sc = new SparkContext(master, "ALS", conf)
val ratings = sc.textFile(ratingsFile).map { line =>
val fields = line.split(',')

View file

@@ -90,7 +90,7 @@ object SparkBuild extends Build {
lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](if (isNewHadoop) yarn else yarnAlpha) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](if (isNewHadoop) yarn else yarnAlpha) else Seq[ProjectReference]()
lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings)
lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings)
@@ -98,23 +98,23 @@ object SparkBuild extends Build {
lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings)
.dependsOn(streaming % "compile->compile;test->test")
lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
.dependsOn(core, mllib, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
// Everything except assembly, tools and examples belong to packageProjects
lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.apache.spark",
@@ -321,7 +321,7 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
libraryDependencies ++= Seq(
"commons-io" % "commons-io" % "2.4"
"commons-io" % "commons-io" % "2.4"
)
)
@@ -388,19 +388,19 @@ object SparkBuild extends Build {
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty)
)
)
def kafkaSettings() = sharedSettings ++ Seq(
name := "spark-streaming-kafka",
libraryDependencies ++= Seq(
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
"com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
"org.apache.kafka" %% "kafka" % "0.8.0"
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("net.sf.jopt-simple", "jopt-simple")
excludeAll(excludeNetty)
)
)
def flumeSettings() = sharedSettings ++ Seq(
name := "spark-streaming-flume",
libraryDependencies ++= Seq(

View file

@@ -28,6 +28,7 @@ import java.util.*;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
import org.apache.spark.SparkConf;
import org.apache.spark.HashPartitioner;
@@ -441,13 +442,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<String, String>("new york", "islanders")));
List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
Arrays.asList(
List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
Sets.newHashSet(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("dodgers", "giants")),
new Tuple2<String, Tuple2<String, String>>("new york",
new Tuple2<String, String>("yankees", "mets"))),
Arrays.asList(
new Tuple2<String, String>("yankees", "mets"))),
Sets.newHashSet(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("sharks", "ducks")),
new Tuple2<String, Tuple2<String, String>>("new york",
@@ -482,8 +483,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
for (List<Tuple2<String, Tuple2<String, String>>> res: result) {
unorderedResult.add(Sets.newHashSet(res));
}
Assert.assertEquals(expected, result);
Assert.assertEquals(expected, unorderedResult);
}
@@ -1196,15 +1201,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("hello", "moon"),
Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
Arrays.asList(
List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList(
Sets.newHashSet(
new Tuple2<String, Long>("hello", 1L),
new Tuple2<String, Long>("world", 1L)),
Arrays.asList(
Sets.newHashSet(
new Tuple2<String, Long>("hello", 2L),
new Tuple2<String, Long>("world", 1L),
new Tuple2<String, Long>("moon", 1L)),
Arrays.asList(
Sets.newHashSet(
new Tuple2<String, Long>("hello", 2L),
new Tuple2<String, Long>("moon", 1L)));
@@ -1214,8 +1219,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
List<HashSet<Tuple2<String, Long>>> unorderedResult = Lists.newArrayList();
for (List<Tuple2<String, Long>> res: result) {
unorderedResult.add(Sets.newHashSet(res));
}
Assert.assertEquals(expected, result);
Assert.assertEquals(expected, unorderedResult);
}
@Test